
I have tried to transfer a file from an Azure container to a GCS bucket, but I ran into the issues below:

  1. The order of records in the destination file differs from the order in the source file, because the pipeline processes the content in parallel.
  2. I have to write a lot of custom code to give the GCS destination file a custom name, since the pipeline assigns it a default name.

Is there any way an Apache Beam pipeline can transfer the file itself without dealing with its content (so that the issues above don't occur)? I need to transfer multiple files from an Azure container to a GCS bucket.

This is the code I am using to transfer the files at the moment:

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.azure.options.BlobstoreOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Timestamped folder name used in the output path.
String format = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy_MM_dd_HH_mm_ss"));
String connectionString = "<<AZURE_STORAGE_CONNECTION_STRING>>";

PipelineOptions options = PipelineOptionsFactory.create();
options.as(BlobstoreOptions.class).setAzureConnectionString(connectionString);

Pipeline p = Pipeline.create(options);
p.apply(TextIO.read().from("azfs://storageaccountname/containername/CSVSample.csv"))
 .apply("WriteCsv", FileIO.<String>write()
     .to("azfs://storageaccountname/containername/" + format + "/")
     .withNumShards(1).withSuffix(".csv")
     .via(TextIO.sink()));
p.run().waitUntilFinish();

1 Answer


You should be able to use FileIO transforms for this purpose.

For example (untested pseudocode),

p.apply(FileIO.match().filepattern("azfs://storageaccountname/containername/CSVSample.csv"))
 .apply(FileIO.readMatches())
 .apply(ParDo.of(new MyWriteDoFn()));

Above, MyWriteDoFn would be a DoFn that reads the bytes of a single file (through the Azure Blob Storage filesystem) and writes them to GCS (through the GCS filesystem). Instead of invoking methods of the underlying FileSystem implementations directly, you can use the static methods of the FileSystems class with the correct scheme prefix (azfs:// or gs://), which resolve to the right filesystem automatically.
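
For illustration, here is a minimal sketch of what such a DoFn might look like (untested; the destination bucket gs://my-gcs-bucket/copied/ is a placeholder you would replace with your own). It streams each matched file byte-for-byte, so the content is never parsed, the record order is untouched, and the original file name is kept:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.MimeTypes;

// Sketch only: copies each matched file unchanged to a GCS bucket.
// The bucket/prefix below is a placeholder, not part of the original question.
class MyWriteDoFn extends DoFn<FileIO.ReadableFile, String> {
  private static final String DEST_PREFIX = "gs://my-gcs-bucket/copied/";

  @ProcessElement
  public void processElement(@Element FileIO.ReadableFile file, OutputReceiver<String> out)
      throws IOException {
    // Reuse the source file name so the destination does not get an auto-generated name.
    String destPath = DEST_PREFIX + file.getMetadata().resourceId().getFilename();
    ResourceId dest = FileSystems.matchNewResource(destPath, false /* isDirectory */);

    // Stream the bytes without interpreting them, so record order is preserved.
    try (ReadableByteChannel in = file.open();
        WritableByteChannel output = FileSystems.create(dest, MimeTypes.BINARY)) {
      ByteBuffer buffer = ByteBuffer.allocate(64 * 1024);
      while (in.read(buffer) != -1) {
        buffer.flip();
        while (buffer.hasRemaining()) {
          output.write(buffer);
        }
        buffer.clear();
      }
    }
    out.output(destPath);
  }
}

Note that both the azfs:// and gs:// filesystems need to be registered (i.e., the corresponding Beam filesystem modules must be on the classpath) for FileSystems to resolve these paths.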