7
votes

I'm asking this in the context of Google Dataflow, but also generally.

Using PyTorch, I can reference a local directory containing multiple files that comprise a pretrained model. I happen to be working with a Roberta model, but the interface is the same for others.

ls some-directory/
    added_tokens.json
    config.json
    merges.txt
    pytorch_model.bin
    special_tokens_map.json
    vocab.json
from pytorch_transformers import RobertaModel

# this works
model = RobertaModel.from_pretrained('/path/to/some-directory/')

However, my pretrained model is stored in a GCS bucket. Let's call it gs://my-bucket/roberta/.

Since I'm loading this model in Google Dataflow, I'm trying to remain stateless and avoid persisting to disk, so my preference would be to read the model straight from GCS. As I understand it, the from_pretrained() method can take either the string representation of a local dir OR a URL. However, I can't seem to load the model from a GCS URL.

# this fails
model = RobertaModel.from_pretrained('gs://my-bucket/roberta/')
# ValueError: unable to parse gs://mahmed_bucket/roberta-base as a URL or as a local path

If I try to use the public https URL of the directory blob, it also fails, although that is likely due to lack of authentication: the credentials available in my Python environment (which can create GCS clients) don't carry over to plain public requests against https://storage.googleapis.com.

# this fails, probably due to auth
bucket = gcs_client.get_bucket('my-bucket')
directory_blob = bucket.blob('roberta')  # bucket.blob() takes the object name; it has no prefix kwarg
model = RobertaModel.from_pretrained(directory_blob.public_url)
# ValueError: No JSON object could be decoded

# and for good measure, it also fails if I append a trailing /
model = RobertaModel.from_pretrained(directory_blob.public_url + '/')
# ValueError: No JSON object could be decoded

I understand that GCS doesn't actually have subdirectories; it's really just a flat namespace under the bucket name. Still, it seems I'm blocked by the need for authentication and by pytorch_transformers not speaking gs://.

I can get around this by persisting the files locally first.

from pytorch_transformers import RobertaModel
from google.cloud import storage
import os
import tempfile

local_dir = tempfile.mkdtemp()
gcs = storage.Client()
bucket = gcs.get_bucket(bucket_name)
blobs = bucket.list_blobs(prefix=blob_prefix)
for blob in blobs:
    blob.download_to_filename(local_dir + '/' + os.path.basename(blob.name))
model = RobertaModel.from_pretrained(local_dir)

But this seems like such a hack, and I keep thinking I must be missing something. Surely there's a way to stay stateless and not have to rely on disk persistence!

  • So is there a way to load a pretrained model stored in GCS?
  • Is there a way to authenticate when doing the public URL request in this context?
  • Even if there is a way to authenticate, will the non-existence of subdirectories still be an issue?

Thanks for the help! I'm also happy to be pointed to any duplicate questions 'cause I sure couldn't find any.


Edits and Clarifications

  • My Python session is already authenticated to GCS, which is why I'm able to download the blob files locally and then point from_pretrained() at that local directory

  • from_pretrained() requires a directory reference because it needs all the files listed at the top of the question, not just pytorch_model.bin

  • To clarify question #2, I was wondering if there's some way of giving the PyTorch method a request URL that has credentials embedded in it (a signed URL, say; see the sketch after this list). Kind of a longshot, but I wanted to make sure I hadn't missed anything.

  • To clarify question #3 (in addition to the comment on one answer below), even if there's a way to embed credentials in the URL that I don't know about, I still need to reference a directory rather than a single blob, and I don't know if the GCS subdirectory would be recognized as such because (as the Google docs state) subdirectories in GCS are an illusion and they don't represent a real directory structure. So I think this question is irrelevant or at least blocked by question #2, but it's a thread I chased for a bit so I'm still curious.
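
To illustrate what I mean in the question #2 clarification: the closest thing I know of to a "URL with credentials embedded" is a signed URL. Below is a rough sketch using google-cloud-storage; it assumes credentials that include a private key (e.g. a service account key), and it only signs one blob at a time, so it doesn't touch the directory problem from question #3.

import datetime
from google.cloud import storage

client = storage.Client()
bucket = client.get_bucket('my-bucket')
blob = bucket.blob('roberta/config.json')  # one blob at a time

# Produces a plain https:// URL with the credentials baked into the query
# string, valid for one hour, so anything that can fetch an http(s) URL
# can read the blob without a GCS client.
signed_url = blob.generate_signed_url(
    expiration=datetime.timedelta(hours=1), method='GET')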


4 Answers

2
votes

MAJOR EDIT:

You can install wheel files on Dataflow workers, and you can also use worker temp storage to persist binary files locally!

It's true that (currently, as of Nov 2019) you can't do this by supplying a --requirements_file argument. Instead you have to use setup.py, like this. Assume any constants IN CAPS are defined elsewhere.

from setuptools import setup, find_packages

REQUIRED_PACKAGES = [
    'torch==1.3.0',
    'pytorch-transformers==1.2.0',
]

setup(
    name='project_dir',
    version=VERSION,
    packages=find_packages(),
    install_requires=REQUIRED_PACKAGES)

Run script

python setup.py sdist

python project_dir/my_dataflow_job.py \
--runner DataflowRunner \
--project ${GCP_PROJECT} \
--extra_package dist/project_dir-0.1.0.tar.gz \
# SNIP custom args for your job and required Dataflow Temp and Staging buckets #

And within the job, here's how the model is downloaded from GCS and used, in the context of a custom Dataflow DoFn. For convenience we wrapped a few utility methods in a SEPARATE MODULE (important to get around Dataflow dependency uploads) and imported them at the LOCAL SCOPE of the DoFn, not global.

class AddColumn(beam.DoFn):
    PRETRAINED_MODEL = 'gs://my-bucket/blah/roberta-model-files'

    def get_model_tokenizer_wrapper(self):
        import logging
        import shutil
        import tempfile
        import dataflow_util as util
        try:
            return self.model_tokenizer_wrapper
        except AttributeError:
            tmp_dir = tempfile.mkdtemp() + '/'
            util.download_tree(self.PRETRAINED_MODEL, tmp_dir)
            model, tokenizer = util.create_model_and_tokenizer(tmp_dir)
            model_tokenizer_wrapper = util.PretrainedPyTorchModelWrapper(
                model, tokenizer)
            shutil.rmtree(tmp_dir)
            self.model_tokenizer_wrapper = model_tokenizer_wrapper
            logging.info(
                'Successfully created PretrainedPyTorchModelWrapper')
            return self.model_tokenizer_wrapper

    def process(self, elem):
        model_tokenizer_wrapper = self.get_model_tokenizer_wrapper()

        # And now use that wrapper to process your elem however you need.
        # Note that when you read from BQ your elements are dictionaries
        # of the column names and values for each BQ row.
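
For completeness, here's roughly how that DoFn gets wired into the pipeline. The read/write steps, pipeline_options, and the constants IN CAPS are placeholders for whatever your job actually does:

with beam.Pipeline(options=pipeline_options) as p:
    (p
     | 'ReadFromBQ' >> beam.io.Read(beam.io.BigQuerySource(
           query=BQ_QUERY, use_standard_sql=True))
     | 'AddColumn' >> beam.ParDo(AddColumn())
     | 'WriteToBQ' >> beam.io.WriteToBigQuery(OUTPUT_TABLE))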

Utility functions in a SEPARATE MODULE within the codebase. In our case this lived in dataflow_util/__init__.py at the project root, but you don't have to do it that way.

from contextlib import closing
import logging

import apache_beam as beam
import numpy as np
from pytorch_transformers import RobertaModel, RobertaTokenizer
import torch

class PretrainedPyTorchModelWrapper():
    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer

def download_tree(gcs_dir, local_dir):
    gcs = beam.io.gcp.gcsio.GcsIO()
    assert gcs_dir.endswith('/')
    assert local_dir.endswith('/')
    for entry in gcs.list_prefix(gcs_dir):
        download_file(gcs, gcs_dir, local_dir, entry)


def download_file(gcs, gcs_dir, local_dir, entry):
    rel_path = entry[len(gcs_dir):]
    dest_path = local_dir + rel_path
    logging.info('Downloading %s', dest_path)
    with closing(gcs.open(entry)) as f_read:
        with open(dest_path, 'wb') as f_write:
            # Download the file in chunks to avoid requiring large amounts of
            # RAM when downloading large files.
            while True:
                file_data_chunk = f_read.read(
                    beam.io.gcp.gcsio.DEFAULT_READ_BUFFER_SIZE)
                if len(file_data_chunk):
                    f_write.write(file_data_chunk)
                else:
                    break


def create_model_and_tokenizer(local_model_path_str):
    """
    Instantiate transformer model and tokenizer

      :param local_model_path_str: string representation of the local path 
             to the directory containing the pretrained model
      :return: model, tokenizer
    """
    model_class, tokenizer_class = (RobertaModel, RobertaTokenizer)

    # Load the pretrained tokenizer and model
    tokenizer = tokenizer_class.from_pretrained(local_model_path_str)
    model = model_class.from_pretrained(local_model_path_str)

    return model, tokenizer

And there you have it folks! More details can be found here: https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/


ORIGINAL ANSWER (superseded by the edit above):

What I'd discovered is that this whole chain of questioning was irrelevant, because Dataflow only allows you to install source distribution packages on workers, which means you can't actually install PyTorch.

When you supply a requirements.txt file, Dataflow installs it with the --no-binary flag, which prevents installation of wheel (.whl) packages and only allows source distributions (.tar.gz). Trying to roll my own source distribution for PyTorch on Google Dataflow, where it's half C++, part CUDA, and part who knows what, struck me as a fool's errand.

Thanks for the input along the way y'all.

1
votes

I don't know much about PyTorch or the Roberta model, but I'll try to answer your questions as they relate to GCS:

1.- "So is there a way to load a pretrained model stored in GCS?"

In case your model can load the Blob directly from binary:

from google.cloud import storage

client = storage.Client()
bucket = client.get_bucket("bucket name")
blob = bucket.blob("path_to_blob/blob_name.ext")
data = blob.download_as_string()  # returns the blob's contents as bytes, despite the method name
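
If whatever you load with accepts a file-like object, you can then keep those bytes in memory rather than touching disk. For example, for a plain PyTorch checkpoint (a sketch; this on its own doesn't satisfy from_pretrained(), which wants the whole directory of files):

import io
import torch

state_dict = torch.load(io.BytesIO(data), map_location='cpu')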

2.- "Is there a way to authenticate when doing the public URL request in this context?"

Here's the tricky part: depending on the context in which you run the script, it will be authenticated with a default service account. So when you are using the official GCP libraries you can:

A.- Give that default service account permission to access your bucket/objects.

B.- Create a new service account and authenticate with it inside the script (you will need to generate a key file for that service account as well):

from google.cloud import storage
from google.oauth2 import service_account

STORAGE_SCOPES = ['https://www.googleapis.com/auth/devstorage.read_only']
SERVICE_ACCOUNT_FILE = 'key.json'

cred = service_account.Credentials.from_service_account_file(
    SERVICE_ACCOUNT_FILE, scopes=STORAGE_SCOPES)

client = storage.Client(credentials=cred)
bucket = client.get_bucket("bucket_name")
blob = bucket.blob("path/object.ext")
data = blob.download_as_string()

However, that only works because the official libraries handle authentication of the API calls in the background, so it won't help in the case of the from_pretrained() function.

An alternative is to make the object public, so you can access it via its public URL.
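
For example, making a single object public with the client looks roughly like this (it exposes the object to anyone with the URL, and it requires ACLs to be enabled on the bucket):

from google.cloud import storage

client = storage.Client()
bucket = client.get_bucket("bucket_name")
blob = bucket.blob("path/object.ext")
blob.make_public()          # anyone can now read this object
print(blob.public_url)      # fetchable without any authentication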

3.- "Even if there is a way to authenticate, will the non-existence of subdirectories still be an issue?"

I'm not sure what you mean here; you can have folders inside your bucket.

1
votes

Currently I'm not playing with Roberta, but with Bert for token classification (NER); I think it has the same mechanism, though.

Below is my code:

import io
import os

import torch
from google.cloud import storage
from pytorch_transformers import BertForTokenClassification

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'your_gcs_auth.json'

# initiate storage
client = storage.Client()
en_bucket = client.get_bucket('your-gcs-bucketname')

# get blob
en_model_blob = en_bucket.get_blob('your-modelname-in-gcsbucket.bin')
en_model = en_model_blob.download_as_string()

# because model downloaded into string, need to convert it back
buffer = io.BytesIO(en_model)

# prepare loading model; main_config is a BertConfig for your model, defined elsewhere
state_dict = torch.load(buffer, map_location=torch.device('cpu'))
model = BertForTokenClassification.from_pretrained(
    pretrained_model_name_or_path=None, state_dict=state_dict, config=main_config)
model.load_state_dict(state_dict)

I'm not totally sure whether the download_as_string() method saves the data to local disk or not, but from what I've experienced, if I execute download_to_filename() that function does download the model to my local disk.

Also, if you modified the config for your transformer network (and you put that in GCS and need to load it too), you will need to modify the PretrainedConfig class as well, so that it can handle the data produced by the download_as_string() function.
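
Alternatively, rather than modifying PretrainedConfig, you could parse the downloaded config bytes yourself and build the config object directly. A sketch, assuming the config JSON lives in the same bucket and that BertConfig.from_dict() is available (the blob name below is a placeholder):

import json
from pytorch_transformers import BertConfig

# en_bucket is the bucket object from the snippet above
config_blob = en_bucket.get_blob('your-bert-config-in-gcsbucket.json')
main_config = BertConfig.from_dict(json.loads(config_blob.download_as_string()))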

cheers, hope it helps

0
votes

As you correctly stated, it seems that out of the box pytorch-transformers does not support this, mainly because it does not recognize the link as a URL.

After some searching, I found the corresponding error message in this source file, around line 144-155.

Of course, you could try adding a 'gs' scheme to line 144 and then handling your connection to GCS as an http request (lines 269-272). If GCS accepts this, that should be the only change required for it to work.
If that does not work, the only immediate fix would be to implement something analogous to the Amazon S3 bucket functions, but I don't know enough about S3 and GCS buckets to offer any meaningful judgement here.
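
If patching the library is off the table, another immediate workaround is a small GCS-aware wrapper: recognize the gs:// scheme yourself, stage the blobs into a temp directory, and delegate to from_pretrained(); this is essentially the asker's own workaround packaged into a helper. A rough sketch (the helper name and the google-cloud-storage usage are mine, not part of pytorch-transformers):

import os
import tempfile

from google.cloud import storage
from pytorch_transformers import RobertaModel


def from_pretrained_gcs(model_class, gcs_path):
    """Download every blob under gs://<bucket>/<prefix> and load from the local copy."""
    bucket_name, _, prefix = gcs_path[len('gs://'):].partition('/')
    local_dir = tempfile.mkdtemp()
    client = storage.Client()
    for blob in client.get_bucket(bucket_name).list_blobs(prefix=prefix):
        if blob.name.endswith('/'):  # skip zero-byte "directory" placeholder objects
            continue
        blob.download_to_filename(
            os.path.join(local_dir, os.path.basename(blob.name)))
    return model_class.from_pretrained(local_dir)


model = from_pretrained_gcs(RobertaModel, 'gs://my-bucket/roberta/')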