|
| 1 | +#!/usr/bin/env python3 |
| 2 | +"""AMQP message publisher for triggering GeoZarr conversion workflows. |
| 3 | +
|
| 4 | +Publishes JSON payloads to RabbitMQ exchanges with support for |
| 5 | +dynamic routing key templates based on payload fields. |
| 6 | +""" |
| 7 | + |
| 8 | +from __future__ import annotations |
| 9 | + |
| 10 | +import argparse |
| 11 | +import json |
| 12 | +import logging |
| 13 | +import sys |
| 14 | +from pathlib import Path |
| 15 | +from typing import Any |
| 16 | + |
| 17 | +import pika |
| 18 | +from tenacity import retry, stop_after_attempt, wait_exponential |
| 19 | + |
| 20 | +logging.basicConfig(level=logging.INFO, format="%(levelname)s - %(message)s") |
| 21 | +logger = logging.getLogger(__name__) |
| 22 | + |
| 23 | + |
| 24 | +def load_payload(payload_file: Path) -> dict[str, Any]: |
| 25 | + """Load JSON payload from file.""" |
| 26 | + try: |
| 27 | + data: dict[str, Any] = json.loads(payload_file.read_text()) |
| 28 | + return data |
| 29 | + except FileNotFoundError: |
| 30 | + logger.exception("Payload file not found", extra={"file": str(payload_file)}) |
| 31 | + sys.exit(1) |
| 32 | + except json.JSONDecodeError: |
| 33 | + logger.exception("Invalid JSON in payload file", extra={"file": str(payload_file)}) |
| 34 | + sys.exit(1) |
| 35 | + |
| 36 | + |
| 37 | +def format_routing_key(template: str, payload: dict[str, Any]) -> str: |
| 38 | + """Format routing key template using payload fields. |
| 39 | +
|
| 40 | + Example: "eopf.item.found.{collection}" → "eopf.item.found.sentinel-2-l2a" |
| 41 | + """ |
| 42 | + try: |
| 43 | + return template.format(**payload) |
| 44 | + except KeyError: |
| 45 | + logger.exception( |
| 46 | + "Missing required field in payload for routing key template", |
| 47 | + extra={"template": template, "available_fields": list(payload.keys())}, |
| 48 | + ) |
| 49 | + sys.exit(1) |
| 50 | + |
| 51 | + |
| 52 | +@retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=10)) |
| 53 | +def publish_message( |
| 54 | + host: str, |
| 55 | + port: int, |
| 56 | + user: str, |
| 57 | + password: str, |
| 58 | + exchange: str, |
| 59 | + routing_key: str, |
| 60 | + payload: dict[str, Any], |
| 61 | + virtual_host: str = "/", |
| 62 | +) -> None: |
| 63 | + """Publish message to RabbitMQ exchange with automatic retry.""" |
| 64 | + credentials = pika.PlainCredentials(user, password) |
| 65 | + parameters = pika.ConnectionParameters( |
| 66 | + host=host, |
| 67 | + port=port, |
| 68 | + virtual_host=virtual_host, |
| 69 | + credentials=credentials, |
| 70 | + ) |
| 71 | + |
| 72 | + logger.info("Connecting to amqp://%s@%s:%s%s", user, host, port, virtual_host) |
| 73 | + connection = pika.BlockingConnection(parameters) |
| 74 | + try: |
| 75 | + channel = connection.channel() |
| 76 | + channel.basic_publish( |
| 77 | + exchange=exchange, |
| 78 | + routing_key=routing_key, |
| 79 | + body=json.dumps(payload), |
| 80 | + properties=pika.BasicProperties( |
| 81 | + content_type="application/json", |
| 82 | + delivery_mode=2, |
| 83 | + ), |
| 84 | + ) |
| 85 | + logger.info("Published to exchange='%s' routing_key='%s'", exchange, routing_key) |
| 86 | + logger.debug("Payload: %s", json.dumps(payload, indent=2)) |
| 87 | + finally: |
| 88 | + connection.close() |
| 89 | + |
| 90 | + |
| 91 | +def main() -> None: |
| 92 | + """CLI entry point for AMQP message publisher.""" |
| 93 | + parser = argparse.ArgumentParser( |
| 94 | + description="Publish JSON payload to RabbitMQ exchange for workflow triggers" |
| 95 | + ) |
| 96 | + parser.add_argument("--host", required=True, help="RabbitMQ host") |
| 97 | + parser.add_argument("--port", type=int, default=5672, help="RabbitMQ port") |
| 98 | + parser.add_argument("--user", required=True, help="RabbitMQ username") |
| 99 | + parser.add_argument("--password", required=True, help="RabbitMQ password") |
| 100 | + parser.add_argument("--virtual-host", default="/", help="RabbitMQ virtual host") |
| 101 | + parser.add_argument("--exchange", required=True, help="RabbitMQ exchange name") |
| 102 | + parser.add_argument("--routing-key", help="Static routing key") |
| 103 | + parser.add_argument( |
| 104 | + "--routing-key-template", |
| 105 | + help="Template with {field} placeholders (e.g., 'eopf.item.found.{collection}')", |
| 106 | + ) |
| 107 | + parser.add_argument("--payload-file", type=Path, required=True, help="JSON payload file path") |
| 108 | + |
| 109 | + args = parser.parse_args() |
| 110 | + |
| 111 | + if not args.routing_key and not args.routing_key_template: |
| 112 | + parser.error("Must provide either --routing-key or --routing-key-template") |
| 113 | + if args.routing_key and args.routing_key_template: |
| 114 | + parser.error("Cannot use both --routing-key and --routing-key-template") |
| 115 | + |
| 116 | + payload = load_payload(args.payload_file) |
| 117 | + routing_key = args.routing_key or format_routing_key(args.routing_key_template, payload) |
| 118 | + |
| 119 | + try: |
| 120 | + publish_message( |
| 121 | + host=args.host, |
| 122 | + port=args.port, |
| 123 | + user=args.user, |
| 124 | + password=args.password, |
| 125 | + exchange=args.exchange, |
| 126 | + routing_key=routing_key, |
| 127 | + payload=payload, |
| 128 | + virtual_host=args.virtual_host, |
| 129 | + ) |
| 130 | + except Exception: |
| 131 | + logger.exception( |
| 132 | + "Failed to publish AMQP message", |
| 133 | + extra={ |
| 134 | + "exchange": args.exchange, |
| 135 | + "routing_key": routing_key, |
| 136 | + "host": args.host, |
| 137 | + }, |
| 138 | + ) |
| 139 | + sys.exit(1) |
| 140 | + |
| 141 | + |
| 142 | +if __name__ == "__main__": |
| 143 | + main() |
0 commit comments