
Check out my courses on Udemy. Popular courses include building a utility with the Snowpark Python API that ingests CSV and JSON data automatically, without writing any DDL/DML statements, saving roughly 95% of the manual effort.

Snowpark Python Example

Welcome to this comprehensive guide on Snowpark and Python syntax!

Snowpark is a library for querying and processing large-scale data in the Snowflake Cloud Data Warehouse. It offers an easy-to-use API that lets developers build data applications in one of three supported programming languages: Java, Python, and Scala. With Snowpark, you process data directly in Snowflake instead of transferring it to the system where your application code runs, enabling scalable, serverless data processing inside the Snowflake engine. This makes Snowpark a strong choice for developers building data applications at scale without compromising performance or flexibility.
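
Before diving into the individual APIs, here is a minimal sketch of what that pushdown model looks like in code. It assumes an already created session object and a hypothetical EMP table; the DataFrame calls only build SQL, and that SQL runs inside Snowflake when an action such as collect() is invoked.

from snowflake.snowpark.functions import col

# a minimal sketch: assumes `session` already exists and a table named EMP is available
emp_df = session.table("EMP").filter(col("SALARY") > 50000).select("NAME", "SALARY")

# nothing has executed in Snowflake yet; inspect the SQL Snowpark has built so far
print(emp_df.queries["queries"])

# collect() sends the SQL to Snowflake and returns only the result rows to the client
rows = emp_df.collect()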

Throughout this article, we will explore the different APIs, including the Snowpark Session API, DataFrame API, Reader API, Writer API, and File API, using sample Python code.

Whether you are a beginner or an experienced developer, this article and its sample Snowflake Snowpark Python code will give you a solid understanding of the syntax.

Connect to Snowflake Using Snowpark Session API


from snowflake.snowpark import Session

# connection parameter
# just account name and user/pwd
connection_param = {
    "ACCOUNT": "ABC12345",
    "USER": "<my-user-id>",
    "PASSWORD": "<my-password>"
}
# print connection params
print("The Parameter :",connection_param)

# creating a session object
session = Session.builder.configs(connection_param).create()

# print values from session object to test
print("\n\t Current Account Name: ",session.get_current_account())
print("\t Current Database Name: ",session.get_current_database())
print("\t Current Schema Name: ",session.get_current_schema())
print("\t Current Role Name: ",session.get_current_role())
print("\t Current Warehouse Name: ",session.get_current_warehouse())
print("\t Fully Qualified Schema Name: ",session.get_fully_qualified_current_schema(),"\n")

print("Session Object Type:", type(session))
# closing the session
session.close()

If you would like to see the basic authentication in action, watch this complete video
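
Another quick way to confirm the session is alive is a lightweight round trip through session.sql(). This minimal sketch reuses the same connection_param dictionary from the example above.

from snowflake.snowpark import Session

# reuses the connection_param dictionary defined in the example above
session = Session.builder.configs(connection_param).create()

# a lightweight round trip to confirm the connection works
version_row = session.sql("select current_version()").collect()[0]
print("Snowflake version:", version_row[0])

session.close()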

Table As Snowpark Data Frame Using Session API


from snowflake.snowpark import Session

# connection parameter
connection_param = {
    "ACCOUNT": "ABC12345",
    "USER": "<my-user-id>",
    "PASSWORD": "<my-password>"
}

# creating a session object
session = Session.builder.configs(connection_param).create()

# approach-1
# call the table function on the Session API to get a dataframe
# Spark equivalent => spark.table() or spark.read.table()
emp_df = session.table("emp")

# display data
emp_df.show(2)

# approach-2
database = "DEMO"
schema = "SNOWPARK"
emp_df = session.table([database,schema,"emp"])

# display data
emp_df.show(2)

# print data type
print(type(emp_df))

# closing the session
session.close()

If you would like to see the snowflake table to table object in snowpark in action, watch this complete video
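
Once the table is available as a DataFrame, the usual transformations can be chained onto it before any data is pulled back to the client. The sketch below assumes hypothetical EMP_NAME and SALARY columns; adjust the names to match your own table.

from snowflake.snowpark.functions import col

# column names EMP_NAME and SALARY are hypothetical; change them to match your table
high_paid_df = (
    emp_df.filter(col("SALARY") > 100000)
          .select(col("EMP_NAME"), col("SALARY"))
          .sort(col("SALARY").desc())
)
high_paid_df.show(5)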

Reading Parquet File From Snowpark


# import snowflake snowpark library
from snowflake.snowpark import Session
from snowflake.snowpark.types import StructType, StructField, StringType
from snowflake.snowpark.functions import col

print("\n\tSnowpark Program Starting...")
# define connection parameter dictionary with role,database & schema
connection_parameters = {
    "ACCOUNT": "AB12345",
    "USER": "<my-user>",
    "PASSWORD": "<my-password>",
    "ROLE": "<my-role>",
    "DATABASE": "<my-db>",
    "SCHEMA": "<my-schema>"
}

# creating a session object
session = Session.builder.configs(connection_parameters).create()

# verify that the connection went through
print("\n\tFully Qualified Schema: ", session.get_fully_qualified_current_schema())


# case -1 (without header)
df = session.read.parquet("@my_stage/case01/data_0_0_0.snappy.parquet")

# case -2 (with header)
df = session.read.parquet("@my_stage/case02/data_0_0_0.snappy.parquet")

# case -3 (multiple files + with header)
df = session.read.parquet("@my_stage/case03")

# case -4 (same files; saved below as a transient table)
df = session.read.parquet("@my_stage/case03/")

df_schema = df.schema
print("\n\t Schema Object: ",type(df_schema))
print("\n\t Number Of Columns: ",len(df_schema.fields),"\n")

for each_col in df_schema.fields:
    print("\t\t", each_col.name, " => ", each_col.datatype)

print("\n\t Total Row Count In Dataframe: ",df.count())

print("\n\t First 10 records....")
df.show(10)

# Save the data back to a table
df.write.save_as_table("cust_case01")
#df.write.save_as_table("cust_case01", table_type="transient")

#close the session
session.close()

print("\n\tSnowpark Program Ended...")

Get The Most Out Of dataframe.show()

# import snowflake snowpark library 
from snowflake.snowpark import Session
from snowflake.snowpark.types import StructType, StructField, StringType
from snowflake.snowpark.functions import col
from datetime import datetime

print("\n\t Start Time:",datetime.now().strftime("%H:%M:%S.%f"))

# define connection parameter dictionary with role,database & schema
connection_parameters = {
    "ACCOUNT": "AB12345",
    "USER": "<my-user>",
    "PASSWORD": "<my-password>",
    "ROLE": "SYSADMIN",
    "DATABASE": "demo",
    "SCHEMA": "snowpark"
}

# creating snowflake session object
session = Session.builder.configs(connection_parameters).create()


customer_df = session.sql("select * from customer_large")

print("\n\t Dataframe Type:",type(customer_df) )

print("\n\t Before show():",datetime.now().strftime("%H:%M:%S.%f"))

my_params = {'QUERY_TAG':'My-Query-Tag'}

# default behaviour & behaviour with parameter
# and how statement_params input works
customer_df.show(n=10000,max_width=100,statement_params=my_params)

print("\n\t After show():",datetime.now().strftime("%H:%M:%S.%f"))

session.close()

print("\nSnowpark Program Ended..." , datetime.now().strftime("%H:%M:%S.%f"))

Enable Logging In Snowpark

Example Code For Console Logging

# import snowflake-snowpark library 
from snowflake.snowpark import Session
from snowflake.snowpark.types import StructType, StructField, StringType
from snowflake.snowpark.functions import col, year
from datetime import datetime

# add these two imports...
import sys
import logging

logging.basicConfig(stream=sys.stdout, level=logging.INFO)

# define connection parameter dictionary with role,database & schema
connection_parameters = {
    "ACCOUNT": "AB12345",
    "USER": "<my-user>",
    "PASSWORD": "<my-password>",
    "ROLE": "SYSADMIN",
    "DATABASE": "demo",
    "SCHEMA": "snowpark"
}

# creating snowflake session object
session = Session.builder.configs(connection_parameters).create()
customer_df = session.sql("select * from customer")
customer_df = customer_df.filter(col("SALUTATION")=='Dr.').filter(col("BIRTH_COUNTRY")=='FINLAND')
customer_df = customer_df.withColumn("YEAR",year("DATE_OF_BIRTH"))
customer_df = customer_df.group_by(col("YEAR")).count()
customer_df.show(3)
customer_df.explain()
session.close()

Logging Into A File

# add these two imports...
import sys
import logging

logging.basicConfig(filename='./snowpark.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%I:%M:%S')
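
If root-level logging is too noisy, you can also tune individual loggers. The logger names below, snowflake.snowpark and snowflake.connector, are the module-based names the client libraries appear to use; treat them as an assumption and verify against your installed versions.

import logging

# assumption: the Snowpark client and the underlying connector log under their module names
logging.getLogger("snowflake.snowpark").setLevel(logging.DEBUG)
logging.getLogger("snowflake.connector").setLevel(logging.WARNING)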