The Spark DataFrame API provides efficient and easy-to-use operations for analyzing distributed collections of data. Many users love the PySpark API, which is more approachable than the Scala API, but sometimes a UDF in PySpark becomes a performance problem. How about implementing these UDFs in Scala and calling them from PySpark? Besides, in Spark 2.0 a UDAF can only be defined in Scala or Java, so how can we use one in PySpark? Let's have a try~
Use Scala UDF in PySpark
1. define scala udf
Suppose we want to calculate string length; let's define it as a Scala UDF.
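A minimal sketch of such a Scala UDF follows; the object and method names (StringLenUDF, strLen) are placeholders of mine, not necessarily what the benchmark below used.

```scala
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

// A string-length UDF exposed from an object so that PySpark can reach it
// through the JVM gateway. Object and method names are illustrative.
object StringLenUDF {
  // returns a UserDefinedFunction that maps a string column to its length
  def strLen(): UserDefinedFunction = udf { s: String =>
    if (s == null) 0 else s.length
  }
}
```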
Comparing the physical plans, the Python UDF plan contains an extra BatchEvalPython node, while the Scala UDF plan does not. This shows that when a Scala UDF is called from PySpark, evaluation happens inside the JVM and no data is exchanged with the Python worker, so performance should improve.
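Below is a minimal sketch of how such a Scala UDF can be invoked from PySpark through the Py4J gateway and how the two plans can be compared with explain(). It assumes the jar built from the Scala object above is passed to spark-submit via --jars; the lookup path StringLenUDF and the wrapper name scala_str_len are placeholders.

```python
from pyspark.sql import SparkSession
from pyspark.sql.column import Column, _to_java_column, _to_seq
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.appName("scala-udf-in-pyspark").getOrCreate()
sc = spark.sparkContext


def scala_str_len(column):
    # Look up the Scala object through the JVM gateway and apply its UDF.
    # Adjust the path if StringLenUDF lives inside a package.
    judf = sc._jvm.StringLenUDF.strLen()
    return Column(judf.apply(_to_seq(sc, [column], _to_java_column)))


# equivalent Python UDF for comparison
py_str_len = udf(lambda s: len(s) if s is not None else 0, IntegerType())

df = spark.createDataFrame([(1, "hello"), (2, "spark")], ["id", "word"])

# The Python UDF plan contains a BatchEvalPython node; the Scala UDF plan does not.
df.select(py_str_len(df["word"])).explain()
df.select(scala_str_len(df["word"])).explain()
```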
I evaluated the performance in a local environment with 4 cores and 2GB of memory, generating 10 million rows for each test. The results are as follows:
The Scala UDF is about 1.89 times as fast as the Python UDF.
I then implemented another pair of UDFs in Scala and Python that parse strings with a regular expression; the result:
The Scala UDF is about 2.23 times as fast as the Python regex string-parsing UDF.
The regex-parsing Scala UDF is defined as follows:
```scala
import org.apache.spark.sql.functions._

/**
 * Created by lgrcyanny on 17/9/13.
 */
object StringParse {
  val STRING_PATTERN = """(a.*b)""".r

  def parseString(str: String): String = {
    val matched = STRING_PATTERN.findFirstMatchIn(str)
    if (matched.isEmpty) {
      ""
    } else {
      matched.get.group(1)
    }
  }
}
```
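The object above only defines a plain Scala function. To apply it to DataFrame columns from PySpark, it also has to be wrapped as a Spark UserDefinedFunction; a minimal way to expose it (the object and method names here are my placeholders) looks like this:

```scala
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

// Wraps StringParse.parseString as a Spark UDF so that PySpark can apply it
// to Columns through the JVM gateway. Names are assumed for this sketch.
object StringParseUDF {
  def parseStringUDF(): UserDefinedFunction = udf(StringParse.parseString _)
}
```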
On the PySpark side, the test script uses the following imports and helper functions:

```python
import time
import re
import random
import string

from pyspark.sql import SparkSession
from pyspark.sql.column import Column
from pyspark.sql.column import _to_java_column
from pyspark.sql.column import _to_seq
from pyspark.sql.functions import col
from pyspark.sql.functions import udf
from pyspark.sql.functions import length
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType


def random_word(length):
    """Generate a random lowercase word of the given length."""
    letters = string.ascii_lowercase
    return ''.join([random.choice(letters) for i in range(length)])


def generate_rows(n):
    """Generate n rows of (id, word) pairs."""
    rows = []
    for i in range(n):
        id = random.randint(0, 100)
        slen = random.randint(0, 20)
        word = random_word(slen)
        rows.append((id, word))
    return rows
```
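Putting the pieces together, the comparison can be driven roughly as follows. This is a sketch rather than the exact benchmark script: scala_parse goes through the StringParseUDF placeholder introduced earlier, and the 10 million rows match the scale described above.

```python
def python_parse(s):
    """Python counterpart of StringParse.parseString."""
    matched = re.search(r"(a.*b)", s)
    return matched.group(1) if matched else ""


def scala_parse(column):
    """Apply the Scala UDF through the Py4J gateway.

    Adjust the lookup path if StringParseUDF lives inside a package."""
    judf = sc._jvm.StringParseUDF.parseStringUDF()
    return Column(judf.apply(_to_seq(sc, [column], _to_java_column)))


def timed(label, expr):
    """Force full evaluation of expr over df and print the wall-clock time."""
    start = time.time()
    df.select(expr.alias("parsed")).count()
    print("%s: %.2f seconds" % (label, time.time() - start))


spark = SparkSession.builder.appName("scala-vs-python-udf").getOrCreate()
sc = spark.sparkContext

# 10 million rows (reduce for a quick local try), cached and counted up front
# so the timings measure only the UDF evaluation
df = spark.createDataFrame(generate_rows(10 * 1000 * 1000), ["id", "word"]).cache()
df.count()

python_parse_udf = udf(python_parse, StringType())

timed("python regex udf", python_parse_udf(col("word")))
timed("scala regex udf", scala_parse(col("word")))
```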
Databricks once published a performance comparison of Python vs Scala for the DataFrame and RDD APIs; the blog is here. The benchmark runs a group-by aggregation on 10 million integer pairs on a single machine, and the Scala DataFrame API is almost 5 times as fast as Python lambda functions on RDDs.
Even though the Scala UDF is not 5 times as fast as the Python UDF (about 2 times in my test), using a Scala UDF does improve performance.
Use Scala UDAF in PySpark
As of Spark 2.0, a UDAF can only be defined in Scala or Java.
1. define scala UDAF
To define a UDAF, the class must extend UserDefinedAggregateFunction and implement its abstract members (inputSchema, bufferSchema, dataType, deterministic, initialize, update, merge and evaluate).
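For example, a simple sum aggregator might look like the following sketch (the class name SumAggregator is a placeholder):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// A minimal UDAF that sums a long column; illustrative, not from the original post.
class SumAggregator extends UserDefinedAggregateFunction {
  // schema of the input rows fed to the aggregation
  override def inputSchema: StructType = StructType(StructField("value", LongType) :: Nil)

  // schema of the intermediate aggregation buffer
  override def bufferSchema: StructType = StructType(StructField("sum", LongType) :: Nil)

  // type of the final result
  override def dataType: DataType = LongType

  // the same input always produces the same output
  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0) + input.getLong(0)
    }
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
  }

  override def evaluate(buffer: Row): Any = buffer.getLong(0)
}
```

From PySpark, an instance of such a class can then be reached through the same Py4J gateway pattern used for the UDFs above.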