Description
Apache Airflow version
2.2.1 (latest released)
Operating System
Amazon Linux 2
Versions of Apache Airflow Providers
apache_airflow-2.2.1.dist-info
apache_airflow_providers_amazon-2.3.0.dist-info
apache_airflow_providers_ftp-2.0.1.dist-info
apache_airflow_providers_http-2.0.1.dist-info
apache_airflow_providers_imap-2.0.1.dist-info
apache_airflow_providers_postgres-2.3.0.dist-info
apache_airflow_providers_sqlite-2.0.1.dist-info
Deployment
Other
Deployment details
pip3 installation to an Amazon Linux 2 based AMI running on an EC2 instance (t3.xlarge)
What happened
Our DAG runs 8 parallel tasks on the LocalExecutor with a parallelism of 32, using the os.fork option for task execution.
We use custom operators based on the Amazon provider to add Steps to an EMR cluster; a minimal sketch of this pattern appears below.
Seemingly at random, tasks get stuck in the Running state and produce little log output.
Running ps ax shows that each of these "stuck" tasks has two task supervisor processes.
Retrying the task, or killing one of these supervisor processes, causes Airflow to retry the task, which then runs successfully.
This intermittent issue renders Airflow unusable in a production setting.
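For context, here is a minimal sketch of the pattern we use; the DAG id, cluster id, and step definition are illustrative placeholders, not our production code:

```python
# Minimal sketch: 8 parallel EmrAddStepsOperator tasks (illustrative only).
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator

with DAG(
    dag_id="dag1",
    start_date=datetime(2021, 11, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    # 8 of these run in parallel against the same EMR cluster.
    for i in range(8):
        EmrAddStepsOperator(
            task_id=f"add_emr_step_{i}",
            job_flow_id="j-XXXXXXXXXXXXX",  # illustrative cluster id
            aws_conn_id="aws_default",
            steps=[
                {
                    "Name": f"step_{i}",
                    "ActionOnFailure": "CONTINUE",
                    "HadoopJarStep": {
                        "Jar": "command-runner.jar",
                        "Args": ["spark-submit", "s3://bucket/job.py"],
                    },
                }
            ],
        )
```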
Running processes:
21861 ? S 0:00 airflow worker -- LocalExecutor: ['airflow', 'tasks', 'run', 'dag1', 'one.four.five.task', 'backfill__2021-11-11T02:00:00+00:00', '--mark-success', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/dag1.py', '--cfg-path', '/tmp/tmpb6gpqufk']
23900 ? Sl 0:05 airflow task supervisor: ['airflow', 'tasks', 'run', 'dag1', 'one.four.five.task', 'backfill__2021-11-11T02:00:00+00:00', '--mark-success', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/dag1.py', '--cfg-path', '/tmp/tmpb6gpqufk']
24034 ? S 0:00 airflow task supervisor: ['airflow', 'tasks', 'run', 'dag1', 'one.four.five.task', 'backfill__2021-11-11T02:00:00+00:00', '--mark-success', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/dag1.py', '--cfg-path', '/tmp/tmpb6gpqufk']
Airflow UI task log for a hung task:
*** Reading remote log from Cloudwatch log_group: log-group-01 log_stream: dag1/one.four.five.task/2021-11-11T02_00_00+00_00/1.log.
[2021-11-11, 09:08:34 UTC]
--------------------------------------------------------------------------------
[2021-11-11, 09:08:34 UTC] Starting attempt 1 of 2
[2021-11-11, 09:08:34 UTC]
--------------------------------------------------------------------------------
[2021-11-11, 09:08:34 UTC] Marking success for <Task(EmrAddStepsOperator): one.four.five.task> on 2021-11-11 02:00:00+00:00
[2021-11-11, 09:08:34 UTC] Started process 24034 to run task
Airflow UI task log for a successful task:
*** Reading remote log from Cloudwatch log_group: log-group-01 log_stream: dag1/one.two.three.task/2021-11-11T02_00_00+00_00/1.log.
[2021-11-11, 09:07:29 UTC]
--------------------------------------------------------------------------------
[2021-11-11, 09:07:29 UTC] Starting attempt 1 of 2
[2021-11-11, 09:07:29 UTC]
--------------------------------------------------------------------------------
[2021-11-11, 09:07:29 UTC] Marking success for <Task(EmrAddStepsOperator): one.two.three.task> on 2021-11-11 02:00:00+00:00
[2021-11-11, 09:07:29 UTC] Started process 22814 to run task
[2021-11-11, 09:07:32 UTC] Running <TaskInstance: dag1.one.two.three.task backfill__2021-11-11T02:00:00+00:00 [running]> on host ip-REDACTED.eu-west-2.compute.internal
[2021-11-11, 09:07:32 UTC] Marking task as SUCCESS. dag_id=dag1, task_id=one.two.three.task, execution_date=20211111T020000, start_date=20211111T090518, end_date=20211111T090732
[2021-11-11, 09:07:34 UTC] State of this instance has been externally set to success. Terminating instance.
[2021-11-11, 09:07:34 UTC] Sending Signals.SIGTERM to GPID 22814
[2021-11-11, 09:07:34 UTC] Process psutil.Process(pid=22814, status='terminated', exitcode=<Negsignal.SIGTERM: -15>, started='09:07:29') (22814) terminated with exit code Negsignal.SIGTERM
What you expected to happen
Tasks should not hang in the 'Running' state.
How to reproduce
We have found that this issue occurs more often as the number of parallel tasks increases.
- Set up Airflow to run with the LocalExecutor and a parallelism of 32
- Create a DAG with 8+ parallel, long-running tasks (a minimal sketch follows this list)
- Run the DAG repeatedly until a task gets stuck in the 'Running' state
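A minimal repro sketch under these assumptions (LocalExecutor, parallelism of 32, default os.fork task runner); the DAG, task ids, and sleep duration are illustrative:

```python
# Relevant airflow.cfg settings:
#   [core]
#   executor = LocalExecutor
#   parallelism = 32
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="parallel_repro",
    start_date=datetime(2021, 11, 1),
    schedule_interval="@hourly",
    catchup=False,
) as dag:
    # 8+ parallel, long-running tasks; increasing the count makes
    # the hang more likely to appear.
    for i in range(8):
        BashOperator(
            task_id=f"long_running_{i}",
            bash_command="sleep 600",
        )
```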
Anything else
The issue occurs in roughly one DAG run in 10 when running 8 parallel tasks with 40+ pending tasks.
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct