Implementing TSQL Triggers in Python

In the previous article, I talked about the general structure of the project and about how Kafka works with CDC to get data from the database. Now it's time to talk about the implementation of triggers in Python. As mentioned in the previous article, we will implement only Before triggers (Instead Of triggers will remain unchanged in the database). So, what do we need to consider during development?

  1. Each trigger will be launched as a separate Deployment in K8s, so it is necessary to provide a convenient way to launch triggers.

  2. One trigger can only process one topic from Kafka.

  3. Each trigger should have the ability to fine-tune filters based on the data received from Kafka.

Reading Kafka

First, we need to implement reading Kafka topics. At this point we know that one topic carries the data of one table, and one trigger can process data from only one topic. This leads us to the ServiceLocator pattern, which we will implement through decorators.

@SubscribeKafkaTopik('Sales')
class TrSalesUpdate(ABCTrigger):

Thus, when we launch our service, we will immediately receive a topic that needs to be listened to.

class MetaTriggers(type):
    def __getitem__(cls, trigger_name):
        return cls.__triggers__[trigger_name]


class SubscribeKafkaTopik(metaclass=MetaTriggers):
    __triggers__ = {}
    topic_name = None

    def __new__(cls, topic):
        # Reuse a single decorator instance; __init__ below updates
        # topic_name before each class is registered in __call__.
        if not hasattr(cls, 'instance'):
            cls.instance = super(SubscribeKafkaTopik, cls).__new__(cls)
        return cls.instance

    def __init__(self, topic):
        self.topic_name = topic

    def __call__(self, cls):
        # Register an instance of the decorated trigger class,
        # constructed with its topic, under the class name.
        if cls.__name__ not in self.__triggers__:
            self.__triggers__[cls.__name__] = cls(self.topic_name)
        return cls

    @classmethod
    def print(cls):
        print(cls.__triggers__)

    @classmethod
    def get(cls, trigger_name):
        return cls.__triggers__.get(trigger_name)
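To see the registration mechanics in isolation, here is a condensed, runnable sketch of the same pattern (Kafka setup stripped out; the DummySalesTrigger class is purely illustrative):

```python
class MetaTriggers(type):
    def __getitem__(cls, trigger_name):
        return cls.__triggers__[trigger_name]


class SubscribeKafkaTopik(metaclass=MetaTriggers):
    __triggers__ = {}

    def __init__(self, topic):
        self.topic_name = topic

    def __call__(self, cls):
        # Register an instance of the decorated class under its class name.
        self.__triggers__.setdefault(cls.__name__, cls(self.topic_name))
        return cls


@SubscribeKafkaTopik('Sales')
class DummySalesTrigger:
    def __init__(self, topic_name):
        self.topic_name = topic_name


# Lookup by class name goes through the metaclass __getitem__.
trigger = SubscribeKafkaTopik['DummySalesTrigger']
print(trigger.topic_name)  # Sales
```

Decorating the class is enough: no central registry file has to be edited when a new trigger is added.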

Now, how do we launch the trigger we need? Given that we have implemented trigger registration together with the trigger-to-topic mapping, we only need to accept the class name of the trigger we want to launch as a command-line argument. We did this via ArgumentParser.

parser.add_argument('--trigger', help='Trigger to launch', default=os.getenv('TRIGGER_CLS'))

Launch example:

python main.py --trigger TrSalesUpdate

Next, we take the trigger name and, based on it, start listening to the Kafka topic specified in the SubscribeKafkaTopik decorator.

 SubscribeKafkaTopik[args.trigger].listen()

What's going on here?

First, we get from SubscribeKafkaTopik the object that was registered as a trigger using the @SubscribeKafkaTopik decorator. Since we specified the --trigger TrSalesUpdate parameter at startup, SubscribeKafkaTopik[args.trigger] returns the registered TrSalesUpdate instance. But where does the listen() method come from and what does it do? Everything is pretty simple here too. Our TrSalesUpdate class, like all other trigger classes, inherits from the ABCTrigger base class.

from abc import ABC, abstractmethod
from json import loads
from threading import Thread

from kafka import KafkaConsumer

# credentials is loaded elsewhere (e.g. from a config file).


class ABCTrigger(ABC):

    def __init__(self, topic_name=None):
        if topic_name:
            self.consumer = KafkaConsumer(
                topic_name,
                group_id=self.__class__.__name__,
                api_version=(0, 10),
                bootstrap_servers=",".join(credentials['kafka']['bootstrap_servers']),
                auto_offset_reset="latest",
                value_deserializer=lambda x: loads(x.decode('utf-8')) if x is not None else None,
            )
            # An initial poll to join the group and get partitions assigned.
            self.consumer.poll(timeout_ms=10000)

    @abstractmethod
    def call(self, message, key=None):
        ...

    def listen(self):
        print(f"Start listening to Kafka in {self.__class__.__name__}")
        for message in self.consumer:
            if message is None:
                continue
            # Process each message in a separate thread so that slow
            # triggers do not block the consumer loop.
            Thread(target=self.call, args=(message.value, message.key)).start()

This class has a listen() method that starts listening to a Kafka topic. So, the SubscribeKafkaTopik[args.trigger].listen() construct starts receiving messages from the specified topic.

Second, after receiving messages from the specified Kafka topic, the message is passed to the call method.

Thread(target=self.call, args=(message.value, message.key)).start()

Thus, each trigger must provide its own implementation of the call method, which contains the message-processing logic for that particular trigger:

def call(self, message, key = None): ...
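For illustration, a minimal trigger with a call implementation might look like this (a sketch: consumer setup is omitted, the message layout assumes the Debezium-style envelope with payload/op/after fields, and the handler body here is made up rather than taken from the real project):

```python
from abc import ABC, abstractmethod


class ABCTrigger(ABC):
    # Consumer setup from the real base class is omitted in this sketch.
    @abstractmethod
    def call(self, message, key=None): ...


class TrSalesUpdate(ABCTrigger):
    def call(self, message, key=None):
        # Debezium-style envelope: row images live under message['payload'].
        payload = message['payload']
        op = payload['op']        # 'r', 'c', 'u' or 'd'
        after = payload['after']  # row state after the change (None for deletes)
        return op, after


trigger = TrSalesUpdate()
op, after = trigger.call({'payload': {'op': 'u',
                                      'before': {'Price': 8},
                                      'after': {'Price': 10}}})
print(op, after)  # u {'Price': 10}
```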

Now we need to look at how to handle different types of events so that our trigger only fires when the actions we need happen.

Event filters

As mentioned earlier, the following events are available to us:

  • r – read – the operation of reading data from a table; occurs when Kafka Connect performs the initial read of the table.

  • c – create – operation to create a record, similar to insert

  • u – update – record update operation

  • d – delete – operation to delete a record
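For orientation, here is roughly what a deserialized Debezium update event looks like (simplified: the schema block and source metadata that Debezium also sends are omitted, and the field values are made up). The op, before and after fields are what the filters below rely on:

```python
# Simplified shape of a Debezium change event, as deserialized from JSON.
message = {
    'payload': {
        'op': 'u',                            # r / c / u / d
        'before': {'Id': 1, 'Price': 100.0},  # row image before the change
        'after':  {'Id': 1, 'Price': 120.0},  # row image after the change
    }
}
print(message['payload']['op'])
```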

Knowing these event types, we need to make sure that our trigger is executed in the future when one or more events occur.

For example, we have a trigger to update the invoice price after an item in the invoice has been added, changed or deleted. Accordingly, such a trigger should be triggered by the following events: c, u, d.

Or another example: we need to add the cost price of the product to the invoice-items table at the moment the product is added to the invoice. Such a trigger should fire only on the create event (insert), that is, the c type.

We decided to implement such an event filter using decorators.

@FilterActionType('u', 'c')
def call(self, message, key=None):
    ...

Thus, if the event type does not match any of the types passed to the decorator, the call method is not executed.

The FilterActionType implementation:

class FilterActionType:
    def __init__(self, *actions):
        self.actions = actions

    def __call__(self, fn):
        def call_func(*args, **kwargs):
            # args[0] is the trigger instance, args[1] is the Kafka message.
            if args[1]['payload']['op'] in self.actions:
                return fn(*args, **kwargs)
            return False
        return call_func
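A quick check of the filter in isolation (DemoTrigger is a stand-in class, not part of the project):

```python
class FilterActionType:
    def __init__(self, *actions):
        self.actions = actions

    def __call__(self, fn):
        def call_func(*args, **kwargs):
            # args[0] is the trigger instance, args[1] is the Kafka message.
            if args[1]['payload']['op'] in self.actions:
                return fn(*args, **kwargs)
            return False
        return call_func


class DemoTrigger:
    @FilterActionType('u', 'c')
    def call(self, message, key=None):
        return 'handled'


t = DemoTrigger()
print(t.call({'payload': {'op': 'u'}}))  # handled
print(t.call({'payload': {'op': 'd'}}))  # False
```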

But that's not all. What if the trigger should fire only when a certain field, or one of a list of fields, has changed? Let's take the same example with updating the invoice price when a product on the invoice changes. We need to recalculate the invoice price only if the quantity or the price of the product has changed, but we don't want the trigger to fire when, for example, the description of the product on the invoice has changed.

We also did this through the decorator.

@FilterActionType('u')
@FilterUpdatedRow('Price', 'Quantity')
def call(self, message, key = None): ...

Thus, if the condition of at least one of the decorators is not met, the method will not be executed.

The FilterUpdatedRow implementation:

class FilterUpdatedRow:
    def __init__(self, *columns):
        self.columns = columns

    def __call__(self, fn):
        def call_func(*args, **kwargs):
            payload = args[1]['payload']
            # Fire only if at least one watched column actually changed.
            # Assumes an update event, where both before and after are present.
            for column in self.columns:
                if payload['before'][column] != payload['after'][column]:
                    return fn(*args, **kwargs)
            return False
        return call_func
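Again, a standalone check (DemoTrigger and the messages are illustrative; note that this filter assumes both before and after images exist, which is why it is normally stacked under @FilterActionType('u')):

```python
class FilterUpdatedRow:
    def __init__(self, *columns):
        self.columns = columns

    def __call__(self, fn):
        def call_func(*args, **kwargs):
            payload = args[1]['payload']
            # Fire only if at least one watched column actually changed.
            for column in self.columns:
                if payload['before'][column] != payload['after'][column]:
                    return fn(*args, **kwargs)
            return False
        return call_func


class DemoTrigger:
    @FilterUpdatedRow('Price', 'Quantity')
    def call(self, message, key=None):
        return 'handled'


t = DemoTrigger()
msg_changed = {'payload': {'before': {'Price': 10, 'Quantity': 1, 'Note': 'a'},
                           'after':  {'Price': 12, 'Quantity': 1, 'Note': 'a'}}}
msg_note = {'payload': {'before': {'Price': 10, 'Quantity': 1, 'Note': 'a'},
                        'after':  {'Price': 10, 'Quantity': 1, 'Note': 'b'}}}
print(t.call(msg_changed))  # handled (Price changed)
print(t.call(msg_note))     # False (only Note changed)
```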

Now let's imagine the following situation. We have a shipping document that is closed, i.e. moved to its final stage with the status "Closed". In this case, we need to record the date when the document was closed. It is important that the date should be set only for documents closed in warehouse A. So we need to implement a trigger that fires under the following conditions:

  1. Trigger only on update event

  2. Trigger only for changing the Status field

  3. Trigger only when the Status field changes to the value Closed and the Store field equals A

What do we have now? We have two filters that will help us implement the first two points. But what to do with the third one? That's right, implement it!

What do we see from the requirements? We need to implement a filter on row values. This is quite simple, since we receive the before and after images from Kafka, so we only need to check whether the specified fields in the after block have the required values (in our case, Status="Closed" and Store="A"). And here we can go two ways.

  1. Implement a separate decorator for each field, passing each field's value check to its own decorator. This would work, because if at least one decorator rejects the message, the call method is not executed — which corresponds to a logical AND. However, the requirements may also call for a logical OR, and then this option will not work for us.

  2. Implement a decorator that accepts a combining mode (or_ or and_) and filters the data based on it. We chose this path. Not clear yet? Let's look at the implementation, and everything will become clearer.

First, let's see how this filtering will look in the trigger:

@FilterActionType('u')
@FilterUpdatedRow('Status')
@FilterRowData(
    and_(
        [
            lambda record: record['after']['Status'] == 'Closed',
            lambda record: record['after']['Store'] == 'A',
        ]
    )
)
def call(self, message, key=None): ...

In @FilterActionType('u') we check that the event type is update.

In @FilterUpdatedRow('Status') we check that during the update it was the value of the Status field that was changed.

In @FilterRowData we implement the check of our values. Since the task requires checking that Status="Closed" and Store="A", we combine these conditions with and_, to which we pass lambda functions that check the data. This way we can implement any logic for validating the data we receive from Kafka.

The BaseFunc and and_ implementations:

class BaseFunc:
    def __init__(self, filters=None):
        self.filters = filters or []


class and_(BaseFunc):

    def __call__(self, data):
        # True only if every filter passes; an empty filter list passes.
        for filter_ in self.filters:
            if not filter_(data):
                return False
        return True

class or_(BaseFunc):

    def __call__(self, data):
        # True if any filter passes; an empty filter list also passes.
        if not self.filters:
            return True
        for filter_ in self.filters:
            if filter_(data):
                return True
        return False
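Since and_ and or_ are both callables taking the payload, they can be nested to express more complex conditions. A condensed, runnable sketch (the "Cancelled" condition is an invented example):

```python
class BaseFunc:
    def __init__(self, filters=None):
        self.filters = filters or []


class and_(BaseFunc):
    def __call__(self, data):
        # Logical AND over all filters (empty list passes).
        return all(f(data) for f in self.filters)


class or_(BaseFunc):
    def __call__(self, data):
        # Logical OR over all filters (empty list passes).
        if not self.filters:
            return True
        return any(f(data) for f in self.filters)


# "Closed in store A" OR "Cancelled in any store"
condition = or_([
    and_([
        lambda r: r['after']['Status'] == 'Closed',
        lambda r: r['after']['Store'] == 'A',
    ]),
    lambda r: r['after']['Status'] == 'Cancelled',
])

print(condition({'after': {'Status': 'Closed', 'Store': 'A'}}))  # True
print(condition({'after': {'Status': 'Closed', 'Store': 'B'}}))  # False
```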

The check is launched in the FilterRowData decorator. If the filter passed to the constructor (in this case and_) returns False, the call method is not executed.

class FilterRowData:
    def __init__(self, filter_func: BaseFunc = None):
        self.filter_func = filter_func

    def __call__(self, fn):
        def call_func(*args, **kwargs):
            if self.filter_func is None: return fn(*args, **kwargs)
            if self.filter_func(args[1]['payload']): return fn(*args, **kwargs)
            return False
        return call_func
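End to end, the FilterRowData check from the warehouse example behaves like this (a condensed, self-contained sketch with a stand-in DemoTrigger; the other two decorators from the stack are left out to keep it short):

```python
class BaseFunc:
    def __init__(self, filters=None):
        self.filters = filters or []


class and_(BaseFunc):
    def __call__(self, data):
        return all(f(data) for f in self.filters)


class FilterRowData:
    def __init__(self, filter_func=None):
        self.filter_func = filter_func

    def __call__(self, fn):
        def call_func(*args, **kwargs):
            # No filter means pass everything through.
            if self.filter_func is None or self.filter_func(args[1]['payload']):
                return fn(*args, **kwargs)
            return False
        return call_func


class DemoTrigger:
    @FilterRowData(and_([
        lambda r: r['after']['Status'] == 'Closed',
        lambda r: r['after']['Store'] == 'A',
    ]))
    def call(self, message, key=None):
        return 'set close date'


t = DemoTrigger()
print(t.call({'payload': {'after': {'Status': 'Closed', 'Store': 'A'}}}))  # set close date
print(t.call({'payload': {'after': {'Status': 'Closed', 'Store': 'B'}}}))  # False
```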

Results

Thus, we have implemented the main filters for the data that we receive from Kafka topics, and in each of the launched triggers, we can fine-tune the filters for the data that we expect to receive and with which we will work in this class. At the same time, we have implemented a fairly simple registration of triggers, which makes it easy to launch the required handler.

However, there are still tasks in the backlog. For example, we need to ensure that a given record is processed by a trigger only once: if a record with the same identifier is updated several times, the trigger should fire for it only once. Another task is reading from a replica database and writing to the master, to take load off the primary database that we work with the most.
