UDF Generation

API

@spark_ai.udf
def udf_name(arg1: arg1_type, arg2: arg2_type, ...) -> return_type:
    """UDF description"""
    ...
Given a SparkAI instance spark_ai, you can use the @spark_ai.udf decorator to generate UDFs from Python functions. You only provide the signature and a docstring; there is no need to implement the body of the function.
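
If you do not have an instance yet, a minimal setup sketch looks like the following (the constructor arguments, such as which LLM to use, depend on your environment):

from pyspark_ai import SparkAI

spark_ai = SparkAI()
spark_ai.activate()  # enables the AI helpers on Spark DataFrames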

Example 1: Compute expression from columns

Given a DataFrame auto_df from Data Ingestion:

@spark_ai.udf
def previous_years_sales(brand: str, current_year_sale: int, sales_change_percentage: float) -> int:
    """Calculate previous years sales from sales change percentage"""
    ...

spark.udf.register("previous_years_sales", previous_years_sales)
auto_df.createOrReplaceTempView("autoDF")

spark.sql("select brand as brand, previous_years_sales(brand, us_sales, sales_change_percentage) as 2021_sales from autoDF").show()

brand     | 2021_sales
Toyota    | 2032693
Ford      | 1803509
Chevrolet | 1417348
Honda     | 1315225
Hyundai   | 739045
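
For intuition, one plausible body for previous_years_sales inverts the percent-change formula. This sketch assumes sales_change_percentage is the percent change from the previous year to the current year (illustrative only; the actual generated code may differ):

def previous_years_sales(brand: str, current_year_sale: int, sales_change_percentage: float) -> int:
    # current = previous * (1 + change / 100)  =>  previous = current / (1 + change / 100)
    return round(current_year_sale / (1 + sales_change_percentage / 100))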

Example 2: Parse heterogeneous JSON text

Let's imagine we have heterogeneous JSON texts: each of them may or may not contain certain keys, and the order of the keys is random. We can generate such a DataFrame by mutating a single JSON object.

random_dict = {
    "id": 1279,
    "first_name": "John",
    "last_name": "Doe",
    "username": "johndoe",
    "email": "john_doe@example.com",
    "phone_number": "+1 234 567 8900",
    "address": "123 Main St, Springfield, OH, 45503, USA",
    "age": 32,
    "registration_date": "2020-01-20T12:12:12Z",
    "last_login": "2022-03-21T07:25:34Z",
}
original_keys = list(random_dict.keys())

from random import random, shuffle

mutated_rows = []
for _ in range(20):
    keys = list(original_keys)
    shuffle(keys)
    # Shuffle the key order, then drop each field with probability 0.4 (keep with probability 0.6)
    mutated_rows.append({k: random_dict[k] for k in keys if random() <= 0.6})

import json

bad_json_dataframe = spark.createDataFrame(
    [(json.dumps(val), original_keys) for val in mutated_rows],
    ["json_field", "schema"],
)
json_field | schema
{"first_name": "John", "email": "john_doe@example.com", "last_name": "Doe", "phone_number": "+1 234 567 8900", "age": 32, "last_login": "2022-03-21T07:25:34Z", "address": "123 Main St, Springfield, OH, 45503, USA"} | [id, first_name, last_name, username, email, phone_number, address, age, registration_date, last_login]
{"address": "123 Main St, Springfield, OH, 45503, USA", "phone_number": "+1 234 567 8900", "email": "john_doe@example.com", "registration_date": "2020-01-20T12:12:12Z", "username": "johndoe", "last_login": "2022-03-21T07:25:34Z"} | [id, first_name, last_name, username, email, phone_number, address, age, registration_date, last_login]
{"age": 32, "last_name": "Doe", "email": "john_doe@example.com", "last_login": "2022-03-21T07:25:34Z", "address": "123 Main St, Springfield, OH, 45503, USA", "username": "johndoe"} | [id, first_name, last_name, username, email, phone_number, address, age, registration_date, last_login]
{"first_name": "John", "address": "123 Main St, Springfield, OH, 45503, USA", "phone_number": "+1 234 567 8900", "last_name": "Doe", "id": 1279} | [id, first_name, last_name, username, email, phone_number, address, age, registration_date, last_login]
{"phone_number": "+1 234 567 8900", "registration_date": "2020-01-20T12:12:12Z", "email": "john_doe@example.com", "address": "123 Main St, Springfield, OH, 45503, USA", "age": 32, "username": "johndoe"} | [id, first_name, last_name, username, email, phone_number, address, age, registration_date, last_login]
from typing import List

@spark_ai.udf
def parse_heterogeneous_json(json_str: str, schema: List[str]) -> List[str]:
    """Extract fields from heterogeneous JSON string based on given schema in a right order.
    If field is missing replace it by None. All imports should be inside function."""
    ...
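
For reference, a hand-written body equivalent to what might be generated looks like this (illustrative only):

def parse_heterogeneous_json(json_str: str, schema: List[str]) -> List[str]:
    import json  # imports inside the function, as the docstring requests
    data = json.loads(json_str)
    # Walk the schema in order, substituting None for any missing key
    return [str(data[k]) if k in data else None for k in schema]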

Now we can test the processing of our text rows. Because expr refers to the UDF by name, we register it first with an array return type:

from pyspark.sql.functions import expr
from pyspark.sql.types import ArrayType, StringType

spark.udf.register(
    "parse_heterogeneous_json",
    parse_heterogeneous_json,
    returnType=ArrayType(StringType()),
)

(
    bad_json_dataframe
    .withColumn(
        "parsed",
        expr("parse_heterogeneous_json(json_field, schema)"),
    )
    .show(truncate=False)
)
json_field | schema | parsed
{"first_name": "John", "email": "john_doe@example.com", "last_name": "Doe", "phone_number": "+1 234 567 8900", "age": 32, "last_login": "2022-03-21T07:25:34Z", "address": "123 Main St, Springfield, OH, 45503, USA"} | [id, first_name, last_name, username, email, phone_number, address, age, registration_date, last_login] | [None, John, Doe, None, john_doe@example.com, +1 234 567 8900, 123 Main St, Springfield, OH, 45503, USA, 32, None, 2022-03-21T07:25:34Z]
{"address": "123 Main St, Springfield, OH, 45503, USA", "phone_number": "+1 234 567 8900", "email": "john_doe@example.com", "registration_date": "2020-01-20T12:12:12Z", "username": "johndoe", "last_login": "2022-03-21T07:25:34Z"} | [id, first_name, last_name, username, email, phone_number, address, age, registration_date, last_login] | [None, None, None, johndoe, john_doe@example.com, +1 234 567 8900, 123 Main St, Springfield, OH, 45503, USA, None, 2020-01-20T12:12:12Z, 2022-03-21T07:25:34Z]
{"age": 32, "last_name": "Doe", "email": "john_doe@example.com", "last_login": "2022-03-21T07:25:34Z", "address": "123 Main St, Springfield, OH, 45503, USA", "username": "johndoe"} | [id, first_name, last_name, username, email, phone_number, address, age, registration_date, last_login] | [None, None, Doe, johndoe, john_doe@example.com, None, 123 Main St, Springfield, OH, 45503, USA, 32, None, 2022-03-21T07:25:34Z]
{"first_name": "John", "address": "123 Main St, Springfield, OH, 45503, USA", "phone_number": "+1 234 567 8900", "last_name": "Doe", "id": 1279} | [id, first_name, last_name, username, email, phone_number, address, age, registration_date, last_login] | [1279, John, Doe, None, None, +1 234 567 8900, 123 Main St, Springfield, OH, 45503, USA, None, None, None]
{"phone_number": "+1 234 567 8900", "registration_date": "2020-01-20T12:12:12Z", "email": "john_doe@example.com", "address": "123 Main St, Springfield, OH, 45503, USA", "age": 32, "username": "johndoe"} | [id, first_name, last_name, username, email, phone_number, address, age, registration_date, last_login] | [None, None, None, johndoe, john_doe@example.com, +1 234 567 8900, 123 Main St, Springfield, OH, 45503, USA, 32, 2020-01-20T12:12:12Z, None]

Example 3: Extract email from raw text

df = spark.createDataFrame(
    [
        "For any queries regarding the product, contact helpdesk@example.com.",
        "Send your applications to hr@ourcompany.com.",
        "You can reach out to the teacher at prof.mike@example.edu.",
        "My personal email is jane.doe@example.com.",
        "You can forward the documents to admin@oursite.net.",
    ],
    schema="string",
)
@spark_ai.udf
def extract_email(text: str) -> str:
    """Extract first email from raw text"""
    ...

from pyspark.sql.functions import col, expr

spark.udf.register("extract_email", extract_email)
df.select(col("value").alias("raw"), expr("extract_email(value)").alias("email")).show(truncate=False)
raw | email
For any queries regarding the product, contact helpdesk@example.com. | helpdesk@example.com
Send your applications to hr@ourcompany.com. | hr@ourcompany.com
You can reach out to the teacher at prof.mike@example.edu. | prof.mike@example.edu
My personal email is jane.doe@example.com. | jane.doe@example.com
You can forward the documents to admin@oursite.net. | admin@oursite.net
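
Under the hood, the generated body typically amounts to a regular-expression search. A hand-written sketch (illustrative, not the exact generated code):

import re

def extract_email(text: str) -> str:
    # Match local-part@domain; requiring word characters after each dot
    # keeps the sentence-ending period out of the match.
    match = re.search(r"[\w.+-]+@[\w-]+(?:\.[\w-]+)+", text)
    return match.group(0) if match else None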

Example 4: Generate random numbers from Laplace distribution

@spark_ai.udf
def laplace_random_number(loc: float, scale: float) -> float:
    """Generate a random number from Laplace distribution with given loc and scale in pure Python. Function should contain all necessary imports."""
    ...
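
Since the Python standard library has no Laplace sampler, a plausible pure-Python body uses inverse-CDF sampling (illustrative only; the generated code may differ):

def laplace_random_number(loc: float, scale: float) -> float:
    import math    # imports inside the function, as the docstring requests
    import random

    # Inverse CDF: X = loc - scale * sign(u) * ln(1 - 2|u|), u ~ Uniform(-0.5, 0.5)
    u = random.random() - 0.5
    while u == -0.5:  # random() can return 0.0 exactly; avoid log(0)
        u = random.random() - 0.5
    return loc - scale * math.copysign(1.0, u) * math.log(1.0 - 2.0 * abs(u))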
from pyspark.sql.functions import expr, lit
from pyspark.sql.types import DoubleType

spark.udf.register("laplace_random_number", laplace_random_number, returnType=DoubleType())
(
    spark.sparkContext.range(0, 500_000)
    .toDF(schema="int")
    .withColumn("loc", lit(1.0).cast("double"))
    .withColumn("scale", lit(0.3).cast("double"))
    .withColumn("laplace_random", expr("laplace_random_number(loc, scale)"))
    .show()
)
value | loc | scale | laplace_random
0     | 1.0 | 0.3   | 0.799962
1     | 1.0 | 0.3   | 0.995381
2     | 1.0 | 0.3   | 0.602727
3     | 1.0 | 0.3   | 1.235575
4     | 1.0 | 0.3   | 1.864565
5     | 1.0 | 0.3   | 1.220493
6     | 1.0 | 0.3   | 0.992431
7     | 1.0 | 0.3   | 1.630307
8     | 1.0 | 0.3   | 0.894683
9     | 1.0 | 0.3   | 0.632602
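
As a quick sanity check (not part of the original example), the sample mean should land near loc = 1.0 and the sample variance near 2 * scale^2 = 0.18 for a Laplace distribution:

from pyspark.sql.functions import avg, expr, lit, variance

(
    spark.sparkContext.range(0, 500_000)
    .toDF(schema="int")
    .withColumn("loc", lit(1.0).cast("double"))
    .withColumn("scale", lit(0.3).cast("double"))
    .withColumn("x", expr("laplace_random_number(loc, scale)"))
    .agg(avg("x").alias("sample_mean"), variance("x").alias("sample_variance"))
    .show()
)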