1
votes

I am trying to stream data from Mongodb using reactivemongo-akkastream 0.12.1 and return the result into a CSV stream in one of the routes (using Akka-http). I did implement that following the exemple here:

http://doc.akka.io/docs/akka-http/10.0.0/scala/http/routing-dsl/source-streaming-support.html#simple-csv-streaming-example

and it looks working fine.

The only problem I am facing now is how to add the headers to the output CSV file. Any ideas?

Thanks

1

1 Answers

5
votes

Aside from the fact that that example isn't really a robust method of generating CSV (doesn't provide proper escaping) you'll need to rework it a bit to add headers. Here's what I would do:

  1. make a Flow to convert a Source[Tweet] to a source of CSV rows, e.g. a Source[List[String]]
  2. concatenate it to a source containing your headers as a single List[String]
  3. adapt the marshaller to render a source of rows rather than tweets

Here's some example code:

case class Tweet(uid: String, txt: String)

def getTweets: Source[Tweet, NotUsed] = ???

val tweetToRow: Flow[Tweet, List[String], NotUsed] =
  Flow[Tweet].map { t =>
    List(
      t.uid,
      t.txt.replaceAll(",", "."))
  }

// provide a marshaller from a row (List[String]) to a ByteString
implicit val tweetAsCsv = Marshaller.strict[List[String], ByteString] { row =>
  Marshalling.WithFixedContentType(ContentTypes.`text/csv(UTF-8)`, () =>
    ByteString(row.mkString(","))
  )
}

// enable csv streaming
implicit val csvStreaming = EntityStreamingSupport.csv()

val route = path("tweets") {
  val headers = Source.single(List("uid", "text"))
  val tweets: Source[List[String], NotUsed] = getTweets.via(tweetToRow)
  complete(headers.concat(tweets))
}

Update: if your getTweets method returns a Future you can just map over its source value and prepend the headers that way, e.g:

val route = path("tweets") {
  val headers = Source.single(List("uid", "text"))
  val rows: Future[Source[List[String], NotUsed]] = getTweets
      .map(tweets => headers.concat(tweets.via(tweetToRow)))
  complete(rows)
}