Skip to content

Commit e657635

Browse files
authored
Io Scheduler, Scheduled worker release. (#7160)
* Support for scheduled release of threads in Io Scheduler * Test for rx3.io-scheduled-release * Fix tests, addressed Javadoc feedback
1 parent e46ce76 commit e657635

File tree

3 files changed

+88
-3
lines changed

3 files changed

+88
-3
lines changed

src/main/java/io/reactivex/rxjava3/internal/schedulers/IoScheduler.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ public final class IoScheduler extends Scheduler {
4848
/** The name of the system property for setting the thread priority for this Scheduler. */
4949
private static final String KEY_IO_PRIORITY = "rx3.io-priority";
5050

51+
/** The name of the system property for setting the release behaviour for this Scheduler. */
52+
private static final String KEY_SCHEDULED_RELEASE = "rx3.io-scheduled-release";
53+
static boolean USE_SCHEDULED_RELEASE;
54+
5155
static final CachedWorkerPool NONE;
5256

5357
static {
@@ -63,6 +67,8 @@ public final class IoScheduler extends Scheduler {
6367

6468
EVICTOR_THREAD_FACTORY = new RxThreadFactory(EVICTOR_THREAD_NAME_PREFIX, priority);
6569

70+
USE_SCHEDULED_RELEASE = Boolean.getBoolean(KEY_SCHEDULED_RELEASE);
71+
6672
NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
6773
NONE.shutdown();
6874
}
@@ -194,7 +200,7 @@ public int size() {
194200
return pool.get().allWorkers.size();
195201
}
196202

197-
static final class EventLoopWorker extends Scheduler.Worker {
203+
static final class EventLoopWorker extends Scheduler.Worker implements Runnable {
198204
private final CompositeDisposable tasks;
199205
private final CachedWorkerPool pool;
200206
private final ThreadWorker threadWorker;
@@ -212,11 +218,20 @@ public void dispose() {
212218
if (once.compareAndSet(false, true)) {
213219
tasks.dispose();
214220

215-
// releasing the pool should be the last action
216-
pool.release(threadWorker);
221+
if (USE_SCHEDULED_RELEASE) {
222+
threadWorker.scheduleActual(this, 0, TimeUnit.NANOSECONDS, null);
223+
} else {
224+
// releasing the pool should be the last action
225+
pool.release(threadWorker);
226+
}
217227
}
218228
}
219229

230+
@Override
231+
public void run() {
232+
pool.release(threadWorker);
233+
}
234+
220235
@Override
221236
public boolean isDisposed() {
222237
return once.get();

src/main/java/io/reactivex/rxjava3/schedulers/Schedulers.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
* <ul>
3333
* <li>{@code rx3.io-keep-alive-time} (long): sets the keep-alive time of the {@link #io()} Scheduler workers, default is {@link IoScheduler#KEEP_ALIVE_TIME_DEFAULT}</li>
3434
* <li>{@code rx3.io-priority} (int): sets the thread priority of the {@link #io()} Scheduler, default is {@link Thread#NORM_PRIORITY}</li>
35+
* <li>{@code rx3.io-scheduled-release} (boolean): {@code true} sets the worker release mode of the
36+
* {@link #io()} Scheduler to <em>scheduled</em>, default is {@code false} for <em>eager</em> mode.</li>
3537
* <li>{@code rx3.computation-threads} (int): sets the number of threads in the {@link #computation()} Scheduler, default is the number of available CPUs</li>
3638
* <li>{@code rx3.computation-priority} (int): sets the thread priority of the {@link #computation()} Scheduler, default is {@link Thread#NORM_PRIORITY}</li>
3739
* <li>{@code rx3.newthread-priority} (int): sets the thread priority of the {@link #newThread()} Scheduler, default is {@link Thread#NORM_PRIORITY}</li>
@@ -159,6 +161,8 @@ public static Scheduler computation() {
159161
* <ul>
160162
* <li>{@code rx3.io-keep-alive-time} (long): sets the keep-alive time of the {@code io()} Scheduler workers, default is {@link IoScheduler#KEEP_ALIVE_TIME_DEFAULT}</li>
161163
* <li>{@code rx3.io-priority} (int): sets the thread priority of the {@code io()} Scheduler, default is {@link Thread#NORM_PRIORITY}</li>
164+
* <li>{@code rx3.io-scheduled-release} (boolean): {@code true} sets the worker release mode of the
165+
* {@code #io()} Scheduler to <em>scheduled</em>, default is {@code false} for <em>eager</em> mode.</li>
162166
* </ul>
163167
* <p>
164168
* The default value of this scheduler can be overridden at initialization time via the
@@ -175,6 +179,21 @@ public static Scheduler computation() {
175179
* <p>Operators on the base reactive classes that use this scheduler are marked with the
176180
* &#64;{@link io.reactivex.rxjava3.annotations.SchedulerSupport SchedulerSupport}({@link io.reactivex.rxjava3.annotations.SchedulerSupport#IO IO})
177181
* annotation.
182+
* <p>
183+
* When the {@link Scheduler.Worker} is disposed, the underlying worker can be released to the cached worker pool in two modes:
184+
* <ul>
185+
* <li>In <em>eager</em> mode (default), the underlying worker is returned immediately to the cached worker pool
186+
* and can be reused much quicker by operators. The drawback is that if the currently running task doesn't
187+
* respond to interruption in time or at all, this may lead to delays or deadlock with the reuse use of the
188+
* underlying worker.
189+
* </li>
190+
* <li>In <em>scheduled</em> mode (enabled via the system parameter {@code rx3.io-scheduled-release}
191+
* set to {@code true}), the underlying worker is returned to the cached worker pool only after the currently running task
192+
* has finished. This can help prevent premature reuse of the underlying worker and likely won't lead to delays or
193+
* deadlock with such reuses. The drawback is that the delay in release may lead to an excess amount of underlying
194+
* workers being created.
195+
* </li>
196+
* </ul>
178197
* @return a {@link Scheduler} meant for IO-bound work
179198
*/
180199
@NonNull
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava3.internal.schedulers;
15+
16+
import io.reactivex.rxjava3.core.Completable;
17+
import io.reactivex.rxjava3.core.Flowable;
18+
import io.reactivex.rxjava3.core.RxJavaTest;
19+
import io.reactivex.rxjava3.schedulers.Schedulers;
20+
import org.junit.Test;
21+
22+
import java.util.concurrent.TimeUnit;
23+
24+
public class IoScheduledReleaseTest extends RxJavaTest {
25+
26+
/* This test will be stuck in a deadlock if IoScheduler.USE_SCHEDULED_RELEASE is not set */
27+
@Test
28+
public void scheduledRelease() {
29+
boolean savedScheduledRelease = IoScheduler.USE_SCHEDULED_RELEASE;
30+
IoScheduler.USE_SCHEDULED_RELEASE = true;
31+
try {
32+
Flowable.just("item")
33+
.observeOn(Schedulers.io())
34+
.firstOrError()
35+
.map(item -> {
36+
for (int i = 0; i < 50; i++) {
37+
Completable.complete()
38+
.observeOn(Schedulers.io())
39+
.blockingAwait();
40+
}
41+
return "Done";
42+
})
43+
.ignoreElement()
44+
.test()
45+
.awaitDone(5, TimeUnit.SECONDS)
46+
.assertComplete();
47+
} finally {
48+
IoScheduler.USE_SCHEDULED_RELEASE = savedScheduledRelease;
49+
}
50+
}
51+
}

0 commit comments

Comments
 (0)