How We Moved from Greenplum to Delta Table

We had 2 Clickhouse clusters, 1 GreenPlum cluster, 1 Kubernetes cluster, DataVault 2.0, a bunch of dbt showcases and all that stuff, and Dagster. Not that all of that was needed in the architecture, but once you start collecting services, follow your hobby to the end. The only thing is, What worried us was the cost.

Decision on complete redesign of the architecture came suddenly, like a hallucination in the desert at night – we realized that we could not wait any longer. Now our data is processed faster than thoughts in a head on LSD, and we can personalize the customer experience so that it becomes almost real.


Hello everyone! Artem here, and if you're ready to dive into the world of digital madness and see how we turned chaos into order, then hold on tight and read on.

We once wrote about how to build DataVault on Greenplum, you can read the article Here. We will briefly tell you about the reasons that prompted us to reconsider our approach – this is cost optimization and the lack of experts in GreenPlum administration.

In the first case, we would like to pay only for the time of actual use of resources, and in the second, despite the fact that our cluster was managed, we regularly encountered problems support and scaling of the systemwhich also led to unpredictable costs.

Choosing a new solution

When choosing a new solution, we took into account several limitations:

  • Dagster: for task management

  • Kubernetes: for container orchestration

  • Yandex Cloud: as the main cloud platform

The main components of the new solution:

  • Yandex Object Storage: for storage

  • Delta Table: for management

  • Apache Spark: for processing

Why these technologies?

Migrating from a Data Vault 2.0 architecture to a Delta Lake (S3 + Delta Table) architecture can offer a number of significant benefits:

  • Transactional Integrity and ACID Properties – Delta Lake ensures reliability and data integrity during concurrent operations

  • Query Performance and Optimization – The structure of Data Vault 2.0 can result in complex and less optimized queries that require significant optimization effort

  • Flexibility and scalability – Data Vault 2.0 is well suited for integrating from various sources, but can require significant resources to scale to high loads, while Delta Lake is designed to handle large volumes of data in a cloud environment and can easily scale to loads without significant costs

  • Simplified data schema management – delta tables support automatic schema evolution and schema enforcement, making it easier to make and manage changes to the data structure

  • Ecosystem and integration with other tools – delta table easily integrates with apache spark, which simplifies the development and operation of analytical applications

Apache Spark

One of the advantages of Apache Spark is that it can be deployed locally, which is especially convenient for development and testing. This is how our data is currently processed.

How does this work?

In its simplest form, our architecture can be represented as follows:

Data architecture

Data architecture

The data is stored in Yandex Object Storage in Delta Table format. This allows us to take advantage of data versioning and transactional integrity. In addition, we use S3 to store ML model artifacts.

When working with the Delta Table format, proper distribution of data across partitions plays a key role. Given the diversity of our data sources, we implemented partitioning not only by date, but also by source. This significantly increases the speed and efficiency of reading data. With the right choice of partition, Spark uses partition pruning, avoiding the need to read all the files in the table, which optimizes system performance.

For our MVP, we chose to use a single-node Spark cluster, configuring the SparkSession as follows:

SparkSession.builder.appName().master(f"local[*]")

A complete Spark configuration to work with S3 might look like this:

from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

packages = [
    "io.delta:delta-core_2.12:2.0.0",
    "org.apache.hadoop:hadoop-aws:3.3.1",
]

builder = (SparkSession.builder.appName().master(f"local[*]")
    .config("spark.driver.memory", "16g")
    .config("spark.hadoop.fs.s3a.bucket.<bucket_name>.access.key", "<backet_access_key>")
    .config("spark.hadoop.fs.s3a.bucket.<bucket_name>.secret.key", "<backet_secret_key>")
    .config("spark.hadoop.fs.s3a.bucket.<bucket_name>.endpoint", "<backet_endpoint>")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.fast.upload", "true")
    .config("spark.sql.ui.explainMode", "extended")
    .config("spark.sql.parquet.datetimeRebaseModeInWrite", "CORRECTED")
    .config("spark.sql.parquet.int96RebaseModeInWrite", "CORRECTED")
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .config("spark.driver.extraJavaOptions", _spark_with_newer_jvm_compatibility_options)
)

spark = builder.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

This approach allows us to run Spark locally on the same machine where the session is created. Thanks to this, we can run Spark directly in Dagster pods and It is convenient to debug queries locally.

When working with Dagster, we use assets and managers. All business logic is placed inside the asset, and input/output control is transferred to the manager. Thus, it was enough for us to write a Spark manager and adapt the queries in the code to work with Spark.

To save resources during downtime, we have implemented Autoscale of a group of interruptible nodes in Managed Service for Kubernetes from Yandex Cloud. Deployed with Terraform, using a ready-made module. We configured the groups so that they can scale to zero nodes when there is no load. For Dagster pods, we have a separate group of nodes, where at least one node is always running. This way, the system practically does not consume resources during idle periods.

If needed, you can also request slightly more resources for specific tasks by setting up a configuration for each Dagster task:

@job(    
    tags={
        "dagster-k8s/config": {
            "container_config": {
                "resources": {
                    "requests": {"cpu": "250m", "memory": "64Mi"},
                    "limits": {"cpu": "500m", "memory": "2560Mi"},
                },
            },
    },
)
def my_job():
    my_op()

Либо, при использовании ассетов:
my_job = define_asset_job(
	name=’my_job’,
	selection=AssetSelection.assets(my_asset),
	tags={
        "dagster-k8s/config": {
            "container_config": {
                "resources": {
                    "requests": {"cpu": "250m", "memory": "64Mi"},
                    "limits": {"cpu": "500m", "memory": "2560Mi"},
                },
            },
    },
)

To write data to Clickhouse we use the library clickhouse-connectsince our source code was already written using it. However, it is worth noting that Spark has a driver for working with Clickhouse, allowing you to use Spark SQL to run queries and process data in Clickhouse.

It is important to note that in order to control the costs of using storage, it is necessary Regularly maintain all tables using the VACUUM and OPTIMIZE commands. The VACUUM command removes old, unused data and frees up space, which helps reduce storage costs. The OPTIMIZE command, in turn, not only reduces the size of tables, but also significantly improves query performance, speeding up data reading.

Important note: Spark + Delta Table support optimistic locking by default for write queries executed in one Spark cluster, but not for write queries from different Spark clusters. To enable such support, you need to add DynamoDB to the configuration. In Yandex Cloud, YDB implements the DynamoDB interface, but its integration turned out to be a difficult task for us. Therefore, we decided to use the pessimistic locking mechanism built into Dagster via the tag_concurrency_limits function. We tag all jobs that write data to a table with the name of this table. Dagster, in turn, prevents such jobs from executing simultaneously, which ensures correct lock management and eliminates conflicting write queries.

So your dagsterDaemon configuration will look something like this:

dagsterDaemon:
 enabled: true


 image:
   repository: "docker.io/dagster/dagster-celery-k8s"
   tag: ~
   pullPolicy: Always


 heartbeatTolerance: 300


 runCoordinator:
   enabled: true
   type: QueuedRunCoordinator
   config:
     queuedRunCoordinator:
       maxConcurrentRuns: 10
       tagConcurrencyLimits:
         - key: "single-thread"
           value:
             applyLimitPerUniqueValue: true
           limit: 1

After that, you can add the following to your task settings:

my_job = define_asset_job(
	name=’my_job’,
	selection=AssetSelection.assets(my_asset),
	tags={
	        “single-thread”: “my_job”,
        "dagster-k8s/config": {
            "container_config": {
                "resources": {
                    "requests": {"cpu": "250m", "memory": "64Mi"},
                    "limits": {"cpu": "500m", "memory": "2560Mi"},
                },
            },
    },
)

Benefits of the new approach:

We pay only for actual working time, which allows us to effectively manage our budget and reduce costs.

We easily adapt to changing loads, ensuring high productivity and reliability.

We continue to deliver ready-made showcases to Clickhouse, which allows us to maintain existing processes for analysts and avoid significant changes.

Conclusion

The transition to a new architecture turned out to be successful and brought significant cost savings, improved flexibility and scalability. Our experience shows that such changes can be less difficult than expected and bring significant benefits.

At the moment, we are preparing the infrastructure for the implementation of horizontally scalable Spark and support for Hive Metastore, which we will try to talk about in the next article.


Well, that's all!) Thank you for your attention and I'll be glad to hear your opinion) Share your experience, ask questions and tell us how you cope with such tasks)

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *