1 vote

I am new to TensorFlow and am learning how to use queue runners. What I want to do is read binary files from a directory and turn each file into an array, using two threads and batching 4 arrays at a time. The code is as follows.

    import glob
    import tensorflow as tf

    def readfile(filenames_queue):
        # Dequeue one filename, read the file and decode its bytes into a uint8 array.
        filename = filenames_queue.dequeue()
        value_strings = tf.read_file(filename)
        array = tf.decode_raw(value_strings, tf.uint8)
        return [array]

    def input_pipeline(filenames, batch_size, num_threads=2):
        filenames_queue = tf.train.string_input_producer(filenames)
        # One reading op per thread; shuffle_batch_join adds a queue runner for each.
        thread_lists = [readfile(filenames_queue) for _ in range(num_threads)]
        min_after_dequeue = 1000
        capacity = min_after_dequeue + 3 * batch_size
        arrays = tf.train.shuffle_batch_join(thread_lists, batch_size, capacity, min_after_dequeue)
        return arrays

    if __name__ == "__main__":
        filenames = glob.glob('dir/*')
        arrays_batch = input_pipeline(filenames, 4)
        with tf.Session() as sess:
            tf.global_variables_initializer().run()
            coord = tf.train.Coordinator()
            threads = tf.train.start_queue_runners(sess, coord)
            for i in range(100):
                print sess.run(arrays_batch)
            coord.request_stop()
            coord.join(threads)

I have fixed the error pointed out by Victor and Sorin, but a new error has appeared:

File "input_queue.py", line 36, in print sess.run(im_arrays_batch)

File "/usr/local/anaconda2/lib/python2.7/site-packages/tensorflow/python/client/session.py", line 889, in run run_metadata_ptr)

File "/usr/local/anaconda2/lib/python2.7/site-packages/tensorflow/python/client/session.py", line 1120, in _run feed_dict_tensor, options, run_metadata)

File "/usr/local/anaconda2/lib/python2.7/site-packages/tensorflow/python/client/session.py", line 1317, in _do_run options, run_metadata)

File "/usr/local/anaconda2/lib/python2.7/site-packages/tensorflow/python/client/session.py", line 1336, in _do_call raise type(e)(node_def, op, message) tensorflow.python.framework.errors_impl.OutOfRangeError: RandomShuffleQueue '_1_shuffle_batch_join/random_shuffle_queue' is closed and has insufficient elements (requested 2, current size 0) [[Node: shuffle_batch_join = QueueDequeueManyV2[component_types=[DT_UINT8], timeout_ms=-1, _device="/job:localhost/replica:0/task:0/device:CPU:0"](shuffle_batch_join/random_shuffle_queue, shuffle_batch_join/n)]]

Caused by op u'shuffle_batch_join', defined at:

File "input_queue.py", line 30, in im_arrays_batch = input_pipeline(filenames,2)

File "input_queue.py", line 23, in input_pipeline arrays_batch = tf.train.shuffle_batch_join(thread_lists,batch_size,capacity,min_after_dequeue)

File "/usr/local/anaconda2/lib/python2.7/site-packages/tensorflow/python/training/input.py", line 1367, in shuffle_batch_join name=name)

File "/usr/local/anaconda2/lib/python2.7/site-packages/tensorflow/python/training/input.py", line 833, in _shuffle_batch_join dequeued = queue.dequeue_many(batch_size, name=name)

File "/usr/local/anaconda2/lib/python2.7/site-packages/tensorflow/python/ops/data_flow_ops.py", line 464, in dequeue_many self._queue_ref, n=n, component_types=self._dtypes, name=name)

File "/usr/local/anaconda2/lib/python2.7/site-packages/tensorflow/python/ops/gen_data_flow_ops.py", line 2418, in _queue_dequeue_many_v2 component_types=component_types, timeout_ms=timeout_ms, name=name)

File "/usr/local/anaconda2/lib/python2.7/site-packages/tensorflow/python/framework/op_def_library.py", line 787, in _apply_op_helper op_def=op_def)

File "/usr/local/anaconda2/lib/python2.7/site-packages/tensorflow/python/framework/ops.py", line 2956, in create_op op_def=op_def)

File "/usr/local/anaconda2/lib/python2.7/site-packages/tensorflow/python/framework/ops.py", line 1470, in init self._traceback = self._graph._extract_stack() # pylint: disable=protected-access

OutOfRangeError (see above for traceback): RandomShuffleQueue '_1_shuffle_batch_join/random_shuffle_queue' is closed and has insufficient elements (requested 2, current size 0) [[Node: shuffle_batch_join = QueueDequeueManyV2[component_types=[DT_UINT8], timeout_ms=-1, _device="/job:localhost/replica:0/task:0/device:CPU:0"](shuffle_batch_join/random_shuffle_queue, shuffle_batch_join/n)]]


2 Answers

0 votes

Your readfile(...) function is supposed to return an iterable (for example a list), so that you can return features and labels, or other things like that.

So, to fix your code, change the return statement of readfile(...) to:

    return [array]
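As a minimal sketch of why that matters: returning a list lets you add more components per example later, for instance a label alongside the array (the label below is purely a hypothetical illustration, not part of the original code):

    import tensorflow as tf

    def readfile(filenames_queue):
        filename = filenames_queue.dequeue()
        value_strings = tf.read_file(filename)
        array = tf.decode_raw(value_strings, tf.uint8)
        # Hypothetical extra component: a constant label, just to show that
        # the returned list can hold several tensors per example.
        label = tf.constant(0, dtype=tf.int32)
        return [array, label]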
0 votes

From tf.train.shuffle_batch_join:

The tensors_list argument is a list of tuples of tensors

Here, your calls to tf.decode_raw produce Tensor instances, and you put them directly in a list with thread_lists = [readfile(filenames_queue) for _ in range(num_threads)].

What you pass is therefore not a list of tuples of tensors but a plain list of tensors, so shuffle_batch_join tries to iterate over each Tensor, hence the error TypeError: 'Tensor' object is not iterable.
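To make the expected structure concrete, here is a minimal sketch, assuming for illustration that every file is exactly 100 bytes (shuffle_batch_join needs fully defined shapes unless you pass its shapes argument):

    import tensorflow as tf

    def readfile(queue):
        # One reading op: dequeue a filename, read the file, decode its bytes.
        array = tf.decode_raw(tf.read_file(queue.dequeue()), tf.uint8)
        # The fixed shape is an assumption for this sketch (100-byte files);
        # without it you would have to pass shapes= to shuffle_batch_join.
        array = tf.reshape(array, [100])
        return [array]  # a list (or tuple) of tensors, not a bare Tensor

    filenames_queue = tf.train.string_input_producer(['a.bin', 'b.bin'])  # example filenames
    # A list of lists of tensors: one inner list per reading thread.
    thread_lists = [readfile(filenames_queue) for _ in range(2)]
    batch = tf.train.shuffle_batch_join(thread_lists, batch_size=4,
                                        capacity=1000 + 3 * 4,
                                        min_after_dequeue=1000)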