Skip to content

3.x: TestScheduler option to use onSchedule hook #7163

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,8 @@ public static Scheduler computation() {
* @{@link io.reactivex.rxjava3.annotations.SchedulerSupport SchedulerSupport}({@link io.reactivex.rxjava3.annotations.SchedulerSupport#IO IO})
* annotation.
* <p>
* When the {@link Scheduler.Worker} is disposed, the underlying worker can be released to the cached worker pool in two modes:
* When the {@link io.reactivex.rxjava3.core.Scheduler.Worker Scheduler.Worker} is disposed,
* the underlying worker can be released to the cached worker pool in two modes:
* <ul>
* <li>In <em>eager</em> mode (default), the underlying worker is returned immediately to the cached worker pool
* and can be reused much quicker by operators. The drawback is that if the currently running task doesn't
Expand Down
76 changes: 66 additions & 10 deletions src/main/java/io/reactivex/rxjava3/schedulers/TestScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,28 @@

import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.annotations.*;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.disposables.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.EmptyDisposable;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;

/**
* A special, non thread-safe scheduler for testing operators that require
* a scheduler without introducing real concurrency and allows manually advancing
* a virtual time.
* <p>
* By default, the tasks submitted via the various {@code schedule} methods are not
* wrapped by the {@link RxJavaPlugins#onSchedule(Runnable)} hook. To enable this behavior,
* create a {@code TestScheduler} via {@link #TestScheduler(boolean)} or {@link #TestScheduler(long, TimeUnit, boolean)}.
*/
public final class TestScheduler extends Scheduler {
/** The ordered queue for the runnable tasks. */
final Queue<TimedRunnable> queue = new PriorityBlockingQueue<>(11);
/** Use the {@link RxJavaPlugins#onSchedule(Runnable)} hook when scheduling tasks. */
final boolean useOnScheduleHook;
/** The per-scheduler global order counter. */
long counter;
// Storing time in nanoseconds internally.
Expand All @@ -38,7 +46,20 @@ public final class TestScheduler extends Scheduler {
* Creates a new TestScheduler with initial virtual time of zero.
*/
public TestScheduler() {
// No-op.
this(false);
}

/**
* Creates a new TestScheduler with the option to use the
* {@link RxJavaPlugins#onSchedule(Runnable)} hook when scheduling tasks.
* @param useOnScheduleHook if {@code true}, the tasks submitted to this
* TestScheduler is wrapped via the
* {@link RxJavaPlugins#onSchedule(Runnable)} hook
* @since 3.0.10 - experimental
*/
@Experimental
public TestScheduler(boolean useOnScheduleHook) {
this.useOnScheduleHook = useOnScheduleHook;
}

/**
Expand All @@ -50,7 +71,27 @@ public TestScheduler() {
* the units of time that {@code delayTime} is expressed in
*/
public TestScheduler(long delayTime, TimeUnit unit) {
this(delayTime, unit, false);
}

/**
* Creates a new TestScheduler with the specified initial virtual time
* and with the option to use the
* {@link RxJavaPlugins#onSchedule(Runnable)} hook when scheduling tasks.
*
* @param delayTime
* the point in time to move the Scheduler's clock to
* @param unit
* the units of time that {@code delayTime} is expressed in
* @param useOnScheduleHook if {@code true}, the tasks submitted to this
* TestScheduler is wrapped via the
* {@link RxJavaPlugins#onSchedule(Runnable)} hook
* @since 3.0.10 - experimental
*/
@Experimental
public TestScheduler(long delayTime, TimeUnit unit, boolean useOnScheduleHook) {
time = unit.toNanos(delayTime);
this.useOnScheduleHook = useOnScheduleHook;
}

static final class TimedRunnable implements Comparable<TimedRunnable> {
Expand Down Expand Up @@ -163,10 +204,13 @@ public Disposable schedule(@NonNull Runnable run, long delayTime, @NonNull TimeU
if (disposed) {
return EmptyDisposable.INSTANCE;
}
if (useOnScheduleHook) {
run = RxJavaPlugins.onSchedule(run);
}
final TimedRunnable timedAction = new TimedRunnable(this, time + unit.toNanos(delayTime), run, counter++);
queue.add(timedAction);

return Disposable.fromRunnable(new QueueRemove(timedAction));
return new QueueRemove(timedAction);
}

@NonNull
Expand All @@ -175,26 +219,38 @@ public Disposable schedule(@NonNull Runnable run) {
if (disposed) {
return EmptyDisposable.INSTANCE;
}
if (useOnScheduleHook) {
run = RxJavaPlugins.onSchedule(run);
}
final TimedRunnable timedAction = new TimedRunnable(this, 0, run, counter++);
queue.add(timedAction);
return Disposable.fromRunnable(new QueueRemove(timedAction));
return new QueueRemove(timedAction);
}

@Override
public long now(@NonNull TimeUnit unit) {
return TestScheduler.this.now(unit);
}

final class QueueRemove implements Runnable {
final TimedRunnable timedAction;
final class QueueRemove extends AtomicReference<TimedRunnable> implements Disposable {

private static final long serialVersionUID = -7874968252110604360L;

QueueRemove(TimedRunnable timedAction) {
this.timedAction = timedAction;
this.lazySet(timedAction);
}

@Override
public void dispose() {
TimedRunnable tr = getAndSet(null);
if (tr != null) {
queue.remove(tr);
}
}

@Override
public void run() {
queue.remove(timedAction);
public boolean isDisposed() {
return get() == null;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.internal.subscriptions.BooleanSubscription;
import io.reactivex.rxjava3.internal.util.ExceptionHelper;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.schedulers.TestScheduler.*;

public class TestSchedulerTest extends RxJavaTest {
Expand Down Expand Up @@ -260,4 +261,95 @@ public void constructorTimeSetsTime() {
assertEquals(5, ts.now(TimeUnit.SECONDS));
assertEquals(5000, ts.now(TimeUnit.MILLISECONDS));
}

@Test
public void withOnScheduleHook() {
AtomicInteger run = new AtomicInteger();
AtomicInteger counter = new AtomicInteger();
RxJavaPlugins.setScheduleHandler(r -> {
counter.getAndIncrement();
return r;
});
try {
Runnable r = () -> run.getAndIncrement();
TestScheduler ts = new TestScheduler(true);

ts.createWorker().schedule(r);
ts.createWorker().schedule(r, 1, TimeUnit.SECONDS);

ts.advanceTimeBy(1, TimeUnit.SECONDS);

assertEquals(2, run.get());
assertEquals(2, counter.get());

ts = new TestScheduler();

ts.createWorker().schedule(r);
ts.createWorker().schedule(r, 1, TimeUnit.SECONDS);

ts.advanceTimeBy(1, TimeUnit.SECONDS);

assertEquals(4, run.get());
assertEquals(2, counter.get());
} finally {
RxJavaPlugins.setScheduleHandler(null);
}
}

@Test
public void withOnScheduleHookInitialTime() {
AtomicInteger run = new AtomicInteger();
AtomicInteger counter = new AtomicInteger();
RxJavaPlugins.setScheduleHandler(r -> {
counter.getAndIncrement();
return r;
});
try {
Runnable r = () -> run.getAndIncrement();
TestScheduler ts = new TestScheduler(1, TimeUnit.HOURS, true);

ts.createWorker().schedule(r);
ts.createWorker().schedule(r, 1, TimeUnit.SECONDS);

ts.advanceTimeBy(1, TimeUnit.SECONDS);

assertEquals(2, run.get());
assertEquals(2, counter.get());

ts = new TestScheduler(1, TimeUnit.HOURS);

ts.createWorker().schedule(r);
ts.createWorker().schedule(r, 1, TimeUnit.SECONDS);

ts.advanceTimeBy(1, TimeUnit.SECONDS);

assertEquals(4, run.get());
assertEquals(2, counter.get());
} finally {
RxJavaPlugins.setScheduleHandler(null);
}
}

@Test
public void disposeWork() {
AtomicInteger run = new AtomicInteger();
Runnable r = () -> run.getAndIncrement();
TestScheduler ts = new TestScheduler(1, TimeUnit.HOURS, true);

Disposable d = ts.createWorker().schedule(r);

assertFalse(d.isDisposed());

d.dispose();

assertTrue(d.isDisposed());

d.dispose();

assertTrue(d.isDisposed());

ts.advanceTimeBy(1, TimeUnit.SECONDS);

assertEquals(0, run.get());
}
}