How would I go about executing an Apache Beam pipeline to Google Cloud Dataflow using Spring? This question is similar to Running Apache Beam pipeline in Spring Boot project on Google Data Flow, but this one concerns more about launching the pipeline from a Spring controller, rather from a CommandLineRunner.
@RestController
@RequestMapping("/task/import-csv-file")
public class ImportCsvController {
@PostMapping("/process-csv-file")
private ResponseEntity<Void> processCsvFile(
@RequestParam String gcsFileName,
@RequestParam String bucketName
) {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setProject("same-project-as-this-app-engine-instance");
options.setStagingLocation("gs://" + bucketName + "/binaries");
options.setRunner(DataflowRunner.class);
options.setJobname("process-csv");
Pipeline pipeline = Pipeline.create(options);
pipeline.apply("ReadFile", TextIO.read().from("gs://" + bucketName + "/" + gcsFileName));
// ... apply some more transforms here, which will eventually
// write csv rows as Google Datastore entities ...
pipeline.run().waitUntilFinish();
return ResponseEntity.ok().build();
}
}
I am running this controller using Google Cloud Tasks, using the following code below:
@Service
public class TaskQueueService {
private Queue csvImportsQueue;
public TaskQueueService() {
this.csvImportsQueue = QueueFactory.getQueue("csv-import-queue");
}
public void queueImportCsvFile(String gcsFileName, String bucketName) {
String url = "/task/import-csv-file/process-csv-file";
TaskOptions taskOptions = TaskOptions.Builder
.withUrl(url)
.method(POST)
.param("gcsFileName", gcsFileName)
.param("bucketName", bucketName);
queue.add(ofy().getTransaction(), taskOptions);
}
}
From Google Cloud Logging, I got this error
java.lang.RuntimeException: Failed to construct instance from factory method DataflowRunner#fromOptions(interface org.apache.beam.sdk.options.PipelineOptions)
Down in the stacktrace, this error message looks useful:
Caused by: java.lang.IllegalArgumentException: Missing required value for [public abstract java.lang.String org.apache.beam.runners.dataflow.options.DataflowPipelineOptions.getProject(), "Project id. Required when running a Dataflow in the cloud. See https://cloud.google.com/storage/docs/projects for further details."].
But as you can see above, I do set the project with the line options.setProject("same-project-as-this-app-engine-instance");
Edit: I found a different stacktrace, which has different error messages. Here is the stacktrace in its entirety.
java.lang.RuntimeException: Failed to construct instance from factory method DataflowRunner#fromOptions(interface org.apache.beam.sdk.options.PipelineOptions)
at org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:224)
at org.apache.beam.sdk.util.InstanceBuilder.build(InstanceBuilder.java:155)
at org.apache.beam.sdk.PipelineRunner.fromOptions(PipelineRunner.java:55)
at org.apache.beam.sdk.Pipeline.create(Pipeline.java:149)
at com.example.application.controllers.tasks.ImportCsvController.processCsvFile(ImportCsvController.java:64)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:209)
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:877)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:783)
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:991)
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:925)
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:974)
at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:877)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:851)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:848)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1772)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:320)
at com.example.application.filters.SwitchUserProfileFilter.doFilter(SwitchUserProfileFilter.java:127)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.invoke(FilterSecurityInterceptor.java:127)
at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.doFilter(FilterSecurityInterceptor.java:91)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
at org.springframework.security.web.access.ExceptionTranslationFilter.doFilter(ExceptionTranslationFilter.java:119)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
at org.springframework.security.web.session.SessionManagementFilter.doFilter(SessionManagementFilter.java:137)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
at org.springframework.security.web.authentication.AnonymousAuthenticationFilter.doFilter(AnonymousAuthenticationFilter.java:111)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
at org.springframework.security.web.servletapi.SecurityContextHolderAwareRequestFilter.doFilter(SecurityContextHolderAwareRequestFilter.java:170)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
at org.springframework.security.web.savedrequest.RequestCacheAwareFilter.doFilter(RequestCacheAwareFilter.java:63)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
at org.springframework.security.saml.SAMLLogoutFilter.processLogout(SAMLLogoutFilter.java:168)
at org.springframework.security.saml.SAMLLogoutFilter.doFilter(SAMLLogoutFilter.java:110)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
at org.springframework.security.saml.SAMLDiscovery.doFilter(SAMLDiscovery.java:137)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
at org.springframework.security.saml.SAMLLogoutProcessingFilter.processLogout(SAMLLogoutProcessingFilter.java:209)
at org.springframework.security.saml.SAMLLogoutProcessingFilter.doFilter(SAMLLogoutProcessingFilter.java:107)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
at org.springframework.security.web.authentication.AbstractAuthenticationProcessingFilter.doFilter(AbstractAuthenticationProcessingFilter.java:200)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
at org.springframework.security.web.authentication.AbstractAuthenticationProcessingFilter.doFilter(AbstractAuthenticationProcessingFilter.java:200)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
at org.springframework.security.saml.SAMLEntryPoint.doFilter(SAMLEntryPoint.java:102)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
at org.springframework.security.saml.metadata.MetadataDisplayFilter.doFilter(MetadataDisplayFilter.java:84)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
at org.springframework.security.saml.metadata.MetadataGeneratorFilter.doFilter(MetadataGeneratorFilter.java:87)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
at org.springframework.security.web.header.HeaderWriterFilter.doFilterInternal(HeaderWriterFilter.java:66)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
at org.springframework.security.web.context.SecurityContextPersistenceFilter.doFilter(SecurityContextPersistenceFilter.java:105)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
at org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter.doFilterInternal(WebAsyncManagerIntegrationFilter.java:56)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
at com.example.application.filters.CustomDomainFilter.doFilterInternal(CustomDomainFilter.java:43)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:215)
at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:178)
at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:357)
at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:270)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:357)
at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:270)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
at com.google.apphosting.utils.servlet.JdbcMySqlConnectionCleanupFilter.doFilter(JdbcMySqlConnectionCleanupFilter.java:60)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:109)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:81)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
at org.springframework.boot.web.servlet.support.ErrorPageFilter.doFilter(ErrorPageFilter.java:117)
at org.springframework.boot.web.servlet.support.ErrorPageFilter.access$000(ErrorPageFilter.java:61)
at org.springframework.boot.web.servlet.support.ErrorPageFilter$1.doFilterInternal(ErrorPageFilter.java:92)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.springframework.boot.web.servlet.support.ErrorPageFilter.doFilter(ErrorPageFilter.java:110)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
at com.googlecode.objectify.ObjectifyFilter.doFilter(ObjectifyFilter.java:48)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:582)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:524)
at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:226)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
at com.google.apphosting.runtime.jetty9.ParseBlobUploadHandler.handle(ParseBlobUploadHandler.java:119)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1182)
at com.google.apphosting.runtime.jetty9.AppEngineWebAppContext.doHandle(AppEngineWebAppContext.java:187)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:512)
at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at com.google.apphosting.runtime.jetty9.AppVersionHandlerMap.handle(AppVersionHandlerMap.java:293)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
at org.eclipse.jetty.server.Server.handle(Server.java:539)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:333)
at com.google.apphosting.runtime.jetty9.RpcConnection.handle(RpcConnection.java:213)
at com.google.apphosting.runtime.jetty9.RpcConnector.serviceRequest(RpcConnector.java:81)
at com.google.apphosting.runtime.jetty9.JettyServletEngineAdapter.serviceRequest(JettyServletEngineAdapter.java:134)
at com.google.apphosting.runtime.JavaRuntime$RequestRunnable.dispatchServletRequest(JavaRuntime.java:757)
at com.google.apphosting.runtime.JavaRuntime$RequestRunnable.dispatchRequest(JavaRuntime.java:720)
at com.google.apphosting.runtime.JavaRuntime$RequestRunnable.run(JavaRuntime.java:690)
at com.google.apphosting.runtime.JavaRuntime$NullSandboxRequestRunnable.run(JavaRuntime.java:882)
at com.google.apphosting.runtime.ThreadGroupPool$PoolEntry.run(ThreadGroupPool.java:270)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.InvocationTargetException: null
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:214)
... 123 common frames omitted
Caused by: java.lang.NoSuchMethodError: com.google.common.cache.CacheBuilder.expireAfterWrite(Ljava/time/Duration;)Lcom/google/common/cache/CacheBuilder;
at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.<init>(GoogleCloudStorageImpl.java:149)
at org.apache.beam.sdk.extensions.gcp.util.GcsUtil.<init>(GcsUtil.java:243)
at org.apache.beam.sdk.extensions.gcp.util.GcsUtil.<init>(GcsUtil.java:82)
at org.apache.beam.sdk.extensions.gcp.util.GcsUtil$GcsUtilFactory.create(GcsUtil.java:104)
at org.apache.beam.sdk.extensions.gcp.util.GcsUtil$GcsUtilFactory.create(GcsUtil.java:87)
at org.apache.beam.sdk.options.ProxyInvocationHandler.returnDefaultHelper(ProxyInvocationHandler.java:592)
at org.apache.beam.sdk.options.ProxyInvocationHandler.getDefault(ProxyInvocationHandler.java:533)
at org.apache.beam.sdk.options.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:158)
at com.sun.proxy.$Proxy164.getGcsUtil(Unknown Source)
at org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.tryCreateDefaultBucket(GcpOptions.java:354)
at org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.create(GcpOptions.java:300)
at org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.create(GcpOptions.java:288)
at org.apache.beam.sdk.options.ProxyInvocationHandler.returnDefaultHelper(ProxyInvocationHandler.java:592)
at org.apache.beam.sdk.options.ProxyInvocationHandler.getDefault(ProxyInvocationHandler.java:533)
at org.apache.beam.sdk.options.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:158)
at com.sun.proxy.$Proxy149.getGcpTempLocation(Unknown Source)
at org.apache.beam.runners.dataflow.DataflowRunner.fromOptions(DataflowRunner.java:249)
... 128 common frames omitted
Edit 2: I typed these two imports in one of my source files:
import com.google.common.cache.CacheBuilder;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl;
Then I pressed Cmd+Click to go to their implementation in Intellij, and I found that the method overload does not exist (as buried in the long stacktrace above).
~/.gradle/caches/modules-2/files-2.1/com.google.cloud.bigdataoss/gcsio/2.0.0/ba86cba5b74f7ded14feb682cc81ece7724573a7/gcsio-2.0.0-sources.jar!/com/google/cloud/hadoop/gcsio/GoogleCloudStorageImpl.java
private final LoadingCache<String, Boolean> autoBuckets =
CacheBuilder.newBuilder()
.expireAfterWrite(Duration.ofHours(1))
.build(
new CacheLoader<String, Boolean>() {
final List<String> iamPermissions = ImmutableList.of("storage.buckets.get");
@Override
public Boolean load(String bucketName) throws Exception {
try {
gcs.buckets()
.testIamPermissions(bucketName, iamPermissions)
.executeUnparsed()
.disconnect();
} catch (IOException e) {
return errorExtractor.userProjectMissing(e);
}
return false;
}
});
~/.gradle/caches/modules-2/files-2.1/com.google.guava/guava/29.0-android/63f9bc5fbf2ebfe6b17683f8eac8419588295a28/guava-29.0-android-sources.jar!/com/google/common/cache/CacheBuilder.java
public CacheBuilder<K, V> expireAfterWrite(long duration, TimeUnit unit) {
checkState(
expireAfterWriteNanos == UNSET_INT,
"expireAfterWrite was already set to %s ns",
expireAfterWriteNanos);
checkArgument(duration >= 0, "duration cannot be negative: %s %s", duration, unit);
this.expireAfterWriteNanos = unit.toNanos(duration);
return this;
}
Edit 3: Here are the Apache Beam versions I am using:
dependencies {
compile group: 'org.apache.beam', name: 'beam-sdks-java-core', version: '2.20.0'
compile group: 'org.apache.beam', name: 'beam-runners-google-cloud-dataflow-java', version: '2.20.0'
}