Skip to content

Commit d8e235d

Browse files
authored
Merge branch 'main' into feature/remove-large-in-clause-in-assets
2 parents ebc08dd + 805473d commit d8e235d

File tree

14 files changed

+270
-14
lines changed

14 files changed

+270
-14
lines changed

airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ class DagRun(StrictBaseModel):
296296
data_interval_start: UtcDateTime | None
297297
data_interval_end: UtcDateTime | None
298298
run_after: UtcDateTime
299-
start_date: UtcDateTime
299+
start_date: UtcDateTime | None
300300
end_date: UtcDateTime | None
301301
clear_number: int = 0
302302
run_type: DagRunType

airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,20 @@
3535
)
3636
from airflow.api_fastapi.execution_api.versions.v2026_03_31 import (
3737
AddNoteField,
38+
MakeDagRunStartDateNullable,
3839
ModifyDeferredTaskKwargsToJsonValue,
3940
RemoveUpstreamMapIndexesField,
4041
)
4142

4243
bundle = VersionBundle(
4344
HeadVersion(),
44-
Version("2026-03-31", ModifyDeferredTaskKwargsToJsonValue, RemoveUpstreamMapIndexesField, AddNoteField),
45+
Version(
46+
"2026-03-31",
47+
MakeDagRunStartDateNullable,
48+
ModifyDeferredTaskKwargsToJsonValue,
49+
RemoveUpstreamMapIndexesField,
50+
AddNoteField,
51+
),
4552
Version("2025-12-08", MovePreviousRunEndpoint, AddDagRunDetailEndpoint),
4653
Version("2025-11-07", AddPartitionKeyField),
4754
Version("2025-11-05", AddTriggeringUserNameField),

airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_03_31.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
from cadwyn import ResponseInfo, VersionChange, convert_response_to_previous_version_for, schema
2323

24+
from airflow.api_fastapi.common.types import UtcDateTime
2425
from airflow.api_fastapi.execution_api.datamodels.taskinstance import (
2526
DagRun,
2627
TIDeferredStatePayload,
@@ -68,3 +69,29 @@ def remove_note_field(response: ResponseInfo) -> None: # type: ignore[misc]
6869
"""Remove note field for older API versions."""
6970
if "dag_run" in response.body and isinstance(response.body["dag_run"], dict):
7071
response.body["dag_run"].pop("note", None)
72+
73+
74+
class MakeDagRunStartDateNullable(VersionChange):
75+
"""Make DagRun.start_date field nullable for runs that haven't started yet."""
76+
77+
description = __doc__
78+
79+
instructions_to_migrate_to_previous_version = (schema(DagRun).field("start_date").had(type=UtcDateTime),)
80+
81+
@convert_response_to_previous_version_for(TIRunContext) # type: ignore[arg-type]
82+
def ensure_start_date_in_ti_run_context(response: ResponseInfo) -> None: # type: ignore[misc]
83+
"""
84+
Ensure start_date is never None in DagRun for previous API versions.
85+
86+
Older Task SDK clients expect start_date to be non-nullable. When the
87+
DagRun hasn't started yet (e.g. queued), fall back to run_after.
88+
"""
89+
dag_run = response.body.get("dag_run")
90+
if isinstance(dag_run, dict) and dag_run.get("start_date") is None:
91+
dag_run["start_date"] = dag_run.get("run_after")
92+
93+
@convert_response_to_previous_version_for(DagRun) # type: ignore[arg-type]
94+
def ensure_start_date_in_dag_run(response: ResponseInfo) -> None: # type: ignore[misc]
95+
"""Ensure start_date is never None in direct DagRun responses for previous API versions."""
96+
if response.body.get("start_date") is None:
97+
response.body["start_date"] = response.body.get("run_after")

airflow-core/src/airflow/models/connection.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ def _normalize_conn_type(conn_type):
226226
def _parse_from_uri(self, uri: str):
227227
schemes_count_in_uri = uri.count("://")
228228
if schemes_count_in_uri > 2:
229-
raise AirflowException(f"Invalid connection string: {uri}.")
229+
raise AirflowException("Invalid connection string.")
230230
host_with_protocol = schemes_count_in_uri == 2
231231
uri_parts = urlsplit(uri)
232232
conn_type = uri_parts.scheme
@@ -235,7 +235,7 @@ def _parse_from_uri(self, uri: str):
235235
if host_with_protocol:
236236
uri_splits = rest_of_the_url.split("://", 1)
237237
if "@" in uri_splits[0] or ":" in uri_splits[0]:
238-
raise AirflowException(f"Invalid connection string: {uri}.")
238+
raise AirflowException("Invalid connection string.")
239239
uri_parts = urlsplit(rest_of_the_url)
240240
protocol = uri_parts.scheme if host_with_protocol else None
241241
host = _parse_netloc_to_hostname(uri_parts)
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
from __future__ import annotations
19+
20+
import pytest
21+
22+
from airflow._shared.timezones import timezone
23+
from airflow.utils.state import DagRunState, State
24+
25+
from tests_common.test_utils.db import clear_db_runs
26+
27+
pytestmark = pytest.mark.db_test
28+
29+
TIMESTAMP_STR = "2024-09-30T12:00:00Z"
30+
TIMESTAMP = timezone.parse(TIMESTAMP_STR)
31+
32+
RUN_PATCH_BODY = {
33+
"state": "running",
34+
"hostname": "test-hostname",
35+
"unixname": "test-user",
36+
"pid": 12345,
37+
"start_date": TIMESTAMP_STR,
38+
}
39+
40+
41+
@pytest.fixture
42+
def old_ver_client(client):
43+
"""Client configured to use API version before start_date nullable change."""
44+
client.headers["Airflow-API-Version"] = "2025-12-08"
45+
return client
46+
47+
48+
class TestDagRunStartDateNullableBackwardCompat:
49+
"""Test that older API versions get a non-null start_date fallback."""
50+
51+
@pytest.fixture(autouse=True)
52+
def _freeze_time(self, time_machine):
53+
time_machine.move_to(TIMESTAMP_STR, tick=False)
54+
55+
def setup_method(self):
56+
clear_db_runs()
57+
58+
def teardown_method(self):
59+
clear_db_runs()
60+
61+
def test_old_version_gets_run_after_when_start_date_is_null(
62+
self,
63+
old_ver_client,
64+
session,
65+
create_task_instance,
66+
):
67+
ti = create_task_instance(
68+
task_id="test_start_date_nullable",
69+
state=State.QUEUED,
70+
dagrun_state=DagRunState.QUEUED,
71+
session=session,
72+
start_date=TIMESTAMP,
73+
)
74+
ti.dag_run.start_date = None # DagRun has not started yet
75+
session.commit()
76+
77+
response = old_ver_client.patch(f"/execution/task-instances/{ti.id}/run", json=RUN_PATCH_BODY)
78+
dag_run = response.json()["dag_run"]
79+
80+
assert response.status_code == 200
81+
assert dag_run["start_date"] is not None
82+
assert dag_run["start_date"] == dag_run["run_after"]
83+
84+
def test_head_version_allows_null_start_date(
85+
self,
86+
client,
87+
session,
88+
create_task_instance,
89+
):
90+
ti = create_task_instance(
91+
task_id="test_start_date_null_head",
92+
state=State.QUEUED,
93+
dagrun_state=DagRunState.QUEUED,
94+
session=session,
95+
start_date=TIMESTAMP,
96+
)
97+
ti.dag_run.start_date = None # DagRun has not started yet
98+
session.commit()
99+
100+
response = client.patch(f"/execution/task-instances/{ti.id}/run", json=RUN_PATCH_BODY)
101+
dag_run = response.json()["dag_run"]
102+
103+
assert response.status_code == 200
104+
assert dag_run["start_date"] is None
105+
106+
def test_old_version_preserves_real_start_date(
107+
self,
108+
old_ver_client,
109+
session,
110+
create_task_instance,
111+
):
112+
ti = create_task_instance(
113+
task_id="test_start_date_preserved",
114+
state=State.QUEUED,
115+
dagrun_state=DagRunState.RUNNING,
116+
session=session,
117+
start_date=TIMESTAMP,
118+
)
119+
assert ti.dag_run.start_date == TIMESTAMP # DagRun has already started
120+
session.commit()
121+
122+
response = old_ver_client.patch(f"/execution/task-instances/{ti.id}/run", json=RUN_PATCH_BODY)
123+
dag_run = response.json()["dag_run"]
124+
125+
assert response.status_code == 200
126+
assert dag_run["start_date"] is not None, "start_date should not be None when DagRun has started"
127+
assert dag_run["start_date"] == TIMESTAMP.isoformat().replace("+00:00", "Z")

airflow-core/tests/unit/models/test_connection.py

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ def clear_fernet_cache(self):
138138
{"param": "value"},
139139
None,
140140
),
141-
(
141+
( # Test password sanitisation
142142
"type://user:pass@protocol://host:port?param=value",
143143
None,
144144
None,
@@ -147,7 +147,29 @@ def clear_fernet_cache(self):
147147
None,
148148
None,
149149
None,
150-
r"Invalid connection string: type://user:pass@protocol://host:port?param=value.",
150+
r"Invalid connection string.",
151+
),
152+
(
153+
"foo:pwd@host://://",
154+
None,
155+
None,
156+
None,
157+
None,
158+
None,
159+
None,
160+
None,
161+
r"Invalid connection string.",
162+
),
163+
(
164+
"type://:foo@host/://://",
165+
None,
166+
None,
167+
None,
168+
None,
169+
None,
170+
None,
171+
None,
172+
r"Invalid connection string.",
151173
),
152174
(
153175
"type://host?int_param=123&bool_param=true&float_param=1.5&str_param=some_str",

providers/amazon/tests/system/amazon/aws/example_emr_eks.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import json
2020
import subprocess
21+
import time
2122
from datetime import datetime
2223

2324
import boto3
@@ -57,13 +58,15 @@
5758
JOB_ROLE_ARN_KEY = "JOB_ROLE_ARN"
5859
JOB_ROLE_NAME_KEY = "JOB_ROLE_NAME"
5960
SUBNETS_KEY = "SUBNETS"
61+
UPDATE_TRUST_POLICY_WAIT_TIME_KEY = "UPDATE_TRUST_POLICY_WAIT_TIME_KEY"
6062

6163
sys_test_context_task = (
6264
SystemTestContextBuilder()
6365
.add_variable(ROLE_ARN_KEY)
6466
.add_variable(JOB_ROLE_ARN_KEY)
6567
.add_variable(JOB_ROLE_NAME_KEY)
6668
.add_variable(SUBNETS_KEY, split_string=True)
69+
.add_variable(UPDATE_TRUST_POLICY_WAIT_TIME_KEY, optional=True, default_value=10)
6770
.build()
6871
)
6972

@@ -137,7 +140,7 @@ def delete_iam_oidc_identity_provider(cluster_name):
137140

138141

139142
@task
140-
def update_trust_policy_execution_role(cluster_name, cluster_namespace, role_name):
143+
def update_trust_policy_execution_role(cluster_name, cluster_namespace, role_name, wait_time):
141144
# Remove any already existing trusted entities added with "update-role-trust-policy"
142145
# Prevent getting an error "Cannot exceed quota for ACLSizePerRole"
143146
client = boto3.client("iam")
@@ -173,6 +176,9 @@ def update_trust_policy_execution_role(cluster_name, cluster_namespace, role_nam
173176
if build.returncode != 0:
174177
raise RuntimeError(err)
175178

179+
# Wait for IAM changes to propagate to avoid authentication failures
180+
time.sleep(wait_time)
181+
176182

177183
@task(trigger_rule=TriggerRule.ALL_DONE)
178184
def delete_virtual_cluster(virtual_cluster_id):
@@ -193,6 +199,7 @@ def delete_virtual_cluster(virtual_cluster_id):
193199
subnets = test_context[SUBNETS_KEY]
194200
job_role_arn = test_context[JOB_ROLE_ARN_KEY]
195201
job_role_name = test_context[JOB_ROLE_NAME_KEY]
202+
update_trust_policy_wait_time = test_context[UPDATE_TRUST_POLICY_WAIT_TIME_KEY]
196203

197204
s3_bucket_name = f"{env_id}-bucket"
198205
eks_cluster_name = f"{env_id}-cluster"
@@ -316,7 +323,9 @@ def delete_virtual_cluster(virtual_cluster_id):
316323
create_cluster_and_nodegroup,
317324
await_create_nodegroup,
318325
run_eksctl_commands(eks_cluster_name, eks_namespace),
319-
update_trust_policy_execution_role(eks_cluster_name, eks_namespace, job_role_name),
326+
update_trust_policy_execution_role(
327+
eks_cluster_name, eks_namespace, job_role_name, update_trust_policy_wait_time
328+
),
320329
# TEST BODY
321330
create_emr_eks_cluster,
322331
job_starter,

providers/apache/hive/src/airflow/providers/apache/hive/hooks/hive.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -877,8 +877,8 @@ def get_conn(self, schema: str | None = None) -> Any:
877877
auth_mechanism = db.extra_dejson.get("auth_mechanism", "KERBEROS")
878878
kerberos_service_name = db.extra_dejson.get("kerberos_service_name", "hive")
879879

880-
# Password should be set if and only if in LDAP or CUSTOM mode
881-
if auth_mechanism in ("LDAP", "CUSTOM"):
880+
# Password should be set if in LDAP, CUSTOM or PLAIN mode
881+
if auth_mechanism in ("LDAP", "CUSTOM", "PLAIN"):
882882
password = db.password
883883

884884
from pyhive.hive import connect

providers/apache/hive/tests/unit/apache/hive/hooks/test_hive.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -677,6 +677,26 @@ def test_get_conn_with_password(self, mock_connect):
677677
database="default",
678678
)
679679

680+
@mock.patch("pyhive.hive.connect")
681+
def test_get_conn_with_password_plain(self, mock_connect):
682+
conn_id = "conn_plain_with_password"
683+
conn_env = CONN_ENV_PREFIX + conn_id.upper()
684+
685+
with mock.patch.dict(
686+
"os.environ",
687+
{conn_env: "jdbc+hive2://login:password@localhost:10000/default?auth_mechanism=PLAIN"},
688+
):
689+
HiveServer2Hook(hiveserver2_conn_id=conn_id).get_conn()
690+
mock_connect.assert_called_once_with(
691+
host="localhost",
692+
port=10000,
693+
auth="PLAIN",
694+
kerberos_service_name=None,
695+
username="login",
696+
password="password",
697+
database="default",
698+
)
699+
680700
@pytest.mark.parametrize(
681701
("host", "port", "schema", "message"),
682702
[

0 commit comments

Comments
 (0)