@mjumbewu commented Feb 7, 2025

Description

  • Update Dockerfile and Dockerfile.composer to use apache/airflow:2.10.2-python3.11
  • Create a requirements-composer-2.11.1-airflow-2.10.2.txt file with the appropriate dependencies
  • Create a Python 3.11 environment and install the dependencies:
    pip install -r requirements-composer-2.11.1-airflow-2.10.2.txt -r requirements.txt
  • Verify that the docker image builds at all. Useful to do this as a separate step for better visibility into errors:
    docker build . -t calitp-airflow-test
  • Follow other testing instructions in the airflow/README.md file.

As of 2025 Feb 6, the following packages in requirements.txt of the airflow image need to be updated to work with Python 3.11:

  • boto3 -- our Airflow image was pinned to boto3==1.36.15, which requires botocore 1.29.165, which in turn requires urllib3<1.27; that is too old for the newer Composer images.
  • platformdirs -- our image was pinned to platformdirs<3,>=2.5, whereas the current Composer image requires 4.3.6. Oddly, the previous Composer image we were using required platformdirs==3.2.0, so I'm not sure how that worked without a conflict, other than that our requirements were installed after the Composer requirements.
  • pydantic -- we had pinned pydantic==1.9 because of typing extension conflicts, which should no longer be an issue now that we use Python 3.11 in the new Composer image.
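
To make the platformdirs conflict above concrete, here is a minimal sketch that checks a version against a comma-separated pin. It is an illustration only: the `satisfies` helper is hypothetical, handles plain numeric versions, and does not implement the full PEP 440 rules (real tooling such as pip does that).

```python
# Simplified version-vs-specifier check (hypothetical helper, not from the repo).
# Handles only plain numeric versions such as "4.3.6"; it is not a PEP 440 parser.
import operator

OPS = {
    "==": operator.eq,
    ">=": operator.ge,
    "<=": operator.le,
    ">": operator.gt,
    "<": operator.lt,
}


def parse(version: str) -> tuple:
    """Turn "4.3.6" into (4, 3, 6)."""
    return tuple(int(part) for part in version.split("."))


def satisfies(version: str, specifier: str) -> bool:
    """Return True if `version` meets every clause in `specifier`."""
    have = parse(version)
    for clause in specifier.split(","):
        clause = clause.strip()
        # Try two-character operators before one-character ones.
        for symbol in ("==", ">=", "<=", ">", "<"):
            if clause.startswith(symbol):
                want = parse(clause[len(symbol):].strip())
                # Pad with zeros so (3, 2) compares equal to (3, 2, 0).
                width = max(len(have), len(want))
                a = have + (0,) * (width - len(have))
                b = want + (0,) * (width - len(want))
                if not OPS[symbol](a, b):
                    return False
                break
        else:
            raise ValueError(f"unsupported clause: {clause!r}")
    return True


# The old pin from our image, checked against both Composer requirements.
print(satisfies("4.3.6", "<3,>=2.5"))  # False
print(satisfies("3.2.0", "<3,>=2.5"))  # False
```

Both Composer versions fall outside the old pin, which is consistent with the observation that the earlier setup only worked because of install order.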

Additionally, the following package version requirements have been loosened in calitp-data-infra:

  • pydantic = ">1.9"
  • pendulum = ">2.1.2"
  • google-cloud-secret-manager = ">2.16.4"

Replaced all imports from pydantic with imports from the pydantic.v1 compatibility layer that ships with Pydantic 2+ (the remaining plain imports were found with the regex pydantic(?!\.v1)).
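
As a rough illustration of how that regex behaves, the sketch below applies it to a few made-up import lines. The sample lines and the `needs_migration` / `to_v1` helpers are hypothetical and not taken from this PR; the rewrite only touches `from pydantic ... import` statements.

```python
import re

# The pattern quoted above: "pydantic" not immediately followed by ".v1".
PATTERN = re.compile(r"pydantic(?!\.v1)")

# Narrower pattern used for rewriting: only "from pydantic[.sub] import ..." lines.
FROM_IMPORT = re.compile(r"^(\s*from\s+)pydantic(?!\.v1)\b")

# Hypothetical sample lines, for illustration only.
SAMPLE_LINES = [
    "from pydantic import BaseModel, validator",
    "from pydantic.v1 import BaseModel, validator",
    "import pydantic",
    "from pydantic.v1.fields import ModelField",
]


def needs_migration(line: str) -> bool:
    """True if the line mentions pydantic without the .v1 suffix."""
    return PATTERN.search(line) is not None


def to_v1(line: str) -> str:
    """Point a from-import at the pydantic.v1 compatibility layer."""
    return FROM_IMPORT.sub(r"\1pydantic.v1", line)


for line in SAMPLE_LINES:
    print(needs_migration(line), "|", to_v1(line))
```

The negative lookahead is what keeps already-migrated lines out of the search results, so the same search can be rerun after each batch of changes.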

To test the updated calitp-data-infra package, I had to copy the packages folder into the airflow folder, add COPY ./packages/calitp-data-infra/ /tmp/calitp-data-infra/ to the airflow Dockerfile, and then use calitp-data-infra @ file:///tmp/calitp-data-infra in the requirements file.

As a follow-on, we should consider upgrading our use of Pydantic to the latest version. The bump-pydantic tool should help with that.

Resolves #3767

Type of change

  • Bug fix (non-breaking change which fixes an issue)
  • New feature
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation

How has this been tested?

Include commands/logs/screenshots as relevant.

If making changes to dbt models, please run the command poetry run dbt run -s CHANGED_MODEL and poetry run dbt test -s CHANGED_MODEL, then include the output in this section of the PR.

Post-merge follow-ups

Document any actions that must be taken post-merge to deploy or otherwise implement the changes in this PR (for example, running a full refresh of some incremental model in dbt). If these actions will take more than a few hours after the merge or if they will be completed by someone other than the PR author, please create a dedicated follow-up issue and link it here to track resolution.

  • No action required
  • Actions required (specified below)

I recommend moving these changes to a separate PR so it gets created and uploaded first, and you can use the package in the rest of the PR.

@mjumbewu force-pushed the cloud-composer-python311-upgrade branch from 474eb07 to 4c5fbd1 on March 3, 2025 21:23
@HaroldBooker

@mjumbewu can you take a look at this one?

@ohrite changed the title from "Cloud composer python311 upgrade" to "Pydantic 2.0" on Jul 9, 2025
@ohrite changed the title from "Pydantic 2.0" to "Upgrade to pydantic 2.0" on Jul 9, 2025
@ohrite force-pushed the cloud-composer-python311-upgrade branch from 4c5fbd1 to c69a177 on July 15, 2025 18:01

github-actions bot commented Jul 15, 2025

Terraform plan in iac/cal-itp-data-infra/airflow/us

Plan: 0 to add, 7 to change, 0 to destroy.
Terraform used the selected providers to generate the following execution
plan. Resource actions are indicated with the following symbols:
!~  update in-place

Terraform will perform the following actions:

  # google_storage_bucket_object.calitp-composer["dags/airtable_loader_v2/generate_gtfs_download_configs.py"] will be updated in-place
!~  resource "google_storage_bucket_object" "calitp-composer" {
!~      crc32c              = "LLSRKg==" -> (known after apply)
!~      detect_md5hash      = "WEFwfVbJm4J6LdTF0abD2A==" -> "different hash"
!~      generation          = 1751416672748889 -> (known after apply)
        id                  = "calitp-composer-dags/airtable_loader_v2/generate_gtfs_download_configs.py"
!~      md5hash             = "WEFwfVbJm4J6LdTF0abD2A==" -> (known after apply)
        name                = "dags/airtable_loader_v2/generate_gtfs_download_configs.py"
#        (17 unchanged attributes hidden)
    }

  # google_storage_bucket_object.calitp-composer["dags/download_gtfs_schedule_v2/download_schedule_feeds.py"] will be updated in-place
!~  resource "google_storage_bucket_object" "calitp-composer" {
!~      crc32c              = "3CFrIg==" -> (known after apply)
!~      detect_md5hash      = "tuDGKx58gvxzc6Anuo4Sxg==" -> "different hash"
!~      generation          = 1751416672559844 -> (known after apply)
        id                  = "calitp-composer-dags/download_gtfs_schedule_v2/download_schedule_feeds.py"
!~      md5hash             = "tuDGKx58gvxzc6Anuo4Sxg==" -> (known after apply)
        name                = "dags/download_gtfs_schedule_v2/download_schedule_feeds.py"
#        (17 unchanged attributes hidden)
    }

  # google_storage_bucket_object.calitp-composer["dags/sync_ntd_data_xlsx/scrape_ntd_xlsx_urls.py"] will be updated in-place
!~  resource "google_storage_bucket_object" "calitp-composer" {
!~      crc32c              = "aeKgAA==" -> (known after apply)
!~      detect_md5hash      = "vFfVIvQMq+ESrqU6gS4ftQ==" -> "different hash"
!~      generation          = 1751416670670207 -> (known after apply)
        id                  = "calitp-composer-dags/sync_ntd_data_xlsx/scrape_ntd_xlsx_urls.py"
!~      md5hash             = "vFfVIvQMq+ESrqU6gS4ftQ==" -> (known after apply)
        name                = "dags/sync_ntd_data_xlsx/scrape_ntd_xlsx_urls.py"
#        (17 unchanged attributes hidden)
    }

  # google_storage_bucket_object.calitp-composer["plugins/operators/littlepay_raw_sync.py"] will be updated in-place
!~  resource "google_storage_bucket_object" "calitp-composer" {
!~      crc32c              = "nw1M5g==" -> (known after apply)
!~      detect_md5hash      = "PA6EtdCRGpIH3sdNi7QiSw==" -> "different hash"
!~      generation          = 1751416671641733 -> (known after apply)
        id                  = "calitp-composer-plugins/operators/littlepay_raw_sync.py"
!~      md5hash             = "PA6EtdCRGpIH3sdNi7QiSw==" -> (known after apply)
        name                = "plugins/operators/littlepay_raw_sync.py"
#        (17 unchanged attributes hidden)
    }

  # google_storage_bucket_object.calitp-composer["plugins/operators/scrape_ntd_xlsx.py"] will be updated in-place
!~  resource "google_storage_bucket_object" "calitp-composer" {
!~      crc32c              = "3lHrkQ==" -> (known after apply)
!~      detect_md5hash      = "n5buwLrUiAM5+k8Tp+ZhWQ==" -> "different hash"
!~      generation          = 1751416674405014 -> (known after apply)
        id                  = "calitp-composer-plugins/operators/scrape_ntd_xlsx.py"
!~      md5hash             = "n5buwLrUiAM5+k8Tp+ZhWQ==" -> (known after apply)
        name                = "plugins/operators/scrape_ntd_xlsx.py"
#        (17 unchanged attributes hidden)
    }

  # google_storage_bucket_object.calitp-composer["plugins/operators/scrape_state_geoportal.py"] will be updated in-place
!~  resource "google_storage_bucket_object" "calitp-composer" {
!~      crc32c              = "3pMECQ==" -> (known after apply)
!~      detect_md5hash      = "qwOK0bYTQ/9mzvdFyKCNGQ==" -> "different hash"
!~      generation          = 1751416675477427 -> (known after apply)
        id                  = "calitp-composer-plugins/operators/scrape_state_geoportal.py"
!~      md5hash             = "qwOK0bYTQ/9mzvdFyKCNGQ==" -> (known after apply)
        name                = "plugins/operators/scrape_state_geoportal.py"
#        (17 unchanged attributes hidden)
    }

  # google_storage_bucket_object.calitp-composer["plugins/utils.py"] will be updated in-place
!~  resource "google_storage_bucket_object" "calitp-composer" {
!~      crc32c              = "ZshQSQ==" -> (known after apply)
!~      detect_md5hash      = "7UdL9NZ+pHtC/CmwXFEL0g==" -> "different hash"
!~      generation          = 1751416670997258 -> (known after apply)
        id                  = "calitp-composer-plugins/utils.py"
!~      md5hash             = "7UdL9NZ+pHtC/CmwXFEL0g==" -> (known after apply)
        name                = "plugins/utils.py"
#        (17 unchanged attributes hidden)
    }

Plan: 0 to add, 7 to change, 0 to destroy.

📝 Plan generated in Plan Airflow DAGs #463
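
For anyone wanting to confirm locally which files differ from the bucket, the md5hash values in the plan can be reproduced from file contents. This is a minimal sketch, assuming the attribute is the base64-encoded MD5 digest of the object data (which is how Cloud Storage reports MD5 hashes); the function names are hypothetical.

```python
import base64
import hashlib


def gcs_md5(data: bytes) -> str:
    """Base64-encoded MD5 digest, the format shown in the plan's md5hash field."""
    return base64.b64encode(hashlib.md5(data).digest()).decode("ascii")


def gcs_md5_of_file(path: str) -> str:
    """Same hash, computed from a local file such as a DAG or plugin module."""
    with open(path, "rb") as handle:
        return gcs_md5(handle.read())


# Well-known MD5 of empty input, shown in the same encoding.
print(gcs_md5(b""))  # 1B2M2Y8AsgTpgAmY7PhCfg==
```

If the hash of a local file matches the old value on the left of the arrow in the plan, that file has not changed relative to the bucket.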


github-actions bot commented Jul 15, 2025

Terraform plan in iac/cal-itp-data-infra-staging/airflow/us

Plan: 5 to add, 9 to change, 0 to destroy.
Terraform used the selected providers to generate the following execution
plan. Resource actions are indicated with the following symbols:
+   create
!~  update in-place

Terraform will perform the following actions:

  # google_storage_bucket_object.calitp-staging-composer["dags/airtable_loader_v2/generate_gtfs_download_configs.py"] will be updated in-place
!~  resource "google_storage_bucket_object" "calitp-staging-composer" {
!~      crc32c              = "LLSRKg==" -> (known after apply)
!~      detect_md5hash      = "WEFwfVbJm4J6LdTF0abD2A==" -> "different hash"
!~      generation          = 1749661090914530 -> (known after apply)
        id                  = "calitp-staging-composer-dags/airtable_loader_v2/generate_gtfs_download_configs.py"
!~      md5hash             = "WEFwfVbJm4J6LdTF0abD2A==" -> (known after apply)
        name                = "dags/airtable_loader_v2/generate_gtfs_download_configs.py"
#        (17 unchanged attributes hidden)
    }

  # google_storage_bucket_object.calitp-staging-composer["dags/download_gtfs_schedule_v2/download_schedule_feeds.py"] will be updated in-place
!~  resource "google_storage_bucket_object" "calitp-staging-composer" {
!~      crc32c              = "3CFrIg==" -> (known after apply)
!~      detect_md5hash      = "tuDGKx58gvxzc6Anuo4Sxg==" -> "different hash"
!~      generation          = 1749661091724383 -> (known after apply)
        id                  = "calitp-staging-composer-dags/download_gtfs_schedule_v2/download_schedule_feeds.py"
!~      md5hash             = "tuDGKx58gvxzc6Anuo4Sxg==" -> (known after apply)
        name                = "dags/download_gtfs_schedule_v2/download_schedule_feeds.py"
#        (17 unchanged attributes hidden)
    }

  # google_storage_bucket_object.calitp-staging-composer["dags/sync_ntd_data_xlsx/scrape_ntd_xlsx_urls.py"] will be updated in-place
!~  resource "google_storage_bucket_object" "calitp-staging-composer" {
!~      crc32c              = "aeKgAA==" -> (known after apply)
!~      detect_md5hash      = "vFfVIvQMq+ESrqU6gS4ftQ==" -> "different hash"
!~      generation          = 1749661090706886 -> (known after apply)
        id                  = "calitp-staging-composer-dags/sync_ntd_data_xlsx/scrape_ntd_xlsx_urls.py"
!~      md5hash             = "vFfVIvQMq+ESrqU6gS4ftQ==" -> (known after apply)
        name                = "dags/sync_ntd_data_xlsx/scrape_ntd_xlsx_urls.py"
#        (17 unchanged attributes hidden)
    }

  # google_storage_bucket_object.calitp-staging-composer["plugins/operators/littlepay_raw_sync.py"] will be updated in-place
!~  resource "google_storage_bucket_object" "calitp-staging-composer" {
!~      crc32c              = "nw1M5g==" -> (known after apply)
!~      detect_md5hash      = "PA6EtdCRGpIH3sdNi7QiSw==" -> "different hash"
!~      generation          = 1750274852243314 -> (known after apply)
        id                  = "calitp-staging-composer-plugins/operators/littlepay_raw_sync.py"
!~      md5hash             = "PA6EtdCRGpIH3sdNi7QiSw==" -> (known after apply)
        name                = "plugins/operators/littlepay_raw_sync.py"
#        (17 unchanged attributes hidden)
    }

  # google_storage_bucket_object.calitp-staging-composer["plugins/operators/scrape_ntd_xlsx.py"] will be updated in-place
!~  resource "google_storage_bucket_object" "calitp-staging-composer" {
!~      crc32c              = "3lHrkQ==" -> (known after apply)
!~      detect_md5hash      = "n5buwLrUiAM5+k8Tp+ZhWQ==" -> "different hash"
!~      generation          = 1749661098074948 -> (known after apply)
        id                  = "calitp-staging-composer-plugins/operators/scrape_ntd_xlsx.py"
!~      md5hash             = "n5buwLrUiAM5+k8Tp+ZhWQ==" -> (known after apply)
        name                = "plugins/operators/scrape_ntd_xlsx.py"
#        (17 unchanged attributes hidden)
    }

  # google_storage_bucket_object.calitp-staging-composer["plugins/operators/scrape_state_geoportal.py"] will be updated in-place
!~  resource "google_storage_bucket_object" "calitp-staging-composer" {
!~      crc32c              = "3pMECQ==" -> (known after apply)
!~      detect_md5hash      = "qwOK0bYTQ/9mzvdFyKCNGQ==" -> "different hash"
!~      generation          = 1749661098094424 -> (known after apply)
        id                  = "calitp-staging-composer-plugins/operators/scrape_state_geoportal.py"
!~      md5hash             = "qwOK0bYTQ/9mzvdFyKCNGQ==" -> (known after apply)
        name                = "plugins/operators/scrape_state_geoportal.py"
#        (17 unchanged attributes hidden)
    }

  # google_storage_bucket_object.calitp-staging-composer["plugins/utils.py"] will be updated in-place
!~  resource "google_storage_bucket_object" "calitp-staging-composer" {
!~      crc32c              = "ZshQSQ==" -> (known after apply)
!~      detect_md5hash      = "7UdL9NZ+pHtC/CmwXFEL0g==" -> "different hash"
!~      generation          = 1749661089565423 -> (known after apply)
        id                  = "calitp-staging-composer-plugins/utils.py"
!~      md5hash             = "7UdL9NZ+pHtC/CmwXFEL0g==" -> (known after apply)
        name                = "plugins/utils.py"
#        (17 unchanged attributes hidden)
    }

  # google_storage_bucket_object.calitp-staging-composer-catalog will be updated in-place
!~  resource "google_storage_bucket_object" "calitp-staging-composer-catalog" {
!~      content             = (sensitive value)
!~      crc32c              = "lHo3jQ==" -> (known after apply)
!~      detect_md5hash      = "Kzyj6qWh6V5vCBLXNfgW9Q==" -> "different hash"
!~      generation          = 1754938421374278 -> (known after apply)
        id                  = "calitp-staging-composer-data/warehouse/target/catalog.json"
!~      md5hash             = "Kzyj6qWh6V5vCBLXNfgW9Q==" -> (known after apply)
        name                = "data/warehouse/target/catalog.json"
#        (16 unchanged attributes hidden)
    }

  # google_storage_bucket_object.calitp-staging-composer-dags["models/intermediate/gtfs/int_gtfs_rt__trip_updates_trip_stop_day_map_grouping.sql"] will be created
+   resource "google_storage_bucket_object" "calitp-staging-composer-dags" {
+       bucket         = "calitp-staging-composer"
+       content        = (sensitive value)
+       content_type   = (known after apply)
+       crc32c         = (known after apply)
+       detect_md5hash = "different hash"
+       generation     = (known after apply)
+       id             = (known after apply)
+       kms_key_name   = (known after apply)
+       md5hash        = (known after apply)
+       md5hexhash     = (known after apply)
+       media_link     = (known after apply)
+       name           = "data/warehouse/models/intermediate/gtfs/int_gtfs_rt__trip_updates_trip_stop_day_map_grouping.sql"
+       output_name    = (known after apply)
+       self_link      = (known after apply)
+       source         = "../../../../warehouse/models/intermediate/gtfs/int_gtfs_rt__trip_updates_trip_stop_day_map_grouping.sql"
+       storage_class  = (known after apply)
    }

  # google_storage_bucket_object.calitp-staging-composer-dags["models/mart/gtfs/fct_stop_time_metrics.sql"] will be created
+   resource "google_storage_bucket_object" "calitp-staging-composer-dags" {
+       bucket         = "calitp-staging-composer"
+       content        = (sensitive value)
+       content_type   = (known after apply)
+       crc32c         = (known after apply)
+       detect_md5hash = "different hash"
+       generation     = (known after apply)
+       id             = (known after apply)
+       kms_key_name   = (known after apply)
+       md5hash        = (known after apply)
+       md5hexhash     = (known after apply)
+       media_link     = (known after apply)
+       name           = "data/warehouse/models/mart/gtfs/fct_stop_time_metrics.sql"
+       output_name    = (known after apply)
+       self_link      = (known after apply)
+       source         = "../../../../warehouse/models/mart/gtfs/fct_stop_time_metrics.sql"
+       storage_class  = (known after apply)
    }

  # google_storage_bucket_object.calitp-staging-composer-dags["models/mart/gtfs/fct_stop_time_updates_sample.sql"] will be created
+   resource "google_storage_bucket_object" "calitp-staging-composer-dags" {
+       bucket         = "calitp-staging-composer"
+       content        = (sensitive value)
+       content_type   = (known after apply)
+       crc32c         = (known after apply)
+       detect_md5hash = "different hash"
+       generation     = (known after apply)
+       id             = (known after apply)
+       kms_key_name   = (known after apply)
+       md5hash        = (known after apply)
+       md5hexhash     = (known after apply)
+       media_link     = (known after apply)
+       name           = "data/warehouse/models/mart/gtfs/fct_stop_time_updates_sample.sql"
+       output_name    = (known after apply)
+       self_link      = (known after apply)
+       source         = "../../../../warehouse/models/mart/gtfs/fct_stop_time_updates_sample.sql"
+       storage_class  = (known after apply)
    }

  # google_storage_bucket_object.calitp-staging-composer-dags["models/mart/gtfs/fct_trip_updates_stop_metrics.sql"] will be created
+   resource "google_storage_bucket_object" "calitp-staging-composer-dags" {
+       bucket         = "calitp-staging-composer"
+       content        = (sensitive value)
+       content_type   = (known after apply)
+       crc32c         = (known after apply)
+       detect_md5hash = "different hash"
+       generation     = (known after apply)
+       id             = (known after apply)
+       kms_key_name   = (known after apply)
+       md5hash        = (known after apply)
+       md5hexhash     = (known after apply)
+       media_link     = (known after apply)
+       name           = "data/warehouse/models/mart/gtfs/fct_trip_updates_stop_metrics.sql"
+       output_name    = (known after apply)
+       self_link      = (known after apply)
+       source         = "../../../../warehouse/models/mart/gtfs/fct_trip_updates_stop_metrics.sql"
+       storage_class  = (known after apply)
    }

  # google_storage_bucket_object.calitp-staging-composer-dags["models/mart/gtfs/fct_trip_updates_trip_metrics.sql"] will be created
+   resource "google_storage_bucket_object" "calitp-staging-composer-dags" {
+       bucket         = "calitp-staging-composer"
+       content        = (sensitive value)
+       content_type   = (known after apply)
+       crc32c         = (known after apply)
+       detect_md5hash = "different hash"
+       generation     = (known after apply)
+       id             = (known after apply)
+       kms_key_name   = (known after apply)
+       md5hash        = (known after apply)
+       md5hexhash     = (known after apply)
+       media_link     = (known after apply)
+       name           = "data/warehouse/models/mart/gtfs/fct_trip_updates_trip_metrics.sql"
+       output_name    = (known after apply)
+       self_link      = (known after apply)
+       source         = "../../../../warehouse/models/mart/gtfs/fct_trip_updates_trip_metrics.sql"
+       storage_class  = (known after apply)
    }

  # google_storage_bucket_object.calitp-staging-composer-manifest will be updated in-place
!~  resource "google_storage_bucket_object" "calitp-staging-composer-manifest" {
!~      content             = (sensitive value)
!~      crc32c              = "21cxqg==" -> (known after apply)
!~      detect_md5hash      = "Z7SYWkQVGJixDDXvD4i86w==" -> "different hash"
!~      generation          = 1754938422967236 -> (known after apply)
        id                  = "calitp-staging-composer-data/warehouse/target/manifest.json"
!~      md5hash             = "Z7SYWkQVGJixDDXvD4i86w==" -> (known after apply)
        name                = "data/warehouse/target/manifest.json"
#        (16 unchanged attributes hidden)
    }

Plan: 5 to add, 9 to change, 0 to destroy.

📝 Plan generated in Plan Airflow DAGs #463
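
When reviewing plans like the two above, it can help to extract the summary counts programmatically, for example to flag any plan that destroys resources. The sketch below is an assumption about how one might do that and is not part of this repository's workflow; `PlanSummary` and `parse_summary` are hypothetical names.

```python
import re
from typing import NamedTuple


class PlanSummary(NamedTuple):
    add: int
    change: int
    destroy: int


# Matches the summary line printed at the top and bottom of each plan.
SUMMARY = re.compile(r"Plan: (\d+) to add, (\d+) to change, (\d+) to destroy\.")


def parse_summary(text: str) -> PlanSummary:
    """Extract the add/change/destroy counts from plan output."""
    match = SUMMARY.search(text)
    if match is None:
        raise ValueError("no plan summary line found")
    return PlanSummary(*(int(group) for group in match.groups()))


staging = parse_summary("Plan: 5 to add, 9 to change, 0 to destroy.")
print(staging)               # PlanSummary(add=5, change=9, destroy=0)
print(staging.destroy == 0)  # True
```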

@ohrite force-pushed the cloud-composer-python311-upgrade branch from c69a177 to 6899a7a on July 22, 2025 20:52
Includes update to Pydantic 2.x, which requires a change in the imports of the package.
@ohrite force-pushed the cloud-composer-python311-upgrade branch from 6899a7a to 7d6b466 on August 12, 2025 16:45
Development

Successfully merging this pull request may close these issues.

Upgrade to composer-2.8.6-airflow-2.7.3

3 participants