How we simplified working with data using a pipeline: step-by-step plan

Hello, my name is Ruslan Shkarin. I'm a Senior Software Engineer, and this is my first article for VITI, the DIY media from beeline cloud. I'll tell you how I built a pipeline for collecting and analyzing system logs in real time for a service that crawled hundreds of thousands of web pages and parsed contact information.

Why did we need a pipeline?

We needed the pipeline to identify errors and anomalies in the system and to write the filtered logs to OpenSearch. With a large volume of data arriving in real time, it is impossible to scan the error log by eye, so an automated solution is required. For example, suppose the analyzer of a downloaded web page is configured to look for a phone number: in 100 cases the analysis goes as planned, then one of the numbers turns out to be written in a non-standard format, and an error appears in the log. It would take a person hours to go through all the records and figure out what the problem is; the pipeline finds the error in seconds.

Since the task was to collect contact information from web pages on a regular basis, we needed a stable, reliably working system. Imagine that the analyzer is broken (for example, a bug slipped into the code with the latest release). In that case the script keeps running idle until the error is detected, and we lose time during which a lot of useful work could have been done.

Our system is configured to send notifications when the number of errors reaches a specified threshold. I will describe below how this was done.

What services did we use and what did we do?

To analyze logs in real time, we used services from Amazon – CloudWatch, Kinesis Firehose, Lambda, OpenSearch, S3.

Below, I'll walk through what was done and why in the part of the system responsible for data analysis.

Preliminary configuration of services

We used a Kinesis Firehose Delivery Stream, a flexible AWS service designed to simplify and automate loading streaming data into other AWS services such as Amazon S3, Amazon Redshift, Amazon Elasticsearch Service and, in our case, Amazon OpenSearch Service. We chose Firehose because of its automatic scaling and its ability to process large amounts of data in real time.

CloudWatch. In the Subscription filters section, we created a filter for the logs we needed. Essentially, this is a “folder” that screens out the logs of the other systems in the AWS account.

As the filter's destination, we used a pre-created Firehose Delivery Stream that received the logs. A Delivery Stream aggregates records for a specified amount of time or until a specified volume is reached – in our case, aggregating logs for 15 minutes was optimal.
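For reference, here is a minimal sketch of how such a subscription filter can be wired up with the AWS SDK for JavaScript v3. The log group name, ARNs, and filter pattern are illustrative assumptions, not our actual values:

const {
  CloudWatchLogsClient,
  PutSubscriptionFilterCommand,
} = require('@aws-sdk/client-cloudwatch-logs');

const client = new CloudWatchLogsClient({ region: 'us-east-1' });

async function createSubscriptionFilter() {
  // Route log events from the crawler's log group into the Firehose delivery stream.
  // The 15-minute aggregation itself lives on the Firehose side
  // (BufferingHints.IntervalInSeconds = 900 on the delivery stream).
  await client.send(new PutSubscriptionFilterCommand({
    logGroupName: '/crawler/collector',        // hypothetical log group name
    filterName: 'collector-to-firehose',
    filterPattern: '',                         // empty pattern forwards every event
    destinationArn: 'arn:aws:firehose:us-east-1:123456789012:deliverystream/collector-logs', // hypothetical ARN
    roleArn: 'arn:aws:iam::123456789012:role/cwl-to-firehose', // role allowing CloudWatch Logs to write to Firehose
  }));
}

createSubscriptionFilter().catch(console.error);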

The math was like this:

  • four servers crawled approximately 200,000 pages per day, that is, 2.3 pages per second;

  • for every 1000 pages visited there were on average 20 errors – that’s 2%;

  • in 900 seconds (15 minutes), the servers crawled 2070 pages (2.3 pages per second × 900);

  • 2% errors on 2070 pages is, roughly speaking, 41 errors in 15 minutes, or 164 errors per hour.
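As a quick sanity check, the same arithmetic in a few lines of Node.js (the numbers are taken straight from the list above):

const pagesPerSecond = 2.3;                                    // ≈ 200,000 pages/day ÷ 86,400 s
const pagesPer15Min = pagesPerSecond * 900;                    // 2070 pages in 15 minutes
const errorRate = 20 / 1000;                                   // 20 errors per 1000 pages = 2%
const errorsPer15Min = Math.round(pagesPer15Min * errorRate);  // ≈ 41
const errorsPerHour = errorsPer15Min * 4;                      // ≈ 164 — used later as the alert threshold

console.log(errorsPer15Min, errorsPerHour);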

Development of a function for processing logs

Then we created a Lambda function responsible for receiving and processing the logs aggregated in the Delivery Stream. After processing, the record went back to the Delivery Stream for subsequent delivery to OpenSearch.

It's worth noting that returning the record to the Delivery Stream did not cause infinite recursion. When Kinesis Firehose sends data to Lambda for processing, it doesn't simply push it back into the same Firehose stream. After the Lambda function processes the data, it is returned not to the beginning of the stream but to the stage responsible for delivery to the final destinations – OpenSearch or S3. Firehose understands that the data returned by Lambda has already been processed and does not send it to Lambda again.

Raw log structure
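The original screenshot with the raw log structure is not reproduced here; judging by the fields the Lambda function below reads, a raw record looked roughly like this (the values are illustrative):

{
  "level": "error",
  "time": "2023-05-12T14:03:27Z",
  "msg": "failed to parse phone number",
  "data": "{\"host\": \"example.com\"}"
}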

Example code for a Lambda function (Node.js) that processed logs:

exports.handler = async (event) => {
  const output = event.records.map((record) => {
    // Firehose delivers each record as a base64-encoded string
    const entry = Buffer.from(record.data, 'base64').toString('utf8');
    const log = JSON.parse(entry);
    // Check that the log level is 'error'
    if (log.level === 'error') {
      // Build a new object with only the fields we need
      const errorLog = {
        // Parse the JSON string from the data field
        host: JSON.parse(log.data).host,
        status: log.level,
        time: log.time,
        msg: log.msg
      };
      const outputData = Buffer.from(JSON.stringify(errorLog)).toString('base64');
      // Return the processed log to Firehose
      return {
        recordId: record.recordId,
        result: 'Ok',
        data: outputData
      };
    }
    // For logs without errors, return 'Dropped' so that Firehose does not send them to OpenSearch
    return {
      recordId: record.recordId,
      result: 'Dropped',
      data: record.data
    };
  });
  return {
    records: output
  };
};
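To sanity-check the transformation locally, the handler can be fed a hand-built event in the shape Firehose uses for transformation Lambdas. The file name, record ID, and log payload below are made up for illustration:

// smoke-test.js — assumes the handler above is exported from handler.js
const { handler } = require('./handler');

const sampleLog = {
  level: 'error',
  time: '2023-05-12T14:03:27Z',
  msg: 'failed to parse phone number',
  data: JSON.stringify({ host: 'example.com' }),
};

const event = {
  records: [
    {
      recordId: '49546986683135544286507457936321625675700192471156785154', // made-up ID
      data: Buffer.from(JSON.stringify(sampleLog)).toString('base64'),
    },
  ],
};

handler(event).then((result) => {
  // Expect result: 'Ok' and a base64 payload containing host/status/time/msg
  console.log(JSON.stringify(result, null, 2));
});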

Collection of logs for analysis and storage

Next, we configured OpenSearch, which accepted records from the Delivery Stream. OpenSearch was used by the quality control department to further analyze the filtered and processed logs: the data was visualized on dashboards, and alerts were created to fire when anomalies were detected.

To search for anomalies, we used SQL queries in OpenSearch of the following type:

SELECT host, COUNT(*) AS error_count
FROM collector_index
WHERE status = 'error'
AND time >= now() - INTERVAL 1 HOUR
GROUP BY host
HAVING COUNT(*) > 164

Here 164 is the threshold number of errors per hour that we calculated earlier. If the number of errors exceeded this value, an alert was triggered.
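For completeness, here is a minimal sketch of running such a query outside the dashboards, via the OpenSearch SQL plugin endpoint. The domain URL and authentication details are placeholders (real calls need SigV4 or basic auth depending on the domain's access policy), and the snippet assumes Node.js 18+ with the global fetch API:

const query = `
  SELECT host, COUNT(*) AS error_count
  FROM collector_index
  WHERE status = 'error' AND time >= now() - INTERVAL 1 HOUR
  GROUP BY host
  HAVING COUNT(*) > 164
`;

(async () => {
  const response = await fetch('https://my-domain.us-east-1.es.amazonaws.com/_plugins/_sql', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      // Add SigV4 signing or basic auth here for a real domain
    },
    body: JSON.stringify({ query }),
  });
  // Hosts that exceeded the threshold in the last hour
  console.log(await response.json());
})();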

As the project progressed, the number of errors decreased, and the threshold had to be adjusted.

In addition, moving the analysis functionality into a separate service made it possible to scale the processing system and make it more sophisticated in the future.

In addition to OpenSearch, we also configured delivery of the logs to an S3 bucket. This served as an extra backup during development, so that the history could be reviewed if necessary. To save money, we used the S3 'One Zone–Infrequent Access' storage class.
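The article does not spell out how the storage class was set; since Firehose writes objects with the Standard class, one common approach is a lifecycle rule that later transitions the delivered logs. A sketch with a hypothetical bucket name, using the AWS SDK for JavaScript v3:

const {
  S3Client,
  PutBucketLifecycleConfigurationCommand,
} = require('@aws-sdk/client-s3');

const s3 = new S3Client({ region: 'us-east-1' });

async function configureLifecycle() {
  // Move delivered log objects to One Zone-IA after 30 days
  // (S3 does not allow transitions to IA classes earlier than that).
  await s3.send(new PutBucketLifecycleConfigurationCommand({
    Bucket: 'collector-logs-backup',            // hypothetical bucket name
    LifecycleConfiguration: {
      Rules: [
        {
          ID: 'logs-to-onezone-ia',
          Status: 'Enabled',
          Filter: { Prefix: '' },               // apply to the whole bucket
          Transitions: [{ Days: 30, StorageClass: 'ONEZONE_IA' }],
        },
      ],
    },
  }));
}

configureLifecycle().catch(console.error);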

Conclusion

A distinctive feature of this system is that it can be used to process and analyze any data in real time (for example, Kinesis Data Stream + Lambda) or near real time (for example, Kinesis Data Stream + Kinesis Data Firehose), since Kinesis Data Firehose can receive input both from Amazon services and from virtually any other source via the AWS API or SDK.

Because the solution is entirely cloud-based, we don't need to manage our own servers, which would take much more time to configure and debug. The entire project took about a month and a half: roughly two weeks to build a prototype and another month to debug and polish it to the performance we needed.

And most importantly, it has become easier for us to analyze what is happening in a large data stream. As the system grew, plain logging of everything was no longer enough; additional real-time data processing was required. The pipeline handled this task excellently.
