Skip to content

Observable-aware version of doOnNext for handling asynchronous side-effects #3989

Closed
@tednaleid

Description

@tednaleid

doOnNext is useful for doing side-effects in a synchronous manner, but sometimes I want to perform a side-effect on my stream using a function that returns an Observable. If the side-effect fails, I want to be notified of that downstream, but if it succeeds, I want the stream to continue with the original object.

The use-case that I've got on my current project is doing an asynchronous save of an object followed by publishing a notification of the updated object on one or more message queue topics. I want the saved object to be returned to callers of the save method. Currently, I need to do something nested like this (groovy code):

public Observable<Widget> saveAndPublish(Widget widget) {
    return save(widget)
        .flatMap { Widget savedWidget ->
            return publishNotification(savedWidget)
                .map { NotificationResult ignore -> savedWidget }
        }
}

public Observable<Widget> save(Widget widget) { ... }
public Observable<NotificationResult> publishNotification(Widget widget) { ... }

And that code works if there is only one notification returned per Widget, otherwise it'd emit savedWidget once for every notification in the stream.

If I need to ignore multiple notifications, I need to do:

public Observable<Widget> saveAndPublish(Widget widget) {
    return save(widget)
        .flatMap { Widget savedWidget ->
            return publishNotification(savedWidget)
                    .ignoreElements()
                    .cast(Widget)
                    .defaultIfEmpty(savedWidget)
        }
}

If the notifications had been synchronous, I could have used doOnNext:

public Observable<Widget> saveAndPublish(Widget widget) {
    return save(widget)
        .doOnNext(this::publishNotification);
}

But that doesn't work because the notifications return an Observable.

This pattern has been common enough in my code that I created a groovy extension jar that has a method in it called flatTap that does asynchronous side-effects without changing the stream. It lets me change my code from the above to:

public Observable<Widget> saveAndPublish(Widget widget) {
    return save(widget)
        .flatTap(this::publishNotification);
}

This feels like a gap to me in RxJava. I've got map for synchronously changing the stream, and flatMap for asynchronously changing it. I've got doOnNext for synchronous side effects that don't modify the stream, but no equivalent to flatMap for doing asynchronous side effects.

If there are others that would find this useful, I'd be willing to work on putting together a PR to integrate this into RxJava.

I'm also open to changing the naming of it. In my jar, I aliased doOnNext to tap (which is a method in Ruby which does the same thing) and called my asynch method flatTap as I liked the symmetry of:

map : flatMap :: tap : flatTap

but I can see how that might not fit within the existing conventions of Rx.

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions