Skip to content

By @dcorbacho and me: introduce [cluster-]local shovels, adopt message containers for shovels #14256

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 31 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
6e6a33e
Shovel: use message containers
dcorbacho Jun 13, 2025
ccd3bde
amqp10_msg: type spec & code refactor
dcorbacho Jun 20, 2025
c884078
AMQP10 shovel: make bare message inmutable
dcorbacho Jul 4, 2025
444b564
Local shovels
dcorbacho Jun 23, 2025
6fce51d
WIP
dcorbacho Jul 16, 2025
acbd4b4
Local shovel: use queue name to filter source/dest messages
dcorbacho Jul 17, 2025
c8127ea
Local shovel: Initialise delivery count for credit
dcorbacho Jul 17, 2025
860eb6c
Local shovel: fix initial delivery count and state handling
dcorbacho Jul 18, 2025
a112aa5
Local shovels: set link credit
dcorbacho Jul 18, 2025
f381993
Local shovels: renew credit
dcorbacho Jul 20, 2025
c3ec85f
Local shovel: finish credit flow handling
dcorbacho Jul 20, 2025
085c0cc
Local shovels: remove rabbit_log and switch to LOG_ macros
dcorbacho Jul 22, 2025
ae6a36a
Local shovels: remove unused parameter
dcorbacho Jul 22, 2025
2e2dc4b
Shovel tests: ignore nodename
dcorbacho Jul 22, 2025
b5a2dac
Local shovels: grant credit after confirming
dcorbacho Jul 23, 2025
30f67e0
local_dynamic_SUITE: await credit for publishing links
michaelklishin Jul 23, 2025
a855146
Local shovel: handle destination queue events
dcorbacho Jul 24, 2025
a0d42ea
Local shovels: remove unused prefetch count
dcorbacho Jul 24, 2025
be061cd
Local shovel: fix credit handling order
dcorbacho Jul 28, 2025
414da5d
local_dynamic_SUITE: ignore two expected crash reports in the logs
michaelklishin Jul 28, 2025
78167f0
Local shovel: more tests
dcorbacho Jul 29, 2025
bb5e1d9
Local shovels: handle credit on sender side
dcorbacho Jul 29, 2025
8c9f79f
Shovel management: add local shovels
dcorbacho Jul 30, 2025
0b1aefd
Local shovels: move unacked_message_q inside source config
dcorbacho Aug 12, 2025
e9d767b
Local shovels: fix credit flow
dcorbacho Aug 13, 2025
07a0853
Local shovels: optimisations
dcorbacho Aug 13, 2025
edf0e3c
Local shovel: remove unused clause
dcorbacho Aug 13, 2025
02fcbc0
Local shovels: optimisation
dcorbacho Aug 13, 2025
3349321
Local shovels: fix credit flow
dcorbacho Aug 13, 2025
382fac3
Local shovels: remove stashed credit request
dcorbacho Aug 14, 2025
6bb649a
Local shovels: single acks
dcorbacho Aug 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 32 additions & 10 deletions deps/amqp10_client/src/amqp10_msg.erl
Original file line number Diff line number Diff line change
Expand Up @@ -265,23 +265,29 @@ body_bin(#amqp10_msg{body = #'v1_0.amqp_value'{} = Body}) ->
%% A disposition will be notified to the sender by a message of the
%% following stucture:
%% {amqp10_disposition, {accepted | rejected, DeliveryTag}}
-spec new(delivery_tag(), amqp10_body() | binary(), boolean()) -> amqp10_msg().
-spec new(delivery_tag(), amqp10_body() | binary() | [amqp10_client_types:amqp10_msg_record()], boolean()) -> amqp10_msg().
new(DeliveryTag, Bin, Settled) when is_binary(Bin) ->
Body = [#'v1_0.data'{content = Bin}],
new(DeliveryTag, Body, Settled);
new(DeliveryTag, Body, Settled) -> % TODO: constrain to amqp types
#amqp10_msg{
transfer = #'v1_0.transfer'{
delivery_tag = {binary, DeliveryTag},
settled = Settled,
message_format = {uint, ?MESSAGE_FORMAT}},
%% This lib is safe by default.
header = #'v1_0.header'{durable = true},
body = Body}.
Transfer = #'v1_0.transfer'{
delivery_tag = {binary, DeliveryTag},
settled = Settled,
message_format = {uint, ?MESSAGE_FORMAT}},
case is_amqp10_body(Body) orelse (not is_list(Body)) of
true ->
#amqp10_msg{
transfer = Transfer,
%% This lib is safe by default.
header = #'v1_0.header'{durable = true},
body = Body};
false ->
from_amqp_records([Transfer | Body])
end.

%% @doc Create a new settled amqp10 message using the specified delivery tag
%% and body.
-spec new(delivery_tag(), amqp10_body() | binary()) -> amqp10_msg().
-spec new(delivery_tag(), amqp10_body() | binary() | [amqp10_client_types:amqp10_msg_record()]) -> amqp10_msg().
new(DeliveryTag, Body) ->
new(DeliveryTag, Body, false).

Expand Down Expand Up @@ -462,3 +468,19 @@ uint(B) -> {uint, B}.

has_value(undefined) -> false;
has_value(_) -> true.

is_amqp10_body(#'v1_0.amqp_value'{}) ->
true;
is_amqp10_body(List) when is_list(List) ->
lists:all(fun(#'v1_0.data'{}) ->
true;
(_) ->
false
end, List) orelse
lists:all(fun(#'v1_0.amqp_sequence'{}) ->
true;
(_) ->
false
end, List);
is_amqp10_body(_) ->
false.
1 change: 0 additions & 1 deletion deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2457,7 +2457,6 @@ incoming_link_transfer(
validate_message_size(PayloadSize, MaxMessageSize),
rabbit_msg_size_metrics:observe(?PROTOCOL, PayloadSize),
messages_received(Settled),

Mc0 = mc:init(mc_amqp, PayloadBin, #{}),
case lookup_target(LinkExchange, LinkRKey, Mc0, Vhost, User, PermCache0) of
{ok, X, RoutingKeys, Mc1, PermCache} ->
Expand Down
129 changes: 23 additions & 106 deletions deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@

-module(rabbit_amqp091_shovel).

-define(APP, rabbitmq_shovel).

-behaviour(rabbit_shovel_behaviour).

-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("rabbit/include/mc.hrl").
-include("rabbit_shovel.hrl").
-include_lib("kernel/include/logger.hrl").

Expand All @@ -34,7 +33,7 @@
ack/3,
nack/3,
status/1,
forward/4
forward/3
]).

%% Function references should not be stored on the metadata store.
Expand All @@ -57,7 +56,7 @@ parse(_Name, {source, Source}) ->
CArgs = proplists:get_value(consumer_args, Source, []),
#{module => ?MODULE,
uris => proplists:get_value(uris, Source),
resource_decl => decl_fun({source, Source}),
resource_decl => rabbit_shovel_util:decl_fun(?MODULE, {source, Source}),
queue => Queue,
delete_after => proplists:get_value(delete_after, Source, never),
prefetch_count => Prefetch,
Expand All @@ -73,7 +72,7 @@ parse(Name, {destination, Dest}) ->
PropsFun2 = add_timestamp_header_fun(ATH, PropsFun1),
#{module => ?MODULE,
uris => proplists:get_value(uris, Dest),
resource_decl => decl_fun({destination, Dest}),
resource_decl => rabbit_shovel_util:decl_fun(?MODULE, {destination, Dest}),
props_fun => PropsFun2,
fields_fun => PubFieldsFun,
add_forward_headers => AFH,
Expand Down Expand Up @@ -170,8 +169,8 @@ forward_pending(State) ->
case pop_pending(State) of
empty ->
State;
{{Tag, Props, Payload}, S} ->
S2 = do_forward(Tag, Props, Payload, S),
{{Tag, Mc}, S} ->
S2 = do_forward(Tag, Mc, S),
S3 = control_throttle(S2),
case is_blocked(S3) of
true ->
Expand All @@ -184,91 +183,50 @@ forward_pending(State) ->
end
end.

forward(IncomingTag, Props, Payload, State) ->
forward(IncomingTag, Mc, State) ->
case is_blocked(State) of
true ->
%% We are blocked by client-side flow-control and/or
%% `connection.blocked` message from the destination
%% broker. Simply cache the forward.
PendingEntry = {IncomingTag, Props, Payload},
PendingEntry = {IncomingTag, Mc},
add_pending(PendingEntry, State);
false ->
State1 = do_forward(IncomingTag, Props, Payload, State),
State1 = do_forward(IncomingTag, Mc, State),
control_throttle(State1)
end.

do_forward(IncomingTag, Props, Payload,
do_forward(IncomingTag, Mc0,
State0 = #{dest := #{props_fun := {M, F, Args},
current := {_, _, DstUri},
fields_fun := {Mf, Ff, Argsf}}}) ->
SrcUri = rabbit_shovel_behaviour:source_uri(State0),
% do publish
Exchange = maps:get(exchange, Props, undefined),
RoutingKey = maps:get(routing_key, Props, undefined),
Exchange = mc:exchange(Mc0),
RoutingKey = case mc:routing_keys(Mc0) of
[RK | _] -> RK;
Any -> Any
end,
Method = #'basic.publish'{exchange = Exchange, routing_key = RoutingKey},
Method1 = apply(Mf, Ff, Argsf ++ [SrcUri, DstUri, Method]),
Msg1 = #amqp_msg{props = apply(M, F, Args ++ [SrcUri, DstUri, props_from_map(Props)]),
Mc = mc:convert(mc_amqpl, Mc0),
{Props, Payload} = rabbit_basic_common:from_content(mc:protocol_state(Mc)),
Msg1 = #amqp_msg{props = apply(M, F, Args ++ [SrcUri, DstUri, Props]),
payload = Payload},
publish(IncomingTag, Method1, Msg1, State0).

props_from_map(Map) ->
#'P_basic'{content_type = maps:get(content_type, Map, undefined),
content_encoding = maps:get(content_encoding, Map, undefined),
headers = maps:get(headers, Map, undefined),
delivery_mode = maps:get(delivery_mode, Map, undefined),
priority = maps:get(priority, Map, undefined),
correlation_id = maps:get(correlation_id, Map, undefined),
reply_to = maps:get(reply_to, Map, undefined),
expiration = maps:get(expiration, Map, undefined),
message_id = maps:get(message_id, Map, undefined),
timestamp = maps:get(timestamp, Map, undefined),
type = maps:get(type, Map, undefined),
user_id = maps:get(user_id, Map, undefined),
app_id = maps:get(app_id, Map, undefined),
cluster_id = maps:get(cluster_id, Map, undefined)}.

map_from_props(#'P_basic'{content_type = Content_type,
content_encoding = Content_encoding,
headers = Headers,
delivery_mode = Delivery_mode,
priority = Priority,
correlation_id = Correlation_id,
reply_to = Reply_to,
expiration = Expiration,
message_id = Message_id,
timestamp = Timestamp,
type = Type,
user_id = User_id,
app_id = App_id,
cluster_id = Cluster_id}) ->
lists:foldl(fun({_K, undefined}, Acc) -> Acc;
({K, V}, Acc) -> Acc#{K => V}
end, #{}, [{content_type, Content_type},
{content_encoding, Content_encoding},
{headers, Headers},
{delivery_mode, Delivery_mode},
{priority, Priority},
{correlation_id, Correlation_id},
{reply_to, Reply_to},
{expiration, Expiration},
{message_id, Message_id},
{timestamp, Timestamp},
{type, Type},
{user_id, User_id},
{app_id, App_id},
{cluster_id, Cluster_id}
]).

handle_source(#'basic.consume_ok'{}, State) ->
State;
handle_source({#'basic.deliver'{delivery_tag = Tag,
exchange = Exchange,
routing_key = RoutingKey},
#amqp_msg{props = Props0, payload = Payload}}, State) ->
Props = (map_from_props(Props0))#{exchange => Exchange,
routing_key => RoutingKey},
Content = rabbit_basic_common:build_content(Props0, Payload),
Msg0 = mc:init(mc_amqpl, Content, #{}),
Msg1 = mc:set_annotation(?ANN_ROUTING_KEYS, [RoutingKey], Msg0),
Msg = mc:set_annotation(?ANN_EXCHANGE, Exchange, Msg1),
% forward to destination
rabbit_shovel_behaviour:forward(Tag, Props, Payload, State);
rabbit_shovel_behaviour:forward(Tag, Msg, State);

handle_source({'EXIT', Conn, Reason},
#{source := #{current := {Conn, _, _}}}) ->
Expand Down Expand Up @@ -584,47 +542,6 @@ props_fun_timestamp_header({M, F, Args}, SrcUri, DestUri, Props) ->
rabbit_shovel_util:add_timestamp_header(
apply(M, F, Args ++ [SrcUri, DestUri, Props])).

parse_declaration({[], Acc}) ->
Acc;
parse_declaration({[{Method, Props} | Rest], Acc}) when is_list(Props) ->
FieldNames = try rabbit_framing_amqp_0_9_1:method_fieldnames(Method)
catch exit:Reason -> fail(Reason)
end,
case proplists:get_keys(Props) -- FieldNames of
[] -> ok;
UnknownFields -> fail({unknown_fields, Method, UnknownFields})
end,
{Res, _Idx} = lists:foldl(
fun (K, {R, Idx}) ->
NewR = case proplists:get_value(K, Props) of
undefined -> R;
V -> setelement(Idx, R, V)
end,
{NewR, Idx + 1}
end, {rabbit_framing_amqp_0_9_1:method_record(Method), 2},
FieldNames),
parse_declaration({Rest, [Res | Acc]});
parse_declaration({[{Method, Props} | _Rest], _Acc}) ->
fail({expected_method_field_list, Method, Props});
parse_declaration({[Method | Rest], Acc}) ->
parse_declaration({[{Method, []} | Rest], Acc}).

decl_fun({source, Endpoint}) ->
case parse_declaration({proplists:get_value(declarations, Endpoint, []), []}) of
[] ->
case proplists:get_value(predeclared, application:get_env(?APP, topology, []), false) of
true -> case proplists:get_value(queue, Endpoint) of
<<>> -> fail({invalid_parameter_value, declarations, {require_non_empty}});
Queue -> {?MODULE, check_fun, [Queue]}
end;
false -> {?MODULE, decl_fun, []}
end;
Decl -> {?MODULE, decl_fun, [Decl]}
end;
decl_fun({destination, Endpoint}) ->
Decl = parse_declaration({proplists:get_value(declarations, Endpoint, []), []}),
{?MODULE, decl_fun, [Decl]}.

decl_fun(Decl, _Conn, Ch) ->
[begin
amqp_channel:call(Ch, M)
Expand Down
Loading
Loading