RFC: PySpark-like, DataFrame-only API #104

Merged · 46 commits merged into main · Jun 4, 2022

Conversation

@dfdx (Owner) commented May 16, 2022

I'm working on a nearly complete rewrite of Spark.jl and want to get some feedback before releasing it. A summary of the changes:

  • PySpark-compatible API, including dot-chaining syntax, types and function names
  • SQL and Streaming API
  • No RDD API and thus no support for running custom Julia code on a cluster
  • Experimental support for UDFs via compilation to Java
  • One big breaking change

Why

In its current state, Spark.jl is terribly outdated and pretty unstable. A lot of effort is needed to keep the old RDD interface working,
but almost nobody uses that API nowadays. At the same time, the DataFrame API in main is limited to just a few core methods and is hardly suitable for any real project.

PySpark-compatible API

There are thousands of tutorials, answers and discussions for PySpark out there, as well as hundreds of Pythonists looking for a familiar API in Julia land. So let's just reuse these materials. Examples of the new API:

using Spark.SQL

spark = SparkSession.builder.
        appName("Hello").
        master("local").
        config("some.key", "some-value").
        getOrCreate()

df = spark.read.text("README.md")
df.value.contains("Julia")
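
For illustration, a hedged sketch of how further chaining could look, assuming the usual PySpark method names (filter, count, select, alias, show) carry over as this RFC intends; none of these specific calls are confirmed above:

df.filter(df.value.contains("Julia")).count()
df.select(df.value.alias("line")).show()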

SQL and Streaming API

In this initial effort I'm going to implement >= 50% of the SQL and Structured Streaming API, including all the core data types and enough functions to copy-paste most Python tutorials. The long tail of remaining functions is time-consuming to add, but otherwise straightforward.
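
As a rough, hedged sketch (not from the RFC text itself) of what the Structured Streaming surface might look like if PySpark's readStream/writeStream names carry over:

stream = spark.readStream.
         format("socket").
         option("host", "localhost").
         option("port", "9999").
         load()

query = stream.writeStream.format("console").start()
query.awaitTermination()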

No RDD

I'm going to discontinue support for the RDD API, including the JuliaRDD that we used to run custom Julia code on a cluster. The communication protocol between the JVM and Julia in that implementation is terribly outdated, but most importantly, we haven't found a way to reliably manage the state of Julia workers across the variety of possible runtimes. See this and this for more details.

UDFs via compilation to Java

As described in these issues, the best alternative for running complex Julia scripts on a cluster is Kubernetes. However, to support simple data transformations, the new API also features a Julia-to-Java compiler. Example:

f = s -> lowercase(s)
f_udf = udf(f, "Hi!")
r = jcall2(f_udf.judf, "call", JString, (JString,), "Big Buddha Boom!")
@test convert(String, r) == f("Big Buddha Boom!")

f_udf.judf is a fully functional Java object implementing the UDF1 interface; it can be passed to any matching Java method without a Julia binary installed on the cluster.

At a lower level, one can also compile arbitrary Java classes from a string, including a class that starts a Julia server (e.g. via juliacaller). But managing the lifecycle of such a server is up to the user and out of scope for this initial effort.

One big breaking change

Along with RDD support, we lose DataFrames.jl compatibility and other contributions. I apologize for this. PRs to bring them back are highly welcome.


I'm currently close to finishing the SQL interface and am looking forward to tackling the Structured Streaming API. I plan to finish it in 2 weeks or less and then tag the new version. Comments are highly welcome.

# and propagate getproperty to the returned object
return getproperty(dc(), prop)
end
end
Contributor:

wow, good magic, I had no idea this was possible in Julia :)
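
For readers curious about the trick, here is a minimal, self-contained sketch (not the actual Spark.jl implementation) of how overloading Base.getproperty enables PySpark-style dot-chaining in Julia:

struct Builder
    opts::Dict{String,String}
end

# Any unknown property becomes a setter that records the value and
# returns the builder again, so calls can be chained PySpark-style.
function Base.getproperty(b::Builder, prop::Symbol)
    prop === :opts && return getfield(b, :opts)
    return value -> (getfield(b, :opts)[String(prop)] = string(value); b)
end

b = Builder(Dict{String,String}())
b.appName("Hello").master("local").opts   # Dict("appName" => "Hello", "master" => "local")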

@exyi (Contributor) left a comment:

Great work, I really like this full-on switch to DataFrames! I left a few comments regarding the code, nothing too important... A few more general comments:

RDD Drop

I don't think anyone will miss RDDs :D However, it's a bit sad to see the "just run this Julia function in parallel" functionality go. Well, it never worked for my use cases because handling libraries was a pain, so never mind.

I still think it would be nice to have a way to run Julia code on a Spark cluster. I understand that you are more of a fan of Kubernetes, but Spark clusters are often already available in institutions, and I find them less of a PITA to run than Kubernetes 😅. General distributed computing might not be a great fit for Spark, but more often than not the use case is simple: just run this simulation (or whatever) on 1000 different inputs (or 1000 different sets of parameters), for which the Spark API is very nice to use.

I was wondering whether PackageCompiler.jl could help us produce a self-contained executable which we could then just send to any executor from the master (possibly even without Julia installed on the executors?). I'm not that Julia-savvy, so I have no idea if it's easily possible, but it seems like an option to me. Never mind, this is off-topic again 🙃

Java UDFs

I think that directly exposing Java or Scala to users would solve many problems. I'd be quite happy if I could just write

scalaUdf("some_function", "(x: String, i: Integer) => x(i) ")

Or something similar for people who prefer Java. I'd honestly prefer that to the automagic that compiles Julia to Java, but I can see that some people don't want to write even a single line of Java 😂

Arrow.jl / DataFrames.jl

I'd really like to have this one back; I'll happily contribute the Arrow interop again if you'd like. I think it would be much preferable to get a Julia-native DataFrame from .collect instead of a Vector{Row}. Similarly, calling createDataFrame(...) with a Julia Table would be nicer than having to create the data Row by Row. I guess we could even quite easily infer the schema from the provided Julia DataFrame. It'll just be a bit confusing which kind of DataFrame I'm working with; maybe naming this one SparkDataFrame would avoid some confusion?

jcall(jmap, "put", JObject, (JObject, JObject), jk, jv)
end
return jmap
end
Contributor:

What would you think about getting rid of these conversions and always using Arrow.jl instead? It should handle most cases out of the box and is significantly faster for larger collects.

Contributor:

I can bring it back from the PR I made last year, if you think it would be worth it. It would be nice to use it for everything, for consistency.

@dfdx (Owner, author):

These conversions are mostly needed to invoke methods from the Java/Scala API, not to pass large amounts of data between Julia and the JVM. For example, we need an instance of JSeq to construct a JRow/Row, which you'd often use for testing purposes, and JMap is only used to pass an aggregation config, i.e. something like Dict("clicks" => "sum", "impressions" => "count"), to GroupedData.agg().
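
For context, a hedged usage sketch of that aggregation config, assuming the PySpark-style agg(Dict) signature is what this conversion backs:

df.groupBy("country").
   agg(Dict("clicks" => "sum", "impressions" => "count"))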

src/dataframe.jl Outdated
###############################################################################

@chainable GroupedData
Base.show(io::IO, gdf::GroupedData) = print(io, "GroupedData()")
Contributor:

I think it would be a bit nicer to print the SQL schema, like GroupedData($(gdf.schema))

@dfdx (Owner, author):

Unfortunately, RelationalGroupedDataset (the underlying Java class) doesn't have a schema. Here are all the methods available on that object type:

$anonfun$agg$1
$anonfun$agg$2
$anonfun$agg$3
$anonfun$aggregateNumericColumns$1
$anonfun$aggregateNumericColumns$2
$anonfun$alias$1
$anonfun$flatMapGroupsInPandas$1
$anonfun$flatMapGroupsInPandas$2
$anonfun$flatMapGroupsInPandas$3
$anonfun$flatMapGroupsInPandas$4
$anonfun$flatMapGroupsInR$1
$anonfun$flatMapGroupsInR$2
$anonfun$flatMapGroupsInR$3
$anonfun$pivot$1
$anonfun$pivot$2
$anonfun$strToExpr$1
$anonfun$strToExpr$2
$anonfun$toDF$1
$anonfun$toDF$2
agg
apply
avg
count
equals
flatMapGroupsInPandas
flatMapGroupsInR
getClass
hashCode
max
mean
min
notify
notifyAll
pivot
sum
toString
wait

PySpark also doesn't provide a meaningful representation for GroupedData:

<pyspark.sql.group.GroupedData object at 0x7f66f80aec70>

@dfdx (Owner, author):

On the other hand, toString() gives a pretty intuitive representation:

RelationalGroupedDataset: [grouping expressions: [name: string], value: [name: string, age: bigint], type: GroupBy]

Contributor:

Cool, the Scala toString is nice :) I didn't know there was no schema field on RelationalGroupedDataset :|
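
A possible follow-up (just a sketch; the field name jgdf holding the wrapped Java object is an assumption, not confirmed in this thread) would be to delegate show to that toString:

Base.show(io::IO, gdf::GroupedData) =
    print(io, convert(String, jcall(gdf.jgdf, "toString", JString, ())))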


function Base.write(df::DataFrame)
jwriter = jcall(df.jdf, "write", JDataFrameWriter, ())
return DataFrameWriter(jwriter)
Contributor:

I don't think this is right; Base.write normally does something quite different: https://docs.julialang.org/en/v1/base/io-network/#Base.write. I don't know what the conventions are in Julia, but I think it would be better not to overload the built-in function with a method that has a very different signature.
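
One alternative along the lines of this comment (a sketch only, not the PR's decision) would be to keep the same body but define the accessor in the package's own namespace rather than extending Base.write:

# Hypothetical: Spark.SQL.write, deliberately not a method of Base.write
function write(df::DataFrame)
    jwriter = jcall(df.jdf, "write", JDataFrameWriter, ())
    return DataFrameWriter(jwriter)
end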

- if !startswith(version, "1.8")
-     @warn "Java 1.8 is recommended for Spark.jl, but Java $version was used."
+ if !startswith(version, "1.8") && !startswith(version, "11.")
+     @warn "Java 1.8 or 1.11 is recommended for Spark.jl, but Java $version was used."
Contributor:

👍

@dfdx (Owner, author) commented May 26, 2022

I think that directly exposing Java or Scala to users would solve many problems.

I just realized I've never mentioned it, and the code in compiler.jl is too obscure for a casual reader, but you can actually embed Java classes. Here's an example:

import Spark: create_instance, jcall2

src = """
    package julia.compiled;

    public class Hello {

        public String hello(String name) {
            return "Hello, " + name;
        }

        public void goodbye() {
            System.out.println("Goodbye!");
        }
    }
    """

obj = create_instance(src)     # compile the class and instantiate an object in one hop; see also create_class
jresult = jcall2(obj, "hello", JString, (JString,), "Bob")
result = convert(String, jresult)    # "Hello, Bob"

There are certain pitfalls with objects created this way, but from the JVM's perspective they are totally valid. As a particular example, the udf() function creates an object that implements one of Spark's UDFn interfaces and can be passed to any applicable Java method.

Compiling Scala should not be much harder, but Scala adds a lot of magic on top of JVM primitives and thus is generally harder to work with via JNI.


however, it's a bit sad to see the "just run this Julia function in parallel" functionality go

The problem with this definition is the uncertainty of "this Julia function". Is the function pure Julia, or does it have package dependencies? What version of Julia is required to run it, and is it already installed on the workers? How long does the function take to run compared to the overhead of launching Julia? And compared to transferring a sysimage to all workers? How much data does it read from the source and write to the sink?

Depending on the answers, the optimal solution may be to create a Julia server on a worker or launch Julia per batch, to trace and serialize the function or transfer the whole package, to use Arrow to pass data between processes or manipulate objects right in JVM memory, etc. The RDD implementation has always been faulty and simplistic; if we want to do better, we need to come up with very specific requirements for running Julia on Spark workers.

So until we have these requirements, we can:

  1. Create UDFs for simple cases.
  2. Run arbitrary code in Java, including code that launches Julia workers with properties tailored to a specific task.
  3. Run Julia in Docker on Kubernetes :)

I'd really like to have this one back, I'll happily contribute the Arrow interop again, if you'd like.

That would be really great! I'm sorry to drop all the cool contributions we've had so far, but hopefully we will get them added back for the new API too.

I think it would be much preferable to get a Julia-native DataFrame from .collect instead of the Vector{Row}.

One of the main design choices in the new API is to copy the Python API so that people can simply copy-paste the thousands of PySpark examples out there. If we change the signature of some methods, users will have to figure out the correct usage, which will drive them away. I'd like to avoid that.

On the other hand, nobody stops us from having e.g. .collect(Table) or .collect_df(). We only need one-way API compatibility after all.

Similarly calling createDataFrame(...) with a Julia Table would be nicer than having to create the data Row by Row.

This is another example where we can use Julia's multiple dispatch to create as many convenience methods as we need.
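
A hypothetical sketch of such a dispatch-based convenience (not part of this PR; the varargs Row constructor and the Row-based createDataFrame method it delegates to are assumptions):

import Tables

function createDataFrame(spark::SparkSession, tbl)
    # materialize any Tables.jl-compatible source as NamedTuples, then build Rows
    rows = [Row(values(nt)...) for nt in Tables.rowtable(tbl)]
    return spark.createDataFrame(rows)   # assumed: existing Row-by-Row method
end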

It'll just be a bit confusing which kind of DataFrame I'm working with; maybe naming this one SparkDataFrame would avoid some confusion?

How about Spark.DataFrame? :)

@dfdx (Owner, author) commented May 28, 2022

I just learnt two things:

  • Java serialization doesn't work for dynamically compiled classes, so in many cases we won't be able to just embed Java code as we intended. Compiling the class at runtime on each worker is an option, but it would be quite time-consuming.
  • There are cases in the DataFrame API where the lifetime of a Julia worker is well known, notably ForeachWriter.

These two things make me prioritize a custom Julia code runner again. I'm thinking of juliacaller for control flow and Arrow for data flow. But there are still many open questions, so I'm going to release the new DataFrame API without custom Julia runners first anyway.

@exyi (Contributor) commented May 29, 2022

One of the main design choices in the new API is to copy the Python API so that people can simply copy-paste the thousands of PySpark examples out there. If we change the signature of some methods, users will have to figure out the correct usage, which will drive them away. I'd like to avoid that.

On the other hand, nobody stops us from having e.g. .collect(Table) or .collect_df(). We only need one-way API compatibility after all.

Cool, I'll look into adding collect_df then :)

I looked into how PySpark handles that, and they use a tiny Python class for Row which is not a wrapper around the Scala Row: https://github.com/apache/spark/blob/master/python/pyspark/sql/types.py#L1781. I think that's slightly preferable, since Julia-to-Java function calls are not exactly efficient. We could then use Arrow and convert the Table into a Vector of these Rows. Alternatively, the PySpark Row behaves almost like a Julia NamedTuple, so maybe we could return a Vector of NamedTuples (Arrow.jl has a function for that too, AFAIK)?
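
For reference, a small sketch of that NamedTuple route using Arrow.jl and Tables.jl (the file path is just a placeholder; how the bytes get from the JVM to Julia is out of scope here):

using Arrow, Tables

tbl = Arrow.Table("collected_batch.arrow")
rows = Tables.rowtable(tbl)    # Vector of NamedTuples; access fields like rows[1].name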

These two things make me prioritize a custom Julia code runner again. I'm thinking of juliacaller for control flow and Arrow for data flow. But there are still many open questions, so I'm going to release the new DataFrame API without custom Julia runners first anyway.

Sure, do that. I think most users will just want fast access to some HDFS files and/or do some data wrangling in Spark SQL; for these use cases, this approach is pretty much perfect.

@dfdx (Owner, author) commented May 31, 2022

I looked into how PySpark handles that, and they use a tiny Python class for Row which is not a wrapper around the Scala Row

That's interesting. I agree we can optimize it this way, but it will take time to re-implement all the methods involving rows, so perhaps not in this initial release. But the more interesting question is why people even want Row to be performant. The only two ways I've used Row in practice are in examples like spark.createDataFrame(...) and to inspect a couple of rows. Honestly, copying data from the workers to the driver sounds suspicious to me in itself.

Anyway, I believe implementing specialized and more data-efficient functions like collect(df, DataFrames.DataFrame) / collect_df(df) in addition to the existing ones is a good start.

@dfdx (Owner, author) commented Jun 1, 2022

To add to the collect(DataFrames.DataFrame) / collect_df() discussion, PySpark also provides a method with a very clear name: .toPandas(). Likewise, we could have e.g. .to(DataFrames.DataFrame). That said, I don't have a clear preference here, so the final decision is up to whoever implements it.
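
A heavily hedged sketch of what such a convenience could look like; arrow_collect is a made-up helper standing in for whatever mechanism ships Arrow bytes from the driver JVM, not an existing Spark.jl function:

using Arrow, DataFrames

function collect_df(df)
    bytes = arrow_collect(df)    # hypothetical: Arrow IPC payload collected on the JVM side
    return DataFrames.DataFrame(Arrow.Table(bytes))
end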

@dfdx dfdx merged commit ac6ad77 into main Jun 4, 2022
@dfdx dfdx deleted the new-api branch June 4, 2022 22:49