diff --git a/README.md b/README.md index 90f690ffe..098aac026 100644 --- a/README.md +++ b/README.md @@ -194,6 +194,8 @@ If you're using the default tenant, the URL is `ws://realtime-dev.localhost:4000 | BROADCAST_POOL_SIZE | number | Number of processes to relay Phoenix.PubSub messages across the cluster | | POSTGRES_CDC_SCOPE_SHARDS | number | Number of dynamic supervisor partitions used by the Postgres CDC extension. Defaults to 5. | | USERS_SCOPE_SHARDS | number | Number of dynamic supervisor partitions used by the Users extension. Defaults to 5. | +| NO_CHANNEL_TIMEOUT_IN_MS | number | Time in ms to check if a socket has no channels open and if so, disconnect it. Defaults to 10 minutes. | +| EGRESS_TELEMETRY_INTERVAL_IN_MS | number | Time in ms to send telemetry for egress. Defaults to 15 seconds. | The OpenTelemetry variables mentioned above are not an exhaustive list of all [supported environment variables](https://opentelemetry.io/docs/languages/sdk-configuration/). diff --git a/config/runtime.exs b/config/runtime.exs index a16b4be4e..0d1854623 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -73,6 +73,7 @@ websocket_max_heap_size = div(Env.get_integer("WEBSOCKET_MAX_HEAP_SIZE", 50_000_ users_scope_shards = Env.get_integer("USERS_SCOPE_SHARDS", 5) postgres_cdc_scope_shards = Env.get_integer("POSTGRES_CDC_SCOPE_SHARDS", 5) regional_broadcasting = Env.get_boolean("REGIONAL_BROADCASTING", false) +egress_telemetry_interval_in_ms = Env.get_integer("EGRESS_TELEMETRY_INTERVAL_IN_MS", :timer.seconds(15)) no_channel_timeout_in_ms = Env.get_integer("NO_CHANNEL_TIMEOUT_IN_MS", :timer.minutes(10)) if !(db_version in [nil, "ipv6", "ipv4"]), @@ -122,13 +123,14 @@ config :realtime, tenant_cache_expiration: tenant_cache_expiration, rpc_timeout: rpc_timeout, max_gen_rpc_clients: max_gen_rpc_clients, - no_channel_timeout_in_ms: no_channel_timeout_in_ms, platform: platform, pubsub_adapter: pubsub_adapter, broadcast_pool_size: broadcast_pool_size, users_scope_shards: users_scope_shards, postgres_cdc_scope_shards: postgres_cdc_scope_shards, - regional_broadcasting: regional_broadcasting + regional_broadcasting: regional_broadcasting, + egress_telemetry_interval_in_ms: egress_telemetry_interval_in_ms, + no_channel_timeout_in_ms: no_channel_timeout_in_ms if config_env() != :test && run_janitor? do config :realtime, diff --git a/lib/realtime/application.ex b/lib/realtime/application.ex index 45cc0271e..9148f5216 100644 --- a/lib/realtime/application.ex +++ b/lib/realtime/application.ex @@ -55,6 +55,7 @@ defmodule Realtime.Application do migration_partition_slots = Application.get_env(:realtime, :migration_partition_slots) connect_partition_slots = Application.get_env(:realtime, :connect_partition_slots) no_channel_timeout_in_ms = Application.get_env(:realtime, :no_channel_timeout_in_ms) + egress_telemetry_interval_in_ms = Application.get_env(:realtime, :egress_telemetry_interval_in_ms) children = [ @@ -94,7 +95,9 @@ defmodule Realtime.Application do strategy: :one_for_one, name: Connect.DynamicSupervisor, partitions: connect_partition_slots}, - {RealtimeWeb.RealtimeChannel.Tracker, check_interval_in_ms: no_channel_timeout_in_ms}, + {RealtimeWeb.RealtimeChannel.Tracker, + no_channel_timeout_in_ms: no_channel_timeout_in_ms, + egress_telemetry_interval_in_ms: egress_telemetry_interval_in_ms}, RealtimeWeb.Endpoint, RealtimeWeb.Presence ] ++ extensions_supervisors() ++ janitor_tasks() diff --git a/lib/realtime/monitoring/prom_ex/plugins/tenant.ex b/lib/realtime/monitoring/prom_ex/plugins/tenant.ex index b7b953ca9..ec463b1f8 100644 --- a/lib/realtime/monitoring/prom_ex/plugins/tenant.ex +++ b/lib/realtime/monitoring/prom_ex/plugins/tenant.ex @@ -23,10 +23,26 @@ defmodule Realtime.PromEx.Plugins.Tenant do channel_events(), replication_metrics(), subscription_metrics(), - payload_size_metrics() + payload_size_metrics(), + output_bytes_metrics() ] end + defp output_bytes_metrics do + Event.build( + :realtime_connections_output_bytes_metrics, + [ + last_value( + [:realtime, :connections, :output_bytes], + event_name: [:realtime, :connections, :output_bytes], + description: "The total count of output bytes for a tenant.", + measurement: :output_bytes, + tags: [:tenant] + ) + ] + ) + end + defp payload_size_metrics do Event.build( :realtime_tenant_payload_size_metrics, diff --git a/lib/realtime/telemetry/logger.ex b/lib/realtime/telemetry/logger.ex index cbc0c6cc4..790aebc2c 100644 --- a/lib/realtime/telemetry/logger.ex +++ b/lib/realtime/telemetry/logger.ex @@ -12,7 +12,8 @@ defmodule Realtime.Telemetry.Logger do [:realtime, :rate_counter, :channel, :events], [:realtime, :rate_counter, :channel, :joins], [:realtime, :rate_counter, :channel, :db_events], - [:realtime, :rate_counter, :channel, :presence_events] + [:realtime, :rate_counter, :channel, :presence_events], + [:realtime, :connections, :output_bytes] ] def start_link(args) do @@ -29,13 +30,24 @@ defmodule Realtime.Telemetry.Logger do Logs billing metrics for a tenant aggregated and emitted by a PromEx metric poller. """ + def handle_event([:realtime, :connections, :output_bytes], %{output_bytes: output_bytes}, %{tenant: tenant}, _config) do + meta = %{project: tenant} + + Logger.info( + ["Billing metrics: [:realtime, :connections, :output_bytes] output_bytes=" <> inspect(output_bytes)], + meta + ) + + :ok + end + def handle_event(event, measurements, %{tenant: tenant}, _config) do meta = %{project: tenant, measurements: measurements} Logger.info(["Billing metrics: ", inspect(event)], meta) :ok end - def handle_event(_event, _measurements, _metadata, _config) do + def handle_event(_, _, _, _config) do :ok end diff --git a/lib/realtime_web/channels/realtime_channel.ex b/lib/realtime_web/channels/realtime_channel.ex index 70a426357..8daa93d54 100644 --- a/lib/realtime_web/channels/realtime_channel.ex +++ b/lib/realtime_web/channels/realtime_channel.ex @@ -45,7 +45,7 @@ defmodule RealtimeWeb.RealtimeChannel do Process.flag(:max_heap_size, max_heap_size()) Process.flag(:fullsweep_after, @fullsweep_after) - Tracker.track(socket.transport_pid) + Tracker.track(socket.transport_pid, tenant_id) Logger.metadata(external_id: tenant_id, project: tenant_id) Logger.put_process_level(self(), log_level) @@ -499,10 +499,10 @@ defmodule RealtimeWeb.RealtimeChannel do end @impl true - def terminate(reason, %{transport_pid: transport_pid}) do + def terminate(reason, %{transport_pid: transport_pid, assigns: %{tenant: tenant_id}}) do Logger.debug("Channel terminated with reason: #{reason}") :telemetry.execute([:prom_ex, :plugin, :realtime, :disconnected], %{}) - Tracker.untrack(transport_pid) + Tracker.untrack(transport_pid, tenant_id) :ok end diff --git a/lib/realtime_web/channels/realtime_channel/tracker.ex b/lib/realtime_web/channels/realtime_channel/tracker.ex index 1dff3136e..59531db0e 100644 --- a/lib/realtime_web/channels/realtime_channel/tracker.ex +++ b/lib/realtime_web/channels/realtime_channel/tracker.ex @@ -8,29 +8,41 @@ defmodule RealtimeWeb.RealtimeChannel.Tracker do """ use GenServer require Logger + alias Realtime.Helpers + + defstruct [ + :no_channel_timeout_in_ms, + :no_channel_timeout_ref, + :egress_telemetry_interval_in_ms, + :egress_telemetry_ref + ] @table :channel_tracker - @zero_count_match [{{:"$1", :"$2"}, [{:"=<", :"$2", 0}], [:"$1"]}] - @zero_count_delete [{{:"$1", :"$2"}, [{:"=<", :"$2", 0}], [true]}] + @zero_count_match [{{{:"$1", :_}, :"$2"}, [{:"=<", :"$2", 0}], [:"$1"]}] + @zero_count_delete [{{{:"$1", :_}, :"$2"}, [{:"=<", :"$2", 0}], [true]}] + @egress_telemetry_match [{{{:"$1", :"$2"}, :"$3"}, [{:>, :"$3", 0}], [[:"$1", :"$2"]]}] + @doc """ Tracks a transport pid. """ - @spec track(pid()) :: integer() - def track(pid), do: :ets.update_counter(@table, pid, 1, {pid, 0}) + @spec track(pid(), binary()) :: integer() + def track(pid, tenant_external_id), + do: :ets.update_counter(@table, {pid, tenant_external_id}, 1, {{pid, tenant_external_id}, 0}) @doc """ Un-tracks a transport pid. """ - @spec untrack(pid()) :: integer() - def untrack(pid), do: :ets.update_counter(@table, pid, -1, {pid, 0}) + @spec untrack(pid(), binary()) :: integer() + def untrack(pid, tenant_external_id), + do: :ets.update_counter(@table, {pid, tenant_external_id}, -1, {{pid, tenant_external_id}, 0}) @doc """ Returns the number of channels open for a transport pid. """ - @spec count(pid()) :: integer() - def count(pid) do - case :ets.lookup(@table, pid) do - [{^pid, count}] -> count + @spec count(pid(), binary()) :: integer() + def count(pid, tenant_external_id) do + case :ets.lookup(@table, {pid, tenant_external_id}) do + [{{^pid, ^tenant_external_id}, count}] -> count [] -> 0 end end @@ -57,17 +69,54 @@ defmodule RealtimeWeb.RealtimeChannel.Tracker do @impl true def init(opts) do - check_interval_in_ms = Keyword.fetch!(opts, :check_interval_in_ms) - Process.send_after(self(), :check_channels, check_interval_in_ms) - {:ok, %{check_interval_in_ms: check_interval_in_ms}} + no_channel_timeout_in_ms = Keyword.fetch!(opts, :no_channel_timeout_in_ms) + no_channel_timeout_ref = Process.send_after(self(), :check_channels, no_channel_timeout_in_ms) + egress_telemetry_interval_in_ms = Keyword.fetch!(opts, :egress_telemetry_interval_in_ms) + egress_telemetry_ref = Process.send_after(self(), :send_egress_telemetry, egress_telemetry_interval_in_ms) + + {:ok, + %{ + no_channel_timeout_in_ms: no_channel_timeout_in_ms, + no_channel_timeout_ref: no_channel_timeout_ref, + egress_telemetry_interval_in_ms: egress_telemetry_interval_in_ms, + egress_telemetry_ref: egress_telemetry_ref + }} end @impl true def handle_info(:check_channels, state) do + %{no_channel_timeout_ref: no_channel_timeout_ref, no_channel_timeout_in_ms: no_channel_timeout_in_ms} = state chunked_killing() :ets.select_delete(@table, @zero_count_delete) - Process.send_after(self(), :check_channels, state.check_interval_in_ms) - {:noreply, state} + Helpers.cancel_timer(no_channel_timeout_ref) + no_channel_timeout_ref = Process.send_after(self(), :check_channels, no_channel_timeout_in_ms) + {:noreply, %{state | no_channel_timeout_ref: no_channel_timeout_ref}} + end + + def handle_info(:send_egress_telemetry, state) do + %{egress_telemetry_ref: egress_telemetry_ref, egress_telemetry_interval_in_ms: egress_telemetry_interval_in_ms} = + state + + Port.list() + |> Enum.flat_map(fn port -> + case Port.info(port, :links) do + {:links, pids} -> Enum.map(pids, fn pid -> {pid, port} end) + _ -> [] + end + end) + |> Enum.group_by(fn {pid, _} -> pid end, fn {_, port} -> port end) + |> collect_egress_telemetry() + |> Enum.each(fn {tenant_external_id, output_bytes} -> + if output_bytes > 0 do + :telemetry.execute([:realtime, :connections, :output_bytes], %{output_bytes: output_bytes}, %{ + tenant_external_id: tenant_external_id + }) + end + end) + + Helpers.cancel_timer(egress_telemetry_ref) + egress_telemetry_ref = Process.send_after(self(), :send_egress_telemetry, egress_telemetry_interval_in_ms) + {:noreply, %{state | egress_telemetry_ref: egress_telemetry_ref}} end defp chunked_killing(cont \\ nil) do @@ -84,5 +133,38 @@ defmodule RealtimeWeb.RealtimeChannel.Tracker do end end + defp collect_egress_telemetry(pid_and_port_list, cont \\ nil, acc \\ %{}) do + result = if cont, do: :ets.select(cont), else: :ets.select(@table, @egress_telemetry_match, 1000) + + case result do + :"$end_of_table" -> + acc + + {pids, cont} -> + acc = + Enum.reduce(pids, acc, fn [pid, tenant_external_id], acc -> + ports = Map.get(pid_and_port_list, pid, []) + + output_bytes = + Enum.sum_by(ports, fn port -> + case :inet.getstat(port, [:send_oct]) do + {:ok, stats} -> stats[:send_oct] + _ -> 0 + end + end) + + {_, acc} = + Map.get_and_update(acc, tenant_external_id, fn + nil -> {output_bytes, output_bytes} + output_bytes -> {output_bytes, output_bytes + output_bytes} + end) + + acc + end) + + collect_egress_telemetry(pid_and_port_list, cont, acc) + end + end + def table_name, do: @table end diff --git a/mix.exs b/mix.exs index 25064929b..5b1ba6578 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do def project do [ app: :realtime, - version: "2.61.0", + version: "2.61.2", elixir: "~> 1.18", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :prod, diff --git a/test/integration/rt_channel_test.exs b/test/integration/rt_channel_test.exs index 5c64bff0c..e5916950d 100644 --- a/test/integration/rt_channel_test.exs +++ b/test/integration/rt_channel_test.exs @@ -2295,7 +2295,7 @@ defmodule Realtime.Integration.RtChannelTest do assert_receive %Message{topic: ^topic, event: "phx_close"}, 500 end - start_supervised!({Tracker, check_interval_in_ms: 100}) + start_supervised!({Tracker, no_channel_timeout_in_ms: 100}) # wait to trigger tracker assert_process_down(socket, 1000) assert [] = Tracker.list_pids() diff --git a/test/realtime/monitoring/prom_ex/plugins/tenant_test.exs b/test/realtime/monitoring/prom_ex/plugins/tenant_test.exs index 7ba18f29e..2d0056069 100644 --- a/test/realtime/monitoring/prom_ex/plugins/tenant_test.exs +++ b/test/realtime/monitoring/prom_ex/plugins/tenant_test.exs @@ -63,6 +63,14 @@ defmodule Realtime.PromEx.Plugins.TenantTest do %{tenant: external_id} ) end + + def fake_output_bytes(external_id) do + Realtime.Telemetry.execute( + [:realtime, :connections, :output_bytes], + %{output_bytes: 100}, + %{tenant: external_id} + ) + end end end) @@ -316,6 +324,16 @@ defmodule Realtime.PromEx.Plugins.TenantTest do assert metric_value(bucket_pattern) > 0 end + + test "egress bytes metric exists after check", context do + external_id = context.tenant.external_id + pattern = ~r/realtime_connections_output_bytes{tenant="#{external_id}"}\s(?\d+)/ + + FakeUserCounter.fake_output_bytes(external_id) + + Process.sleep(200) + assert metric_value(pattern) == 100 + end end defp metric_value(pattern) do diff --git a/test/realtime/telemetry/logger_test.exs b/test/realtime/telemetry/logger_test.exs index 640cfc7e2..d149a788f 100644 --- a/test/realtime/telemetry/logger_test.exs +++ b/test/realtime/telemetry/logger_test.exs @@ -13,9 +13,24 @@ defmodule Realtime.Telemetry.LoggerTest do test "logs on telemetry event" do start_link_supervised!({TelemetryLogger, handler_id: "telemetry-logger-test"}) - assert capture_log(fn -> - :telemetry.execute([:realtime, :connections], %{count: 1}, %{tenant: "tenant"}) - end) =~ "Billing metrics: [:realtime, :connections]" + log = + capture_log(fn -> + :telemetry.execute([:realtime, :connections], %{count: 1}, %{tenant: "tenant"}) + :telemetry.execute([:realtime, :rate_counter, :channel, :events], %{count: 1}, %{tenant: "tenant"}) + :telemetry.execute([:realtime, :rate_counter, :channel, :joins], %{count: 1}, %{tenant: "tenant"}) + :telemetry.execute([:realtime, :rate_counter, :channel, :db_events], %{count: 1}, %{tenant: "tenant"}) + :telemetry.execute([:realtime, :rate_counter, :channel, :presence_events], %{count: 1}, %{tenant: "tenant"}) + :telemetry.execute([:realtime, :connections, :output_bytes], %{output_bytes: 100}, %{tenant: "tenant"}) + end) + |> IO.inspect(label: "log") + + assert log =~ "project=tenant" + assert log =~ "Billing metrics: [:realtime, :connections]" + assert log =~ "Billing metrics: [:realtime, :rate_counter, :channel, :events]" + assert log =~ "Billing metrics: [:realtime, :rate_counter, :channel, :joins]" + assert log =~ "Billing metrics: [:realtime, :rate_counter, :channel, :db_events]" + assert log =~ "Billing metrics: [:realtime, :rate_counter, :channel, :presence_events]" + assert log =~ "Billing metrics: [:realtime, :connections, :output_bytes] output_bytes=100" end test "ignores events without tenant" do diff --git a/test/realtime_web/channels/realtime_channel/tracker_test.exs b/test/realtime_web/channels/realtime_channel/tracker_test.exs index 7137256c1..5d8c9798e 100644 --- a/test/realtime_web/channels/realtime_channel/tracker_test.exs +++ b/test/realtime_web/channels/realtime_channel/tracker_test.exs @@ -5,70 +5,137 @@ defmodule RealtimeWeb.RealtimeChannel.TrackerTest do alias RealtimeWeb.RealtimeChannel.Tracker setup do - start_supervised!({Tracker, check_interval_in_ms: 100}) + start_supervised!({Tracker, no_channel_timeout_in_ms: 50, egress_telemetry_interval_in_ms: 50}) :ets.delete_all_objects(Tracker.table_name()) - :ok + tenant = random_string() + %{tenant: tenant} end describe "track/2" do - test "is able to track channels per transport pid" do + test "is able to track channels per transport pid", %{tenant: tenant} do pid = self() - Tracker.track(pid) + Tracker.track(pid, tenant) - assert Tracker.count(pid) == 1 + assert Tracker.count(pid, tenant) == 1 end - test "is able to track multiple channels per transport pid" do + test "is able to track multiple channels per transport pid", %{tenant: tenant} do pid = self() - Tracker.track(pid) - Tracker.track(pid) + Tracker.track(pid, tenant) + Tracker.track(pid, tenant) - assert Tracker.count(pid) == 2 + assert Tracker.count(pid, tenant) == 2 end end - describe "untrack/1" do - test "is able to untrack a transport pid" do + describe "untrack/2" do + test "is able to untrack a transport pid", %{tenant: tenant} do pid = self() - Tracker.track(pid) - Tracker.untrack(pid) - assert Tracker.count(pid) == 0 + Tracker.track(pid, tenant) + Tracker.untrack(pid, tenant) + + assert Tracker.count(pid, tenant) == 0 end end - describe "count/1" do - test "is able to count the number of channels per transport pid" do + describe "count/2" do + test "is able to count the number of channels per transport pid", %{tenant: tenant} do pid = self() - Tracker.track(pid) - Tracker.track(pid) + Tracker.track(pid, tenant) + Tracker.track(pid, tenant) - assert Tracker.count(pid) == 2 + assert Tracker.count(pid, tenant) == 2 + assert Tracker.count(pid, "other_tenant_external_id") == 0 end end - describe "list_pids/0" do - test "is able to list all pids in the table and their count" do + describe "list_pids/1" do + test "is able to list all pids in the table and their count", %{tenant: tenant} do pid = self() - Tracker.track(pid) - Tracker.track(pid) + Tracker.track(pid, tenant) + Tracker.track(pid, tenant) + + assert Tracker.list_pids() == [{{pid, tenant}, 2}] + end + end + + def handle_telemetry(event, metadata, content, pid: pid), do: send(pid, {event, metadata, content}) + + describe "egress telemetry" do + setup do + event = [:realtime, :connections, :output_bytes] + test_pid = self() + :telemetry.attach(__MODULE__, event, &__MODULE__.handle_telemetry/4, pid: test_pid) + + on_exit(fn -> :telemetry.detach(__MODULE__) end) + + %{event: event} + end + + test "emits telemetry with output bytes for transport pids with TCP sockets", %{event: event, tenant: tenant} do + {:ok, listen_socket} = :gen_tcp.listen(0, [:binary, active: false, reuseaddr: true]) + {:ok, {_, port}} = :inet.sockname(listen_socket) + spawn_send_random_bytes(port, tenant) + + assert_receive {^event, %{output_bytes: output_bytes}, %{tenant_external_id: ^tenant}}, 200 + assert output_bytes == 5000 + + :ok = :gen_tcp.close(listen_socket) + end - assert Tracker.list_pids() == [{pid, 2}] + test "does not emit telemetry for transport pids with no TCP sockets", %{event: event, tenant: tenant} do + pid = spawn(fn -> :timer.sleep(:infinity) end) + Tracker.track(pid, tenant) + refute_receive {^event, _, %{tenant_external_id: ^tenant}}, 1000 + end + + test "does not emit telemetry for transport pids with no output bytes", %{event: event, tenant: tenant} do + {:ok, listen_socket} = :gen_tcp.listen(0, [:binary, active: false, reuseaddr: true]) + {:ok, {_, port}} = :inet.sockname(listen_socket) + spawn_send_random_bytes(port, tenant, 0) + refute_receive {^event, _, %{tenant_external_id: ^tenant}}, 1000 + end + + test "emits telemetry for multiple tenants and accumulates output bytes", %{event: event} do + tenant_1 = random_string() + tenant_2 = random_string() + {:ok, listen_socket} = :gen_tcp.listen(0, [:binary, active: false, reuseaddr: true]) + {:ok, {_, port}} = :inet.sockname(listen_socket) + + spawn_send_random_bytes(port, tenant_1) + spawn_send_random_bytes(port, tenant_2) + + assert_receive {^event, %{output_bytes: output_bytes_1}, %{tenant_external_id: ^tenant_1}}, 500 + assert_receive {^event, %{output_bytes: output_bytes_2}, %{tenant_external_id: ^tenant_2}}, 500 + + assert output_bytes_1 == 5000 + assert output_bytes_2 == 5000 + + spawn_send_random_bytes(port, tenant_1) + spawn_send_random_bytes(port, tenant_2) + + assert_receive {^event, %{output_bytes: output_bytes_1}, %{tenant_external_id: ^tenant_1}}, 500 + assert_receive {^event, %{output_bytes: output_bytes_2}, %{tenant_external_id: ^tenant_2}}, 500 + + assert output_bytes_1 == 10_000 + assert output_bytes_2 == 10_000 + + :ok = :gen_tcp.close(listen_socket) end end - test "kills tracked pid when no channels are open" do + test "kills tracked pid when no channels are open", %{tenant: tenant} do assert Tracker.table_name() |> :ets.tab2list() |> length() == 0 pids = for _ <- 1..10_500 do pid = spawn(fn -> :timer.sleep(:infinity) end) - Tracker.track(pid) - Tracker.untrack(pid) + Tracker.track(pid, tenant) + Tracker.untrack(pid, tenant) - # Check random negative numbers - Enum.random([true, false]) && Tracker.untrack(pid) + Enum.random([true, false]) && Tracker.untrack(pid, tenant) pid end @@ -77,4 +144,13 @@ defmodule RealtimeWeb.RealtimeChannel.TrackerTest do for pid <- pids, do: refute(Process.alive?(pid)) assert Tracker.table_name() |> :ets.tab2list() |> length() == 0 end + + defp spawn_send_random_bytes(port, tenant, payload_size \\ 250) do + spawn(fn -> + {:ok, socket} = :gen_tcp.connect(:localhost, port, [:binary, active: false]) + Tracker.track(self(), tenant) + for _ <- 1..20, do: :gen_tcp.send(socket, :crypto.strong_rand_bytes(payload_size)) + :timer.sleep(:infinity) + end) + end end