
I have an unbounded Dataflow pipeline that reads from Pub/Sub, applies a ParDo, and writes to Cassandra. Since it applies only ParDo transformations, I am using the default global window with the default trigger, even though the source is unbounded.

In a pipeline like this, how should I manage the connection to Cassandra?

Currently I am creating it in startBundle like this:

private class CassandraWriter<T> extends DoFn<T, Void> {
  private final String[] hosts;
  private final int port;
  private final String keyspace;

  private transient Cluster cluster;
  private transient Session session;
  private transient MappingManager mappingManager;

  public CassandraWriter(String[] hosts, int port, String keyspace) {
    this.hosts = hosts;
    this.port = port;
    this.keyspace = keyspace;
  }

  @Override
  public void startBundle(Context c) {
    this.cluster = Cluster.builder()
        .addContactPoints(hosts)
        .withPort(port)
        .withoutMetrics()
        .withoutJMXReporting()
        .build();
    this.session = cluster.connect(keyspace);
    this.mappingManager = new MappingManager(session);
  }

  @Override
  public void processElement(ProcessContext c) throws IOException {
    T element = c.element();
    Mapper<T> mapper = (Mapper<T>) mappingManager.mapper(element.getClass());
    mapper.save(element);
  }

  @Override
  public void finishBundle(Context c) throws IOException {
    session.close();
    cluster.close();
  }
}

However, this way a new connection is created for every bundle, and since bundles in streaming mode can contain as little as a single element, this effectively creates a new connection per element.

Another option is to pass it as a side input like in https://github.com/benjumanji/cassandra-dataflow:

public PDone apply(PCollection<T> input) {
  Pipeline p = input.getPipeline();

  CassandraWriteOperation<T> op = new CassandraWriteOperation<T>(this);

  Coder<CassandraWriteOperation<T>> coder =
    (Coder<CassandraWriteOperation<T>>)SerializableCoder.of(op.getClass());

  PCollection<CassandraWriteOperation<T>> opSingleton =
    p.apply(Create.<CassandraWriteOperation<T>>of(op)).setCoder(coder);

  final PCollectionView<CassandraWriteOperation<T>> opSingletonView =
    opSingleton.apply(View.<CassandraWriteOperation<T>>asSingleton());

  PCollection<Void> results = input.apply(ParDo.of(new DoFn<T, Void>() {
    @Override
    public void processElement(ProcessContext c) throws Exception {
       // use the side input here
    }
  }).withSideInputs(opSingletonView));

  PCollectionView<Iterable<Void>> voidView = results.apply(View.<Void>asIterable());

  opSingleton.apply(ParDo.of(new DoFn<CassandraWriteOperation<T>, Void>() {
    private static final long serialVersionUID = 0;

    @Override
    public void processElement(ProcessContext c) {
      CassandraWriteOperation<T> op = c.element();
      op.finalize();
    }

  }).withSideInputs(voidView));

  return PDone.in(p);
}

However, this way I have to use windowing, since View.<Void>asIterable() applies a GroupByKey, and a GroupByKey on an unbounded PCollection requires a non-default window or trigger.

In general, how should a PTransform that writes from an unbounded PCollection to an external database keep its connection to the database?


2 Answers


You are correctly observing that typical bundle sizes in the streaming/unbounded case are smaller than in the batch/bounded case. The actual bundle size depends on many parameters, and sometimes a bundle may contain a single element.

One way of solving this problem would be to use a pool of connections per worker, stored in static state of your DoFn. You should be able to initialize it during the first call to startBundle, and use it across bundles. Alternatively, you can create a connection on demand and release it to the pool for reuse when no longer necessary.
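A minimal sketch of that per-worker pool idea. The class and method names (ConnectionPool, acquire, release) are illustrative, and a generic type parameter stands in for a real Cassandra Session; only the pooling logic is the point:

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;

// A tiny worker-wide pool: connections are created on demand and
// returned after use, so later bundles reuse them instead of paying
// a fresh connect() cost each time.
final class ConnectionPool<C> {
  private final ConcurrentLinkedQueue<C> idle = new ConcurrentLinkedQueue<>();
  private final Supplier<C> factory;

  ConnectionPool(Supplier<C> factory) {
    this.factory = factory;
  }

  // Reuse an idle connection if one exists, otherwise open a new one.
  C acquire() {
    C c = idle.poll();
    return (c != null) ? c : factory.get();
  }

  // Hand the connection back for reuse by the next bundle on this worker.
  void release(C c) {
    idle.offer(c);
  }
}
```

A DoFn would hold such a pool in a static field, call acquire() in startBundle and release() in finishBundle; ConcurrentLinkedQueue keeps the pool itself thread-safe across concurrently executing bundles.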

You should make sure the static state is thread-safe, and that you aren't making any assumptions about how Dataflow manages bundles.


As Davor Bonaci suggested, using a static variable solved the problem.

public class CassandraWriter<T> extends DoFn<T, Void> {
  private static final Logger log = LoggerFactory.getLogger(CassandraWriter.class);

  // Prevent multiple threads from creating multiple cluster connections in parallel.
  // Note: transient is omitted here because it has no effect on static fields;
  // static fields are never serialized with the DoFn anyway.
  private static final Object lock = new Object();
  private static Cluster cluster;
  private static Session session;
  private static MappingManager mappingManager;

  private final String[] hosts;
  private final int port;
  private final String keyspace;

  public CassandraWriter(String[] hosts, int port, String keyspace) {
    this.hosts = hosts;
    this.port = port;
    this.keyspace = keyspace;
  }

  @Override
  public void startBundle(Context c) {
    synchronized (lock) {
      if (cluster == null) {
        cluster = Cluster.builder()
            .addContactPoints(hosts)
            .withPort(port)
            .withoutMetrics()
            .withoutJMXReporting()
            .build();
        session = cluster.connect(keyspace);
        mappingManager = new MappingManager(session);
      }
    }
  }

  @Override
  public void processElement(ProcessContext c) throws IOException {
    T element = c.element();
    Mapper<T> mapper = (Mapper<T>) mappingManager.mapper(element.getClass());
    mapper.save(element);
  }
}
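The "initialize at most once per JVM" guarantee that the synchronized block above relies on can also be factored into a small helper. This is a generic sketch of that idiom (the Lazy class is my illustration, not part of the original answer); it uses double-checked locking with a volatile field so the factory runs exactly once no matter how many DoFn instances or threads race to initialize it:

```java
import java.util.function.Supplier;

// Thread-safe lazy singleton holder: get() runs the factory at most
// once per JVM and returns the same instance to every caller.
final class Lazy<T> {
  private final Object lock = new Object();
  private final Supplier<T> factory;
  private volatile T value;

  Lazy(Supplier<T> factory) {
    this.factory = factory;
  }

  T get() {
    T v = value;                 // volatile read: fast path after init
    if (v == null) {
      synchronized (lock) {
        v = value;               // re-check under the lock
        if (v == null) {
          v = factory.get();     // runs at most once
          value = v;
        }
      }
    }
    return v;
  }
}
```

With a holder like this, processElement could fetch the shared session via a getter instead of relying on startBundle ordering, which keeps the DoFn free of assumptions about how Dataflow schedules bundles.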