23
23
import static org .apache .hadoop .hdfs .server .federation .FederationTestUtils .getFileSystem ;
24
24
import static org .apache .hadoop .hdfs .server .federation .FederationTestUtils .refreshRoutersCaches ;
25
25
import static org .apache .hadoop .hdfs .server .federation .MockNamenode .registerSubclusters ;
26
+ import static org .apache .hadoop .hdfs .server .federation .router .TestRouterConstants .ASYNC_MODE ;
27
+ import static org .apache .hadoop .hdfs .server .federation .router .TestRouterConstants .SYNC_MODE ;
26
28
import static org .apache .hadoop .hdfs .server .federation .store .FederationStateStoreTestUtils .getStateStoreConfiguration ;
27
29
import static org .junit .jupiter .api .Assertions .assertEquals ;
28
30
import static org .junit .jupiter .api .Assertions .assertNotNull ;
34
36
import java .io .IOException ;
35
37
import java .io .PrintWriter ;
36
38
import java .io .StringWriter ;
39
+ import java .lang .reflect .Method ;
37
40
import java .security .PrivilegedExceptionAction ;
38
41
import java .util .ArrayList ;
39
42
import java .util .Collection ;
76
79
import org .apache .hadoop .ipc .RemoteException ;
77
80
import org .apache .hadoop .security .UserGroupInformation ;
78
81
import org .apache .hadoop .test .LambdaTestUtils ;
79
- import org .junit .jupiter .api .AfterEach ;
80
- import org .junit .jupiter .api .BeforeEach ;
81
- import org .junit .jupiter .api .Test ;
82
+ import org .junit .jupiter .api .Nested ;
83
+ import org .junit .jupiter .api .extension .AfterEachCallback ;
84
+ import org .junit .jupiter .api .extension .BeforeEachCallback ;
85
+ import org .junit .jupiter .api .extension .ExtendWith ;
86
+ import org .junit .jupiter .api .extension .ExtensionContext ;
87
+ import org .junit .jupiter .params .ParameterizedTest ;
88
+ import org .junit .jupiter .params .provider .ValueSource ;
82
89
import org .slf4j .Logger ;
83
90
import org .slf4j .LoggerFactory ;
84
91
@@ -98,16 +105,14 @@ public class TestRouterFaultTolerant {
98
105
99
106
100
107
/** Namenodes for the test per name service id (subcluster). */
101
- private Map <String , MockNamenode > namenodes = new HashMap <>();
108
+ private static Map <String , MockNamenode > namenodes = new HashMap <>();
102
109
/** Routers for the test. */
103
- private List <Router > routers = new ArrayList <>();
110
+ private static List <Router > routers = new ArrayList <>();
104
111
105
112
/** Run test tasks in parallel. */
106
- private ExecutorService service ;
113
+ private static ExecutorService service ;
107
114
108
-
109
- @ BeforeEach
110
- public void setup () throws Exception {
115
+ public static void setup (String rpcMode ) throws Exception {
111
116
LOG .info ("Start the Namenodes" );
112
117
Configuration nnConf = new HdfsConfiguration ();
113
118
nnConf .setInt (DFSConfigKeys .DFS_NAMENODE_HANDLER_COUNT_KEY , 10 );
@@ -141,6 +146,9 @@ public void setup() throws Exception {
141
146
MultipleDestinationMountTableResolver .class ,
142
147
FileSubclusterResolver .class );
143
148
routerConf .addResource (stateStoreConf );
149
+ if (rpcMode .equals (ASYNC_MODE )) {
150
+ routerConf .setBoolean (RBFConfigKeys .DFS_ROUTER_ASYNC_RPC_ENABLE_KEY , true );
151
+ }
144
152
145
153
for (int i = 0 ; i < NUM_ROUTERS ; i ++) {
146
154
// router0 doesn't allow partial listing
@@ -160,8 +168,7 @@ public void setup() throws Exception {
160
168
service = Executors .newFixedThreadPool (10 );
161
169
}
162
170
163
- @ AfterEach
164
- public void cleanup () throws Exception {
171
+ public static void cleanup () throws Exception {
165
172
LOG .info ("Stopping the cluster" );
166
173
for (final MockNamenode nn : namenodes .values ()) {
167
174
nn .stop ();
@@ -202,12 +209,43 @@ private void updateMountPointFaultTolerant(final String mountPoint)
202
209
refreshRoutersCaches (routers );
203
210
}
204
211
212
+ @ Nested
213
+ @ ExtendWith (RouterServerHelperInTestRouterFaultTolerant .class )
214
+ class TestWithSyncRouterRpc {
215
+ @ ParameterizedTest
216
+ @ ValueSource (strings = {SYNC_MODE })
217
+ public void testWriteWithFailedSubclusterSync () throws Exception {
218
+ testWriteWithFailedSubcluster ();
219
+ }
220
+
221
+ @ ParameterizedTest
222
+ @ ValueSource (strings = {SYNC_MODE })
223
+ public void testReadWithFailedSubclusterSync () throws Exception {
224
+ testReadWithFailedSubcluster ();
225
+ }
226
+ }
227
+
228
+ @ Nested
229
+ @ ExtendWith (RouterServerHelperInTestRouterFaultTolerant .class )
230
+ class TestWithAsyncRouterRpc {
231
+ @ ParameterizedTest
232
+ @ ValueSource (strings = {ASYNC_MODE })
233
+ public void testWriteWithFailedSubclusterAsync () throws Exception {
234
+ testWriteWithFailedSubcluster ();
235
+ }
236
+
237
+ @ ParameterizedTest
238
+ @ ValueSource (strings = {ASYNC_MODE })
239
+ public void testReadWithFailedSubclusterAsync () throws Exception {
240
+ testReadWithFailedSubcluster ();
241
+ }
242
+ }
243
+
205
244
/**
206
245
* Test the behavior of the Router when one of the subclusters in a mount
207
246
* point fails. In particular, it checks if it can write files or not.
208
247
* Related to {@link TestRouterRpcMultiDestination#testSubclusterDown()}.
209
248
*/
210
- @ Test
211
249
public void testWriteWithFailedSubcluster () throws Exception {
212
250
213
251
LOG .info ("Stop ns1 to simulate an unavailable subcluster" );
@@ -614,7 +652,6 @@ private FileSystem getRandomRouterFileSystem() throws Exception {
614
652
(PrivilegedExceptionAction <FileSystem >) () -> getFileSystem (router ));
615
653
}
616
654
617
- @ Test
618
655
public void testReadWithFailedSubcluster () throws Exception {
619
656
620
657
DestinationOrder order = DestinationOrder .HASH_ALL ;
@@ -673,3 +710,30 @@ public void testReadWithFailedSubcluster() throws Exception {
673
710
}
674
711
}
675
712
}
713
+
714
+ class RouterServerHelperInTestRouterFaultTolerant implements
715
+ BeforeEachCallback , AfterEachCallback {
716
+ public static final ThreadLocal <RouterServerHelperInTestRouterFaultTolerant >
717
+ TEST_ROUTER_SERVER_TL = new InheritableThreadLocal <>();
718
+
719
+ @ Override
720
+ public void afterEach (ExtensionContext context ) throws Exception {
721
+ TestRouterFaultTolerant .cleanup ();
722
+ TEST_ROUTER_SERVER_TL .remove ();
723
+ }
724
+
725
+ @ Override
726
+ public void beforeEach (ExtensionContext context ) throws Exception {
727
+ Method testMethod = context .getRequiredTestMethod ();
728
+ ValueSource enumAnnotation = testMethod .getAnnotation (ValueSource .class );
729
+ if (enumAnnotation != null ) {
730
+ String [] strings = enumAnnotation .strings ();
731
+ for (String rpcMode : strings ) {
732
+ if (TEST_ROUTER_SERVER_TL .get () == null ) {
733
+ TestRouterFaultTolerant .setup (rpcMode );
734
+ }
735
+ }
736
+ }
737
+ TEST_ROUTER_SERVER_TL .set (RouterServerHelperInTestRouterFaultTolerant .this );
738
+ }
739
+ }
0 commit comments