Guide to RxJS Subjects: Concepts and Variants

Sanish Shakya
Bajra Technologies Blog
10 min read · Apr 17, 2024


Reactive Extensions for JavaScript (RxJS) is a powerful library for reactive programming in JavaScript. It introduces the concept of “Observables”, entities capable of emitting multiple values over time, facilitating a reactive and event-driven programming paradigm.

What is an RxJS Subject?

RxJS Subjects are a key feature of this library. A Subject acts as both an Observable and an Observer, which makes it a central mechanism for propagating data in reactive applications. Its main purpose is to multicast values to multiple Observers. A plain Observable is unicast: each subscribed Observer gets its own independent execution of the Observable. A Subject is multicast: it keeps a list of registered Observers, and every value it receives is propagated to all of them. Subjects provide three methods for this.
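
The difference between unicast and multicast can be seen in a few lines. The following is a minimal sketch, not taken from the original article; the counter `producerRuns` and the variable names are assumptions made purely for illustration.

```typescript
import { Observable, Subject } from 'rxjs';

// Counts how many times the producer function runs.
let producerRuns = 0;

// A plain Observable: the producer runs once per subscriber (unicast).
const cold$ = new Observable<number>((subscriber) => {
  producerRuns += 1;
  subscriber.next(producerRuns);
  subscriber.complete();
});

const coldResults: number[] = [];
cold$.subscribe((v) => coldResults.push(v)); // first run of the producer
cold$.subscribe((v) => coldResults.push(v)); // second, independent run

// A Subject: one next() call is delivered to every registered Observer (multicast).
const subject = new Subject<number>();
const subjectResults: number[] = [];
subject.subscribe((v) => subjectResults.push(v));
subject.subscribe((v) => subjectResults.push(v));
subject.next(42);

console.log(coldResults);    // [1, 2]
console.log(subjectResults); // [42, 42]
```

Each subscriber to the plain Observable sees a different value because the producer ran twice, while both Subject subscribers see the same single emission.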

Three Methods of RxJS Subject

  • next(value) — Emitting Values
    1. This method is utilized to emit or push new values through the Subject.
    2. Whenever next(value) is invoked, it notifies all subscribed Observers about the arrival of a new value.
    3. It’s the primary mechanism to propagate new data through the Subject to its subscribers.
  • error(error) — Error Notifications
    1. This method is responsible for sending error notifications to the subscribed Observers.
    2. If an error occurs during data emission, invoking this method allows the Subject to notify its Observers about the encountered error.
    3. Once error(error) is called, the Subject enters an error state: its Observers receive the error, and later calls to next(), error(), or complete() have no effect.
  • complete() — Task Completion
    1. This method is a signal that indicates the Subject has finished its work.
    2. Invoking complete() marks the completion of the Subject’s operation.
    3. After complete() is called, further invocations of next() or error() won’t have any effect on the Subject, signaling the end of data emissions.
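
The three methods above can be exercised together in a short sketch. This is an illustrative example rather than code from the original article; the names `log`, `failing`, and `errors` are assumptions.

```typescript
import { Subject } from 'rxjs';

const subject = new Subject<string>();
const log: string[] = [];

// Two Observers registered on the same Subject.
subject.subscribe({
  next: (v) => log.push(`A next: ${v}`),
  complete: () => log.push('A complete'),
});
subject.subscribe({
  next: (v) => log.push(`B next: ${v}`),
  complete: () => log.push('B complete'),
});

subject.next('first');   // delivered to A and B
subject.complete();      // both Observers are told the stream has ended
subject.next('ignored'); // no effect: the Subject is already complete

console.log(log);
// ['A next: first', 'B next: first', 'A complete', 'B complete']

// A separate Subject that terminates with an error instead.
const failing = new Subject<string>();
const errors: string[] = [];
failing.subscribe({
  next: (v) => errors.push(`next: ${v}`),
  error: (e: Error) => errors.push(`error: ${e.message}`),
});

failing.error(new Error('boom')); // Observers receive the error
failing.next('ignored');          // no effect after an error

console.log(errors); // ['error: boom']
```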

How Does the RxJS Subject Work?

Figure: Flow Diagram

A simple example to understand the RxJS Subject

Consider a school premises with a number of classrooms, where announcements are made by the dean or principal over a loudspeaker.

Loudspeaker (Similar to RxJS Subject)

  • Broadcasts announcements to different areas.
  • When an announcement is made, all classrooms connected to the loudspeaker instantly hear it.

Classrooms (Similar to Observers)

  • Each room is connected to the loudspeaker.
  • When an announcement is broadcast, every student in the classroom hears the message immediately.

This setup mirrors an RxJS Subject: the loudspeaker (Subject) disseminates announcements to the connected classrooms (Observers), ensuring everyone hears the message at the same time.
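
The analogy translates almost directly into code. The sketch below is an illustration only; the room names and announcement texts are made up. It also shows one consequence of the analogy: a classroom that connects late does not hear announcements made before it was connected.

```typescript
import { Subject } from 'rxjs';

// The loudspeaker: a Subject carrying announcement text.
const loudspeaker = new Subject<string>();

// What each classroom has heard so far.
const heard = {
  roomA: [] as string[],
  roomB: [] as string[],
  roomC: [] as string[],
};

// roomA and roomB are connected from the start.
loudspeaker.subscribe((msg) => heard.roomA.push(msg));
loudspeaker.subscribe((msg) => heard.roomB.push(msg));

loudspeaker.next('Assembly at 9 AM'); // roomA and roomB hear this

// roomC connects late and has missed the first announcement.
loudspeaker.subscribe((msg) => heard.roomC.push(msg));

loudspeaker.next('Library closes early today'); // all three rooms hear this

console.log(heard.roomA); // ['Assembly at 9 AM', 'Library closes early today']
console.log(heard.roomC); // ['Library closes early today']
```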

Component Interaction in Angular: Shared Services and RxJS Subjects

Angular offers various mechanisms for passing data between components. Among these, Shared Services and RxJS Subjects stand out as powerful tools for managing component interaction.

Shared Services in Angular
Shared Services work as a shared instance across multiple components and modules, providing a single source of truth for data exchange. Shared Services in Angular are classes decorated with @Injectable. They serve as a central means for:

  • Passing Data between Components
    Shared Services act as intermediaries, allowing disparate components to exchange data. A piece of data modified or updated by one component can be accessed or subscribed to by others.
  • Separating Processing Tasks
    They help in segregating certain functionalities, such as network calls, error handling, logging, etc., thereby preventing the cluttering of component logic.

RxJS Subjects for Component Interaction
RxJS Subjects maintain a one-to-many relationship, efficiently propagating data changes across various parts of an Angular application. RxJS is a standalone library that Angular depends on and uses throughout its own APIs, so Subjects fit naturally into Angular’s reactive style and offer a powerful solution for component communication. Because a Subject is both an Observable and an Observer, it enables a reactive, event-driven approach to data sharing.

  • Multi-casting Data
    Subjects function as multi-casting Observables. They multi-cast data to numerous Observers, allowing a single piece of data to be broadcast to multiple components.
  • Real-time Communication
    Components subscribing to a Subject instantly receive updates when new data is emitted, facilitating real-time communication.

Why are RxJS subjects used in Angular and what is their contribution to the overall architecture?

RxJS Subjects are pivotal in Angular because they facilitate communication and data sharing among different parts of an application. Since a Subject acts as both an Observable and an Observer, it can be used both to broadcast values and to subscribe to them.

In Angular, where components often need to communicate with each other, Subjects play a crucial role. Here’s how they contribute to the architecture:

  • Event Bus: Subjects can act as an event bus where multiple components can subscribe to changes or events emitted by a Subject. This enables decoupling between components, allowing them to communicate without direct references to each other.
  • Service Communication: Subjects are frequently used within services to manage state or share data between components. This facilitates a central place for data manipulation and ensures consistency across the application.
  • Reactive State Management: In combination with other RxJS operators and patterns like BehaviorSubjects or ReplaySubjects, subjects can form the foundation of reactive state management. They allow for the creation of observable-based state containers that components can subscribe to, ensuring updates are reflected dynamically.
  • Cross-Component Communication: Subjects make it easy to share data across multiple components, regardless of their hierarchical relationship. This is especially useful when components are not directly related or nested within each other.
  • Facilitation of Asynchronous Operations: Subjects excel in managing asynchronous operations. They allow you to handle data streams, async events, and responses from various sources efficiently.
  • Testing: Subjects facilitate testing by providing a clear and controlled way to mock and test asynchronous behavior and data streams.

Overall, subjects in RxJS contribute significantly to the architecture of Angular applications by promoting a reactive and event-driven paradigm. They simplify data flow, enhance communication between components, and streamline the handling of asynchronous operations, leading to more manageable and maintainable code bases.
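
As an illustration of the event bus role listed above, here is a minimal, framework-free sketch. The `EventBus` class, the `AppEvent` shape, and the event names are hypothetical and not part of RxJS or Angular; the example also assumes RxJS 7.2 or later, where operators can be imported directly from 'rxjs'.

```typescript
import { Observable, Subject, filter, map } from 'rxjs';

// Hypothetical event shape for this sketch.
interface AppEvent {
  type: string;
  payload: string;
}

class EventBus {
  private readonly events = new Subject<AppEvent>();

  // Publish an event to every current subscriber.
  emit(type: string, payload: string): void {
    this.events.next({ type, payload });
  }

  // Observe only the payloads of events with the given type.
  on(type: string): Observable<string> {
    return this.events.pipe(
      filter((event) => event.type === type),
      map((event) => event.payload),
    );
  }
}

const bus = new EventBus();
const logins: string[] = [];
const logouts: string[] = [];

// Two independent listeners that know nothing about each other.
bus.on('login').subscribe((user) => logins.push(user));
bus.on('logout').subscribe((user) => logouts.push(user));

bus.emit('login', 'alice');
bus.emit('logout', 'bob');
bus.emit('login', 'carol');

console.log(logins);  // ['alice', 'carol']
console.log(logouts); // ['bob']
```

Publishers and listeners only share the bus, not references to each other, which is the decoupling the list describes.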

Angular Code Representation on the RxJS Subject Workflow

// shared-data.service.ts

import { Injectable } from '@angular/core';
import { Subject } from 'rxjs';

@Injectable({ providedIn: 'root' })
export class SharedDataService {
  // Private Subject that manages data changes
  private readonly dataSubject = new Subject<string>();

  // Expose a read-only Observable so consumers cannot call next() on the Subject directly
  readonly data$ = this.dataSubject.asObservable();

  // Update the data by pushing a new value to the Subject,
  // which emits it to all current subscribers
  updateData(newData: string): void {
    this.dataSubject.next(newData);
  }
}

// component-a.component.ts

import { Component, OnDestroy, OnInit } from '@angular/core';
import { Subscription } from 'rxjs';
import { SharedDataService } from 'path/to/shared-data.service';

@Component({ /* Component Metadata */ })
export class ComponentA implements OnInit, OnDestroy {
  // Holds the shared data within ComponentA
  sharedData = '';

  // Kept so the subscription can be released when the component is destroyed
  private subscription?: Subscription;

  constructor(private sharedService: SharedDataService) {}

  ngOnInit(): void {
    // Subscribe to the shared data observable from the SharedDataService;
    // each emission updates the sharedData property in ComponentA
    this.subscription = this.sharedService.data$.subscribe(
      (data) => (this.sharedData = data)
    );
  }

  ngOnDestroy(): void {
    // Unsubscribe to avoid leaking the subscription
    this.subscription?.unsubscribe();
  }
}

// component-b.component.ts

import { Component, OnDestroy, OnInit } from '@angular/core';
import { Subscription } from 'rxjs';
import { SharedDataService } from 'path/to/shared-data.service';

@Component({ /* Component Metadata */ })
export class ComponentB implements OnInit, OnDestroy {
  // Holds the shared data within ComponentB
  sharedData = '';

  // Kept so the subscription can be released when the component is destroyed
  private subscription?: Subscription;

  constructor(private sharedService: SharedDataService) {}

  ngOnInit(): void {
    // Subscribe to the shared data observable from the SharedDataService;
    // each emission updates the sharedData property in ComponentB
    this.subscription = this.sharedService.data$.subscribe(
      (data) => (this.sharedData = data)
    );
  }

  ngOnDestroy(): void {
    // Unsubscribe to avoid leaking the subscription
    this.subscription?.unsubscribe();
  }
}
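
The same workflow can be tried outside Angular, which is also a convenient way to test it. The sketch below is an assumption-laden stand-in rather than the article's code: `SharedDataStore` mirrors the service without the Angular decorator, and two plain variables play the role of the components.

```typescript
import { Subject, Subscription } from 'rxjs';

// Same shape as the shared service, minus the Angular decorator.
class SharedDataStore {
  private readonly dataSubject = new Subject<string>();
  readonly data$ = this.dataSubject.asObservable();

  updateData(newData: string): void {
    this.dataSubject.next(newData);
  }
}

const store = new SharedDataStore();

// Stand-ins for the sharedData property of ComponentA and ComponentB.
let sharedDataA = '';
let sharedDataB = '';
const subA: Subscription = store.data$.subscribe((data) => (sharedDataA = data));
const subB: Subscription = store.data$.subscribe((data) => (sharedDataB = data));

store.updateData('hello'); // both consumers receive 'hello'

// Unsubscribing (what a component would do on destroy) stops delivery.
subA.unsubscribe();
store.updateData('world'); // only consumer B receives 'world'

console.log(sharedDataA); // 'hello'
console.log(sharedDataB); // 'world'

subB.unsubscribe();
```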

Subject has three variants. Let’s look at them one by one:

BehaviorSubject

A BehaviorSubject in RxJS is a specialized type of Subject that stores the “current” value. New subscribers receive that value immediately upon subscription: the most recently emitted value, or the initial value supplied at creation if nothing has been emitted yet. This makes it a helpful tool for managing state and sharing the most recent data or status updates among different parts of an application.

Some of the key characteristics of BehaviorSubject

  • Initial Value: A BehaviorSubject requires an initial value, which is passed to its constructor. When there is no meaningful starting value, a placeholder such as null can be passed explicitly and included in the type, for example BehaviorSubject<string | null>.
  • Retains the Current Value: Unlike a regular Subject, a BehaviorSubject holds and emits the “current” value. This value can be accessed via the value property or the getValue() method.
  • Emitting Last Value: When a new Observer subscribes to a BehaviorSubject, it immediately receives the “current” value, even if that value was emitted before the Observer subscribed.
  • Consistent Emission: Subsequent emissions function as usual for Observers, allowing them to receive updates after the initial emission of the “current” value.
  • Continual Observation: Observers can continuously access the most recent value emitted by the BehaviorSubject or observe future updates.

Use Cases of BehaviorSubject

  1. State Management: Convenient for managing state in applications by holding and broadcasting the current state across various components.
  2. Default Value Emission: Useful when a default value needs to be provided to new subscribers or when an initial state is required for subsequent operations.

Code Syntax of BehaviorSubject

import { BehaviorSubject } from 'rxjs';

// Create a BehaviorSubject with an initial value
const behaviorSubject = new BehaviorSubject<number>(0);

// Access the current value synchronously
const currentValue = behaviorSubject.value;

// Subscribe to the BehaviorSubject; the current value is delivered immediately
behaviorSubject.subscribe({
  next: (value: number) => {
    // Handle the emitted value
  },
  error: (err: unknown) => {
    // Handle errors, if any
  },
  complete: () => {
    // Handle completion (if required)
  },
});

// Emit a new value through the BehaviorSubject
behaviorSubject.next(1);
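
To make the characteristics above concrete, the following sketch compares an early and a late subscriber. The names `counter$`, `early`, and `late` are illustrative assumptions.

```typescript
import { BehaviorSubject } from 'rxjs';

const counter$ = new BehaviorSubject<number>(0);
const early: number[] = [];
const late: number[] = [];

// Subscribes before any next() call: receives the initial value 0 immediately.
counter$.subscribe((v) => early.push(v));

counter$.next(1);
counter$.next(2);

// Subscribes after two emissions: receives only the current value, 2.
counter$.subscribe((v) => late.push(v));

counter$.next(3); // delivered to both subscribers

console.log(early);          // [0, 1, 2, 3]
console.log(late);           // [2, 3]
console.log(counter$.value); // 3
```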

ReplaySubject

A ReplaySubject in RxJS is a variant of Subject that buffers previously emitted values and replays them to each new subscriber immediately upon subscription. The buffer can be limited to a specific number of values, to a time window, or to both. This makes it suitable for scenarios where access to historical or buffered data is required.

Some key characteristics of ReplaySubject

  • Buffered Emissions: A ReplaySubject stores and maintains a buffer of previously emitted values.
  • Number of Buffered Values: The buffer size is passed as the first constructor argument. It is optional; when omitted, the ReplaySubject buffers every emitted value.
  • Immediate Replay: When a new Observer subscribes to a ReplaySubject, it immediately receives the buffered values, if any, even though they were emitted before it subscribed.
  • Consistent Emission from Buffer: After the replay, emissions function as usual, so Observers receive new updates in addition to the values replayed from the buffer.
  • Configurable Replay: Besides the buffer size, a second constructor argument sets a time window in milliseconds, so that values older than the window are not replayed.

Use cases of ReplaySubject

  1. Caching and History: Useful for caching and providing historical data, especially when new subscribers need access to a specific number of previously emitted values.
  2. Replay of Events: Applications that require the replay of recent events for new subscribers or for auditing purposes can benefit from ReplaySubject.

Code Syntax of ReplaySubject

import { ReplaySubject } from 'rxjs';

// Create a ReplaySubject that buffers the last 2 values
const replaySubject = new ReplaySubject<string>(2);

// Subscribe to the ReplaySubject
replaySubject.subscribe({
  next: (value: string) => {
    // Handle the emitted value
  },
  error: (err: unknown) => {
    // Handle errors
  },
  complete: () => {
    // Handle completion (if required)
  },
});

// Emit a new value
replaySubject.next('new value');

// Complete the sequence (if needed)
replaySubject.complete();
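
The replay behavior is easiest to see with a subscriber that arrives late. The following is a small sketch with a buffer size of 2; the names `history$`, `late`, and `afterComplete` are assumptions for illustration.

```typescript
import { ReplaySubject } from 'rxjs';

// Keep only the two most recent values in the buffer.
const history$ = new ReplaySubject<string>(2);

history$.next('a');
history$.next('b');
history$.next('c');

// Subscribes after three emissions: 'b' and 'c' are replayed, 'a' has been dropped.
const late: string[] = [];
history$.subscribe((v) => late.push(v));

history$.next('d'); // delivered as a normal, live emission

console.log(late); // ['b', 'c', 'd']

// The buffer is still replayed to subscribers that arrive after completion.
history$.complete();
const afterComplete: string[] = [];
history$.subscribe((v) => afterComplete.push(v));

console.log(afterComplete); // ['c', 'd']
```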

AsyncSubject

An AsyncSubject in RxJS is a specialized form of Subject that emits only the last value it received, and only when the sequence completes. No matter how many values are pushed into it before completion, its Observers receive nothing until complete() is called, at which point they receive the final value followed by the completion notification. It is best suited for scenarios where the only required value is the final one after the sequence has concluded.

Some key characteristics of AsyncSubject

  • Emits Only Last Value: An AsyncSubject emits only the last value passed to next() before the sequence completes.
  • Emits Value on Completion: Observers are notified only once complete() is called; they then receive the last value, followed by the completion notification.
  • Ignores Previous Emitted Values: All earlier values are discarded, and Observers never see them.
  • Observe Completion State: If the sequence terminates with an error instead of completing, no value is emitted and Observers receive only the error.

Use cases of AsyncSubject

  1. Value Emphasis: Helpful when you are only interested in the final result of a sequence and don’t require intermediary values.
  2. Operation Results: Useful for scenarios where you need the result of a sequence after it’s done processing or when a task is completed.

Code syntax of AsyncSubject

import { AsyncSubject } from 'rxjs';

// Create an AsyncSubject
const asyncSubject = new AsyncSubject<number>();

// Subscribe to the AsyncSubject
asyncSubject.subscribe({
  next: (value: number) => {
    // Handle the emitted value (called at most once, on completion)
  },
  error: (err: unknown) => {
    // Handle errors
  },
  complete: () => {
    // Handle completion (if required)
  },
});

// Push a value; nothing is delivered to Observers yet
asyncSubject.next(1);

// Complete the sequence; Observers now receive the last value
asyncSubject.complete();
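
A short sketch shows both the completion case and the error case. The names `result$`, `failing$`, and the event log arrays are assumptions used only for illustration.

```typescript
import { AsyncSubject } from 'rxjs';

const result$ = new AsyncSubject<number>();
const events: string[] = [];

result$.subscribe({
  next: (v) => events.push(`next: ${v}`),
  complete: () => events.push('complete'),
});

result$.next(1);
result$.next(2);
result$.next(3);

// Nothing has been delivered yet, because the sequence has not completed.
const eventsBeforeComplete = events.length;

result$.complete(); // now the last value (3) is delivered, then completion

console.log(eventsBeforeComplete); // 0
console.log(events);               // ['next: 3', 'complete']

// If the sequence fails instead of completing, no value is delivered at all.
const failing$ = new AsyncSubject<number>();
const failingEvents: string[] = [];
failing$.subscribe({
  next: (v) => failingEvents.push(`next: ${v}`),
  error: (e: Error) => failingEvents.push(`error: ${e.message}`),
});

failing$.next(1);
failing$.error(new Error('failed'));

console.log(failingEvents); // ['error: failed']
```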

Conclusion

Understanding the characteristics and best use cases for each Subject type will enable developers to harness the power of RxJS effectively in handling data streams and managing state within applications.

RxJS Subject Guidance for Developers

RxJS provides various Subject types, each with unique behaviors for handling data streams.

  • Choose Wisely: Select the appropriate Subject type based on the desired behavior and use case.
  • Understand Behaviors: Familiarize yourself with each Subject’s characteristics to leverage their strengths effectively.
  • Application State Management: Utilize BehaviorSubject for managing and distributing application state across components.
  • Data Replay Requirements: When historical data access is needed, consider using ReplaySubject to provide access to previously emitted values.
  • Final Sequence Values: Employ AsyncSubject when only the last value after the sequence completes is required.
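
One way to compare the four types side by side is to feed each the same sequence and look at what a late subscriber receives. The helper `lateSubscriberSees` below is a hypothetical function written for this comparison, not part of RxJS.

```typescript
import { AsyncSubject, BehaviorSubject, ReplaySubject, Subject } from 'rxjs';

// Pushes 1, 2, 3 into the subject, subscribes late, pushes 4, then completes.
// Returns the values the late subscriber received.
function lateSubscriberSees(subject: Subject<number>): number[] {
  const seen: number[] = [];
  subject.next(1);
  subject.next(2);
  subject.next(3);
  subject.subscribe((v) => seen.push(v)); // subscribes after three emissions
  subject.next(4);
  subject.complete();
  return seen;
}

const plain = lateSubscriberSees(new Subject<number>());
const behavior = lateSubscriberSees(new BehaviorSubject<number>(0));
const replay = lateSubscriberSees(new ReplaySubject<number>(2));
const asyncLast = lateSubscriberSees(new AsyncSubject<number>());

console.log(plain);     // [4]        only live values
console.log(behavior);  // [3, 4]     current value, then live values
console.log(replay);    // [2, 3, 4]  last two buffered values, then live values
console.log(asyncLast); // [4]        only the final value, delivered on completion
```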
