Basics of Combine Framework for ML in Swift
In this article, we will cover the basics of this wonderful framework.
Once Xcode is installed, you can use Combine in your projects by importing it like this:
import Combine
Creating and Combining Data Streams with Combine
In Combine, everything starts with a Publisher, an entity that emits values over time. The source can be anything: an array of numbers, the result of a network request, or UI events.
The simplest example of a Publisher is an array converted with its .publisher property:
import Combine
let numbers = [1, 2, 3, 4, 5].publisher
numbers.sink { value in
    print("Received value: \(value)")
}
The numbers array turns into a Publisher that emits each number in sequence. The sink method attaches a Subscriber that "subscribes" to the Publisher and handles each received element; here it simply prints it.
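The subscription described above can also observe completion. The following is a minimal sketch, assuming the two-closure form of sink; the names received and didFinish are illustrative and not part of the original example.

```swift
import Combine

var received: [Int] = []
var didFinish = false

// Keep the returned AnyCancellable; releasing it cancels the subscription.
let cancellable = [1, 2, 3].publisher.sink(
    receiveCompletion: { completion in
        // An array publisher never fails, so the only completion is .finished.
        if case .finished = completion {
            didFinish = true
        }
    },
    receiveValue: { value in
        received.append(value)
    }
)

print(received)   // [1, 2, 3]
print(didFinish)  // true
```

An array publisher delivers everything synchronously, so both variables are already filled in by the time sink returns.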
If you want to create your own Publisher, you can use PassthroughSubject or CurrentValueSubject:
import Combine
let subject = PassthroughSubject<String, Never>()
// Keep the returned AnyCancellable: releasing it cancels the subscription
let cancellable = subject.sink { value in
    print("Received value: \(value)")
}
subject.send("Hello")
subject.send("Combine")
Here PassthroughSubject lets you emit values manually with the send method, so you control when and what data is delivered to subscribers.
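CurrentValueSubject is mentioned above but not shown. Here is a small sketch of how it differs from PassthroughSubject: it needs an initial value and replays the latest value to each new subscriber. The name threshold and the numbers are assumptions chosen for illustration.

```swift
import Combine

// CurrentValueSubject requires an initial value and remembers the latest one.
let threshold = CurrentValueSubject<Double, Never>(0.5)
var seen: [Double] = []

// A new subscriber immediately receives the current value (0.5).
let cancellable = threshold.sink { seen.append($0) }

threshold.send(0.7)
threshold.value = 0.9   // assigning to `value` also publishes

print(seen)             // [0.5, 0.7, 0.9]
print(threshold.value)  // 0.9
```

This makes CurrentValueSubject a natural fit for state that always has a value, such as a confidence threshold, while PassthroughSubject suits one-off events.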
When it comes to combining data from different sources, there are several operators: merge, combineLatest, and zip.
import Combine
let publisher1 = PassthroughSubject<Int, Never>()
let publisher2 = PassthroughSubject<Int, Never>()
let merged = publisher1.merge(with: publisher2)
let cancellable = merged.sink { value in
    print("Merged value: \(value)")
}
publisher1.send(1)
publisher2.send(2)
publisher1.send(3)
The result of merge is the sequence 1, 2, 3: the values from both Publishers are interleaved into a single stream in the order they arrive.
combineLatest emits pairs of values from both Publishers once each source has produced at least one value:
import Combine
let publisherA = PassthroughSubject<String, Never>()
let publisherB = PassthroughSubject<String, Never>()
let combined = publisherA.combineLatest(publisherB)
let cancellable = combined.sink { value in
    print("Combined value: \(value)")
}
publisherA.send("Hello")
publisherB.send("World")
// Output: Combined value: ("Hello", "World")
publisherA.send("Hi")
// Output: Combined value: ("Hi", "World")
combineLatest waits until both Publishers have emitted, then outputs a pair of the latest values each time either of them emits a new one.
zip works similarly to combineLatest, but it emits a pair only when both sides have a new, not yet paired value:
import Combine
let pub1 = PassthroughSubject<Int, Never>()
let pub2 = PassthroughSubject<String, Never>()
let zipped = pub1.zip(pub2)
let cancellable = zipped.sink { value in
    print("Zipped value: \(value)")
}
pub1.send(1)
pub2.send("One")
// Output: Zipped value: (1, "One")
pub1.send(2)
pub2.send("Two")
// Output: Zipped value: (2, "Two")
zip is a good fit when you need to synchronize data from different sources by matching their values one to one.
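The difference between the two operators shows up most clearly when one source emits more often than the other. The sketch below feeds the same two subjects into both operators; the names left, right, latestPairs and zippedPairs are illustrative.

```swift
import Combine

let left = PassthroughSubject<Int, Never>()
let right = PassthroughSubject<String, Never>()

var latestPairs: [String] = []
var zippedPairs: [String] = []
var cancellables = Set<AnyCancellable>()

left.combineLatest(right)
    .sink { latestPairs.append("\($0.0)-\($0.1)") }
    .store(in: &cancellables)

left.zip(right)
    .sink { zippedPairs.append("\($0.0)-\($0.1)") }
    .store(in: &cancellables)

left.send(1)
left.send(2)
// combineLatest pairs "a" with the latest left value (2);
// zip pairs it with the oldest unpaired left value (1).
right.send("a")
right.send("b")

print(latestPairs)  // ["2-a", "2-b"]
print(zippedPairs)  // ["1-a", "2-b"]
```

combineLatest drops the intermediate value 1 because only the most recent value matters to it, whereas zip buffers values until each one has a partner.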
Let's say you have an application that takes user input and a model prediction result, and you want to combine this data to display it in the UI:
import Combine
// Create two Publishers: one for user input, one for model predictions
let userInput = PassthroughSubject<String, Never>()
let prediction = PassthroughSubject<String, Never>()
// Combine the two streams with combineLatest
let combinedData = userInput.combineLatest(prediction)
let cancellable = combinedData.sink { user, predicted in
    print("User input: \(user), Model prediction: \(predicted)")
}
// Emit values
userInput.send("User says: Hello")
prediction.send("Model predicts: Hi")
// Output: User input: User says: Hello, Model prediction: Model predicts: Hi
combineLatest lets you pair user input with the results of the ML model.
Transforming Data in Streams
The map operator is the primary tool for transforming data in Combine. It takes each value emitted by a Publisher and converts it into a new form.
Suppose there is a stream of numbers and we want to multiply each of them by 2. This is easy to do with map:
import Combine
let numbers = [1, 2, 3, 4, 5].publisher
let multipliedNumbers = numbers.map { $0 * 2 }
multipliedNumbers.sink { value in
    print("Transformed value: \(value)")
}
Running this code prints:
Transformed value: 2
Transformed value: 4
Transformed value: 6
Transformed value: 8
Transformed value: 10
The map operator is well suited to preprocessing data before passing it to an ML model. For example, if you need to normalize the input data before feeding it to the model:
let rawInputs = [0.5, 0.75, 1.0, 1.25].publisher
let normalizedInputs = rawInputs.map { $0 / 2.0 }
normalizedInputs.sink { value in
    print("Normalized value: \(value)")
}
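Dividing by a constant is the simplest form of normalization. As a slightly more general sketch, min-max scaling can be written with the same map operator. The bounds minValue and maxValue are assumed here; in practice they would come from the training data.

```swift
import Combine

// Assumed feature bounds, for illustration only.
let minValue = 0.0
let maxValue = 2.0

var scaled: [Double] = []
let cancellable = [0.5, 1.0, 2.0].publisher
    .map { ($0 - minValue) / (maxValue - minValue) }  // min-max scaling to 0...1
    .sink { scaled.append($0) }

print(scaled)  // [0.25, 0.5, 1.0]
```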
The filter operator passes through only the values that satisfy a given condition.
Let's look at an example where we keep only the even numbers in the stream:
let numbers = [1, 2, 3, 4, 5, 6].publisher
let evenNumbers = numbers.filter { $0 % 2 == 0 }
evenNumbers.sink { value in
    print("Filtered value: \(value)")
}
Output:
Filtered value: 2
Filtered value: 4
Filtered value: 6
In the context of ML, filter can be useful for removing outliers or data that does not meet certain criteria. For example, to exclude all input values that exceed a certain threshold:
let rawInputs = [0.5, 2.5, 1.0, 3.5].publisher
let validInputs = rawInputs.filter { $0 <= 2.0 }
validInputs.sink { value in
    print("Valid input: \(value)")
}
flatMap is an operator that turns each value from the original Publisher into a new Publisher and flattens the values those inner Publishers emit into a single stream.
Let's look at an example of a network request that returns data based on user input:
import Combine
import Foundation
let userInput = PassthroughSubject<String, Never>()
let searchResults = userInput.flatMap { query in
    URLSession.shared.dataTaskPublisher(for: URL(string: "https://api.example.com/search?q=\(query)")!)
        .map { $0.data }
        .decode(type: [String].self, decoder: JSONDecoder())
        .catch { _ in Just([]) } // Fall back to an empty result on any error
}
// Keep the AnyCancellable alive, otherwise the request is cancelled before it completes
let cancellable = searchResults.sink { results in
    print("Search results: \(results)")
}
userInput.send("apple")
Here flatMap is used to trigger a network request based on user input. Each time the user enters a new query, flatMap creates a new Publisher, which processes the response and delivers the results downstream.
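The same pattern can be tried without a network connection. In the sketch below, classify is a hypothetical stand-in for an asynchronous model call; it answers immediately through Just so the flow can be followed step by step.

```swift
import Combine

// Hypothetical stand-in for an asynchronous model call; it answers immediately via Just.
func classify(_ text: String) -> AnyPublisher<String, Never> {
    Just(text.count > 5 ? "long" : "short").eraseToAnyPublisher()
}

let queries = PassthroughSubject<String, Never>()
var labels: [String] = []

let cancellable = queries
    .flatMap { classify($0) }    // one inner Publisher per query
    .sink { labels.append($0) }  // values of all inner Publishers arrive here

queries.send("apple")
queries.send("pineapple")

print(labels)  // ["short", "long"]
```

Replacing the body of classify with a real request or a model invocation would leave the rest of the pipeline unchanged, which is the main appeal of flatMap.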
Usage Examples
Let's say we have a stream of raw data that needs to be normalized and filtered before being fed into a model:
let rawInputs = [1.5, 0.9, 2.5, 3.7, 1.8].publisher
let processedInputs = rawInputs
    .filter { $0 < 3.0 } // Keep only valid values
    .map { $0 / 3.0 } // Normalize the data
processedInputs.sink { value in
    print("Processed input: \(value)")
}
Once the model has produced its predictions, the data may need to be post-processed to make it suitable for display in a UI or further analytics:
let modelPredictions = [0.2, 0.8, 1.0, 0.4].publisher
let filteredPredictions = modelPredictions
    .filter { $0 > 0.5 } // Keep only confident predictions
    .map { $0 * 100.0 } // Convert to percent
filteredPredictions.sink { value in
    print("Filtered prediction: \(value)%")
}
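The two stages above can also be chained into one pipeline. The following sketch assumes a hypothetical predict function (a clamped linear score, used only for illustration) placed between the preprocessing and post-processing steps.

```swift
import Combine

// Hypothetical model: a fixed linear score clamped to 0...1, for illustration only.
func predict(_ x: Double) -> Double {
    min(max(x * 2.0, 0.0), 1.0)
}

var percentages: [Double] = []
let cancellable = [1.5, 0.75, 4.0, 0.25].publisher
    .filter { $0 < 3.0 }   // drop out-of-range inputs (4.0)
    .map { $0 / 3.0 }      // normalize
    .map(predict)          // run the stand-in model
    .filter { $0 > 0.4 }   // keep confident predictions only
    .map { $0 * 100.0 }    // convert to percent
    .sink { percentages.append($0) }

print(percentages)  // [100.0, 50.0]
```

Each operator handles one concern, so a step can be swapped or removed without touching the others.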
Error Handling and Deferred Data Processing
In Combine, errors are handled with the catch and retry operators.
The catch operator intercepts an error that occurs in the data stream and substitutes an alternative Publisher so that work can continue.
Example:
import Combine
struct MyError: Error {}
let faultyPublisher = Fail<Int, MyError>(error: MyError())
let cancellable = faultyPublisher
    .catch { _ in Just(0) } // On error, fall back to the value 0
    .sink { value in
        print("Received value: \(value)")
    }
// Output: Received value: 0
We use Fail to simulate an error and then handle it with catch, which substitutes a safe value.
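One detail worth knowing: once catch replaces a failed stream, the original upstream is finished. To recover per element and keep receiving later values, catch can be placed inside flatMap. The sketch below assumes an illustrative ParseError type and uses -1 as the fallback value.

```swift
import Combine

// Illustrative error type, not part of Combine.
struct ParseError: Error {}

let inputs = PassthroughSubject<String, Never>()
var parsed: [Int] = []

let cancellable = inputs
    .flatMap { text in
        Just(text)
            .tryMap { raw -> Int in
                guard let number = Int(raw) else { throw ParseError() }
                return number
            }
            .catch { _ in Just(-1) }  // recover inside the inner Publisher only
    }
    .sink { parsed.append($0) }

inputs.send("1")
inputs.send("oops")  // fails to parse, replaced by -1
inputs.send("3")     // still delivered, because the outer stream never failed

print(parsed)  // [1, -1, 3]
```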
The retry operator automatically re-subscribes to the upstream Publisher when it fails, up to the specified number of times.
Example:
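import Combine
import Foundation
var attemptCount = 0
// Deferred runs its closure on every subscription, so each retry counts as a new attempt
let retryingPublisher = Deferred { () -> AnyPublisher<String, URLError> in
    attemptCount += 1
    if attemptCount < 3 {
        return Fail<String, URLError>(error: URLError(.badServerResponse)).eraseToAnyPublisher()
    }
    return Just("Success").setFailureType(to: URLError.self).eraseToAnyPublisher()
}
.retry(3) // Re-subscribe up to three times after a failure
.sink(
    receiveCompletion: { completion in
        switch completion {
        case .finished:
            print("Completed successfully")
        case .failure(let error):
            print("Failed with error: \(error)")
        }
    },
    receiveValue: { value in
        print("Received value: \(value)")
    }
)
// Output:
// Received value: Success
// Completed successfully
Here retry re-subscribes to the stream up to three times after a failure. The first two attempts fail, and the third one completes successfully.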
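It is also useful to see what happens when the retries run out. In this sketch the Publisher always fails, so retry(2) makes three attempts in total (the original subscription plus two retries) and then forwards the failure. The names attempts and outcome are illustrative.

```swift
import Combine
import Foundation

var attempts = 0
var outcome = ""

let cancellable = Deferred { () -> Fail<String, URLError> in
    attempts += 1  // runs once per subscription
    return Fail(error: URLError(.timedOut))
}
.retry(2)  // two retries after the first failed attempt
.sink(
    receiveCompletion: { completion in
        if case .failure = completion { outcome = "failed" }
    },
    receiveValue: { _ in }
)

print(attempts)  // 3
print(outcome)   // failed
```

Placing a catch after retry would turn this final failure into a fallback value.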
Working with asynchronous data often requires managing the rate at which events are processed, especially if the data arrives quickly and can overload the system. Combine provides two operators for this task: debounce and throttle.
debounce is an operator that delays event processing until a certain amount of time has passed without new data being received.
Example:
import Combine
import Foundation
let searchTextPublisher = PassthroughSubject<String, Never>()
let debouncedSearch = searchTextPublisher
    .debounce(for: .milliseconds(500), scheduler: RunLoop.main)
    .sink { value in
        print("Search query: \(value)")
    }
searchTextPublisher.send("S")
searchTextPublisher.send("Sw")
searchTextPublisher.send("Swi")
searchTextPublisher.send("Swif")
searchTextPublisher.send("Swift")
// 500 ms after the last input: "Search query: Swift"
Here debounce holds back values until the user has paused typing, which avoids unnecessary requests.
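Outside an app or playground, the main run loop has to be running for the debounce timer to fire. The sketch below assumes a command-line script and spins the run loop briefly by hand; the 100 ms interval and the name keystrokes are illustrative.

```swift
import Combine
import Foundation

let keystrokes = PassthroughSubject<String, Never>()
var queries: [String] = []

let cancellable = keystrokes
    .debounce(for: .milliseconds(100), scheduler: RunLoop.main)
    .sink { queries.append($0) }

// Three values sent back to back, well within the debounce interval.
keystrokes.send("S")
keystrokes.send("Sw")
keystrokes.send("Swift")

// Let the main run loop spin so the debounce timer can fire.
RunLoop.main.run(until: Date().addingTimeInterval(0.5))

print(queries)  // ["Swift"]
```

Only the last value survives, because each new value restarts the quiet period.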
throttle lets events through only at fixed time intervals, limiting how often they are processed.
Example:
import Combine
import Foundation
let eventPublisher = PassthroughSubject<Void, Never>()
let throttledEvents = eventPublisher
    .throttle(for: .seconds(1), scheduler: RunLoop.main, latest: true)
    .sink {
        print("Event received")
    }
for _ in 1...5 {
    eventPublisher.send()
    Thread.sleep(forTimeInterval: 0.3) // Simulate a frequently firing event
}
// Output: "Event received" is printed at most once per second
In this example, throttle passes at most one event per second and drops the rest, which keeps the processing load under control.
Let's consider a scenario where we are working with a data stream from multiple sources, such as sensors and a server, and we need to manage errors and delays:
import Combine
import Foundation
enum SensorError: Error {
    case sensorFailure
}
let sensorDataPublisher = PassthroughSubject<Int, SensorError>()
let serverDataPublisher = PassthroughSubject<Int, URLError>()
// CombineLatest requires both inputs to share one Failure type, so map both to Error
let combinedPublisher = Publishers.CombineLatest(
    sensorDataPublisher.mapError { $0 as Error },
    serverDataPublisher.mapError { $0 as Error }
)
    .retry(2) // Re-subscribe on error
    .debounce(for: .milliseconds(300), scheduler: RunLoop.main) // Avoid processing data too often
    .catch { _ in Just((0, 0)) } // On error, fall back to a safe value
    .sink { sensorData, serverData in
        print("Sensor: \(sensorData), Server: \(serverData)")
    }
sensorDataPublisher.send(100)
serverDataPublisher.send(200)
// Simulate a sensor failure
sensorDataPublisher.send(completion: .failure(.sensorFailure))
serverDataPublisher.send(300)
We use CombineLatest to combine data from the sensors and the server, retry to re-subscribe after an error, debounce to avoid processing data too often, and finally catch to handle errors safely.
You can read more about the Combine library here.