|
20 | 20 |
|
21 | 21 | import java.io.IOException;
|
22 | 22 | import java.util.ArrayList;
|
| 23 | +import java.util.Arrays; |
23 | 24 | import java.util.List;
|
24 | 25 |
|
| 26 | +import java.util.concurrent.Callable; |
| 27 | +import java.util.concurrent.ExecutorService; |
| 28 | +import java.util.concurrent.Executors; |
| 29 | +import java.util.concurrent.Future; |
| 30 | +import java.util.concurrent.TimeUnit; |
25 | 31 | import java.util.function.Supplier;
|
26 | 32 | import org.apache.curator.RetryPolicy;
|
27 | 33 | import org.apache.curator.framework.CuratorFramework;
|
@@ -572,4 +578,53 @@ public void testCreateNameSpaceRepeatedly() throws Exception {
|
572 | 578 | "KeeperErrorCode = NodeExists for "+workingPath,
|
573 | 579 | () -> createModeStat.forPath(workingPath));
|
574 | 580 | }
|
| 581 | + |
| 582 | + @Test |
| 583 | + public void testMultipleInit() throws Exception { |
| 584 | + |
| 585 | + String connectString = zkServer.getConnectString(); |
| 586 | + RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); |
| 587 | + Configuration conf = getSecretConf(connectString); |
| 588 | + CuratorFramework curatorFramework = |
| 589 | + CuratorFrameworkFactory.builder() |
| 590 | + .connectString(connectString) |
| 591 | + .retryPolicy(retryPolicy) |
| 592 | + .build(); |
| 593 | + curatorFramework.start(); |
| 594 | + ZKDelegationTokenSecretManager.setCurator(curatorFramework); |
| 595 | + |
| 596 | + DelegationTokenManager tm1 = new DelegationTokenManager(conf, new Text("foo")); |
| 597 | + DelegationTokenManager tm2 = new DelegationTokenManager(conf, new Text("bar")); |
| 598 | + // When the init method is called, |
| 599 | + // the ZKDelegationTokenSecretManager#startThread method will be called, |
| 600 | + // and the creatingParentContainersIfNeeded will be called to create the nameSpace. |
| 601 | + ExecutorService executorService = Executors.newFixedThreadPool(2); |
| 602 | + |
| 603 | + Callable<Boolean> tm1Callable = () -> { |
| 604 | + tm1.init(); |
| 605 | + return true; |
| 606 | + }; |
| 607 | + Callable<Boolean> tm2Callable = () -> { |
| 608 | + tm2.init(); |
| 609 | + return true; |
| 610 | + }; |
| 611 | + List<Future<Boolean>> futures = executorService.invokeAll( |
| 612 | + Arrays.asList(tm1Callable, tm2Callable)); |
| 613 | + for(Future<Boolean> future : futures) { |
| 614 | + Assert.assertTrue(future.get()); |
| 615 | + } |
| 616 | + executorService.shutdownNow(); |
| 617 | + Assert.assertTrue(executorService.awaitTermination(1, TimeUnit.SECONDS)); |
| 618 | + tm1.destroy(); |
| 619 | + tm2.destroy(); |
| 620 | + |
| 621 | + String workingPath = "/" + conf.get(ZKDelegationTokenSecretManager.ZK_DTSM_ZNODE_WORKING_PATH, |
| 622 | + ZKDelegationTokenSecretManager.ZK_DTSM_ZNODE_WORKING_PATH_DEAFULT) + "/ZKDTSMRoot"; |
| 623 | + |
| 624 | + // Check if the created NameSpace exists. |
| 625 | + Stat stat = curatorFramework.checkExists().forPath(workingPath); |
| 626 | + Assert.assertNotNull(stat); |
| 627 | + |
| 628 | + curatorFramework.close(); |
| 629 | + } |
575 | 630 | }
|
0 commit comments