@@ -454,4 +454,10 @@
<Class name="org.apache.hadoop.ipc.internal.ShadedProtobufHelper" />
<Bug pattern="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION" />
</Match>

<!-- class cast after an assignableFrom check. -->
<Match>
<Class name="org.apache.hadoop.util.dynamic.DynMethods" />
<Bug pattern="BC_UNCONFIRMED_CAST" />
</Match>
</FindBugsFilter>
@@ -187,4 +187,20 @@ private CommonPathCapabilities() {
*/
public static final String BULK_DELETE = "fs.capability.bulk.delete";

/**
* Capability string to probe for block locations returned in {@code LocatedFileStatus}
* instances from calls such as {@code getBlockLocations()} and {@code listStatus()}
* being 'virtual' rather than actual values resolved against a distributed
* filesystem such as HDFS: {@value}.
* <p>
* Key implications from this path capability being true:
* <ol>
* <li>Work can be scheduled anywhere</li>
<li>Creation of the location list is a low-cost client-side operation</li>
* </ol>
* Implication #2 means there is no performance penalty from use of FileSystem operations which
* return lists or iterators of {@code LocatedFileStatus}.
*/
public static final String VIRTUAL_BLOCK_LOCATIONS = "fs.capability.virtual.block.locations";

}
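
A minimal sketch of a client-side probe for this capability, assuming the configuration and path are supplied by the caller; the helper name is illustrative only:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Sketch: true iff block locations under this path are generated client-side. */
static boolean locationsAreVirtual(Configuration conf, Path path) throws IOException {
  FileSystem fs = path.getFileSystem(conf);
  // when true, work can be scheduled anywhere, and listing operations
  // returning LocatedFileStatus entries carry no extra cost.
  return fs.hasPathCapability(path, CommonPathCapabilities.VIRTUAL_BLOCK_LOCATIONS);
}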
@@ -262,6 +262,14 @@ public int read(long position, ByteBuffer buf) throws IOException {
"by " + in.getClass().getCanonicalName());
}

/**
* Delegate to the underlying stream.
* @param position position within file
* @param buf the ByteBuffer to receive the results of the read operation.
* @throws IOException on a failure from the nested stream.
* @throws UnsupportedOperationException if the inner stream does not
* support this operation.
*/
@Override
public void readFully(long position, ByteBuffer buf) throws IOException {
if (in instanceof ByteBufferPositionedReadable) {
@@ -573,6 +573,12 @@ private OpenFileOptions() {
public static final String FS_OPTION_OPENFILE_BUFFER_SIZE =
FS_OPTION_OPENFILE + "buffer.size";

/**
* OpenFile footer cache flag: {@value}.
*/
public static final String FS_OPTION_OPENFILE_FOOTER_CACHE =
FS_OPTION_OPENFILE + "footer.cache";

/**
* OpenFile option for read policies: {@value}.
*/
@@ -586,6 +592,7 @@ private OpenFileOptions() {
public static final Set<String> FS_OPTION_OPENFILE_STANDARD_OPTIONS =
Collections.unmodifiableSet(Stream.of(
FS_OPTION_OPENFILE_BUFFER_SIZE,
FS_OPTION_OPENFILE_FOOTER_CACHE,
FS_OPTION_OPENFILE_READ_POLICY,
FS_OPTION_OPENFILE_LENGTH,
FS_OPTION_OPENFILE_SPLIT_START,
@@ -599,11 +606,61 @@ private OpenFileOptions() {
"adaptive";

/**
* We are an avro file: {@value}.
*/
public static final String FS_OPTION_OPENFILE_READ_POLICY_AVRO = "avro";

/**
* This is a columnar file format.
* Do whatever is needed to optimize for it: {@value}.
*/
public static final String FS_OPTION_OPENFILE_READ_POLICY_COLUMNAR =
"columnar";

/**
* This is a CSV file of plain or UTF-8 text
* to be read sequentially.
* Do whatever is needed to optimize for it: {@value}.
*/
public static final String FS_OPTION_OPENFILE_READ_POLICY_CSV =
"csv";

/**
* Read policy {@value}: whatever the implementation does by default.
*/
public static final String FS_OPTION_OPENFILE_READ_POLICY_DEFAULT =
"default";

/**
* This is a table file for Apache HBase.
* Do whatever is needed to optimize for it: {@value}.
*/
public static final String FS_OPTION_OPENFILE_READ_POLICY_HBASE =
"hbase";

/**
* This is a JSON file of UTF-8 text, including a
* JSON line file where each line is a JSON entity.
* Do whatever is needed to optimize for it: {@value}.
*/
public static final String FS_OPTION_OPENFILE_READ_POLICY_JSON =
"json";

/**
* This is an ORC file.
* Do whatever is needed to optimize for it: {@value}.
*/
public static final String FS_OPTION_OPENFILE_READ_POLICY_ORC =
"orc";

/**
* This is a parquet file with a v1/v3 footer: {@value}.
* Do whatever is needed to optimize for it, such as footer
* prefetch and caching.
*/
public static final String FS_OPTION_OPENFILE_READ_POLICY_PARQUET =
"parquet";

/**
* Read policy for random IO: {@value}.
*/
@@ -634,7 +691,13 @@ private OpenFileOptions() {
public static final Set<String> FS_OPTION_OPENFILE_READ_POLICIES =
Collections.unmodifiableSet(Stream.of(
FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE,
FS_OPTION_OPENFILE_READ_POLICY_AVRO,
FS_OPTION_OPENFILE_READ_POLICY_COLUMNAR,
FS_OPTION_OPENFILE_READ_POLICY_CSV,
FS_OPTION_OPENFILE_READ_POLICY_DEFAULT,
FS_OPTION_OPENFILE_READ_POLICY_JSON,
FS_OPTION_OPENFILE_READ_POLICY_ORC,
FS_OPTION_OPENFILE_READ_POLICY_PARQUET,
FS_OPTION_OPENFILE_READ_POLICY_RANDOM,
FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL,
FS_OPTION_OPENFILE_READ_POLICY_VECTOR,
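
As a hedged sketch, this is how a client could request the parquet policy when opening a file; opt() passes the policy as an optional hint, so stores which do not recognize it fall back to their default behaviour. The helper name is illustrative only:

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.functional.FutureIO;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET;

/** Sketch: open a parquet file, hinting the store to optimize for that format. */
static FSDataInputStream openParquet(FileSystem fs, Path path) throws IOException {
  return FutureIO.awaitFuture(
      fs.openFile(path)
          .opt(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_PARQUET)
          .build());
}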
@@ -1320,6 +1320,8 @@ public boolean hasPathCapability(final Path path, final String capability)
case CommonPathCapabilities.FS_PATHHANDLES:
case CommonPathCapabilities.FS_PERMISSIONS:
case CommonPathCapabilities.FS_TRUNCATE:
// block locations are generated locally
case CommonPathCapabilities.VIRTUAL_BLOCK_LOCATIONS:
return true;
case CommonPathCapabilities.FS_SYMLINKS:
return FileSystem.areSymlinksEnabled();
@@ -18,17 +18,30 @@

package org.apache.hadoop.io.wrappedio;

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.BulkDelete;
import org.apache.hadoop.fs.ByteBufferPositionedReadable;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathCapabilities;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.util.functional.FutureIO;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.util.functional.FunctionalIO.uncheckIOExceptions;

/**
@@ -82,7 +95,8 @@ public static int bulkDelete_pageSize(FileSystem fs, Path path) {
* @param fs filesystem
* @param base path to delete under.
* @param paths list of paths which must be absolute and under the base path.
* @return a list of all the paths which couldn't be deleted for a reason other
* than "not found" and any associated error message.
* @throws UnsupportedOperationException bulk delete under that path is not supported.
* @throws UncheckedIOException if an IOE was raised.
* @throws IllegalArgumentException if a path argument is invalid.
@@ -97,4 +111,137 @@ public static List<Map.Entry<Path, String>> bulkDelete_delete(FileSystem fs,
}
});
}
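
A usage sketch with hypothetical bucket and file names; the returned list holds only the entries which could not be deleted for reasons other than "not found":

import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.wrappedio.WrappedIO;

// hypothetical base path and children under it
Path base = new Path("s3a://bucket/table");
List<Map.Entry<Path, String>> failures = WrappedIO.bulkDelete_delete(
    fs,
    base,
    Arrays.asList(new Path(base, "part-0000"), new Path(base, "part-0001")));
// report whatever could not be deleted, with any associated error message
failures.forEach(e -> System.err.println(e.getKey() + ": " + e.getValue()));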

/**
* Does a path have a given capability?
* Calls {@link PathCapabilities#hasPathCapability(Path, String)},
* mapping IOExceptions to false.
* @param fs filesystem
* @param path path to query the capability of.
* @param capability non-null, non-empty string to query the path for support.
* @return true if the capability is supported under that part of the FS;
* false if not, or if an IOException was raised while
* resolving paths or relaying the call.
* @throws IllegalArgumentException invalid arguments
*/
public static boolean pathCapabilities_hasPathCapability(Object fs,
Path path,
String capability) {
try {
return ((PathCapabilities) fs).hasPathCapability(path, capability);
} catch (IOException e) {
return false;
}
}

/**
* Does an object implement {@link StreamCapabilities} and, if so,
* what is the result of the probe for the capability?
* Calls {@link StreamCapabilities#hasCapability(String)}.
* @param object object to probe
* @param capability capability string
* @return true iff the object implements StreamCapabilities and the capability is
* declared available.
*/
public static boolean streamCapabilities_hasCapability(Object object, String capability) {
if (!(object instanceof StreamCapabilities)) {
return false;
}
return ((StreamCapabilities) object).hasCapability(capability);
}
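
A short sketch combining both probes, using capability names which appear elsewhere in this change; fs, path and in are assumed to come from the caller:

// the path probe maps IOExceptions to false; the stream probe returns
// false when the object does not implement StreamCapabilities
boolean canBulkDelete = WrappedIO.pathCapabilities_hasPathCapability(
    fs, path, "fs.capability.bulk.delete");
boolean canReadByteBuffer = WrappedIO.streamCapabilities_hasCapability(
    in, StreamCapabilities.PREADBYTEBUFFER);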

/**
* openFile() assistant: easy reflection-based access to
* {@link FileSystem#openFile(Path)}; blocks
* awaiting the operation's completion.
* @param fs filesystem
* @param path path
* @param policy read policy
* @param status optional file status
* @param length optional file length
* @param options nullable map of other options
* @return stream of the opened file
* @throws UncheckedIOException if an IOE was raised.
*/
@InterfaceStability.Stable
public static FSDataInputStream fileSystem_openFile(
final FileSystem fs,
final Path path,
final String policy,
@Nullable final FileStatus status,
@Nullable final Long length,
@Nullable final Map<String, String> options) {
final FutureDataInputStreamBuilder builder = uncheckIOExceptions(() ->
fs.openFile(path));
if (policy != null) {
builder.opt(FS_OPTION_OPENFILE_READ_POLICY, policy);
}
if (status != null) {
builder.withFileStatus(status);
}
if (length != null) {
builder.opt(FS_OPTION_OPENFILE_LENGTH, Long.toString(length));
}
if (options != null) {
// add all the options map entries
options.forEach(builder::opt);
}
// wait for the opening.
return uncheckIOExceptions(() ->
FutureIO.awaitFuture(builder.build()));
}
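
A sketch of a typical call, assuming the file length is already known (for example from a directory listing, so object stores can skip a fresh status probe); knownLength is a hypothetical caller-supplied value:

// read policy "sequential"; no FileStatus available; known length; no extra options
FSDataInputStream in = WrappedIO.fileSystem_openFile(
    fs, path, "sequential", null, knownLength, null);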

/**
* Return path of the enclosing root for a given path.
* The enclosing root path is a common ancestor that should be used for temp and staging dirs
* as well as within encryption zones and other restricted directories.
* @param fs filesystem
* @param path file path to find the enclosing root path for
* @return a path to the enclosing root
* @throws IOException if early checks, such as resolving the path, fail.
*/
public static Path fileSystem_getEnclosingRoot(FileSystem fs, Path path) throws IOException {
return fs.getEnclosingRoot(path);
}

/**
* Delegate to {@link ByteBufferPositionedReadable#readFully(long, ByteBuffer)}.
* @param in input stream
* @param position position within file
* @param buf the ByteBuffer to receive the results of the read operation.
* @throws UnsupportedOperationException if the stream does not implement
* the interface; note that this matches the default behaviour of
* {@link FSDataInputStream#readFully(long, ByteBuffer)}.
*/
public static void byteBufferPositionedReadable_readFully(
InputStream in,
long position,
ByteBuffer buf) {
if (!(in instanceof ByteBufferPositionedReadable)) {
throw new UnsupportedOperationException("Not a ByteBufferPositionedReadable: " + in);
}
uncheckIOExceptions(() -> {
((ByteBufferPositionedReadable) in).readFully(position, buf);
return null;
});
}

/**
* Probe to see if the input stream is an instance of ByteBufferPositionedReadable.
* If the stream is an FSDataInputStream, the wrapped stream is checked.
* @param in input stream
* @return true if the stream implements the interface (including a wrapped stream)
* and declares the stream capability.
*/
public static boolean byteBufferPositionedReadable_readFullyAvailable(
InputStream in) {
if (!(in instanceof ByteBufferPositionedReadable)) {
return false;
}
if (in instanceof FSDataInputStream) {
// ask the wrapped stream.
return byteBufferPositionedReadable_readFullyAvailable(
((FSDataInputStream) in).getWrappedStream());
}
// now rely on the input stream implementing stream capabilities, which
// all the Hadoop FS implementations do.
return streamCapabilities_hasCapability(in, StreamCapabilities.PREADBYTEBUFFER);
}
}
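
A sketch pairing the availability probe with the read; the stream and buffer are assumed to come from the caller, and the fallback branch is left hypothetical:

import java.nio.ByteBuffer;

import org.apache.hadoop.io.wrappedio.WrappedIO;

ByteBuffer buffer = ByteBuffer.allocate(1024);
if (WrappedIO.byteBufferPositionedReadable_readFullyAvailable(in)) {
  // fills the buffer from the given position; IOExceptions surface
  // as UncheckedIOException
  WrappedIO.byteBufferPositionedReadable_readFully(in, 0L, buffer);
} else {
  // hypothetical fallback: a positioned read into a byte[] via
  // FSDataInputStream.readFully(long, byte[])
}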