Skip to content

Commit 9d0fb4d

Browse files
committed
fix: Add specific error for increased message rates
Add specific error for increased message rate errors but also refactored some logging approach to start simplifying that code
1 parent ef18c27 commit 9d0fb4d

File tree

10 files changed

+115
-125
lines changed

10 files changed

+115
-125
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ This is the list of operational codes that can help you understand your deployme
223223
| ChannelRateLimitReached | The number of channels you can create has reached its limit |
224224
| ConnectionRateLimitReached | The number of connected clients as reached its limit |
225225
| ClientJoinRateLimitReached | The rate of joins per second from your clients as reached the channel limits |
226+
| MessagePerSecondRateLimitReached | The rate of messages per second from your clients as reached the channel limits |
226227
| RealtimeDisabledForTenant | Realtime has been disabled for the tenant |
227228
| UnableToConnectToTenantDatabase | Realtime was not able to connect to the tenant's database |
228229
| DatabaseLackOfConnections | Realtime was not able to connect to the tenant's database due to not having enough available connections |

config/config.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ config :tailwind,
5151
# Configures Elixir's Logger
5252
config :logger, :console,
5353
format: "$time $metadata[$level] $message\n",
54-
metadata: [:request_id, :project, :external_id, :application_name, :error_code]
54+
metadata: [:request_id, :project, :external_id, :application_name, :error_code, :sub, :exp, :iss]
5555

5656
# Use Jason for JSON parsing in Phoenix
5757
config :phoenix, :json_library, Jason

config/test.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ config :logger,
4848
# Configures Elixir's Logger
4949
config :logger, :console,
5050
format: "$time $metadata[$level] $message\n",
51-
metadata: [:request_id, :project, :external_id, :application_name, :sub, :exp, :iss]
51+
metadata: [:request_id, :project, :external_id, :application_name, :error_code, :sub, :exp, :iss]
5252

5353
config :opentelemetry,
5454
span_processor: :simple,

lib/realtime_web/channels/realtime_channel.ex

Lines changed: 36 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ defmodule RealtimeWeb.RealtimeChannel do
2929
@confirm_token_ms_interval :timer.minutes(5)
3030

3131
@impl true
32-
def join("realtime:", _params, _socket) do
33-
Logging.log_error_message(:error, "TopicNameRequired", "You must provide a topic name")
32+
def join("realtime:", _params, socket) do
33+
Logging.log_error_message(:error, "TopicNameRequired", "You must provide a topic name", socket)
3434
end
3535

3636
def join("realtime:" <> sub_topic = topic, params, socket) do
@@ -54,9 +54,9 @@ defmodule RealtimeWeb.RealtimeChannel do
5454

5555
with :ok <- SignalHandler.shutdown_in_progress?(),
5656
:ok <- only_private?(tenant_id, socket),
57-
:ok <- limit_joins(socket.assigns),
57+
:ok <- limit_joins(socket),
5858
:ok <- limit_channels(socket),
59-
:ok <- limit_max_users(socket.assigns),
59+
:ok <- limit_max_users(socket),
6060
:ok <- start_db_rate_counter(tenant_id),
6161
{:ok, claims, confirm_token_ref, access_token, _} <- confirm_token(socket),
6262
{:ok, db_conn} <- Connect.lookup_or_start_connection(tenant_id),
@@ -113,109 +113,84 @@ defmodule RealtimeWeb.RealtimeChannel do
113113
{:ok, state, assign(socket, assigns)}
114114
else
115115
{:error, :expired_token, msg} ->
116-
Logging.log_error_with_token_metadata("InvalidJWTToken", msg, socket.assigns.access_token)
116+
Logging.log_error_with_token_metadata("InvalidJWTToken", msg, socket)
117117

118118
{:error, :missing_claims} ->
119119
msg = "Fields `role` and `exp` are required in JWT"
120-
Logging.log_error_with_token_metadata("InvalidJWTToken", msg, socket.assigns.access_token)
120+
Logging.log_error_with_token_metadata("InvalidJWTToken", msg, socket)
121121

122122
{:error, :unauthorized, msg} ->
123-
Logging.log_error_message(:error, "Unauthorized", msg)
123+
Logging.log_error_message(:error, "Unauthorized", msg, socket)
124124

125125
{:error, :too_many_channels} ->
126126
msg = "Too many channels"
127-
Logging.log_error_message(:error, "ChannelRateLimitReached", msg)
127+
Logging.log_error_message(:error, "ChannelRateLimitReached", msg, socket)
128128

129129
{:error, :too_many_connections} ->
130130
msg = "Too many connected users"
131-
Logging.log_error_message(:error, "ConnectionRateLimitReached", msg)
131+
Logging.log_error_message(:error, "ConnectionRateLimitReached", msg, socket)
132132

133133
{:error, :too_many_joins} ->
134134
msg = "Too many joins per second"
135-
Logging.log_error_message(:error, "ClientJoinRateLimitReached", msg)
135+
Logging.log_error_message(:error, "ClientJoinRateLimitReached", msg, socket)
136136

137137
{:error, :increase_connection_pool} ->
138138
msg = "Please increase your connection pool size"
139-
Logging.log_error_message(:error, "IncreaseConnectionPool", msg)
139+
Logging.log_error_message(:error, "IncreaseConnectionPool", msg, socket)
140140

141141
{:error, :tenant_db_too_many_connections} ->
142142
msg = "Database can't accept more connections, Realtime won't connect"
143-
Logging.log_error_message(:error, "DatabaseLackOfConnections", msg)
143+
Logging.log_error_message(:error, "DatabaseLackOfConnections", msg, socket)
144144

145145
{:error, :unable_to_set_policies, error} ->
146-
Logging.log_error_message(:error, "UnableToSetPolicies", error)
146+
Logging.log_error_message(:error, "UnableToSetPolicies", error, socket)
147147
{:error, %{reason: "Realtime was unable to connect to the project database"}}
148148

149149
{:error, :tenant_database_unavailable} ->
150150
Logging.log_error_message(
151151
:error,
152152
"UnableToConnectToProject",
153-
"Realtime was unable to connect to the project database"
153+
"Realtime was unable to connect to the project database",
154+
socket
154155
)
155156

156157
{:error, :rpc_error, :timeout} ->
157-
Logging.log_error_message(:error, "TimeoutOnRpcCall", "Node request timeout")
158+
Logging.log_error_message(:error, "TimeoutOnRpcCall", "Node request timeout", socket)
158159

159160
{:error, :rpc_error, reason} ->
160-
Logging.log_error_message(:error, "ErrorOnRpcCall", "RPC call error: " <> inspect(reason))
161+
Logging.log_error_message(:error, "ErrorOnRpcCall", "RPC call error: " <> inspect(reason), socket)
161162

162163
{:error, :initializing} ->
163-
Logging.log_error_message(
164-
:error,
165-
"InitializingProjectConnection",
166-
"Realtime is initializing the project connection"
167-
)
164+
msg = "Realtime is initializing the project connection"
165+
Logging.log_error_message(:error, "InitializingProjectConnection", msg, socket)
168166

169167
{:error, :tenant_database_connection_initializing} ->
170-
Logging.log_error_message(
171-
:error,
172-
"InitializingProjectConnection",
173-
"Connecting to the project database"
174-
)
168+
msg = "Connecting to the project database"
169+
Logging.log_error_message(:error, "InitializingProjectConnection", msg, socket)
175170

176171
{:error, :token_malformed, msg} ->
177-
Logging.log_error_message(:error, "MalformedJWT", msg)
172+
Logging.log_error_message(:error, "MalformedJWT", msg, socket)
178173

179174
{:error, invalid_exp} when is_integer(invalid_exp) and invalid_exp <= 0 ->
180-
Logging.log_error_with_token_metadata(
181-
"InvalidJWTToken",
182-
"Token expiration time is invalid",
183-
socket.assigns.access_token
184-
)
175+
Logging.log_error_with_token_metadata("InvalidJWTToken", "Token expiration time is invalid", socket)
185176

186177
{:error, :private_only} ->
187-
Logging.log_error_message(
188-
:error,
189-
"PrivateOnly",
190-
"This project only allows private channels"
191-
)
178+
Logging.log_error_message(:error, "PrivateOnly", "This project only allows private channels", socket)
192179

193180
{:error, :tenant_not_found} ->
194-
Logging.log_error_message(
195-
:error,
196-
"TenantNotFound",
197-
"Tenant with the given ID does not exist"
198-
)
181+
Logging.log_error_message(:error, "TenantNotFound", "Tenant with the given ID does not exist", socket)
199182

200183
{:error, :tenant_suspended} ->
201-
Logging.log_error_message(
202-
:error,
203-
"RealtimeDisabledForTenant",
204-
"Realtime disabled for this tenant"
205-
)
184+
Logging.log_error_message(:error, "RealtimeDisabledForTenant", "Realtime disabled for this tenant", socket)
206185

207186
{:error, :signature_error} ->
208-
Logging.log_error_message(:error, "JwtSignatureError", "Failed to validate JWT signature")
187+
Logging.log_error_message(:error, "JwtSignatureError", "Failed to validate JWT signature", socket)
209188

210189
{:error, :shutdown_in_progress} ->
211-
Logging.log_error_message(
212-
:error,
213-
"RealtimeRestarting",
214-
"Realtime is restarting, please standby"
215-
)
190+
Logging.log_error_message(:error, "RealtimeRestarting", "Realtime is restarting, please standby", socket)
216191

217192
{:error, error} ->
218-
Logging.log_error_message(:error, "UnknownErrorOnChannel", error)
193+
Logging.log_error_message(:error, "UnknownErrorOnChannel", error, socket)
219194
{:error, %{reason: "Unknown Error on Channel"}}
220195
end
221196
end
@@ -227,7 +202,7 @@ defmodule RealtimeWeb.RealtimeChannel do
227202
)
228203
when avg > max do
229204
message = "Too many messages per second"
230-
205+
Logging.log_error_message(:warning, "MessagePerSecondRateLimitReached", message, socket)
231206
shutdown_response(socket, message)
232207
end
233208

@@ -379,6 +354,7 @@ defmodule RealtimeWeb.RealtimeChannel do
379354
def handle_in(_, _, %{assigns: %{rate_counter: %{avg: avg}, limits: %{max_events_per_second: max}}} = socket)
380355
when avg > max do
381356
message = "Too many messages per second"
357+
Logging.log_error_message(:error, "MessagePerSecondRateLimitReached", message, socket)
382358

383359
shutdown_response(socket, message)
384360
end
@@ -478,7 +454,7 @@ defmodule RealtimeWeb.RealtimeChannel do
478454
wait
479455
end
480456

481-
def limit_joins(%{tenant: tenant, limits: limits}) do
457+
def limit_joins(%{assigns: %{tenant: tenant, limits: limits}} = socket) do
482458
id = Tenants.joins_per_second_key(tenant)
483459
GenCounter.new(id)
484460

@@ -501,7 +477,7 @@ defmodule RealtimeWeb.RealtimeChannel do
501477
{:error, :too_many_joins}
502478

503479
error ->
504-
Logging.log_error_message(:error, "UnknownErrorOnCounter", error)
480+
Logging.log_error_message(:error, "UnknownErrorOnCounter", error, socket)
505481
{:error, error}
506482
end
507483
end
@@ -517,7 +493,7 @@ defmodule RealtimeWeb.RealtimeChannel do
517493
end
518494
end
519495

520-
defp limit_max_users(%{limits: %{max_concurrent_users: max_conn_users}, tenant: tenant}) do
496+
defp limit_max_users(%{assigns: %{limits: %{max_concurrent_users: max_conn_users}, tenant: tenant}}) do
521497
conns = Realtime.UsersCounter.tenant_users(tenant)
522498

523499
if conns < max_conn_users,
@@ -645,10 +621,9 @@ defmodule RealtimeWeb.RealtimeChannel do
645621
end
646622

647623
defp shutdown_response(socket, message) when is_binary(message) do
648-
%{assigns: %{channel_name: channel_name, access_token: access_token}} = socket
649-
metadata = log_metadata(access_token)
624+
%{assigns: %{channel_name: channel_name}} = socket
650625
push_system_message("system", socket, "error", message, channel_name)
651-
log_warning("ChannelShutdown", message, metadata)
626+
Logging.log_warning_with_token_metadata("ChannelShutdown", message, socket)
652627
{:stop, :normal, socket}
653628
end
654629

@@ -820,17 +795,4 @@ defmodule RealtimeWeb.RealtimeChannel do
820795
do: {:error, :private_only},
821796
else: :ok
822797
end
823-
824-
defp log_metadata(access_token) do
825-
access_token
826-
|> Joken.peek_claims()
827-
|> then(fn
828-
{:ok, claims} -> Map.get(claims, "sub")
829-
_ -> nil
830-
end)
831-
|> then(fn
832-
nil -> []
833-
sub -> [sub: sub]
834-
end)
835-
end
836798
end

lib/realtime_web/channels/realtime_channel/logging.ex

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,12 @@ defmodule RealtimeWeb.RealtimeChannel.Logging do
4343
@spec log_error_with_token_metadata(
4444
code :: binary(),
4545
msg :: binary(),
46-
token :: Joken.bearer_token(),
47-
metadata :: keyword()
46+
socket :: Phoenix.Socket.t()
4847
) :: {:error, %{reason: binary()}}
49-
def log_error_with_token_metadata(code, msg, token, metadata \\ []) do
50-
if metadata == [], do: Logger.metadata()
51-
metadata = update_metadata_with_token_claims(metadata, token)
52-
log_error_message(:error, code, msg, metadata)
48+
def log_error_with_token_metadata(code, msg, %{assigns: %{access_token: access_token, tenant: tenant_id}} = socket) do
49+
Logger.metadata(external_id: tenant_id, project: tenant_id)
50+
update_metadata_with_token_claims(access_token)
51+
log_error_message(:error, code, msg, socket)
5352
end
5453

5554
@doc """
@@ -58,25 +57,25 @@ defmodule RealtimeWeb.RealtimeChannel.Logging do
5857
@spec log_warning_with_token_metadata(
5958
code :: binary(),
6059
msg :: binary(),
61-
token :: Joken.bearer_token(),
62-
metadata :: keyword()
60+
socket :: Phoenix.Socket.t()
6361
) :: {:error, %{reason: binary()}}
64-
def log_warning_with_token_metadata(code, msg, token, metadata \\ []) do
65-
if metadata == [], do: Logger.metadata()
66-
metadata = update_metadata_with_token_claims(metadata, token)
67-
log_error_message(:warning, code, msg, metadata)
62+
def log_warning_with_token_metadata(code, msg, %{assigns: %{access_token: access_token, tenant: tenant_id}} = socket) do
63+
Logger.metadata(external_id: tenant_id, project: tenant_id)
64+
update_metadata_with_token_claims(access_token)
65+
log_error_message(:error, code, msg, socket)
6866
end
6967

70-
defp update_metadata_with_token_claims(metadata, token) do
68+
defp update_metadata_with_token_claims(token) do
7169
case Joken.peek_claims(token) do
7270
{:ok, claims} ->
7371
sub = Map.get(claims, "sub")
7472
exp = Map.get(claims, "exp")
7573
iss = Map.get(claims, "iss")
76-
metadata ++ [sub: sub, exp: exp, iss: iss]
74+
75+
Logger.metadata(sub: sub, exp: exp, iss: iss)
7776

7877
_ ->
79-
metadata
78+
nil
8079
end
8180
end
8281

@@ -122,19 +121,22 @@ defmodule RealtimeWeb.RealtimeChannel.Logging do
122121
level :: :error | :warning,
123122
code :: binary(),
124123
error :: term(),
125-
keyword()
124+
socket :: Phoenix.Socket.t()
126125
) :: {:error, %{reason: binary()}}
127-
def log_error_message(level, code, error, metadata \\ [])
126+
def log_error_message(level, code, error, socket)
128127

129-
def log_error_message(:error, code, error, metadata) do
128+
def log_error_message(:error, code, error, %{assigns: %{tenant: tenant_id}}) do
130129
if code in system_errors(), do: Telemetry.execute([:realtime, :channel, :error], %{code: code}, %{code: code})
130+
Logger.metadata(external_id: tenant_id, project: tenant_id)
131131

132-
log_error(code, error, metadata)
132+
log_error(code, error, Logger.metadata())
133133
{:error, %{reason: error}}
134134
end
135135

136-
def log_error_message(:warning, code, error, metadata) do
137-
log_warning(code, error, metadata)
136+
def log_error_message(:warning, code, error, %{assigns: %{tenant: tenant_id}}) do
137+
Logger.metadata(external_id: tenant_id, project: tenant_id)
138+
139+
log_warning(code, error, Logger.metadata())
138140
{:error, %{reason: error}}
139141
end
140142
end

lib/realtime_web/channels/user_socket.ex

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,12 @@ defmodule RealtimeWeb.UserSocket do
2828
%{uri: %{host: host}, x_headers: headers} = opts
2929

3030
{:ok, external_id} = Database.get_external_id(host)
31+
token = access_token(params, headers)
32+
socket = socket |> assign(:access_token, token) |> assign(:tenant, external_id)
33+
3134
Logger.metadata(external_id: external_id, project: external_id)
3235
Logger.put_process_level(self(), :error)
3336

34-
token = access_token(params, headers)
35-
3637
with %Tenant{
3738
extensions: extensions,
3839
jwt_secret: jwt_secret,
@@ -68,19 +69,20 @@ defmodule RealtimeWeb.UserSocket do
6869
}
6970

7071
assigns = Map.from_struct(assigns)
71-
72-
{:ok, assign(socket, assigns)}
72+
socket = assign(socket, assigns)
73+
{:ok, socket}
7374
else
7475
nil ->
7576
log_error("TenantNotFound", "Tenant not found: #{external_id}")
7677
{:error, :tenant_not_found}
7778

7879
{:error, :expired_token, msg} ->
79-
Logging.log_error_with_token_metadata("InvalidJWTToken", msg, token)
80+
Logging.log_error_with_token_metadata("InvalidJWTToken", msg, socket)
8081
{:error, :expired_token}
8182

8283
{:error, :missing_claims} ->
83-
Logging.log_error_with_token_metadata("InvalidJWTToken", "Fields `role` and `exp` are required in JWT", token)
84+
msg = "Fields `role` and `exp` are required in JWT"
85+
Logging.log_error_with_token_metadata("InvalidJWTToken", msg, socket)
8486
{:error, :missing_claims}
8587

8688
{:error, :token_malformed} ->

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.40.2",
7+
version: "2.40.3",
88
elixir: "~> 1.17.3",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

0 commit comments

Comments
 (0)