
I'm trying, in a Java environment, to write log files to Google Cloud Storage in chunks. I have a process that parses raw log files and produces lines of JSON; I store the JSON lines in a buffer, and I want to write to the same file in GCS every time the buffer hits 5 MB or so, until the original raw source has been fully parsed. I have a similar setup that writes to AWS S3. The writing in chunks is done to keep memory usage bounded.
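The buffering scheme described above can be sketched in plain Java (the class name, the flush callback, and the threshold are placeholders of mine, not part of the original setup; the actual upload call is stubbed out behind a `Consumer`):

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

// Accumulates JSON lines and hands off the buffer whenever it crosses a size
// threshold, so memory stays bounded no matter how large the raw source is.
public class ChunkedLogBuffer {
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final int flushThreshold;           // e.g. 5 * 1024 * 1024 for ~5 MB
    private final Consumer<byte[]> chunkSink;   // e.g. a method that PUTs a chunk to GCS

    public ChunkedLogBuffer(int flushThreshold, Consumer<byte[]> chunkSink) {
        this.flushThreshold = flushThreshold;
        this.chunkSink = chunkSink;
    }

    public void addJsonLine(String jsonLine) {
        byte[] bytes = jsonLine.getBytes(StandardCharsets.UTF_8);
        buffer.write(bytes, 0, bytes.length);
        buffer.write('\n');
        if (buffer.size() >= flushThreshold) {
            flush();
        }
    }

    // Call once more after the raw source is fully parsed to drain the remainder.
    public void flush() {
        if (buffer.size() > 0) {
            chunkSink.accept(buffer.toByteArray());
            buffer.reset();
        }
    }
}
```

The final `flush()` after parsing finishes is what produces the last (possibly smaller) chunk.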

I managed to write a file to GCS as follows (gcsService is a Storage object configured with authentications and so on):

private void uploadStream(String path, String name, String contentType, InputStream stream, String bucketName) throws IOException, GeneralSecurityException {
    InputStreamContent contentStream = new InputStreamContent(contentType, stream);
    StorageObject objectMetadata = new StorageObject()
            .setName(path + "/" + name)
            .setAcl(Arrays.asList(new ObjectAccessControl().setEntity("allUsers").setRole("READER")));
    Storage.Objects.Insert insertRequest = gcsService.objects()
            .insert(bucketName, objectMetadata, contentStream);
    insertRequest.execute();
}

Unfortunately, I have been unable to figure out how to write to GCS in chunks. Google's documentation seems to suggest two approaches. One involves "Resumable" Insert requests: https://cloud.google.com/storage/docs/json_api/v1/how-tos/upload

And the other approach involves "Compose" requests: https://cloud.google.com/storage/docs/json_api/v1/objects/compose

I've been trying to get a "Resumable" upload set up, but I can't get it to work.

Any ideas? My specific questions are:

  • What is an elegant and/or appropriate way to upload in chunks to GCS?
  • Does anyone know how to set up Resumable uploads to GCS via Insert requests in Java? Can that be done at all?

1 Answer


Got it to work - it was a hassle. For the record, the answers to my questions are:

  • "Resumable" upload works in Java, and it is an elegant and perhaps the preferred way (I'm not an expert, so I'm not sure) to upload files in chunks to GCS.
  • A "Resumable" upload can be set up in Java as described below.

I ended up having two methods - one for initiating the upload, and one for sending chunks.

private String initiateResumableUpload() throws IOException {
    GenericUrl url = new GenericUrl("https://storage.googleapis.com/" + bucket + "/" + path);
    // An empty-body POST with the x-goog-resumable header starts a resumable session.
    HttpRequest req = requestFactory.buildPostRequest(url, new ByteArrayContent("text/plain", new byte[0]));
    HttpHeaders headers = new HttpHeaders();
    headers.set("x-goog-resumable", "start");
    headers.setContentLength(0L);
    headers.setContentType("text/plain");
    req.setHeaders(headers);
    req.setReadTimeout((int) DEFAULT_TIMEOUT);
    HttpResponse resp = req.execute();
    if (resp.getStatusCode() == 201) {
        // The Location header carries the session URI that all chunk PUTs go to.
        return resp.getHeaders().getLocation();
    }
    throw new IOException("Unexpected status " + resp.getStatusCode() + " when initiating resumable upload.");
}

The requestFactory should be initialized with your appropriately generated credentials.

private void writeChunk(final boolean isFinalChunk) throws IOException {
    System.out.println("Writing chunk number " + chunkCount + ".");

    try (InputStream inputStream = new ByteBufInputStream(buffer)) {
        int length = Math.min(buffer.readableBytes(), DEFAULT_UPLOAD_CHUNK_SIZE);
        HttpContent content = new InputStreamContent("text/plain", inputStream);

        // Every chunk is PUT to the session URI returned by initiateResumableUpload().
        GenericUrl url = new GenericUrl(location);
        HttpRequest req = requestFactory.buildPutRequest(url, content);

        long offset = (long) chunkCount * DEFAULT_UPLOAD_CHUNK_SIZE;
        long limit = offset + length;
        HttpHeaders headers = new HttpHeaders();
        headers.setContentLength((long) length);
        // Intermediate chunks leave the total size open ("/*");
        // the final chunk reports the total object size instead.
        headers.setContentRange("bytes " + (length == 0 ? "*" : offset + "-" + (limit - 1))
                + (isFinalChunk ? "/" + limit : "/*"));
        req.setHeaders(headers);
        req.setReadTimeout((int) DEFAULT_TIMEOUT);

        try {
            req.execute();
        } catch (HttpResponseException e) {
            // GCS answers 308 (Resume Incomplete) for every successfully stored
            // intermediate chunk; the client library surfaces that as an exception.
            if (e.getStatusCode() == 308) {
                ++chunkCount;
            } else {
                throw e;
            }
        }
    }
}
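The Content-Range arithmetic above is the fiddly part. Pulled out into a standalone helper (the class and method names are mine, not from the original code), the header values it produces look like this:

```java
// Builds the Content-Range header value for a resumable-upload chunk.
// Intermediate chunks use "/*" (total size still unknown); the final chunk
// reports the total object size; a zero-length probe uses "bytes */total".
public class ContentRangeHelper {
    static String contentRange(long offset, int length, boolean isFinalChunk) {
        long limit = offset + length;
        String range = (length == 0) ? "*" : offset + "-" + (limit - 1);
        String total = isFinalChunk ? String.valueOf(limit) : "*";
        return "bytes " + range + "/" + total;
    }
}
```

Note that GCS requires every chunk except the last to be a multiple of 256 KiB, which is why a chunk size like 262144 (or a multiple of it) is a safe choice for DEFAULT_UPLOAD_CHUNK_SIZE.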

My buffer is a io.netty.buffer.ByteBuf.

My GCS-related imports are:

import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.http.ByteArrayContent;
import com.google.api.client.http.GenericUrl;
import com.google.api.client.http.HttpContent;
import com.google.api.client.http.HttpHeaders;
import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpRequestFactory;
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpResponseException;
import com.google.api.client.http.HttpTransport;

There might be some bugs in the above code, but it did successfully write a file in chunks to GCS.

I also managed to accomplish the task via a different library and "Compose" requests. But the "Resumable" approach seems to be more appropriate.
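For completeness, a rough sketch of what the "Compose" route can look like with the same JSON-API Storage client as in the question (gcsService, the bucket, and the part-object names here are illustrative assumptions): upload each chunk as its own temporary object, then stitch them together server-side.

```java
import java.util.ArrayList;
import java.util.List;

import com.google.api.services.storage.Storage;
import com.google.api.services.storage.model.ComposeRequest;
import com.google.api.services.storage.model.StorageObject;

public class ComposeSketch {
    // Stitches previously uploaded part objects into one destination object.
    // gcsService, bucketName, partNames, and destName are placeholders.
    static void composeParts(Storage gcsService, String bucketName,
                             List<String> partNames, String destName) throws Exception {
        List<ComposeRequest.SourceObjects> sources = new ArrayList<>();
        for (String part : partNames) {
            sources.add(new ComposeRequest.SourceObjects().setName(part));
        }
        ComposeRequest composeRequest = new ComposeRequest()
                .setSourceObjects(sources)
                .setDestination(new StorageObject().setContentType("text/plain"));
        // A single compose call accepts at most 32 source objects;
        // larger uploads need repeated composes over intermediate results.
        gcsService.objects().compose(bucketName, destName, composeRequest).execute();
    }
}
```

After a successful compose you would typically delete the temporary part objects yourself, since compose does not remove its sources.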

Cheers and good luck.