Generate Python UDFs for different cases¶
Initialization¶
from pyspark.sql import SparkSession
from pyspark_ai import SparkAI
spark_ai = SparkAI(verbose=True)
spark_ai.activate()  # activate partial functions for Spark DataFrame
spark = SparkSession.builder.getOrCreate()
23/10/28 08:19:42 WARN Utils: Your hostname, nixos resolves to a loopback address: 127.0.0.2; using 192.168.0.15 instead (on interface wlp0s20f3)
23/10/28 08:19:42 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/28 08:19:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Example 1: parsing heterogeneous JSON text¶
It is a common problem to receive data in the form of JSON text. We know the expected schema of the JSON, but there are no guarantees about field order, and the contract even allows missing keys. Built-in Spark functions are not well suited for such a case because from_json expects a strict schema. Sometimes it is simpler to solve such a problem with a Python UDF. With pyspark-ai we can simplify the process of creating such a function.
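To make the goal concrete, here is a minimal pure-Python sketch (our own illustration, not part of the original notebook) of what we want the UDF to do: extract values from arbitrary JSON in a fixed schema order, substituting None for anything that is missing.

import json
schema = ["id", "first_name", "last_name"]
payload = '{"last_name": "Doe", "id": 1279}'  # shuffled keys, "first_name" missing
data = json.loads(payload)
print([data.get(field) for field in schema])  # [1279, None, 'Doe']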
Generation of Data¶
First we need to generate a test sample of data. Let's do it by mutating a single JSON object.
random_dict = {
    "id": 1279,
    "first_name": "John",
    "last_name": "Doe",
    "username": "johndoe",
    "email": "john_doe@example.com",
    "phone_number": "+1 234 567 8900",
    "address": "123 Main St, Springfield, OH, 45503, USA",
    "age": 32,
    "registration_date": "2020-01-20T12:12:12Z",
    "last_login": "2022-03-21T07:25:34Z",
}
original_keys = list(random_dict.keys())
from random import random, shuffle
# Generate 20 mutated versions of this dictionary
mutated_rows = []
for _ in range(20):
    keys = [k for k in original_keys]
    shuffle(keys)
    # Drop each field with probability 0.4; the key order is already shuffled
    mutated_rows.append({k: random_dict[k] for k in keys if random() <= 0.6})
import json
bad_json_dataframe = (
    spark.createDataFrame(
        [(json.dumps(val), original_keys) for val in mutated_rows],
        ["json_field", "schema"],
    )
)
bad_json_dataframe.sample(0.5).toPandas()
 | json_field | schema
---|---|---
0 | {"username": "johndoe", "phone_number": "+1 23... | [id, first_name, last_name, username, email, p... |
1 | {"username": "johndoe", "phone_number": "+1 23... | [id, first_name, last_name, username, email, p... |
2 | {"last_login": "2022-03-21T07:25:34Z", "userna... | [id, first_name, last_name, username, email, p... |
3 | {"phone_number": "+1 234 567 8900", "last_logi... | [id, first_name, last_name, username, email, p... |
4 | {"username": "johndoe", "registration_date": "... | [id, first_name, last_name, username, email, p... |
5 | {"age": 32, "last_name": "Doe", "last_login": ... | [id, first_name, last_name, username, email, p... |
6 | {"age": 32, "phone_number": "+1 234 567 8900",... | [id, first_name, last_name, username, email, p... |
7 | {"first_name": "John", "registration_date": "2... | [id, first_name, last_name, username, email, p... |
8 | {"phone_number": "+1 234 567 8900", "age": 32,... | [id, first_name, last_name, username, email, p... |
9 | {"age": 32, "phone_number": "+1 234 567 8900",... | [id, first_name, last_name, username, email, p... |
10 | {"age": 32, "username": "johndoe", "registrati... | [id, first_name, last_name, username, email, p... |
11 | {"last_name": "Doe", "address": "123 Main St, ... | [id, first_name, last_name, username, email, p... |
Generate UDF function code¶
from typing import List
@spark_ai.udf
def parse_heterogeneous_json(json_str: str, schema: List[str]) -> List[str]:
"""Extract fields from heterogeneous JSON string based on given schema in a right order.
If field is missing replace it by None. All imports should be inside function."""
...
INFO: Creating following Python UDF:

def parse_heterogeneous_json(json_str: str, schema: List[str]) -> List[str]:
    import json
    if json_str is not None and schema is not None:
        data = json.loads(json_str)
        return [data.get(field) for field in schema]
It looks like pyspark-ai generated a valid function for us. It iterates over the expected schema and tries to find each field in the given JSON string; if a key is missing, it returns None for that field. pyspark-ai also added the necessary import of the json module from the Python standard library.
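Before registering it as a UDF we can sanity-check the generated function directly on the driver (an illustrative call, not part of the original run; the decorator leaves us with a plain Python callable, which is also why we can pass it to spark.udf.register below).

parse_heterogeneous_json('{"email": "john_doe@example.com", "id": 1279}', ["id", "username", "email"])
# [1279, None, 'john_doe@example.com']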
Testing our UDF¶
from pyspark.sql.functions import expr
from pyspark.sql.types import ArrayType, StringType
# Our UDF should return array<string>
spark.udf.register("parse_heterogeneous_json", parse_heterogeneous_json, returnType=ArrayType(elementType=StringType()))
(
    bad_json_dataframe
    .withColumn("parsed", expr("parse_heterogeneous_json(json_field, schema)"))
    .sample(0.5)
    .toPandas()
)
 | json_field | schema | parsed
---|---|---|---
0 | {"username": "johndoe", "phone_number": "+1 23... | [id, first_name, last_name, username, email, p... | [None, John, None, johndoe, None, +1 234 567 8... |
1 | {"age": 32, "first_name": "John", "email": "jo... | [id, first_name, last_name, username, email, p... | [1279, John, Doe, None, john_doe@example.com, ... |
2 | {"last_login": "2022-03-21T07:25:34Z", "userna... | [id, first_name, last_name, username, email, p... | [1279, John, Doe, johndoe, None, None, None, N... |
3 | {"username": "johndoe", "registration_date": "... | [id, first_name, last_name, username, email, p... | [None, None, Doe, johndoe, john_doe@example.co... |
4 | {"age": 32, "last_name": "Doe", "last_login": ... | [id, first_name, last_name, username, email, p... | [None, None, Doe, None, None, +1 234 567 8900,... |
5 | {"username": "johndoe", "last_login": "2022-03... | [id, first_name, last_name, username, email, p... | [None, None, None, johndoe, None, None, None, ... |
6 | {"first_name": "John", "registration_date": "2... | [id, first_name, last_name, username, email, p... | [None, John, Doe, johndoe, john_doe@example.co... |
7 | {"first_name": "John", "id": 1279, "last_name"... | [id, first_name, last_name, username, email, p... | [1279, John, Doe, None, john_doe@example.com, ... |
8 | {"age": 32, "username": "johndoe", "registrati... | [id, first_name, last_name, username, email, p... | [None, John, None, johndoe, john_doe@example.c... |
9 | {"last_name": "Doe", "id": 1279, "address": "1... | [id, first_name, last_name, username, email, p... | [1279, John, Doe, None, john_doe@example.com, ... |
10 | {"last_name": "Doe", "address": "123 Main St, ... | [id, first_name, last_name, username, email, p... | [1279, None, Doe, None, john_doe@example.com, ... |
Example 2: Extract email from text¶
Generating data¶
Let's create a DataFrame with raw text that contains an email address.
df = spark.createDataFrame(
[
"Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed egestas nulla sit amet elit volutpat ultricies. Morbi lacinia est fringilla pulvinar elementum. Curabitur rhoncus luctus dui, sodales blandit arcu maximus a. Aenean iaculis nulla ac enim tincidunt, et tristique enim bibendum. Fusce mollis nibh sit amet nisi pellentesque egestas. Quisque volutpat, neque eu semper tristique, odio nunc auctor odio, at condimentum lorem nunc nec nisi. Quisque auctor at velit nec fermentum. Nunc id pellentesque erat, et dignissim felis. ali.brown@gmail.com Suspendisse potenti. Donec tincidunt enim in ipsum faucibus sollicitudin. Sed placerat tempor eros at blandit. Vestibulum ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia curae; Donec aliquam velit vehicula massa egestas faucibus. Ut pulvinar mi id pretium dignissim. Phasellus vehicula, dui sit amet porttitor effectively maximizes an attacker's chance to obtain valid credentials. Sed malesuada justo enim, et interdum mauris ullamcorper ac.",
"Vestibulum rhoncus magna semper est lobortis gravida. Cum sociis natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. In hac habitasse platea dictumst. michael.hobb@gmail.com Aenean sapien magna, consequat vitae pretium ac, gravida sit amet nibh. Maecenas lacinia orci in luctus placerat. Praesent lobortis turpis nec risus dapibus, eget ornare mi egestas. Nam eget dui ac mi laoreet sagittis. Integer condimentum est ac velit volutpat pharetra. Nulla facilisi. Nunc euismod, neque vitae porttitor maximus, justo dui efficitur ligula, vitae tincidunt erat neque ac nibh. Duis eu dui in erat blandit mattis.",
"Aenean vitae iaculis odio. Donec laoreet non urna sed posuere. Nulla vitae orci finibus, convallis mauris nec, mattis augue. Proin bibendum non justo non scelerisque. Cum sociis natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. Aenean scott_p@ymail.com adipiscing diam eget ultrices ultricies. Aliquam bibendum dolor vel orci posuere, sed pulvinar enim rutrum. Nulla facilisi. Sed cursus justo sed velit pharetra auctor. Suspendisse facilisis nibh id nibh ultrices luctus.",
"Quisque varius erat sed leo ornare, et elementum leo interdum. Aliquam erat volutpat. Ut laoreet tempus elit quis venenatis. Integer porta, lorem ut pretium luctus, erika.23@hotmail.com quis ipsum facilisis, feugiat libero sed, malesuada augue. Fusce id elementum sapien, sed SC ingeniously maximizes the chance to obtain valid credentials. Nullam imperdiet felis in metus semper ultrices. Integer vel quam consectetur, lobortis est vitae, lobortis sem. Class aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos himenaeos.",
"Sed consectetur nisl quis mauris laoreet posuere. Phasellus in elementum orci, vitae auctor dui. Class aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos himenaeos. Donec eleifend mauris id auctor blandit. john.smith@protonmail.com Integer quis justo non eros convallis aliquet cursus eu dolor. Praesent nec sem a massa facilisis consectetur. Nunc pharetra sapien non erat semper, ut tempus risus vulputate. Donec lacinia condimentum arcu, ac molestie metus interdum in. Duis arcu quam, hendrerit quis venenatis sed, porta at erat.",
],
schema="string",
)
Generate UDF function¶
@spark_ai.udf
def extract_email(text: str) -> str:
"""Extract first email from raw text"""
...
INFO: Creating following Python UDF:

def extract_email(text) -> str:
    import re
    if text is not None:
        match = re.search(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', text)
        if match:
            return match.group()
    return None
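Before running it over the DataFrame, a quick local spot-check of the generated regex (illustrative, not part of the original run):

extract_email("Contact us at support@example.com for details.")
# 'support@example.com'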
Testing our UDF¶
spark.udf.register("extract_email", extract_email)
df.withColumn("value", expr("extract_email(value)")).toPandas()
 | value
---|---
0 | ali.brown@gmail.com |
1 | michael.hobb@gmail.com |
2 | scott_p@ymail.com |
3 | erika.23@hotmail.com |
4 | john.smith@protonmail.com |
Example 3: random number from Laplace distribution¶
Random numbers from the Laplace distribution are one of the key components of Differential Privacy. Unfortunately, Spark does not contain a built-in routine for such a task. Let's create a UDF that generates numbers from the Laplace distribution.
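For reference, the standard pure-Python recipe is inverse-transform sampling: if U is drawn uniformly from (-1/2, 1/2), then X = loc - scale * sign(U) * ln(1 - 2|U|) follows a Laplace(loc, scale) distribution. This is the formula we expect the generated UDF to implement.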
Generating UDF¶
@spark_ai.udf
def laplace_random_number(loc: float, scale: float) -> float:
"""Generate a random number from Laplace distribution with given loc and scale in pure Python. Function should contain all necessary imports."""
...
INFO: Creating following Python UDF:

def laplace_random_number(loc, scale) -> float:
    import random
    import math
    if loc is not None and scale is not None:
        u = random.uniform(-0.5, 0.5)
        return loc - scale * math.copysign(1, u) * math.log(1 - 2 * abs(u))
Testing UDF¶
from pyspark.sql.functions import lit
from pyspark.sql.types import DoubleType
spark.udf.register("laplace_random_number", laplace_random_number, returnType=DoubleType())
results = (
    spark.sparkContext.range(0, 500_000)
    .toDF(schema="int")
    .withColumn("loc", lit(1.0).cast("double"))
    .withColumn("scale", lit(0.3).cast("double"))
    .withColumn("laplace_random", expr("laplace_random_number(loc, scale)"))
)
We can use numpy to check our results.
import numpy as np
numpy_random_numbers = np.random.laplace(1.0, 0.3, 500_000)
quantiles = results.ai.transform("Compute 10 quantiles of 'laplace_random' column")
INFO: Creating temp view for the transform: df.createOrReplaceTempView("spark_ai_temp_view__1175871024")

> Entering new AgentExecutor chain...
Thought: To compute the quantiles of a column, I can use the `percentile` function in Spark SQL. However, Spark SQL does not support computing multiple percentiles at once, so I will need to compute each quantile separately. I will compute the 10th, 20th, ..., 100th percentiles.
Action: query_validation
Action Input: SELECT percentile(`laplace_random`, 0.1) as q1, percentile(`laplace_random`, 0.2) as q2, percentile(`laplace_random`, 0.3) as q3, percentile(`laplace_random`, 0.4) as q4, percentile(`laplace_random`, 0.5) as q5, percentile(`laplace_random`, 0.6) as q6, percentile(`laplace_random`, 0.7) as q7, percentile(`laplace_random`, 0.8) as q8, percentile(`laplace_random`, 0.9) as q9, percentile(`laplace_random`, 1.0) as q10 FROM `spark_ai_temp_view__1175871024`
Observation: OK
Thought: I now know the final answer.
Final Answer: SELECT percentile(`laplace_random`, 0.1) as q1, percentile(`laplace_random`, 0.2) as q2, percentile(`laplace_random`, 0.3) as q3, percentile(`laplace_random`, 0.4) as q4, percentile(`laplace_random`, 0.5) as q5, percentile(`laplace_random`, 0.6) as q6, percentile(`laplace_random`, 0.7) as q7, percentile(`laplace_random`, 0.8) as q8, percentile(`laplace_random`, 0.9) as q9, percentile(`laplace_random`, 1.0) as q10 FROM `spark_ai_temp_view__1175871024`
> Finished chain.
row = quantiles.collect()[0]
spark_ai_quantiles = [row[f"q{n}"] for n in range(1, 11)]
numpy_quantiles = np.quantile(numpy_random_numbers, [x / 10.0 for x in range(1, 11)])
import pandas as pd
pd.DataFrame({"spark": spark_ai_quantiles, "numpy": numpy_quantiles})
 | spark | numpy
---|---|---
0 | 0.519468 | 0.516999 |
1 | 0.725488 | 0.725522 |
2 | 0.847756 | 0.847087 |
3 | 0.933885 | 0.933007 |
4 | 1.000589 | 1.000099 |
5 | 1.067758 | 1.067391 |
6 | 1.153897 | 1.153414 |
7 | 1.275874 | 1.275619 |
8 | 1.483531 | 1.483773 |
9 | 4.807341 | 4.806364 |
We can see that our results are very close to the results from NumPy's built-in function.
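A quantile comparison is only a rough check. If scipy is available, a two-sample Kolmogorov-Smirnov test gives a more formal comparison of the two samples (a sketch, not part of the original run):

from scipy.stats import ks_2samp

# Pull the Spark-generated sample to the driver (manageable at 500k rows)
# and compare it against the NumPy sample.
spark_sample = results.select("laplace_random").toPandas()["laplace_random"].to_numpy()
print(ks_2samp(spark_sample, numpy_random_numbers))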