17
17
*/
18
18
package org .apache .hadoop .hdfs .qjournal .client ;
19
19
20
+ import static org .apache .hadoop .hdfs .DFSConfigKeys .DFS_JOURNALNODE_MAINTENANCE_NODES_DEFAULT ;
21
+ import static org .apache .hadoop .hdfs .DFSConfigKeys .DFS_JOURNALNODE_MAINTENANCE_NODES_KEY ;
22
+
20
23
import java .io .IOException ;
21
24
import java .net .InetSocketAddress ;
22
25
import java .net .URI ;
31
34
import java .util .concurrent .TimeUnit ;
32
35
import java .util .concurrent .TimeoutException ;
33
36
37
+ import org .apache .hadoop .hdfs .server .blockmanagement .HostSet ;
34
38
import org .apache .hadoop .util .Lists ;
35
39
import org .slf4j .Logger ;
36
40
import org .slf4j .LoggerFactory ;
62
66
import org .apache .hadoop .classification .VisibleForTesting ;
63
67
import org .apache .hadoop .thirdparty .com .google .common .base .Joiner ;
64
68
import org .apache .hadoop .util .Preconditions ;
69
+
65
70
import org .apache .hadoop .thirdparty .protobuf .TextFormat ;
66
71
67
72
/**
@@ -108,6 +113,7 @@ public class QuorumJournalManager implements JournalManager {
108
113
private static final int OUTPUT_BUFFER_CAPACITY_DEFAULT = 512 * 1024 ;
109
114
private int outputBufferCapacity ;
110
115
private final URLConnectionFactory connectionFactory ;
116
+ private int quorumJournalCount ;
111
117
112
118
/** Limit logging about input stream selection to every 5 seconds max. */
113
119
private static final long SELECT_INPUT_STREAM_LOG_INTERVAL_MS = 5000 ;
@@ -146,6 +152,13 @@ public QuorumJournalManager(Configuration conf,
146
152
this .nameServiceId = nameServiceId ;
147
153
this .loggers = new AsyncLoggerSet (createLoggers (loggerFactory ));
148
154
155
+ // Check whether the number of jn maintenance lists is valid
156
+ int quorumThreshold = (quorumJournalCount / 2 ) + 1 ;
157
+ Preconditions .checkArgument (
158
+ this .loggers .size () >= quorumThreshold ,
159
+ "The total journalnode minus %s the number of blacklists must be greater than or equal to"
160
+ + " %s!" , DFS_JOURNALNODE_MAINTENANCE_NODES_KEY , quorumThreshold );
161
+
149
162
this .maxTxnsPerRpc =
150
163
conf .getInt (QJM_RPC_MAX_TXNS_KEY , QJM_RPC_MAX_TXNS_DEFAULT );
151
164
Preconditions .checkArgument (maxTxnsPerRpc > 0 ,
@@ -250,6 +263,9 @@ Map<AsyncLogger, NewEpochResponseProto> createNewUniqueEpoch()
250
263
251
264
@ Override
252
265
public void format (NamespaceInfo nsInfo , boolean force ) throws IOException {
266
+ if (isEnableJnMaintenance ()) {
267
+ throw new IOException ("format() does not support enabling jn maintenance mode" );
268
+ }
253
269
QuorumCall <AsyncLogger , Void > call = loggers .format (nsInfo , force );
254
270
try {
255
271
call .waitFor (loggers .size (), loggers .size (), 0 , timeoutMs ,
@@ -406,21 +422,39 @@ private void recoverUnclosedSegment(long segmentTxId) throws IOException {
406
422
logToSync .getStartTxId (),
407
423
logToSync .getEndTxId ()));
408
424
}
409
-
410
- static List <AsyncLogger > createLoggers (Configuration conf ,
425
+
426
+ List <AsyncLogger > createLoggers (Configuration conf ,
427
+ URI uri ,
428
+ NamespaceInfo nsInfo ,
429
+ AsyncLogger .Factory factory ,
430
+ String nameServiceId )
431
+ throws IOException {
432
+ String [] skipNodesHostPort = conf .getTrimmedStrings (
433
+ DFS_JOURNALNODE_MAINTENANCE_NODES_KEY , DFS_JOURNALNODE_MAINTENANCE_NODES_DEFAULT );
434
+ return createLoggers (conf , uri , nsInfo , factory , nameServiceId , skipNodesHostPort );
435
+ }
436
+
437
+ private List <AsyncLogger > createLoggers (Configuration conf ,
411
438
URI uri ,
412
439
NamespaceInfo nsInfo ,
413
440
AsyncLogger .Factory factory ,
414
- String nameServiceId )
441
+ String nameServiceId ,
442
+ String [] skipNodesHostPort )
415
443
throws IOException {
416
444
List <AsyncLogger > ret = Lists .newArrayList ();
417
445
List <InetSocketAddress > addrs = Util .getAddressesList (uri , conf );
418
446
if (addrs .size () % 2 == 0 ) {
419
447
LOG .warn ("Quorum journal URI '" + uri + "' has an even number " +
420
448
"of Journal Nodes specified. This is not recommended!" );
421
449
}
450
+ this .quorumJournalCount = addrs .size ();
451
+ HostSet skipSet = DFSUtil .getInetSocketAddress (skipNodesHostPort );
422
452
String jid = parseJournalId (uri );
423
453
for (InetSocketAddress addr : addrs ) {
454
+ if (skipSet .match (addr )) {
455
+ LOG .info ("The node {} is a maintenance node and will skip initialization." , addr );
456
+ continue ;
457
+ }
424
458
ret .add (factory .createLogger (conf , nsInfo , jid , nameServiceId , addr ));
425
459
}
426
460
return ret ;
@@ -667,6 +701,9 @@ AsyncLoggerSet getLoggerSetForTests() {
667
701
668
702
@ Override
669
703
public void doPreUpgrade () throws IOException {
704
+ if (isEnableJnMaintenance ()) {
705
+ throw new IOException ("doPreUpgrade() does not support enabling jn maintenance mode" );
706
+ }
670
707
QuorumCall <AsyncLogger , Void > call = loggers .doPreUpgrade ();
671
708
try {
672
709
call .waitFor (loggers .size (), loggers .size (), 0 , timeoutMs ,
@@ -684,6 +721,9 @@ public void doPreUpgrade() throws IOException {
684
721
685
722
@ Override
686
723
public void doUpgrade (Storage storage ) throws IOException {
724
+ if (isEnableJnMaintenance ()) {
725
+ throw new IOException ("doUpgrade() does not support enabling jn maintenance mode" );
726
+ }
687
727
QuorumCall <AsyncLogger , Void > call = loggers .doUpgrade (storage );
688
728
try {
689
729
call .waitFor (loggers .size (), loggers .size (), 0 , timeoutMs ,
@@ -701,6 +741,9 @@ public void doUpgrade(Storage storage) throws IOException {
701
741
702
742
@ Override
703
743
public void doFinalize () throws IOException {
744
+ if (isEnableJnMaintenance ()) {
745
+ throw new IOException ("doFinalize() does not support enabling jn maintenance mode" );
746
+ }
704
747
QuorumCall <AsyncLogger , Void > call = loggers .doFinalize ();
705
748
try {
706
749
call .waitFor (loggers .size (), loggers .size (), 0 , timeoutMs ,
@@ -719,6 +762,9 @@ public void doFinalize() throws IOException {
719
762
@ Override
720
763
public boolean canRollBack (StorageInfo storage , StorageInfo prevStorage ,
721
764
int targetLayoutVersion ) throws IOException {
765
+ if (isEnableJnMaintenance ()) {
766
+ throw new IOException ("canRollBack() does not support enabling jn maintenance mode" );
767
+ }
722
768
QuorumCall <AsyncLogger , Boolean > call = loggers .canRollBack (storage ,
723
769
prevStorage , targetLayoutVersion );
724
770
try {
@@ -753,6 +799,9 @@ public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
753
799
754
800
@ Override
755
801
public void doRollback () throws IOException {
802
+ if (isEnableJnMaintenance ()) {
803
+ throw new IOException ("doRollback() does not support enabling jn maintenance mode" );
804
+ }
756
805
QuorumCall <AsyncLogger , Void > call = loggers .doRollback ();
757
806
try {
758
807
call .waitFor (loggers .size (), loggers .size (), 0 , timeoutMs ,
@@ -770,6 +819,9 @@ public void doRollback() throws IOException {
770
819
771
820
@ Override
772
821
public void discardSegments (long startTxId ) throws IOException {
822
+ if (isEnableJnMaintenance ()) {
823
+ throw new IOException ("discardSegments() does not support enabling jn maintenance mode" );
824
+ }
773
825
QuorumCall <AsyncLogger , Void > call = loggers .discardSegments (startTxId );
774
826
try {
775
827
call .waitFor (loggers .size (), loggers .size (), 0 ,
@@ -789,6 +841,9 @@ public void discardSegments(long startTxId) throws IOException {
789
841
790
842
@ Override
791
843
public long getJournalCTime () throws IOException {
844
+ if (isEnableJnMaintenance ()) {
845
+ throw new IOException ("getJournalCTime() does not support enabling jn maintenance mode" );
846
+ }
792
847
QuorumCall <AsyncLogger , Long > call = loggers .getJournalCTime ();
793
848
try {
794
849
call .waitFor (loggers .size (), loggers .size (), 0 ,
@@ -819,4 +874,8 @@ public long getJournalCTime() throws IOException {
819
874
820
875
throw new AssertionError ("Unreachable code." );
821
876
}
877
+
878
+ private boolean isEnableJnMaintenance () {
879
+ return this .loggers .size () < quorumJournalCount ;
880
+ }
822
881
}
0 commit comments