I have a large video file sharded in a Cassandra table. I am trying to stream it back to the API client using Source streaming.

My service code looks like this:

def getShards(id: String, shards: Int) = {
  def getShardsInternal(shardNo: Int, shards: Future[Array[Byte]]): Future[Array[Byte]] = {
    if (shardNo == 0) shards
    else getShardsInternal(shardNo - 1, shards.flatMap(x => Database.ShardModel.find(id, shardNo)))
  }
  getShardsInternal(shards, Future.successful(Array()))
}

In my Akka HTTP route, I try to build a Source from the returned future, as shown below:

def getAsset = get {
  pathPrefix("asset") {
    parameters('id) { id =>
      complete {
        val f = mediaService.getMetadata(id).flatMap { x =>
          mediaService.getShards(id, x.shards)
        }
        Source.fromFuture(f)
      }
    }
  }
}

I am not sure how Source.fromFuture gets committed to the response. The future I pass in is essentially a chain of flat-mapped futures that are expected to execute sequentially. However, I don't believe this will be sent to the client as a chunked byte stream.

Any pointers on this will be highly appreciated.

EDIT 1: I have been trying to narrow this down further with the following:

get {
  pathPrefix("asset") {
    parameters('id) { id =>
      complete {
        Source.fromFuture {
          Future.successful("Hello".getBytes()).flatMap(x => Future.successful("World".getBytes()))
        }
      }
    }
  }
}

I was expecting this to return

[72,101,108,108,111,32,87,111,114,108,100]

However, I only get the result of the last future as below,

[[87,111,114,108,100]]

Kind regards, Meeraj

1 Answer

Convert your Source[Array[Byte], NotUsed] to a Source[ByteString, NotUsed], and use HttpEntity with ContentTypes:

import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.stream.scaladsl.Source
import akka.util.ByteString

def getAsset = get {
  pathPrefix("asset") {
    parameters('id) { id =>
      val f = mediaService.getMetadata(id).flatMap { x =>
        mediaService.getShards(id, x.shards)
      }
      val source = Source.fromFuture(f).map(ByteString.apply)
      complete(HttpEntity(ContentTypes.`application/octet-stream`, source))
    }
  }
}

Here I'm using application/octet-stream as an example. Since you're streaming a video, you might need to use ContentType.Binary with an appropriate media type. For example:

complete(HttpEntity(ContentType.Binary(MediaTypes.`video/mpeg`), source))

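Addressing your comment and update: it appears that you want to concatenate the results of the Futures in getShards. As you've discovered, flatMap doesn't do that. In shards.flatMap(x => Database.ShardModel.find(id, shardNo)) the value x is never used, so each step discards the previous result and only the last shard survives. Use Future.reduceLeft instead (available from Scala 2.12; on 2.11 the equivalent is Future.reduce):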

def getShards(id: String, shards: Int): Future[Array[Byte]] = {
  val futures = (1 to shards).map(Database.ShardModel.find(id, _))
  Future.reduceLeft(futures)(_ ++ _)
}
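
To see the difference in isolation, here is a minimal sketch that uses only the standard library. The find function is a hypothetical stand-in for Database.ShardModel.find and returns fixed bytes; it assumes Scala 2.12 or later for Future.reduceLeft.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ConcatSketch {
  // Hypothetical stand-in for Database.ShardModel.find: shard 1 is "Hello ", shard 2 is "World".
  def find(shardNo: Int): Future[Array[Byte]] =
    Future.successful((if (shardNo == 1) "Hello " else "World").getBytes("UTF-8"))

  // flatMap sequences the two lookups but ignores the first result,
  // so only the bytes of the second shard are returned.
  def lastOnly(): Future[Array[Byte]] =
    find(1).flatMap(_ => find(2))

  // reduceLeft waits for every future and combines the results in order with ++.
  def combined(): Future[Array[Byte]] =
    Future.reduceLeft(List(find(1), find(2)))(_ ++ _)

  def main(args: Array[String]): Unit = {
    println(new String(Await.result(lastOnly(), 1.second), "UTF-8")) // World
    println(new String(Await.result(combined(), 1.second), "UTF-8")) // Hello World
  }
}
```

Note that reduceLeft here combines futures that have already been started, so in getShards all shard lookups run concurrently and the whole file is assembled in memory before the Source emits anything.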

Alternatively, instead of concatenating the results into a single array, you could redefine getShards to return a Future[List[Array[Byte]]], then create a Source using flatMapConcat:

def getShards(id: String, shards: Int): Future[List[Array[Byte]]] = {
  val futures = (1 to shards).map(Database.ShardModel.find(id, _)).toList
  Future.sequence(futures)
}

def getAsset = get {
  pathPrefix("asset") {
    parameters('id) { id =>
      val f = mediaService.getMetadata(id).flatMap { x =>
        mediaService.getShards(id, x.shards)
      }
      val source =
        Source.fromFuture(f)
              .flatMapConcat(Source.apply)
              .map(ByteString.apply)
      complete(HttpEntity(/* a content type */, source))
    }
  }
}
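
Both variants above wait for every shard before the first byte is sent. If the goal is to fetch and emit one shard at a time, one possible approach is to build the Source from the shard numbers and look up each shard with mapAsync. The following is a rough sketch, not a tested drop-in: findShard is a hypothetical stand-in for Database.ShardModel.find, and the code assumes Akka 2.6 or later, where an implicit ActorSystem is enough to run a stream.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import akka.util.ByteString

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object ShardStreamSketch {
  // Hypothetical stand-in for Database.ShardModel.find: returns "<id>-<shardNo>;" as bytes.
  def findShard(id: String, shardNo: Int): Future[Array[Byte]] =
    Future.successful(s"$id-$shardNo;".getBytes("UTF-8"))

  // One stream element per shard. mapAsync(1) keeps a single lookup in flight
  // and emits results in shard order, driven by downstream demand.
  def shardSource(id: String, shards: Int): Source[ByteString, NotUsed] =
    Source(1 to shards)
      .mapAsync(parallelism = 1)(findShard(id, _))
      .map(ByteString(_))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("shard-sketch")
    try {
      // Fold the stream back into one ByteString only to inspect the result.
      val all = Await.result(
        shardSource("video", 3).runFold(ByteString.empty)(_ ++ _),
        5.seconds
      )
      println(all.utf8String) // video-1;video-2;video-3;
    } finally {
      system.terminate()
    }
  }
}
```

In the route, a source like this could be passed to HttpEntity in the same way as above, for example inside onSuccess(mediaService.getMetadata(id)) { x => ... } so that the shard count is known before the entity is built.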