17 votes

In Amazon Redshift's Getting Started Guide, data is pulled from Amazon S3 and loaded into an Amazon Redshift cluster using SQL Workbench/J. I'd like to mimic the same process of connecting to the cluster and loading sample data into it using Boto3.

However, in Boto3's documentation for Redshift, I'm unable to find a method that would allow me to upload data into an Amazon Redshift cluster.

I've been able to connect with Redshift utilizing Boto3 with the following code:

import boto3

client = boto3.client('redshift')

But I'm not sure what method would allow me to either create tables or upload data to Amazon Redshift the way it's done in the tutorial with SQL Workbench/J.


3 Answers

12 votes

Go back to step 4 in the tutorial you linked. See where it shows you how to get the URL of the cluster? You have to connect to that URL with a PostgreSQL driver. The AWS SDKs such as Boto3 provide access to the AWS management API; they don't run SQL for you. You need to connect to Redshift over the PostgreSQL protocol, just like you would connect to a PostgreSQL database on RDS.
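
For example, a minimal sketch of that split (the cluster identifier, database name, and credentials below are placeholders, and psycopg2 stands in for any PostgreSQL driver):

import boto3
import psycopg2

# Boto3 talks to the AWS management API: it can tell you the cluster's
# endpoint, but the SQL connection itself goes through a PostgreSQL driver.
redshift = boto3.client('redshift')
endpoint = redshift.describe_clusters(
    ClusterIdentifier='examplecluster')['Clusters'][0]['Endpoint']

conn = psycopg2.connect(
    host=endpoint['Address'],
    port=endpoint['Port'],
    dbname='dev',
    user='masteruser',
    password='***'
)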

23 votes

Right, you need the psycopg2 Python module to execute the COPY command.

My code looks like this:

import psycopg2

# Amazon Redshift connection string
conn_string = "dbname='***' port='5439' user='***' password='***' host='mycluster.***.redshift.amazonaws.com'"

# Connect to Redshift (the cluster's security group must allow access from this machine)
con = psycopg2.connect(conn_string)

sql = """COPY %s FROM '%s' credentials
    'aws_access_key_id=%s;aws_secret_access_key=%s'
    delimiter '%s' FORMAT CSV %s %s; commit;""" % (
    to_table, fn, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, delim, quote, gzip)

# Here:
#   fn   - s3://path_to__input_file.gz
#   gzip = 'gzip'

cur = con.cursor()
cur.execute(sql)
con.close()

I used boto3/psycopg2 to write CSV_Loader_For_Redshift.
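
The same psycopg2 connection also covers the "create tables" part of the question, since plain DDL works too. A sketch, using a made-up table name and columns:

cur = con.cursor()
# Made-up target table; adjust columns to match your CSV before the COPY
cur.execute("""
    CREATE TABLE IF NOT EXISTS my_table (
        id      INTEGER,
        name    VARCHAR(100),
        created TIMESTAMP
    );
""")
con.commit()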

1 vote

Using psycopg2 & get_cluster_credentials

Prerequisites -

  • An IAM role attached to the respective user, with a policy that allows the redshift:GetClusterCredentials action

  • Or run on an EC2 instance with an appropriate IAM role attached

The code below will work only if you deploy it on a PC/VM where the user's AWS credentials are already configured [ CLI - aws configure ], or if you are on an instance in the same account/VPC.

  1. Have a config.ini file -

     [Redshift]
     port = 5439
     username = please_enter_username
     database_name = please_enter_database_name
     cluster_id = please_enter_cluster_id
     url = please_enter_cluster_endpoint_url
     region = us-west-2
    
  2. My Redshift_connection.py -

     import logging
     from configparser import ConfigParser

     import boto3
     import psycopg2


     def db_connection():
         logger = logging.getLogger(__name__)

         parser = ConfigParser()
         parser.read('config.ini')

         RS_PORT = parser.get('Redshift', 'port')
         RS_USER = parser.get('Redshift', 'username')
         DATABASE = parser.get('Redshift', 'database_name')
         CLUSTER_ID = parser.get('Redshift', 'cluster_id')
         RS_HOST = parser.get('Redshift', 'url')
         REGION_NAME = parser.get('Redshift', 'region')

         client = boto3.client('redshift', region_name=REGION_NAME)

         # Get temporary, auto-expiring database credentials for the cluster
         cluster_creds = client.get_cluster_credentials(DbUser=RS_USER,
                                                        DbName=DATABASE,
                                                        ClusterIdentifier=CLUSTER_ID,
                                                        AutoCreate=False)

         try:
             conn = psycopg2.connect(
                 host=RS_HOST,
                 port=RS_PORT,
                 user=cluster_creds['DbUser'],
                 password=cluster_creds['DbPassword'],
                 database=DATABASE
             )
             return conn
         except psycopg2.Error:
             logger.exception('Failed to open database connection.')
             print("Failed")
    
  3. Query execution script -

     from Redshift_connection import db_connection

     def executescript(redshift_cursor):
         # Placeholders: replace <SCHEMA_NAME>.<TABLENAME> with a real table
         query = "SELECT * FROM <SCHEMA_NAME>.<TABLENAME>"
         cur = redshift_cursor
         cur.execute(query)

     conn = db_connection()
     conn.set_session(autocommit=False)
     cursor = conn.cursor()
     executescript(cursor)
     conn.close()
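
Note that executescript only runs the query and discards the results. To actually read rows back, fetch from the cursor after executescript(cursor) and before conn.close(), e.g. (a sketch):

rows = cursor.fetchall()   # must happen before the connection is closed
for row in rows:
    print(row)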