Automating DWH Processes with Python and Snowflake
# For Windows
python -m venv venv
venv\Scripts\activate
# For macOS/Linux
python3 -m venv venv
source venv/bin/activate
Install the official Snowflake Connector for Python using pip:
pip install snowflake-connector-python
Then we set up a connection to the Snowflake database:
import snowflake.connector
conn = snowflake.connector.connect(
    user="YOUR_USERNAME",
    password="YOUR_PASSWORD",
    account="YOUR_ACCOUNT_IDENTIFIER",
    warehouse="YOUR_WAREHOUSE",
    database="YOUR_DATABASE",
    schema="YOUR_SCHEMA"
)
# cursor for executing statements
cursor = conn.cursor()
In addition to the Snowflake connector, you may need the following libraries:
pandas: for working with tabular data;
SQLAlchemy and snowflake-sqlalchemy: to use an ORM and simplify interaction with the database;
python-dotenv: to manage environment variables and store credentials securely;
schedule or APScheduler: for scheduling and automating tasks.
Now create a .env file and add the credentials to it:
SNOWFLAKE_USER=YOUR_USERNAME
SNOWFLAKE_PASSWORD=YOUR_PASSWORD
SNOWFLAKE_ACCOUNT=YOUR_ACCOUNT_IDENTIFIER
Then we load these variables in the script:
import os
from dotenv import load_dotenv
load_dotenv()
user = os.getenv('SNOWFLAKE_USER')
password = os.getenv('SNOWFLAKE_PASSWORD')
account = os.getenv('SNOWFLAKE_ACCOUNT')
# connect using the loaded variables
conn = snowflake.connector.connect(
    user=user,
    password=password,
    account=account
)
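A missing variable silently becomes None and surfaces later as a confusing authentication error, so it can help to validate the environment up front. A minimal sketch (the load_credentials helper is illustrative, not part of the connector or python-dotenv):

```python
import os

REQUIRED_VARS = ("SNOWFLAKE_USER", "SNOWFLAKE_PASSWORD", "SNOWFLAKE_ACCOUNT")

def load_credentials(env=os.environ):
    # fail fast if any required variable is missing or empty
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    # map SNOWFLAKE_USER -> "user", etc., ready to unpack into connect()
    return {name.split("_", 1)[1].lower(): env[name] for name in REQUIRED_VARS}
```

With this helper the connection becomes `conn = snowflake.connector.connect(**load_credentials())`.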
Python Syntax for Working with Snowflake
After setting up the environment and installing the necessary libraries, you can start working with Snowflake via Python. The main tool for this is Snowflake Connector for Python.
Example of connecting and executing a simple query:
import snowflake.connector
# connection
conn = snowflake.connector.connect(
    user="YOUR_USERNAME",
    password="YOUR_PASSWORD",
    account="YOUR_ACCOUNT_IDENTIFIER"
)
# cursor
cursor = conn.cursor()
# execute a SQL query
cursor.execute("SELECT CURRENT_VERSION()")
# fetch the result
version = cursor.fetchone()
print(f"Current Snowflake version: {version[0]}")
# close the connection
cursor.close()
conn.close()
In this example, we connect to Snowflake, run a query to get the current version of the database, and print the result.
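If the query raises, the close() calls above are never reached and the connection leaks. One way to guarantee cleanup is contextlib.closing from the standard library. In the sketch below, connect is assumed to be any callable returning a DB-API connection, such as a small wrapper around snowflake.connector.connect:

```python
from contextlib import closing

def fetch_version(connect):
    # the connection and cursor are closed even if execute() raises
    with closing(connect()) as conn, closing(conn.cursor()) as cursor:
        cursor.execute("SELECT CURRENT_VERSION()")
        return cursor.fetchone()[0]
```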
When working with databases, it is important to manage transactions properly and handle possible errors. In the Snowflake Connector for Python, transactions are managed with the commit() and rollback() methods.
Example of transaction management and exception handling:
try:
    # start the transaction
    cursor.execute("BEGIN")
    # execute the statements
    cursor.execute("INSERT INTO employees (id, name) VALUES (1, 'Artem')")
    cursor.execute("INSERT INTO employees (id, name) VALUES (2, 'Ivan')")
    # commit the transaction
    conn.commit()
except Exception as e:
    # roll back the transaction on error
    conn.rollback()
    print(f"Error: {e}")
finally:
    cursor.close()
    conn.close()
In this code, if an error occurs while executing the operations, the transaction will be rolled back and the data will not be saved.
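This try/except pattern recurs in every script, so it can be factored into a reusable context manager. This is a sketch that works with any DB-API connection, including the one returned by snowflake.connector.connect:

```python
from contextlib import contextmanager

@contextmanager
def transaction(conn):
    # commit on success, roll back on any exception, always re-raise
    cursor = conn.cursor()
    try:
        yield cursor
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cursor.close()
```

Usage: `with transaction(conn) as cursor: cursor.execute(...)` — the commit or rollback happens automatically when the block exits.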
Data can be loaded using the PUT and COPY INTO commands, or with pandas.
Example of loading data from a CSV file using COPY INTO:
# upload the file to the table's internal stage
cursor.execute("""
    PUT file://path/to/data.csv @%your_table
""")
# copy the data from the staged file into the table
cursor.execute("""
    COPY INTO your_table
    FROM @%your_table/data.csv
    FILE_FORMAT = (TYPE = 'CSV', FIELD_DELIMITER = ',', SKIP_HEADER = 1)
""")
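If several tables are loaded the same way, the statement can be generated instead of copy-pasted. A small illustrative helper (the function name and defaults are assumptions, not a connector API):

```python
def copy_into_sql(table, filename, delimiter=",", skip_header=1):
    # build a COPY INTO statement for a CSV staged in the table's internal stage
    return (
        f"COPY INTO {table} "
        f"FROM @%{table}/{filename} "
        f"FILE_FORMAT = (TYPE = 'CSV', "
        f"FIELD_DELIMITER = '{delimiter}', SKIP_HEADER = {skip_header})"
    )
```

Usage: `cursor.execute(copy_into_sql("your_table", "data.csv"))`. Since the names are interpolated into SQL, they must come from trusted code, never from user input.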
You can unload data from a table to a local file or work with it directly in pandas.
Example of unloading data into a DataFrame:
import pandas as pd
# query the data into a DataFrame
df = pd.read_sql("SELECT * FROM your_table", conn)
Using pandas, you can perform various data transformations before loading or after unloading.
Example of data transformation before loading:
# read the data from CSV
df = pd.read_csv("path/to/data.csv")
# transform the data
df["total_price"] = df["quantity"] * df["unit_price"]
# load the data into Snowflake
from sqlalchemy import create_engine
engine = create_engine("snowflake://...")
df.to_sql("your_table", engine, if_exists="append", index=False)
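Packaging the transformation as a pure function keeps it testable without a Snowflake connection; the column names below mirror the example above:

```python
import pandas as pd

def prepare_orders(df):
    # drop rows with missing values, then compute the derived column
    df = df.dropna(subset=["quantity", "unit_price"]).copy()
    df["total_price"] = df["quantity"] * df["unit_price"]
    return df
```

The load step then becomes a one-liner: `prepare_orders(pd.read_csv("path/to/data.csv")).to_sql(...)`.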
Five application scenarios
Automatic daily data download
Let's say you want to load new data daily from a local CSV file into a Snowflake table:
def load_daily_data():
    conn = snowflake.connector.connect(...)
    cursor = conn.cursor()
    # upload the file to the internal stage and copy it into the table
    cursor.execute("PUT file://path/to/data.csv @%your_table")
    cursor.execute("""
        COPY INTO your_table
        FROM @%your_table/data.csv
        FILE_FORMAT = (TYPE = 'CSV', FIELD_DELIMITER = ',', SKIP_HEADER = 1)
    """)
    conn.commit()
    cursor.close()
    conn.close()
    print("Daily data loaded.")
# Schedule the task daily at 01:00
schedule.every().day.at("01:00").do(load_daily_data)
Updating aggregated data
Suppose you need to update aggregated data for reporting every hour:
def update_aggregates():
    conn = snowflake.connector.connect(...)
    cursor = conn.cursor()
    # update the aggregates
    cursor.execute("""
        INSERT INTO hourly_aggregates
        SELECT CURRENT_TIMESTAMP, COUNT(*)
        FROM transactions
        WHERE transaction_time >= DATEADD(hour, -1, CURRENT_TIMESTAMP)
    """)
    conn.commit()
    cursor.close()
    conn.close()
    print("Aggregated data updated.")
# Schedule the task to run every hour
schedule.every().hour.do(update_aggregates)
Monitoring and error notifications
Scenario: track errors in the ETL process and notify the person responsible, for example by email:
Code:
import smtplib
from email.message import EmailMessage

def run_etl_with_monitoring():
    try:
        conn = snowflake.connector.connect(...)
        cursor = conn.cursor()
        # the ETL step being monitored
        cursor.execute("COPY INTO your_table FROM @%your_table")
        conn.commit()
        cursor.close()
        conn.close()
    except Exception as e:
        # notify the responsible person about the failure
        msg = EmailMessage()
        msg["Subject"] = "ETL error"
        msg["From"] = "etl@example.com"
        msg["To"] = "admin@example.com"
        msg.set_content(f"ETL process failed: {e}")
        with smtplib.SMTP("localhost") as server:
            server.send_message(msg)
Cleaning up obsolete data
Let's say you want to delete data older than one year weekly to optimize storage:
def clean_old_data():
    conn = snowflake.connector.connect(...)
    cursor = conn.cursor()
    # delete old records
    cursor.execute("""
        DELETE FROM user_activity
        WHERE activity_date < DATEADD(year, -1, CURRENT_DATE)
    """)
    conn.commit()
    cursor.close()
    conn.close()
    print("Old data deleted.")
# Schedule the task for every Sunday at 03:00
schedule.every().sunday.at("03:00").do(clean_old_data)
Automatic data backup
Scenario: create monthly backups of critical tables.
Code:
def backup_critical_tables():
    # the schedule library has no monthly interval, so the job runs daily
    # and does the work only on the first day of the month
    if time.localtime().tm_mday != 1:
        return
    conn = snowflake.connector.connect(...)
    cursor = conn.cursor()
    critical_tables = ['customers', 'orders']
    for table in critical_tables:
        backup_table = f"{table}_backup_{time.strftime('%Y%m%d')}"
        cursor.execute(f"CREATE TABLE {backup_table} CLONE {table}")
    conn.commit()
    cursor.close()
    conn.close()
    print("Backups created.")
# Schedule the task daily at 01:00; the work happens only on the 1st of the month
schedule.every().day.at("01:00").do(backup_critical_tables)
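All of the snippets above only register jobs; the schedule library also needs a loop that calls run_pending(). A minimal runner sketch (run_pending_loop is an illustrative helper; the iterations and sleep parameters are there only to make it bounded and testable):

```python
import time

def run_pending_loop(scheduler, interval=60, iterations=None, sleep=time.sleep):
    # call scheduler.run_pending(), then sleep; iterations=None loops forever
    count = 0
    while iterations is None or count < iterations:
        scheduler.run_pending()
        sleep(interval)
        count += 1
```

With the schedule library you would pass schedule.default_scheduler, or simply write `while True: schedule.run_pending(); time.sleep(60)` at the end of the script.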
Conclusion
Automating Snowflake processes with Python opens up broad possibilities for optimizing data warehouse workflows.
I would like to remind you about the open lesson “Effective data analysis: Immersion into the world of DWH and analytical engineering”, which will be held at Otus on September 23. In this lesson, participants will learn:
DWH (Data Warehouse) Fundamentals: Understanding the architecture and key components of data warehouses and their role in business analytics.
Tools and Technologies: An overview of modern tools for working with DWH, such as ETL processes, BI platforms and query languages (SQL).
Practical cases: Analysis of real examples of using DWH to make informed business decisions and optimize processes.
You can sign up on the “Data Warehouse Analyst” course page.