
I'm trying to start a Dataflow pipeline job from a servlet on the GAE standard environment. The job reads from a Cloud SQL database and writes to BigQuery, both in the same project. I believe the Cloud Dataflow service account has all the needed permissions (Storage Object Admin, Cloud SQL Client, BigQuery Admin, BigQuery Data Editor). I got the following exception:

java.lang.RuntimeException: Interrupted while staging packages
at org.apache.beam.runners.dataflow.util.PackageUtil.stageClasspathElements(PackageUtil.java:401)
at org.apache.beam.runners.dataflow.util.PackageUtil.stageClasspathElements(PackageUtil.java:273)
at org.apache.beam.runners.dataflow.util.GcsStager.stageFiles(GcsStager.java:82)
at org.apache.beam.runners.dataflow.util.GcsStager.stageDefaultFiles(GcsStager.java:70)
at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:741)
at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:186)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:301)
at com.mycompany.prj.servers.dataflow.PipelineExecutor.run(PipelineExecutor.java:138)
at com.mycompany.prj.servers.dataflow.CloudSqlToBigQueryExporter.run(CloudSqlToBigQueryExporter.java:226)
at com.mycompany.prj.servers.dataflow.CloudSqlToBigQueryExporter.doTest(CloudSqlToBigQueryExporter.java:281)
at com.mycompany.prj.server.web.CronServlet.doGet(Unknown Source)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:848)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1780)
at com.mycompany.prj.server.web.TxFilter.doFilter(Unknown Source)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1767)
at com.googlecode.objectify.ObjectifyFilter.doFilter(ObjectifyFilter.java:48)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1767)
at com.mycompany.prj.servers.web.HttpCrossOriginFilter.doFilter(HttpCrossOriginFilter.java:23)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1767)
at com.mycompany.prj.servers.web.LogSchemeFilter.doFilter(LogSchemeFilter.java:17)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1767)
at com.google.apphosting.utils.servlet.JdbcMySqlConnectionCleanupFilter.doFilter(JdbcMySqlConnectionCleanupFilter.java:60)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1767)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:583)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:513)
at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:226)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
at com.google.apphosting.runtime.jetty9.ParseBlobUploadHandler.handle(ParseBlobUploadHandler.java:111)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1182)
at com.google.apphosting.runtime.jetty9.AppEngineWebAppContext.doHandle(AppEngineWebAppContext.java:187)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:513)
at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at com.google.apphosting.runtime.jetty9.AppVersionHandlerMap.handle(AppVersionHandlerMap.java:293)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
at org.eclipse.jetty.server.Server.handle(Server.java:539)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:333)
at com.google.apphosting.runtime.jetty9.RpcConnection.handle(RpcConnection.java:216)
at com.google.apphosting.runtime.jetty9.RpcConnector.serviceRequest(RpcConnector.java:81)
at com.google.apphosting.runtime.jetty9.JettyServletEngineAdapter.serviceRequest(JettyServletEngineAdapter.java:134)
at com.google.apphosting.runtime.JavaRuntime$RequestRunnable.dispatchServletRequest(JavaRuntime.java:772)
at com.google.apphosting.runtime.JavaRuntime$RequestRunnable.dispatchRequest(JavaRuntime.java:735)
at com.google.apphosting.runtime.JavaRuntime$RequestRunnable.run(JavaRuntime.java:705)
at com.google.apphosting.runtime.ThreadGroupPool$PoolEntry.run(ThreadGroupPool.java:261)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InterruptedException
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:347)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:68)
at org.apache.beam.runners.dataflow.util.PackageUtil.stageClasspathElements(PackageUtil.java:384)
... 50 more

I managed to run it on GAE standard. The problem was that I had never scheduled it as a cron job; I had only run it through a URL trigger. So I scheduled about a dozen requests over one hour, and most of them passed. But some failed with this exception:

Failed java.lang.RuntimeException: Error while staging packages
java.lang.RuntimeException: Error while staging packages
at org.apache.beam.runners.dataflow.util.PackageUtil.stageClasspathElements(PackageUtil.java:403)
at org.apache.beam.runners.dataflow.util.PackageUtil.stageClasspathElements(PackageUtil.java:273)
at org.apache.beam.runners.dataflow.util.GcsStager.stageFiles(GcsStager.java:82)
at org.apache.beam.runners.dataflow.util.GcsStager.stageDefaultFiles(GcsStager.java:70)
at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:741)
at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:186)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:301)
at com.mycompany.prj.servers.dataflow.PipelineExecutor.run(PipelineExecutor.java:138)
at com.mycompany.prj.servers.dataflow.CloudSqlToBigQueryExporter.run(CloudSqlToBigQueryExporter.java:245)
at com.mycompany.prj.servers.dataflow.CloudSqlToBigQueryExporter.doTestOnExp(CloudSqlToBigQueryExporter.java:304)
at com.mycompany.prj.servers.dataflow.CloudSqlToBigQueryExporter.doTestOnExp(CloudSqlToBigQueryExporter.java:285)
at com.mycompany.prj.server.web.CronServlet.doGet(Unknown Source)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:848)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1780)
at com.mycompany.prj.server.web.TxFilter.doFilter(Unknown Source)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1767)
at com.googlecode.objectify.ObjectifyFilter.doFilter(ObjectifyFilter.java:48)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1767)
at com.mycompany.prj.servers.web.HttpCrossOriginFilter.doFilter(HttpCrossOriginFilter.java:23)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1767)
at com.mycompany.prj.servers.web.LogSchemeFilter.doFilter(LogSchemeFilter.java:17)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1767)
at com.google.apphosting.utils.servlet.JdbcMySqlConnectionCleanupFilter.doFilter(JdbcMySqlConnectionCleanupFilter.java:60)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1767)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:583)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:524)
at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:226)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
at com.google.apphosting.runtime.jetty9.ParseBlobUploadHandler.handle(ParseBlobUploadHandler.java:111)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1182)
at com.google.apphosting.runtime.jetty9.AppEngineWebAppContext.doHandle(AppEngineWebAppContext.java:187)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:513)
at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at com.google.apphosting.runtime.jetty9.AppVersionHandlerMap.handle(AppVersionHandlerMap.java:293)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
at org.eclipse.jetty.server.Server.handle(Server.java:539)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:333)
at com.google.apphosting.runtime.jetty9.RpcConnection.handle(RpcConnection.java:216)
at com.google.apphosting.runtime.jetty9.RpcConnector.serviceRequest(RpcConnector.java:81)
at com.google.apphosting.runtime.jetty9.JettyServletEngineAdapter.serviceRequest(JettyServletEngineAdapter.java:134)
at com.google.apphosting.runtime.JavaRuntime$RequestRunnable.dispatchServletRequest(JavaRuntime.java:772)
at com.google.apphosting.runtime.JavaRuntime$RequestRunnable.dispatchRequest(JavaRuntime.java:735)
at com.google.apphosting.runtime.JavaRuntime$RequestRunnable.run(JavaRuntime.java:705)
at com.google.apphosting.runtime.ThreadGroupPool$PoolEntry.run(ThreadGroupPool.java:261)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException: Operation not allowed in a thread that is neither the original request thread nor a thread created by ThreadManager
at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory.newThread(ApiProxyImpl.java:1310)
at java.util.concurrent.ThreadPoolExecutor$Worker.<init>(ThreadPoolExecutor.java:619)
at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:932)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1367)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors$ListeningDecorator.execute(MoreExecutors.java:537)
at java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1640)
at java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1858)
at org.apache.beam.sdk.util.MoreFutures.supplyAsync(MoreFutures.java:101)
at org.apache.beam.runners.dataflow.util.PackageUtil.stagePackage(PackageUtil.java:172)
at org.apache.beam.runners.dataflow.util.PackageUtil.lambda$stageClasspathElements$2(PackageUtil.java:362)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:443)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
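Reading the "Caused by" section bottom-up: a CompletableFuture.thenCompose continuation (PackageUtil.lambda$stageClasspathElements$2) runs on a ForkJoinWorkerThread, calls ThreadPoolExecutor.execute, and the pool lazily creates a worker through ApiProxyImpl$CurrentRequestThreadFactory.newThread, which rejects the call because it is not on the request thread. The JDK-only sketch below reproduces that shape. OwnerOnlyThreadFactory and StagingThreadDemo are hypothetical names, and the factory is a simplified stand-in for App Engine's, not its real implementation. It also illustrates one plausible reason only some runs fail: a continuation attached to an already completed future runs on the calling (request) thread, while one attached to a pending future runs on whichever thread completes it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/**
 * Hypothetical stand-in for App Engine's request-scoped thread factory: it only
 * creates threads when called from the thread that constructed it (our "request
 * thread"). The real factory also accepts threads created by ThreadManager.
 */
final class OwnerOnlyThreadFactory implements ThreadFactory {
    private final Thread owner = Thread.currentThread();

    @Override
    public Thread newThread(Runnable r) {
        if (Thread.currentThread() != owner) {
            throw new NullPointerException("Operation not allowed in a thread that is neither "
                    + "the original request thread nor a thread created by ThreadManager");
        }
        Thread t = new Thread(r);
        t.setDaemon(true); // keep the demo from holding the JVM open
        return t;
    }
}

final class StagingThreadDemo {
    /**
     * Mimics the shape of the trace: a thenCompose continuation hands work to a
     * lazily started pool. Returns true if the hand-off succeeded, false if the
     * pool's worker creation hit the thread-factory check.
     */
    static boolean stage(boolean upstreamDoneBeforeCompose) {
        ExecutorService pool = Executors.newFixedThreadPool(2, new OwnerOnlyThreadFactory());
        try {
            CompletableFuture<String> upstream = new CompletableFuture<>();
            if (upstreamDoneBeforeCompose) {
                upstream.complete("hash"); // continuation below runs on this (request) thread
            }
            // The first execute() on the pool creates a worker via the factory,
            // on whatever thread happens to run this continuation.
            CompletableFuture<Void> staged = upstream.thenCompose(
                    h -> CompletableFuture.runAsync(() -> { }, pool));
            if (!upstreamDoneBeforeCompose) {
                // Completing from another thread makes the continuation run there instead.
                Thread other = new Thread(() -> upstream.complete("hash"));
                other.start();
                joinUninterruptibly(other);
            }
            staged.join();
            return true;
        } catch (CompletionException e) {
            if (e.getCause() instanceof NullPointerException) {
                return false; // same failure mode as the App Engine trace
            }
            throw e;
        } finally {
            pool.shutdownNow();
        }
    }

    private static void joinUninterruptibly(Thread t) {
        boolean interrupted = false;
        while (t.isAlive()) {
            try {
                t.join();
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Calling StagingThreadDemo.stage(true) succeeds and stage(false) fails with the same NullPointerException message, even though the code is identical; only the thread that runs the continuation differs.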

My appengine-web.xml looks like this:

<?xml version="1.0" encoding="utf-8"?>
<appengine-web-app xmlns="http://appengine.google.com/ns/1.0">

    <application>my-app-name</application>


    <version>1</version>
    <threadsafe>true</threadsafe>
    <sessions-enabled>true</sessions-enabled>
    <runtime>java8</runtime>

    <system-properties>
        <property name="java.util.logging.config.file" value="WEB-INF/logging.properties"/>
        <property name="file.encoding" value="UTF-8" />
        <property name="DEFAULT_ENCODING" value="UTF-8" />
    </system-properties>

    <instance-class>F1</instance-class>

    <automatic-scaling>
        <min-idle-instances>0</min-idle-instances>
        <max-idle-instances>1</max-idle-instances>
    </automatic-scaling>

</appengine-web-app>

The cron servlet is pretty simple:

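package com.mycompany.prj.server.web;

import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import com.mycompany.prj.servers.dataflow.CloudSqlToBigQueryExporter;

public class CronServlet extends HttpServlet {

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) {
        // cron.yaml calls /cron?command=dataflow-job-test
        String command = req.getParameter("command");
        if (command == null) {
            return;
        }

        if (command.equals("dataflow-job-test")) {
            // Builds and launches the Dataflow job on this request thread
            CloudSqlToBigQueryExporter.doTestOnExp();
        }
    }

    // Delegates to doGet so the same command works for GET and POST
    @Override
    protected void doPost(HttpServletRequest req, HttpServletResponse resp) {
        doGet(req, resp);
    }
}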

And the cron.yaml looks like this:

cron:
- url: '/cron?command=dataflow-job-test'
  schedule: 'every 5 minutes'
  description: 'Dataflow job execution test'
Could you tell me your aim with this pipeline? Could you share the steps you took to configure the pipeline? - Alexandre Moraes
@AlexandreMoraes I'm trying to schedule a daily export from a Cloud SQL database (PostgreSQL in particular) to a BigQuery table. The pipeline is configured properly, since I managed to start it from a "main" method. But when I try to execute it from the cron servlet, it fails. I found a blog post mentioning that you can only trigger pipeline jobs from the GAE flexible environment, but I was wondering whether that is still relevant, because it was posted 5 years ago. - Plamen
Can you share more details like the app.yaml, cron.yaml and the code of your servlet? - Rafael Lemos
I added more info in the post. Thank you! - Plamen
The Dataflow job is pretty simple: count around 200 SQL rows from one table and write the number to a BQ table, all in the same project. - Plamen

1 Answer


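The issue you are getting here is the one described in the error message:

Operation not allowed in a thread that is neither the original request thread nor a thread created by ThreadManager

It happens because new threads are being created from a thread that App Engine does not allow to create them. In the App Engine standard environment, a thread can only be started from the original request thread or from a thread created by GAE's ThreadManager, not from arbitrary threads. In your trace, the new thread is requested by Beam's package staging (PackageUtil) while it runs on a ForkJoinWorkerThread instead of the request thread.

So in order to fix that, you need to change your code to create threads using ThreadManager.currentRequestThreadFactory() instead of plain Java threads.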
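A minimal sketch of that approach, assuming you control the executor: build the pool from a supplied ThreadFactory (on App Engine, ThreadManager.currentRequestThreadFactory()) and create all of its threads up front, on the request thread, so that work handed over later from a ForkJoinPool or other non-request thread is only queued and never triggers thread creation. RequestPools and newPrestartedFixedPool are hypothetical names. This applies to executors your own code creates; whether Beam's internal staging pool can be configured this way depends on the Beam version and is not shown here.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class RequestPools {
    private RequestPools() {
    }

    /**
     * Hypothetical helper: builds a fixed-size pool whose threads are all created
     * immediately, on the calling thread. On App Engine, call it from the request
     * thread and pass ThreadManager.currentRequestThreadFactory() as the factory.
     */
    static ExecutorService newPrestartedFixedPool(int size, ThreadFactory factory) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                size, size, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), factory);
        // Create every core thread now, so later execute() calls only enqueue work
        // and never ask the factory for a thread from some other thread.
        pool.prestartAllCoreThreads();
        return pool;
    }
}
```

In the servlet you would call something like RequestPools.newPrestartedFixedPool(4, ThreadManager.currentRequestThreadFactory()) inside doGet and shut the pool down before doGet returns, since threads from the request factory must finish within the request. Prestarting is the design choice that matters here: a ThreadPoolExecutor normally creates workers lazily on whichever thread calls execute(), which is exactly what the trace shows going wrong.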