Skip to content

Commit

Permalink
Acquire only one exclusive external worker job per process instance (f…
Browse files Browse the repository at this point in the history
  • Loading branch information
basclaessen committed Oct 29, 2022
1 parent e370e9f commit 68803d8
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
Expand Down Expand Up @@ -937,6 +938,53 @@ void testSimpleWithBoundaryErrorAndVariables() {
);
}

@Test
@Deployment
void testSimpleParallel() {
ProcessInstance processInstance = runtimeService.createProcessInstanceBuilder()
.processDefinitionKey("simpleParallelExternalWorker")
.start();

List<ExternalWorkerJob> externalWorkerJobs = managementService.createExternalWorkerJobQuery()
.list();
assertThat(externalWorkerJobs)
.extracting(ExternalWorkerJob::getElementId, ExternalWorkerJob::getJobHandlerConfiguration)
.containsExactlyInAnyOrder(
tuple("externalWorkerTask1", "simple"),
tuple("externalWorkerTask2", "simple")
);

List<AcquiredExternalWorkerJob> acquiredJobs = managementService.createExternalWorkerJobAcquireBuilder()
.topic("simple", Duration.ofMinutes(30))
.acquireAndLock(2, "testWorker");

//Both external worker tasks have the exclusive flag set
//so only one job can be acquired because they cannot be executed concurrently.
assertThat(acquiredJobs).hasSize(1);

AcquiredExternalWorkerJob acquiredJob1 = acquiredJobs.get(0);

managementService.createExternalWorkerCompletionBuilder(acquiredJob1.getId(), "testWorker")
.complete();

//Acquire the second external worker job
acquiredJobs = managementService.createExternalWorkerJobAcquireBuilder()
.topic("simple", Duration.ofMinutes(30))
.acquireAndLock(2, "testWorker");

assertThat(acquiredJobs).hasSize(1);

AcquiredExternalWorkerJob acquiredJob2 = acquiredJobs.get(0);

managementService.createExternalWorkerCompletionBuilder(acquiredJob2.getId(), "testWorker")
.complete();

assertThat(Arrays.asList(acquiredJob1.getElementId(), acquiredJob2.getElementId()))
.containsExactlyInAnyOrder("externalWorkerTask1", "externalWorkerTask2");

waitForJobExecutorToProcessAllJobs(5000, 300);
}

@Test
void testAcquireWithInvalidArguments() {
assertThatThrownBy(() -> managementService.createExternalWorkerJobAcquireBuilder().acquireAndLock(10, "someWorker"))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<definitions id="definitions"
xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
xmlns:flowable="http://flowable.org/bpmn"
targetNamespace="Examples"
>

<process id="simpleParallelExternalWorker">

<startEvent id="theStart"/>

<sequenceFlow sourceRef="theStart" targetRef="parallelGateway1"/>

<parallelGateway id="parallelGateway1"></parallelGateway>

<sequenceFlow sourceRef="parallelGateway1" targetRef="externalWorkerTask1"/>

<serviceTask id="externalWorkerTask1" flowable:type="external-worker" flowable:topic="simple"/>

<sequenceFlow sourceRef="parallelGateway1" targetRef="externalWorkerTask2"/>

<serviceTask id="externalWorkerTask2" flowable:type="external-worker" flowable:topic="simple"/>

<sequenceFlow sourceRef="externalWorkerTask1" targetRef="parallelGateway2"/>

<sequenceFlow sourceRef="externalWorkerTask2" targetRef="parallelGateway2"/>

<parallelGateway id="parallelGateway2"></parallelGateway>

<sequenceFlow sourceRef="parallelGateway2" targetRef="theEnd"/>

<endEvent id="theEnd"/>

</process>

</definitions>
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
import java.util.ArrayList;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.lang3.StringUtils;
import org.flowable.common.engine.api.FlowableIllegalArgumentException;
Expand Down Expand Up @@ -73,22 +75,26 @@ public List<AcquiredExternalWorkerJob> execute(CommandContext commandContext) {

int lockTimeInMillis = (int) builder.getLockDuration().abs().toMillis();
List<AcquiredExternalWorkerJob> acquiredJobs = new ArrayList<>(jobs.size());
Set<String> lockedProcessInstances = new HashSet<>();

for (ExternalWorkerJobEntity job : jobs) {
lockJob(commandContext, job, lockTimeInMillis);
Map<String, Object> variables = null;
if (internalJobManager != null) {
VariableScope variableScope = internalJobManager.resolveVariableScope(job);
if (variableScope != null) {
variables = variableScope.getVariables();
if (!job.isExclusive() || (job.isExclusive() && !lockedProcessInstances.contains(job.getProcessInstanceId()))) {
lockJob(commandContext, job, lockTimeInMillis);
Map<String, Object> variables = null;
if (internalJobManager != null) {
VariableScope variableScope = internalJobManager.resolveVariableScope(job);
if (variableScope != null) {
variables = variableScope.getVariables();
}

if (job.isExclusive()) {
internalJobManager.lockJobScope(job);
lockedProcessInstances.add(job.getProcessInstanceId());
}
}

if (job.isExclusive()) {
internalJobManager.lockJobScope(job);
}
acquiredJobs.add(new AcquiredExternalWorkerJobImpl(job, variables));
}

acquiredJobs.add(new AcquiredExternalWorkerJobImpl(job, variables));
}

return acquiredJobs;
Expand Down

0 comments on commit 68803d8

Please sign in to comment.