Skip to content

Commit 1421e7e

Browse files
committed
Merge remote-tracking branch 'upstream/main'
2 parents 256bb93 + 32f5e5f commit 1421e7e

File tree

9 files changed

+404
-21
lines changed

9 files changed

+404
-21
lines changed

protonj2-client/src/main/java/org/apache/qpid/protonj2/client/SourceOptions.java

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919
import java.util.Arrays;
2020
import java.util.HashMap;
2121
import java.util.Map;
22+
import java.util.Objects;
2223

2324
import org.apache.qpid.protonj2.client.impl.ClientDeliveryState;
25+
import org.apache.qpid.protonj2.types.DescribedType;
2426

2527
/**
2628
* Options type that carries configuration for link Source types.
@@ -99,6 +101,32 @@ public SourceOptions filters(Map<String, Object> filters) {
99101
return self();
100102
}
101103

104+
/**
105+
* Adds the given named filter into the map of filters (one will be created if not already set).
106+
* <p>
107+
* If a previous filters {@link Map} was assigned this new filter instance will be assigned
108+
* into that existing map, it is not cleared or reallocated. The descriptor should either be
109+
* an Symbol or UnsignedLong that aligns with the filters definition being used.
110+
*
111+
* @param name
112+
* The name to use when adding the described filter to the filters {@link Map}.
113+
* @param descriptor
114+
* The descriptor used for the {@link DescribedType} that will carry the filter.
115+
* @param filter
116+
* The filter value to assign to the filter {@link DescribedType}.
117+
*
118+
* @return this {@link SourceOptions} instance.
119+
*/
120+
public SourceOptions addFilter(String name, Object descriptor, Object filter) {
121+
if (filters == null) {
122+
filters = new HashMap<>();
123+
}
124+
125+
filters.put(name, new FilterDescribedType(descriptor, filter));
126+
127+
return self();
128+
}
129+
102130
/**
103131
* @return the configured default outcome as a {@link DeliveryState} instance.
104132
*/
@@ -139,4 +167,54 @@ public SourceOptions outcomes(DeliveryState.Type... outcomes) {
139167
SourceOptions self() {
140168
return this;
141169
}
170+
171+
private static class FilterDescribedType implements DescribedType {
172+
173+
private final Object descriptor;
174+
private final Object described;
175+
176+
public FilterDescribedType(Object descriptor, Object described) {
177+
this.descriptor = descriptor;
178+
this.described = described;
179+
}
180+
181+
@Override
182+
public Object getDescriptor() {
183+
return descriptor;
184+
}
185+
186+
@Override
187+
public Object getDescribed() {
188+
return this.described;
189+
}
190+
191+
@Override
192+
public String toString() {
193+
return "FilterDescribedType{ descriptor:" + descriptor + ", described:" + described + " }";
194+
}
195+
196+
@Override
197+
public int hashCode() {
198+
return Objects.hash(described, descriptor);
199+
}
200+
201+
@Override
202+
public boolean equals(Object target) {
203+
if (this == target) {
204+
return true;
205+
}
206+
207+
if (target == null) {
208+
return false;
209+
}
210+
211+
if (!(target instanceof DescribedType)) {
212+
return false;
213+
}
214+
215+
final DescribedType other = (DescribedType) target;
216+
217+
return Objects.equals(descriptor, other.getDescriptor()) && Objects.equals(described, other.getDescribed());
218+
}
219+
}
142220
}

protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2736,6 +2736,74 @@ public void testCreateReceiverWithUserConfiguredSourceWithJMSStyleSelector() thr
27362736
}
27372737
}
27382738

2739+
@Test
2740+
public void testCreateReceiverWithUserConfiguredSourceWithJMSStyleSelectorUsingSourceOptionsAddFilter() throws Exception {
2741+
final PeerJmsSelectorType peerJmsSelector = new PeerJmsSelectorType("myProperty=42");
2742+
final Map<String, Object> filtersAtPeer = new HashMap<>();
2743+
filtersAtPeer.put("jms-selector", peerJmsSelector);
2744+
2745+
try (ProtonTestServer peer = new ProtonTestServer()) {
2746+
peer.expectSASLAnonymousConnect();
2747+
peer.expectOpen().respond();
2748+
peer.expectBegin().respond();
2749+
peer.expectAttach().ofReceiver()
2750+
.withSource().withAddress("test-queue")
2751+
.withDistributionMode("copy")
2752+
.withTimeout(128)
2753+
.withDurable(TerminusDurability.UNSETTLED_STATE)
2754+
.withExpiryPolicy(TerminusExpiryPolicy.CONNECTION_CLOSE)
2755+
.withDefaultOutcome(new Released())
2756+
.withCapabilities("QUEUE")
2757+
.withFilter(filtersAtPeer)
2758+
.withOutcomes("amqp:accepted:list", "amqp:rejected:list")
2759+
.also()
2760+
.withTarget().withAddress(notNullValue())
2761+
.withCapabilities("QUEUE")
2762+
.withDurable(TerminusDurability.CONFIGURATION)
2763+
.withExpiryPolicy(TerminusExpiryPolicy.SESSION_END)
2764+
.withTimeout(42)
2765+
.withDynamic(anyOf(nullValue(), equalTo(false)))
2766+
.withDynamicNodeProperties(nullValue())
2767+
.and().respond();
2768+
peer.expectFlow().withLinkCredit(10);
2769+
peer.expectDetach().respond();
2770+
peer.expectEnd().respond();
2771+
peer.expectClose().respond();
2772+
peer.start();
2773+
2774+
URI remoteURI = peer.getServerURI();
2775+
2776+
LOG.info("Test started, peer listening on: {}", remoteURI);
2777+
2778+
Client container = Client.create();
2779+
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
2780+
Session session = connection.openSession();
2781+
ReceiverOptions receiverOptions = new ReceiverOptions();
2782+
2783+
receiverOptions.sourceOptions().capabilities("QUEUE");
2784+
receiverOptions.sourceOptions().distributionMode(DistributionMode.COPY);
2785+
receiverOptions.sourceOptions().timeout(128);
2786+
receiverOptions.sourceOptions().durabilityMode(DurabilityMode.UNSETTLED_STATE);
2787+
receiverOptions.sourceOptions().expiryPolicy(ExpiryPolicy.CONNECTION_CLOSE);
2788+
receiverOptions.sourceOptions().defaultOutcome(DeliveryState.released());
2789+
receiverOptions.sourceOptions().addFilter("jms-selector", UnsignedLong.valueOf(0x0000468C00000004L), "myProperty=42");
2790+
receiverOptions.sourceOptions().outcomes(DeliveryState.Type.ACCEPTED, DeliveryState.Type.REJECTED);
2791+
2792+
receiverOptions.targetOptions().capabilities("QUEUE");
2793+
receiverOptions.targetOptions().durabilityMode(DurabilityMode.CONFIGURATION);
2794+
receiverOptions.targetOptions().expiryPolicy(ExpiryPolicy.SESSION_CLOSE);
2795+
receiverOptions.targetOptions().timeout(42);
2796+
2797+
Receiver receiver = session.openReceiver("test-queue", receiverOptions).openFuture().get();
2798+
2799+
receiver.close();
2800+
session.close();
2801+
connection.close();
2802+
2803+
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
2804+
}
2805+
}
2806+
27392807
@Test
27402808
public void testCreateReceiverWithUserConfiguredSourceWithJMSStyleSelectorAndNoLocalFilter() throws Exception {
27412809
final DescribedType clientJmsSelector = new AmqpJmsSelectorType("myProperty=42");
@@ -2813,6 +2881,77 @@ public void testCreateReceiverWithUserConfiguredSourceWithJMSStyleSelectorAndNoL
28132881
}
28142882
}
28152883

2884+
@Test
2885+
public void testCreateReceiverWithUserConfiguredSourceWithJMSStyleSelectorAndNoLocalFilterUsingAddFilter() throws Exception {
2886+
final PeerJmsSelectorType peerJmsSelector = new PeerJmsSelectorType("myProperty=42");
2887+
final PeerNoLocalFilterType peerNoLocalFilter = new PeerNoLocalFilterType();
2888+
final Map<String, Object> filtersAtPeer = new HashMap<>();
2889+
filtersAtPeer.put("jms-selector", peerJmsSelector);
2890+
filtersAtPeer.put("no-local", peerNoLocalFilter);
2891+
2892+
try (ProtonTestServer peer = new ProtonTestServer()) {
2893+
peer.expectSASLAnonymousConnect();
2894+
peer.expectOpen().respond();
2895+
peer.expectBegin().respond();
2896+
peer.expectAttach().ofReceiver()
2897+
.withSource().withAddress("test-queue")
2898+
.withDistributionMode("copy")
2899+
.withTimeout(128)
2900+
.withDurable(TerminusDurability.UNSETTLED_STATE)
2901+
.withExpiryPolicy(TerminusExpiryPolicy.CONNECTION_CLOSE)
2902+
.withDefaultOutcome(new Released())
2903+
.withCapabilities("QUEUE")
2904+
.withFilter(filtersAtPeer)
2905+
.withOutcomes("amqp:accepted:list", "amqp:rejected:list")
2906+
.also()
2907+
.withTarget().withAddress(notNullValue())
2908+
.withCapabilities("QUEUE")
2909+
.withDurable(TerminusDurability.CONFIGURATION)
2910+
.withExpiryPolicy(TerminusExpiryPolicy.SESSION_END)
2911+
.withTimeout(42)
2912+
.withDynamic(anyOf(nullValue(), equalTo(false)))
2913+
.withDynamicNodeProperties(nullValue())
2914+
.and().respond();
2915+
peer.expectFlow().withLinkCredit(10);
2916+
peer.expectDetach().respond();
2917+
peer.expectEnd().respond();
2918+
peer.expectClose().respond();
2919+
peer.start();
2920+
2921+
URI remoteURI = peer.getServerURI();
2922+
2923+
LOG.info("Test started, peer listening on: {}", remoteURI);
2924+
2925+
Client container = Client.create();
2926+
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
2927+
Session session = connection.openSession();
2928+
ReceiverOptions receiverOptions = new ReceiverOptions();
2929+
2930+
receiverOptions.sourceOptions().capabilities("QUEUE");
2931+
receiverOptions.sourceOptions().distributionMode(DistributionMode.COPY);
2932+
receiverOptions.sourceOptions().timeout(128);
2933+
receiverOptions.sourceOptions().durabilityMode(DurabilityMode.UNSETTLED_STATE);
2934+
receiverOptions.sourceOptions().expiryPolicy(ExpiryPolicy.CONNECTION_CLOSE);
2935+
receiverOptions.sourceOptions().defaultOutcome(DeliveryState.released());
2936+
receiverOptions.sourceOptions().addFilter("jms-selector", UnsignedLong.valueOf(0x0000468C00000004L), "myProperty=42");
2937+
receiverOptions.sourceOptions().addFilter("no-local", UnsignedLong.valueOf(0x0000468C00000003L), "NoLocalFilter{}");
2938+
receiverOptions.sourceOptions().outcomes(DeliveryState.Type.ACCEPTED, DeliveryState.Type.REJECTED);
2939+
2940+
receiverOptions.targetOptions().capabilities("QUEUE");
2941+
receiverOptions.targetOptions().durabilityMode(DurabilityMode.CONFIGURATION);
2942+
receiverOptions.targetOptions().expiryPolicy(ExpiryPolicy.SESSION_CLOSE);
2943+
receiverOptions.targetOptions().timeout(42);
2944+
2945+
Receiver receiver = session.openReceiver("test-queue", receiverOptions).openFuture().get();
2946+
2947+
receiver.close();
2948+
session.close();
2949+
connection.close();
2950+
2951+
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
2952+
}
2953+
}
2954+
28162955
@Test
28172956
public void testOpenDurableReceiver() throws Exception {
28182957
final String address = "test-topic";

protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/AttachInjectAction.java

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
*/
1717
package org.apache.qpid.protonj2.test.driver.actions;
1818

19-
import java.util.HashMap;
2019
import java.util.LinkedHashMap;
2120
import java.util.Map;
2221
import java.util.Map.Entry;
@@ -519,21 +518,11 @@ public SourceBuilder withDistributionMode(Symbol mode) {
519518
}
520519

521520
public SourceBuilder withJMSSelector(String selector) {
522-
final JmsSelectorByIdDescribedType jmsSelector = new JmsSelectorByIdDescribedType(selector);
523-
final Map<String, Object> filters = new HashMap<>();
524-
525-
filters.put(JmsSelectorByIdDescribedType.JMS_SELECTOR_KEY, jmsSelector);
526-
527-
return withFilterMap(filters);
521+
return withFilterMap(JmsSelectorByIdDescribedType.JMS_SELECTOR_SYMBOL_KEY, new JmsSelectorByIdDescribedType(selector));
528522
}
529523

530524
public SourceBuilder withNoLocal() {
531-
final JmsNoLocalByIdDescribedType noLocal = new JmsNoLocalByIdDescribedType();
532-
final Map<String, Object> filters = new HashMap<>();
533-
534-
filters.put(JmsNoLocalByIdDescribedType.JMS_NO_LOCAL_KEY, noLocal);
535-
536-
return withFilterMap(filters);
525+
return withFilterMap(JmsNoLocalByIdDescribedType.JMS_NO_LOCAL_SYMBOL_KEY, new JmsNoLocalByIdDescribedType());
537526
}
538527

539528
public SourceBuilder withFilter(Map<Symbol, Object> filters) {

protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/primitives/UnknownDescribedType.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public boolean equals(final Object o) {
4141
if (this == o) {
4242
return true;
4343
}
44-
if (o == null || getClass() != o.getClass()) {
44+
if (o == null || !DescribedType.class.isAssignableFrom(o.getClass())) {
4545
return false;
4646
}
4747

protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/AttachExpectation.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323

2424
import java.nio.ByteBuffer;
2525
import java.util.Arrays;
26-
import java.util.HashMap;
2726
import java.util.Map;
2827
import java.util.UUID;
2928
import java.util.function.Consumer;
@@ -53,6 +52,7 @@
5352
import org.apache.qpid.protonj2.test.driver.codec.transport.SenderSettleMode;
5453
import org.apache.qpid.protonj2.test.driver.matchers.JmsNoLocalByIdDescribedType;
5554
import org.apache.qpid.protonj2.test.driver.matchers.JmsSelectorByIdDescribedType;
55+
import org.apache.qpid.protonj2.test.driver.matchers.MapContentsMatcher;
5656
import org.apache.qpid.protonj2.test.driver.matchers.messaging.SourceMatcher;
5757
import org.apache.qpid.protonj2.test.driver.matchers.messaging.TargetMatcher;
5858
import org.apache.qpid.protonj2.test.driver.matchers.transactions.CoordinatorMatcher;
@@ -582,6 +582,9 @@ public static class AttachSourceMatcher extends SourceMatcher {
582582

583583
private final AttachExpectation expectation;
584584

585+
// Only used if singular 'withJMSSelector' or 'withNoLocal' API is used
586+
private MapContentsMatcher<Symbol, Object> jmsFilterMatcher;
587+
585588
public AttachSourceMatcher(AttachExpectation expectation) {
586589
this.expectation = expectation;
587590
}
@@ -662,27 +665,34 @@ public AttachSourceMatcher withDistributionMode(Symbol distributionMode) {
662665

663666
@Override
664667
public AttachSourceMatcher withFilter(Map<String, Object> filter) {
668+
jmsFilterMatcher = null;
665669
super.withFilter(filter);
666670
return this;
667671
}
668672

669673
public AttachSourceMatcher withJMSSelector(String selector) {
674+
if (jmsFilterMatcher == null) {
675+
jmsFilterMatcher = new MapContentsMatcher<Symbol, Object>();
676+
}
677+
670678
final JmsSelectorByIdDescribedType filterType = new JmsSelectorByIdDescribedType(selector);
671-
final Map<String, Object> filtersMap = new HashMap<>();
672679

673-
filtersMap.put(JmsSelectorByIdDescribedType.JMS_SELECTOR_KEY, filterType);
680+
jmsFilterMatcher.addExpectedEntry(Symbol.valueOf(JmsSelectorByIdDescribedType.JMS_SELECTOR_KEY), filterType);
674681

675-
super.withFilter(filtersMap);
682+
super.withFilter(jmsFilterMatcher);
676683
return this;
677684
}
678685

679686
public AttachSourceMatcher withNoLocal() {
687+
if (jmsFilterMatcher == null) {
688+
jmsFilterMatcher = new MapContentsMatcher<Symbol, Object>();
689+
}
690+
680691
final JmsNoLocalByIdDescribedType filterType = new JmsNoLocalByIdDescribedType();
681-
final Map<String, Object> filtersMap = new HashMap<>();
682692

683-
filtersMap.put(JmsNoLocalByIdDescribedType.JMS_NO_LOCAL_KEY, filterType);
693+
jmsFilterMatcher.addExpectedEntry(Symbol.valueOf(JmsNoLocalByIdDescribedType.JMS_NO_LOCAL_KEY), filterType);
684694

685-
super.withFilter(filtersMap);
695+
super.withFilter(jmsFilterMatcher);
686696
return this;
687697
}
688698

protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/JmsNoLocalByIdDescribedType.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.qpid.protonj2.test.driver.matchers;
1919

20+
import org.apache.qpid.protonj2.test.driver.codec.primitives.Symbol;
2021
import org.apache.qpid.protonj2.test.driver.codec.primitives.UnknownDescribedType;
2122
import org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedLong;
2223

@@ -30,6 +31,11 @@ public class JmsNoLocalByIdDescribedType extends UnknownDescribedType {
3031
*/
3132
public static final String JMS_NO_LOCAL_KEY = "no-local";
3233

34+
/**
35+
* Symbolic key name used when add the selector type to the filters map.
36+
*/
37+
public static final Symbol JMS_NO_LOCAL_SYMBOL_KEY = Symbol.valueOf(JMS_NO_LOCAL_KEY);
38+
3339
public static final UnsignedLong JMS_NO_LOCAL_ULONG_DESCRIPTOR = UnsignedLong.valueOf(0x0000468C00000003L);
3440

3541
public JmsNoLocalByIdDescribedType() {

protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/JmsSelectorByIdDescribedType.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.qpid.protonj2.test.driver.matchers;
1919

20+
import org.apache.qpid.protonj2.test.driver.codec.primitives.Symbol;
2021
import org.apache.qpid.protonj2.test.driver.codec.primitives.UnknownDescribedType;
2122
import org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedLong;
2223

@@ -30,6 +31,11 @@ public class JmsSelectorByIdDescribedType extends UnknownDescribedType {
3031
*/
3132
public static final String JMS_SELECTOR_KEY = "jms-selector";
3233

34+
/**
35+
* Symbolic key name used when add the selector type to the filters map.
36+
*/
37+
public static final Symbol JMS_SELECTOR_SYMBOL_KEY = Symbol.valueOf(JMS_SELECTOR_KEY);
38+
3339
public static final UnsignedLong JMS_SELECTOR_ULONG_DESCRIPTOR = UnsignedLong.valueOf(0x0000468C00000004L);
3440

3541
public JmsSelectorByIdDescribedType(String selector) {

0 commit comments

Comments
 (0)