
For example, given a simple pipeline such as:

Pipeline p = Pipeline.create();
p.readFrom(TestSources.items("the", "quick", "brown", "fox"))
 .map(mapFn)
 .writeTo(Sinks.logger());

I'd like mapFn to be something requiring a non-serialisable dependency to do its work.
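Jet requires the functions in a pipeline to be serializable so they can be sent to the cluster. A lambda that captures a non-serializable object therefore cannot be shipped. A minimal plain-Java sketch of that failure mode follows. `SerializableFunction` is a hypothetical stand-in for Jet's serializable function types, and `HeavyDependency` is a hypothetical dependency.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Function;

// Hypothetical stand-in for a serializable function type (Jet uses its own variants).
interface SerializableFunction<T, R> extends Function<T, R>, Serializable {}

// Hypothetical dependency that deliberately does NOT implement Serializable.
class HeavyDependency {
    String bar(String s) {
        return "mod: " + s;
    }
}

class SerializationCheck {
    // Returns true if Java serialization can write the object, false if some
    // part of its object graph is not serializable.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        HeavyDependency dep = new HeavyDependency();
        // Captures dep, so serializing the lambda also tries to serialize dep.
        SerializableFunction<String, String> capturing = s -> dep.bar(s);
        // Captures nothing, so it serializes fine.
        SerializableFunction<String, String> standalone = s -> "mod: " + s;
        System.out.println(isSerializable(capturing));  // false
        System.out.println(isSerializable(standalone)); // true
    }
}
```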

I know I can do this:

Pipeline p = Pipeline.create();
p.readFrom(TestSources.items("the", "quick", "brown", "fox"))
 .mapUsingService(JetSpringServiceFactories.bean("myDependencies"),
         MyDependencies::addDependencies)
 .map(mapFn)
 .writeTo(Sinks.logger());

This wraps the strings being read from the source in another object that includes the dependencies, giving mapFn access to those dependencies without them needing to be injected into that object itself. That will work, but I want to use my mapping function outside Jet as well as part of a pipeline, and in that case it's a bit weird to have to pass dependencies along with the items being mapped rather than just initialising the mapper with the dependencies it needs. It also forces me to pointlessly create a wrapper object per item in my stream/batch.
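To make the per-item cost concrete, here is a plain-Java sketch of the wrapper approach outside Jet, with `java.util.stream` playing the role of the pipeline. `Deps`, `ItemWithDeps` and `addDependencies` are hypothetical names standing in for whatever `MyDependencies::addDependencies` returns in the snippet above.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical dependency holder, standing in for the "myDependencies" bean.
class Deps {
    String prefix() {
        return "mod: ";
    }
}

// Hypothetical wrapper: one of these is allocated for every item.
final class ItemWithDeps {
    final Deps deps;
    final String item;

    ItemWithDeps(Deps deps, String item) {
        this.deps = deps;
        this.item = item;
    }
}

class WrapperApproach {
    // Plays the role of MyDependencies::addDependencies: pairs each item with the deps.
    static ItemWithDeps addDependencies(Deps deps, String item) {
        return new ItemWithDeps(deps, item);
    }

    // The mapping function must now accept the wrapper instead of the bare item.
    static final Function<ItemWithDeps, String> mapFn = w -> w.deps.prefix() + w.item;

    static List<String> run(Deps deps, List<String> items) {
        return items.stream()
                .map(item -> addDependencies(deps, item)) // extra allocation per item
                .map(mapFn)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(run(new Deps(), List.of("the", "quick", "brown", "fox")));
    }
}
```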

The docs say another alternative is to use the @SpringAware annotation on a Processor, but I think that means using the Core API, which the docs say "mostly offers you a lot of ways to make mistakes", so I'd prefer to avoid that.

In vanilla Hazelcast IMDG, anything that is deserialised can be initialised by a ManagedContext, and this is also the case with Jet, but the functions, filters etc. of the pipeline are wrapped in several layers of Jet pipeline machinery, so there seems to be no way to get to them.
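The ManagedContext idea, roughly, is that after an object is deserialised a hook gets a chance to fill in its non-serialisable fields. The sketch below models that idea in plain Java; it is not Hazelcast's API. `Initializer` only mimics the shape of Hazelcast's `ManagedContext.initialize(Object)`, and `Greeter`, `NeedsGreeter` and `Enricher` are hypothetical.

```java
import java.io.*;

// Hypothetical non-serializable dependency.
class Greeter {
    String greet(String s) { return "mod: " + s; }
}

// Hypothetical marker for objects that want a Greeter injected after deserialization.
interface NeedsGreeter {
    void setGreeter(Greeter g);
}

// Serializable mapper; the dependency is transient, so it is NOT shipped.
class Enricher implements Serializable, NeedsGreeter {
    private transient Greeter greeter;

    @Override
    public void setGreeter(Greeter g) { this.greeter = g; }

    String apply(String s) { return greeter.greet(s); }
}

// Mimics the shape of ManagedContext.initialize(Object): returns the (possibly modified) object.
interface Initializer {
    Object initialize(Object obj);
}

class ManagedContextSketch {
    static Object roundTrip(Object o, Initializer ctx) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            // The "receiving side" hands the deserialized object to the context.
            return ctx.initialize(in.readObject());
        }
    }

    public static void main(String[] args) throws Exception {
        Greeter localGreeter = new Greeter(); // exists only on the receiving side
        Initializer ctx = obj -> {
            if (obj instanceof NeedsGreeter) {
                ((NeedsGreeter) obj).setGreeter(localGreeter);
            }
            return obj;
        };
        Enricher copy = (Enricher) roundTrip(new Enricher(), ctx);
        System.out.println(copy.apply("fox")); // mod: fox
    }
}
```

The question's difficulty is that Jet's pipeline wrappers sit between the deserialised object and the user's function, so a hook like this never sees the function itself.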

Am I missing something or have I listed all the options I have (other than resorting to some "global" static dependencies object)?


2 Answers

Answer 1

The way you described is actually quite close to how it should be done. You can simplify it by just doing:

Pipeline p = Pipeline.create();
p.readFrom(TestSources.items("the", "quick", "brown", "fox"))
 .mapUsingService(bean("myDependencies"), (dep, item) -> mapFn.apply(dep, item)) // bean() from JetSpringServiceFactories
 .writeTo(Sinks.logger());

This avoids creating an intermediate item. As you said already, this requires that the mapping function takes the dependency as a parameter as well.
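The point of mapUsingService here is that the service is created once and then reused for every item, with no wrapper in between. A rough plain-Java model of that contract follows; it is not Jet's implementation. `mapUsingService` is a hypothetical helper over a `List`, and in Jet the service would be created by the ServiceFactory on the cluster side rather than by a local `Supplier`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Supplier;

class MapUsingServiceModel {
    // Hypothetical helper modelling the contract: one service instance, many items.
    static <S, T, R> List<R> mapUsingService(Supplier<S> serviceFactory,
                                             BiFunction<? super S, ? super T, ? extends R> mapFn,
                                             List<T> items) {
        S service = serviceFactory.get(); // created once, not per item
        List<R> result = new ArrayList<>(items.size());
        for (T item : items) {
            result.add(mapFn.apply(service, item)); // no intermediate wrapper object
        }
        return result;
    }

    // Hypothetical dependency used as the service.
    static final class Dep {
        String bar(String s) { return "mod: " + s; }
    }

    public static void main(String[] args) {
        BiFunction<Dep, String, String> mapFn = (dep, item) -> dep.bar(item);
        System.out.println(mapUsingService(Dep::new, mapFn, List.of("the", "quick", "brown", "fox")));
    }
}
```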

If you want to avoid that, another option is to write a custom ServiceFactory whose service performs the mapping itself. This way, you can rewrite your mapping function as a service class and have the dependency injected through its constructor.
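Why this works: what gets serialized and shipped is only the recipe for building the service, not the service or its dependency. A plain-Java sketch under that assumption follows. `SerializableSupplier` is a hypothetical stand-in for the ServiceFactory (in Jet 4.x one would typically build it with something like `ServiceFactories.sharedService(...)`; check the docs for your version), and `LookupClient` and `WordMapper` are hypothetical.

```java
import java.io.*;
import java.util.function.Supplier;

// Hypothetical serializable factory type; in Jet a ServiceFactory plays this role.
interface SerializableSupplier<T> extends Supplier<T>, Serializable {}

// Hypothetical non-serializable dependency.
class LookupClient {
    String bar(String s) { return "mod: " + s; }
}

// The mapping function rewritten as a service: the dependency arrives via the
// constructor, so the class can be used the same way inside or outside Jet.
class WordMapper {
    private final LookupClient client;

    WordMapper(LookupClient client) { this.client = client; }

    String map(String s) { return client.bar(s); }
}

class ShipTheRecipe {
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T o) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // The factory captures nothing, so it serializes even though
        // neither WordMapper nor LookupClient is Serializable.
        SerializableSupplier<WordMapper> factory = () -> new WordMapper(new LookupClient());
        SerializableSupplier<WordMapper> shipped = roundTrip(factory);
        WordMapper mapper = shipped.get(); // built on the "receiving" side
        System.out.println(mapper.map("fox")); // mod: fox
    }
}
```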

Regarding a static container: I think that would be possible to implement, but it would require some changes in core. Something similar was done for the Metrics class, which also works in a static context. There's also this related issue:

https://github.com/hazelcast/hazelcast-jet/issues/954

If you are interested in contributing I can give you some pointers.
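For comparison, the "global static dependencies object" the question mentions could look roughly like the sketch below. `DependencyRegistry` and `Suffixer` are hypothetical, not Jet classes. The assumption is that every cluster member registers the same dependencies at startup, and function code then looks them up by key instead of capturing them.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical JVM-wide registry; each cluster member would have to populate its own copy.
final class DependencyRegistry {
    private static final Map<Class<?>, Object> DEPENDENCIES = new ConcurrentHashMap<>();

    private DependencyRegistry() {}

    static <T> void register(Class<T> type, T instance) {
        DEPENDENCIES.put(type, instance);
    }

    static <T> T get(Class<T> type) {
        Object instance = DEPENDENCIES.get(type);
        if (instance == null) {
            throw new IllegalStateException("No dependency registered for " + type.getName());
        }
        return type.cast(instance);
    }
}

// Hypothetical dependency.
class Suffixer {
    String bar(String s) { return "mod: " + s; }
}

class StaticLookupDemo {
    // Captures nothing, so a serializable version of it would not drag the
    // dependency along; the dependency is resolved wherever the function runs.
    static final Function<String, String> mapFn = s -> DependencyRegistry.get(Suffixer.class).bar(s);

    public static void main(String[] args) {
        DependencyRegistry.register(Suffixer.class, new Suffixer()); // e.g. at member startup
        System.out.println(mapFn.apply("fox")); // mod: fox
    }
}
```

The trade-off is the usual one for global state: the mapper's dependencies become invisible in its signature, and tests must remember to populate the registry.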

Answer 2

If your mapFn is inside a bean then you can just use it as your service:

Pipeline p = Pipeline.create();
p.readFrom(TestSources.items("the", "quick", "brown", "fox"))
 .mapUsingService(JetSpringServiceFactories.bean(MyService.class), MyService::mapFn)
 .writeTo(Sinks.logger());
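This works because `MyService::mapFn` is an unbound instance-method reference: its first parameter is the receiver, so it already has the (service, item) -> result shape that mapUsingService expects. A plain-Java illustration with `java.util.function.BiFunction` (Jet's own parameter type is, as far as I know, a serializable BiFunction variant, not used here); `WordService` is a hypothetical stand-in for MyService.

```java
import java.util.function.BiFunction;

// Hypothetical stand-in for MyService.
class WordService {
    private final String prefix;

    WordService(String prefix) { this.prefix = prefix; }

    String mapFn(String s) { return prefix + s; }
}

class MethodRefShape {
    public static void main(String[] args) {
        // Unbound reference: the WordService instance becomes the first argument.
        BiFunction<WordService, String, String> asBiFunction = WordService::mapFn;
        // Equivalent explicit lambda.
        BiFunction<WordService, String, String> asLambda = (svc, s) -> svc.mapFn(s);

        WordService svc = new WordService("mod: ");
        System.out.println(asBiFunction.apply(svc, "fox")); // mod: fox
        System.out.println(asLambda.apply(svc, "fox"));     // mod: fox
    }
}
```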

Full self-contained example, where the pipeline calls mapFn on MyService, which uses its dependency to do the mapping:

@SpringBootApplication
public class TutorialApplication implements InitializingBean {

    @Autowired
    JetInstance jetInstance;

    public static void main(String[] args) {
        SpringApplication.run(TutorialApplication.class, args);
    }

    @Override
    public void afterPropertiesSet() {

        Pipeline p = Pipeline.create();
        p.readFrom(TestSources.items("the", "quick", "brown", "fox"))
         .mapUsingService(JetSpringServiceFactories.bean(MyService.class), MyService::mapFn)
         .writeTo(Sinks.logger());

        jetInstance.newJob(p);
    }

    @Component
    public static class MyService {

        @Autowired
        MyDependency foo;

        public String mapFn(String s) {
            return foo.bar(s);
        }
    }

    @Component
    public static class MyDependency {

        public String bar(String s) {
            return "mod: " + s;
        }
    }
}
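Since the question also wants to use the mapper outside Jet, one design choice worth considering is constructor injection instead of the @Autowired field: Spring can wire a single-constructor @Component through that constructor, and non-Spring code can simply call it. A plain-Java sketch without the Spring annotations (so it runs standalone); `PlainMyService` and `PlainMyDependency` are hypothetical mirrors of the classes above.

```java
import java.util.List;
import java.util.stream.Collectors;

// Mirrors MyDependency from the answer, minus @Component.
class PlainMyDependency {
    String bar(String s) { return "mod: " + s; }
}

// Mirrors MyService, but takes its dependency through the constructor.
// With @Component on it, Spring would inject through this constructor;
// outside Spring it is just `new PlainMyService(dep)`.
class PlainMyService {
    private final PlainMyDependency foo;

    PlainMyService(PlainMyDependency foo) { this.foo = foo; }

    String mapFn(String s) { return foo.bar(s); }
}

class OutsideJetDemo {
    static List<String> mapAll(PlainMyService service, List<String> items) {
        // Same mapFn as the pipeline uses, but bound to an instance here.
        return items.stream().map(service::mapFn).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        PlainMyService service = new PlainMyService(new PlainMyDependency());
        System.out.println(mapAll(service, List.of("the", "quick", "brown", "fox")));
    }
}
```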