From db3b1dc15fb13e2195a5edd012578ac834549fd3 Mon Sep 17 00:00:00 2001
From: Andrei Zhabinski
Date: Sat, 28 May 2022 14:54:56 +0300
Subject: [PATCH] Rework pom.xml, update versions of everything

---
 deps/build.jl                                 |   6 +-
 examples/SparkSubmitJulia.scala               |   2 +-
 examples/basic.jl                             |   2 +
 jvm/sparkjl/dependency-reduced-pom.xml        | 102 +++++
 .../api/julia => old_src}/InputIterator.scala |   0
 .../api/julia => old_src}/JuliaRDD.scala      |   0
 .../api/julia => old_src}/JuliaRunner.scala   |   0
 .../api/julia => old_src}/OutputThread.scala  |   0
 .../api/julia => old_src}/RDDUtils.scala      |   0
 .../api/julia => old_src}/StreamUtils.scala   |   0
 jvm/sparkjl/pom.xml                           | 424 +++++------------
 jvm/sparkjl/pom_old.xml                       | 420 +++++++++++++++++
 .../spark/api/julia/DynamicJavaCompiler.java  |  34 ++
 src/column.jl                                 |   3 +
 src/compiler.jl                               |   4 +-
 src/convert.jl                                |   7 +-
 src/core.jl                                   |   2 +-
 src/dataframe.jl                              |  28 +-
 src/defs.jl                                   |  14 +-
 src/init.jl                                   |   4 +-
 src/session.jl                                |   4 +-
 src/streaming.jl                              | 100 ++++-
 src/window.jl                                 |  14 -
 test/test_sql.jl                              |  57 ++-
 24 files changed, 850 insertions(+), 377 deletions(-)
 create mode 100644 jvm/sparkjl/dependency-reduced-pom.xml
 rename jvm/sparkjl/{src/main/scala/org/apache/spark/api/julia => old_src}/InputIterator.scala (100%)
 rename jvm/sparkjl/{src/main/scala/org/apache/spark/api/julia => old_src}/JuliaRDD.scala (100%)
 rename jvm/sparkjl/{src/main/scala/org/apache/spark/api/julia => old_src}/JuliaRunner.scala (100%)
 rename jvm/sparkjl/{src/main/scala/org/apache/spark/api/julia => old_src}/OutputThread.scala (100%)
 rename jvm/sparkjl/{src/main/scala/org/apache/spark/api/julia => old_src}/RDDUtils.scala (100%)
 rename jvm/sparkjl/{src/main/scala/org/apache/spark/api/julia => old_src}/StreamUtils.scala (100%)
 create mode 100644 jvm/sparkjl/pom_old.xml
 create mode 100644 jvm/sparkjl/src/main/java/org/apache/spark/api/julia/DynamicJavaCompiler.java

diff --git a/deps/build.jl b/deps/build.jl
index 554068d..30e737b 100644
--- a/deps/build.jl
+++ b/deps/build.jl
@@ -7,9 +7,9 @@ catch
     error("Cannot find maven. Is it installed?")
 end
 
-SPARK_VERSION = get(ENV, "BUILD_SPARK_VERSION", "2.4.6")
-SCALA_VERSION = get(ENV, "BUILD_SCALA_VERSION", "2.12.11")
-SCALA_BINARY_VERSION = match(r"^\d+\.\d+", SCALA_VERSION).match
+SPARK_VERSION = get(ENV, "BUILD_SPARK_VERSION", "3.2.1")
+SCALA_BINARY_VERSION = get(ENV, "BUILD_SCALA_VERSION", "2.13.6")
+SCALA_VERSION = match(r"^\d+\.\d+", SCALA_BINARY_VERSION).match
 
 cd(joinpath(dirname(@__DIR__), "jvm/sparkjl")) do
     run(`$mvn clean package -Dspark.version=$SPARK_VERSION -Dscala.version=$SCALA_VERSION -Dscala.binary.version=$SCALA_BINARY_VERSION`)

diff --git a/examples/SparkSubmitJulia.scala b/examples/SparkSubmitJulia.scala
index 730075b..b884b5e 100644
--- a/examples/SparkSubmitJulia.scala
+++ b/examples/SparkSubmitJulia.scala
@@ -12,7 +12,7 @@
 * /opt/julia/depot/helloworld.jl \
 * /usr/local/julia/bin/julia \
 * /opt/julia/depot
-* 
+*
 * To compile, use `src/main/scala/SparkSubmitJulia.scala` with a build.sbt like:
 * ---------------------
 * name := "Spark Submit Julia"

diff --git a/examples/basic.jl b/examples/basic.jl
index 73eec45..3f9a058 100644
--- a/examples/basic.jl
+++ b/examples/basic.jl
@@ -1,3 +1,6 @@
+## THIS EXAMPLE IS OUTDATED!
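+## (The RDD-based API used below was moved to old_src in this commit;
+## see test/test_sql.jl for the current DataFrame-based API.)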
+## TODO: update or remove
 
 using Spark

diff --git a/jvm/sparkjl/dependency-reduced-pom.xml b/jvm/sparkjl/dependency-reduced-pom.xml
new file mode 100644
index 0000000..a3e9059
--- /dev/null
+++ b/jvm/sparkjl/dependency-reduced-pom.xml
@@ -0,0 +1,102 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>sparkjl</groupId>
+  <artifactId>sparkjl</artifactId>
+  <name>sparkjl</name>
+  <version>0.2</version>
+  <build>
+    <pluginManagement>
+      <plugins>
+        <plugin>
+          <groupId>net.alchim31.maven</groupId>
+          <artifactId>scala-maven-plugin</artifactId>
+          <version>4.6.1</version>
+        </plugin>
+        <plugin>
+          <artifactId>maven-compiler-plugin</artifactId>
+          <version>3.8.1</version>
+          <configuration>
+            <source>1.8</source>
+            <target>1.8</target>
+          </configuration>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+    <plugins>
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>scala-compile-first</id>
+            <phase>process-resources</phase>
+            <goals>
+              <goal>add-source</goal>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>scala-test-compile</id>
+            <phase>process-test-resources</phase>
+            <goals>
+              <goal>testCompile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>compile</phase>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <source>1.8</source>
+          <target>1.8</target>
+        </configuration>
+      </plugin>
+      <plugin>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>3.3.0</version>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <filters>
+            <filter>
+              <artifact>*:*</artifact>
+              <excludes>
+                <exclude>META-INF/*.SF</exclude>
+                <exclude>META-INF/*.DSA</exclude>
+                <exclude>META-INF/*.RSA</exclude>
+              </excludes>
+            </filter>
+          </filters>
+          <artifactSet>
+            <excludes>
+              <exclude>classworlds:classworlds</exclude>
+              <exclude>junit:junit</exclude>
+              <exclude>jmock:*</exclude>
+              <exclude>*:xml-apis</exclude>
+              <exclude>org.apache.maven:lib:tests</exclude>
+              <exclude>log4j:log4j:jar:</exclude>
+            </excludes>
+          </artifactSet>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+  <properties>
+    <scala.binary.version>2.13.6</scala.binary.version>
+    <spark.version>[3.2.0,3.2.1]</spark.version>
+    <java.version>1.11</java.version>
+    <PermGen>64m</PermGen>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <scala.version>2.13</scala.version>
+    <MaxPermGen>512m</MaxPermGen>
+    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+  </properties>
+</project>

diff --git a/jvm/sparkjl/src/main/scala/org/apache/spark/api/julia/InputIterator.scala b/jvm/sparkjl/old_src/InputIterator.scala
similarity index 100%
rename from jvm/sparkjl/src/main/scala/org/apache/spark/api/julia/InputIterator.scala
rename to jvm/sparkjl/old_src/InputIterator.scala
diff --git a/jvm/sparkjl/src/main/scala/org/apache/spark/api/julia/JuliaRDD.scala b/jvm/sparkjl/old_src/JuliaRDD.scala
similarity index 100%
rename from jvm/sparkjl/src/main/scala/org/apache/spark/api/julia/JuliaRDD.scala
rename to jvm/sparkjl/old_src/JuliaRDD.scala
diff --git a/jvm/sparkjl/src/main/scala/org/apache/spark/api/julia/JuliaRunner.scala b/jvm/sparkjl/old_src/JuliaRunner.scala
similarity index 100%
rename from jvm/sparkjl/src/main/scala/org/apache/spark/api/julia/JuliaRunner.scala
rename to jvm/sparkjl/old_src/JuliaRunner.scala
diff --git a/jvm/sparkjl/src/main/scala/org/apache/spark/api/julia/OutputThread.scala b/jvm/sparkjl/old_src/OutputThread.scala
similarity index 100%
rename from jvm/sparkjl/src/main/scala/org/apache/spark/api/julia/OutputThread.scala
rename to jvm/sparkjl/old_src/OutputThread.scala
diff --git a/jvm/sparkjl/src/main/scala/org/apache/spark/api/julia/RDDUtils.scala b/jvm/sparkjl/old_src/RDDUtils.scala
similarity index 100%
rename from jvm/sparkjl/src/main/scala/org/apache/spark/api/julia/RDDUtils.scala
rename to jvm/sparkjl/old_src/RDDUtils.scala
diff --git a/jvm/sparkjl/src/main/scala/org/apache/spark/api/julia/StreamUtils.scala b/jvm/sparkjl/old_src/StreamUtils.scala
similarity index 100%
rename from jvm/sparkjl/src/main/scala/org/apache/spark/api/julia/StreamUtils.scala
rename to jvm/sparkjl/old_src/StreamUtils.scala
diff --git a/jvm/sparkjl/pom.xml b/jvm/sparkjl/pom.xml
index e71acd6..a4ca99e 100644
--- a/jvm/sparkjl/pom.xml
+++ b/jvm/sparkjl/pom.xml
@@ -4,122 +4,33 @@
   <modelVersion>4.0.0</modelVersion>
   <name>sparkjl</name>
   <packaging>jar</packaging>
-  <version>0.1</version>
-
-  <repositories>
-    <repository>
-      <id>central</id>
-      <name>Maven Repository</name>
-      <url>https://repo1.maven.org/maven2</url>
-      <releases>
-        <enabled>true</enabled>
-      </releases>
-      <snapshots>
-        <enabled>false</enabled>
-      </snapshots>
-    </repository>
-    <repository>
-      <id>apache-repo</id>
-      <name>Apache Repository</name>
-      <url>https://repository.apache.org/content/repositories/releases</url>
-      <releases>
-        <enabled>true</enabled>
-      </releases>
-      <snapshots>
-        <enabled>false</enabled>
-      </snapshots>
-    </repository>
-    <repository>
-      <id>jboss-repo</id>
-      <name>JBoss Repository</name>
-      <url>https://repository.jboss.org/nexus/content/repositories/releases</url>
-      <releases>
-        <enabled>true</enabled>
-      </releases>
-      <snapshots>
-        <enabled>false</enabled>
-      </snapshots>
-    </repository>
-    <repository>
-      <id>cloudera-repo</id>
-      <name>Cloudera Repository</name>
-      <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
-      <releases>
-        <enabled>true</enabled>
-      </releases>
-      <snapshots>
-        <enabled>false</enabled>
-      </snapshots>
-    </repository>
-    <repository>
-      <name>Spray.cc repository</name>
-      <url>http://repo.spray.cc</url>
-    </repository>
-    <repository>
-      <name>Akka repository</name>
-      <url>http://repo.akka.io/releases</url>
-    </repository>
-    <repository>
-      <name>Twitter repository</name>
-      <url>http://maven.twttr.com/</url>
-    </repository>
-    <repository>
-      <id>scala</id>
-      <name>Scala Tools</name>
-      <url>http://scala-tools.org/repo-releases/</url>
-      <releases>
-        <enabled>true</enabled>
-      </releases>
-      <snapshots>
-        <enabled>false</enabled>
-      </snapshots>
-    </repository>
-  </repositories>
+  <version>0.2</version>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
+    <java.version>1.11</java.version>
+    <scala.version>2.13</scala.version>
+    <scala.binary.version>2.13.6</scala.binary.version>
+
+    <spark.version>[3.2.0,3.2.1]</spark.version>
+
+    <PermGen>64m</PermGen>
+    <MaxPermGen>512m</MaxPermGen>
+  </properties>
 
   <dependencies>
-    <dependency>
-      <groupId>com.fasterxml.jackson.module</groupId>
-      <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
-      <version>2.12.4</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-client</artifactId>
-      <version>${hadoop.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>asm</groupId>
-          <artifactId>asm</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.jboss.netty</groupId>
-          <artifactId>netty</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.codehaus.jackson</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.fasterxml.jackson.core</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.sonatype.sisu.inject</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>javax.servlet</groupId>
-          <artifactId>servlet-api</artifactId>
-        </exclusion>
-      </exclusions>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <version>${scala.binary.version}</version>
     </dependency>
 
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <artifactId>spark-core_${scala.version}</artifactId>
       <version>${spark.version}</version>
       <exclusions>
         <exclusion>
@@ -130,15 +41,15 @@
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-yarn_${scala.binary.version}</artifactId>
+      <artifactId>spark-yarn_${scala.version}</artifactId>
       <version>${spark.version}</version>
     </dependency>
 
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <artifactId>spark-sql_${scala.version}</artifactId>
       <version>${spark.version}</version>
     </dependency>
-
+
@@ -145,206 +56,104 @@
-
-
-
-  <properties>
-    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-
-    <java.version>1.8</java.version>
-    <scala.version>2.12.11</scala.version>
-    <scala.binary.version>2.12</scala.binary.version>
-
-    <spark.version>[2.4.6,3.1.1]</spark.version>
-    <hadoop.version>2.7.3</hadoop.version>
-    <yarn.version>2.7.3</yarn.version>
-
-    <PermGen>64m</PermGen>
-    <MaxPermGen>512m</MaxPermGen>
-  </properties>
 
   <build>
-    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <pluginManagement>
+      <plugins>
+        <plugin>
+          <groupId>net.alchim31.maven</groupId>
+          <artifactId>scala-maven-plugin</artifactId>
+          <version>4.6.1</version>
+        </plugin>
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-compiler-plugin</artifactId>
+          <version>3.8.1</version>
+          <configuration>
+            <source>1.8</source>
+            <target>1.8</target>
+          </configuration>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+
     <plugins>
       <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-enforcer-plugin</artifactId>
-        <version>1.1.1</version>
-        <executions>
-          <execution>
-            <id>enforce-versions</id>
-            <goals>
-              <goal>enforce</goal>
-            </goals>
-            <configuration>
-              <rules>
-                <requireMavenVersion>
-                  <version>3.0.0</version>
-                </requireMavenVersion>
-                <requireJavaVersion>
-                  <version>${java.version}</version>
-                </requireJavaVersion>
-              </rules>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>build-helper-maven-plugin</artifactId>
-        <version>1.7</version>
-      </plugin>
-      <plugin>
         <groupId>net.alchim31.maven</groupId>
         <artifactId>scala-maven-plugin</artifactId>
-        <version>3.2.2</version>
         <executions>
           <execution>
             <id>scala-compile-first</id>
             <phase>process-resources</phase>
             <goals>
+              <goal>add-source</goal>
               <goal>compile</goal>
             </goals>
           </execution>
           <execution>
-            <id>scala-test-compile-first</id>
+            <id>scala-test-compile</id>
             <phase>process-test-resources</phase>
             <goals>
               <goal>testCompile</goal>
             </goals>
           </execution>
-          <execution>
-            <id>attach-scaladocs</id>
-            <phase>verify</phase>
-            <goals>
-              <goal>doc-jar</goal>
-            </goals>
-          </execution>
         </executions>
-        <configuration>
-          <scalaVersion>${scala.version}</scalaVersion>
-          <fork>true</fork>
-          <args>
-            <arg>-unchecked</arg>
-            <arg>-deprecation</arg>
-          </args>
-          <jvmArgs>
-            <jvmArg>-Xms64m</jvmArg>
-            <jvmArg>-Xms1024m</jvmArg>
-            <jvmArg>-Xmx1024m</jvmArg>
-            <jvmArg>-XX:PermSize=${PermGen}</jvmArg>
-            <jvmArg>-XX:MaxPermSize=${MaxPermGen}</jvmArg>
-          </jvmArgs>
-          <javacArgs>
-            <javacArg>-source</javacArg>
-            <javacArg>${java.version}</javacArg>
-            <javacArg>-target</javacArg>
-            <javacArg>${java.version}</javacArg>
-          </javacArgs>
-        </configuration>
       </plugin>
-
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-compiler-plugin</artifactId>
-        <version>2.5.1</version>
         <configuration>
-          <source>${java.version}</source>
-          <target>${java.version}</target>
-          <encoding>UTF-8</encoding>
-          <maxmem>1024m</maxmem>
+          <source>1.8</source>
+          <target>1.8</target>
         </configuration>
-      </plugin>
-
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-jar-plugin</artifactId>
-        <version>2.4</version>
+        <executions>
+          <execution>
+            <phase>compile</phase>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
       </plugin>
 
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-shade-plugin</artifactId>
@@ -351,64 +160,35 @@
-        <version>2.0</version>
+        <version>3.3.0</version>
         <configuration>
-          <shadedArtifactAttached>true</shadedArtifactAttached>
-          <shadedClassifierName>assembly</shadedClassifierName>
-          <artifactSet>
-            <includes>
-              <include>*:*</include>
-            </includes>
-          </artifactSet>
           <filters>
             <filter>
               <artifact>*:*</artifact>
               <excludes>
                 <exclude>META-INF/*.SF</exclude>
                 <exclude>META-INF/*.DSA</exclude>
                 <exclude>META-INF/*.RSA</exclude>
               </excludes>
             </filter>
           </filters>
-          <transformers>
-            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
-            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
-              <resource>META-INF/services/org.apache.hadoop.fs.FileSystem</resource>
-            </transformer>
-            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
-              <resource>reference.conf</resource>
-            </transformer>
-          </transformers>
+          <artifactSet>
+            <excludes>
+              <exclude>classworlds:classworlds</exclude>
+              <exclude>junit:junit</exclude>
+              <exclude>jmock:*</exclude>
+              <exclude>*:xml-apis</exclude>
+              <exclude>org.apache.maven:lib:tests</exclude>
+              <exclude>log4j:log4j:jar:</exclude>
+            </excludes>
+          </artifactSet>
         </configuration>
         <executions>
           <execution>
             <phase>package</phase>
             <goals>
               <goal>shade</goal>
             </goals>
           </execution>
         </executions>
       </plugin>
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>build-helper-maven-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>add-scala-sources</id>
-            <phase>generate-sources</phase>
-            <goals>
-              <goal>add-source</goal>
-            </goals>
-            <configuration>
-              <sources>
-                <source>src/main/scala</source>
-              </sources>
-            </configuration>
-          </execution>
-          <execution>
-            <id>add-scala-test-sources</id>
-            <phase>generate-test-sources</phase>
-            <goals>
-              <goal>add-test-source</goal>
-            </goals>
-            <configuration>
-              <sources>
-                <source>src/test/scala</source>
-              </sources>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
     </plugins>
   </build>
-
-  <reporting>
-    <plugins>
-      <plugin>
-        <groupId>net.alchim31.maven</groupId>
-        <artifactId>scala-maven-plugin</artifactId>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-source-plugin</artifactId>
-      </plugin>
-    </plugins>
-  </reporting>
-
-  <pluginRepositories>
-    <pluginRepository>
-      <id>scala</id>
-      <name>Scala Tools</name>
-      <url>http://scala-tools.org/repo-releases/</url>
-      <releases>
-        <enabled>true</enabled>
-      </releases>
-      <snapshots>
-        <enabled>false</enabled>
-      </snapshots>
-    </pluginRepository>
-  </pluginRepositories>
 </project>

diff --git a/jvm/sparkjl/pom_old.xml b/jvm/sparkjl/pom_old.xml
new file mode 100644
index 0000000..eda3bd9
--- /dev/null
+++ b/jvm/sparkjl/pom_old.xml
@@ -0,0 +1,420 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0">
+  <groupId>sparkjl</groupId>
+  <artifactId>sparkjl</artifactId>
+  <modelVersion>4.0.0</modelVersion>
+  <name>sparkjl</name>
+  <packaging>jar</packaging>
+  <version>0.1</version>
+  <repositories>
+    <repository>
+      <id>central</id>
+      <name>Maven Repository</name>
+      <url>https://repo1.maven.org/maven2</url>
+      <releases>
+        <enabled>true</enabled>
+      </releases>
+      <snapshots>
+        <enabled>false</enabled>
+      </snapshots>
+    </repository>
+    <repository>
+      <id>apache-repo</id>
+      <name>Apache Repository</name>
+      <url>https://repository.apache.org/content/repositories/releases</url>
+      <releases>
+        <enabled>true</enabled>
+      </releases>
+      <snapshots>
+        <enabled>false</enabled>
+      </snapshots>
+    </repository>
+    <repository>
+      <id>jboss-repo</id>
+      <name>JBoss Repository</name>
+      <url>https://repository.jboss.org/nexus/content/repositories/releases</url>
+      <releases>
+        <enabled>true</enabled>
+      </releases>
+      <snapshots>
+        <enabled>false</enabled>
+      </snapshots>
+    </repository>
+    <repository>
+      <id>cloudera-repo</id>
+      <name>Cloudera Repository</name>
+      <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
+      <releases>
+        <enabled>true</enabled>
+      </releases>
+      <snapshots>
+        <enabled>false</enabled>
+      </snapshots>
+    </repository>
+    <repository>
+      <name>Spray.cc repository</name>
+      <url>http://repo.spray.cc</url>
+    </repository>
+    <repository>
+      <name>Akka repository</name>
+      <url>http://repo.akka.io/releases</url>
+    </repository>
+    <repository>
+      <name>Twitter repository</name>
+      <url>http://maven.twttr.com/</url>
+    </repository>
+    <repository>
+      <id>scala</id>
+      <name>Scala Tools</name>
+      <url>http://scala-tools.org/repo-releases/</url>
+      <releases>
+        <enabled>true</enabled>
+      </releases>
+      <snapshots>
+        <enabled>false</enabled>
+      </snapshots>
+    </repository>
+  </repositories>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.fasterxml.jackson.module</groupId>
+      <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
+      <version>2.12.4</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <version>${hadoop.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>asm</groupId>
+          <artifactId>asm</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.jboss.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.codehaus.jackson</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.sonatype.sisu.inject</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.servlet</groupId>
+          <artifactId>servlet-api</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-client</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-yarn_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-core</artifactId>
+      <version>2.17.2</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <version>${scala.version}</version>
+    </dependency>
+  </dependencies>
+
+
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
+    <java.version>1.8</java.version>
+    <scala.version>2.12.11</scala.version>
+    <scala.binary.version>2.12</scala.binary.version>
+
+    <spark.version>[2.4.6,3.1.1]</spark.version>
+    <hadoop.version>2.7.3</hadoop.version>
+    <yarn.version>2.7.3</yarn.version>
+
+    <PermGen>64m</PermGen>
+    <MaxPermGen>512m</MaxPermGen>
+  </properties>
+
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-enforcer-plugin</artifactId>
+        <version>1.1.1</version>
+        <executions>
+          <execution>
+            <id>enforce-versions</id>
+            <goals>
+              <goal>enforce</goal>
+            </goals>
+            <configuration>
+              <rules>
+                <requireMavenVersion>
+                  <version>3.0.0</version>
+                </requireMavenVersion>
+                <requireJavaVersion>
+                  <version>${java.version}</version>
+                </requireJavaVersion>
+              </rules>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>1.7</version>
+      </plugin>
+
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+        <version>3.2.2</version>
+        <executions>
+          <execution>
+            <id>scala-compile-first</id>
+            <phase>process-resources</phase>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>scala-test-compile-first</id>
+            <phase>process-test-resources</phase>
+            <goals>
+              <goal>testCompile</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>attach-scaladocs</id>
+            <phase>verify</phase>
+            <goals>
+              <goal>doc-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <scalaVersion>${scala.version}</scalaVersion>
+          <fork>true</fork>
+          <args>
+            <arg>-unchecked</arg>
+            <arg>-deprecation</arg>
+          </args>
+          <jvmArgs>
+            <jvmArg>-Xms64m</jvmArg>
+            <jvmArg>-Xms1024m</jvmArg>
+            <jvmArg>-Xmx1024m</jvmArg>
+            <jvmArg>-XX:PermSize=${PermGen}</jvmArg>
+            <jvmArg>-XX:MaxPermSize=${MaxPermGen}</jvmArg>
+          </jvmArgs>
+          <javacArgs>
+            <javacArg>-source</javacArg>
+            <javacArg>${java.version}</javacArg>
+            <javacArg>-target</javacArg>
+            <javacArg>${java.version}</javacArg>
+          </javacArgs>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>2.5.1</version>
+        <configuration>
+          <source>${java.version}</source>
+          <target>${java.version}</target>
+          <encoding>UTF-8</encoding>
+          <maxmem>1024m</maxmem>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>2.4</version>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>2.0</version>
+        <configuration>
+          <shadedArtifactAttached>true</shadedArtifactAttached>
+          <shadedClassifierName>assembly</shadedClassifierName>
+          <artifactSet>
+            <includes>
+              <include>*:*</include>
+            </includes>
+          </artifactSet>
+          <filters>
+            <filter>
+              <artifact>*:*</artifact>
+              <excludes>
+                <exclude>META-INF/*.SF</exclude>
+                <exclude>META-INF/*.DSA</exclude>
+                <exclude>META-INF/*.RSA</exclude>
+              </excludes>
+            </filter>
+          </filters>
+          <transformers>
+            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+              <resource>META-INF/services/org.apache.hadoop.fs.FileSystem</resource>
+            </transformer>
+            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+              <resource>reference.conf</resource>
+            </transformer>
+          </transformers>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-enforcer-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>add-scala-sources</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>src/main/scala</source>
+              </sources>
+            </configuration>
+          </execution>
+          <execution>
+            <id>add-scala-test-sources</id>
+            <phase>generate-test-sources</phase>
+            <goals>
+              <goal>add-test-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>src/test/scala</source>
+              </sources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <reporting>
+    <plugins>
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-source-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </reporting>
+
+  <pluginRepositories>
+    <pluginRepository>
+      <id>scala</id>
+      <name>Scala Tools</name>
+      <url>http://scala-tools.org/repo-releases/</url>
+      <releases>
+        <enabled>true</enabled>
+      </releases>
+      <snapshots>
+        <enabled>false</enabled>
+      </snapshots>
+    </pluginRepository>
+  </pluginRepositories>
+</project>

diff --git a/jvm/sparkjl/src/main/java/org/apache/spark/api/julia/DynamicJavaCompiler.java b/jvm/sparkjl/src/main/java/org/apache/spark/api/julia/DynamicJavaCompiler.java
new file mode 100644
index 0000000..93828ad
--- /dev/null
+++ b/jvm/sparkjl/src/main/java/org/apache/spark/api/julia/DynamicJavaCompiler.java
@@ -0,0 +1,37 @@
+package org.apache.spark.api.julia;
+
+import java.io.File;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+
+import javax.tools.JavaCompiler;
+import javax.tools.ToolProvider;
+
+class DynamicJavaCompiler {
+
+    public static void compile() throws Exception {
+        // Prepare source somehow.
+        String source = "package test; public class Test { static { System.out.println(\"hello\"); } public Test() { System.out.println(\"world\"); } }";
+
+        // Save source in .java file.
+        File root = new File("/java"); // On Windows running on C:\, this is C:\java.
+        File sourceFile = new File(root, "test/Test.java");
+        sourceFile.getParentFile().mkdirs();
+        Files.write(sourceFile.toPath(), source.getBytes(StandardCharsets.UTF_8));
+
+        // Compile source file.
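+        // NOTE: ToolProvider.getSystemJavaCompiler() returns null when running
+        // on a plain JRE without the compiler API, so callers should check the
+        // result for null before invoking run().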
+        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
+        compiler.run(null, null, null, sourceFile.getPath());
+
+        // Load and instantiate compiled class.
+        URLClassLoader classLoader = URLClassLoader.newInstance(new URL[] { root.toURI().toURL() });
+        Class<?> cls = Class.forName("test.Test", true, classLoader); // Should print "hello".
+        Object instance = cls.getDeclaredConstructor().newInstance(); // Should print "world".
+        System.out.println(instance); // Should print "test.Test@hashcode".
+    }
+}
\ No newline at end of file

diff --git a/src/column.jl b/src/column.jl
index 10e9559..3417f6b 100644
--- a/src/column.jl
+++ b/src/column.jl
@@ -120,6 +120,9 @@ avg(col::Column) = mean(col)
 explode(col::Column) =
     Column(jcall(JSQLFunctions, "explode", JColumn, (JColumn,), col.jcol))
 
+Base.split(col::Column, sep::AbstractString) =
+    Column(jcall(JSQLFunctions, "split", JColumn, (JColumn, JString), col.jcol, sep))
+
 
 function window(col::Column, w_dur::String, slide_dur::String, start_time::String)
     return Column(jcall(JSQLFunctions, "window", JColumn,

diff --git a/src/compiler.jl b/src/compiler.jl
index 444ee18..763d2d8 100644
--- a/src/compiler.jl
+++ b/src/compiler.jl
@@ -3,6 +3,8 @@
 import JavaCall: assertroottask_or_goodenv, assertloaded, get_method_id
 using Umlaut
 
+const JDynamicJavaCompiler = @jimport org.apache.spark.api.julia.DynamicJavaCompiler
+
 const JFile = @jimport java.io.File
 const JToolProvider = @jimport javax.tools.ToolProvider
 const JJavaCompiler = @jimport javax.tools.JavaCompiler
@@ -35,7 +37,7 @@ function create_class(name::String, src::String)
     mkpath(pkg_path)
     src_path = joinpath(pkg_path, elems[end] * ".java")
     open(src_path, "w") do f
-        write(f, src)
+        Base.write(f, src)
     end
     # compile
     jcompiler = jcall(JToolProvider, "getSystemJavaCompiler", JJavaCompiler, ())

diff --git a/src/convert.jl b/src/convert.jl
index 0ae34d9..91263d8 100644
--- a/src/convert.jl
+++ b/src/convert.jl
@@ -52,8 +52,8 @@ end
 function Base.convert(::Type{JSeq}, x::Vector)
     jarr = JArray(x)
     jobj = convert(JObject, jarr)
-    jwa = jcall(JWrappedArray, "make", JWrappedArray, (JObject,), jobj)
-    return jcall(jwa, "toSeq", JSeq, ())
+    jarrseq = jcall(JArraySeq, "make", JArraySeq, (JObject,), jobj)
+    return jcall(jarrseq, "toSeq", JSeq, ())
 end
 
 function Base.convert(::Type{JMap}, d::Dict)

diff --git a/src/core.jl b/src/core.jl
index 4a0d16b..2a32c52 100644
--- a/src/core.jl
+++ b/src/core.jl
@@ -32,7 +32,7 @@ end
 
 module Compiler
     using Reexport
-    @reexport import Spark: udf, jcall2
+    @reexport import Spark: udf, jcall2, create_instance, create_class
 end
 
 module SQL

diff --git a/src/dataframe.jl b/src/dataframe.jl
index 0f1364d..7274172 100644
--- a/src/dataframe.jl
+++ b/src/dataframe.jl
@@ -3,7 +3,13 @@
 ###############################################################################
 
 Base.show(df::DataFrame) = jcall(df.jdf, "show", Nothing, ())
-Base.show(io::IO, df::DataFrame) = show(df)
+function Base.show(io::IO, df::DataFrame)
+    if df.isstreaming()
+        print(io, "DataFrame(...streaming...)")
+    else
+        show(df)
+    end
+end
 
 printSchema(df::DataFrame) = jcall(df.jdf, "printSchema", Nothing, ())
 
@@ -122,12 +128,26 @@ createOrReplaceTempView(df::DataFrame, name::AbstractString) =
 
 isstreaming(df::DataFrame) = Bool(jcall(df.jdf, "isStreaming", jboolean, ()))
 isStreaming(df::DataFrame) = isstreaming(df)
 
+
+function writeStream(df::DataFrame)
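+    # writeStream is the entry point to Structured Streaming output; the
+    # returned DataStreamWriter is configured by chaining (format, outputMode,
+    # option) and launched with start() -- see src/streaming.jl below.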
+ jwriter = jcall(df.jdf, "writeStream", JDataStreamWriter, ()) + return DataStreamWriter(jwriter) +end + + ############################################################################### # GroupedData # ############################################################################### @chainable GroupedData -Base.show(io::IO, gdf::GroupedData) = print(io, "GroupedData()") +function Base.show(io::IO, gdf::GroupedData) + repr = jcall(gdf.jgdf, "toString", JString, ()) + repr = replace(repr, "RelationalGroupedDataset" => "GroupedData") + print(io, repr) +end function agg(gdf::GroupedData, col::Column, cols::Column...) jdf = jcall(gdf.jgdf, "agg", JDataset, @@ -141,7 +158,7 @@ function agg(gdf::GroupedData, ops::Dict{<:AbstractString, <:AbstractString}) return DataFrame(jdf) end -for func in (:min, :max, :count, :sum, :mean) +for func in (:min, :max, :sum, :mean) @eval function $func(gdf::GroupedData, cols::String...) jdf = jcall(gdf.jgdf, string($func), JDataset, (Vector{JString},), collect(cols)) return DataFrame(jdf) @@ -152,8 +169,11 @@ minimum(gdf::GroupedData, cols::String...) = min(gdf, cols...) maximum(gdf::GroupedData, cols::String...) = max(gdf, cols...) avg(gdf::GroupedData, cols::String...) = mean(gdf, cols...) +Base.count(gdf::GroupedData) = + DataFrame(jcall(gdf.jgdf, "count", JDataset, ())) + -function Base.write(df::DataFrame) +function write(df::DataFrame) jwriter = jcall(df.jdf, "write", JDataFrameWriter, ()) return DataFrameWriter(jwriter) end diff --git a/src/defs.jl b/src/defs.jl index 0364a02..d83e75c 100644 --- a/src/defs.jl +++ b/src/defs.jl @@ -14,6 +14,7 @@ const JDataFrameReader = @jimport org.apache.spark.sql.DataFrameReader const JDataFrameWriter = @jimport org.apache.spark.sql.DataFrameWriter const JDataStreamReader = @jimport org.apache.spark.sql.streaming.DataStreamReader const JDataStreamWriter = @jimport org.apache.spark.sql.streaming.DataStreamWriter +const JStreamingQuery = @jimport org.apache.spark.sql.streaming.StreamingQuery const JDataset = @jimport org.apache.spark.sql.Dataset const JRelationalGroupedDataset = @jimport org.apache.spark.sql.RelationalGroupedDataset @@ -40,8 +41,9 @@ const JMap = @jimport java.util.Map const JHashMap = @jimport java.util.HashMap const JList = @jimport java.util.List const JArrayList = @jimport java.util.ArrayList -const JWrappedArray = @jimport scala.collection.mutable.WrappedArray -const JSeq = @jimport scala.collection.Seq +# const JWrappedArray = @jimport scala.collection.mutable.WrappedArray +const JArraySeq = @jimport scala.collection.mutable.ArraySeq +const JSeq = @jimport scala.collection.immutable.Seq @@ -104,9 +106,13 @@ struct DataFrameWriter end struct DataStreamReader - jstream::JDataStreamReader + jreader::JDataStreamReader end struct DataStreamWriter - jstream::JDataStreamWriter + jwriter::JDataStreamWriter +end + +struct StreamingQuery + jquery::JStreamingQuery end \ No newline at end of file diff --git a/src/init.jl b/src/init.jl index eb15fed..0587e9c 100644 --- a/src/init.jl +++ b/src/init.jl @@ -25,9 +25,9 @@ function init(; log_level="WARN") for x in readdir(joinpath(shome, "jars")) JavaCall.addClassPath(joinpath(shome, "jars", x)) end - JavaCall.addClassPath(joinpath(dirname(@__FILE__), "..", "jvm", "sparkjl", "target", "sparkjl-0.1.jar")) + JavaCall.addClassPath(joinpath(dirname(@__FILE__), "..", "jvm", "sparkjl", "target", "sparkjl-0.2.jar")) else - JavaCall.addClassPath(joinpath(dirname(@__FILE__), "..", "jvm", "sparkjl", "target", "sparkjl-0.1-assembly.jar")) + 
JavaCall.addClassPath(joinpath(dirname(@__FILE__), "..", "jvm", "sparkjl", "target", "sparkjl-0.2-assembly.jar")) end for y in split(get(ENV, "SPARK_DIST_CLASSPATH", ""), [':',';'], keepempty=false) JavaCall.addClassPath(String(y)) diff --git a/src/session.jl b/src/session.jl index 5e87044..f49183d 100644 --- a/src/session.jl +++ b/src/session.jl @@ -29,7 +29,7 @@ function enableHiveSupport(builder::SparkSessionBuilder) end function getOrCreate(builder::SparkSessionBuilder) - config(builder, "spark.jars", joinpath(dirname(@__FILE__), "..", "jvm", "sparkjl", "target", "sparkjl-0.1.jar")) + config(builder, "spark.jars", joinpath(dirname(@__FILE__), "..", "jvm", "sparkjl", "target", "sparkjl-0.2.jar")) jspark = jcall(builder.jbuilder, "getOrCreate", JSparkSession, ()) return SparkSession(jspark) end @@ -56,7 +56,7 @@ Base.close(spark::SparkSession) = jcall(spark.jspark, "close", Nothing, ()) stop(spark::SparkSession) = jcall(spark.jspark, "stop", Nothing, ()) -function Base.read(spark::SparkSession) +function read(spark::SparkSession) jreader = jcall(spark.jspark, "read", JDataFrameReader, ()) return DataFrameReader(jreader) end diff --git a/src/streaming.jl b/src/streaming.jl index 99e598a..c63944b 100644 --- a/src/streaming.jl +++ b/src/streaming.jl @@ -7,20 +7,31 @@ Base.show(io::IO, stream::DataStreamReader) = print(io, "DataStreamReader()") function readStream(spark::SparkSession) - jstream = jcall(spark.jspark, "readStream", JDataStreamReader) - return DataStreamReader(jstream) + jreader = jcall(spark.jspark, "readStream", JDataStreamReader) + return DataStreamReader(jreader) end function format(stream::DataStreamReader, fmt::String) - jstream = jcall(stream.jstream, "format", JDataStreamReader, (JString,), fmt) - return DataStreamReader(jstream) + jreader = jcall(stream.jreader, "format", JDataStreamReader, (JString,), fmt) + return DataStreamReader(jreader) +end + + +function schema(stream::DataStreamReader, sch::StructType) + jreader = jcall(stream.jreader, "schema", JDataStreamReader, (JStructType,), sch.jst) + return DataStreamReader(jreader) +end + +function schema(stream::DataStreamReader, sch::String) + jreader = jcall(stream.jreader, "schema", JDataStreamReader, (JString,), sch) + return DataStreamReader(jreader) end for (T, JT) in [(String, JString), (Integer, jlong), (Real, jdouble), (Bool, jboolean)] @eval function option(stream::DataStreamReader, key::String, value::$T) - jcall(stream.jstream, "option", JDataStreamReader, (JString, $JT), key, value) + jcall(stream.jreader, "option", JDataStreamReader, (JString, $JT), key, value) return stream end end @@ -28,18 +39,89 @@ end for func in (:csv, :json, :parquet, :orc, :text, :textFile) @eval function $func(stream::DataStreamReader, path::String) - jdf = jcall(stream.jstream, string($func), JDataset, (JString,), path) + jdf = jcall(stream.jreader, string($func), JDataset, (JString,), path) return DataFrame(jdf) end end function load(stream::DataStreamReader, path::String) - jdf = jcall(stream.jstream, "load", JDataset, (JString,), path) + jdf = jcall(stream.jreader, "load", JDataset, (JString,), path) return DataFrame(jdf) end function load(stream::DataStreamReader) - jdf = jcall(stream.jstream, "load", JDataset, ()) + jdf = jcall(stream.jreader, "load", JDataset, ()) return DataFrame(jdf) -end \ No newline at end of file +end + + +############################################################################### +# DataStreamWriter # +############################################################################### + 
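+# Obtained via writeStream(df); typical chain (illustrative, mirrors
+# test/test_sql.jl in this patch):
+#
+#     query = df.writeStream.
+#         format("console").
+#         option("numRows", 5).
+#         outputMode("append").
+#         start()
+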
+Base.show(io::IO, stream::DataStreamWriter) = print(io, "DataStreamWriter()") +@chainable DataStreamWriter + + +function format(writer::DataStreamWriter, fmt::String) + jcall(writer.jwriter, "format", JDataStreamWriter, (JString,), fmt) + return writer +end + + +function outputMode(writer::DataStreamWriter, m::String) + jcall(writer.jwriter, "outputMode", JDataStreamWriter, (JString,), m) + return writer +end + + +for (T, JT) in [(String, JString), (Integer, jlong), (Real, jdouble), (Bool, jboolean)] + @eval function option(writer::DataStreamWriter, key::String, value::$T) + jcall(writer.jwriter, "option", JDataStreamWriter, (JString, $JT), key, value) + return writer + end +end + + +function foreach(writer::DataStreamWriter, jfew::JObject) + JForeachWriter = @jimport(org.apache.spark.sql.ForeachWriter) + jfew = convert(JForeachWriter, jfew) + jwriter = jcall(writer.jwriter, "foreach", JDataStreamWriter, (JForeachWriter,), jfew) + return DataStreamWriter(jwriter) +end + + +function start(writer::DataStreamWriter) + jquery = jcall(writer.jwriter, "start", JStreamingQuery, ()) + return StreamingQuery(jquery) +end + + + +############################################################################### +# StreamingQuery # +############################################################################### + +Base.show(io::IO, query::StreamingQuery) = print(io, "StreamingQuery()") +@chainable StreamingQuery + + +function awaitTermination(query::StreamingQuery) + jcall(query.jquery, "awaitTermination", Nothing, ()) +end + + +function awaitTermination(query::StreamingQuery, timeout::Integer) + return Bool(jcall(query.jquery, "awaitTermination", jboolean, (jlong,), timeout)) +end + + +isActive(query::StreamingQuery) = Bool(jcall(query.jquery, "isActive", jboolean, ())) +stop(query::StreamingQuery) = jcall(query.jquery, "stop", Nothing, ()) + +explain(query::StreamingQuery) = jcall(query.jquery, "explain", Nothing, ()) +explain(query::StreamingQuery, extended::Bool) = + jcall(query.jquery, "explain", Nothing, (jboolean,), extended) + +# TODO: foreach, foreachBatch \ No newline at end of file diff --git a/src/window.jl b/src/window.jl index 7204e1e..cea636a 100644 --- a/src/window.jl +++ b/src/window.jl @@ -3,7 +3,6 @@ ############################################################################### @chainable WindowSpec -# @chainable Type{Window} function Base.getproperty(W::Type{Window}, prop::Symbol) if hasfield(typeof(W), prop) @@ -16,19 +15,6 @@ function Base.getproperty(W::Type{Window}, prop::Symbol) end end -function Base.getproperty(row::Row, prop::Symbol) - if hasfield(Row, prop) - return getfield(row, prop) - end - sch = schema(row) - if !isnothing(sch) && string(prop) in names(sch) - return row[string(prop)] - else - fn = getfield(@__MODULE__, prop) - return DotChainer(row, fn) - end -end - Base.show(io::IO, win::Window) = print(io, "Window()") Base.show(io::IO, win::WindowSpec) = print(io, "WindowSpec()") diff --git a/test/test_sql.jl b/test/test_sql.jl index 0713920..c80af6c 100644 --- a/test/test_sql.jl +++ b/test/test_sql.jl @@ -1,4 +1,5 @@ using Spark.SQL +using Spark.Compiler @testset "Builder" begin @@ -143,12 +144,14 @@ end @test col.explode() |> string == """col("explode(x)")""" + @test col.split("|") |> string == """col("split(x, |, -1)")""" + @test (col.window("10 minutes", "5 minutes", "15 minutes") |> string == - """col("timewindow(x, 600000000, 300000000, 900000000) AS `window`")""") + """col("window(x, 600000000, 300000000, 900000000) AS window")""") @test (col.window("10 
minutes", "5 minutes") |> string == - """col("timewindow(x, 600000000, 300000000, 0) AS `window`")""") + """col("window(x, 600000000, 300000000, 0) AS window")""") @test (col.window("10 minutes") |> string == - """col("timewindow(x, 600000000, 600000000, 0) AS `window`")""") + """col("window(x, 600000000, 600000000, 0) AS window")""") end @@ -194,13 +197,43 @@ end # for REPL: # data_dir = joinpath(@__DIR__, "test", "data") data_dir = joinpath(@__DIR__, "data") - mktempdir(; prefix="spark-jl-") do tmp_dir - df = spark.readStream.json(joinpath(data_dir, "people.json")) - @test df.isstreaming() - df.write.mode("overwrite").parquet(joinpath(tmp_dir, "people.parquet")) - df = spark.read.parquet(joinpath(tmp_dir, "people.parquet")) - df.write.mode("overwrite").orc(joinpath(tmp_dir, "people.orc")) - df = spark.read.orc(joinpath(tmp_dir, "people.orc")) - @test df.collect("name") |> Set == Set(["Peter", "Belle"]) - end + sch = StructType("name string, age long") + # df = spark.readStream.schema(sch).json(joinpath(data_dir, "people.json")) + df = spark.readStream.schema(sch).json(data_dir) + @test df.isstreaming() + query = df.writeStream. + format("console"). + option("numRows", 5). + outputMode("append"). + start() + query.explain() + query.explain(true) + @test query.isActive() + query.awaitTermination(100) + query.stop() + @test !query.isActive() + + df = spark.readStream.schema(sch).json(data_dir) + jfew = create_instance(""" + package spark.jl; + import java.io.Serializable; + import org.apache.spark.sql.ForeachWriter; + + class JuliaWriter extends ForeachWriter implements Serializable { + private static final long serialVersionUID = 1L; + + @Override public boolean open(long partitionId, long version) { + return true; + } + + @Override public void process(String record) { + System.out.println(record); + } + + @Override public void close(Throwable errorOrNull) { + } + } + """) + query = df.writeStream.foreach(jfew).start() + end \ No newline at end of file