diff --git a/.tool-versions b/.tool-versions index f72829c..f535231 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1,2 +1,2 @@ -elixir 1.8.2 -erlang 21.3 +elixir 1.18.4 +erlang 27.1 diff --git a/README.md b/README.md index e21550f..79fd336 100644 --- a/README.md +++ b/README.md @@ -31,7 +31,7 @@ Flume is a job processing system backed by [GenStage](https://github.com/elixir- - **Scheduled Jobs** - Jobs can be scheduled to run at any point in future. - **Rate Limiting** - Uses redis to maintain rate-limit on pipelines. - **Batch Processing** - Jobs are grouped based on size. -- **Logging** - Provides a behaviour `Flume.Logger` to define your own logger module. +- **Logging** - Uses Elixir's standard Logger for all logging needs. - **Pipeline Control** - Queues can be pause/resume at runtime. - **Instrumentation** - Metrics like worker duration and latency to fetch jobs from redis are emitted via [telemetry](https://github.com/beam-telemetry/telemetry). - **Exponential Back-off** - On failure, jobs are retried with exponential back-off. Minimum and maximum can be set via configuration. @@ -199,19 +199,19 @@ end **With default function** ```elixir -# 10 seconds -schedule_time = 10_000 +# Unix timestamp (e.g., 10 seconds from now) +unix_time_in_seconds = DateTime.utc_now() |> DateTime.to_unix() |> Kernel.+(10) -Flume.enqueue_in(:queue_name, schedule_time, MyApp.FancyWorker, [arg_1, arg_2]) +Flume.enqueue_in(:queue_name, unix_time_in_seconds, MyApp.FancyWorker, [arg_1, arg_2]) ``` **With custom function** ```elixir -# 10 seconds -schedule_time = 10_000 +# Unix timestamp (e.g., 10 seconds from now) +unix_time_in_seconds = DateTime.utc_now() |> DateTime.to_unix() |> Kernel.+(10) -Flume.enqueue_in(:queue_name, schedule_time, MyApp.FancyWorker, :myfunc, [arg_1, arg_2]) +Flume.enqueue_in(:queue_name, unix_time_in_seconds, MyApp.FancyWorker, :myfunc, [arg_1, arg_2]) ``` ### Rate Limiting diff --git a/config/config.exs b/config/config.exs index f34b00e..9176dd0 100644 --- a/config/config.exs +++ b/config/config.exs @@ -1,6 +1,6 @@ # This file is responsible for configuring your application -# and its dependencies with the aid of the Mix.Config module. -use Mix.Config +# and its dependencies with the aid of the Config module. +import Config config :flume, name: Flume, @@ -21,4 +21,4 @@ config :flume, scheduler_poll_interval: 10_000, max_retries: 10 -import_config "#{Mix.env()}.exs" +import_config "#{config_env()}.exs" diff --git a/config/dev.exs b/config/dev.exs index a159e2d..82fa23e 100644 --- a/config/dev.exs +++ b/config/dev.exs @@ -1,4 +1,4 @@ -use Mix.Config +import Config config :flume, name: Flume, diff --git a/config/prod.exs b/config/prod.exs new file mode 100644 index 0000000..5a88724 --- /dev/null +++ b/config/prod.exs @@ -0,0 +1,6 @@ +import Config + +# Do not print debug messages in production +config :logger, level: :info + +# Runtime production config is done by runtime.exs diff --git a/config/test.exs b/config/test.exs index 7a50c0c..c90e06a 100644 --- a/config/test.exs +++ b/config/test.exs @@ -1,24 +1,36 @@ -use Mix.Config +import Config +# Configure Flume for testing config :flume, name: Flume, host: {:system, "FLUME_REDIS_HOST", "127.0.0.1"}, port: {:system, "FLUME_REDIS_PORT", "6379"}, namespace: "flume_test", - database: 0, + database: 1, redis_timeout: 5000, redis_pool_size: 1, - pipelines: [%{name: "default_pipeline", queue: "default", max_demand: 1000}], + pipelines: [ + %{ + name: "default_pipeline", + queue: "default", + max_demand: 1000 + }, + %{ + name: "test_pipeline", + queue: "test", + max_demand: 1000 + } + ], backoff_initial: 1, backoff_max: 2, - scheduler_poll_interval: 10_000, - max_retries: 5 + scheduler_poll_interval: 1000, + max_retries: 3 config :logger, format: "[$level] $message\n", backends: [{LoggerFileBackend, :error_log}], - level: :warn + level: :warning config :logger, :error_log, path: "log/test.log", - level: :warn + level: :warning diff --git a/lib/flume.ex b/lib/flume.ex index 883b408..5978327 100644 --- a/lib/flume.ex +++ b/lib/flume.ex @@ -34,24 +34,24 @@ defmodule Flume do ]) end - def enqueue_in(queue, time_in_seconds, worker, args) do - apply(Config.queue_api_module(), :enqueue_in, [queue, time_in_seconds, worker, args]) + def enqueue_in(queue, unix_time_in_seconds, worker, args) do + apply(Config.queue_api_module(), :enqueue_in, [queue, unix_time_in_seconds, worker, args]) end - def enqueue_in(queue, time_in_seconds, worker, function_name, args) do + def enqueue_in(queue, unix_time_in_seconds, worker, function_name, args) do apply(Config.queue_api_module(), :enqueue_in, [ queue, - time_in_seconds, + unix_time_in_seconds, worker, function_name, args ]) end - def enqueue_in(queue, time_in_seconds, worker, function_name, args, opts) do + def enqueue_in(queue, unix_time_in_seconds, worker, function_name, args, opts) do apply(Config.queue_api_module(), :enqueue_in, [ queue, - time_in_seconds, + unix_time_in_seconds, worker, function_name, args, @@ -59,12 +59,12 @@ defmodule Flume do ]) end - @spec pause(Flume.Pipeline.Control.Options.option_spec()) :: list(:ok) + @spec pause_all(Flume.Pipeline.Control.Options.option_spec()) :: list(:ok) def pause_all(options \\ []) do Config.pipeline_names() |> Enum.map(&pause(&1, options)) end - @spec pause(Flume.Pipeline.Control.Options.option_spec()) :: list(:ok) + @spec resume_all(Flume.Pipeline.Control.Options.option_spec()) :: list(:ok) def resume_all(options \\ []) do Config.pipeline_names() |> Enum.map(&resume(&1, options)) end diff --git a/lib/flume/application.ex b/lib/flume/application.ex new file mode 100644 index 0000000..4daffa1 --- /dev/null +++ b/lib/flume/application.ex @@ -0,0 +1,31 @@ +defmodule Flume.Application do + @moduledoc false + + use Application + + @impl true + def start(_type, _args) do + # Get configuration from application environment + config = %{ + name: Application.get_env(:flume, :name, Flume), + host: Application.get_env(:flume, :host, "127.0.0.1"), + port: Application.get_env(:flume, :port, 6379), + namespace: Application.get_env(:flume, :namespace, "flume"), + database: Application.get_env(:flume, :database, 0), + redis_timeout: Application.get_env(:flume, :redis_timeout, 5000), + redis_pool_size: Application.get_env(:flume, :redis_pool_size, 10), + pipelines: Application.get_env(:flume, :pipelines, []), + backoff_initial: Application.get_env(:flume, :backoff_initial, 500), + backoff_max: Application.get_env(:flume, :backoff_max, 10_000), + scheduler_poll_interval: Application.get_env(:flume, :scheduler_poll_interval, 10_000), + max_retries: Application.get_env(:flume, :max_retries, 10), + instrumentation: Application.get_env(:flume, :instrumentation, [ + handler_module: Flume.Instrumentation.DefaultEventHandler, + handler_function: :handle, + metadata: [app_name: :flume] + ]) + } + + Flume.Supervisor.start_link(config) + end +end diff --git a/lib/flume/config.ex b/lib/flume/config.ex index 0025366..75f9664 100644 --- a/lib/flume/config.ex +++ b/lib/flume/config.ex @@ -1,10 +1,11 @@ defmodule Flume.Config do + use GenServer + @defaults %{ backoff_initial: 500, backoff_max: 10_000, database: 0, host: "127.0.0.1", - logger: Flume.DefaultLogger, mock: false, max_retries: 5, name: Flume, @@ -25,7 +26,12 @@ defmodule Flume.Config do handler_module: Flume.Instrumentation.DefaultEventHandler, handler_function: :handle, config: [app_name: :flume] - ] + ], + # Redis Sentinels configuration + sentinels: nil, + sentinel_group: nil, + sentinel_socket_opts: [], + sentinel_reconnect_timeout: 1000 } @integer_keys [ @@ -34,11 +40,40 @@ defmodule Flume.Config do :redis_timeout, :scheduler_poll_interval, :backoff_initial, - :backoff_max + :backoff_max, + :sentinel_reconnect_timeout ] alias Flume.Utils.IntegerExtension + # GenServer callbacks + def start_link(runtime_config \\ %{}) do + GenServer.start_link(__MODULE__, runtime_config, name: __MODULE__) + end + + def init(runtime_config) do + {:ok, runtime_config} + end + + def handle_call({:get, key}, _from, runtime_config) do + value = get_runtime_or_app_config(key, runtime_config) + {:reply, value, runtime_config} + end + + def handle_call({:get, key, fallback}, _from, runtime_config) do + value = get_runtime_or_app_config(key, runtime_config, fallback) + {:reply, value, runtime_config} + end + + def handle_call(:get_runtime_config, _from, runtime_config) do + {:reply, runtime_config, runtime_config} + end + + def handle_cast({:put, key, value}, runtime_config) do + {:noreply, Map.put(runtime_config, key, value)} + end + + # Public API Map.keys(@defaults) |> Enum.each(fn key -> def unquote(key)(), do: get(unquote(key)) @@ -47,6 +82,36 @@ defmodule Flume.Config do def get(key), do: get(key, default(key)) def get(key, fallback) do + case Process.whereis(__MODULE__) do + nil -> + # Fallback to application config if GenServer not started + get_app_config(key, fallback) + + _pid -> + GenServer.call(__MODULE__, {:get, key, fallback}) + end + end + + def put(key, value) do + GenServer.cast(__MODULE__, {:put, key, value}) + end + + def get_runtime_config do + case Process.whereis(__MODULE__) do + nil -> %{} + _pid -> GenServer.call(__MODULE__, :get_runtime_config) + end + end + + # Private functions + defp get_runtime_or_app_config(key, runtime_config, fallback \\ nil) do + case Map.get(runtime_config, key) do + nil -> get_app_config(key, fallback || default(key)) + value -> parse(key, value) + end + end + + defp get_app_config(key, fallback) do value = case Application.get_env(:flume, key, fallback) do {:system, varname} -> System.get_env(varname) @@ -69,18 +134,50 @@ defmodule Flume.Config do end end + defp parse(_key, {:system, env_var, default_value}) do + System.get_env(env_var, default_value) + end + defp parse(_key, value), do: value - def redis_opts(opts) do - [ + def redis_opts(opts \\ []) do + base_opts = [ host: Keyword.get(opts, :host, host()), port: Keyword.get(opts, :port, port()), database: Keyword.get(opts, :database, database()), password: Keyword.get(opts, :password, password()) ] + + # Add sentinel configuration if available + case {sentinels(), sentinel_group()} do + {nil, _} -> + base_opts + + {sentinels, nil} when is_list(sentinels) -> + base_opts + + {sentinels, group} when is_list(sentinels) and is_binary(group) -> + # Remove host and port when using sentinels as Redix doesn't allow both + sentinel_opts = [ + sentinel: [ + sentinels: sentinels, + group: group, + socket_opts: sentinel_socket_opts(), + timeout: sentinel_reconnect_timeout() + ] + ] + + base_opts + |> Keyword.delete(:host) + |> Keyword.delete(:port) + |> Keyword.merge(sentinel_opts) + + _ -> + base_opts + end end - def connection_opts(opts) do + def connection_opts(opts \\ []) do [timeout: Keyword.get(opts, :timeout, redis_timeout())] end @@ -114,4 +211,45 @@ defmodule Flume.Config do Flume.Pipeline.MockAPI end end + + @doc """ + Validates Redis Sentinels configuration + + ## Examples + + Basic sentinel configuration: + + sentinels: [ + [host: "sentinel1.example.com", port: 26379], + [host: "sentinel2.example.com", port: 26379] + ], + sentinel_group: "mymaster" + + Or using Redis URIs: + + sentinels: [ + "redis://sentinel1.example.com:26379", + "redis://sentinel2.example.com:26379" + ], + sentinel_group: "mymaster" + """ + def valid_sentinel_config? do + case {sentinels(), sentinel_group()} do + {nil, _} -> true + {sentinels, group} when is_list(sentinels) and is_binary(group) -> + Enum.all?(sentinels, &valid_sentinel_entry?/1) + _ -> false + end + end + + defp valid_sentinel_entry?(uri) when is_binary(uri) do + # Validate Redis URI format + uri =~ ~r/^redis:\/\// + end + + defp valid_sentinel_entry?(%{host: host, port: port}) + when is_binary(host) and is_integer(port), do: true + defp valid_sentinel_entry?([host: host, port: port]) + when is_binary(host) and is_integer(port), do: true + defp valid_sentinel_entry?(_), do: false end diff --git a/lib/flume/default_logger.ex b/lib/flume/default_logger.ex deleted file mode 100644 index 8a6c29e..0000000 --- a/lib/flume/default_logger.ex +++ /dev/null @@ -1,22 +0,0 @@ -defmodule Flume.DefaultLogger do - @behaviour Flume.Logger - - require Logger - - def debug(message, %{}) do - if Flume.Config.debug_log(), do: Logger.debug(message) - end - - def debug(message, opts) do - if Flume.Config.debug_log(), do: Logger.debug("#{message} - #{inspect(opts)}") - end - - def error(message, %{}), do: Logger.error(message) - def error(message, opts), do: Logger.error("#{message} - #{inspect(opts)}") - - def info(message, %{}), do: Logger.info(message) - def info(message, opts), do: Logger.info("#{message} - #{inspect(opts)}") - - def warn(message, %{}), do: Logger.warn(message) - def warn(message, opts), do: Logger.warn("#{message} - #{inspect(opts)}") -end diff --git a/lib/flume/instrumentation/default_event_handler.ex b/lib/flume/instrumentation/default_event_handler.ex index ce3da7c..3eb9509 100644 --- a/lib/flume/instrumentation/default_event_handler.ex +++ b/lib/flume/instrumentation/default_event_handler.ex @@ -1,9 +1,9 @@ defmodule Flume.Instrumentation.DefaultEventHandler do @behaviour Flume.Instrumentation.EventHandler - require Flume.Logger + require Logger - alias Flume.{Instrumentation, Logger} + alias Flume.Instrumentation def handle( event_name, diff --git a/lib/flume/logger.ex b/lib/flume/logger.ex deleted file mode 100644 index d350ab9..0000000 --- a/lib/flume/logger.ex +++ /dev/null @@ -1,59 +0,0 @@ -defmodule Flume.Logger do - @moduledoc """ - Behaviour module for logging to ensure that - the following callbacks are implemented - """ - - @callback debug(String.t(), map()) :: :ok | :error - @callback error(String.t(), map()) :: :ok | :error - @callback info(String.t(), map()) :: :ok | :error - @callback warn(String.t(), map()) :: :ok | :error - - defmacro debug(message) do - quote location: :keep do - apply(Flume.Config.logger(), :debug, [unquote(message), %{}]) - end - end - - defmacro error(message) do - quote location: :keep do - apply(Flume.Config.logger(), :error, [unquote(message), %{}]) - end - end - - defmacro info(message) do - quote location: :keep do - apply(Flume.Config.logger(), :info, [unquote(message), %{}]) - end - end - - defmacro warn(message) do - quote location: :keep do - apply(Flume.Config.logger(), :warn, [unquote(message), %{}]) - end - end - - defmacro debug(message, opts) do - quote location: :keep do - apply(Flume.Config.logger(), :debug, [unquote(message), unquote(opts)]) - end - end - - defmacro error(message, opts) do - quote location: :keep do - apply(Flume.Config.logger(), :error, [unquote(message), unquote(opts)]) - end - end - - defmacro info(message, opts) do - quote location: :keep do - apply(Flume.Config.logger(), :info, [unquote(message), unquote(opts)]) - end - end - - defmacro warn(message, opts) do - quote location: :keep do - apply(Flume.Config.logger(), :warn, [unquote(message), unquote(opts)]) - end - end -end diff --git a/lib/flume/pipeline/bulk_event/worker.ex b/lib/flume/pipeline/bulk_event/worker.ex index a82349f..ac32576 100644 --- a/lib/flume/pipeline/bulk_event/worker.ex +++ b/lib/flume/pipeline/bulk_event/worker.ex @@ -1,7 +1,8 @@ defmodule Flume.Pipeline.BulkEvent.Worker do - require Flume.{Instrumentation, Logger} + require Logger + require Flume.Instrumentation - alias Flume.{BulkEvent, Instrumentation, Logger} + alias Flume.{BulkEvent, Instrumentation} alias Flume.Pipeline.SystemEvent, as: SystemEventPipeline alias Flume.Pipeline.Context, as: WorkerContext diff --git a/lib/flume/pipeline/event.ex b/lib/flume/pipeline/event.ex index ec83d6f..bc483f1 100644 --- a/lib/flume/pipeline/event.ex +++ b/lib/flume/pipeline/event.ex @@ -39,19 +39,33 @@ defmodule Flume.Pipeline.Event do end def pause(pipeline_name, opts) do - unless opts[:temporary] do - RedisClient.set(paused_redis_key(pipeline_name), true) - end + # First, attempt the GenStage pause operation + case EventPipeline.Producer.pause(pipeline_name, opts[:async], opts[:timeout]) do + :ok -> + # Only set Redis key if GenStage pause succeeded and it's not temporary + unless opts[:temporary] do + RedisClient.set(paused_redis_key(pipeline_name), true) + end + :ok - EventPipeline.Producer.pause(pipeline_name, opts[:async], opts[:timeout]) + error -> + error + end end def resume(pipeline_name, opts) do - unless opts[:temporary] do - RedisClient.del(paused_redis_key(pipeline_name)) - end + # First, attempt the GenStage resume operation + case EventPipeline.Producer.resume(pipeline_name, opts[:async], opts[:timeout]) do + :ok -> + # Only delete Redis key if GenStage resume succeeded and it's not temporary + unless opts[:temporary] do + RedisClient.del(paused_redis_key(pipeline_name)) + end + :ok - EventPipeline.Producer.resume(pipeline_name, opts[:async], opts[:timeout]) + error -> + error + end end def pending_workers_count(pipeline_names \\ Flume.Config.pipeline_names()) do diff --git a/lib/flume/pipeline/event/consumer.ex b/lib/flume/pipeline/event/consumer.ex index bc637ac..8567025 100644 --- a/lib/flume/pipeline/event/consumer.ex +++ b/lib/flume/pipeline/event/consumer.ex @@ -6,6 +6,9 @@ defmodule Flume.Pipeline.Event.Consumer do use ConsumerSupervisor + require Flume.Instrumentation + + alias Flume.{Instrumentation} alias Flume.Pipeline.Event.Worker # Client API @@ -19,10 +22,29 @@ defmodule Flume.Pipeline.Event.Consumer do # Server callbacks def init({state, worker}) do + # Emit telemetry for consumer initialization + Instrumentation.execute( + [:flume, :consumer, :init], + %{system_time: System.system_time()}, + %{ + pipeline_name: state.name, + queue_name: state.queue, + rate_limit_count: Map.get(state, :rate_limit_count), + rate_limit_scale: Map.get(state, :rate_limit_scale), + rate_limit_key: Map.get(state, :rate_limit_key), + max_demand: state.max_demand + }, + Map.get(state, :instrument, false) + ) + instrument = Map.get(state, :instrument, false) children = [ - worker(worker, [%{name: state.name, instrument: instrument}], restart: :temporary) + %{ + id: worker, + start: {worker, :start_link, [%{name: state.name, instrument: instrument}]}, + restart: :temporary + } ] upstream = upstream_process_name(state.name) diff --git a/lib/flume/pipeline/event/producer.ex b/lib/flume/pipeline/event/producer.ex index 850df40..7e1f708 100644 --- a/lib/flume/pipeline/event/producer.ex +++ b/lib/flume/pipeline/event/producer.ex @@ -7,9 +7,10 @@ defmodule Flume.Pipeline.Event.Producer do """ use GenStage - require Flume.{Instrumentation, Logger} + require Logger + require Flume.Instrumentation - alias Flume.{Logger, Instrumentation, Utils, Config} + alias Flume.{Instrumentation, Utils, Config} alias Flume.Queue.Manager, as: QueueManager alias Flume.Pipeline.Event, as: EventPipeline @@ -44,6 +45,21 @@ defmodule Flume.Pipeline.Event.Producer do # Server callbacks def init(%{name: name} = state) do + # Emit telemetry for producer initialization + Instrumentation.execute( + [:flume, :producer, :init], + %{system_time: System.system_time()}, + %{ + pipeline_name: name, + queue_name: state.queue, + rate_limit_count: Map.get(state, :rate_limit_count), + rate_limit_scale: Map.get(state, :rate_limit_scale), + rate_limit_key: Map.get(state, :rate_limit_key), + max_demand: state.max_demand + }, + Map.get(state, :instrument, false) + ) + state = state |> Map.put(:paused, EventPipeline.paused_state(name)) @@ -117,9 +133,9 @@ defmodule Flume.Pipeline.Event.Producer do defp dispatch_events(%{demand: demand, batch_size: nil} = state) when demand > 0 do Logger.debug("#{state.name} [Producer] pulling #{demand} events") - {duration, _} = + {duration, fetch_result} = Instrumentation.measure do - fetch_result = take(demand, state) + take(demand, state) end case fetch_result do @@ -134,9 +150,9 @@ defmodule Flume.Pipeline.Event.Producer do Logger.debug("#{state.name} [Producer] pulling #{events_to_ask} events") - {duration, _} = + {duration, fetch_result} = Instrumentation.measure do - fetch_result = take(events_to_ask, state) + take(events_to_ask, state) end case fetch_result do @@ -155,6 +171,30 @@ defmodule Flume.Pipeline.Event.Producer do count = length(events) Logger.debug("#{state.name} [Producer] pulled #{count} events from source") + # Emit telemetry for events fetch + metadata = %{ + pipeline_name: state.name, + queue_name: state.queue, + rate_limit_count: Map.get(state, :rate_limit_count), + rate_limit_scale: Map.get(state, :rate_limit_scale), + rate_limit_key: Map.get(state, :rate_limit_key), + max_demand: state.max_demand + } + + Instrumentation.execute( + [:flume, :producer, :events, :fetch], + %{count: count}, + metadata, + Map.get(state, :instrument, false) + ) + + Instrumentation.execute( + [:flume, :producer, :events, :fetch, :stop], + %{duration: fetch_duration}, + metadata, + Map.get(state, :instrument, false) + ) + queue_atom = String.to_atom(state.queue) Instrumentation.execute( diff --git a/lib/flume/pipeline/event/producer_consumer.ex b/lib/flume/pipeline/event/producer_consumer.ex index bf4fe0f..4f5191d 100644 --- a/lib/flume/pipeline/event/producer_consumer.ex +++ b/lib/flume/pipeline/event/producer_consumer.ex @@ -7,9 +7,10 @@ defmodule Flume.Pipeline.Event.ProducerConsumer do """ use GenStage - require Flume.Logger + require Logger + require Flume.Instrumentation - alias Flume.{BulkEvent, Event, Logger} + alias Flume.{BulkEvent, Event, Instrumentation} # Client API def start_link(%{} = pipeline) do @@ -18,6 +19,21 @@ defmodule Flume.Pipeline.Event.ProducerConsumer do # Server callbacks def init(state) do + # Emit telemetry for producer consumer initialization + Instrumentation.execute( + [:flume, :producer_consumer, :init], + %{system_time: System.system_time()}, + %{ + pipeline_name: state.name, + queue_name: state.queue, + rate_limit_count: Map.get(state, :rate_limit_count), + rate_limit_scale: Map.get(state, :rate_limit_scale), + rate_limit_key: Map.get(state, :rate_limit_key), + max_demand: state.max_demand + }, + Map.get(state, :instrument, false) + ) + upstream = upstream_process_name(state.name) {:producer_consumer, state, @@ -26,7 +42,23 @@ defmodule Flume.Pipeline.Event.ProducerConsumer do # Process events one-by-one when batch_size is not set def handle_events(events, _from, %{batch_size: nil} = state) do - Logger.debug("#{state.name} [ProducerConsumer] received #{length(events)} events") + count = length(events) + Logger.debug("#{state.name} [ProducerConsumer] received #{count} events") + + # Emit telemetry for events received + Instrumentation.execute( + [:flume, :producer_consumer, :events, :received], + %{count: count}, + %{ + pipeline_name: state.name, + queue_name: state.queue, + rate_limit_count: Map.get(state, :rate_limit_count), + rate_limit_scale: Map.get(state, :rate_limit_scale), + rate_limit_key: Map.get(state, :rate_limit_key), + max_demand: state.max_demand + }, + Map.get(state, :instrument, false) + ) {:noreply, events, state} end @@ -35,7 +67,23 @@ defmodule Flume.Pipeline.Event.ProducerConsumer do # The consumer will receive each group as a single event # and process the group together def handle_events(events, _from, state) do - Logger.debug("#{state.name} [ProducerConsumer] received #{length(events)} events") + count = length(events) + Logger.debug("#{state.name} [ProducerConsumer] received #{count} events") + + # Emit telemetry for events received + Instrumentation.execute( + [:flume, :producer_consumer, :events, :received], + %{count: count}, + %{ + pipeline_name: state.name, + queue_name: state.queue, + rate_limit_count: Map.get(state, :rate_limit_count), + rate_limit_scale: Map.get(state, :rate_limit_scale), + rate_limit_key: Map.get(state, :rate_limit_key), + max_demand: state.max_demand + }, + Map.get(state, :instrument, false) + ) grouped_events = group_similar_events(events, state.batch_size) diff --git a/lib/flume/pipeline/event/worker.ex b/lib/flume/pipeline/event/worker.ex index d262bb9..61c3148 100644 --- a/lib/flume/pipeline/event/worker.ex +++ b/lib/flume/pipeline/event/worker.ex @@ -6,9 +6,10 @@ defmodule Flume.Pipeline.Event.Worker do Producer <- ProducerConsumer <- ConsumerSupervisor <- [**Consumer**] """ - require Flume.{Instrumentation, Logger} + require Logger + require Flume.Instrumentation - alias Flume.{BulkEvent, Event, Instrumentation, Logger} + alias Flume.{BulkEvent, Event, Instrumentation} alias Flume.Pipeline.BulkEvent, as: BulkEventPipeline alias Flume.Pipeline.SystemEvent, as: SystemEventPipeline alias Flume.Pipeline.Context, as: WorkerContext diff --git a/lib/flume/pipeline/system_event/consumer.ex b/lib/flume/pipeline/system_event/consumer.ex index 51c08d8..b02f4d6 100644 --- a/lib/flume/pipeline/system_event/consumer.ex +++ b/lib/flume/pipeline/system_event/consumer.ex @@ -8,7 +8,7 @@ defmodule Flume.Pipeline.SystemEvent.Consumer do alias Flume.Pipeline.SystemEvent.{Producer, Worker} - def start_link do + def start_link(_args \\ []) do ConsumerSupervisor.start_link(__MODULE__, :ok) end @@ -16,7 +16,11 @@ defmodule Flume.Pipeline.SystemEvent.Consumer do def init(:ok) do children = [ - worker(Worker, [], restart: :temporary) + %{ + id: Worker, + start: {Worker, :start_link, []}, + restart: :temporary + } ] { diff --git a/lib/flume/pipeline/system_event/supervisor.ex b/lib/flume/pipeline/system_event/supervisor.ex index 04f4500..4e83876 100644 --- a/lib/flume/pipeline/system_event/supervisor.ex +++ b/lib/flume/pipeline/system_event/supervisor.ex @@ -3,10 +3,10 @@ defmodule Flume.Pipeline.SystemEvent.Supervisor do alias Flume.Pipeline.SystemEvent - def start_link do + def start_link(_args \\ []) do children = [ - worker(SystemEvent.Producer, [0]), - worker(SystemEvent.Consumer, []) + {SystemEvent.Producer, 0}, + SystemEvent.Consumer ] opts = [ diff --git a/lib/flume/queue/default_api.ex b/lib/flume/queue/default_api.ex index adfd5dc..4e0b949 100644 --- a/lib/flume/queue/default_api.ex +++ b/lib/flume/queue/default_api.ex @@ -27,7 +27,7 @@ defmodule Flume.Queue.DefaultAPI do def enqueue_in( queue, - time_in_seconds, + unix_time_in_seconds, worker, function_name \\ :perform, args, @@ -36,7 +36,7 @@ defmodule Flume.Queue.DefaultAPI do Manager.enqueue_in( namespace(), queue, - time_in_seconds, + unix_time_in_seconds, worker, function_name, args, diff --git a/lib/flume/queue/job_validator.ex b/lib/flume/queue/job_validator.ex new file mode 100644 index 0000000..8a26523 --- /dev/null +++ b/lib/flume/queue/job_validator.ex @@ -0,0 +1,102 @@ +defmodule Flume.Queue.JobValidator do + @moduledoc """ + Validates jobs before they are enqueued to prevent invalid jobs that are + bound to fail from being processed. + """ + + alias Flume.Config + + @type validation_error :: + {:invalid_queue, String.t()} | + {:invalid_worker_module, atom()} | + {:invalid_worker_function, {atom(), atom(), non_neg_integer()}} + + @doc """ + Validates a job before enqueueing. + + Checks: + 1. Queue name against configured pipelines + 2. Worker module existence + 3. Worker function existence (name + arity) + + Returns `:ok` if valid, `{:error, reason}` if invalid. + """ + @spec validate_job(String.t() | atom(), atom(), atom(), list()) :: + :ok | {:error, validation_error()} + def validate_job(queue, worker_module, function_name, args) do + with :ok <- validate_queue(queue), + :ok <- validate_worker_module(worker_module), + :ok <- validate_worker_function(worker_module, function_name, args) do + :ok + end + end + + @doc """ + Validates multiple jobs for bulk enqueueing. + + Returns `:ok` if all jobs are valid, `{:error, {index, reason}}` if any job is invalid. + """ + @spec validate_bulk_jobs(String.t() | atom(), list()) :: + :ok | {:error, {non_neg_integer(), validation_error()}} + def validate_bulk_jobs(queue, jobs) do + jobs + |> Enum.with_index() + |> Enum.reduce_while(:ok, fn {job, index}, :ok -> + case validate_single_bulk_job(queue, job) do + :ok -> {:cont, :ok} + {:error, reason} -> {:halt, {:error, {index, reason}}} + end + end) + end + + # Private functions + + defp validate_single_bulk_job(queue, [worker_module, function_name, args]) do + validate_job(queue, worker_module, function_name, args) + end + + defp validate_single_bulk_job(queue, [worker_module, args]) do + validate_job(queue, worker_module, :perform, args) + end + + defp validate_single_bulk_job(_queue, _invalid_format) do + {:error, {:invalid_job_format, "Job must be [worker, args] or [worker, function, args]"}} + end + + defp validate_queue(queue) do + queue_name = to_string(queue) + configured_queues = Config.queues() + + if queue_name in configured_queues do + :ok + else + {:error, {:invalid_queue, "Queue '#{queue_name}' is not configured in pipelines"}} + end + end + + defp validate_worker_module(worker_module) when is_atom(worker_module) do + case Code.ensure_loaded(worker_module) do + {:module, _} -> :ok + {:error, _} -> {:error, {:invalid_worker_module, worker_module}} + end + end + + defp validate_worker_module(worker_module) do + {:error, {:invalid_worker_module, worker_module}} + end + + defp validate_worker_function(worker_module, function_name, args) + when is_atom(function_name) and is_list(args) do + arity = length(args) + + if function_exported?(worker_module, function_name, arity) do + :ok + else + {:error, {:invalid_worker_function, {worker_module, function_name, arity}}} + end + end + + defp validate_worker_function(worker_module, function_name, args) do + {:error, {:invalid_worker_function, {worker_module, function_name, length(args || [])}}} + end +end diff --git a/lib/flume/queue/manager.ex b/lib/flume/queue/manager.ex index adc4de3..885e960 100644 --- a/lib/flume/queue/manager.ex +++ b/lib/flume/queue/manager.ex @@ -1,9 +1,9 @@ defmodule Flume.Queue.Manager do - require Flume.Logger + require Logger - alias Flume.{Config, Event, Logger, Instrumentation, Utils, Redis} + alias Flume.{Config, Event, Instrumentation, Utils, Redis} alias Flume.Redis.Job - alias Flume.Queue.Backoff + alias Flume.Queue.{Backoff, JobValidator} alias Flume.Support.Time, as: TimeExtension @external_resource "priv/scripts/enqueue_processing_jobs.lua" @@ -17,53 +17,71 @@ defmodule Flume.Queue.Manager do args, opts \\ [] ) do - job = serialized_job(queue, worker, function_name, args, opts[:context]) - queue_atom = if is_atom(queue), do: queue, else: String.to_atom(queue) + case JobValidator.validate_job(queue, worker, function_name, args) do + :ok -> + job = serialized_job(queue, worker, function_name, args, opts[:context]) + queue_atom = if is_atom(queue), do: queue, else: String.to_atom(queue) - Instrumentation.execute( - [queue_atom, :enqueue], - %{payload_size: byte_size(job)}, - true - ) + Instrumentation.execute( + [queue_atom, :enqueue], + %{payload_size: byte_size(job)}, + true + ) + + Job.enqueue(queue_key(namespace, queue), job) - Job.enqueue(queue_key(namespace, queue), job) + {:error, reason} -> + {:error, format_validation_error(reason)} + end end def bulk_enqueue(namespace, queue, jobs, opts \\ []) do - jobs = - jobs - |> Enum.map(fn - [worker, function_name, args] -> - serialized_job(queue, worker, function_name, args, opts[:context]) - - [worker, args] -> - serialized_job(queue, worker, :perform, args, opts[:context]) - end) - - queue_atom = if is_atom(queue), do: queue, else: String.to_atom(queue) - - Instrumentation.execute( - [queue_atom, :enqueue], - %{payload_size: Utils.payload_size(jobs)}, - true - ) - - Job.bulk_enqueue(queue_key(namespace, queue), jobs) + case JobValidator.validate_bulk_jobs(queue, jobs) do + :ok -> + jobs = + jobs + |> Enum.map(fn + [worker, function_name, args] -> + serialized_job(queue, worker, function_name, args, opts[:context]) + + [worker, args] -> + serialized_job(queue, worker, :perform, args, opts[:context]) + end) + + queue_atom = if is_atom(queue), do: queue, else: String.to_atom(queue) + + Instrumentation.execute( + [queue_atom, :enqueue], + %{payload_size: Utils.payload_size(jobs)}, + true + ) + + Job.bulk_enqueue(queue_key(namespace, queue), jobs) + + {:error, {index, reason}} -> + {:error, "Job at index #{index}: #{format_validation_error(reason)}"} + end end def enqueue_in( namespace, queue, - time_in_seconds, + unix_time_in_seconds, worker, function_name, args, opts \\ [] ) do - queue_name = scheduled_key(namespace) - job = serialized_job(queue, worker, function_name, args, opts[:context]) + case JobValidator.validate_job(queue, worker, function_name, args) do + :ok -> + queue_name = scheduled_key(namespace) + job = serialized_job(queue, worker, function_name, args, opts[:context]) - schedule_job_at(queue_name, time_in_seconds, job) + schedule_job_at(queue_name, unix_time_in_seconds, job) + + {:error, reason} -> + {:error, format_validation_error(reason)} + end end def job_counts(namespace, [_queue | _] = queues) do @@ -296,4 +314,23 @@ defmodule Flume.Queue.Manager do {current_score, previous_score} end + + defp format_validation_error(reason) do + case reason do + {:invalid_queue, message} -> + "Invalid queue: #{message}" + + {:invalid_worker_module, module} -> + "Invalid worker module: #{inspect(module)} does not exist or could not be loaded" + + {:invalid_worker_function, {module, function, arity}} -> + "Invalid worker function: #{inspect(module)}.#{function}/#{arity} does not exist" + + {:invalid_job_format, message} -> + "Invalid job format: #{message}" + + _ -> + "Unknown validation error: #{inspect(reason)}" + end + end end diff --git a/lib/flume/queue/mock_api.ex b/lib/flume/queue/mock_api.ex index da2c47c..15cbdd6 100644 --- a/lib/flume/queue/mock_api.ex +++ b/lib/flume/queue/mock_api.ex @@ -59,7 +59,7 @@ defmodule Flume.Queue.MockAPI do def enqueue_in( queue, - time_in_seconds, + unix_time_in_seconds, worker, function_name \\ :perform, args, @@ -68,14 +68,14 @@ defmodule Flume.Queue.MockAPI do def enqueue_in( queue, - time_in_seconds, + unix_time_in_seconds, worker, function_name, args, [] ) do message = %{ - schedule_in: time_in_seconds, + schedule_in: unix_time_in_seconds, queue: queue, worker: worker, function_name: function_name, @@ -89,14 +89,14 @@ defmodule Flume.Queue.MockAPI do def enqueue_in( queue, - time_in_seconds, + unix_time_in_seconds, worker, function_name, args, opts ) do message = %{ - schedule_in: time_in_seconds, + schedule_in: unix_time_in_seconds, queue: queue, worker: worker, function_name: function_name, diff --git a/lib/flume/queue/processing_scheduler.ex b/lib/flume/queue/processing_scheduler.ex index 6d829ba..fa390b0 100644 --- a/lib/flume/queue/processing_scheduler.ex +++ b/lib/flume/queue/processing_scheduler.ex @@ -1,9 +1,9 @@ defmodule Flume.Queue.ProcessingScheduler do - require Flume.Logger + require Logger use GenServer - alias Flume.{Config, Logger} + alias Flume.Config alias Flume.Queue.Manager @max_limit 1000 @@ -31,7 +31,7 @@ defmodule Flume.Queue.ProcessingScheduler do end def handle_info(msg, state) do - Logger.warn("#{__MODULE__}: Unknown message - #{inspect(msg)}") + Logger.warning("#{__MODULE__}: Unknown message - #{inspect(msg)}") {:noreply, state} end diff --git a/lib/flume/queue/scheduler.ex b/lib/flume/queue/scheduler.ex index ca92a66..1946ed1 100644 --- a/lib/flume/queue/scheduler.ex +++ b/lib/flume/queue/scheduler.ex @@ -1,9 +1,8 @@ defmodule Flume.Queue.Scheduler do - require Flume.Logger + require Logger use GenServer - alias Flume.Logger alias Flume.Queue.Manager alias Flume.Support.Time @@ -30,7 +29,7 @@ defmodule Flume.Queue.Scheduler do end def handle_info(msg, state) do - Logger.warn("#{__MODULE__}: Unknown message - #{inspect(msg)}") + Logger.warning("#{__MODULE__}: Unknown message - #{inspect(msg)}") {:noreply, state} end diff --git a/lib/flume/redis/bulk_dequeue.ex b/lib/flume/redis/bulk_dequeue.ex index 84b3085..e600cb1 100644 --- a/lib/flume/redis/bulk_dequeue.ex +++ b/lib/flume/redis/bulk_dequeue.ex @@ -1,8 +1,8 @@ defmodule Flume.Redis.BulkDequeue do - require Flume.Logger + require Logger alias Flume.Redis.{Client, Lock} - alias Flume.{Config, Utils, Logger} + alias Flume.{Config, Utils} @dequeue_lock_suffix "bulk_dequeue_lock" @dequeue_lock_ttl Config.dequeue_lock_ttl() diff --git a/lib/flume/redis/client.ex b/lib/flume/redis/client.ex index e22cc1e..7302c7d 100644 --- a/lib/flume/redis/client.ex +++ b/lib/flume/redis/client.ex @@ -1,7 +1,7 @@ defmodule Flume.Redis.Client do - require Flume.Logger + require Logger - alias Flume.{Config, Logger} + alias Flume.Config alias Flume.Redis.Command # Redis commands diff --git a/lib/flume/redis/job.ex b/lib/flume/redis/job.ex index 2bcee97..b3668ee 100644 --- a/lib/flume/redis/job.ex +++ b/lib/flume/redis/job.ex @@ -1,7 +1,6 @@ defmodule Flume.Redis.Job do - require Flume.Logger + require Logger - alias Flume.Logger alias Flume.Support.Time alias Flume.Redis.{Client, Script, SortedSet, BulkDequeue} diff --git a/lib/flume/redis/lock.ex b/lib/flume/redis/lock.ex index 911dbf9..51fec58 100644 --- a/lib/flume/redis/lock.ex +++ b/lib/flume/redis/lock.ex @@ -1,5 +1,5 @@ defmodule Flume.Redis.Lock do - require Flume.Logger + require Logger alias Flume.Redis.{Client, Script} diff --git a/lib/flume/supervisor.ex b/lib/flume/supervisor.ex index c94b980..4619b26 100644 --- a/lib/flume/supervisor.ex +++ b/lib/flume/supervisor.ex @@ -3,39 +3,122 @@ defmodule Flume.Supervisor do Flume is a job processing system backed by Redis & GenStage. Each pipeline processes jobs from a specific Redis queue. Flume has a retry mechanism that keeps retrying the jobs with an exponential backoff. + + ## Configuration Options + + The supervisor accepts configuration options at startup, allowing multiple instances + with different settings: + + ```elixir + config = %{ + name: :my_flume, + namespace: "my_app", + pipelines: [ + %{name: "high", queue: "high_priority", max_demand: 100}, + %{name: "low", queue: "low_priority", max_demand: 10} + ], + redis_pool_size: 5, + host: "localhost", + port: 6379 + } + + {:ok, pid} = Flume.Supervisor.start_link(config) + ``` """ use Application - import Supervisor.Spec - alias Flume.Config def start(_type, _args) do Supervisor.start_link([], strategy: :one_for_one) end - def start_link do - children = - if Config.mock() do - [] - else - # This order matters, first we need to start all redix worker processes - # then all other processes. - [ - supervisor(Flume.Redis.Supervisor, []), - worker(Flume.Queue.Scheduler, [Config.scheduler_opts()]), - supervisor(Flume.Pipeline.SystemEvent.Supervisor, []), - supervisor(Task.Supervisor, [[name: Flume.SafeApplySupervisor]]) - ] ++ Flume.Support.Pipelines.list() - end + def start_link(config \\ %{}) do + children = build_children(config) opts = [ - strategy: :one_for_one, + strategy: :rest_for_one, max_restarts: 20, max_seconds: 5, - name: Flume.Supervisor + name: supervisor_name(config) ] {:ok, _pid} = Supervisor.start_link(children, opts) end + + defp build_children(config) do + # Always start Config GenServer first + base_children = [ + {Flume.Config, config} + ] + + if Map.get(config, :mock, Config.get(:mock, false)) do + base_children + else + # This order matters, first we need to start config and redis workers + # then all other processes. + base_children ++ + [ + Flume.Redis.Supervisor, + {Flume.Queue.Scheduler, scheduler_opts_from_config(config)}, + Flume.Pipeline.SystemEvent.Supervisor, + {Task.Supervisor, [name: task_supervisor_name(config)]} + ] ++ pipeline_children_from_config(config) + end + end + + defp supervisor_name(config) do + case Map.get(config, :name) do + nil -> Flume.Supervisor + name -> :"#{name}.Supervisor" + end + end + + defp task_supervisor_name(config) do + case Map.get(config, :name) do + nil -> Flume.SafeApplySupervisor + name -> :"#{name}.SafeApplySupervisor" + end + end + + defp scheduler_opts_from_config(config) do + [ + namespace: Map.get(config, :namespace, Config.get(:namespace, "flume")), + scheduler_poll_interval: Map.get(config, :scheduler_poll_interval, Config.get(:scheduler_poll_interval, 10_000)) + ] + end + + defp pipeline_children_from_config(config) do + pipelines = Map.get(config, :pipelines, Config.get(:pipelines, [])) + + Enum.flat_map(pipelines, fn pipeline -> + pipeline_struct = Flume.Pipeline.new(pipeline) + scheduler_options = scheduler_opts_from_config(config) ++ [queue: pipeline.queue] + Flume.Pipeline.Event.attach_instrumentation(pipeline_struct) + + [ + %{ + id: {Flume.Pipeline.Event.Producer, generate_id()}, + start: {Flume.Pipeline.Event.Producer, :start_link, [pipeline_struct]} + }, + %{ + id: {Flume.Pipeline.Event.ProducerConsumer, generate_id()}, + start: {Flume.Pipeline.Event.ProducerConsumer, :start_link, [pipeline_struct]} + }, + %{ + id: {Flume.Pipeline.Event.Consumer, generate_id()}, + start: {Flume.Pipeline.Event.Consumer, :start_link, [pipeline_struct]} + }, + %{ + id: {Flume.Queue.ProcessingScheduler, generate_id()}, + start: {Flume.Queue.ProcessingScheduler, :start_link, [scheduler_options]} + } + ] + end) + end + + defp generate_id do + <> = :crypto.strong_rand_bytes(8) + "#{part1}#{part2}" + end end diff --git a/lib/flume/support/pipelines.ex b/lib/flume/support/pipelines.ex deleted file mode 100644 index 329a8af..0000000 --- a/lib/flume/support/pipelines.ex +++ /dev/null @@ -1,33 +0,0 @@ -defmodule Flume.Support.Pipelines do - @moduledoc """ - This module returns the pipelines and its children - based on the configuration - """ - - alias Flume.Pipeline.Event, as: EventPipeline - alias Flume.Config, as: FlumeConfig - - @doc false - def list do - import Supervisor.Spec - - Flume.Config.pipelines() - |> Enum.flat_map(fn pipeline -> - pipeline_struct = Flume.Pipeline.new(pipeline) - scheduler_options = FlumeConfig.scheduler_opts() ++ [queue: pipeline.queue] - EventPipeline.attach_instrumentation(pipeline_struct) - - [ - worker(EventPipeline.Producer, [pipeline_struct], id: generate_id()), - worker(EventPipeline.ProducerConsumer, [pipeline_struct], id: generate_id()), - worker(EventPipeline.Consumer, [pipeline_struct], id: generate_id()), - worker(Flume.Queue.ProcessingScheduler, [scheduler_options], id: generate_id()) - ] - end) - end - - defp generate_id do - <> = :crypto.strong_rand_bytes(8) - "#{part1}#{part2}" - end -end diff --git a/lib/flume/utils/integer_extension.ex b/lib/flume/utils/integer_extension.ex index 8655b79..97eebe7 100644 --- a/lib/flume/utils/integer_extension.ex +++ b/lib/flume/utils/integer_extension.ex @@ -7,6 +7,11 @@ defmodule Flume.Utils.IntegerExtension do def parse(value, _default) when is_integer(value), do: value + def parse({:system, env_var, default_value}, default) do + System.get_env(env_var, to_string(default_value)) + |> parse(default) + end + def parse(value, default) when is_binary(value) do case Integer.parse(value) do {count, _} -> diff --git a/mix.exs b/mix.exs index f3d8915..d2a7cae 100644 --- a/mix.exs +++ b/mix.exs @@ -5,7 +5,7 @@ defmodule Flume.Mixfile do [ app: :flume, version: "0.2.0", - elixir: "~> 1.8", + elixir: "~> 1.18.4", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :prod, deps: deps(), @@ -25,33 +25,24 @@ defmodule Flume.Mixfile do # Run "mix help compile.app" to learn about applications. def application do [ - applications: [ - :redix, - :logger_file_backend, - :gen_stage, - :jason, - :poolboy, - :retry, - :telemetry - ], extra_applications: [:logger], - mod: {Flume, []} + mod: {Flume.Application, []} ] end # Run "mix help deps" to learn about dependencies. defp deps do [ - {:redix, "~> 1.0"}, - {:gen_stage, "~> 0.14.0"}, - {:jason, "~> 1.1.0"}, - {:poolboy, "~> 1.5.1"}, + {:redix, "~> 1.5"}, + {:gen_stage, "~> 1.2"}, + {:jason, "~> 1.4"}, + {:poolboy, "~> 1.5"}, {:elixir_uuid, "~> 1.2"}, - {:logger_file_backend, "~> 0.0.10"}, - {:retry, "0.8.2"}, - {:benchee, "~> 1.0"}, - {:telemetry, "~> 1.0"}, - {:excoveralls, "~> 0.10.6", only: :test} + {:logger_file_backend, "~> 0.0.13"}, + {:retry, "~> 0.18"}, + {:benchee, "~> 1.3"}, + {:telemetry, "~> 1.2"}, + {:excoveralls, "~> 0.18", only: :test} ] end diff --git a/mix.lock b/mix.lock index c93abf2..05602b3 100644 --- a/mix.lock +++ b/mix.lock @@ -1,23 +1,53 @@ %{ - "benchee": {:hex, :benchee, "1.0.1", "66b211f9bfd84bd97e6d1beaddf8fc2312aaabe192f776e8931cb0c16f53a521", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}], "hexpm", "3ad58ae787e9c7c94dd7ceda3b587ec2c64604563e049b2a0e8baafae832addb"}, + "bandit": {:hex, :bandit, "1.7.0", "d1564f30553c97d3e25f9623144bb8df11f3787a26733f00b21699a128105c0c", [:mix], [{:hpax, "~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}, {:plug, "~> 1.18", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:thousand_island, "~> 1.0", [hex: :thousand_island, repo: "hexpm", optional: false]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "3e2f7a98c7a11f48d9d8c037f7177cd39778e74d55c7af06fe6227c742a8168a"}, + "benchee": {:hex, :benchee, "1.4.0", "9f1f96a30ac80bab94faad644b39a9031d5632e517416a8ab0a6b0ac4df124ce", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}, {:statistex, "~> 1.0", [hex: :statistex, repo: "hexpm", optional: false]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "299cd10dd8ce51c9ea3ddb74bb150f93d25e968f93e4c1fa31698a8e4fa5d715"}, + "castore": {:hex, :castore, "1.0.14", "4582dd7d630b48cf5e1ca8d3d42494db51e406b7ba704e81fbd401866366896a", [:mix], [], "hexpm", "7bc1b65249d31701393edaaac18ec8398d8974d52c647b7904d01b964137b9f4"}, "certifi": {:hex, :certifi, "2.5.1", "867ce347f7c7d78563450a18a6a28a8090331e77fa02380b4a21962a65d36ee5", [:rebar3], [{:parse_trans, "~>3.3", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm", "805abd97539caf89ec6d4732c91e62ba9da0cda51ac462380bbd28ee697a8c42"}, "connection": {:hex, :connection, "1.0.4", "a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [:mix], [], "hexpm"}, "deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm", "ce708e5f094b9cd4e8f2be4f00d2f4250c4095be93f8cd6d018c753894885430"}, + "dns_cluster": {:hex, :dns_cluster, "0.1.3", "0bc20a2c88ed6cc494f2964075c359f8c2d00e1bf25518a6a6c7fd277c9b0c66", [:mix], [], "hexpm", "46cb7c4a1b3e52c7ad4cbe33ca5079fbde4840dedeafca2baf77996c2da1bc33"}, "elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"}, - "excoveralls": {:hex, :excoveralls, "0.10.6", "e2b9718c9d8e3ef90bc22278c3f76c850a9f9116faf4ebe9678063310742edc2", [:mix], [{:hackney, "~> 1.13", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "b06c73492aa9940c4c29cfc1356bcf5540ae318f17b423749a0615a66ee3e049"}, - "gen_stage": {:hex, :gen_stage, "0.14.0", "65ae78509f85b59d360690ce3378d5096c3130a0694bab95b0c4ae66f3008fad", [:mix], [], "hexpm", "095d38418e538af99ac82043985d26724164b78736a1d0f137c308332ad46250"}, + "esbuild": {:hex, :esbuild, "0.10.0", "b0aa3388a1c23e727c5a3e7427c932d89ee791746b0081bbe56103e9ef3d291f", [:mix], [{:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "468489cda427b974a7cc9f03ace55368a83e1a7be12fba7e30969af78e5f8c70"}, + "excoveralls": {:hex, :excoveralls, "0.18.5", "e229d0a65982613332ec30f07940038fe451a2e5b29bce2a5022165f0c9b157e", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "523fe8a15603f86d64852aab2abe8ddbd78e68579c8525ae765facc5eae01562"}, + "expo": {:hex, :expo, "1.1.0", "f7b9ed7fb5745ebe1eeedf3d6f29226c5dd52897ac67c0f8af62a07e661e5c75", [:mix], [], "hexpm", "fbadf93f4700fb44c331362177bdca9eeb8097e8b0ef525c9cc501cb9917c960"}, + "file_system": {:hex, :file_system, "1.1.0", "08d232062284546c6c34426997dd7ef6ec9f8bbd090eb91780283c9016840e8f", [:mix], [], "hexpm", "bfcf81244f416871f2a2e15c1b515287faa5db9c6bcf290222206d120b3d43f6"}, + "finch": {:hex, :finch, "0.20.0", "5330aefb6b010f424dcbbc4615d914e9e3deae40095e73ab0c1bb0968933cadf", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2658131a74d051aabfcba936093c903b8e89da9a1b63e430bee62045fa9b2ee2"}, + "floki": {:hex, :floki, "0.38.0", "62b642386fa3f2f90713f6e231da0fa3256e41ef1089f83b6ceac7a3fd3abf33", [:mix], [], "hexpm", "a5943ee91e93fb2d635b612caf5508e36d37548e84928463ef9dd986f0d1abd9"}, + "gen_stage": {:hex, :gen_stage, "1.3.2", "7c77e5d1e97de2c6c2f78f306f463bca64bf2f4c3cdd606affc0100b89743b7b", [:mix], [], "hexpm", "0ffae547fa777b3ed889a6b9e1e64566217413d018cabd825f786e843ffe63e7"}, + "gettext": {:hex, :gettext, "0.26.2", "5978aa7b21fada6deabf1f6341ddba50bc69c999e812211903b169799208f2a8", [:mix], [{:expo, "~> 0.5.1 or ~> 1.0", [hex: :expo, repo: "hexpm", optional: false]}], "hexpm", "aa978504bcf76511efdc22d580ba08e2279caab1066b76bb9aa81c4a1e0a32a5"}, "hackney": {:hex, :hackney, "1.15.1", "9f8f471c844b8ce395f7b6d8398139e26ddca9ebc171a8b91342ee15a19963f4", [:rebar3], [{:certifi, "2.5.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "6.0.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.4", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm", "c2790c9f0f7205f4a362512192dee8179097394400e745e4d20bab7226a8eaad"}, + "hpax": {:hex, :hpax, "1.0.3", "ed67ef51ad4df91e75cc6a1494f851850c0bd98ebc0be6e81b026e765ee535aa", [:mix], [], "hexpm", "8eab6e1cfa8d5918c2ce4ba43588e894af35dbd8e91e6e55c817bca5847df34a"}, "idna": {:hex, :idna, "6.0.0", "689c46cbcdf3524c44d5f3dde8001f364cd7608a99556d8fbd8239a5798d4c10", [:rebar3], [{:unicode_util_compat, "0.4.1", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "4bdd305eb64e18b0273864920695cb18d7a2021f31a11b9c5fbcd9a253f936e2"}, - "jason": {:hex, :jason, "1.1.0", "9634bca30f2f7468dde3e704d5865319b1eb88e4a8cded5c995baf0aa957524f", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "23e9d96104cce9e74a63356f280ad7c7abc8ee68a45fb051d0845236bc94386c"}, - "logger_file_backend": {:hex, :logger_file_backend, "0.0.11", "3bbc5f31d3669e8d09d7a9443e86056fae7fc18e45c6f748c33b8c79a7e147a1", [:mix], [], "hexpm", "62be826f04644c62b0a2bc98a13e2e7ae52c0a4eda020f4c59d7287356d5e445"}, + "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, + "logger_file_backend": {:hex, :logger_file_backend, "0.0.14", "774bb661f1c3fed51b624d2859180c01e386eb1273dc22de4f4a155ef749a602", [:mix], [], "hexpm", "071354a18196468f3904ef09413af20971d55164267427f6257b52cfba03f9e6"}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, + "mime": {:hex, :mime, "2.0.7", "b8d739037be7cd402aee1ba0306edfdef982687ee7e9859bee6198c1e7e2f128", [:mix], [], "hexpm", "6171188e399ee16023ffc5b76ce445eb6d9672e2e241d2df6050f3c771e80ccd"}, "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"}, + "mint": {:hex, :mint, "1.7.1", "113fdb2b2f3b59e47c7955971854641c61f378549d73e829e1768de90fc1abf1", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "fceba0a4d0f24301ddee3024ae116df1c3f4bb7a563a731f45fdfeb9d39a231b"}, + "nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"}, + "nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"}, "parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm", "17ef63abde837ad30680ea7f857dd9e7ced9476cdd7b0394432af4bfc241b960"}, + "phoenix": {:hex, :phoenix, "1.7.21", "14ca4f1071a5f65121217d6b57ac5712d1857e40a0833aff7a691b7870fc9a3b", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:phoenix_template, "~> 1.0", [hex: :phoenix_template, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 2.0", [hex: :phoenix_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.7", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:websock_adapter, "~> 0.5.3", [hex: :websock_adapter, repo: "hexpm", optional: false]}], "hexpm", "336dce4f86cba56fed312a7d280bf2282c720abb6074bdb1b61ec8095bdd0bc9"}, + "phoenix_html": {:hex, :phoenix_html, "4.2.1", "35279e2a39140068fc03f8874408d58eef734e488fc142153f055c5454fd1c08", [:mix], [], "hexpm", "cff108100ae2715dd959ae8f2a8cef8e20b593f8dfd031c9cba92702cf23e053"}, + "phoenix_live_dashboard": {:hex, :phoenix_live_dashboard, "0.8.7", "405880012cb4b706f26dd1c6349125bfc903fb9e44d1ea668adaf4e04d4884b7", [:mix], [{:ecto, "~> 3.6.2 or ~> 3.7", [hex: :ecto, repo: "hexpm", optional: true]}, {:ecto_mysql_extras, "~> 0.5", [hex: :ecto_mysql_extras, repo: "hexpm", optional: true]}, {:ecto_psql_extras, "~> 0.7", [hex: :ecto_psql_extras, repo: "hexpm", optional: true]}, {:ecto_sqlite3_extras, "~> 1.1.7 or ~> 1.2.0", [hex: :ecto_sqlite3_extras, repo: "hexpm", optional: true]}, {:mime, "~> 1.6 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:phoenix_live_view, "~> 0.19 or ~> 1.0", [hex: :phoenix_live_view, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6 or ~> 1.0", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "3a8625cab39ec261d48a13b7468dc619c0ede099601b084e343968309bd4d7d7"}, + "phoenix_live_reload": {:hex, :phoenix_live_reload, "1.6.0", "2791fac0e2776b640192308cc90c0dbcf67843ad51387ed4ecae2038263d708d", [:mix], [{:file_system, "~> 0.2.10 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:phoenix, "~> 1.4", [hex: :phoenix, repo: "hexpm", optional: false]}], "hexpm", "b3a1fa036d7eb2f956774eda7a7638cf5123f8f2175aca6d6420a7f95e598e1c"}, + "phoenix_live_view": {:hex, :phoenix_live_view, "1.0.17", "beeb16d83a7d3760f7ad463df94e83b087577665d2acc0bf2987cd7d9778068f", [:mix], [{:floki, "~> 0.36", [hex: :floki, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix, "~> 1.6.15 or ~> 1.7.0 or ~> 1.8.0-rc", [hex: :phoenix, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 3.3 or ~> 4.0", [hex: :phoenix_html, repo: "hexpm", optional: false]}, {:phoenix_template, "~> 1.0", [hex: :phoenix_template, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 2.0", [hex: :phoenix_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.15", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.2 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "a4ca05c1eb6922c4d07a508a75bfa12c45e5f4d8f77ae83283465f02c53741e1"}, + "phoenix_pubsub": {:hex, :phoenix_pubsub, "2.1.3", "3168d78ba41835aecad272d5e8cd51aa87a7ac9eb836eabc42f6e57538e3731d", [:mix], [], "hexpm", "bba06bc1dcfd8cb086759f0edc94a8ba2bc8896d5331a1e2c2902bf8e36ee502"}, + "phoenix_template": {:hex, :phoenix_template, "1.0.4", "e2092c132f3b5e5b2d49c96695342eb36d0ed514c5b252a77048d5969330d639", [:mix], [{:phoenix_html, "~> 2.14.2 or ~> 3.0 or ~> 4.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}], "hexpm", "2c0c81f0e5c6753faf5cca2f229c9709919aba34fab866d3bc05060c9c444206"}, + "plug": {:hex, :plug, "1.18.1", "5067f26f7745b7e31bc3368bc1a2b818b9779faa959b49c934c17730efc911cf", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "57a57db70df2b422b564437d2d33cf8d33cd16339c1edb190cd11b1a3a546cc2"}, + "plug_crypto": {:hex, :plug_crypto, "2.1.1", "19bda8184399cb24afa10be734f84a16ea0a2bc65054e23a62bb10f06bc89491", [:mix], [], "hexpm", "6470bce6ffe41c8bd497612ffde1a7e4af67f36a15eea5f921af71cf3e11247c"}, "poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"}, "poolboy": {:hex, :poolboy, "1.5.1", "6b46163901cfd0a1b43d692657ed9d7e599853b3b21b95ae5ae0a777cf9b6ca8", [:rebar], [], "hexpm", "8f7168911120e13419e086e78d20e4d1a6776f1eee2411ac9f790af10813389f"}, - "redix": {:hex, :redix, "1.1.5", "6fc460d66a5c2287e83e6d73dddc8d527ff59cb4d4f298b41e03a4db8c3b2bd5", [:mix], [{:castore, "~> 0.1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "679afdd4c14502fe9c11387ff1cdcb33065a1cf511097da1eee407f17c7a418b"}, - "retry": {:hex, :retry, "0.8.2", "7b57bd5e1e7efeca04dd740cabdc3930c472cfa7e0186949de180c64417e9c35", [:mix], [], "hexpm", "ae8969bfea65bf2adf9e7cc6e59bc2d1b6d9e04bcb7d2c53b996d9b8a023fe78"}, + "redix": {:hex, :redix, "1.5.2", "ab854435a663f01ce7b7847f42f5da067eea7a3a10c0a9d560fa52038fd7ab48", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:nimble_options, "~> 0.5.0 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "78538d184231a5d6912f20567d76a49d1be7d3fca0e1aaaa20f4df8e1142dcb8"}, + "retry": {:hex, :retry, "0.19.0", "aeb326d87f62295d950f41e1255fe6f43280a1b390d36e280b7c9b00601ccbc2", [:mix], [], "hexpm", "85ef376aa60007e7bff565c366310966ec1bd38078765a0e7f20ec8a220d02ca"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.4", "f0eafff810d2041e93f915ef59899c923f4568f4585904d010387ed74988e77b", [:make, :mix, :rebar3], [], "hexpm", "603561dc0fd62f4f2ea9b890f4e20e1a0d388746d6e20557cafb1b16950de88c"}, - "telemetry": {:hex, :telemetry, "1.1.0", "a589817034a27eab11144ad24d5c0f9fab1f58173274b1e9bae7074af9cbee51", [:rebar3], [], "hexpm", "b727b2a1f75614774cff2d7565b64d0dfa5bd52ba517f16543e6fc7efcc0df48"}, - "unicode_util_compat": {:hex, :unicode_util_compat, "0.4.1", "d869e4c68901dd9531385bb0c8c40444ebf624e60b6962d95952775cac5e90cd", [:rebar3], [], "hexpm", "1d1848c40487cdb0b30e8ed975e34e025860c02e419cb615d255849f3427439d"} + "statistex": {:hex, :statistex, "1.1.0", "7fec1eb2f580a0d2c1a05ed27396a084ab064a40cfc84246dbfb0c72a5c761e5", [:mix], [], "hexpm", "f5950ea26ad43246ba2cce54324ac394a4e7408fdcf98b8e230f503a0cba9cf5"}, + "tailwind": {:hex, :tailwind, "0.3.1", "a89d2835c580748c7a975ad7dd3f2ea5e63216dc16d44f9df492fbd12c094bed", [:mix], [], "hexpm", "98a45febdf4a87bc26682e1171acdedd6317d0919953c353fcd1b4f9f4b676a2"}, + "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, + "telemetry_metrics": {:hex, :telemetry_metrics, "1.1.0", "5bd5f3b5637e0abea0426b947e3ce5dd304f8b3bc6617039e2b5a008adc02f8f", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "e7b79e8ddfde70adb6db8a6623d1778ec66401f366e9a8f5dd0955c56bc8ce67"}, + "telemetry_poller": {:hex, :telemetry_poller, "1.3.0", "d5c46420126b5ac2d72bc6580fb4f537d35e851cc0f8dbd571acf6d6e10f5ec7", [:rebar3], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "51f18bed7128544a50f75897db9974436ea9bfba560420b646af27a9a9b35211"}, + "thousand_island": {:hex, :thousand_island, "1.3.14", "ad45ebed2577b5437582bcc79c5eccd1e2a8c326abf6a3464ab6c06e2055a34a", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "d0d24a929d31cdd1d7903a4fe7f2409afeedff092d277be604966cd6aa4307ef"}, + "unicode_util_compat": {:hex, :unicode_util_compat, "0.4.1", "d869e4c68901dd9531385bb0c8c40444ebf624e60b6962d95952775cac5e90cd", [:rebar3], [], "hexpm", "1d1848c40487cdb0b30e8ed975e34e025860c02e419cb615d255849f3427439d"}, + "websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"}, + "websock_adapter": {:hex, :websock_adapter, "0.5.8", "3b97dc94e407e2d1fc666b2fb9acf6be81a1798a2602294aac000260a7c4a47d", [:mix], [{:bandit, ">= 0.6.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.6", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "315b9a1865552212b5f35140ad194e67ce31af45bcee443d4ecb96b5fd3f3782"}, } diff --git a/test/flume/queue/manager_test.exs b/test/flume/queue/manager_test.exs index 1a48533..e8a034a 100644 --- a/test/flume/queue/manager_test.exs +++ b/test/flume/queue/manager_test.exs @@ -6,6 +6,12 @@ defmodule Flume.Queue.ManagerTest do alias Flume.{Config, Event, JobFactory} alias Flume.Redis.{Client, Job, SortedSet} + # Define a simple test worker to avoid module loading issues + defmodule TestWorker do + def perform(args), do: {:ok, args} + def process(args), do: {:ok, args} + end + @namespace Config.namespace() def max_time_range do @@ -16,7 +22,7 @@ defmodule Flume.Queue.ManagerTest do describe "enqueue/5" do test "enqueues a job into a queue" do - assert {:ok, _} = Manager.enqueue(@namespace, "test", Worker, "process", [1]) + assert {:ok, _} = Manager.enqueue(@namespace, "test", Flume.Queue.ManagerTest.TestWorker, "perform", [1]) end end @@ -24,15 +30,15 @@ defmodule Flume.Queue.ManagerTest do test "enqueues array of jobs into a queue" do assert {:ok, 2} = Manager.bulk_enqueue(@namespace, "test", [ - [Worker, "process", [1]], - [Worker, "process", [2]] + [Flume.Queue.ManagerTest.TestWorker, "perform", [1]], + [Flume.Queue.ManagerTest.TestWorker, "perform", [2]] ]) end end describe "enqueue_in/6" do test "enqueues a job at a scheduled time" do - assert {:ok, _} = Manager.enqueue_in(@namespace, "test", 10, Worker, "process", [1]) + assert {:ok, _} = Manager.enqueue_in(@namespace, "test", 10, Flume.Queue.ManagerTest.TestWorker, "perform", [1]) end end diff --git a/test/support/worker.ex b/test/support/worker.ex new file mode 100644 index 0000000..0714ab3 --- /dev/null +++ b/test/support/worker.ex @@ -0,0 +1,21 @@ +defmodule Worker do + @moduledoc """ + Simple worker module for testing purposes + """ + + @doc """ + Default perform function for job processing + """ + def perform(args) do + # Simple worker that just returns the processed arguments + {:ok, args} + end + + @doc """ + Process function for job processing with explicit documentation + """ + def process(args) do + # Simple worker that just returns the processed arguments + {:ok, args} + end +end diff --git a/test/test_helper.exs b/test/test_helper.exs index 2551065..0cbc4a4 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -1,3 +1,11 @@ ExUnit.start() -# Start flume supervision tree -Flume.Supervisor.start_link() + +# Ensure the application is started which will start the Flume supervisor +Mix.Task.run("app.start") + +# Load support modules for tests +Code.require_file("test/support/worker.ex") +Code.require_file("test/support/echo_worker.ex") + +# The Flume supervisor is already started by the application +# when Mix.Task.run("app.start") is called by tests