Skip to content

Commit 5ec9781

Browse files
committed
Using CRDTs for the processes only with Horde
1 parent a766c5a commit 5ec9781

File tree

11 files changed

+287
-87
lines changed

11 files changed

+287
-87
lines changed

lib/lightning/application.ex

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,12 @@ defmodule Lightning.Application do
105105
[
106106
Lightning.PromEx,
107107
{Cluster.Supervisor, [topologies, [name: Lightning.ClusterSupervisor]]},
108+
{Horde.Registry,
109+
name: Lightning.HordeRegistry, keys: :unique, members: :auto},
110+
{Horde.DynamicSupervisor,
111+
name: Lightning.DistributedSupervisor,
112+
strategy: :one_for_one,
113+
members: :auto},
108114
{Lightning.Vault, Application.get_env(:lightning, Lightning.Vault, [])},
109115
# Start the Ecto repository
110116
Lightning.Repo,
@@ -174,6 +180,15 @@ defmodule Lightning.Application do
174180
:ok
175181
end
176182

183+
def start_phase(:init_rate_limiter, :normal, _args) do
184+
Horde.DynamicSupervisor.start_child(
185+
Lightning.DistributedSupervisor,
186+
Lightning.WebhookRateLimiter
187+
)
188+
189+
:ok
190+
end
191+
177192
def oban_opts do
178193
opts = Application.get_env(:lightning, Oban)
179194

lib/lightning/extensions/rate_limiting.ex

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,9 @@ defmodule Lightning.Extensions.RateLimiting do
88
@type message :: Lightning.Extensions.Message.t()
99

1010
defmodule Context do
11-
@moduledoc """
12-
Which user is making the request for a certain project.
13-
"""
11+
@type t :: %Context{project_id: Ecto.UUID.t()}
1412

15-
@type t :: %Context{
16-
project_id: Ecto.UUID.t(),
17-
user_id: Ecto.UUID.t() | nil
18-
}
19-
20-
defstruct [:project_id, :user_id]
13+
defstruct [:project_id]
2114
end
2215

2316
@callback limit_request(

lib/lightning/rate_limiters.ex

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,6 @@ defmodule Lightning.RateLimiters do
2222
| {:deny, non_neg_integer()}
2323
end
2424

25-
defmodule Webhook do
26-
@moduledoc false
27-
28-
use ReplicatedRateLimiter,
29-
default_capacity: 10,
30-
default_refill: 2
31-
end
32-
3325
@spec hit({:failure_email, String.t(), String.t()}) :: Mail.hit_result()
3426
def hit({:failure_email, workflow_id, user_id}) do
3527
[time_scale: time_scale, rate_limit: rate_limit] =
@@ -45,15 +37,6 @@ defmodule Lightning.RateLimiters do
4537
end
4638
end
4739

48-
def hit({:webhook, project_id}) do
49-
# 10 requests for a second, then 2 requests per second
50-
# Over a long enough period of time, this will allow 2 requests per second.
51-
# allow?("webhook_#{project_id}", 10, 2)
52-
# capacity and refill is by design a module attribute
53-
# TODO: passing it here might eliminate the need for macro for easier maintainance
54-
Webhook.allow?("webhook_#{project_id}")
55-
end
56-
5740
def child_spec(opts) do
5841
%{
5942
id: __MODULE__,
@@ -63,7 +46,7 @@ defmodule Lightning.RateLimiters do
6346
end
6447

6548
def start_link(opts) do
66-
children = [{Mail, opts}, {Webhook, opts}]
49+
children = [{Mail, opts}]
6750
Supervisor.start_link(children, strategy: :one_for_one)
6851
end
6952
end

lib/lightning/services/rate_limiter.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ defmodule Lightning.Services.RateLimiter do
77
import Lightning.Services.AdapterHelper
88

99
@impl true
10-
def limit_request(conn, context, opts) do
10+
def limit_request(conn, context, opts \\ []) do
1111
adapter().limit_request(conn, context, opts)
1212
end
1313

lib/lightning/webhook_rate_limiter.ex

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
defmodule Lightning.WebhookRateLimiter do
2+
@moduledoc false
3+
use GenServer
4+
5+
@capacity 10
6+
@refill_per_sec 2
7+
8+
require Logger
9+
10+
def child_spec(opts) do
11+
{id, name} =
12+
if name = Keyword.get(opts, :name) do
13+
{"#{__MODULE__}_#{name}", name}
14+
else
15+
{__MODULE__, __MODULE__}
16+
end
17+
18+
%{
19+
id: id,
20+
start: {__MODULE__, :start_link, [name]},
21+
shutdown: 10_000,
22+
restart: :transient
23+
}
24+
end
25+
26+
def start_link(name) do
27+
with {:error, {:already_started, pid}} <-
28+
GenServer.start_link(__MODULE__, [], name: via_tuple(name)) do
29+
Logger.info("already started at #{inspect(pid)}, returning :ignore")
30+
:ignore
31+
end
32+
end
33+
34+
@impl true
35+
def init([]) do
36+
Process.flag(:trap_exit, true)
37+
38+
{:ok, %{table: :ets.new(:table, [:set])}}
39+
end
40+
41+
def check_rate(bucket, cost \\ 1, name \\ __MODULE__) do
42+
name
43+
|> via_tuple()
44+
|> GenServer.call({:check_rate, bucket, cost})
45+
end
46+
47+
def inspect_table(name \\ __MODULE__) do
48+
name
49+
|> via_tuple()
50+
|> GenServer.call(:inspect_table)
51+
end
52+
53+
@impl true
54+
def handle_call({:check_rate, bucket, cost}, _from, %{table: table} = state) do
55+
{:reply, do_check_rate(table, bucket, cost), state}
56+
end
57+
58+
@impl true
59+
def handle_call(:inspect_table, _from, %{table: table} = state) do
60+
{:reply, :ets.info(table), state}
61+
end
62+
63+
@impl true
64+
def handle_info(
65+
{:EXIT, _from, {:name_conflict, {_key, _value}, registry, pid}},
66+
state
67+
) do
68+
Logger.info(
69+
"Stopping #{inspect({registry, pid})} as it has already started in another node."
70+
)
71+
72+
{:stop, :normal, state}
73+
end
74+
75+
def do_check_rate(table, bucket, cost) do
76+
now = System.monotonic_time(:millisecond)
77+
78+
:ets.insert_new(table, {bucket, {@capacity, now}})
79+
[{^bucket, {level, updated}}] = :ets.lookup(table, bucket)
80+
81+
refilled = div(now - updated, 1_000) * @refill_per_sec
82+
current = min(@capacity, level + refilled)
83+
84+
if current >= cost do
85+
level = current - cost
86+
:ets.insert(table, {bucket, {level, now}})
87+
88+
{:allow, level}
89+
else
90+
# can retry after 1 second
91+
{:deny, 1}
92+
end
93+
end
94+
95+
def capacity, do: @capacity
96+
def refill_per_second, do: @refill_per_sec
97+
98+
def via_tuple(name),
99+
do: {:via, Horde.Registry, {Lightning.HordeRegistry, name}}
100+
end

lib/lightning_web/controllers/webhooks_controller.ex

Lines changed: 51 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
defmodule LightningWeb.WebhooksController do
22
use LightningWeb, :controller
33

4-
alias Lightning.Extensions.RateLimiting
54
alias Lightning.Extensions.UsageLimiting.Action
65
alias Lightning.Extensions.UsageLimiting.Context
76
alias Lightning.Services.RateLimiter
@@ -10,7 +9,7 @@ defmodule LightningWeb.WebhooksController do
109
alias Lightning.WorkOrders
1110

1211
plug :reject_unfetched when action in [:create]
13-
# plug :check_rate when action in [:create]
12+
plug :check_rate when action in [:create]
1413

1514
# Reject requests with unfetched body params, as they are not supported
1615
# See Plug.Parsers in Endpoint for more information.
@@ -28,20 +27,36 @@ defmodule LightningWeb.WebhooksController do
2827
end
2928
end
3029

31-
# defp check_rate(conn, _params) do
32-
# # TODO: this may be _after_ the body has been parsed (into body_params), so we may need to
33-
# # may need to move this plug further upstream.
34-
# case Lightning.RateLimiters.check_rate(conn.project_id) do
35-
# {:allow, _} ->
36-
# conn
37-
# {:deny, timeout} ->
38-
# conn
39-
# |> put_status(429)
40-
# |> put_resp_header("retry-after", to_string(timeout))
41-
# |> json(%{"error" => "Too many requests"})
42-
# |> halt()
43-
# end
44-
# end
30+
# Note: Plug.Parsers is called before the Router.
31+
def check_rate(conn, _opts) do
32+
with %Workflows.Trigger{enabled: true, workflow: %{project_id: project_id}} <-
33+
conn.assigns.trigger,
34+
:ok <- RateLimiter.limit_request(conn, %Context{project_id: project_id}) do
35+
conn
36+
else
37+
%Workflows.Trigger{enabled: false} ->
38+
conn
39+
|> put_status(:forbidden)
40+
|> json(%{
41+
message:
42+
"Unable to process request, trigger is disabled. Enable it on OpenFn to allow requests to this endpoint."
43+
})
44+
|> halt()
45+
46+
{:error, :too_many_requests, %{text: message}} ->
47+
conn
48+
|> put_status(:too_many_requests)
49+
|> put_resp_header("retry-after", "1")
50+
|> json(%{"error" => message})
51+
|> halt()
52+
53+
_no_trigger_or_workflow ->
54+
conn
55+
|> put_status(:not_found)
56+
|> json(%{"error" => "Webhook not found"})
57+
|> halt()
58+
end
59+
end
4560

4661
@spec check(Plug.Conn.t(), %{path: binary()}) :: Plug.Conn.t()
4762
def check(conn, _params) do
@@ -53,30 +68,25 @@ defmodule LightningWeb.WebhooksController do
5368
end
5469

5570
@spec create(Plug.Conn.t(), %{path: binary()}) :: Plug.Conn.t()
56-
def create(conn, _params) do
57-
with %Workflows.Trigger{enabled: true, workflow: %{project_id: project_id}} =
58-
trigger <- conn.assigns.trigger,
59-
{:ok, without_run?} <- check_skip_run_creation(project_id),
60-
:ok <-
61-
RateLimiter.limit_request(
62-
conn,
63-
%RateLimiting.Context{project_id: project_id},
64-
[]
65-
) do
66-
{:ok, work_order} =
67-
WorkOrders.create_for(trigger,
68-
workflow: trigger.workflow,
69-
dataclip: %{
70-
body: conn.body_params,
71-
request: build_request(conn),
72-
type: :http_request,
73-
project_id: project_id
74-
},
75-
without_run: without_run?
76-
)
77-
78-
conn |> json(%{work_order_id: work_order.id})
79-
else
71+
def create(%{assigns: %{trigger: trigger}} = conn, _params) do
72+
%Workflows.Trigger{workflow: workflow} = trigger
73+
74+
case check_skip_run_creation(workflow.project_id) do
75+
{:ok, without_run?} ->
76+
{:ok, work_order} =
77+
WorkOrders.create_for(trigger,
78+
workflow: workflow,
79+
dataclip: %{
80+
body: conn.body_params,
81+
request: build_request(conn),
82+
type: :http_request,
83+
project_id: workflow.project_id
84+
},
85+
without_run: without_run?
86+
)
87+
88+
json(conn, %{work_order_id: work_order.id})
89+
8090
{:error, reason, %{text: message}} ->
8191
status =
8292
if reason == :too_many_requests,
@@ -85,19 +95,7 @@ defmodule LightningWeb.WebhooksController do
8595

8696
conn
8797
|> put_status(status)
88-
|> json(%{"error" => message})
89-
90-
nil ->
91-
conn
92-
|> put_status(:not_found)
93-
|> json(%{"error" => "Webhook not found"})
94-
95-
_disabled ->
96-
put_status(conn, :forbidden)
97-
|> json(%{
98-
message:
99-
"Unable to process request, trigger is disabled. Enable it on OpenFn to allow requests to this endpoint."
100-
})
98+
|> json(%{error: message})
10199
end
102100
end
103101

mix.exs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ defmodule Lightning.MixProject do
5555
[
5656
mod: {Lightning.Application, [:timex]},
5757
extra_applications: [:logger, :runtime_tools, :os_mon, :scrivener],
58-
start_phases: [seed_prom_ex_telemetry: []]
58+
start_phases: [seed_prom_ex_telemetry: [], init_rate_limiter: []]
5959
]
6060
end
6161

@@ -69,6 +69,7 @@ defmodule Lightning.MixProject do
6969
defp deps do
7070
[
7171
# {:rexbug, ">= 1.0.0", only: :test},
72+
{:horde, "~> 0.9.0"},
7273
{:bcrypt_elixir, "~> 3.2"},
7374
{:bodyguard, "~> 2.2"},
7475
{:broadway_kafka, "~> 0.4.2"},

mix.lock

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
"hammer": {:hex, :hammer, "7.0.1", "136edcd81af44becbe6b73a958c109e2364ab0dc026d7b19892037dc2632078c", [:mix], [], "hexpm", "796edf14ab2aa80df72080210fcf944ee5e8868d8ece7a7511264d802f58cc2d"},
6262
"hammer_backend_mnesia": {:hex, :hammer_backend_mnesia, "0.7.0", "b2a8cccc1d3506bc4cf8e95750fa5bd491390f227e9a0d981ad375291d7bd1dc", [:mix], [{:hammer, "~> 7.0", [hex: :hammer, repo: "hexpm", optional: false]}], "hexpm", "f77a3d54df865aa8137926df6fadac2a81c06f1c1c22f4e98e32392a22bf9e3e"},
6363
"heroicons": {:hex, :heroicons, "0.5.6", "95d730e7179c633df32d95c1fdaaecdf81b0da11010b89b737b843ac176a7eb5", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:phoenix_live_view, ">= 0.18.2", [hex: :phoenix_live_view, repo: "hexpm", optional: false]}], "hexpm", "ca267f02a5fa695a4178a737b649fb6644a2e399639d4ba7964c18e8a58c2352"},
64+
"horde": {:hex, :horde, "0.9.0", "522342bd7149aeed453c97692a8bca9cf7c9368c5a489afd802e575dc8df54a6", [:mix], [{:delta_crdt, "~> 0.6.2", [hex: :delta_crdt, repo: "hexpm", optional: false]}, {:libring, "~> 1.4", [hex: :libring, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_poller, "~> 0.5.0 or ~> 1.0", [hex: :telemetry_poller, repo: "hexpm", optional: false]}], "hexpm", "fae11e5bc9c980038607d0c3338cdf7f97124a5d5382fd4b6fb6beaab8e214fe"},
6465
"hpax": {:hex, :hpax, "1.0.2", "762df951b0c399ff67cc57c3995ec3cf46d696e41f0bba17da0518d94acd4aac", [:mix], [], "hexpm", "2f09b4c1074e0abd846747329eaa26d535be0eb3d189fa69d812bfb8bfefd32f"},
6566
"httpoison": {:hex, :httpoison, "2.2.1", "87b7ed6d95db0389f7df02779644171d7319d319178f6680438167d7b69b1f3d", [:mix], [{:hackney, "~> 1.17", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "51364e6d2f429d80e14fe4b5f8e39719cacd03eb3f9a9286e61e216feac2d2df"},
6667
"idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"},
@@ -72,6 +73,7 @@
7273
"junit_formatter": {:hex, :junit_formatter, "3.4.0", "d0e8db6c34dab6d3c4154c3b46b21540db1109ae709d6cf99ba7e7a2ce4b1ac2", [:mix], [], "hexpm", "bb36e2ae83f1ced6ab931c4ce51dd3dbef1ef61bb4932412e173b0cfa259dacd"},
7374
"kafka_protocol": {:hex, :kafka_protocol, "4.1.9", "7c10d9adaba84c6f176f152e6ba8029c46dfb7cb12432587009128836cf9a44a", [:rebar3], [{:crc32cer, "0.1.11", [hex: :crc32cer, repo: "hexpm", optional: false]}], "hexpm", "14f89eed8329ff4c7b5448e318ee20a98bf5c1e5dc41b74b8af459dfb7590cef"},
7475
"libcluster": {:hex, :libcluster, "3.4.1", "271d2da892763bbef53c2872036c936fe8b80111eb1feefb2d30a3bb15c9b4f6", [:mix], [{:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.3", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "1d568157f069c6afa70ec0d736704cf799734bdbb6343f0322af4a980301c853"},
76+
"libring": {:hex, :libring, "1.7.0", "4f245d2f1476cd7ed8f03740f6431acba815401e40299208c7f5c640e1883bda", [:mix], [], "hexpm", "070e3593cb572e04f2c8470dd0c119bc1817a7a0a7f88229f43cf0345268ec42"},
7577
"makeup": {:hex, :makeup, "1.1.2", "9ba8837913bdf757787e71c1581c21f9d2455f4dd04cfca785c70bbfff1a76a3", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cce1566b81fbcbd21eca8ffe808f33b221f9eee2cbc7a1706fc3da9ff18e6cac"},
7678
"makeup_eex": {:hex, :makeup_eex, "0.1.2", "93a5ef3d28ed753215dba2d59cb40408b37cccb4a8205e53ef9b5319a992b700", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.16 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_html, "~> 0.1.0 or ~> 1.0", [hex: :makeup_html, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "6140eafb28215ad7182282fd21d9aa6dcffbfbe0eb876283bc6b768a6c57b0c3"},
7779
"makeup_elixir": {:hex, :makeup_elixir, "0.16.2", "627e84b8e8bf22e60a2579dad15067c755531fea049ae26ef1020cad58fe9578", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "41193978704763f6bbe6cc2758b84909e62984c7752b3784bd3c218bb341706b"},

0 commit comments

Comments
 (0)