0
votes

I'm struggling to understand rxJs observer/observable.

I have an Angular application, where an Injectable service creates an observable:

chat.service.ts:

public chatObserver: Observer<any>;
public chatObservable: Observable<any>;
constructor() {
  this.chatObservable = Observable.create((observer: Observer<any>) => {
  this.chatObserver = observer;
  });
}

When a message is received in the chat.service.ts, the message is (supposedly) multicasted:

this.chatObserver.next(chatMessage);

Two components are subscribing to the Observable this way:

constructor(private _chatService: ChatService) {}

ngOnInit() {
    this._chatService.chatObservable.subscribe(
      (message) => {
        console.log('New chat message received!', message)
        // Do something with message
      }
    );

Problem: Only one (the last one to subscribe) receives the message.

Question: How can I configure the Observable to multicast to all subscribers?

2
Observables are already multicast. Typically I've used Subject.asObservable` to create the observable from a Subject, which has the next method.jonrsharpe
@jonrsharpe Which Observables? Observable and Subject? Observable is unicast and a Subject is multicast. You are wrong when you saying Observables are already multicast. Subject is both a producer and a data consumer.Serkan Sipahi
@Bitcollage it seems with a bit more reading observables can be unicast or multicast; creating them from a Subject definitely gives multicast observables, because I've used that pattern a lot before. In any case, a bit of searching would have solved the problem, e.g. stackoverflow.com/questions/39627396/…jonrsharpe
Is the Observable (in my example) multicast or unicast? Can you please provide an example of how to achieve multicast functionality?Vingtoft
@Vingtoft Observables are unicast by design and Subject is multicast. Multicasting basically means that one Observable execution is shared among multiple subscribers.Serkan Sipahi

2 Answers

1
votes

Here is an example how to use a multicast observer:

// chat.service.ts:

public chatObserver: Subject<any>;
constructor() {
  this.chatObserver = new Subject();
}

emit chatMessage (Subject has by design the next, error and complete methods):

this.chatObserver.next(chatMessage);

Subscribing in the components:

constructor(private _chatService: ChatService) {}

ngOnInit() {

  this._chatService.chatObserver.subscribe(
    (message) => {
      console.log('New chat message received!', message)
      // Do something with message
    }
  );

Note: But when some of the components are subscribing to the chatObserver after its emits a chatMessage then i recommend to use the BehaviorSubject instead of Subject (Just replace the Subject with BehaviorSubject).

The BehaviorSubject has the characteristic that it stores the “current” value. This means that you can always directly get the last emitted value from the BehaviorSubject (source/quote: https://medium.com/@luukgruijs/understanding-rxjs-behaviorsubject-replaysubject-and-asyncsubject-8cc061f1cfc0)

1
votes

A simple refresher. Use BehaviourSubject if you want to receive some initial / current values upon subscribing( manually in services/pipe etc, or via async pipe in html templates. Here is a basic proposal for what you are trying to achieve. Of course there is a lot more to be done considering production. Regards.

chat.service.ts:

BackEndDB : ChatMessage[] = <Obtain data for chatHistory from your DB>();

If you use any BackEnd storage for your chat history, initialize BackEndDB via that source , if not let messages$ to be new BehaviorSubject<ChatMessage[]>(null);. This will not persist your chat history if you reload the page.

messages$ = new BehaviorSubject<ChatMessage[]> // (this.BackEndDB OR null);
sendMessage(message: ChatMessage): void {
  this.BackEndDB.push(message);
  this.messages$.next(this.BackEndDB);
}
getMessages(): Observable<ChatMessage[]> {
  return this.messages$.asObservable();
}

and then in your component:

import {ChatService} from 'yourPath/chat.service'; 

@Component({ 
  selector: 'app-chat',
  templateUrl: ['./chat.component.html'],
  styleUrls: ['./chat.component.css'],
})
export class ChatComponent {
  inbox$: Observable<ChatMessage[]>;
  constructor(private chatService: ChatService) {
    this.inbox$ = this.chatService.getMessages(); 
  } 
  sendMessage(username, msg) {
    let message: ChatMessage = { 
      msg: msg, 
      sender: username 
    }
    this.chatService.sendMessage(message);
  }
} 

and then chat.component.html:

<label> Your Name:  </label>
<input type="text" #userName>
<label> Your Message:  </label>
<input type="text" #userMessage>  
<button type="button" (click) = "sendMessage(userName.value,userMessage.value)">Send</button>
<div *ngIf="inbox$ | async; let messages ">
  <div *ngFor= "let message of messages ">
    <p> {{ message.sender  }} </p>
    <p> said: </p>
    <p> {{ message.msg }} </p>
  </div>
</div>

You can create the ChatMessage Interface in a shared folder, if you are going to reuse it, or directly in your component in case it is only used there:

interface ChatMessage {
  msg: string;
  sender: string;
}

The only reason we push the new messages to the BackEndDB variable and then call next on it, is because this is a simple demo. In reality you would use FirestoreCollectionDocument.valueChanges() or any other realtime data service.