Testing data migration to python with pytest-bdd and testcontainers

As part of a digital modernization project, one of our clients faced the challenge of migrating data from one storage model to another. To test such a solution, we turned to the practices of BDD (Behavior Driven Development) and dependency virtualization using docker containers. This post describes a recipe for how you can organize testing of such a solution using pytest-bdd and test containers in python. All source code is available at link.

About infrastructure

Infrastructure
Infrastructure

As you can see from the diagram above, the data source lives in Azure. Migration involves transferring (copying) the actual data from the database to the Google Cloud service Datastore. The API is deployed via Cloud Build how Cloud Run service.

What are we migrating?

During the analysis of the scheme, we identified a number of domains that are subject to migration to the new repository. Each domain is represented by a set of tables and the data does not always need to be in the target storage in its entirety: sometimes these are entire tables, and sometimes a certain selection of attributes from it.

In our example, the Organizations domain will be used. Information about organizations is scattered in the following tables:

  • Orgs (Org_ID, Org_Name…) – general information about organizations

  • DeOrgs (Org_ID, User_ID, DeOrg_TS) – remote organizations

  • OrgIPValidate (Org_ID) – whether IP authentication is enabled for the organization

  • OrgIPs (Org_ID, Org_IP) – whitelist of IP addresses associated with the organization

In the Google Datastore, we will store all this in one Organizations entity. Also, it should be noted that IP addresses in their original form are integer values, but in the Datastore they will be stored in the IP address format – ABCD

Mapping tables to an entity
Mapping tables to an entity

It can be estimated that the development of such a solution comes down to writing SQL queries of various levels of complexity, some additional transformations (in the case of an IP address) and correct display of data.

How can this be tested?

One option would be to cover the smoke migration with tests. To do this, it would be necessary to deploy a new version of the API for each change in the SQL query or other logic, run these tests, and, in case of a crash, look for the reasons in the logs. From the infrastructure of the client in Azure (VPN + MSSQL) it would be possible to get rid of using Google Cloud SQL. But, in any case, this feedback loop is not what we would like due to its complete dependence on the Cloud provider, which brings with it:

  • deployment time

  • additional infrastructure code support resources

  • manual scanning of the log for the reason for the failure of tests

The Cloud dependency problem can be solved by deploying setup locally. Basically, we have two key components here – MSSQL and Google Datastore. Let’s see how to run these things locally.

local setup

Library comes to the rescue test containers. This is a small library that allows you to run docker containers for testing. The composition already includes a set of some ready-made wrapper classes for popular solutions. And there is also a convenient API for your own needs. Let’s take a look at the following code:

@fixture(scope="session", autouse=True)
def mssql_connection() -> Connection:
    with SqlServerContainer(
        "mcr.microsoft.com/mssql/server:2017-latest"
    ) as mssql:
        engine = create_engine(mssql.get_connection_url())
        db.schema.metadata.create_all(engine)
        yield engine.connect()

Here, we are preparing a container with MSSQL. Note that the fixture uses scope=session. By default, pytest uses scope=function , which means that a function is called on every test. This could be used, but running a container for each test is very wasteful. Although, in this case, you don’t have to worry about isolating the test data from each other, because each time we will have a fresh container with no data. But since we are interested in reducing the execution time of all tests (and the more there are, the more sensitive it is), we will use session. As a result, the container will be created only once before all tests are run. But in this case, the problem of data isolation arises. We are not interested in leaving the data after the test is completed, so we will solve this problem by deleting it – remember the tables that were filled before the test was launched and clear it after.

For the Datastore container, write your class. I note that Google provides a set of emulators for some of their services. The list of supported services includes Datastore. Our class is based on image by google with all necessary dependencies. Let’s prepare the container:

@fixture(scope="session", autouse=True)
def gds_client():
    with DatastoreContainer() as ds:
        yield ds.get_client()

Thus, before running the tests, we have two containers ready to go. You can start testing.

Testing

Consider the approach to writing tests. BDD is an evolution of TDD practice where a new concept is introduced behavior. We will not dwell here in detail on the features of this practice, we will note only two important points:

  • we still remain within the Test First paradigm

  • tests become a kind of documentation that non-technical specialists can access (in our project this was important)

Two birds with one stone: testing code and writing documentation.

The approach to implementing behaviors will depend on the choice of a specific library. In general, these approaches are similar. Our choice fell on a pytest plugin that implements a subset of the language Gherkinpytest-bdd. This plugin allows you to describe tests in a human-friendly language using reserved syntax constructs. Given/When/Then and use pytest-runner for start.

Testing our migration implies the following:

  • having a specific dataset in MSSQL (Given data in MSSQL)

  • directly start the migration (When migration triggered)

  • validation of new data in the Datastore (Then check data exists in Datastore)

Consider our feature file with tests:

Feature: Organisations migration path

  Scenario: Test that a selected organisation can be migrated with ip addresses
    Given Table Orgs has {"Org_ID": 45, "Org_Name": "Umbrella" }
    And Table OrgIPValidate has { "Org_ID": 45 }
    And Table OrgIPs has { "Org_ID": 45, "Org_IP": 2128374872 }
    And Table OrgIPs has { "Org_ID": 45, "Org_IP": -2128374872 }
    When the organisations migration is triggered for Org_ID=45
    Then Organisation migrated as { "name": "Umbrella", "legacy_id": 45, "active": true, "ip_addresses": ["168.147.35.129", "88.108.220.126"], "ip_auth_enabled": true }
    And 1 organisation migrated

  Scenario: Test that a selected deleted organisation can be migrated
    Given Table Orgs has {"Org_ID": 47, "Org_Name": "Globex" }
    And Table DeOrgs has { "Org_ID": 47 }
    When the organisations migration is triggered for Org_ID=47
    Then Organisation migrated as { "name": "Globex", "legacy_id": 47, "active": false, "ip_addresses": null, "ip_auth_enabled": false }
    And 1 organisation migrated

Here we have two scenarios:

In the first scenario, we are migrating an organization that has IP authentication enabled and two IP addresses added to the whitelist. This data is scattered across three different tables: Orgs, OrgIPValidate, OrgIPs. In Given design we specify the table name and use the JSON format to describe the data. This is enough to insert a record into the desired table:

@given(parsers.parse("table {table_name} has {payload}"))
def table_insert(
    mssql_connection: Connection, context: Any, table_name: str, payload: str
) -> None:
    obj = json.loads(payload)
    mssql_connection.execute(table_refs[table_name].insert().values(obj))

Fill in the tables and run the migration. After that, we expect the creation of an entity in the Datastore – for this you need to specify the contents of the entity, for the description of which JSON is again used:

@then(parsers.parse("organisation migrated as {payload}"))
def check_organisation_as(gds_client: datastore.Client, payload: str) -> None:
    expected = json.loads(payload)
    q = gds_client.query(
        kind="Organisations", filters=[("legacy_id", "=", expected["legacy_id"])]
    )
    org = list(q.fetch())[0]
    assert dict(org.items()) == expected

As noted above, we want to clean up the data after each test is executed. To do this, you can use the hooks mechanism that is added to pytest-bdd – pytest_bdd_after_scenario, or add the final phrase directly to the script. A matter of taste. We chose the second option, where we additionally check the number of transferred entities before cleaning. The example uses one, but in reality there may be a set of several, so the final construction is parameterized by the number of entities:

@then(parsers.parse("{count:d} organisations migrated"))
@then(parsers.parse("{count:d} organisation migrated"))
def check_organisations(
    mssql_connection: Connection,
    gds_client: datastore.Client,
    context: Any,
    count: int,
) -> None:
    query = gds_client.query(kind="Organisations")
    res = list(query.fetch())
    assert len(res) == count
    cleanup_sql(mssql_connection, context)
    cleanup_datastore()

In the second scenario, we migrate the organization that was deleted. This is recorded in another table – DeOrgs. In the migrated entity, this is controlled through the active property.

Results

This approach allows us to effectively test data migrations, while accompanying the services with the necessary documentation. Ultimately, we got rid of the dependence on the Cloud provider, which will significantly reduce the time spent on running tests and save resources. I hope the article will be useful in solving similar problems.

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *