Fix never trigger #33989

Merged
merged 5 commits into from Feb 15, 2025
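For context (not part of the diff): both call sites changed below previously used the trigger's getWatermarkThatGuaranteesFiring(window) alone as the firing deadline — for watermark callbacks in the first file and for the existence-watermark deadline of a streaming side-input request in the second. Never.ever() reports BoundedWindow.TIMESTAMP_MAX_VALUE for that watermark (the NeverTest hunk at the bottom asserts exactly this), so anything gated on a Never trigger could never fire. The fix takes the minimum of that watermark and the window's garbage-collection time (the window's maximum timestamp plus the allowed lateness) via LateDataUtils.garbageCollectionTime. A minimal sketch of the shared pattern, using only classes that appear in the diff; the wrapper class and method name are hypothetical:

import org.apache.beam.runners.core.LateDataUtils;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Ordering;
import org.joda.time.Instant;

// Hypothetical wrapper, shown only to illustrate the deadline computation adopted
// by both changed call sites in this PR.
final class GuaranteedFiringDeadline {
  // Returns the instant after which a callback gated on this window is guaranteed
  // to fire: the earlier of the trigger's guaranteed-firing watermark and the
  // window's garbage-collection time. Capping at garbage-collection time is what
  // lets a Never-triggered signal resolve once the window expires.
  static <W extends BoundedWindow> Instant deadlineFor(
      W window, WindowingStrategy<?, W> strategy) {
    return Ordering.natural()
        .min(
            LateDataUtils.garbageCollectionTime(window, strategy),
            strategy.getTrigger().getWatermarkThatGuaranteesFiring(window));
  }
}
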
@@ -27,6 +27,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import javax.annotation.Nonnull;
+ import org.apache.beam.runners.core.LateDataUtils;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.WindowingStrategy;
@@ -154,8 +155,11 @@ public void fireForWatermark(AppliedPTransform<?, ?, ?> step, Instant watermark)
private static class WatermarkCallback {
public static <W extends BoundedWindow> WatermarkCallback onGuaranteedFiring(
BoundedWindow window, WindowingStrategy<?, W> strategy, Runnable callback) {
- @SuppressWarnings("unchecked")
- Instant firingAfter = strategy.getTrigger().getWatermarkThatGuaranteesFiring((W) window);
+ Instant firingAfter =
+ Ordering.natural()
+ .min(
+ LateDataUtils.garbageCollectionTime(window, strategy),
+ strategy.getTrigger().getWatermarkThatGuaranteesFiring((W) window));
return new WatermarkCallback(firingAfter, callback);
}

@@ -26,6 +26,7 @@
import java.util.Map;
import java.util.Set;
import org.apache.beam.runners.core.DoFnRunner;
+ import org.apache.beam.runners.core.LateDataUtils;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
import org.apache.beam.runners.core.StateTag;
@@ -54,6 +55,8 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Ordering;
+ import org.joda.time.Instant;

/** A class that handles streaming side inputs in a {@link DoFnRunner}. */
@SuppressWarnings({"keyfor", "nullness"}) // TODO(https://github.com/apache/beam/issues/20497)
@@ -312,15 +315,19 @@ private <SideWindowT extends BoundedWindow> Windmill.GlobalDataRequest buildGlob
throw new RuntimeException(e);
}

+ Instant firingAfter =
+ Ordering.natural()
+ .min(
+ LateDataUtils.garbageCollectionTime(sideInputWindow, sideWindowStrategy),
+ sideWindowStrategy.getTrigger().getWatermarkThatGuaranteesFiring(sideInputWindow));
+
return Windmill.GlobalDataRequest.newBuilder()
.setDataId(
Windmill.GlobalDataId.newBuilder()
.setTag(view.getTagInternal().getId())
.setVersion(windowStream.toByteString())
.build())
- .setExistenceWatermarkDeadline(
- WindmillTimeUtils.harnessToWindmillTimestamp(
- sideWindowStrategy.getTrigger().getWatermarkThatGuaranteesFiring(sideInputWindow)))
+ .setExistenceWatermarkDeadline(WindmillTimeUtils.harnessToWindmillTimestamp(firingAfter))
.build();
}

@@ -42,7 +42,7 @@ public void add(Iterable<String> values) {
}

@Override
- public void add(String ... values) {
+ public void add(String... values) {
add(Arrays.asList(values));
}
}
@@ -89,8 +89,11 @@ public BoundedTrie getBoundedTrie(MetricName metricName) {

@SuppressWarnings("FutureReturnValueIgnored")
public void flush(boolean async) {
- if (counters.isEmpty() && distributions.isEmpty() && gauges.isEmpty() &&
- stringSets.isEmpty() && boundedTries.isEmpty()) {
+ if (counters.isEmpty()
+ && distributions.isEmpty()
+ && gauges.isEmpty()
+ && stringSets.isEmpty()
+ && boundedTries.isEmpty()) {
return;
}

@@ -102,7 +105,8 @@ public void flush(boolean async) {
extractUpdates(this.stringSets);
ImmutableList<MetricUpdates.MetricUpdate<BoundedTrieData>> boundedTries =
extractUpdates(this.boundedTries);
- MetricUpdates updates = new MetricUpdatesImpl(counters, distributions, gauges, stringSets, boundedTries);
+ MetricUpdates updates =
+ new MetricUpdatesImpl(counters, distributions, gauges, stringSets, boundedTries);

if (async) {
accumulator.setAsync(metricsKey, updates);
@@ -1309,12 +1309,7 @@ public StackManipulation.Size apply(MethodVisitor mv, Context context) {
if (returnVarIndex != null) {
// Drop the return type from the locals
mv.visitLocalVariable(
- "res",
- returnType.getDescriptor(),
- null,
- wrapStart,
- wrapEnd,
- returnVarIndex);
+ "res", returnType.getDescriptor(), null, wrapStart, wrapEnd, returnVarIndex);
}

return size;
@@ -22,12 +22,21 @@
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.UsesTestStream;
import org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -38,9 +47,15 @@
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -210,6 +225,126 @@ public void testWaitWithSomeSignalWindowsEmpty() {
FixedWindows.of(Duration.standardSeconds(1)));
}

private static final Set<Long> PROCESSED_LONGS = Sets.newConcurrentHashSet();
private static final Set<Long> VERIFIED_LONGS = Sets.newConcurrentHashSet();

@DefaultSchema(JavaFieldSchema.class)
static class WindowExpirationValue {
public final @Nullable Instant watermarkAdvance;
public final long value;

@SchemaCreate
public WindowExpirationValue(@Nullable Instant watermarkAdvance, long value) {
this.watermarkAdvance = watermarkAdvance;
this.value = value;
}
}

@Test
@Category({NeedsRunner.class, UsesTestStream.class})
public void testWindowExpiration() throws NoSuchSchemaException {
PROCESSED_LONGS.clear();
VERIFIED_LONGS.clear();

SchemaCoder<WindowExpirationValue> schemaCoder =
p.getSchemaRegistry().getSchemaCoder(WindowExpirationValue.class);
List<Long> allLongs = LongStream.range(0, 200).boxed().collect(Collectors.toList());
TestStream.Builder<WindowExpirationValue> streamBuilder =
TestStream.create(schemaCoder).advanceWatermarkTo(Instant.EPOCH);
for (long i : allLongs) {
if (i > 0 && (i % 2) == 0) {
Instant watermarkValue = Instant.ofEpochMilli(i * 1000);
streamBuilder = streamBuilder.advanceWatermarkTo(watermarkValue);
streamBuilder =
streamBuilder.addElements(
TimestampedValue.of(
new WindowExpirationValue(watermarkValue, -1), Instant.ofEpochSecond(i)));
}
streamBuilder =
streamBuilder.addElements(
TimestampedValue.of(new WindowExpirationValue(null, i), Instant.ofEpochSecond(i)));
}
Instant watermarkValue = Instant.ofEpochMilli(200 * 1000);
streamBuilder = streamBuilder.advanceWatermarkTo(watermarkValue);
streamBuilder =
streamBuilder.addElements(
TimestampedValue.of(
new WindowExpirationValue(watermarkValue, -1), Instant.ofEpochSecond(200)));

PCollection<WindowExpirationValue> longs = p.apply(streamBuilder.advanceWatermarkToInfinity());

TupleTag<Long> signalOutputTag = new TupleTag<>();
TupleTag<Long> verifiedOutputTag = new TupleTag<>();
// Keeps track of values processed.
PCollectionTuple pCollectionTuple =
longs.apply(
ParDo.of(
new DoFn<WindowExpirationValue, Long>() {
@ProcessElement
public void process(
@Element WindowExpirationValue element, MultiOutputReceiver o) {
if (element.watermarkAdvance != null) {
// Since TestStream is synchronous, we can assume that the Wait has released
// the previous window. Each window contains two elements, so check that both
// have been verified by the ParDo following the Wait.
long elementUpperBound = element.watermarkAdvance.getMillis() / 1000;
// The watermark has advanced, so we expect the previous window to have been
// verified.
OutputReceiver<Long> verified = o.get(verifiedOutputTag);
if (VERIFIED_LONGS.contains(elementUpperBound - 1)) {
verified.output(elementUpperBound - 1);
}
if (VERIFIED_LONGS.contains(elementUpperBound - 2)) {
verified.output(elementUpperBound - 2);
}
}
PROCESSED_LONGS.add(element.value);
o.get(signalOutputTag).output(element.value);
}
})
.withOutputTags(signalOutputTag, TupleTagList.of(verifiedOutputTag)));
pCollectionTuple.get(verifiedOutputTag).setCoder(VarLongCoder.of());

FixedWindows fixedWindows = FixedWindows.of(Duration.standardSeconds(2));
PCollection<Long> verifiedInts =
longs
.apply(
"flatmap",
FlatMapElements.into(TypeDescriptors.longs())
.via(
value ->
value.watermarkAdvance == null
? Collections.singletonList(value.value)
: Collections.emptyList()))
.apply("w1", Window.<Long>into(fixedWindows).withAllowedLateness(Duration.ZERO))
.apply(
Wait.on(
pCollectionTuple
.get(signalOutputTag)
.apply(
"w2",
Window.<Long>into(fixedWindows).withAllowedLateness(Duration.ZERO))))
.apply(
"verify",
ParDo.of(
new DoFn<Long, Long>() {
@ProcessElement
public void process(@Element Long element, OutputReceiver<Long> o) {
if (PROCESSED_LONGS.contains(element)) {
VERIFIED_LONGS.add(element);
o.output(element);
}
}
}));
PAssert.that(verifiedInts).containsInAnyOrder(Iterables.toArray(allLongs, Long.class));

PAssert.that(pCollectionTuple.get(verifiedOutputTag))
.containsInAnyOrder(Iterables.toArray(allLongs, Long.class));
p.run();
}

/**
* Tests the {@link Wait} transform with a given configuration of the main input and the signal
* input. Specifically, generates random streams with bounded lateness for main and signal inputs
@@ -29,10 +29,9 @@
public class NeverTest {
@Test
public void testFireDeadline() throws Exception {
+ IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
assertEquals(
- BoundedWindow.TIMESTAMP_MAX_VALUE,
- Never.ever()
- .getWatermarkThatGuaranteesFiring(new IntervalWindow(new Instant(0), new Instant(10))));
+ BoundedWindow.TIMESTAMP_MAX_VALUE, Never.ever().getWatermarkThatGuaranteesFiring(window));
}

@Test