Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3331,6 +3331,8 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND
= TIMELINE_SERVICE_PREFIX
+ "entity-file.fs-support-append";
public static final boolean TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND_DEFAULT
= true;

public static final String
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_WITH_USER_DIR =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* This package provides ATS functionality.
*/
@InterfaceAudience.Private
package org.apache.hadoop.yarn.server.timeline;
import org.apache.hadoop.classification.InterfaceAudience;
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,11 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -141,6 +143,7 @@ public class EntityGroupFSTimelineStore extends CompositeService
private long unknownActiveMillis;
private int appCacheMaxSize = 0;
private boolean recoveryEnabled;
private boolean isAppendSupported;
private Path checkpointFile;
private ConcurrentMap<String, Pair<Long, Long>> recoveredLogs =
new ConcurrentHashMap<String, Pair<Long, Long>>();
Expand Down Expand Up @@ -223,6 +226,10 @@ protected boolean removeEldestEntry(
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RECOVERY_ENABLED,
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RECOVERY_ENABLED_DEFAULT);

isAppendSupported = conf.getBoolean(
YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND,
YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND_DEFAULT);

aclsEnabled = conf.getBoolean(YarnConfiguration.YARN_ACL_ENABLE,
YarnConfiguration.DEFAULT_YARN_ACL_ENABLE);
CallerContext.setCurrent(
Expand Down Expand Up @@ -842,11 +849,34 @@ long scanForLogs() throws IOException {
}
String attemptDirName = statAttempt.getPath().getName();
RemoteIterator<FileStatus> iterCache = list(statAttempt.getPath());
List<FileStatus> files = new ArrayList<>();
while (iterCache.hasNext()) {
FileStatus statCache = iterCache.next();
if (!statCache.isFile()) {
continue;
}
files.add(statCache);
}
if (isAppendSupported) {
Collections.sort(files, new Comparator<FileStatus>() {
@Override
public int compare(FileStatus o1, FileStatus o2) {
String[] ts1 = o1.getPath().getName().split("_");
String[] ts2 = o2.getPath().getName().split("_");
if (StringUtils.isNumeric(ts1[ts1.length - 1])
&& StringUtils.isNumeric(ts2[ts2.length - 1])) {
return (Integer.parseInt(ts1[ts1.length - 1])
- Integer.parseInt(ts2[ts2.length - 1]));
} else {
return o1.getPath().getName().compareTo(o2.getPath().getName());
}
}
});
}
Iterator<FileStatus> fileIterator = files.iterator();

while (fileIterator.hasNext()) {
FileStatus statCache = fileIterator.next();
String filename = statCache.getPath().getName();
String owner = statCache.getOwner();
//YARN-10884:Owner of File is set to Null on WASB Append Operation.ATS fails to read such
Expand Down