diff --git a/src/grpcbox_channel.erl b/src/grpcbox_channel.erl index 3530f4d..4023024 100644 --- a/src/grpcbox_channel.erl +++ b/src/grpcbox_channel.erl @@ -5,6 +5,7 @@ -export([start_link/3, is_ready/1, pick/2, + pick/3, stop/1, stop/2]). -export([init/1, @@ -58,11 +59,19 @@ is_ready(Name) -> gen_statem:call(?CHANNEL(Name), is_ready). %% @doc Picks a subchannel from a pool using the configured strategy. --spec pick(name(), unary | stream) -> {ok, {pid(), grpcbox_client:interceptor() | undefined}} | - {error, undefined_channel | no_endpoints}. +-spec pick(name(), unary | stream) -> + {ok, {pid(), grpcbox_client:interceptor() | undefined}} | + {error, undefined_channel | no_endpoints}. pick(Name, CallType) -> + pick(Name, CallType, undefined). + +%% @doc Picks a subchannel from a pool using the configured strategy. +-spec pick(name(), unary | stream, term() | undefined) -> + {ok, {pid(), grpcbox_client:interceptor() | undefined}} | + {error, undefined_channel | no_endpoints}. +pick(Name, CallType, Key) -> try - case gproc_pool:pick_worker(Name) of + case pick_worker(Name, Key) of false -> {error, no_endpoints}; Pid when is_pid(Pid) -> {ok, {Pid, interceptor(Name, CallType)}} @@ -72,6 +81,11 @@ pick(Name, CallType) -> {error, undefined_channel} end. +pick_worker(Name, undefined) -> + gproc_pool:pick_worker(Name); +pick_worker(Name, Key) -> + gproc_pool:pick_worker(Name, Key). + -spec interceptor(name(), unary | stream) -> grpcbox_client:interceptor() | undefined. interceptor(Name, CallType) -> case ets:lookup(?CHANNELS_TAB, {Name, CallType}) of @@ -177,4 +191,3 @@ start_workers(Pool, StatsHandler, Encoding, Endpoints) -> Encoding, StatsHandler), Pid end || Endpoint={Transport, Host, Port, EndpointOptions} <- Endpoints]. - diff --git a/src/grpcbox_client.erl b/src/grpcbox_client.erl index b00b487..42bb025 100644 --- a/src/grpcbox_client.erl +++ b/src/grpcbox_client.erl @@ -47,7 +47,8 @@ get_channel(Options, Type) -> Channel = maps:get(channel, Options, default_channel), - grpcbox_channel:pick(Channel, Type). + Key = maps:get(key, Options, undefined), + grpcbox_channel:pick(Channel, Type, Key). unary(Ctx, Service, Method, Input, Def, Options) -> unary(Ctx, filename:join([<<>>, Service, Method]), Input, Def, Options).