I appreciate all the complexity that RxJava can handle, from user events to large, elaborate reactive algorithmic structures. However, I am still struggling with how the source Observable comes into being, where all events originate, and how to manage multiple event origins.

I know the answer to this question is probably "it depends", but is it a bad idea to consolidate source events to a single Subject or Hot Observable of some form?

For example, let's say I have a data-driven desktop application. I have an enum identifying the root-level event types.

public enum AppEvent { 
     TRANSACTIONS_REFRESH,
     CATEGORIES_REFRESH
}

Then I have an AppEventEngine singleton that houses a PublishSubject and provides a means to post events and access the Observable<AppEvent>.

public final class AppEventEngine {

    private static final AppEventEngine instance = new AppEventEngine();

    private final PublishSubject<AppEvent> eventBus = PublishSubject.create();

    public void post(AppEvent appEvent) { 
        eventBus.onNext(appEvent);
    }
    public Observable<AppEvent> getEvents(AppEvent appEvent) { 
        return eventBus.startWith(appEvent).filter(e -> e.equals(appEvent));
    }

    public static AppEventEngine get() { 
        return instance;
    }
    private AppEventEngine() {}
}

I can then emit the AppEvent.CATEGORIES_REFRESH to the CategoryManager and use RxJava-JDBC to construct and emit a new List<Category> derived from a query.

public final class CategoryManager {

    private final Database db = //instantiate database;
    private final Observable<List<Category>> categories;

    private CategoryManager() { 
        categories = AppEventEngine.get().getEvents(AppEvent.CATEGORIES_REFRESH)
                .flatMap(e -> db.select("SELECT * FROM CATEGORY")
                        .get(rs -> new Category(rs.getInt("ID"), rs.getString("DESC")))
                        .toList())
                .cache(1);
    }

    public Observable<List<Category>> getCategories() { 
        return categories;
    }
}

Then external client classes can not only transform and subscribe to that Observable<List<Category>> but also push the AppEvent.CATEGORIES_REFRESH event at any time.

public final class SomeClientUX { 
    //gui code 

    public void categoryRefreshButtonPressed() {
        AppEventEngine.get().post(AppEvent.CATEGORIES_REFRESH);
    }
}

My question is this: is following this EventBus pattern (much like the one in Google Guava) a kosher way to write an RxJava desktop application? Or should I avoid the PublishSubject and derive the events differently, in a decentralized way?

1 Answer

I think this is a valid pattern and is a common use case for PublishSubject. One bit of advice though is to serialize emissions from PublishSubject if you expect events to be pushed to it from different threads (unless there is a formal happens-before relationship between the pushed events, but that is an optimisation you probably don't need). So the declaration should be:

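// The explicit <AppEvent> is needed: the type argument is not inferred through the chained call.
private final Subject<AppEvent> eventBus =
    PublishSubject.<AppEvent>create().toSerialized();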
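
To illustrate what "serialized" buys you, here is a minimal JDK-only sketch. It does not use RxJava and is not how RxJava implements toSerialized(); the class SerializedBusSketch, its nested Bus type and the countConcurrentPosts helper are hypothetical names made up for this example. The bus simply takes a lock around delivery, so a subscriber never sees two events at the same time even when several threads post at once.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** JDK-only illustration of serialized event delivery; NOT RxJava's implementation. */
public final class SerializedBusSketch {

    /** Same event types as in the question. */
    public enum AppEvent { TRANSACTIONS_REFRESH, CATEGORIES_REFRESH }

    /** Hypothetical minimal bus: post() never runs subscribers concurrently. */
    public static final class Bus<T> {
        private final List<Consumer<? super T>> subscribers = new CopyOnWriteArrayList<>();
        private final Object lock = new Object();

        public void subscribe(Consumer<? super T> subscriber) {
            subscribers.add(subscriber);
        }

        public void post(T event) {
            // Only one delivery runs at a time, whichever thread calls post().
            synchronized (lock) {
                for (Consumer<? super T> subscriber : subscribers) {
                    subscriber.accept(event);
                }
            }
        }
    }

    /** Posts from several threads and returns how many events the subscriber counted. */
    public static int countConcurrentPosts(int threads, int postsPerThread) {
        Bus<AppEvent> bus = new Bus<>();
        // Deliberately a plain, non-thread-safe counter: it is only safe
        // because the bus serializes every call into the subscriber.
        int[] received = {0};
        bus.subscribe(e -> received[0]++);

        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < postsPerThread; j++) {
                    bus.post(AppEvent.CATEGORIES_REFRESH);
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join(); // join() also makes the final count visible to this thread
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for posters", ex);
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(countConcurrentPosts(4, 10_000)); // prints 40000
    }
}
```

Without the synchronized block, the unsynchronized counter could lose increments under concurrent posting. That is the same kind of hazard an unserialized PublishSubject exposes its subscribers to when onNext is called from several threads.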