
TF version: latest master, b083cea

Below is a simple example using TF 2.0 eager mode; it runs successfully with MirroredStrategy but fails with ParameterServerStrategy.

Does TF 2.0 eager mode support ParameterServerStrategy yet? I have not found a working example so far :(

from __future__ import absolute_import, division, print_function, unicode_literals
import tensorflow as tf
import tensorflow_datasets as tfds
import os, json

datasets, info = tfds.load(name='mnist', with_info=True, as_supervised=True)
mnist_train, mnist_test = datasets['train'], datasets['test']

os.environ['TF_CONFIG'] = json.dumps({
    "cluster": {
        "worker": ["localhost:12345"],
        "ps": ["localhost:12346"]
    },
    "task": {"type": "worker", "index": 0}
})

strategy = tf.distribute.experimental.ParameterServerStrategy()
#strategy = tf.distribute.MirroredStrategy()

print('Number of devices: {}'.format(strategy.num_replicas_in_sync))
num_train_examples = info.splits['train'].num_examples
num_test_examples = info.splits['test'].num_examples
BUFFER_SIZE = 10000
BATCH_SIZE_PER_REPLICA = 64
BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

def scale(image, label):
  image = tf.cast(image, tf.float32)
  image /= 255
  return image, label

train_dataset = mnist_train.map(scale).shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
eval_dataset = mnist_test.map(scale).batch(BATCH_SIZE)

with strategy.scope():
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10, activation='softmax')
  ])

  model.compile(loss='sparse_categorical_crossentropy',
                optimizer=tf.keras.optimizers.Adam(),
                metrics=['accuracy'])

checkpoint_dir = './training_checkpoints'
# Name of the checkpoint files
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

# Function for decaying the learning rate.
# You can define any decay function you need.
def decay(epoch):
  if epoch < 3:
    return 1e-3
  elif epoch >= 3 and epoch < 7:
    return 1e-4
  else:
    return 1e-5

# Callback for printing the LR at the end of each epoch.
class PrintLR(tf.keras.callbacks.Callback):
  def on_epoch_end(self, epoch, logs=None):
    print('\nLearning rate for epoch {} is {}'.format(epoch + 1,
                                                      model.optimizer.lr.numpy()))
callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir='./logs'),
    tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_prefix,
                                       save_weights_only=True),
    tf.keras.callbacks.LearningRateScheduler(decay),
    PrintLR()
]

model.fit(train_dataset, epochs=12, callbacks=callbacks)
model.load_weights(tf.train.latest_checkpoint(checkpoint_dir))
eval_loss, eval_acc = model.evaluate(eval_dataset)
print('Eval loss: {}, Eval Accuracy: {}'.format(eval_loss, eval_acc))

Error message:

    tf.keras.layers.Dense(10, activation='softmax')
  File "/usr/local/lib/python3.7/site-packages/tensorflow/python/training/tracking/base.py", line 456, in _method_wrapper
    result = method(self, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/tensorflow/python/keras/engine/sequential.py", line 116, in __init__
    super(Sequential, self).__init__(name=name, autocast=False)
  File "/usr/local/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py", line 199, in __init__
    self._init_batch_counters()
  File "/usr/local/lib/python3.7/site-packages/tensorflow/python/training/tracking/base.py", line 456, in _method_wrapper
    result = method(self, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py", line 206, in _init_batch_counters
    self._train_counter = variables.Variable(0, dtype='int64', aggregation=agg)
  File "/usr/local/lib/python3.7/site-packages/tensorflow/python/ops/variables.py", line 261, in __call__
    return cls._variable_v2_call(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/tensorflow/python/ops/variables.py", line 255, in _variable_v2_call
    shape=shape)
  File "/usr/local/lib/python3.7/site-packages/tensorflow/python/ops/variables.py", line 66, in getter
    return captured_getter(captured_previous, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_lib.py", line 1769, in creator_with_resource_vars
    return self._create_variable(next_creator, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/tensorflow/python/distribute/parameter_server_strategy.py", line 455, in _create_variable
    with ops.device(self._variable_device):
  File "/usr/local/lib/python3.7/site-packages/tensorflow/python/framework/ops.py", line 5183, in device
    "tf.device does not support functions when eager execution "
RuntimeError: tf.device does not support functions when eager execution is enabled.

1 Answer


The tf.distribute.experimental.ParameterServerStrategy documentation states the following:

Note: This strategy only works with the Estimator API. Pass an instance of this strategy to the experimental_distribute argument when you create the RunConfig. This instance of RunConfig should then be passed to the Estimator instance on which train_and_evaluate is called.

Below is an example of how to use tf.distribute.experimental.ParameterServerStrategy() (my_model_fn stands in for your own model function):

strategy = tf.distribute.experimental.ParameterServerStrategy()
run_config = tf.estimator.RunConfig(train_distribute=strategy)
# my_model_fn is a placeholder for your own Estimator model function.
estimator = tf.estimator.Estimator(model_fn=my_model_fn, config=run_config)
tf.estimator.train_and_evaluate(estimator, ...)
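
Since the question already builds the model with Keras, one possible bridge is tf.keras.estimator.model_to_estimator, which wraps a compiled Keras model in an Estimator that then picks up the strategy from the RunConfig. The sketch below is untested on a real cluster, and the input_fn feature handling is an assumption (some versions expect features keyed by the model's input layer name):

import tensorflow as tf
import tensorflow_datasets as tfds

def scale(image, label):
  image = tf.cast(image, tf.float32)
  image /= 255
  return image, label

def input_fn():
  # Estimator calls input_fn itself, so the pipeline is built inside it.
  # Assumption: a bare (image, label) tuple is accepted; some TF versions
  # want features as a dict keyed by the model's input layer name.
  datasets = tfds.load(name='mnist', as_supervised=True)
  return datasets['train'].map(scale).shuffle(10000).batch(64)

# Build and compile the Keras model OUTSIDE any strategy scope; with the
# Estimator API, distribution is driven by the RunConfig instead.
model = tf.keras.Sequential([
    tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(64, activation='relu'),
    tf.keras.layers.Dense(10, activation='softmax')
])
model.compile(loss='sparse_categorical_crossentropy',
              optimizer=tf.keras.optimizers.Adam(),
              metrics=['accuracy'])

strategy = tf.distribute.experimental.ParameterServerStrategy()
run_config = tf.estimator.RunConfig(train_distribute=strategy)
estimator = tf.keras.estimator.model_to_estimator(keras_model=model,
                                                  config=run_config)

# Every process in the cluster (ps, worker, chief) runs this same script;
# train_and_evaluate reads each task's role from its own TF_CONFIG.
tf.estimator.train_and_evaluate(
    estimator,
    train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
    eval_spec=tf.estimator.EvalSpec(input_fn=input_fn))

Note that, unlike the single-process MirroredStrategy script in the question, this setup needs at least one worker/chief process and one ps process started separately, each with its own "task" entry in TF_CONFIG.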

Also, the guide Distributed training with TensorFlow explains what is supported in which scenarios in TF 2.0 at this time:

[Image: the guide's support matrix of strategies versus training APIs; as of TF 2.0 it lists Keras support for ParameterServerStrategy as planned post 2.0, and Estimator support as limited.]

Hope this answers your question. Happy Learning.