I am new to TensorFlow and am learning how to use queue runners. I want to read the binary files in a directory and turn each file into an array. I use two reader threads and group the arrays into batches of 4. The code is as follows.
import glob
import tensorflow as tf
def readfile(filenames_queue):
    """Build the ops that read one whole file and decode it as bytes.

    Dequeues a single filename from ``filenames_queue``, reads the entire
    file content with ``tf.read_file`` and reinterprets it as a tensor of
    ``tf.uint8`` values via ``tf.decode_raw``.

    Args:
        filenames_queue: queue object exposing ``dequeue()`` that yields
            filename tensors.

    Returns:
        A one-element list holding the decoded ``uint8`` tensor.
    """
    # NOTE(review): the length of the decoded tensor depends on the file
    # size, so it is presumably not known statically -- confirm that the
    # consumer of this tensor can cope with that.
    raw_bytes = tf.read_file(filenames_queue.dequeue())
    return [tf.decode_raw(raw_bytes, tf.uint8)]
def input_pipeline(filenames, batch_size, num_threads=2):
    """Assemble a shuffled, batched input pipeline over ``filenames``.

    A filename queue is created with ``tf.train.string_input_producer`` and
    ``num_threads`` independent reader sub-graphs (see ``readfile``) are
    attached to it. Their outputs are merged and batched by
    ``tf.train.shuffle_batch_join``.

    Args:
        filenames: the filenames to feed into the filename queue.
        batch_size: number of examples per dequeued batch.
        num_threads: how many reader sub-graphs to create (default 2).

    Returns:
        Whatever ``tf.train.shuffle_batch_join`` returns for the readers.
    """
    name_queue = tf.train.string_input_producer(filenames)

    # One reader sub-graph per requested thread, all sharing the same queue.
    readers = []
    for _ in range(num_threads):
        readers.append(readfile(name_queue))

    # Minimum number of elements kept in the shuffle queue after a dequeue;
    # the capacity leaves room for three more batches on top of that.
    shuffle_floor = 1000
    return tf.train.shuffle_batch_join(
        readers,
        batch_size=batch_size,
        capacity=shuffle_floor + 3 * batch_size,
        min_after_dequeue=shuffle_floor,
    )
if __name__ == "__main__":
    # Script entry point: build the pipeline over every file in 'dir/' and
    # print 100 batches of 4 arrays each.
    filenames = glob.glob('dir/*')
    arrays_batch = input_pipeline(filenames, 4)
    with tf.Session() as sess:
        # Initialise local variables as well as global ones: input producers
        # keep their epoch counter in a local variable when num_epochs is
        # set, and leaving it uninitialised makes the enqueue threads fail.
        sess.run([tf.global_variables_initializer(),
                  tf.local_variables_initializer()])
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)
        try:
            for _ in range(100):
                # print(...) with a single argument behaves the same on
                # Python 2 and Python 3.
                print(sess.run(arrays_batch))
        finally:
            # Always stop and join the queue-runner threads, even when
            # sess.run raised. coord.join() re-raises an exception reported
            # by a runner thread, which exposes the real cause behind a
            # "queue is closed and has insufficient elements" error.
            coord.request_stop()
            coord.join(threads)
I have fixed the errors pointed out by Victor and Sorin, but now a new error arises:
File "input_queue.py", line 36, in print sess.run(im_arrays_batch)
File "/usr/local/anaconda2/lib/python2.7/site-packages/tensorflow/python/client/session.py", line 889, in run run_metadata_ptr)
File "/usr/local/anaconda2/lib/python2.7/site-packages/tensorflow/python/client/session.py", line 1120, in _run feed_dict_tensor, options, run_metadata)
File "/usr/local/anaconda2/lib/python2.7/site-packages/tensorflow/python/client/session.py", line 1317, in _do_run options, run_metadata)
File "/usr/local/anaconda2/lib/python2.7/site-packages/tensorflow/python/client/session.py", line 1336, in _do_call raise type(e)(node_def, op, message) tensorflow.python.framework.errors_impl.OutOfRangeError: RandomShuffleQueue '_1_shuffle_batch_join/random_shuffle_queue' is closed and has insufficient elements (requested 2, current size 0) [[Node: shuffle_batch_join = QueueDequeueManyV2[component_types=[DT_UINT8], timeout_ms=-1, _device="/job:localhost/replica:0/task:0/device:CPU:0"](shuffle_batch_join/random_shuffle_queue, shuffle_batch_join/n)]]
Caused by op u'shuffle_batch_join', defined at:
File "input_queue.py", line 30, in im_arrays_batch = input_pipeline(filenames,2)
File "input_queue.py", line 23, in input_pipeline arrays_batch = tf.train.shuffle_batch_join(thread_lists,batch_size,capacity,min_after_dequeue)
File "/usr/local/anaconda2/lib/python2.7/site-packages/tensorflow/python/training/input.py", line 1367, in shuffle_batch_join name=name)
File "/usr/local/anaconda2/lib/python2.7/site-packages/tensorflow/python/training/input.py", line 833, in _shuffle_batch_join dequeued = queue.dequeue_many(batch_size, name=name)
File "/usr/local/anaconda2/lib/python2.7/site-packages/tensorflow/python/ops/data_flow_ops.py", line 464, in dequeue_many self._queue_ref, n=n, component_types=self._dtypes, name=name)
File "/usr/local/anaconda2/lib/python2.7/site-packages/tensorflow/python/ops/gen_data_flow_ops.py", line 2418, in _queue_dequeue_many_v2 component_types=component_types, timeout_ms=timeout_ms, name=name)
File "/usr/local/anaconda2/lib/python2.7/site-packages/tensorflow/python/framework/op_def_library.py", line 787, in _apply_op_helper op_def=op_def)
File "/usr/local/anaconda2/lib/python2.7/site-packages/tensorflow/python/framework/ops.py", line 2956, in create_op op_def=op_def)
File "/usr/local/anaconda2/lib/python2.7/site-packages/tensorflow/python/framework/ops.py", line 1470, in init self._traceback = self._graph._extract_stack() # pylint: disable=protected-access
OutOfRangeError (see above for traceback): RandomShuffleQueue '_1_shuffle_batch_join/random_shuffle_queue' is closed and has insufficient elements (requested 2, current size 0) [[Node: shuffle_batch_join = QueueDequeueManyV2[component_types=[DT_UINT8], timeout_ms=-1, _device="/job:localhost/replica:0/task:0/device:CPU:0"](shuffle_batch_join/random_shuffle_queue, shuffle_batch_join/n)]]