TensorFlow程序读取数据一共有3种方法:
- 预加载数据: 在TensorFlow图中定义常量或变量来保存所有数据(仅适用于数据量比较小的情况)。
- 供给数据(Feeding):在TensorFlow程序运行的每一步,让Python代码来供给数据。
- 基于队列多线程从文件中读取数据:在TensorFlow图的起始,让一个输入管线从文件中读取数据。
预加载数据
这仅用于可以完全加载到存储器中的小的数据集。有两种方法:
- 存储在常数中。
- 存储在变量中,初始化后,永远不要改变它的值。
使用常数更简单一些,但是会使用更多的内存(因为常数会内联的存储在数据流图数据结构中,这个结构体可能会被复制几次)。
training_data = ...
training_labels = ...
with tf.Session():
input_data = tf.constant(training_data)
input_labels = tf.constant(training_labels)
...
要改为使用变量的方式,您就需要在数据流图建立后初始化这个变量。
training_data = ...
training_labels = ...
with tf.Session() as sess:
data_initializer = tf.placeholder(dtype=training_data.dtype,
shape=training_data.shape)
label_initializer = tf.placeholder(dtype=training_labels.dtype,
shape=training_labels.shape)
input_data = tf.Variable(data_initalizer, trainable=False, collections=[])
input_labels = tf.Variable(label_initalizer, trainable=False, collections=[])
...
sess.run(input_data.initializer,
feed_dict={data_initializer: training_data})
sess.run(input_labels.initializer,
feed_dict={label_initializer: training_lables})
设定trainable=False可以防止该变量被数据流图的 GraphKeys.TRAINABLE_VARIABLES收集,这样我们就不会在训练的时候尝试更新它的值,设定collections=[]可以防止GraphKeys.VARIABLES收集后做为保存和恢复的中断点。
- 在tensorflow/g3doc/how_tos/reading_data/fully_connected_preloaded.py 中可以找到一个MNIST例子,使用常数来预加载。
- 使用变量来预加载的例子在tensorflow/g3doc/how_tos/reading_data/fully_connected_preloaded_var.py
供给数据
- TensorFlow的数据供给机制允许你在TensorFlow运算图中将数据注入到任一张量中。因此,python运算可以把数据直接设置到TensorFlow图中。
- 通过给run()或者eval()函数输入feed_dict参数, 可以启动运算过程。
with tf.Session():
input = tf.placeholder(tf.float32)
classifier = ...
print classifier.eval(feed_dict={input: my_python_preprocessing_fn()})
虽然你可以使用常量和变量来替换任何一个张量,但是最好的做法应该是使用placeholder op节点。设计placeholder节点的唯一的意图就是为了提供数据供给(feeding)的方法。placeholder节点被声明的时候是未初始化的,也不包含数据,如果没有为它供给数据,则TensorFlow运算的时候会产生错误,所以千万不要忘了为placeholder提供数据。
基于队列多线程从文件中读取数据
一共典型的文件读取管线会包含下面这些步骤:
- 文件名列表
- 可配置的 文件名乱序(shuffling)
- 可配置的 最大训练迭代数(epoch limit)
- 文件名队列
- 针对输入文件格式的阅读器
- 纪录解析器
- 可配置的预处理器
- 样本队列
1.基本知识
文件名, 乱序(shuffling), 和最大训练迭代数(epoch limits)
- 可以使用字符串张量(比如[“file0”, “file1”], [(“file%d” % i) for i in range(2)], [(“file%d” % i) for i in range(2)])或者tf.train.match_filenames_once 函数来产生文件名列表。
- 注意:相比使用tf.train.match_filenames_once,使用python标准库glob更好,以免被tensorflow绑架
- 将文件名列表交给tf.train.string_input_producer函数.string_input_producer来生成一个先入先出的队列, 文件阅读器会需要它来读取数据。
- string_input_producer提供的可配置参数来设置文件名乱序和最大的训练迭代数, QueueRunner会为每次迭代(epoch)将所有的文件名加入文件名队列中, 如果shuffle=True的话,会对文件名进行乱序处理。这一过程是比较均匀的,因此它可以产生均衡的文件名队列。
- 这个QueueRunner的工作线程是独立于文件阅读器的线程,因此乱序和将文件名推入到文件名队列这些过程不会阻塞文件阅读器运行。
读取文件
- 根据你的文件格式,选择对应的文件阅读器,然后将文件名队列提供给阅读器的read方法。
- 阅读器的read方法会输出一个key来表征输入的文件和其中的纪录(对于调试非常有用),同时得到一个字符串标量,这个字符串标量可以被一个或多个解析器,或者转换操作将其解码为张量并且构造成为样本。
2.具体实例
读取图像文件JPG、PNG
- 直接读取
import tensorflow as tf;
image_raw_data = tf.gfile.FastGFile('landscape.png').read()
image = tf.image.decode_jpeg(image_raw_data) #图片解码
print image.eval(session=tf.Session())
- 使用队列的方式(推荐)
# coding=utf-8
import tensorflow as tf
with tf.Session() as sess:
# 文件名列表
filename = ['portrait.png', 'landscape.png']
# 文件名队列:string_input_producer会产生一个文件名队列
# num_epochs:一个local variable,需要通过local_variables_initializer来初始化
filename_queue = tf.train.string_input_producer(filename, shuffle=False, num_epochs=2)
# reader从文件名队列中读数据、解析数据
reader = tf.WholeFileReader()
key, value = reader.read(filename_queue)
image_tensor = tf.image.decode_png(value) # 同样也有解析jpg格式图片的函数:tf.image.decode_jpg(value)
# num_epochs是local variable,要对它进行初始化
tf.local_variables_initializer().run()
# 使用start_queue_runners之后,才会开始填充队列
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)
i = 0
try:
while i<6 :
i += 1
# 获取图片转为tensor后的数据
print(sess.run(image_tensor))
except tf.errors.OutOfRangeError:
print('Done training -- epoch limit reached')
finally:
coord.request_stop()
coord.join(threads)
读取二进制文件
读取csv文件
- 从CSV文件中读取数据, 需要使用TextLineReader和decode_csv 操作,
- 每次read的执行都会从文件中读取一行内容,decode_csv操作会解析这一行内容并将其转为张量列表。如果输入的参数有缺失,record_default参数可以根据张量的类型来设置默认值。
- 在调用run或者eval去执行read之前,你必须调用tf.train.start_queue_runners来将文件名填充到队列。否则read操作会被阻塞到文件名队列中有值为止。
- 如下面的例子所示:
filename_queue = tf.train.string_input_producer(["file0.csv", "file1.csv"])
reader = tf.TextLineReader()
key, value = reader.read(filename_queue)
# Default values, in case of empty columns. Also specifies the type of the
# decoded result.
record_defaults = [[1], [1], [1], [1], [1]]
col1, col2, col3, col4, col5 = tf.decode_csv(
value, record_defaults=record_defaults)
features = tf.concat(0, [col1, col2, col3, col4])
with tf.Session() as sess:
# Start populating the filename queue.
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)
for i in range(1200):
# Retrieve a single instance:
example, label = sess.run([features, col5])
coord.request_stop()
coord.join(threads)
读取TFRecords文件
- 将任意的数据转换为TensorFlow所支持的格式,这种方法可以使TensorFlow的数据集更容易与网络应用架构相匹配。
- TFRecords文件为TensorFlow所支持的格式,TFRecords文件包含了tf.train.Example协议内存块(protocol buffer)(协议内存块包含了字段Features)。
- 步骤
- 首先写一段代码获取你的数据,将数据填入到Example协议内存块(protocol buffer),将协议内存块序列化为一个字符串,并且通过tf.python_io.TFRecordWriter class写入到TFRecords文件
- 从TFRecords文件中读取数据,可以使用tf.TFRecordReader的tf.parse_single_example解析器。这个parse_single_example操作可以将Example协议内存块(protocol buffer)解析为张量。
- 代码实例
3.进阶-样本批处理
细节分析
- 在数据输入管线的末端,我们需要有另一个队列来执行输入样本的训练,评价和推理。因此我们使用tf.train.shuffle_batch函数来对队列中的样本进行乱序处理
- 在下面的实例中,虽然只使用了一个文件名队列,但是TensorFlow依然能保证多个文件阅读器从同一次迭代(epoch)的不同文件中读取数据,直到这次迭代的所有文件都被开始读取为止。(通常来说一个线程来对文件名队列进行填充的效率是足够的)
- 示例:
def read_my_file_format(filename_queue):
reader = tf.SomeReader()
key, record_string = reader.read(filename_queue)
example, label = tf.some_decoder(record_string)
processed_example = some_processing(example)
return processed_example, label
def input_pipeline(filenames, batch_size, num_epochs=None):
filename_queue = tf.train.string_input_producer(
filenames, num_epochs=num_epochs, shuffle=True)
example, label = read_my_file_format(filename_queue)
# min_after_dequeue defines how big a buffer we will randomly sample
# from -- bigger means better shuffling but slower start up and more
# memory used.
# capacity must be larger than min_after_dequeue and the amount larger
# determines the maximum we will prefetch. Recommendation:
# min_after_dequeue + (num_threads + a small safety margin) * batch_size
min_after_dequeue = 10000
capacity = min_after_dequeue + 3 * batch_size
example_batch, label_batch = tf.train.shuffle_batch(
[example, label], batch_size=batch_size, capacity=capacity,
min_after_dequeue=min_after_dequeue)
return example_batch, label_batch
- 如果你需要对不同文件中的样子有更强的乱序和并行处理,可以使用tf.train.shuffle_batch_join 函数
- 示例:
def read_my_file_format(filename_queue):
# Same as above
def input_pipeline(filenames, batch_size, read_threads, num_epochs=None):
filename_queue = tf.train.string_input_producer(
filenames, num_epochs=num_epochs, shuffle=True)
example_list = [read_my_file_format(filename_queue)
for _ in range(read_threads)]
min_after_dequeue = 10000
capacity = min_after_dequeue + 3 * batch_size
example_batch, label_batch = tf.train.shuffle_batch_join(
example_list, batch_size=batch_size, capacity=capacity,
min_after_dequeue=min_after_dequeue)
return example_batch, label_batch
- 创建线程并使用QueueRunner对象来预取
- 使用上面列出的许多tf.train函数添加QueueRunner到你的数据流图中。在你运行任何训练步骤之前,需要调用tf.train.start_queue_runners函数,否则数据流图将一直挂起。
- tf.train.start_queue_runners这个函数将会启动输入管道的线程,填充样本到队列中,以便出队操作可以从队列中拿到样本。
- 这种情况下最好配合使用一个tf.train.Coordinator,这样可以在发生错误的情况下正确地关闭这些线程。
- 如果你对训练迭代数做了限制,那么需要使用一个训练迭代数计数器num_epochs,并且需要被初始化。
- 推荐的代码模板如下:
# Create the graph, etc.
init_op = tf.initialize_all_variables()
# Create a session for running operations in the Graph.
sess = tf.Session()
# Initialize the variables (like the epoch counter).
sess.run(init_op)
# Start input enqueue threads.
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)
try:
while not coord.should_stop():
# Run training steps or whatever
sess.run(train_op)
except tf.errors.OutOfRangeError:
print 'Done training -- epoch limit reached'
finally:
# When done, ask the threads to stop.
coord.request_stop()
# Wait for threads to finish.
coord.join(threads)
sess.close()
整体分析
- 首先,我们先创建数据流图,这个数据流图由一些流水线的阶段组成,阶段间用队列连接在一起。第一阶段将生成文件名,我们读取这些文件名并且把他们排到文件名队列中。第二阶段从文件中读取数据(使用Reader),产生样本,而且把样本放在一个样本队列中。
- 在tf.train中要创建这些队列和执行入队操作,就要添加tf.train.QueueRunner到一个使用tf.train.add_queue_runner函数的数据流图中。每个QueueRunner负责一个阶段,处理那些需要在线程中运行的入队操作的列表。一旦数据流图构造成功,tf.train.start_queue_runners函数就会要求数据流图中每个QueueRunner去开始它的线程运行入队操作。
- 如果一切顺利的话,你现在可以执行你的训练步骤,同时队列也会被后台线程来填充。如果您设置了最大训练迭代数,在某些时候,样本出队的操作可能会得到一个tf.OutOfRangeError的错误。这其实是TensorFlow的“文件结束”(EOF) ———— 这就意味着已经达到了最大训练迭代数,已经没有更多可用的样本了。
- 最后一个因素是Coordinator。这是负责在收到任何关闭信号的时候,让所有的线程都知道。最常用的是在发生异常时这种情况就会呈现出来,比如说其中一个线程在运行某些操作时出现错误(或一个普通的Python异常)。
完整代码实例