How to test in Databricks: Nutter Framework

Disclaimer

The article assumes that you already know what Databricks is and what notebooks, clusters, and jobs (workflows) are in it. For an introduction, a search on Habr offers this article.

Introduction

I am continuing a series of articles in which I analyze my current project in the field of BigData. The first article was devoted to hiring Python developers, the second to the problems of introducing new processes in distributed teams. However, in addition to purely managerial tasks, the project has a number of technical difficulties. One such challenge is testing the data processing platform.

Testing of more familiar software products is more or less a solved problem, but BigData raises many questions. If you have Java, you have at least JUnit, and the vast majority of frameworks care about ease of testing. Spring, for example, dedicates a lot of documentation to this. Front-end testing is also well developed: from Selenium to Jest. Testing blockchain and smart contracts is a pleasure (at least on the Ethereum network, thanks to Truffle Suite).

Python also has its own testing frameworks, and this area is quite well covered. Even Databricks itself, on which our data processing platform is built, offers its own approaches to testing. For example, here is a good official guide: Unit testing for notebooks. However, it requires a repository inside Databricks itself, and our code is stored in the corporate GitLab, which is not reachable from our Databricks workspace. Very inconvenient, but the security department does not allow configuring access from an external resource to the internal network.

It is also possible to test directly inside Databricks notebooks: Test Databricks notebooks. The downsides: it is harder to organize the code, and importing notebooks inside Databricks is limited.

We made several attempts to test our platform one way or another, but everything turned into reinventing the wheel. Then we stumbled upon the Nutter Framework and decided to try it. What it promised suited our needs very well. Respect to Microsoft for releasing such a tool as open source.

Nutter: a very short guide

The main goal of this framework is to let you easily and quickly test notebooks in Databricks. Nutter offers a specific approach to writing tests and several ways to execute them.

The easiest test

First, to start working with tests, you need to install the nutter library on the cluster where the test notebooks will run:
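For example, one option (assuming you run the tests from a notebook; the package can also be attached to the cluster through the Libraries UI as the PyPI package nutter) is to install it from a notebook cell:

%pip install nutter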

Next, create a notebook, import the NutterFixture base class, and start writing a test class:

from runtime.nutterfixture import NutterFixture, tag
class FirstTestFixture(NutterFixture):
    def run_test(self):
        # 'run_' methods execute the code under test, e.g. a notebook (timeout in seconds)
        dbutils.notebook.run('notebook_to_test', 600)
    def assertion_test(self):
        # 'assertion_' methods contain the checks for the test case
        assert True
    def run_test_secundo(self):
        dbutils.notebook.run('another_notebook_to_test', 600)
    def assertion_test_secundo(self):
        assert True

To execute and see the results of the tests, we run the test class itself:

result = FirstTestFixture().execute_tests()
print(result.to_string())

Tests can also be run from the command line (see below for how to do this). To return the test results to nutter-cli, you need to execute the following at the end of the notebook:

result.exit(dbutils)

However, there is an unpleasant limitation: internally result.exit calls dbutils.notebook.exit(), which makes Databricks hide all output from print commands. Therefore, if you run tests directly in a notebook, the line with exit needs to be commented out.
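One possible workaround (our own sketch, not a Nutter feature) is to guard the exit call behind a notebook parameter, for example a hypothetical report_to_cli flag passed as a notebook parameter only when the tests are started from nutter-cli (see --notebook_params below):

# default value when the notebook is run interactively
dbutils.widgets.text("report_to_cli", "false")
#...
result = FirstTestFixture().execute_tests()
print(result.to_string())
# report results back to nutter-cli only when the flag was passed in
if dbutils.widgets.get("report_to_cli") == "true":
    result.exit(dbutils)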

Test naming conventions

There can be several tests within one test class. Each test case consists of one mandatory method and three optional ones:

  • before_(testname) – executed before run. Used to set up the test and perform preparatory actions. Optional.

  • run_(testname) – executed after before (if present), otherwise first. This is where the actions under test go, such as running a notebook. Optional.

  • assertion_(testname) – executed after run (if present). Contains the state checks. You can use assert from any Python test library. Each test class must contain at least one assertion method.

  • after_(testname) – executed after assertion. Usually used to reset test objects to their original state or to clean up after a test has run.

Example: the methods run_checkpoint_location_generation and assertion_checkpoint_location_generation will be treated as one test case.
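For illustration, here is a minimal sketch of a single test case that uses all four methods (the notebook and table names are hypothetical):

from runtime.nutterfixture import NutterFixture
class CheckpointLocationFixture(NutterFixture):
    def before_checkpoint_location_generation(self):
        # preparation: remove leftovers from previous runs
        spark.sql('drop table if exists test_db.checkpoints')
    def run_checkpoint_location_generation(self):
        # action under test: run the notebook that creates the table
        dbutils.notebook.run('generate_checkpoint_location', 600)
    def assertion_checkpoint_location_generation(self):
        # check the observable result
        assert spark.catalog.tableExists('test_db.checkpoints')
    def after_checkpoint_location_generation(self):
        # clean up after the test
        spark.sql('drop table if exists test_db.checkpoints')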

Additionally, there are two methods: before_all and after_all.

They are executed before and after all tests, respectively. If you want to use multiple assertions for one test case, you need to use before_all both to prepare the test and to invoke the actions under test:

from runtime.nutterfixture import NutterFixture, tag
class MultiTestFixture(NutterFixture):
  def before_all(self):
    dbutils.notebook.run('notebook_under_test', 600, args) 
    #...
  def assertion_test_case_1(self):
    #...
  def assertion_test_case_2(self):
    #...
  def after_all(self):
    #... 

Nutter guarantees that tests will be run in alphabetical order based on the test case name.

Nutter also allows you to keep state and share data between test cases via attributes set in the constructor:

class TestFixture(NutterFixture):
  def __init__(self):
    self.file = "/data/myfile"
    NutterFixture.__init__(self)

Running Tests in Parallel

When there are a lot of tests, you want to run them in several threads and reduce the execution time. The latest version of Nutter (0.1.35) allows you to do this with NutterFixtureParallelRunner:

from runtime.runner import NutterFixtureParallelRunner
#...
parallel_runner = NutterFixtureParallelRunner(num_of_workers=2)
parallel_runner.add_test_fixture(FirstTestFixture())
parallel_runner.add_test_fixture(AnotherTestFixture())
result = parallel_runner.execute()
print(result.to_string())

Test results are combined and displayed in a convenient way:

Notebook: N/A - Lifecycle State: N/A, Result: N/A
Run Page URL: N/A
============================================================
PASSING TESTS
------------------------------------------------------------
test_another_case (40.98681362399998 seconds)
test (40.99089991400001 seconds)
test_secundo (10.680228194999927 seconds)


============================================================

Nutter CLI

One of Nutter’s main features is the ability to run tests from the command line.

First, install Nutter via pip:

$ pip install nutter

Then we set two environment variables so that nutter-cli can access Databricks:

Linux

export DATABRICKS_HOST=<HOST>
export DATABRICKS_TOKEN=<TOKEN>

Windows PowerShell

$env:DATABRICKS_HOST="HOST"
$env:DATABRICKS_TOKEN="TOKEN"

To start, you can list which tests can be executed:

$ nutter list /common/test/nutter

We get something like this:

Nutter Version 0.1.35
++++++++++++++++++++++++++++++++++++++++++++++++++

--> Looking for tests in /common/test/nutter
--> 3 tests found

Tests Found
-------------------------------------------------------
Name:   test_StreamDeltaTransformation_multi
Path:   /common/test/nutter/test_StreamDeltaTransformation_multi

Name:   test_RefineryFromJsonMerge_single
Path:   /common/test/nutter/test_RefineryFromJsonMerge_single

Name:   test_StreamDeltaTransformation_single
Path:   /common/test/nutter/test_StreamDeltaTransformation_single

-------------------------------------------------------

Total: 3

Now let’s run a specific test:

$ nutter run /common/test/nutter/test_StreamDeltaTransformation_single --cluster_id '0000-099999-abcdabcd'

This command runs a specific test class on a specific cluster (the nutter library must be installed on that cluster).

To run all test classes, you can use the following command:

$ nutter run /common/test/nutter/ --cluster_id 0123-12334-tonedabc --recursive 

Test names must start with test_

The --recursive flag searches for test classes recursively in all subdirectories.

You can pass parameters to the notebooks under test through the --notebook_params option:

$ nutter run /common/test/nutter/* --cluster_id 0123-12334-tonedabc --notebook_params "{\"example_key_1\": \"example_value_1\", \"example_key_2\": \"example_value_2\"}"
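Inside the test notebook these parameters can then be read as widgets (a sketch; the key names above are just examples):

# falls back to the default when the notebook is run interactively
dbutils.widgets.text("example_key_1", "default_value")
example_value_1 = dbutils.widgets.get("example_key_1")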

An example of a real test class

Thanks to @raukasky for writing concrete test classes for platform services.

# the command below imports the long yaml configs that are fed to our services as input
%run ../configs/stream_delta_transformation_test_cases
#...
from runtime.runner import NutterFixtureParallelRunner
from runtime.nutterfixture import NutterFixture
from cds_utils.platform import ConfigRunner # a class that can run platform services
#...
class TestCase_01(NutterFixture):
    def __init__(self):
        self.test_uc_path="dataservices_nonprod.test_nonprod"
        self.s3_path="s3://super_bucket.data"
        NutterFixture.__init__(self)
    def before_all(self):
        # since we have many checks, we run the code under test in this method
        ConfigRunner(dbutils, spark).run_yaml_config(param_yaml=test_case_01, notebook_path="/../../../cds_platform/stream_delta_transformation")
    def assertion_isExistTable(self):
        # check that the service created the table at all
        assert(spark.catalog.tableExists(f'{self.test_uc_path}.tindata_expected')==True)
    def assertion_table_location(self):
        # check that the table was created in the right place (as an external table)
        location = spark.sql(f"describe detail {self.test_uc_path}.tindata_expected").select('location').collect()[0]['location']
        assert(location == f'{self.s3_path}/test/hive/test_nonprod/tindata_expected')
    def assertion_checkpoint_location(self):
        # check that the checkpoint exists after the service has run, and that it is in the right place
        location = [file.path for file in dbutils.fs.ls(f'{self.s3_path}/test/checkpoint/test_nonprod/tindata_expected')][0]
        assert(location == f'{self.s3_path}/test/checkpoint/test_nonprod/tindata_expected/tindata/')
    def assertion_partitions(self):
        # check that the config was read correctly and the table has the right partitions (as specified in the yaml config)
        partitions = spark.sql(f"describe detail dataservices_nonprod.test_nonprod.tindataxref").select('partitionColumns').collect()[0]['partitionColumns']
        assert(partitions == ['data_source_date'])
    def assertion_dedupe_column(self):
        # check that deduplication of the input data by the primary key succeeded
        # and that the number of records in the target table is smaller than in the source, as expected
        num_of_change = spark.sql(f"select * from {self.test_uc_path}.tindataxref where source_id = 27 and data_source_ts = '2022-11-04T00:00:11.129+0000'").count()
        count_source = spark.sql(f"select * from {self.test_uc_path}.tindataxref").count()
        count_target = spark.sql(f"select * from {self.test_uc_path}.tindataxref_expected").count()
        assert(count_target == count_source-(num_of_change-1))
    def after_all(self):
        # clean up the tables and the directories with data and checkpoints
        spark.sql(f'drop table {self.test_uc_path}.tindata_expected')
        dbutils.fs.rm(f'{self.s3_path}/test/hive/test_nonprod/tindata_expected', True)
        dbutils.fs.rm(f'{self.s3_path}/test/checkpoint/test_nonprod/tindata_expected', True)

Such a test class can be classified as integration testing: the stream_delta_transformation service is tested in an environment as close to the real one as possible. We do not use mocks or “spy” objects; the checks are made against actually created tables and data. At the same time, we can test individual methods inside the service by the results they produce by the end of the service run. For example, we check that the target tables are created and that they are in the correct place (because all tables are external), and so on. In addition to checking the behavior of specific methods, we can read the data from the target table and verify that the data transformation was correct.

So far we run this manually from the command line, but it is already a huge step forward in providing integration testing. And since this is quite ordinary Python code, it opens up the possibility of checking everything that the services do. For example, it is quite realistic to write a test that checks the publication of data from Delta Lake to MySql, because in the test class we can query MySql directly and check what ended up there.
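A rough sketch of what such a check might look like (the notebook name, JDBC URL, credentials, and table names are hypothetical):

class MySqlPublishFixture(NutterFixture):
    def before_all(self):
        # run the hypothetical service that publishes a Delta table to MySql
        dbutils.notebook.run('publish_to_mysql', 600)
    def assertion_rows_published(self):
        # read the published table back from MySql over JDBC and compare row counts with the source
        published = (spark.read.format("jdbc")
                     .option("url", "jdbc:mysql://mysql-host:3306/reports")
                     .option("dbtable", "published_table")
                     .option("user", "test_user")
                     .option("password", dbutils.secrets.get("test-scope", "mysql-password"))
                     .load())
        source_count = spark.table("dataservices_nonprod.test_nonprod.source_table").count()
        assert published.count() == source_count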

Conclusions

So, Nutter Framework allowed our team to:

  • perform integration testing of services written as notebooks in Databricks

  • run tests both directly in notebooks in Databricks and from the command line

  • write a set of regression tests for individual service methods. Not as convenient as pure unit tests, but much better than nothing. Regression testing time has been significantly reduced

In the future, we plan to run the tests as part of building and deploying the platform, directly in GitLab CI/CD. How to do this for Azure is described in the official documentation.
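As a rough sketch, such a CI job would essentially run the same commands we use locally (assuming DATABRICKS_HOST, DATABRICKS_TOKEN and the test cluster id are stored as protected CI/CD variables):

$ pip install nutter
$ nutter run /common/test/nutter/ --cluster_id "$TEST_CLUSTER_ID" --recursive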

For our project, Nutter was a great choice and a very handy testing tool.

