diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java index cc1606a0e01..9bfbb67fad2 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java @@ -27,6 +27,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import javax.annotation.Nonnull; +import org.apache.beam.runners.core.LateDataUtils; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.WindowingStrategy; @@ -154,8 +155,11 @@ public void fireForWatermark(AppliedPTransform step, Instant watermark) private static class WatermarkCallback { public static WatermarkCallback onGuaranteedFiring( BoundedWindow window, WindowingStrategy strategy, Runnable callback) { - @SuppressWarnings("unchecked") - Instant firingAfter = strategy.getTrigger().getWatermarkThatGuaranteesFiring((W) window); + Instant firingAfter = + Ordering.natural() + .min( + LateDataUtils.garbageCollectionTime(window, strategy), + strategy.getTrigger().getWatermarkThatGuaranteesFiring((W) window)); return new WatermarkCallback(firingAfter, callback); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcher.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcher.java index 4591c4aff89..c1a18cbb5dc 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcher.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcher.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Set; import org.apache.beam.runners.core.DoFnRunner; +import org.apache.beam.runners.core.LateDataUtils; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.StateNamespaces.WindowNamespace; import org.apache.beam.runners.core.StateTag; @@ -54,6 +55,8 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Ordering; +import org.joda.time.Instant; /** A class that handles streaming side inputs in a {@link DoFnRunner}. */ @SuppressWarnings({"keyfor", "nullness"}) // TODO(https://github.com/apache/beam/issues/20497) @@ -312,15 +315,19 @@ private Windmill.GlobalDataRequest buildGlob throw new RuntimeException(e); } + Instant firingAfter = + Ordering.natural() + .min( + LateDataUtils.garbageCollectionTime(sideInputWindow, sideWindowStrategy), + sideWindowStrategy.getTrigger().getWatermarkThatGuaranteesFiring(sideInputWindow)); + return Windmill.GlobalDataRequest.newBuilder() .setDataId( Windmill.GlobalDataId.newBuilder() .setTag(view.getTagInternal().getId()) .setVersion(windowStream.toByteString()) .build()) - .setExistenceWatermarkDeadline( - WindmillTimeUtils.harnessToWindmillTimestamp( - sideWindowStrategy.getTrigger().getWatermarkThatGuaranteesFiring(sideInputWindow))) + .setExistenceWatermarkDeadline(WindmillTimeUtils.harnessToWindmillTimestamp(firingAfter)) .build(); } diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/BoundedTrieImpl.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/BoundedTrieImpl.java index bd7a30ac174..de5172ac93c 100644 --- a/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/BoundedTrieImpl.java +++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/BoundedTrieImpl.java @@ -42,7 +42,7 @@ public void add(Iterable values) { } @Override - public void add(String ... values) { + public void add(String... values) { add(Arrays.asList(values)); } } diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricsContainer.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricsContainer.java index c9a6fcc0929..ab20a8d8f68 100644 --- a/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricsContainer.java +++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricsContainer.java @@ -89,8 +89,11 @@ public BoundedTrie getBoundedTrie(MetricName metricName) { @SuppressWarnings("FutureReturnValueIgnored") public void flush(boolean async) { - if (counters.isEmpty() && distributions.isEmpty() && gauges.isEmpty() && - stringSets.isEmpty() && boundedTries.isEmpty()) { + if (counters.isEmpty() + && distributions.isEmpty() + && gauges.isEmpty() + && stringSets.isEmpty() + && boundedTries.isEmpty()) { return; } @@ -102,7 +105,8 @@ public void flush(boolean async) { extractUpdates(this.stringSets); ImmutableList> boundedTries = extractUpdates(this.boundedTries); - MetricUpdates updates = new MetricUpdatesImpl(counters, distributions, gauges, stringSets, boundedTries); + MetricUpdates updates = + new MetricUpdatesImpl(counters, distributions, gauges, stringSets, boundedTries); if (async) { accumulator.setAsync(metricsKey, updates); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java index 44fd6f15f45..d9fcb0178a2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java @@ -1309,12 +1309,7 @@ public StackManipulation.Size apply(MethodVisitor mv, Context context) { if (returnVarIndex != null) { // Drop the return type from the locals mv.visitLocalVariable( - "res", - returnType.getDescriptor(), - null, - wrapStart, - wrapEnd, - returnVarIndex); + "res", returnType.getDescriptor(), null, wrapStart, wrapEnd, returnVarIndex); } return size; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WaitTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WaitTest.java index 1e1c780a0e0..b539bc8aabc 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WaitTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WaitTest.java @@ -22,12 +22,21 @@ import java.io.Serializable; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.LongStream; import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.schemas.JavaFieldSchema; +import org.apache.beam.sdk.schemas.NoSuchSchemaException; +import org.apache.beam.sdk.schemas.SchemaCoder; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaCreate; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.testing.UsesTestStream; import org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime; import org.apache.beam.sdk.transforms.windowing.AfterPane; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -38,9 +47,15 @@ import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; @@ -210,6 +225,126 @@ public void testWaitWithSomeSignalWindowsEmpty() { FixedWindows.of(Duration.standardSeconds(1))); } + private static final Set PROCESSED_LONGS = Sets.newConcurrentHashSet(); + private static final Set VERIFIED_LONGS = Sets.newConcurrentHashSet(); + + @DefaultSchema(JavaFieldSchema.class) + static class WindowExpirationValue { + public final @Nullable Instant watermarkAdvance; + public final long value; + + @SchemaCreate + public WindowExpirationValue(@Nullable Instant watermarkAdvance, long value) { + this.watermarkAdvance = watermarkAdvance; + this.value = value; + } + } + + @Test + @Category({NeedsRunner.class, UsesTestStream.class}) + public void testWindowExpiration() throws NoSuchSchemaException { + PROCESSED_LONGS.clear(); + VERIFIED_LONGS.clear(); + + SchemaCoder schemaCoder = + p.getSchemaRegistry().getSchemaCoder(WindowExpirationValue.class); + List allLongs = LongStream.range(0, 200).boxed().collect(Collectors.toList()); + TestStream.Builder streamBuilder = + TestStream.create(schemaCoder).advanceWatermarkTo(Instant.EPOCH); + for (long i : allLongs) { + if (i > 0 && (i % 2) == 0) { + Instant watermarkValue = Instant.ofEpochMilli(i * 1000); + streamBuilder = streamBuilder.advanceWatermarkTo(watermarkValue); + streamBuilder = + streamBuilder.addElements( + TimestampedValue.of( + new WindowExpirationValue(watermarkValue, -1), Instant.ofEpochSecond(i))); + } + streamBuilder = + streamBuilder.addElements( + TimestampedValue.of(new WindowExpirationValue(null, i), Instant.ofEpochSecond(i))); + } + Instant watermarkValue = Instant.ofEpochMilli(200 * 1000); + streamBuilder = streamBuilder.advanceWatermarkTo(watermarkValue); + streamBuilder = + streamBuilder.addElements( + TimestampedValue.of( + new WindowExpirationValue(watermarkValue, -1), Instant.ofEpochSecond(200))); + + PCollection longs = p.apply(streamBuilder.advanceWatermarkToInfinity()); + + TupleTag signalOutputTag = new TupleTag<>(); + TupleTag verifiedOutputTag = new TupleTag<>(); + // Keeps track of values processed. + PCollectionTuple pCollectionTuple = + longs.apply( + ParDo.of( + new DoFn() { + @ProcessElement + public void process( + @Element WindowExpirationValue element, MultiOutputReceiver o) { + if (element.watermarkAdvance != null) { + // Since TestStream is synchronous, we can assume that the Wait has + // released the previous + // window. Each window contains two elements, so verify that these two + // elements have been + // verified by the ParDo following the Wait. + long elementUpperBound = element.watermarkAdvance.getMillis() / 1000; + // This means the watermark has advanced. We expect the previous window to + // have been verified. + OutputReceiver verified = o.get(verifiedOutputTag); + if (VERIFIED_LONGS.contains(elementUpperBound - 1)) { + verified.output(elementUpperBound - 1); + } + if (VERIFIED_LONGS.contains(elementUpperBound - 2)) { + verified.output(elementUpperBound - 2); + } + } + PROCESSED_LONGS.add(element.value); + o.get(signalOutputTag).output(element.value); + } + }) + .withOutputTags(signalOutputTag, TupleTagList.of(verifiedOutputTag))); + pCollectionTuple.get(verifiedOutputTag).setCoder(VarLongCoder.of()); + + FixedWindows fixedWindows = FixedWindows.of(Duration.standardSeconds(2)); + PCollection verifiedInts = + longs + .apply( + "flatmap", + FlatMapElements.into(TypeDescriptors.longs()) + .via( + value -> + value.watermarkAdvance == null + ? Collections.singletonList(value.value) + : Collections.emptyList())) + .apply("w1", Window.into(fixedWindows).withAllowedLateness(Duration.ZERO)) + .apply( + Wait.on( + pCollectionTuple + .get(signalOutputTag) + .apply( + "w2", + Window.into(fixedWindows).withAllowedLateness(Duration.ZERO)))) + .apply( + "verify", + ParDo.of( + new DoFn() { + @ProcessElement + public void process(@Element Long element, OutputReceiver o) { + if (PROCESSED_LONGS.contains(element)) { + VERIFIED_LONGS.add(element); + o.output(element); + } + } + })); + PAssert.that(verifiedInts).containsInAnyOrder(Iterables.toArray(allLongs, Long.class)); + + PAssert.that(pCollectionTuple.get(verifiedOutputTag)) + .containsInAnyOrder(Iterables.toArray(allLongs, Long.class)); + p.run(); + } + /** * Tests the {@link Wait} transform with a given configuration of the main input and the signal * input. Specifically, generates random streams with bounded lateness for main and signal inputs diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java index 10528739c4f..70a13f9cae8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java @@ -29,10 +29,9 @@ public class NeverTest { @Test public void testFireDeadline() throws Exception { + IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10)); assertEquals( - BoundedWindow.TIMESTAMP_MAX_VALUE, - Never.ever() - .getWatermarkThatGuaranteesFiring(new IntervalWindow(new Instant(0), new Instant(10)))); + BoundedWindow.TIMESTAMP_MAX_VALUE, Never.ever().getWatermarkThatGuaranteesFiring(window)); } @Test