Skip to content

Commit

Permalink
Add logging about combiner emissions for the experiments.
Browse files Browse the repository at this point in the history
  • Loading branch information
ggevay authored and aalexandrov committed Mar 28, 2016
1 parent 50c9273 commit d1e6ea7
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ public class ReduceCombineDriver<T> implements Driver<ReduceFunction<T>, T> {
public void setup(TaskContext<ReduceFunction<T>, T> context) {
taskContext = context;
canceled = false;

throw new RuntimeException("Only the chained version (ChainedReduceCombineDriver) should be used during the experiments.");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ public class ChainedReduceCombineDriver<T> extends ChainedDriver<T, T> {

private volatile boolean canceled;

private long numEmissions, numEmittedRecords;

// ------------------------------------------------------------------------

@Override
Expand Down Expand Up @@ -129,6 +131,9 @@ public void openTask() throws Exception {
table.open();
break;
}

numEmissions = 0;
numEmittedRecords = 0;
}

@Override
Expand Down Expand Up @@ -175,6 +180,9 @@ private void collectHashed(T record) throws Exception {
}

private void sortAndCombine() throws Exception {

numEmissions++;

final InMemorySorter<T> sorter = this.sorter;

if (!sorter.isEmpty()) {
Expand Down Expand Up @@ -221,6 +229,7 @@ private void sortAndCombine() throws Exception {
}

output.collect(value);
numEmittedRecords++;

// swap the value from the new key group into the first object
T tmp = reuse1;
Expand Down Expand Up @@ -249,6 +258,7 @@ private void sortAndCombine() throws Exception {
}

output.collect(res);
numEmittedRecords++;
}
}
}
Expand All @@ -257,13 +267,23 @@ private void sortAndCombine() throws Exception {
@Override
public void close() {
// send the final batch
int taskIx = this.parent.getIndexInSubtaskGroup();
int taskNo = this.parent.getCurrentNumberOfSubtasks();
try {
switch (strategy) {
case SORTED_PARTIAL_REDUCE:
sortAndCombine();
LOG.info(taskName + " (" + taskIx + "/" + taskNo + ") combiner capacity: " + this.sorter.getCapacity());
LOG.info(taskName + " (" + taskIx + "/" + taskNo + ") combiner occupancy: " + this.sorter.getOccupancy());
LOG.info(taskName + " (" + taskIx + "/" + taskNo + ") numEmissions: " + numEmissions);
LOG.info(taskName + " (" + taskIx + "/" + taskNo + ") numEmittedRecords: " + numEmittedRecords);
break;
case HASHED_PARTIAL_REDUCE:
table.emit();
LOG.info(taskName + " (" + taskIx + "/" + taskNo + ") combiner capacity: " + this.table.getCapacity());
LOG.info(taskName + " (" + taskIx + "/" + taskNo + ") combiner occupancy: " + this.table.getOccupancy());
LOG.info(taskName + " (" + taskIx + "/" + taskNo + ") numEmissions: " + table.numEmissions);
LOG.info(taskName + " (" + taskIx + "/" + taskNo + ") numEmittedRecords: " + table.numEmittedRecords);
break;
}
} catch (Exception ex2) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ public class ReduceHashTable<T> extends AbstractMutableHashTable<T> {
*/
private boolean enableResize;

public long numEmissions, numEmittedRecords;


/**
* This constructor is for the case when will only call those operations that are also
Expand Down Expand Up @@ -241,6 +243,9 @@ private void open(int numBucketSegments) {
stagingSegments.add(forcedAllocateSegment());

reuse = buildSideSerializer.createInstance();

numEmissions = 0;
numEmittedRecords = 0;
}

/**
Expand Down Expand Up @@ -557,10 +562,14 @@ public void emitAndReset() throws IOException {
* Emits all elements currently held by the table to the collector.
*/
public void emit() throws IOException {

numEmissions++;

T record = buildSideSerializer.createInstance();
EntryIterator iter = getEntryIterator();
while ((record = iter.next(record)) != null && !closed) {
outputCollector.collect(record);
numEmittedRecords++;
if (!objectReuseEnabled) {
record = buildSideSerializer.createInstance();
}
Expand Down

0 comments on commit d1e6ea7

Please sign in to comment.