Data aggregation over time with Kafka Streams

What are we talking about

At FunBox, we make products for mobile operators: portals, geoservices, payments, mobile advertising, and much more. One of our projects is built on a microservice architecture. Its main functionality involves processing event streams, so we chose an event-driven architecture. To organize centralized, scalable, and fast messaging we use Apache Kafka: a popular platform for such problems that also provides an interface for streaming event processing.

In the course of implementing a call analytics service, we faced the task of aggregating several data streams over time. When user 1 calls user 2, two external services emit JSON events:

  • {"user1": id1, "user2": id2, "timestamp": ts1, "analytics": data1}

  • {"user1": id1, "user2": id2, "timestamp": ts2, "metrics": data2}

Our application receives them in the corresponding Kafka topics: analyticsTopic and metricsTopic. Note that the event times ts1 and ts2 may not coincide, but their difference is at most T seconds, where T is a given constant.

It is known from call statistics that in most cases the time between two consecutive calls for a given pair of users is at least 2*T. However, an event may occasionally fail to reach Kafka for reasons beyond our control. The stream processing service we are developing should find the pairs of events corresponding to one call and emit aggregated JSON events of the form {"user1": id1, "user2": id2, "ts": ts, "analytics": data1, "metrics": data2} to the topic outputTopic, where ts = min(ts1, ts2).

Basic concepts of Kafka Streams: streams and processors

To solve such problems, we use Kafka Streams. First, let’s look at the basic concepts.

Kafka Streams is a client library for stream processing on top of Kafka. It provides a high-level Streams DSL and a low-level Processor API. The central concept of the library is the processor topology: a graph consisting of stream processors (nodes) and streams (edges). The application uses topologies to transform data and perform computations.

https://kafka.apache.org/30/images/streams-architecture-topology.jpg

A stream is an unbounded, continuously updated data set: an ordered, fault-tolerant, replayable sequence of immutable records, each of which is a key-value pair.

A stream processor is a single data processing step: it receives a record from an upstream processor, performs some operation, and may emit one or more records to downstream processors. There are two special processors: the source and the sink. The source has no upstream processors; it reads records from Kafka topics and passes them to downstream processors. The sink has no downstream processors; it only writes records received from upstream processors to a Kafka topic. A detailed description can be found on the project website.
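For illustration, here is a minimal sketch of a topology built directly with the Processor API that contains only these two special processors; the topic names are placeholders:

import org.apache.kafka.streams.Topology;

// A toy topology: records flow from the source (reading "inputTopic")
// straight to the sink (writing "outputTopic"), with no processing in between.
Topology topology = new Topology();
topology.addSource("Source", "inputTopic");
topology.addSink("Sink", "outputTopic", "Source");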

Building a topology

Our solution covers several steps:

  • receiving and grouping incoming events by a pair of subscribers;

  • combining events corresponding to one call;

  • screening out intermediate events and getting the result.

Let’s consider each in more detail.

Getting input

Kafka Streams requires a special class to serialize and deserialize the data being processed: in our case, converting JSON to Java objects and back. The library provides such classes for the basic types; to handle JSON data, we add our own JSONSerde built on Jackson's ObjectMapper. Details are in the documentation.
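A minimal sketch of such a class, modeled on the JSONSerde from the Kafka Streams examples; it assumes that JSONSerdeCompatible carries Jackson type information (for example via @JsonTypeInfo), so the concrete class can be recovered on deserialization:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

public class JSONSerde<T extends JSONSerdeCompatible>
    implements Serializer<T>, Deserializer<T>, Serde<T> {

  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

  @Override
  public byte[] serialize(String topic, T data) {
    if (data == null) {
      return null;
    }
    try {
      return OBJECT_MAPPER.writeValueAsBytes(data);
    } catch (Exception e) {
      throw new SerializationException("Error serializing JSON message", e);
    }
  }

  @SuppressWarnings("unchecked")
  @Override
  public T deserialize(String topic, byte[] data) {
    if (data == null) {
      return null;
    }
    try {
      // Relies on type info embedded in the JSON to pick the concrete class
      return (T) OBJECT_MAPPER.readValue(data, JSONSerdeCompatible.class);
    } catch (Exception e) {
      throw new SerializationException("Error deserializing JSON message", e);
    }
  }

  @Override
  public Serializer<T> serializer() { return this; }

  @Override
  public Deserializer<T> deserializer() { return this; }
}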

Incoming events are converted from JSON into objects of the Java classes AnalyticsEvent and MetricsEvent.

public class AnalyticsEvent implements JSONSerdeCompatible {
  public String user1;
  public String user2;
  public Long timestamp;
  public String analytics;
  ...
}

public class MetricsEvent implements JSONSerdeCompatible {
  public String user1;
  public String user2;
  public Long timestamp;
  public String metrics;
  ...
}

Keep in mind that the order in which incoming events appear in a Kafka topic may differ from the order in which they were generated, so processing must use the time from the event body (timestamp). To achieve this, when creating the source we pass a class that implements the TimestampExtractor interface and extracts the time from the event body.

public class AnalyticsEventTimestampExtractor implements TimestampExtractor {
  @Override
  public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
    // Use the timestamp from the event body; fall back to the record's own
    // timestamp when the body does not contain one
    Long eventTs = ((AnalyticsEvent) record.value()).timestamp;
    return (eventTs == null) ? record.timestamp() : eventTs;
  }
}

When creating the input streams, events are re-keyed with a new key containing the pair of subscribers user1 and user2. Thanks to this, all further transformations are performed independently for events belonging to different pairs of subscribers.

StreamsBuilder builder = new StreamsBuilder();
KStream<String, AnalyticsEvent> analyticsStream = builder
  .stream(
    analyticsTopicName,
    Consumed.with(Serdes.String(), new JSONSerde<AnalyticsEvent>(), new AnalyticsEventTimestampExtractor(), Topology.AutoOffsetReset.LATEST)
  )
  .selectKey((k, v) -> String.format("%s-%s", v.user1, v.user2));
KStream<String, MetricsEvent> metricsStream = builder
  .stream(
    metricsTopicName,
    Consumed.with(Serdes.String(), new JSONSerde<MetricsEvent>(), new MetricsEventTimestampExtractor(), Topology.AutoOffsetReset.LATEST)
  )
  .selectKey((k, v) -> String.format("%s-%s", v.user1, v.user2));

Combining events from two streams

Now we can combine data from the two streams using outerJoin, similar to the operation of the same name in relational databases. Sliding windows are used to group events in time. We set the window width to T (1 minute). We also specify a grace interval (3 minutes), which defines how long, in real time, closing the window is delayed to wait for late events. For example, an event may have a timestamp that falls into the window but be written to the topic with some delay; if it arrives within the grace interval, it is still counted in that window.

For each pair of events that fall into the window, outerJoin calls a join function supplied by the developer. In our case, it returns a combined event of the class CompoundEvent.

public class CompoundEvent implements JSONSerdeCompatible {
  public String user1;
  public String user2;
  public Long timestamp;
  public String analytics;
  public String metrics;
  public boolean dummy = false; // marks the empty service events introduced in "Sending empty events"

  public CompoundEvent merge(CompoundEvent other) {
    if (user1 == null && other.user1 != null) {
      user1 = other.user1;
    }
    if (user2 == null && other.user2 != null) {
      user2 = other.user2;
    }
    if (other.timestamp != null) {
      if (timestamp == null) {
        timestamp = other.timestamp;
      } else {
        timestamp = Math.min(timestamp, other.timestamp);
      }
    }
    if (analytics == null && other.analytics != null) {
      analytics = other.analytics;
    }
    if (metrics == null && other.metrics != null) {
      metrics = other.metrics;
    }
    return this;
  }

  public static CompoundEvent create(AnalyticsEvent analyticsEvent, MetricsEvent metricsEvent) {
    CompoundEvent compoundEvent = new CompoundEvent();
    if (analyticsEvent != null) {
      compoundEvent.user1 = analyticsEvent.user1;
      compoundEvent.user2 = analyticsEvent.user2;
      compoundEvent.analytics = analyticsEvent.analytics;
      compoundEvent.timestamp = analyticsEvent.timestamp;
    }
    if (metricsEvent != null) {
      compoundEvent.metrics = metricsEvent.metrics;
      if (compoundEvent.user1 == null) {
        compoundEvent.user1 = metricsEvent.user1;
      }
      if (compoundEvent.user2 == null) {
        compoundEvent.user2 = metricsEvent.user2;
      }
      if (compoundEvent.timestamp == null) {
        compoundEvent.timestamp = metricsEvent.timestamp;
      } else {
        compoundEvent.timestamp = Math.min(compoundEvent.timestamp, metricsEvent.timestamp);
      }
    }
    return compoundEvent;
  }
  // Factory for the empty service events described in the "Sending empty events" section
  public static CompoundEvent createDummy(Long timestamp) {
    CompoundEvent event = new CompoundEvent();
    event.dummy = true;
    event.timestamp = timestamp;
    return event;
  }
}

This is how the primary stream of combined events is formed.

KStream<String, CompoundEvent> compoundRawStream = analyticsStream
  .outerJoin(
    metricsStream,
    (analytics, metrics) -> CompoundEvent.create(analytics, metrics),
    JoinWindows.of(Duration.ofMinutes(1L)).grace(Duration.ofMinutes(3L))
  );

For each pair of source events, the primary merged stream contains two events: the first is built from a single source event, the second from the pair. In the first case the join function is called with arguments (event1, null), in the second with (event1, event2). The next step is to keep only the second combined event.

However, it is possible that we received only one source event for a call. In that case outerJoin produces a single combined event, based on one source event, and we need to keep it. So we cannot simply keep only the combined events that contain data from a pair of source events. To solve this problem, the next step uses a session window.

Screening out unnecessary events using the session window and suppression

The name "session window" comes from the notion of a user session, for example when interacting with a website. A window is created for some event in the stream and is closed when the interval before the arrival of the next event exceeds the idle time, by analogy with user inactivity. Then another session window opens.

https://kafka.apache.org/30/images/streams-session-windows-02.png

The session window mechanism lets us group the events produced in the previous step by the source event they belong to. Source events are assumed to be separated in time by more than T, so the idle time for closing the session window is set to T. As the combined events accumulate in the session window, a simple aggregation is performed: the last event received in the window is taken as the current one. Kafka Streams is designed so that each call to the aggregation function emits an output event. To discard all the generated events except the last one, suppression is applied. In the resulting event, the original key is restored, and the window operation is complete.

KStream<String, CompoundEvent> compoundStream = compoundRawStream
  .groupByKey()
  .windowedBy(SessionWindows.with(Duration.ofMinutes(1L)).grace(Duration.ZERO)) // idle time T
  .reduce(
    (aggCompound, nextCompound) -> aggCompound.merge(nextCompound),
    Materialized.with(Serdes.String(), new JSONSerde<CompoundEvent>())
  )
  .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())) // emit only the final aggregate
  .toStream((k, v) -> k.key()); // restore the original key

At the output, we get one combined event corresponding to the original call. It goes to the output topic outputTopic.
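The sink step itself is a single call; a sketch, assuming the topic name is held in outputTopicName (we will refine this step below):

compoundStream.to(
  outputTopicName,
  Produced.with(Serdes.String(), new JSONSerde<CompoundEvent>())
);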

Unexpected turn

While testing the topology, we discovered an unexpected problem: after sending incoming events to the appropriate topics, nothing appeared in the output. Debugging and testing a Kafka Streams application is an extensive topic worthy of its own series of articles, so here we only discuss the findings.

It turned out to be a known feature of time windows: event processing in them is driven not by system time (wall clock time) but by stream time, which advances only when new events arrive in a given topic and partition. In our case, this means a new event must arrive for the corresponding pair of subscribers. But waiting for the next call between a pair of users to obtain data about the previous one is unacceptable. A suitable solution was to generate empty service events placed in time just beyond the session window. They are ignored when merging data and serve only to advance stream time. As a result, the session window is forcibly closed, and we get the resulting output event.

Sending empty events

To implement this mechanism, the capabilities of the high-level Streams DSL are no longer sufficient, so we had to use the low-level Processor API. The solution has two parts:

  1. Upon receipt of each CompoundEvent, a timestamp offset by T+grace+1 seconds is placed in a separate store; the key of the entry is the same as the key of the received event.

  2. A periodic task runs on the system clock (wall clock time). It retrieves from the store the records whose time is earlier than the current time, then generates and emits empty events into the stream with the corresponding timestamps.

public class WindowCloserTransformer implements Transformer<String, CompoundEvent, KeyValue<String, CompoundEvent>> {
  private ProcessorContext context;
  private KeyValueStore<String, Long> kvStore;
  private String storeName;
  private Long delayMs;

  public WindowCloserTransformer(String storeName, Long delayMs) {
    this.storeName = storeName;
    this.delayMs = delayMs;
  }

  @Override
  @SuppressWarnings("unchecked")
  public void init(ProcessorContext context) {
    this.context = context;
    this.kvStore = (KeyValueStore<String, Long>) context.getStateStore(this.storeName);
    // Periodic wall-clock task: for every stored entry whose scheduled time has
    // passed, emit an empty event with that timestamp and remove the entry
    context.schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
      try (final KeyValueIterator<String, Long> iter = kvStore.all()) {
        Set<String> toDelete = new HashSet<String>();
        while (iter.hasNext()) {
          final KeyValue<String, Long> entry = iter.next();
          if (entry.value <= timestamp) {
            context.forward(entry.key, CompoundEvent.createDummy(entry.value), To.all().withTimestamp(entry.value));
            toDelete.add(entry.key);
          }
        }
        for (String k : toDelete) {
          kvStore.delete(k);
        }
      }
    });
  }

  @Override
  public KeyValue<String, CompoundEvent> transform(String key, CompoundEvent event) {
    if (!event.dummy) {
      if (event.timestamp != null) {
        // Remember when to force-close the session window for this key
        kvStore.put(key, event.timestamp + delayMs);
      }
      return KeyValue.pair(key, event);
    }
    return null; // empty service events are not passed through again
  }

  @Override
  public void close() {
  }
}

It remains to include the implemented transformer in our topology and add filtering for the empty events; a sketch of the filter follows the listing.

KStream<String, CompoundEvent> compoundRawStream = ...
KStream<String, CompoundEvent> compoundStreamWithDummy = compoundRawStream
  .transform(new TransformerSupplier<String, CompoundEvent, KeyValue<String, CompoundEvent>>() {
    @Override
    public Transformer<String, CompoundEvent, KeyValue<String, CompoundEvent>> get() {
      return new WindowCloserTransformer("dummyPushStore", (1+3)*60000L); // delay before force-closing, in ms
    }

    @Override
    public Set<StoreBuilder<?>> stores() {
      // In-memory store holding, per key, the time at which to emit an empty event
      StoreBuilder<KeyValueStore<String, Long>> keyValueStoreBuilder =
        Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("dummyPushStore"), Serdes.String(), Serdes.Long());
      return Collections.singleton(keyValueStoreBuilder);
    }
  });
KStream<String, CompoundEvent> compoundStream = ...
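The filtering of the empty service events mentioned above can be a simple predicate placed before the sink; outputTopicName is an assumption here:

compoundStream
  .filter((key, event) -> !event.dummy) // drop service events that only advance stream time
  .to(outputTopicName, Produced.with(Serdes.String(), new JSONSerde<CompoundEvent>()));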

Now the time windows close on time, and we get the expected events.

Results

As a result, we were able to build the processor topology we needed, one that aggregates two data streams in time. All that remains is to configure and run the Java application.
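For completeness, here is a minimal sketch of that final wiring; the application id and bootstrap servers are assumptions:

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "call-aggregation-app"); // assumption
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // assumption

// builder is the StreamsBuilder used to construct the topology above
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

// Close the streams client cleanly on JVM shutdown
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));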
