English SDK for Apache Spark¶
Initialization¶
In [1]:
from pyspark_ai import SparkAI

spark_ai = SparkAI(verbose=True)
spark_ai.activate()  # activate partial functions for Spark DataFrame
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/21 13:49:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
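Note: SparkAI uses an OpenAI-backed LLM by default, so an API key must be available before initialization. A minimal sketch, assuming the key is supplied through the standard OPENAI_API_KEY environment variable (the value shown is a placeholder):

import os

# Assumed setup: the default LLM backend reads the API key from the environment.
os.environ["OPENAI_API_KEY"] = "sk-..."  # placeholder; set your real key here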
Example 1: Auto sales by brand in the US, 2022¶
In [2]:
# Search and ingest web content into a DataFrame
# If you have set up google-api-python-client, you can just run
# auto_df = spark_ai.create_df("2022 USA national auto sales by brand")
auto_df = spark_ai.create_df("https://www.carpro.com/blog/full-year-2022-national-auto-sales-by-brand")
auto_df.show()
INFO: Parsing URL: https://www.carpro.com/blog/full-year-2022-national-auto-sales-by-brand
INFO: SQL query for the ingestion: CREATE OR REPLACE TEMP VIEW spark_ai_temp_view_50997f AS SELECT * FROM VALUES ('Toyota', 1849751, -9), ('Ford', 1767439, -2), ('Chevrolet', 1502389, 6), ('Honda', 881201, -33), ('Hyundai', 724265, -2), ('Kia', 693549, -1), ('Jeep', 684612, -12), ('Nissan', 682731, -25), ('Subaru', 556581, -5), ('Ram Trucks', 545194, -16), ('GMC', 517649, 7), ('Mercedes-Benz', 350949, 7), ('BMW', 332388, -1), ('Volkswagen', 301069, -20), ('Mazda', 294908, -11), ('Lexus', 258704, -15), ('Dodge', 190793, -12), ('Audi', 186875, -5), ('Cadillac', 134726, 14), ('Chrysler', 112713, -2), ('Buick', 103519, -42), ('Acura', 102306, -35), ('Volvo', 102038, -16), ('Mitsubishi', 102037, -16), ('Lincoln', 83486, -4), ('Porsche', 70065, 0), ('Genesis', 56410, 14), ('INFINITI', 46619, -20), ('MINI', 29504, -1), ('Alfa Romeo', 12845, -30), ('Maserati', 6413, -10), ('Bentley', 3975, 0), ('Lamborghini', 3134, 3), ('Fiat', 915, -61), ('McLaren', 840, -35), ('Rolls-Royce', 460, 7) AS v1(Brand, US_Sales_2022, Sales_Change_Percentage)
INFO: Storing data into temp view: spark_ai_temp_view_50997f
+-------------+-------------+-----------------------+
|        Brand|US_Sales_2022|Sales_Change_Percentage|
+-------------+-------------+-----------------------+
|       Toyota|      1849751|                     -9|
|         Ford|      1767439|                     -2|
|    Chevrolet|      1502389|                      6|
|        Honda|       881201|                    -33|
|      Hyundai|       724265|                     -2|
|          Kia|       693549|                     -1|
|         Jeep|       684612|                    -12|
|       Nissan|       682731|                    -25|
|       Subaru|       556581|                     -5|
|   Ram Trucks|       545194|                    -16|
|          GMC|       517649|                      7|
|Mercedes-Benz|       350949|                      7|
|          BMW|       332388|                     -1|
|   Volkswagen|       301069|                    -20|
|        Mazda|       294908|                    -11|
|        Lexus|       258704|                    -15|
|        Dodge|       190793|                    -12|
|         Audi|       186875|                     -5|
|     Cadillac|       134726|                     14|
|     Chrysler|       112713|                     -2|
+-------------+-------------+-----------------------+
only showing top 20 rows
In [3]:
auto_df.ai.verify("expect sales change percentage to be between -100 to 100")
INFO: LLM Output:
def check_sales_change_percentage(df) -> bool:
    from pyspark.sql.functions import col

    # Check if all values in the 'Sales_Change_Percentage' column are between -100 and 100
    if df.filter((col('Sales_Change_Percentage') >= -100) & (col('Sales_Change_Percentage') <= 100)).count() == df.count():
        return True
    else:
        return False

result = check_sales_change_percentage(df)

INFO: Generated code:
def check_sales_change_percentage(df) -> bool:
    from pyspark.sql.functions import col

    # Check if all values in the 'Sales_Change_Percentage' column are between -100 and 100
    if df.filter((col('Sales_Change_Percentage') >= -100) & (col('Sales_Change_Percentage') <= 100)).count() == df.count():
        return True
    else:
        return False

result = check_sales_change_percentage(df)

INFO: Result: True
In [4]:
auto_df.ai.plot()
INFO: Here is the Python code to visualize the result of `df` using plotly:
```
import plotly.graph_objects as go
from pyspark.sql import SparkSession
import pandas as pd

# Start Spark session
spark = SparkSession.builder.appName('example').getOrCreate()

# Assuming df is a Spark DataFrame
# Convert Spark DataFrame to Pandas DataFrame
pandas_df = df.toPandas()

# Create a bar chart for US_Sales_2022
trace1 = go.Bar(
    x=pandas_df['Brand'],
    y=pandas_df['US_Sales_2022'],
    name='US Sales 2022'
)

# Create a line chart for Sales_Change_Percentage
trace2 = go.Scatter(
    x=pandas_df['Brand'],
    y=pandas_df['Sales_Change_Percentage'],
    name='Sales Change Percentage',
    yaxis='y2'
)

data = [trace1, trace2]

layout = go.Layout(
    title='Brand Sales Analysis',
    yaxis=dict(
        title='US Sales 2022'
    ),
    yaxis2=dict(
        title='Sales Change Percentage',
        titlefont=dict(
            color='rgb(148, 103, 189)'
        ),
        tickfont=dict(
            color='rgb(148, 103, 189)'
        ),
        overlaying='y',
        side='right'
    )
)

fig = go.Figure(data=data, layout=layout)
fig.show()
```
This code first converts the Spark DataFrame to a Pandas DataFrame. Then it creates a bar chart for 'US_Sales_2022' and a line chart for 'Sales_Change_Percentage'. Both charts share the same x-axis ('Brand'). The 'Sales_Change_Percentage' chart is displayed on a secondary y-axis on the right side of the plot. Finally, it displays the plot.
23/07/26 18:18:53 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
In [5]:
auto_df.ai.plot("pie chart for US sales market shares, show the top 5 brands and the sum of others")
auto_df.ai.plot("pie chart for US sales market shares, show the top 5 brands and the sum of others")
INFO: Here is the Python code to visualize the result of `df` using plotly:
```
import plotly.graph_objects as go
from pyspark.sql import SparkSession
import pandas as pd

# Start Spark session
spark = SparkSession.builder.getOrCreate()

# Assuming df is a Spark DataFrame
df_pd = df.toPandas()

# Calculate the total sales
total_sales = df_pd['US_Sales_2022'].sum()

# Calculate the sales percentage for each brand
df_pd['Sales_Percentage'] = df_pd['US_Sales_2022'] / total_sales * 100

# Sort the dataframe by sales percentage in descending order
df_pd = df_pd.sort_values('Sales_Percentage', ascending=False)

# Get the top 5 brands
top_5_brands = df_pd.head(5)

# Calculate the sum of sales percentage for other brands
other_brands = pd.DataFrame([['Others', df_pd[5:]['Sales_Percentage'].sum()]], columns=['Brand', 'Sales_Percentage'])

# Concatenate the top 5 brands and other brands
final_df = pd.concat([top_5_brands, other_brands], ignore_index=True)

# Create a pie chart
fig = go.Figure(data=[go.Pie(labels=final_df['Brand'], values=final_df['Sales_Percentage'], hole=.3)])

# Display the plot
fig.show()
```
This code first converts the Spark DataFrame to a pandas DataFrame. Then, it calculates the sales percentage for each brand and sorts the DataFrame by sales percentage in descending order. It gets the top 5 brands and calculates the sum of sales percentage for other brands. It concatenates the top 5 brands and other brands into a final DataFrame. Finally, it creates a pie chart using plotly and displays the plot.
In [3]:
# Apply a transform to a DataFrame
auto_top_growth_df = auto_df.ai.transform("brand with the highest growth")
auto_top_growth_df.show()
INFO: Creating temp view for the transform: df.createOrReplaceTempView("spark_ai_temp_view_2912f1")

> Entering new AgentExecutor chain...
Thought: I will query the Brand and Sales_Change_Percentage columns, ordering by Sales_Change_Percentage in descending order.
Action: query_validation
Action Input: SELECT Brand, Sales_Change_Percentage FROM spark_ai_temp_view_2912f1 ORDER BY Sales_Change_Percentage DESC LIMIT 1
Observation: OK
Thought: I now know the final answer.
Final Answer: SELECT Brand, Sales_Change_Percentage FROM spark_ai_temp_view_2912f1 ORDER BY Sales_Change_Percentage DESC LIMIT 1
> Finished chain.

INFO: SQL query for the transform: SELECT Brand, Sales_Change_Percentage FROM spark_ai_temp_view_2912f1 ORDER BY Sales_Change_Percentage DESC LIMIT 1
+-------+-----------------------+
|  Brand|Sales_Change_Percentage|
+-------+-----------------------+
|Genesis|                     14|
+-------+-----------------------+
In [7]:
# Explain what a DataFrame is retrieving.
auto_top_growth_df.ai.explain()
Out[7]:
'In summary, this dataframe is retrieving the brand with the highest sales change percentage. It presents the results sorted by sales change percentage in descending order and limits the result to the top brand.'
Example 2: USA Presidents¶
In [2]:
# You can also specify the expected columns for the ingestion.
df = spark_ai.create_df("https://en.wikipedia.org/wiki/List_of_presidents_of_the_United_States", ["president", "vice_president"])
df.show()
INFO: Parsing URL: https://en.wikipedia.org/wiki/List_of_presidents_of_the_United_States
INFO: SQL query for the ingestion: CREATE OR REPLACE TEMP VIEW spark_ai_temp_view_14947 AS SELECT * FROM VALUES ('George Washington', 'John Adams'), ('John Adams', 'Thomas Jefferson'), ('Thomas Jefferson', 'Aaron Burr'), ('James Madison', 'George Clinton'), ('James Monroe', 'Daniel D. Tompkins'), ('John Quincy Adams', 'John C. Calhoun'), ('Andrew Jackson', 'John C. Calhoun'), ('Martin Van Buren', 'Richard Mentor Johnson'), ('William Henry Harrison', 'John Tyler'), ('John Tyler', NULL), ('James K. Polk', 'George M. Dallas'), ('Zachary Taylor', 'Millard Fillmore'), ('Millard Fillmore', NULL), ('Franklin Pierce', 'William R. King'), ('James Buchanan', 'John C. Breckinridge'), ('Abraham Lincoln', 'Hannibal Hamlin'), ('Andrew Johnson', NULL), ('Ulysses S. Grant', 'Schuyler Colfax'), ('Rutherford B. Hayes', 'William A. Wheeler'), ('James A. Garfield', 'Chester A. Arthur'), ('Chester A. Arthur', NULL), ('Grover Cleveland', 'Thomas A. Hendricks'), ('Benjamin Harrison', 'Levi P. Morton'), ('Grover Cleveland', 'Adlai Stevenson I'), ('William McKinley', 'Garret Hobart'), ('Theodore Roosevelt', 'Charles W. Fairbanks'), ('William Howard Taft', 'James S. Sherman'), ('Woodrow Wilson', 'Thomas R. Marshall'), ('Warren G. Harding', 'Calvin Coolidge'), ('Calvin Coolidge', 'Charles G. Dawes'), ('Herbert Hoover', 'Charles Curtis'), ('Franklin D. Roosevelt', 'John Nance Garner'), ('Harry S. Truman', 'Alben W. Barkley'), ('Dwight D. Eisenhower', 'Richard Nixon'), ('John F. Kennedy', 'Lyndon B. Johnson') AS v1(president, vice_president)
INFO: Storing data into temp view: spark_ai_temp_view_14947
+--------------------+--------------------+
|           president|      vice_president|
+--------------------+--------------------+
|   George Washington|          John Adams|
|          John Adams|    Thomas Jefferson|
|    Thomas Jefferson|          Aaron Burr|
|       James Madison|      George Clinton|
|        James Monroe|  Daniel D. Tompkins|
|   John Quincy Adams|     John C. Calhoun|
|      Andrew Jackson|     John C. Calhoun|
|    Martin Van Buren|Richard Mentor Jo...|
|William Henry Har...|          John Tyler|
|          John Tyler|                null|
|       James K. Polk|    George M. Dallas|
|      Zachary Taylor|    Millard Fillmore|
|    Millard Fillmore|                null|
|     Franklin Pierce|     William R. King|
|      James Buchanan|John C. Breckinridge|
|     Abraham Lincoln|     Hannibal Hamlin|
|      Andrew Johnson|                null|
|    Ulysses S. Grant|     Schuyler Colfax|
| Rutherford B. Hayes|  William A. Wheeler|
|   James A. Garfield|   Chester A. Arthur|
+--------------------+--------------------+
only showing top 20 rows
In [3]:
presidents_who_were_vp = df.ai.transform("presidents who were also vice presidents")
presidents_who_were_vp.show()
INFO: Creating temp view for the transform: df.createOrReplaceTempView("spark_ai_temp_view_1549440914")

> Entering new chain...
Thought: I will use a self join on the table to find presidents who were also vice presidents.
Action: query_validation
Action Input: SELECT DISTINCT a.president FROM spark_ai_temp_view_1549440914 a JOIN spark_ai_temp_view_1549440914 b ON a.president = b.vice_president
Observation: OK
Thought: I now know the final answer.
Final Answer: SELECT DISTINCT a.president FROM spark_ai_temp_view_1549440914 a JOIN spark_ai_temp_view_1549440914 b ON a.president = b.vice_president
> Finished chain.

+-----------------+
|        president|
+-----------------+
|       John Adams|
| Thomas Jefferson|
|       John Tyler|
| Millard Fillmore|
|Chester A. Arthur|
|  Calvin Coolidge|
+-----------------+
In [4]:
presidents_who_were_vp.ai.explain()
Out[4]:
'In summary, this dataframe is retrieving the distinct names of presidents who have also served as vice presidents. The data is sourced from a temporary view named `spark_ai_temp_view_1549440914`, which contains columns for president and vice president.'
In [5]:
presidents_who_were_vp.ai.verify("expect no NULL values")
INFO: LLM Output:
def has_no_null_values(df) -> bool:
    from pyspark.sql.functions import col, sum as _sum

    # Check if there are any null values in the DataFrame
    null_counts = df.select([_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]).collect()[0].asDict()

    # If any column has null values, return False
    if any(value > 0 for value in null_counts.values()):
        return False
    else:
        return True

result = has_no_null_values(df)

INFO: Generated code:
def has_no_null_values(df) -> bool:
    from pyspark.sql.functions import col, sum as _sum

    # Check if there are any null values in the DataFrame
    null_counts = df.select([_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]).collect()[0].asDict()

    # If any column has null values, return False
    if any(value > 0 for value in null_counts.values()):
        return False
    else:
        return True

result = has_no_null_values(df)

INFO: Result: True
Example 3: Top 10 tech companies¶
In [12]:
# Search and ingest web content into a DataFrame
company_df = spark_ai.create_df("Top 10 tech companies by market cap", ['company', 'cap', 'country'])
company_df.show()
INFO: Parsing URL: https://www.statista.com/statistics/1350976/leading-tech-companies-worldwide-by-market-cap/
INFO: SQL query for the ingestion: CREATE OR REPLACE TEMP VIEW spark_ai_temp_view_17b9f3 AS SELECT * FROM VALUES ('Apple', 2242, 'USA'), ('Microsoft', 1821, 'USA'), ('Alphabet (Google)', 1229, 'USA'), ('Amazon', 902.4, 'USA'), ('Tesla', 541.4, 'USA'), ('TSMC', 410.9, 'Taiwan'), ('NVIDIA', 401.7, 'USA'), ('Tencent', 377.8, 'China'), ('Meta Platforms (Facebook)', 302.1, 'USA'), ('Samsung', 301.7, 'South Korea') AS v1(company, cap, country)
INFO: Storing data into temp view: spark_ai_temp_view_17b9f3
+--------------------+------+-----------+
|             company|   cap|    country|
+--------------------+------+-----------+
|               Apple|2242.0|        USA|
|           Microsoft|1821.0|        USA|
|   Alphabet (Google)|1229.0|        USA|
|              Amazon| 902.4|        USA|
|               Tesla| 541.4|        USA|
|                TSMC| 410.9|     Taiwan|
|              NVIDIA| 401.7|        USA|
|             Tencent| 377.8|      China|
|Meta Platforms (F...| 302.1|        USA|
|             Samsung| 301.7|South Korea|
+--------------------+------+-----------+
In [13]:
us_company_df = company_df.ai.transform("companies in USA")
us_company_df.show()
INFO: Creating temp view for the transform: df.createOrReplaceTempView("spark_ai_temp_view_dbc657")
INFO: SQL query for the transform: SELECT * FROM spark_ai_temp_view_dbc657 WHERE country = 'USA'
+--------------------+------+-------+
|             company|   cap|country|
+--------------------+------+-------+
|               Apple|2242.0|    USA|
|           Microsoft|1821.0|    USA|
|   Alphabet (Google)|1229.0|    USA|
|              Amazon| 902.4|    USA|
|               Tesla| 541.4|    USA|
|              NVIDIA| 401.7|    USA|
|Meta Platforms (F...| 302.1|    USA|
+--------------------+------+-------+
In [14]:
us_company_df.ai.explain()
Out[14]:
"In summary, this dataframe is retrieving the company, cap, and country information for all companies located in the USA from the local relation 'v1'."
In [15]:
us_company_df.ai.verify("expect all company names to be unique")
INFO: LLM Output:
def check_unique_companies(df) -> bool:
    from pyspark.sql import functions as F

    # Count the number of unique companies
    unique_companies = df.select(F.countDistinct("company")).collect()[0][0]

    # Check if the number of unique companies is equal to the total number of companies
    if unique_companies == df.count():
        return True
    else:
        return False

result = check_unique_companies(df)

INFO: Generated code:
def check_unique_companies(df) -> bool:
    from pyspark.sql import functions as F

    # Count the number of unique companies
    unique_companies = df.select(F.countDistinct("company")).collect()[0][0]

    # Check if the number of unique companies is equal to the total number of companies
    if unique_companies == df.count():
        return True
    else:
        return False

result = check_unique_companies(df)

INFO: Result: True
Example 4: Ingestion from a URL¶
Instead of searching for the web page, you can also ask SparkAI to ingest content directly from a URL.
In [16]:
best_albums_df = spark_ai.create_df('https://time.com/6235186/best-albums-2022/', ["album", "artist", "year"])
best_albums_df.show()
INFO: Parsing URL: https://time.com/6235186/best-albums-2022/
INFO: SQL query for the ingestion: CREATE OR REPLACE TEMP VIEW spark_ai_temp_view_2adc76 AS SELECT * FROM VALUES ('Motomami', 'Rosalía', 2022), ('You Can’t Kill Me', '070 Shake', 2022), ('Mr. Morale & The Big Steppers', 'Kendrick Lamar', 2022), ('Big Time', 'Angel Olsen', 2022), ('Electricity', 'Ibibio Sound Machine', 2022), ('It’s Almost Dry', 'Pusha T', 2022), ('Chloe and the Next 20th Century', 'Father John Misty', 2022), ('Renaissance', 'Beyoncé', 2022), ('19 Masters', 'Saya Gray', 2022), ('Un Verano Sin Ti', 'Bad Bunny', 2022) AS v1(album, artist, year)
INFO: Storing data into temp view: spark_ai_temp_view_2adc76
+--------------------+--------------------+----+
|               album|              artist|year|
+--------------------+--------------------+----+
|            Motomami|             Rosalía|2022|
|   You Can’t Kill Me|           070 Shake|2022|
|Mr. Morale & The ...|      Kendrick Lamar|2022|
|            Big Time|         Angel Olsen|2022|
|         Electricity|Ibibio Sound Machine|2022|
|     It’s Almost Dry|             Pusha T|2022|
|Chloe and the Nex...|   Father John Misty|2022|
|         Renaissance|             Beyoncé|2022|
|          19 Masters|           Saya Gray|2022|
|    Un Verano Sin Ti|           Bad Bunny|2022|
+--------------------+--------------------+----+
In [17]:
best_albums_df.ai.verify("expect each year to be 2022")
INFO: LLM Output:
def check_year(df) -> bool:
    from pyspark.sql.functions import col

    # Check if all years in the DataFrame are 2022
    if df.filter(col('year') != 2022).count() == 0:
        return True
    else:
        return False

result = check_year(df)

INFO: Generated code:
def check_year(df) -> bool:
    from pyspark.sql.functions import col

    # Check if all years in the DataFrame are 2022
    if df.filter(col('year') != 2022).count() == 0:
        return True
    else:
        return False

result = check_year(df)

INFO: Result: True
Example 5: UDF Generation¶
You can also ask SparkAI to generate the code for a Spark UDF by providing a function signature and a docstring that describes the desired behavior.
In [18]:
@spark_ai.udf
def convert_grades(grade_percent: float) -> str:
    """Convert the grade percent to a letter grade using standard cutoffs"""
    ...
INFO: Creating following Python UDF:
def convert_grades(grade_percent) -> str:
    if grade_percent is not None:
        if grade_percent >= 90.0:
            return 'A'
        elif grade_percent >= 80.0:
            return 'B'
        elif grade_percent >= 70.0:
            return 'C'
        elif grade_percent >= 60.0:
            return 'D'
        else:
            return 'F'
In [19]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.udf.register("convert_grades", convert_grades)
percentGrades = [(1, 97.8), (2, 72.3), (3, 81.2)]
df = spark.createDataFrame(percentGrades, ["student_id", "grade_percent"])
df.selectExpr("student_id", "convert_grades(grade_percent)").show()
+----------+-----------------------------+
|student_id|convert_grades(grade_percent)|
+----------+-----------------------------+
|         1|                            A|
|         2|                            C|
|         3|                            B|
+----------+-----------------------------+
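If you prefer the DataFrame API over SQL registration, the generated function can also be wrapped with pyspark.sql.functions.udf. A minimal sketch, assuming convert_grades behaves as a plain Python callable (as the spark.udf.register call above suggests):

from pyspark.sql.functions import udf

# Wrap the generated function as a DataFrame-API UDF (string return type by default).
convert_grades_udf = udf(convert_grades)
df.select("student_id", convert_grades_udf("grade_percent").alias("letter_grade")).show()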
Cache¶
SparkAI supports a simple in-memory and persistent cache system. It keeps an in-memory staging cache, which is updated with LLM and web-search results. The staging cache can be persisted through the commit() method, and cache lookups always check both the in-memory staging cache and the persistent cache.
In [6]:
spark_ai.commit()
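For example, a run that ingests a page stages the LLM and web-search results in memory, and commit() then persists them so the same request later can be served from the cache. A minimal sketch, assuming the spark_ai instance from the Initialization section:

# First call goes to the LLM / web search and stages the result in the in-memory cache.
cached_df = spark_ai.create_df("https://www.carpro.com/blog/full-year-2022-national-auto-sales-by-brand")

# Persist the staged entries so a later lookup with the same inputs avoids another LLM call.
spark_ai.commit()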