rocAL exposes simple Python operators that deliver high performance and flexibility by making efficient use of the underlying hardware.
The rocAL pipeline is a Python script that defines a data loader, augmentation graph, and instructions to build and execute it. The most significant part of data processing with rocAL is pipeline creation. A pipeline is composed of multiple operations connected in an ordered graph that is encapsulated in an object of amd.rocal.pipeline. amd.rocal.pipeline is a single library that can be integrated to build preprocessing pipelines for both training and inference applications.
To import the Pipeline class from the library, use:
from amd.rocal.pipeline import Pipeline
The library provides functions required to define, build, and run the pipeline.
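Because the amd.rocal package is importable only on machines where the rocAL Python bindings are installed, a script may want to fail early with a readable message. The following is a minimal sketch of a guarded import; the ROCAL_AVAILABLE flag and the require_rocal helper are illustrative names and are not part of rocAL.

```python
# Guarded import: amd.rocal is importable only where the rocAL Python
# bindings are installed.
try:
    from amd.rocal.pipeline import Pipeline
    ROCAL_AVAILABLE = True
except ImportError:
    Pipeline = None
    ROCAL_AVAILABLE = False


def require_rocal():
    """Hypothetical helper: return the Pipeline class or raise a readable error."""
    if not ROCAL_AVAILABLE:
        raise RuntimeError("rocAL Python bindings (amd.rocal) are not installed")
    return Pipeline


print("rocAL available:", ROCAL_AVAILABLE)
```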
To start using a rocAL pipeline, perform the steps below, which are explained in detail in the following sections:
- Instantiate the pipeline class.
- Define the pipeline.
- Build the pipeline.
- Run the pipeline.
A pipeline is defined by instantiating a pipeline object and adding rocAL operators into the pipeline.
Given below is an example of a file reader, which takes a folder of images as input and decodes the images followed by a resize augmentation. The pipeline runs on the CPU if rocal_cpu is True, or else it runs on the device with the specified device_id.
# Create Pipeline instance
pipe = SimplePipeline(batch_size=batch_size, num_threads=num_threads,
                      device_id=args.local_rank, seed=random_seed, rocal_cpu=rocal_cpu,
                      tensor_layout=types.NHWC if args.NHWC else types.NCHW,
                      tensor_dtype=types.FLOAT16 if args.fp16 else types.FLOAT)
# Set Params
output_set = 0
rocal_device = 'cpu' if rocal_cpu else 'gpu'
decoder_device = 'cpu' if rocal_cpu else 'gpu'
# Use pipeline instance to make calls to reader, decoder & augmentations
with pipe:
    jpegs, _ = fn.readers.file(file_root=data_path, shard_id=local_rank, num_shards=world_size, random_shuffle=True)
    images = fn.decoders.image(jpegs, file_root=data_path, device=decoder_device, output_type=types.RGB, shard_id=0, num_shards=1, random_shuffle=True)
    images = fn.resize(images, device=rocal_device, resize_x=300, resize_y=300)
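The example above adds operators inside a `with pipe:` block. One common way to implement this pattern is for the pipeline object to register itself as the "current" pipeline on entry, so that operator calls know where to attach. The sketch below illustrates that idea with a toy class; ToyPipeline and toy_op are hypothetical names, and this is not a description of rocAL's internals.

```python
class ToyPipeline:
    """Toy stand-in (not rocAL's Pipeline) that records operators added inside `with`."""

    _current = None  # pipeline that is currently open in a `with` block

    def __init__(self, batch_size):
        self.batch_size = batch_size
        self.ops = []  # operator names in the order they were added

    def __enter__(self):
        ToyPipeline._current = self
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        ToyPipeline._current = None
        return False  # do not swallow exceptions


def toy_op(name, *inputs):
    """Hypothetical operator call: registers itself with the active pipeline."""
    pipe = ToyPipeline._current
    if pipe is None:
        raise RuntimeError("toy_op must be called inside a 'with pipeline:' block")
    pipe.ops.append(name)
    return name  # the returned handle is passed on to the next operator


pipe = ToyPipeline(batch_size=2)
with pipe:
    jpegs = toy_op("readers.file")
    images = toy_op("decoders.image", jpegs)
    images = toy_op("resize", images)

print(pipe.ops)
```

The recorded order (reader, decoder, resize) mirrors the ordered graph that the pipeline later builds and executes.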
To define a pipeline, see https://github.com/ROCm/rocAL/blob/master/rocAL_pybind/amd/rocal/pipeline.py#L29.
class Pipeline(object):
    """Pipeline class internally calls RocalCreate which returns context which will have all
    the info set by the user.

    Parameters
    ----------
    `batch_size` : int, optional, default = -1
        Batch size of the pipeline. Negative values for this parameter
        are invalid - the default value may only be used with
        serialized pipeline (the value stored in serialized pipeline
        is used instead).
    `num_threads` : int, optional, default = -1
        Number of CPU threads used by the pipeline.
        Negative values for this parameter are invalid - the default
        value may only be used with serialized pipeline (the value
        stored in serialized pipeline is used instead).
    `device_id` : int, optional, default = -1
        Id of GPU used by the pipeline.
        Negative values for this parameter are invalid - the default
        value may only be used with serialized pipeline (the value
        stored in serialized pipeline is used instead).
    `seed` : int, optional, default = -1
        Seed used for random number generation. Leaving the default value
        for this parameter results in random seed.
    `exec_pipelined` : bool, optional, default = True
        Whether to execute the pipeline in a way that enables
        overlapping CPU and GPU computation, typically resulting
        in faster execution speed, but larger memory consumption.
    `prefetch_queue_depth` : int or {"cpu_size": int, "gpu_size": int}, optional, default = 2
        Depth of the executor pipeline. Deeper pipeline makes ROCAL
        more resistant to uneven execution time of each batch, but it
        also consumes more memory for internal buffers.
        Specifying a dict:
        ``{ "cpu_size": x, "gpu_size": y }``
        instead of an integer will cause the pipeline to use separated
        queues executor, with buffer queue size `x` for cpu stage
        and `y` for mixed and gpu stages. It is not supported when both `exec_async`
        and `exec_pipelined` are set to `False`.
        Executor will buffer cpu and gpu stages separately,
        and will fill the buffer queues when the first :meth:`amd.rocal.pipeline.Pipeline.run`
        is issued.
    `exec_async` : bool, optional, default = True
        Whether to execute the pipeline asynchronously.
        This makes :meth:`amd.rocal.pipeline.Pipeline.run` method
        run asynchronously with respect to the calling Python thread.
        In order to synchronize with the pipeline one needs to call
        :meth:`amd.rocal.pipeline.Pipeline.outputs` method.
    `bytes_per_sample` : int, optional, default = 0
        A hint for ROCAL for how much memory to use for its tensors.
    `set_affinity` : bool, optional, default = False
        Whether to set CPU core affinity to the one closest to the
        GPU being used.
    `max_streams` : int, optional, default = -1
        Limit the number of HIP streams used by the executor.
        Value of -1 does not impose a limit.
        This parameter is currently unused (and behavior of
        unrestricted number of streams is assumed).
    `default_cuda_stream_priority` : int, optional, default = 0
        HIP stream priority used by ROCAL.
    """
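The docstring states that negative values of batch_size, num_threads, and device_id are invalid outside serialized pipelines, and that prefetch_queue_depth accepts either an integer or a dict. A caller could check these conditions before constructing a pipeline. The helpers below are a hypothetical sketch and are not part of rocAL; in particular, treating an integer depth as the same size for both the CPU and GPU queues is an assumption made for illustration.

```python
def check_pipeline_args(batch_size, num_threads, device_id):
    """Hypothetical pre-check: reject the negative placeholder defaults."""
    for name, value in (("batch_size", batch_size),
                        ("num_threads", num_threads),
                        ("device_id", device_id)):
        if value < 0:
            raise ValueError(f"{name} must be non-negative, got {value}")


def split_prefetch_depth(prefetch_queue_depth):
    """Return (cpu_size, gpu_size) for either accepted form of the argument.

    Assumption: an integer depth is applied to both queues.
    """
    if isinstance(prefetch_queue_depth, dict):
        return prefetch_queue_depth["cpu_size"], prefetch_queue_depth["gpu_size"]
    return prefetch_queue_depth, prefetch_queue_depth


check_pipeline_args(batch_size=2, num_threads=1, device_id=0)  # passes silently
print(split_prefetch_depth(2))
print(split_prefetch_depth({"cpu_size": 3, "gpu_size": 1}))
```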
The following are the important functions available in the Pipeline class of amd.rocal.pipeline:
- build(): Used to build a pipeline graph
- __init__ constructor: Defines all the operators to be used in the graph with the corresponding parameters
- is_empty(): Used to check if all the pipeline handles are empty
- rocalResetLoaders(): Used to reset the iterator to the beginning
- set_outputs(): Used to set the augmentations output of the graph
Building the pipeline ensures that all operators are validated with the corresponding inputs and outputs.
To build the pipeline, see https://github.com/ROCm/rocAL/blob/master/rocAL_pybind/examples/rocAL_api_python_unittest.py#L166
# build the pipeline
pipe = SimplePipeline(batch_size=max_batch_size, num_threads=1, device_id=0)
pipe.build()
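To make the idea of build-time validation concrete, the sketch below checks that every operator in an ordered graph consumes only outputs produced by an earlier operator. The (name, inputs, output) tuple format and the validate_graph function are assumptions made for illustration; they do not reflect how rocAL stores or validates its graph.

```python
def validate_graph(ops):
    """Check that each operator only consumes outputs produced by an earlier one.

    `ops` is an ordered list of (name, inputs, output) tuples (toy format).
    """
    available = set()
    for name, inputs, output in ops:
        missing = [i for i in inputs if i not in available]
        if missing:
            raise ValueError(f"operator '{name}' has unknown inputs: {missing}")
        available.add(output)
    return True


graph = [
    ("readers.file", [], "jpegs"),
    ("decoders.image", ["jpegs"], "decoded"),
    ("resize", ["decoded"], "resized"),
]
print(validate_graph(graph))
```

Reordering the list so that resize comes before decoders.image would raise a ValueError, which is the kind of mistake a build step is meant to catch before any data is processed.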
To run/use the pipeline, simply create a data loader using the pipeline and iterate through it to get the next batch of images with labels.
To run the pipeline, see https://github.com/ROCm/rocAL/blob/master/rocAL_pybind/examples/rocAL_api_python_unittest.py#L168
# Dataloader
data_loader = ROCALClassificationIterator(pipe, device=device)
# Enumerate over the Dataloader
for epoch in range(int(args.num_epochs)):
    print("EPOCH:::::", epoch)
    for i, it in enumerate(data_loader, 0):
        # 'it' holds the current batch; the loop body is omitted in this excerpt
        pass
The output of the pipeline created above for 4 iterations (number of epochs) with a batch size of 2 is shown below for your reference. Each image is decoded and resized to 224x224.
Figure 3. Sample Pipeline Output
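The epoch and batch structure of the loop can be tried without rocAL by using a toy loader. ToyLoader below is a hypothetical stand-in that yields fixed-size batches of (image, label) pairs and restarts from the first sample each time it is iterated; a real rocAL iterator may need an explicit reset between epochs.

```python
class ToyLoader:
    """Toy stand-in for a data loader: yields fixed-size batches, re-iterable."""

    def __init__(self, samples, batch_size):
        self.samples = samples
        self.batch_size = batch_size

    def __iter__(self):
        # Each new iteration starts again from the first sample (a new epoch).
        for start in range(0, len(self.samples), self.batch_size):
            yield self.samples[start:start + self.batch_size]


samples = [("img0", 0), ("img1", 1), ("img2", 0), ("img3", 1)]
loader = ToyLoader(samples, batch_size=2)

for epoch in range(2):
    for i, batch in enumerate(loader):
        print("epoch", epoch, "batch", i, batch)
```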
rocAL not only reads images from the disk and batches them into tensors, but it can also perform various augmentations on those images.
To read images, decode them, and rotate them in the pipeline, see https://github.com/ROCm/rocAL/blob/master/rocAL_pybind/examples/rocAL_api_python_unittest.py#L77
def rotated_pipeline():
    jpegs, labels = fn.readers.file(file_root=image_dir, random_shuffle=True)
    images = fn.decoders.image(jpegs, device='cpu')
    # Rotate the decoded images by 10° and fill the remaining space
    # with black (0)
    rotated_images = fn.rotate(images, angle=10.0, fill_value=0)
    return rotated_images, labels

# Note: the call below passes pipeline arguments, so rotated_pipeline is presumably
# wrapped by a pipeline-defining decorator that this excerpt does not show.
pipe = rotated_pipeline(batch_size=max_batch_size, num_threads=1, device_id=0)
pipe.build()
To run the pipeline, use:
pipe_out = pipe.run()
images, labels = pipe_out
show_images(images)
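To illustrate what a fill value means for a rotation, the sketch below rotates a small 2D grid about its center using nearest-neighbor sampling and writes fill_value wherever the source coordinate falls outside the image. It is a toy in pure Python and is not rocAL's rotate kernel; the rotate_nearest name and the rotation direction are assumptions.

```python
import math


def rotate_nearest(image, angle_deg, fill_value=0):
    """Rotate a 2D grid about its center, keeping the output size unchanged."""
    height, width = len(image), len(image[0])
    cy, cx = (height - 1) / 2.0, (width - 1) / 2.0
    theta = math.radians(angle_deg)
    cos_t, sin_t = math.cos(theta), math.sin(theta)
    out = [[fill_value] * width for _ in range(height)]
    for y in range(height):
        for x in range(width):
            # Inverse mapping: find the source pixel that lands on (x, y).
            dx, dy = x - cx, y - cy
            src_x = round(cos_t * dx + sin_t * dy + cx)
            src_y = round(-sin_t * dx + cos_t * dy + cy)
            if 0 <= src_x < width and 0 <= src_y < height:
                out[y][x] = image[src_y][src_x]
    return out


white = [[9] * 5 for _ in range(5)]
for row in rotate_nearest(white, 45.0, fill_value=0):
    print(row)
```

In the 45-degree example, the corners of the output have no source pixel, so they take the fill value, while the center keeps its original value.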
All the rocAL data types are defined under amd.rocal.types. Import this library in the application to access the various data types such as rocAL status, processing mode, tensor output type, image size evaluation policy, image color, tensor layout, decode device, resize scaling mode, and resize interpolation type.
Here are some of the commonly used rocAL data types:
- Processing modes: Values (GPU/CPU). Use the rocal_cpu argument in the pipeline to set the processing mode.
  - rocal_cpu = True: This performs data loading on the CPU. If GPUs are heavily used for training, it is viable to create the data-loading pipeline using the CPU.
  - rocal_cpu = False: This performs data loading on the available GPU as specified using the device_id argument in the pipeline.
- Tensor layouts: Values (NCHW/NHWC). Example:
  - tensor_layout = types.NCHW
  - tensor_layout = types.NHWC
- Tensor data types: Values (FLOAT/FLOAT16). Example:
  - tensor_dtype = types.FLOAT
  - tensor_dtype = types.FLOAT16
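The layout and data type choices determine the shape and memory footprint of each output batch. The sketch below computes both for a given batch, using plain strings in place of the types.NCHW, types.NHWC, types.FLOAT, and types.FLOAT16 constants. The batch_shape and batch_bytes helpers are hypothetical and are not part of rocAL.

```python
import struct


def batch_shape(batch_size, channels, height, width, layout):
    """Return the tensor shape for a layout string, 'NCHW' or 'NHWC'."""
    if layout == "NCHW":
        return (batch_size, channels, height, width)
    if layout == "NHWC":
        return (batch_size, height, width, channels)
    raise ValueError(f"unsupported layout: {layout}")


def batch_bytes(shape, dtype):
    """Bytes needed for one batch: 'FLOAT' is 32-bit, 'FLOAT16' is 16-bit."""
    item_size = {"FLOAT": struct.calcsize("f"), "FLOAT16": struct.calcsize("e")}[dtype]
    count = 1
    for dim in shape:
        count *= dim
    return count * item_size


shape = batch_shape(2, 3, 224, 224, "NHWC")
print(shape, batch_bytes(shape, "FLOAT"), batch_bytes(shape, "FLOAT16"))
```

The layout changes only the order of the dimensions, not the element count, so switching from FLOAT to FLOAT16 halves the memory needed per batch in either layout.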
To see the usage of the above-mentioned data types, see https://github.com/ROCm/rocAL/blob/master/rocAL_pybind/amd/rocal/pipeline.py#L97
def __init__(self, batch_size=-1, num_threads=-1, device_id=-1, seed=-1,
             exec_pipelined=True, prefetch_queue_depth=2,
             exec_async=True, bytes_per_sample=0,
             rocal_cpu=False, max_streams=-1, default_cuda_stream_priority=0,
             tensor_layout = types.NCHW, reverse_channels = False,
             multiplier = [1.0,1.0,1.0], offset = [0.0, 0.0, 0.0],
             tensor_dtype=types.FLOAT):
    if(rocal_cpu):
        # print("comes to cpu")
        self._handle = b.rocalCreate(
            batch_size, types.CPU, device_id, num_threads,prefetch_queue_depth,types.FLOAT)
    else:
        print("comes to gpu")
        self._handle = b.rocalCreate(
            batch_size, types.GPU, device_id, num_threads,prefetch_queue_depth,types.FLOAT)