WebSocket pipeline to RESTful and vice versa


The previous article, Reactive Spring ABAC Security: Enterprise Security, introduced the approach described here for the first time, and only briefly. It is time to reveal in detail the nuances and potential of this approach, in which WebSocket is the main protocol for communication between browsers or mobile applications and Spring Cloud services.

Abstract

Spring Cloud Gateway officially serves the following purposes: security layer, router, load balancer and compression provider. The Gateway knows nothing about the transmitted data, let alone the business logic: it is an infrastructure service.

Associating the Gateway with WebSocket is a matter of common sense. If the WebSocket logic is moved to the Gateway, then under high load you only need to slightly increase the number of Gateway pods, and if mobile applications as well as browsers work through WebSocket, this approach becomes natural.

The article describes the advantages of this approach, lists the implementation nuances in detail and outlines the prospects for further development. The main idea, however, is to reduce development costs by reusing the same basic technologies for all the required functionality.

Introduction

At the dawn of mobile applications on Android, when Samsung announced in 2009 that it was abandoning Symbian in favor of Android (iOS did not yet exist under that name), WebSocket was still in draft form. Historically, mobile applications missed this technology, and development went on as usual.

In addition, when a phone switched from one cell to another, its IP address also changed, which interrupted all established connections. This did not favor WebSocket, which in its original version could not reconnect when the connection was broken. Today the quality of communication is very stable, and IP addresses within the mobile operator’s network are tied to the phone. It can be said that favorable conditions have appeared for the rise of WebSocket.

In the presented Spring Cloud WebSocket Gateway project, the WebSocket integration did not affect the operation or source code of existing services, because the WebSocket-to-RESTful message pipeline converts requests and responses transparently.

The benefits of using the WebSocket-to-RESTful pipeline are as follows:

  1. The classic Spring Cloud microservice approach to processing data flows on the backend, from both browsers and mobile applications, is preserved over WebSocket: services do not care where http requests come from;

  2. A single concept of a session: closing all of a user's WebSocket connections unambiguously signals the end of the session in both the browser and the mobile application (such knowledge is critical for many business processes);

  3. Open WebSocket connections give the exact number of users online, whereas with http requests the online count is estimated from the requests made in the last half hour;

  4. You no longer need to spend significant resources on organizing push notifications: notifications received via WebSocket look the same to the user, provided that the application is running (which is acceptable in the vast majority of cases);

  5. At high loads, WebSocket is noticeably faster than HTTP and consumes much less traffic, while supporting full-duplex asynchronous messaging;

  6. An origin-based cross-domain security policy out of the box;

  7. The direct advantage of WebSocket over http requests is real-time feedback, which allows you to build modern, highly competitive reactive interfaces in mobile applications.

As for the increased battery consumption caused by a constantly open WebSocket connection: according to GSM Arena, battery density tripled between 2010 and 2020, and battery capacity in the same form factor has reached 5000 mAh.

Considering also that modern batteries can be fully charged from the mains in 15 minutes, at this pace of development the extra consumption can safely be regarded as insignificant.

Of course, the Gateway will continue to route and balance http requests, since these remain: user authorization, requests that return large amounts of data, file downloads to the browser, and integration with external systems.

The spring-cloud-websocket-gateway project discussed in this article reduces the cost of development and ownership: services keep the classic http request approach while browsers and mobile applications work over the WebSocket protocol, and sessions are disposed of efficiently when the JWT expires. Despite its small size and simple implementation, the presented source code provides all the features described in the article.

WebSocket support in Android and iOS

In 2013, Naoyuki Kanezawa, one of the developers of the Socket.io project, committed a WebSocket client for Android, which is still being actively developed. Four years later, an iOS client appeared, also with full WebSocket support.

In 2014, the JSR 356 specification was released, and its support brought the javax.websocket package to the Java platform. Later, a wrapper around the DOM API with the WebSocket base class was introduced directly into the Kotlin Standard Library, which can also be used in Android applications.

In addition to the native libraries from the Socket.io team, there are more than a dozen third-party libraries that fully implement the WebSocket protocol according to the RFC 6455 specification, and most of them are actively supported by fairly large communities.

The most common choice for WebSocket on iOS is the SocketRocket library from Facebook, and on Android it is the Java-WebSocket library by open source developer Nathan Rajlich, who created the project in 2010 (most likely for the needs of one of the clients of the NASDAQ stock exchange).

WebSocket has been used professionally in mobile applications for over 10 years. Globally, the demand for real-time data delivery keeps growing from year to year with each new type of data.

Main aspects of implementation

The browser or mobile application sends and receives WebSocket messages in a wrapper:

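// StringUtils is assumed to come from Apache Commons Lang, which defines StringUtils.EMPTY
import com.fasterxml.jackson.databind.JsonNode
import org.apache.commons.lang3.StringUtils
import org.springframework.http.HttpMethod

data class MessageWrapper(
    val type: HttpMethod = HttpMethod.GET,
    val baseUrl: String = StringUtils.EMPTY,
    val uri: String = StringUtils.EMPTY,
    val body: JsonNode? = null
)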

The attributes form an http request to the services:

  • type – the http method: GET, HEAD, POST, PUT, PATCH, DELETE, OPTIONS or TRACE (default GET)

  • baseUrl – the name of the service, for example: http://account-service

  • uri – the controller method and query string, for example: findAllPaged?sort=id:desc

  • body – the request JSON for request types POST, PUT or PATCH

When the response is formed, body is replaced by the response from the service: message.copy(body = it), while baseUrl and uri serve as request identifiers in the WebSocket handler in the browser or mobile app.
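
On the client side, matching a response to its request by baseUrl and uri could look like the minimal sketch below. It is an illustration rather than the project's code: SimpleMessage is a simplified stand-in for MessageWrapper (a String body instead of JsonNode, a String type instead of HttpMethod), and ResponseHandlers is a hypothetical helper introduced only for this example.

```kotlin
// Simplified stand-in for MessageWrapper (assumption: String body instead of JsonNode).
data class SimpleMessage(
    val type: String = "GET",
    val baseUrl: String = "",
    val uri: String = "",
    val body: String? = null
)

// Hypothetical client-side registry: one handler per (baseUrl, uri) pair.
class ResponseHandlers {
    private val handlers = mutableMapOf<Pair<String, String>, (String?) -> Unit>()

    fun register(baseUrl: String, uri: String, handler: (String?) -> Unit) {
        handlers[baseUrl to uri] = handler
    }

    // Returns true when a handler was registered for the identifiers of the message.
    fun dispatch(message: SimpleMessage): Boolean {
        val handler = handlers[message.baseUrl to message.uri] ?: return false
        handler(message.body)
        return true
    }
}

fun main() {
    val handlers = ResponseHandlers()
    handlers.register("http://account-service", "findAllPaged?sort=id:desc") { body ->
        println("accounts: $body")
    }
    val request = SimpleMessage(baseUrl = "http://account-service", uri = "findAllPaged?sort=id:desc")
    // The response reuses the request wrapper; only the body is replaced.
    val response = request.copy(body = """{"content":[]}""")
    handlers.dispatch(response)
}
```

Because the identifiers travel back unchanged, the client does not need a separate request id as long as one handler per endpoint is enough.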

The function that converts WebSocket requests to RESTful requests and back:

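// Converts a WebSocket message into an http request to the target service
// and sends the service response back to the user over WebSocket
fun handling(message: MessageWrapper, username: String) {
    val webClient = Beans.of(WebClient.Builder::class.java).baseUrl(message.baseUrl).build()
    // Pick the WebClient call that matches the http method of the message
    val response = when (message.type) {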
        HttpMethod.GET -> webClient.get().uri(message.uri).retrieve()
        HttpMethod.POST -> webClient.post().uri(message.uri).body(BodyInserters.fromValue(message.body)).retrieve()
        HttpMethod.PUT -> webClient.put().uri(message.uri).body(BodyInserters.fromValue(message.body)).retrieve()
        HttpMethod.DELETE -> webClient.delete().uri(message.uri).retrieve()
        HttpMethod.PATCH -> webClient.patch().uri(message.uri).body(BodyInserters.fromValue(message.body)).retrieve()
        HttpMethod.HEAD -> webClient.head().uri(message.uri).retrieve()
        HttpMethod.OPTIONS -> webClient.options().uri(message.uri).retrieve()
        HttpMethod.TRACE -> webClient.method(HttpMethod.TRACE).uri(message.uri).retrieve()
    }
    response
        .onStatus({ status -> status.isError })
        { clientResponse ->
            clientResponse.bodyToMono(ByteArrayResource::class.java)
                .map { responseAnswer: ByteArrayResource ->
                    WebClientResponseException(
                        clientResponse.rawStatusCode(),
                        clientResponse.statusCode().name,
                        clientResponse.headers().asHttpHeaders(),
                        responseAnswer.byteArray,
                        Charsets.UTF_8
                    )
                }
        }
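        .bodyToMono(JsonNode::class.java).subscribe({
            info { "Request[${message.baseUrl}${message.uri}] by user[$username] accepted" }
            debug { it.toString() }
            val sessionChain = clients.getIfPresent(username)
            sessionChain?.sendMessage(message.copy(body = it))
        }, { e ->
            // Without an error callback, a failed request would only surface as a dropped error
            info { "Request[${message.baseUrl}${message.uri}] by user[$username] failed: ${e.message}" }
        })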
}

Detailed Implementation Overview

Consider the reactive WebSocket entry point that converts WebSocket requests into http requests to the services and back:

@Component
@WebSocketEntryPoint("/wsf")
class WebSocketFactory(val kafkaPublisher: EventDrivenPublisher) : WebSocketHandler {
    override fun handle(session: WebSocketSession): Mono<Void> = session.handshakeInfo.principal
        .cast(UsernamePasswordAuthenticationToken::class.java)
        .flatMap { authToken: UsernamePasswordAuthenticationToken ->
            // Outgoing stream: keep the FluxSink in the cache so that messages can be pushed to this user later
            val output = session.send(Flux.create {
                clients.put(authToken.name, WebSocketSessionChain(
                    session = session, tokenHash = authToken.credentials as Long, chain = it))
            })
            // Incoming stream: parse every frame into a MessageWrapper and forward it as an http request
            val input = session.receive()
                .map { obj: WebSocketMessage -> obj.payloadAsText.parseJson(MessageWrapper::class.java) }
                .doOnNext { handling(it, authToken.name) }.then()

            // Terminates together with the connection; doFinally then releases the session
            val lifecycle = Mono.zip(input, output).then().doFinally { signal: SignalType ->
                // The entry may already have been evicted from the cache
                clients.getIfPresent(authToken.name)?.let { sessionChain ->
                    kafkaPublisher.publishDisconnect(
                        UserDisconnectEvent(authToken.name, sessionChain.tokenHash, false)
                    )
                        // publishDisconnect returns Mono<Void>, which emits no value, so cleanup is chained on completion
                        .then(sessionChain.session.close())
                        .doFinally {
                            clients.invalidate(authToken.name)
                            info { "Connection close with signal[${signal.name}] and user[${authToken.name}]" }
                        }
                        .subscribe()
                }
            }
            // Publish the connect event, then return the lifecycle so that it is actually subscribed
            kafkaPublisher.publishConnect(
                UserConnectEvent(authToken.name, authToken.authorities.map { it.authority })
            ).then(lifecycle)
        }
}

The clients cache is based on the Caffeine library. It limits both the idle time and the number of open WebSocket connections, and sends an event to Kafka when a session ends by timeout:

val clients = Caffeine.newBuilder()
    .maximumSize(Beans.getProperty(Constants.GATEWAY_CACHE_SIZE, Long::class.java, 10000))
    .expireAfterAccess(
        Beans.getProperty(Constants.GATEWAY_CACHE_ACCESS, Long::class.java, 1800000),
        TimeUnit.MILLISECONDS
    )
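    .removalListener { key: String?, value: WebSocketSessionChain?, cause: RemovalCause ->
        // wasEvicted() is true for automatic removal (expiry or size limit), not for invalidate()
        if (cause.wasEvicted() && ObjectUtils.isNotEmpty(key) && value != null) {
            kafkaPublisher.publishDisconnect(
                UserDisconnectEvent(key, value.tokenHash, true)
            )
                // publishDisconnect returns Mono<Void>, which emits no value,
                // so the session is closed by chaining on completion
                .then(value.session.close())
                .doOnSuccess { info { "WebSocket disconnected by timeout with user[$key]" } }
                .subscribe()
        }
    }.build<String, WebSocketSessionChain>()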
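
The size limit and the removal listener can be illustrated without Caffeine. The sketch below swaps in a plain JDK LinkedHashMap in access order as an LRU cache. It does not implement time-based expiry, and BoundedSessions and onEvicted are hypothetical names; the project itself relies on Caffeine for both limits.

```kotlin
// A plain-JDK LRU sketch, not Caffeine: when the size limit is exceeded, the least
// recently accessed entry is dropped and reported to the onEvicted callback.
class BoundedSessions<V>(
    private val maximumSize: Int,
    private val onEvicted: (key: String, value: V) -> Unit
) {
    // accessOrder = true keeps the least recently accessed entry first.
    private val entries = object : LinkedHashMap<String, V>(16, 0.75f, true) {
        override fun removeEldestEntry(eldest: MutableMap.MutableEntry<String, V>): Boolean {
            val evict = size > maximumSize
            if (evict) onEvicted(eldest.key, eldest.value)
            return evict
        }
    }

    @Synchronized fun put(key: String, value: V) { entries[key] = value }

    @Synchronized fun getIfPresent(key: String): V? = entries[key]

    // Explicit removal does not trigger the callback, much like an explicit
    // invalidate in Caffeine is not reported as an eviction.
    @Synchronized fun invalidate(key: String) { entries.remove(key) }

    @Synchronized fun count(): Int = entries.size
}

fun main() {
    val cache = BoundedSessions<String>(2) { key, _ -> println("evicted $key") }
    cache.put("alice", "session-1")
    cache.put("bob", "session-2")
    cache.getIfPresent("alice")      // alice becomes the most recently used entry
    cache.put("carol", "session-3")  // exceeds the limit, so bob is evicted
}
```

In the Gateway, the callback is the place where the disconnect event is published and the session is closed.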

Kafka events are sent through the EventDrivenPublisher service, which exposes a method for each type of event. This is because the model must be typed for serialization, since the Apache Avro library is used for compact binary encoding. Accordingly, all Kafka models are pre-generated by the avro-maven-plugin.

Description of the WebSocketEvent model:

{
  "namespace": "io.github.sevenparadigms.gateway.kafka.model",
  "type": "record",
  "name": "WebSocketEvent",
  "fields": [
    {
      "name": "username",
      "type": ["null", "string"],
      "default": null
    },
    {
      "name": "baseUrl",
      "type": "string"
    },
    {
      "name": "uri",
      "type": "string"
    },
    {
      "name": "body",
      "type": "string"
    }
  ]
}

Implementation of the methods of the EventDrivenPublisher class:

private val producerProps: Map<String, Any> = mapOf(
    BOOTSTRAP_SERVERS_CONFIG to kafkaProperties.broker,
    KEY_SERIALIZER_CLASS_CONFIG to kafkaProperties.serializer,
    VALUE_SERIALIZER_CLASS_CONFIG to kafkaProperties.serializer,
    SCHEMA_REGISTRY_URL_CONFIG to kafkaProperties.schemaRegistryUrl,
    VALUE_SUBJECT_NAME_STRATEGY to RecordNameStrategy::class.java,
    AUTO_REGISTER_SCHEMAS to true
)

fun <T> publish(topic: String, event: T, key: String = UUID.randomUUID().toString()) =
    KafkaSender.create<String, T>(SenderOptions.create(producerProps)).createOutbound()
        .send(Mono.just(ProducerRecord(topic, key, event)))
        .then()
        .doOnSuccess { info { "Successfully sent to topic[$topic]: $event with id=$key" }  }


fun publishConnect(event: UserConnectEvent) = publish(kafkaProperties.userConnectTopic, event)
fun publishDisconnect(event: UserDisconnectEvent): Mono<Void> {
    eventPublisher.publishEvent(RevokeTokenEvent(hash = event.hash, source = event.username))
    return publish(kafkaProperties.userDisconnectTopic, event)
}

Kafka config in application.yml:

kafka:
  web-socket-topic: websocket-transport
  user-connect-topic: user-connect-event
  user-disconnect-topic: user-disconnect-event
  broker: localhost:9092
  group-id: websocket-gateway
  serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
  deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
  schema-registry-url: http://localhost:8081

Consider the parts of the WebSocket entry point code:

val output = session.send(Flux.create {
    clients.put(authToken.name, WebSocketSessionChain(
        session = session, tokenHash = authToken.credentials as Long, chain = it))
})

Here the WebSocket connection is stored in the clients cache through the FluxSink object (the lambda parameter it). This makes it possible to send further messages in the MessageWrapper structure to the user's browser or mobile application by the user identifier, which is represented as name.

Mono.zip(input, output).then().doFinally { signal: SignalType ->
    // The entry may already have been evicted from the cache
    clients.getIfPresent(authToken.name)?.let { sessionChain ->
        kafkaPublisher.publishDisconnect(
            UserDisconnectEvent(authToken.name, sessionChain.tokenHash, false)
        )
            // publishDisconnect returns Mono<Void>, which emits no value, so cleanup is chained on completion
            .then(sessionChain.session.close())
            .doFinally {
                clients.invalidate(authToken.name)
                info { "Connection close with signal[${signal.name}] and user[${authToken.name}]" }
            }
            .subscribe()
    }
}

The doFinally { } construct lets you intercept the moment the WebSocket connection closes, gracefully end the session and send a Kafka event.

The Gateway includes the reactive-spring-abac-security library. To connect to WebSocket, you need to pass a JWT in the header. For performance, the token is validated only by its expiration time and by its signature, which is checked against the public key.

spring:
  security:
    jwt:
      public-key:
        MIIDeTCCAmGgAwIBAgIEFzIFujANBgkqhkiG9w0BAQsFADBtMQswCQYDVQQGEwJG
        UjEQMA4GA1UECBMHVW5rbm93bjEOMAwGA1UEBxMFUGFyaXMxFzAVBgNVBAoTDlNl
        dmVucGFyYWRpZ21zMQ8wDQYDVQQLEwZnaXRodWIxEjAQBgNVBAMTCUxhbyBUc2lu
        ZzAeFw0yMjA0MDMxODQyMDRaFw0zMjAzMzExODQyMDRaMG0xCzAJBgNVBAYTAkZS
        MRAwDgYDVQQIEwdVbmtub3duMQ4wDAYDVQQHEwVQYXJpczEXMBUGA1UEChMOU2V2
        ZW5wYXJhZGlnbXMxDzANBgNVBAsTBmdpdGh1YjESMBAGA1UEAxMJTGFvIFRzaW5n
        MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAjtdx8tYDDRFUpw3oJdFx
        Avcho5ytRQt1PZUymRoioO28RO9mXdrhJgKXA2MFlmjnzD/yRwR/PqZcneKz7rKx
        kN14HYQNxgKrUFNZwtAtePiTAcAPy4NqtVeE8pS5djQ+bIqlpnJUhYvtK1vDlMkS
        KUJr/N2/sRAQcH8fQiPG5vwI+MpHjWjqjjM+ycslPWqQp2QguaqxMd4IAjL8fZnP
        2LGyCZdZCRbtu3TknW+zmgVMF9hiEdtUX677cBfamnslpCUe4ACI5aziwua5GQZV
        DwfaFf6kOAtKcEa7CUy3axCs82KVa3lfPW/b8ALWDllbjYLZWVwNfvR5bKFFg2tk
        GQIDAQABoyEwHzAdBgNVHQ4EFgQU29M6xK0D1NAvRRE1MApZv4Qr0l8wDQYJKoZI
        hvcNAQELBQADggEBADCIzI/jC+3dXnhdca2ozwH6Oq46coT61tmLnCmlpTvE352w
        g/FhpujILUOIwaShKjgIiBA1u1DYrZM1m9BoZ6/YuXa9OYpCK0jamuug4Vnz0bIm
        fQIQPfCMJcouwc4pCm8jAzWSo8xfTJ/yhUnqt7/NQkGuSWsHVZN9O1leKVa2xTEU
        C5APTpX7Rj2+mU8c/fDzFA1m+LXYp2T3dbi3yVOTzSwRkE84sE18fdgRuvJfpmxL
        W3BuVKQ9/1bzpcTK1onKw7WNqrjCoO37G+d42IeDzXMdDjyI3POYYy8g/o//sp6O
        JhhMDEwt2aEAKEVlQxYzgMBn8HeUQrHSeX+ML8Q=

The Gateway also includes the kubernetes-embedded-hazelcast library, which is registered in Spring as a CacheManager and starts an embedded Hazelcast server together with the service. A distinctive feature of this library is the ability to set the maximum cache size, which is not available in the original Hazelcast.

Cache settings are written in application.yml:

spring.cache:
  jwt.expireAfterAccess: 500 # milliseconds (exclusive)
  jwt.expireAfterWrite: 1000 # milliseconds (exclusive)
  jwt.maximumSize: 10000

The token hash is cached in the Hazelcast cluster, shared by the pods of one service, for the lifetime of the token, and on repeated access the validation result is returned from the cache. The token is revoked when the user session ends: this fires the Spring event RevokeTokenEvent, and the token cache entry is flagged as deactivated.
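
The caching and revocation logic can be sketched with a plain in-memory map. This is only an illustration under assumptions: the project keeps these entries in a Hazelcast cluster, while TokenCache, TokenEntry and the validator lambda are hypothetical names introduced here.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Cached validation result for one token hash (hypothetical model).
data class TokenEntry(val valid: Boolean, val expiresAtMillis: Long, val revoked: Boolean = false)

class TokenCache(private val clock: () -> Long = System::currentTimeMillis) {
    private val entries = ConcurrentHashMap<Long, TokenEntry>()

    // Runs the expensive validator only on the first access; later calls reuse the cached result.
    fun isAccepted(tokenHash: Long, expiresAtMillis: Long, validator: () -> Boolean): Boolean {
        val entry = entries.computeIfAbsent(tokenHash) { TokenEntry(validator(), expiresAtMillis) }
        return entry.valid && !entry.revoked && clock() < entry.expiresAtMillis
    }

    // Counterpart of handling a revoke event: the entry stays cached but is flagged as deactivated.
    // A hash that was never cached is left untouched in this sketch.
    fun revoke(tokenHash: Long) {
        entries.computeIfPresent(tokenHash) { _, entry -> entry.copy(revoked = true) }
    }
}

fun main() {
    val cache = TokenCache()
    val expiresAt = System.currentTimeMillis() + 60_000
    println(cache.isAccepted(42L, expiresAt) { true })  // signature check runs once
    cache.revoke(42L)
    println(cache.isAccepted(42L, expiresAt) { true })  // rejected: the entry is flagged as revoked
}
```

Keeping the revoked entry in the cache, instead of deleting it, is what prevents a still unexpired token from being validated again after the session has ended.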

External Integration in WebSocket

A router has been added to asynchronously send messages to WebSocket via a Kafka event:

@Configuration
class RoutesConfiguration(private val kafkaHandler: KafkaHandler) {
    @Bean
    fun route(): RouterFunction<ServerResponse> = router {
        ("/kafka").nest {
            accept(MediaType.APPLICATION_JSON).nest {
                POST("", kafkaHandler::publish)
            }
        }
    }
}

The route expects the EventWrapper model:

data class EventWrapper(
    val topic: String = StringUtils.EMPTY,
    val body: JsonNode = JsonUtils.objectNode()
)

It then calls the publish method:

@Component
class KafkaHandler(private val kafkaPublisher: EventDrivenPublisher) {
    fun publish(request: ServerRequest) = request.bodyToMono(EventWrapper::class.java)
        .flatMap {
            // publish returns Mono<Void>, which emits no value, so the response
            // is chained with then() instead of a second flatMap
            kafkaPublisher.publish(it.topic, it.body.jsonToObject(WebSocketEvent::class.java))
                .then(ServerResponse.ok().build())
        }
        .doOnError { error("Exception while trying to process event: " + it.message) }
}

After that, the message from body is sent to the Kafka topic named in topic, which can be arbitrary.

To send a WebSocket message to a specific user, set topic to websocket-transport and describe the message in body using the fields of the MessageWrapper model together with the username of the recipient:

{
    "topic": "websocket-transport",
    "body": {
        "username": userId,
        "baseUrl": "http://account-service",
        "uri": "cash-out",
        "body": { "value": -100 }
    }
}

The subscription to the websocket-transport topic is created in the KafkaConsumerConfiguration class, which forwards events to WebSocket:

private val receiverOptions = ReceiverOptions.create<String, WebSocketEvent>(
    mapOf(
        BOOTSTRAP_SERVERS_CONFIG to kafkaProperties.broker,
        GROUP_ID_CONFIG to kafkaProperties.groupId,
        KEY_DESERIALIZER_CLASS_CONFIG to kafkaProperties.deserializer,
        VALUE_DESERIALIZER_CLASS_CONFIG to kafkaProperties.deserializer,
        AUTO_OFFSET_RESET_CONFIG to "earliest",
        ENABLE_AUTO_COMMIT_CONFIG to true,
        SCHEMA_REGISTRY_URL_CONFIG to kafkaProperties.schemaRegistryUrl,
        VALUE_SUBJECT_NAME_STRATEGY to RecordNameStrategy::class.java,
        SPECIFIC_AVRO_READER_CONFIG to true
    )
).commitInterval(Duration.ZERO)
    .commitBatchSize(0)
    .subscription(setOf(kafkaProperties.webSocketTopic))

@Bean
fun listenWebSocketEvent(webSocketFactory: WebSocketFactory) = 
  KafkaReceiver.create(receiverOptions)
    .receive()
    .concatMap { record ->
        Mono.fromRunnable<Void> {
            val it = record.value()
            info { "Transfer kafka message to WebSocket: $it" }
            webSocketFactory.get(it.username!!)?.sendMessage(it.copyTo(MessageWrapper()))
        }
    }.subscribe()
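
The forwarding step can be pictured as follows. This sketch is an illustration under assumptions: UserEvent mirrors the fields of WebSocketEvent, OutgoingMessage stands in for MessageWrapper, and SessionRegistry replaces the clients cache with a map of send functions. None of these names come from the project.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-ins for WebSocketEvent and MessageWrapper.
data class UserEvent(val username: String?, val baseUrl: String, val uri: String, val body: String)
data class OutgoingMessage(val baseUrl: String, val uri: String, val body: String)

class SessionRegistry {
    private val senders = ConcurrentHashMap<String, (OutgoingMessage) -> Unit>()

    fun connect(username: String, sender: (OutgoingMessage) -> Unit) { senders[username] = sender }

    fun disconnect(username: String) { senders.remove(username) }

    // The number of open connections doubles as the number of users online.
    fun online(): Int = senders.size

    // Returns false when the event has no recipient or the user has no open session here.
    fun forward(event: UserEvent): Boolean {
        val username = event.username ?: return false
        val sender = senders[username] ?: return false
        sender(OutgoingMessage(event.baseUrl, event.uri, event.body))
        return true
    }
}

fun main() {
    val registry = SessionRegistry()
    registry.connect("alice") { message -> println("to alice: $message") }
    registry.forward(UserEvent("alice", "http://account-service", "cash-out", """{"value":-100}"""))
}
```

Returning a Boolean makes the case of a user without an open session explicit, so the caller can decide whether to log or ignore it.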

To round off the review of this small but effective code base, here is the short list of dependency versions from the pom.xml build file:

<spring-cloud.version>2021.0.1</spring-cloud.version>
<abac-security.version>1.4.1</abac-security.version>
<reactor-kafka.version>1.3.11</reactor-kafka.version>
<avro.version>1.11.0</avro.version>
<kafka-avro.version>7.1.1</kafka-avro.version>
<hazelcast.version>5.1.1</hazelcast.version>
<embedded-hazelcast.version>1.2.0</embedded-hazelcast.version>

Summary

The simple idea of integrating a WebSocket-to-RESTful pipeline (and back) into an existing Spring Cloud Gateway, in front of already existing microservices, opens up many opportunities and perspectives. In particular, it makes it possible to abandon the infrastructure around push notifications.

At the same time, push notifications have a number of problems that are outside the developer's control, such as arbitrary transport delays, out-of-order notifications and queue pool overflow.

In addition, the existing front end in the browser and in mobile applications does not have to be rewritten immediately to use the WebSocket pipeline, because the Gateway continues to route and balance http requests.

First, you can add an asynchronous factory for processing incoming messages to the front end, so that the interface adapts reactively. Then it is enough to replace the network library in use with your own library that has the same API, which wraps requests in the background and sends them via WebSocket.

WebSocket is by far the most popular real-time protocol and has become the de facto standard in applications such as financial tools, social networks, navigation and, of course, PC games.

The WebSocket architecture follows the event-driven model, and the protocol is optimized to minimize traffic overhead and latency when transferring data over the network. With today's broadband Internet connections, WebSocket lets you fully implement reactive interfaces, thus “revitalizing” the mobile application in the hands of the user.
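
The second step, a network layer with the same API that sends requests over WebSocket in the background, might be sketched like this. All names here (ApiClient, WebSocketApiClient, the transport function) are hypothetical, the transport is reduced to a callback instead of a real WebSocket, and responses are matched to requests by baseUrl and uri in arrival order.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedQueue

// The API that the rest of the application already uses (hypothetical).
interface ApiClient {
    fun get(baseUrl: String, uri: String): CompletableFuture<String>
}

// Same API, but requests travel through a WebSocket-like transport instead of http.
class WebSocketApiClient(
    private val transport: (type: String, baseUrl: String, uri: String) -> Unit
) : ApiClient {
    private val pending =
        ConcurrentHashMap<String, ConcurrentLinkedQueue<CompletableFuture<String>>>()

    override fun get(baseUrl: String, uri: String): CompletableFuture<String> {
        val future = CompletableFuture<String>()
        pending.computeIfAbsent("$baseUrl|$uri") {
            ConcurrentLinkedQueue<CompletableFuture<String>>()
        }.add(future)
        transport("GET", baseUrl, uri)
        return future
    }

    // Called by the WebSocket message handler for every incoming wrapper.
    // Returns false for messages nobody asked for, for example server-side notifications.
    fun onMessage(baseUrl: String, uri: String, body: String): Boolean {
        val future = pending["$baseUrl|$uri"]?.poll() ?: return false
        return future.complete(body)
    }
}

fun main() {
    val client = WebSocketApiClient { type, baseUrl, uri -> println("send $type $baseUrl/$uri") }
    val accounts = client.get("http://account-service", "findAllPaged")
    client.onMessage("http://account-service", "findAllPaged", "[]")
    println(accounts.get())
}
```

Since callers only see the ApiClient interface, the switch from http to WebSocket stays invisible to the rest of the application.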
