For the past couple of days I've been working on improving the Logstash google_bigquery connector. So far I've added features such as error handling (bad lines), better connection management, and a couple of other things.
The last, but most important, feature I've been working on is uploading compressed data to BigQuery, and the API documentation on this point is unfortunately not much help.
For now I'm able to upload CSV files directly to BQ using the Jobs.insert method, and the documentation clearly states that the data can be uploaded compressed.
The only remaining question is whether there is a way to do this without going through Google Cloud Storage. The whole point of the compression option is to reduce network bandwidth and its cost, so adding another paid hop such as GCS defeats the purpose.
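For context, this is roughly how I produce the gzipped part file before handing it to the uploader (a simplified sketch, not the actual plugin code; the compress_file name is just for illustration):

    require 'zlib'

    # Simplified sketch: gzip a finished NDJSON part file before upload,
    # producing e.g. ".../bq-logstash_....part000.log.gz".
    def compress_file(path)
      gz_path = "#{path}.gz"
      Zlib::GzipWriter.open(gz_path) do |gz|
        File.open(path, 'rb') do |file|
          while (chunk = file.read(16 * 1024))
            gz.write(chunk)
          end
        end
      end
      gz_path
    end

The resulting .gz file is what gets passed as the filename to upload_object below.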
The error I'm getting is:
BQ: job failed, please enable debug and check full response (probably the issue is an incompatible schema). NOT deleting local file. {:job_id=>"job_OvWTWOXGv9yGnLKfrTfGfukLytM", :filename=>"/Users/dave.e/Logstash/tmp/bq-logstash_daves-mpb.local_2014-08-26.part000.log.gz", :job_status=>{"state"=>"DONE", "errorResult"=>{"reason"=>"internalError", "location"=>"File: 0", "message"=>"Unexpected. Please try again."}, "errors"=>[{"reason"=>"internalError", "location"=>"File: 0", "message"=>"Unexpected. Please try again."}]}, :level=>:error}
I'll cut to the chase with the code, and I'd be grateful for your help.
    # Uploads a local file to BigQuery via a multipart jobs.insert request.
    def upload_object(filename)
      @logger.debug("entering upload_object")
      begin
        require 'json'

        table_id = @table_prefix + "_" + get_date_pattern(filename)
        # BQ does not accept anything other than alphanumeric and _
        # Ref: https://developers.google.com/bigquery/browser-tool-quickstart?hl=en
        table_id = table_id.gsub(':', '_').gsub('-', '_')
        @logger.debug("table name has been modified")

        @logger.debug("BQ: upload object.",
                      :filename => filename,
                      :table_id => table_id)

        # The media payload is the local (possibly gzipped) part file.
        media = Google::APIClient::UploadIO.new(filename, "application/octet-stream")
        body = {
          "configuration" => {
            "load" => {
              "sourceFormat" => "NEWLINE_DELIMITED_JSON",
              "schema" => @json_schema,
              "destinationTable" => {
                "projectId" => @project_id,
                "datasetId" => @dataset,
                "tableId" => table_id
              },
              'createDisposition' => 'CREATE_IF_NEEDED',
              'writeDisposition' => 'WRITE_APPEND',
              'maxBadRecords' => 99
            }
          }
        }

        @logger.debug("Execution details: ",
                      :body_object => body,
                      :parameters => {
                        'uploadType' => 'multipart',
                        'projectId' => @project_id
                      },
                      :media => media)

        # Make sure the target dataset exists, creating it if necessary.
        datasetId = @project_id + ":" + @dataset
        verify_dataset = @client.execute(:api_method => @bq.datasets.get,
                                         :parameters => {
                                           'projectId' => @project_id,
                                           'datasetId' => datasetId })
        status = JSON.parse(verify_dataset.response.body)["id"]
        if status != datasetId
          @logger.info("BQ: dataset doesn't exist, creating it instead")
          create_dataset = @client.execute(:api_method => @bq.datasets.insert,
                                           :parameters => { 'projectId' => @project_id },
                                           :body_object => { 'id' => datasetId })
          sleep 10
        end

        # Kick off the load job as a multipart upload.
        insert_result = @client.execute(:api_method => @bq.jobs.insert,
                                        :body_object => body,
                                        :parameters => {
                                          'uploadType' => 'multipart',
                                          'projectId' => @project_id
                                        },
                                        :media => media)

        job_id = JSON.parse(insert_result.response.body)["jobReference"]["jobId"]
        @logger.debug("BQ: multipart insert", :job_id => job_id)
        return job_id
      rescue => e
        @logger.error("BQ: failed to upload file", :exception => e)
        # TODO(rdc): limit retries?
        sleep 1
        if File.exist?(filename)
          retry
        end
      end
    end
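For completeness, since the errorResult above only shows up once the load job has finished, this is roughly how I check the returned job_id afterwards (a minimal sketch against the same @client/@bq objects as above; the wait_for_load_job name is mine):

    # Minimal sketch: poll jobs.get until the load job is DONE and
    # return the parsed status hash ("errorResult"/"errors" live there).
    def wait_for_load_job(job_id)
      loop do
        result = @client.execute(:api_method => @bq.jobs.get,
                                 :parameters => {
                                   'projectId' => @project_id,
                                   'jobId' => job_id
                                 })
        status = JSON.parse(result.response.body)["status"]
        return status if status["state"] == "DONE"
        sleep 5
      end
    end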