I have a saveForm() function that is expected to perform the following operations in order:

  1. Take the form data and add it to a Firestore collection as a document.
  2. On success, loop through all the files the user has selected (attachmentsFormArray) and upload each file to Firebase Storage.
  3. When all files have finished uploading, assign the downloadUrl of each file to the corresponding file map on the Firestore document saved in step 1, then make an API call to save the updated Firestore document.

Below is my saveForm() function:

saveForm() {
    let fixedDepositEntity = this.getEntityFromForm();
    this.fixedDepositsFirestoreCollection.add(fixedDepositEntity).then(documentRef => {
        if (this.attachmentsFormArray.controls.length !== 0) {
            this.attachmentsFormArray.controls.forEach(group => {

                let fileRef = this.fireStorage.ref(this.fixedDepositsStorageFolderPath + group.get('fileName').value);
                let uploadTask = fileRef.put(group.get('file').value);

                // observe percentage changes
                uploadTask.percentageChanges().subscribe(percent => {
                    group.get('percentComplete').setValue(Math.round(percent));
                });
                // get notified when the download URL is available
                uploadTask.snapshotChanges().pipe(
                    finalize(() => {
                        fileRef.getDownloadURL().subscribe(url => {
                            group.get('downloadUrl').setValue(url);
                        });
                    }))
                    .subscribe();
            });
        }
    });
}

Currently, the code above simply loops through attachmentsFormArray and, once each file has been uploaded, assigns its download URL to the matching form group in attachmentsFormArray.

When the user selects multiple files, the following handleFileInput() event handler runs:

handleFileInput(files: FileList) {
    if (!files || files.length === 0) {
        return;
    }
    Array.from(files).forEach(file => {
        this.attachmentsFormArray.push(this.formBuilder.group({
            fileName: [file.name],
            fileSize: [file.size],
            label: [''],
            file: [file],
            downloadUrl: [''],
            percentComplete: [''],
            uploadTaskState: ['']
        }));
    });
}

The AngularFire library provides a snapshotChanges() method which returns Observable<UploadTaskSnapshot>. I would like to combine/merge all these Observables (so that I know when all files have been completely uploaded) and then subscribe to the resulting Observable. But I am not sure how to associate each individual observable's result with the corresponding file object that the user selected (as described in step 3).

I know this behavior can be achieved with RxJS operators, but I'm not sure which one to use in my scenario. Any help is appreciated.


EDIT 1: Implemented as per Mrk Sef's answer. It works fine most of the time. However, once in a while the downloadUrl is not set, and I'm unable to work out the reason for this intermittent issue.

saveForm() {
    try {
        this.fixedDepositsFormGroup.disable();
        let fixedDepositEntity = this.getEntityFromForm();
        this.fixedDepositsFirestoreCollection
            .add(fixedDepositEntity)
            .then(documentRef => {
                this.isBusy = true;
                // Changes will be mapped to an array of Observable, once this mapping
                // is complete, we can subscribe and wait for them to finish
                console.log('Voila! Form Submitted.');
                if (this.attachmentsFormArray.controls.length !== 0) {
                    const changes = this.attachmentsFormArray.controls.map(
                        group => {
                            const fileRef = this.fireStorage.ref(this.fixedDepositsStorageFolderPath + group.get('fileName').value);
                            const uploadTask = fileRef.put(group.get('file').value);

                            const percentageChanges$ = uploadTask.percentageChanges().pipe(
                                tap(percent => group.get('percentComplete').setValue(Math.round(percent)))
                            );
                            const snapshotChanges$ = uploadTask.snapshotChanges().pipe(
                                finalize(() => fileRef.getDownloadURL().subscribe(url => group.get('downloadUrl').setValue(url)))
                            );
                            return [percentageChanges$, snapshotChanges$];
                        }
                    ).reduce((acc, val) => acc.concat(val), []); // Turn our array of tuples into an array

                    // forkJoin doesn't emit until all source Observables complete
                    forkJoin(changes).subscribe(_ => {
                        // By now all files have been uploaded to FireStorage
                        // Now we update the attachments property in our fixed-deposit document
                        const attachmentValues = (this.getControlValue('attachments') as any[])
                            .map(item => <Attachment>{
                                fileName: item.fileName,
                                fileSize: item.fileSize,
                                label: item.label,
                                downloadUrl: item.downloadUrl
                            });
                        documentRef.update({ attachments: attachmentValues });
                        console.log("Files Uploaded Successfully and Document Updated !");
                    });
                }
            })
            .finally(() => {
                this.fixedDepositsFormGroup.enable();
                this.isBusy = false;
            });
    } finally {

    }
}

Have you checked the mergeMap and concatMap operators of RxJS? - Bhavin Hirpara

1 Answer

A common design you see when observables are generated by a third party is to tag each one with some custom information that you know at call time but may not know by the time you subscribe.

For example, get the third word of each document whose title starts with 'M':

const documents: Document[] = getDocumentsService();

const wordStreams: Observable<[Document, HttpResponse]>[] = documents
  .filter(file => file.title.charAt(0) === 'M')
  .map(file => getThirdWordService(file.id).pipe(
    // Pair each response with the document it belongs to
    map(serviceResponse => [file, serviceResponse] as [Document, HttpResponse])
  ));

merge(...wordStreams).subscribe(([file, serviceResponse]) => {
  console.log(`The third word of ${file.title} is ${serviceResponse.value}`)
});

The big takeaway is that by mapping a value into a tuple (the same pattern works with objects, maps, etc.) you can carry that information forward through the operations in a stream.

The only problem is that if you're not careful, you may end up with a stream that isn't purely functional (that is, one that causes side effects on your program state).
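
Here's the tagging idea again as a minimal sketch that runs on its own. To keep it free of dependencies it uses plain Promises rather than RxJS, and it tags with an object instead of a tuple. FileInfo, Tagged, fetchSize, tagged and collectInCompletionOrder are names I made up for illustration; none of them come from RxJS or AngularFire.

```typescript
// Hypothetical stand-in for whatever item you start with (a file, a form group, ...).
interface FileInfo {
  name: string;
  delayMs: number;
}

// A result bundled with the item that produced it.
interface Tagged<T, R> {
  source: T;
  result: R;
}

// Simulated third-party call: the value it resolves with says nothing about its input.
function fetchSize(file: FileInfo): Promise<number> {
  return new Promise<number>(resolve =>
    setTimeout(() => resolve(file.name.length), file.delayMs)
  );
}

// Tag the result with its source so the association survives asynchronous completion.
function tagged<T, R>(source: T, work: (item: T) => Promise<R>): Promise<Tagged<T, R>> {
  return work(source).then(result => ({ source, result }));
}

// Handle each result as it arrives; the tag tells us which file it belongs to.
function collectInCompletionOrder(files: FileInfo[]): Promise<string[]> {
  const log: string[] = [];
  const jobs = files.map(file =>
    tagged(file, fetchSize).then(({ source, result }) => {
      log.push(`${source.name}:${result}`);
    })
  );
  return Promise.all(jobs).then(() => log);
}

collectInCompletionOrder([
  { name: 'report.pdf', delayMs: 30 },
  { name: 'a.png', delayMs: 5 },
]).then(log => console.log(log));
```

Even though a.png finishes before report.pdf, each handler still knows which file its result belongs to, because the file travels along with the result.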


I'm not really sure what your example is doing, but here's my best guess at what you want:

saveForm() {
  let fixedDepositEntity = this.getEntityFromForm();
  this.fixedDepositsFirestoreCollection
    .add(fixedDepositEntity)
    .then(documentRef => {
      // Changes will be mapped to an array of Observable, once this mapping
      // is complete, we can subscribe and wait for them to finish
      const changes = this.attachmentsFormArray.controls.map(
        group => {
          const fileRef = this.fireStorage.ref(this.fixedDepositsStorageFolderPath + group.get('fileName').value);
          const uploadTask = fileRef.put(group.get('file').value);

          const percentageChanges$ = uploadTask.percentageChanges().pipe(
              tap(percent => group.get('percentComplete').setValue(Math.round(percent)))
          );
          const snapshotChanges$ = uploadTask.snapshotChanges().pipe(
            mergeMap(_ => fileRef.getDownloadURL()),
            tap(url => group.get('downloadUrl').setValue(url))
          );
          return [percentageChanges$, snapshotChanges$];
        }
      ).flat(); // Turn our array of tuples into an array

      // forkJoin doesn't emit until all source Observables complete
      forkJoin(changes).subscribe(_ => 
        console.log("All changes are complete")
      );
    });
}

If instead, you want to delay writing values out until your subscribe, here's another option that more clearly tags the observable stream with some added data that gets used later:

saveForm() {
  let fixedDepositEntity = this.getEntityFromForm();
  this.fixedDepositsFirestoreCollection
    .add(fixedDepositEntity)
    .then(documentRef => {
      // Changes will be mapped to an array of Observable, once this mapping
      // is complete, we can subscribe and wait for them to finish
      const changes = this.attachmentsFormArray.controls.map(
        group => {
          const fileRef = this.fireStorage.ref(this.fixedDepositsStorageFolderPath + group.get('fileName').value);
          const uploadTask = fileRef.put(group.get('file').value);

          const percentageChanges$ = uploadTask.percentageChanges().pipe(
              map(percent => ([group, percent]))
          );
          const snapshotChanges$ = uploadTask.snapshotChanges().pipe(
            mergeMap(_ => fileRef.getDownloadURL()),
            map(url => ([group, url]))
          );
          return [percentageChanges$, snapshotChanges$];
        }
      );

      const percentageChanges$ = changes.map(([a, b]) => a);
      const snapshotChanges$ = changes.map(([a, b]) => b);

      merge(...percentageChanges$).subscribe({
        next: ([group, percent]) => group.get('percentComplete').setValue(Math.round(percent)),
        complete: () => console.log("All percentageChanges complete")
      });

      merge(...snapshotChanges$).subscribe({
        next: ([group, url]) => group.get('downloadUrl').setValue(url),
        complete: () => console.log("All snapshotChanges complete")
      });
    });
}

It goes without saying that none of this is tested. It's my hope you can use what's described here to retool your solution to include files or whatever other information you find pertinent.


Update

My solution created a stream called

const snapshotChanges$ = uploadTask.snapshotChanges().pipe(
  mergeMap(_ => fileRef.getDownloadURL()),
  tap(url => group.get('downloadUrl').setValue(url))
);

That wasn't really what you were after: you wanted a stream that only starts after uploadTask.snapshotChanges() completes. finalize is strange in that it runs on failure as well as on completion. I'm sure there's an operator that can be configured to do that, but I don't know which one.

My updated solution creates a custom operator (waitForEnd) that emits a boolean value when the source completes or errors, and ignores all other elements from the source stream:

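import { Observable } from 'rxjs';

// Emits true when the source completes, false when it errors, then completes.
// Every value emitted by the source is ignored.
const waitForEnd = () =>
  <T>(waitOn$: Observable<T>) => new Observable<boolean>(obsv => {
    const final = (bool: boolean) => {
      obsv.next(bool);
      obsv.complete();
    };
    const subscription = waitOn$.subscribe({
      next: _ => {/*do nothing*/},
      complete: () => final(true),
      error: _ => final(false)
    });
    // Teardown: an Observable has no unsubscribe(), so cancel the inner subscription
    return () => subscription.unsubscribe();
  });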

let snapshotChanges$ = uploadTask.snapshotChanges().pipe(
  waitForEnd(),
  mergeMap(_ => fileRef.getDownloadURL()),
  tap(url => group.get('downloadUrl').setValue(url))
);

snapshotChanges$ waits for uploadTask.snapshotChanges() to end; only then does it get the download URL and set the value, after which it completes as well.
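
To show why that ordering matters, here's a small sketch that doesn't depend on anything else. It uses plain Promises as a stand-in for the observables, so it is an analogy rather than AngularFire code, and Attachment, upload, getDownloadUrl and the other helpers are hypothetical. My guess is that the intermittent missing downloadUrl comes from the same shape as uploadFireAndForget below: the URL lookup is started inside a completion callback, but nothing waits for it.

```typescript
// Hypothetical stand-in for one entry of the attachments form array.
interface Attachment {
  fileName: string;
  downloadUrl: string;
}

const delay = (ms: number): Promise<void> =>
  new Promise<void>(resolve => setTimeout(resolve, ms));

// Pretend upload: resolves once the file has been fully sent.
const upload = (_attachment: Attachment): Promise<void> => delay(5);

// Pretend URL lookup: a second async call made after the upload has finished.
const getDownloadUrl = (attachment: Attachment): Promise<string> =>
  delay(5).then(() => `https://example.invalid/${attachment.fileName}`);

// Racy: the lookup is started but not returned, so callers cannot wait for it.
function uploadFireAndForget(attachment: Attachment): Promise<void> {
  return upload(attachment).then(() => {
    getDownloadUrl(attachment).then(url => { attachment.downloadUrl = url; });
  });
}

// Chained: the returned promise settles only after the URL has been stored.
function uploadThenStoreUrl(attachment: Attachment): Promise<void> {
  return upload(attachment)
    .then(() => getDownloadUrl(attachment))
    .then(url => { attachment.downloadUrl = url; });
}

// Resolves with the URLs visible at the moment "all uploads" are reported done.
function urlsWhenAllDone(
  names: string[],
  uploadOne: (attachment: Attachment) => Promise<void>
): Promise<string[]> {
  const attachments: Attachment[] = names.map(fileName => ({ fileName, downloadUrl: '' }));
  return Promise.all(attachments.map(uploadOne))
    .then(() => attachments.map(a => a.downloadUrl));
}

urlsWhenAllDone(['a.pdf', 'b.pdf'], uploadFireAndForget)
  .then(urls => console.log('fire-and-forget:', urls));
urlsWhenAllDone(['a.pdf', 'b.pdf'], uploadThenStoreUrl)
  .then(urls => console.log('chained:', urls));
```

In the fire-and-forget version the URLs are still empty when the combined promise resolves; in the chained version they are all set. With real network timing the first version may sometimes appear to work, which would make the problem intermittent.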