Skip to content
This repository has been archived by the owner on Aug 30, 2022. It is now read-only.

Commit

Permalink
Implement writing table to parquet (#727)
Browse files Browse the repository at this point in the history
* Implement writing table to parquet
Co-authored-by: Bin Wang <[email protected]>
  • Loading branch information
bin-wang authored Aug 17, 2021
1 parent 43b1955 commit d05ad12
Show file tree
Hide file tree
Showing 11 changed files with 446 additions and 56 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/unit_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
run: ./rebuild.sh -a
- name: Process demo data
working-directory: bin
run: ./demo-data-cleaner.sh -g
run: ./demo-data-cleaner.sh -dp
- name: Run unit tests
working-directory: bin
run: ./rebuild.sh -t
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ data/ontime_big/*.gz
data/ontime/2016_*.csv
data/ontime/*.orc
data/ontime/*.crc
data/ontime/*.parquet
data/ontime/delta-table/
data/ontime_orc/*
data/ontime_small_orc/*
Expand All @@ -94,6 +95,7 @@ data/synthetic/privacy_metadata.json
data/metadata/differential-privacy/data/ontime_private/privacy_metadata.json
data/geo/us_states/cb_*
data/geo/airports/airprt*
data/parquet/

*.DS_Store
hillview_root_key
Expand Down
17 changes: 11 additions & 6 deletions bin/demo-data-cleaner.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,25 @@ pushd ../data/ontime
./download.py
popd

while getopts "g" opt; do
ARGS=""
while getopts "dp" opt; do
case "$opt" in
g)
d)
python3 generate-delta-table.py
;;
p)
ARGS+="-Dparquet.enabled "
;;
*)
echo "demo-data-cleaner.sh [-g]"
echo "This script runs the java class DemoDataCleaner to prepare data for testing/demo purposes"
echo "-g: Also generate a delta table for testing, requires python3, pyspark, and delta-spark"
echo "Usage: demo-data-cleaner.sh [-d] [-p]"
echo "This script runs the java class DemoDataCleaner to prepare data for testing/demo purposes."
echo "-d: Also generate a delta table for testing, requires python3, pyspark, and delta-spark."
echo "-p: Also generate parquet files for testing."
exit 1
;;
esac
done

pushd ../platform
java -jar target/data-cleaner-jar-with-dependencies.jar
java -jar ${ARGS} target/data-cleaner-jar-with-dependencies.jar
popd
5 changes: 5 additions & 0 deletions platform/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.11.1</version>
</dependency>
<!-- Delta lake -->
<dependency>
<groupId>io.delta</groupId>
Expand Down
24 changes: 24 additions & 0 deletions platform/src/main/java/org/hillview/main/DemoDataCleaner.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.hillview.main;

import org.hillview.storage.ParquetFileWriter;
import org.hillview.table.LazySchema;
import org.hillview.storage.CsvFileLoader;
import org.hillview.storage.CsvFileWriter;
Expand All @@ -42,6 +43,8 @@ class DemoDataCleaner {
private static final String dataFolder = "../data/ontime";

public static void main(String[] args) throws IOException {
boolean parquetEnabled = (System.getProperty("parquet.enabled") != null);

HillviewLogger.initialize("data cleaner", "hillview.log");
String prefix = "On_Time_On_Time_Performance_";
Path folder = Paths.get(dataFolder);
Expand Down Expand Up @@ -84,6 +87,27 @@ public static void main(String[] args) throws IOException {
System.out.println("Writing " + end);
owriter.writeTable(p);
}
if (parquetEnabled) {
final String parquetFileName = end.replace(".orc", ".parquet");
File parquetFile = new File(parquetFileName);
if (!parquetFile.exists()) {
ParquetFileWriter writer = new ParquetFileWriter(parquetFileName);
System.out.println("Writing " + parquetFileName);
try {
writer.writeTable(p);
} catch (RuntimeException runtimeException) {
System.err.println("Error when writing to parquet file: " + runtimeException.getMessage());
// If the exception happens during writing, an incomplete file may be left
try {
Files.deleteIfExists(parquetFile.toPath());
} catch (IOException ioException) {
System.err.println("Auto Deletion failed: " + ioException.getMessage());
System.err.println("Please manually delete " + parquetFile.getPath());
System.exit(-1);
}
}
}
}

String big = filename.replace(".csv.gz", ".orc");
File fbig = new File(big);
Expand Down
Loading

0 comments on commit d05ad12

Please sign in to comment.