Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion warehouse/models/intermediate/gtfs/_int_gtfs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ models:
- not_null
- name: service_id
description: '{{ doc("gtfs_calendar__service_id") }}'
- name: service_date
- &service_date
name: service_date
description: |
Date on which this service was active (i.e., this date is between the
`start_date` and `end_date` for this service).
Expand Down Expand Up @@ -317,6 +318,26 @@ models:
description: '{{ doc("column_num_arrival_times_populated_stop_times") }}'
- name: num_departure_times_populated_stop_times
description: '{{ doc("column_num_departure_times_populated_stop_times") }}'
- name: stop_pt_array
description: |
Array of ordered stop point geometries (dim_stop's pt_geom) for this trip_id.
- name: stop_id_array
description: |
Array of ordered stop IDs (dim_stop's stop_id) for this trip_id.
- name: stop_seq_array
description: |
Array of ordered stop sequences (dim_stop_time's stop_sequence) for this trip_id.
- name: arrival_sec_array
description: |
Array of ordered arrival seconds (dim_stop_time's arrival_time, departure_time converted to seconds from midnight)
for this trip_id. The arrival value is used when it is populated; otherwise the departure value is used.
- name: avg_stop_spacing
description: |
The average distance between stops for this trip_id.
The difference between shape_dist_traveled is calculated for each ordered stop_id-stop_sequence,
and the average is taken across the stops included for the trip.
Units are the same as values provided in https://gtfs.org/documentation/schedule/reference/#stop_timestxt.
Note: this is an optional column in the GTFS specification, so may not be present for all operators.
- name: int_gtfs_rt__distinct_download_configs
description: |
Distinct `dt`, `_config_extract_ts` pairs indicating
Expand Down Expand Up @@ -640,3 +661,26 @@ models:
# this will fail if run on all historical data, specifically for days where a given feed's time zone changed;
# even then the number of failures is very small
where: '__rt_sampled__'
# Schema entry for the trip-stop table that measures how far each scheduled stop
# is from the trip's vehicle positions path.
# NOTE(review): the SQL in this change that builds this table selects the schedule keys as
# `schedule_feed_key` / `schedule_gtfs_dataset_key` and also emits `vp_base64_url`; the
# `feed_key` / `gtfs_dataset_key` entries below do not match those output names.
# Confirm which naming is intended before relying on column-level docs or tests.
- name: int_gtfs_rt_vs_sched__stops_served_vehicle_positions
description: |
This table joins int_gtfs_schedule__stop_times_grouped (stop times grouped by trip)
and fct_vehicle_locations_path (vehicle positions grouped by trip).
It unnests the stop positions so that the table is unique on the
feed_key-trip_instance_key-stop_id.

The table finds the nearest distance between the scheduled stop's pt geom
and the vehicle locations path (linestring), in meters.

columns:
- *feed_key
- name: gtfs_dataset_key
- *service_date
- name: trip_id
description: '{{ doc("gtfs_trips__trip_id") }}'
- name: iteration_num
- name: trip_instance_key
- name: stop_id
description: '{{ doc("gtfs_stops__stop_id") }}'
- name: meters_to_vp
description: |
Meters from a stop's pt_geom to the vehicle positions path.
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
{{
    config(
        materialized='incremental',
        incremental_strategy='insert_overwrite',
        partition_by={
            'field': 'service_date',
            'data_type': 'date',
            'granularity': 'day',
        },
        cluster_by=['vp_base64_url', 'schedule_feed_key'],
    )
}}

-- Trip-stop grain: one row per scheduled stop of every scheduled trip that also has a
-- vehicle positions path, with the distance (meters) from the stop point to that path.
--
-- insert_overwrite replaces whole service_date partitions on each run, so re-running
-- the model does not append duplicate rows to partitions that already exist.

-- scheduled trips: schedule keys plus the trip instance identifiers used for joining
WITH trips AS (
    SELECT
        feed_key,
        gtfs_dataset_key,
        service_date,
        trip_id,
        iteration_num,
        trip_instance_key
    FROM {{ ref('fct_scheduled_trips') }}
),

-- vehicle positions per trip, collapsed into a single simplified linestring
vp_path AS (
    SELECT
        service_date,
        base64_url,
        schedule_feed_key,
        trip_instance_key,
        -- 5 is the ST_SIMPLIFY tolerance in meters
        ST_SIMPLIFY(ST_MAKELINE(pt_array), 5) AS vp_line
    FROM {{ ref('fct_vehicle_locations_path') }}
),

-- ordered stop geometries / stop ids per scheduled trip
stop_times_grouped AS (
    SELECT
        feed_key,
        trip_id,
        iteration_num,
        stop_pt_array,
        stop_id_array
    FROM {{ ref('int_gtfs_schedule__stop_times_grouped') }}
),

-- join trips to their vp path, then unnest the stop arrays so every stop is a row
vp_with_stops AS (
    SELECT
        trips.feed_key AS schedule_feed_key,
        trips.gtfs_dataset_key AS schedule_gtfs_dataset_key,
        trips.service_date,
        trips.trip_id,
        trips.iteration_num,
        trips.trip_instance_key,

        vp_path.base64_url AS vp_base64_url,

        pt_geom,
        stop_id,

        -- Shortest distance from the stop point to the vp path.
        -- Buffering the stop point exhausted BQ resources, and a Hausdorff distance
        -- measures the farthest separation between the two shapes rather than the
        -- nearest approach, so plain ST_DISTANCE is used.
        ST_DISTANCE(pt_geom, vp_path.vp_line) AS unrounded_meters_to_vp

    FROM trips
    INNER JOIN stop_times_grouped
        ON stop_times_grouped.feed_key = trips.feed_key
        AND stop_times_grouped.trip_id = trips.trip_id
        AND stop_times_grouped.iteration_num = trips.iteration_num
    INNER JOIN vp_path
        ON trips.service_date = vp_path.service_date
        AND trips.feed_key = vp_path.schedule_feed_key
        AND trips.trip_instance_key = vp_path.trip_instance_key
    -- Pair the two arrays by position. Unnesting them independently would emit every
    -- (pt_geom, stop_id) combination, i.e. n * n rows per trip instead of n.
    -- NOTE(review): positional pairing assumes both arrays have the same length and
    -- order. Upstream builds them with IGNORE NULLS on different columns, so a stop
    -- without a pt_geom would shift the pairing -- confirm that cannot happen.
    LEFT JOIN UNNEST(stop_times_grouped.stop_pt_array) AS pt_geom WITH OFFSET AS pt_offset
    LEFT JOIN UNNEST(stop_times_grouped.stop_id_array) AS stop_id WITH OFFSET AS stop_offset
        ON stop_offset = pt_offset
)

SELECT
    * EXCEPT (unrounded_meters_to_vp),
    ROUND(unrounded_meters_to_vp, 2) AS meters_to_vp,
    -- kept for existing consumers; now derived from the nearest distance
    unrounded_meters_to_vp <= 25 AS is_within_25m
FROM vp_with_stops
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
)
}}

WITH

dim_stop_times AS (
SELECT * FROM {{ ref('dim_stop_times') }}
WHERE {{ incremental_where(default_start_var='GTFS_SCHEDULE_START', this_dt_column='_feed_valid_from', filter_dt_column='_feed_valid_from', dev_lookback_days = None) }}
WITH dim_stop_times AS (
SELECT *
FROM {{ ref('dim_stop_times') }}
WHERE {{ incremental_where(default_start_var='GTFS_SCHEDULE_START',
this_dt_column='_feed_valid_from',
filter_dt_column='_feed_valid_from', dev_lookback_days = None)
}}
),

int_gtfs_schedule__frequencies_stop_times AS (
Expand All @@ -23,8 +25,11 @@ stops AS (
feed_key,
stop_id,
stop_timezone_coalesced,
COUNT(*) AS ct
COUNT(*) AS ct,
MAX(ST_ASTEXT(pt_geom)) AS pt_geom, -- can this be written better

FROM {{ ref('dim_stops') }}

WHERE stop_id IS NOT NULL
GROUP BY 1, 2, 3
-- we can have duplicate stop IDs within a given feed (this is not valid, but happens)
Expand All @@ -48,14 +53,27 @@ stops_times_with_tz AS (
COALESCE(LAST_VALUE(stop_timezone_coalesced)
OVER (PARTITION BY feed_key, trip_id
ORDER BY dim_stop_times.stop_sequence
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), feed_timezone) AS trip_end_timezone
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), feed_timezone) AS trip_end_timezone,
ST_GEOGFROMTEXT(stops.pt_geom) AS pt_geom,
COALESCE(dim_stop_times.shape_dist_traveled, freq.shape_dist_traveled) AS shape_dist,

FROM dim_stop_times
LEFT JOIN stops
USING (feed_key, stop_id)
LEFT JOIN int_gtfs_schedule__frequencies_stop_times freq
USING (feed_key, trip_id, stop_id)
),

-- distance to the following stop: the next stop's shape_dist minus this stop's
-- (NULL for the last stop of a trip, since LEAD has no following row)
stops_times_with_tz2 AS (
    SELECT
        stops_times_with_tz.*,
        LEAD(shape_dist) OVER next_stop - shape_dist AS difference_dist_traveled
    FROM stops_times_with_tz
    WINDOW next_stop AS (
        PARTITION BY feed_key, trip_id, iteration_num
        ORDER BY stop_sequence
    )
),

grouped AS (
SELECT
trip_id,
Expand Down Expand Up @@ -126,10 +144,33 @@ grouped AS (
departure_time IS NOT NULL
) AS num_departure_times_populated_stop_times,

FROM stops_times_with_tz
-- add stop position geometries and order of stops
ARRAY_AGG(
-- ignore nulls so it doesn't error out if there's a null point
pt_geom IGNORE NULLS
ORDER BY stop_sequence, stop_id)
AS stop_pt_array,
ARRAY_AGG(
-- ignore nulls so it doesn't error out if there's a null point
stop_id IGNORE NULLS
ORDER BY stop_sequence, stop_id)
AS stop_id_array,
ARRAY_AGG(
stop_sequence
ORDER BY stop_sequence
) AS stop_seq_array,
-- just keep arrival for now
ARRAY_AGG(
LEAST(COALESCE(trip_stop_arrival_sec, trip_stop_departure_sec)) IGNORE NULLS
ORDER BY stop_sequence
) AS arrival_sec_array,
AVG(difference_dist_traveled) AS avg_stop_spacing,

FROM stops_times_with_tz2
GROUP BY 1, 2, 3, 4, 5, 6, 7, 8, 9
),


int_gtfs_schedule__stop_times_grouped AS (
SELECT
{{ dbt_utils.generate_surrogate_key(['feed_key', 'trip_id', 'trip_first_departure_sec']) }} AS key,
Expand Down Expand Up @@ -187,7 +228,14 @@ int_gtfs_schedule__stop_times_grouped AS (
num_approximate_timepoint_stop_times,
num_exact_timepoint_stop_times,
num_arrival_times_populated_stop_times,
num_departure_times_populated_stop_times
num_departure_times_populated_stop_times,

stop_pt_array,
stop_id_array,
stop_seq_array,
arrival_sec_array,
avg_stop_spacing

FROM grouped
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
'data_type': 'date',
'granularity': 'day',
},
cluster_by='base64_url',
cluster_by=['base64_url', 'schedule_feed_key'],
)
}}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
{{
    config(
        materialized='incremental',
        incremental_strategy='insert_overwrite',
        partition_by={
            'field': 'service_date',
            'data_type': 'date',
            'granularity': 'day'
        },
        cluster_by=['vp_base64_url', 'schedule_feed_key'],
    )
}}

-- Stop grain per feed and service date: how many trips with vehicle positions were
-- scheduled to serve the stop, and how many of those came within 10 m / 25 m of it.

-- trip-stop rows; rows without a stop_id are dropped before aggregating
WITH stops_served_vp AS (
    SELECT
        vp_base64_url,
        schedule_feed_key,
        schedule_gtfs_dataset_key,
        service_date,
        stop_id,
        meters_to_vp
    FROM {{ ref('int_gtfs_rt_vs_sched__stops_served_vehicle_positions') }}
    WHERE stop_id IS NOT NULL
),

-- A plain multi-column GROUP BY is required here. GROUPING SETS(a, b, ...) with bare
-- columns aggregates once per individual column, which leaves every other key NULL
-- (including the service_date partition column) instead of grouping by the combination.
vp_stop_metrics AS (
    SELECT
        vp_base64_url,
        schedule_feed_key,
        schedule_gtfs_dataset_key,
        service_date,
        stop_id,

        -- COUNTIF only counts TRUE, so a NULL distance is treated as "not near"
        COUNTIF(meters_to_vp <= 10) AS near_10m,
        COUNTIF(meters_to_vp <= 25) AS near_25m,
        COUNT(*) AS n_vp_trips

    FROM stops_served_vp
    GROUP BY
        vp_base64_url,
        schedule_feed_key,
        schedule_gtfs_dataset_key,
        service_date,
        stop_id
)

SELECT * FROM vp_stop_metrics
23 changes: 23 additions & 0 deletions warehouse/models/mart/gtfs/test_vehicle_locations.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{{
    config(
        materialized='incremental',
        incremental_strategy='insert_overwrite',
        partition_by={
            'field': 'dt',
            'data_type': 'date',
            'granularity': 'day',
        },
        cluster_by=['dt', 'base64_url', 'location'],
        on_schema_change='append_new_columns'
    )
}}

-- Copies the vehicle location rows whose service_date falls in
-- 2025-06-22 through 2025-06-28 (both ends inclusive).
-- The table is read by its fully qualified name rather than through ref().
-- NOTE(review): dbt still renders Jinja that sits inside a `--` SQL comment, so the
-- commented-out line below presumably keeps the ref() dependency -- confirm intended.
SELECT *
--FROM {{ ref('fct_vehicle_locations') }}
FROM `cal-itp-data-infra.mart_gtfs.fct_vehicle_locations`
WHERE service_date BETWEEN '2025-06-22' AND '2025-06-28'
25 changes: 25 additions & 0 deletions warehouse/models/mart/gtfs/test_vehicle_locations_path.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{{
    config(
        materialized='incremental',
        incremental_strategy='insert_overwrite',
        partition_by={
            'field': 'service_date',
            'data_type': 'date',
            'granularity': 'day',
        },
        cluster_by=['service_date', 'base64_url', 'pt_array'],
        on_schema_change='append_new_columns'
    )
}}

-- Copies the vehicle location path rows whose service_date falls in
-- 2025-06-22 through 2025-06-28 (both ends inclusive), replacing the pt_array
-- column with the line built from it by ST_MAKELINE (emitted as the last column).
-- The table is read by its fully qualified name rather than through ref().
-- NOTE(review): dbt still renders Jinja that sits inside a `--` SQL comment, so the
-- commented-out line below presumably keeps the ref() dependency -- confirm intended.
SELECT
    * EXCEPT (pt_array),
    ST_MAKELINE(pt_array) AS pt_array
--FROM {{ ref('fct_vehicle_locations_path') }}
FROM `cal-itp-data-infra.mart_gtfs.fct_vehicle_locations_path`
WHERE service_date BETWEEN '2025-06-22' AND '2025-06-28'
Loading