
I'm trying to send the output of a reduce to a map by chaining MapReduce jobs in a pipeline, as described in the question "I would like to chain multiple mapreduce jobs in google app engine in Python". I tried the solution given there, but it didn't work. The flow of my pipeline is:
Map1
Reduce1
Map2
Reduce2
I'm saving the output of Reduce1 to the blobstore under a blob_key, and then trying to access the blob from Map2. But I get the following error while executing the second map: "BadReaderParamsError: Could not find blobinfo for key <blob_key here>".

Here's the pipeline code:

class SongsPurchasedTogetherPipeline(base_handler.PipelineBase):

  def run(self, filekey, blobkey):
    bucket_name = app_identity.get_default_gcs_bucket_name()
    intermediate_output = yield mapreduce_pipeline.MapreducePipeline(
        "songs_purchased_together_intermediate",
        "main.songs_purchased_together_map1",
        "main.songs_purchased_together_reduce1",
        "mapreduce.input_readers.BlobstoreLineInputReader",
        "mapreduce.output_writers.GoogleCloudStorageOutputWriter",
        mapper_params={
            "blob_keys": blobkey,
        },
        reducer_params={
            "output_writer": {
                "bucket_name": bucket_name,
                "content_type": "text/plain",
            }
        },
        shards=1)
    yield StoreOutput("SongsPurchasedTogetherIntermediate", filekey, intermediate_output)

    intermediate_output_key = yield BlobKey(intermediate_output)
    output = yield mapreduce_pipeline.MapreducePipeline(
        "songs_purchased_together",
        "main.songs_purchased_together_map2",
        "main.songs_purchased_together_reduce2",
        "mapreduce.input_readers.BlobstoreLineInputReader",
        "mapreduce.output_writers.GoogleCloudStorageOutputWriter",
        mapper_params=(intermediate_output_key),
        reducer_params={
            "output_writer": {
                "bucket_name": bucket_name,
                "content_type": "text/plain",
            }
        },
        shards=1)
    yield StoreOutput("SongsPurchasedTogether", filekey, output)

and here's the BlobKey class which takes the intermediate output and generates the blob key for Map2 to use:

class BlobKey(base_handler.PipelineBase):

  def run(self, output):
    blobstore_filename = "/gs" + output[0]
    blobstore_gs_key = blobstore.create_gs_key(blobstore_filename)
    return {
      "blob_keys": blobstore_gs_key
    }

The StoreOutput class is the same as the one in Google's MapReduce demo https://github.com/GoogleCloudPlatform/appengine-mapreduce/blob/master/python/demo/main.py, and does the same thing as the BlobKey class, but additionally sends the blob's URL to HTML as a link.

If I manually visit the URL appname/blobstore/<blob_key> in a browser (after Reduce1 succeeds but Map2 fails), it displays the output expected from Reduce1. Why can't Map2 find the blob? I'm new to App Engine, and I'm probably going wrong somewhere because I don't fully understand blob storage.


1 Answer


Okay, I found out that Google has removed the BlobstoreOutputWriter from the list of standard writers in the GAE MapReduce GitHub repository, which makes things a little more complicated. I had to write to Google Cloud Storage and read from there instead of going through the blobstore. I wrote a helper class that generates mapper parameters for the GoogleCloudStorageInputReader.

class GCSMapperParams(base_handler.PipelineBase):

  def run(self, GCSPath):
    bucket_name = app_identity.get_default_gcs_bucket_name()
    return {
        "input_reader": {
            "bucket_name": bucket_name,
            "objects": [path.split('/', 2)[2] for path in GCSPath],
        }
    }

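The helper's run method takes as its argument the output of a MapReduce stage that uses a GoogleCloudStorageOutputWriter, and returns a dictionary that can be assigned to mapper_params of the next MapReduce stage. The next stage must then use "mapreduce.input_readers.GoogleCloudStorageInputReader" as its input reader rather than the BlobstoreLineInputReader.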

Basically, the output of the first MapReduce stage is a list of paths of the form <app_name>/<pipeline_name>/key/output-[i], where i is the shard number. To use a GoogleCloudStorageInputReader, the keys to the data must be passed in the "objects" entry of mapper_params. Each key must be of the form key/output-[i], so the helper class simply removes the <app_name>/<pipeline_name>/ prefix from each path.
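To make the path handling concrete, here is a minimal standalone sketch of the same transformation, written as a plain function so it can be run outside App Engine. The function name gcs_mapper_params, the bucket name "my-bucket" and the sample paths are made up for illustration; only the split expression and the dictionary layout come from the helper class above.

```python
def gcs_mapper_params(bucket_name, output_paths):
    """Build a mapper_params dict from a previous stage's output paths.

    Hypothetical standalone version of the helper above: bucket_name is
    passed in instead of being looked up through app_identity.
    """
    return {
        "input_reader": {
            "bucket_name": bucket_name,
            # split('/', 2) cuts at the first two slashes only; element [2]
            # is everything after the second slash.
            "objects": [path.split("/", 2)[2] for path in output_paths],
        }
    }


# Made-up paths in the <app_name>/<pipeline_name>/key/output-[i] form.
sample_output = [
    "my-app/songs_purchased_together_intermediate/1234/output-0",
    "my-app/songs_purchased_together_intermediate/1234/output-1",
]
params = gcs_mapper_params("my-bucket", sample_output)
print(params["input_reader"]["objects"])
```

Note that the split is purely positional. If a path starts with a slash, the first element of the split is an empty string, so only one named component is dropped ("/a/b/c/d" becomes "b/c/d"). It is worth logging the actual output of the first stage to check which form you get before relying on the index.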