Skip to content

fix: improve worker validation, scheduling docs, and pipeline state consistency #31

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .tool-versions
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
elixir 1.8.2
erlang 21.3
elixir 1.18.4
erlang 27.1
14 changes: 7 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ Flume is a job processing system backed by [GenStage](https://github.com/elixir-
- **Scheduled Jobs** - Jobs can be scheduled to run at any point in future.
- **Rate Limiting** - Uses redis to maintain rate-limit on pipelines.
- **Batch Processing** - Jobs are grouped based on size.
- **Logging** - Provides a behaviour `Flume.Logger` to define your own logger module.
- **Logging** - Uses Elixir's standard Logger for all logging needs.
- **Pipeline Control** - Queues can be pause/resume at runtime.
- **Instrumentation** - Metrics like worker duration and latency to fetch jobs from redis are emitted via [telemetry](https://github.com/beam-telemetry/telemetry).
- **Exponential Back-off** - On failure, jobs are retried with exponential back-off. Minimum and maximum can be set via configuration.
Expand Down Expand Up @@ -199,19 +199,19 @@ end
**With default function**

```elixir
# 10 seconds
schedule_time = 10_000
# Unix timestamp (e.g., 10 seconds from now)
unix_time_in_seconds = DateTime.utc_now() |> DateTime.to_unix() |> Kernel.+(10)

Flume.enqueue_in(:queue_name, schedule_time, MyApp.FancyWorker, [arg_1, arg_2])
Flume.enqueue_in(:queue_name, unix_time_in_seconds, MyApp.FancyWorker, [arg_1, arg_2])
```

**With custom function**

```elixir
# 10 seconds
schedule_time = 10_000
# Unix timestamp (e.g., 10 seconds from now)
unix_time_in_seconds = DateTime.utc_now() |> DateTime.to_unix() |> Kernel.+(10)

Flume.enqueue_in(:queue_name, schedule_time, MyApp.FancyWorker, :myfunc, [arg_1, arg_2])
Flume.enqueue_in(:queue_name, unix_time_in_seconds, MyApp.FancyWorker, :myfunc, [arg_1, arg_2])
```

### Rate Limiting
Expand Down
6 changes: 3 additions & 3 deletions config/config.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# This file is responsible for configuring your application
# and its dependencies with the aid of the Mix.Config module.
use Mix.Config
# and its dependencies with the aid of the Config module.
import Config

config :flume,
name: Flume,
Expand All @@ -21,4 +21,4 @@ config :flume,
scheduler_poll_interval: 10_000,
max_retries: 10

import_config "#{Mix.env()}.exs"
import_config "#{config_env()}.exs"
2 changes: 1 addition & 1 deletion config/dev.exs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use Mix.Config
import Config

config :flume,
name: Flume,
Expand Down
6 changes: 6 additions & 0 deletions config/prod.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import Config

# Do not print debug messages in production
config :logger, level: :info

# Runtime production config is done by runtime.exs
26 changes: 19 additions & 7 deletions config/test.exs
Original file line number Diff line number Diff line change
@@ -1,24 +1,36 @@
use Mix.Config
import Config

# Configure Flume for testing
config :flume,
name: Flume,
host: {:system, "FLUME_REDIS_HOST", "127.0.0.1"},
port: {:system, "FLUME_REDIS_PORT", "6379"},
namespace: "flume_test",
database: 0,
database: 1,
redis_timeout: 5000,
redis_pool_size: 1,
pipelines: [%{name: "default_pipeline", queue: "default", max_demand: 1000}],
pipelines: [
%{
name: "default_pipeline",
queue: "default",
max_demand: 1000
},
%{
name: "test_pipeline",
queue: "test",
max_demand: 1000
}
],
backoff_initial: 1,
backoff_max: 2,
scheduler_poll_interval: 10_000,
max_retries: 5
scheduler_poll_interval: 1000,
max_retries: 3

config :logger,
format: "[$level] $message\n",
backends: [{LoggerFileBackend, :error_log}],
level: :warn
level: :warning

config :logger, :error_log,
path: "log/test.log",
level: :warn
level: :warning
16 changes: 8 additions & 8 deletions lib/flume.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,37 +34,37 @@ defmodule Flume do
])
end

def enqueue_in(queue, time_in_seconds, worker, args) do
apply(Config.queue_api_module(), :enqueue_in, [queue, time_in_seconds, worker, args])
def enqueue_in(queue, unix_time_in_seconds, worker, args) do
apply(Config.queue_api_module(), :enqueue_in, [queue, unix_time_in_seconds, worker, args])
end

def enqueue_in(queue, time_in_seconds, worker, function_name, args) do
def enqueue_in(queue, unix_time_in_seconds, worker, function_name, args) do
apply(Config.queue_api_module(), :enqueue_in, [
queue,
time_in_seconds,
unix_time_in_seconds,
worker,
function_name,
args
])
end

def enqueue_in(queue, time_in_seconds, worker, function_name, args, opts) do
def enqueue_in(queue, unix_time_in_seconds, worker, function_name, args, opts) do
apply(Config.queue_api_module(), :enqueue_in, [
queue,
time_in_seconds,
unix_time_in_seconds,
worker,
function_name,
args,
opts
])
end

@spec pause(Flume.Pipeline.Control.Options.option_spec()) :: list(:ok)
@spec pause_all(Flume.Pipeline.Control.Options.option_spec()) :: list(:ok)
def pause_all(options \\ []) do
Config.pipeline_names() |> Enum.map(&pause(&1, options))
end

@spec pause(Flume.Pipeline.Control.Options.option_spec()) :: list(:ok)
@spec resume_all(Flume.Pipeline.Control.Options.option_spec()) :: list(:ok)
def resume_all(options \\ []) do
Config.pipeline_names() |> Enum.map(&resume(&1, options))
end
Expand Down
31 changes: 31 additions & 0 deletions lib/flume/application.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
defmodule Flume.Application do
@moduledoc false

use Application

@impl true
def start(_type, _args) do
# Get configuration from application environment
config = %{
name: Application.get_env(:flume, :name, Flume),
host: Application.get_env(:flume, :host, "127.0.0.1"),
port: Application.get_env(:flume, :port, 6379),
namespace: Application.get_env(:flume, :namespace, "flume"),
database: Application.get_env(:flume, :database, 0),
redis_timeout: Application.get_env(:flume, :redis_timeout, 5000),
redis_pool_size: Application.get_env(:flume, :redis_pool_size, 10),
pipelines: Application.get_env(:flume, :pipelines, []),
backoff_initial: Application.get_env(:flume, :backoff_initial, 500),
backoff_max: Application.get_env(:flume, :backoff_max, 10_000),
scheduler_poll_interval: Application.get_env(:flume, :scheduler_poll_interval, 10_000),
max_retries: Application.get_env(:flume, :max_retries, 10),
instrumentation: Application.get_env(:flume, :instrumentation, [
handler_module: Flume.Instrumentation.DefaultEventHandler,
handler_function: :handle,
metadata: [app_name: :flume]
])
}

Flume.Supervisor.start_link(config)
end
end
150 changes: 144 additions & 6 deletions lib/flume/config.ex
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
defmodule Flume.Config do
use GenServer

@defaults %{
backoff_initial: 500,
backoff_max: 10_000,
database: 0,
host: "127.0.0.1",
logger: Flume.DefaultLogger,
mock: false,
max_retries: 5,
name: Flume,
Expand All @@ -25,7 +26,12 @@ defmodule Flume.Config do
handler_module: Flume.Instrumentation.DefaultEventHandler,
handler_function: :handle,
config: [app_name: :flume]
]
],
# Redis Sentinels configuration
sentinels: nil,
sentinel_group: nil,
sentinel_socket_opts: [],
sentinel_reconnect_timeout: 1000
}

@integer_keys [
Expand All @@ -34,11 +40,40 @@ defmodule Flume.Config do
:redis_timeout,
:scheduler_poll_interval,
:backoff_initial,
:backoff_max
:backoff_max,
:sentinel_reconnect_timeout
]

alias Flume.Utils.IntegerExtension

# GenServer callbacks
def start_link(runtime_config \\ %{}) do
GenServer.start_link(__MODULE__, runtime_config, name: __MODULE__)
end

def init(runtime_config) do
{:ok, runtime_config}
end

def handle_call({:get, key}, _from, runtime_config) do
value = get_runtime_or_app_config(key, runtime_config)
{:reply, value, runtime_config}
end

def handle_call({:get, key, fallback}, _from, runtime_config) do
value = get_runtime_or_app_config(key, runtime_config, fallback)
{:reply, value, runtime_config}
end

def handle_call(:get_runtime_config, _from, runtime_config) do
{:reply, runtime_config, runtime_config}
end

def handle_cast({:put, key, value}, runtime_config) do
{:noreply, Map.put(runtime_config, key, value)}
end

# Public API
Map.keys(@defaults)
|> Enum.each(fn key ->
def unquote(key)(), do: get(unquote(key))
Expand All @@ -47,6 +82,36 @@ defmodule Flume.Config do
def get(key), do: get(key, default(key))

def get(key, fallback) do
case Process.whereis(__MODULE__) do
nil ->
# Fallback to application config if GenServer not started
get_app_config(key, fallback)

_pid ->
GenServer.call(__MODULE__, {:get, key, fallback})
end
end

def put(key, value) do
GenServer.cast(__MODULE__, {:put, key, value})
end

def get_runtime_config do
case Process.whereis(__MODULE__) do
nil -> %{}
_pid -> GenServer.call(__MODULE__, :get_runtime_config)
end
end

# Private functions
defp get_runtime_or_app_config(key, runtime_config, fallback \\ nil) do
case Map.get(runtime_config, key) do
nil -> get_app_config(key, fallback || default(key))
value -> parse(key, value)
end
end

defp get_app_config(key, fallback) do
value =
case Application.get_env(:flume, key, fallback) do
{:system, varname} -> System.get_env(varname)
Expand All @@ -69,18 +134,50 @@ defmodule Flume.Config do
end
end

defp parse(_key, {:system, env_var, default_value}) do
System.get_env(env_var, default_value)
end

defp parse(_key, value), do: value

def redis_opts(opts) do
[
def redis_opts(opts \\ []) do
base_opts = [
host: Keyword.get(opts, :host, host()),
port: Keyword.get(opts, :port, port()),
database: Keyword.get(opts, :database, database()),
password: Keyword.get(opts, :password, password())
]

# Add sentinel configuration if available
case {sentinels(), sentinel_group()} do
{nil, _} ->
base_opts

{sentinels, nil} when is_list(sentinels) ->
base_opts

{sentinels, group} when is_list(sentinels) and is_binary(group) ->
# Remove host and port when using sentinels as Redix doesn't allow both
sentinel_opts = [
sentinel: [
sentinels: sentinels,
group: group,
socket_opts: sentinel_socket_opts(),
timeout: sentinel_reconnect_timeout()
]
]

base_opts
|> Keyword.delete(:host)
|> Keyword.delete(:port)
|> Keyword.merge(sentinel_opts)

_ ->
base_opts
end
end

def connection_opts(opts) do
def connection_opts(opts \\ []) do
[timeout: Keyword.get(opts, :timeout, redis_timeout())]
end

Expand Down Expand Up @@ -114,4 +211,45 @@ defmodule Flume.Config do
Flume.Pipeline.MockAPI
end
end

@doc """
Validates Redis Sentinels configuration

## Examples

Basic sentinel configuration:

sentinels: [
[host: "sentinel1.example.com", port: 26379],
[host: "sentinel2.example.com", port: 26379]
],
sentinel_group: "mymaster"

Or using Redis URIs:

sentinels: [
"redis://sentinel1.example.com:26379",
"redis://sentinel2.example.com:26379"
],
sentinel_group: "mymaster"
"""
def valid_sentinel_config? do
case {sentinels(), sentinel_group()} do
{nil, _} -> true
{sentinels, group} when is_list(sentinels) and is_binary(group) ->
Enum.all?(sentinels, &valid_sentinel_entry?/1)
_ -> false
end
end

defp valid_sentinel_entry?(uri) when is_binary(uri) do
# Validate Redis URI format
uri =~ ~r/^redis:\/\//
end

defp valid_sentinel_entry?(%{host: host, port: port})
when is_binary(host) and is_integer(port), do: true
defp valid_sentinel_entry?([host: host, port: port])
when is_binary(host) and is_integer(port), do: true
defp valid_sentinel_entry?(_), do: false
end
Loading