DALI,加速视觉类数据读取
Contact me
- Blog -> https://cugtyt.github.io/blog/index
- Email -> cugtyt@qq.com
- GitHub -> Cugtyt@GitHub
本系列博客主页及相关见此处
来自DALI文档
视觉类的数据通常都是图像等数据,而深度学习框架提供的数据加载有两种情况:
- 快,灵活性差,由c++编写,由python提供一个固定的接口
- 慢,但灵活性好,可以灵活定制;速度慢的一个重要原因是python的GIL。
DALI优化了数据的读取,可以兼顾速度和灵活性,同时可以利用GPU加速。
假设有一个猫狗数据集,目录结构为:
images
|-file_list.txt
|-images/dog
  |-dog_4.jpg
  |-dog_5.jpg
  |-dog_9.jpg
  |-dog_6.jpg
  |-dog_3.jpg
  |-dog_7.jpg
  |-dog_10.jpg
  |-dog_2.jpg
  |-dog_8.jpg
  |-dog_1.jpg
  |-dog_11.jpg
|-images/kitten
  |-cat_10.jpg
  |-cat_5.jpg
  |-cat_9.jpg
  |-cat_8.jpg
  |-cat_1.jpg
  |-cat_7.jpg
  |-cat_6.jpg
  |-cat_3.jpg
  |-cat_2.jpg
  |-cat_4.jpg
基本用法
import nvidia.dali.ops as ops
import nvidia.dali.types as types
from nvidia.dali.pipeline import Pipeline
# Root directory given to FileReader as file_root by the pipelines below.
image_dir = "images"
# Number of samples per batch; passed to the pipeline constructor.
batch_size = 8
class SimplePipeline(Pipeline):
    """Minimal pipeline: read files under ``image_dir`` and decode them on the CPU."""

    def __init__(self, batch_size, num_threads, device_id):
        """Forward the sizing arguments to ``Pipeline`` (seed fixed at 12) and create the operators."""
        super(SimplePipeline, self).__init__(
            batch_size, num_threads, device_id, seed=12
        )
        # Instead of relying on the directory layout, a list file holding
        # "image_name image_label_value" pairs can be supplied, e.g.:
        #   ops.FileReader(file_root=image_dir, file_list=image_dir + '/file_list.txt')
        self.input = ops.FileReader(file_root=image_dir)
        self.decode = ops.ImageDecoder(device='cpu', output_type=types.RGB)

    def define_graph(self):
        """Connect reader -> decoder and return ``(images, labels)``."""
        encoded, labels = self.input()
        return (self.decode(encoded), labels)
通常只需要指定批量大小、CPU线程数、使用的GPU编号(device_id)以及随机种子等设置。
# Create the pipeline with batch_size samples per batch, 1 CPU thread, device id 0.
pipe = SimplePipeline(batch_size, 1, 0)
# Build the pipeline, then run it once to obtain one batch of outputs.
pipe.build()
pipe_out = pipe.run()
# pipe_out holds one entry per value returned by define_graph (images, labels).
print(pipe_out)
[<nvidia.dali.backend_impl.TensorListCPU object at 0x7ff6080bf180>, <nvidia.dali.backend_impl.TensorListCPU object at 0x7ff6080bf5e0>]
# Unpack the outputs in the order define_graph returned them.
images, labels = pipe_out
# Report whether each output can be treated as one dense tensor.
# NOTE(review): presumably False for images because decoded images differ in size -- confirm.
print("Images is_dense_tensor: " + str(images.is_dense_tensor()))
print("Labels is_dense_tensor: " + str(labels.is_dense_tensor()))
Images is_dense_tensor: False
Labels is_dense_tensor: True
如果is_dense_tensor为True,那么可以转换为numpy:
import numpy as np
# Convert the label list into a single tensor, print its shape,
# then print its contents as a NumPy array.
labels_tensor = labels.as_tensor()
print (labels_tensor.shape())
print (np.array(labels_tensor))
[8L, 1L]
[[0]
[0]
[0]
[0]
[0]
[0]
[0]
[0]]
加入数据增强
- 随机打乱
class ShuffledSimplePipeline(Pipeline):
    """Read files under ``image_dir`` in shuffled order and decode them on the CPU."""

    def __init__(self, batch_size, num_threads, device_id):
        """Forward the sizing arguments to ``Pipeline`` (seed fixed at 12) and create the operators."""
        super(ShuffledSimplePipeline, self).__init__(
            batch_size, num_threads, device_id, seed=12
        )
        # initial_fill is set to 21 rather than left at its default.
        self.input = ops.FileReader(
            file_root=image_dir,
            random_shuffle=True,
            initial_fill=21,
        )
        self.decode = ops.ImageDecoder(device='cpu', output_type=types.RGB)

    def define_graph(self):
        """Connect reader -> decoder and return ``(images, labels)``."""
        encoded, labels = self.input()
        return (self.decode(encoded), labels)
initial_fill 参数指定了打乱所用缓存的大小,默认是1000;对于小数据集,默认值会导致大量重复的样本,所以这里将其指定为数据集的大小(21)。
- 旋转
class RotatedSimplePipeline(Pipeline):
    """Shuffled read, CPU decode, then rotate every image by a fixed angle of 10.0."""

    def __init__(self, batch_size, num_threads, device_id):
        """Forward the sizing arguments to ``Pipeline`` (seed fixed at 12) and create the operators."""
        super(RotatedSimplePipeline, self).__init__(
            batch_size, num_threads, device_id, seed=12
        )
        self.input = ops.FileReader(
            file_root=image_dir,
            random_shuffle=True,
            initial_fill=21,
        )
        self.decode = ops.ImageDecoder(device='cpu', output_type=types.RGB)
        # The rotation angle is fixed when the operator is constructed.
        self.rotate = ops.Rotate(angle=10.0)

    def define_graph(self):
        """Connect reader -> decoder -> rotate and return ``(rotated_images, labels)``."""
        encoded, labels = self.input()
        decoded = self.decode(encoded)
        return (self.rotate(decoded), labels)
- 随机旋转
class RandomRotatedSimplePipeline(Pipeline):
    """Shuffled read, CPU decode, then rotate by an angle drawn from ``Uniform(-10.0, 10.0)``."""

    def __init__(self, batch_size, num_threads, device_id):
        """Forward the sizing arguments to ``Pipeline`` (seed fixed at 12) and create the operators."""
        super(RandomRotatedSimplePipeline, self).__init__(
            batch_size, num_threads, device_id, seed=12
        )
        self.input = ops.FileReader(
            file_root=image_dir,
            random_shuffle=True,
            initial_fill=21,
        )
        self.decode = ops.ImageDecoder(device='cpu', output_type=types.RGB)
        # No angle here: it is supplied per call from the random generator below.
        self.rotate = ops.Rotate()
        self.rng = ops.Uniform(range=(-10.0, 10.0))

    def define_graph(self):
        """Connect reader -> decoder -> random rotate and return ``(rotated_images, labels)``."""
        encoded, labels = self.input()
        decoded = self.decode(encoded)
        random_angle = self.rng()
        return (self.rotate(decoded, angle=random_angle), labels)
GPU加速
class RandomRotatedGPUPipeline(Pipeline):
    """Shuffled read and CPU decode, with the random rotation executed by a GPU operator."""

    def __init__(self, batch_size, num_threads, device_id):
        """Forward the sizing arguments to ``Pipeline`` (seed fixed at 12) and create the operators."""
        super(RandomRotatedGPUPipeline, self).__init__(
            batch_size, num_threads, device_id, seed=12
        )
        self.input = ops.FileReader(
            file_root=image_dir,
            random_shuffle=True,
            initial_fill=21,
        )
        self.decode = ops.ImageDecoder(device='cpu', output_type=types.RGB)
        # Only the rotation is created as a GPU operator; decoding stays on the CPU.
        self.rotate = ops.Rotate(device="gpu")
        self.rng = ops.Uniform(range=(-10.0, 10.0))

    def define_graph(self):
        """Connect reader -> decoder -> GPU rotate and return ``(rotated_images, labels)``."""
        encoded, labels = self.input()
        decoded = self.decode(encoded)
        random_angle = self.rng()
        # .gpu() hands the CPU-decoded images to the GPU rotate operator.
        return (self.rotate(decoded.gpu(), angle=random_angle), labels)
需要注意两点:在Rotate中设置device参数为"gpu";对输入images调用.gpu(),把它拷贝到GPU中。取到结果后不能直接查看内容,如果想要查看,需要先用images.as_cpu()拷贝回CPU。
- 混合解码
对于高分辨率的图像,解码可能成为瓶颈,因此出现了nvJPEG库:它把解码过程拆分到CPU和GPU上共同完成,从而降低解码时间。在ImageDecoder中指定device = "mixed"即可使用。
class HybridPipeline(Pipeline):
    """Shuffled read followed by decoding with the ``"mixed"`` ImageDecoder backend."""

    def __init__(self, batch_size, num_threads, device_id):
        """Forward the sizing arguments to ``Pipeline`` (seed fixed at 12) and create the operators."""
        super(HybridPipeline, self).__init__(
            batch_size, num_threads, device_id, seed=12
        )
        self.input = ops.FileReader(
            file_root=image_dir,
            random_shuffle=True,
            initial_fill=21,
        )
        self.decode = ops.ImageDecoder(device="mixed", output_type=types.RGB)

    def define_graph(self):
        """Connect reader -> mixed decoder and return ``(images, labels)``."""
        encoded, labels = self.input()
        # The decoded images are on the GPU.
        return (self.decode(encoded), labels)
比较一下cpu和mixed的速度:
from timeit import default_timer as timer
# Batch size used for the speedtest calls below.
test_batch_size = 64
def speedtest(pipeclass, batch, n_threads, n_warmup=5, n_test=20, device_id=0):
    """Measure a pipeline's throughput and print it in images per second.

    Args:
        pipeclass: Callable invoked as ``pipeclass(batch, n_threads, device_id)``;
            the object it returns must provide ``build()`` and ``run()``.
        batch: Batch size; also used to turn the number of timed runs into
            an image count.
        n_threads: Thread count forwarded to ``pipeclass``.
        n_warmup: Number of untimed runs executed before measuring.
        n_test: Number of timed runs.
        device_id: Device id forwarded to ``pipeclass``.

    Returns:
        None. The result is printed as ``"Speed: <value> imgs/s"``.
    """
    pipe = pipeclass(batch, n_threads, device_id)
    pipe.build()

    # Warm-up runs are excluded from the measurement.
    for _ in range(n_warmup):
        pipe.run()

    t_start = timer()
    for _ in range(n_test):
        pipe.run()
    elapsed = timer() - t_start

    # A coarse clock can report zero elapsed time; avoid ZeroDivisionError.
    if elapsed > 0:
        speed = (n_test * batch) / elapsed
    else:
        speed = float("inf")
    print("Speed: {} imgs/s".format(speed))
# Benchmark the pipeline that decodes with device = 'cpu', using 4 threads.
speedtest(ShuffledSimplePipeline, test_batch_size, 4)
Speed: 2905.71010277 imgs/s
# Benchmark the pipeline that decodes with device = "mixed", using 4 threads.
speedtest(HybridPipeline, test_batch_size, 4)
Speed: 5714.61475087 imgs/s