I am new to Scala and am trying to implement a library that receives thousands of URLs. My job is to download the content from those URLs. I would have opted for the simple scalaj-http library, but it does not serve my purpose.
The code I came up with is this:

    class ProxyHttpClient {
      def get(url: String, proxy: ProxySettings, urlDownloaderConfig: UrlDownloaderConfig)
             (implicit ec: ExecutionContext): Either[HttpError, HttpSuccessResponse] = {
        implicit val system: ActorSystem = ActorSystem()
        implicit val materializer: ActorMaterializer = ActorMaterializer()
        val auth = headers.BasicHttpCredentials(proxy.userName, proxy.secret)
        val httpsProxyTransport =
          ClientTransport.httpsProxy(InetSocketAddress.createUnresolved(proxy.host, proxy.port), auth)
        val settings =
          ConnectionPoolSettings(system).withTransport(httpsProxyTransport)
        val response: Future[HttpResponse] =
          Http().singleRequest(HttpRequest().withMethod(HttpMethods.GET).withUri(url), settings = settings)
        val data: Future[Either[HttpError, HttpSuccessResponse]] = response.map {
          case response@HttpResponse(StatusCodes.OK, _, _, _) => {
            val content: Future[String] = Unmarshal(response.entity).to[String]
            val finalContent = Await.ready(content, timeToWaitForContent).value.get.get.getBytes
            Right(HttpSuccessResponse(url, response.status.intValue(), finalContent))
          }
          case errorResponse@HttpResponse(StatusCodes.GatewayTimeout, _, _, _) =>
            Left(HttpError(url, errorResponse.status.intValue(), errorResponse.entity.toString))
        }
        val result: Try[Either[HttpError, HttpSuccessResponse]] = Await.ready(data, timeToWaitForResponse).value.get
        val pop: Either[HttpError, HttpSuccessResponse] = try {
          result.get
        } catch {
          case e: Exception => Left(HttpError(url, HttpStatus.SC_INTERNAL_SERVER_ERROR, e.getMessage))
        }
        pop
      }
    }
To call the `get` method, I am using:

    val forkJoinPool = new scala.concurrent.forkjoin.ForkJoinPool(8)
    picList.par.tasksupport = new ForkJoinTaskSupport(forkJoinPool)
    picList.par.map(testUrl => {
      val resp = get(url, Option(proxy))
    })
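One alternative to the parallel collection is to let Akka Streams cap the number of requests in flight. The following is only a sketch under assumptions: `BoundedFetch`, `fetchAll` and `fakeFetch` are hypothetical names, and `fakeFetch` is a placeholder for a non-blocking client call that returns a `Future`. It assumes an Akka version where `ActorMaterializer` is still current, as in the code above.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Sink, Source}

import scala.collection.immutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object BoundedFetch {
  // Runs `fetch` for every URL with at most `parallelism` calls in flight at once.
  // mapAsync emits results in the same order as the input URLs.
  def fetchAll[A](urls: immutable.Seq[String], parallelism: Int)(fetch: String => Future[A])(
      implicit mat: Materializer): Future[immutable.Seq[A]] =
    Source(urls).mapAsync(parallelism)(fetch).runWith(Sink.seq[A])

  def main(args: Array[String]): Unit = {
    // One ActorSystem for the whole application, created once.
    implicit val system: ActorSystem = ActorSystem("bounded-fetch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    // Placeholder (assumption): a real client would issue the HTTP request here.
    def fakeFetch(url: String): Future[Int] = Future(url.length)

    try {
      val urls = List("http://example.com/a.png", "http://example.com/bb.png")
      val lengths = Await.result(fetchAll(urls, parallelism = 8)(fakeFetch), 10.seconds)
      println(lengths)
    } finally {
      // Release the system's threads when the work is done.
      Await.result(system.terminate(), 10.seconds)
    }
  }
}
```

The only blocking call is the single `Await.result` at the edge of the program; the per-URL work stays asynchronous.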
It ran smoothly a few times, but when I invoked the method for 1000 URLs to fetch images in batches of 100, it threw the error below. After that, I get the same error even for a single URL.
**java.lang.OutOfMemoryError: unable to create new native thread**
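One likely contributor to this error is that `get` creates a new `ActorSystem`, with its own thread pools, on every call and never terminates it, so threads accumulate across calls. Below is a minimal sketch of sharing one system across requests and returning a `Future` instead of blocking. It assumes Akka HTTP 10.1.x, where `ActorMaterializer` is still current. The case class shapes and the `SharedProxyHttpClient` name are assumptions inferred from the code above, not the real definitions.

```scala
import java.net.InetSocketAddress

import akka.actor.ActorSystem
import akka.http.scaladsl.{ClientTransport, Http}
import akka.http.scaladsl.model.{HttpRequest, StatusCodes, headers}
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer

import scala.concurrent.{ExecutionContext, Future}

// Assumed shapes, inferred from how the original code uses these types.
final case class ProxySettings(host: String, port: Int, userName: String, secret: String)
final case class HttpSuccessResponse(url: String, status: Int, content: Array[Byte])
final case class HttpError(url: String, status: Int, message: String)

// The ActorSystem is passed in once instead of being created per request.
class SharedProxyHttpClient(proxy: ProxySettings)(implicit system: ActorSystem) {
  private implicit val materializer: ActorMaterializer = ActorMaterializer()
  private implicit val ec: ExecutionContext = system.dispatcher

  // Proxy transport and pool settings are built once and reused for every request.
  private val settings: ConnectionPoolSettings = {
    val auth = headers.BasicHttpCredentials(proxy.userName, proxy.secret)
    val transport = ClientTransport.httpsProxy(
      InetSocketAddress.createUnresolved(proxy.host, proxy.port), auth)
    ConnectionPoolSettings(system).withTransport(transport)
  }

  def get(url: String): Future[Either[HttpError, HttpSuccessResponse]] = {
    val result: Future[Either[HttpError, HttpSuccessResponse]] =
      Http().singleRequest(HttpRequest(uri = url), settings = settings).flatMap { response =>
        if (response.status == StatusCodes.OK)
          // Read the body as raw bytes; decoding an image as a String could corrupt it.
          Unmarshal(response.entity).to[Array[Byte]]
            .map(bytes => Right(HttpSuccessResponse(url, response.status.intValue, bytes)))
        else {
          // Drain the unread body so the pooled connection is freed.
          response.discardEntityBytes()
          Future.successful(Left(HttpError(url, response.status.intValue, response.status.reason)))
        }
      }
    // Turn failures (connection errors and the like) into a Left instead of a failed Future.
    result.recover {
      case e: Exception =>
        Left(HttpError(url, StatusCodes.InternalServerError.intValue, String.valueOf(e.getMessage)))
    }
  }
}
```

With this shape the caller would create the `ActorSystem` once at startup and call `system.terminate()` once at shutdown. The sketch sets no overall request timeout, so one would still need to be added.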
Should I use actors here instead of an `ActorSystem`, and dedicate a separate dispatcher to them?
Since I am holding image content, which is binary, do I have to take care of removing it from memory once it has served its purpose?
A code snippet would be very helpful. Thanks in advance.
I tried to follow online suggestions, where people recommend using:

    val blockingExecutionContext = system.dispatchers.lookup("blocking-dispatcher")

But when I tried it, `system.dispatchers.lookup` returns a value of type `MessageDispatcher`:

    implicit val system: ActorSystem = ActorSystem()
    val ex: MessageDispatcher = system.dispatchers.lookup("io-blocking-dispatcher")

Am I missing a library or an import?
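For reference, `MessageDispatcher` extends `ExecutionContextExecutor`, so the value returned by `lookup` can be used wherever an `ExecutionContext` is expected. The lookup does require the dispatcher to be defined in configuration. Below is a minimal sketch under assumptions: the `io-blocking-dispatcher` definition, its pool size, and the `DispatcherDemo` name are illustrative, and the config is inlined here rather than placed in `application.conf`.

```scala
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object DispatcherDemo {
  // Assumed dispatcher definition; normally this would live in application.conf.
  val config = ConfigFactory.parseString(
    """
      |io-blocking-dispatcher {
      |  type = Dispatcher
      |  executor = "thread-pool-executor"
      |  thread-pool-executor.fixed-pool-size = 8
      |  throughput = 1
      |}
    """.stripMargin).withFallback(ConfigFactory.load())

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("dispatcher-demo", config)
    // MessageDispatcher is a subtype of ExecutionContext, so this assignment compiles as-is.
    val blockingEc: ExecutionContext = system.dispatchers.lookup("io-blocking-dispatcher")
    try {
      // Run a task on the dedicated dispatcher and report which thread executed it.
      val threadName = Await.result(Future(Thread.currentThread().getName)(blockingEc), 5.seconds)
      println(s"Ran on: $threadName")
    } finally {
      Await.result(system.terminate(), 10.seconds)
    }
  }
}
```

The dispatcher can also be brought into implicit scope with `implicit val ec: ExecutionContext = system.dispatchers.lookup(...)`, so no extra library or import should be needed beyond Akka itself.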