Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .iex.exs
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
import Ecto.Query
if Code.loaded?(Ecto.Query) do
import Ecto.Query
end

alias Lightning.Repo
171 changes: 171 additions & 0 deletions bin/local_cluster
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
#!/usr/bin/env bash

# Function to show usage
show_usage() {
echo "Usage:"
echo " $0 [--proxy] --count <number> Start local cluster (default: 2 instances)"
echo " $0 connect <node_number> Connect to a specific node (1-4)"
echo ""
echo "Options:"
echo " --proxy Start a Caddy reverse proxy on port 4000 (nodes will start from 4001)"
echo " --count <num> Number of nodes to start (1-4, default: 2)"
exit 1
}

# Handle connect subcommand
if [ "$1" = "connect" ]; then
if [ -z "$2" ] || ! [[ "$2" =~ ^[1-4]$ ]]; then
echo "Error: Please specify a valid node number (1-4)"
show_usage
fi

NODE_NUM=$2
echo "Connecting to node${NODE_NUM}@127.0.0.1..."
exec iex --name "remote_shell${NODE_NUM}@127.0.0.1" --remsh "node${NODE_NUM}@127.0.0.1"
# The exec command replaces the current process, so we don't need an explicit exit
# If we reach this point, it means the exec failed, so we'll exit with its status code
exit $?
fi

# Parse arguments
USE_PROXY=false
INSTANCES=2

while [[ $# -gt 0 ]]; do
case $1 in
--proxy)
USE_PROXY=true
shift
;;
--count)
if [ -z "$2" ] || ! [[ "$2" =~ ^[0-9]+$ ]]; then
echo "Error: --count requires a numeric argument"
show_usage
fi
INSTANCES=$2
shift 2
;;
*)
echo "Unknown argument: $1"
show_usage
;;
esac
done

# Validate number of instances
if ! [[ "$INSTANCES" =~ ^[0-9]+$ ]]; then
echo "Error: Number of instances must be a positive integer"
show_usage
fi

if [ "$INSTANCES" -lt 1 ] || [ "$INSTANCES" -gt 4 ]; then
echo "Error: Number of instances must be between 1 and 4"
show_usage
fi

# Check for Caddy if proxy is requested
if [ "$USE_PROXY" = true ]; then
if ! command -v caddy &>/dev/null; then
echo "Error: Caddy is required for proxy mode but it's not installed"
echo "Please install Caddy first:"
echo " Mac: brew install caddy"
echo " Linux: sudo apt install caddy"
echo " Or visit: https://caddyserver.com/docs/install"
exit 1
fi
fi

# Array to store background PIDs
declare -a PIDS

# Colors for different processes
declare -a COLORS=(
"\033[0;36m" # Cyan
"\033[0;32m" # Green
"\033[0;35m" # Purple
"\033[0;33m" # Yellow
"\033[0;37m" # Gray (for proxy)
)
RESET="\033[0m"

# Cleanup function to kill all child processes
cleanup() {
echo "Shutting down all processes..."
for pid in "${PIDS[@]}"; do
kill "$pid" 2>/dev/null
done
exit 0
}

# Set up trap for cleanup
trap cleanup INT TERM

# Function to run a command with colored output
run_with_color() {
local color=$1
local prefix=$2
shift 2
# Run the command and color its output
"$@" 2>&1 | while read -r line; do
echo -e "${color}${prefix} | ${line}${RESET}"
done
}

# Create Caddy configuration if proxy is enabled
if [ "$USE_PROXY" = true ]; then
BASE_PORT=4001
CADDY_CONFIG=$(mktemp)
echo "Creating Caddy configuration..."
cat >"$CADDY_CONFIG" <<EOF
# Global options
{
admin off
auto_https off
http_port 4000
}

# Reverse proxy configuration
localhost:4000 {
reverse_proxy {
to $(for i in $(seq 1 "$INSTANCES"); do echo "localhost:$((BASE_PORT + i - 1))"; done | paste -sd " " -)
lb_policy round_robin
}
}
EOF

# Only log Caddy config if LOG_LEVEL is debug
if [ "${LOG_LEVEL:-}" = "debug" ]; then
echo "Caddy config:"
cat "$CADDY_CONFIG"
fi

# Start Caddy
run_with_color "${COLORS[4]}" "proxy" caddy run --adapter caddyfile --config "$CADDY_CONFIG" &
PIDS+=($!)

# Cleanup Caddy config on exit
trap 'rm -f "$CADDY_CONFIG"' EXIT

echo "Started reverse proxy on port 4000"
else
BASE_PORT=4000
fi

# Start the requested number of instances
for i in $(seq 1 "$INSTANCES"); do
export RTM_PORT=$((2222 + i - 1))
PORT=$((BASE_PORT + i - 1)) run_with_color "${COLORS[$i - 1]}" "node$i" elixir --name "[email protected]" -S mix phx.server &
PIDS+=($!)
done

if [ "$USE_PROXY" = true ]; then
echo "Started $INSTANCES node(s) on ports $((BASE_PORT))-$((BASE_PORT + INSTANCES - 1)) with load balancer on port 4000"
echo "RTM ports: 2222-$((2222 + INSTANCES - 1))"
else
echo "Started $INSTANCES node(s) on ports $((BASE_PORT))-$((BASE_PORT + INSTANCES - 1))"
echo "RTM ports: 2222-$((2222 + INSTANCES - 1))"
fi
echo "To connect to a specific node, use: $0 connect <node_number>"

# Wait for all background processes
wait
10 changes: 5 additions & 5 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,6 @@ config :lightning, Lightning.Repo,
types: Lightning.PostgrexTypes,
log: :debug

config :hammer,
backend:
{Hammer.Backend.Mnesia,
[expiry_ms: 60_000 * 60 * 4, cleanup_interval_ms: 60_000 * 10]}

# Configures the endpoint
config :lightning, LightningWeb.Endpoint,
url: [host: "localhost"],
Expand All @@ -30,6 +25,11 @@ config :lightning, LightningWeb.Endpoint,
pubsub_server: Lightning.PubSub,
live_view: [signing_salt: "EfrmuOUr"]

config :lightning, Lightning.DistributedRateLimiter,
start: false,
capacity: 10,
refill_per_second: 2

config :lightning, Lightning.Extensions,
rate_limiter: Lightning.Extensions.RateLimiter,
usage_limiter: Lightning.Extensions.UsageLimiter,
Expand Down
7 changes: 2 additions & 5 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ config :lightning, LightningWeb.Endpoint,
"/8zedVJLxvmGGFoRExE3e870g7CGZZQ1Vq11A5MbQGPKOpK57MahVsPW6Wkkv61n",
server: true

config :lightning, Lightning.DistributedRateLimiter, start: true

config :lightning, Lightning.Runtime.RuntimeManager,
ws_url: "ws://localhost:4002/worker"

Expand Down Expand Up @@ -95,11 +97,6 @@ config :lightning, Lightning.Mailer, adapter: Swoosh.Adapters.Test
config :lightning, Lightning.AdaptorRegistry,
use_cache: "test/fixtures/adaptor_registry_cache.json"

config :hammer,
backend:
{Hammer.Backend.ETS,
[expiry_ms: 60_000 * 60 * 4, cleanup_interval_ms: 60_000 * 10]}

config :lightning, Lightning.FailureAlerter,
time_scale: 60_000,
rate_limit: 3
Expand Down
10 changes: 10 additions & 0 deletions lib/lightning.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ defmodule Lightning do
Phoenix.PubSub.broadcast(@pubsub, topic, msg)
end

@impl true
def broadcast_from(pid, topic, msg) do
Phoenix.PubSub.broadcast_from(@pubsub, pid, topic, msg)
end

@impl true
def local_broadcast(topic, msg) do
Phoenix.PubSub.local_broadcast(@pubsub, topic, msg)
Expand Down Expand Up @@ -60,6 +65,8 @@ defmodule Lightning do
# credo:disable-for-next-line
@callback current_time() :: DateTime.t()
@callback broadcast(binary(), {atom(), any()}) :: :ok | {:error, term()}
@callback broadcast_from(pid(), binary(), {atom(), any()}) ::
:ok | {:error, term()}
@callback local_broadcast(binary(), {atom(), any()}) :: :ok | {:error, term()}
@callback subscribe(binary()) :: :ok | {:error, term()}
@callback release() :: release_info()
Expand All @@ -71,6 +78,9 @@ defmodule Lightning do

def broadcast(topic, msg), do: impl().broadcast(topic, msg)

def broadcast_from(pid, topic, msg),
do: impl().broadcast_from(pid, topic, msg)

def local_broadcast(topic, msg), do: impl().local_broadcast(topic, msg)

def subscribe(topic), do: impl().subscribe(topic)
Expand Down
28 changes: 25 additions & 3 deletions lib/lightning/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ defmodule Lightning.Application do

require Logger

@rate_limiter_opts Application.compile_env!(
:lightning,
Lightning.DistributedRateLimiter
)

@impl true
def start(_type, _args) do
# Initialize ETS table for adapter lookup
Expand All @@ -26,8 +31,6 @@ defmodule Lightning.Application do
:mnesia.stop()
:mnesia.create_schema([node()])
:mnesia.start()
Hammer.Backend.Mnesia.create_mnesia_table(disc_copies: [node()])
:mnesia.wait_for_tables([:__hammer_backend_mnesia], 60_000)

# Only add the Sentry backend if a dsn is provided.
if Application.get_env(:sentry, :dsn),
Expand Down Expand Up @@ -107,6 +110,12 @@ defmodule Lightning.Application do
[
Lightning.PromEx,
{Cluster.Supervisor, [topologies, [name: Lightning.ClusterSupervisor]]},
{Horde.Registry,
name: Lightning.HordeRegistry, keys: :unique, members: :auto},
{Horde.DynamicSupervisor,
name: Lightning.DistributedSupervisor,
strategy: :one_for_one,
members: :auto},
{Lightning.Vault, Application.get_env(:lightning, Lightning.Vault, [])},
# Start the Ecto repository
Lightning.Repo,
Expand All @@ -128,7 +137,9 @@ defmodule Lightning.Application do
{Lightning.Runtime.RuntimeManager,
worker_secret: Lightning.Config.worker_secret(),
endpoint: LightningWeb.Endpoint},
{Lightning.KafkaTriggers.Supervisor, type: :supervisor}
{Lightning.KafkaTriggers.Supervisor, type: :supervisor},
# Start our rate limiter
{Lightning.RateLimiters, clean_period: :timer.minutes(10)}
# Start a worker by calling: Lightning.Worker.start_link(arg)
# {Lightning.Worker, arg}
]
Expand Down Expand Up @@ -174,6 +185,17 @@ defmodule Lightning.Application do
:ok
end

def start_phase(:init_rate_limiter, :normal, _args) do
if @rate_limiter_opts[:start] do
Horde.DynamicSupervisor.start_child(
Lightning.DistributedSupervisor,
{Lightning.DistributedRateLimiter, @rate_limiter_opts}
)
end

:ok
end

def oban_opts do
opts = Application.get_env(:lightning, Oban)

Expand Down
10 changes: 10 additions & 0 deletions lib/lightning/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,11 @@ defmodule Lightning.Config do
Application.get_env(:lightning, :gdpr_preferences)
end

@impl true
def max_dataclip_size_bytes do
Application.get_env(:lightning, :max_dataclip_size_bytes, 10_000_000)
end

@impl true
def external_metrics_module do
Application.get_env(:lightning, Lightning.Extensions, [])
Expand Down Expand Up @@ -346,6 +351,7 @@ defmodule Lightning.Config do
@callback book_demo_openfn_workflow_url() :: String.t()
@callback gdpr_banner() :: map() | false
@callback gdpr_preferences() :: map() | false
@callback max_dataclip_size_bytes() :: integer()
@callback external_metrics_module() :: module() | nil

@doc """
Expand Down Expand Up @@ -545,6 +551,10 @@ defmodule Lightning.Config do
impl().gdpr_preferences()
end

def max_dataclip_size_bytes do
impl().max_dataclip_size_bytes()
end

def external_metrics_module do
impl().external_metrics_module()
end
Expand Down
Loading