2
votes

I'm new to TensorFlow Serving. I trained a wide and deep model using an estimator, and now I want to serve it. I created my serving input receiver function and saved the model. When I try to predict using the saved model, I always receive InternalError: Unable to get element as bytes. I don't really understand what should go inside the serving function or what format I should send the data in. Could someone first explain the concept of a serving function, and also how to create one properly?

Reproducible example: https://github.com/dangz90/wide_and_deep_debugging/blob/master/wide%20and%20deep%20debug.ipynb

Serving function:

def serving_input_receiver_fn():
  """Build the serving input receiver for one serialized example.

  A scalar string placeholder is exposed to callers under the 'inputs' key.
  Its contents are parsed with tf.parse_single_example into the features
  'var1', 'var2', 'var3' (fixed length, shape [1]) and 'var4' (variable
  length), and both pieces are wrapped in a ServingInputReceiver.
  """
  # shape=[] makes this a scalar placeholder: one serialized example per call.
  example_proto = tf.placeholder(tf.string, shape=[], name='input_tensor')

  # Defaults are not specified since the keys are required.
  feature_spec = {
      'var1': tf.FixedLenFeature(shape=[1], dtype=tf.string),
      'var2': tf.FixedLenFeature(shape=[1], dtype=tf.string),
      'var3': tf.FixedLenFeature(shape=[1], dtype=tf.string),
      'var4': tf.VarLenFeature(dtype=tf.string),
  }
  features = tf.parse_single_example(example_proto, features=feature_spec)

  # Callers feed the placeholder through the 'inputs' receiver key.
  receivers = {'inputs': example_proto}
  return tf.estimator.export.ServingInputReceiver(features, receivers)

# Build a predictor callable from the estimator `m` and the serving input
# receiver function defined above.
estimator_predictor = tf.contrib.predictor.from_estimator(m, serving_input_receiver_fn)
# Feed the serialized form of `examples` under the 'inputs' receiver key.
estimator_predictor({ 'inputs': examples.SerializeToString() })

I've tried sending a pandas object, an Example, etc., but I really have no idea what format the data should be in. For training, the data was saved as a TFRecord and then loaded as a dataset. Note: if I use the model directly with m.predict(), I'm able to get the prediction correctly.

Full error:

--------------------------------------------------------------------------- InternalError                             Traceback (most recent call last) /databricks/python/lib/python3.6/site-packages/tensorflow/python/client/session.py in _do_call(self, fn, *args)    1333     try:
-> 1334       return fn(*args)    1335     except errors.OpError as e:

/databricks/python/lib/python3.6/site-packages/tensorflow/python/client/session.py in _run_fn(feed_dict, fetch_list, target_list, options, run_metadata)  1318       return self._call_tf_sessionrun(
-> 1319           options, feed_dict, fetch_list, target_list, run_metadata)    1320 

/databricks/python/lib/python3.6/site-packages/tensorflow/python/client/session.py in _call_tf_sessionrun(self, options, feed_dict, fetch_list, target_list, run_metadata)    1406         self._session, options, feed_dict, fetch_list, target_list,
-> 1407         run_metadata)    1408 

InternalError: Unable to get element as bytes.

During handling of the above exception, another exception occurred:

InternalError                             Traceback (most recent call last) <command-364073753108128> in <module>()
      3 
      4 estimator_predictor = tf.contrib.predictor.from_estimator(m, serving_input_receiver_fn)
----> 5 estimator_predictor({ 'inputs': examples_ })

/databricks/python/lib/python3.6/site-packages/tensorflow/contrib/predictor/predictor.py in __call__(self, input_dict)
     75       if value is not None:
     76         feed_dict[self.feed_tensors[key]] = value
---> 77     return self._session.run(fetches=self.fetch_tensors, feed_dict=feed_dict)

/databricks/python/lib/python3.6/site-packages/tensorflow/python/training/monitored_session.py in run(self, fetches, feed_dict, options, run_metadata)
    674                           feed_dict=feed_dict,
    675                           options=options,
--> 676                           run_metadata=run_metadata)
    677 
    678   def run_step_fn(self, step_fn):

/databricks/python/lib/python3.6/site-packages/tensorflow/python/training/monitored_session.py in run(self, fetches, feed_dict, options, run_metadata)    1169        feed_dict=feed_dict,    1170                               options=options,
-> 1171                               run_metadata=run_metadata)    1172       except _PREEMPTION_ERRORS as e:    1173         logging.info('An error was raised. This may be due to a preemption in '

/databricks/python/lib/python3.6/site-packages/tensorflow/python/training/monitored_session.py in run(self, *args, **kwargs)    1268         raise six.reraise(*original_exc_info)    1269       else:
-> 1270         raise six.reraise(*original_exc_info)    1271     1272 

/databricks/python/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

/databricks/python/lib/python3.6/site-packages/tensorflow/python/training/monitored_session.py in run(self, *args, **kwargs)    1253   def run(self, *args,
**kwargs):    1254     try:
-> 1255       return self._sess.run(*args, **kwargs)    1256     except _PREEMPTION_ERRORS:    1257       raise

/databricks/python/lib/python3.6/site-packages/tensorflow/python/training/monitored_session.py in run(self, fetches, feed_dict, options, run_metadata)    1325        feed_dict=feed_dict,    1326                                   options=options,
-> 1327                                   run_metadata=run_metadata)    1328     1329     for hook in self._hooks:

/databricks/python/lib/python3.6/site-packages/tensorflow/python/training/monitored_session.py in run(self, *args, **kwargs)    1089     1090   def run(self, *args,
**kwargs):
-> 1091     return self._sess.run(*args, **kwargs)    1092     1093   def run_step_fn(self, step_fn, raw_session, run_with_hooks):

/databricks/python/lib/python3.6/site-packages/tensorflow/python/client/session.py in run(self, fetches, feed_dict, options, run_metadata)
    927     try:
    928       result = self._run(None, fetches, feed_dict, options_ptr,
--> 929                          run_metadata_ptr)
    930       if run_metadata:
    931         proto_data = tf_session.TF_GetBuffer(run_metadata_ptr)

/databricks/python/lib/python3.6/site-packages/tensorflow/python/client/session.py in _run(self, handle, fetches, feed_dict, options, run_metadata)    1150     if final_fetches or final_targets or (handle and feed_dict_tensor):    1151       results = self._do_run(handle, final_targets, final_fetches,
-> 1152                              feed_dict_tensor, options, run_metadata)    1153     else:    1154       results = []

/databricks/python/lib/python3.6/site-packages/tensorflow/python/client/session.py in _do_run(self, handle, target_list, fetch_list, feed_dict, options, run_metadata)    1326     if handle is None:    1327       return self._do_call(_run_fn, feeds, fetches, targets, options,
-> 1328                            run_metadata)    1329     else:    1330       return self._do_call(_prun_fn, handle, feeds, fetches)

/databricks/python/lib/python3.6/site-packages/tensorflow/python/client/session.py in _do_call(self, fn, *args)    1346           pass    1347       message = error_interpolation.interpolate(message, self._graph)
-> 1348       raise type(e)(node_def, op, message)    1349     1350   def _extend_graph(self):

InternalError: Unable to get element as bytes.

Here's what examples looks like before being serialized.

features {
  feature {
    key: "var4"
    value {
      bytes_list {
        value: "43"
        value: "65"
        value: "89"
        value: "02"
      }
    }
  }
  feature {
    key: "var3"
    value {
      bytes_list {
        value: "0123194"
      }
    }
  }
  feature {
    key: "var2"
    value {
      bytes_list {
        value: "1243"
      }
    }
  }
  feature {
    key: "var1"
    value {
      bytes_list {
        value: "54"
      }
    }
  }
}

To get examples_, I run the following script:

def serialise_input(data):
  """Pack a mapping of feature values into a serialized Example.

  Every entry of ``data`` becomes a bytes_list feature keyed by its name.
  The "var4" entry is iterated and each element is encoded separately;
  every other entry is encoded as a single value.

  Returns the result of Example.SerializeToString().
  """
  def _bytes_feature(name, raw):
    # "var4" holds several strings; all other features hold exactly one.
    if name == "var4":
      encoded = [item.encode('utf-8') for item in raw]
    else:
      encoded = [raw.encode()]
    return Feature(bytes_list=BytesList(value=encoded))

  dict_feature = {name: _bytes_feature(name, raw) for name, raw in data.items()}
  return Example(features=Features(feature=dict_feature)).SerializeToString()

# Serialize Input
# Serialize Input
# Convert `test` to pandas, drop rows containing missing values, then take the
# first remaining row without its last column.
# NOTE(review): presumably the last column is the label - confirm.
raw_data = test.toPandas().dropna().iloc[0,:-1]
# Result is the serialized byte string returned by serialise_input.
examples_ = serialise_input(raw_data)

Thanks in advance.

2
I forgot to mention that example_ was already serialized - Daniel Zapata
I saw you updated some code in your example and I think maybe I understand what's going on. Can you try the following code snippet on your example? as_string = examples.SerializeToString() print(as_string) as_example = tf.train.Example.FromString(as_string) print(as_example) I suspect that examples is a proto in text format not byte string format - kempy
If that crashes for you, I have a potential solution in github.com/kempy/tf-scratch/blob/master/text_proto_buffs.ipynb to use text_format to parse your text format proto buf. Alternatively to trying to fix it properly try using the bytestring of your protobuf that I generated examples = b'\nS\n\x1a\n\x04var4\x12\x12\n\x10\n\x0243\n\x0265\n\x0289\n\x0202\n\x0e\n\x04var1\x12\x06\n\x04\n\x0254\n\x10\n\x04var2\x12\x08\n\x06\n\x041243\n\x13\n\x04var3\x12\x0b\n\t\n\x070123194' - kempy
I still get the same error. I'll add how I get examples_ - Daniel Zapata
Can you provide what test is here? It's a bit difficult to debug further without a reproducible example - kempy

2 Answers

2
votes

It looks like your serialized_tf_example placeholder has shape=[] which is a single example. You should pass a single tf.train.Example, serialized as a string:

# I assume here examples_ is a list of tf.train.Examples
# Serialize only the first element and feed it under the 'inputs' key.
example = examples_[0].SerializeToString()
estimator_predictor({ 'inputs': example })

If you want to feed a batch of examples instead of a single serialized example, you need to use shape=[None] and tf.io.parse_example instead.

Additionally, the example that you feed should have features defined on it that you reference in your features dictionary (e.g. var1) so that they can be properly parsed.


A serving function specifies how your model should receive its input at prediction time, when you export your trained TensorFlow graph / model as a SavedModel. In training you have an active TF session, where you normally use a placeholder or tf.data.Dataset, and that requires stateful Python / TF runtime execution. A SavedModel, by contrast, needs to be serializable and written to disk so that you can deploy it with TF Serving, run it on a mobile device, etc.

So you export this SavedModel; the serving input is the piece inside it that defines how to parse the requests it gets sent and then connects them to your model. The way you probably want to send them is as tf.train.Example protocol buffers (serialized as strings so that they can be sent in an RPC). You then parse them into a feature dict so your estimator can understand them in the same way it understood your training data.

1
votes

The problem was with how I was constructing the input and the input serving function. Here is the solution:

# Maps each feature name to the Python type that input_fn uses to decide how
# the value is written into the tf.train.Example (str -> one bytes value,
# list -> one bytes value per element).
feat_name_type = {'var1':str, 'var2':str, 'var3':str, 'var4':list}

def input_fn(df):
    """Serialize every row of ``df`` into a tf.train.Example byte string.

    For each row, one feature per entry of the module-level
    ``feat_name_type`` mapping is written: ``int`` values go to the
    int64_list, ``float`` values to the float_list, ``str`` values are
    encoded into a single bytes_list entry, and ``list`` values are encoded
    element by element into the bytes_list.

    Args:
        df: DataFrame with one column per key of ``feat_name_type``.

    Returns:
        ``{"inputs": examples}`` where ``examples`` holds one serialized
        Example per row, in row order.
    """
    examples = []
    # iterrows() yields (index label, row). The label is not a position, so
    # using it as a list index breaks for any non-default index, such as one
    # left behind by dropna() or filtering. Append in iteration order instead.
    for _, sample in df.iterrows():
        ex = tf.train.Example()
        for feat_name, feat_type in feat_name_type.items():
            feat = ex.features.feature[feat_name]
            value = sample[feat_name]
            if feat_type == int:
                feat.int64_list.value.extend([value])
            elif feat_type == float:
                feat.float_list.value.extend([value])
            elif feat_type == str:
                feat.bytes_list.value.extend([value.encode()])
            elif feat_type == list:
                feat.bytes_list.value.extend([s.encode() for s in value])
        examples.append(ex.SerializeToString())
    return {"inputs": examples}

'''Service input function'''
tf_feat_cols = deep_columns + wide_columns # All feature columns fed into the model: deep columns followed by wide columns

# Derive the example parsing spec from the feature columns and build the
# parsing serving input receiver function from that spec.
serve_rcvr_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(
    tf.feature_column.make_parse_example_spec(tf_feat_cols)
)
# Associates the receiver function with the PREDICT mode key.
# NOTE(review): presumably handed to an export call not shown here - confirm.
rcvr_fn_map = {
    tf.estimator.ModeKeys.PREDICT: serve_rcvr_fn,
}