4
votes

I am starting to embrace reactive programming a bit more, and I'm trying to apply it to my typical business problems. One pattern I often design with is database-driven classes. I have some defined unit class like ActionProfile whose instances are managed by an ActionProfileManager, which creates the instances off a database table and stores them in a Map<Integer,ActionProfile> where Integer is the actionProfileId key. The ActionProfileManager may clear and re-import the data periodically, and notify all dependencies to re-pull from its map.

import com.google.common.collect.ImmutableMap;

public final class ActionProfileManager {
    private volatile ImmutableMap<Integer,ActionProfile> actionProfiles;

    private ActionProfileManager() { 
        this.actionProfiles = importFromDb();
    }

    public void refresh() { 
        this.actionProfiles = importFromDb();
        notifyEventBus();
    }

    //called by clients on their construction or when notifyEventBus is called
    public ActionProfile forKey(int actionProfileId) { 
        return actionProfiles.get(actionProfileId);
    }

    private ImmutableMap<Integer,ActionProfile> importFromDb() { 
        return ImmutableMap.of(); //import data here
    }
    private void notifyEventBus() { 
        //notify event through EventBus here
    }
}

However, if I want this to be more reactive, creating the map would kind of break the monad. One approach is to make the map itself an Observable and return an Observable that looks up a specific key for the client. However, the intermediate imperative operations may not be ideal, especially if I start using rxjava-jdbc down the road. On the other hand, the hash map may help lookup performance significantly in intensive cases.

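import com.google.common.collect.ImmutableMap;
import rx.Observable;
import rx.subjects.BehaviorSubject;

public final class ActionProfileManager {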
    private final BehaviorSubject<ImmutableMap<Integer,ActionProfile>> actionProfiles;

    private ActionProfileManager() { 
        this.actionProfiles = BehaviorSubject.create(importFromDb());
    }

    public void refresh() { 
        actionProfiles.onNext(importFromDb());
    }

    public Observable<ActionProfile> forKey(int actionProfileId) { 
        // emits null when the current map has no entry for this id
        return actionProfiles.map(m -> m.get(actionProfileId));
    }

    private ImmutableMap<Integer,ActionProfile> importFromDb() { 
        return ImmutableMap.of(); //import data here
    }
}

Therefore, the most reactive approach to me seems to be just pushing everything from the database on each refresh through an Observable<ActionProfile> and filtering for the last matching ID for the client.

import rx.Observable;
import rx.subjects.ReplaySubject;

public final class ActionProfileManager {
    private final ReplaySubject<ActionProfile> actionProfiles;

    private ActionProfileManager() { 
        this.actionProfiles = ReplaySubject.create();
        importFromDb();
    }

    public void refresh() { 
        importFromDb();
    }

    public Observable<ActionProfile> forKey(int actionProfileId) { 
        // note: last() emits only after actionProfiles completes
        return actionProfiles.filter(m -> m.getActionProfileID() == actionProfileId).last();
    }

    private void importFromDb() { 
        // call onNext() on actionProfiles and pass each new ActionProfile coming from database
    }
}

Is this the optimal approach? What about old data causing memory leaks and not being GC'd? Is it more practical to maintain the map and make it observable?

Which of the approaches above is the best reactive fit for data-driven classes? Or is there a better way I have not discovered?

1
IMO you might want to consider using a Redis database for this. You could improve both your mapping (using its key-value entries) and your observable (since Redis implements JMS). - ControlAltDel
Been doing some other research; perhaps I shouldn't be using subjects but rather my own Observable implementations: github.com/ReactiveX/RxJava/issues/1794 - tmn
Is the goal of your current approach to have a cache for any consumers requesting data? What is the trigger for invoking a refresh? - benjchristensen
The trigger will probably be fired from UI events, likely from JavaFX controls. I may also periodically create a timer to refresh business parameters and rebuild the maps. I do want a cache of some sort (I think) because new subscribers could latch on at any time in a UI environment. But I don't want stale data to be retained either. - tmn
I ultimately shunned this use of Subject and instead opted to use Observable.defer() and/or Observable.cache(). I found the Subject's infinite hot nature to be a hassle when used to support complex UI-triggered processes, and I always had to call take(1) to make it cold and finite for that process instance. - tmn

1 Answer

3
votes

Using BehaviorSubject is the right thing to do here if you don't care about earlier values.
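To make the "latest value only" behaviour concrete, here is a rough plain-Java sketch of what a BehaviorSubject-backed cache does for subscribers. It deliberately avoids RxJava so it runs on its own: LatestValue and ProfileCache are hypothetical stand-ins invented for this illustration (not RxJava classes), and profiles are plain Strings rather than ActionProfile instances.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for a BehaviorSubject: holds one value, replays it to new subscribers.
final class LatestValue<T> {
    private volatile T current;
    private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();

    LatestValue(T initial) {
        this.current = initial;
    }

    // A new subscriber immediately receives the current value, then every later one.
    synchronized void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
        subscriber.accept(current);
    }

    // Replaces the held value; this holder no longer references the previous one.
    synchronized void onNext(T value) {
        current = value;
        for (Consumer<T> s : subscribers) {
            s.accept(value);
        }
    }
}

// Hypothetical manager: profiles are Strings here purely to keep the sketch self-contained.
final class ProfileCache {
    private final LatestValue<Map<Integer, String>> profiles;

    ProfileCache(Map<Integer, String> initial) {
        this.profiles = new LatestValue<>(copyOf(initial));
    }

    // Corresponds to refresh(): push a freshly imported snapshot to all subscribers.
    void refresh(Map<Integer, String> fresh) {
        profiles.onNext(copyOf(fresh));
    }

    // Corresponds to forKey(): each snapshot is mapped to the entry for one id (null if absent).
    void forKey(int id, Consumer<String> subscriber) {
        profiles.subscribe(m -> subscriber.accept(m.get(id)));
    }

    private static Map<Integer, String> copyOf(Map<Integer, String> source) {
        return Collections.unmodifiableMap(new HashMap<>(source));
    }
}
```

A client that subscribes before a refresh sees the old entry and then the new one; a client that subscribes afterwards sees only the new one. In RxJava 1.x the same roles are played by BehaviorSubject.create(initial), onNext(...) and map(...), as in the second version in the question.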

Note that most posts discouraging Subjects were written in the early days of Rx.NET and are mostly quoted over and over again without much thought. I attribute this to the possibility that such authors didn't really understand how Subjects work, or ran into some problems with them and simply declared that they shouldn't be used.

I think Subjects are a great way to multicast events (coming from a single thread usually) where you control or you are the source of the events and the event dispatching is somewhat 'global' (such as listening to mouse move events).