Skip to content

Commit 92e9223

Browse files
committed
Add proper cluster awareness via libcluster
1 parent db5a338 commit 92e9223

File tree

19 files changed

+330
-65
lines changed

19 files changed

+330
-65
lines changed

app/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ LABEL edu.northwestern.library.app=meadow \
5050
edu.northwestern.library.stage=runtime
5151
RUN apt update -qq && apt install -y curl git jq libssl-dev libncurses5-dev
5252
ENV LANG=C.UTF-8
53-
EXPOSE 4000 4369
53+
EXPOSE 4000 4369 9100-9155
5454
COPY --from=build /app/_build/prod/rel/meadow /app
5555
WORKDIR /app
5656
RUN npm install -g @anthropic-ai/claude-code \

app/config/config.exs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,10 @@ config :meadow, Meadow.Pipeline,
115115
Actions.FileSetComplete
116116
]
117117

118+
config :mime, :types, %{
119+
"text/event-stream" => ["event-stream"]
120+
}
121+
118122
config :ueberauth, Ueberauth, providers: [nusso: {Ueberauth.Strategy.NuSSO, []}]
119123

120124
import_config("#{Mix.env()}.exs")

app/lib/meadow/application.ex

Lines changed: 80 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,25 @@ defmodule Meadow.Application do
44
@moduledoc false
55

66
use Application
7+
use Retry
78
alias Meadow.Application.Children
89
alias Meadow.Config.Runtime
910

1011
require Logger
12+
require WaitForIt
1113

1214
def start(_type, _args) do
1315
result =
1416
Supervisor.start_link(
1517
[
16-
{DynamicSupervisor, max_restarts: 4096, strategy: :one_for_one, name: Meadow.Supervisor}
18+
{DynamicSupervisor,
19+
max_restarts: 4096, strategy: :one_for_one, name: Meadow.Supervisor},
20+
{Horde.Registry, name: Meadow.HordeRegistry, keys: :unique, members: :auto},
21+
{Horde.DynamicSupervisor,
22+
name: Meadow.HordeSupervisor,
23+
distribution_strategy: Horde.UniformQuorumDistribution,
24+
strategy: :one_for_one,
25+
members: :auto}
1726
],
1827
max_restarts: 4096,
1928
strategy: :one_for_one,
@@ -26,22 +35,48 @@ defmodule Meadow.Application do
2635
Runtime.configure!()
2736
end
2837

38+
if System.get_env("CLUSTER_ENABLED") == "true" do
39+
Logger.info("Starting libcluster")
40+
topologies = Application.get_env(:libcluster, :topologies, [])
41+
42+
DynamicSupervisor.start_child(
43+
Meadow.Supervisor,
44+
{Cluster.Supervisor, [topologies, [name: Meadow.ClusterSupervisor]]}
45+
)
46+
47+
Logger.info("Waiting for cluster")
48+
49+
WaitForIt.case_wait Horde.Cluster.members(Meadow.HordeSupervisor),
50+
timeout: :timer.seconds(10),
51+
interval: :timer.seconds(1) do
52+
nodes when length(nodes) > 1 ->
53+
Logger.info("Cluster formed with nodes: #{inspect(nodes)}")
54+
else
55+
Logger.warning("Timeout waiting for cluster formation. Continuing with single node.")
56+
end
57+
58+
Logger.info("Waiting for quorum")
59+
Horde.DynamicSupervisor.wait_for_quorum(Meadow.HordeSupervisor, :timer.seconds(10))
60+
end
61+
62+
Logger.info("Starting Anubis MCP server")
63+
Application.ensure_all_started(:anubis_mcp)
64+
2965
unless System.get_env("MEADOW_NO_REPO") do
3066
DynamicSupervisor.start_child(Meadow.Supervisor, Meadow.Repo)
3167
DynamicSupervisor.start_child(Meadow.Supervisor, Meadow.Repo.Indexing)
3268
Meadow.Repo.wait_for_connection()
3369
end
3470

35-
base_children = [
71+
[
3672
{Phoenix.PubSub, [name: Meadow.PubSub, adapter: Phoenix.PubSub.PG2]},
3773
Meadow.Telemetry,
3874
{Registry, keys: :unique, name: Meadow.TaskRegistry}
3975
]
76+
|> start_children()
4077

41-
children = base_children ++ Children.specs()
42-
43-
children
44-
|> Enum.each(&DynamicSupervisor.start_child(Meadow.Supervisor, &1))
78+
Children.specs()
79+
|> start_children()
4580

4681
:telemetry.attach(
4782
"reorder-file-sets-stop-handler",
@@ -53,10 +88,49 @@ defmodule Meadow.Application do
5388
result
5489
end
5590

91+
def start_children(children) do
92+
retry with: exponential_backoff() |> randomize() |> cap(10_000) |> Stream.take(10) do
93+
children
94+
|> Enum.each(fn spec ->
95+
if distributed?(spec) do
96+
Horde.DynamicSupervisor.start_child(Meadow.HordeSupervisor, spec)
97+
else
98+
DynamicSupervisor.start_child(Meadow.Supervisor, spec)
99+
end
100+
|> case do
101+
{:ok, _pid} ->
102+
:ok
103+
104+
{:error, {:already_started, _pid}} ->
105+
Logger.warning("Not starting #{inspect(spec)}: already started")
106+
:ok
107+
108+
{:error, reason} ->
109+
Logger.error("Failed to start child #{inspect(spec)}: #{inspect(reason)}")
110+
end
111+
end)
112+
end
113+
end
114+
56115
# Tell Phoenix to update the endpoint configuration
57116
# whenever the application is updated.
58117
def config_change(changed, _new, removed) do
59118
MeadowWeb.Endpoint.config_change(changed, removed)
60119
:ok
61120
end
121+
122+
defp spec_name(child_spec) do
123+
case child_spec do
124+
%{start: {_mod, _fun, [args]}} when is_list(args) -> Keyword.get(args, :name)
125+
%{start: {_mod, _fun, args}} when is_list(args) -> Keyword.get(args, :name)
126+
{_mod, opts} when is_list(opts) -> Keyword.get(opts, :name)
127+
%{id: {:via, Horde.Registry, _} = name} -> name
128+
_ -> nil
129+
end
130+
end
131+
132+
defp distributed?(child_spec), do: spec_name(child_spec) |> via_horde?()
133+
134+
defp via_horde?({:via, Horde.Registry, _}), do: true
135+
defp via_horde?(_), do: false
62136
end

app/lib/meadow/application/caches.ex

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,38 +3,39 @@ defmodule Meadow.Application.Caches do
33
Cache specs for Meadow.Application
44
"""
55
require Cachex.Spec
6+
import Cachex.Spec
67

78
def specs do
89
[
910
cache_spec(:global_cache, Meadow.Cache),
1011
cache_spec(
1112
:chat_conversation_cache,
1213
Meadow.Cache.Chat.Conversations,
13-
expiration: Cachex.Spec.expiration(default: :timer.hours(6)),
14+
expiration: expiration(default: :timer.hours(6)),
1415
stats: true
1516
),
1617
cache_spec(
1718
:coded_term_cache,
1819
Meadow.Cache.CodedTerms,
19-
expiration: Cachex.Spec.expiration(default: :timer.hours(6)),
20+
expiration: expiration(default: :timer.hours(6)),
2021
stats: true
2122
),
2223
cache_spec(
2324
:controlled_term_cache,
2425
Meadow.Cache.ControlledTerms,
25-
expiration: Cachex.Spec.expiration(default: :timer.hours(6)),
26+
expiration: expiration(default: :timer.hours(6)),
2627
stats: true
2728
),
2829
cache_spec(
2930
:preservation_check_job_cache,
3031
Meadow.Cache.PreservationChecks,
31-
expiration: Cachex.Spec.expiration(default: :timer.hours(6)),
32+
expiration: expiration(default: :timer.hours(6)),
3233
stats: true
3334
),
3435
cache_spec(
3536
:aws_credentials_cache,
3637
Meadow.Cache.AWS.Credentials,
37-
expiration: Cachex.Spec.expiration(default: :timer.hours(6)),
38+
expiration: expiration(default: :timer.hours(6)),
3839
stats: true
3940
)
4041
]
@@ -57,8 +58,9 @@ defmodule Meadow.Application.Caches do
5758
end
5859

5960
defp cache_spec(id, name, args \\ []) do
61+
args = Keyword.put_new(args, :router, router(module: Cachex.Router.Ring, options: [monitor: true]))
6062
%{
61-
id: id,
63+
id: [id, Node.self()] |> Enum.join("_") |> String.to_atom(),
6264
start: {Cachex, :start_link, [name, args]},
6365
type: :supervisor
6466
}

app/lib/meadow/application/children.ex

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,9 @@ defmodule Meadow.Application.Children do
1717
"csv_update_driver" => Meadow.CSVMetadataUpdateDriver,
1818
"database_listeners" => [
1919
{WalEx.Supervisor, Application.get_env(:meadow, WalEx)},
20-
{Meadow.Events.Works.Arks.Processor,
21-
token_count: 100, interval: 1_000, replenish_count: 10}
20+
{Meadow.Events.Works.Arks.Processor, token_count: 100, interval: 1_000, replenish_count: 10} |> distributed()
2221
],
23-
"scheduler" => Meadow.Scheduler,
22+
"scheduler" => Meadow.Scheduler |> distributed(),
2423
"work_creator" => [Meadow.Ingest.WorkCreator, Meadow.Ingest.WorkRedriver]
2524
}
2625
end
@@ -43,8 +42,8 @@ defmodule Meadow.Application.Children do
4342
MeadowWeb.Endpoint,
4443
{Absinthe.Subscription, MeadowWeb.Endpoint},
4544
MeadowWeb.Subscription,
46-
Anubis.Server.Registry,
47-
{MeadowWeb.MCP.Server, transport: :streamable_http}
45+
MeadowWeb.MCP.GlobalRegistry,
46+
{MeadowWeb.MCP.Server, transport: :streamable_http, registry: MeadowWeb.MCP.GlobalRegistry} |> distributed()
4847
],
4948
"web.notifiers" => [
5049
{Meadow.Ingest.Progress, interval: Config.progress_ping_interval()}
@@ -144,4 +143,26 @@ defmodule Meadow.Application.Children do
144143
{Plug.Cowboy, scheme: :http, plug: plug, options: [port: port]}
145144
end
146145
end
146+
147+
def distributed(%{start: {mod, fun, [args]}} = spec) do
148+
%{spec | start: {mod, fun, [via_horde(Keyword.put_new(args, :name, mod))]}}
149+
end
150+
151+
def distributed(%{start: {mod, fun, args}} = spec) do
152+
%{spec | start: {mod, fun, via_horde(Keyword.put_new(args, :name, mod))}}
153+
end
154+
155+
def distributed({mod, args}) do
156+
{mod, via_horde(Keyword.put_new(args, :name, mod))}
157+
end
158+
159+
def distributed(mod), do: {mod, via_horde([name: mod])}
160+
161+
defp via_horde(args) do
162+
Keyword.update(args, :name, nil, fn name ->
163+
if is_atom(name),
164+
do: {:via, Horde.Registry, {Meadow.HordeRegistry, name}},
165+
else: name
166+
end)
167+
end
147168
end

app/lib/meadow/config/runtime/dev.ex

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,11 @@ defmodule Meadow.Config.Runtime.Dev do
9797
callback_port: 3001,
9898
ssl_port: 3001
9999

100+
config :libcluster,
101+
topologies: [
102+
ecs: [strategy: Cluster.Strategy.LocalEpmd]
103+
]
104+
100105
if prefix = System.get_env("DEV_PREFIX") do
101106
config :meadow,
102107
dc_api: [

app/lib/meadow/config/runtime/prod.ex

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ defmodule Meadow.Config.Runtime.Prod do
33
Load and apply Meadow's runtime configuration for the production environment
44
"""
55

6+
import Meadow.Config.Secrets
7+
68
def configure! do
79
import Meadow.Config.Helper
810

@@ -12,5 +14,31 @@ defmodule Meadow.Config.Runtime.Prod do
1214
url: [host: System.get_env("MEADOW_HOSTNAME", "example.com"), port: 443],
1315
cache_static_manifest: "priv/static/cache_manifest.json",
1416
server: true
17+
18+
redis_host = get_secret(:redis, ["address"])
19+
redis_port = get_secret(:redis, ["port"])
20+
21+
# config :anubis_mcp, :session_store,
22+
# enabled: true,
23+
# adapter: Anubis.Server.Session.Store.Redis,
24+
# redis_url: "redis://#{redis_host}:#{redis_port}",
25+
# pool_size: 10,
26+
# ttl: :timer.minutes(30),
27+
# namespace: "meadow:mcp:sessions",
28+
# connection_name: :anubis_redis
29+
30+
if System.get_env("CLUSTER_ENABLED") == "true" do
31+
config :libcluster,
32+
topologies: [
33+
ecs: [
34+
strategy: Cluster.Strategy.DNSPoll,
35+
config: [
36+
polling_interval: 5_000,
37+
query: System.get_env("CLUSTER_DNS_NAME"),
38+
node_basename: "meadow"
39+
]
40+
]
41+
]
42+
end
1543
end
1644
end

app/lib/meadow/config/secrets.ex

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ defmodule Meadow.Config.Secrets do
1616
meadow: "config/meadow",
1717
meadow_pipeline: "config/meadow-pipeline",
1818
nusso: "infrastructure/nusso",
19+
redis: "infrastructure/cache",
1920
wildcard_ssl: "config/wildcard_ssl"
2021
}
2122

app/lib/meadow/data/index_batcher.ex

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,18 +52,27 @@ defmodule Meadow.Data.IndexBatcher do
5252
def clear(schema), do: dispatch(schema, :clear)
5353

5454
defp dispatch(schema, message) do
55-
case Process.whereis(:"#{schema}_batcher") do
56-
nil -> :noop
57-
pid -> GenServer.call(pid, message, :infinity)
55+
case Horde.Registry.whereis_name({Meadow.HordeRegistry, registry_name(schema)}) do
56+
pid when is_pid(pid) -> GenServer.call(pid, message, :infinity)
57+
_ -> :noop
5858
end
5959
end
6060

6161
def child_spec(schema, opts \\ []) do
6262
opts =
63-
[schema: schema, version: 2, name: :"#{schema.__schema__(:source)}_batcher"]
63+
[schema: schema, version: 2, name: registry_name(schema)]
6464
|> Keyword.merge(opts)
6565

6666
%{id: Module.concat(__MODULE__, schema), start: {__MODULE__, :start_link, [opts]}}
67+
|> Meadow.Application.Children.distributed()
68+
end
69+
70+
defp registry_name(source) when is_binary(source), do: :"#{source}_batcher"
71+
72+
defp registry_name(schema) do
73+
if Code.ensure_loaded?(schema) and function_exported?(schema, :__schema__, 1),
74+
do: registry_name(schema.__schema__(:source)),
75+
else: registry_name(to_string(schema))
6776
end
6877

6978
def start_link(args \\ []) do

0 commit comments

Comments
 (0)