
DALI: Accelerating Visual Data Loading


The home page of this blog series and related posts can be found here.


(From the DALI documentation)

Visual data is mostly images, and the data loaders shipped with deep learning frameworks typically fall into two camps: built-in readers that are fast but inflexible, and custom Python loaders that are flexible but slow.

DALI optimizes the data loading path so that you get both speed and flexibility, and it can additionally use the GPU for acceleration.

Suppose we have a cats-and-dogs dataset laid out as follows:

images
|-file_list.txt
|-images/dog
  |-dog_4.jpg
  |-dog_5.jpg
  |-dog_9.jpg
  |-dog_6.jpg
  |-dog_3.jpg
  |-dog_7.jpg
  |-dog_10.jpg
  |-dog_2.jpg
  |-dog_8.jpg
  |-dog_1.jpg
  |-dog_11.jpg
|-images/kitten
  |-cat_10.jpg
  |-cat_5.jpg
  |-cat_9.jpg
  |-cat_8.jpg
  |-cat_1.jpg
  |-cat_7.jpg
  |-cat_6.jpg
  |-cat_3.jpg
  |-cat_2.jpg
  |-cat_4.jpg
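
For reference, the file_list.txt in the layout above is the optional label file that FileReader can consume instead of scanning subdirectories (see the commented-out line in the code below). Its format is one "image_path image_label" pair per line, with paths relative to file_root; the exact paths and labels shown here are only illustrative:

dog/dog_1.jpg 0
dog/dog_2.jpg 0
kitten/cat_1.jpg 1
kitten/cat_2.jpg 1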

Basic usage

from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import nvidia.dali.types as types

image_dir = "images"
batch_size = 8

class SimplePipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id):
        super(SimplePipeline, self).__init__(batch_size, num_threads, device_id, seed = 12)
        self.input = ops.FileReader(file_root = image_dir)
        # instead of path to file directory file with pairs image_name image_label_value can be provided
        # self.input = ops.FileReader(file_root = image_dir, file_list = image_dir + '/file_list.txt')
        self.decode = ops.ImageDecoder(device = 'cpu', output_type = types.RGB)

    def define_graph(self):
        jpegs, labels = self.input()
        images = self.decode(jpegs)
        return (images, labels)

Usually you only need to specify the batch size, the number of CPU threads, the id of the GPU device, and a random seed.

pipe = SimplePipeline(batch_size, 1, 0)
pipe.build()
pipe_out = pipe.run()
print(pipe_out)
[<nvidia.dali.backend_impl.TensorListCPU object at 0x7ff6080bf180>, <nvidia.dali.backend_impl.TensorListCPU object at 0x7ff6080bf5e0>]
images, labels = pipe_out
print("Images is_dense_tensor: " + str(images.is_dense_tensor()))
print("Labels is_dense_tensor: " + str(labels.is_dense_tensor()))
Images is_dense_tensor: False
Labels is_dense_tensor: True

If is_dense_tensor is True, the TensorList can be converted to a single tensor and then to a NumPy array:

import numpy as np

labels_tensor = labels.as_tensor()

print (labels_tensor.shape())
print (np.array(labels_tensor))
[8L, 1L]
[[0]
 [0]
 [0]
 [0]
 [0]
 [0]
 [0]
 [0]]
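
The images output, on the other hand, is not a dense tensor, because the decoded images have different shapes, so it cannot be converted to NumPy in one call. Individual samples can still be pulled out of the TensorList; a minimal sketch, continuing from the pipe_out above:

# at(i) returns the i-th sample of a TensorListCPU as a NumPy array (H x W x C for RGB)
first_image = images.at(0)
print(first_image.shape)   # shape differs per file, e.g. something like (669, 640, 3)
print(first_image.dtype)   # uint8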

Adding data augmentation


class ShuffledSimplePipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id):
        super(ShuffledSimplePipeline, self).__init__(batch_size, num_threads, device_id, seed = 12)
        self.input = ops.FileReader(file_root = image_dir, random_shuffle = True, initial_fill = 21)
        self.decode = ops.ImageDecoder(device = 'cpu', output_type = types.RGB)

    def define_graph(self):
        jpegs, labels = self.input()
        images = self.decode(jpegs)
        return (images, labels)

The initial_fill parameter sets the size of the shuffle buffer (default 1000). For a small dataset the default would fill the buffer with many duplicate samples, so here it is set to the size of the dataset (21 images).
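
A minimal check that the shuffling behaves as expected, reusing the ShuffledSimplePipeline above: run one batch and print the labels, which should mix the two classes instead of coming out in directory order.

pipe = ShuffledSimplePipeline(batch_size, 1, 0)
pipe.build()
images, labels = pipe.run()

# With random_shuffle = True the labels (0 = dog, 1 = kitten, by sorted directory order)
# should be interleaved within the batch.
print(np.array(labels.as_tensor()).squeeze())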

class RotatedSimplePipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id):
        super(RotatedSimplePipeline, self).__init__(batch_size, num_threads, device_id, seed = 12)
        self.input = ops.FileReader(file_root = image_dir, random_shuffle = True, initial_fill = 21)
        self.decode = ops.ImageDecoder(device = 'cpu', output_type = types.RGB)
        self.rotate = ops.Rotate(angle = 10.0)

    def define_graph(self):
        jpegs, labels = self.input()
        images = self.decode(jpegs)
        rotated_images = self.rotate(images)
        return (rotated_images, labels)

The pipeline above rotates every image by a fixed 10 degrees. To apply a different random angle to each image, draw the angle from ops.Uniform and feed it to rotate inside define_graph:

class RandomRotatedSimplePipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id):
        super(RandomRotatedSimplePipeline, self).__init__(batch_size, num_threads, device_id, seed = 12)
        self.input = ops.FileReader(file_root = image_dir, random_shuffle = True, initial_fill = 21)
        self.decode = ops.ImageDecoder(device = 'cpu', output_type = types.RGB)
        self.rotate = ops.Rotate()
        self.rng = ops.Uniform(range = (-10.0, 10.0))

    def define_graph(self):
        jpegs, labels = self.input()
        images = self.decode(jpegs)
        angle = self.rng()
        rotated_images = self.rotate(images, angle = angle)
        return (rotated_images, labels)
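
To eyeball the augmented batches, run the pipeline and plot a few samples. A minimal sketch using matplotlib; the show_images helper is not part of DALI, just an illustrative utility:

import matplotlib.pyplot as plt

def show_images(image_batch, n = 4):
    # image_batch is a TensorListCPU; at(i) returns the i-th image as a NumPy array
    fig, axes = plt.subplots(1, n, figsize = (12, 4))
    for i in range(n):
        axes[i].imshow(image_batch.at(i))
        axes[i].axis("off")
    plt.show()

pipe = RandomRotatedSimplePipeline(batch_size, 1, 0)
pipe.build()
images, labels = pipe.run()
show_images(images)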

GPU acceleration

class RandomRotatedGPUPipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id):
        super(RandomRotatedGPUPipeline, self).__init__(batch_size, num_threads, device_id, seed = 12)
        self.input = ops.FileReader(file_root = image_dir, random_shuffle = True, initial_fill = 21)
        self.decode = ops.ImageDecoder(device = 'cpu', output_type = types.RGB)
        self.rotate = ops.Rotate(device = "gpu")
        self.rng = ops.Uniform(range = (-10.0, 10.0))

    def define_graph(self):
        jpegs, labels = self.input()
        images = self.decode(jpegs)
        angle = self.rng()
        rotated_images = self.rotate(images.gpu(), angle = angle)
        return (rotated_images, labels)

Two changes: set the device parameter of Rotate to "gpu", and call .gpu() on the input images to copy them onto the GPU. The results then live in GPU memory, so they cannot be inspected directly; to look at their contents you first have to copy them back to the CPU with images.as_cpu().
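
A minimal sketch of that round trip, assuming the RandomRotatedGPUPipeline defined above:

pipe = RandomRotatedGPUPipeline(batch_size, 1, 0)
pipe.build()
images, labels = pipe.run()

# images is a TensorListGPU; copy it back to host memory before inspecting it
images_cpu = images.as_cpu()
print(images_cpu.at(0).shape)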

For high-resolution images, JPEG decoding can become the bottleneck. The nvJPEG library addresses this by splitting the decoding work between the CPU and the GPU, which reduces decoding time. In DALI this hybrid decoding is selected with device = "mixed":

class HybridPipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id):
        super(HybridPipeline, self).__init__(batch_size, num_threads, device_id, seed = 12)
        self.input = ops.FileReader(file_root = image_dir, random_shuffle = True, initial_fill = 21)
        self.decode = ops.ImageDecoder(device = "mixed", output_type = types.RGB)

    def define_graph(self):
        jpegs, labels = self.input()
        images = self.decode(jpegs)
        # images are on the GPU
        return (images, labels)

Let's compare the throughput of the cpu and mixed decoders:

from timeit import default_timer as timer

test_batch_size = 64

def speedtest(pipeclass, batch, n_threads):
    pipe = pipeclass(batch, n_threads, 0)
    pipe.build()
    # warmup
    for i in range(5):
        pipe.run()
    # test
    n_test = 20
    t_start = timer()
    for i in range(n_test):
        pipe.run()
    t = timer() - t_start
    print("Speed: {} imgs/s".format((n_test * batch)/t))
speedtest(ShuffledSimplePipeline, test_batch_size, 4)
Speed: 2905.71010277 imgs/s
speedtest(HybridPipeline, test_batch_size, 4)
Speed: 5714.61475087 imgs/s

Support for multiple data formats