I wrote the following sample DAG to understand aggregation, but the slidingWindow vertex does not seem to emit any records.

I'm not sure what is going wrong here.

public DAG buildDAG() {
    DAG dag = new DAG();
    SlidingWindowPolicy winPolicy = slidingWinPolicy(SLIDING_WINDOW_LENGTH_MILLIS, SLIDE_STEP_MILLIS);

    Vertex source = dag.newVertex("source", SourceProcessors.streamRemoteMapP(getRemoteSourceName(),
            getClientConfig(), START_FROM_OLDEST, WatermarkGenerationParams.noWatermarks()));

    Vertex slidingWindow = dag.newVertex("aggregate-to-sliding-win",
            aggregateToSlidingWindowP(
                    singletonList((v) -> getUserID((Entry<String, CacheEntry<Record>>) v)),
                    singletonList((v) -> getTimeStamp((Entry<String, CacheEntry<Record>>) v)),
                    TimestampKind.EVENT,
                    winPolicy,
                    counting(),
                    TimestampedEntry::new));

    Vertex peekOP = dag.newVertex("peekOP", DiagnosticProcessors.writeLoggerP());
    Vertex peekOP1 = dag.newVertex("peekOP1", DiagnosticProcessors.writeLoggerP());

    Vertex sink = dag.newVertex("sink", SinkProcessors.writeFileP("c:\\data\\op1.txt"));

    return dag
            .edge(between(source, peekOP))
            .edge(between(peekOP, slidingWindow))
            .edge(between(slidingWindow, peekOP1))
            .edge(between(peekOP1, sink));
}

I also wrote the equivalent aggregation with the Pipeline API. That version works well and writes the records to the text file.

private Pipeline buildPipeline() {
    Pipeline p = Pipeline.create();

    p.drawFrom(Sources.<String, CacheEntry<AuditLogRecord>>remoteMapJournal("cache_AuditLog", getClientConfig(), START_FROM_OLDEST))
            .addTimestamps((v) -> getTimeStamp(v), 3000)
            .peek()
            .groupingKey((v) -> Tuple2.tuple2(getUserID(v), getTranType(v)))
            .window(WindowDefinition.sliding(SLIDING_WINDOW_LENGTH_MILLIS, SLIDE_STEP_MILLIS))
            .aggregate(counting())
            .map((v) -> getMapKey(v))
            .drainTo(Sinks.files("c:\\data\\op.txt"));
    return p;
}

Could you please help me correct the DAG definition?

1 Answer

There are multiple problems:

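  1. The source uses WatermarkGenerationParams.noWatermarks(). Windowing processors emit a window's result only after a watermark has passed the end of that window, so without watermarks they produce no output. Use wmGenParams((v) -> getTimeStamp(v), limitingLag(3000), emitByFrame(winPolicy), -1) instead.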

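  2. DiagnosticProcessors.writeLoggerP() is a sink: it receives items but doesn't emit any, so nothing reaches the vertices downstream of peekOP. To peek at a vertex, wrap its processor supplier in peekInputP( /* original supplier */ ) or peekOutputP( /* original supplier */ ) instead of adding a separate vertex.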

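  3. The edge to slidingWindow must be distributed and partitioned by the grouping key, for example by chaining .partitioned( /* key function */ ).distributed() on the edge. Without these you'll get results, but incorrect ones, because items with the same key can end up in different processor instances.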
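
To make points 1 and 3 more concrete, here is a toy model in plain Java. It is not Jet's implementation and uses no Jet classes: the names WindowSketch, Counter and run are made up for illustration, and it uses a 10 ms tumbling window (a sliding window whose step equals its length) to keep the arithmetic simple. Each Counter stands in for one parallel instance of a windowing processor. It buffers counts and emits a window only when a watermark reaches the window's end.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WindowSketch {
    // Hypothetical window length, chosen only for this illustration.
    static final long WINDOW_MILLIS = 10;

    /** One parallel instance of a toy windowed counter (not a Jet processor). */
    static final class Counter {
        // window end -> (key -> count), ordered by window end
        private final TreeMap<Long, Map<String, Long>> windows = new TreeMap<>();

        /** Buffers the event in its window; nothing is emitted here. */
        void accept(String key, long timestamp) {
            long windowEnd = (timestamp / WINDOW_MILLIS + 1) * WINDOW_MILLIS;
            windows.computeIfAbsent(windowEnd, e -> new TreeMap<>()).merge(key, 1L, Long::sum);
        }

        /** Emits and discards every window whose end is at or before the watermark. */
        List<String> watermark(long wm) {
            List<String> out = new ArrayList<>();
            Iterator<Map.Entry<Long, Map<String, Long>>> it = windows.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Map<String, Long>> w = it.next();
                if (w.getKey() > wm) {
                    break; // later windows are still open
                }
                w.getValue().forEach((key, count) -> out.add(w.getKey() + ":" + key + "=" + count));
                it.remove();
            }
            return out;
        }
    }

    /** Routes four events to two instances, then optionally sends a watermark. */
    static List<String> run(boolean partitionByKey, boolean sendWatermark) {
        String[] keys = {"alice", "bob", "alice", "alice"};
        long[] timestamps = {1, 2, 3, 4};
        Counter[] instances = {new Counter(), new Counter()};
        for (int i = 0; i < keys.length; i++) {
            int target = partitionByKey
                    ? Math.floorMod(keys[i].hashCode(), instances.length) // same key -> same instance
                    : i % instances.length;                               // round-robin, key ignored
            instances[target].accept(keys[i], timestamps[i]);
        }
        List<String> out = new ArrayList<>();
        if (sendWatermark) {
            for (Counter c : instances) {
                out.addAll(c.watermark(10));
            }
        }
        Collections.sort(out);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(true, false)); // []
        System.out.println(run(true, true));  // [10:alice=3, 10:bob=1]
        System.out.println(run(false, true)); // [10:alice=1, 10:alice=2, 10:bob=1]
    }
}
```

Without a watermark nothing is emitted, which matches the symptom in the question. With a watermark but without partitioning by key, the count for alice is split across the two instances, so output appears but the counts are wrong.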

The DAG API is intended for advanced use cases that are not possible with the Pipeline API. With each Jet release the need to use the DAG API diminishes. As your samples show, Pipeline API code is more concise and easier to write.