0
votes

I am seeing very strange behavior while working with Angular 9 and RxJS Observables. I have a component and a service. The service has a public messages$ BehaviorSubject that I subscribe to in the component.

My problem is that whenever I call next on the BehaviorSubject in the chat service, the async pipe in the HTML seems to ignore all the operators in the component's pipe, including the delayWhen operator. The map operator, however, behaves correctly and runs after 3 seconds.

What might be the cause of this behavior, and how can I fix it?

Chat Service

export class ChatService {
    messages: Message[];
    messages$: BehaviorSubject<Message[]>;

    constructor(){

            this.messages = [
                {
                    message: 'Hi How can I help you today?',
                    created_at: '2020-04-11 17:58:27',
                    agent_id: '1'
                  },
                  {
                    message: 'Whats the pricing model?',
                    created_at: '2020-04-11 13:58:27',
                    visitor_id: '043-reer-reer-reer-re-er'
                  }  
            ];

            this.messages$ = new BehaviorSubject<Message[]>(this.messages);

            this.cn.on("messageReceived", (message:Message) => {

                this.apiService.getMessage(message.id).subscribe(
                    res => {
                        this.messages.push(res.model);
                        this.flush_to_storage();  
                        this.messages$.next(this.messages); // <-- I call next here
                       ....
    }

Component

export class ChatComponent implements OnInit {

    messages$: Observable<Message[]> ;
    show_typing_indicator: boolean = false;

    ...

    ngOnInit(){

        this.messages$ = this.chatService.messages$.pipe( 
            delayWhen( ms => { /* This is ignored altogether by the async pipe in the component HTML */
                let m = ms[ms.length - 1];
                if (m.is_bot) {
                    this.show_typing_indicator = true;
                    setTimeout( () => this.show_typing_indicator = false, 3000 );
                    return interval(3000);
                }
                else return of(undefined);
            }),
            map( ms => {console.log(ms); return ms})
        );

}

Component Html

                    <div class="cl-message-block" *ngFor="let m of (messages$ | async)" [@fadeIn] [class.sent]="!m.agent_id">


                        <div class="cl-message" [ngClass]="{ 'cl-received': (m.agent_id), 'cl-sent': (!m.agent_id) }">
                            {{ m.message}}
                            <span>{{ m.created_at | messageTime }}</span>
                        </div>

                    </div>
                    <charla-typing-indicator *ngIf="show_typing_indicator"></charla-typing-indicator>
4
"Although the map operator follows the right behavior, and executes after seconds": do you mean that you see the messages printed on the console by map( ms => {console.log(ms); return ms})? (Picci)
Yes, but after 3 seconds. (Yehia A.Salam)

4 Answers

1
votes

I have replicated your code and I think it's working fine now.

You have multiple issues with it:

1- Saving state in this.messages in ChatService is a bad idea: since you are using a BehaviorSubject, use BehaviorSubject.value instead. this.messages$.value holds your last Message[].

2- In-place editing is not a good thing; together with ChangeDetectionStrategy.OnPush, it may be why you are having the issue:

this.messages.push(res.model);
this.messages$.next(this.messages);  

Instead, create a new array that includes the new message:

 // newMessage is the message to append (res.model in the question)
 const newMessages = [...this.messages$.value, newMessage];
 this.messages$.next(newMessages);

A minor one:

3- When using delayWhen, return interval(0) instead of of(undefined) in the else branch, because the former makes more sense if you think about it:

if (m.is_bot) {
    // other code
    return interval(3000);
}
else {
    return interval(0);
}
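
To illustrate point 2 above, here is a small sketch in plain TypeScript (no Angular or RxJS needed). The `ChatMessage` shape and the `appendMessage` helper are assumptions made up for this example, not part of the question's code. Pushing onto the existing array keeps the same reference, so anything that compares references cannot tell that the list changed; spreading into a new array produces a new reference.

```typescript
// Hypothetical message shape, reduced to the fields shown in the question.
interface ChatMessage {
  message: string;
  created_at: string;
  agent_id?: string;
  visitor_id?: string;
}

// Hypothetical helper: returns a NEW array and leaves the input untouched.
function appendMessage(current: readonly ChatMessage[], next: ChatMessage): ChatMessage[] {
  return [...current, next];
}

const oldList: ChatMessage[] = [
  { message: 'Hi How can I help you today?', created_at: '2020-04-11 17:58:27', agent_id: '1' },
];
const incoming: ChatMessage = {
  message: 'Whats the pricing model?',
  created_at: '2020-04-11 13:58:27',
  visitor_id: '043-reer-reer-reer-re-er',
};

const newList = appendMessage(oldList, incoming);

// A reference comparison now detects the update, and the old array is unchanged.
const referenceChanged = newList !== oldList;
const oldLength = oldList.length;
```

In the service, the result of such a helper would be what gets passed to `messages$.next(...)`.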
0
votes

chat service

messages = new BehaviorSubject<any>(null);
messagesListner = this.messages.asObservable();

constructor() {
    this.cn.on("messageReceived", (message: Message) => {

        this.apiService.getMessage(message.id).subscribe(
            res => {
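                // messages is the BehaviorSubject itself, so build a new array from its current value
                const updated = [...(this.messages.value || []), res.model];
                this.flush_to_storage();
                this.messages.next(updated); // <-- next is called here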
                   ....
}

component

import { ChatService } from '';
import { Subscription, interval, of } from 'rxjs';
import { delayWhen, map } from 'rxjs/operators';

constructor(
    private chatService: ChatService
) {}
messageSubscription: Subscription;
ngOnInit(){

    // Subscribe in the component and keep the Subscription
    this.messageSubscription = this.chatService.messagesListner.pipe(
        delayWhen( ms => {
            // The subject starts with null, so guard before reading the last message
            let m = ms && ms[ms.length - 1];
            if (m && m.is_bot) {
                this.show_typing_indicator = true;
                setTimeout( () => this.show_typing_indicator = false, 3000 );
                return interval(3000);
            }
            else return of(undefined);
        }),
        map( ms => {console.log(ms); return ms})
    ).subscribe();

}
0
votes

A BehaviorSubject has an initial value. We don't see where you set it in your code, but typically that first value is initialised to null or undefined. You aren't doing any null checks in your delayWhen body. Wrap that in a

if (ms) {
}

and see if that changes your output.
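
As a rough sketch of that null check, the decision could be pulled out into a small pure function that is easy to test on its own. The `BotAwareMessage` shape, the `typingDelayMs` name, and the default of 3000 ms are assumptions for illustration; only the `is_bot` flag and the 3 second delay come from the question.

```typescript
// Hypothetical message shape; is_bot is the flag read in the question's delayWhen.
interface BotAwareMessage {
  message: string;
  is_bot?: boolean;
}

// Hypothetical helper: how long (in ms) to hold back an emission.
// Returns 0 for null, undefined, or an empty list instead of throwing.
function typingDelayMs(
  messages: BotAwareMessage[] | null | undefined,
  botDelayMs: number = 3000
): number {
  if (!messages || messages.length === 0) {
    return 0;
  }
  const last = messages[messages.length - 1];
  return last && last.is_bot ? botDelayMs : 0;
}

const delayForNull = typingDelayMs(null);
const delayForEmpty = typingDelayMs([]);
const delayForVisitor = typingDelayMs([{ message: 'Whats the pricing model?' }]);
const delayForBot = typingDelayMs([{ message: 'One moment...', is_bot: true }]);
```

Inside the pipe this could then be used as `delayWhen(ms => timer(typingDelayMs(ms)))`, with `timer` imported from `rxjs`.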

You are initialising messages$ somewhere... aren't you?

messages$ = new BehaviorSubject<Message[]>(null);

The real lesson here is to always add error handling on your subscription so the error from your asynchronous source will be reported to you and you will instantly understand what the problem is.

e.g.

....subscribe({
  next: (result) => {
    if (result) {
      // Do the work
    }
  },
  error: (err) => {
    console.log(err);
  }
});

or a catchError if you are subscribing in your template

....pipe(
  // Assorted RxJS operators go here

  // either this:
  catchError((err) => {
    console.log(err);
    return throwError(err);  // re-throw it if you might want to also see the error downstream
  }),

  // or this:
  catchError((err) => {
    console.log(err);
    return of<MyType>(undefined);  // return a clean (typed) observable if you don't want the error downstream
  })
);
0
votes

Maybe it helps to use a ReplaySubject instead of a BehaviorSubject.
A ReplaySubject has no initial value, so when you have no requirement for an initial value (in other words, a default value), you should use one.

See this https://stackoverflow.com/a/57214877/6537157 for more details.
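
To make the difference concrete without pulling in RxJS, here is a toy sketch. `TinyBehaviorSubject` and `TinyReplaySubject` are simplified stand-ins written for this example, not the real RxJS classes; they only model what a new subscriber receives on subscribe.

```typescript
type Listener<T> = (value: T) => void;

// Simplified stand-in for BehaviorSubject: always holds a current value
// and hands it to every new subscriber immediately.
class TinyBehaviorSubject<T> {
  private listeners: Listener<T>[] = [];
  private current: T;
  constructor(initial: T) {
    this.current = initial;
  }
  next(value: T): void {
    this.current = value;
    this.listeners.forEach((l) => l(value));
  }
  subscribe(listener: Listener<T>): void {
    this.listeners.push(listener);
    listener(this.current);
  }
}

// Simplified stand-in for a ReplaySubject with buffer size 1: replays the
// last value only if one has actually been emitted.
class TinyReplaySubject<T> {
  private listeners: Listener<T>[] = [];
  private last: { value: T } | null = null;
  next(value: T): void {
    this.last = { value };
    this.listeners.forEach((l) => l(value));
  }
  subscribe(listener: Listener<T>): void {
    this.listeners.push(listener);
    if (this.last !== null) {
      listener(this.last.value);
    }
  }
}

const seenByBehavior: (string[] | null)[] = [];
const seenByReplay: string[][] = [];

const behavior = new TinyBehaviorSubject<string[] | null>(null);
const replay = new TinyReplaySubject<string[]>();

behavior.subscribe((v) => { seenByBehavior.push(v); });
replay.subscribe((v) => { seenByReplay.push(v); });

behavior.next(['hello']);
replay.next(['hello']);
```

The behavior-style subscriber sees the initial null before the first real value, while the replay-style subscriber only ever sees real emissions, which is why no null check is needed downstream in that case.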