Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,12 @@ private String formatTokenId(TokenIdent id) {
/**
* Access to currentKey is protected by this object lock
*/
private DelegationKey currentKey;
private volatile DelegationKey currentKey;

private long keyUpdateInterval;
private long tokenMaxLifetime;
private long tokenRemoverScanInterval;
private long tokenRenewInterval;
private final long keyUpdateInterval;
private final long tokenMaxLifetime;
private final long tokenRemoverScanInterval;
private final long tokenRenewInterval;
/**
* Whether to store a token's tracking ID in its TokenInformation.
* Can be overridden by a subclass.
Expand Down Expand Up @@ -486,17 +486,18 @@ private synchronized void removeExpiredKeys() {
}

@Override
protected synchronized byte[] createPassword(TokenIdent identifier) {
protected byte[] createPassword(TokenIdent identifier) {
int sequenceNum;
long now = Time.now();
sequenceNum = incrementDelegationTokenSeqNum();
identifier.setIssueDate(now);
identifier.setMaxDate(now + tokenMaxLifetime);
identifier.setMasterKeyId(currentKey.getKeyId());
DelegationKey delegationCurrentKey = currentKey;
identifier.setMasterKeyId(delegationCurrentKey.getKeyId());
identifier.setSequenceNumber(sequenceNum);
LOG.info("Creating password for identifier: " + formatTokenId(identifier)
+ ", currentKey: " + currentKey.getKeyId());
byte[] password = createPassword(identifier.getBytes(), currentKey.getKey());
+ ", currentKey: " + delegationCurrentKey.getKeyId());
byte[] password = createPassword(identifier.getBytes(), delegationCurrentKey.getKey());
DelegationTokenInformation tokenInfo = new DelegationTokenInformation(now
+ tokenRenewInterval, password, getTrackingIdIfEnabled(identifier));
try {
Expand All @@ -521,7 +522,6 @@ protected synchronized byte[] createPassword(TokenIdent identifier) {
*/
protected DelegationTokenInformation checkToken(TokenIdent identifier)
throws InvalidToken {
assert Thread.holdsLock(this);
DelegationTokenInformation info = getTokenInfo(identifier);
String err;
if (info == null) {
Expand All @@ -541,7 +541,7 @@ protected DelegationTokenInformation checkToken(TokenIdent identifier)
}

@Override
public synchronized byte[] retrievePassword(TokenIdent identifier)
public byte[] retrievePassword(TokenIdent identifier)
throws InvalidToken {
return checkToken(identifier).getPassword();
}
Expand All @@ -553,7 +553,7 @@ protected String getTrackingIdIfEnabled(TokenIdent ident) {
return null;
}

public synchronized String getTokenTrackingId(TokenIdent identifier) {
public String getTokenTrackingId(TokenIdent identifier) {
DelegationTokenInformation info = getTokenInfo(identifier);
if (info == null) {
return null;
Expand All @@ -567,7 +567,7 @@ public synchronized String getTokenTrackingId(TokenIdent identifier) {
* @param password Password in the token.
* @throws InvalidToken InvalidToken.
*/
public synchronized void verifyToken(TokenIdent identifier, byte[] password)
public void verifyToken(TokenIdent identifier, byte[] password)
throws InvalidToken {
byte[] storedPassword = retrievePassword(identifier);
if (!MessageDigest.isEqual(password, storedPassword)) {
Expand All @@ -584,7 +584,7 @@ public synchronized void verifyToken(TokenIdent identifier, byte[] password)
* @throws InvalidToken if the token is invalid
* @throws AccessControlException if the user can't renew token
*/
public synchronized long renewToken(Token<TokenIdent> token,
public long renewToken(Token<TokenIdent> token,
String renewer) throws InvalidToken, IOException {
ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
DataInputStream in = new DataInputStream(buf);
Expand Down Expand Up @@ -646,7 +646,7 @@ public synchronized long renewToken(Token<TokenIdent> token,
* @throws InvalidToken for invalid token
* @throws AccessControlException if the user isn't allowed to cancel
*/
public synchronized TokenIdent cancelToken(Token<TokenIdent> token,
public TokenIdent cancelToken(Token<TokenIdent> token,
String canceller) throws IOException {
ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
DataInputStream in = new DataInputStream(buf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;

import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
Expand Down Expand Up @@ -148,7 +149,7 @@ protected static CuratorFramework getCurator() {
private final int seqNumBatchSize;
private int currentSeqNum;
private int currentMaxSeqNum;

private final ReentrantLock currentSeqNumLock;
private final boolean isTokenWatcherEnabled;

public ZKDelegationTokenSecretManager(Configuration conf) {
Expand All @@ -164,6 +165,7 @@ public ZKDelegationTokenSecretManager(Configuration conf) {
ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT);
isTokenWatcherEnabled = conf.getBoolean(ZK_DTSM_TOKEN_WATCHER_ENABLED,
ZK_DTSM_TOKEN_WATCHER_ENABLED_DEFAULT);
this.currentSeqNumLock = new ReentrantLock(true);
if (CURATOR_TL.get() != null) {
zkClient =
CURATOR_TL.get().usingNamespace(
Expand Down Expand Up @@ -520,24 +522,28 @@ protected int incrementDelegationTokenSeqNum() {
// The secret manager will keep a local range of seq num which won't be
// seen by peers, so only when the range is exhausted it will ask zk for
// another range again
if (currentSeqNum >= currentMaxSeqNum) {
try {
// after a successful batch request, we can get the range starting point
currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize);
currentMaxSeqNum = currentSeqNum + seqNumBatchSize;
LOG.info("Fetched new range of seq num, from {} to {} ",
currentSeqNum+1, currentMaxSeqNum);
} catch (InterruptedException e) {
// The ExpirationThread is just finishing.. so dont do anything..
LOG.debug(
"Thread interrupted while performing token counter increment", e);
Thread.currentThread().interrupt();
} catch (Exception e) {
throw new RuntimeException("Could not increment shared counter !!", e);
try {
this.currentSeqNumLock.lock();
if (currentSeqNum >= currentMaxSeqNum) {
try {
// after a successful batch request, we can get the range starting point
currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize);
currentMaxSeqNum = currentSeqNum + seqNumBatchSize;
LOG.info("Fetched new range of seq num, from {} to {} ",
currentSeqNum+1, currentMaxSeqNum);
} catch (InterruptedException e) {
// The ExpirationThread is just finishing.. so dont do anything..
LOG.debug(
"Thread interrupted while performing token counter increment", e);
Thread.currentThread().interrupt();
} catch (Exception e) {
throw new RuntimeException("Could not increment shared counter !!", e);
}
}
return ++currentSeqNum;
} finally {
this.currentSeqNumLock.unlock();
}

return ++currentSeqNum;
}

@Override
Expand Down