diff --git a/.iex.exs b/.iex.exs index 4289c885a9..a26dcdf6f4 100644 --- a/.iex.exs +++ b/.iex.exs @@ -1,2 +1,5 @@ -import Ecto.Query +if Code.loaded?(Ecto.Query) do + import Ecto.Query +end + alias Lightning.Repo diff --git a/bin/local_cluster b/bin/local_cluster new file mode 100755 index 0000000000..cabb00d5d3 --- /dev/null +++ b/bin/local_cluster @@ -0,0 +1,171 @@ +#!/usr/bin/env bash + +# Function to show usage +show_usage() { + echo "Usage:" + echo " $0 [--proxy] --count Start local cluster (default: 2 instances)" + echo " $0 connect Connect to a specific node (1-4)" + echo "" + echo "Options:" + echo " --proxy Start a Caddy reverse proxy on port 4000 (nodes will start from 4001)" + echo " --count Number of nodes to start (1-4, default: 2)" + exit 1 +} + +# Handle connect subcommand +if [ "$1" = "connect" ]; then + if [ -z "$2" ] || ! [[ "$2" =~ ^[1-4]$ ]]; then + echo "Error: Please specify a valid node number (1-4)" + show_usage + fi + + NODE_NUM=$2 + echo "Connecting to node${NODE_NUM}@127.0.0.1..." + exec iex --name "remote_shell${NODE_NUM}@127.0.0.1" --remsh "node${NODE_NUM}@127.0.0.1" + # The exec command replaces the current process, so we don't need an explicit exit + # If we reach this point, it means the exec failed, so we'll exit with its status code + exit $? +fi + +# Parse arguments +USE_PROXY=false +INSTANCES=2 + +while [[ $# -gt 0 ]]; do + case $1 in + --proxy) + USE_PROXY=true + shift + ;; + --count) + if [ -z "$2" ] || ! [[ "$2" =~ ^[0-9]+$ ]]; then + echo "Error: --count requires a numeric argument" + show_usage + fi + INSTANCES=$2 + shift 2 + ;; + *) + echo "Unknown argument: $1" + show_usage + ;; + esac +done + +# Validate number of instances +if ! [[ "$INSTANCES" =~ ^[0-9]+$ ]]; then + echo "Error: Number of instances must be a positive integer" + show_usage +fi + +if [ "$INSTANCES" -lt 1 ] || [ "$INSTANCES" -gt 4 ]; then + echo "Error: Number of instances must be between 1 and 4" + show_usage +fi + +# Check for Caddy if proxy is requested +if [ "$USE_PROXY" = true ]; then + if ! command -v caddy &>/dev/null; then + echo "Error: Caddy is required for proxy mode but it's not installed" + echo "Please install Caddy first:" + echo " Mac: brew install caddy" + echo " Linux: sudo apt install caddy" + echo " Or visit: https://caddyserver.com/docs/install" + exit 1 + fi +fi + +# Array to store background PIDs +declare -a PIDS + +# Colors for different processes +declare -a COLORS=( + "\033[0;36m" # Cyan + "\033[0;32m" # Green + "\033[0;35m" # Purple + "\033[0;33m" # Yellow + "\033[0;37m" # Gray (for proxy) +) +RESET="\033[0m" + +# Cleanup function to kill all child processes +cleanup() { + echo "Shutting down all processes..." + for pid in "${PIDS[@]}"; do + kill "$pid" 2>/dev/null + done + exit 0 +} + +# Set up trap for cleanup +trap cleanup INT TERM + +# Function to run a command with colored output +run_with_color() { + local color=$1 + local prefix=$2 + shift 2 + # Run the command and color its output + "$@" 2>&1 | while read -r line; do + echo -e "${color}${prefix} | ${line}${RESET}" + done +} + +# Create Caddy configuration if proxy is enabled +if [ "$USE_PROXY" = true ]; then + BASE_PORT=4001 + CADDY_CONFIG=$(mktemp) + echo "Creating Caddy configuration..." + cat >"$CADDY_CONFIG" <" + +# Wait for all background processes +wait diff --git a/config/config.exs b/config/config.exs index 896270e90c..e1e15e1813 100644 --- a/config/config.exs +++ b/config/config.exs @@ -14,11 +14,6 @@ config :lightning, Lightning.Repo, types: Lightning.PostgrexTypes, log: :debug -config :hammer, - backend: - {Hammer.Backend.Mnesia, - [expiry_ms: 60_000 * 60 * 4, cleanup_interval_ms: 60_000 * 10]} - # Configures the endpoint config :lightning, LightningWeb.Endpoint, url: [host: "localhost"], @@ -30,6 +25,11 @@ config :lightning, LightningWeb.Endpoint, pubsub_server: Lightning.PubSub, live_view: [signing_salt: "EfrmuOUr"] +config :lightning, Lightning.DistributedRateLimiter, + start: false, + capacity: 10, + refill_per_second: 2 + config :lightning, Lightning.Extensions, rate_limiter: Lightning.Extensions.RateLimiter, usage_limiter: Lightning.Extensions.UsageLimiter, diff --git a/config/test.exs b/config/test.exs index 11e0893b9c..477aa9b70c 100644 --- a/config/test.exs +++ b/config/test.exs @@ -53,6 +53,8 @@ config :lightning, LightningWeb.Endpoint, "/8zedVJLxvmGGFoRExE3e870g7CGZZQ1Vq11A5MbQGPKOpK57MahVsPW6Wkkv61n", server: true +config :lightning, Lightning.DistributedRateLimiter, start: true + config :lightning, Lightning.Runtime.RuntimeManager, ws_url: "ws://localhost:4002/worker" @@ -95,11 +97,6 @@ config :lightning, Lightning.Mailer, adapter: Swoosh.Adapters.Test config :lightning, Lightning.AdaptorRegistry, use_cache: "test/fixtures/adaptor_registry_cache.json" -config :hammer, - backend: - {Hammer.Backend.ETS, - [expiry_ms: 60_000 * 60 * 4, cleanup_interval_ms: 60_000 * 10]} - config :lightning, Lightning.FailureAlerter, time_scale: 60_000, rate_limit: 3 diff --git a/lib/lightning.ex b/lib/lightning.ex index 287611d140..6716fd74fb 100644 --- a/lib/lightning.ex +++ b/lib/lightning.ex @@ -26,6 +26,11 @@ defmodule Lightning do Phoenix.PubSub.broadcast(@pubsub, topic, msg) end + @impl true + def broadcast_from(pid, topic, msg) do + Phoenix.PubSub.broadcast_from(@pubsub, pid, topic, msg) + end + @impl true def local_broadcast(topic, msg) do Phoenix.PubSub.local_broadcast(@pubsub, topic, msg) @@ -60,6 +65,8 @@ defmodule Lightning do # credo:disable-for-next-line @callback current_time() :: DateTime.t() @callback broadcast(binary(), {atom(), any()}) :: :ok | {:error, term()} + @callback broadcast_from(pid(), binary(), {atom(), any()}) :: + :ok | {:error, term()} @callback local_broadcast(binary(), {atom(), any()}) :: :ok | {:error, term()} @callback subscribe(binary()) :: :ok | {:error, term()} @callback release() :: release_info() @@ -71,6 +78,9 @@ defmodule Lightning do def broadcast(topic, msg), do: impl().broadcast(topic, msg) + def broadcast_from(pid, topic, msg), + do: impl().broadcast_from(pid, topic, msg) + def local_broadcast(topic, msg), do: impl().local_broadcast(topic, msg) def subscribe(topic), do: impl().subscribe(topic) diff --git a/lib/lightning/application.ex b/lib/lightning/application.ex index 7b811ced49..9a546d157c 100644 --- a/lib/lightning/application.ex +++ b/lib/lightning/application.ex @@ -7,6 +7,11 @@ defmodule Lightning.Application do require Logger + @rate_limiter_opts Application.compile_env!( + :lightning, + Lightning.DistributedRateLimiter + ) + @impl true def start(_type, _args) do # Initialize ETS table for adapter lookup @@ -26,8 +31,6 @@ defmodule Lightning.Application do :mnesia.stop() :mnesia.create_schema([node()]) :mnesia.start() - Hammer.Backend.Mnesia.create_mnesia_table(disc_copies: [node()]) - :mnesia.wait_for_tables([:__hammer_backend_mnesia], 60_000) # Only add the Sentry backend if a dsn is provided. if Application.get_env(:sentry, :dsn), @@ -107,6 +110,12 @@ defmodule Lightning.Application do [ Lightning.PromEx, {Cluster.Supervisor, [topologies, [name: Lightning.ClusterSupervisor]]}, + {Horde.Registry, + name: Lightning.HordeRegistry, keys: :unique, members: :auto}, + {Horde.DynamicSupervisor, + name: Lightning.DistributedSupervisor, + strategy: :one_for_one, + members: :auto}, {Lightning.Vault, Application.get_env(:lightning, Lightning.Vault, [])}, # Start the Ecto repository Lightning.Repo, @@ -128,7 +137,9 @@ defmodule Lightning.Application do {Lightning.Runtime.RuntimeManager, worker_secret: Lightning.Config.worker_secret(), endpoint: LightningWeb.Endpoint}, - {Lightning.KafkaTriggers.Supervisor, type: :supervisor} + {Lightning.KafkaTriggers.Supervisor, type: :supervisor}, + # Start our rate limiter + {Lightning.RateLimiters, clean_period: :timer.minutes(10)} # Start a worker by calling: Lightning.Worker.start_link(arg) # {Lightning.Worker, arg} ] @@ -174,6 +185,17 @@ defmodule Lightning.Application do :ok end + def start_phase(:init_rate_limiter, :normal, _args) do + if @rate_limiter_opts[:start] do + Horde.DynamicSupervisor.start_child( + Lightning.DistributedSupervisor, + {Lightning.DistributedRateLimiter, @rate_limiter_opts} + ) + end + + :ok + end + def oban_opts do opts = Application.get_env(:lightning, Oban) diff --git a/lib/lightning/config.ex b/lib/lightning/config.ex index fc435756d4..b5942107b0 100644 --- a/lib/lightning/config.ex +++ b/lib/lightning/config.ex @@ -295,6 +295,11 @@ defmodule Lightning.Config do Application.get_env(:lightning, :gdpr_preferences) end + @impl true + def max_dataclip_size_bytes do + Application.get_env(:lightning, :max_dataclip_size_bytes, 10_000_000) + end + @impl true def external_metrics_module do Application.get_env(:lightning, Lightning.Extensions, []) @@ -346,6 +351,7 @@ defmodule Lightning.Config do @callback book_demo_openfn_workflow_url() :: String.t() @callback gdpr_banner() :: map() | false @callback gdpr_preferences() :: map() | false + @callback max_dataclip_size_bytes() :: integer() @callback external_metrics_module() :: module() | nil @doc """ @@ -545,6 +551,10 @@ defmodule Lightning.Config do impl().gdpr_preferences() end + def max_dataclip_size_bytes do + impl().max_dataclip_size_bytes() + end + def external_metrics_module do impl().external_metrics_module() end diff --git a/lib/lightning/config/bootstrap.ex b/lib/lightning/config/bootstrap.ex index 15e2368894..f3c3d5e48d 100644 --- a/lib/lightning/config/bootstrap.ex +++ b/lib/lightning/config/bootstrap.ex @@ -114,7 +114,8 @@ defmodule Lightning.Config.Bootstrap do "RTM", &Utils.ensure_boolean/1, Utils.get_env([:lightning, Lightning.Runtime.RuntimeManager, :start]) - ) + ), + port: env!("RTM_PORT", :integer, 2222) config :lightning, :workers, private_key: @@ -399,6 +400,10 @@ defmodule Lightning.Config.Bootstrap do config :logger, :level, log_level end + if log_level == :debug do + config :libcluster, debug: true + end + database_url = env!("DATABASE_URL", :string, nil) config :lightning, Lightning.Repo, @@ -409,7 +414,6 @@ defmodule Lightning.Config.Bootstrap do queue_interval: env!("DATABASE_QUEUE_INTERVAL", :integer, 1000) host = env!("URL_HOST", :string, "example.com") - port = env!("PORT", :integer, 4000) url_port = env!("URL_PORT", :integer, 443) config :lightning, @@ -455,18 +459,6 @@ defmodule Lightning.Config.Bootstrap do You can generate one by calling: mix phx.gen.secret """ - listen_address = - env!( - "LISTEN_ADDRESS", - fn address -> - address - |> String.split(".") - |> Enum.map(&String.to_integer/1) - |> List.to_tuple() - end, - {127, 0, 0, 1} - ) - origins = env!( "ORIGINS", @@ -481,40 +473,10 @@ defmodule Lightning.Config.Bootstrap do url_scheme = env!("URL_SCHEME", :string, "https") - idle_timeout = - env!( - "IDLE_TIMEOUT", - fn str -> - case Integer.parse(str) do - :error -> 60_000 - {val, _} -> val * 1_000 - end - end, - 60_000 - ) - config :lightning, LightningWeb.Endpoint, url: [host: host, port: url_port, scheme: url_scheme], secret_key_base: secret_key_base, check_origin: origins, - http: [ - ip: listen_address, - port: port, - compress: true, - protocol_options: [ - # Note that if a request is more than 10x the max dataclip size, we cut - # the connection immediately to prevent memory issues via the - # :max_skip_body_length setting. - max_skip_body_length: - Application.get_env( - :lightning, - :max_dataclip_size_bytes, - 10_000_000 - ) * - 10, - idle_timeout: idle_timeout - ] - ], server: true end @@ -530,6 +492,8 @@ defmodule Lightning.Config.Bootstrap do assert_receive_timeout: env!("ASSERT_RECEIVE_TIMEOUT", :integer, 1000) end + config :lightning, LightningWeb.Endpoint, http: http_config(config_env()) + config :sentry, dsn: env!("SENTRY_DSN", :string, nil), filter: Lightning.SentryEventFilter, @@ -803,4 +767,69 @@ defmodule Lightning.Config.Bootstrap do value -> value end end + + defp http_config(env, opts \\ []) + # Production environment configuration + defp http_config(:prod, opts) do + port = Keyword.get(opts, :port) || env!("PORT", :integer, 4000) + + listen_address = + env!( + "LISTEN_ADDRESS", + fn address -> + address + |> String.split(".") + |> Enum.map(&String.to_integer/1) + |> List.to_tuple() + end, + {127, 0, 0, 1} + ) + + idle_timeout = + env!( + "IDLE_TIMEOUT", + fn str -> + case Integer.parse(str) do + :error -> 60_000 + {val, _} -> val * 1_000 + end + end, + 60_000 + ) + + [ + ip: listen_address, + port: port, + compress: true, + protocol_options: [ + # Note that if a request is more than 10x the max dataclip size, we cut + # the connection immediately to prevent memory issues via the + # :max_skip_body_length setting. + max_skip_body_length: + Application.get_env( + :lightning, + :max_dataclip_size_bytes, + 10_000_000 + ) * 10, + idle_timeout: idle_timeout + ] + ] + end + + # Default configuration for non-production environments + defp http_config(_env, opts) do + port = + Keyword.get(opts, :port) || + env!( + "PORT", + :integer, + get_env(:lightning, [LightningWeb.Endpoint, :http, :port]) + ) + + [ + ip: {0, 0, 0, 0}, + port: port, + compress: true + ] + end end diff --git a/lib/lightning/distributed_rate_limiter.ex b/lib/lightning/distributed_rate_limiter.ex new file mode 100644 index 0000000000..c981872123 --- /dev/null +++ b/lib/lightning/distributed_rate_limiter.ex @@ -0,0 +1,108 @@ +defmodule Lightning.DistributedRateLimiter do + @moduledoc false + use GenServer + + require Logger + + def child_spec(opts) do + {id, name} = + if name = Keyword.get(opts, :name) do + {"#{__MODULE__}_#{name}", name} + else + {__MODULE__, __MODULE__} + end + + %{ + id: id, + start: {__MODULE__, :start_link, [Keyword.put(opts, :name, name)]}, + shutdown: 10_000, + restart: :transient + } + end + + def start_link(opts) do + name = Keyword.fetch!(opts, :name) + + with {:error, {:already_started, pid}} <- + GenServer.start_link(__MODULE__, opts, name: via_tuple(name)) do + Logger.info("already started at #{inspect(pid)}, returning :ignore") + :ignore + end + end + + @impl true + def init(opts) do + Process.flag(:trap_exit, true) + + capacity = Keyword.fetch!(opts, :capacity) + refill = Keyword.fetch!(opts, :refill_per_second) + + {:ok, + %{ + table: :ets.new(:table, [:set]), + capacity: capacity, + refill_per_second: refill + }} + end + + def check_rate(bucket, opts \\ []) do + name = Keyword.get(opts, :name, __MODULE__) + + name + |> via_tuple() + |> GenServer.call({:check_rate, bucket, opts}) + end + + def inspect_table(name \\ __MODULE__) do + name + |> via_tuple() + |> GenServer.call(:inspect_table) + end + + @impl true + def handle_call({:check_rate, bucket, opts}, _from, state) do + {:reply, do_check_rate(state, bucket, opts), state} + end + + @impl true + def handle_call(:inspect_table, _from, %{table: table} = state) do + {:reply, :ets.info(table), state} + end + + @impl true + def handle_info( + {:EXIT, _from, {:name_conflict, {_key, _value}, registry, pid}}, + state + ) do + Logger.info( + "Stopping #{inspect({registry, pid})} as it has already started in another node." + ) + + {:stop, :normal, state} + end + + def do_check_rate(%{table: table} = config, bucket, opts) do + now = System.monotonic_time(:millisecond) + capacity = opts[:capacity] || config[:capacity] + refill_per_sec = opts[:refill_per_second] || config[:refill_per_second] + + :ets.insert_new(table, {bucket, {capacity, now}}) + [{^bucket, {level, updated}}] = :ets.lookup(table, bucket) + + refilled = div(now - updated, 1_000) * refill_per_sec + current = min(capacity, level + refilled) + + if current >= 1 do + level = current - 1 + :ets.insert(table, {bucket, {level, now}}) + + {:allow, level} + else + wait_ms = 1_000 - (now - updated) + {:deny, wait_ms} + end + end + + def via_tuple(name), + do: {:via, Horde.Registry, {Lightning.HordeRegistry, name}} +end diff --git a/lib/lightning/extensions/rate_limiting.ex b/lib/lightning/extensions/rate_limiting.ex index 9371b79198..4f8c891d1e 100644 --- a/lib/lightning/extensions/rate_limiting.ex +++ b/lib/lightning/extensions/rate_limiting.ex @@ -9,15 +9,11 @@ defmodule Lightning.Extensions.RateLimiting do defmodule Context do @moduledoc """ - Which user is making the request for a certain project. + Context for the object (bucket) under rate limiting. """ + @type t :: %Context{project_id: Ecto.UUID.t()} - @type t :: %Context{ - project_id: Ecto.UUID.t(), - user_id: Ecto.UUID.t() | nil - } - - defstruct [:project_id, :user_id] + defstruct [:project_id] end @callback limit_request( diff --git a/lib/lightning/pipeline/failure_alerter.ex b/lib/lightning/pipeline/failure_alerter.ex index 455c3fb72a..e48a705930 100644 --- a/lib/lightning/pipeline/failure_alerter.ex +++ b/lib/lightning/pipeline/failure_alerter.ex @@ -46,31 +46,14 @@ defmodule Lightning.FailureAlerter do "run_logs" => run_logs, "recipient" => recipient }) do - [time_scale: time_scale, rate_limit: rate_limit] = - Application.fetch_env!(:lightning, __MODULE__) - - run_url = - url( - LightningWeb.Endpoint, - ~p"/projects/#{project_id}/runs/#{run_id}" - ) + run_url = ~p"/projects/#{project_id}/runs/#{run_id}" work_order_url = - url( - LightningWeb.Endpoint, - ~p"/projects/#{project_id}/history?filters[workorder_id]=#{work_order_id}" - ) - - # rate limiting per workflow AND user - bucket_key = "#{workflow_id}::#{recipient.id}" + ~p"/projects/#{project_id}/history?filters[workorder_id]=#{work_order_id}" - Hammer.check_rate( - bucket_key, - time_scale, - rate_limit - ) + Lightning.RateLimiters.hit({:failure_email, workflow_id, recipient.id}) |> case do - {:allow, count} -> + {:allow, %{count: count, time_scale: time_scale, rate_limit: rate_limit}} -> Lightning.FailureEmail.deliver_failure_email(recipient.email, %{ work_order_id: work_order_id, work_order_url: work_order_url, @@ -85,28 +68,9 @@ defmodule Lightning.FailureAlerter do workflow_id: workflow_id, recipient: recipient }) - |> case do - {:ok, _metadata} -> - nil - - # :ok - - _ -> - # decrement the counter when email is not delivered - Hammer.check_rate_inc( - bucket_key, - time_scale, - rate_limit, - -1 - ) - - nil - # {:cancel, "Failure email was not sent"} or Logger - end {:deny, _} -> nil - # {:cancel, "Failure notification rate limit is reached"} or Logger end end end diff --git a/lib/lightning/rate_limiters.ex b/lib/lightning/rate_limiters.ex new file mode 100644 index 0000000000..4db0763cbf --- /dev/null +++ b/lib/lightning/rate_limiters.ex @@ -0,0 +1,52 @@ +defmodule Lightning.RateLimiters do + @moduledoc false + + defmodule Mail do + @moduledoc false + + # WARNING: When changing the algorithm, you must also update the mnesia table name. + # The default is to use __MODULE__, passing `:table` to the `use Hammer` macro + # allows you to specify a custom table name. + use Hammer, + backend: Hammer.Mnesia, + algorithm: :leaky_bucket, + table: :mail_limiter + + @type hit_result :: + {:allow, + %{ + count: non_neg_integer(), + time_scale: non_neg_integer(), + rate_limit: non_neg_integer() + }} + | {:deny, non_neg_integer()} + end + + @spec hit({:failure_email, String.t(), String.t()}) :: Mail.hit_result() + def hit({:failure_email, workflow_id, user_id}) do + [time_scale: time_scale, rate_limit: rate_limit] = + Application.fetch_env!(:lightning, Lightning.FailureAlerter) + + Mail.hit("#{workflow_id}::#{user_id}", time_scale, rate_limit) + |> case do + {:allow, count} -> + {:allow, %{count: count, time_scale: time_scale, rate_limit: rate_limit}} + + {:deny, count} -> + {:deny, count} + end + end + + def child_spec(opts) do + %{ + id: __MODULE__, + start: {__MODULE__, :start_link, [opts]}, + type: :supervisor + } + end + + def start_link(opts) do + children = [{Mail, opts}] + Supervisor.start_link(children, strategy: :one_for_one) + end +end diff --git a/lib/lightning/services/rate_limiter.ex b/lib/lightning/services/rate_limiter.ex index f069314aee..c11f952e5e 100644 --- a/lib/lightning/services/rate_limiter.ex +++ b/lib/lightning/services/rate_limiter.ex @@ -7,7 +7,7 @@ defmodule Lightning.Services.RateLimiter do import Lightning.Services.AdapterHelper @impl true - def limit_request(conn, context, opts) do + def limit_request(conn, context, opts \\ []) do adapter().limit_request(conn, context, opts) end diff --git a/lib/lightning_web/channels/worker_channel.ex b/lib/lightning_web/channels/worker_channel.ex index 4273315d38..71cd03cf5b 100644 --- a/lib/lightning_web/channels/worker_channel.ex +++ b/lib/lightning_web/channels/worker_channel.ex @@ -22,10 +22,17 @@ defmodule LightningWeb.WorkerChannel do @impl true def handle_in( "claim", - %{"demand" => demand, "worker_name" => worker_name}, + %{"demand" => demand} = payload, socket ) do - case Runs.claim(demand, sanitise_worker_name(worker_name)) do + worker_name = + payload["worker_name"] + |> case do + "" -> nil + worker_name -> worker_name + end + + case Runs.claim(demand, worker_name) do {:ok, runs} -> runs = runs @@ -47,10 +54,6 @@ defmodule LightningWeb.WorkerChannel do end end - defp sanitise_worker_name(""), do: nil - - defp sanitise_worker_name(worker_name), do: worker_name - defp run_options(run) do Ecto.assoc(run, :workflow) |> Lightning.Repo.one() diff --git a/lib/lightning_web/controllers/webhooks_controller.ex b/lib/lightning_web/controllers/webhooks_controller.ex index 8839ada8d5..c5967610d2 100644 --- a/lib/lightning_web/controllers/webhooks_controller.ex +++ b/lib/lightning_web/controllers/webhooks_controller.ex @@ -1,7 +1,6 @@ defmodule LightningWeb.WebhooksController do use LightningWeb, :controller - alias Lightning.Extensions.RateLimiting alias Lightning.Extensions.UsageLimiting.Action alias Lightning.Extensions.UsageLimiting.Context alias Lightning.Services.RateLimiter @@ -10,6 +9,7 @@ defmodule LightningWeb.WebhooksController do alias Lightning.WorkOrders plug :reject_unfetched when action in [:create] + plug :check_rate when action in [:create] # Reject requests with unfetched body params, as they are not supported # See Plug.Parsers in Endpoint for more information. @@ -27,7 +27,38 @@ defmodule LightningWeb.WebhooksController do end end - @spec create(Plug.Conn.t(), %{path: binary()}) :: Plug.Conn.t() + # Note: Plug.Parsers is called before the Router. + def check_rate(conn, _opts) do + with %Workflows.Trigger{enabled: true, workflow: %{project_id: project_id}} <- + conn.assigns.trigger, + :ok <- RateLimiter.limit_request(conn, %Context{project_id: project_id}) do + conn + else + %Workflows.Trigger{enabled: false} -> + conn + |> put_status(:forbidden) + |> json(%{ + message: + "Unable to process request, trigger is disabled. Enable it on OpenFn to allow requests to this endpoint." + }) + |> halt() + + {:error, :too_many_requests, %{text: message}} -> + conn + |> put_status(:too_many_requests) + |> put_resp_header("retry-after", "1") + |> json(%{"error" => message}) + |> halt() + + _no_trigger_or_workflow -> + conn + |> put_status(:not_found) + |> json(%{"error" => "Webhook not found"}) + |> halt() + end + end + + @spec check(Plug.Conn.t(), %{path: binary()}) :: Plug.Conn.t() def check(conn, _params) do put_status(conn, :ok) |> json(%{ @@ -37,30 +68,25 @@ defmodule LightningWeb.WebhooksController do end @spec create(Plug.Conn.t(), %{path: binary()}) :: Plug.Conn.t() - def create(conn, _params) do - with %Workflows.Trigger{enabled: true, workflow: %{project_id: project_id}} = - trigger <- conn.assigns.trigger, - {:ok, without_run?} <- check_skip_run_creation(project_id), - :ok <- - RateLimiter.limit_request( - conn, - %RateLimiting.Context{project_id: project_id}, - [] - ) do - {:ok, work_order} = - WorkOrders.create_for(trigger, - workflow: trigger.workflow, - dataclip: %{ - body: conn.body_params, - request: build_request(conn), - type: :http_request, - project_id: project_id - }, - without_run: without_run? - ) - - conn |> json(%{work_order_id: work_order.id}) - else + def create(%{assigns: %{trigger: trigger}} = conn, _params) do + %Workflows.Trigger{workflow: workflow} = trigger + + case check_skip_run_creation(workflow.project_id) do + {:ok, without_run?} -> + {:ok, work_order} = + WorkOrders.create_for(trigger, + workflow: workflow, + dataclip: %{ + body: conn.body_params, + request: build_request(conn), + type: :http_request, + project_id: workflow.project_id + }, + without_run: without_run? + ) + + json(conn, %{work_order_id: work_order.id}) + {:error, reason, %{text: message}} -> status = if reason == :too_many_requests, @@ -69,19 +95,7 @@ defmodule LightningWeb.WebhooksController do conn |> put_status(status) - |> json(%{"error" => message}) - - nil -> - conn - |> put_status(:not_found) - |> json(%{"error" => "Webhook not found"}) - - _disabled -> - put_status(conn, :forbidden) - |> json(%{ - message: - "Unable to process request, trigger is disabled. Enable it on OpenFn to allow requests to this endpoint." - }) + |> json(%{error: message}) end end diff --git a/lib/lightning_web/plug_configs.ex b/lib/lightning_web/plug_configs.ex index 0a175a806b..1384fbef49 100644 --- a/lib/lightning_web/plug_configs.ex +++ b/lib/lightning_web/plug_configs.ex @@ -11,7 +11,7 @@ defmodule LightningWeb.PlugConfigs do :multipart, { :json, - length: Application.fetch_env!(:lightning, :max_dataclip_size_bytes) + length: Lightning.Config.max_dataclip_size_bytes() } ], pass: ["*/*"], diff --git a/mix.exs b/mix.exs index 5f53fd4b01..573664231e 100644 --- a/mix.exs +++ b/mix.exs @@ -55,7 +55,7 @@ defmodule Lightning.MixProject do [ mod: {Lightning.Application, [:timex]}, extra_applications: [:logger, :runtime_tools, :os_mon, :scrivener], - start_phases: [seed_prom_ex_telemetry: []] + start_phases: [seed_prom_ex_telemetry: [], init_rate_limiter: []] ] end @@ -69,6 +69,7 @@ defmodule Lightning.MixProject do defp deps do [ # {:rexbug, ">= 1.0.0", only: :test}, + {:horde, "~> 0.9.0"}, {:bcrypt_elixir, "~> 3.2"}, {:bodyguard, "~> 2.2"}, {:broadway_kafka, "~> 0.4.2"}, @@ -79,6 +80,7 @@ defmodule Lightning.MixProject do {:credo, "~> 1.7.3", only: [:test, :dev]}, {:crontab, "~> 1.1"}, {:dialyxir, "~> 1.4.2", only: [:test, :dev], runtime: false}, + {:delta_crdt, "~> 0.6.5"}, {:ecto_enum, "~> 1.4"}, {:ecto_psql_extras, "~> 0.8.2"}, {:ecto_sql, "~> 3.11"}, @@ -132,8 +134,8 @@ defmodule Lightning.MixProject do {:timex, "~> 3.7"}, {:replug, "~> 0.1.0"}, {:phoenix_swoosh, "~> 1.2.1"}, - {:hammer_backend_mnesia, "~> 0.6"}, - {:hammer, "~> 6.0"}, + {:hammer_backend_mnesia, "~> 0.7.0"}, + {:hammer, "~> 7.0"}, {:dotenvy, "~> 0.8.0"}, {:goth, "~> 1.3"}, {:gcs_signed_url, "~> 0.4.6"}, diff --git a/mix.lock b/mix.lock index adbca248fa..01de534be4 100644 --- a/mix.lock +++ b/mix.lock @@ -25,6 +25,7 @@ "db_connection": {:hex, :db_connection, "2.7.0", "b99faa9291bb09892c7da373bb82cba59aefa9b36300f6145c5f201c7adf48ec", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "dcf08f31b2701f857dfc787fbad78223d61a32204f217f15e881dd93e4bdd3ff"}, "decimal": {:hex, :decimal, "2.3.0", "3ad6255aa77b4a3c4f818171b12d237500e63525c2fd056699967a3e7ea20f62", [:mix], [], "hexpm", "a4d66355cb29cb47c3cf30e71329e58361cfcb37c34235ef3bf1d7bf3773aeac"}, "deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm", "ce708e5f094b9cd4e8f2be4f00d2f4250c4095be93f8cd6d018c753894885430"}, + "delta_crdt": {:hex, :delta_crdt, "0.6.5", "c7bb8c2c7e60f59e46557ab4e0224f67ba22f04c02826e273738f3dcc4767adc", [:mix], [{:merkle_map, "~> 0.2.0", [hex: :merkle_map, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c6ae23a525d30f96494186dd11bf19ed9ae21d9fe2c1f1b217d492a7cc7294ae"}, "dialyxir": {:hex, :dialyxir, "1.4.5", "ca1571ac18e0f88d4ab245f0b60fa31ff1b12cbae2b11bd25d207f865e8ae78a", [:mix], [{:erlex, ">= 0.2.7", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "b0fb08bb8107c750db5c0b324fa2df5ceaa0f9307690ee3c1f6ba5b9eb5d35c3"}, "dotenvy": {:hex, :dotenvy, "0.8.0", "777486ad485668317c56afc53a7cbcd74f43e4e34588ba8e95a73e15a360050e", [:mix], [], "hexpm", "1f535066282388cbd109743d337ac46ff0708195780d4b5778bb83491ab1b654"}, "earmark": {:hex, :earmark, "1.4.47", "7e7596b84fe4ebeb8751e14cbaeaf4d7a0237708f2ce43630cfd9065551f94ca", [:mix], [], "hexpm", "3e96bebea2c2d95f3b346a7ff22285bc68a99fbabdad9b655aa9c6be06c698f8"}, @@ -57,9 +58,10 @@ "google_gax": {:hex, :google_gax, "0.4.1", "310105070626013712c56f8007b6ff7b4ead02ecad1efe7888350c6eaba52783", [:mix], [{:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}, {:poison, ">= 3.0.0 and < 5.0.0", [hex: :poison, repo: "hexpm", optional: false]}, {:tesla, "~> 1.2", [hex: :tesla, repo: "hexpm", optional: false]}], "hexpm", "aef7dce7e04840c0e611f962475e3223d27d50ebd5e7d8e9e963c5e9e3b1ca79"}, "goth": {:hex, :goth, "1.4.3", "80e86225ae174844e6a77b61982fafadfc715277db465e0956348d8bdd56b231", [:mix], [{:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:jose, "~> 1.11", [hex: :jose, repo: "hexpm", optional: false]}], "hexpm", "34e2012ed1af2fe2446083da60a988fd9829943d30e4447832646c3a6863a7e6"}, "hackney": {:hex, :hackney, "1.23.0", "55cc09077112bcb4a69e54be46ed9bc55537763a96cd4a80a221663a7eafd767", [:rebar3], [{:certifi, "~> 2.14.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~> 6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~> 1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~> 1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.4.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "6cd1c04cd15c81e5a493f167b226a15f0938a84fc8f0736ebe4ddcab65c0b44e"}, - "hammer": {:hex, :hammer, "6.2.1", "5ae9c33e3dceaeb42de0db46bf505bd9c35f259c8defb03390cd7556fea67ee2", [:mix], [{:poolboy, "~> 1.5", [hex: :poolboy, repo: "hexpm", optional: false]}], "hexpm", "b9476d0c13883d2dc0cc72e786bac6ac28911fba7cc2e04b70ce6a6d9c4b2bdc"}, - "hammer_backend_mnesia": {:hex, :hammer_backend_mnesia, "0.6.1", "d10d94fc29cbffbf04ecb3c3127d705ce4cc1cecfb9f3d6b18a554c3cae9af2c", [:mix], [{:hammer, "~> 6.1", [hex: :hammer, repo: "hexpm", optional: false]}], "hexpm", "85ad2ef6ebe035207dd9a03a116dc6a7ee43fbd53e8154cf32a1e33b9200fb62"}, + "hammer": {:hex, :hammer, "7.0.1", "136edcd81af44becbe6b73a958c109e2364ab0dc026d7b19892037dc2632078c", [:mix], [], "hexpm", "796edf14ab2aa80df72080210fcf944ee5e8868d8ece7a7511264d802f58cc2d"}, + "hammer_backend_mnesia": {:hex, :hammer_backend_mnesia, "0.7.0", "b2a8cccc1d3506bc4cf8e95750fa5bd491390f227e9a0d981ad375291d7bd1dc", [:mix], [{:hammer, "~> 7.0", [hex: :hammer, repo: "hexpm", optional: false]}], "hexpm", "f77a3d54df865aa8137926df6fadac2a81c06f1c1c22f4e98e32392a22bf9e3e"}, "heroicons": {:hex, :heroicons, "0.5.6", "95d730e7179c633df32d95c1fdaaecdf81b0da11010b89b737b843ac176a7eb5", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:phoenix_live_view, ">= 0.18.2", [hex: :phoenix_live_view, repo: "hexpm", optional: false]}], "hexpm", "ca267f02a5fa695a4178a737b649fb6644a2e399639d4ba7964c18e8a58c2352"}, + "horde": {:hex, :horde, "0.9.0", "522342bd7149aeed453c97692a8bca9cf7c9368c5a489afd802e575dc8df54a6", [:mix], [{:delta_crdt, "~> 0.6.2", [hex: :delta_crdt, repo: "hexpm", optional: false]}, {:libring, "~> 1.4", [hex: :libring, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_poller, "~> 0.5.0 or ~> 1.0", [hex: :telemetry_poller, repo: "hexpm", optional: false]}], "hexpm", "fae11e5bc9c980038607d0c3338cdf7f97124a5d5382fd4b6fb6beaab8e214fe"}, "hpax": {:hex, :hpax, "1.0.2", "762df951b0c399ff67cc57c3995ec3cf46d696e41f0bba17da0518d94acd4aac", [:mix], [], "hexpm", "2f09b4c1074e0abd846747329eaa26d535be0eb3d189fa69d812bfb8bfefd32f"}, "httpoison": {:hex, :httpoison, "2.2.1", "87b7ed6d95db0389f7df02779644171d7319d319178f6680438167d7b69b1f3d", [:mix], [{:hackney, "~> 1.17", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "51364e6d2f429d80e14fe4b5f8e39719cacd03eb3f9a9286e61e216feac2d2df"}, "idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"}, @@ -71,12 +73,14 @@ "junit_formatter": {:hex, :junit_formatter, "3.4.0", "d0e8db6c34dab6d3c4154c3b46b21540db1109ae709d6cf99ba7e7a2ce4b1ac2", [:mix], [], "hexpm", "bb36e2ae83f1ced6ab931c4ce51dd3dbef1ef61bb4932412e173b0cfa259dacd"}, "kafka_protocol": {:hex, :kafka_protocol, "4.1.9", "7c10d9adaba84c6f176f152e6ba8029c46dfb7cb12432587009128836cf9a44a", [:rebar3], [{:crc32cer, "0.1.11", [hex: :crc32cer, repo: "hexpm", optional: false]}], "hexpm", "14f89eed8329ff4c7b5448e318ee20a98bf5c1e5dc41b74b8af459dfb7590cef"}, "libcluster": {:hex, :libcluster, "3.4.1", "271d2da892763bbef53c2872036c936fe8b80111eb1feefb2d30a3bb15c9b4f6", [:mix], [{:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.3", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "1d568157f069c6afa70ec0d736704cf799734bdbb6343f0322af4a980301c853"}, + "libring": {:hex, :libring, "1.7.0", "4f245d2f1476cd7ed8f03740f6431acba815401e40299208c7f5c640e1883bda", [:mix], [], "hexpm", "070e3593cb572e04f2c8470dd0c119bc1817a7a0a7f88229f43cf0345268ec42"}, "makeup": {:hex, :makeup, "1.1.2", "9ba8837913bdf757787e71c1581c21f9d2455f4dd04cfca785c70bbfff1a76a3", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cce1566b81fbcbd21eca8ffe808f33b221f9eee2cbc7a1706fc3da9ff18e6cac"}, "makeup_eex": {:hex, :makeup_eex, "0.1.2", "93a5ef3d28ed753215dba2d59cb40408b37cccb4a8205e53ef9b5319a992b700", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.16 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_html, "~> 0.1.0 or ~> 1.0", [hex: :makeup_html, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "6140eafb28215ad7182282fd21d9aa6dcffbfbe0eb876283bc6b768a6c57b0c3"}, "makeup_elixir": {:hex, :makeup_elixir, "0.16.2", "627e84b8e8bf22e60a2579dad15067c755531fea049ae26ef1020cad58fe9578", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "41193978704763f6bbe6cc2758b84909e62984c7752b3784bd3c218bb341706b"}, "makeup_erlang": {:hex, :makeup_erlang, "1.0.1", "c7f58c120b2b5aa5fd80d540a89fdf866ed42f1f3994e4fe189abebeab610839", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "8a89a1eeccc2d798d6ea15496a6e4870b75e014d1af514b1b71fa33134f57814"}, "makeup_html": {:hex, :makeup_html, "0.1.1", "c3d4abd39d5f7e925faca72ada6e9cc5c6f5fa7cd5bc0158315832656cf14d7f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "44f2a61bc5243645dd7fafeaa6cc28793cd22f3c76b861e066168f9a5b2c26a4"}, "meck": {:hex, :meck, "0.9.2", "85ccbab053f1db86c7ca240e9fc718170ee5bda03810a6292b5306bf31bae5f5", [:rebar3], [], "hexpm", "81344f561357dc40a8344afa53767c32669153355b626ea9fcbc8da6b3045826"}, + "merkle_map": {:hex, :merkle_map, "0.2.1", "01a88c87a6b9fb594c67c17ebaf047ee55ffa34e74297aa583ed87148006c4c8", [:mix], [], "hexpm", "fed4d143a5c8166eee4fa2b49564f3c4eace9cb252f0a82c1613bba905b2d04d"}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, "mime": {:hex, :mime, "1.6.0", "dabde576a497cef4bbdd60aceee8160e02a6c89250d6c0b29e56c0dfb00db3d2", [:mix], [], "hexpm", "31a1a8613f8321143dde1dafc36006a17d28d02bdfecb9e95a880fa7aabd19a7"}, "mimerl": {:hex, :mimerl, "1.3.0", "d0cd9fc04b9061f82490f6581e0128379830e78535e017f7780f37fea7545726", [:rebar3], [], "hexpm", "a1e15a50d1887217de95f0b9b0793e32853f7c258a5cd227650889b38839fe9d"}, diff --git a/test/lightning/distributed_rate_limiter_test.exs b/test/lightning/distributed_rate_limiter_test.exs new file mode 100644 index 0000000000..664edeb29f --- /dev/null +++ b/test/lightning/distributed_rate_limiter_test.exs @@ -0,0 +1,95 @@ +defmodule Lightning.DistributedRateLimiterTest do + @moduledoc false + use ExUnit.Case + + alias Lightning.DistributedRateLimiter + + @default_capacity 10 + + describe "check_rate/2" do + test "allows up to the capacity and refills on multiple buckets" do + initial_capacity = @default_capacity + bucket1 = "project#{System.unique_integer()}" + bucket2 = "project#{System.unique_integer()}" + + Enum.each(1..initial_capacity, fn i -> + level = initial_capacity - i + + assert match?( + {:allow, ^level}, + DistributedRateLimiter.check_rate(bucket1) + ) + + assert match?( + {:allow, ^level}, + DistributedRateLimiter.check_rate(bucket2) + ) + end) + end + + test "denies after consuming the bucket" do + initial_capacity = @default_capacity + bucket1 = "project#{System.unique_integer()}" + bucket2 = "project#{System.unique_integer()}" + + Enum.each(1..initial_capacity, fn i -> + assert {:allow, level} = DistributedRateLimiter.check_rate(bucket1) + assert level == initial_capacity - i + end) + + assert {:allow, level} = DistributedRateLimiter.check_rate(bucket2) + assert level == initial_capacity - 1 + + assert {:deny, wait_ms} = DistributedRateLimiter.check_rate(bucket1) + assert 500 < wait_ms and wait_ms <= 1_000 + end + + # For testing the replication use manual procedure: + # 0. Disable Endpoint server + # 1. Run node1 on one terminal: iex --sname node1@localhost --cookie hordecookie -S mix phx.server + # 2. Run node2 on another terminal: iex --sname node2@localhost --cookie hordecookie -S mix phx.server + # 3. Call Lightning.DistributedRateLimiter.inspect_table() on both iex and they show the same ets table process and node. + test "works on top of a single worker of a distributed dynamic supervisor" do + {:ok, peer, _node1, node2} = start_nodes(:node1, :node2, ~c"localhost") + + :rpc.call(node2, Application, :ensure_all_started, [:mix]) + :rpc.call(node2, Application, :ensure_all_started, [:lightning]) + + # Copy current code paths to the peer node + :rpc.call(node2, :code, :add_paths, [:code.get_path()]) + + assert [ + {Lightning.DistributedSupervisor, :node1@localhost}, + {Lightning.DistributedSupervisor, :node2@localhost} + ] = Horde.Cluster.members(Lightning.DistributedSupervisor) + + assert [{:undefined, _pid, :worker, [Lightning.DistributedRateLimiter]}] = + Horde.DynamicSupervisor.which_children( + Lightning.DistributedSupervisor + ) + + :peer.stop(peer) + end + end + + defp start_nodes(node1, node2, host) do + # Start the main node + node1_sname = :"#{node1}@#{host}" + {:ok, _pid} = Node.start(node1_sname, :shortnames) + true = Node.set_cookie(:delicious_cookie) + cookie = Node.get_cookie() |> to_charlist() + + # Start the peer node + {:ok, peer, node2_sname} = + :peer.start(%{ + name: node2, + host: host, + cookie: cookie, + args: [~c"-setcookie", cookie] + }) + + assert node2_sname in Node.list() + + {:ok, peer, node1_sname, node2_sname} + end +end diff --git a/test/lightning/extensions/rate_limiter_test.exs b/test/lightning/extensions/rate_limiter_test.exs index e32a9f9ac1..af71e33c76 100644 --- a/test/lightning/extensions/rate_limiter_test.exs +++ b/test/lightning/extensions/rate_limiter_test.exs @@ -8,10 +8,7 @@ defmodule Lightning.Extensions.RateLimiterTest do Enum.each(1..100, fn _i -> assert RateLimiter.limit_request( conn, - %Context{ - project_id: Ecto.UUID.generate(), - user_id: Ecto.UUID.generate() - }, + %Context{project_id: Ecto.UUID.generate()}, [] ) == :ok end) diff --git a/test/lightning/failure_alert_test.exs b/test/lightning/failure_alert_test.exs index 7a1b053e32..663eaf2632 100644 --- a/test/lightning/failure_alert_test.exs +++ b/test/lightning/failure_alert_test.exs @@ -200,6 +200,8 @@ defmodule Lightning.FailureAlertTest do assert_receive {:email, %Swoosh.Email{subject: ^s3}}, 1500 + assert Lightning.RateLimiters.Mail.get(run_1.work_order.workflow_id) == 0 + s4 = "\"workflow-b\" (#{project.name}) failed" assert_receive {:email, %Swoosh.Email{subject: ^s4}}, 1500 @@ -250,32 +252,6 @@ defmodule Lightning.FailureAlertTest do refute_email_sent(subject: ^s1) end - test "does not increment the rate-limiter counter when an email is not delivered.", - %{runs: [run, _, _], workorders: [workorder, _, _], project: project} do - [time_scale: time_scale, rate_limit: rate_limit] = - Application.fetch_env!(:lightning, Lightning.FailureAlerter) - - FailureAlerter.alert_on_failure(run) - - {:ok, {0, ^rate_limit, _, _, _}} = - Hammer.inspect_bucket(workorder.workflow_id, time_scale, rate_limit) - - assert_email_sent(subject: "\"workflow-a\" (#{project.name}) failed") - - Mimic.stub(Lightning.FailureEmail, :deliver_failure_email, fn _, _ -> - {:error} - end) - - FailureAlerter.alert_on_failure(run) - - subject = "\"workflow-a\" (#{project.name}) failed" - refute_email_sent(subject: ^subject) - - # nothing changed - {:ok, {0, ^rate_limit, _, _, _}} = - Hammer.inspect_bucket(workorder.workflow_id, time_scale, rate_limit) - end - test "failure alert is sent on run complete", %{ runs: [run, _, _], project: project diff --git a/test/lightning/rate_limiters_test.exs b/test/lightning/rate_limiters_test.exs new file mode 100644 index 0000000000..47998f50a8 --- /dev/null +++ b/test/lightning/rate_limiters_test.exs @@ -0,0 +1,13 @@ +defmodule Lightning.RateLimitersTest do + use ExUnit.Case, async: true + + alias Lightning.RateLimiters + + describe "Mail" do + test "returns a hit result" do + id = Ecto.UUID.generate() + assert RateLimiters.Mail.hit(id, 1, 1) == {:allow, 1} + assert RateLimiters.Mail.hit(id, 1, 1) == {:deny, 1000} + end + end +end diff --git a/test/lightning_web/controllers/webhooks_controller_test.exs b/test/lightning_web/controllers/webhooks_controller_test.exs index ce5b3686ef..cf46cdec72 100644 --- a/test/lightning_web/controllers/webhooks_controller_test.exs +++ b/test/lightning_web/controllers/webhooks_controller_test.exs @@ -2,6 +2,7 @@ defmodule LightningWeb.WebhooksControllerTest do use LightningWeb.ConnCase, async: false import Lightning.Factories + import Mox alias Lightning.Extensions.MockRateLimiter alias Lightning.Extensions.StubRateLimiter @@ -12,6 +13,8 @@ defmodule LightningWeb.WebhooksControllerTest do alias Lightning.Runs alias Lightning.WorkOrders + setup :set_mox_from_context + describe "a POST request to '/i'" do setup [:stub_rate_limiter_ok, :stub_usage_limiter_ok] @@ -56,7 +59,9 @@ defmodule LightningWeb.WebhooksControllerTest do |> Repo.preload(:triggers) |> with_snapshot() - Application.put_env(:lightning, :max_dataclip_size_bytes, 1_000_000) + Mox.stub(Lightning.MockConfig, :max_dataclip_size_bytes, fn -> + 1_000_000 + end) smaller_body = %{"data" => %{a: String.duplicate("a", 500_000)}} diff --git a/test/support/mock.ex b/test/support/mock.ex index 269db5bbc5..39b240bd4b 100644 --- a/test/support/mock.ex +++ b/test/support/mock.ex @@ -8,6 +8,10 @@ defmodule Lightning.Stub do @impl true def broadcast(topic, msg), do: Lightning.API.broadcast(topic, msg) + @impl true + def broadcast_from(pid, topic, msg), + do: Lightning.API.broadcast_from(pid, topic, msg) + @impl true def local_broadcast(topic, msg), do: Lightning.API.local_broadcast(topic, msg) diff --git a/test/test_helper.exs b/test/test_helper.exs index cc0209588f..9a710135a2 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -54,5 +54,12 @@ Application.put_env(:lightning, Lightning.Extensions, external_metrics: Lightning.Extensions.ExternalMetrics ) +epmd_path = System.find_executable("epmd") +port = Port.open({:spawn_executable, epmd_path}, []) +os_pid = Keyword.get(Port.info(port), :os_pid) + +# Configuring a "shutdown hook" to stop epmd after everything is done. +System.at_exit(fn _ -> System.shell("kill -TERM #{os_pid}") end) + ExUnit.start() Ecto.Adapters.SQL.Sandbox.mode(Lightning.Repo, :manual)