Basics of Combine Framework for ML in Swift

Combine ships with Xcode 11 and later (alongside the iOS 13 and macOS 10.15 SDKs), so there is nothing extra to install.

In this article, we will cover the basics of this wonderful framework.

Once Xcode is installed, you can use Combine in your projects by importing it like this:

import Combine

Creating and Combining Data Streams with Combine

In Combine, everything starts with a Publisher: an entity that emits values over time. It can be almost anything: an array of numbers, the result of a network request, or UI events.

The simplest example of a Publisher is an array converted to a Publisher using .publisher:

import Combine

let numbers = [1, 2, 3, 4, 5].publisher

numbers.sink { value in
    print("Received value: \(value)")
}

The numbers array becomes a Publisher that emits each number in turn. The sink method attaches a Subscriber to the Publisher and runs its closure for every received element.
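sink also has a form that observes the end of the stream. A minimal sketch (the variable names are illustrative): because an array publisher is synchronous, all values and the .finished completion arrive while the sink call is still running.

```swift
import Combine

var received: [Int] = []
var didFinish = false

// sink(receiveCompletion:receiveValue:) observes both the values and the end of the stream
let cancellable = [1, 2, 3].publisher
    .sink(
        receiveCompletion: { completion in
            // An array publisher never fails; it finishes after its last element
            if case .finished = completion {
                didFinish = true
            }
        },
        receiveValue: { value in
            received.append(value)
        }
    )
```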

If you want to create your own Publisher, you can use PassthroughSubject or CurrentValueSubject:

import Combine

let subject = PassthroughSubject<String, Never>()

// Store the AnyCancellable: once it is released, the subscription is cancelled
let cancellable = subject.sink { value in
    print("Received value: \(value)")
}

subject.send("Hello")
subject.send("Combine")

Here PassthroughSubject lets you emit values manually with the send method, so you control when and what data reaches subscribers.
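CurrentValueSubject differs from PassthroughSubject in that it always holds a current value and delivers it immediately to each new subscriber. A small sketch, where threshold is a hypothetical confidence threshold:

```swift
import Combine

// Hypothetical confidence threshold with an initial value of 0.5
let threshold = CurrentValueSubject<Double, Never>(0.5)

var seen: [Double] = []
let cancellable = threshold.sink { value in
    seen.append(value)   // the first call receives the initial value 0.5
}

threshold.send(0.7)
// Assigning to `value` also publishes the new value to subscribers
threshold.value = 0.9
```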

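For combining data from different sources, Combine offers several operators: merge, combineLatest and zip.

  • merge interleaves the values of several Publishers with the same Output and Failure types into a single stream: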

import Combine

let publisher1 = PassthroughSubject<Int, Never>()
let publisher2 = PassthroughSubject<Int, Never>()

let merged = publisher1.merge(with: publisher2)

let cancellable = merged.sink { value in
    print("Merged value: \(value)")
}

publisher1.send(1)
publisher2.send(2)
publisher1.send(3)

The merged stream prints 1, 2, 3: the values from both Publishers are combined into one stream in the order they are sent.
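merge also affects completion: the merged stream finishes only after every upstream has finished. A sketch with two hypothetical event sources:

```swift
import Combine

// Hypothetical event sources, e.g. two input channels of an app
let cameraEvents = PassthroughSubject<String, Never>()
let microphoneEvents = PassthroughSubject<String, Never>()

var events: [String] = []
var isFinished = false

let cancellable = cameraEvents
    .merge(with: microphoneEvents)
    .sink(
        receiveCompletion: { _ in isFinished = true },
        receiveValue: { events.append($0) }
    )

cameraEvents.send("frame-1")
microphoneEvents.send("sample-1")
cameraEvents.send(completion: .finished)
// One upstream has finished, but the other is still active
let finishedEarly = isFinished
microphoneEvents.send("sample-2")
microphoneEvents.send(completion: .finished)
```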

  • combineLatest emits a tuple with the latest value from each Publisher, starting once both have emitted at least one value:

import Combine

let publisherA = PassthroughSubject<String, Never>()
let publisherB = PassthroughSubject<String, Never>()

let combined = publisherA.combineLatest(publisherB)

let cancellable = combined.sink { value in
    print("Combined value: \(value)")
}

publisherA.send("Hello")
publisherB.send("World")
// Output: Combined value: ("Hello", "World")
publisherA.send("Hi")
// Output: Combined value: ("Hi", "World")

combineLatest waits for data from both Publishers and then outputs them as a pair each time a new value is received from one of them.

  • zip also produces pairs, but unlike combineLatest it matches values by position: each value is used once, and a pair is emitted only when both Publishers have provided a new value:

import Combine

let pub1 = PassthroughSubject<Int, Never>()
let pub2 = PassthroughSubject<String, Never>()

let zipped = pub1.zip(pub2)

let cancellable = zipped.sink { value in
    print("Zipped value: \(value)")
}

pub1.send(1)
pub2.send("One")
// Output: Zipped value: (1, "One")
pub1.send(2)
pub2.send("Two")
// Output: Zipped value: (2, "Two")

This operator is good when you need to synchronize data from different sources.
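The difference between the two operators is easiest to see when both are fed the same sequence of values. A sketch with illustrative names:

```swift
import Combine

let numbersSource = PassthroughSubject<Int, Never>()
let lettersSource = PassthroughSubject<String, Never>()

var latestPairs: [String] = []
var zippedPairs: [String] = []

// Subscribe both operators to the same two sources
let latestCancellable = numbersSource.combineLatest(lettersSource)
    .sink { pair in latestPairs.append("\(pair.0)\(pair.1)") }
let zipCancellable = numbersSource.zip(lettersSource)
    .sink { pair in zippedPairs.append("\(pair.0)\(pair.1)") }

numbersSource.send(1)
numbersSource.send(2)
lettersSource.send("x")
lettersSource.send("y")
// combineLatest pairs the newest values: "2x", "2y"
// zip pairs values by position:          "1x", "2y"
```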

Let's say you have an application that takes user input and a model prediction result, and you want to combine this data to display it in the UI:

import Combine

// Create two Publishers: one for user input, one for model predictions
let userInput = PassthroughSubject<String, Never>()
let prediction = PassthroughSubject<String, Never>()

// Combine the data using combineLatest
let combinedData = userInput.combineLatest(prediction)

let cancellable = combinedData.sink { user, predicted in
    print("User input: \(user), Model prediction: \(predicted)")
}

// Emit values
userInput.send("User says: Hello")
prediction.send("Model predicts: Hi")
// Output: User input: User says: Hello, Model prediction: Model predicts: Hi

combineLatest allows you to link user input with the results of the ML model.
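In a real app the prediction is usually derived from the input itself. A sketch under clearly labeled assumptions: hypotheticalScore is a made-up stand-in for a model call, and the decision threshold is a CurrentValueSubject that could be bound to a UI control. Changing the threshold re-evaluates the latest score without new input.

```swift
import Combine

// Hypothetical stand-in for an ML model: scores text by its length (illustration only)
func hypotheticalScore(_ text: String) -> Double {
    min(Double(text.count) / 10.0, 1.0)
}

let userInput = PassthroughSubject<String, Never>()
// Decision threshold that could, for example, be bound to a UI slider
let threshold = CurrentValueSubject<Double, Never>(0.5)

var labels: [String] = []
let cancellable = userInput
    .map(hypotheticalScore)            // run the "model" on every input
    .combineLatest(threshold)          // pair each score with the current threshold
    .map { pair in pair.0 >= pair.1 ? "positive" : "negative" }
    .sink { labels.append($0) }

userInput.send("hello world!")  // score 1.0
userInput.send("hi")            // score 0.2
threshold.send(0.1)             // re-evaluates the latest score with the new threshold
```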

Transformation of data in streams

The map operator is the primary tool for transforming data in Combine. It takes each value emitted by a Publisher and transforms it into a new value, element by element.

Let's imagine that there is a stream of numbers, and we want to multiply each of them by 2. This is easily implemented using map:

import Combine

let numbers = [1, 2, 3, 4, 5].publisher

let multipliedNumbers = numbers.map { $0 * 2 }

multipliedNumbers.sink { value in
    print("Transformed value: \(value)")
}

Running this code prints:

Transformed value: 2
Transformed value: 4
Transformed value: 6
Transformed value: 8
Transformed value: 10

The map operator is also well suited to preprocessing data before passing it to an ML model, for example, to normalize (here, simply rescale) the input data before feeding it to the model:

let rawInputs = [0.5, 0.75, 1.0, 1.25].publisher

let normalizedInputs = rawInputs.map { $0 / 2.0 }

normalizedInputs.sink { value in
    print("Normalized value: \(value)")
}
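Dividing by a constant only rescales the values. A common alternative is min-max normalization, which maps a known range onto [0, 1]. A sketch, assuming the feature bounds are already known (the values here are made up):

```swift
import Combine

// Assumed bounds of the raw feature, e.g. the minimum and maximum seen in training data
let featureMin = 0.5
let featureMax = 1.5

var normalized: [Double] = []
let cancellable = [0.5, 1.0, 1.5].publisher
    // Min-max scaling maps [featureMin, featureMax] onto [0, 1]
    .map { ($0 - featureMin) / (featureMax - featureMin) }
    .sink { normalized.append($0) }
```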

The filter operator passes through only the values that satisfy a given condition.

Let's look at an example where we need to leave only even numbers in the stream:

let numbers = [1, 2, 3, 4, 5, 6].publisher

let evenNumbers = numbers.filter { $0 % 2 == 0 }

evenNumbers.sink { value in
    print("Filtered value: \(value)")
}

Output:

Filtered value: 2
Filtered value: 4
Filtered value: 6

In the context of ML, filter is useful for removing outliers or data that does not meet certain criteria. For example, to exclude all input values that exceed a certain threshold:

let rawInputs = [0.5, 2.5, 1.0, 3.5].publisher

let validInputs = rawInputs.filter { $0 <= 2.0 }

validInputs.sink { value in
    print("Valid input: \(value)")
}
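Raw data often also contains entries that are not numbers at all. A sketch, assuming the readings arrive as text (for example, from a CSV column): compactMap drops values whose transform returns nil, and filter then applies the same threshold as above.

```swift
import Combine

// Hypothetical raw readings as text; some entries are malformed
let rawReadings = ["0.5", "n/a", "1.0", "", "3.5"].publisher

var clean: [Double] = []
let cancellable = rawReadings
    .compactMap { Double($0) }   // drop entries that do not parse as numbers
    .filter { $0 <= 2.0 }        // drop values above the threshold
    .sink { clean.append($0) }
```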

flatMap creates a new Publisher from each value emitted by the original Publisher and flattens the values of all those inner Publishers into a single stream.

Let's look at an example of a network request that returns data based on user input:

import Combine
import Foundation

let userInput = PassthroughSubject<String, Never>()
let searchResults = userInput.flatMap { query in
    // Note: the query is inserted without percent-encoding; real code should encode it
    URLSession.shared.dataTaskPublisher(for: URL(string: "https://api.example.com/search?q=\(query)")!)
        .map { $0.data }
        .decode(type: [String].self, decoder: JSONDecoder())
        .catch { _ in Just([]) }  // Handle possible errors by returning an empty list
}

// Keep the subscription alive while requests are in flight
let cancellable = searchResults.sink { results in
    print("Search results: \(results)")
}

userInput.send("apple")

Here flatMap triggers a network request for each user query: every new query produces a new inner Publisher (the request), and the decoded results of all requests are emitted through searchResults.
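The flattening itself can be seen without a network. A sketch in which a hypothetical in-memory index stands in for the remote search API; each query becomes a small array publisher, and flatMap merges their values into one stream:

```swift
import Combine

// Hypothetical in-memory index standing in for the remote search API
let index: [String: [String]] = [
    "apple": ["apple pie", "apple juice"],
    "swift": ["Swift language"]
]

let queries = PassthroughSubject<String, Never>()

var results: [String] = []
let cancellable = queries
    // Each query becomes its own publisher; flatMap flattens their values into one stream
    .flatMap { query in (index[query] ?? []).publisher }
    .sink { results.append($0) }

queries.send("apple")
queries.send("swift")
queries.send("kotlin")   // no entry: the inner publisher finishes without values
```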

Examples of use

Let's say we have a stream of raw data that needs to be normalized and filtered before being fed into a model:

let rawInputs = [1.5, 0.9, 2.5, 3.7, 1.8].publisher

let processedInputs = rawInputs
    .filter { $0 < 3.0 } // Keep only valid values
    .map { $0 / 3.0 } // Normalize the data

processedInputs.sink { value in
    print("Processed input: \(value)")
}

Once the model has produced its predictions, the data may need to be post-processed to make it suitable for display in a UI or further analytics:

let modelPredictions = [0.2, 0.8, 1.0, 0.4].publisher

let filteredPredictions = modelPredictions
    .filter { $0 > 0.5 } // Keep only confident predictions
    .map { $0 * 100.0 } // Convert to percentages

filteredPredictions.sink { value in
    print("Filtered prediction: \(value)%")
}
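Preprocessing and postprocessing can be chained around the model call in a single pipeline. A sketch, assuming a hypothetical hypotheticalModel function (a logistic curve with made-up weights) in place of a real model prediction; collect() gathers the formatted results into one array once the source finishes:

```swift
import Combine
import Foundation

// Hypothetical model: a logistic curve with made-up weights (illustration only)
func hypotheticalModel(_ x: Double) -> Double {
    1.0 / (1.0 + exp(-(4.0 * x - 2.0)))
}

var report: [String] = []
let cancellable = [1.5, 0.9, 2.5, 3.7, 1.8].publisher
    .filter { $0 < 3.0 }                          // preprocessing: drop out-of-range inputs
    .map { $0 / 3.0 }                             // preprocessing: normalize
    .map(hypotheticalModel)                       // inference
    .filter { $0 > 0.5 }                          // postprocessing: keep confident predictions
    .map { String(format: "%.0f%%", $0 * 100.0) } // format as percentages for the UI
    .collect()                                    // emit one array when the source finishes
    .sink { report = $0 }
```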

Error management and deferred data processing

In Combine, errors are handled with the catch and retry operators.

The catch operator intercepts an error in the data stream and replaces the failed upstream with an alternative Publisher, so the work can continue.

Example:

import Combine

struct MyError: Error {}

let faultyPublisher = Fail<Int, MyError>(error: MyError())

let cancellable = faultyPublisher
    .catch { _ in Just(0) } // On error, emit 0 instead
    .sink { value in
        print("Received value: \(value)")
    }

// Output: Received value: 0

Here Fail emulates an error, and catch handles it by returning a safe value instead.
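The closure passed to catch receives the error, so the fallback can depend on what went wrong. A sketch with a hypothetical ModelError type:

```swift
import Combine

// Hypothetical error type for a prediction pipeline
enum ModelError: Error {
    case modelNotLoaded
    case invalidInput
}

let failingPrediction = Fail<Double, ModelError>(error: .modelNotLoaded)

var received: [Double] = []
let cancellable = failingPrediction
    .catch { error -> Just<Double> in
        // Pick a fallback value depending on which error occurred
        switch error {
        case .modelNotLoaded: return Just(0.0)  // neutral score until the model is ready
        case .invalidInput: return Just(-1.0)   // sentinel for rejected input
        }
    }
    .sink { received.append($0) }
```

When the fallback does not depend on the error, replaceError(with:) does the same job more concisely.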

The retry operator automatically resubscribes to the upstream Publisher when it fails, up to the specified number of times.

Example:

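import Combine
import Foundation

var attemptCount = 0

// Deferred creates a fresh publisher on every subscription, so each retry runs the closure again.
// Both branches are erased to AnyPublisher so that they have the same type.
let cancellable = Deferred { () -> AnyPublisher<String, URLError> in
    attemptCount += 1
    if attemptCount < 3 {
        return Fail(error: URLError(.badServerResponse)).eraseToAnyPublisher()
    }
    return Just("Success")
        .setFailureType(to: URLError.self)
        .eraseToAnyPublisher()
}
.retry(3) // Resubscribe up to three times after a failure
.sink(
    receiveCompletion: { completion in
        switch completion {
        case .finished:
            print("Completed successfully")
        case .failure(let error):
            print("Failed with error: \(error)")
        }
    },
    receiveValue: { value in
        print("Received value: \(value)")
    }
)

// Output:
// Received value: Success
// Completed successfully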

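Here retry(3) resubscribes to the Deferred publisher each time it fails, allowing up to three retries after the initial attempt. The first two attempts fail and the third succeeds, so the subscriber receives "Success" followed by a normal completion.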
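Since retry(n) means "resubscribe up to n times", a source that always fails is subscribed n + 1 times in total before the error reaches the subscriber. A sketch that counts the subscriptions with Deferred:

```swift
import Combine
import Foundation

var attempts = 0
var finalError: URLError?

let cancellable = Deferred { () -> Fail<String, URLError> in
    attempts += 1   // counts the initial subscription and every retry
    return Fail(error: URLError(.timedOut))
}
.retry(2)
.sink(
    receiveCompletion: { completion in
        if case .failure(let error) = completion {
            finalError = error
        }
    },
    receiveValue: { _ in }
)
```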

Working with asynchronous data often requires managing the rate at which events are processed, especially if the data stream arrives quickly and can overload the system. Combine provides two operators for this task: debounce and throttle.

debounce holds back events until a specified amount of time has passed without new values arriving, then emits the most recent value.

Example:

import Combine
import Foundation

let searchTextPublisher = PassthroughSubject<String, Never>()

let debouncedSearch = searchTextPublisher
    .debounce(for: .milliseconds(500), scheduler: RunLoop.main)
    .sink { value in
        print("Search query: \(value)")
    }

searchTextPublisher.send("S")
searchTextPublisher.send("Sw")
searchTextPublisher.send("Swi")
searchTextPublisher.send("Swif")
searchTextPublisher.send("Swift")
// 500 ms after the last input: "Search query: Swift"

Here debounce waits until the user has stopped typing for 500 ms and emits only the final text, which avoids sending a request for every keystroke.
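For a search field, debounce is often followed by removeDuplicates, so that the same query is not sent twice in a row (for example, after a typo is corrected before the pause). A sketch that, as an assumption, uses a private DispatchQueue as the scheduler so that it does not need a running main run loop; Thread.sleep stands in for the user pausing:

```swift
import Combine
import Foundation

// A private serial queue as the scheduler, so the sketch does not need a running main run loop
let queue = DispatchQueue(label: "search.debounce")
let searchText = PassthroughSubject<String, Never>()

var queries: [String] = []
let cancellable = searchText
    .debounce(for: .milliseconds(200), scheduler: queue) // wait for a 200 ms pause in typing
    .removeDuplicates()                                  // skip a query identical to the previous one
    .sink { queries.append($0) }                         // runs on `queue`

searchText.send("S")
searchText.send("Sw")
searchText.send("Swift")
Thread.sleep(forTimeInterval: 0.5)  // pause: "Swift" is emitted

searchText.send("Swift!")
searchText.send("Swift")            // typo corrected before the pause
Thread.sleep(forTimeInterval: 0.5)  // pause: "Swift" again, dropped by removeDuplicates

searchText.send("SwiftUI")
Thread.sleep(forTimeInterval: 0.5)

// Read the results on the queue that wrote them
let emitted = queue.sync { queries }
```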

throttle lets through at most one value per specified time interval, limiting how often events are processed.

Example:

import Combine
import Foundation

let eventPublisher = PassthroughSubject<Void, Never>()

let throttledEvents = eventPublisher
    .throttle(for: .seconds(1), scheduler: RunLoop.main, latest: true)
    .sink { _ in
        print("Event received")
    }

// Emulate a frequently firing event: five events, 0.3 s apart.
// Scheduling them asynchronously keeps the main run loop free, unlike Thread.sleep on the main thread.
for i in 0..<5 {
    DispatchQueue.main.asyncAfter(deadline: .now() + 0.3 * Double(i)) {
        eventPublisher.send()
    }
}

// Output: "Event received" is printed at most once per second

In this example, throttle passes at most one event per second; because latest is true, it delivers the most recent event of each interval and drops the others, which keeps the processing load under control.

Let's consider a scenario where we are working with a data stream from multiple sources, such as sensors and a server, and we need to manage errors and delays:

import Combine
import Foundation

enum SensorError: Error {
    case sensorFailure
}

let sensorDataPublisher = PassthroughSubject<Int, SensorError>()
let serverDataPublisher = PassthroughSubject<Int, URLError>()

// CombineLatest requires both upstreams to share one Failure type, so erase both to Error
let sensorStream = sensorDataPublisher.mapError { $0 as Error }
let serverStream = serverDataPublisher.mapError { $0 as Error }

let cancellable = Publishers.CombineLatest(sensorStream, serverStream)
    .retry(2) // Resubscribe up to two times on failure
    .debounce(for: .milliseconds(300), scheduler: RunLoop.main) // Avoid processing data too often
    .catch { _ in Just((0, 0)) } // On error, fall back to a safe value
    .sink { sensorData, serverData in
        print("Sensor: \(sensorData), Server: \(serverData)")
    }

sensorDataPublisher.send(100)
serverDataPublisher.send(200)

// Emulate a sensor failure
sensorDataPublisher.send(completion: .failure(.sensorFailure))

serverDataPublisher.send(300)

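We use CombineLatest to combine data from the sensor and the server, retry to resubscribe after a failure, debounce to avoid processing data too often, and finally catch to replace any remaining error with a safe value. Keep in mind that retry only helps when resubscribing can produce fresh data: a PassthroughSubject that has already failed cannot emit new values, so in this example it is catch that ultimately handles the sensor failure.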
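CombineLatest requires both upstreams to have the same Failure type. One way to satisfy that while keeping track of which source failed is to map each error into a shared enum before combining. A sketch with a hypothetical PipelineError type:

```swift
import Combine
import Foundation

enum SensorError: Error {
    case sensorFailure
}

// Hypothetical app-level error that records which source failed
enum PipelineError: Error {
    case sensor(SensorError)
    case server(URLError)
}

let sensorData = PassthroughSubject<Int, SensorError>()
let serverData = PassthroughSubject<Int, URLError>()

var readings: [String] = []
var pipelineError: PipelineError?

let cancellable = sensorData
    .mapError(PipelineError.sensor)                           // SensorError -> PipelineError
    .combineLatest(serverData.mapError(PipelineError.server)) // URLError -> PipelineError
    .sink(
        receiveCompletion: { completion in
            if case .failure(let error) = completion {
                pipelineError = error
            }
        },
        receiveValue: { pair in readings.append("\(pair.0)/\(pair.1)") }
    )

sensorData.send(100)
serverData.send(200)
sensorData.send(completion: .failure(.sensorFailure))
```

Downstream code, such as a catch closure, can then switch over PipelineError and react differently to sensor and server failures.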

You can learn more about the Combine library here.


