Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ APP_PORT=8000


# JWT Authentication
# Important! Generate strong key
# Important!!! Generate strong key
SECRET_KEY=your-secret-key-please-change-in-production-at-least-32-characters-long
ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=1440
Expand Down
67 changes: 67 additions & 0 deletions alembic/versions/5443b6bc781d_added_new_fields_to_transactions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""added new fields to transactions

Revision ID: 5443b6bc781d
Revises: a0149c5e906d
Create Date: 2025-10-25 12:46:10.924869

"""

from typing import Sequence, Union

import sqlalchemy as sa
import sqlmodel

from alembic import op

# revision identifiers, used by Alembic.
revision: str = "5443b6bc781d"  # id of this migration
down_revision: Union[str, Sequence[str], None] = "a0149c5e906d"  # parent migration
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Upgrade schema: add review metadata columns to ``transactions``.

    Both columns are nullable so existing rows stay valid:
    ``reviewed_at`` (timestamp) and ``review_comment`` (string, max 1000 chars).
    """
    # The auto-generated drops of celery_taskmeta / celery_tasksetmeta are
    # intentionally omitted: those tables belong to Celery's result backend,
    # not to this application's models.
    review_columns = (
        sa.Column("reviewed_at", sa.DateTime(), nullable=True),
        sa.Column("review_comment", sqlmodel.AutoString(length=1000), nullable=True),
    )
    for column in review_columns:
        op.add_column("transactions", column)


def downgrade() -> None:
    """Downgrade schema: remove the review metadata columns again.

    Columns are dropped in reverse order of their creation in ``upgrade``.
    The auto-generated recreation of Celery's result tables
    (``celery_taskmeta`` / ``celery_tasksetmeta``) is intentionally left
    out: those tables are owned and managed by Celery's result backend,
    not by this application's migrations.
    """
    for column_name in ("review_comment", "reviewed_at"):
        op.drop_column("transactions", column_name)
2 changes: 2 additions & 0 deletions alembic/versions/8577a9d53b61_created_queue_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ def upgrade() -> None:
"FLAGGED",
"FAILED",
"COMPLETED",
"ACCEPTED",
"REJECTED",
name="taskstatus",
),
nullable=False,
Expand Down
2 changes: 1 addition & 1 deletion alembic/versions/a0149c5e906d_created_user_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def downgrade() -> None:
"rules",
sa.Column("created_by", sa.VARCHAR(), autoincrement=False, nullable=True),
)
op.drop_constraint(None, "rules", type_="foreignkey") # type: ignore
# op.drop_constraint(None, "rules", type_="foreignkey") # type: ignore
op.drop_column("rules", "created_by_user_id")
# # op.create_table(
# "celery_taskmeta",
Expand Down
2 changes: 2 additions & 0 deletions alembic/versions/fd54c0552b1c_init_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ def upgrade() -> None:
"FLAGGED",
"FAILED",
"COMPLETED",
"ACCEPTED",
"REJECTED",
name="transactionstatus",
),
nullable=False,
Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ services:
volumes:
- .:/app
working_dir: /app
ports:
- "9091:9091"
depends_on:
postgres:
condition: service_healthy
Expand Down
7 changes: 7 additions & 0 deletions monitoring/prometheus/prometheus.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,10 @@ scrape_configs:
metrics_path: /metrics/
static_configs:
- targets: ["app:8001"]

# Celery worker metrics (from separate metrics server)
- job_name: "celery-metrics"
scrape_interval: 5s
metrics_path: /metrics
static_configs:
- targets: ["celery:9091"]
2 changes: 1 addition & 1 deletion src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class LoggingSettings(BaseSettings):
"""Logging configuration settings."""

level: str = Field(default="INFO")
json_format: bool = Field(default=False)
json_format: bool = Field(default=True)
enable_console: bool = Field(default=True)
enable_file: bool = Field(default=False)
file_path: str = Field(default="logs/findar.log")
Expand Down
Empty file added src/core/exception_handlers.py
Empty file.
Empty file added src/core/schemas.py
Empty file.
13 changes: 13 additions & 0 deletions src/modules/queue/celery_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,16 @@

# Explicitly import tasks to ensure they're registered
from . import tasks # noqa: F401, E402


# Bring up the worker-side Prometheus metrics endpoint once Celery has
# finished configuring itself.
@celery_app.on_after_configure.connect
def setup_metrics_server(sender, **kwargs):
    """Start the Prometheus metrics HTTP server for this Celery worker.

    Startup is best-effort: a failing metrics server must never prevent
    the worker itself from running, so any error is reported and swallowed.
    """
    try:
        from .metrics_server import start_metrics_server

        # Port 9091 is the one scraped by Prometheus (see monitoring config).
        start_metrics_server(port=9091)
    except Exception as exc:
        print(f"⚠️ Failed to start Celery metrics server: {exc}")
121 changes: 121 additions & 0 deletions src/modules/queue/metrics_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
"""
Metrics server for Celery worker.

Runs a simple HTTP server in a background thread to expose Prometheus metrics
from the Celery worker process.
"""

import threading
from wsgiref.simple_server import make_server

from prometheus_client import REGISTRY, generate_latest

# Module-level flag recording whether a metrics server was already started
# in this process; checked and set under _server_lock so concurrent start()
# calls cannot race.
_server_started = False
_server_lock = threading.Lock()


class MetricsServer:
    """Simple WSGI server exposing Prometheus metrics from a Celery worker.

    The server runs in a daemon thread so it never blocks worker shutdown.
    At most one server per process: the module-level ``_server_started``
    flag (guarded by ``_server_lock``) enforces that.
    """

    def __init__(self, port: int = 9091):
        """
        Initialize metrics server.

        Args:
            port: Port to run the metrics server on
        """
        self.port = port
        self.server = None  # wsgiref server instance, set by start()
        self.thread = None  # daemon thread running the serve loop

    def metrics_app(self, environ, start_response):
        """WSGI application: serve the registry dump at /metrics, 404 otherwise."""
        if environ["PATH_INFO"] == "/metrics":
            output = generate_latest(REGISTRY)
            start_response(
                "200 OK", [("Content-Type", "text/plain; charset=utf-8")]
            )
            return [output]
        start_response("404 Not Found", [("Content-Type", "text/plain")])
        return [b"Not Found. Try /metrics"]

    def start(self):
        """Start the metrics server in a background daemon thread (idempotent)."""
        import errno  # stdlib; only needed for the error branch below

        global _server_started

        with _server_lock:
            if _server_started:
                print("⚠️ Metrics server already started")
                return

            try:
                # Create WSGI server bound on all interfaces.
                self.server = make_server("0.0.0.0", self.port, self.metrics_app)

                # NOTE: serve_forever() uses its own poll interval; this
                # timeout only matters if handle_request() were used directly.
                self.server.timeout = 0.5

                # Daemon thread: does not keep the worker process alive.
                self.thread = threading.Thread(
                    target=self._run_server, daemon=True, name="MetricsServer"
                )
                self.thread.start()

                _server_started = True
                print(
                    f"✅ Celery metrics server started on http://0.0.0.0:{self.port}/metrics"
                )

            except OSError as e:
                # FIX: the original compared against the literal 48, which is
                # EADDRINUSE on macOS only (Linux uses 98), so the branch never
                # fired inside the Docker deployment. errno.EADDRINUSE is
                # correct on every platform.
                if e.errno == errno.EADDRINUSE:
                    print(
                        f"⚠️ Port {self.port} already in use, metrics server not started"
                    )
                else:
                    print(f"❌ Failed to start metrics server: {e}")

    def _run_server(self):
        """Thread target: run the WSGI serve loop until shutdown() is called."""
        try:
            self.server.serve_forever()
        except Exception as e:
            print(f"❌ Metrics server error: {e}")

    def stop(self):
        """Stop the metrics server and allow a later restart."""
        global _server_started
        if self.server:
            self.server.shutdown()
            # FIX: clear the module flag so start() works again after stop();
            # previously the flag stayed True forever, blocking any restart.
            with _server_lock:
                _server_started = False
            print("🛑 Metrics server stopped")


# Module-level singleton, managed exclusively by start_metrics_server() and
# stop_metrics_server() below.
_metrics_server = None


def start_metrics_server(port: int = 9091):
    """
    Start the Celery metrics server if not already running.

    Args:
        port: Port to run the metrics server on (default: 9091)
    """
    global _metrics_server

    # Guard clause: a second call is a no-op apart from the warning.
    if _metrics_server is not None:
        print("⚠️ Metrics server already initialized")
        return

    _metrics_server = MetricsServer(port=port)
    _metrics_server.start()
else:
print("⚠️ Metrics server already initialized")


def stop_metrics_server():
    """Stop the Celery metrics server and discard the global instance."""
    global _metrics_server

    if _metrics_server is not None:
        _metrics_server.stop()
        _metrics_server = None
Loading