9
votes

I'm using the Netty library (version 4 from GitHub). It works great in Scala, but I am hoping for my library to be able to use continuation passing style for the asynchronous waiting.

Traditionally with Netty you would do something like this (an example asynchronous connect operation):

//client is a ClientBootstrap
val future:ChannelFuture = client.connect(remoteAddr);
future.addListener(new ChannelFutureListener {
    def operationComplete (f:ChannelFuture) = {
        //here goes the code that happens when the connection is made   
    }
})

If you are implementing a library (which I am) then you basically have three simple options to allow the user of the library to do stuff after the connection is made:

  1. Just return the ChannelFuture from your connect method and let the user deal with it - this doesn't provide much abstraction from netty.
  2. Take a ChannelFutureListener as a parameter of your connect method and add it as a listener to the ChannelFuture.
  3. Take a callback function object as a parameter of your connect method and call that from within the ChannelFutureListener that you create (this would make for a callback-driven style somewhat like node.js)

What I am trying to do is a fourth option; I didn't include it in the count above because it is not simple.

I want to use scala delimited continuations to make the use of the library be somewhat like a blocking library, but it will be nonblocking behind the scenes:

class MyLibraryClient {
    def connect(remoteAddr:SocketAddress) = {
        shift { retrn: (Unit => Unit) => {
                val future:ChannelFuture = client.connect(remoteAddr);
                future.addListener(new ChannelFutureListener {
                    def operationComplete(f:ChannelFuture) = {
                        retrn();
                    }   
                });
            }
        }   
    }
}

Imagine other read/write operations being implemented in the same fashion. The goal of this being that the user's code can look more like this:

reset {
    val conn = new MyLibraryClient();
    conn.connect(new InetSocketAddress("127.0.0.1", 1337));
    println("This will happen after the connection is finished");
}

In other words, the program will look like a simple blocking-style program but behind the scenes there won't be any blocking or threading.

The trouble I'm running into is that I don't fully understand how the typing of delimited continuations work. When I try to implement it in the above way, the compiler complains that my operationComplete implementation actually returns Unit @scala.util.continuations.cpsParam[Unit,Unit => Unit] instead of Unit. I get that there is sort of a "gotcha" in scala's CPS in that you must annotate a shift method's return type with @suspendable, which gets passed up the call stack until the reset, but there doesn't seem to be any way to reconcile that with a pre-existing Java library that has no concept of delimited continuations.

I feel like there really must be a way around this - if Swarm can serialize continuations and jam them over the network to be computed elsewhere, then it must be possible to simply call a continuation from a pre-existing Java class. But I can't figure out how it can be done. Would I have to rewrite entire parts of netty in Scala in order to make this happen?

1
I don't know howto fix the scala stuff but I suggest against your idea. Let me tell you why. But making the user "unaware" of the async nature of your libary you will tell him thats its ok todo "blocking" calls in the listener code. In fact he would not know that he even write his code in a listener. Doing a blocking call in a listener can lead to all kind of problems. The Problem which you will see most of the times is that it "slow" down other io-tasks and so limit the troughput.Norman Maurer
You have a good point, but I disagree. I think that the user of my library, if there even are any besides me, will probably have to understand what reset does to begin with, and thus will understand that the calls are non-blocking. This is really just a way to A) get a deeper understanding of delimited continuations, and B) experiment with writing essentially callback-driven code in a cleaner way.Jeremy

1 Answers

4
votes

I found this explanation of Scala's continuations extremely helpful when I started out. In particular pay attention to the parts where he explains shift[A, B, C] and reset[B, C]. Adding a dummy null as the last statement of operationComplete should help.

Btw, you need to invoke retrn() inside another reset if it may have a shift nested inside it.

Edit: Here is a working example

import scala.util.continuations._
import java.util.concurrent.Executors

object Test {

  val execService = Executors.newFixedThreadPool(2)

  def main(args: Array[String]): Unit = {
    reset {
      val conn = new MyLibraryClient();
      conn.connect("127.0.0.1");
      println("This will happen after the connection is finished");
    }
    println("Outside reset");
  }
}

class ChannelFuture {
  def addListener(listener: ChannelFutureListener): Unit = {
    val future = this
    Test.execService.submit(new Runnable {
      def run(): Unit = {
        listener.operationComplete(future)
      }
    })
  }
}

trait ChannelFutureListener {
  def operationComplete(f: ChannelFuture): Unit
}

class MyLibraryClient {
  def connect(remoteAddr: String): Unit@cps[Unit] = {
    shift {
      retrn: (Unit => Unit) => {
        val future: ChannelFuture = new ChannelFuture()
        future.addListener(new ChannelFutureListener {
          def operationComplete(f: ChannelFuture): Unit = {
            println("operationComplete starts")
            retrn();
            null
          }
        });
      }
    }
  }
}

with a possible output:

Outside reset
operationComplete starts
This will happen after the connection is finished