
I am creating a simple TCP server using Vert.x and data is sent between a client and a server in the form of compressed packets.

I would like to use Verticles to create something of this nature (where [Something] is a Verticle and arrows show flow of data):

Buffer In -> [Decompress Packet] -> [Parse Packet] -> Reply to NetSocket

The problem is that I am unsure how to carry the NetSocket from one Verticle (the result from Decompress Packet) to the next. I can of course send the result from Decompress Packet to the Parse Packet Verticle, but when Parse Packet receives this data it has no reference to the NetSocket, so it cannot reply to the sender.

Essentially, I need to carry the NetSocket through the event bus so that once the final Verticle is reached, it can then reply to the data.

2
Have a think if you really need to split this over many verticles? There is the temptation to create verticles like one might create layers / classes, but this comes with added management and performance penalties. Having learnt from this, I now use a single Verticle to bootstrap a microservice and am happy with the results. If it's just accepting a TCP connection and doing some data transformation I'm sure a single Verticle will suffice. - Will
Hm, okay thanks. That's useful to know actually. In my head I assumed that more Verticles will help spread the load across the machine. Is there anything similar to ByteBuf from Netty? I want to step through my Buffer one datatype at a time and would rather not create a wrapper. - Ronald Rue
Vert.x has a Buffer class that wraps ByteBuf if that helps. If you're after parallelism, you can create more than one instance of the same Verticle which will allow you to spread the work over multiple cores. See vertx.io/docs/vertx-core/java/#_run_verticles - Will

2 Answers

1
votes

As was said in the comments, you probably want a set of handlers instead of Verticles. Look, for example, at how vertx-web handlers work. A handler is a simple lambda that performs one small task and can decide either to pass the work to the next handler or to abort the execution by calling a failure method.

A very basic implementation is just to keep a List of lambdas (Java functional interfaces) that you add to, and once a socket is received you iterate over the list.
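
The list-of-lambdas idea could look roughly like the sketch below. It is plain Java with no Vert.x dependency, and every name in it (PacketPipeline, add, run, the "zip:" prefix) is made up for illustration; String stands in for a Vert.x Buffer and the two steps are placeholders for real decompress and parse logic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical sketch: String stands in for a Vert.x Buffer, and the
// steps are placeholders for real decompress/parse logic.
class PacketPipeline {
  private final List<UnaryOperator<String>> steps = new ArrayList<>();

  // Appends a step and returns this pipeline so calls can be chained.
  PacketPipeline add(UnaryOperator<String> step) {
    steps.add(step);
    return this;
  }

  // Runs every step in order, feeding each step the previous result.
  // The caller still holds its socket and writes the returned value to it.
  String run(String input) {
    String current = input;
    for (UnaryOperator<String> step : steps) {
      current = step.apply(current);
    }
    return current;
  }

  public static void main(String[] args) {
    PacketPipeline pipeline = new PacketPipeline()
        .add(packet -> packet.replace("zip:", ""))  // placeholder "decompress"
        .add(packet -> "parsed(" + packet + ")");   // placeholder "parse"
    // In a real server the result would be written to the socket instead.
    System.out.println(pipeline.run("zip:hello"));
  }
}
```

Because all steps run inside the socket's own data handler, the socket never has to leave the Verticle that accepted it.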

If you need to perform async IO in your handlers, you cannot use a simple iterator; you need to iterate asynchronously. A basic async iterator wrapper could be:

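import io.vertx.core.Handler;
import java.util.Iterator;

// Walks an Iterator one element at a time. The subclass's handle() receives
// each element and must call next() when its (possibly async) work is done.
abstract class AsyncIterator<T> implements Handler<T> {

  private final Iterator<T> iterator;
  private boolean end = false;

  public AsyncIterator(Iterable<T> iterable) {
    this(iterable.iterator());
  }

  public AsyncIterator(Iterator<T> iterator) {
    this.iterator = iterator;
    // Starts iterating immediately, so handle() runs during construction.
    next();
  }

  // False only once the underlying iterator has been exhausted.
  public final boolean hasNext() {
    return !end;
  }

  // Hands the next element to handle(), or null after the last one.
  public final void next() {
    if (iterator.hasNext()) {
      handle(iterator.next());
    } else {
      end = true;
      handle(null);
    }
  }

  public final void remove() {
    iterator.remove();
  }
}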

and you just need to use it like:

new AsyncIterator<Object>(keys) {
  @Override
  public void handle(Object key) {
    if (hasNext()) {
      // here your handler code...
      // once it is complete your handler needs to call:
      next();
    } else {
      // no more entries to iterate...
      // close your socket?
    }
  }
};
1
votes

Actually, you don't have to pass NetSockets between verticles.

In Vert.x, every socket automatically registers a handler on the event bus, and you can use that for your scenario. From the documentation:

Every socket automatically registers a handler on the event bus, and when any buffers are received in this handler, it writes them to itself.

This enables you to write data to a socket which is potentially in a completely different verticle or even in a different Vert.x instance by sending the buffer to the address of that handler.

The address of the handler is given by writeHandlerID

Since writeHandlerID is a plain string, it is easy to send it from verticle1 to verticle2 along with the data. In verticle2, call eventbus.send(writeHandlerID, [something you want to reply]). That's it.
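
To make the flow concrete, here is a plain-Java stand-in for that pattern. It is not Vert.x code: MiniBus, WriteHandlerDemo, the "decompress" and "parse" addresses, and the "replyTo|payload" message format are all invented for this sketch. MiniBus imitates the event bus, and register() imitates what the socket does when it exposes its writeHandlerID.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;

// Plain-Java stand-in, NOT the Vert.x API: imitates an event bus that
// delivers a String body to whichever handler owns the address.
class MiniBus {
  private final Map<String, Consumer<String>> handlers = new HashMap<>();

  // Imitates a socket registering its write handler: returns a fresh address.
  String register(Consumer<String> handler) {
    String address = UUID.randomUUID().toString();
    handlers.put(address, handler);
    return address;
  }

  void consumer(String address, Consumer<String> handler) {
    handlers.put(address, handler);
  }

  // Delivers the body to the handler at that address, if there is one.
  void send(String address, String body) {
    Consumer<String> handler = handlers.get(address);
    if (handler != null) {
      handler.accept(body);
    }
  }
}

class WriteHandlerDemo {
  // Wires two stages. Each message is "replyTo|payload", so the socket's
  // address travels with the data instead of the socket itself.
  static void wire(MiniBus bus) {
    bus.consumer("decompress", msg -> {
      String[] parts = msg.split("\\|", 2);
      bus.send("parse", parts[0] + "|" + parts[1].replace("zip:", ""));
    });
    bus.consumer("parse", msg -> {
      String[] parts = msg.split("\\|", 2);
      // The last stage replies by sending to the socket's own address.
      bus.send(parts[0], "ack:" + parts[1]);
    });
  }

  public static void main(String[] args) {
    MiniBus bus = new MiniBus();
    wire(bus);
    // Stand-in for the socket: anything sent to this address is "written".
    String writeHandlerId =
        bus.register(reply -> System.out.println("socket writes: " + reply));
    bus.send("decompress", writeHandlerId + "|zip:hello");
  }
}
```

In a real Vert.x application the address would come from the socket's writeHandlerID and the reply would be a Buffer sent with the event bus send call, as described above.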

We have applied this tip in our application.