DataFrame transform example with the Spark SQL ReAct Agent
This example is adapted from the blog post Introducing Window Functions in Spark SQL.
In [1]:
from pyspark_ai import SparkAI
spark_ai = SparkAI(verbose=True)
spark_ai.activate()
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/15 11:28:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
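SparkAI defaults to an OpenAI chat model, but it can also wrap a LangChain-compatible LLM of your choice. A minimal sketch, assuming the llm constructor parameter and an OpenAI API key in your environment (adjust to the pyspark-ai version you run):

from langchain.chat_models import ChatOpenAI
from pyspark_ai import SparkAI

# Hypothetical configuration: pass an explicit LangChain chat model
# instead of relying on the default.
llm = ChatOpenAI(model_name="gpt-4", temperature=0)
spark_ai = SparkAI(llm=llm, verbose=True)
spark_ai.activate()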
In [2]:
# Create a productRevenue DataFrame
df = spark_ai._spark.createDataFrame(
    [
        ("Normal", "Cellphone", 6000),
        ("Normal", "Tablet", 1500),
        ("Mini", "Tablet", 5500),
        ("Mini", "Cellphone", 5000),
        ("Foldable", "Cellphone", 6500),
        ("Foldable", "Tablet", 2500),
        ("Pro", "Cellphone", 3000),
        ("Pro", "Tablet", 4000),
        ("Pro Max", "Cellphone", 4500),
    ],
    ["product", "category", "revenue"],
)
In [3]:
df.ai.transform("What are the best-selling and the second best-selling products in every category?").show()
df.ai.transform("What are the best-selling and the second best-selling products in every category?").show()
INFO: Creating temp view for the transform: df.createOrReplaceTempView("spark_ai_temp_view_d566e4")

> Entering new chain...
Thought: I will use the ROW_NUMBER() function to rank the products in each category by revenue in descending order. Then I will filter the results to only include the products with a rank of 1 or 2.
Action: query_validation
Action Input:
```
WITH ranked_products AS (
  SELECT
    product,
    category,
    revenue,
    ROW_NUMBER() OVER (PARTITION BY category ORDER BY revenue DESC) as rank
  FROM spark_ai_temp_view_d566e4
)
SELECT product, category, revenue
FROM ranked_products
WHERE rank IN (1, 2)
```
Observation: OK
Thought: I now know the final answer.
Final Answer:
```
WITH ranked_products AS (
  SELECT
    product,
    category,
    revenue,
    ROW_NUMBER() OVER (PARTITION BY category ORDER BY revenue DESC) as rank
  FROM spark_ai_temp_view_d566e4
)
SELECT product, category, revenue
FROM ranked_products
WHERE rank IN (1, 2)
```
> Finished chain.
+--------+---------+-------+
| product| category|revenue|
+--------+---------+-------+
|Foldable|Cellphone|   6500|
|  Normal|Cellphone|   6000|
|    Mini|   Tablet|   5500|
|     Pro|   Tablet|   4000|
+--------+---------+-------+
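For comparison, the same top-two-per-category query can be expressed directly with the DataFrame API. This is a minimal sketch using a window function, not the agent's output; it mirrors the SQL generated above:

from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

# Rank products by revenue within each category, then keep the top two.
w = Window.partitionBy("category").orderBy(col("revenue").desc())
df.withColumn("rank", row_number().over(w)) \
  .filter(col("rank") <= 2) \
  .select("product", "category", "revenue") \
  .show()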
In [4]:
df.ai.transform("What is the difference between the revenue of each product and the revenue of the best-selling product in the same category of that product?").show()
df.ai.transform("What is the difference between the revenue of each product and the revenue of the best-selling product in the same category of that product?").show()
INFO: Creating temp view for the transform: df.createOrReplaceTempView("spark_ai_temp_view_8f41a9")

> Entering new chain...
Thought: I need to find the revenue of the best-selling product in each category first. Then, I will join this result with the original table and calculate the difference in revenue.
Action: query_sql_db
Action Input: "SELECT category, MAX(revenue) as max_revenue FROM spark_ai_temp_view_8f41a9 GROUP BY category"
Observation: [('Cellphone', '6500'), ('Tablet', '5500')]
Thought: I can join this result with the original table and calculate the difference in revenue.
Action: query_validation
Action Input:
```
SELECT
  a.product,
  a.category,
  a.revenue,
  b.max_revenue,
  (b.max_revenue - a.revenue) as revenue_difference
FROM spark_ai_temp_view_8f41a9 a
JOIN (
  SELECT category, MAX(revenue) as max_revenue
  FROM spark_ai_temp_view_8f41a9
  GROUP BY category
) b
ON a.category = b.category
```
Observation: OK
Thought: I now know the final answer.
Final Answer:
```
SELECT
  a.product,
  a.category,
  a.revenue,
  b.max_revenue,
  (b.max_revenue - a.revenue) as revenue_difference
FROM spark_ai_temp_view_8f41a9 a
JOIN (
  SELECT category, MAX(revenue) as max_revenue
  FROM spark_ai_temp_view_8f41a9
  GROUP BY category
) b
ON a.category = b.category
```
> Finished chain.

+--------+---------+-------+-----------+------------------+
| product| category|revenue|max_revenue|revenue_difference|
+--------+---------+-------+-----------+------------------+
|  Normal|Cellphone|   6000|       6500|               500|
|  Normal|   Tablet|   1500|       5500|              4000|
|    Mini|   Tablet|   5500|       5500|                 0|
|    Mini|Cellphone|   5000|       6500|              1500|
|Foldable|Cellphone|   6500|       6500|                 0|
|Foldable|   Tablet|   2500|       5500|              3000|
|     Pro|Cellphone|   3000|       6500|              3500|
|     Pro|   Tablet|   4000|       5500|              1500|
| Pro Max|Cellphone|   4500|       6500|              2000|
+--------+---------+-------+-----------+------------------+
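The agent solved this with a self-join; a per-category window aggregate is a more direct DataFrame-API formulation. A minimal sketch (not the agent's output) that yields the same result:

from pyspark.sql import Window
from pyspark.sql.functions import col, max as max_

# Compute each category's best revenue with an unbounded window, then
# subtract to get the gap between each product and the category leader.
w = Window.partitionBy("category")
df.withColumn("max_revenue", max_("revenue").over(w)) \
  .withColumn("revenue_difference", col("max_revenue") - col("revenue")) \
  .show()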
In [5]:
df.ai.transform("Pivot the data by product and the revenue for each product").show()
df.ai.transform("Pivot the data by product and the revenue for each product").show()
INFO: Creating temp view for the transform: df.createOrReplaceTempView("spark_ai_temp_view_096b0f")

> Entering new chain...
Thought: Spark SQL does not support dynamic pivot operations, which are required to transpose the table as requested. I should get all the distinct values of column product.
Action: query_sql_db
Action Input: "SELECT DISTINCT product FROM spark_ai_temp_view_096b0f"
Observation: [('Normal',), ('Mini',), ('Foldable',), ('Pro',), ('Pro Max',)]
Thought: I can write a query to pivot the table by product and sum the revenue for each product.
Action: query_validation
Action Input: SELECT * FROM spark_ai_temp_view_096b0f PIVOT (SUM(revenue) FOR product IN ('Normal', 'Mini', 'Foldable', 'Pro', 'Pro Max'))
Observation: OK
Thought: I now know the final answer.
Final Answer: SELECT * FROM spark_ai_temp_view_096b0f PIVOT (SUM(revenue) FOR product IN ('Normal', 'Mini', 'Foldable', 'Pro', 'Pro Max'))
> Finished chain.

+---------+------+----+--------+----+-------+
| category|Normal|Mini|Foldable| Pro|Pro Max|
+---------+------+----+--------+----+-------+
|Cellphone|  6000|5000|    6500|3000|   4500|
|   Tablet|  1500|5500|    2500|4000|   null|
+---------+------+----+--------+----+-------+
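The DataFrame API exposes the same operation through groupBy().pivot(), which discovers the distinct product values for you; a one-line sketch equivalent to the generated SQL:

# One row per category, one revenue column per product.
df.groupBy("category").pivot("product").sum("revenue").show()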
In [6]:
df.ai.transform("Pivot the data by catagory and the revenue for each product").show()
df.ai.transform("Pivot the data by catagory and the revenue for each product").show()
INFO: Creating temp view for the transform: df.createOrReplaceTempView("spark_ai_temp_view_48845a")

> Entering new chain...
Thought: Spark SQL does not support dynamic pivot operations, which are required to transpose the table as requested. I should get all the distinct values of column category.
Action: query_sql_db
Action Input: "SELECT DISTINCT category FROM spark_ai_temp_view_48845a"
Observation: [('Cellphone',), ('Tablet',)]
Thought: I can write a query to pivot the table by category and sum the revenue for each product and category combination.
Action: query_validation
Action Input: SELECT * FROM spark_ai_temp_view_48845a PIVOT (SUM(revenue) FOR category IN ('Cellphone', 'Tablet'))
Observation: OK
Thought: I now know the final answer.
Final Answer: SELECT * FROM spark_ai_temp_view_48845a PIVOT (SUM(revenue) FOR category IN ('Cellphone', 'Tablet'))
> Finished chain.

+--------+---------+------+
| product|Cellphone|Tablet|
+--------+---------+------+
|    Mini|     5000|  5500|
|     Pro|     3000|  4000|
| Pro Max|     4500|  null|
|Foldable|     6500|  2500|
|  Normal|     6000|  1500|
+--------+---------+------+
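Note that the agent first queried the distinct category values because SQL PIVOT requires an explicit value list. The DataFrame API accepts the same hint: passing the values to pivot() skips the extra job Spark would otherwise run to discover them. A minimal sketch:

# Supplying the pivot values up front avoids a separate distinct-values scan.
df.groupBy("product").pivot("category", ["Cellphone", "Tablet"]).sum("revenue").show()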