Fixing tutorial with DataLoader #458

21 changes: 17 additions & 4 deletions README.md
@@ -177,17 +177,30 @@ Q: What should I do if the existing set of DataPipes does not do what I need?
A: You can
[implement your own custom DataPipe](https://pytorch.org/data/main/tutorial.html#implementing-a-custom-datapipe). If you
believe your use case is common enough such that the community can benefit from having your custom DataPipe added to
-this library, feel free to open a GitHub issue.
+this library, feel free to open a GitHub issue. We will be happy to discuss!
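
As a rough sketch of what a custom DataPipe looks like (the `DoublerIterDataPipe` name and its behavior below are purely illustrative, not part of the library): subclass `IterDataPipe`, wrap the source DataPipe, and implement `__iter__`.

```python
from torchdata.datapipes.iter import IterDataPipe

# Toy example: a custom IterDataPipe wraps a source DataPipe and implements __iter__
class DoublerIterDataPipe(IterDataPipe):
    def __init__(self, source_datapipe):
        self.source_datapipe = source_datapipe

    def __iter__(self):
        for item in self.source_datapipe:
            yield item * 2  # illustrative transformation only
```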

-Q: What happens when the `Shuffler`/`Batcher` DataPipes are used with DataLoader?
+Q: What happens when the `Shuffler` DataPipe is used with DataLoader?

-A: If you choose those DataPipes while setting `shuffle=True`/`batch_size>1` for DataLoader, your samples will be
-shuffled/batched more than once. You should choose one or the other.
+A: In order to enable shuffling, you need to add a `Shuffler` to your DataPipe line. Then, by default, shuffling will
+happen at the point that you specified, as long as you do not set `shuffle=False` within DataLoader.
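
A minimal sketch of that behavior (the `IterableWrapper` source and the sizes here are illustrative only):

```python
from torch.utils.data import DataLoader
from torchdata.datapipes.iter import IterableWrapper

# Shuffling is declared inside the DataPipe line itself
datapipe = IterableWrapper(range(10)).shuffle()

# No `shuffle` argument: the Shuffler above stays active by default;
# passing `shuffle=False` here would disable it.
dl = DataLoader(datapipe, batch_size=2)
for batch in dl:
    print(batch)
```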

Q: What happens when the `Batcher` DataPipe is used with DataLoader?

A: If you choose to use `Batcher` while setting `batch_size > 1` for DataLoader, your samples will be batched more than
once. You should choose one or the other.
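
For instance, a sketch of the two mutually exclusive choices (again with an illustrative `IterableWrapper` source):

```python
from torch.utils.data import DataLoader
from torchdata.datapipes.iter import IterableWrapper

datapipe = IterableWrapper(range(12))

# Option 1: batch inside the DataPipe line and turn off DataLoader batching
dl = DataLoader(datapipe.batch(4), batch_size=None)

# Option 2: leave batching to the DataLoader
dl = DataLoader(datapipe, batch_size=4)

# Doing both would produce batches of batches.
```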

Q: Why are there fewer built-in `MapDataPipes` than `IterDataPipes`?

A: By design, there are fewer `MapDataPipes` than `IterDataPipes` to avoid duplicate implementations of the same
functionalities as `MapDataPipe`. We encourage users to use the built-in `IterDataPipe` for various functionalities, and
convert it to `MapDataPipe` as needed.

Q: How is multiprocessing handled with DataPipes?

A: Multi-process data loading is still handled by DataLoader, see the
[DataLoader documentation for more details](https://pytorch.org/docs/stable/data.html#single-and-multi-process-data-loading).
If you would like to shard data across processes, use `ShardingFilter` and provide a `worker_init_fn` as shown in the
[tutorial](https://pytorch.org/data/beta/tutorial.html#working-with-dataloader).

Q: What is the upcoming plan for DataLoader?

6 changes: 3 additions & 3 deletions docs/source/torchdata.datapipes.map.rst
@@ -15,9 +15,9 @@ corresponding label from a folder on the disk.

By design, there are fewer ``MapDataPipe`` than ``IterDataPipe`` to avoid duplicate implementations of the same
functionalities as ``MapDataPipe``. We encourage users to use the built-in ``IterDataPipe`` for various functionalities,
-and convert it to ``MapDataPipe`` as needed using ``MapToIterConverter`` or ``.to_iter_datapipe()``.
-If you have any question about usage or best practices while using `MapDataPipe`, feel free to ask on the PyTorch forum
-under the `'data' category <https://discuss.pytorch.org/c/data/37>`_.
+and convert it to ``MapDataPipe`` as needed using :class:`.IterToMapConverter` or ``.to_map_datapipe()``.
+If you have any question about usage or best practices while using ``MapDataPipe``, feel free to ask on the PyTorch
+forum under the `'data' category <https://discuss.pytorch.org/c/data/37>`_.
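
A minimal sketch of this conversion, assuming the source ``IterDataPipe`` yields ``(key, value)`` pairs (the default
expectation of the converter); the data here is illustrative only:

.. code:: python

    from torchdata.datapipes.iter import IterableWrapper

    # An IterDataPipe of (key, value) pairs
    source_dp = IterableWrapper([(i, i * 10) for i in range(5)])

    # Convert to a MapDataPipe to get random access by key
    map_dp = source_dp.to_map_datapipe()
    print(map_dp[3])  # 30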

We are open to add additional ``MapDataPipe`` where the operations can be lazily executed and ``__len__`` can be
known in advance. Feel free to make suggestions with description of your use case in
77 changes: 67 additions & 10 deletions docs/source/tutorial.rst
@@ -80,23 +80,33 @@ For this example, we will first have a helper function that generates some CSV files
            row_data['label'] = random.randint(0, 9)
            writer.writerow(row_data)

-Next, we will build our DataPipes to read and parse through the generated CSV files:
+Next, we will build our DataPipes to read and parse through the generated CSV files. Note that we prefer to
+pass defined functions to DataPipes rather than lambda functions because the former are serializable with ``pickle``.
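
As a quick sanity check of that point (a minimal sketch; the lambda call is left commented out because attempting to
pickle it raises a pickling error):

.. code:: python

    import pickle

    def filter_for_data(filename):
        return "sample_data" in filename and filename.endswith(".csv")

    pickle.dumps(filter_for_data)  # fine: top-level functions are pickled by reference
    # pickle.dumps(lambda name: "sample_data" in name)  # would raise a PicklingError

The pipeline itself: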

.. code:: python

    import numpy as np
    import torchdata.datapipes as dp

    def filter_for_data(filename):
        return "sample_data" in filename and filename.endswith(".csv")

    def row_processer(row):
        return {"label": np.array(row[0], np.int32), "data": np.array(row[1:], dtype=np.float64)}

    def build_datapipes(root_dir="."):
        datapipe = dp.iter.FileLister(root_dir)
-       datapipe = datapipe.filter(filter_fn=lambda filename: "sample_data" in filename and filename.endswith(".csv"))
-       datapipe = dp.iter.FileOpener(datapipe, mode='rt')
+       datapipe = datapipe.filter(filter_fn=filter_for_data)
+       datapipe = datapipe.open_files(mode='rt')
        datapipe = datapipe.parse_csv(delimiter=",", skip_lines=1)
-       datapipe = datapipe.map(lambda row: {"label": np.array(row[0], np.int32),
-                                            "data": np.array(row[1:], dtype=np.float64)})
+       # Shuffle will happen as long as you do NOT set `shuffle=False` later in the DataLoader
+       datapipe = datapipe.shuffle()
+       datapipe = datapipe.map(row_processer)
        return datapipe

-Lastly, we will put everything together in ``'__main__'`` and pass the DataPipe into the DataLoader.
+Lastly, we will put everything together in ``'__main__'`` and pass the DataPipe into the DataLoader. Note that
+if you choose to use ``Batcher`` while setting ``batch_size > 1`` for DataLoader, your samples will be
+batched more than once. You should choose one or the other.

.. code:: python

@@ -105,20 +115,67 @@ Lastly, we will put everything together in ``'__main__'`` and pass the DataPipe
    if __name__ == '__main__':
        num_files_to_generate = 3
        for i in range(num_files_to_generate):
-           generate_csv(file_label=i)
+           generate_csv(file_label=i, num_rows=10, num_features=3)
        datapipe = build_datapipes()
-       dl = DataLoader(dataset=datapipe, batch_size=50, shuffle=True)
+       dl = DataLoader(dataset=datapipe, batch_size=5, num_workers=2)
        first = next(iter(dl))
        labels, features = first['label'], first['data']
        print(f"Labels batch shape: {labels.size()}")
        print(f"Feature batch shape: {features.size()}")
        print(f"{labels = }\n{features = }")
        n_sample = 0
        for row in iter(dl):
            n_sample += 1
        print(f"{n_sample = }")

The following statements will be printed to show the shapes of a single batch of labels and features.

.. code::

-   Labels batch shape: 50
-   Feature batch shape: torch.Size([50, 20])
+   Labels batch shape: torch.Size([5])
+   Feature batch shape: torch.Size([5, 3])
    labels = tensor([8, 9, 5, 9, 7], dtype=torch.int32)
    features = tensor([[0.2867, 0.5973, 0.0730],
                       [0.7890, 0.9279, 0.7392],
                       [0.8930, 0.7434, 0.0780],
                       [0.8225, 0.4047, 0.0800],
                       [0.1655, 0.0323, 0.5561]], dtype=torch.float64)
    n_sample = 12

The reason ``n_sample = 12`` is that ``ShardingFilter`` (``datapipe.sharding_filter()``) was not used, so each worker
independently returns all samples. In this case, there are 10 rows per file and 3 files, and with a batch size of 5,
that gives us 6 batches per worker. With 2 workers, we get 12 total batches from the ``DataLoader``.
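
A quick back-of-the-envelope check of that count (plain arithmetic; the numbers come from the snippet above):

.. code:: python

    num_files, rows_per_file = 3, 10
    batch_size, num_workers = 5, 2

    total_samples = num_files * rows_per_file          # 30 samples in total
    batches_per_worker = total_samples // batch_size   # 6, because every worker sees all samples
    print(batches_per_worker * num_workers)            # 12 batches observed from the DataLoader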

In order for DataPipe sharding to work with ``DataLoader``, we need to add the following. It is crucial to add
``ShardingFilter`` after ``Shuffler`` to ensure that all worker processes have the same order of data for sharding.

.. code:: python

    def build_datapipes(root_dir="."):
        datapipe = ...
        # Add the following line to `build_datapipes`
        # Note that it is somewhere after `Shuffler` in the DataPipe line
        datapipe = datapipe.sharding_filter()
        return datapipe

    def worker_init_fn(worker_id):
        info = torch.utils.data.get_worker_info()
        num_workers = info.num_workers
        datapipe = info.dataset
        torch.utils.data.graph_settings.apply_sharding(datapipe, num_workers, worker_id)

    # Pass `worker_init_fn` into `DataLoader` within '__main__'
    ...
    dl = DataLoader(dataset=datapipe, shuffle=True, batch_size=5, num_workers=2, worker_init_fn=worker_init_fn)
    ...

When we re-run, we will get:

.. code::

    ...
    n_sample = 6


You can find more DataPipe implementation examples for various research domains `on this page <torchexamples.html>`_.
