diff --git a/README.md b/README.md
index de504fb6c..5ed68afea 100644
--- a/README.md
+++ b/README.md
@@ -177,17 +177,30 @@ Q: What should I do if the existing set of DataPipes does not do what I need?
 
 A: You can [implement your own custom DataPipe](https://pytorch.org/data/main/tutorial.html#implementing-a-custom-datapipe).
 If you believe your use case is common enough such that the community can benefit from having your custom DataPipe added to
-this library, feel free to open a GitHub issue.
+this library, feel free to open a GitHub issue. We will be happy to discuss!
 
-Q: What happens when the `Shuffler`/`Batcher` DataPipes are used with DataLoader?
+Q: What happens when the `Shuffler` DataPipe is used with DataLoader?
 
-A: If you choose those DataPipes while setting `shuffle=True`/`batch_size>1` for DataLoader, your samples will be
-shuffled/batched more than once. You should choose one or the other.
+A: In order to enable shuffling, you need to add a `Shuffler` to your DataPipe line. Then, by default, shuffling will
+happen at the point you specified, as long as you do not set `shuffle=False` within DataLoader.
+
+Q: What happens when the `Batcher` DataPipe is used with DataLoader?
+
+A: If you choose to use `Batcher` while setting `batch_size > 1` for DataLoader, your samples will be batched more than
+once. You should choose one or the other.
+
+Q: Why are there fewer built-in `MapDataPipes` than `IterDataPipes`?
+
+A: By design, there are fewer `MapDataPipes` than `IterDataPipes` to avoid duplicate implementations of the same
+functionalities. We encourage users to use the built-in `IterDataPipe` for various functionalities, and
+convert it to `MapDataPipe` as needed.
 
 Q: How is multiprocessing handled with DataPipes?
 
 A: Multi-process data loading is still handled by DataLoader, see the
 [DataLoader documentation for more details](https://pytorch.org/docs/stable/data.html#single-and-multi-process-data-loading).
+If you would like to shard data across processes, use `ShardingFilter` and provide a `worker_init_fn` as shown in the
+[tutorial](https://pytorch.org/data/beta/tutorial.html#working-with-dataloader).
 
 Q: What is the upcoming plan for DataLoader?
 
diff --git a/docs/source/torchdata.datapipes.map.rst b/docs/source/torchdata.datapipes.map.rst
index 467df023e..701ac6bb3 100644
--- a/docs/source/torchdata.datapipes.map.rst
+++ b/docs/source/torchdata.datapipes.map.rst
@@ -15,9 +15,9 @@ corresponding label from a folder on the disk.
 
 By design, there are fewer ``MapDataPipe`` than ``IterDataPipe`` to avoid duplicate implementations of the same
 functionalities as ``MapDataPipe``. We encourage users to use the built-in ``IterDataPipe`` for various functionalities,
-and convert it to ``MapDataPipe`` as needed using ``MapToIterConverter`` or ``.to_iter_datapipe()``.
-If you have any question about usage or best practices while using `MapDataPipe`, feel free to ask on the PyTorch forum
-under the `'data' category `_.
+and convert it to ``MapDataPipe`` as needed using :class:`.IterToMapConverter` or ``.to_map_datapipe()``.
+If you have any questions about usage or best practices while using ``MapDataPipe``, feel free to ask on the PyTorch
+forum under the `'data' category `_.
 
 We are open to add additional ``MapDataPipe`` where the operations can be lazily executed and ``__len__`` can be known
 in advance. Feel free to make suggestions with description of your use case in
diff --git a/docs/source/tutorial.rst b/docs/source/tutorial.rst
index 8577afa04..910b7ca83 100644
--- a/docs/source/tutorial.rst
+++ b/docs/source/tutorial.rst
@@ -80,23 +80,33 @@ For this example, we will first have a helper function that generates some CSV files:
         row_data['label'] = random.randint(0, 9)
         writer.writerow(row_data)
 
-Next, we will build our DataPipes to read and parse through the generated CSV files:
+Next, we will build our DataPipes to read and parse through the generated CSV files. Note that we prefer to pass
+defined functions to DataPipes rather than lambda functions, because the former are serializable with ``pickle``.
 
 .. code:: python
 
     import numpy as np
     import torchdata.datapipes as dp
 
+    def filter_for_data(filename):
+        return "sample_data" in filename and filename.endswith(".csv")
+
+    def row_processor(row):
+        return {"label": np.array(row[0], np.int32), "data": np.array(row[1:], dtype=np.float64)}
+
     def build_datapipes(root_dir="."):
         datapipe = dp.iter.FileLister(root_dir)
-        datapipe = datapipe.filter(filter_fn=lambda filename: "sample_data" in filename and filename.endswith(".csv"))
-        datapipe = dp.iter.FileOpener(datapipe, mode='rt')
+        datapipe = datapipe.filter(filter_fn=filter_for_data)
+        datapipe = datapipe.open_files(mode='rt')
         datapipe = datapipe.parse_csv(delimiter=",", skip_lines=1)
-        datapipe = datapipe.map(lambda row: {"label": np.array(row[0], np.int32),
-                                             "data": np.array(row[1:], dtype=np.float64)})
+        # Shuffle will happen as long as you do NOT set `shuffle=False` later in the DataLoader
+        datapipe = datapipe.shuffle()
+        datapipe = datapipe.map(row_processor)
         return datapipe
 
-Lastly, we will put everything together in ``'__main__'`` and pass the DataPipe into the DataLoader.
+Lastly, we will put everything together in ``'__main__'`` and pass the DataPipe into the DataLoader. Note that
+if you choose to use ``Batcher`` while setting ``batch_size > 1`` for DataLoader, your samples will be
+batched more than once. You should choose one or the other.
 
 .. code:: python
@@ -105,20 +115,67 @@ Lastly, we will put everything together in ``'__main__'`` and pass the DataPipe
 
     if __name__ == '__main__':
         num_files_to_generate = 3
         for i in range(num_files_to_generate):
-            generate_csv(file_label=i)
+            generate_csv(file_label=i, num_rows=10, num_features=3)
         datapipe = build_datapipes()
-        dl = DataLoader(dataset=datapipe, batch_size=50, shuffle=True)
+        dl = DataLoader(dataset=datapipe, batch_size=5, num_workers=2)
         first = next(iter(dl))
         labels, features = first['label'], first['data']
         print(f"Labels batch shape: {labels.size()}")
         print(f"Feature batch shape: {features.size()}")
+        print(f"{labels = }\n{features = }")
+        n_sample = 0
+        for row in iter(dl):
+            n_sample += 1
+        print(f"{n_sample = }")
 
 The following statements will be printed to show the shapes of a single batch of labels and features.
 
 .. code::
 
-    Labels batch shape: 50
-    Feature batch shape: torch.Size([50, 20])
+    Labels batch shape: torch.Size([5])
+    Feature batch shape: torch.Size([5, 3])
+    labels = tensor([8, 9, 5, 9, 7], dtype=torch.int32)
+    features = tensor([[0.2867, 0.5973, 0.0730],
+            [0.7890, 0.9279, 0.7392],
+            [0.8930, 0.7434, 0.0780],
+            [0.8225, 0.4047, 0.0800],
+            [0.1655, 0.0323, 0.5561]], dtype=torch.float64)
+    n_sample = 12
+
+The reason ``n_sample = 12`` is that ``ShardingFilter`` (``datapipe.sharding_filter()``) was not used, so
+each worker independently returns all samples. In this case, there are 10 rows per file and 3 files, with a
+batch size of 5, which gives us 6 batches per worker. With 2 workers, we get 12 total batches from the ``DataLoader``.
+
+In order for DataPipe sharding to work with ``DataLoader``, we need to add the following. It is crucial to add
+``ShardingFilter`` after ``Shuffler`` to ensure that all worker processes have the same order of data for sharding.
+
+.. code:: python
+
+    def build_datapipes(root_dir="."):
+        datapipe = ...
+        # Add the following line to `build_datapipes`
+        # Note that it should go somewhere after `Shuffler` in the DataPipe line
+        datapipe = datapipe.sharding_filter()
+        return datapipe
+
+    def worker_init_fn(worker_id):
+        info = torch.utils.data.get_worker_info()
+        num_workers = info.num_workers
+        datapipe = info.dataset
+        torch.utils.data.graph_settings.apply_sharding(datapipe, num_workers, worker_id)
+
+    # Pass `worker_init_fn` into `DataLoader` within '__main__'
+    ...
+    dl = DataLoader(dataset=datapipe, shuffle=True, batch_size=5, num_workers=2, worker_init_fn=worker_init_fn)
+    ...
+
+When we re-run, we will get:
+
+.. code::
+
+    ...
+    n_sample = 6
+
 You can find more DataPipe implementation examples for various research domains
 `on this page `_.
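Reviewer note: the Shuffler FAQ added above ("shuffling will happen at the point you specified") can be illustrated without torchdata. The sketch below is a hypothetical, simplified buffered-shuffle stage in plain Python; the names `shuffle_stage` and the `"opened:"` pipeline are ours, not torchdata APIs.

```python
import random

def shuffle_stage(iterable, buffer_size=8, seed=None):
    # Simplified stand-in for a Shuffler: collect items into a buffer,
    # shuffle the buffer, and yield its elements (torchdata's real
    # Shuffler streams from its buffer rather than working in chunks).
    rng = random.Random(seed)
    buffer = []
    for item in iterable:
        buffer.append(item)
        if len(buffer) >= buffer_size:
            rng.shuffle(buffer)
            yield from buffer
            buffer.clear()
    rng.shuffle(buffer)
    yield from buffer

# Shuffling happens exactly where the stage sits in the pipeline:
# here, file names are shuffled *before* the downstream "open" step.
filenames = [f"sample_data_{i}.csv" for i in range(10)]
pipeline = (f"opened:{name}" for name in shuffle_stage(filenames, seed=0))
result = list(pipeline)
```

Placing the stage later (e.g. after parsing rows) would instead shuffle parsed rows, which is the point of the FAQ: the position of `Shuffler` in the DataPipe line decides what gets shuffled.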
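Reviewer note: the `Batcher` warning (samples batched more than once when `batch_size > 1` is also set on DataLoader) can be demonstrated with a small pure-Python sketch. `batch` below is a hypothetical helper standing in for any batching step, not a torchdata API.

```python
def batch(iterable, batch_size):
    # Minimal stand-in for a Batcher: group consecutive elements
    # into lists of length `batch_size` (last group may be shorter).
    out, current = [], []
    for item in iterable:
        current.append(item)
        if len(current) == batch_size:
            out.append(current)
            current = []
    if current:
        out.append(current)
    return out

samples = list(range(12))
once = batch(samples, 3)   # batches of samples
twice = batch(once, 2)     # batches of batches - usually not what you want
```

Batching in the DataPipe line *and* in DataLoader composes the two steps, so `twice` contains nested lists like `[[0, 1, 2], [3, 4, 5]]` instead of flat batches, hence "choose one or the other".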
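Reviewer note: the rule that ``ShardingFilter`` must come after ``Shuffler`` can likewise be sketched in plain Python. Sharding keeps every `num_workers`-th element, so it only partitions the dataset correctly if all workers agree on the order, which a shuffle with a shared seed guarantees. `worker_samples` is an illustrative helper, not a torchdata API.

```python
import random

def worker_samples(data, worker_id, num_workers, seed=0):
    # Every worker shuffles with the SAME seed, so all workers agree on
    # the order; only then does each keep its own slice (the sharding).
    ordered = list(data)
    random.Random(seed).shuffle(ordered)
    return ordered[worker_id::num_workers]

data = list(range(30))
shard0 = worker_samples(data, worker_id=0, num_workers=2)
shard1 = worker_samples(data, worker_id=1, num_workers=2)
```

The shards are disjoint and together cover the dataset exactly once, which is why the re-run above prints `n_sample = 6` instead of 12. If each worker shuffled with a different seed before sharding, the shards could overlap and drop samples.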