Skip to content

Commit 0da03c9

Browse files
Merge pull request #14256 from rabbitmq/local-shovel
By @dcorbacho and me: introduce [cluster-]local shovels, adopt message containers for shovels
2 parents 1156232 + ce8fa31 commit 0da03c9

20 files changed

+3028
-289
lines changed

deps/amqp10_client/src/amqp10_msg.erl

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -265,23 +265,29 @@ body_bin(#amqp10_msg{body = #'v1_0.amqp_value'{} = Body}) ->
265265
%% A disposition will be notified to the sender by a message of the
266266
%% following stucture:
267267
%% {amqp10_disposition, {accepted | rejected, DeliveryTag}}
268-
-spec new(delivery_tag(), amqp10_body() | binary(), boolean()) -> amqp10_msg().
268+
-spec new(delivery_tag(), amqp10_body() | binary() | [amqp10_client_types:amqp10_msg_record()], boolean()) -> amqp10_msg().
269269
new(DeliveryTag, Bin, Settled) when is_binary(Bin) ->
270270
Body = [#'v1_0.data'{content = Bin}],
271271
new(DeliveryTag, Body, Settled);
272272
new(DeliveryTag, Body, Settled) -> % TODO: constrain to amqp types
273-
#amqp10_msg{
274-
transfer = #'v1_0.transfer'{
275-
delivery_tag = {binary, DeliveryTag},
276-
settled = Settled,
277-
message_format = {uint, ?MESSAGE_FORMAT}},
278-
%% This lib is safe by default.
279-
header = #'v1_0.header'{durable = true},
280-
body = Body}.
273+
Transfer = #'v1_0.transfer'{
274+
delivery_tag = {binary, DeliveryTag},
275+
settled = Settled,
276+
message_format = {uint, ?MESSAGE_FORMAT}},
277+
case is_amqp10_body(Body) orelse (not is_list(Body)) of
278+
true ->
279+
#amqp10_msg{
280+
transfer = Transfer,
281+
%% This lib is safe by default.
282+
header = #'v1_0.header'{durable = true},
283+
body = Body};
284+
false ->
285+
from_amqp_records([Transfer | Body])
286+
end.
281287

282288
%% @doc Create a new settled amqp10 message using the specified delivery tag
283289
%% and body.
284-
-spec new(delivery_tag(), amqp10_body() | binary()) -> amqp10_msg().
290+
-spec new(delivery_tag(), amqp10_body() | binary() | [amqp10_client_types:amqp10_msg_record()]) -> amqp10_msg().
285291
new(DeliveryTag, Body) ->
286292
new(DeliveryTag, Body, false).
287293

@@ -462,3 +468,19 @@ uint(B) -> {uint, B}.
462468

463469
has_value(undefined) -> false;
464470
has_value(_) -> true.
471+
472+
is_amqp10_body(#'v1_0.amqp_value'{}) ->
473+
true;
474+
is_amqp10_body(List) when is_list(List) ->
475+
lists:all(fun(#'v1_0.data'{}) ->
476+
true;
477+
(_) ->
478+
false
479+
end, List) orelse
480+
lists:all(fun(#'v1_0.amqp_sequence'{}) ->
481+
true;
482+
(_) ->
483+
false
484+
end, List);
485+
is_amqp10_body(_) ->
486+
false.

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2457,7 +2457,6 @@ incoming_link_transfer(
24572457
validate_message_size(PayloadSize, MaxMessageSize),
24582458
rabbit_msg_size_metrics:observe(?PROTOCOL, PayloadSize),
24592459
messages_received(Settled),
2460-
24612460
Mc0 = mc:init(mc_amqp, PayloadBin, #{}),
24622461
case lookup_target(LinkExchange, LinkRKey, Mc0, Vhost, User, PermCache0) of
24632462
{ok, X, RoutingKeys, Mc1, PermCache} ->
Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
%% ----------------------------------------------------------------------------
2-
%% RabbitMQ Shovel settings
2+
%% RabbitMQ Shovel plugin
33
%% ----------------------------------------------------------------------------
44

5-
65
{mapping, "shovel.topology.predeclared", "rabbitmq_shovel.topology.predeclared", [
76
[{datatype, {enum, [true, false]}}]
87
]}.
8+
9+
{mapping, "shovel.local.max_credit", "rabbitmq_shovel.max_local_shovel_credit", [
10+
{datatype, integer}
11+
]}.

deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl

Lines changed: 26 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,10 @@
77

88
-module(rabbit_amqp091_shovel).
99

10-
-define(APP, rabbitmq_shovel).
11-
1210
-behaviour(rabbit_shovel_behaviour).
1311

1412
-include_lib("amqp_client/include/amqp_client.hrl").
13+
-include_lib("rabbit/include/mc.hrl").
1514
-include("rabbit_shovel.hrl").
1615
-include_lib("kernel/include/logger.hrl").
1716

@@ -34,7 +33,7 @@
3433
ack/3,
3534
nack/3,
3635
status/1,
37-
forward/4
36+
forward/3
3837
]).
3938

4039
%% Function references should not be stored on the metadata store.
@@ -57,7 +56,7 @@ parse(_Name, {source, Source}) ->
5756
CArgs = proplists:get_value(consumer_args, Source, []),
5857
#{module => ?MODULE,
5958
uris => proplists:get_value(uris, Source),
60-
resource_decl => decl_fun({source, Source}),
59+
resource_decl => rabbit_shovel_util:decl_fun(?MODULE, {source, Source}),
6160
queue => Queue,
6261
delete_after => proplists:get_value(delete_after, Source, never),
6362
prefetch_count => Prefetch,
@@ -73,7 +72,7 @@ parse(Name, {destination, Dest}) ->
7372
PropsFun2 = add_timestamp_header_fun(ATH, PropsFun1),
7473
#{module => ?MODULE,
7574
uris => proplists:get_value(uris, Dest),
76-
resource_decl => decl_fun({destination, Dest}),
75+
resource_decl => rabbit_shovel_util:decl_fun(?MODULE, {destination, Dest}),
7776
props_fun => PropsFun2,
7877
fields_fun => PubFieldsFun,
7978
add_forward_headers => AFH,
@@ -170,8 +169,8 @@ forward_pending(State) ->
170169
case pop_pending(State) of
171170
empty ->
172171
State;
173-
{{Tag, Props, Payload}, S} ->
174-
S2 = do_forward(Tag, Props, Payload, S),
172+
{{Tag, Mc}, S} ->
173+
S2 = do_forward(Tag, Mc, S),
175174
S3 = control_throttle(S2),
176175
case is_blocked(S3) of
177176
true ->
@@ -184,91 +183,50 @@ forward_pending(State) ->
184183
end
185184
end.
186185

187-
forward(IncomingTag, Props, Payload, State) ->
186+
forward(IncomingTag, Mc, State) ->
188187
case is_blocked(State) of
189188
true ->
190189
%% We are blocked by client-side flow-control and/or
191190
%% `connection.blocked` message from the destination
192191
%% broker. Simply cache the forward.
193-
PendingEntry = {IncomingTag, Props, Payload},
192+
PendingEntry = {IncomingTag, Mc},
194193
add_pending(PendingEntry, State);
195194
false ->
196-
State1 = do_forward(IncomingTag, Props, Payload, State),
195+
State1 = do_forward(IncomingTag, Mc, State),
197196
control_throttle(State1)
198197
end.
199198

200-
do_forward(IncomingTag, Props, Payload,
199+
do_forward(IncomingTag, Mc0,
201200
State0 = #{dest := #{props_fun := {M, F, Args},
202201
current := {_, _, DstUri},
203202
fields_fun := {Mf, Ff, Argsf}}}) ->
204203
SrcUri = rabbit_shovel_behaviour:source_uri(State0),
205204
% do publish
206-
Exchange = maps:get(exchange, Props, undefined),
207-
RoutingKey = maps:get(routing_key, Props, undefined),
205+
Exchange = mc:exchange(Mc0),
206+
RoutingKey = case mc:routing_keys(Mc0) of
207+
[RK | _] -> RK;
208+
Any -> Any
209+
end,
208210
Method = #'basic.publish'{exchange = Exchange, routing_key = RoutingKey},
209211
Method1 = apply(Mf, Ff, Argsf ++ [SrcUri, DstUri, Method]),
210-
Msg1 = #amqp_msg{props = apply(M, F, Args ++ [SrcUri, DstUri, props_from_map(Props)]),
212+
Mc = mc:convert(mc_amqpl, Mc0),
213+
{Props, Payload} = rabbit_basic_common:from_content(mc:protocol_state(Mc)),
214+
Msg1 = #amqp_msg{props = apply(M, F, Args ++ [SrcUri, DstUri, Props]),
211215
payload = Payload},
212216
publish(IncomingTag, Method1, Msg1, State0).
213217

214-
props_from_map(Map) ->
215-
#'P_basic'{content_type = maps:get(content_type, Map, undefined),
216-
content_encoding = maps:get(content_encoding, Map, undefined),
217-
headers = maps:get(headers, Map, undefined),
218-
delivery_mode = maps:get(delivery_mode, Map, undefined),
219-
priority = maps:get(priority, Map, undefined),
220-
correlation_id = maps:get(correlation_id, Map, undefined),
221-
reply_to = maps:get(reply_to, Map, undefined),
222-
expiration = maps:get(expiration, Map, undefined),
223-
message_id = maps:get(message_id, Map, undefined),
224-
timestamp = maps:get(timestamp, Map, undefined),
225-
type = maps:get(type, Map, undefined),
226-
user_id = maps:get(user_id, Map, undefined),
227-
app_id = maps:get(app_id, Map, undefined),
228-
cluster_id = maps:get(cluster_id, Map, undefined)}.
229-
230-
map_from_props(#'P_basic'{content_type = Content_type,
231-
content_encoding = Content_encoding,
232-
headers = Headers,
233-
delivery_mode = Delivery_mode,
234-
priority = Priority,
235-
correlation_id = Correlation_id,
236-
reply_to = Reply_to,
237-
expiration = Expiration,
238-
message_id = Message_id,
239-
timestamp = Timestamp,
240-
type = Type,
241-
user_id = User_id,
242-
app_id = App_id,
243-
cluster_id = Cluster_id}) ->
244-
lists:foldl(fun({_K, undefined}, Acc) -> Acc;
245-
({K, V}, Acc) -> Acc#{K => V}
246-
end, #{}, [{content_type, Content_type},
247-
{content_encoding, Content_encoding},
248-
{headers, Headers},
249-
{delivery_mode, Delivery_mode},
250-
{priority, Priority},
251-
{correlation_id, Correlation_id},
252-
{reply_to, Reply_to},
253-
{expiration, Expiration},
254-
{message_id, Message_id},
255-
{timestamp, Timestamp},
256-
{type, Type},
257-
{user_id, User_id},
258-
{app_id, App_id},
259-
{cluster_id, Cluster_id}
260-
]).
261-
262218
handle_source(#'basic.consume_ok'{}, State) ->
263219
State;
264220
handle_source({#'basic.deliver'{delivery_tag = Tag,
265221
exchange = Exchange,
266222
routing_key = RoutingKey},
267223
#amqp_msg{props = Props0, payload = Payload}}, State) ->
268-
Props = (map_from_props(Props0))#{exchange => Exchange,
269-
routing_key => RoutingKey},
224+
Content = rabbit_basic_common:build_content(Props0, Payload),
225+
Msg0 = mc:init(mc_amqpl, Content, #{}),
226+
Msg1 = mc:set_annotation(?ANN_ROUTING_KEYS, [RoutingKey], Msg0),
227+
Msg = mc:set_annotation(?ANN_EXCHANGE, Exchange, Msg1),
270228
% forward to destination
271-
rabbit_shovel_behaviour:forward(Tag, Props, Payload, State);
229+
rabbit_shovel_behaviour:forward(Tag, Msg, State);
272230

273231
handle_source({'EXIT', Conn, Reason},
274232
#{source := #{current := {Conn, _, _}}}) ->
@@ -336,11 +294,10 @@ close_dest(_) ->
336294
confirm_to_inbound(ConfirmFun, Seq, Multiple,
337295
State0 = #{dest := #{unacked := Unacked} = Dst}) ->
338296
#{Seq := InTag} = Unacked,
339-
State = ConfirmFun(InTag, Multiple, State0),
340297
{Unacked1, Removed} = remove_delivery_tags(Seq, Multiple, Unacked, 0),
341-
rabbit_shovel_behaviour:decr_remaining(Removed,
342-
State#{dest =>
343-
Dst#{unacked => Unacked1}}).
298+
State = ConfirmFun(InTag, Multiple, State0#{dest =>
299+
Dst#{unacked => Unacked1}}),
300+
rabbit_shovel_behaviour:decr_remaining(Removed, State).
344301

345302
publish(_Tag, _Method, _Msg, State = #{source := #{remaining_unacked := 0}}) ->
346303
%% We are in on-confirm mode, and are autodelete. We have
@@ -584,47 +541,6 @@ props_fun_timestamp_header({M, F, Args}, SrcUri, DestUri, Props) ->
584541
rabbit_shovel_util:add_timestamp_header(
585542
apply(M, F, Args ++ [SrcUri, DestUri, Props])).
586543

587-
parse_declaration({[], Acc}) ->
588-
Acc;
589-
parse_declaration({[{Method, Props} | Rest], Acc}) when is_list(Props) ->
590-
FieldNames = try rabbit_framing_amqp_0_9_1:method_fieldnames(Method)
591-
catch exit:Reason -> fail(Reason)
592-
end,
593-
case proplists:get_keys(Props) -- FieldNames of
594-
[] -> ok;
595-
UnknownFields -> fail({unknown_fields, Method, UnknownFields})
596-
end,
597-
{Res, _Idx} = lists:foldl(
598-
fun (K, {R, Idx}) ->
599-
NewR = case proplists:get_value(K, Props) of
600-
undefined -> R;
601-
V -> setelement(Idx, R, V)
602-
end,
603-
{NewR, Idx + 1}
604-
end, {rabbit_framing_amqp_0_9_1:method_record(Method), 2},
605-
FieldNames),
606-
parse_declaration({Rest, [Res | Acc]});
607-
parse_declaration({[{Method, Props} | _Rest], _Acc}) ->
608-
fail({expected_method_field_list, Method, Props});
609-
parse_declaration({[Method | Rest], Acc}) ->
610-
parse_declaration({[{Method, []} | Rest], Acc}).
611-
612-
decl_fun({source, Endpoint}) ->
613-
case parse_declaration({proplists:get_value(declarations, Endpoint, []), []}) of
614-
[] ->
615-
case proplists:get_value(predeclared, application:get_env(?APP, topology, []), false) of
616-
true -> case proplists:get_value(queue, Endpoint) of
617-
<<>> -> fail({invalid_parameter_value, declarations, {require_non_empty}});
618-
Queue -> {?MODULE, check_fun, [Queue]}
619-
end;
620-
false -> {?MODULE, decl_fun, []}
621-
end;
622-
Decl -> {?MODULE, decl_fun, [Decl]}
623-
end;
624-
decl_fun({destination, Endpoint}) ->
625-
Decl = parse_declaration({proplists:get_value(declarations, Endpoint, []), []}),
626-
{?MODULE, decl_fun, [Decl]}.
627-
628544
decl_fun(Decl, _Conn, Ch) ->
629545
[begin
630546
amqp_channel:call(Ch, M)

0 commit comments

Comments
 (0)