Skip to content

fixing busy waiting in abstraction and eliminate busy-waiting #3302

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,13 @@
*/
package com.iluwatar.logaggregation;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -41,11 +45,16 @@
public class LogAggregator {

private static final int BUFFER_THRESHOLD = 3;
private static final int FLUSH_INTERVAL_SECONDS = 5;
private static final int SHUTDOWN_TIMEOUT_SECONDS = 10;

private final CentralLogStore centralLogStore;
private final ConcurrentLinkedQueue<LogEntry> buffer = new ConcurrentLinkedQueue<>();
private final BlockingQueue<LogEntry> buffer = new LinkedBlockingQueue<>();
private final LogLevel minLogLevel;
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1);
private final AtomicInteger logCount = new AtomicInteger(0);
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
private volatile boolean running = true;

/**
* constructor of LogAggregator.
Expand All @@ -56,7 +65,20 @@ public class LogAggregator {
public LogAggregator(CentralLogStore centralLogStore, LogLevel minLogLevel) {
this.centralLogStore = centralLogStore;
this.minLogLevel = minLogLevel;
startBufferFlusher();
startPeriodicFlusher();

// Add shutdown hook for graceful termination
Runtime.getRuntime()
.addShutdownHook(
new Thread(
() -> {
try {
stop();
} catch (InterruptedException e) {
LOGGER.warn("Shutdown interrupted", e);
Thread.currentThread().interrupt();
}
}));
}

/**
Expand All @@ -65,6 +87,11 @@ public LogAggregator(CentralLogStore centralLogStore, LogLevel minLogLevel) {
* @param logEntry The log entry to collect.
*/
public void collectLog(LogEntry logEntry) {
if (!running) {
LOGGER.warn("LogAggregator is shutting down. Skipping log entry.");
return;
}

if (logEntry.getLevel() == null || minLogLevel == null) {
LOGGER.warn("Log level or threshold level is null. Skipping.");
return;
Expand All @@ -75,10 +102,17 @@ public void collectLog(LogEntry logEntry) {
return;
}

buffer.offer(logEntry);
// BlockingQueue.offer() is non-blocking and thread-safe
boolean added = buffer.offer(logEntry);
if (!added) {
LOGGER.warn("Failed to add log entry to buffer - queue may be full");
return;
}

// Check if immediate flush is needed due to threshold
if (logCount.incrementAndGet() >= BUFFER_THRESHOLD) {
flushBuffer();
// Schedule immediate flush instead of blocking current thread
scheduledExecutor.execute(this::flushBuffer);
}
}

Expand All @@ -88,32 +122,119 @@ public void collectLog(LogEntry logEntry) {
* @throws InterruptedException If any thread has interrupted the current thread.
*/
public void stop() throws InterruptedException {
executorService.shutdownNow();
if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
LOGGER.error("Log aggregator did not terminate.");
LOGGER.info("Stopping LogAggregator...");
running = false;

// Shutdown the scheduler gracefully
scheduledExecutor.shutdown();

try {
// Wait for scheduled tasks to complete
if (!scheduledExecutor.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
LOGGER.warn("Scheduler did not terminate gracefully, forcing shutdown");
scheduledExecutor.shutdownNow();

// Wait a bit more for tasks to respond to interruption
if (!scheduledExecutor.awaitTermination(2, TimeUnit.SECONDS)) {
LOGGER.error("Scheduler did not terminate after forced shutdown");
}
}
} finally {
// Final flush of any remaining logs
flushBuffer();
shutdownLatch.countDown();
LOGGER.info("LogAggregator stopped successfully");
}
flushBuffer();
}

/**
* Waits for the LogAggregator to complete shutdown. Useful for testing or controlled shutdown
* scenarios.
*
* @throws InterruptedException If any thread has interrupted the current thread.
*/
public void awaitShutdown() throws InterruptedException {
shutdownLatch.await();
}

private void flushBuffer() {
LogEntry logEntry;
while ((logEntry = buffer.poll()) != null) {
centralLogStore.storeLog(logEntry);
logCount.decrementAndGet();
if (!running && buffer.isEmpty()) {
return;
}

try {
List<LogEntry> batch = new ArrayList<>();
int drained = 0;

// Drain up to a reasonable batch size for efficiency
LogEntry logEntry;
while ((logEntry = buffer.poll()) != null && drained < 100) {
batch.add(logEntry);
drained++;
}

if (!batch.isEmpty()) {
LOGGER.debug("Flushing {} log entries to central store", batch.size());

// Process the batch
for (LogEntry entry : batch) {
centralLogStore.storeLog(entry);
logCount.decrementAndGet();
}

LOGGER.debug("Successfully flushed {} log entries", batch.size());
}
} catch (Exception e) {
LOGGER.error("Error occurred while flushing buffer", e);
}
}

private void startBufferFlusher() {
executorService.execute(
/**
* Starts the periodic buffer flusher using ScheduledExecutorService. This eliminates the
* busy-waiting loop with Thread.sleep().
*/
private void startPeriodicFlusher() {
scheduledExecutor.scheduleAtFixedRate(
() -> {
while (!Thread.currentThread().isInterrupted()) {
if (running) {
try {
Thread.sleep(5000); // Flush every 5 seconds.
flushBuffer();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
LOGGER.error("Error in periodic flush", e);
}
}
});
},
FLUSH_INTERVAL_SECONDS, // Initial delay
FLUSH_INTERVAL_SECONDS, // Period
TimeUnit.SECONDS);

LOGGER.info("Periodic log flusher started with interval of {} seconds", FLUSH_INTERVAL_SECONDS);
}

/**
* Gets the current number of buffered log entries. Useful for monitoring and testing.
*
* @return Current buffer size
*/
public int getBufferSize() {
return buffer.size();
}

/**
* Gets the current log count. Useful for monitoring and testing.
*
* @return Current log count
*/
public int getLogCount() {
return logCount.get();
}

/**
* Checks if the LogAggregator is currently running.
*
* @return true if running, false if stopped or stopping
*/
public boolean isRunning() {
return running;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,47 +24,166 @@
*/
package com.iluwatar.logaggregation;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.time.LocalDateTime;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

/**
* Unit tests for LogAggregator class.
*
* <p>Tests the event-driven log aggregation functionality including threshold-based flushing,
* scheduled periodic flushing, and graceful shutdown behavior.
*/
@ExtendWith(MockitoExtension.class)
class LogAggregatorTest {

@Mock private CentralLogStore centralLogStore;

private LogAggregator logAggregator;

@BeforeEach
void setUp() {
logAggregator = new LogAggregator(centralLogStore, LogLevel.INFO);
}

@AfterEach
void tearDown() throws InterruptedException {
if (logAggregator != null && logAggregator.isRunning()) {
logAggregator.stop();
logAggregator.awaitShutdown();
}
}

@Test
void whenThreeInfoLogsAreCollected_thenCentralLogStoreShouldStoreAllOfThem() {
void whenThreeInfoLogsAreCollected_thenCentralLogStoreShouldStoreAllOfThem()
throws InterruptedException {
logAggregator.collectLog(createLogEntry(LogLevel.INFO, "Sample log message 1"));
logAggregator.collectLog(createLogEntry(LogLevel.INFO, "Sample log message 2"));

assertEquals(2, logAggregator.getLogCount());
verifyNoInteractionsWithCentralLogStore();

logAggregator.collectLog(createLogEntry(LogLevel.INFO, "Sample log message 3"));

Thread.sleep(1000);

verifyCentralLogStoreInvokedTimes(3);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The verifyCentralLogStoreInvokedTimes method is not defined in the provided code snippet. Please define this method or use the appropriate Mockito verification methods.

assertEquals(0, logAggregator.getLogCount());
Comment on lines 81 to +82

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The verifyCentralLogStoreInvokedTimes method is not defined in this test class. Use Mockito's verify(centralLogStore, times(3)).storeLog(any()); instead for verification.

}
Comment on lines 81 to 83

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The verifyCentralLogStoreInvokedTimes method is not defined in this code snippet. Please provide the definition or use Mockito's built-in verification methods like verify(centralLogStore, times(3)).storeLog(any());.


@Test
void whenDebugLogIsCollected_thenNoLogsShouldBeStored() {
void whenDebugLogIsCollected_thenNoLogsShouldBeStored() throws InterruptedException {
logAggregator.collectLog(createLogEntry(LogLevel.DEBUG, "Sample debug log message"));

assertEquals(0, logAggregator.getLogCount());
assertEquals(0, logAggregator.getBufferSize());

Thread.sleep(500);

verifyNoInteractionsWithCentralLogStore();
}

@Test
void whenTwoLogsCollected_thenBufferShouldContainThem() {
logAggregator.collectLog(createLogEntry(LogLevel.INFO, "Message 1"));
logAggregator.collectLog(createLogEntry(LogLevel.INFO, "Message 2"));

assertEquals(2, logAggregator.getLogCount());
assertEquals(2, logAggregator.getBufferSize());

verifyNoInteractionsWithCentralLogStore();
}

@Test
void whenScheduledFlushOccurs_thenBufferedLogsShouldBeStored() throws InterruptedException {
logAggregator.collectLog(createLogEntry(LogLevel.INFO, "Scheduled flush test"));

assertEquals(1, logAggregator.getLogCount());
verifyNoInteractionsWithCentralLogStore();

Thread.sleep(6000);

verifyCentralLogStoreInvokedTimes(1);
assertEquals(0, logAggregator.getLogCount());
}

@Test
void whenLogAggregatorStopped_thenRemainingLogsShouldBeStored() throws InterruptedException {
logAggregator.collectLog(createLogEntry(LogLevel.INFO, "Final message 1"));
logAggregator.collectLog(createLogEntry(LogLevel.INFO, "Final message 2"));

assertEquals(2, logAggregator.getLogCount());
verifyNoInteractionsWithCentralLogStore();

logAggregator.stop();
logAggregator.awaitShutdown();

verifyCentralLogStoreInvokedTimes(2);
assertEquals(0, logAggregator.getLogCount());
assertFalse(logAggregator.isRunning());
}

@Test
void whenLogLevelBelowThreshold_thenLogShouldBeFiltered() {
logAggregator.collectLog(createLogEntry(LogLevel.DEBUG, "Debug message"));

assertEquals(0, logAggregator.getLogCount());
assertEquals(0, logAggregator.getBufferSize());
verifyNoInteractionsWithCentralLogStore();
}

@Test
void whenLogLevelAtOrAboveThreshold_thenLogShouldBeAccepted() {
logAggregator.collectLog(createLogEntry(LogLevel.INFO, "Info message"));
logAggregator.collectLog(createLogEntry(LogLevel.ERROR, "Error message"));

assertEquals(2, logAggregator.getLogCount());
assertEquals(2, logAggregator.getBufferSize());
}

@Test
void whenNullLogLevelProvided_thenLogShouldBeSkipped() {
LogEntry nullLevelEntry =
new LogEntry("ServiceA", null, "Null level message", LocalDateTime.now());

logAggregator.collectLog(nullLevelEntry);

assertEquals(0, logAggregator.getLogCount());
verifyNoInteractionsWithCentralLogStore();
}

@Test
void whenLogAggregatorIsShutdown_thenNewLogsShouldBeRejected() throws InterruptedException {
logAggregator.stop();
logAggregator.awaitShutdown();

assertFalse(logAggregator.isRunning());

logAggregator.collectLog(createLogEntry(LogLevel.INFO, "Post-shutdown message"));

assertEquals(0, logAggregator.getLogCount());
verifyNoInteractionsWithCentralLogStore();
}

@Test
void testBasicFunctionality() throws InterruptedException {
assertTrue(logAggregator.isRunning());

logAggregator.collectLog(createLogEntry(LogLevel.INFO, "Basic test"));
assertEquals(1, logAggregator.getLogCount());
}

private static LogEntry createLogEntry(LogLevel logLevel, String message) {
return new LogEntry("ServiceA", logLevel, message, LocalDateTime.now());
}
Expand Down
Loading
Loading