Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Busy-Waiting Loops to Improve Efficiency #3123

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 43 additions & 71 deletions commander/src/main/java/com/iluwatar/commander/Retry.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,84 +28,56 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.*;
import java.util.function.Predicate;

/**
* Retry pattern.
*
* @param <T> is the type of object passed into HandleErrorIssue as a parameter.
*/

public class Retry<T> {

/**
* Operation Interface will define method to be implemented.
*/

public interface Operation {
void operation(List<Exception> list) throws Exception;
}

/**
* HandleErrorIssue defines how to handle errors.
*
* @param <T> is the type of object to be passed into the method as parameter.
*/

public interface HandleErrorIssue<T> {
void handleIssue(T obj, Exception e);
}
public interface Operation {
void operation(List<Exception> list) throws Exception;
}

private static final SecureRandom RANDOM = new SecureRandom();
public interface HandleErrorIssue<T> {
void handleIssue(T obj, Exception e);
}

private final Operation op;
private final HandleErrorIssue<T> handleError;
private final int maxAttempts;
private final long maxDelay;
private final AtomicInteger attempts;
private final Predicate<Exception> test;
private final List<Exception> errors;
private static final SecureRandom RANDOM = new SecureRandom();

Retry(Operation op, HandleErrorIssue<T> handleError, int maxAttempts,
long maxDelay, Predicate<Exception>... ignoreTests) {
this.op = op;
this.handleError = handleError;
this.maxAttempts = maxAttempts;
this.maxDelay = maxDelay;
this.attempts = new AtomicInteger();
this.test = Arrays.stream(ignoreTests).reduce(Predicate::or).orElse(e -> false);
this.errors = new ArrayList<>();
}
private final Operation op;
private final HandleErrorIssue<T> handleError;
private final int maxAttempts;
private final long maxDelay;
private final AtomicInteger attempts;
private final Predicate<Exception> test;
private final List<Exception> errors;
private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

/**
* Performing the operation with retries.
*
* @param list is the exception list
* @param obj is the parameter to be passed into handleIsuue method
*/
Retry(Operation op, HandleErrorIssue<T> handleError, int maxAttempts, long maxDelay, Predicate<Exception>... ignoreTests) {
this.op = op;
this.handleError = handleError;
this.maxAttempts = maxAttempts;
this.maxDelay = maxDelay;
this.attempts = new AtomicInteger();
this.test = Arrays.stream(ignoreTests).reduce(Predicate::or).orElse(e -> false);
this.errors = new ArrayList<>();
}

public void perform(List<Exception> list, T obj) {
do {
try {
op.operation(list);
return;
} catch (Exception e) {
this.errors.add(e);
if (this.attempts.incrementAndGet() >= this.maxAttempts || !this.test.test(e)) {
this.handleError.handleIssue(obj, e);
return; //return here... don't go further
}
try {
long testDelay =
(long) Math.pow(2, this.attempts.intValue()) * 1000 + RANDOM.nextInt(1000);
long delay = Math.min(testDelay, this.maxDelay);
Thread.sleep(delay);
} catch (InterruptedException f) {
//ignore
}
}
} while (true);
}
public void perform(List<Exception> list, T obj) {
do {
try {
op.operation(list);
return;
} catch (Exception e) {
this.errors.add(e);
if (this.attempts.incrementAndGet() >= this.maxAttempts || !this.test.test(e)) {
this.handleError.handleIssue(obj, e);
return;
}

}
long testDelay = (long) Math.pow(2, this.attempts.intValue()) * 1000 + RANDOM.nextInt(1000);
long delay = Math.min(testDelay, this.maxDelay);
executorService.schedule(() -> {}, delay, TimeUnit.MILLISECONDS); // Schedule retry without blocking
}
} while (true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class LogAggregator {
private final ConcurrentLinkedQueue<LogEntry> buffer = new ConcurrentLinkedQueue<>();
private final LogLevel minLogLevel;
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final Object bufferWait = new Object();
private final AtomicInteger logCount = new AtomicInteger(0);

/**
Expand Down Expand Up @@ -77,6 +78,7 @@ public void collectLog(LogEntry logEntry) {
}

buffer.offer(logEntry);
bufferWake();

if (logCount.incrementAndGet() >= BUFFER_THRESHOLD) {
flushBuffer();
Expand Down Expand Up @@ -109,6 +111,11 @@ private void startBufferFlusher() {
executorService.execute(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
synchronized (bufferWait) {
if (buffer.isEmpty()) {
bufferWait.wait();
}
}
Thread.sleep(5000); // Flush every 5 seconds.
flushBuffer();
} catch (InterruptedException e) {
Expand All @@ -117,4 +124,13 @@ private void startBufferFlusher() {
}
});
}

/**
* Wakes up buffer.
*/
public void bufferWake() {
synchronized (bufferWait) {
bufferWait.notifyAll();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
public class MessageQueue {

private final BlockingQueue<Message> blkQueue;
public final Object serviceExecutorWait = new Object();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In MessageQueue, ensure that serviceExecutorWait is properly synchronized to prevent race conditions when multiple threads access it concurrently.


// Default constructor when called creates Blocking Queue object.
public MessageQueue() {
Expand All @@ -50,6 +51,9 @@ public void submitMsg(Message msg) {
try {
if (null != msg) {
blkQueue.add(msg);
synchronized (serviceExecutorWait) {
serviceExecutorWait.notifyAll();
}
}
} catch (Exception e) {
LOGGER.error(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@
*/
@Slf4j
public class ServiceExecutor implements Runnable {

private final MessageQueue msgQueue;

public ServiceExecutor(MessageQueue msgQueue) {
this.msgQueue = msgQueue;
}
Expand All @@ -51,9 +49,10 @@ public void run() {
LOGGER.info(msg + " is served.");
} else {
LOGGER.info("Service Executor: Waiting for Messages to serve .. ");
synchronized (msgQueue.serviceExecutorWait) {
msgQueue.serviceExecutorWait.wait();
}
Comment on lines +52 to +54

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In ServiceExecutor, the busy-waiting loop is replaced with msgQueue.serviceExecutorWait.wait(). Ensure that the msgQueue.serviceExecutorWait object is properly synchronized to prevent race conditions.

Comment on lines +52 to +54

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The wait() call in ServiceExecutor should ideally be within a loop to handle spurious wakeups. This ensures the thread waits until a message is available.

}

Thread.sleep(1000);
}
} catch (Exception e) {
LOGGER.error(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,8 @@ public void run() {
try {
while (count > 0) {
var statusMsg = "Message-" + count + " submitted by " + Thread.currentThread().getName();
this.submit(new Message(statusMsg));

LOGGER.info(statusMsg);
this.submit(new Message(statusMsg));

// reduce the message count.
count--;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,19 @@
*/
package com.iluwatar.queue.load.leveling;

import static java.util.concurrent.CompletableFuture.anyOf;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;

/**
* Test case for submitting Message to Blocking Queue by TaskGenerator and retrieve the message by
* ServiceExecutor.
*/
@Slf4j
class TaskGenSrvExeTest {

@Test
Expand All @@ -53,4 +58,37 @@ void taskGeneratorTest() {
assertNotNull(srvExeThr);
}

/**
* Tests that service executor waits at start since no message is sent to execute upon.
* @throws InterruptedException
*/
@Test
void serviceExecutorStartStateTest() throws InterruptedException {
var msgQueue = new MessageQueue();
var srvRunnable = new ServiceExecutor(msgQueue);
var srvExeThr = new Thread(srvRunnable);
srvExeThr.start();
Thread.sleep(200); // sleep a little until service executor thread waits
LOGGER.info("Current Service Executor State: " + srvExeThr.getState());
assertEquals(srvExeThr.getState(), Thread.State.WAITING);

}

@Test
void serviceExecutorWakeStateTest() throws InterruptedException {
var msgQueue = new MessageQueue();
var srvRunnable = new ServiceExecutor(msgQueue);
var srvExeThr = new Thread(srvRunnable);
srvExeThr.start();
Thread.sleep(200); // sleep a little until service executor thread waits
synchronized (msgQueue.serviceExecutorWait){
msgQueue.serviceExecutorWait.notifyAll();
}
var srvExeState = srvExeThr.getState();
LOGGER.info("Current Service Executor State: " + srvExeState);
// assert that state changes from waiting
assertTrue(srvExeState != Thread.State.WAITING);

}

}
Loading
Loading