I have methods that fetch tweets by user ID, built with Twitter4J and Akka Streams. I want to expose an API where the user sends a GET request and receives a set of tweets. The request would look like this: http://localhost:8080/tweets/534563976

So far the route only returns an OK status. The tweets themselves are printed to my console, and I do not know how to return them to a client such as Postman in response to the GET request. My API route looks like this:

trait ApiRoute extends Databases {
  val twitterStream = TwitterStreamFilters.configureTwitterStream()
  val counter = new Counter
  twitterStream.addListener(counter)

  val routes = pathPrefix("tweets") {
    pathSingleSlash {
      complete("hello")
    }
  } ~
    pathPrefix("tweets") {
      pathPrefix(LongNumber) {
        userId =>
          get {
            onSuccess(Future.successful(TwitterStreamFilters.filterTwitterStreamByUserID(twitterStream, Array(userId)))) {
              complete(StatusCodes.OK)
            }
          }
      }
    }
}

I would like to change it to something like the snippet below, but it fails to compile with the error TypeMismatch, expected: ToResponseMarshallable, actual: RequestContext:

onSuccess(Future.successful(TwitterStreamFilters.filterTwitterStreamByUserID(twitterStream, Array(userId)))) {
  result => complete(result)
}

This is the method that filters tweets:

  def filterTwitterStreamByUserID(twitterStream: TwitterStream, userId: Array[Long]) = {
    twitterStream.filter(new FilterQuery().follow(userId))
  }

Here is my code to acquire a stream of tweets:

class Counter extends StatusAdapter{

  implicit val system = ActorSystem("EmojiTrends")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher
  implicit val LoggingAdapter =
    Logging(system, classOf[Counter])

  val overflowStrategy = OverflowStrategy.backpressure
  val bufferSize = 1000
  val statusSource = Source.queue[Status](
    bufferSize,
    overflowStrategy
  )

  val sink: Sink[Any, Future[Done]] = Sink.foreach(println)
  val flow: Flow[Status, String, NotUsed] = Flow[Status].map(status => status.getText)

  val graph = statusSource via flow to sink
  val queue = graph.run()

  override def onStatus(status: Status) =
    Await.result(queue.offer(status), Duration.Inf)
}

So how should I change my API route, or the filtering method itself, to return a set of tweets in response to my GET request? In other words, how do I redirect them from the console to the response body? Should I use a database for that? Any advice is appreciated!

1 Answer

Pattern-match on the value that onSuccess extracts from the future (this assumes the future yields a Set[Tweet]):

onSuccess(Future.successful(TwitterStreamFilters.filterTwitterStreamByUserID(twitterStream, Array(userId)))) {
  case tweets: Set[Tweet] => complete(StatusCodes.OK, tweets)
}

Of course, you will have to provide a response marshaller. If you want a JSON response, you can use Akka HTTP JSON + Circe. Just add these two imports, and it should work as long as the response class (e.g. Tweet) contains no special types:

import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport._
import io.circe.generic.auto._
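
As a rough sketch of how these imports could fit into the route: the example below assumes a hypothetical Tweet case class and a hypothetical recentTweets helper that returns a Future[Seq[Tweet]]. The helper is backed by a hard-coded stand-in source, not the real Twitter4J stream, and it uses Sink.seq to collect a bounded number of elements instead of printing them. Because the future carries a value rather than Unit, onSuccess hands that value to the inner route, and the circe support marshals it to JSON.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport._
import io.circe.generic.auto._

import scala.concurrent.Future

// Hypothetical response type for illustration; not a Twitter4J class.
final case class Tweet(userId: Long, text: String)

object TweetApi {
  implicit val system: ActorSystem = ActorSystem("tweet-api")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Hypothetical stand-in for the real stream of one user's tweets.
  def tweetSource(userId: Long): Source[Tweet, NotUsed] =
    Source(List("first", "second", "third")).map(text => Tweet(userId, text))

  // Sink.seq materializes a Future that completes with all collected
  // elements once the stream ends; take(limit) keeps the stream finite.
  def recentTweets(userId: Long, limit: Int): Future[Seq[Tweet]] =
    tweetSource(userId).take(limit).runWith(Sink.seq[Tweet])

  val routes: Route =
    path("tweets" / LongNumber) { userId =>
      get {
        // The future yields Seq[Tweet], so the inner block receives it.
        onSuccess(recentTweets(userId, limit = 2)) { tweets =>
          complete(tweets) // marshalled to a JSON array via circe
        }
      }
    }
}
```

With a live Twitter stream the source never completes on its own, so some bound such as take (or a time window) is needed before the future can resolve. Whether to buffer tweets in memory or in a database depends on how much history the endpoint should return.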