Python Streaming (Spark + Kafka)

Hello, my name is Roman Voronovsky. In this article, dedicated to Python streaming with Spark and Kafka, we will walk through the main steps presented in the video, giving a more detailed description of the process and helping you deploy your local environment.

The video, like the article, is aimed at those who are just starting their journey in Big Data and do not yet fully know how to set up the various environments needed for an easy start in the profession.

So, let's begin.

Necessary components and their verification

Before starting to work with Python streaming in combination with Spark and Kafka, you will need to install and verify the following tools and plugins.

0. Docker Desktop (you can download it from the link: https://www.docker.com/products/docker-desktop/)

1. Spark: Apache Spark is a fast and powerful platform for processing large amounts of data. To install Spark, use the Conda interpreter (you can download it from the link: https://www.anaconda.com/download) rather than the standard Python interpreter. To do this:

  • in the lower right corner, click on interpreters

  • select the option “Add new interpreter”

  • select the option “Add local interpreter”

  • select “Conda Environment”

installing Conda interpreter

With this interpreter, Spark works out of the box, and you will not have to set up additional components such as Hadoop.

2. Kafka: Apache Kafka is a distributed platform designed for processing streaming data. To install Kafka, you just need to use the docker-compose.yaml file attached to the repository. To run it, use the following command from the root of the project:

docker-compose up -d

3. Python: Python is a popular programming language used for application development and data analysis. Make sure you have the latest version of Python installed on your computer. You can download and install Python from the official website (https://www.python.org/downloads). In the video I use the PyCharm IDE (you can download it from the link: https://www.jetbrains.com/pycharm/?var=1).

4. To work with Kafka in Python, you will need to install the pykafka library. Open a command prompt and run the command:

pip install pykafka

to install this library.

5. PySpark: PySpark is a Python API for Apache Spark. To install PySpark, run the command:

pip install pyspark

on the command line.
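
A quick sanity check (a sketch of my own, not shown in the video) is to confirm that both libraries can be imported and to print their versions:

import pykafka
import pyspark

# if both imports succeed, the libraries are installed in the active interpreter
print("pykafka:", pykafka.__version__)
print("pyspark:", pyspark.__version__)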

After installing all the necessary components and plugins, we check their functionality.

Functionality check

Kafka

After executing the command to start the container, we should see the following picture:

mapping Kafka to Docker Desktop

We run a Python script that writes messages to Kafka (producer.py):

from pykafka import KafkaClient

if __name__ == "__main__":
    # host for connecting to Kafka
    client = KafkaClient(hosts="127.0.0.1:9092")
    # name of the topic we are going to send messages to
    topic = client.topics[b'stream_topic']
    # create the producer that will send the messages
    producer = topic.get_producer()
    # send the messages themselves
    producer.produce(b'Hello, Streaming!')
    producer.produce(b'Kafka first Message!')
    # stop the producer
    producer.stop()
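
If you want something closer to a real stream (a sketch of my own, not from the video), the producer can keep sending messages on a timer, so that the Spark job later has a continuous flow of data to read:

import time
from datetime import datetime

from pykafka import KafkaClient

if __name__ == "__main__":
    client = KafkaClient(hosts="127.0.0.1:9092")
    topic = client.topics[b'stream_topic']
    producer = topic.get_producer()
    try:
        # send one message per second for a minute
        for i in range(60):
            message = f"message {i} at {datetime.now().isoformat()}"
            producer.produce(message.encode('utf-8'))
            time.sleep(1)
    finally:
        # stop the producer so that buffered messages are flushed
        producer.stop()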

Next, we run a script that prints messages from the Kafka topic (consumer.py):

from pykafka import KafkaClient

if __name__ == "__main__":
    # host for connecting to Kafka
    client = KafkaClient(hosts="127.0.0.1:9092")
    # name of the topic we are going to read messages from
    topic = client.topics[b'stream_topic']
    # create a simple consumer
    consumer = topic.get_simple_consumer()
    # process messages while there are still some in the topic
    for message in consumer:
        if message is not None:
            # take the message value and decode it as UTF-8 for valid display
            print(message.value.decode('utf-8'))
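
As a side note (my own sketch, not shown in the video), you can pass consumer_timeout_ms to get_simple_consumer so that the loop stops once no new messages arrive within that interval, instead of blocking forever:

from pykafka import KafkaClient

if __name__ == "__main__":
    client = KafkaClient(hosts="127.0.0.1:9092")
    topic = client.topics[b'stream_topic']
    # stop iterating after 5 seconds without new messages
    consumer = topic.get_simple_consumer(consumer_timeout_ms=5000)
    for message in consumer:
        if message is not None:
            print(message.offset, message.value.decode('utf-8'))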

As a result, all the messages that were sent should be printed to the console by the consumer.py program.
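
With the two messages sent by producer.py, the console output should look something like this:

Hello, Streaming!
Kafka first Message!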

Spark + Kafka

To check Spark and Kafka working together, you need to use the program sparkConsumeStreamKafkaWriteToConsole.py:

import os

from pyspark.sql import SparkSession
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.3.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell'

if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName("ProduceConsoleApp") \
        .getOrCreate()

    source = (spark.readStream.format("kafka")
              .option("kafka.bootstrap.servers", "127.0.0.1:9092")
              .option("subscribe", "stream_topic")
              .option("startingOffsets", "earliest")
              .load()
              )
    # printing the schema is optional
    source.printSchema()
    # transformations on the incoming DataFrame
    df = (source
          .selectExpr('CAST(value AS STRING)', 'offset'))
    # write block for the results obtained after processing the DataFrame
    console = (df
               .writeStream
               .format('console'))

    console.start().awaitTermination()

The processing applied to the resulting DataFrame may look very different from my example, as this example was presented solely for educational purposes; in real projects these queries can be much more complex 🙂
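
For illustration only, here is a sketch of what a slightly more realistic transformation could look like. It is my own example, not from the video: it assumes the messages are JSON with hypothetical ts and word fields, and it reuses the source streaming DataFrame from the program above:

from pyspark.sql.functions import from_json, col, window
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# hypothetical message schema: {"ts": "2024-01-01T12:00:00", "word": "hello"}
schema = StructType([
    StructField("ts", TimestampType()),
    StructField("word", StringType()),
])

# parse the JSON value and flatten it into columns
parsed = (source
          .selectExpr("CAST(value AS STRING) AS json")
          .select(from_json(col("json"), schema).alias("data"))
          .select("data.*"))

# count words per one-minute window; with aggregations the console sink
# needs outputMode("update") or "complete" instead of the default "append"
counts = (parsed
          .withWatermark("ts", "5 minutes")
          .groupBy(window(col("ts"), "1 minute"), col("word"))
          .count())

(counts.writeStream
       .format("console")
       .outputMode("update")
       .start()
       .awaitTermination())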

In the write block we can specify various formats; for this example, we chose the simplest one – writing to the console. If you want to complicate things and write the results back to some resulting topic, then try changing the code starting with .format as follows:

    .format("kafka") \
    .option("kafka.bootstrap.servers", "127.0.0.1:9092") \
    .option("topic", "result_topic") \
    .option('checkpointLocation', './.local/checkpoint') \
    .start().awaitTermination()
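
For reference, here is a sketch (my own assumption, not part of the original example) of what the complete write block could look like with the Kafka sink; the sink expects a string or binary value column, so the offset column is dropped here:

# write the results back to a "result_topic" topic instead of the console
kafka_query = (df
               .selectExpr("CAST(value AS STRING) AS value")
               .writeStream
               .format("kafka")
               .option("kafka.bootstrap.servers", "127.0.0.1:9092")
               .option("topic", "result_topic")
               .option("checkpointLocation", "./.local/checkpoint")
               .start())

kafka_query.awaitTermination()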

Installing HADOOP_HOME

If you encounter Hadoop-related errors like the ones in the video, then this section is a must-read.

The first thing to do is to download the winutils folder that I showed in the video (you can download it from the link: https://github.com/kontext-tech/winutils). I used a different version, which is also linked in the repository, and I give it here: https://github.com/cdarlint/winutils

After a successful download (in my case this is version 3.2.2), you need to set the system variable HADOOP_HOME. This is done as follows:

  • go to system settings

  • type: “change system environment variables”

  • click on the button: “environment variables”

  • click on the button: “create”

  • in the variable name field, write: “HADOOP_HOME”

  • in the variable value field, write the path to the downloaded folder of the required version (in my case: “C:\Users\Roman\Documents\hadoop-3.2.2”)

  • find among the system variables the variable named “Path”

  • change it by adding the following text: “%HADOOP_HOME%\bin”

  • save and restart the PC (after rebooting, you can verify the variable with the sketch below)
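
As a quick check (a sketch of my own, not from the video) that the variable is picked up after the restart, you can print it from Python:

import os

# the path configured in the environment variables should be printed here
print("HADOOP_HOME =", os.environ.get("HADOOP_HOME"))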


Bottom line

After these manipulations are done, the error will go away and the project should work correctly with the versions of the components I used to launch everything.

Thank you all for your attention! Leave comments and subscribe!

The repository is pinned under the video.
