Skip to content

POC of a symlink-based code sharing approach. #53417

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions airflow-core/src/airflow/_shared/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
1 change: 1 addition & 0 deletions airflow-core/src/airflow/_shared/timezone.py
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/api/common/trigger_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import json
from typing import TYPE_CHECKING

from airflow._shared import timezone
from airflow.exceptions import DagNotFound, DagRunAlreadyExists
from airflow.models import DagBag, DagModel, DagRun
from airflow.utils import timezone
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/api_fastapi/auth/tokens.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.serialization import load_pem_private_key

from airflow.utils import timezone
from airflow._shared import timezone

if TYPE_CHECKING:
from jwt.algorithms import AllowedKeys, AllowedPrivateKeys
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from sqlalchemy import Column, and_, case, func, not_, or_, select
from sqlalchemy.inspection import inspect

from airflow._shared import timezone
from airflow.api_fastapi.core_api.base import OrmClause
from airflow.api_fastapi.core_api.security import GetUserDep
from airflow.models import Base
Expand All @@ -56,7 +57,6 @@
from airflow.models.taskinstance import TaskInstance
from airflow.models.variable import Variable
from airflow.typing_compat import Self
from airflow.utils import timezone
from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.types import DagRunType

Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/api_fastapi/common/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
ConfigDict,
)

from airflow.utils import timezone
from airflow._shared import timezone

UtcDateTime = Annotated[AwareDatetime, AfterValidator(lambda d: d.astimezone(timezone.utc))]
"""UTCDateTime is a datetime with timezone information"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@

from pydantic import AliasPath, AwareDatetime, Field, NonNegativeInt, model_validator

from airflow._shared import timezone
from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
from airflow.api_fastapi.core_api.datamodels.dag_versions import DagVersionResponse
from airflow.models import DagRun
from airflow.timetables.base import DataInterval
from airflow.utils import timezone
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@

from pydantic import computed_field

from airflow._shared import timezone
from airflow.api_fastapi.core_api.base import BaseModel
from airflow.utils import timezone
from airflow.utils.state import TaskInstanceState
from airflow.utils.types import DagRunType

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from sqlalchemy import and_, delete, func, select
from sqlalchemy.orm import joinedload, subqueryload

from airflow._shared import timezone
from airflow.api_fastapi.common.dagbag import DagBagDep
from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
from airflow.api_fastapi.common.parameters import (
Expand Down Expand Up @@ -72,7 +73,6 @@
TaskOutletAssetReference,
)
from airflow.models.dag import DAG
from airflow.utils import timezone
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from sqlalchemy import select, update
from sqlalchemy.orm import joinedload

from airflow._shared import timezone
from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity
from airflow.api_fastapi.common.db.common import (
SessionDep,
Expand Down Expand Up @@ -56,7 +57,6 @@
_create_backfill,
_do_dry_run,
)
from airflow.utils import timezone
from airflow.utils.state import DagRunState

backfills_router = AirflowRouter(tags=["Backfill"], prefix="/backfills")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from sqlalchemy import select
from sqlalchemy.orm import joinedload

from airflow._shared import timezone
from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity
from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
from airflow.api_fastapi.common.router import AirflowRouter
Expand All @@ -34,7 +35,6 @@
from airflow.api_fastapi.core_api.security import GetUserDep, ReadableTIFilterDep, requires_access_dag
from airflow.models.hitl import HITLDetail as HITLDetailModel
from airflow.models.taskinstance import TaskInstance as TI
from airflow.utils import timezone

hitl_router = AirflowRouter(tags=["HumanInTheLoop"], prefix="/hitl-details")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from sqlalchemy import func, select
from sqlalchemy.sql.expression import case, false

from airflow._shared import timezone
from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity
from airflow.api_fastapi.common.db.common import SessionDep
from airflow.api_fastapi.common.parameters import DateTimeQuery, OptionalDateTimeQuery
Expand All @@ -33,7 +34,6 @@
from airflow.models.dag import DagModel
from airflow.models.dagrun import DagRun, DagRunType
from airflow.models.taskinstance import TaskInstance
from airflow.utils import timezone
from airflow.utils.state import DagRunState, TaskInstanceState

dashboard_router = AirflowRouter(tags=["Dashboard"], prefix="/dashboard")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from sqlalchemy.engine import Row
from sqlalchemy.orm import Session

from airflow._shared import timezone
from airflow.api_fastapi.common.parameters import RangeFilter
from airflow.api_fastapi.core_api.datamodels.ui.calendar import (
CalendarTimeRangeCollectionResponse,
Expand All @@ -38,7 +39,6 @@
from airflow.timetables._cron import CronMixin
from airflow.timetables.base import DataInterval, TimeRestriction
from airflow.timetables.simple import ContinuousTimetable
from airflow.utils import timezone

log = structlog.get_logger(logger_name=__name__)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from sqlalchemy.sql import select
from structlog.contextvars import bind_contextvars

from airflow._shared import timezone
from airflow.api_fastapi.common.dagbag import DagBagDep
from airflow.api_fastapi.common.db.common import SessionDep
from airflow.api_fastapi.common.types import UtcDateTime
Expand Down Expand Up @@ -67,7 +68,6 @@
from airflow.sdk.definitions._internal.expandinput import NotFullyPopulated
from airflow.sdk.definitions.asset import Asset, AssetUniqueKey
from airflow.sdk.definitions.taskgroup import MappedTaskGroup
from airflow.utils import timezone
from airflow.utils.state import DagRunState, TaskInstanceState, TerminalTIState

if TYPE_CHECKING:
Expand Down
3 changes: 2 additions & 1 deletion airflow-core/src/airflow/cli/commands/dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

from sqlalchemy import func, select

from airflow._shared import timezone
from airflow.api.client import get_current_api_client
from airflow.api_fastapi.core_api.datamodels.dags import DAGResponse
from airflow.cli.simple_table import AirflowConsole
Expand All @@ -41,7 +42,7 @@
from airflow.models import DagBag, DagModel, DagRun, TaskInstance
from airflow.models.errors import ParseImportError
from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils import cli as cli_utils, timezone
from airflow.utils import cli as cli_utils
from airflow.utils.cli import get_dag, suppress_logs_and_warning, validate_dag_bundle_arg
from airflow.utils.dot_renderer import render_dag, render_dag_dependencies
from airflow.utils.helpers import ask_yesno
Expand Down
3 changes: 2 additions & 1 deletion airflow-core/src/airflow/cli/commands/task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from typing import TYPE_CHECKING, Protocol, cast

from airflow import settings
from airflow._shared import timezone
from airflow.cli.simple_table import AirflowConsole
from airflow.cli.utils import fetch_dag_run_from_run_id_or_logical_date_string
from airflow.exceptions import AirflowConfigException, DagRunNotFound, TaskInstanceNotFound
Expand All @@ -41,7 +42,7 @@
from airflow.serialization.serialized_objects import SerializedDAG
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS
from airflow.utils import cli as cli_utils, timezone
from airflow.utils import cli as cli_utils
from airflow.utils.cli import (
get_dag,
get_dag_by_file_location,
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from uuid6 import uuid7

import airflow.models
from airflow._shared import timezone
from airflow.api_fastapi.execution_api.app import InProcessExecutionAPI
from airflow.configuration import conf
from airflow.dag_processing.bundles.manager import DagBundlesManager
Expand All @@ -65,7 +66,6 @@
from airflow.sdk.log import init_log_file, logging_processors
from airflow.stats import Stats
from airflow.traces.tracer import DebugTrace
from airflow.utils import timezone
from airflow.utils.file import list_py_file_paths, might_contain_dag
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.net import get_hostname
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/jobs/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@
from sqlalchemy.orm import backref, foreign, relationship
from sqlalchemy.orm.session import make_transient

from airflow._shared import timezone
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.executors.executor_loader import ExecutorLoader
from airflow.listeners.listener import get_listener_manager
from airflow.models.base import ID_LEN, Base
from airflow.stats import Stats
from airflow.traces.tracer import DebugTrace, add_debug_span
from airflow.utils import timezone
from airflow.utils.helpers import convert_camel_to_snake
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.net import get_hostname
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from sqlalchemy.sql import expression

from airflow import settings
from airflow._shared import timezone
from airflow.api_fastapi.execution_api.datamodels.taskinstance import TIRunContext
from airflow.callbacks.callback_requests import DagCallbackRequest, TaskCallbackRequest
from airflow.configuration import conf
Expand Down Expand Up @@ -69,7 +70,6 @@
from airflow.timetables.simple import AssetTriggeredTimetable
from airflow.traces import utils as trace_utils
from airflow.traces.tracer import DebugTrace, Trace, add_debug_span
from airflow.utils import timezone
from airflow.utils.dates import datetime_to_nano
from airflow.utils.event_scheduler import EventScheduler
from airflow.utils.log.logging_mixin import LoggingMixin
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/jobs/triggerer_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from sqlalchemy import func, select
from structlog.contextvars import bind_contextvars as bind_log_contextvars

from airflow._shared import timezone
from airflow.configuration import conf
from airflow.executors import workloads
from airflow.jobs.base_job_runner import BaseJobRunner
Expand Down Expand Up @@ -69,7 +70,6 @@
from airflow.stats import Stats
from airflow.traces.tracer import DebugTrace, Trace, add_debug_span
from airflow.triggers import base as events
from airflow.utils import timezone
from airflow.utils.helpers import log_filename_template_renderer
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.module_loading import import_string
Expand Down
1 change: 1 addition & 0 deletions airflow-core/src/airflow/logging/_shared
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@
from alembic import op
from sqlalchemy_utils import UUIDType

from airflow._shared import timezone
from airflow.migrations.db_types import TIMESTAMP, StringID
from airflow.migrations.utils import ignore_sqlite_value_error
from airflow.models.base import naming_convention
from airflow.utils import timezone

# revision identifiers, used by Alembic.
revision = "2b47dc6bc8df"
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/models/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
)
from sqlalchemy.orm import relationship

from airflow._shared import timezone
from airflow.models.base import Base, StringID
from airflow.settings import json
from airflow.utils import timezone
from airflow.utils.sqlalchemy import UtcDateTime

if TYPE_CHECKING:
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/models/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@
from sqlalchemy.orm import relationship, validates
from sqlalchemy_jsonfield import JSONField

from airflow._shared import timezone
from airflow.exceptions import AirflowException, DagNotFound
from airflow.models.base import Base, StringID
from airflow.settings import json
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.sqlalchemy import UtcDateTime, nulls_first, with_row_locks
from airflow.utils.state import DagRunState
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
from sqlalchemy.sql import Select, expression

from airflow import settings, utils
from airflow._shared import timezone
from airflow.assets.evaluation import AssetEvaluator
from airflow.configuration import conf as airflow_conf
from airflow.exceptions import (
Expand Down Expand Up @@ -94,7 +95,6 @@
NullTimetable,
OnceTimetable,
)
from airflow.utils import timezone
from airflow.utils.context import Context
from airflow.utils.dag_cycle_tester import check_cycle
from airflow.utils.log.logging_mixin import LoggingMixin
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/models/dag_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
from sqlalchemy.orm import joinedload, relationship
from sqlalchemy_utils import UUIDType

from airflow._shared import timezone
from airflow.models.base import Base, StringID
from airflow.utils import timezone
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import UtcDateTime, with_row_locks

Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/models/dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from tabulate import tabulate

from airflow import settings
from airflow._shared import timezone
from airflow.configuration import conf
from airflow.exceptions import (
AirflowClusterPolicyError,
Expand All @@ -52,7 +53,6 @@
from airflow.listeners.listener import get_listener_manager
from airflow.models.base import Base, StringID
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.dag_cycle_tester import check_cycle
from airflow.utils.docs import get_docs_url
from airflow.utils.file import (
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/models/dagcode.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@
from sqlalchemy.sql.expression import literal
from sqlalchemy_utils import UUIDType

from airflow._shared import timezone
from airflow.configuration import conf
from airflow.exceptions import DagCodeNotFound
from airflow.models.base import ID_LEN, Base
from airflow.utils import timezone
from airflow.utils.file import open_maybe_zipped
from airflow.utils.hashlib_wrapper import md5
from airflow.utils.session import NEW_SESSION, provide_session
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
from sqlalchemy.sql.functions import coalesce
from sqlalchemy_utils import UUIDType

from airflow._shared import timezone
from airflow.callbacks.callback_requests import DagCallbackRequest
from airflow.configuration import conf as airflow_conf
from airflow.exceptions import AirflowException, TaskNotFound
Expand All @@ -77,7 +78,6 @@
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_states import SCHEDULEABLE_STATES
from airflow.traces.tracer import EmptySpan, Trace
from airflow.utils import timezone
from airflow.utils.dates import datetime_to_nano
from airflow.utils.helpers import chunks, is_container, prune_dict
from airflow.utils.log.logging_mixin import LoggingMixin
Expand Down
Loading
Loading