0
votes

public static void main(String[] args) throws Exception {

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.BATCH);

    ParameterTool parameters = ParameterTool.fromArgs(args);
    String ftpUri = "ftp://data-injest:12345@ftp-server:21/input/" + parameters.get("input-file-name");
    String fileUri = parameters.get("ftp").toUpperCase(Locale.ROOT).equals("TRUE")?ftpUri:localUri;
    MapFunction<String,Tuple2<Long,Collection<Some>>> mapFunction = { some code };

    SomeSink sink = new SomeSink();

    env.readTextFile(fileUri,"UTF-8")
            .map(mapFunction)
            .keyBy(tuple2 -> tuple2.f0)
            .reduce((tuple2, t1) -> {
                some-logic-including-loggers
        }).addSink(sink);
    env.execute("OPIS-PRICE-FEED-with-" + parameters.get("input-file-name"));

}

Which node executes the logic , eg ftpUri definitions above.

  1. I have tried to attach debuger to both job manager and task manager with breakpoints but I dont see those lines enabled.
  2. If a logger statement is added in the same section , which node logger would contain it.
1

1 Answers

0
votes

That setup code is executed in the client, and not in the job manager or task managers.