diff --git a/Project.toml b/Project.toml
index 7f03265..1d21fee 100644
--- a/Project.toml
+++ b/Project.toml
@@ -3,6 +3,7 @@ uuid = "e3819d11-95af-5eea-9727-70c091663a01"
 version = "0.6.0"
 
 [deps]
+Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
 IteratorInterfaceExtensions = "82899510-4779-5014-852e-03e436cf321d"
 JavaCall = "494afd89-becb-516b-aafa-70d2670c0337"
 Reexport = "189a3867-3050-52da-a836-e630ba90ab69"
@@ -17,6 +18,7 @@ IteratorInterfaceExtensions = "1"
 JavaCall = "0.7, 0.8"
 Reexport = "1.2"
 TableTraits = "1"
+Umlaut = "0.2"
 julia = "1.6"
 
 [extras]
diff --git a/docs/make.jl b/docs/make.jl
index d5b79ab..99cf7c7 100644
--- a/docs/make.jl
+++ b/docs/make.jl
@@ -6,7 +6,7 @@ makedocs(
     format = Documenter.HTML(),
     modules = [Spark],
     pages = Any[
-        "Introduction" => "intro.md",
+        "Introduction" => "index.md",
         "SQL / DataFrames" => "sql.md",
         "Structured Streaming" => "streaming.md",
         "API Reference" => "api.md"
diff --git a/docs/make_old.jl b/docs/make_old.jl
deleted file mode 100644
index db9cb38..0000000
--- a/docs/make_old.jl
+++ /dev/null
@@ -1,25 +0,0 @@
-using Documenter, Spark
-
-#load_dir(x) = map(file -> joinpath("lib", x, file), readdir(joinpath(Base.source_dir(), "src", "lib", x)))
-
-makedocs(
-    modules = [Spark],
-    clean = false,
-    format = [:html],
-    sitename = "Spark",
-    pages = Any[
-        "Introduction" => "index.md",
-        "Getting Started" => "getting_started.md",
-        "Structured Streaming" => "structured_streaming.md",
-        "API Reference" => "api.md"
-    ]
-)
-
-deploydocs(
-    repo = "github.com/dfdx/Spark.jl.git",
-    julia = "0.6",
-    osname = "linux",
-    deps = nothing,
-    make = nothing,
-    target = "build",
-)
\ No newline at end of file
diff --git a/docs/src/api.md b/docs/src/api.md
index e60c7a0..fddd9cf 100644
--- a/docs/src/api.md
+++ b/docs/src/api.md
@@ -1,9 +1,11 @@
-
-```@index
+```@meta
+CurrentModule = Spark
 ```
 
+```@docs
+@chainable
+DotChainer
+```
 
-```@autodocs
-Modules = [Spark]
-Order = [:type, :function]
+```@index
 ```
diff --git a/docs/src/intro.md b/docs/src/intro.md
deleted file mode 100644
index 5873cda..0000000
--- a/docs/src/intro.md
+++ /dev/null
@@ -1,50 +0,0 @@
-# Introduction
-
-## Overview
-
-Spark.jl provides an interface to Apache Spark™ platform, including SQL / DataFrame and Structured Streaming. It closely follows the PySpark API, making it easy to translate existing Python code to Julia.
-
-Spark.jl supports multiple cluster types (in client mode), and can be considered as an analogue to PySpark or RSpark within the Julia ecosystem. It supports running within on-premise installations, as well as hosted instance such as Amazon EMR and Azure HDInsight.
-
-### Installation
-
-Spark.jl requires at least JDK 8/11 and Maven to be installed and available in PATH.
-
-```
-Pkg.add("Spark.jl")
-```
-
-To link against a specific version of Spark, also run:
-
-```
-ENV["BUILD_SPARK_VERSION"] = "3.2.1" # version you need
-Pkg.build("Spark")
-```
-
-### Quick Example
-
-Note that most types in Spark.jl support dot notation for calling functions, e.g. `x.foo(y)` is expanded into `foo(x, y)`.
-
-```
-using Spark.SQL
-
-spark = SparkSession.builder.appName("Main").master("local").getOrCreate()
-df = spark.createDataFrame([["Alice", 19], ["Bob", 23]], "name string, age long")
-rows = df.select(Column("age") + 1).collect()
-for row in rows
-    println(row[1])
-end
-```
-
-### Cluster Types
-
-This package supports multiple cluster types (in client mode): `local`, `standalone`, `mesos` and `yarn`. The location of the cluster (in case of mesos or standalone) or the cluster type (in case of local or yarn) must be passed as a parameter `master` when creating a Spark context. For YARN based clusters, the cluster parameters are picked up from `spark-defaults.conf`, which must be accessible via a `SPARK_HOME` environment variable.
-
-## Current Limitations
-
-* Jobs can be submitted from Julia process attached to the cluster in `client` deploy mode. `Cluster` mode is not fully supported, and it is uncertain if it is useful in the Julia context.
-* Since records are serialised between Java and Julia at the edges, the maximum size of a single row in an RDD is 2GB, due to Java array indices being limited to 32 bits.
-
-## Trademarks
-
-Apache®, [Apache Spark and Spark](http://spark.apache.org) are registered trademarks, or trademarks of the [Apache Software Foundation](http://www.apache.org/) in the United States and/or other countries.
diff --git a/docs/src/sql.md b/docs/src/sql.md
index 4db000e..54d6b30 100644
--- a/docs/src/sql.md
+++ b/docs/src/sql.md
@@ -1,2 +1,47 @@
+```@meta
+CurrentModule = Spark
+```
+
 # SQL / DataFrames
 
+This is a quick introduction to the Spark.jl core functions. It closely follows the official [PySpark tutorial](https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_df.html) and copies many examples verbatim. In most cases, the PySpark docs should work for Spark.jl as is or with little adaptation.
+
+Spark.jl applications usually start by creating a `SparkSession`:
+
+```julia
+using Spark
+
+spark = SparkSession.builder.appName("Main").master("local").getOrCreate()
+```
+
+Note that here we use dot notation to chain function invocations. This makes the code more concise and also mimics the Python API, making it easier to translate examples. The same example could also be written as:
+
+```julia
+using Spark
+import Spark: appName, master, getOrCreate
+
+builder = SparkSession.builder
+builder = appName(builder, "Main")
+builder = master(builder, "local")
+spark = getOrCreate(builder)
+```
+
+See [`@chainable`](@ref) for the details of the dot notation.
+
+
+## DataFrame Creation
+
+
+In simple cases, a Spark DataFrame can be created via `SparkSession.createDataFrame`, e.g. from a list of rows:
+
+```julia
+using Dates
+
+df = spark.createDataFrame([
+    Row(a=1, b=2.0, c="string1", d=Date(2000, 1, 1), e=DateTime(2000, 1, 1, 12, 0)),
+    Row(a=2, b=3.0, c="string2", d=Date(2000, 2, 1), e=DateTime(2000, 1, 2, 12, 0)),
+    Row(a=4, b=5.0, c="string3", d=Date(2000, 3, 1), e=DateTime(2000, 1, 3, 12, 0))
+])
+
+```
+
diff --git a/src/compiler.jl b/src/compiler.jl
index 0db4f44..ca3883c 100644
--- a/src/compiler.jl
+++ b/src/compiler.jl
@@ -1,5 +1,5 @@
 using JavaCall
-import JavaCall: assertroottask_or_goodenv, assertloaded, get_method_id
+import JavaCall: assertroottask_or_goodenv, assertloaded
 using Umlaut
 
 const JInMemoryJavaCompiler = @jimport org.mdkt.compiler.InMemoryJavaCompiler
diff --git a/src/convert.jl b/src/convert.jl
index 91263d8..1f5c1e9 100644
--- a/src/convert.jl
+++ b/src/convert.jl
@@ -2,8 +2,25 @@
 #                                 Conversions                                 #
 ###############################################################################
 
+# Note: neither java.sql.Timestamp nor Julia's DateTime carries a timezone.
+# When printed, however, java.sql.Timestamp assumes UTC and converts to your
+# local time. To avoid confusion, e.g. in the REPL, prefer a fixed date in UTC
+# or now(Dates.UTC).
+Base.convert(::Type{JTimestamp}, x::DateTime) =
+    JTimestamp((jlong,), floor(Int, datetime2unix(x)) * 1000)
+Base.convert(::Type{DateTime}, x::JTimestamp) =
+    unix2datetime(jcall(x, "getTime", jlong, ()) / 1000)
+
+Base.convert(::Type{JDate}, x::Date) =
+    JDate((jlong,), floor(Int, datetime2unix(DateTime(x))) * 1000)
+Base.convert(::Type{Date}, x::JDate) =
+    Date(unix2datetime(jcall(x, "getTime", jlong, ()) / 1000))
+
+
 Base.convert(::Type{JObject}, x::Integer) = convert(JObject, convert(JLong, x))
 Base.convert(::Type{JObject}, x::Real) = convert(JObject, convert(JDouble, x))
+Base.convert(::Type{JObject}, x::DateTime) = convert(JObject, convert(JTimestamp, x))
+Base.convert(::Type{JObject}, x::Date) = convert(JObject, convert(JDate, x))
 Base.convert(::Type{JObject}, x::Column) = convert(JObject, x.jcol)
 
 Base.convert(::Type{Row}, obj::JObject) = Row(convert(JRow, obj))
@@ -25,6 +42,8 @@ java2julia(::Type{JInteger}) = Int32
 java2julia(::Type{JDouble}) = Float64
 java2julia(::Type{JFloat}) = Float32
 java2julia(::Type{JBoolean}) = Bool
+java2julia(::Type{JTimestamp}) = DateTime
+java2julia(::Type{JDate}) = Date
 java2julia(::Type{JObject}) = Any
 
 julia2ddl(::Type{String}) = "string"
@@ -33,6 +52,8 @@ julia2ddl(::Type{Int32}) = "int"
 julia2ddl(::Type{Float64}) = "double"
 julia2ddl(::Type{Float32}) = "float"
 julia2ddl(::Type{Bool}) = "boolean"
+julia2ddl(::Type{Dates.Date}) = "date"
+julia2ddl(::Type{Dates.DateTime}) = "timestamp"
 
 
 function JArray(x::Vector{T}) where T
diff --git a/src/core.jl b/src/core.jl
index 5f0b268..ae7febc 100644
--- a/src/core.jl
+++ b/src/core.jl
@@ -2,6 +2,7 @@ using JavaCall
 using Umlaut
 import Umlaut.V
 import Statistics
+using Dates
 
 # using TableTraits
 # using IteratorInterfaceExtensions
diff --git a/src/defs.jl b/src/defs.jl
index d83e75c..1fe7f67 100644
--- a/src/defs.jl
+++ b/src/defs.jl
@@ -36,6 +36,8 @@ const JLong = @jimport java.lang.Long
 const JFloat = @jimport java.lang.Float
 const JDouble = @jimport java.lang.Double
 const JBoolean = @jimport java.lang.Boolean
+const JDate = @jimport java.sql.Date
+const JTimestamp = @jimport java.sql.Timestamp
 
 const JMap = @jimport java.util.Map
 const JHashMap = @jimport java.util.HashMap
@@ -46,6 +48,7 @@ const JArraySeq = @jimport scala.collection.mutable.ArraySeq
 const JSeq = @jimport scala.collection.immutable.Seq
 
 
+toString(jobj::JavaObject) = jcall(jobj, "toString", JString, ())
 
 
 ###############################################################################
diff --git a/test/runtests.jl b/test/runtests.jl
index 461fbe0..1ec919a 100644
--- a/test/runtests.jl
+++ b/test/runtests.jl
@@ -16,6 +16,7 @@ spark = Spark.SparkSession.builder.
 
 
 include("test_chainable.jl")
+include("test_convert.jl")
 include("test_compiler.jl")
 include("test_sql.jl")
 
diff --git a/test/test_convert.jl b/test/test_convert.jl
new file mode 100644
index 0000000..45a5b4c
--- /dev/null
+++ b/test/test_convert.jl
@@ -0,0 +1,10 @@
+using Dates
+
+@testset "Convert" begin
+    # create DateTime without fractional part
+    t = now(Dates.UTC) |> datetime2unix |> floor |> unix2datetime
+    d = Date(t)
+
+    @test convert(DateTime, convert(Spark.JTimestamp, t)) == t
+    @test convert(Date, convert(Spark.JDate, d)) == d
+end
\ No newline at end of file
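Reviewer note, not part of the patch: a minimal sketch of how the new date/timestamp support is expected to be used end to end, based on the `createDataFrame` and `collect` examples in the docs added above. The column names (`name`, `born`, `seen`) and sample values are made up for illustration, and the round-trip of collected values back to Julia `Date` / `DateTime` is assumed to go through the conversions added in `src/convert.jl`.

```julia
using Spark
using Dates

# local session, as in the docs above
spark = SparkSession.builder.appName("Main").master("local").getOrCreate()

# Date / DateTime fields should map to Spark date / timestamp columns
# via the new Base.convert(::Type{JDate}, ...) / convert(::Type{JTimestamp}, ...)
df = spark.createDataFrame([
    Row(name="Alice", born=Date(2000, 1, 1), seen=DateTime(2022, 1, 1, 12, 0)),
    Row(name="Bob", born=Date(1995, 6, 15), seen=DateTime(2022, 1, 2, 8, 30))
])

# collected values are expected to come back as Julia Date / DateTime,
# per the java2julia(::Type{JDate}) / java2julia(::Type{JTimestamp}) mappings
for row in df.collect()
    println(row[1], " ", row[2], " ", row[3])
end
```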