Skip to content

Commit

Permalink
Deprecate RDD API
Browse files (browse the repository at this point in the history)
  • Loading branch information
dfdx committed May 9, 2022
1 parent 5bc6289 commit e752e4e
Show file tree
Hide file tree
Showing 37 changed files with 193 additions and 82 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
*.jl.mem
*~
.idea/
.vscode/
target/
project/
*.class
Expand Down
5 changes: 5 additions & 0 deletions jvm/sparkjl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,11 @@
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.mdkt.compiler</groupId>
<artifactId>InMemoryJavaCompiler</artifactId>
<version>1.3.0</version>
</dependency>

<!-- additional data formats -->

Expand Down
69 changes: 35 additions & 34 deletions src/Spark.jl
Original file line number Diff line number Diff line change
@@ -1,50 +1,51 @@
module Spark

export
SparkConf,
SparkContext,
RDD,
JuliaRDD,
JavaRDD,
text_file,
parallelize,
map,
map_pair,
map_partitions,
map_partitions_pair,
map_partitions_with_index,
reduce,
filter,
collect,
count,
id,
num_partitions,
close,
@attach,
share_variable,
@share,
flat_map,
flat_map_pair,
cartesian,
group_by_key,
reduce_by_key,
cache,
repartition,
coalesce,
pipe,
# SparkConf,
# SparkContext,
# RDD,
# JuliaRDD,
# JavaRDD,
# text_file,
# parallelize,
# map,
# map_pair,
# map_partitions,
# map_partitions_pair,
# map_partitions_with_index,
# reduce,
# filter,
# collect,
# count,
# id,
# num_partitions,
# close,
# @attach,
# share_variable,
# @share,
# flat_map,
# flat_map_pair,
# cartesian,
# group_by_key,
# reduce_by_key,
# cache,
# repartition,
# coalesce,
# pipe,
# SQL
SparkSession,
Dataset,
sql,
count,
read_json,
write_json,
read_parquet,
write_parquet,
read_df,
write_df



include("core.jl")

end
end
60 changes: 60 additions & 0 deletions src/compiler.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Experimental runtime Java compilation support.
# The constants below record earlier attempts to drive javax.tools directly;
# they are kept commented out, and only the InMemoryJavaCompiler path is active.
# # const JFile = @jimport java.io.File
# const JToolProvider = @jimport javax.tools.ToolProvider
# const JJavaCompiler = @jimport javax.tools.JavaCompiler
# const JInputStream = @jimport java.io.InputStream
# const JOutputStream = @jimport java.io.OutputStream
# const JArray = @jimport java.lang.reflect.Array

# JavaCall proxy for org.mdkt.compiler.InMemoryJavaCompiler (dependency added in
# jvm/sparkjl/pom.xml); used by `mkclass` below to compile Java source strings
# at runtime.
const JInMemoryJavaCompiler = @jimport org.mdkt.compiler.InMemoryJavaCompiler

# const JUDF1 = @jimport org.apache.spark.sql.api.java.UDF1

"""
    mkclass(name::String, src::String)

Compile the Java source `src`, which declares the class `name`, and return
the resulting `JClass`.
"""
function mkclass(name::String, src::String)
    # `newInstance` is InMemoryJavaCompiler's static factory method.
    compiler = jcall(JInMemoryJavaCompiler, "newInstance", JInMemoryJavaCompiler, ())
    compiled = jcall(compiler, "compile", JClass, (JString, JString), name, src)
    return compiled
end


"""
    instantiate(name::String, src::String)

Compile `src` (declaring Java class `name`) and return a fresh instance of
the compiled class as a `JObject`.
"""
instantiate(name::String, src::String) =
    jcall(mkclass(name, src), "newInstance", JObject, ())


# Manual smoke test for the in-memory compiler: compiles a small Java class,
# instantiates it, and exercises the two method-call paths (`jcall` vs the
# reflective `jcall2`). Not wired into any test suite — run by hand.
function main()
    # `init()` presumably boots the JVM/Spark runtime (see src/init.jl) —
    # TODO confirm which init is in scope here.
    init()
    name = "julia.compiled.Dummy"
    # Java source compiled at runtime. `Dummy` both implements the
    # `Function` interface (`apply`) and declares its own `hello` method,
    # so inherited-vs-declared method lookup can be compared.
    src = """
    package julia.compiled;
    import java.util.function.Function;
    public class Dummy implements Function<String, String> {
        @Override
        public String apply(String name) {
            return "Hello, " + name;
        }
        public void hello() {
            System.out.println("Hello!");
        }
    }
    """
    jc = mkclass(name, src)
    jo = jcall(jc, "newInstance", JObject, ())
    # jo = instantiate(name, src)
    # can't call inherited methods like this?
    jcall(jo, "apply", JString, (JString,), "Lee")
    # Reflective lookup via Class.getMethod — see `jcall2` below.
    jcall2(jo, "hello", Nothing, ())
    # Last expression: the list of public methods, returned for inspection.
    jcall(jc, "getMethods", Vector{JMethod}, ())
end


# Reflective alternative to `jcall`: resolves the method on the receiver's
# runtime class via java.lang.Class.getMethod and invokes it through the
# resulting JMethod. Apparently intended to reach methods that plain `jcall`
# cannot (see the "can't call inherited methods" note in `main`) — TODO confirm.
# NOTE(review): `ret_type` is accepted but never used in this body; the result
# type is whatever the JMethod invocation produces. Confirm intent or drop it.
function jcall2(jobj::JavaObject, name::String, ret_type, arg_types, args...)
    jclass = getclass(jobj)
    # Broadcasting over the argument tuple yields a tuple; the comprehension
    # materializes it as a Vector for the varargs invocation below.
    jargs = [a for a in convert.(arg_types, args)] # convert to Vector
    # getMethod(name, parameterTypes) — parameter classes derived from the
    # converted arguments, not from `arg_types` directly.
    meth = jcall(jclass, "getMethod", JMethod, (JString, Vector{JClass}), name, getclass.(jargs))
    return meth(jobj, jargs...)
end
37 changes: 8 additions & 29 deletions src/core.jl
Original file line number Diff line number Diff line change
@@ -1,13 +1,5 @@

using JavaCall
import Base: map, reduce, count, collect, close
import Base.Iterators

# config
const JSparkConf = @jimport org.apache.spark.SparkConf
# context
const JSparkContext = @jimport org.apache.spark.SparkContext
const JJavaSparkContext = @jimport org.apache.spark.api.java.JavaSparkContext
# SQL
const JSparkSession = @jimport org.apache.spark.sql.SparkSession
const JStructType = @jimport org.apache.spark.sql.types.StructType
Expand All @@ -21,27 +13,14 @@ const JRowFactory = @jimport org.apache.spark.sql.RowFactory
const JRow = @jimport org.apache.spark.sql.Row
const JColumn = @jimport org.apache.spark.sql.Column
const JSQLFunctions = @jimport org.apache.spark.sql.functions
# RDD
const JRDD = @jimport org.apache.spark.rdd.RDD
const JJavaRDD = @jimport org.apache.spark.api.java.JavaRDD
const JJavaPairRDD = @jimport org.apache.spark.api.java.JavaPairRDD
const JJuliaRDD = @jimport org.apache.spark.api.julia.JuliaRDD
const JJuliaPairRDD = @jimport org.apache.spark.api.julia.JuliaPairRDD
# utils
const JRDDUtils = @jimport org.apache.spark.api.julia.RDDUtils
# Java utils
const JIterator = @jimport java.util.Iterator
const JList = @jimport java.util.List
const JMap = @jimport java.util.Map
const JArrayList = @jimport java.util.ArrayList
const JHashMap = @jimport java.util.HashMap
const JSystem = @jimport java.lang.System


include("dotcaller.jl")
include("init.jl")
include("serialization.jl")
include("config.jl")
include("context.jl")
include("compiler.jl")
include("sql.jl")
include("rdd.jl")
include("attach.jl")
include("worker.jl")

# mostly unsupported RDD interface
include("rdd/core.jl")


23 changes: 23 additions & 0 deletions src/dotcaller.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Binds a receiver object together with a function so that calling the
# result forwards the receiver as the first argument:
# `DotCaller(x, f)(args...) == f(x, args...)`.
# This is the mechanism behind `@dot_call`'s `x.f(args...)` syntax.
struct DotCaller{O, Fn}
    obj::O   # receiver, prepended to every call
    fn::Fn   # function being forwarded to
end

# Explicit outer constructor (same behavior as the implicit one).
function DotCaller(obj, fn)
    return DotCaller{typeof(obj), typeof(fn)}(obj, fn)
end

# Make DotCaller callable: invoke the bound function with the bound
# receiver followed by any additional arguments.
function (caller::DotCaller)(args...)
    return caller.fn(caller.obj, args...)
end



# Install a `Base.getproperty` overload for type `T` so that `x.foo(args...)`
# dispatches to the module-level function `foo(x, args...)` through a
# `DotCaller`; any other property falls back to plain field access.
# The `@__MODULE__` inside the quote resolves where the macro is *used*.
macro dot_call(T)
    return quote
        function Base.getproperty(obj::$T, prop::Symbol)
            # Module names take priority over struct fields: a field whose
            # name matches a module-level name becomes unreachable via dot
            # syntax. NOTE(review): confirm this shadowing is intended.
            # NOTE(review): `names(...)` lists only exported names by
            # default, so unexported helpers are not callable this way.
            if prop in names(@__MODULE__)
                fn = getfield(@__MODULE__, prop)
                return DotCaller(obj, fn)
            else
                # Ordinary field access (getfield bypasses getproperty).
                return getfield(obj, prop)
            end
        end
    end
end
File renamed without changes.
File renamed without changes.
File renamed without changes.
33 changes: 33 additions & 0 deletions src/rdd/core.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@

# Legacy RDD interface, split out of src/core.jl as part of the RDD API
# deprecation; core.jl includes this file as "mostly unsupported".
import Base: map, reduce, count, collect, close
import Base.Iterators

# config
const JSparkConf = @jimport org.apache.spark.SparkConf
# context
const JSparkContext = @jimport org.apache.spark.SparkContext
const JJavaSparkContext = @jimport org.apache.spark.api.java.JavaSparkContext

# RDD
const JRDD = @jimport org.apache.spark.rdd.RDD
const JJavaRDD = @jimport org.apache.spark.api.java.JavaRDD
const JJavaPairRDD = @jimport org.apache.spark.api.java.JavaPairRDD
const JJuliaRDD = @jimport org.apache.spark.api.julia.JuliaRDD
const JJuliaPairRDD = @jimport org.apache.spark.api.julia.JuliaPairRDD
# utils
const JRDDUtils = @jimport org.apache.spark.api.julia.RDDUtils
# Java utils
const JIterator = @jimport java.util.Iterator
const JList = @jimport java.util.List
const JMap = @jimport java.util.Map
const JArrayList = @jimport java.util.ArrayList
const JHashMap = @jimport java.util.HashMap
const JSystem = @jimport java.lang.System

# Implementation files for the legacy RDD API (moved into src/rdd/).
include("serialization.jl")
include("config.jl")
include("context.jl")
include("rdd.jl")
include("attach.jl")
include("worker.jl")
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
11 changes: 9 additions & 2 deletions src/sql.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ struct SparkSession
appname::AbstractString
end

@dot_call SparkSession

function SparkSession(;master="local",
appname="Julia App on Spark",
config=Dict{String, String}())
Expand All @@ -28,6 +30,7 @@ function SparkSession(;master="local",
return sess
end


Base.show(io::IO, sess::SparkSession) = print(io, "SparkSession($(sess.master),$(sess.appname))")
Base.close(sess::SparkSession) = jcall(sess.jsess, "close", Nothing, ())

Expand All @@ -44,6 +47,7 @@ struct Dataset
jdf::JDataset
end

@dot_call Dataset

struct DatasetIterator{T}
itr::JavaObject{Symbol("java.util.Iterator")}
Expand All @@ -63,7 +67,7 @@ type_map = Dict(
"ObjectType" => JObject
)

function mapped_type(x::String)
function mapped_type(x::String)
if x in keys(type_map)
return type_map[x]
end
Expand All @@ -79,7 +83,7 @@ function TableTraits.getiterator(ds::Dataset)
mtypes = mapped_type.(unsafe_string.(map(x -> convert(JString, jcall(x, "_2", JObject, ())), jtypes)))

T = NamedTuple{Tuple(mnames),Tuple{mtypes...}}

jit = jcall(ds.jdf, "toLocalIterator", JavaObject{Symbol("java.util.Iterator")}, ())

l = count(ds)
Expand Down Expand Up @@ -205,6 +209,8 @@ struct Row
jrow::JRow
end

@dot_call Row

Row(objs...) = Row(jcall(JRowFactory, "create", JRow, (Vector{JObject},), [objs...]))


Expand Down Expand Up @@ -313,6 +319,7 @@ struct RelationalGroupedDataset
jrgd::JRelationalGroupedDataset
end

@dot_call RelationalGroupedDataset

function group_by(ds::Dataset, col_names...)
@assert length(col_names) > 0 "group_by requires at least one column name"
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
17 changes: 17 additions & 0 deletions test/rdd/test_rdd.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
include("basic.jl")
include("map.jl")
include("map_partitions.jl")
include("attach.jl")
include("reduce.jl")
include("text_file.jl")
include("share_variable.jl")
include("flat_map.jl")
include("cartesian.jl")
include("group_by_key.jl")
include("reduce_by_key.jl")
include("collect_pair.jl")
include("map_pair.jl")
include("julian_versions.jl")
include("repartition_coalesce.jl")
include("filter.jl")
include("pipe.jl")
File renamed without changes.
19 changes: 2 additions & 17 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,8 @@ Spark.init()

@testset "Spark" begin

include("basic.jl")
include("map.jl")
include("map_partitions.jl")
include("attach.jl")
include("reduce.jl")
include("text_file.jl")
include("share_variable.jl")
include("flat_map.jl")
include("cartesian.jl")
include("group_by_key.jl")
include("reduce_by_key.jl")
include("collect_pair.jl")
include("map_pair.jl")
include("julian_versions.jl")
include("repartition_coalesce.jl")
include("filter.jl")
include("pipe.jl")
include("sql.jl")

# include("rdd/test_rdd.jl")

end

0 comments on commit e752e4e

Please sign in to comment.