|
1 |
| -import enum |
2 | 1 | import functools
|
3 |
| -import gzip |
4 |
| -import lzma |
5 |
| -import os |
6 |
| -import os.path |
7 | 2 | import pathlib
|
8 | 3 | import pickle
|
9 | 4 | from typing import BinaryIO
|
|
16 | 11 | TypeVar,
|
17 | 12 | Iterator,
|
18 | 13 | Dict,
|
19 |
| - Optional, |
20 | 14 | IO,
|
21 | 15 | Sized,
|
22 | 16 | )
|
|
35 | 29 | "BUILTIN_DIR",
|
36 | 30 | "read_mat",
|
37 | 31 | "MappingIterator",
|
38 |
| - "Enumerator", |
39 | 32 | "getitem",
|
40 | 33 | "path_accessor",
|
41 | 34 | "path_comparator",
|
42 |
| - "Decompressor", |
43 | 35 | "read_flo",
|
44 | 36 | "hint_sharding",
|
45 | 37 | ]
|
@@ -75,15 +67,6 @@ def __iter__(self) -> Iterator[Union[Tuple[K, D], D]]:
|
75 | 67 | yield from iter(mapping.values() if self.drop_key else mapping.items())
|
76 | 68 |
|
77 | 69 |
|
78 |
class Enumerator(IterDataPipe[Tuple[int, D]]):
    """Pair every sample of ``datapipe`` with a running index beginning at ``start``."""

    def __init__(self, datapipe: IterDataPipe[D], start: int = 0) -> None:
        self.start = start
        self.datapipe = datapipe

    def __iter__(self) -> Iterator[Tuple[int, D]]:
        # Produces the same pairs as ``enumerate(self.datapipe, self.start)``.
        for index, sample in enumerate(self.datapipe, start=self.start):
            yield index, sample
85 |
| - |
86 |
| - |
87 | 70 | def _getitem_closure(obj: Any, *, items: Sequence[Any]) -> Any:
|
88 | 71 | for item in items:
|
89 | 72 | obj = obj[item]
|
@@ -123,50 +106,6 @@ def path_comparator(getter: Union[str, Callable[[pathlib.Path], D]], value: D) -
|
123 | 106 | return functools.partial(_path_comparator_closure, accessor=path_accessor(getter), value=value)
|
124 | 107 |
|
125 | 108 |
|
126 |
class CompressionType(enum.Enum):
    """Compression formats that :class:`Decompressor` knows how to unpack."""

    GZIP = "gzip"
    LZMA = "lzma"


class Decompressor(IterDataPipe[Tuple[str, BinaryIO]]):
    """Wrap the file of each ``(path, file)`` pair in a decompressing file object.

    Args:
        datapipe: Source of ``(path, binary file)`` pairs.
        type: Compression format to assume for every file, given either as a
            :class:`CompressionType` member or as its value in any letter case
            (e.g. ``"gzip"`` or ``"GZIP"``). When ``None``, the format is
            detected per file from the extension of its path.

    Raises:
        ValueError: If ``type`` is a string that does not match a known format.
    """

    types = CompressionType

    _DECOMPRESSORS: Dict[CompressionType, Callable[[BinaryIO], BinaryIO]] = {
        types.GZIP: lambda file: cast(BinaryIO, gzip.GzipFile(fileobj=file)),
        types.LZMA: lambda file: cast(BinaryIO, lzma.LZMAFile(file)),
    }

    # File extensions recognised when no explicit type was requested.
    _EXTENSIONS: Dict[str, CompressionType] = {
        ".gz": types.GZIP,
        ".xz": types.LZMA,
    }

    def __init__(
        self,
        datapipe: IterDataPipe[Tuple[str, BinaryIO]],
        *,
        type: Optional[Union[str, CompressionType]] = None,
    ) -> None:
        self.datapipe = datapipe
        if isinstance(type, str):
            # Enum lookup by call matches on the member *value*, and the values
            # are lowercase; upper-casing the string could never match.
            type = self.types(type.lower())
        self.type = type

    def _detect_compression_type(self, path: str) -> CompressionType:
        """Return the explicitly configured type, else infer it from ``path``.

        Raises:
            RuntimeError: If no type was configured and the extension of
                ``path`` is not a recognised one.
        """
        if self.type is not None:
            return self.type

        # TODO: this needs to be more elaborate
        ext = os.path.splitext(path)[1]
        try:
            return self._EXTENSIONS[ext]
        except KeyError:
            known = ", ".join(sorted(self._EXTENSIONS))
            raise RuntimeError(
                f"Unable to detect the compression type of '{path}': unknown file extension "
                f"'{ext}' (known extensions: {known}). Pass `type` explicitly."
            ) from None

    def __iter__(self) -> Iterator[Tuple[str, BinaryIO]]:
        """Yield ``(path, decompressing file object)`` for every source pair."""
        for path, file in self.datapipe:
            compression = self._detect_compression_type(path)
            yield path, self._DECOMPRESSORS[compression](file)
168 |
| - |
169 |
| - |
170 | 109 | class PicklerDataPipe(IterDataPipe):
|
171 | 110 | def __init__(self, source_datapipe: IterDataPipe[Tuple[str, IO[bytes]]]) -> None:
|
172 | 111 | self.source_datapipe = source_datapipe
|
|
0 commit comments