2
votes

I'm working on an application that converts one list of objects into other objects; for this I use RxJava's Observable.

I have two methods. One is an adapter (a Jersey facade) that exposes an async service, and this method consumes an Observable from another service.

I need to consume an Observable and process each item. When it completes, I need to create a list of all the processed items and return a new Observable<E>, where E is that list.

To process each item I use the flatMap operator, but I don't know how to create a new Observable with a different type, for example a List of all the items processed by the flatMap operator.

Any ideas?

Thanks

Update:

This code processes each element and returns the other Observable, but I don't know whether it is done well.

@Override
public Observable<ArrayList> getGeoJson2() {
    WKTReader2 reader2 = new WKTReader2();
    WKBReader wkbReader = new WKBReader();
    ArrayList featureCollection = new ArrayList();

    Subject<ArrayList,ArrayList> subject = PublishSubject.create();

    manzanaRepository.getManzanas().map(new Func1<Manzana, SimpleFeature>() {
        @Override
        public SimpleFeature call(Manzana manzana) {
            try {
                SimpleFeatureType TYPE = DataUtilities.createType("", "geom,name:String");
                return SimpleFeatureBuilder.build( TYPE, new Object[]{ null, "name1"}, null);
            }catch (Exception e){
                System.out.println(e.getMessage());
                return null;
            }

        }
    }).subscribe(new Subscriber<SimpleFeature>() {
        @Override
        public void onCompleted() {
            subject.onNext(featureCollection);
            subject.onCompleted();
        }

        @Override
        public void onError(Throwable throwable) {
            subject.onError(throwable);
        }

        @Override
        public void onNext(SimpleFeature simpleFeature) {
            featureCollection.add(simpleFeature);
        }
    });

    return subject;
}

And this is the code that uses the returned Observable:

@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/async/geom")
public void asyncGetGeom(@Suspended final AsyncResponse asyncResponse) {

    Observable<ArrayList> features = service.getGeoJson2();

    features.subscribe(new Observer<ArrayList>() {
        @Override
        public void onCompleted() {

            System.out.println("Action completed!!!");
        }

        @Override
        public void onError(Throwable throwable) {

            System.out.println(throwable.getMessage());
        }

        @Override
        public void onNext(ArrayList features) {
            asyncResponse.resume(features);
        }
    });
}

The onNext() method is never called!!!

Thanks

2
Please provide a short code example of what you've tried so far. - ehehhh
Can you also show what this method returns? manzanaRepository.getManzanas() - ehehhh
Observable<Manzana> - Cristian Rinaldi

2 Answers

1
votes

Try replacing your getGeoJson2() with this:

@Override
public Observable<List<SimpleFeature>> getGeoJson2() {
    return manzanaRepository.getManzanas()
            .map(new Func1<Manzana, SimpleFeature>() {
                @Override
                public SimpleFeature call(Manzana manzana) {
                    try {
                        SimpleFeatureType TYPE = DataUtilities.createType("", "geom,name:String");
                        return SimpleFeatureBuilder.build(TYPE, new Object[]{ null, "name1" }, null);
                    } catch (Exception e) {
                        System.out.println(e.getMessage());
                        return null;
                    }
                }
            })
            .filter(new Func1<SimpleFeature, Boolean>() {
                @Override
                public Boolean call(SimpleFeature sf) {
                    return sf != null;
                }
            })
            .toList();
}

Explanation: the toList() operator waits for onCompleted from the source Observable and then emits every item the source emitted as a single List. The filter() step drops the null values returned when building a feature fails, so they do not end up in the list.
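
To make the buffering behaviour concrete, here is a plain-Java sketch of what an operator like toList() does conceptually: collect items in onNext and forward one list when the source completes. It deliberately avoids the RxJava dependency, so MiniObserver, ToListObserver and run are hypothetical stand-ins for illustration, not RxJava types, and the real operator handles more (unsubscription, backpressure).

```java
import java.util.ArrayList;
import java.util.List;

class ToListSketch {
    /** Hypothetical stand-in for an observer, reduced to the three callbacks. */
    interface MiniObserver<T> {
        void onNext(T item);
        void onError(Throwable error);
        void onCompleted();
    }

    /** Buffers every item and forwards a single list downstream on completion. */
    static final class ToListObserver<T> implements MiniObserver<T> {
        private final MiniObserver<List<T>> downstream;
        private final List<T> buffer = new ArrayList<>();

        ToListObserver(MiniObserver<List<T>> downstream) {
            this.downstream = downstream;
        }

        @Override public void onNext(T item) { buffer.add(item); }

        @Override public void onError(Throwable error) { downstream.onError(error); }

        @Override public void onCompleted() {
            // Only now does the downstream observer see anything: one list, then completion.
            downstream.onNext(buffer);
            downstream.onCompleted();
        }
    }

    /** Feeds the items through ToListObserver and records what the downstream observer saw. */
    static List<String> run(String... items) {
        final List<String> events = new ArrayList<>();
        MiniObserver<List<String>> sink = new MiniObserver<List<String>>() {
            @Override public void onNext(List<String> list) { events.add("onNext" + list); }
            @Override public void onError(Throwable error) { events.add("onError"); }
            @Override public void onCompleted() { events.add("onCompleted"); }
        };
        ToListObserver<String> toList = new ToListObserver<>(sink);
        for (String item : items) {
            toList.onNext(item);
        }
        toList.onCompleted();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run("a", "b", "c"));
    }
}
```

The downstream observer receives exactly one onNext carrying the whole list, followed by onCompleted, which is the shape the Jersey resource needs in order to call asyncResponse.resume() once.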

0
votes

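The problem with your code is that the PublishSubject emits the list and completes inside getGeoJson2(), before the caller has subscribed to it. A PublishSubject does not replay earlier items to subscribers that attach later, so onNext() is never called.

If you change it to a BehaviorSubject, a subscriber that attaches before the subject completes will receive the list; a ReplaySubject or AsyncSubject also delivers it to subscribers that attach after completion.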

However, it is better (and much shorter) to implement it as follows:

manzanaRepository.getManzanas()
        .map(/* Manzana -> SimpleFeature */)
        .collect(() -> new ArrayList<SimpleFeature>(), (list, feature) -> list.add(feature))
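
The subscription-timing issue described in this answer can be modelled in a few lines of plain Java. This is only a sketch under assumptions: MiniSubject is a hypothetical, single-threaded stand-in (not an RxJava class), and it assumes the source emits synchronously, so the subject has already emitted and completed by the time the caller subscribes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class SubjectTimingSketch {
    /** Hypothetical stand-in for a subject; replay=false mimics publish-style behaviour. */
    static final class MiniSubject {
        private final boolean replay;
        private final List<String> history = new ArrayList<>();
        private final List<Consumer<String>> subscribers = new ArrayList<>();
        private boolean completed;

        MiniSubject(boolean replay) {
            this.replay = replay;
        }

        void onNext(String item) {
            if (completed) return;
            history.add(item);
            // Only subscribers that are already attached see the item live.
            for (Consumer<String> s : subscribers) s.accept("onNext:" + item);
        }

        void onCompleted() {
            if (completed) return;
            completed = true;
            for (Consumer<String> s : subscribers) s.accept("onCompleted");
        }

        void subscribe(Consumer<String> s) {
            // A replaying subject hands earlier items to late subscribers; a publish-style one does not.
            if (replay) {
                for (String item : history) s.accept("onNext:" + item);
            }
            if (completed) {
                s.accept("onCompleted");
            } else {
                subscribers.add(s);
            }
        }
    }

    /** Emits and completes first, then subscribes, and returns what the late subscriber saw. */
    static List<String> emitThenSubscribe(boolean replay) {
        MiniSubject subject = new MiniSubject(replay);
        subject.onNext("features");
        subject.onCompleted();
        List<String> seen = new ArrayList<>();
        subject.subscribe(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("publish-style: " + emitThenSubscribe(false));
        System.out.println("replay-style:  " + emitThenSubscribe(true));
    }
}
```

In this model the late subscriber to the publish-style subject sees only the completion event, while the replay-style subject also delivers the item. Returning the composed Observable directly (toList() or collect()) avoids the question entirely, because nothing runs until the caller subscribes.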