Skip to content

Commit a4c6ea1

Browse files
authored
fix(web-analytics): handle multi-set partition correctly (#35231)
1 parent 09a1d35 commit a4c6ea1

File tree

3 files changed

+213
-9
lines changed

3 files changed

+213
-9
lines changed
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
import pytest
2+
from datetime import datetime, UTC
3+
from unittest.mock import Mock, patch, call
4+
5+
import dagster
6+
from dagster import TimeWindow
7+
8+
from dags.web_preaggregated_daily import (
9+
pre_aggregate_web_analytics_data,
10+
)
11+
from posthog.models.web_preaggregated.sql import DROP_PARTITION_SQL
12+
13+
14+
class TestPartitionHandling:
15+
def setup_method(self):
16+
self.mock_context = Mock(spec=dagster.AssetExecutionContext)
17+
self.mock_context.log = Mock()
18+
self.mock_context.op_config = {"team_ids": [1, 2], "extra_clickhouse_settings": ""}
19+
20+
@pytest.mark.parametrize(
21+
"start_date_str,end_date_str,expected_partitions",
22+
[
23+
# Single day partition
24+
("2024-01-01", "2024-01-02", ["2024-01-01"]),
25+
# Two day partition
26+
("2024-01-01", "2024-01-03", ["2024-01-01", "2024-01-02"]),
27+
# Week-long partition
28+
(
29+
"2024-01-01",
30+
"2024-01-08",
31+
["2024-01-01", "2024-01-02", "2024-01-03", "2024-01-04", "2024-01-05", "2024-01-06", "2024-01-07"],
32+
),
33+
# Month-long partition (testing edge case)
34+
("2024-01-01", "2024-02-01", [f"2024-01-{day:02d}" for day in range(1, 32)]),
35+
],
36+
)
37+
@patch("dags.web_preaggregated_daily.sync_execute")
38+
def test_partition_dropping_for_different_time_windows(
39+
self, mock_sync_execute, start_date_str, end_date_str, expected_partitions
40+
):
41+
start_datetime = datetime.fromisoformat(start_date_str).replace(tzinfo=UTC)
42+
end_datetime = datetime.fromisoformat(end_date_str).replace(tzinfo=UTC)
43+
self.mock_context.partition_time_window = TimeWindow(start_datetime, end_datetime)
44+
45+
mock_sql_generator = Mock(return_value="INSERT INTO test_table ...")
46+
pre_aggregate_web_analytics_data(
47+
context=self.mock_context,
48+
table_name="web_stats_daily",
49+
sql_generator=mock_sql_generator,
50+
)
51+
52+
actual_drop_calls = [
53+
call_args for call_args in mock_sync_execute.call_args_list if "DROP PARTITION" in str(call_args)
54+
]
55+
assert len(actual_drop_calls) == len(expected_partitions)
56+
57+
@patch("dags.web_preaggregated_daily.sync_execute")
58+
def test_partition_drop_error_handling(self, mock_sync_execute):
59+
start_datetime = datetime(2024, 1, 1, tzinfo=UTC)
60+
end_datetime = datetime(2024, 1, 3, tzinfo=UTC)
61+
self.mock_context.partition_time_window = TimeWindow(start_datetime, end_datetime)
62+
63+
def side_effect(sql):
64+
if "DROP PARTITION" in sql:
65+
raise Exception("Partition doesn't exist")
66+
return None
67+
68+
mock_sync_execute.side_effect = side_effect
69+
mock_sql_generator = Mock(return_value="INSERT INTO test_table ...")
70+
71+
pre_aggregate_web_analytics_data(
72+
context=self.mock_context,
73+
table_name="web_stats_daily",
74+
sql_generator=mock_sql_generator,
75+
)
76+
77+
assert self.mock_context.log.info.call_count >= 4
78+
insert_calls = [call_args for call_args in mock_sync_execute.call_args_list if "INSERT INTO" in str(call_args)]
79+
assert len(insert_calls) == 1
80+
81+
@patch("dags.web_preaggregated_daily.sync_execute")
82+
def test_granularity_parameter_usage(self, mock_sync_execute):
83+
start_datetime = datetime(2024, 1, 1, tzinfo=UTC)
84+
end_datetime = datetime(2024, 1, 2, tzinfo=UTC)
85+
self.mock_context.partition_time_window = TimeWindow(start_datetime, end_datetime)
86+
87+
mock_sql_generator = Mock(return_value="INSERT INTO test_table ...")
88+
pre_aggregate_web_analytics_data(
89+
context=self.mock_context,
90+
table_name="web_stats_daily",
91+
sql_generator=mock_sql_generator,
92+
)
93+
94+
expected_sql = DROP_PARTITION_SQL("web_stats_daily", "2024-01-01", granularity="daily")
95+
assert call(expected_sql) in mock_sync_execute.call_args_list
96+
assert "'20240101'" in expected_sql
97+
98+
def test_missing_partition_time_window_raises_error(self):
99+
self.mock_context.partition_time_window = None
100+
mock_sql_generator = Mock()
101+
102+
with pytest.raises(dagster.Failure, match="This asset should only be run with a partition_time_window"):
103+
pre_aggregate_web_analytics_data(
104+
context=self.mock_context,
105+
table_name="web_stats_daily",
106+
sql_generator=mock_sql_generator,
107+
)
108+
109+
@patch("dags.web_preaggregated_daily.sync_execute")
110+
def test_insert_query_failure_raises_dagster_failure(self, mock_sync_execute):
111+
start_datetime = datetime(2024, 1, 1, tzinfo=UTC)
112+
end_datetime = datetime(2024, 1, 2, tzinfo=UTC)
113+
self.mock_context.partition_time_window = TimeWindow(start_datetime, end_datetime)
114+
115+
def side_effect(sql):
116+
if "INSERT INTO" in sql:
117+
raise Exception("Insert failed")
118+
return None
119+
120+
mock_sync_execute.side_effect = side_effect
121+
mock_sql_generator = Mock(return_value="INSERT INTO test_table ...")
122+
123+
with pytest.raises(dagster.Failure, match="Failed to pre-aggregate web_stats_daily"):
124+
pre_aggregate_web_analytics_data(
125+
context=self.mock_context,
126+
table_name="web_stats_daily",
127+
sql_generator=mock_sql_generator,
128+
)
129+
130+
@patch("dags.web_preaggregated_daily.sync_execute")
131+
def test_same_start_and_end_date_drops_partition(self, mock_sync_execute):
132+
start_datetime = datetime(2024, 1, 1, tzinfo=UTC)
133+
end_datetime = datetime(2024, 1, 1, tzinfo=UTC)
134+
self.mock_context.partition_time_window = TimeWindow(start_datetime, end_datetime)
135+
136+
mock_sql_generator = Mock(return_value="INSERT INTO test_table ...")
137+
pre_aggregate_web_analytics_data(
138+
context=self.mock_context,
139+
table_name="web_stats_daily",
140+
sql_generator=mock_sql_generator,
141+
)
142+
143+
drop_calls = [call_args for call_args in mock_sync_execute.call_args_list if "DROP PARTITION" in str(call_args)]
144+
assert len(drop_calls) == 1
145+
146+
insert_calls = [call_args for call_args in mock_sync_execute.call_args_list if "INSERT INTO" in str(call_args)]
147+
assert len(insert_calls) == 1

dags/web_preaggregated_daily.py

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -71,17 +71,27 @@ def pre_aggregate_web_analytics_data(
7171
date_end = end_datetime.strftime("%Y-%m-%d")
7272

7373
try:
74-
# Drop the partition first, ensuring a clean state before insertion
74+
# Drop all partitions in the time window, ensuring a clean state before insertion
7575
# Note: No ON CLUSTER needed since tables are replicated (not sharded) and replication handles distribution
76-
drop_partition_query = DROP_PARTITION_SQL(table_name, date_start)
77-
context.log.info(f"Dropping partition for {date_start}: {drop_partition_query}")
76+
current_date = start_datetime.date()
77+
end_date = end_datetime.date()
7878

79-
try:
80-
sync_execute(drop_partition_query)
81-
context.log.info(f"Successfully dropped partition for {date_start}")
82-
except Exception as drop_error:
83-
# Partition might not exist when running for the first time or when running in a empty backfill, which is fine
84-
context.log.info(f"Partition for {date_start} doesn't exist or couldn't be dropped: {drop_error}")
79+
# For time windows: start is inclusive, end is exclusive (except for single-day partitions)
80+
while current_date < end_date or (current_date == start_datetime.date() == end_date):
81+
partition_date_str = current_date.strftime("%Y-%m-%d")
82+
drop_partition_query = DROP_PARTITION_SQL(table_name, partition_date_str, granularity="daily")
83+
context.log.info(f"Dropping partition for {partition_date_str}: {drop_partition_query}")
84+
85+
try:
86+
sync_execute(drop_partition_query)
87+
context.log.info(f"Successfully dropped partition for {partition_date_str}")
88+
except Exception as drop_error:
89+
# Partition might not exist when running for the first time or when running in a empty backfill, which is fine
90+
context.log.info(
91+
f"Partition for {partition_date_str} doesn't exist or couldn't be dropped: {drop_error}"
92+
)
93+
94+
current_date += timedelta(days=1)
8595

8696
insert_query = sql_generator(
8797
date_start=date_start,

posthog/models/web_preaggregated/test_web_preaggregated_sql.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,3 +145,50 @@ def test_hourly_vs_daily_partition_difference(self):
145145
assert "'20240115'" in daily_sql
146146
# Hourly should be YYYYMMDDHH format (defaulting to 00)
147147
assert "'2024011500'" in hourly_sql
148+
149+
def test_multi_day_partition_scenario_daily(self):
150+
dates = ["2024-01-01", "2024-01-02", "2024-01-03"]
151+
152+
for date in dates:
153+
sql = DROP_PARTITION_SQL("web_stats_daily", date, granularity="daily")
154+
expected_partition = date.replace("-", "")
155+
assert f"'{expected_partition}'" in sql
156+
assert "ALTER TABLE web_stats_daily" in sql
157+
assert "DROP PARTITION" in sql
158+
159+
def test_month_boundary_partitions(self):
160+
test_cases = [
161+
("2024-01-31", "daily", "20240131"),
162+
("2024-02-01", "daily", "20240201"),
163+
("2024-02-29", "daily", "20240229"), # Leap year
164+
("2024-03-01", "daily", "20240301"),
165+
]
166+
167+
for date, granularity, expected_partition in test_cases:
168+
sql = DROP_PARTITION_SQL("web_stats_daily", date, granularity=granularity)
169+
assert f"'{expected_partition}'" in sql
170+
171+
def test_year_boundary_partitions(self):
172+
test_cases = [
173+
("2023-12-31", "daily", "20231231"),
174+
("2024-01-01", "daily", "20240101"),
175+
]
176+
177+
for date, granularity, expected_partition in test_cases:
178+
sql = DROP_PARTITION_SQL("web_stats_daily", date, granularity=granularity)
179+
assert f"'{expected_partition}'" in sql
180+
181+
def test_granularity_parameter_is_case_sensitive_and_invalid_defaults_to_daily(self):
182+
date = "2024-01-15"
183+
184+
# Test valid granularities
185+
daily_sql = DROP_PARTITION_SQL("test_table", date, granularity="daily")
186+
hourly_sql = DROP_PARTITION_SQL("test_table", date, granularity="hourly")
187+
188+
assert "'20240115'" in daily_sql
189+
assert "'2024011500'" in hourly_sql
190+
191+
# Test that invalid granularity defaults to daily behavior
192+
# (This tests the else clause in the function)
193+
invalid_sql = DROP_PARTITION_SQL("test_table", date, granularity="invalid")
194+
assert "'20240115'" in invalid_sql

0 commit comments

Comments
 (0)