I am struggling to convert Node streams to RxJS Observables.
Streaming works fine when I try a single URL, but when I map the same function over an array of URLs, I get errors.
I am using RxNode to convert the stream into an Observable.
This is what I'm currently trying:
// data_array is an array of 10 URLs that I'm scraping data from.
let parentStream = Rx.Observable.from(data_array);
parentStream.map(createStream)
  .subscribe(x => console.log(x), (e) => console.log('Error', e), console.log('Complete'));

function createStream(url) {
  return RxNode.fromStream(
    x(url, '#centercol ul li', [{name: 'a', link: 'a@href'}]).write().pipe(JSONStream.parse('*'))
  );
}
But this is the output, repeated 10 times (once per URL in data_array):
RefCountObservable {
  source:
   ConnectableObservable {
     source: AnonymousObservable { source: undefined, __subscribe: [Function] },
     _connection: null,
     _source: AnonymousObservable { source: [Object], __subscribe: [Function: subscribe] },
     _subject:
      Subject {
        isDisposed: false,
        isStopped: false,
        observers: [],
        hasError: false } },
  _count: 0,
  _connectableSubscription: null }
I first thought flatMap would work, since it flattens an observable of observables, but when I try flatMap I get this:
Complete
Error TypeError: unknown type returned
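A note on the RefCountObservable output: with map, the subscriber receives whatever createStream returns, which is one Observable per URL, and that would explain the ten dumps. The same nesting can be seen with plain arrays. This is only an analogy and not Rx code; urls and createItems are hypothetical stand-ins for data_array and createStream.

```javascript
// Hypothetical stand-ins: two URLs, and a createItems() that returns an
// array of results per URL (playing the role of createStream).
const urls = ['http://example.com/a', 'http://example.com/b'];
const createItems = url => [`${url}#1`, `${url}#2`];

// map keeps one container per URL, so the consumer sees the containers.
const nested = urls.map(createItems);

// flatMap unwraps each container, so the consumer sees the items themselves.
const flat = urls.flatMap(createItems);

console.log(nested.length); // 2
console.log(flat.length);   // 4
```

This does not explain the TypeError from flatMap; it only shows why map on its own prints Observables instead of data.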
However, if I do the following, it works for a single URL, but I can't capture all of the URLs in data_array in one stream:
let stream = RxNode.fromStream(x(url, '#centercol ul li', [{name: 'a', link: 'a@href'}]).write().pipe(JSONStream.parse('*')))
stream.subscribe(x => console.log(x), (e)=> console.log('Error', e), console.log('Complete'))
I feel like I'm misunderstanding something, not only because it clearly isn't working for multiple URLs, but also because even when it does work, as in the second example, I get 'Complete' before all of the data comes in.
Any help would be wonderful. Thanks.
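One possible reason for 'Complete' showing up first: the third argument to subscribe is written as console.log('Complete'), which is a call and not a function, so JavaScript evaluates it while building the argument list. A minimal sketch of that effect, assuming nothing about Rx itself; fakeSubscribe and record are hypothetical helpers that only mimic the (onNext, onError, onCompleted) callback order.

```javascript
const events = [];
const record = (msg) => { events.push(msg); };

// Hypothetical stand-in for subscribe(onNext, onError, onCompleted).
// It emits three values, then calls onCompleted if it is a function.
function fakeSubscribe(onNext, onError, onCompleted) {
  [1, 2, 3].forEach(value => onNext(value));
  if (typeof onCompleted === 'function') onCompleted();
}

// Same shape as the calls in this post: record('Complete') runs immediately,
// before any value is emitted, and passes undefined as the third argument.
fakeSubscribe(x => record(x), e => record(e), record('Complete'));
const eager = events.slice();

events.length = 0;

// Wrapped in a function: it runs only when the source finishes.
fakeSubscribe(x => record(x), e => record(e), () => record('Complete'));
const lazy = events.slice();

console.log(eager); // [ 'Complete', 1, 2, 3 ]
console.log(lazy);  // [ 1, 2, 3, 'Complete' ]
```

If this is what is happening, passing () => console.log('Complete') as the third argument should move the message to the end.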
*UPDATE*
I tried a different approach, which works but does not use Node streams. Node streams would be ideal, so I would still like to make the above example work.
The approach I used next was to wrap my web scraping function in a promise (scrape, below). This works, but the result is ten huge arrays, one per URL, each holding all the data from that URL. What I really want is a stream of individual objects, so that I can compose a series of transformations over the objects as they pass through.
Here is the different, but working, approach:
let parentStream = Rx.Observable.from(data_array);
parentStream.map(url => {
    return Rx.Observable.defer(() => {
      return scrape(url, '#centercol ul li', [{name: 'a', link: 'a@href'}]);
    });
  })
  .concatAll()
  .subscribe(x => console.log(x), (e) => console.log('Error', e), console.log('Complete'));

function scrape(url, selector, scope) {
  return new Promise(
    (resolve, reject) => x(url, selector, scope)(
      (error, result) => error != null ? reject(error) : resolve(result)
    )
  );
}
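To make the "stream of objects" goal concrete, here is a minimal sketch of the shape I am after, written with plain generators rather than Rx or Node streams. The sample data in batches and the helpers flatten and mapItems are hypothetical; they only illustrate unpacking one array per URL into individual objects and transforming each object as it passes through.

```javascript
// Hypothetical sample: one array of scraped results per URL, shaped like
// the {name, link} objects requested in the scrape() call.
const batches = [
  [{ name: 'A', link: '/a' }, { name: 'B', link: '/b' }],
  [{ name: 'C', link: '/c' }],
];

// Yield each object on its own instead of one array per URL.
function* flatten(arrays) {
  for (const array of arrays) yield* array;
}

// Lazily apply a transformation to every item passing through.
function* mapItems(items, fn) {
  for (const item of items) yield fn(item);
}

const names = [...mapItems(flatten(batches), item => item.name.toLowerCase())];
console.log(names); // [ 'a', 'b', 'c' ]
```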