Subject objects in RxDart and how they are useful to a Flutter developer
Dart in conjunction with the package Async has good functionality in terms of working with streams. However, there are still restrictions. To make streaming more convenient, a package is used RxDart.
ReactiveX (Rx) appeared in 2010 for .NET, and has since been ported to almost all modern programming languages and become a standard. The version for Dart was published in 2015, and at the moment it is one of the Flutter Favorite packages – it is supported by the community as much as possible.
My name is Vitaly, I am Flutter Team Lead at Surf, and this short article will be the first in a series of publications on the topic of RxDart.
Abstracts
Subject
– an object to which you can subscribe and listen to the values passed to it, analogueStreamController
in Dart.PublishSubject
—Subject
which is analogous to the standard broadcast controllerStreamController.broadcast()
.ReplaySubject
—Subject
which stores all previously transmitted values and, when subscribed, returns all past values at once.BehaviorSubject
—Subject
which stores in itself last value sentand when you subscribe to thisSubject
, immediately returns this value to the listener. Can only be initialized with an initial value.
What does Dart offer out of the box?
Out of the box, Dart provides a class for working with Stream StreamController
which allows you to manage streams.
There are two types of subscription:
– single subscription — there can be only one listener, who is guaranteed to receive all messages received after subscribing to the stream;
– broadcast – there can be many listeners, but they will also receive messages that are included in the stream after subscribing.
For StreamController.broadcast()
You can draw an analogy with the radio – information is delivered to active listeners “live”, and if you connect to it later than others, then there is no way to listen to what you missed.
import 'dart:async';
void _firstListener(int value) => print('first: $value');
void _secondListener(int value) => print('second: $value');
void main() {
// создаем контроллер
final streamController = StreamController<int>.broadcast();
// добавляем первый слушатель, перед добавлением значения
streamController.stream.listen(_firstListener);
// добавляем значение
streamController.add(1);
// _firstListener выведет 'first: 1'
// добавляем второй слушатель
streamController.stream.listen(_secondListener);
// ничего не выведется
}
What does RxDart offer?
Package adds three dedicated controllers for working with Stream – varieties Subject
:
PublishSubject
ReplaySubject
BehaviorSubject
Let’s look at them separately.
PublishSubject
PublishSubject
— broadcast (“broadcast” or “hot”) controlleranalogue of a standard broadcast controller StreamController.broadcast()
, which was written about above. We won’t stop here.
ReplaySubject
ReplaySubject
— Also broadcast controller, which is worth using if the listener needs to pass all past passed events. For new listeners, he “plays” all past events starting from the first.
Analogue is a live broadcast that can be rewinded to earlier moments.
import 'package:rxdart/subjects.dart';
// ... _firstListener и _secondListener
void main() {
// создаём replay subject
final replaySubject = ReplaySubject<int>();
replaySubject..add(1)..add(2)..add(3)..add(4);
// добавляем первый слушатель, перед добавлением значения
replaySubject.stream.listen(_firstListener);
// _firstListener выведет:
// first: 1
// first: 2
// first: 3
// first: 4
// добавляем второй слушатель
replaySubject.stream.listen(_secondListener);
// _secondListener выведет:
// second: 1
// second: 2
// second: 3
// second: 4
}
BehaviorSubject
BehaviorSubject
– broadcast controllerwhich stores in itself last thing value or errorand when subscribing to this stream, immediately returns to the listener the last event sent to the controller.
Analogue – live broadcast without the ability to rewind. When you connect, you start watching from the very last frame.
import 'package:rxdart/subjects.dart';
// ... _firstListener и _secondListener
void main() {
// создаём behavior subject
final behaviorSubject = BehaviorSubject<int>();
// добавляем первый слушатель, перед добавлением значения
behaviorSubject.stream.listen(_firstListener);
// добавляем значение
behaviorSubject.add(1);
// _firstListener выведет 'first: 1'
// добавляем второй слушатель
behaviorSubject.stream.listen(_secondListener);
// _secondListener выведет 'second: 1'
}
Optionally, you can pass the initial value to the listener when subscribing – using the constructor BehaviorSubject<T>.seeded
for Rx this is a more “native” way of declaring BehaviorSubject
.
import 'package:rxdart/subjects.dart';
// ... _firstListener и _secondListener
void main() {
// создаём behavior subject
final behaviorSubject = BehaviorSubject<int>.seeded(1);
// добавляем первый слушатель, перед добавлением значения
behaviorSubject.stream.listen(_firstListener);
// _firstListener выведет 'first: 1'
// добавляем второй слушатель
behaviorSubject.stream.listen(_secondListener);
// _secondListener выведет 'second: 1'
}
BehaviorSubject<T>.seeded
can be used when the original meaning needs to be conveyed into the stream and listeners need to “react” to it.
For example, the state of a shopping cart with goods is stored in BehaviorSubject
, and on the cart screen link the output of its contents directly to the state in the conditional class CartService
.
import 'package:rxdart/subjects.dart';
class Product {
final String title;
const Product(this.title);
}
class CartState {
final List<Product> products;
const CartState({required this.products});
factory CartState.empty() => const CartState(products: []);
}
class CartService {
final _cartState = BehaviorSubject<CartState>.seeded(CartState.empty());
Stream<CartState> get cartStateStreamed => _cartState.stream;
void addProduct(Product product) {
_cartState.add(
CartState(
products: [
..._cartState.value.products,
product,
],
),
);
}
}
// где-то в приложении объявляем сервис для работы с корзиной
final service = CartService();
// подписываемся на состояние корзины, для обновления счётчика товаров,
// например в BottomAppBar, он будет обновляться при изменении состояния корзины
service.cartStateStreamed.listen((cartState) {
print('Число товаров: ${cartState.products.length}');
});
// добавляем товары
service..addProduct(const Product("Капуста"))..addProduct(const Product("Картошка"));
// чуть позже на экране содержимого корзины подписываемся на состояние и выводим названия товаров
service.cartStateStreamed.listen((cartState) {
print('Названия товаров: ${cartState.products.map((p) => p.title).join(',')}');
});
In some cases BehaviorSubject
can replace ValueNotifier
which does not notify listeners of its latest value when subscribing. BehaviorSubject
allows you to similarly access the last stream value via a getter BehaviorSubject.value
.
Conclusion
If you want to use the RxDart package in your projects and make them more efficient, do not forget about documentation.
It is also worth checking out documentation for RxJS — a package for JavaScript, also relevant for RxDart, making allowance for a language different from the Flutter stack. This package has a cool visualization of Rx principles, since Rx packages follow a common contract for all methods and classes.
We don’t say goodbye to this – we will continue to write on the topic and look for opportunities to improve the efficiency of your work.
More useful information about Flutter can be found in the Surf Flutter Team Telegram channel. Cases, best practices, news and vacancies for the Flutter Surf team in one place. Join us.