Automating DWH Processes with Python and Snowflake

First, create and activate a virtual environment:

# Windows
python -m venv venv
venv\Scripts\activate

# macOS/Linux
python3 -m venv venv
source venv/bin/activate

Install the official Snowflake Connector for Python using pip:

pip install snowflake-connector-python

Then we set up a connection to the Snowflake database:

import snowflake.connector

conn = snowflake.connector.connect(
    user="YOUR_USERNAME",
    password="YOUR_PASSWORD",
    account="YOUR_ACCOUNT_IDENTIFIER",
    warehouse="YOUR_WAREHOUSE",
    database="YOUR_DATABASE",
    schema="YOUR_SCHEMA"
)

# cursor for executing operations
cursor = conn.cursor()

In addition to the Snowflake connector, you may need the following libraries:

  • pandas: for working with tabular data;

  • SQLAlchemy and snowflake-sqlalchemy: to use an ORM and simplify interaction with the database;

  • python-dotenv: to manage environment variables and securely store credentials;

  • schedule or APScheduler: for scheduling and automating tasks.
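If it helps to keep the environment reproducible, the whole dependency set can be recorded in a requirements.txt (left unpinned here; pin versions as your project requires):

```
snowflake-connector-python
pandas
SQLAlchemy
snowflake-sqlalchemy
python-dotenv
schedule
```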

Now let's create a .env file and add the credentials to it:

SNOWFLAKE_USER=YOUR_USERNAME
SNOWFLAKE_PASSWORD=YOUR_PASSWORD
SNOWFLAKE_ACCOUNT=YOUR_ACCOUNT_IDENTIFIER

Then we load these variables in the script:

import os
from dotenv import load_dotenv

load_dotenv()

user = os.getenv('SNOWFLAKE_USER')
password = os.getenv('SNOWFLAKE_PASSWORD')
account = os.getenv('SNOWFLAKE_ACCOUNT')

# connection variables
conn = snowflake.connector.connect(
    user=user,
    password=password,
    account=account
)
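Since os.getenv() silently returns None for a missing variable, it can be worth validating the credentials before connecting. A small sketch (the helper name is ours):

```python
import os

REQUIRED_VARS = ["SNOWFLAKE_USER", "SNOWFLAKE_PASSWORD", "SNOWFLAKE_ACCOUNT"]

def read_credentials():
    # report every missing variable at once instead of failing on the first
    missing = [name for name in REQUIRED_VARS if not os.getenv(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {name: os.environ[name] for name in REQUIRED_VARS}
```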

Python Syntax for Working with Snowflake

After setting up the environment and installing the necessary libraries, you can start working with Snowflake via Python. The main tool for this is the Snowflake Connector for Python.

Example of connecting and executing a simple query:

import snowflake.connector

# connection
conn = snowflake.connector.connect(
    user="YOUR_USERNAME",
    password="YOUR_PASSWORD",
    account="YOUR_ACCOUNT_IDENTIFIER"
)

# cursor
cursor = conn.cursor()

# SQL query
cursor.execute("SELECT CURRENT_VERSION()")

# result
version = cursor.fetchone()
print(f"Current Snowflake version: {version[0]}")

# close the connection
cursor.close()
conn.close()

In this example, we connect to Snowflake, run a query to get the current version of the database, and print the result.

When working with databases, it is important to manage transactions properly and handle possible errors. In the Snowflake Connector for Python, transactions are managed with the commit() and rollback() methods.

Example of transaction management and exception handling:

try:
    # start a transaction
    cursor.execute("BEGIN")

    # run the operations
    cursor.execute("INSERT INTO employees (id, name) VALUES (1, 'Artem')")
    cursor.execute("INSERT INTO employees (id, name) VALUES (2, 'Ivan')")

    # commit the transaction
    conn.commit()
except Exception as e:
    # roll back the transaction on error
    conn.rollback()
    print(f"Error: {e}")
finally:
    cursor.close()
    conn.close()

In this code, if an error occurs while executing the operations, the transaction will be rolled back and the data will not be saved.
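The try/except/rollback pattern above can be factored into a reusable helper. A sketch using contextlib (the transaction() name is ours; it works with any connection object that exposes cursor(), commit() and rollback()):

```python
from contextlib import contextmanager

@contextmanager
def transaction(conn):
    """Run a block of statements as a single transaction."""
    cursor = conn.cursor()
    try:
        cursor.execute("BEGIN")
        yield cursor        # the caller executes its statements here
        conn.commit()
    except Exception:
        conn.rollback()     # undo everything on any error
        raise
    finally:
        cursor.close()
```

With this helper, the inserts above shrink to `with transaction(conn) as cur: cur.execute(...)`.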

Data can be loaded using the PUT and COPY INTO commands, or with pandas.

Example of loading data from a CSV file using COPY INTO:

# upload the file to the table's internal stage
cursor.execute("""
    PUT file://path/to/data.csv @%your_table
""")

# copy the data from the staged file into the table
cursor.execute("""
    COPY INTO your_table
    FROM @%your_table/data.csv
    FILE_FORMAT = (TYPE = 'CSV', FIELD_DELIMITER = ',', SKIP_HEADER = 1)
""")

You can unload data from a table to a local file or work with it directly in pandas.

Example of unloading data into a DataFrame:

import pandas as pd

# query the data and fetch it into a DataFrame
df = pd.read_sql("SELECT * FROM your_table", conn)

Using pandas, you can perform various data transformations before loading or after unloading.

Example of data transformation before loading:

# Read the data from CSV
df = pd.read_csv('path/to/data.csv')

# Transform the data
df['total_price'] = df['quantity'] * df['unit_price']

# Load the data into Snowflake
from sqlalchemy import create_engine
engine = create_engine('snowflake://...')

df.to_sql('your_table', engine, if_exists="append", index=False)
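The transformation step can be checked in isolation before wiring it to Snowflake; the sample values below are made up for illustration:

```python
import pandas as pd

# sample order lines standing in for the CSV contents
df = pd.DataFrame({
    "quantity": [2, 5, 1],
    "unit_price": [10.0, 3.5, 99.9],
})

# the same derived column as in the loading script
df["total_price"] = df["quantity"] * df["unit_price"]

print(df["total_price"].tolist())  # [20.0, 17.5, 99.9]
```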

5 application scenarios

Automatic daily data loading

Let's say you want to load new data daily from a local CSV file into a Snowflake table:

def load_daily_data():
    conn = snowflake.connector.connect(...)
    cursor = conn.cursor()

    # Upload the local file to the table's internal stage
    cursor.execute("PUT file://path/to/data.csv @%your_table")

    # Copy the staged file into the table
    cursor.execute("""
        COPY INTO your_table
        FROM @%your_table/data.csv
        FILE_FORMAT = (TYPE = 'CSV', FIELD_DELIMITER = ',', SKIP_HEADER = 1)
    """)

    conn.commit()
    cursor.close()
    conn.close()
    print("Daily data loaded.")

# Schedule the task for 02:00 every day
schedule.every().day.at("02:00").do(load_daily_data)

Updating aggregated data

Need for hourly updating of aggregated data for reporting:

def update_aggregates():
    conn = snowflake.connector.connect(...)
    cursor = conn.cursor()

    # Refresh the aggregates
    cursor.execute("""
        INSERT INTO hourly_aggregates
        SELECT CURRENT_TIMESTAMP, COUNT(*)
        FROM transactions
        WHERE transaction_time >= DATEADD(hour, -1, CURRENT_TIMESTAMP)
    """)

    conn.commit()
    cursor.close()
    conn.close()
    print("Aggregated data refreshed.")

# Schedule the task for every hour
schedule.every().hour.do(update_aggregates)

Monitoring and error notifications

Scenario: Track errors in the ETL process and send notifications to the responsible person.

Code:

import smtplib
from email.message import EmailMessage

def send_error_notification(message):
    # a minimal email alert; the SMTP host and addresses are placeholders
    msg = EmailMessage()
    msg["Subject"] = "ETL error in Snowflake pipeline"
    msg["From"] = "etl@example.com"
    msg["To"] = "oncall@example.com"
    msg.set_content(message)
    with smtplib.SMTP("smtp.example.com") as server:
        server.send_message(msg)

def run_etl_with_monitoring():
    try:
        conn = snowflake.connector.connect(...)
        cursor = conn.cursor()

        # ETL step: load staged files into the target table
        cursor.execute("COPY INTO your_table FROM @%your_table")

        conn.commit()
        cursor.close()
        conn.close()
        print("ETL completed successfully.")
    except Exception as e:
        # notify the responsible person about the failure
        send_error_notification(str(e))
        print(f"ETL error: {e}")

Cleaning up obsolete data

Let's say you want to delete data older than one year weekly to optimize storage:

def clean_old_data():
    conn = snowflake.connector.connect(...)
    cursor = conn.cursor()

    # Delete old records
    cursor.execute("""
        DELETE FROM user_activity
        WHERE activity_date < DATEADD(year, -1, CURRENT_DATE)
    """)

    conn.commit()
    cursor.close()
    conn.close()
    print("Old data deleted.")

# Schedule the task for every Sunday at 03:00
schedule.every().sunday.at("03:00").do(clean_old_data)

Automatic data backup

Scenario: Create monthly backups of critical tables.

Code:

import datetime
import time

def backup_critical_tables():
    conn = snowflake.connector.connect(...)
    cursor = conn.cursor()

    critical_tables = ['customers', 'orders']
    for table in critical_tables:
        backup_table = f"{table}_backup_{time.strftime('%Y%m%d')}"
        cursor.execute(f"CREATE TABLE {backup_table} CLONE {table}")

    conn.commit()
    cursor.close()
    conn.close()
    print("Backups created.")

# schedule has no monthly interval, so check daily at 01:00
# and run only on the first day of the month
def monthly_backup():
    if datetime.date.today().day == 1:
        backup_critical_tables()

schedule.every().day.at("01:00").do(monthly_backup)

Conclusion

Automating Snowflake processes with Python opens up broad possibilities for optimizing work with the data warehouse.


I would like to remind you about the open lesson “Effective data analysis: Immersion into the world of DWH and analytical engineering”, which will be held at Otus on September 23. In this lesson, participants will learn:

  • DWH (Data Warehouse) Fundamentals: Understanding the architecture and key components of data warehouses and their role in business analytics.

  • Tools and Technologies: An overview of modern tools for working with DWH, such as ETL processes, BI platforms and query languages (SQL).

  • Practical cases: Analysis of real examples of using DWH to make informed business decisions and optimize processes.

You can sign up on the “Data Warehouse Analyst” course page.
