From d1e6ea7ea80038df5ecf52550ad73c2774749395 Mon Sep 17 00:00:00 2001 From: Gabor Gevay Date: Tue, 22 Mar 2016 18:28:55 +0100 Subject: [PATCH] Add logging about combiner emissions for the experiments. --- .../operators/ReduceCombineDriver.java | 2 ++ .../chaining/ChainedReduceCombineDriver.java | 20 +++++++++++++++++++ .../operators/hash/ReduceHashTable.java | 9 +++++++++ 3 files changed, 31 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java index 42154fd3e845b..81ad3734f32bd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java @@ -86,6 +86,8 @@ public class ReduceCombineDriver implements Driver, T> { public void setup(TaskContext, T> context) { taskContext = context; canceled = false; + + throw new RuntimeException("Only the chained version (ChainedReduceCombineDriver) should be used during the experiments."); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedReduceCombineDriver.java index 446aa6b7e0b49..c703a36ac083e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedReduceCombineDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedReduceCombineDriver.java @@ -75,6 +75,8 @@ public class ChainedReduceCombineDriver extends ChainedDriver { private volatile boolean canceled; + private long numEmissions, numEmittedRecords; + // ------------------------------------------------------------------------ @Override @@ -129,6 +131,9 @@ public void openTask() throws Exception { table.open(); break; } + + numEmissions = 0; + numEmittedRecords = 0; } @Override @@ -175,6 +180,9 @@ private void collectHashed(T record) throws Exception { } private void sortAndCombine() throws Exception { + + numEmissions++; + final InMemorySorter sorter = this.sorter; if (!sorter.isEmpty()) { @@ -221,6 +229,7 @@ private void sortAndCombine() throws Exception { } output.collect(value); + numEmittedRecords++; // swap the value from the new key group into the first object T tmp = reuse1; @@ -249,6 +258,7 @@ private void sortAndCombine() throws Exception { } output.collect(res); + numEmittedRecords++; } } } @@ -257,13 +267,23 @@ private void sortAndCombine() throws Exception { @Override public void close() { // send the final batch + int taskIx = this.parent.getIndexInSubtaskGroup(); + int taskNo = this.parent.getCurrentNumberOfSubtasks(); try { switch (strategy) { case SORTED_PARTIAL_REDUCE: sortAndCombine(); + LOG.info(taskName + " (" + taskIx + "/" + taskNo + ") combiner capacity: " + this.sorter.getCapacity()); + LOG.info(taskName + " (" + taskIx + "/" + taskNo + ") combiner occupancy: " + this.sorter.getOccupancy()); + LOG.info(taskName + " (" + taskIx + "/" + taskNo + ") numEmissions: " + numEmissions); + LOG.info(taskName + " (" + taskIx + "/" + taskNo + ") numEmittedRecords: " + numEmittedRecords); break; case HASHED_PARTIAL_REDUCE: table.emit(); + LOG.info(taskName + " (" + taskIx + "/" + taskNo + ") combiner capacity: " + this.table.getCapacity()); + LOG.info(taskName + " (" + taskIx + "/" + taskNo + ") combiner occupancy: " + this.table.getOccupancy()); + LOG.info(taskName + " (" + taskIx + "/" + taskNo + ") numEmissions: " + table.numEmissions); + LOG.info(taskName + " (" + taskIx + "/" + taskNo + ") numEmittedRecords: " + table.numEmittedRecords); break; } } catch (Exception ex2) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java index 58a7233fb5cba..62b967cbdc559 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java @@ -164,6 +164,8 @@ public class ReduceHashTable extends AbstractMutableHashTable { */ private boolean enableResize; + public long numEmissions, numEmittedRecords; + /** * This constructor is for the case when will only call those operations that are also @@ -241,6 +243,9 @@ private void open(int numBucketSegments) { stagingSegments.add(forcedAllocateSegment()); reuse = buildSideSerializer.createInstance(); + + numEmissions = 0; + numEmittedRecords = 0; } /** @@ -557,10 +562,14 @@ public void emitAndReset() throws IOException { * Emits all elements currently held by the table to the collector. */ public void emit() throws IOException { + + numEmissions++; + T record = buildSideSerializer.createInstance(); EntryIterator iter = getEntryIterator(); while ((record = iter.next(record)) != null && !closed) { outputCollector.collect(record); + numEmittedRecords++; if (!objectReuseEnabled) { record = buildSideSerializer.createInstance(); }