I'm trying to send the output of a reduce to a map by chaining two MapReduce jobs in a pipeline, similar to this question:
I would like to chain multiple mapreduce jobs in google app engine in Python
I tried the solution given there, but it didn't work.
The flow of my pipeline is:
Map1
Reduce1
Map2
Reduce2
I'm saving the output of Reduce1 to the blobstore under a blob_key, and then trying to access that blob from Map2. But the second map fails with the following error: "BadReaderParamsError: Could not find blobinfo for key <blob_key here>".
Here's the pipeline code:
class SongsPurchasedTogetherPipeline(base_handler.PipelineBase):
    def run(self, filekey, blobkey):
        bucket_name = app_identity.get_default_gcs_bucket_name()
        intermediate_output = yield mapreduce_pipeline.MapreducePipeline(
            "songs_purchased_together_intermediate",
            "main.songs_purchased_together_map1",
            "main.songs_purchased_together_reduce1",
            "mapreduce.input_readers.BlobstoreLineInputReader",
            "mapreduce.output_writers.GoogleCloudStorageOutputWriter",
            mapper_params={
                "blob_keys": blobkey,
            },
            reducer_params={
                "output_writer": {
                    "bucket_name": bucket_name,
                    "content_type": "text/plain",
                }
            },
            shards=1)
        yield StoreOutput("SongsPurchasedTogetherIntermediate", filekey, intermediate_output)
        intermediate_output_key = yield BlobKey(intermediate_output)
        output = yield mapreduce_pipeline.MapreducePipeline(
            "songs_purchased_together",
            "main.songs_purchased_together_map2",
            "main.songs_purchased_together_reduce2",
            "mapreduce.input_readers.BlobstoreLineInputReader",
            "mapreduce.output_writers.GoogleCloudStorageOutputWriter",
            mapper_params=intermediate_output_key,
            reducer_params={
                "output_writer": {
                    "bucket_name": bucket_name,
                    "content_type": "text/plain",
                }
            },
            shards=1)
        yield StoreOutput("SongsPurchasedTogether", filekey, output)
And here's the BlobKey class, which takes the intermediate output and generates the blob key for Map2 to use:
class BlobKey(base_handler.PipelineBase):
    def run(self, output):
        blobstore_filename = "/gs" + output[0]
        blobstore_gs_key = blobstore.create_gs_key(blobstore_filename)
        return {
            "blob_keys": blobstore_gs_key
        }
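For context on the string manipulation BlobKey does: GoogleCloudStorageOutputWriter yields object paths of the form /&lt;bucket&gt;/&lt;object&gt;, while blobstore.create_gs_key() expects a filename of the form /gs/&lt;bucket&gt;/&lt;object&gt;, hence the "/gs" prefix. A minimal standalone sketch (the helper name to_gs_filename is mine, not part of the mapreduce library):

```python
def to_gs_filename(writer_output_path):
    """Convert a GoogleCloudStorageOutputWriter path ("/<bucket>/<object>")
    into the "/gs/<bucket>/<object>" form that blobstore.create_gs_key()
    expects. Purely illustrative; no App Engine APIs involved."""
    if not writer_output_path.startswith("/"):
        raise ValueError("expected an absolute /<bucket>/<object> path")
    return "/gs" + writer_output_path

print(to_gs_filename("/my-bucket/songs/output-0"))
# -> /gs/my-bucket/songs/output-0
```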
The StoreOutput class is the same as the one in Google's MapReduce demo (https://github.com/GoogleCloudPlatform/appengine-mapreduce/blob/master/python/demo/main.py), and does the same thing as the BlobKey class, but additionally sends the blob's URL to the HTML page as a link.
Manually visiting the URL appname/blobstore/<blob_key> in a browser (after Reduce1 succeeds, but Map2 fails) displays the output expected from Reduce1. So why can't Map2 find the blob? Sorry, I'm a newbie to App Engine, and I'm probably going wrong somewhere because I don't fully understand blob storage.