diff --git a/warehouse/models/intermediate/gtfs/_int_gtfs.yaml b/warehouse/models/intermediate/gtfs/_int_gtfs.yaml index f5fb0a4931..a406ba7ab2 100644 --- a/warehouse/models/intermediate/gtfs/_int_gtfs.yaml +++ b/warehouse/models/intermediate/gtfs/_int_gtfs.yaml @@ -74,7 +74,8 @@ models: - not_null - name: service_id description: '{{ doc("gtfs_calendar__service_id") }}' - - name: service_date + - &service_date + name: service_date description: | Date on which this service was active (i.e., this date is betweem the `start_date` and `end_date` for this service). @@ -317,6 +318,26 @@ models: description: '{{ doc("column_num_arrival_times_populated_stop_times") }}' - name: num_departure_times_populated_stop_times description: '{{ doc("column_num_departure_times_populated_stop_times") }}' + - name: stop_pt_array + description: | + Array of ordered stop point geometries (dim_stop's pt_geom) for this trip_id. + - name: stop_id_array + description: | + Array of ordered stop IDs (dim_stop's stop_id) for this trip_id. + - name: stop_seq_array + description: | + Array of ordered stop sequences (dim_stop_time's stop_sequence) for this trip_id. + - name: arrival_sec_array + description: | + Array of ordered arrival seconds (dim_stop_time's arrival_time, departure_time converted to seconds from midnight) + for this trip_id. The minimum value between the two columns is used. + - name: avg_stop_spacing + description: | + The average distance between stops for this trip_id. + The difference between shape_dist_traveled is calculated for each ordered stop_id-stop_sequence, + and the average is taken across the stops included for the trip. + Units are the same as values provided in https://gtfs.org/documentation/schedule/reference/#stop_timestxt. + Note: this is an optional column in the GTFS specification, so may not be present for all operators. - name: int_gtfs_rt__distinct_download_configs description: | Distinct `dt`, `_config_extract_ts` pairs indicating @@ -640,3 +661,26 @@ models: # this will fail if run on all historical data, specifically for days where a given feed's time zone changed; # even then the number of failures is very small where: '__rt_sampled__' + - name: int_gtfs_rt_vs_sched__stops_served_vehicle_positions + description: | + This table joins int_gtfs_schedule__stop_times_grouped (stop times grouped by trip) + and fct_vehicle_locations_path (vehicle positions grouped by trip). + It unnests the stop positions so that the table is unique on the + feed_key-trip_instance_key-stop_id. + + The table finds the nearest distance between the scheduled stop's pt geom + and the vehicle locations path (linestring), in meters. + + columns: + - *feed_key + - name: gtfs_dataset_key + - *service_date + - name: trip_id + description: '{{ doc("gtfs_trips__trip_id") }}' + - name: iteration_num + - name: trip_instance_key + - name: stop_id + description: '{{ doc("gtfs_stops__stop_id") }}' + - name: meters_to_vp + description: | + Meters from a stop's pt_geom to the vehicle positions path. diff --git a/warehouse/models/intermediate/gtfs/int_gtfs_rt_vs_sched__stops_served_vehicle_positions.sql b/warehouse/models/intermediate/gtfs/int_gtfs_rt_vs_sched__stops_served_vehicle_positions.sql new file mode 100644 index 0000000000..b554443fd6 --- /dev/null +++ b/warehouse/models/intermediate/gtfs/int_gtfs_rt_vs_sched__stops_served_vehicle_positions.sql @@ -0,0 +1,97 @@ +{{ + config( + materialized='incremental', + partition_by={ + 'field': 'service_date', + 'data_type': 'date', + 'granularity': 'day', + }, + cluster_by=['vp_base64_url', 'schedule_feed_key'], + ) +}} + +WITH trips AS ( + SELECT + feed_key, + gtfs_dataset_key, + service_date, + trip_id, + iteration_num, + trip_instance_key, + FROM {{ ref('fct_scheduled_trips') }} +), + +vp_path AS ( + SELECT + service_date, + base64_url, + schedule_feed_key, + trip_instance_key, + ST_SIMPLIFY(ST_MAKELINE(pt_array), 5) AS pt_array + + FROM {{ ref('fct_vehicle_locations_path') }} +), + +stop_times_grouped AS ( + SELECT + feed_key, + trip_id, + iteration_num, + stop_pt_array, + stop_id_array, + FROM {{ ref('int_gtfs_schedule__stop_times_grouped') }} +), + +-- trip_grain: join trips to vp_path and attach an array of stop positions +vp_with_stops AS ( + SELECT + trips.feed_key AS schedule_feed_key, + trips.gtfs_dataset_key AS schedule_gtfs_dataset_key, + trips.service_date, + trips.trip_id, + trips.iteration_num, + trips.trip_instance_key, + + --stop_times_grouped.stop_pt_array, + --stop_times_grouped.stop_id_array, + + vp_path.base64_url AS vp_base64_url, + --vp_path.pt_array, + + pt_geom, + stop_id, + ST_HAUSDORFFDWITHIN( + pt_geom, vp_path.pt_array, 25 + ) AS is_within_25m, + + FROM trips + INNER JOIN stop_times_grouped + ON stop_times_grouped.feed_key = trips.feed_key + AND stop_times_grouped.trip_id = trips.trip_id + AND stop_times_grouped.iteration_num = trips.iteration_num + INNER JOIN vp_path + ON trips.service_date = vp_path.service_date + AND trips.feed_key = vp_path.schedule_feed_key + AND trips.trip_instance_key = vp_path.trip_instance_key + LEFT JOIN UNNEST(stop_times_grouped.stop_pt_array) AS pt_geom + LEFT JOIN UNNEST(stop_times_grouped.stop_id_array) AS stop_id +) + +-- unnest the arrays so that every stop_id/stop pt_geom is a row +-- becomes trip-stop grain +--unnested_stops AS ( +-- SELECT +-- vp_with_stops.* EXCEPT(stop_pt_array, stop_id_array), +-- stop_id, + + -- tried buffering stop pt geom to various distances, + -- but BQ resources were exhausted working with ST_SIMPLIFY(pt_array, 5) + -- so let's use distance to get at a similar metric + --ROUND(ST_DISTANCE(pt_geom, pt_array), 2) AS meters_to_vp + +-- FROM vp_with_stops +-- LEFT JOIN UNNEST(stop_pt_array) AS pt_geom +-- LEFT JOIN UNNEST(stop_id_array) AS stop_id +--) + +SELECT * FROM vp_with_stops diff --git a/warehouse/models/intermediate/gtfs/int_gtfs_schedule__stop_times_grouped.sql b/warehouse/models/intermediate/gtfs/int_gtfs_schedule__stop_times_grouped.sql index c88deeaa38..ec0db433c8 100644 --- a/warehouse/models/intermediate/gtfs/int_gtfs_schedule__stop_times_grouped.sql +++ b/warehouse/models/intermediate/gtfs/int_gtfs_schedule__stop_times_grouped.sql @@ -6,11 +6,13 @@ ) }} -WITH - -dim_stop_times AS ( - SELECT * FROM {{ ref('dim_stop_times') }} - WHERE {{ incremental_where(default_start_var='GTFS_SCHEDULE_START', this_dt_column='_feed_valid_from', filter_dt_column='_feed_valid_from', dev_lookback_days = None) }} +WITH dim_stop_times AS ( + SELECT * + FROM {{ ref('dim_stop_times') }} + WHERE {{ incremental_where(default_start_var='GTFS_SCHEDULE_START', + this_dt_column='_feed_valid_from', + filter_dt_column='_feed_valid_from', dev_lookback_days = None) + }} ), int_gtfs_schedule__frequencies_stop_times AS ( @@ -23,8 +25,11 @@ stops AS ( feed_key, stop_id, stop_timezone_coalesced, - COUNT(*) AS ct + COUNT(*) AS ct, + MAX(ST_ASTEXT(pt_geom)) AS pt_geom, -- can this be written better + FROM {{ ref('dim_stops') }} + WHERE stop_id IS NOT NULL GROUP BY 1, 2, 3 -- we can have duplicate stop IDs within a given feed (this is not valid, but happens) @@ -48,7 +53,10 @@ stops_times_with_tz AS ( COALESCE(LAST_VALUE(stop_timezone_coalesced) OVER (PARTITION BY feed_key, trip_id ORDER BY dim_stop_times.stop_sequence - ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), feed_timezone) AS trip_end_timezone + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), feed_timezone) AS trip_end_timezone, + ST_GEOGFROMTEXT(stops.pt_geom) AS pt_geom, + COALESCE(dim_stop_times.shape_dist_traveled, freq.shape_dist_traveled) AS shape_dist, + FROM dim_stop_times LEFT JOIN stops USING (feed_key, stop_id) @@ -56,6 +64,16 @@ stops_times_with_tz AS ( USING (feed_key, trip_id, stop_id) ), +-- add difference in shape_dist_traveled +stops_times_with_tz2 AS ( + SELECT + *, + LEAD(shape_dist) OVER ( + PARTITION BY feed_key, trip_id, iteration_num ORDER BY stop_sequence + ) - shape_dist AS difference_dist_traveled + FROM stops_times_with_tz +), + grouped AS ( SELECT trip_id, @@ -126,10 +144,33 @@ grouped AS ( departure_time IS NOT NULL ) AS num_departure_times_populated_stop_times, - FROM stops_times_with_tz + -- add stop position geometries and order of stops + ARRAY_AGG( + -- ignore nulls so it doesn't error out if there's a null point + pt_geom IGNORE NULLS + ORDER BY stop_sequence, stop_id) + AS stop_pt_array, + ARRAY_AGG( + -- ignore nulls so it doesn't error out if there's a null point + stop_id IGNORE NULLS + ORDER BY stop_sequence, stop_id) + AS stop_id_array, + ARRAY_AGG( + stop_sequence + ORDER BY stop_sequence + ) AS stop_seq_array, + -- just keep arrival for now + ARRAY_AGG( + LEAST(COALESCE(trip_stop_arrival_sec, trip_stop_departure_sec)) IGNORE NULLS + ORDER BY stop_sequence + ) AS arrival_sec_array, + AVG(difference_dist_traveled) AS avg_stop_spacing, + + FROM stops_times_with_tz2 GROUP BY 1, 2, 3, 4, 5, 6, 7, 8, 9 ), + int_gtfs_schedule__stop_times_grouped AS ( SELECT {{ dbt_utils.generate_surrogate_key(['feed_key', 'trip_id', 'trip_first_departure_sec']) }} AS key, @@ -187,7 +228,14 @@ int_gtfs_schedule__stop_times_grouped AS ( num_approximate_timepoint_stop_times, num_exact_timepoint_stop_times, num_arrival_times_populated_stop_times, - num_departure_times_populated_stop_times + num_departure_times_populated_stop_times, + + stop_pt_array, + stop_id_array, + stop_seq_array, + arrival_sec_array, + avg_stop_spacing + FROM grouped ) diff --git a/warehouse/models/mart/gtfs/fct_vehicle_locations_path.sql b/warehouse/models/mart/gtfs/fct_vehicle_locations_path.sql index d074c353a0..2e672640c8 100644 --- a/warehouse/models/mart/gtfs/fct_vehicle_locations_path.sql +++ b/warehouse/models/mart/gtfs/fct_vehicle_locations_path.sql @@ -7,7 +7,7 @@ 'data_type': 'date', 'granularity': 'day', }, - cluster_by='base64_url', + cluster_by=['base64_url', 'schedule_feed_key'], ) }} diff --git a/warehouse/models/mart/gtfs/fct_vehicle_positions_stop_metrics.sql b/warehouse/models/mart/gtfs/fct_vehicle_positions_stop_metrics.sql new file mode 100644 index 0000000000..625c3f997a --- /dev/null +++ b/warehouse/models/mart/gtfs/fct_vehicle_positions_stop_metrics.sql @@ -0,0 +1,60 @@ +{{ + config( + materialized='incremental', + incremental_strategy='insert_overwrite', + partition_by={ + 'field': 'service_date', + 'data_type': 'date', + 'granularity': 'day' + }, + cluster_by=['vp_base64_url', 'schedule_feed_key'], + ) +}} + +WITH stops_served_vp AS ( + SELECT * + FROM {{ ref('int_gtfs_rt_vs_sched__stops_served_vehicle_positions') }} +), + +stops_grouped AS ( + SELECT + *, + CASE + WHEN meters_to_vp <= 10 THEN TRUE + ELSE FALSE + END AS is_near_10m, + CASE WHEN meters_to_vp <= 25 THEN TRUE + ELSE FALSE + END AS is_near_25m, + FROM stops_served_vp +), + +trip_counts_by_group AS ( + SELECT + vp_base64_url, + schedule_feed_key, + schedule_gtfs_dataset_key, + service_date, + stop_id, + + COUNTIF(is_near_10m) AS near_10m, + COUNTIF(is_near_25m) AS near_25m, + COUNT(*) AS n_vp_trips, + + FROM stops_grouped + GROUP BY GROUPING SETS( + vp_base64_url, schedule_feed_key, schedule_gtfs_dataset_key, + service_date, stop_id, + is_near_25m, is_near_10m) + -- grouping set will return the null placeholder + -- which captures the groups were near_stop_* = false for every stop +), + +vp_stop_metrics AS ( + SELECT + * + FROM trip_counts_by_group + WHERE stop_id IS NOT NULL +) + +SELECT * FROM vp_stop_metrics diff --git a/warehouse/models/mart/gtfs/test_vehicle_locations.sql b/warehouse/models/mart/gtfs/test_vehicle_locations.sql new file mode 100644 index 0000000000..1c18bb77c3 --- /dev/null +++ b/warehouse/models/mart/gtfs/test_vehicle_locations.sql @@ -0,0 +1,23 @@ +{{ + config( + materialized='incremental', + incremental_strategy='insert_overwrite', + partition_by = { + 'field': 'dt', + 'data_type': 'date', + 'granularity': 'day', + }, + cluster_by=['dt', 'base64_url', 'location'], + on_schema_change='append_new_columns' + ) +}} + +WITH fct_vehicle_locations AS ( + SELECT * + --FROM {{ ref('fct_vehicle_locations') }} + FROM `cal-itp-data-infra.mart_gtfs.fct_vehicle_locations` + WHERE service_date >= '2025-06-22' AND service_date <= '2025-06-28' +) + + +SELECT * FROM fct_vehicle_locations diff --git a/warehouse/models/mart/gtfs/test_vehicle_locations_path.sql b/warehouse/models/mart/gtfs/test_vehicle_locations_path.sql new file mode 100644 index 0000000000..7ec790b93d --- /dev/null +++ b/warehouse/models/mart/gtfs/test_vehicle_locations_path.sql @@ -0,0 +1,25 @@ +{{ + config( + materialized='incremental', + incremental_strategy='insert_overwrite', + partition_by = { + 'field': 'service_date', + 'data_type': 'date', + 'granularity': 'day', + }, + cluster_by=['service_date', 'base64_url', 'pt_array'], + on_schema_change='append_new_columns' + ) +}} + +WITH fct_vehicle_locations_path AS ( + SELECT + * EXCEPT(pt_array), + ST_MAKELINE(pt_array) AS pt_array, + --FROM {{ ref('fct_vehicle_locations_path') }} + FROM `cal-itp-data-infra.mart_gtfs.fct_vehicle_locations_path` + WHERE service_date >= '2025-06-22' AND service_date <= '2025-06-28' +) + + +SELECT * FROM fct_vehicle_locations_path