Skip to content

Commit 7a46497

Browse files
authored
Merge pull request #3528 from antcybersec/fix-smoke-test-timeout
Fix flaky smoke tests by adding SIGTERM signal handling
2 parents c145776 + 40f4b69 commit 7a46497

File tree

2 files changed

+190
-108
lines changed

2 files changed

+190
-108
lines changed

augur/application/cli/backend.py

Lines changed: 24 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,17 @@
1414
import traceback
1515
import requests
1616
from redis.exceptions import ConnectionError as RedisConnectionError
17-
from urllib.parse import urlparse
1817

1918
from augur.tasks.start_tasks import augur_collection_monitor, create_collection_status_records
2019
from augur.tasks.git.facade_tasks import clone_repos
2120
from augur.tasks.github.contributors import process_contributors
2221
from augur.tasks.github.util.github_api_key_handler import GithubApiKeyHandler
2322
from augur.tasks.gitlab.gitlab_api_key_handler import GitlabApiKeyHandler
2423
from augur.tasks.data_analysis.contributor_breadth_worker.contributor_breadth_worker import contributor_breadth_model
25-
from augur.tasks.init.redis_connection import get_redis_connection
2624
from augur.application.db.models import UserRepo
2725
from augur.application.db.session import DatabaseSession
2826
from augur.application.logs import AugurLogger
27+
from augur.application.service_manager import AugurServiceManager, cleanup_collection_status_and_rabbit, clean_collection_status
2928
from augur.application.db.lib import get_value
3029
from augur.application.cli import test_connection, test_db_connection, with_database, DatabaseContext
3130
import sqlalchemy as s
@@ -36,7 +35,6 @@
3635

3736
logger = AugurLogger("augur", reset_logfiles=reset_logs).get_logger()
3837

39-
4038
@click.group('server', short_help='Commands for controlling the backend API server & data collection workers')
4139
@click.pass_context
4240
def cli(ctx):
@@ -55,17 +53,23 @@ def start(ctx, disable_collection, development, pidfile, port):
5553
"""Start Augur's backend server."""
5654
with open(pidfile, "w") as pidfile_io:
5755
pidfile_io.write(str(os.getpid()))
58-
56+
57+
manager = AugurServiceManager(ctx, pidfile, disable_collection)
58+
59+
# Register signal handlers for graceful shutdown
60+
signal.signal(signal.SIGTERM, manager.shutdown_signal_handler)
61+
signal.signal(signal.SIGINT, manager.shutdown_signal_handler)
62+
5963
try:
6064
if os.environ.get('AUGUR_DOCKER_DEPLOY') != "1":
6165
raise_open_file_limit(100000)
62-
except Exception as e:
66+
except Exception as e:
6367
logger.error(
6468
''.join(traceback.format_exception(None, e, e.__traceback__)))
65-
69+
6670
logger.error("Failed to raise open file limit!")
6771
raise e
68-
72+
6973
if development:
7074
os.environ["AUGUR_DEV"] = "1"
7175
logger.info("Starting in development mode")
@@ -101,6 +105,7 @@ def start(ctx, disable_collection, development, pidfile, port):
101105

102106
gunicorn_command = f"gunicorn -c {gunicorn_location} -b {host}:{port} augur.api.server:app --log-file {gunicorn_log_file}"
103107
server = subprocess.Popen(gunicorn_command.split(" "))
108+
manager.server = server
104109

105110
logger.info("awaiting Gunicorn start")
106111
while not server.poll():
@@ -123,6 +128,7 @@ def start(ctx, disable_collection, development, pidfile, port):
123128
logger.info(f"The API is available at '{api_response.json()['route']}'")
124129

125130
processes = start_celery_worker_processes((core_worker_count, secondary_worker_count, facade_worker_count), disable_collection)
131+
manager.processes = processes
126132

127133
celery_beat_schedule_db = os.getenv("CELERYBEAT_SCHEDULE_DB", "celerybeat-schedule.db")
128134
if os.path.exists(celery_beat_schedule_db):
@@ -132,8 +138,10 @@ def start(ctx, disable_collection, development, pidfile, port):
132138
log_level = get_value("Logging", "log_level")
133139
celery_beat_process = None
134140
celery_command = f"celery -A augur.tasks.init.celery_app.celery_app beat -l {log_level.lower()} -s {celery_beat_schedule_db}"
135-
celery_beat_process = subprocess.Popen(celery_command.split(" "))
141+
celery_beat_process = subprocess.Popen(celery_command.split(" "))
142+
manager.celery_beat_process = celery_beat_process
136143
keypub = KeyPublisher()
144+
manager.keypub = keypub
137145

138146
if not disable_collection:
139147
if os.environ.get('AUGUR_DOCKER_DEPLOY') != "1":
@@ -180,29 +188,15 @@ def start(ctx, disable_collection, development, pidfile, port):
180188
try:
181189
server.wait()
182190
except KeyboardInterrupt:
183-
184-
if server:
185-
logger.info("Shutting down server")
186-
server.terminate()
187-
188-
logger.info("Shutting down all celery worker processes")
189-
for p in processes:
190-
if p:
191-
p.terminate()
192-
193-
if celery_beat_process:
194-
logger.info("Shutting down celery beat process")
195-
celery_beat_process.terminate()
196-
197-
if not disable_collection:
198-
191+
# Signal handler will take care of cleanup
192+
pass
193+
finally:
194+
# Ensure pidfile is cleaned up if we exit normally
195+
if os.path.exists(pidfile):
199196
try:
200-
keypub.shutdown()
201-
cleanup_collection_status_and_rabbit(logger, ctx.obj.engine)
202-
except RedisConnectionError:
203-
pass
204-
205-
os.unlink(pidfile)
197+
os.unlink(pidfile)
198+
except OSError as e:
199+
logger.error(f"Could not remove pidfile {pidfile}: {e}")
206200

207201
def start_celery_worker_processes(worker_counts: tuple[int, int, int], disable_collection=False):
208202
"""
@@ -344,84 +338,6 @@ def augur_stop(signal, logger, engine):
344338
cleanup_collection_status_and_rabbit(logger, engine)
345339

346340

347-
def cleanup_collection_status_and_rabbit(logger, engine):
348-
clear_redis_caches()
349-
350-
connection_string = get_value("RabbitMQ", "connection_string")
351-
352-
with DatabaseSession(logger, engine=engine) as session:
353-
354-
clean_collection_status(session)
355-
356-
clear_rabbitmq_messages(connection_string)
357-
358-
def clear_redis_caches():
359-
"""Clears the redis databases that celery and redis use."""
360-
361-
logger.info("Flushing all redis databases this instance was using")
362-
celery_purge_command = "celery -A augur.tasks.init.celery_app.celery_app purge -f"
363-
subprocess.call(celery_purge_command.split(" "))
364-
365-
redis_connection = get_redis_connection()
366-
redis_connection.flushdb()
367-
368-
def clear_all_message_queues(connection_string):
369-
queues = ['celery','secondary','scheduling','facade']
370-
371-
virtual_host_string = connection_string.split("/")[-1]
372-
373-
#Parse username and password with urllib
374-
parsed = urlparse(connection_string)
375-
376-
for q in queues:
377-
curl_cmd = f"curl -i -u {parsed.username}:{parsed.password} -XDELETE http://localhost:15672/api/queues/{virtual_host_string}/{q}"
378-
subprocess.call(curl_cmd.split(" "),stdout=subprocess.PIPE, stderr=subprocess.PIPE)
379-
380-
381-
def clear_rabbitmq_messages(connection_string):
382-
#virtual_host_string = connection_string.split("/")[-1]
383-
384-
logger.info("Clearing all messages from celery queue in rabbitmq")
385-
from augur.tasks.init.celery_app import celery_app
386-
celery_app.control.purge()
387-
388-
clear_all_message_queues(connection_string)
389-
#rabbitmq_purge_command = f"sudo rabbitmqctl purge_queue celery -p {virtual_host_string}"
390-
#subprocess.call(rabbitmq_purge_command.split(" "))
391-
392-
#Make sure that database reflects collection status when processes are killed/stopped.
393-
def clean_collection_status(session):
394-
session.execute_sql(s.sql.text("""
395-
UPDATE augur_operations.collection_status
396-
SET core_status='Pending',core_task_id = NULL
397-
WHERE core_status='Collecting' AND core_data_last_collected IS NULL;
398-
399-
UPDATE augur_operations.collection_status
400-
SET core_status='Success',core_task_id = NULL
401-
WHERE core_status='Collecting' AND core_data_last_collected IS NOT NULL;
402-
403-
UPDATE augur_operations.collection_status
404-
SET secondary_status='Pending',secondary_task_id = NULL
405-
WHERE secondary_status='Collecting' AND secondary_data_last_collected IS NULL;
406-
407-
UPDATE augur_operations.collection_status
408-
SET secondary_status='Success',secondary_task_id = NULL
409-
WHERE secondary_status='Collecting' AND secondary_data_last_collected IS NOT NULL;
410-
411-
UPDATE augur_operations.collection_status
412-
SET facade_status='Update', facade_task_id=NULL
413-
WHERE facade_status LIKE '%Collecting%' and facade_data_last_collected IS NULL;
414-
415-
UPDATE augur_operations.collection_status
416-
SET facade_status='Success', facade_task_id=NULL
417-
WHERE facade_status LIKE '%Collecting%' and facade_data_last_collected IS NOT NULL;
418-
419-
UPDATE augur_operations.collection_status
420-
SET facade_status='Pending', facade_task_id=NULL
421-
WHERE facade_status='Failed Clone' OR facade_status='Initializing';
422-
"""))
423-
#TODO: write timestamp for currently running repos.
424-
425341
def assign_orphan_repos_to_default_user(session):
426342
query = s.sql.text("""
427343
SELECT repo_id FROM repo WHERE repo_id NOT IN (SELECT repo_id FROM augur_operations.user_repos)
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
import sys
2+
import os
3+
import subprocess
4+
import sqlalchemy as s
5+
from augur.application.logs import AugurLogger
6+
from augur.application.db.session import DatabaseSession
7+
from augur.application.db.lib import get_value
8+
from augur.tasks.init.redis_connection import get_redis_connection
9+
from urllib.parse import urlparse
10+
11+
logger = AugurLogger("augur_servicemanager").get_logger()
12+
13+
14+
class AugurServiceManager:
    """Holds references to the various running components of augur so that
    every one of them can be shut down properly when a shutdown signal
    (SIGINT - AKA ctrl-c, or SIGTERM) is received.
    """

    def __init__(self, ctx, pidfile, disable_collection):
        # CLI context; ctx.obj.engine is used for database cleanup on shutdown.
        self.ctx = ctx
        self.pidfile = pidfile
        self.disable_collection = disable_collection
        self.server = None                # gunicorn Popen handle
        self.processes = []               # celery worker Popen handles
        self.celery_beat_process = None   # celery beat Popen handle
        self.keypub = None                # key publisher, set when collection is enabled
        self.shutting_down = False        # guards against re-entrant signal delivery

    def shutdown_signal_handler(self, signum, frame):
        """Gracefully stop every managed component, then exit the process."""
        # Ignore repeated signals once a shutdown is already underway.
        if self.shutting_down:
            return
        self.shutting_down = True

        logger.info(f"Received signal {signum}, shutting down gracefully")

        self._stop_server()
        self._stop_workers()
        self._stop_celery_beat()
        self._cleanup_collection()
        self._remove_pidfile()

        sys.exit(0)

    def _stop_server(self):
        # Terminate gunicorn, escalating to SIGKILL after a 5s grace period.
        if self.server:
            logger.info("Stopping server")
            self.server.terminate()
            try:
                self.server.wait(timeout=5)
            except subprocess.TimeoutExpired:
                logger.warning("Server did not terminate in time, killing")
                self.server.kill()

    def _stop_workers(self):
        # Send SIGTERM to every live worker first so they shut down in parallel...
        logger.info("Stopping celery workers")
        for p in self.processes:
            if p and p.poll() is None:
                p.terminate()

        # ...then reap them, killing any that ignore the terminate request.
        for p in self.processes:
            if not p:
                continue
            try:
                p.wait(timeout=3)
            except subprocess.TimeoutExpired:
                logger.warning(f"Worker {p.pid} did not terminate in time, killing")
                p.kill()

    def _stop_celery_beat(self):
        # Terminate the celery beat scheduler, escalating after 3s.
        if self.celery_beat_process:
            logger.info("Stopping celery beat")
            self.celery_beat_process.terminate()
            try:
                self.celery_beat_process.wait(timeout=3)
            except subprocess.TimeoutExpired:
                logger.warning("Celery beat did not terminate in time, killing")
                self.celery_beat_process.kill()

    def _cleanup_collection(self):
        # Best-effort cleanup of the key publisher, redis/rabbitmq state and
        # DB collection statuses; failures here must not block process exit.
        if self.disable_collection:
            return
        try:
            if self.keypub:
                self.keypub.shutdown()
            cleanup_collection_status_and_rabbit(logger, self.ctx.obj.engine)
        except Exception as e:
            logger.debug(f"Error during collection cleanup: {e}")

    def _remove_pidfile(self):
        # Remove the pidfile so a stale pid does not linger after shutdown.
        if not os.path.exists(self.pidfile):
            return
        try:
            os.unlink(self.pidfile)
        except OSError as e:
            logger.error(f"Could not remove pidfile {self.pidfile}: {e}")
87+
88+
def cleanup_collection_status_and_rabbit(logger, engine):
    """Reset collection state after a shutdown: flush redis, mark in-flight
    collection rows back to a restartable status, and purge rabbitmq queues.

    TODO: tech debt — this is tightly coupled with the redis/rabbitmq helpers
    in this module; it should probably live in a dedicated helper eventually.
    """
    clear_redis_caches()

    connection_string = get_value("RabbitMQ", "connection_string")

    with DatabaseSession(logger, engine=engine) as session:
        clean_collection_status(session)

    clear_rabbitmq_messages(connection_string)
99+
100+
def clear_redis_caches():
    """Clears the redis databases that celery and redis use."""
    logger.info("Flushing all redis databases this instance was using")

    # Drop any queued-but-unacknowledged celery tasks first.
    subprocess.call("celery -A augur.tasks.init.celery_app.celery_app purge -f".split(" "))

    # Then wipe the redis database used by this instance.
    get_redis_connection().flushdb()
109+
110+
# Make sure that database reflects collection status when processes are killed/stopped.
def clean_collection_status(session):
    """Reset in-flight collection statuses so interrupted repos are retried.

    Rows stuck in 'Collecting' go back to 'Pending' when nothing was ever
    collected, or to 'Success' when at least one collection completed;
    facade rows get the analogous treatment, and failed/initializing clones
    are re-queued as 'Pending'.
    """
    reset_statuses = s.sql.text("""
        UPDATE augur_operations.collection_status
        SET core_status='Pending',core_task_id = NULL
        WHERE core_status='Collecting' AND core_data_last_collected IS NULL;

        UPDATE augur_operations.collection_status
        SET core_status='Success',core_task_id = NULL
        WHERE core_status='Collecting' AND core_data_last_collected IS NOT NULL;

        UPDATE augur_operations.collection_status
        SET secondary_status='Pending',secondary_task_id = NULL
        WHERE secondary_status='Collecting' AND secondary_data_last_collected IS NULL;

        UPDATE augur_operations.collection_status
        SET secondary_status='Success',secondary_task_id = NULL
        WHERE secondary_status='Collecting' AND secondary_data_last_collected IS NOT NULL;

        UPDATE augur_operations.collection_status
        SET facade_status='Update', facade_task_id=NULL
        WHERE facade_status LIKE '%Collecting%' and facade_data_last_collected IS NULL;

        UPDATE augur_operations.collection_status
        SET facade_status='Success', facade_task_id=NULL
        WHERE facade_status LIKE '%Collecting%' and facade_data_last_collected IS NOT NULL;

        UPDATE augur_operations.collection_status
        SET facade_status='Pending', facade_task_id=NULL
        WHERE facade_status='Failed Clone' OR facade_status='Initializing';
        """)
    session.execute_sql(reset_statuses)
    # TODO: write timestamp for currently running repos.
142+
143+
144+
def clear_rabbitmq_messages(connection_string):
    """Purge all pending celery messages from rabbitmq.

    Purges via the celery control API first, then deletes the individual
    queues through the rabbitmq management API.
    """
    logger.info("Clearing all messages from celery queue in rabbitmq")

    # Imported lazily so the celery app is only initialized when needed.
    from augur.tasks.init.celery_app import celery_app
    celery_app.control.purge()

    clear_all_message_queues(connection_string)
154+
155+
156+
def clear_all_message_queues(connection_string):
    """Delete Augur's queues from rabbitmq via its local management API."""
    # The virtual host is the final path component of the connection string.
    virtual_host_string = connection_string.split("/")[-1]

    # Parse username and password with urllib
    parsed = urlparse(connection_string)

    # NOTE(review): the credentials are interpolated into the curl command
    # line and are therefore visible in the process list; consider an HTTP
    # client with proper auth handling instead — confirm before changing.
    for q in ['celery','secondary','scheduling','facade']:
        curl_cmd = f"curl -i -u {parsed.username}:{parsed.password} -XDELETE http://localhost:15672/api/queues/{virtual_host_string}/{q}"
        subprocess.call(curl_cmd.split(" "),stdout=subprocess.PIPE, stderr=subprocess.PIPE)

0 commit comments

Comments
 (0)