Skip to content

Commit

Permalink
fix NeverTrigger
Browse files Browse the repository at this point in the history
  • Loading branch information
reuvenlax committed Feb 13, 2025
1 parent dcf4839 commit f6c360e
Show file tree
Hide file tree
Showing 30 changed files with 106 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,10 @@ private static class WatermarkCallback {
public static <W extends BoundedWindow> WatermarkCallback onGuaranteedFiring(
BoundedWindow window, WindowingStrategy<?, W> strategy, Runnable callback) {
@SuppressWarnings("unchecked")
Instant firingAfter = strategy.getTrigger().getWatermarkThatGuaranteesFiring((W) window);
Instant firingAfter =
strategy
.getTrigger()
.getWatermarkThatGuaranteesFiring((W) window, strategy.getAllowedLateness());
return new WatermarkCallback(firingAfter, callback);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,10 @@ private <SideWindowT extends BoundedWindow> Windmill.GlobalDataRequest buildGlob
.build())
.setExistenceWatermarkDeadline(
WindmillTimeUtils.harnessToWindmillTimestamp(
sideWindowStrategy.getTrigger().getWatermarkThatGuaranteesFiring(sideInputWindow)))
sideWindowStrategy
.getTrigger()
.getWatermarkThatGuaranteesFiring(
sideInputWindow, sideWindowStrategy.getAllowedLateness())))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,10 @@ private <T, SideWindowT extends BoundedWindow> GlobalData fetchGlobalDataFromWin
.setStateFamily(stateFamily)
.setExistenceWatermarkDeadline(
WindmillTimeUtils.harnessToWindmillTimestamp(
sideWindowStrategy.getTrigger().getWatermarkThatGuaranteesFiring(sideWindow)))
sideWindowStrategy
.getTrigger()
.getWatermarkThatGuaranteesFiring(
sideWindow, sideWindowStrategy.getAllowedLateness())))
.build();

try (Closeable ignored = scopedReadStateSupplier.get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void add(Iterable<String> values) {
}

@Override
public void add(String ... values) {
public void add(String... values) {
add(Arrays.asList(values));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,11 @@ public BoundedTrie getBoundedTrie(MetricName metricName) {

@SuppressWarnings("FutureReturnValueIgnored")
public void flush(boolean async) {
if (counters.isEmpty() && distributions.isEmpty() && gauges.isEmpty() &&
stringSets.isEmpty() && boundedTries.isEmpty()) {
if (counters.isEmpty()
&& distributions.isEmpty()
&& gauges.isEmpty()
&& stringSets.isEmpty()
&& boundedTries.isEmpty()) {
return;
}

Expand All @@ -102,7 +105,8 @@ public void flush(boolean async) {
extractUpdates(this.stringSets);
ImmutableList<MetricUpdates.MetricUpdate<BoundedTrieData>> boundedTries =
extractUpdates(this.boundedTries);
MetricUpdates updates = new MetricUpdatesImpl(counters, distributions, gauges, stringSets, boundedTries);
MetricUpdates updates =
new MetricUpdatesImpl(counters, distributions, gauges, stringSets, boundedTries);

if (async) {
accumulator.setAsync(metricsKey, updates);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
import org.joda.time.Duration;
import org.joda.time.Instant;

/** A composite {@link Trigger} that fires when all of its sub-triggers are ready. */
Expand All @@ -46,11 +47,11 @@ public static AfterAll of(List<Trigger> triggers) {

@Internal
@Override
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window, Duration allowedLateness) {
// This trigger will fire after the latest of its sub-triggers.
Instant deadline = BoundedWindow.TIMESTAMP_MIN_VALUE;
for (Trigger subTrigger : subTriggers) {
Instant subDeadline = subTrigger.getWatermarkThatGuaranteesFiring(window);
Instant subDeadline = subTrigger.getWatermarkThatGuaranteesFiring(window, allowedLateness);
if (deadline.isBefore(subDeadline)) {
deadline = subDeadline;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Arrays;
import java.util.List;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
import org.joda.time.Duration;
import org.joda.time.Instant;

/**
Expand Down Expand Up @@ -59,10 +60,10 @@ public static AfterEach inOrder(List<Trigger> triggers) {
}

@Override
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window, Duration allowedLateness) {
// This trigger will fire at least once when the first trigger in the sequence
// fires at least once.
return subTriggers.get(0).getWatermarkThatGuaranteesFiring(window);
return subTriggers.get(0).getWatermarkThatGuaranteesFiring(window, allowedLateness);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
import org.joda.time.Duration;
import org.joda.time.Instant;

/**
Expand All @@ -46,11 +47,11 @@ public static AfterFirst of(List<Trigger> triggers) {
}

@Override
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window, Duration allowedLateness) {
// This trigger will fire after the earliest of its sub-triggers.
Instant deadline = BoundedWindow.TIMESTAMP_MAX_VALUE;
for (Trigger subTrigger : subTriggers) {
Instant subDeadline = subTrigger.getWatermarkThatGuaranteesFiring(window);
Instant subDeadline = subTrigger.getWatermarkThatGuaranteesFiring(window, allowedLateness);
if (deadline.isAfter(subDeadline)) {
deadline = subDeadline;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Objects;
import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;

/**
Expand Down Expand Up @@ -55,7 +56,8 @@ public boolean isCompatible(Trigger other) {
}

@Override
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window, Duration allowedLateness) {
// TODO:Is this correct????
return BoundedWindow.TIMESTAMP_MAX_VALUE;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ public boolean isCompatible(Trigger other) {
}

@Override
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window, Duration allowedLateness) {
// TODO:is this correct?????
return BoundedWindow.TIMESTAMP_MAX_VALUE;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Objects;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;

/**
Expand All @@ -45,7 +46,8 @@ private AfterSynchronizedProcessingTime() {
}

@Override
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window, Duration allowedLateness) {
// TODO: is this correct?
return BoundedWindow.TIMESTAMP_MAX_VALUE;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;

/**
Expand Down Expand Up @@ -115,7 +116,8 @@ protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
}

@Override
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
public Instant getWatermarkThatGuaranteesFiring(
BoundedWindow window, Duration allowedLateness) {
// Even without an early or late trigger, we'll still produce a firing at the watermark.
return window.maxTimestamp();
}
Expand Down Expand Up @@ -168,7 +170,8 @@ public AfterWatermarkEarlyAndLate withLateFirings(OnceTrigger lateFirings) {
}

@Override
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
public Instant getWatermarkThatGuaranteesFiring(
BoundedWindow window, Duration allowedLateness) {
return window.maxTimestamp();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.sdk.transforms.windowing;

import java.util.List;
import org.joda.time.Duration;
import org.joda.time.Instant;

/**
Expand All @@ -38,7 +39,7 @@ public static DefaultTrigger of() {
}

@Override
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window, Duration allowedLateness) {
return window.maxTimestamp();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.List;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
import org.joda.time.Duration;
import org.joda.time.Instant;

/**
Expand Down Expand Up @@ -53,8 +54,16 @@ protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
}

@Override
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
return BoundedWindow.TIMESTAMP_MAX_VALUE;
public Instant getWatermarkThatGuaranteesFiring(
BoundedWindow window, Duration allowedLateness) {
if (GlobalWindow.INSTANCE
.maxTimestamp()
.minus(allowedLateness)
.isBefore(window.maxTimestamp())) {
return GlobalWindow.INSTANCE.maxTimestamp();
} else {
return window.maxTimestamp().plus(allowedLateness);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Arrays;
import java.util.List;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Duration;
import org.joda.time.Instant;

/**
Expand Down Expand Up @@ -52,10 +53,12 @@ public OnceTrigger getUntilTrigger() {
}

@Override
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window, Duration allowedLateness) {
// This trigger fires once either the trigger or the until trigger fires.
Instant actualDeadline = subTriggers.get(ACTUAL).getWatermarkThatGuaranteesFiring(window);
Instant untilDeadline = subTriggers.get(UNTIL).getWatermarkThatGuaranteesFiring(window);
Instant actualDeadline =
subTriggers.get(ACTUAL).getWatermarkThatGuaranteesFiring(window, allowedLateness);
Instant untilDeadline =
subTriggers.get(UNTIL).getWatermarkThatGuaranteesFiring(window, allowedLateness);
return actualDeadline.isBefore(untilDeadline) ? actualDeadline : untilDeadline;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.Arrays;
import java.util.List;
import org.joda.time.Duration;
import org.joda.time.Instant;

/**
Expand Down Expand Up @@ -63,9 +64,9 @@ public Trigger getRepeatedTrigger() {
}

@Override
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window, Duration allowedLateness) {
// This trigger fires once the repeated trigger fires.
return subTriggers.get(REPEATED).getWatermarkThatGuaranteesFiring(window);
return subTriggers.get(REPEATED).getWatermarkThatGuaranteesFiring(window, allowedLateness);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.List;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.joda.time.Duration;
import org.joda.time.Instant;

/**
Expand All @@ -43,7 +44,7 @@ protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
}

@Override
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window, Duration allowedLateness) {
throw new UnsupportedOperationException(
"ReshuffleTrigger should not be used outside of Reshuffle");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;

/**
Expand Down Expand Up @@ -132,7 +133,8 @@ public Trigger getContinuationTrigger() {
* side-input window, which causes the default value to be used instead.
*/
@Internal
public abstract Instant getWatermarkThatGuaranteesFiring(BoundedWindow window);
public abstract Instant getWatermarkThatGuaranteesFiring(
BoundedWindow window, Duration allowedLateness);

/**
* <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.junit.Assert.assertEquals;

import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -36,7 +37,7 @@ public void testFireDeadline() throws Exception {
assertEquals(
BoundedWindow.TIMESTAMP_MAX_VALUE,
AfterAll.of(AfterWatermark.pastEndOfWindow(), AfterPane.elementCountAtLeast(1))
.getWatermarkThatGuaranteesFiring(window));
.getWatermarkThatGuaranteesFiring(window, Duration.ZERO));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.junit.Assert.assertEquals;

import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -36,12 +37,12 @@ public void testFireDeadline() throws Exception {
assertEquals(
new Instant(9),
AfterEach.inOrder(AfterWatermark.pastEndOfWindow(), AfterPane.elementCountAtLeast(4))
.getWatermarkThatGuaranteesFiring(window));
.getWatermarkThatGuaranteesFiring(window, Duration.ZERO));

assertEquals(
BoundedWindow.TIMESTAMP_MAX_VALUE,
AfterEach.inOrder(AfterPane.elementCountAtLeast(2), AfterWatermark.pastEndOfWindow())
.getWatermarkThatGuaranteesFiring(window));
.getWatermarkThatGuaranteesFiring(window, Duration.ZERO));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.junit.Assert.assertEquals;

import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -36,11 +37,11 @@ public void testFireDeadline() throws Exception {
assertEquals(
new Instant(9),
AfterFirst.of(AfterWatermark.pastEndOfWindow(), AfterPane.elementCountAtLeast(4))
.getWatermarkThatGuaranteesFiring(window));
.getWatermarkThatGuaranteesFiring(window, Duration.ZERO));
assertEquals(
BoundedWindow.TIMESTAMP_MAX_VALUE,
AfterFirst.of(AfterPane.elementCountAtLeast(2), AfterPane.elementCountAtLeast(1))
.getWatermarkThatGuaranteesFiring(window));
.getWatermarkThatGuaranteesFiring(window, Duration.ZERO));
}

@Test
Expand Down
Loading

0 comments on commit f6c360e

Please sign in to comment.