I'd like to download a bunch of gzip files in a distributed way. I created a list containing all the files' URLs and parallelized it with Spark. Using a map on this RDD, I download each file. Then I'd like to save it to HDFS so that I can reopen it and re-save it to Amazon S3 using the boto library.

As an example, this is my code. I just tried to download the file and save it in my HDFS directory, but I got an error that comes from the path.

try:
    # For Python 3.0 and later
    from urllib.request import urlopen
except ImportError:
    # Fall back to Python 2's urllib2
    from urllib2 import urlopen

from io import BytesIO   # works on both Python 2 and 3 (StringIO is Python 2 only)
import gzip


def dowload_and_save(x):
    response = urlopen(x)

    # buffer the compressed payload in memory
    compressedFile = BytesIO()
    compressedFile.write(response.read())
    compressedFile.seek(0)

    # decompress on the fly
    decompressedFile = gzip.GzipFile(fileobj=compressedFile, mode='rb')

    # this is the line that fails: open() expects a local path, not a WebHDFS URL
    with open('http://localhost:50070/webhdfs/user/root/ruben', 'wb') as outfile:
        outfile.write(decompressedFile.read())



url_lists=['https://dumps.wikimedia.org/other/pagecounts-raw/2007/2007-12/pagecounts-20071209-190000.gz','https://dumps.wikimedia.org/other/pagecounts-raw/2007/2007-12/pagecounts-20071209-200000.gz']

url_lists_rdd = sc.parallelize(url_lists)

# note: map is a lazy transformation; an action (e.g. collect) is needed to actually run it
url_lists_rdd.map(dowload_and_save)
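For what it's worth, open() cannot write to a WebHDFS URL; a WebHDFS client would be needed instead. Below is a minimal sketch, assuming the hdfs package (pip install hdfs), a NameNode web interface at localhost:50070, and the target directory /user/root/ruben from the code above; the function name and the way the file name is derived from the URL are only illustrative.

from hdfs import InsecureClient      # pip install hdfs (WebHDFS client)
try:
    from urllib.request import urlopen   # Python 3
except ImportError:
    from urllib2 import urlopen          # Python 2

def download_and_save_hdfs(url):
    response = urlopen(url)
    # talk to the NameNode over WebHDFS instead of open()'ing a URL
    client = InsecureClient('http://localhost:50070', user='root')
    # write the (still gzipped) bytes under /user/root/ruben/
    client.write('/user/root/ruben/' + url.rsplit('/', 1)[-1],
                 data=response.read(), overwrite=True)
    return url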

1 Answer


I found the solution: download the file with requests and upload its content straight to S3 with boto, skipping HDFS entirely.

import os
import boto
from boto.s3.key import Key
import requests

os.environ['S3_USE_SIGV4'] = 'True'   # use signature version 4 for S3


def dowload_and_save(x):
    bucket_name = 'magnet-fwm'
    access_key = ''   # fill in your AWS credentials
    secret = ''

    r = requests.get(x)

    c = boto.connect_s3(access_key, secret, host='s3-eu-west-1.amazonaws.com')
    b = c.get_bucket(bucket_name, validate=False)

    if r.status_code == 200:
        # upload the downloaded content straight to S3
        k = Key(b)
        k.key = "file.gz"   # note: with several URLs this fixed key is overwritten each time

        k.content_type = r.headers['content-type']
        k.set_contents_from_string(r.content)
    return 'a'



url_lists = ['https://dumps.wikimedia.org/other/pagecounts-raw/2007/2007-12/pagecounts-20071209-180000.gz',
             'https://dumps.wikimedia.org/other/pagecounts-raw/2008/2008-01/pagecounts-20080101-050000.gz']

url_lists_rdd = sc.parallelize(url_lists)

# map is lazy, so an action (count, collect, ...) is needed to actually run the uploads
a = url_lists_rdd.map(dowload_and_save)
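Two follow-up notes on the snippet above: map is lazy, so nothing is uploaded until an action runs, and connecting to S3 once per URL is wasteful. Here is a sketch of a variant under the same assumptions (placeholder credentials, the magnet-fwm bucket in eu-west-1) that uses foreachPartition, which is an action and opens one connection per partition; the key is derived from the URL so the files don't overwrite each other.

import os
import boto
from boto.s3.key import Key
import requests

os.environ['S3_USE_SIGV4'] = 'True'

def upload_partition(urls):
    # one S3 connection per partition instead of one per URL
    access_key = ''   # fill in your AWS credentials
    secret = ''
    c = boto.connect_s3(access_key, secret, host='s3-eu-west-1.amazonaws.com')
    b = c.get_bucket('magnet-fwm', validate=False)
    for url in urls:
        r = requests.get(url)
        if r.status_code == 200:
            k = Key(b)
            k.key = url.rsplit('/', 1)[-1]   # keep the original file name
            k.content_type = r.headers['content-type']
            k.set_contents_from_string(r.content)

# foreachPartition is an action, so the uploads run immediately
url_lists_rdd.foreachPartition(upload_partition)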