Skip to content

Commit

Permalink
Introduce a queue in InputEventListener to prevent input state races
Browse files Browse the repository at this point in the history
  • Loading branch information
boosty committed Feb 6, 2025
1 parent 3ff77ba commit 98ef6d1
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.graylog2.cluster.leader.LeaderChangedEvent;
import org.graylog2.cluster.leader.LeaderElectionService;
import org.graylog2.database.NotFoundException;
Expand All @@ -38,6 +39,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class InputEventListener {
private static final Logger LOG = LoggerFactory.getLogger(InputEventListener.class);
private final InputLauncher inputLauncher;
Expand All @@ -47,6 +53,10 @@ public class InputEventListener {
private final LeaderElectionService leaderElectionService;
private final PersistedInputs persistedInputs;
private final ServerStatus serverStatus;
private final ScheduledExecutorService daemonScheduler;
private final LinkedBlockingQueue<QueuedEvent> eventQueue = new LinkedBlockingQueue<>(100);

private record QueuedEvent(Object receivedEvent) {}

@Inject
public InputEventListener(EventBus eventBus,
Expand All @@ -56,21 +66,61 @@ public InputEventListener(EventBus eventBus,
NodeId nodeId,
LeaderElectionService leaderElectionService,
PersistedInputs persistedInputs,
ServerStatus serverStatus) {
ServerStatus serverStatus,
@Named("daemonScheduler") ScheduledExecutorService daemonScheduler) {
this.inputLauncher = inputLauncher;
this.inputRegistry = inputRegistry;
this.inputService = inputService;
this.nodeId = nodeId;
this.leaderElectionService = leaderElectionService;
this.persistedInputs = persistedInputs;
this.serverStatus = serverStatus;
this.daemonScheduler = daemonScheduler;
initializeEventQueueTask();
eventBus.register(this);
}

// TODO: create a queue per input?
// TODO: make processing interval configurable?
// TODO: replace daemonScheduler with a separate thread, and use eventQueue.take()?
// TODO: replace instanceof checks with dedicated type?
// TODO: add graceful shutdown behavior
// TODO: fix tests
private void initializeEventQueueTask() {
final var interval = Duration.ofMillis(100);
daemonScheduler.scheduleAtFixedRate(() -> {
try {
final var queuedEvent = eventQueue.poll();
if (queuedEvent != null) {
LOG.debug("Processing queued event: {}", queuedEvent.receivedEvent);
if (queuedEvent.receivedEvent instanceof InputCreated) {
doInputCreated(((InputCreated) queuedEvent.receivedEvent).id());
} else if (queuedEvent.receivedEvent instanceof InputDeleted) {
doInputDeleted(((InputDeleted) queuedEvent.receivedEvent).id());
} else if (queuedEvent.receivedEvent instanceof InputSetup) {
doInputSetup(((InputSetup) queuedEvent.receivedEvent).id());
} else if (queuedEvent.receivedEvent instanceof InputUpdated) {
doInputUpdated(((InputUpdated) queuedEvent.receivedEvent).id());
} else if (queuedEvent.receivedEvent instanceof LeaderChangedEvent) {
doLeaderChanged();
} else {
throw new IllegalArgumentException("Unexpected queued event: " + queuedEvent.receivedEvent);
}
}
} catch (Exception e) {
LOG.error("Caught exception while trying to execute queued event", e);
}
}, 0, interval.toMillis(), TimeUnit.MILLISECONDS);
}

@Subscribe
public void inputCreated(InputCreated inputCreatedEvent) {
final String inputId = inputCreatedEvent.id();
LOG.debug("Input created: {}", inputId);
eventQueue.add(new QueuedEvent(inputCreatedEvent));
}

private void doInputCreated(String inputId) {
final Input input;
try {
input = inputService.find(inputId);
Expand All @@ -93,6 +143,10 @@ public void inputCreated(InputCreated inputCreatedEvent) {
public void inputUpdated(InputUpdated inputUpdatedEvent) {
final String inputId = inputUpdatedEvent.id();
LOG.debug("Input updated: {}", inputId);
eventQueue.add(new QueuedEvent(inputUpdatedEvent));
}

private void doInputUpdated(final String inputId) {
final Input input;
try {
input = inputService.find(inputId);
Expand Down Expand Up @@ -141,7 +195,11 @@ private void startMessageInput(MessageInput messageInput) {
@Subscribe
public void inputDeleted(InputDeleted inputDeletedEvent) {
LOG.debug("Input deleted: {}", inputDeletedEvent.id());
final IOState<MessageInput> inputState = inputRegistry.getInputState(inputDeletedEvent.id());
eventQueue.add(new QueuedEvent(inputDeletedEvent));
}

private void doInputDeleted(String inputId) {
final IOState<MessageInput> inputState = inputRegistry.getInputState(inputId);
if (inputState != null) {
inputRegistry.remove(inputState);
}
Expand All @@ -150,11 +208,14 @@ public void inputDeleted(InputDeleted inputDeletedEvent) {
@Subscribe
public void inputSetup(InputSetup inputSetupEvent) {
LOG.info("Input setup: {}", inputSetupEvent.id());
final IOState<MessageInput> inputState = inputRegistry.getInputState(inputSetupEvent.id());
eventQueue.add(new QueuedEvent(inputSetupEvent));
}

private void doInputSetup(String inputId) {
final IOState<MessageInput> inputState = inputRegistry.getInputState(inputId);
if (inputState != null) {
inputRegistry.setup(inputState);
} else {
final String inputId = inputSetupEvent.id();
LOG.debug("Input created for setup: {}", inputId);
final Input input;
try {
Expand All @@ -171,11 +232,15 @@ public void inputSetup(InputSetup inputSetupEvent) {
}

@Subscribe
public void leaderChanged(LeaderChangedEvent event) {
public void leaderChanged(LeaderChangedEvent leaderChangedEvent) {
if (serverStatus.getLifecycle() == Lifecycle.STARTING) {
LOG.debug("Ignoring LeaderChangedEvent during server startup.");
return;
}
eventQueue.add(new QueuedEvent(leaderChangedEvent));
}

private void doLeaderChanged() {
if (leaderElectionService.isLeader()) {
for (MessageInput input : persistedInputs) {
final IOState<MessageInput> inputState = inputRegistry.getInputState(input.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;

import java.util.concurrent.Executors;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand Down Expand Up @@ -77,7 +79,7 @@ public class InputEventListenerTest {
@Before
public void setUp() throws Exception {
final EventBus eventBus = new EventBus(this.getClass().getSimpleName());
listener = new InputEventListener(eventBus, inputLauncher, inputRegistry, inputService, nodeId, leaderElectionService, persistedInputs, serverStatus);
listener = new InputEventListener(eventBus, inputLauncher, inputRegistry, inputService, nodeId, leaderElectionService, persistedInputs, serverStatus, Executors.newSingleThreadScheduledExecutor());
}

@Test
Expand Down

0 comments on commit 98ef6d1

Please sign in to comment.