
For the past couple of days I've been working to improve the Logstash google_bigquery connector. So far I've been able to add features such as error handling (bad lines), better connection management, and a couple of other things.

The last, but most important, feature I've been working on is uploading compressed data to BigQuery, and the API documentation on this point is frankly poor.

For now I'm able to upload CSV files directly to BQ using the Jobs.insert method, and the documentation clearly states that data can be uploaded compressed.

The only question left is whether there is a way to do so without using Google Cloud Storage. The compressed option exists to reduce network bandwidth and its cost, so adding another hop that costs money, such as GCS, is pointless.
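
For context, compressing the local part file before upload is straightforward with Ruby's standard Zlib library. The sketch below is only illustrative; the chunk size and the ".gz" suffix are my own choices, not taken from the plugin:

    require 'zlib'

    # Compress a local file to "<filename>.gz" before handing it to the uploader.
    # The 64 KB chunk size is arbitrary.
    def gzip_file(filename)
      gz_path = "#{filename}.gz"
      Zlib::GzipWriter.open(gz_path) do |gz|
        File.open(filename, 'rb') do |f|
          while (chunk = f.read(64 * 1024))
            gz.write(chunk)
          end
        end
      end
      gz_path
    end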

The error I'm getting is:

BQ: job failed, please enable debug and check full response (probably the issue is an incompatible schema). NOT deleting local file. {:job_id=>"job_OvWTWOXGv9yGnLKfrTfGfukLytM", :filename=>"/Users/dave.e/Logstash/tmp/bq-logstash_daves-mpb.local_2014-08-26.part000.log.gz", :job_status=>{"state"=>"DONE", "errorResult"=>{"reason"=>"internalError", "location"=>"File: 0", "message"=>"Unexpected. Please try again."}, "errors"=>[{"reason"=>"internalError", "location"=>"File: 0", "message"=>"Unexpected. Please try again."}]}, :level=>:error}

I'll cut to the main case with the code, and would be grateful for your help:

  # Uploads a local file to the configured bucket.
  def upload_object(filename)
    @logger.debug("entering upload_object")
    begin
      @logger.debug("1")
      require 'json'
      @logger.debug("2")
      table_id = @table_prefix + "_" + get_date_pattern(filename)
      @logger.debug("3")
      # BQ does not accept anything other than alphanumeric and _
      # Ref: https://developers.google.com/bigquery/browser-tool-quickstart?hl=en
      table_id = table_id.gsub(':','_').gsub('-', '_')
      @logger.debug("table bane has been modified")

      @logger.debug("BQ: upload object.",
                    :filename => filename,
                    :table_id => table_id)
      media = Google::APIClient::UploadIO.new(filename, "application/octet-stream")
      body = {
        "configuration" => {
          "load" => {
            "sourceFormat" => "NEWLINE_DELIMITED_JSON",
            "schema" => @json_schema,
            "destinationTable"  =>  {
              "projectId" => @project_id,
              "datasetId" => @dataset,
              "tableId" => table_id
            },
            'createDisposition' => 'CREATE_IF_NEEDED',
            'writeDisposition' => 'WRITE_APPEND',
            'maxBadRecords' => 99
          }
        }
      }

      @logger.debug("Execution details: ",
                   :body_object => body,
                   :parameters => {
                     'uploadType' => 'multipart',
                     'projectId' => @project_id
                   },
                  :media => media)

      datasetId = @project_id + ":" + @dataset

      # Check whether the dataset already exists; datasets.get returns its id
      # as "<projectId>:<datasetId>".
      verify_dataset = @client.execute(:api_method => @bq.datasets.get,
                                       :parameters => {
                                         'projectId' => @project_id,
                                         'datasetId' => datasetId })

      status = JSON.parse(verify_dataset.response.body)["id"]
      if status != datasetId
         @logger.info("BQ: dataset doesn't exist, creating it instead")
         create_dataset = @client.execute(:api_method => @bq.datasets.insert,
                                          :parameters => { 'projectId' => @project_id },
                                          :body_object => { 'id' => datasetId })
         sleep 10
      end

      # Submit the load job as a multipart upload (job metadata and file
      # contents in a single request).
      insert_result = @client.execute(:api_method => @bq.jobs.insert,
                                      :body_object => body,
                                      :parameters => {
                                        'uploadType' => 'multipart',
                                        'projectId' => @project_id
                                      },
                                      :media => media)

      job_id = JSON.parse(insert_result.response.body)["jobReference"]["jobId"]
      @logger.debug("BQ: multipart insert",
                    :job_id => job_id)
      return job_id
    rescue => e
      @logger.error("BQ: failed to upload file", :exception => e)
      # TODO(rdc): limit retries?
      sleep 1
      if File.exist?(filename)
        retry
      end
    end
  end
I've never used BigQuery from Ruby, but I do recommend using resumable upload instead of multipart (yes, this is supported in BigQuery; I think a lot of docs may not mention it). More info here: developers.google.com/api-client-library/ruby/guide/… – Jordan Tigani

I'm investigating the error... According to our logs the input file was only 9107 bytes. Is that correct, or is part of the file missing? – Jordan Tigani

I think it's correct. I'm debugging with small batches of data, as I don't want to try it out at first with 100 GB files. – Dave Ezrakhovich

And thanks for the advice about resumable uploads; I was actually aware of them. I didn't think of using one, but maybe I should implement it for files larger than X. – Dave Ezrakhovich
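
For reference, a rough sketch of what the resumable variant might look like with the legacy google-api-client gem, following its media-upload guide. It reuses the body and media objects built in upload_object above and has not been verified end to end:

    # Initiate a resumable upload session instead of a multipart request.
    media = Google::APIClient::UploadIO.new(filename, "application/octet-stream")

    result = @client.execute(:api_method => @bq.jobs.insert,
                             :body_object => body,
                             :parameters => {
                               'uploadType' => 'resumable',
                               'projectId' => @project_id
                             },
                             :media => media)

    # The first call only opens the session; the file bytes are sent afterwards
    # and the transfer can be resumed if it is interrupted.
    upload = result.resumable_upload
    @client.execute(upload) if upload.resumable?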

1 Answer


The error on our side was that the file did not appear to be a valid gzip file, and the gzip library was unable to open it.

This could be a problem with how the file was generated or with how it was uploaded. If you still have access to the file, can you verify that you're able to unzip it?
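
A quick way to check locally is to read the whole file back through Ruby's Zlib::GzipReader. This is just a minimal sketch, using the path from the error log above:

    require 'zlib'

    path = "/Users/dave.e/Logstash/tmp/bq-logstash_daves-mpb.local_2014-08-26.part000.log.gz"
    begin
      # Reading to EOF also forces the gzip trailer (CRC/length) check.
      Zlib::GzipReader.open(path) do |gz|
        bytes = 0
        while (chunk = gz.read(64 * 1024))
          bytes += chunk.bytesize
        end
        puts "OK: #{bytes} bytes decompressed"
      end
    rescue Zlib::GzipFile::Error => e
      puts "Not a valid gzip file: #{e.message}"
    end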