ghga-de/dlq-service


Dlq Service

DLQ Service - a service to manage the dead letter queue for Kafka events

Description

The DLQ Service provides a RESTful API for managing Kafka topics designated as dead letter queues.

The DLQ Service subscribes to the configured DLQ topic and saves all inbound events to the database. Using the REST API to interact with a given service + topic, events can be previewed, discarded, modified, and requeued for the original service to re-consume them. When requeuing an event, the DLQ service publishes the event to a Kafka "retry" topic, whose name is automatically derived from the service name included in the event's DLQ information.
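The retry topic derivation described above can be sketched as follows. This is only an illustration: the README does not state the naming scheme, so the `retry-` prefix, the `derive_retry_topic` helper, and the `dlq_info` field names are all assumptions rather than the service's actual implementation.

```python
def derive_retry_topic(service_name: str, prefix: str = "retry-") -> str:
    """Build a retry topic name from the name of the service that failed.

    The "retry-" prefix is an assumed convention, not taken from the README.
    """
    if not service_name:
        raise ValueError("service_name must not be empty")
    return f"{prefix}{service_name}"


# Hypothetical DLQ information stored alongside an event (field names assumed).
dlq_info = {"service": "my-service", "original_topic": "users"}

retry_topic = derive_retry_topic(dlq_info["service"])
print(retry_topic)
```

Because the topic name depends only on the service name, a requeued event reaches the service that originally failed to process it without the caller having to name a target topic.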

Installation

We recommend using the provided Docker container.

A pre-built version is available on Docker Hub:

docker pull ghga/dlq-service:2.0.3

Or you can build the container yourself from the ./Dockerfile:

# Execute in the repo's root dir:
docker build -t ghga/dlq-service:2.0.3 .

For production-ready deployment, we recommend using Kubernetes. For simple use cases, however, you can run the service with Docker on a single server:

# The entrypoint is preconfigured:
docker run -p 8080:8080 ghga/dlq-service:2.0.3 --help

If you prefer not to use containers, you may install the service from source:

# Execute in the repo's root dir:
pip install .

# To run the service:
dlqs --help

Configuration

Parameters

The service requires the following configuration parameters:

  • mongo_dsn (string, format: multi-host-uri, required): MongoDB connection string. Might include credentials. For more information see: https://naiveskill.com/mongodb-connection-string/. Length must be at least 1.

    Examples:

    "mongodb://localhost:27017"
  • db_name (string, required): Name of the database located on the MongoDB server.

    Examples:

    "my-database"
  • mongo_timeout: Timeout in seconds for API calls to MongoDB. The timeout applies to all steps needed to complete the operation, including server selection, connection checkout, serialization, and server-side execution. When the timeout expires, PyMongo raises a timeout exception. If set to None, the operation will not time out (default MongoDB behavior). Default: null.

    • Any of

      • integer: Exclusive minimum: 0.

      • null

    Examples:

    300
    600
    null
  • service_name (string): Short name of this service. Default: "dlqs".

  • service_instance_id (string, required): A string that uniquely identifies this instance across all instances of this service. This is included in log messages.

    Examples:

    "germany-bw-instance-001"
  • kafka_servers (array, required): A list of connection strings to connect to Kafka bootstrap servers.

    • Items (string)

    Examples:

    [
        "localhost:9092"
    ]
  • kafka_security_protocol (string): Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL. Must be one of: ["PLAINTEXT", "SSL"]. Default: "PLAINTEXT".

  • kafka_ssl_cafile (string): Certificate Authority file path containing certificates used to sign broker certificates. If a CA is not specified, the default system CA will be used if found by OpenSSL. Default: "".

  • kafka_ssl_certfile (string): Optional filename of client certificate, as well as any CA certificates needed to establish the certificate's authenticity. Default: "".

  • kafka_ssl_keyfile (string): Optional filename containing the client private key. Default: "".

  • kafka_ssl_password (string, format: password, write-only): Optional password to be used for the client private key. Default: "".

  • generate_correlation_id (boolean): A flag, which, if False, will result in an error when inbound requests don't possess a correlation ID. If True, requests without a correlation ID will be assigned a newly generated ID in the correlation ID middleware function. Default: true.

    Examples:

    true
    false
  • kafka_max_message_size (integer): The largest message size that can be transmitted, in bytes, before compression. Only services that have a need to send/receive larger messages should set this. When used alongside compression, this value can be set to something greater than the broker's message.max.bytes field, which effectively concerns the compressed message size. Exclusive minimum: 0. Default: 1048576.

    Examples:

    1048576
    16777216
  • kafka_compression_type: The compression type used for messages. Valid values are: None, gzip, snappy, lz4, and zstd. If None, no compression is applied. This setting is only relevant for the producer and has no effect on the consumer. If set to a value, the producer will compress messages before sending them to the Kafka broker. If unsure, zstd provides a good balance between speed and compression ratio. Default: null.

    • Any of

      • string: Must be one of: ["gzip", "snappy", "lz4", "zstd"].

      • null

    Examples:

    null
    "gzip"
    "snappy"
    "lz4"
    "zstd"
  • kafka_max_retries (integer): The maximum number of times to immediately retry consuming an event upon failure. Works independently of the dead letter queue. Minimum: 0. Default: 0.

    Examples:

    0
    1
    2
    3
    5
  • kafka_enable_dlq (boolean): A flag to toggle the dead letter queue. If set to False, the service will crash upon exhausting retries instead of publishing events to the DLQ. If set to True, the service will publish events to the DLQ topic after exhausting all retries. Default: false.

    Examples:

    true
    false
  • kafka_dlq_topic (string): The name of the topic used to resolve error-causing events. Default: "dlq".

    Examples:

    "dlq"
  • kafka_retry_backoff (integer): The number of seconds to wait before retrying a failed event. The backoff time is doubled for each retry attempt. Minimum: 0. Default: 0.

    Examples:

    0
    1
    2
    3
    5
  • log_level (string): The minimum log level to capture. Must be one of: ["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "TRACE"]. Default: "INFO".

  • log_format: If set, will replace JSON formatting with the specified string format. If not set, has no effect. In addition to the standard attributes, the following can also be specified: timestamp, service, instance, level, correlation_id, and details. Default: null.

    • Any of

      • string

      • null

    Examples:

    "%(timestamp)s - %(service)s - %(level)s - %(message)s"
    "%(asctime)s - Severity: %(levelno)s - %(msg)s"
  • log_traceback (boolean): Whether to include exception tracebacks in log messages. Default: true.

  • host (string): IP of the host. Default: "127.0.0.1".

  • port (integer): Port to expose the server on the specified host. Default: 8080.

  • auto_reload (boolean): A development feature. Set to True to automatically reload the server upon code changes. Default: false.

  • workers (integer): Number of worker processes to run. Default: 1.

  • api_root_path (string): Root path at which the API is reachable. This is relative to the specified host and port. Default: "".

  • openapi_url (string): Path to get the openapi specification in JSON format. This is relative to the specified host and port. Default: "/openapi.json".

  • docs_url (string): Path to host the swagger documentation. This is relative to the specified host and port. Default: "/docs".

  • cors_allowed_origins: A list of origins that should be permitted to make cross-origin requests. By default, cross-origin requests are not allowed. You can use ['*'] to allow any origin. Default: null.

    • Any of

      • array

        • Items (string)
      • null

    Examples:

    [
        "https://example.org",
        "https://www.example.org"
    ]
  • cors_allow_credentials: Indicate that cookies should be supported for cross-origin requests. Defaults to False. Also, cors_allowed_origins cannot be set to ['*'] for credentials to be allowed. The origins must be explicitly specified. Default: null.

    • Any of

      • boolean

      • null

    Examples:

    true
    false
  • cors_allowed_methods: A list of HTTP methods that should be allowed for cross-origin requests. Defaults to ['GET']. You can use ['*'] to allow all standard methods. Default: null.

    • Any of

      • array

        • Items (string)
      • null

    Examples:

    [
        "*"
    ]
  • cors_allowed_headers: A list of HTTP request headers that should be supported for cross-origin requests. Defaults to []. You can use ['*'] to allow all headers. The Accept, Accept-Language, Content-Language and Content-Type headers are always allowed for CORS requests. Default: null.

    • Any of

      • array

        • Items (string)
      • null

    Examples:

    []
  • token_hashes (array, required): List of token hashes corresponding to the tokens that can be used to authenticate calls to this service. Hashes are made with SHA-256.

    • Items (string)

    Examples:

    "7ad83b6b9183c91674eec897935bc154ba9ff9704f8be0840e77f476b5062b6e"
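
Since token_hashes expects SHA-256 hashes, a value for it could be produced along these lines. This is a minimal sketch: the `hash_token` helper is hypothetical, and it assumes the service hashes the UTF-8 encoded token and compares hex digests, which matches the 64-character hex example above but is not confirmed by this README.

```python
import hashlib
import secrets


def hash_token(token: str) -> str:
    """Return the hex-encoded SHA-256 digest of a token (UTF-8 assumed)."""
    return hashlib.sha256(token.encode("utf-8")).hexdigest()


# Generate a random token for an API client and the hash to put in the config.
token = secrets.token_urlsafe(32)
print("token (keep secret):", token)
print("entry for token_hashes:", hash_token(token))
```

Only the hash goes into the configuration; the token itself stays with the client that authenticates against the service.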

Usage:

A template YAML for configuring the service can be found at ./example_config.yaml. Please adapt it, rename it to .dlqs.yaml, and place it in one of the following locations:

  • in the current working directory where you execute the service (on Linux: ./.dlqs.yaml)
  • in your home directory (on Linux: ~/.dlqs.yaml)

The config YAML will be parsed automatically by the service.
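
The lookup over the two locations listed above could look roughly like this. It is a sketch, not the service's loader: the `find_config_file` helper is hypothetical, and checking the working directory before the home directory is an assumption, since the README does not state which location takes precedence.

```python
from pathlib import Path
from typing import Optional

CONFIG_FILENAME = ".dlqs.yaml"


def find_config_file(
    cwd: Optional[Path] = None, home: Optional[Path] = None
) -> Optional[Path]:
    """Return the first existing config file, or None if neither exists.

    Assumed order: current working directory first, then the home directory.
    """
    candidates = [
        (cwd or Path.cwd()) / CONFIG_FILENAME,
        (home or Path.home()) / CONFIG_FILENAME,
    ]
    for candidate in candidates:
        if candidate.is_file():
            return candidate
    return None


print(find_config_file())
```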

Important: If you are using containers, the locations refer to paths within the container.

All parameters mentioned in the ./example_config.yaml can also be set using environment variables or file secrets.

To name an environment variable, prefix the parameter name with dlqs_; for example, set an environment variable named dlqs_host to configure the host. Both upper and lower case are accepted, although upper case is the usual convention for environment variables.
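
The naming rule can be illustrated with a small standard-library sketch. The `collect_env_overrides` helper is hypothetical and only demonstrates the prefix and case handling; the service itself presumably relies on its settings library for this, and values are left as strings here rather than parsed into their configured types.

```python
import os
from typing import Mapping

ENV_PREFIX = "dlqs_"


def collect_env_overrides(environ: Mapping[str, str] = os.environ) -> dict[str, str]:
    """Map prefixed environment variables to parameter names, ignoring case."""
    overrides = {}
    for name, value in environ.items():
        lowered = name.lower()
        if lowered.startswith(ENV_PREFIX):
            # Strip the prefix so DLQS_HOST and dlqs_host both map to "host".
            overrides[lowered[len(ENV_PREFIX):]] = value
    return overrides


# Example environment; unrelated variables such as PATH are ignored.
example_env = {"DLQS_HOST": "0.0.0.0", "dlqs_port": "8080", "PATH": "/usr/bin"}
print(collect_env_overrides(example_env))
```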

To use file secrets, please refer to the corresponding section of the pydantic documentation.

HTTP API

An OpenAPI specification for this service can be found here.

Architecture and Design:

Development

For setting up the development environment, we rely on the devcontainer feature of VS Code in combination with Docker Compose.

To use it, you need Docker Compose and VS Code with its "Remote - Containers" extension (ms-vscode-remote.remote-containers) installed. Then open this repository in VS Code and run the command Remote-Containers: Reopen in Container from the VS Code "Command Palette".

This will give you a full-fledged, pre-configured development environment including:

  • infrastructural dependencies of the service (databases, etc.)
  • all relevant VS Code extensions pre-installed
  • pre-configured linting and auto-formatting
  • a pre-configured debugger
  • automatic license-header insertion

Moreover, inside the devcontainer, a command dev_install is available for convenience. It installs the service with all development dependencies, and it installs pre-commit.

The installation is performed automatically when you build the devcontainer. However, if you update dependencies in the ./pyproject.toml or the lock/requirements-dev.txt, please run it again.

License

This repository is free to use and modify according to the Apache 2.0 License.

README Generation

This README file is auto-generated; please see .readme_generation/README.md for details.
