Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class FSDownload implements Callable<Path> {
private static final Logger LOG =
LoggerFactory.getLogger(FSDownload.class);

private String containerId;
private FileContext files;
private final UserGroupInformation userUgi;
private Configuration conf;
Expand All @@ -94,12 +95,27 @@ public class FSDownload implements Callable<Path> {

public FSDownload(FileContext files, UserGroupInformation ugi, Configuration conf,
Path destDirPath, LocalResource resource) {
this(files, ugi, conf, destDirPath, resource, null);
this(files, ugi, conf, destDirPath, resource, null, "");
}

public FSDownload(String containerId, FileContext files, UserGroupInformation ugi,
Configuration conf,
Path destDirPath, LocalResource resource) {
this(files, ugi, conf, destDirPath, resource, null, containerId);
}

public FSDownload(FileContext files,
UserGroupInformation ugi, Configuration conf,
Path destDirPath, LocalResource resource,
LoadingCache<Path,Future<FileStatus>> statCache) {
this(files, ugi, conf, destDirPath, resource, statCache, "");
}

public FSDownload(FileContext files, UserGroupInformation ugi, Configuration conf,
Path destDirPath, LocalResource resource,
LoadingCache<Path,Future<FileStatus>> statCache) {
LoadingCache<Path,Future<FileStatus>> statCache,
String containerId) {
this.containerId = containerId;
this.conf = conf;
this.destDirPath = destDirPath;
this.files = files;
Expand Down Expand Up @@ -408,8 +424,8 @@ public Path call() throws Exception {
throw new IOException("Invalid resource", e);
}

LOG.debug("Starting to download {} {} {}", sCopy,
resource.getType(), resource.getPattern());
LOG.info("Starting to download {} {} {} for containerId: {}", sCopy,
resource.getType(), resource.getPattern(), containerId);

final Path destinationTmp = new Path(destDirPath + "_tmp");
createDir(destinationTmp, cachePerms);
Expand All @@ -430,8 +446,8 @@ public Void run() throws Exception {
changePermissions(dFinal.getFileSystem(conf), dFinal);
files.rename(destinationTmp, destDirPath, Rename.OVERWRITE);

LOG.debug("File has been downloaded to {} from {}",
new Path(destDirPath, sCopy.getName()), sCopy);
LOG.info("File has been downloaded to {} from {} for containerId: {}",
new Path(destDirPath, sCopy.getName()), sCopy, containerId);
} catch (Exception e) {
try {
files.delete(destDirPath, true);
Expand Down Expand Up @@ -478,7 +494,7 @@ private void changePermissions(FileSystem fs, final Path path)
perm = isDir ? PRIVATE_DIR_PERMS : PRIVATE_FILE_PERMS;
}

LOG.debug("Changing permissions for path {} to perm {}", path, perm);
LOG.info("Changing permissions for path {} to perm {}", path, perm);

final FsPermission fPerm = perm;
if (null == userUgi) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ public void startLocalizer(LocalizerStartContext ctx)
String user = ctx.getUser();
String appId = ctx.getAppId();
String locId = ctx.getLocId();
String containerId = ctx.getContainerId();
LocalDirsHandlerService dirsHandler = ctx.getDirsHandler();

List<String> localDirs = dirsHandler.getLocalDirs();
Expand Down Expand Up @@ -199,7 +200,7 @@ public void startLocalizer(LocalizerStartContext ctx)

ContainerLocalizer localizer =
createContainerLocalizer(user, appId, locId, tokenFn, localDirs,
localizerFc);
localizerFc, containerId);
// TODO: DO it over RPC for maintaining similarity?
localizer.runLocalization(nmAddr);
}
Expand All @@ -224,11 +225,11 @@ public void startLocalizer(LocalizerStartContext ctx)
@VisibleForTesting
protected ContainerLocalizer createContainerLocalizer(String user,
String appId, String locId, String tokenFileName, List<String> localDirs,
FileContext localizerFc) throws IOException {
FileContext localizerFc, String containerId) throws IOException {
ContainerLocalizer localizer =
new ContainerLocalizer(localizerFc, user, appId, locId, tokenFileName,
getPaths(localDirs),
RecordFactoryProvider.getRecordFactory(getConf()));
RecordFactoryProvider.getRecordFactory(getConf()), containerId);
return localizer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public class ContainerLocalizer {

private final String user;
private final String appId;
private final String containerId;
private final List<Path> localDirs;
private final String localizerId;
private final FileContext lfs;
Expand All @@ -120,7 +121,7 @@ public class ContainerLocalizer {

public ContainerLocalizer(FileContext lfs, String user, String appId,
String localizerId, String tokenFileName, List<Path> localDirs,
RecordFactory recordFactory) throws IOException {
RecordFactory recordFactory, String containerId) throws IOException {
if (null == user) {
throw new IOException("Cannot initialize for null user");
}
Expand All @@ -130,6 +131,7 @@ public ContainerLocalizer(FileContext lfs, String user, String appId,
this.lfs = lfs;
this.user = user;
this.appId = appId;
this.containerId = containerId;
this.localDirs = localDirs;
this.localizerId = localizerId;
this.recordFactory = recordFactory;
Expand All @@ -142,6 +144,12 @@ public ContainerLocalizer(FileContext lfs, String user, String appId,
"token file name cannot be null");
}

public ContainerLocalizer(FileContext lfs, String user, String appId,
String localizerId, List<Path> localDirs,
RecordFactory recordFactory) throws IOException {
this(lfs, user, appId, localizerId, localDirs, recordFactory, "");
}

@VisibleForTesting
@Private
Configuration initConfiguration() {
Expand Down Expand Up @@ -232,6 +240,12 @@ class FSDownloadWrapper extends FSDownload {
super(files, ugi, conf, destDirPath, resource);
}

FSDownloadWrapper(FileContext files, UserGroupInformation ugi,
Configuration conf, Path destDirPath, LocalResource resource,
String containerId) {
super(containerId, files, ugi, conf, destDirPath, resource);
}

@Override
public Path call() throws Exception {
Thread currentThread = Thread.currentThread();
Expand All @@ -258,7 +272,7 @@ Callable<Path> download(Path destDirPath, LocalResource rsrc,
}
diskValidator
.checkStatus(new File(destDirPath.getParent().toUri().getRawPath()));
return new FSDownloadWrapper(lfs, ugi, conf, destDirPath, rsrc);
return new FSDownloadWrapper(lfs, ugi, conf, destDirPath, rsrc, containerId);
}

private void createParentDirs(Path destDirPath) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1269,6 +1269,7 @@ public void run() {
.setAppId(context.getContainerId()
.getApplicationAttemptId().getApplicationId().toString())
.setLocId(localizerId)
.setContainerId(context.getContainerId().toString())
.setDirsHandler(dirsHandler)
.build());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public final class LocalizerStartContext {
private final String user;
private final String appId;
private final String locId;
public final String containerId;
private final LocalDirsHandlerService dirsHandler;

public static final class Builder {
Expand All @@ -47,6 +48,7 @@ public static final class Builder {
private String user;
private String appId;
private String locId;
private String containerId;
private LocalDirsHandlerService dirsHandler;

public Builder() {
Expand All @@ -72,6 +74,11 @@ public Builder setAppId(String appId) {
return this;
}

public Builder setContainerId(String containerId) {
this.containerId = containerId;
return this;
}

public Builder setLocId(String locId) {
this.locId = locId;
return this;
Expand All @@ -93,6 +100,7 @@ private LocalizerStartContext(Builder builder) {
this.user = builder.user;
this.appId = builder.appId;
this.locId = builder.locId;
this.containerId = builder.containerId;
this.dirsHandler = builder.dirsHandler;
}

Expand All @@ -116,6 +124,10 @@ public String getLocId() {
return this.locId;
}

public String getContainerId() {
return this.containerId;
}

public LocalDirsHandlerService getDirsHandler() {
return this.dirsHandler;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -549,14 +549,17 @@ public Object answer(InvocationOnMock invocationOnMock)
@Override
public ContainerLocalizer createContainerLocalizer(String user,
String appId, String locId, String tokenFileName,
List<String> localDirs, FileContext localizerFc)
throws IOException {
List<String> localDirs, FileContext localizerFc,
String containerId) throws IOException {

// Spy on the localizer and make it return valid heart-beat
// responses even though there is no real NodeManager.
ContainerLocalizer localizer =
super.createContainerLocalizer(user, appId, locId,
tokenFileName, localDirs, localizerFc);
tokenFileName, localDirs, localizerFc, appId);
// in the above line passing appId in place of container Id as
// container id is just for logging purposes and has not other
// use
ContainerLocalizer spyLocalizer = spy(localizer);
LocalizationProtocol nmProxy = mock(LocalizationProtocol.class);
try {
Expand Down