From 99ad37002f75e954dd392e7ff56b64c020d2ce74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roberto=20R=2E=20Exp=C3=B3sito?= Date: Fri, 22 Apr 2022 09:23:35 +0200 Subject: [PATCH] Locations are built when adding splits --- .../parser/mapreduce/PairedEndInputSplit.java | 42 ++++++++----------- 1 file changed, 18 insertions(+), 24 deletions(-) diff --git a/src/main/java/es/udc/gac/hadoop/sequence/parser/mapreduce/PairedEndInputSplit.java b/src/main/java/es/udc/gac/hadoop/sequence/parser/mapreduce/PairedEndInputSplit.java index 68cc993..c41d0a1 100644 --- a/src/main/java/es/udc/gac/hadoop/sequence/parser/mapreduce/PairedEndInputSplit.java +++ b/src/main/java/es/udc/gac/hadoop/sequence/parser/mapreduce/PairedEndInputSplit.java @@ -22,9 +22,9 @@ import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Writable; @@ -37,8 +37,8 @@ public class PairedEndInputSplit extends FileSplit implements Writable { private static final int LENGTH = 2; - private int fill = 0; - private long totsize = 0L; + private int fill; + private long totsize; private FileSplit[] splits; private List> allHosts; private String[] hosts; @@ -46,6 +46,9 @@ public class PairedEndInputSplit extends FileSplit implements Writable { public PairedEndInputSplit() { splits = new FileSplit[LENGTH]; allHosts = new ArrayList>(LENGTH); + hosts = null; + fill = 0; + totsize = 0L; for (int i = 0; i < LENGTH; i++) allHosts.add(new ArrayList()); @@ -63,9 +66,6 @@ public void add(FileSplit s) throws IOException, InterruptedException { throw new IOException("Too many splits"); } - splits[fill] = s; - totsize += s.getLength(); - String[] hints = s.getLocations(); if (hints != null && hints.length > 0) { @@ -74,14 +74,23 @@ public void add(FileSplit s) throws IOException, InterruptedException { } } - fill++; + splits[fill++] = s; + totsize += s.getLength(); if (fill == LENGTH) { List intersect = allHosts.get(0).stream() .filter(allHosts.get(1)::contains) .collect(Collectors.toList()); - hosts = intersect.toArray(new String[intersect.size()]); + if (intersect.size() > 0) { + hosts = intersect.toArray(new String[intersect.size()]); + } else { + List union = Stream.concat(allHosts.get(0).stream(), allHosts.get(1).stream()) + .distinct() + .collect(Collectors.toList()); + + hosts = union.toArray(new String[union.size()]); + } } } @@ -140,22 +149,7 @@ public String toString() { */ @Override public String[] getLocations() throws IOException { - if (hosts.length > 0) - return hosts; - - HashSet hostsSet = new HashSet(); - - for (int i = 0; i < splits.length; i++) { - String[] hints = splits[i].getLocations(); - - if (hints != null && hints.length > 0) { - for (String host : hints) { - hostsSet.add(host); - } - } - } - - return hostsSet.toArray(new String[hostsSet.size()]); + return hosts; } /**