UDF Generation
API
@spark_ai.udf
def udf_name(arg1: arg1_type, arg2: arg2_type, ...) -> return_type:
"""UDF description"""
...
Given an initialized `spark_ai` instance, you can use the `@spark_ai.udf` decorator to generate UDFs from Python functions. There is no need to implement the body of the method.
Example 1: Compute expression from columns
Given a DataFrame auto_df
from Data Ingestion:
# Stub UDF: spark_ai generates the implementation from the signature and the
# docstring, so the body is intentionally left as `...`.
@spark_ai.udf
def previous_years_sales(brand: str, current_year_sale: int, sales_change_percentage: float) -> int:
    """Calculate previous years sales from sales change percentage"""
    ...

# Register the generated function for SQL use, expose the DataFrame as a view,
# and call the UDF from a SQL query.
spark.udf.register("previous_years_sales", previous_years_sales)
auto_df.createOrReplaceTempView("autoDF")
spark.sql("select brand as brand, previous_years_sales(brand, us_sales, sales_change_percentage) as 2021_sales from autoDF").show()
brand | 2021_sales |
---|---|
Toyota | 2032693 |
Ford | 1803509 |
Chevrolet | 1417348 |
Honda | 1315225 |
Hyundai | 739045 |
Example 2: Parse heterogeneous JSON text
Let's imagine we have heterogeneous JSON texts: each of them may or may not contain certain keys, and the order of keys is random. We can generate such a DataFrame by mutating a single JSON.
# A fully populated reference record; the mutated rows below are derived from it.
random_dict = {
    "id": 1279,
    "first_name": "John",
    "last_name": "Doe",
    "username": "johndoe",
    "email": "john_doe@example.com",
    "phone_number": "+1 234 567 8900",
    "address": "123 Main St, Springfield, OH, 45503, USA",
    "age": 32,
    "registration_date": "2020-01-20T12:12:12Z",
    "last_login": "2022-03-21T07:25:34Z",
}

original_keys = list(random_dict.keys())

from random import random, shuffle

mutaded_rows = []
for _ in range(20):
    shuffled_keys = original_keys.copy()
    shuffle(shuffled_keys)
    # Randomize the key order, then keep each field with probability 0.6
    # (i.e. drop it with 0.4 chance) so every row ends up with a random
    # subset of keys in a random order.
    mutaded_rows.append(
        {key: random_dict[key] for key in shuffled_keys if random() <= 0.6}
    )
import json

# Serialize every mutated dict back to a JSON string; each row pairs the raw
# JSON text with the full expected schema (the complete key list).
bad_json_dataframe = spark.createDataFrame(
    [(json.dumps(row), original_keys) for row in mutaded_rows],
    ["json_field", "schema"],
)
json_field | schema |
---|---|
{"first_name": "John", "email": "john_doe@example.com", "last_name": "Doe", "phone_number": "+1 234 567 8900", "age": 32, "last_login": "2022-03-21T07:25:34Z", "address": "123 Main St, Springfield, OH, 45503, USA"} | [id, first_name, last_name, username, email, phone_number, address, age, registration_date, last_login] |
{"address": "123 Main St, Springfield, OH, 45503, USA", "phone_number": "+1 234 567 8900", "email": "john_doe@example.com", "registration_date": "2020-01-20T12:12:12Z", "username": "johndoe", "last_login": "2022-03-21T07:25:34Z"} | [id, first_name, last_name, username, email, phone_number, address, age, registration_date, last_login] |
{"age": 32, "last_name": "Doe", "email": "john_doe@example.com", "last_login": "2022-03-21T07:25:34Z", "address": "123 Main St, Springfield, OH, 45503, USA", "username": "johndoe"} | [id, first_name, last_name, username, email, phone_number, address, age, registration_date, last_login] |
{"first_name": "John", "address": "123 Main St, Springfield, OH, 45503, USA", "phone_number": "+1 234 567 8900", "last_name": "Doe", "id": 1279} | [id, first_name, last_name, username, email, phone_number, address, age, registration_date, last_login] |
{"phone_number": "+1 234 567 8900", "registration_date": "2020-01-20T12:12:12Z", "email": "john_doe@example.com", "address": "123 Main St, Springfield, OH, 45503, USA", "age": 32, "username": "johndoe"} | [id, first_name, last_name, username, email, phone_number, address, age, registration_date, last_login] |
from typing import List

# Stub UDF: spark_ai synthesizes the body from the signature and docstring.
# The docstring doubles as the generation prompt, so it spells out the exact
# contract (field ordering, missing-field handling, function-local imports).
@spark_ai.udf
def parse_heterogeneous_json(json_str: str, schema: List[str]) -> List[str]:
    """Extract fields from heterogeneous JSON string based on given schema in a right order.
    If field is missing replace it by None. All imports should be inside function."""
    ...
Now we can test processing of our text rows:
from pyspark.sql.functions import expr

# NOTE(review): unlike Examples 1 and 3, parse_heterogeneous_json is never
# passed to spark.udf.register before being used inside expr(...) — confirm
# registration happens earlier, otherwise this SQL call cannot resolve it.
(
    bad_json_dataframe
    .withColumn(
        "parsed",
        # Apply the UDF via SQL-expression syntax on the raw JSON and schema.
        expr("parse_heterogeneous_json(json_field, schema)"),
    )
    # Select all three columns so the output matches the table shown below
    # (the original selected only "parsed" and also never closed the opening
    # parenthesis, which is a syntax error).
    .select("json_field", "schema", "parsed")
    .show()
)
json_field | schema | parsed |
---|---|---|
{"first_name": "John", "email": "john_doe@example.com", "last_name": "Doe", "phone_number": "+1 234 567 8900", "age": 32, "last_login": "2022-03-21T07:25:34Z", "address": "123 Main St, Springfield, OH, 45503, USA"} | [id, first_name, last_name, username, email, phone_number, address, age, registration_date, last_login] | [None, John, Doe, None, john_doe@example.com, +1 234 567 8900, 123 Main St, Springfield, OH, 45503, USA, 32, None, 2022-03-21T07:25:34Z] |
{"address": "123 Main St, Springfield, OH, 45503, USA", "phone_number": "+1 234 567 8900", "email": "john_doe@example.com", "registration_date": "2020-01-20T12:12:12Z", "username": "johndoe", "last_login": "2022-03-21T07:25:34Z"} | [id, first_name, last_name, username, email, phone_number, address, age, registration_date, last_login] | [None, None, None, johndoe, john_doe@example.com, +1 234 567 8900, 123 Main St, Springfield, OH, 45503, USA, None, 2020-01-20T12:12:12Z, 2022-03-21T07:25:34Z] |
{"age": 32, "last_name": "Doe", "email": "john_doe@example.com", "last_login": "2022-03-21T07:25:34Z", "address": "123 Main St, Springfield, OH, 45503, USA", "username": "johndoe"} | [id, first_name, last_name, username, email, phone_number, address, age, registration_date, last_login] | [None, None, Doe, johndoe, john_doe@example.com, None, 123 Main St, Springfield, OH, 45503, USA, 32, None, 2022-03-21T07:25:34Z] |
{"first_name": "John", "address": "123 Main St, Springfield, OH, 45503, USA", "phone_number": "+1 234 567 8900", "last_name": "Doe", "id": 1279} | [id, first_name, last_name, username, email, phone_number, address, age, registration_date, last_login] | [1279, John, Doe, None, None, +1 234 567 8900, 123 Main St, Springfield, OH, 45503, USA, None, None, None] |
{"phone_number": "+1 234 567 8900", "registration_date": "2020-01-20T12:12:12Z", "email": "john_doe@example.com", "address": "123 Main St, Springfield, OH, 45503, USA", "age": 32, "username": "johndoe"} | [id, first_name, last_name, username, email, phone_number, address, age, registration_date, last_login] | [None, None, None, johndoe, john_doe@example.com, +1 234 567 8900, 123 Main St, Springfield, OH, 45503, USA, 32, 2020-01-20T12:12:12Z, None] |
Example 3: Extract email from raw text
# Five free-text sentences, each containing exactly one email address.
raw_texts = [
    "For any queries regarding the product, contact helpdesk@example.com.",
    "Send your applications to hr@ourcompany.com.",
    "You can reach out to the teacher at prof.mike@example.edu.",
    "My personal email is jane.doe@example.com.",
    "You can forward the documents to admin@oursite.net.",
]
df = spark.createDataFrame(raw_texts, schema="string")
# Stub UDF: spark_ai generates the implementation; the one-line docstring is
# the generation prompt.
@spark_ai.udf
def extract_email(text: str) -> str:
    """Extract first email from raw text"""
    ...
from pyspark.sql.functions import col

# Register the generated UDF for SQL use, then show each raw sentence next to
# the email address extracted from it.
spark.udf.register("extract_email", extract_email)
df.select(
    col("value").alias("raw"),
    expr("extract_email(value)").alias("email"),
).show()
raw | email |
---|---|
For any queries regarding the product, contact helpdesk@example.com. | helpdesk@example.com |
Send your applications to hr@ourcompany.com. | hr@ourcompany.com |
You can reach out to the teacher at prof.mike@example.edu. | prof.mike@example.edu |
My personal email is jane.doe@example.com. | jane.doe@example.com |
You can forward the documents to admin@oursite.net. | admin@oursite.net |
Example 4: Generate random numbers from Laplace distribution
# Stub UDF: the docstring is the generation prompt — it pins "pure Python" and
# self-contained imports so the generated body needs no external setup.
@spark_ai.udf
def laplace_random_number(loc: float, scale: float) -> float:
    """Generate a random number from Laplace distribution with given loc and scale in pure Python. Function should contain all necessary imports."""
    ...

from pyspark.sql.functions import lit
from pyspark.sql.types import DoubleType

# Register with an explicit DoubleType return type so Spark SQL treats the
# UDF result as a double rather than the default string.
spark.udf.register("laplace_random_number", laplace_random_number, returnType=DoubleType())
# Build 500k rows with constant loc/scale and draw one Laplace sample per row.
# NOTE(review): the code selects only "laplace_random", yet the output table
# below also shows value/loc/scale columns — one of the two is stale; confirm
# which was intended.
(
    spark.sparkContext.range(0, 500_000)
    .toDF(schema="int")
    .withColumn("loc", lit(1.0).cast("double"))
    .withColumn("scale", lit(0.3).cast("double"))
    .withColumn("laplace_random", expr("laplace_random_number(loc, scale)"))
    .select("laplace_random")
    .show()
)
value | loc | scale | laplace_random |
---|---|---|---|
0 | 1.0 | 0.3 | 0.799962 |
1 | 1.0 | 0.3 | 0.995381 |
2 | 1.0 | 0.3 | 0.602727 |
3 | 1.0 | 0.3 | 1.235575 |
4 | 1.0 | 0.3 | 1.864565 |
5 | 1.0 | 0.3 | 1.220493 |
6 | 1.0 | 0.3 | 0.992431 |
7 | 1.0 | 0.3 | 1.630307 |
8 | 1.0 | 0.3 | 0.894683 |
9 | 1.0 | 0.3 | 0.632602 |