@@ -4043,14 +4043,18 @@ public void testReceiveAsyncCompletedWhenClosing() throws Exception {
40434043 // 1) Test receiveAsync is interrupted
40444044 CountDownLatch countDownLatch = new CountDownLatch (1 );
40454045 new Thread (() -> {
4046+ CountDownLatch subCountDownLatch = new CountDownLatch (1 );
40464047 try {
40474048 new Thread (() -> {
40484049 try {
4050+ subCountDownLatch .await ();
40494051 consumer .close ();
4050- } catch (PulsarClientException ignore ) {
4052+ } catch (PulsarClientException | InterruptedException ignore ) {
40514053 }
40524054 }).start ();
4053- consumer .receiveAsync ().get ();
4055+ CompletableFuture <Message <String >> futhre = consumer .receiveAsync ();
4056+ subCountDownLatch .countDown ();
4057+ futhre .get ();
40544058 Assert .fail ("should be interrupted" );
40554059 } catch (Exception e ) {
40564060 Assert .assertTrue (e .getMessage ().contains (errorMsg ));
@@ -4067,13 +4071,17 @@ public void testReceiveAsyncCompletedWhenClosing() throws Exception {
40674071 .batchReceivePolicy (batchReceivePolicy ).subscribe ();
40684072 new Thread (() -> {
40694073 try {
4074+ CountDownLatch subCountDownLatch = new CountDownLatch (1 );
40704075 new Thread (() -> {
40714076 try {
4077+ subCountDownLatch .await ();
40724078 consumer2 .close ();
4073- } catch (PulsarClientException ignore ) {
4079+ } catch (PulsarClientException | InterruptedException ignore ) {
40744080 }
40754081 }).start ();
4076- consumer2 .batchReceiveAsync ().get ();
4082+ CompletableFuture <Messages <String >> future = consumer2 .batchReceiveAsync ();
4083+ subCountDownLatch .countDown ();
4084+ future .get ();
40774085 Assert .fail ("should be interrupted" );
40784086 } catch (Exception e ) {
40794087 Assert .assertTrue (e .getMessage ().contains (errorMsg ));
@@ -4090,13 +4098,18 @@ public void testReceiveAsyncCompletedWhenClosing() throws Exception {
40904098 .batchReceivePolicy (batchReceivePolicy ).subscribe ();
40914099 new Thread (() -> {
40924100 try {
4101+ CountDownLatch subCountDownLatch = new CountDownLatch (1 );
40934102 new Thread (() -> {
40944103 try {
4104+ subCountDownLatch .await ();
40954105 partitionedTopicConsumer .close ();
4096- } catch (PulsarClientException ignore ) {
4106+ } catch (PulsarClientException | InterruptedException ignore ) {
40974107 }
40984108 }).start ();
4099- partitionedTopicConsumer .batchReceiveAsync ().get ();
4109+ CompletableFuture <Messages <String >> future =
4110+ partitionedTopicConsumer .batchReceiveAsync ();
4111+ subCountDownLatch .countDown ();
4112+ future .get ();
41004113 Assert .fail ("should be interrupted" );
41014114 } catch (Exception e ) {
41024115 Assert .assertTrue (e .getMessage ().contains (errorMsg ));
0 commit comments