Subject objects in RxDart and how they are useful to a Flutter developer

Dart in conjunction with the package Async has good functionality in terms of working with streams. However, there are still restrictions. To make streaming more convenient, a package is used RxDart.

ReactiveX (Rx) appeared in 2010 for .NET, and has since been ported to almost all modern programming languages ​​and become a standard. The version for Dart was published in 2015, and at the moment it is one of the Flutter Favorite packages – it is supported by the community as much as possible.

My name is Vitaly, I am Flutter Team Lead at Surf, and this short article will be the first in a series of publications on the topic of RxDart.

Abstracts

  • Subject – an object to which you can subscribe and listen to the values ​​​​passed to it, analogue StreamController in Dart.

  • PublishSubjectSubjectwhich is analogous to the standard broadcast controller StreamController.broadcast().

  • ReplaySubjectSubjectwhich stores all previously transmitted values ​​and, when subscribed, returns all past values ​​at once.

  • BehaviorSubjectSubjectwhich stores in itself last value sentand when you subscribe to this Subject, immediately returns this value to the listener. Can only be initialized with an initial value.

What does Dart offer out of the box?

Out of the box, Dart provides a class for working with Stream StreamControllerwhich allows you to manage streams.

There are two types of subscription:

– single subscription — there can be only one listener, who is guaranteed to receive all messages received after subscribing to the stream;

– broadcast – there can be many listeners, but they will also receive messages that are included in the stream after subscribing.

For StreamController.broadcast() You can draw an analogy with the radio – information is delivered to active listeners “live”, and if you connect to it later than others, then there is no way to listen to what you missed.

import 'dart:async';

void _firstListener(int value) => print('first: $value');
void _secondListener(int value) => print('second: $value');

void main() {
  // создаем контроллер
  final streamController = StreamController<int>.broadcast();
  
  // добавляем первый слушатель, перед добавлением значения
  streamController.stream.listen(_firstListener);
  
  // добавляем значение
  streamController.add(1);
  // _firstListener выведет 'first: 1'
  
  // добавляем второй слушатель
  streamController.stream.listen(_secondListener);
  // ничего не выведется
}

What does RxDart offer?

Package adds three dedicated controllers for working with Stream – varieties Subject:

  • PublishSubject

  • ReplaySubject

  • BehaviorSubject

Let’s look at them separately.

PublishSubject

PublishSubject — broadcast (“broadcast” or “hot”) controlleranalogue of a standard broadcast controller StreamController.broadcast(), which was written about above. We won’t stop here.

ReplaySubject

ReplaySubjectAlso broadcast controller, which is worth using if the listener needs to pass all past passed events. For new listeners, he “plays” all past events starting from the first.

Analogue is a live broadcast that can be rewinded to earlier moments.

import 'package:rxdart/subjects.dart';

// ... _firstListener и _secondListener

void main() {
  // создаём replay subject
  final replaySubject = ReplaySubject<int>();
  
  replaySubject..add(1)..add(2)..add(3)..add(4);
  
  // добавляем первый слушатель, перед добавлением значения
  replaySubject.stream.listen(_firstListener);
  // _firstListener выведет:
  // first: 1 
  // first: 2 
  // first: 3 
  // first: 4
  
  // добавляем второй слушатель
  replaySubject.stream.listen(_secondListener);
  // _secondListener выведет:
  // second: 1 
  // second: 2 
  // second: 3 
  // second: 4
}

BehaviorSubject

BehaviorSubject – broadcast controllerwhich stores in itself last thing value or errorand when subscribing to this stream, immediately returns to the listener the last event sent to the controller.

Analogue – live broadcast without the ability to rewind. When you connect, you start watching from the very last frame.

import 'package:rxdart/subjects.dart';

// ... _firstListener и _secondListener

void main() {
  // создаём behavior subject
  final behaviorSubject = BehaviorSubject<int>();
  
  // добавляем первый слушатель, перед добавлением значения
  behaviorSubject.stream.listen(_firstListener);
  
  // добавляем значение
  behaviorSubject.add(1);
  // _firstListener выведет 'first: 1'
  
  // добавляем второй слушатель
  behaviorSubject.stream.listen(_secondListener);
  // _secondListener выведет 'second: 1'
}

Optionally, you can pass the initial value to the listener when subscribing – using the constructor BehaviorSubject<T>.seededfor Rx this is a more “native” way of declaring BehaviorSubject.

import 'package:rxdart/subjects.dart';

// ... _firstListener и _secondListener

void main() {
  // создаём behavior subject
  final behaviorSubject = BehaviorSubject<int>.seeded(1);
  
  // добавляем первый слушатель, перед добавлением значения
  behaviorSubject.stream.listen(_firstListener);
  // _firstListener выведет 'first: 1'
  
  // добавляем второй слушатель
  behaviorSubject.stream.listen(_secondListener);
  // _secondListener выведет 'second: 1'
}

BehaviorSubject<T>.seeded can be used when the original meaning needs to be conveyed into the stream and listeners need to “react” to it.

For example, the state of a shopping cart with goods is stored in BehaviorSubject, and on the cart screen link the output of its contents directly to the state in the conditional class CartService.

import 'package:rxdart/subjects.dart';

class Product {
  final String title;

  const Product(this.title);
}

class CartState {
  final List<Product> products;

  const CartState({required this.products});
	
  factory CartState.empty() => const CartState(products: []);
}

class CartService {
  final _cartState = BehaviorSubject<CartState>.seeded(CartState.empty());

  Stream<CartState> get cartStateStreamed => _cartState.stream;

  void addProduct(Product product) {
    _cartState.add(
      CartState(
        products: [
          ..._cartState.value.products,
          product,
        ],
      ),
    );
  }
}

// где-то в приложении объявляем сервис для работы с корзиной
final service = CartService();

// подписываемся на состояние корзины, для обновления счётчика товаров, 
// например в BottomAppBar, он будет обновляться при изменении состояния корзины
service.cartStateStreamed.listen((cartState) {
  print('Число товаров: ${cartState.products.length}');
});

// добавляем товары
service..addProduct(const Product("Капуста"))..addProduct(const Product("Картошка"));

// чуть позже на экране содержимого корзины подписываемся на состояние и выводим названия товаров
service.cartStateStreamed.listen((cartState) {
  print('Названия товаров: ${cartState.products.map((p) => p.title).join(',')}');
});

In some cases BehaviorSubject can replace ValueNotifierwhich does not notify listeners of its latest value when subscribing. BehaviorSubject allows you to similarly access the last stream value via a getter BehaviorSubject.value.

Conclusion

If you want to use the RxDart package in your projects and make them more efficient, do not forget about documentation.

It is also worth checking out documentation for RxJS — a package for JavaScript, also relevant for RxDart, making allowance for a language different from the Flutter stack. This package has a cool visualization of Rx principles, since Rx packages follow a common contract for all methods and classes.

We don’t say goodbye to this – we will continue to write on the topic and look for opportunities to improve the efficiency of your work.

More useful information about Flutter can be found in the Surf Flutter Team Telegram channel. Cases, best practices, news and vacancies for the Flutter Surf team in one place. Join us.

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *