I'm working on an application that converts a list of objects into other objects, and for this I use RxJava Observables.
I have two methods. One is an adapter (a Jersey facade) that exposes an async service, and this method consumes an Observable from another service.
I need to consume that Observable and process each item. When it completes, I need to build one list of all the processed items and return a new Observable<E>, where E is the list of processed items.
To process each item I use the flatMap operator, but I don't know how to create a new Observable with a different type, for example a List of all the items processed by the flatMap operator.
Any ideas?
Thanks
Update:
This code processes each element and returns another Observable, but I don't know if it is done correctly.
    @Override
    public Observable<ArrayList> getGeoJson2() {
        WKTReader2 reader2 = new WKTReader2();
        WKBReader wkbReader = new WKBReader();
        ArrayList featureCollection = new ArrayList();
        Subject<ArrayList,ArrayList> subject = PublishSubject.create();
        manzanaRepository.getManzanas().map(new Func1<Manzana, SimpleFeature>() {
            @Override
            public SimpleFeature call(Manzana manzana) {
                try {
                    SimpleFeatureType TYPE = DataUtilities.createType("", "geom,name:String");
                    return SimpleFeatureBuilder.build(TYPE, new Object[]{ null, "name1" }, null);
                } catch (Exception e) {
                    System.out.println(e.getMessage());
                    return null;
                }
            }
        }).subscribe(new Subscriber<SimpleFeature>() {
            @Override
            public void onCompleted() {
                subject.onNext(featureCollection);
                subject.onCompleted();
            }
            @Override
            public void onError(Throwable throwable) {
                subject.onError(throwable);
            }
            @Override
            public void onNext(SimpleFeature simpleFeature) {
                featureCollection.add(simpleFeature);
            }
        });
        return subject;
    }
And this is the code that consumes the returned Observable:
    @GET
    @Produces(MediaType.APPLICATION_JSON)
    @Path("/async/geom")
    public void asyncGetGeom(@Suspended final AsyncResponse asyncResponse) {
        Observable<ArrayList> features = service.getGeoJson2();
        features.subscribe(new Observer<ArrayList>() {
            @Override
            public void onCompleted() {
                System.out.println("Se completo la accion!!!");
            }
            @Override
            public void onError(Throwable throwable) {
                System.out.println(throwable.getMessage());
            }
            @Override
            public void onNext(ArrayList features) {
                asyncResponse.resume(features);
            }
        });
    }
The onNext() method is never called!
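One possible explanation, offered only as a guess: a PublishSubject delivers events only to observers that are already subscribed when the event is emitted. If manzanaRepository.getManzanas() emits synchronously on subscribe (an assumption, since its implementation is not shown), the list would be pushed into the subject inside getGeoJson2(), before asyncGetGeom() has subscribed. The sketch below reproduces that ordering in plain Java with no RxJava dependency. `TinySubject` and `LateSubscriberDemo` are hypothetical stand-ins written for illustration, not RxJava classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class LateSubscriberDemo {

    /** Hypothetical stand-in for a non-replaying subject: only current listeners see an event. */
    static class TinySubject<T> {
        private final List<Consumer<T>> listeners = new ArrayList<>();

        void subscribe(Consumer<T> listener) {
            listeners.add(listener);
        }

        void onNext(T value) {
            for (Consumer<T> listener : listeners) {
                listener.accept(value);
            }
        }
    }

    /** Mirrors the shape of getGeoJson2(): the list is emitted before the subject is returned. */
    static TinySubject<List<String>> emitThenReturn() {
        TinySubject<List<String>> subject = new TinySubject<>();
        // Nobody is subscribed yet, so this emission reaches no listener.
        subject.onNext(Arrays.asList("a", "b"));
        return subject;
    }

    /** Number of lists received when subscribing after the emission. */
    static int receivedWhenSubscribingLate() {
        List<List<String>> received = new ArrayList<>();
        emitThenReturn().subscribe(received::add);
        return received.size();
    }

    /** Number of lists received when subscribing before the emission. */
    static int receivedWhenSubscribingEarly() {
        List<List<String>> received = new ArrayList<>();
        TinySubject<List<String>> subject = new TinySubject<>();
        subject.subscribe(received::add);
        subject.onNext(Arrays.asList("a", "b"));
        return received.size();
    }

    public static void main(String[] args) {
        System.out.println("late: " + receivedWhenSubscribingLate());
        System.out.println("early: " + receivedWhenSubscribingEarly());
    }
}
```

If this is what happens in my code, one option might be to drop the intermediate subject and return the chain itself using RxJava's toList() operator, i.e. manzanaRepository.getManzanas().map(...).toList(). That operator emits a single List when the source completes, so the return type would become Observable<List<SimpleFeature>> and nothing would be emitted until the caller subscribes (assuming getManzanas() is a cold Observable).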
Thanks