Promises, Observables, Subjects, BehaviorSubjects & ReplaySubjects.

GM Fuster
Published in Nerd For Tech
7 min read, Feb 1, 2023

All right, all those are similar but not the same. This is my attempt at clarifying this :-).

Promises (async)

We can call something that returns a promise, or we can create and use our own promises.

For example, fetch returns a promise.

If I have code creating a promise and nobody is listening to it, the code inside the promise still runs:

let myPromise = new Promise(function (resolve, reject) {
  setTimeout(() => {
    console.log("Resolving");
    resolve("RESOLVED!!!");
  }, 3000);
});
//Resolving

To consume the promise I need to do something like this:

let consumePromise = () => {
  myPromise.then((resolve) => console.log(resolve));
};
consumePromise();

If we also want to do something when the promise is rejected (instead of resolved), we pass a second function to then:

let myPromise = new Promise(function (resolve, reject) {
  setTimeout(() => {
    console.log("Rejecting");
    reject("Rejected"); //or reject(new Error("Rejected!!!"));
  }, 2000);
});

let consumePromise = () => {
  myPromise.then(
    (resolve) => console.log(resolve),
    (reject) => console.log("hey")
  );
};

consumePromise();
//Rejecting
//hey

Once a promise has started, it cannot be cancelled. We can have multiple functions listening to the same promise. They are called in the order in which they were attached, and they all receive the same data. A promise only ever delivers a single value.

let myPromise = new Promise(function (resolve, reject) {
  setTimeout(() => {
    resolve("YAY");
  }, 3000);
});

let consumePromise = () => {
  myPromise.then((resolve) => console.log("consumePromise"));
};
let consumePromise2 = () => {
  myPromise.then((resolve) => console.log("consumePromise2"));
};

consumePromise();
consumePromise2();
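
To make the "single value" point concrete, here is a small sketch. The promise name settleOnce and the received array are made up for this illustration. Only the first call to resolve or reject counts, and both consumers get that same value, in the order they were attached.

```javascript
// Hypothetical example: only the first resolve/reject call has any effect.
const received = [];

const settleOnce = new Promise((resolve, reject) => {
  resolve("first");              // settles the promise
  resolve("second");             // ignored: the promise is already settled
  reject(new Error("too late")); // ignored as well
});

// Two consumers attached to the same promise receive the same value.
const done = Promise.all([
  settleOnce.then((value) => received.push("A:" + value)),
  settleOnce.then((value) => received.push("B:" + value)),
]).then(() => console.log(received.join(", ")));
// A:first, B:first
```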

One more code example to clarify something. If we attach another consumer after the promise has already been resolved, the promise code does not run again, but the promise keeps the value it was resolved with, so the new consumer still gets that value.

let myPromise = new Promise(function (resolve, reject) {
  setTimeout(() => {
    console.log("In the promise code");
    resolve("YAY");
  }, 1000);
});

let consumePromise = () => {
  myPromise.then((resolve) => console.log(resolve));
};
let consumePromise2 = () => {
  myPromise.then((resolve) => console.log(resolve));
};
consumePromise();

setTimeout(() => {
  consumePromise2();
}, 3000);

//In the promise code
//YAY
//YAY

If an error is thrown in the promise code before we resolve or reject, the promise is rejected with that error. Look at this code. Notice the promise is going to throw an error BEFORE the setTimeout. Notice the reject in the consumer is commented out. And notice the catch.

let myPromise = new Promise(function (resolve, reject) {
  throw new Error('oops!'); //before setTimeout
  setTimeout(() => {
    resolve("YAY");
  }, 2000);
});

let consumePromise = () => {
  myPromise
    .then(
      (resolve) => console.log("consumePromise"),
      //(reject) => console.log("Rejected")
    )
    .catch((e) => {
      console.log("OH NO");
      console.error(e.message);
    });
};

consumePromise();

//OH NO
//oops!

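But notice what happens if the error is thrown INSIDE the setTimeout. It is not going to go to the catch. By the time the timer callback runs, the promise code has already returned, so the error is thrown outside the promise and nothing rejects it.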

let myPromise = new Promise ( function(resolve, reject)
{
setTimeout(
() => {
throw new Error('oops!');
resolve("YAY");
}, 2000)
});

let consumePromise = () => {
myPromise.then(
(resolve) => console.log("consumePromise"),
//(reject) => console.log("Rejected")
).catch((e) => {
console.log("OH NO");
console.error(e.message);
})
}


consumePromise();
///home/runner/vsw682fyeno/index.js:5
// throw new Error('oops!');
// ^

//Error: oops!
// at Timeout._onTimeout (/home/runner/vsw682fyeno/index.js:5:17)
// at listOnTimeout (internal/timers.js:554:17)
//repl process died unexpectedly: exit status 1
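
One way around this, sketched below, is to catch the error inside the timer callback and pass it to reject ourselves. The riskyWork function is made up for the example and stands in for any code that may throw. The delay is shortened to 100 ms just to keep the example quick.

```javascript
// Hypothetical helper that fails; stands in for any code that may throw.
function riskyWork() {
  throw new Error("oops!");
}

const myPromise = new Promise((resolve, reject) => {
  setTimeout(() => {
    try {
      resolve(riskyWork());
    } catch (e) {
      reject(e); // hand the error to the promise so that catch() can see it
    }
  }, 100);
});

const done = myPromise
  .then((value) => "resolved: " + value)
  .catch((e) => "caught: " + e.message)
  .then((result) => {
    console.log(result);
    return result;
  });
// caught: oops!
```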

Now I’m going to put the throw back BEFORE the setTimeout, and I’m uncommenting the (reject) handler. The rejection is handled there, so the catch never runs:

let myPromise = new Promise(function (resolve, reject) {
  throw new Error('oops!');
  setTimeout(() => {
    resolve("YAY");
  }, 2000);
});

let consumePromise = () => {
  myPromise
    .then(
      (resolve) => console.log("consumePromise"),
      (reject) => console.log("Rejected")
    )
    .catch((e) => {
      console.log("OH NO");
      console.error(e.message);
    });
};

consumePromise();

//Rejected

Errors thrown in the promise code after it has been resolved (fulfilled) are ignored.
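
A quick sketch of that last point, using the same kind of promise as the earlier examples: the throw comes after resolve, so the promise stays fulfilled and the catch is never used.

```javascript
const myPromise = new Promise((resolve, reject) => {
  resolve("YAY");
  throw new Error("oops!"); // thrown after resolve: the promise stays fulfilled
});

const done = myPromise
  .then((value) => "fulfilled with " + value)
  .catch((e) => "rejected with " + e.message)
  .then((result) => {
    console.log(result);
    return result;
  });
// fulfilled with YAY
```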

I have been using “function” with the promise, but you can use an arrow function instead (example below). then itself returns a promise, so we can chain calls like this:

const myPromise = new Promise((resolve, reject) => {
  setTimeout(() => {
    resolve("hey");
  }, 2000);
});

myPromise
  .then((resolve) => {
    console.log("1");
    console.log(resolve);
    return resolve; //send it to the next one
  })
  .then((resolve1) => {
    console.log("2");
    console.log(resolve1);
  })
  .then((resolve3) => console.log(resolve3)); //2 didn't pass it forward, so there is nothing here

//1
//hey
//2
//hey
//undefined

We could also write the then this way, passing functions that are defined somewhere else in the code: then(handleFulfilled1, handleRejected1);
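
As a sketch of that idea, here are two named handlers reused on one promise that resolves and one that rejects. The names handleFulfilled and handleRejected are just illustrative.

```javascript
// Hypothetical handler names, defined once and reused.
function handleFulfilled(value) {
  console.log("Got: " + value);
  return value.toUpperCase();
}

function handleRejected(error) {
  console.log("Failed: " + error.message);
  return "fallback";
}

const ok = Promise.resolve("hey").then(handleFulfilled, handleRejected);
const failed = Promise.reject(new Error("nope")).then(handleFulfilled, handleRejected);

const done = Promise.all([ok, failed]).then((results) => {
  console.log(results.join(", "));
  return results;
});
// Got: hey
// Failed: nope
// HEY, fallback
```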

The static methods of Promise (such as Promise.all and Promise.race) are not part of this article, but be aware that they exist.

One more thing: promises also have a finally, which runs whether the promise was resolved or rejected:

const myPromise = new Promise((resolve, reject) => {
  setTimeout(() => {
    resolve("hey");
  }, 2000);
});

myPromise
  .then((resolve) => {
    console.log(resolve);
  })
  .finally(() => console.log("finally"));

//hey
//finally
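
The finally also runs when the promise is rejected. A minimal sketch, with a made-up steps array to record what ran:

```javascript
const steps = [];

const done = Promise.reject(new Error("nope"))
  .then((value) => steps.push("then: " + value)) // skipped on rejection
  .catch((e) => steps.push("catch: " + e.message))
  .finally(() => steps.push("finally"))          // runs either way
  .then(() => console.log(steps.join(", ")));
// catch: nope, finally
```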

Observables

They are similar to Promises, but they can send multiple values over time. Nothing happens if nobody is listening (subscribed). They can be cancelled (by unsubscribing) or they can be stopped in the observable code itself. You can use operators on them.

You can use observables in an Angular app by installing RxJS: npm install --save rxjs@6 (or whatever version).

It’s important to unsubscribe from observables that we don’t need anymore.

Observables are usually used for async code, but they don’t have to be async. This would be an example of an async one:

import {Observable} from "rxjs";

...

const myObservable = new Observable(observer => {
  let value = 0;
  const intervalId = setInterval(() => {
    observer.next(value); //this is what sends a new value
    value = value + 10;
    if (value > 50) {
      observer.complete(); //finish sending values
      clearInterval(intervalId); //stop the timer too, or it keeps firing
    }
  }, 1000);
});

//one way to subscribe to it would be (being deprecated):
myObservable.subscribe(
  val => console.log(val), //for a value returned
  error => console.log("problem"), //if something happens
  () => console.log("Done")); //once it is done

//The new way
myObservable.subscribe({
  next: val => console.log("Second:" + val),
  error: error => console.log(error),
  complete: () => console.log("Completed")
});

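Optionally, at the end of the observable code, we can return an object that cleans up when a subscriber unsubscribes:

  //last statement inside the function passed to new Observable(...)
  return {
    unsubscribe() { //clean up (a timer started with setInterval would be stopped here with clearInterval)
      value = 0;
    }
  };
}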

With the above code both subscribers are going to get the values independently. Sometimes, instead of starting an independent execution for each subscriber, you want each subscription to get the same values, even if values have already started emitting. In this case you need multicasting (see Subject section later on).
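
To see "nothing happens until someone subscribes" and "each subscriber gets its own execution" in isolation, here is a sketch that does not use RxJS at all. makeObservable is a made-up stand-in, much simpler than the real Observable, and the counters and timings are only there for the illustration.

```javascript
// NOT RxJS: a made-up, minimal stand-in used only to illustrate the idea.
function makeObservable(producer) {
  return {
    // Nothing runs until subscribe is called; each call runs producer again.
    subscribe(next) {
      const cleanup = producer(next);
      return { unsubscribe: cleanup };
    },
  };
}

let executions = 0;
const ticks = makeObservable((next) => {
  executions++; // counts independent executions
  let value = 0;
  const id = setInterval(() => next(value++), 10);
  return () => clearInterval(id); // clean-up, run on unsubscribe
});

console.log(executions); // 0, nobody has subscribed yet

const seenByA = [];
const seenByB = [];
const subA = ticks.subscribe((v) => seenByA.push(v));
const subB = ticks.subscribe((v) => seenByB.push(v));
console.log(executions); // 2, one execution per subscriber

// Unsubscribe after a short while so the timers do not keep running.
const done = new Promise((resolve) => {
  setTimeout(() => {
    subA.unsubscribe();
    subB.unsubscribe();
    resolve();
  }, 100);
});
```

Each subscriber starts its own counter from 0, which is the unicast behaviour described above. A Subject avoids that by sharing one execution.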

rxjs provides some functions to create observables from other objects. You can create an observable with those functions, and subscribe like you do any observable.

//Create an Observable out of a promise
import { from, Observable } from 'rxjs';
const data = from(fetch('http://....'));

//create one as an interval, like our manual one above
import { interval } from 'rxjs';
const secondsCounter = interval(1000);

//from an event
import { fromEvent } from 'rxjs';
const el = document.getElementById('my-element')!;
const mouseMoves = fromEvent<MouseEvent>(el, 'mousemove');

//create just a set sequence
import { of } from 'rxjs';
const nums = of(1, 2, 3); //observable will send values 1, 2, 3, then done

As mentioned previously, observables let you use operators on them. One example would be:

import { of, map, filter } from 'rxjs'; //RxJS 7.2+; in RxJS 6, import map and filter from 'rxjs/operators'

ngOnInit(){
  const nums = of(1, 2, 3, 4, 5).pipe(
    filter(i => i > 3),
    map(n => n * n)
  );

  nums.subscribe({
    next: val => console.log(val),
    error: error => console.log(error),
    complete: () => console.log("Completed")
  });
}
//16
//25
//Completed

One more thing you can have inside the pipe: a catchError and a retry in case something fails. retry subscribes again however many times you indicate, and catchError can send a default value instead of the error.

retry(3), // Retry up to 3 times before failing
catchError(() => of([]))

Note: There is no standard for it, but it’s good practice to end the names of observable variables with $.

Subjects

An RxJS Subject is a special type of Observable that allows values to be multicast to many Observers. While plain Observables are unicast (each subscribed Observer owns an independent execution of the Observable), Subjects are multicast.

Note: even though your components can access the Subject itself, it is better to let them access an observable from it (this.subject.asObservable();). Just to illustrate (the second subscriber only gets the values sent after it subscribes):

import { Subject } from 'rxjs';

var subject = new Subject<number>();

subject.subscribe((data) => {
  console.log("Subscriber 1 >>>>> " + data);
});

for (var i = 0; i < 10; i++) {
  subject.next(i);
  if (i == 4) {
    subject.subscribe((data) => {
      console.log("Subscriber 2 ---- " + data);
    });
  }
}

//Subscriber 1 >>>>> 0
//Subscriber 1 >>>>> 1
//Subscriber 1 >>>>> 2
//Subscriber 1 >>>>> 3
//Subscriber 1 >>>>> 4
//Subscriber 1 >>>>> 5
//Subscriber 2 ---- 5
//Subscriber 1 >>>>> 6
//Subscriber 2 ---- 6
//Subscriber 1 >>>>> 7
//Subscriber 2 ---- 7
//Subscriber 1 >>>>> 8
//Subscriber 2 ---- 8
//Subscriber 1 >>>>> 9
//Subscriber 2 ---- 9

To use asObservable instead, the code would be similar to the one above, with these differences:

var sub2 = subject.asObservable();

sub2.subscribe((data) => {
  console.log("Subscriber 1 >>>>> " + data);
});

subject.next(i); //still using subject

sub2.subscribe((data) => {
  console.log("Subscriber 2 ---- " + data);
});

BehaviorSubject

It is a Subject that must be initialized with a value. A subscriber listening from the beginning gets this initial value first. A subscriber that starts later gets the latest value as soon as it subscribes. Similar to the code above:

import { BehaviorSubject } from 'rxjs';

//now we have this, but the rest is the same.
var subject = new BehaviorSubject<number>(2);

//Subscriber 1 >>>>> 2 ----getting the init value
//Subscriber 1 >>>>> 0
//Subscriber 1 >>>>> 1
//Subscriber 1 >>>>> 2
//Subscriber 1 >>>>> 3
//Subscriber 1 >>>>> 4
//Subscriber 2 ---- 4 ---- getting the previous value as the init one.
//Subscriber 1 >>>>> 5
//Subscriber 2 ---- 5
//Subscriber 1 >>>>> 6
//Subscriber 2 ---- 6
//Subscriber 1 >>>>> 7
//Subscriber 2 ---- 7
//Subscriber 1 >>>>> 8
//Subscriber 2 ---- 8
//Subscriber 1 >>>>> 9
//Subscriber 2 ---- 9

ReplaySubjects

Similar to BehaviorSubject, but it can keep track of multiple previous values, and it does not need an initial value.

import { ReplaySubject } from 'rxjs';

//keep the prev 2 values for new subscribers
var subject = new ReplaySubject<number>(2);

subject.subscribe((data) => {
  console.log("Subscriber 1 >>>>> " + data);
});

for (var i = 0; i < 10; i++) {
  subject.next(i);
  if (i == 4) {
    subject.subscribe((data) => {
      console.log("Subscriber 2 ---- " + data);
    });
  }
}

//Subscriber 1 >>>>> 0
//Subscriber 1 >>>>> 1
//...
//Subscriber 2 ---- 3 keeps the prev 2, which are 3 and 4.
//Subscriber 2 ---- 4
//Subscriber 1 >>>>> 5
//Subscriber 2 ---- 5
//...
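
To put the three side by side, here is a sketch that does not use RxJS. MiniSubject is a made-up class that only mimics how many past values a late subscriber receives. It is a simplification for comparison, not how the library is implemented.

```javascript
// NOT RxJS: a made-up class that only mimics the replay behaviour.
class MiniSubject {
  constructor(bufferSize, buffer = []) {
    this.bufferSize = bufferSize; // how many past values to remember
    this.buffer = buffer;         // remembered values, oldest first
    this.listeners = [];
  }
  subscribe(listener) {
    this.buffer.forEach((value) => listener(value)); // replay remembered values
    this.listeners.push(listener);
  }
  next(value) {
    this.buffer.push(value);
    while (this.buffer.length > this.bufferSize) this.buffer.shift();
    this.listeners.forEach((listener) => listener(value));
  }
}

// Roughly: a Subject remembers 0 values, a BehaviorSubject remembers 1
// (starting with the initial value), a ReplaySubject(n) remembers n.
const subjectLike = () => new MiniSubject(0);
const behaviorLike = (initial) => new MiniSubject(1, [initial]);
const replayLike = (n) => new MiniSubject(n);

// Emit 0..9 and add a late subscriber right after the value 4.
function lateSubscriberSees(subject) {
  const seen = [];
  for (let i = 0; i < 10; i++) {
    subject.next(i);
    if (i === 4) subject.subscribe((value) => seen.push(value));
  }
  return seen;
}

console.log(lateSubscriberSees(subjectLike()).join(" "));   // 5 6 7 8 9
console.log(lateSubscriberSees(behaviorLike(2)).join(" ")); // 4 5 6 7 8 9
console.log(lateSubscriberSees(replayLike(2)).join(" "));   // 3 4 5 6 7 8 9
```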

This was a lot, but I’m hoping that having everything together makes it easier to understand.
