
I have an Apache Beam pipeline written with the Go SDK, shown below. The pipeline has three steps: textio.Read, CountLines, and ProcessLines. The ProcessLines step takes around 10 seconds per line; for brevity, I replaced the real work with a Sleep call.

I am running the pipeline with 20 workers. I expected the 20 workers to run in parallel: textio.Read would read 20 lines from the file, and ProcessLines would handle them in 20 parallel executions, finishing in about 10 seconds. The pipeline does not behave that way. Instead, textio.Read reads one line from the file, passes it to the next step, and waits until ProcessLines finishes its 10 seconds of work. There is no parallelism, and only one line from the file is in flight in the pipeline at any time. Could you please explain what I am doing wrong? How should I change the code to get the parallelism described above?

package main

import (
    "context"
    "flag"
    "time"

    "github.com/apache/beam/sdks/go/pkg/beam"
    "github.com/apache/beam/sdks/go/pkg/beam/io/textio"
    "github.com/apache/beam/sdks/go/pkg/beam/log"
    "github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
)

// metrics to be monitored
var (
    input         = flag.String("input", "", "Input file (required).")
    numberOfLines = beam.NewCounter("extract", "numberOfLines")
    lineLen       = beam.NewDistribution("extract", "lineLenDistro")
)

func countLines(ctx context.Context, line string) string {
    lineLen.Update(ctx, int64(len(line)))
    numberOfLines.Inc(ctx, 1)

    return line
}

func processLines(ctx context.Context, line string) {
    time.Sleep(10 * time.Second)
}

func CountLines(s beam.Scope, lines beam.PCollection) beam.PCollection {
    s = s.Scope("Count Lines")

    return beam.ParDo(s, countLines, lines)
}

func ProcessLines(s beam.Scope, lines beam.PCollection) {
    s = s.Scope("Process Lines")

    beam.ParDo0(s, processLines, lines)
}

func main() {
    // If beamx or Go flags are used, flags must be parsed first.
    flag.Parse()
    // beam.Init() is an initialization hook that must be called on startup. On
    // distributed runners, it is used to intercept control.
    beam.Init()

    // Input validation is done as usual. Note that it must be after Init().
    if *input == "" {
        log.Fatal(context.Background(), "No input file provided")
    }

    p := beam.NewPipeline()
    s := p.Root()

    l := textio.Read(s, *input)
    lines := CountLines(s, l)
    ProcessLines(s, lines)

    // Concept #1: The beamx.Run convenience wrapper allows a number of
    // pre-defined runners to be used via the --runner flag.
    if err := beamx.Run(context.Background(), p); err != nil {
        log.Fatalf(context.Background(), "Failed to execute job: %v", err.Error())
    }
}

EDIT:

After an answer suggested that the problem might be caused by fusion, I changed the relevant part of the code, but it still does not work as expected.

Now the first and second steps run in parallel, but the third step, ProcessLines, still does not. These are the only changes I made. Can anyone tell me what the problem is?

func AddRandomKey(s beam.Scope, col beam.PCollection) beam.PCollection {
    return beam.ParDo(s, addRandomKeyFn, col)
}

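// addRandomKeyFn pairs each element with a random key.
// Requires "math/rand" in the import block.
func addRandomKeyFn(elm beam.T) (int, beam.T) {
    return rand.Int(), elm
}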

func countLines(ctx context.Context, _ int, lines func(*string) bool, emit func(string)) {
    var line string
    for lines(&line) {
        lineLen.Update(ctx, int64(len(line)))
        numberOfLines.Inc(ctx, 1)
        emit(line)
    }
}

func processLines(ctx context.Context, _ int, lines func(*string) bool) {
    var line string
    for lines(&line) {
        time.Sleep(10 * time.Second)
        // numberOfLinesProcess is a counter whose declaration is not shown here.
        numberOfLinesProcess.Inc(ctx, 1)
    }
}

func CountLines(s beam.Scope, lines beam.PCollection) beam.PCollection {
    s = s.Scope("Count Lines")
    keyed := AddRandomKey(s, lines)
    grouped := beam.GroupByKey(s, keyed)

    return beam.ParDo(s, countLines, grouped)
}

func ProcessLines(s beam.Scope, lines beam.PCollection) {
    s = s.Scope("Process Lines")
    keyed := AddRandomKey(s, lines)
    grouped := beam.GroupByKey(s, keyed)

    beam.ParDo0(s, processLines, grouped)
}

1 Answer


Many advanced runners of MapReduce-type pipelines fuse stages that can be run in memory together. Apache Beam and Dataflow are no exception.

What's happening here is that the three steps of your pipeline are fused and run on the same machine. Unfortunately, the Go SDK also does not currently support splitting the Read transform.

To achieve parallelism in the third transform, you can break the fusion between Read and ProcessLines. You can do that by adding a random key to your lines, followed by a GroupByKey transform.

In Python, it would be:

import random
import apache_beam as beam

(p | beam.io.ReadFromText(...)
   | CountLines()
   | beam.Map(lambda x: (random.randint(0, 1000), x))
   | beam.GroupByKey()
   | beam.FlatMap(lambda kv: kv[1])  # Discard the key and emit the grouped values
   | ProcessLines())

This would allow you to parallelize ProcessLines.