Skip to content
This repository has been archived by the owner on Aug 30, 2022. It is now read-only.

Commit

Permalink
Improved schema guessing for csv files
Browse files Browse the repository at this point in the history
  • Loading branch information
Mihai Budiu committed Sep 12, 2020
1 parent 0868b64 commit 0b7e133
Show file tree
Hide file tree
Showing 14 changed files with 542 additions and 76 deletions.
443 changes: 442 additions & 1 deletion data/ontime/On_Time.schema

Large diffs are not rendered by default.

111 changes: 60 additions & 51 deletions platform/src/main/java/org/hillview/main/DataUpload.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import javax.annotation.Nullable;

import org.apache.commons.io.FilenameUtils;
import org.hillview.LazySchema;
import org.hillview.table.LazySchema;
import org.hillview.management.ClusterConfig;
import org.hillview.storage.*;
import org.hillview.table.Schema;
Expand All @@ -42,6 +42,12 @@
* format or in orc format. A schema file is also placed in each server.
*/
public class DataUpload {
enum OutputFormat {
Csv,
Orc,
None // only output schema
}

private static class Params {
final int defaultChunkSize = 100000; // number of lines in default file chunk
final String defaultSchemaName = "schema";
Expand All @@ -52,9 +58,9 @@ private static class Params {
@Nullable
String cluster = null; // the path to the cluster config json file
boolean hasHeader; // true if file has a header row (only used for csv inputs)
boolean saveOrc; // true if saving as orc, otherwise save as csv;
int chunkSize = defaultChunkSize; // the number of lines in each shard.
boolean allowFewerColumns;
OutputFormat outputFormat = OutputFormat.None;
@Nullable
String grokPattern; // when parsing a log file this is the pattern expected
int skipLines; // number of lines to skip from the beginning
Expand Down Expand Up @@ -83,7 +89,8 @@ private Params parseCommand(String[] args) throws Exception {
Option o_linenumber = new Option("l", "lines", true, "number of rows in each chunk");
o_linenumber.setRequired(false);
options.addOption(o_linenumber);
Option o_format = new Option("o", "orc", false, "save file as orc");
// Help text for -o/--output. The first literal ends with a space so that the
// two concatenated sentences do not run together in the usage message.
Option o_format = new Option("o", "output", true, "output format: one of 'csv', 'orc', 'none'. " +
"For 'none' only the schema is output");
o_format.setRequired(false);
options.addOption(o_format);
Option o_schema = new Option("s", "schema", true, "input schema file");
Expand All @@ -101,13 +108,6 @@ private Params parseCommand(String[] args) throws Exception {
Option o_skip = new Option("w", "skip", true, "number of lines to skip before starting parsing");
o_skip.setRequired(false);
options.addOption(o_skip);
/*
* todo: support the -D directory option for a list of files.
Option o_directory = new Option("D", "Directory", true,
"path to directory with the files to send (not supported yet)");
o_directory.setRequired(false);
options.addOption(o_directory);
*/

CommandLineParser parser = new DefaultParser();
CommandLine cmd = null;
Expand All @@ -119,30 +119,13 @@ private Params parseCommand(String[] args) throws Exception {
catch (ParseException pe) {
System.out.println("can't parse due to " + pe);
usage(options);
System.exit(1);
throw pe;
}
Params parameters = new Params();
try{
/*
if (cmd.hasOption('f') == cmd.hasOption('D'))
throw new RuntimeException("need either file or directory");
*/
if (cmd.hasOption('f')) {
parameters.filename = cmd.getOptionValue('f');
// parameters.fileList.add(cmd.getOptionValue('f'));
}
/*
else {
parameters.directory = cmd.getOptionValue('D');
File folder = new File(cmd.getOptionValue('D'));
File[] lFiles = folder.listFiles();
if (lFiles == null)
throw new RuntimeException("No files found");
for (File lFile : lFiles)
if (lFile.isFile())
parameters.fileList.add(parameters.directory + lFile.getName());
}
*/
} catch (RuntimeException e) {
error(e);
}
Expand All @@ -154,10 +137,25 @@ private Params parseCommand(String[] args) throws Exception {
parameters.chunkSize = Integer.parseInt(cmd.getOptionValue('l'));
} catch (NumberFormatException e) {
usage(options);
System.out.println("Can't parse number due to " + e.getMessage());
System.err.println("Can't parse number due to " + e.getMessage());
throw e;
}
}
if (cmd.hasOption('o')) {
    // The value is matched case-insensitively against the three formats listed
    // in the option's help text: 'csv', 'orc' and 'none'.
    String of = cmd.getOptionValue('o');
    switch (of.toLowerCase()) {
        case "orc":
            parameters.outputFormat = OutputFormat.Orc;
            break;
        case "csv":
            parameters.outputFormat = OutputFormat.Csv;
            break;
        case "none":
            // Schema-only run: identical to the default used when -o is absent.
            // Without this case the documented value 'none' was rejected as illegal.
            parameters.outputFormat = OutputFormat.None;
            break;
        default:
            // Anything else is a user error: show the usage, then abort parsing.
            usage(options);
            System.err.println("Illegal output format: " + of);
            throw new RuntimeException("Unknown output format");
    }
}
parameters.saveOrc = cmd.hasOption('o');
if (cmd.hasOption('s'))
parameters.inputSchemaName = cmd.getOptionValue('s');
parameters.hasHeader = cmd.hasOption('h');
Expand All @@ -167,7 +165,8 @@ private Params parseCommand(String[] args) throws Exception {
parameters.skipLines = Integer.parseInt(cmd.getOptionValue("skip"));
} catch (NumberFormatException e) {
usage(options);
System.out.println("Can't parse number due to " + e.getMessage());
System.err.println("Can't parse number due to " + e.getMessage());
throw e;
}
}
return parameters;
Expand Down Expand Up @@ -264,25 +263,27 @@ private int chop(TextFileLoader loader,
tableSchema = table.getSchema();
while (true) {
chunkName = getFileName(parameters.filename).concat(Integer.toString(chunk));
if (parameters.saveOrc)
if (parameters.outputFormat == OutputFormat.Orc)
chunkName = chunkName.concat(".orc");
else
else if (parameters.outputFormat == OutputFormat.Csv)
chunkName = chunkName.concat(".csv");
if (Files.exists(Paths.get(chunkName)))
chunk++;
else
break;
}
writeTable(table, chunkName, parameters.saveOrc);
if (clusterConfig != null) {
assert clusterConfig.workers != null;
String host = clusterConfig.workers[currentHost];
sendFile(chunkName, Converters.checkNull(clusterConfig.user),
host, parameters.destinationFolder, chunkName);
currentHost = (currentHost + 1) % clusterConfig.workers.length;
Files.deleteIfExists(Paths.get(chunkName));
} else {
Files.move(Paths.get(chunkName), Paths.get(parameters.destinationFolder, chunkName));
writeTable(table, chunkName, parameters.outputFormat);
if (parameters.outputFormat != OutputFormat.None) {
if (clusterConfig != null) {
assert clusterConfig.workers != null;
String host = clusterConfig.workers[currentHost];
sendFile(chunkName, Converters.checkNull(clusterConfig.user),
host, parameters.destinationFolder, chunkName);
currentHost = (currentHost + 1) % clusterConfig.workers.length;
Files.deleteIfExists(Paths.get(chunkName));
} else {
Files.move(Paths.get(chunkName), Paths.get(parameters.destinationFolder, chunkName));
}
}
chunk++;
if (table.getNumOfRows() == 0)
Expand Down Expand Up @@ -332,14 +333,22 @@ private void sendFile(String filename, String user, String host, String remoteFo

/** Writes the table in ORC or CSV format; when the format is None nothing is written.
*/
private void writeTable(ITable table, String filename, boolean orc) {
HillviewLogger.instance.info("Writing chunk in: " + filename);
if (orc) {
OrcFileWriter writer = new OrcFileWriter(filename);
writer.writeTable(table);
} else {
CsvFileWriter writer = new CsvFileWriter(filename);
writer.writeTable(table);
private void writeTable(ITable table, String filename, OutputFormat format) {
    // Decide first which writer (if any) is needed, so that the log message
    // and the write itself appear only once below.
    final boolean asOrc;
    switch (format) {
        case Orc:
            asOrc = true;
            break;
        case Csv:
            asOrc = false;
            break;
        default:
            // OutputFormat.None: no data file is produced and nothing is logged.
            return;
    }

    HillviewLogger.instance.info("Writing chunk in: " + filename);
    if (asOrc) {
        OrcFileWriter orcWriter = new OrcFileWriter(filename);
        orcWriter.writeTable(table);
    } else {
        CsvFileWriter csvWriter = new CsvFileWriter(filename);
        csvWriter.writeTable(table);
    }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.hillview.main;

import org.hillview.LazySchema;
import org.hillview.table.LazySchema;
import org.hillview.storage.CsvFileLoader;
import org.hillview.storage.CsvFileWriter;
import org.hillview.storage.OrcFileWriter;
Expand Down
25 changes: 17 additions & 8 deletions platform/src/main/java/org/hillview/storage/CsvFileLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@
import com.univocity.parsers.csv.CsvFormat;
import com.univocity.parsers.csv.CsvParser;
import com.univocity.parsers.csv.CsvParserSettings;
import org.hillview.LazySchema;
import org.hillview.table.LazySchema;
import org.hillview.table.api.*;
import org.hillview.table.ColumnDescription;
import org.hillview.table.Schema;
import org.hillview.table.Table;
import org.hillview.table.columns.EmptyColumn;
import org.hillview.table.membership.FullMembershipSet;
import org.hillview.table.rows.GuessSchema;
import org.hillview.utils.HillviewLogger;
Expand Down Expand Up @@ -173,20 +174,28 @@ public ITable loadFragment(int maxRows, boolean skip) {
if (this.guessSchema) {
GuessSchema gs = new GuessSchema();
GuessSchema.SchemaInfo info = gs.guess((IStringColumn)s);
if (info.kind != ContentsKind.String &&
info.kind != ContentsKind.None) // all elements are null
sealed[ci] = s.convertKind(info.kind, c.getName(), ms);
else
sealed[ci] = s;
switch (info.kind) {
case String:
sealed[ci] = s;
break;
case None:
sealed[ci] = new EmptyColumn(c.getName(), c.sizeInRows());
break;
default:
sealed[ci] = s.convertKind(info.kind, c.getName(), ms);
break;
}
} else {
sealed[ci] = s;
}
assert sealed[ci] != null;
}

ITable result = new Table(sealed, this.filename, null);
this.guessSchema = false;
this.actualSchema = result.getSchema();
if (!skip) {
this.guessSchema = false;
this.actualSchema = result.getSchema();
}
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.hillview.storage;

import org.hillview.LazySchema;
import org.hillview.table.LazySchema;
import org.hillview.dataset.api.IJson;
import org.hillview.table.Schema;
import org.hillview.table.api.ITable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.google.gson.JsonPrimitive;
import com.google.gson.internal.Streams;
import com.google.gson.stream.JsonReader;
import org.hillview.LazySchema;
import org.hillview.table.LazySchema;
import org.hillview.table.ColumnDescription;
import org.hillview.table.Schema;
import org.hillview.table.Table;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.hillview.LazySchema;
import org.hillview.table.LazySchema;
import org.hillview.table.ColumnDescription;
import org.hillview.table.Schema;
import org.hillview.table.Table;
Expand Down Expand Up @@ -252,6 +252,11 @@ private static void appendColumn(IAppendableColumn to, ColumnVector vec,
String str = new String(bcv.vector[row], bcv.start[row], bcv.length[row]);
switch (to.getKind()) {
case None:
if (str.isEmpty()) {
to.appendMissing();
break;
}
// Fall through
case Date:
case Duration:
case Time:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.hillview;
package org.hillview.table;

import org.hillview.table.Schema;
import org.hillview.utils.Converters;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.hillview.table.api.ContentsKind;
import org.hillview.table.api.IStringColumn;
import org.hillview.utils.DateParsing;
import org.hillview.utils.Utilities;

import javax.annotation.Nullable;
import java.io.IOException;
Expand Down Expand Up @@ -210,10 +211,11 @@ private static boolean isJsonValid(final JsonReader jsonReader) throws IOExcepti
}

private CanParse canParse(@Nullable String value, final ContentsKind with) {
if (value == null)
if (Utilities.isNullOrEmpty(value))
return CanParse.AsNull;
switch (with) {
case None:
return CanParse.No;
case Interval:
return CanParse.No;
case String:
Expand Down
2 changes: 1 addition & 1 deletion platform/src/main/java/org/hillview/utils/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.hillview.utils;

import org.hillview.LazySchema;
import org.hillview.table.LazySchema;
import org.hillview.storage.CsvFileLoader;
import org.hillview.table.api.ContentsKind;
import org.hillview.table.api.ITable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;
import org.hillview.LazySchema;
import org.hillview.table.LazySchema;
import org.hillview.dataset.api.IJson;
import org.hillview.sketches.results.Count;
import org.hillview.sketches.results.NextKList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.univocity.parsers.csv.CsvFormat;
import com.univocity.parsers.csv.CsvWriter;
import com.univocity.parsers.csv.CsvWriterSettings;
import org.hillview.LazySchema;
import org.hillview.table.LazySchema;
import org.hillview.storage.CsvFileLoader;
import org.hillview.storage.CsvFileWriter;
import org.hillview.table.ColumnDescription;
Expand Down
Loading

0 comments on commit 0b7e133

Please sign in to comment.