2 votes

Python shell jobs were introduced in AWS Glue. The announcement mentioned:

You can now use Python shell jobs, for example, to submit SQL queries to services such as ... Amazon Athena ...

OK. There is an example of reading data from Athena tables here:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())

# Load the table from the Glue Data Catalog as a DynamicFrame
persons = glueContext.create_dynamic_frame.from_catalog(
             database="legislators",
             table_name="persons_json")
print("Count: ", persons.count())
persons.printSchema()
# TODO query all persons

However, it uses Spark instead of the Python shell. The libraries it relies on are not available in the Python shell job type, and I get this error:

ModuleNotFoundError: No module named 'awsglue.transforms'

How can I rewrite the code above to make it executable in the Python Shell job type?

Did you ever figure this out? I have the same question. – Michael Black
I solved this issue myself. Please check out the answer to this question that I just posted. – Alex Karasev

3 Answers

3 votes

The thing is, the Python shell job type has its own limited set of built-in libraries.

I only managed to achieve my goal using Boto3 to query the data and pandas to read it into a DataFrame.

Here is the code snippet:

import time

import boto3
import pandas as pd

s3_client = boto3.client('s3')
athena_client = boto3.client(service_name='athena', region_name='us-east-1')
bucket_name = 'bucket-with-csv'
print('Working bucket: {}'.format(bucket_name))

def run_query(client, query):
    # Start the Athena query; results are written as a CSV to the S3 output location
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={ 'Database': 'sample-db' },
        ResultConfiguration={ 'OutputLocation': 's3://{}/fromglue/'.format(bucket_name) },
    )
    return response

def validate_query(client, query_id):
    terminal_states = ["FAILED", "SUCCEEDED", "CANCELLED"]
    response = client.get_query_execution(QueryExecutionId=query_id)
    # poll until the query reaches a terminal state
    while response["QueryExecution"]["Status"]["State"] not in terminal_states:
        time.sleep(1)  # avoid hammering the Athena API while waiting
        response = client.get_query_execution(QueryExecutionId=query_id)

    return response["QueryExecution"]["Status"]["State"]

def read(query):
    print('start query: {}\n'.format(query))
    qe = run_query(athena_client, query)
    qstate = validate_query(athena_client, qe["QueryExecutionId"])
    print('query state: {}\n'.format(qstate))

    # Athena names the result file after the QueryExecutionId
    file_name = "fromglue/{}.csv".format(qe["QueryExecutionId"])
    obj = s3_client.get_object(Bucket=bucket_name, Key=file_name)
    return pd.read_csv(obj['Body'])

time_entries_df = read('SELECT * FROM "sample-table"')
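
Athena writes each result set as a CSV named after the QueryExecutionId under the configured OutputLocation, which is why the object key can be derived directly from the execution ID. If the query ends in FAILED or CANCELLED, the get_object call will raise an error, so you may want to check qstate before reading.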
3 votes

SparkContext won't be available in a Glue Python shell job, so you need to rely on Boto3 and pandas to handle the data retrieval. But querying Athena with Boto3 and polling the QueryExecutionId until the query finishes adds a lot of overhead.

Recently, awslabs released a new package called AWS Data Wrangler. It extends the power of the pandas library to AWS, making it easy to interact with Athena and many other AWS services.

Reference links:

  1. https://github.com/awslabs/aws-data-wrangler
  2. https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/006%20-%20Amazon%20Athena.ipynb

Note: the AWS Data Wrangler library won't be available by default inside the Glue Python shell. To include it in a Python shell job, follow the instructions at the following link:

https://aws-data-wrangler.readthedocs.io/en/latest/install.html#aws-glue-python-shell-jobs
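
For illustration, here is a minimal sketch of the Athena call with AWS Data Wrangler, assuming the awswrangler package has been attached to the Python shell job per the install link above; the exact function names depend on the library version, and the database and table names are placeholders:

import awswrangler as wr

# Run the query on Athena and load the result set straight into a pandas DataFrame.
# "sample-db" and "sample_table" are placeholder names.
df = wr.athena.read_sql_query(
    sql="SELECT * FROM sample_table",
    database="sample-db",
)
print(df.head())

This hides the start-query / poll / fetch-CSV cycle shown in the Boto3 answer behind a single call.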

-1 votes

I have been using Glue for a few months now. I use:

from pyspark.context import SparkContext
from awsglue.context import GlueContext

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Read the CSV files that back the Athena table directly from S3
data_frame = spark.read.format("com.databricks.spark.csv")\
    .option("header", "true")\
    .load(<S3 PATH OF THE CSVs THAT ATHENA USES - STRING>)
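
Note that this approach requires a Spark job rather than a Python shell job. If you go this route, one way to query the loaded data is to register the DataFrame as a temporary view and use Spark SQL; a small sketch, where "my_table" is a placeholder view name:

# Register the DataFrame as a temporary view and query it with Spark SQL.
data_frame.createOrReplaceTempView("my_table")
spark.sql("SELECT COUNT(*) FROM my_table").show()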