Skip to content

Commit c417929

Browse files
committed
Merge branch 'fix-warnings-for-1.4'
2 parents 4d85222 + da283cd commit c417929

File tree

9 files changed

+112
-94
lines changed

9 files changed

+112
-94
lines changed

lib/nsq/connection.ex

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -143,10 +143,11 @@ defmodule NSQ.Connection do
143143

144144

145145
@spec handle_cast(:reconnect, state) :: {:noreply, state}
146+
def handle_cast(:reconnect, %{connect_attempts: connect_attempts} = conn_state) when connect_attempts > 0 do
147+
{_, conn_state} = Initializer.connect(conn_state)
148+
{:noreply, conn_state}
149+
end
146150
def handle_cast(:reconnect, conn_state) do
147-
if conn_state.connect_attempts > 0 do
148-
{_, conn_state} = Initializer.connect(conn_state)
149-
end
150151
{:noreply, conn_state}
151152
end
152153

lib/nsq/connection/buffer.ex

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -208,9 +208,12 @@ defmodule NSQ.Connection.Buffer do
208208
end
209209
end
210210

211-
if is_list(decompressed) do
212-
decompressed = decompressed |> List.flatten |> Enum.join("")
213-
end
211+
decompressed =
212+
if is_list(decompressed) do
213+
decompressed |> List.flatten |> Enum.join("")
214+
else
215+
decompressed
216+
end
214217

215218
combined_buffer = state.buffered_data <> decompressed
216219

lib/nsq/connection/initializer.ex

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -134,26 +134,27 @@ defmodule NSQ.Connection.Initializer do
134134
end
135135

136136
# respect negotiated msg_timeout
137-
if parsed["msg_timeout"] do
138-
conn_state = %{conn_state | msg_timeout: parsed["msg_timeout"]}
139-
else
140-
conn_state = %{conn_state | msg_timeout: conn_state.config.msg_timeout}
141-
end
137+
timeout = parsed["msg_timeout"] || conn_state.config.msg_timeout
138+
conn_state = %{conn_state | msg_timeout: timeout}
142139

143140
# wrap our socket with SSL if TLS is enabled
144-
if parsed["tls_v1"] == true do
145-
Logger.debug "Upgrading to TLS..."
146-
socket = Socket.SSL.connect! conn_state.socket, [
147-
cacertfile: conn_state.config.tls_cert,
148-
keyfile: conn_state.config.tls_key,
149-
versions: ssl_versions(conn_state.config.tls_min_version),
150-
verify: ssl_verify_atom(conn_state.config),
151-
]
152-
conn_state = %{conn_state | socket: socket}
153-
conn_state.reader |> Buffer.setup_socket(socket, conn_state.config.read_timeout)
154-
conn_state.writer |> Buffer.setup_socket(socket, conn_state.config.read_timeout)
155-
conn_state |> wait_for_ok!
156-
end
141+
conn_state =
142+
if parsed["tls_v1"] == true do
143+
Logger.debug "Upgrading to TLS..."
144+
socket = Socket.SSL.connect! conn_state.socket, [
145+
cacertfile: conn_state.config.tls_cert,
146+
keyfile: conn_state.config.tls_key,
147+
versions: ssl_versions(conn_state.config.tls_min_version),
148+
verify: ssl_verify_atom(conn_state.config),
149+
]
150+
conn_state = %{conn_state | socket: socket}
151+
conn_state.reader |> Buffer.setup_socket(socket, conn_state.config.read_timeout)
152+
conn_state.writer |> Buffer.setup_socket(socket, conn_state.config.read_timeout)
153+
conn_state |> wait_for_ok!
154+
conn_state
155+
else
156+
conn_state
157+
end
157158

158159
# If compression is enabled, we expect to receive a compressed "OK"
159160
# immediately.

lib/nsq/connection/message_handling.ex

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -29,28 +29,29 @@ defmodule NSQ.Connection.MessageHandling do
2929
end
3030
end
3131

32+
def handle_nsq_message({:response, "_heartbeat_"}, state) do
33+
respond_to_heartbeat(state)
34+
{:ok, state}
35+
end
3236

33-
def handle_nsq_message(msg, state) do
34-
case msg do
35-
{:response, "_heartbeat_"} ->
36-
respond_to_heartbeat(state)
37-
38-
{:response, data} ->
39-
{:ok, state} = state |> Command.send_response_to_caller(data)
40-
41-
{:error, data} ->
42-
state |> log_error(nil, data)
43-
44-
{:error, reason, data} ->
45-
state |> log_error(reason, data)
37+
def handle_nsq_message({:response, data}, state) do
38+
state |> Command.send_response_to_caller(data)
39+
end
4640

47-
{:message, data} ->
48-
{:ok, state} = state |> kick_off_message_processing(data)
49-
end
41+
def handle_nsq_message({:error, data}, state) do
42+
state |> log_error(nil, data)
43+
{:ok, state}
44+
end
5045

46+
def handle_nsq_message({:error, reason, data}, state) do
47+
state |> log_error(reason, data)
5148
{:ok, state}
5249
end
5350

51+
def handle_nsq_message({:message, data}, state) do
52+
state |> kick_off_message_processing(data)
53+
end
54+
5455

5556
@spec update_conn_stats_on_message_done(C.state, any) :: any
5657
def update_conn_stats_on_message_done(state, ret_val) do

lib/nsq/consumer/backoff.ex

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -45,32 +45,31 @@ defmodule NSQ.Consumer.Backoff do
4545
start_stop_continue.)
4646
"""
4747
@spec resume(pid, C.state) :: {:ok, C.state}
48+
def resume(_cons, %{backoff_duration: 0, backoff_counter: 0} = cons_state),
49+
# looks like we successfully left backoff mode already
50+
do: {:ok, cons_state}
51+
52+
def resume(_cons, %{stop_flag: true} = cons_state),
53+
do: {:ok, %{cons_state | backoff_duration: 0}}
54+
4855
def resume(cons, cons_state) do
49-
if cons_state.backoff_duration == 0 || cons_state.backoff_counter == 0 do
50-
# looks like we successfully left backoff mode already
51-
{:ok, cons_state}
52-
else
53-
if cons_state.stop_flag do
54-
{:ok, %{cons_state | backoff_duration: 0}}
56+
{:ok, cons_state} =
57+
if Connections.count(cons_state) == 0 do
58+
# This could happen if nsqlookupd suddenly stops discovering
59+
# connections. Maybe a network partition?
60+
Logger.warn("no connection available to resume")
61+
Logger.warn("backing off for 1 second")
62+
resume_later(cons, 1000, cons_state)
5563
else
56-
if Connections.count(cons_state) == 0 do
57-
# This could happen if nsqlookupd suddenly stops discovering
58-
# connections. Maybe a network partition?
59-
Logger.warn("no connection available to resume")
60-
Logger.warn("backing off for 1 second")
61-
{:ok, cons_state} = resume_later(cons, 1000, cons_state)
62-
else
63-
# pick a random connection to test the waters
64-
conn = random_connection_for_backoff(cons_state)
65-
Logger.warn("(#{inspect conn}) backoff timeout expired, sending RDY 1")
64+
# pick a random connection to test the waters
65+
conn = random_connection_for_backoff(cons_state)
66+
Logger.warn("(#{inspect conn}) backoff timeout expired, sending RDY 1")
6667

67-
# while in backoff only ever let 1 message at a time through
68-
{:ok, cons_state} = RDY.update(cons, conn, 1, cons_state)
69-
end
70-
71-
{:ok, %{cons_state | backoff_duration: 0}}
68+
# while in backoff only ever let 1 message at a time through
69+
RDY.update(cons, conn, 1, cons_state)
7270
end
73-
end
71+
72+
{:ok, %{cons_state | backoff_duration: 0}}
7473
end
7574

7675
def resume!(cons, cons_state) do

lib/nsq/consumer/connections.ex

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -134,10 +134,14 @@ defmodule NSQ.Consumer.Connections do
134134
# `RDY.redistribute` loop will take care of getting this connection some
135135
# messages later.
136136
remaining_rdy = cons_state.max_in_flight - total_rdy_count(cons_state)
137-
if remaining_rdy > 0 do
138-
conn = conn_from_nsqd(cons, nsqd, cons_state)
139-
{:ok, cons_state} = RDY.transmit(conn, 1, cons_state)
140-
end
137+
cons_state =
138+
if remaining_rdy > 0 do
139+
conn = conn_from_nsqd(cons, nsqd, cons_state)
140+
{:ok, cons_state} = RDY.transmit(conn, 1, cons_state)
141+
cons_state
142+
else
143+
cons_state
144+
end
141145

142146
{:ok, cons_state}
143147
catch

lib/nsq/consumer/rdy.ex

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -83,20 +83,25 @@ defmodule NSQ.Consumer.RDY do
8383
# Cap the given RDY based on how much we can actually assign. Unless it's
8484
# 0, in which case we'll be retrying.
8585
max_possible_rdy = calc_max_possible(cons_state, conn_info)
86-
if max_possible_rdy > 0 do
87-
new_rdy = [new_rdy, max_possible_rdy] |> Enum.min |> round
88-
end
89-
90-
if max_possible_rdy <= 0 && new_rdy > 0 do
91-
if conn_info.rdy_count == 0 do
92-
# Schedule RDY.update(consumer, conn, new_rdy) for this connection
93-
# again in 5 seconds. This is to prevent eternal starvation.
94-
{:ok, cons_state} = retry(cons, conn, new_rdy, cons_state)
86+
new_rdy = if max_possible_rdy > 0 do
87+
[new_rdy, max_possible_rdy] |> Enum.min |> round
88+
else
89+
new_rdy
90+
end
91+
92+
{:ok, cons_state} =
93+
if max_possible_rdy <= 0 && new_rdy > 0 do
94+
if conn_info.rdy_count == 0 do
95+
# Schedule RDY.update(consumer, conn, new_rdy) for this connection
96+
# again in 5 seconds. This is to prevent eternal starvation.
97+
retry(cons, conn, new_rdy, cons_state)
98+
else
99+
{:ok, cons_state}
100+
end
101+
else
102+
transmit(conn, new_rdy, cons_state)
95103
end
96-
{:ok, cons_state}
97-
else
98-
{:ok, _cons_state} = transmit(conn, new_rdy, cons_state)
99-
end
104+
{:ok, cons_state}
100105
end
101106

102107
def update!(cons, conn, new_rdy, cons_state) do

lib/nsq/lookupd.ex

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,7 @@ defmodule NSQ.Lookupd do
5858

5959
@spec normalize_200_response([any], binary) :: response
6060
defp normalize_200_response(headers, body) do
61-
if body == nil || body == "" do
62-
body = "{}"
63-
end
61+
body = if body == nil || body == "", do: "{}", else: body
6462

6563
if headers[:"X-Nsq-Content-Type"] == "nsq; version=1.0" do
6664
Poison.decode!(body)

lib/nsq/message.ex

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -110,19 +110,25 @@ defmodule NSQ.Message do
110110
mode, where it stops receiving messages for a fixed duration.
111111
"""
112112
def req(message, delay \\ -1, backoff \\ false) do
113-
if delay == -1 do
114-
delay = calculate_delay(
115-
message.attempts, message.config.max_requeue_delay
116-
)
117-
Logger.debug("(#{inspect message.connection}) requeue msg ID #{message.id}, delay #{delay} (auto-calculated with attempts #{message.attempts}), backoff #{backoff}")
118-
else
119-
Logger.debug("(#{inspect message.connection}) requeue msg ID #{message.id}, delay #{delay}, backoff #{backoff}")
120-
end
113+
delay =
114+
if delay == -1 do
115+
delay = calculate_delay(
116+
message.attempts, message.config.max_requeue_delay
117+
)
118+
Logger.debug("(#{inspect message.connection}) requeue msg ID #{message.id}, delay #{delay} (auto-calculated with attempts #{message.attempts}), backoff #{backoff}")
119+
delay
120+
else
121+
Logger.debug("(#{inspect message.connection}) requeue msg ID #{message.id}, delay #{delay}, backoff #{backoff}")
122+
delay
123+
end
121124

122-
if delay > message.config.max_requeue_delay do
123-
Logger.warn "Invalid requeue delay #{delay}. Must be between 0 and #{message.config.max_requeue_delay}. Sending with max delay #{message.config.max_requeue_delay} instead."
124-
delay = message.config.max_requeue_delay
125-
end
125+
delay =
126+
if delay > message.config.max_requeue_delay do
127+
Logger.warn "Invalid requeue delay #{delay}. Must be between 0 and #{message.config.max_requeue_delay}. Sending with max delay #{message.config.max_requeue_delay} instead."
128+
message.config.max_requeue_delay
129+
else
130+
delay
131+
end
126132

127133
if backoff do
128134
GenServer.call(message.consumer, {:start_stop_continue_backoff, :backoff})

0 commit comments

Comments
 (0)