Continuous Data Loading In Snowflake

Data Engineering Simplified
3 min read · Aug 6, 2023


Check out my courses on Udemy. Popular courses include building a utility with the Snowpark Python API that ingests CSV and JSON data automatically, without writing any DDL/DML statements, saving roughly 95% of the manual effort.

Loading data continuously in Snowflake can be a challenging task, especially without the use of an external stage. This article will demonstrate a simple approach to loading data from your local machine using a Python program.

https://youtu.be/PNK49SJvXjE

Check out my free and paid courses on Udemy

This hands-on visual guide covers the following key features, with live examples:

  1. SnowPipe Architecture
  2. SnowPipe components
  3. Internal Stages & Python program

This hands-on tutorial is a helpful guide for data developers looking to ingest streaming and micro-batch data into Snowflake. It covers how to use SnowPipe, a first-class SQL object, to support streaming data, delta data, and CDC (Change Data Capture) data. SnowPipe enables easy and efficient data loading from internal or external stages directly to the target table in Snowflake.
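Under the hood, a pipe is simply a named, first-class wrapper around a COPY INTO statement. A minimal sketch of its shape (the object names here are illustrative):

create or replace pipe demo_pipe
  auto_ingest = false  -- internal stages (used in this article) are triggered via the REST API, not cloud notifications
as
copy into target_table from @my_stage/path;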

SQL Scripts

use role sysadmin;
use database tipsdb;
create schema ch10;

CREATE WAREHOUSE ch10_demo_wh WITH
  WAREHOUSE_SIZE = 'MEDIUM'
  WAREHOUSE_TYPE = 'STANDARD'
  AUTO_SUSPEND = 300
  AUTO_RESUME = TRUE
  MIN_CLUSTER_COUNT = 1
  MAX_CLUSTER_COUNT = 1
  SCALING_POLICY = 'STANDARD'
  COMMENT = 'this is a demo warehouse';
use warehouse ch10_demo_wh;

alter session set query_tag ='chapter-10';

-- you can create named stages and then list their contents using the LIST command
show stages;
list @STG01;

-- a very simple construct for an internal stage
CREATE STAGE "TIPSDB"."CH10".stg03 COMMENT = 'This is my demo internal stage';

-- if you have a lot of stages, you can filter them using LIKE
show stages like '%03%';

show stages like '%s3%';
-- if a stage has credentials, they will be shown in the output

list @~ pattern='.*test.*';
list @~ pattern='.*.gz';
list @~ pattern='.*.html';
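As a quick reference, the three @ prefixes seen in this script refer to three kinds of stages (my_customer and stg_010 are created later in the script):

list @~;             -- user stage: one per user, no setup required
list @%my_customer;  -- table stage: created implicitly with the table
list @stg_010;       -- named stage: created explicitly with CREATE STAGE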

show stages like 'TIPS_S3_EXTERNAL_STAGE';
list @TIPS_S3_EXTERNAL_STAGE;

drop table if exists customer_parquet_ff;
create or replace table customer_parquet_ff (
    my_data variant
)
STAGE_FILE_FORMAT = (TYPE = PARQUET);

list @%customer_parquet_ff/;
-- now let's query the data using the $ notation
select
    metadata$filename,
    metadata$file_row_number,
    $1:CUSTOMER_KEY::varchar,
    $1:NAME::varchar,
    $1:ADDRESS::varchar,
    $1:COUNTRY_KEY::varchar,
    $1:PHONE::varchar,
    $1:ACCT_BAL::decimal(10,2),
    $1:MKT_SEGMENT::varchar,
    $1:COMMENT::varchar
from @%customer_parquet_ff;

copy into customer_parquet_ff from @%customer_parquet_ff/customer.snappy.parquet;

select * from customer_parquet_ff;

copy into customer_parquet_ff
from @%customer_parquet_ff/customer.snappy.parquet
force=true; -- force=true reloads the file even though it was loaded already
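To verify what each COPY actually loaded (and to see the effect of FORCE=TRUE), you can inspect the load history; a sketch using the COPY_HISTORY table function:

select file_name, status, row_count, last_load_time
from table(information_schema.copy_history(
    table_name => 'CUSTOMER_PARQUET_FF',
    start_time => dateadd(hour, -1, current_timestamp())));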

drop table if exists my_customer;
create or replace table my_customer (
    CUST_KEY NUMBER(38,0),
    NAME VARCHAR(25),
    ADDRESS VARCHAR(40),
    NATION_KEY NUMBER(38,0),
    PHONE VARCHAR(15),
    ACCOUNT_BALANCE NUMBER(12,2),
    MARKET_SEGMENT VARCHAR(10),
    COMMENT VARCHAR(117)
);

-- let's see if it has any records
select * from my_customer;

create or replace file format csv_ff type = 'csv'
field_optionally_enclosed_by = '\042'; -- \042 is the octal escape for the double-quote character

-- first, check whether the stage exists and has data
show stages like '%_010%';

-- let's create it.
create or replace stage stg_010
file_format = csv_ff
comment = 'This is my internal stage for ch-10';

-- let's describe the stage using desc sql command
desc stage stg_010;

list @stg_010/ pattern='.*.csv';
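The listing above assumes files are already in the stage. For an internal stage, files are typically uploaded with a PUT command from a client such as SnowSQL; a sketch with an illustrative local path:

put file:///tmp/history/customer_hist.csv @stg_010/history;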

-- ***********************************
-- Step-05
-- load into table
copy into my_customer from @stg_010/history;

-- create a pipe object & understand its construct.
drop pipe if exists my_pipe_10;
create or replace pipe my_pipe_10
as
copy into my_customer from @stg_010/delta;

-- describe the pipe object
desc pipe my_pipe_10;
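Besides DESC, the pipe's runtime state can be checked with the SYSTEM$PIPE_STATUS function, which returns a JSON document including executionState and pendingFileCount:

select system$pipe_status('my_pipe_10');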

select * from table(validate_pipe_load(
    pipe_name => 'my_pipe_10',
    start_time => dateadd(hour, -1, current_timestamp())));

-- resume the pipe in case it is paused
alter pipe my_pipe_10 set pipe_execution_paused = false;

-- queue files already sitting in the stage location for loading
alter pipe my_pipe_10 refresh prefix='/customer_10*' modified_after='2021-11-01T13:56:46-07:00';
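Note that a pipe only loads files that already exist in its stage location; the Snowpipe REST API used by the Python program below merely tells the pipe which staged files to load. So the delta file must be uploaded first, for example from SnowSQL (the local path is illustrative):

-- auto_compress=false keeps the staged file name as customer_101.csv,
-- matching the file list used in the Python program
put file:///tmp/customer_101.csv @stg_010/delta auto_compress=false;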

Python Script

from logging import getLogger
from snowflake.ingest import SimpleIngestManager
from snowflake.ingest import StagedFile
from snowflake.ingest.utils.uris import DEFAULT_SCHEME
from datetime import timedelta
from requests import HTTPError
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import PrivateFormat
from cryptography.hazmat.primitives.serialization import NoEncryption
import time
import datetime
import os
import logging


# write debug logs to /tmp/ingest.log
logging.basicConfig(
    filename='/tmp/ingest.log',
    level=logging.DEBUG)
logger = getLogger(__name__)

# load the encrypted private key; the passphrase is read from an environment variable
with open("/tmp/snowflake-key/rsa_key.p8", 'rb') as pem_in:
    pemlines = pem_in.read()
    private_key_obj = load_pem_private_key(
        pemlines,
        os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
        default_backend())

# re-serialize the key as unencrypted PEM text, the form SimpleIngestManager expects
private_key_text = private_key_obj.private_bytes(
    Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()).decode('utf-8')

# files to ingest, relative to the stage referenced in the pipe definition
file_list = ['/customer_101.csv']
ingest_manager = SimpleIngestManager(account='<snowflake-account>',
                                     host='<ab12345.es-east.azure>.snowflakecomputing.com',
                                     user='<user-name>',
                                     pipe='<pipename>',
                                     private_key=private_key_text)
staged_file_list = []
for file_name in file_list:
    staged_file_list.append(StagedFile(file_name, None))

# call the Snowpipe REST endpoint to queue the staged files for loading
try:
    resp = ingest_manager.ingest_files(staged_file_list)
except HTTPError as e:
    logger.error(e)
    exit(1)

print("Section: Assert")
assert resp['responseCode'] == 'SUCCESS'

# poll the ingest history until the submitted files appear in the report
while True:
    history_resp = ingest_manager.get_history()

    if len(history_resp['files']) > 0:
        print('Ingest Report:\n')
        print(history_resp)
        break
    else:
        # wait for 20 seconds before polling again
        time.sleep(20)

# fetch the load-history report for the last hour
hour = timedelta(hours=1)
date = datetime.datetime.utcnow() - hour
history_range_resp = ingest_manager.get_history_range(date.isoformat() + 'Z')

print('\nHistory scan report: \n')
print(history_range_resp)
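The script relies on key-pair authentication, so an RSA key pair must exist at the path referenced above and the public key must be registered with the Snowflake user. A minimal setup sketch (the openssl commands run in a shell, not in Snowflake; the user name is a placeholder):

-- generate the encrypted private key and derive the public key (shell commands):
--   openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 des3 -inform PEM -out rsa_key.p8
--   openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
-- register the public key with the user (paste the PEM body without header/footer lines):
alter user <user-name> set rsa_public_key='MIIBIjANBgkq...';
-- verify:
desc user <user-name>;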


Data Engineering Simplified

Passionate About Data Engineering, Snowflake Cloud Data Warehouse & Cloud Native Data Services