Skip to content

Commit 55a5769

Browse files
HADOOP-19131. Assist reflection IO with WrappedOperations class (#6686)
1. The class WrappedIO has been extended with more filesystem operations - openFile() - PathCapabilities - StreamCapabilities - ByteBufferPositionedReadable All these static methods raise UncheckedIOExceptions rather than checked ones. 2. The adjacent class org.apache.hadoop.io.wrappedio.WrappedStatistics provides similar access to IOStatistics/IOStatisticsContext classes and operations. Allows callers to: * Get a serializable IOStatisticsSnapshot from an IOStatisticsSource or IOStatistics instance * Save an IOStatisticsSnapshot to file * Convert an IOStatisticsSnapshot to JSON * Given an object which may be an IOStatisticsSource, return an object whose toString() value is a dynamically generated, human-readable summary. This is for logging. * Separate getters to the different sections of IOStatistics. * Mean values are returned as a Map.Pair<Long, Long> of (samples, sum) from which means may be calculated. There are examples of the dynamic bindings to these classes in: org.apache.hadoop.io.wrappedio.impl.DynamicWrappedIO org.apache.hadoop.io.wrappedio.impl.DynamicWrappedStatistics These use DynMethods and other classes in the package org.apache.hadoop.util.dynamic which are based on the Apache Parquet equivalents. This makes it straightforward to re-implement these in that library, and in others which have their own fork of the classes (example: Apache Iceberg). 3. The openFile() option "fs.option.openfile.read.policy" has added specific file format policies for the core filetypes * avro * columnar * csv * hbase * json * orc * parquet S3A chooses the appropriate sequential/random policy. A policy `parquet, columnar, vector, random, adaptive` will use the parquet policy for any filesystem aware of it, falling back to the first entry in the list which the specific version of the filesystem recognizes 4. New Path capability fs.capability.virtual.block.locations Indicates that locations are generated client-side and don't refer to real hosts. Contributed by Steve Loughran
1 parent fa83c9a commit 55a5769

File tree

40 files changed

+4896
-49
lines changed

40 files changed

+4896
-49
lines changed

hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -454,4 +454,10 @@
454454
<Class name="org.apache.hadoop.ipc.internal.ShadedProtobufHelper" />
455455
<Bug pattern="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION" />
456456
</Match>
457+
458+
<!-- class cast after an assignableFrom check. -->
459+
<Match>
460+
<Class name="org.apache.hadoop.util.dynamic.DynMethods" />
461+
<Bug pattern="BC_UNCONFIRMED_CAST" />
462+
</Match>
457463
</FindBugsFilter>

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,4 +187,20 @@ private CommonPathCapabilities() {
187187
*/
188188
public static final String BULK_DELETE = "fs.capability.bulk.delete";
189189

190+
/**
191+
* Capability string to probe for block locations returned in {@code LocatedFileStatus}
192+
* instances from calls such as {@code getBlockLocations()} and {@code listStatus()}
193+
* to be 'virtual' rather than actual values resolved against a Distributed Filesystem including
194+
* HDFS: {@value}.
195+
* <p>
196+
* Key implications from this path capability being true:
197+
* <ol>
198+
* <li>Work can be scheduled anywhere</li>
199+
* <li>Creation of the location list is a low-cost client-side operation</li>
200+
* </ol>
201+
* Implication #2 means there is no performance penalty from use of FileSystem operations which
202+
* return lists or iterators of {@code LocatedFileStatus}.
203+
*/
204+
public static final String VIRTUAL_BLOCK_LOCATIONS = "fs.capability.virtual.block.locations";
205+
190206
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,14 @@ public int read(long position, ByteBuffer buf) throws IOException {
262262
"by " + in.getClass().getCanonicalName());
263263
}
264264

265+
/**
266+
* Delegate to the underlying stream.
267+
* @param position position within file
268+
* @param buf the ByteBuffer to receive the results of the read operation.
269+
* @throws IOException on a failure from the nested stream.
270+
* @throws UnsupportedOperationException if the inner stream does not
271+
* support this operation.
272+
*/
265273
@Override
266274
public void readFully(long position, ByteBuffer buf) throws IOException {
267275
if (in instanceof ByteBufferPositionedReadable) {

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -573,6 +573,12 @@ private OpenFileOptions() {
573573
public static final String FS_OPTION_OPENFILE_BUFFER_SIZE =
574574
FS_OPTION_OPENFILE + "buffer.size";
575575

576+
/**
577+
* OpenFile footer cache flag: {@value}.
578+
*/
579+
public static final String FS_OPTION_OPENFILE_FOOTER_CACHE =
580+
FS_OPTION_OPENFILE + "footer.cache";
581+
576582
/**
577583
* OpenFile option for read policies: {@value}.
578584
*/
@@ -586,6 +592,7 @@ private OpenFileOptions() {
586592
public static final Set<String> FS_OPTION_OPENFILE_STANDARD_OPTIONS =
587593
Collections.unmodifiableSet(Stream.of(
588594
FS_OPTION_OPENFILE_BUFFER_SIZE,
595+
FS_OPTION_OPENFILE_FOOTER_CACHE,
589596
FS_OPTION_OPENFILE_READ_POLICY,
590597
FS_OPTION_OPENFILE_LENGTH,
591598
FS_OPTION_OPENFILE_SPLIT_START,
@@ -599,11 +606,61 @@ private OpenFileOptions() {
599606
"adaptive";
600607

601608
/**
602-
* Read policy {@value} -whateve the implementation does by default.
609+
* We are an avro file: {@value}.
610+
*/
611+
public static final String FS_OPTION_OPENFILE_READ_POLICY_AVRO = "avro";
612+
613+
/**
614+
* This is a columnar file format.
615+
* Do whatever is needed to optimize for it: {@value}.
616+
*/
617+
public static final String FS_OPTION_OPENFILE_READ_POLICY_COLUMNAR =
618+
"columnar";
619+
620+
/**
621+
* This is a CSV file of plain or UTF-8 text
622+
* to be read sequentially.
623+
* Do whatever is needed to optimize for it: {@value}.
624+
*/
625+
public static final String FS_OPTION_OPENFILE_READ_POLICY_CSV =
626+
"csv";
627+
628+
/**
629+
* Read policy {@value} -whatever the implementation does by default.
603630
*/
604631
public static final String FS_OPTION_OPENFILE_READ_POLICY_DEFAULT =
605632
"default";
606633

634+
/**
635+
* This is a table file for Apache HBase.
636+
* Do whatever is needed to optimize for it: {@value}.
637+
*/
638+
public static final String FS_OPTION_OPENFILE_READ_POLICY_HBASE =
639+
"hbase";
640+
641+
/**
642+
* This is a JSON file of UTF-8 text, including a
643+
* JSON line file where each line is a JSON entity.
644+
* Do whatever is needed to optimize for it: {@value}.
645+
*/
646+
public static final String FS_OPTION_OPENFILE_READ_POLICY_JSON =
647+
"json";
648+
649+
/**
650+
* This is an ORC file.
651+
* Do whatever is needed to optimize for it: {@value}.
652+
*/
653+
public static final String FS_OPTION_OPENFILE_READ_POLICY_ORC =
654+
"orc";
655+
656+
/**
657+
* This is a parquet file with a v1/v3 footer: {@value}.
658+
* Do whatever is needed to optimize for it, such as footer
659+
* prefetch and cache.
660+
*/
661+
public static final String FS_OPTION_OPENFILE_READ_POLICY_PARQUET =
662+
"parquet";
663+
607664
/**
608665
* Read policy for random IO: {@value}.
609666
*/
@@ -634,7 +691,13 @@ private OpenFileOptions() {
634691
public static final Set<String> FS_OPTION_OPENFILE_READ_POLICIES =
635692
Collections.unmodifiableSet(Stream.of(
636693
FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE,
694+
FS_OPTION_OPENFILE_READ_POLICY_AVRO,
695+
FS_OPTION_OPENFILE_READ_POLICY_COLUMNAR,
696+
FS_OPTION_OPENFILE_READ_POLICY_CSV,
637697
FS_OPTION_OPENFILE_READ_POLICY_DEFAULT,
698+
FS_OPTION_OPENFILE_READ_POLICY_JSON,
699+
FS_OPTION_OPENFILE_READ_POLICY_ORC,
700+
FS_OPTION_OPENFILE_READ_POLICY_PARQUET,
638701
FS_OPTION_OPENFILE_READ_POLICY_RANDOM,
639702
FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL,
640703
FS_OPTION_OPENFILE_READ_POLICY_VECTOR,

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1320,6 +1320,8 @@ public boolean hasPathCapability(final Path path, final String capability)
13201320
case CommonPathCapabilities.FS_PATHHANDLES:
13211321
case CommonPathCapabilities.FS_PERMISSIONS:
13221322
case CommonPathCapabilities.FS_TRUNCATE:
1323+
// block locations are generated locally
1324+
case CommonPathCapabilities.VIRTUAL_BLOCK_LOCATIONS:
13231325
return true;
13241326
case CommonPathCapabilities.FS_SYMLINKS:
13251327
return FileSystem.areSymlinksEnabled();

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/wrappedio/WrappedIO.java

Lines changed: 148 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,30 @@
1818

1919
package org.apache.hadoop.io.wrappedio;
2020

21+
import java.io.IOException;
22+
import java.io.InputStream;
2123
import java.io.UncheckedIOException;
24+
import java.nio.ByteBuffer;
2225
import java.util.Collection;
2326
import java.util.List;
2427
import java.util.Map;
28+
import javax.annotation.Nullable;
2529

2630
import org.apache.hadoop.classification.InterfaceAudience;
2731
import org.apache.hadoop.classification.InterfaceStability;
2832
import org.apache.hadoop.fs.BulkDelete;
33+
import org.apache.hadoop.fs.ByteBufferPositionedReadable;
34+
import org.apache.hadoop.fs.FSDataInputStream;
35+
import org.apache.hadoop.fs.FileStatus;
2936
import org.apache.hadoop.fs.FileSystem;
37+
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
3038
import org.apache.hadoop.fs.Path;
39+
import org.apache.hadoop.fs.PathCapabilities;
40+
import org.apache.hadoop.fs.StreamCapabilities;
41+
import org.apache.hadoop.util.functional.FutureIO;
3142

43+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
44+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
3245
import static org.apache.hadoop.util.functional.FunctionalIO.uncheckIOExceptions;
3346

3447
/**
@@ -82,7 +95,8 @@ public static int bulkDelete_pageSize(FileSystem fs, Path path) {
8295
* @param fs filesystem
8396
* @param base path to delete under.
8497
* @param paths list of paths which must be absolute and under the base path.
85-
* @return a list of all the paths which couldn't be deleted for a reason other than "not found" and any associated error message.
98+
* @return a list of all the paths which couldn't be deleted for a reason other
99+
* than "not found" and any associated error message.
86100
* @throws UnsupportedOperationException bulk delete under that path is not supported.
87101
* @throws UncheckedIOException if an IOE was raised.
88102
* @throws IllegalArgumentException if a path argument is invalid.
@@ -97,4 +111,137 @@ public static List<Map.Entry<Path, String>> bulkDelete_delete(FileSystem fs,
97111
}
98112
});
99113
}
114+
115+
/**
116+
* Does a path have a given capability?
117+
* Calls {@link PathCapabilities#hasPathCapability(Path, String)},
118+
* mapping IOExceptions to false.
119+
* @param fs filesystem
120+
* @param path path to query the capability of.
121+
* @param capability non-null, non-empty string to query the path for support.
122+
* @return true if the capability is supported under that part of the FS.
123+
* IOExceptions raised while resolving paths or relaying the call are mapped to false.
124+
* @throws IllegalArgumentException invalid arguments
125+
*/
126+
public static boolean pathCapabilities_hasPathCapability(Object fs,
127+
Path path,
128+
String capability) {
129+
try {
130+
return ((PathCapabilities) fs).hasPathCapability(path, capability);
131+
} catch (IOException e) {
132+
return false;
133+
}
134+
}
135+
136+
/**
137+
* Does an object implement {@link StreamCapabilities} and, if so,
138+
* what is the result of the probe for the capability?
139+
* Calls {@link StreamCapabilities#hasCapability(String)},
140+
* @param object object to probe
141+
* @param capability capability string
142+
* @return true iff the object implements StreamCapabilities and the capability is
143+
* declared available.
144+
*/
145+
public static boolean streamCapabilities_hasCapability(Object object, String capability) {
146+
if (!(object instanceof StreamCapabilities)) {
147+
return false;
148+
}
149+
return ((StreamCapabilities) object).hasCapability(capability);
150+
}
151+
152+
/**
153+
* OpenFile assistant, easy reflection-based access to
154+
* {@link FileSystem#openFile(Path)} and blocks
155+
* awaiting the operation completion.
156+
* @param fs filesystem
157+
* @param path path
158+
* @param policy read policy
159+
* @param status optional file status
160+
* @param length optional file length
161+
* @param options nullable map of other options
162+
* @return stream of the opened file
163+
* @throws UncheckedIOException if an IOE was raised.
164+
*/
165+
@InterfaceStability.Stable
166+
public static FSDataInputStream fileSystem_openFile(
167+
final FileSystem fs,
168+
final Path path,
169+
final String policy,
170+
@Nullable final FileStatus status,
171+
@Nullable final Long length,
172+
@Nullable final Map<String, String> options) {
173+
final FutureDataInputStreamBuilder builder = uncheckIOExceptions(() ->
174+
fs.openFile(path));
175+
if (policy != null) {
176+
builder.opt(FS_OPTION_OPENFILE_READ_POLICY, policy);
177+
}
178+
if (status != null) {
179+
builder.withFileStatus(status);
180+
}
181+
if (length != null) {
182+
builder.opt(FS_OPTION_OPENFILE_LENGTH, Long.toString(length));
183+
}
184+
if (options != null) {
185+
// add all the options map entries
186+
options.forEach(builder::opt);
187+
}
188+
// wait for the opening.
189+
return uncheckIOExceptions(() ->
190+
FutureIO.awaitFuture(builder.build()));
191+
}
192+
193+
/**
194+
* Return path of the enclosing root for a given path.
195+
* The enclosing root path is a common ancestor that should be used for temp and staging dirs
196+
* as well as within encryption zones and other restricted directories.
197+
* @param fs filesystem
198+
* @param path file path to find the enclosing root path for
199+
* @return a path to the enclosing root
200+
* @throws IOException early checks like failure to resolve path cause IO failures
201+
*/
202+
public static Path fileSystem_getEnclosingRoot(FileSystem fs, Path path) throws IOException {
203+
return fs.getEnclosingRoot(path);
204+
}
205+
206+
/**
207+
* Delegate to {@link ByteBufferPositionedReadable#readFully(long, ByteBuffer)}.
208+
* @param in input stream
209+
* @param position position within file
210+
* @param buf the ByteBuffer to receive the results of the read operation.
211+
* Note: that is the default behaviour of {@link FSDataInputStream#readFully(long, ByteBuffer)}.
212+
*/
213+
public static void byteBufferPositionedReadable_readFully(
214+
InputStream in,
215+
long position,
216+
ByteBuffer buf) {
217+
if (!(in instanceof ByteBufferPositionedReadable)) {
218+
throw new UnsupportedOperationException("Not a ByteBufferPositionedReadable: " + in);
219+
}
220+
uncheckIOExceptions(() -> {
221+
((ByteBufferPositionedReadable) in).readFully(position, buf);
222+
return null;
223+
});
224+
}
225+
226+
/**
227+
* Probe to see if the input stream is an instance of ByteBufferPositionedReadable.
228+
* If the stream is an FSDataInputStream, the wrapped stream is checked.
229+
* @param in input stream
230+
* @return true if the stream implements the interface (including a wrapped stream)
231+
* and that it declares the stream capability.
232+
*/
233+
public static boolean byteBufferPositionedReadable_readFullyAvailable(
234+
InputStream in) {
235+
if (!(in instanceof ByteBufferPositionedReadable)) {
236+
return false;
237+
}
238+
if (in instanceof FSDataInputStream) {
239+
// ask the wrapped stream.
240+
return byteBufferPositionedReadable_readFullyAvailable(
241+
((FSDataInputStream) in).getWrappedStream());
242+
}
243+
// now rely on the input stream implementing path capabilities, which
244+
// all the Hadoop FS implementations do.
245+
return streamCapabilities_hasCapability(in, StreamCapabilities.PREADBYTEBUFFER);
246+
}
100247
}

0 commit comments

Comments
 (0)