1
1
package io .javaoperatorsdk .operator .api .config ;
2
2
3
+ import java .util .concurrent .BlockingQueue ;
3
4
import java .util .concurrent .ExecutorService ;
4
- import java .util .concurrent .ScheduledThreadPoolExecutor ;
5
+ import java .util .concurrent .LinkedBlockingQueue ;
6
+ import java .util .concurrent .RejectedExecutionHandler ;
7
+ import java .util .concurrent .ThreadFactory ;
8
+ import java .util .concurrent .ThreadPoolExecutor ;
9
+ import java .util .concurrent .TimeUnit ;
5
10
import java .util .concurrent .atomic .AtomicReference ;
6
11
7
12
class ExecutorServiceProducer {
8
13
9
- private final static AtomicReference <ScheduledThreadPoolExecutor > executor =
14
+ private final static AtomicReference <ThreadPoolExecutor > executor =
10
15
new AtomicReference <>();
11
16
12
17
static ExecutorService getExecutor (int threadPoolSize ) {
13
18
final var gotSet =
14
- executor .compareAndSet (null , new ScheduledThreadPoolExecutor (threadPoolSize ));
19
+ executor .compareAndSet (null , new DebugThreadPoolExecutor (threadPoolSize , threadPoolSize , 0L ,
20
+ TimeUnit .MILLISECONDS , new LinkedBlockingQueue <>()));
15
21
final var result = executor .get ();
16
22
if (!gotSet ) {
17
23
// check that we didn't try to change the pool size
@@ -23,4 +29,37 @@ static ExecutorService getExecutor(int threadPoolSize) {
23
29
}
24
30
return result ;
25
31
}
32
+
33
+ private static class DebugThreadPoolExecutor extends ThreadPoolExecutor {
34
+
35
+ public DebugThreadPoolExecutor (int corePoolSize , int maximumPoolSize , long keepAliveTime ,
36
+ TimeUnit unit ,
37
+ BlockingQueue <Runnable > workQueue ) {
38
+ super (corePoolSize , maximumPoolSize , keepAliveTime , unit , workQueue );
39
+ }
40
+
41
+ public DebugThreadPoolExecutor (int corePoolSize , int maximumPoolSize , long keepAliveTime ,
42
+ TimeUnit unit , BlockingQueue <Runnable > workQueue ,
43
+ ThreadFactory threadFactory ) {
44
+ super (corePoolSize , maximumPoolSize , keepAliveTime , unit , workQueue , threadFactory );
45
+ }
46
+
47
+ public DebugThreadPoolExecutor (int corePoolSize , int maximumPoolSize , long keepAliveTime ,
48
+ TimeUnit unit , BlockingQueue <Runnable > workQueue ,
49
+ RejectedExecutionHandler handler ) {
50
+ super (corePoolSize , maximumPoolSize , keepAliveTime , unit , workQueue , handler );
51
+ }
52
+
53
+ public DebugThreadPoolExecutor (int corePoolSize , int maximumPoolSize , long keepAliveTime ,
54
+ TimeUnit unit , BlockingQueue <Runnable > workQueue ,
55
+ ThreadFactory threadFactory , RejectedExecutionHandler handler ) {
56
+ super (corePoolSize , maximumPoolSize , keepAliveTime , unit , workQueue , threadFactory , handler );
57
+ }
58
+
59
+ @ Override
60
+ public void shutdown () {
61
+ Thread .dumpStack ();
62
+ super .shutdown ();
63
+ }
64
+ }
26
65
}
0 commit comments