Writing a simple chat with a console interface using pipe-oriented programming with Cats

If, while studying gRPC, you want to practice bidirectional streaming (requests and responses exchanged within a single connection, with events that can be initiated from the server side), building a simple chat is a great way to do it.

Demo of the console chat in action

The project is written in Scala using the fs2-grpc library. It uses a client-server architecture in which clients send messages to a server that relays them to all connected clients.

gRPC

But before we start, let’s briefly recall what gRPC is and how it relates to HTTP/2, without going into detail (there are already plenty of articles on this topic).

gRPC is an RPC (Remote Procedure Call) framework for building client-server applications that exchange data. Under the hood gRPC uses HTTP/2, whose multiplexed streams and header compression help speed up data transfer, reduce the amount of data sent and lower latency. It is also worth mentioning that gRPC uses Protobuf: methods and message structures are defined in a special interface description language, and code for working with these messages is then generated for various programming languages. Protobuf serializes and deserializes data efficiently into a compact binary format.

Chat mechanism

Restrictions

First, to keep the project simple, I decided to make the chat console-based and not to store messages on the server side.

Bidirectional streaming

Let’s look at how a chat works over bidirectional streaming in gRPC. The server and the client exchange streams of messages within a single connection. The client sends an event; the server receives it, processes it and sends an event back. Both sides send and receive asynchronously, so data flows between the client and the server in real time and in both directions.
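In fs2 terms, such an exchange can be pictured as a function from a stream of incoming events to a stream of outgoing ones. The sketch below is not gRPC code: it uses plain strings, in-memory streams and a hypothetical echo handler only to illustrate that shape.

```scala
import fs2.{Pipe, Pure, Stream}

object BidiShape {
  // A bidirectional handler consumes a stream of incoming events and
  // produces a stream of outgoing events. `echo` is a hypothetical
  // handler that answers every incoming message.
  def echo[F[_]]: Pipe[F, String, String] =
    _.map(msg => s"echo: $msg")

  // Pure in-memory streams stand in for the network connection here.
  val replies: List[String] =
    Stream("hello", "world").through(echo[Pure]).toList
}
```

In the real service the element type is the generated Events message rather than String, and the streams are carried over a single HTTP/2 connection.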

Multicasting events inside the server

When clients connect to the server, each of them can send events to it. However, a problem arises: how do we forward messages from one client to the other connected clients?

To solve this problem, we can multicast through a topic. A topic is an object that delivers each published message to multiple subscribers. That is, when a client sends an event, the server publishes the received event to the topic, and from there it is automatically forwarded to all clients subscribed to that topic.

To implement multicasting I used Topic from the fs2 library (Functional Streams for Scala).
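As a minimal sketch of this idea, independent of gRPC, the example below publishes two strings to a Topic and lets two subscribers collect them. The names alice and bob and the queue size 16 are illustrative assumptions; subscribeAwait is used here (instead of the subscribeUnbounded used in the chat service) only so that both subscriptions are guaranteed to exist before anything is published.

```scala
import cats.effect.{IO, IOApp}
import cats.syntax.all._
import fs2.concurrent.Topic

object TopicDemo extends IOApp.Simple {

  // Two subscribers ("alice" and "bob" are illustrative names) each
  // receive every message published to the topic.
  val program: IO[(List[String], List[String])] =
    Topic[IO, String].flatMap { topic =>
      // subscribeAwait registers the subscription as soon as the resource
      // is acquired, so nothing published inside `use` is missed.
      val subscribers = for {
        alice <- topic.subscribeAwait(16)
        bob   <- topic.subscribeAwait(16)
      } yield (alice, bob)

      subscribers.use { case (alice, bob) =>
        val publishAll =
          List("hello", "world").traverse_(msg => topic.publish1(msg))
        // Collect on both subscribers while publishing concurrently.
        (
          alice.take(2).compile.toList,
          bob.take(2).compile.toList,
          publishAll
        ).parMapN((a, b, _) => (a, b))
      }
    }

  def run: IO[Unit] = program.flatMap(result => IO.println(result))
}
```

Each subscriber ends up with its own copy of both messages, which is the behaviour the chat server relies on when relaying one client’s events to everyone else.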

Visually, the interaction between the clients and the server looks something like this:

Implementation

There are several libraries for working with gRPC in Scala. I use fs2-grpc, which is a wrapper over ScalaPB built on top of fs2, a functional library for working with streams.

fs2-grpc supports all types of RPC calls: unary, server streaming, client streaming and bidirectional streaming. It also provides error handling and resource management mechanisms such as Resource and Bracket. fs2-grpc integrates with a stack of functional libraries for working with effects (cats-effect, zio, monix). My example uses Cats Effect 3.

Proto

So, let’s get started. First of all, we need a proto file that describes the contract between the client and the server.

Let’s create a ChatService with a method eventsStream, whose input and output are both streams of the type Events (that is, events travel over streams in both directions).

service ChatService {  
  
  rpc eventsStream(stream Events) returns (stream Events) { }  
}

Events contains data wrapped in an event type that can be triggered both on the client side and on the server side (in our case, only on the client side).

message Events {

  oneof event {
    Login    client_login    = 1;
    Logout   client_logout   = 2;
    Message  client_message  = 3;
    Shutdown server_shutdown = 4;
  }
}

Server Implementation

Earlier we said that the server should receive events from clients and broadcast them to other clients.

After compiling the proto file, base code for working with gRPC is generated, including the ChatServiceFs2Grpc interface. It must be implemented on the server side. My implementation is as follows.

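import cats.effect.Concurrent
import cats.effect.std.Console
import fs2.Stream
import fs2.concurrent.Topic
import io.grpc.Metadata
// Events and ChatServiceFs2Grpc come from the code generated from the proto file.

object ChatService {

  def apply[F[_]: Concurrent: Console](
      eventsTopic: Topic[F, Events]
  ): ChatServiceFs2Grpc[F, Metadata] = new ChatServiceFs2Grpc[F, Metadata] {

    // Each run of this stream opens a new subscription to the topic.
    val eventsToClients: Stream[F, Events] =
      eventsTopic
        .subscribeUnbounded
        .evalTap(event => Console[F].println(s"From topic: $event"))

    override def eventsStream(
        eventsFromClient: Stream[F, Events],
        ctx: Metadata
    ): Stream[F, Events] =
      // Output: everything published to the topic.
      // In the background: publish this client's events to the topic.
      eventsToClients.concurrently(
        eventsFromClient
          .evalTap(event => Console[F].println(s"Event from client: $event"))
          .evalMap(eventsTopic.publish1)
      )
  }
}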

Here we see the method eventsStream, which was described in the proto file. From the stream eventsFromClient we receive events from a client. As output we return the stream of events eventsToClients. Looking above, we see that eventsToClients is a subscription to the topic eventsTopic: Topic[F, Events], to which events from the client are published so that they reach the other clients.

Building and running the server

Now we assemble the components that make up the server application.

object ChatServerApp extends IOApp {  
  
  private def runServer(service: ServerServiceDefinition): IO[Nothing] = {  
    NettyServerBuilder  
      .forPort(50053)  
      .keepAliveTime(5, TimeUnit.SECONDS)  
      .addService(service)  
      .resource[IO]  
      .evalMap(server => IO(server.start()))  
      .useForever  
  }  
  
  override def run(args: List[String]): IO[ExitCode] = for {  
    topic <- Topic[IO, Events]  
    serviceResource = ChatServiceFs2Grpc.bindServiceResource[IO](ChatService(topic))  
    _ <- serviceResource.use(runServer)  
  } yield ExitCode.Success  
}

In the function runServer we create and start a new server with NettyServerBuilder, listening on port 50053. NettyServerBuilder is provided by the gRPC library for creating servers that use Netty as a transport, and it lets you configure server settings (port, keepAliveTime, etc.).

In the method run we create the topic that will be used for multicasting events to clients. We then create an instance of the ChatService service, bind it to the server and start the server.

$ sbt "runMain org.github.ainr.chat.server.ChatServerApp"

As a result, when the server is running, clients will be able to connect to it, send messages and receive them in real time.

Client Implementation

What should the client do? The client may seem a little more complicated, but in reality it is just as simple. It does a few things:

  • Reads input from the console

  • Sends events to the server

  • Receives events from the server and processes them

  • Prints received messages to the console

On the client side, everything is also built on streams (Stream).

Reading input from the console

To read input from the console, we again turn to streams. We create a class InputStream with a method read, which returns a stream of the messages typed by the client: Stream[F, String].

object InputStream {  
  
  def apply[F[_]: Async: Console](bufSize: Int): InputStream[F] = {  
  
    new InputStream[F] {  
  
      override def read: Stream[F, String] = {  
        fs2.io  
          .stdinUtf8(bufSize)  
          .through(fs2.text.lines)  
          .evalTap(erase)          // erase the typed input from the console
          .filter(_.nonEmpty)      // filter out empty lines
      }  
  
      private def erase: PartialFunction[String, F[Unit]] = {  
        _ => Console[F].print("\u001b[1A\u001b[0K")  
      }  
    }  
  }  
}

You can see from the code that it takes a stream of characters, splits it into lines and filters out the empty ones. Only the method erase may seem magical, since it prints something incomprehensible to the console.

private def erase: PartialFunction[String, F[Unit]]= { _ => Console[F].print("\u001b[1A\u001b[0K")}  

In fact, there is no magic here. All it does is erase what we have typed in the console by printing special ANSI escape sequences, so that messages are not duplicated.
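To make the escape string easier to read, here is a small sketch that splits it into its two parts. The object and constant names are illustrative and not part of the project; only the escape sequences themselves come from the code above.

```scala
object AnsiErase {
  private val Esc = "\u001b"

  // "ESC[1A" moves the cursor up one line, back onto the line just typed.
  val CursorUpOneLine: String = s"$Esc[1A"

  // "ESC[0K" erases from the cursor position to the end of that line.
  val EraseToEndOfLine: String = s"$Esc[0K"

  // The same sequence that `erase` prints after every line of input.
  val EraseTypedLine: String = CursorUpOneLine + EraseToEndOfLine
}
```

After the user presses Enter, the cursor sits at the start of the next line, so moving up one line and erasing to the end of the line removes the text that was just typed.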

Client logic

Next, the text the user typed in the console has to be converted into an event (the Events type) and sent to the server.

Overall, the client logic is quite simple and is expressed as a composition of streams in the method start. Here chatService: ChatServiceFs2Grpc[F, Metadata] appears again, with the method eventsStream generated by the fs2-grpc library. As its input we pass the events produced by the user through the console (InputStream).

object ChatClient {  
  
  def apply[F[_]: Concurrent: Console](  
      clientName: String,  
      inputStream: InputStream[F],  
      chatService: ChatServiceFs2Grpc[F, Metadata]  
  ): ChatClient[F] = new ChatClient[F] {  
  
    private val grpcMetaData = new Metadata() // empty  
  
    override def start: F[Unit] = {  
      chatService  
        .eventsStream(  
          login(clientName) ++ inputStream.read.through(handleInput),  
          grpcMetaData  
        )  
        .through(processEvent)    // process the events received from the server
        .through(writeToConsole)  // write them to the console
        .compile  
        .drain  
    }

private def login(clientName: String): fs2.Stream[F, Events] =  
  fs2.Stream(Events(ClientLogin(Login(clientName))))
  
// ...

Metadata in gRPC is a way to pass additional information between the client and the server. It consists of key-value pairs and can be attached to any request.
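The chat itself sends empty metadata, but as a hedged illustration of the key-value idea, the sketch below attaches one pair using the io.grpc.Metadata API. The header name client-name and the helper withClientName are hypothetical and are not used in the project.

```scala
import io.grpc.Metadata

object MetadataExample {
  // Hypothetical header name; ASCII keys use the built-in string marshaller.
  val clientNameKey: Metadata.Key[String] =
    Metadata.Key.of("client-name", Metadata.ASCII_STRING_MARSHALLER)

  // Builds a Metadata instance carrying a single key-value pair.
  def withClientName(name: String): Metadata = {
    val metadata = new Metadata()
    metadata.put(clientNameKey, name)
    metadata
  }
}
```

On the receiving side the value can be read back with get on the same key.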

From the output of eventsStream we receive events from the server that were generated by other clients and process them with the method processEvent, which converts events to strings.

private def processEvent: Pipe[F, Events, String] =  
  _.map { data =>  
    data.event match {  
      case event: ClientLogin   => s"${Color.Green(event.value.name).overlay(Bold.On)} entered the chat."
      case event: ClientLogout  => s"${Color.Blue(event.value.name).overlay(Bold.On)} left the chat."
      case event: ClientMessage => s"${Color.LightGray(s"${event.value.name}:").overlay(Bold.On)} ${event.value.message}"  
      case _: ServerShutdown    => s"${Color.LightRed("Server shutdown")}"  
      case unknown              => s"${Color.Red("Unknown event:")} $unknown"  
    }  
  }

For formatted text output in the console I use the fansi library from lihaoyi, which is designed for working with colors and text styles in console applications. It lets you add color and style to text, which makes the console output more informative and attractive. The messages are then printed to the console by the method writeToConsole.
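A minimal sketch of the fansi calls used in processEvent, shown in isolation. The sample name alice and the object name are assumptions made for illustration.

```scala
import fansi.{Bold, Color}

object FansiExample {
  // Green, bold text; "alice" is just a sample name.
  val styled: fansi.Str = Color.Green("alice").overlay(Bold.On)

  // `render` yields the string with ANSI escape codes embedded,
  // `plainText` yields the same text with all styling stripped.
  val withEscapes: String = styled.render
  val withoutEscapes: String = styled.plainText
}
```

Interpolating a fansi.Str into a string, as processEvent does, relies on its toString, which produces the rendered form with escape codes.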

Building and running the client

Now we assemble the components that make up the client application.

object ChatClientApp extends IOApp {  
  
  private def buildChatService(channel: Channel): Resource[IO, ChatServiceFs2Grpc[IO, Metadata]] =  
    ChatServiceFs2Grpc.stubResource[IO](channel)  
  
  private def resources: Resource[IO, ChatServiceFs2Grpc[IO, Metadata]] =  
    NettyChannelBuilder  
      .forAddress("127.0.0.1", 50053)  
      .usePlaintext()  
      .resource[IO]  
      .flatMap(buildChatService)  
  
  override def run(args: List[String]): IO[ExitCode] =  
    resources.use { chatServiceFs2Grpc =>  
      ChatClient(  
        args.headOption.getOrElse("Anonymous"),  
        InputStream[IO](bufSize = 1024),  
        chatServiceFs2Grpc  
      ).start  
    }.as(ExitCode.Success)  
}

NettyChannelBuilder is a class provided by the gRPC library for creating clients that use Netty as a transport. It allows you to configure client settings such as server address, protocol used, authentication methods, etc.

In the function buildChatService we create a resource that represents a client for accessing the chat server. It is created with the method stubResource from ChatServiceFs2Grpc.

We start the client through sbt, passing the client name as an argument.

$ sbt "runMain org.github.ainr.chat.client.ChatClientApp Username"

And we can chat 🙂

Instead of a conclusion

Creating small, simple projects is a great way to practice and deepen your knowledge of technology. It can be something that you can write quickly and effortlessly, but at the same time gives you the opportunity to learn some new aspect of a technology or programming language.

Simple projects can be very diverse. For example, you can write a small web server, create a small game, write a script to collect data automatically, or write a gRPC-based chat as we discussed earlier.

The advantage of creating smaller projects is that you can learn more about the technology and put the knowledge into practice. You can also quickly see the result of your work and get the satisfaction of completing a project.

Don’t be afraid to start with something simple and gradually increase in complexity – this will help you become a more experienced and confident programmer.

Sources

The project code can be viewed on GitHub:
