From 980d51daca41702e2b99a50b6cdfd9be8859e75c Mon Sep 17 00:00:00 2001 From: Sergio Garcia Date: Mon, 25 Jan 2021 11:39:01 +0000 Subject: [PATCH 1/3] Support for scheduled release of threads in Io Scheduler --- .../internal/schedulers/IoScheduler.java | 21 ++++++++++++++++--- .../rxjava3/schedulers/Schedulers.java | 6 ++++++ 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/reactivex/rxjava3/internal/schedulers/IoScheduler.java b/src/main/java/io/reactivex/rxjava3/internal/schedulers/IoScheduler.java index 6d63715770..0e0939cc58 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/schedulers/IoScheduler.java +++ b/src/main/java/io/reactivex/rxjava3/internal/schedulers/IoScheduler.java @@ -48,6 +48,10 @@ public final class IoScheduler extends Scheduler { /** The name of the system property for setting the thread priority for this Scheduler. */ private static final String KEY_IO_PRIORITY = "rx3.io-priority"; + /** The name of the system property for setting the release behaviour for this Scheduler. */ + private static final String KEY_SCHEDULED_RELEASE = "rx3.io-scheduled-release"; + private static final boolean USE_SCHEDULED_RELEASE; + static final CachedWorkerPool NONE; static { @@ -63,6 +67,8 @@ public final class IoScheduler extends Scheduler { EVICTOR_THREAD_FACTORY = new RxThreadFactory(EVICTOR_THREAD_NAME_PREFIX, priority); + USE_SCHEDULED_RELEASE = Boolean.getBoolean(KEY_SCHEDULED_RELEASE); + NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY); NONE.shutdown(); } @@ -194,7 +200,7 @@ public int size() { return pool.get().allWorkers.size(); } - static final class EventLoopWorker extends Scheduler.Worker { + static final class EventLoopWorker extends Scheduler.Worker implements Runnable { private final CompositeDisposable tasks; private final CachedWorkerPool pool; private final ThreadWorker threadWorker; @@ -212,11 +218,20 @@ public void dispose() { if (once.compareAndSet(false, true)) { tasks.dispose(); - // releasing the pool should be the last action - pool.release(threadWorker); + if (USE_SCHEDULED_RELEASE) { + threadWorker.scheduleActual(this, 0, TimeUnit.NANOSECONDS, null); + } else { + // releasing the pool should be the last action + pool.release(threadWorker); + } } } + @Override + public void run() { + pool.release(threadWorker); + } + @Override public boolean isDisposed() { return once.get(); diff --git a/src/main/java/io/reactivex/rxjava3/schedulers/Schedulers.java b/src/main/java/io/reactivex/rxjava3/schedulers/Schedulers.java index ad7ff69bc7..90689654b9 100644 --- a/src/main/java/io/reactivex/rxjava3/schedulers/Schedulers.java +++ b/src/main/java/io/reactivex/rxjava3/schedulers/Schedulers.java @@ -32,6 +32,9 @@ *