
I need to read a file from a GCS bucket. I know I'll have to use GCS API/Client Libraries but I cannot find any example related to it.

I have been referring to the GCS Client Libraries page in the GCS documentation, but couldn't make much headway. If anybody can provide an example, that would really help. Thanks.

All transforms like TextIO, AvroIO etc. are able to work with GCS files by default, e.g. TextIO.read().from("gs://your-bucket/path/to/your-files/*"). Are they not working for you? – jkff
Hi @jkff, I know that. I forgot to mention why I'm looking to use the Client Libraries. The problem I'm facing is that when I read the file using TextIO.read(), the data I get is not in the same order as it appears in the file. I need the data in exactly the same order as in the file. How can I do this? – rish0097
Correct, PCollections are unordered because they are processed in parallel, and sequential ordering is the opposite of parallelism. That said, there definitely exist cases where you want to do some sequential processing inside a parallel pipeline. Could you tell us more about your use case and why you need ordering? – jkff
@jkff I'm trying to accomplish this: "stackoverflow.com/questions/45862173/…". So I just want to know: will the client libraries help in this case? That is, will they give me the file data in order? – rish0097

1 Answer

OK. If you want to simply read files from GCS, not as a PCollection but as regular files, and if you are having trouble with the GCS Java client libraries, you can also use the Apache Beam FileSystems API:

First, make sure your pom.xml has a Maven dependency on beam-sdks-java-extensions-google-cloud-platform-core, which contains the implementation of the gs:// filesystem:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-extensions-google-cloud-platform-core</artifactId>
</dependency>

Then set up the FileSystems API (it is set up by default in all pipelines, but if you're using it outside a pipeline, you need to do it manually).

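import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

PipelineOptions options = PipelineOptionsFactory.create();
// ...Optionally fill in options such as GCP credentials...
// (see GcpOptions class)
FileSystems.setDefaultPipelineOptions(options);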

Then you can use it:

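import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import org.apache.beam.sdk.io.FileSystems;

// FileSystems.open() throws IOException, so call this from code that handles it.
ReadableByteChannel chan = FileSystems.open(FileSystems.matchNewResource(
  "gs://path/to/your/file", false /* isDirectory */));
try (InputStream stream = Channels.newInputStream(chan)) {
  // Use regular Java utilities to work with the input stream.
}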
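
Since the question is about getting the lines in file order, here is a minimal sketch of what "regular Java utilities" could look like for that. The class name OrderedLineReader and its readLines method are hypothetical helpers, not part of Beam, and the sketch assumes the file is UTF-8 text that fits in memory. It uses only the Java standard library, so it works with any ReadableByteChannel, including the one returned by FileSystems.open above.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Beam): reads a channel sequentially,
// so lines come back in the same order as they appear in the file.
final class OrderedLineReader {
  private OrderedLineReader() {}

  /** Reads all lines from the channel as UTF-8 text and closes the channel. */
  static List<String> readLines(ReadableByteChannel chan) throws IOException {
    List<String> lines = new ArrayList<>();
    // Closing the reader also closes the underlying channel.
    try (BufferedReader reader = new BufferedReader(
        Channels.newReader(chan, StandardCharsets.UTF_8.name()))) {
      String line;
      while ((line = reader.readLine()) != null) {
        lines.add(line);
      }
    }
    return lines;
  }
}
```

With the channel from the snippet above you would call OrderedLineReader.readLines(chan). Note that this reads the whole file in a single thread, so it gives you ordering at the cost of the parallelism that TextIO provides.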