2 votes

Long time no see. I'd like to get 5 GB of data from MySQL into BigQuery. My best bet seems to be some sort of CSV export/import, which doesn't work for various reasons, see:

agile-coral-830:splitpapers1501200518aa150120052659
agile-coral-830:splitpapers1501200545aa150120055302
agile-coral-830:splitpapers1501200556aa150120060231

This is likely because I don't have the right MySQL incantation to generate perfect CSV in accordance with RFC 4180. However, instead of arguing over RFC 4180 minutiae, this whole load business could be solved in five minutes by supporting customizable multi-character field separators and multi-character line separators. I'm pretty sure my data contains neither ### nor @@@, so the following would work like a charm:

mysql> select * from $TABLE_NAME 
into outfile '$DATA.csv' 
fields terminated by '###' 
enclosed by ''
lines terminated by '@@@'

$ bq load  --nosync -F '###' -E '@@@' $TABLE_NAME $DATA.csv $SCHEMA.json

Edit: Fields contain '\n', '\r', ',' and '"'. They also contain NULLs, which MySQL represents as [escape]N (in the sample below, "N). Sample row:

"10.1.1.1.1483","5","9074080","Candidate high myopia loci on chromosomes 18p and 12q do not play a major role in susceptibility to common myopia","Results
There was no strong evidence of linkage of common myopia to these candidate regions: all two-point and multipoint heterogeneity LOD scores were < 1.0 and non-parametric linkage p-values were > 0.01. However, one Amish family showed slight evidence of linkage (LOD>1.0) on 12q; another 3 Amish families each gave LOD >1.0 on 18p; and 3 Jewish families each gave LOD >1.0 on 12q.
Conclusions
Significant evidence of linkage (LOD> 3) of myopia was not found on chromosome 18p or 12q loci in these families. These results suggest that these loci do not play a major role in the causation of common myopia in our families studied.","2004","BMC MEDICAL GENETICS","JOURNAL","N,"5","20","","","","0","1","USER","2007-11-19 05:00:00","rep1","PDFLib TET","0","2009-05-24 20:33:12"
Please provide a few sample lines from your CSV and also the JSON schema. Also, I ran into a bug last time when doing the output from MySQL: stackoverflow.com/questions/24610691/… – Pentium10
<moved to main post> – Cristian Petrescu-Prahova

2 Answers

3 votes

I found loading through a CSV very difficult; there were more restrictions and complications. I have been messing around this morning with moving data from MySQL to BigQuery.

Below is a Python script that will build the table schema (what the code calls the table decorator) and stream the data directly into the BigQuery table.

My DB is in the cloud, so you may need to change the connection string. Fill in the missing values for your particular situation, then call it with:

SQLToBQBatch(tableName, limit)

I put the limit in to test with. For my final test I sent 999999999 for the limit and everything worked fine.

I would recommend using a backend module to run anything over 5 GB; one way to kick that off is sketched below.
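
A minimal sketch of kicking the batch off on a dedicated module through the task queue; it assumes the classic App Engine Python runtime, and the module name 'bqworker' is purely illustrative:

from google.appengine.ext import deferred

def start_load(table_name):
    # Hand the long-running load to the task queue and target a dedicated
    # module/backend ('bqworker' is an illustrative name) so it gets a longer
    # request deadline than a normal frontend request.
    deferred.defer(SQLToBQBatch, table_name, 999999999, _target='bqworker')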

Use "RowToJSON" to clean up and invalid characters (ie anything non utf8).

I haven't tested it on 5 GB, but it was able to do 50k rows in about 20 seconds. The same load via CSV took over 2 minutes.

I wrote this to test things, so please excuse the bad coding practices and mini hacks. It works, so feel free to clean it up for any production-level work.

import MySQLdb
import logging
from apiclient.discovery import build
from oauth2client.appengine import AppAssertionCredentials
import httplib2

OAUTH_SCOPE = 'https://www.googleapis.com/auth/bigquery'



# TODO: fill these in for your project and Cloud SQL instance
PROJECT_ID = ''
DATASET_ID = ''
TABLE_ID = ''

SQL_DATABASE_NAME = ''
SQL_DATABASE_DB = ''
SQL_USER = ''
SQL_PASS = ''


def Connect():
    return MySQLdb.connect(unix_socket='/cloudsql/' + SQL_DATABASE_NAME, db=SQL_DATABASE_DB, user=SQL_USER, passwd=SQL_PASS)


def RowToJSON(cursor, row, fields):
    newData = {}
    for i, value in enumerate(row):
        try:
            if fields[i]["type"] == bqTypeDict["int"]:
                value = int(value)
            else:
                value = float(value)
        except (ValueError, TypeError):
            if value is not None:
                value = value.replace("\x92", "'") \
                                .replace("\x96", "'") \
                                .replace("\x93", '"') \
                                .replace("\x94", '"') \
                                .replace("\x97", '-') \
                                .replace("\xe9", 'e') \
                                .replace("\x91", "'") \
                                .replace("\x85", "...") \
                                .replace("\xb4", "'") \
                                .replace('"', '""')

        newData[cursor.description[i][0]] = value
    return newData


def GetBuilder():
    return build('bigquery', 'v2',http = AppAssertionCredentials(scope=OAUTH_SCOPE).authorize(httplib2.Http()))

bqTypeDict = { 'int' : 'INTEGER',
                   'varchar' : 'STRING',
                   'double' : 'FLOAT',
                   'tinyint' : 'INTEGER',
                   'decimal' : 'FLOAT',
                   'text' : 'STRING',
                   'smallint' : 'INTEGER',
                   'char' : 'STRING',
                   'bigint' : 'INTEGER',
                   'float' : 'FLOAT',
                   'longtext' : 'STRING'
                  }

def BuildFeilds(table):
    conn = Connect()
    cursor = conn.cursor()
    cursor.execute("DESCRIBE %s;" % table)
    tableDecorator = cursor.fetchall()
    fields = []

    for col in tableDecorator:
        field = {}
        field["name"] = col[0]
        colType = col[1].split("(")[0]
        if colType not in bqTypeDict:
            logging.warning("Unknown type detected, using string: %s", str(col[1]))
        field["type"] = bqTypeDict.get(colType, "STRING")
        if col[2] == "YES":
            field["mode"] = "NULLABLE"
        fields.append(field)
    return fields


def SQLToBQBatch(table, limit=3000):
    logging.info("****************************************************")
    logging.info("Starting SQLToBQBatch. Got: Table: %s, Limit: %i" % (table, limit))   
    bqDest = GetBuilder()
    fields = BuildFeilds(table)

    try:
        responce = bqDest.datasets().insert(projectId=PROJECT_ID, body={'datasetReference' : 
                                                                {'datasetId' : DATASET_ID} }).execute()
        logging.info("Added Dataset")
        logging.info(responce)
    except Exception, e:
        logging.info(e)
        if ("Already Exists: " in str(e)):
            logging.info("Dataset already exists")
        else:
            logging.error("Error creating dataset: " + str(e), "Error")

    try:
        responce = bqDest.tables().insert(projectId=PROJECT_ID, datasetId=DATASET_ID, body={'tableReference' : {'projectId'  : PROJECT_ID,
                                                                                               'datasetId' : DATASET_ID,
                                                                                               'tableId'  : TABLE_ID},
                                                                            'schema' : {'fields' : fields}}
                                                                                ).execute()
        logging.info("Added Table")
        logging.info(responce)
    except Exception, e:
        logging.info(e)
        if ("Already Exists: " in str(e)):
            logging.info("Table already exists")
        else:
            logging.error("Error creating table: " + str(e), "Error")

    conn = Connect()
    cursor = conn.cursor()

    logging.info("Starting load loop")
    count = -1
    cur_pos = 0
    total = 0
    batch_size = 1000

    while count != 0 and cur_pos < limit:
        count = 0
        if batch_size + cur_pos > limit:
            batch_size = limit - cur_pos
        sqlCommand = "SELECT * FROM %s LIMIT %i, %i" % (table, cur_pos, batch_size) 
        logging.info("Running: %s", sqlCommand)
        cursor.execute(sqlCommand)
        data = []
        for _, row in enumerate(cursor.fetchall()):
            data.append({"json": RowToJSON(cursor, row, fields)})
            count += 1
        logging.info("Read complete")

        if count != 0:

            logging.info("Sending request")   
            insertResponse = bqDest.tabledata().insertAll(
                                                        projectId=PROJECT_ID,
                                                        datasetId=DATASET_ID,
                                                        tableId=TABLE_ID,
                                                        body={"rows":data}).execute()
            cur_pos += batch_size
            total += count
            logging.info("Done %i, Total: %i, Response: %s", count, total, insertResponse)
            if "insertErrors" in insertResponse:
                logging.error("Error inserting data index: %i", insertResponse["insertErrors"]["index"])
                for error in insertResponse["insertErrors"]["errors"]:
                    logging.error(error)
        else:
            logging.info("No more rows")
0 votes

- Generate a Google service account key:
  - IAM & Admin > Service accounts > Create service account
  - Once created, create a key, download it, and save it to the project folder on the local machine as google_key.json (an alternative way to hand this key to the client is sketched after this list).
- Run the code in a PyCharm environment after installing the packages.
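
As an alternative to pointing GOOGLE_APPLICATION_CREDENTIALS at the key (which is what the script below does), the client can also be built directly from the key file; a small sketch, assuming the google_key.json saved above:

from google.cloud import bigquery

# Build the client straight from the downloaded service-account key instead of
# exporting GOOGLE_APPLICATION_CREDENTIALS.
bigquery_client = bigquery.Client.from_service_account_json('google_key.json')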



NOTE: The table data in MySQL remains intact. Also, rows streamed into BigQuery may not show up in the table preview right away, so don't rely on the preview to check the load; go to the console and run a query against the table instead.
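
A quick way to check from code (a sketch, assuming a reasonably current google-cloud-bigquery client, which may be newer than the one the script below targets; the dataset and table names are the ones from the run command at the end):

from google.cloud import bigquery

client = bigquery.Client()
# Streamed rows sit in the streaming buffer for a while; they are queryable
# even when the table preview does not show them yet.
job = client.query("SELECT COUNT(*) AS n FROM `practice123.temp_market_store`")
for row in job.result():
    print("rows loaded so far:", row.n)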


CODE:

import MySQLdb
from google.cloud import bigquery
import mysql.connector
import logging
import os
from MySQLdb.converters import conversions
import click
import MySQLdb.cursors
from google.cloud.exceptions import ServiceUnavailable
import sys

bqTypeDict = {'int': 'INTEGER',
              'varchar': 'STRING',
              'double': 'FLOAT',
              'tinyint': 'INTEGER',
              'decimal': 'FLOAT',
              'text': 'STRING',
              'smallint': 'INTEGER',
              'char': 'STRING',
              'bigint': 'INTEGER',
              'float': 'FLOAT',
              'longtext': 'STRING',
              'datetime': 'TIMESTAMP'
              }


def conv_date_to_timestamp(str_date):
    # Convert a MySQL DATETIME string into a Unix timestamp (seconds since the epoch).
    import datetime

    date_time = MySQLdb.times.DateTime_or_None(str_date)
    unix_timestamp = (date_time - datetime.datetime(1970, 1, 1)).total_seconds()

    return unix_timestamp


def Connect(host, database, user, password):
    # Use the connection parameters passed in from the command line.
    return mysql.connector.connect(host=host,
                                   database=database,
                                   user=user,
                                   password=password)


def BuildSchema(host, database, user, password, table):
    logging.debug('build schema for table %s in database %s' % (table, database))
    conn = Connect(host, database, user, password)
    cursor = conn.cursor()
    cursor.execute("DESCRIBE %s;" % table)

    tableDecorator = cursor.fetchall()
    schema = []

    for col in tableDecorator:
        colType = col[1].split("(")[0]
        if colType not in bqTypeDict:
            logging.warning("Unknown type detected, using string: %s", str(col[1]))

        field_mode = "NULLABLE" if col[2] == "YES" else "REQUIRED"
        field = bigquery.SchemaField(col[0], bqTypeDict.get(colType, "STRING"), mode=field_mode)

        schema.append(field)

    return tuple(schema)


def bq_load(table, data, max_retries=5):
    logging.info("Sending request")
    uploaded_successfully = False
    num_tries = 0

    while not uploaded_successfully and num_tries < max_retries:
        try:
            insertResponse = table.insert_data(data)

            for row in insertResponse:
                if 'errors' in row:
                    logging.error('not able to upload data: %s', row['errors'])

            uploaded_successfully = True
        except ServiceUnavailable as e:
            num_tries += 1
            logging.error('insert failed with exception trying again retry %d', num_tries)
        except Exception as e:
            num_tries += 1
            logging.error('not able to upload data: %s', str(e))


@click.command()
@click.option('-h', '--host', default='tempus-qa.hashmapinc.com', help='MySQL hostname')
@click.option('-d', '--database', required=True, help='MySQL database')
@click.option('-u', '--user', default='root', help='MySQL user')
@click.option('-p', '--password', default='docker', help='MySQL password')
@click.option('-t', '--table', required=True, help='MySQL table')
@click.option('-i', '--projectid', required=True, help='Google BigQuery Project ID')
@click.option('-n', '--dataset', required=True, help='Google BigQuery Dataset name')
@click.option('-l', '--limit', default=0, help='max num of rows to load')
@click.option('-s', '--batch_size', default=1000, help='rows per streaming insert batch')
@click.option('-k', '--key', default='key.json',help='Location of google service account key (relative to current working dir)')
@click.option('-v', '--verbose', default=0, count=True, help='verbose')
def SQLToBQBatch(host, database, user, password, table, projectid, dataset, limit, batch_size, key, verbose):
    # set to max verbose level
    verbose = verbose if verbose < 3 else 3
    loglevel = logging.ERROR - (10 * verbose)

    logging.basicConfig(level=loglevel)

    logging.info("Starting SQLToBQBatch. Got: Table: %s, Limit: %i", table, limit)
    ## set env key to authenticate application

    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = os.path.join(os.getcwd(), key)
    print('file found')
    # Instantiates a client
    bigquery_client = bigquery.Client()
    print('Project id created')

    try:

        bq_dataset = bigquery_client.dataset(dataset)
        bq_dataset.create()
        logging.info("Added Dataset")
    except Exception as e:
        if ("Already Exists: " in str(e)):
            logging.info("Dataset already exists")
        else:
            logging.error("Error creating dataset: %s Error", str(e))



    bq_table = bq_dataset.table(table)
    bq_table.schema = BuildSchema(host, database, user, password, table)
    print('Creating schema using build schema')
    bq_table.create()
    logging.info("Added Table %s", table)

    conn = Connect(host, database, user, password)
    cursor = conn.cursor()

    logging.info("Starting load loop")
    cursor.execute("SELECT * FROM %s" % (table))

    cur_batch = []
    count = 0

    for row in cursor:
        if limit != 0 and count >= limit:
            logging.info("limit of %d rows reached", limit)
            break

        count += 1
        cur_batch.append(row)

        if count % batch_size == 0 and count != 0:
            bq_load(bq_table, cur_batch)

            cur_batch = []
            logging.info("processed %i rows", count)

    # send last elements
    bq_load(bq_table, cur_batch)
    logging.info("Finished (%i total)", count)
    print("table created")


if __name__ == '__main__':
    # run the command
    SQLToBQBatch()
    
Command to run the file:

python mysql_to_bq.py -d 'recommendation_spark' -t temp_market_store -i inductive-cocoa-250507 -n practice123 -k key.json