Skip to content

Commit 01d82d1

Browse files
committed
Concurrency fixes
1 parent d64e659 commit 01d82d1

File tree

3 files changed

+44
-24
lines changed

3 files changed

+44
-24
lines changed

market-aggregator-core/src/main/java/net/scaliby/marketaggregator/core/common/ConcurrentExecutionsCounter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
public class ConcurrentExecutionsCounter {
44

55
private final Object lock = new Object();
6-
private volatile int count = 0;
7-
private volatile int maxCount = 0;
6+
private int count = 0;
7+
private int maxCount = 0;
88

99
public int getMaxCount() {
1010
synchronized (lock) {

market-aggregator-core/src/main/java/net/scaliby/marketaggregator/core/concurrency/ChannelScheduler.java

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,41 +5,28 @@
55
import lombok.Setter;
66
import lombok.SneakyThrows;
77

8-
import java.util.HashMap;
98
import java.util.Map;
109
import java.util.concurrent.ArrayBlockingQueue;
1110
import java.util.concurrent.BlockingQueue;
11+
import java.util.concurrent.ConcurrentHashMap;
1212
import java.util.concurrent.Executor;
13+
import java.util.concurrent.atomic.AtomicBoolean;
1314

1415
@RequiredArgsConstructor
1516
public class ChannelScheduler<K> {
1617

1718
private final Executor executor;
1819

19-
private final Map<K, ChannelRunner> runners = new HashMap<>();
20+
private final Map<K, ChannelRunner> runners = new ConcurrentHashMap<>();
2021

2122
@Setter
2223
private int queueCapacity = 100_000;
2324

24-
@SneakyThrows
2525
public void push(ChannelRunnable<K> job) {
26-
synchronized (runners) {
27-
boolean alreadyRunning = runners.containsKey(job.getChannel());
28-
ChannelRunner channelRunner = runners.computeIfAbsent(job.getChannel(), ChannelRunner::new);
29-
channelRunner.push(job);
30-
if (!alreadyRunning) {
31-
executor.execute(channelRunner);
32-
}
33-
}
34-
}
35-
36-
private void finalizeQueueRunner(ChannelRunner runner) {
37-
synchronized (runners) {
38-
if (runner.hasData()) {
39-
executor.execute(runner);
40-
} else {
41-
runners.remove(runner.getKey());
42-
}
26+
ChannelRunner channelRunner = runners.computeIfAbsent(job.getChannel(), ChannelRunner::new);
27+
channelRunner.push(job);
28+
if (!channelRunner.isRunning()) {
29+
executor.execute(channelRunner);
4330
}
4431
}
4532

@@ -48,18 +35,30 @@ private final class ChannelRunner implements Runnable {
4835

4936
@Getter
5037
private final K key;
38+
private final AtomicBoolean running = new AtomicBoolean(false);
5139
private final BlockingQueue<Runnable> workload = new ArrayBlockingQueue<>(queueCapacity);
5240

5341
@Override
5442
public void run() {
43+
boolean oldValue = running.getAndSet(true);
44+
if (oldValue) {
45+
return;
46+
}
5547
while (hasData()) {
5648
Runnable elem = workload.poll();
5749
if (elem == null) {
5850
break;
5951
}
6052
elem.run();
6153
}
62-
finalizeQueueRunner(this);
54+
running.set(false);
55+
if (hasData()) {
56+
run();
57+
}
58+
}
59+
60+
boolean isRunning() {
61+
return running.get();
6362
}
6463

6564
@SneakyThrows

market-aggregator-core/src/test/java/net/scaliby/marketaggregator/core/concurrency/ChannelSchedulerTest.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,34 @@ public void runningTwoJobsForDifferentKeys_executesThemInParallel() throws Inter
5555
assertEquals(2, counter.getMaxCount());
5656
}
5757

58+
@Test(timeout = 2000L)
59+
public void runningJobForTheSameKeyTwice_executesItSuccessfullyWithoutDeadlock_forQueueCapacityEqualToOne() throws InterruptedException {
60+
// given
61+
ExecutorService executorService = Executors.newFixedThreadPool(1);
62+
ChannelScheduler<String> channelScheduler = new ChannelScheduler<>(executorService);
63+
channelScheduler.setQueueCapacity(1);
64+
ConcurrentExecutionsCounter counter = new ConcurrentExecutionsCounter();
65+
TestingChannelRunnable firstJob = new TestingChannelRunnable("BTC_USD", counter);
66+
TestingChannelRunnable secondJob = new TestingChannelRunnable("ETH_USD", counter);
67+
68+
// when
69+
channelScheduler.push(firstJob);
70+
channelScheduler.push(secondJob);
71+
channelScheduler.push(secondJob);
72+
executorService.awaitTermination(1, TimeUnit.SECONDS);
73+
74+
// then
75+
assertTrue(firstJob.isExecuted());
76+
assertTrue(secondJob.isExecuted());
77+
}
78+
5879
@RequiredArgsConstructor
5980
private static class TestingChannelRunnable implements ChannelRunnable<String> {
6081
@Getter
6182
private final String channel;
6283
private final ConcurrentExecutionsCounter concurrentExecutionsCounter;
6384
@Getter
64-
private boolean executed = false;
85+
private volatile boolean executed = false;
6586

6687
@Override
6788
@SneakyThrows

0 commit comments

Comments
 (0)