Skip to content

Combine, an iterator adapter which statefully maps multiple input iterations to a single output iteration #379

@ajwock

Description

@ajwock

Proposal

Problem statement

A common iteration pattern I find myself wanting an adapter for is taking several iterations from a wrapped iterator which accumulate in state shared across those iterations to produce a single element from the wrapping iterator.

Motivating examples or use cases

One of the simplest usage examples for this might be converting an iterator of characters that contains null terminated strings into an iterator which just yields the strings, which is similar to the most recent use case which motivated this: parsing the names of functions in the __llvm_prf_names section in llvm profile instrumented binaries.

This is also useful for implementing finite state machines or stack automata which yield values upon reaching certain states and then continue parsing input.

Solution sketch

This API is an addition to std::Iterator. On any iterator, calling '.combine' will wrap it in the iterator below.

This iterator takes in an arbitrary State and a FnMut taking a mutable reference to the state and an element from the iterator being wrapped and returning Option<B> where B is yielded from the iterator. This is the same lambda prototype that Scan uses, though Scan and Combine are for largely unrelated use cases. The behavioral difference is that unlike Scan, a return value of None doesn't stop iteration. Instead, it continues consuming values from the iterator it wraps until a call to F yields Some.

The idea being that many input iterations are combined to yield a single value.

into_inner yields the internal state of the iterator, which may be useful in case reaching the end of iteration might be a valid time to yield a value. If this is needed as part of the iterator, calling .finish will create a CombineFinish iterator that handles this case using a FnOnce that is called upon the Combine iterator yielding None.

Here is my napkin implementation for this, which was sufficient to try it out in a motivating use case (not shown is the trait I used to stick .combine onto all implementations of Iterator. This would just be an additional method for the Iterator trait itself in practice)

pub struct Combine<I, St, F> {
    iter: I,
    f: F,
    state: St,
}

impl<B, I, St, F> Combine<I, St, F>
where
    I: Iterator,
    F: FnMut(&mut St, I::Item) -> Option<B>
{
    pub fn new(iter: I, state: St, f: F) -> Combine<I, St, F> {
        Combine { iter, state, f }
    }

    pub fn into_inner(self) -> St {
        self.state
    }

    pub fn finish<G: FnOnce(St) -> Option<B>>(self, g: G) -> CombineFinish<I, St, F, G> {
        CombineFinish::new(self, g)
    }
}

impl<B, I, St, F> Iterator for Combine<I, St, F>
where
    I: Iterator,
    F: FnMut(&mut St, I::Item) -> Option<B>
{
    type Item = B;

    fn next(&mut self) -> Option<B> {
        while let Some(item) = self.iter.next() {
            if let Some(output) = (self.f)(&mut self.state, item) {
                return Some(output)
            }
        }
        None
    }
}

pub struct CombineFinish<I, St, F, G> {
    inner: Option<(Combine<I, St, F>, G)>,
}

impl<B, I, St, F, G> CombineFinish<I, St, F, G>
where
    I: Iterator,
    G: FnOnce(St) -> Option<B>
{
    pub fn new(iter: Combine<I, St, F>, g: G) -> CombineFinish<I, St, F, G> {
        Self { inner: Some((iter, g)) }
    }
}

impl<B, I, St, F, G> Iterator for CombineFinish<I, St, F, G>
where
    I: Iterator,
    F: FnMut(&mut St, I::Item) -> Option<B>,
    G: FnOnce(St) -> Option<B>
{
    type Item = B;

    fn next(&mut self) -> Option<B> {
        match self.inner.as_mut()?.0.next() {
            Some(output) => Some(output),
            None => {
                let (iter, g) = self.inner.take()?;
                g(iter.into_inner())
            }
        }
    }    
}

A usage example (parsing function names from a decompressed __llvm_prf_names block):

let fn_names = outbuf.drain(..).combine(Vec::new(), |v, c| {
        match c {
            0x01 => return Some(std::mem::replace(v, Vec::new())),
            b':' => v.clear(),
            x => v.push(x)
        }
        None
    }).finish(|v| Some(v)).collect::<Vec<_>>();

Function names are terminated with 0x01, and the last function name in the block is not terminated so it is yielded when there are no more bytes left in the buffer. Some function names may be prepended with their containing filename followed by a colon, and in my usage I was uninterested in keeping that filename.

Alternatives

Indeed there are a number of ways to achieve this with existing APIs, though it would either require significant additional boilerplate such as making a custom iterator wrapper. Some creative use of filter_map might get you halfway there, but it would require declaring the internal state on a separate line and passing a reference to it in a closure and the name filter_map doesn't quite communicate what you're really trying to do. There's another trouble as well- what if the end of the input iterator represents a valid state to yield a value from? Therein lies the purpose of the optional CombineFinish extension.

This is a concise and inline way to express simple state machines which convert a series of tokens into a series of larger data yielded by combining those tokens, which occurs quite commonly in practice. This is a tool I I would have reached for several times in the past if it were available.

I will admit I had a hard time deciding on a satisfying name for this, so naming suggestions are more than welcome if you disagree that 'combine' or 'finish' get the point across well.

Links and related work

Similar to Java Stream's Gatherer: Java SE 22 Gatherer
Gatherer enables Many-to-One, One-to-One, and One-to-Many stream transformations. It likewise carries internal state and has .finisher(), which is run after the input stream is ended.

Activity

kennytm

kennytm commented on May 13, 2024

@kennytm
Member

for your particular case you could simply use

let fn_names = outbuf
        .split(|c| *c == 0x01)
        .map(|b| {
            // for simplicity i'm using .rsplit_once() here, but you could also use .rsplitn(2) in stable
            b.rsplit_once(|c| *c == b':')
                .map_or(b, |(_, suffix)| suffix)
                .to_vec()
        })
        .collect::<Vec<_>>();

(this won't "consume" the outbuf, but your original code will be copying the whole vector too)

it may even be more efficient using regex::bytes.


Some creative use of filter_map might get you halfway there, but it would require declaring the internal state on a separate line and passing a reference to it in a closure and the name filter_map doesn't quite communicate what you're really trying to do.

This does not sound like a convincing argument 😅. And in general, you can scope the state using a block expression:

        .filter_map({
            let mut v = Vec::new();
            move |c| {
                match c {
                    0x01 => return Some(std::mem::replace(&mut v, Vec::new())),
                    b':' => v.clear(),
                    x => v.push(x),
                }
                None
            }
        })

There's another trouble as well- what if the end of the input iterator represents a valid state to yield a value from?

You can .chain the sentinel (b'\x01') before calling .filter_map().

Unfortunately you cannot .chain(once(v)) after the .filter_map because of the borrow checker.

I think that some method to expose the final state of std::iter::Scan would be very useful to address this issue, e.g.

impl<I, St, F> Scan<I, St, F> {
    /// When the source iterator was exhausted (with the scan function never returning `None`),
    /// chain an additional iterator to the end using the final state as the input
    pub fn chain_when_exhausted(self, f: impl FnOnce(St) -> impl IntoIterator<Item = I::Item>) -> impl Iterator<Item = I::Item> {
        ...
    }
}

then the original example should become

let fn_name = outbuf
    .drain(..)
    .scan(Vec::new(), |v, c| {
        match c {
                0x01 => return Some(Some(std::mem::replace(v, Vec::new()))),
                b':' => v.clear(),
                x => v.push(x)
        }
        Some(None)
    })
    .chain_when_exhausted(|v| once(Some(v)))
    .filter_map(|v| v)
    .collect::<Vec<_>>()
ajwock

ajwock commented on May 13, 2024

@ajwock
Author

Yes, the sentinel value is an option. In fact, this functionality is completely available with current iterators, even if we pretend for the purposes of discussion here that instead of vec of 'u8' we're dealing with some abstract iterator yielding 'token'. Maybe I should look through my projects and find a real example of this. Not all tokens may have a natural sentinel value, but we can give them one by wrapping them in Option:

let fn_names = outbuf
        .drain(..)
        .map(|c| Some(c))
        .chain(std::iter::once(None))
        .scan(Vec::new(), |v, c| {
            match c {
                Some(0x01) | None => return Some(Some(std::mem::replace(v, Vec::new()))),
                Some(b':') => v.clear(),
                Some(x) => v.push(x)
            }
            Some(None)
        })
        .filter_map(|v| v)
        .collect()

Although this is an alternative way to achieve what Combine does, I believe it demonstrates the value of Combine through being rather noisy.

Though adding APIs has a cost, I believe Combine pays it through serving a common use pattern that is spottily covered by the existing adapters.

the8472

the8472 commented on May 13, 2024

@the8472
Member

Links and related work

Have you taken a look at Java's Stream Collector and Gatherer APIs?

ajwock

ajwock commented on May 13, 2024

@ajwock
Author

I'm not familiar with those, but this is indeed similar to Gatherer now that I'm reading about it. Thanks, I'll put that in the proposal.

traviscross

traviscross commented on May 21, 2024

@traviscross

We discussed this in the libs-api meeting today.

The feeling is that the team would like to see this first prototyped in itertools.

We'd also be interested, @ajwock, in whether gen blocks (or gen closures) would affect your thinking about the need for this API or the design of it. The motivation you state above is:

A common iteration pattern I find myself wanting an adapter for is taking several iterations from a wrapped iterator which accumulate in state shared across those iterations to produce a single element from the wrapping iterator.

In the gen blocks RFC, we give as a motivating example run-length encoding, which similarly must accumulate across several iterations:

//@ edition: 2024
#![feature(gen_blocks)]
fn rl_encode<I: IntoIterator<Item = u8>>(
    xs: I,
) -> impl Iterator<Item = u8> {
    gen {
        let mut xs = xs.into_iter();
        let (Some(mut cur), mut n) = (xs.next(), 0) else { return };
        for x in xs {
            if x == cur && n < u8::MAX {
                n += 1;
            } else {
                yield n; yield cur;
                (cur, n) = (x, 0);
            }
        }
        yield n; yield cur;
    }.into_iter()
}

Playground link

ajwock

ajwock commented on May 21, 2024

@ajwock
Author

There's something I really like about gen for this use-case, which is that it probably optimizes better (or has more potential to do so in the future) for the case of handling the final iteration due to (I assume) running as a sort of coroutine rather than 'manually' checking whether each iteration is the last. While optimization is complicated and inlining / LTO might not immediately handle coroutines as well as simple closures, it seems promising.

I do like the iterator adapter pattern because it allows me to read and write the code from left to right and top to bottom. Perhaps when gen is stabilized, there could be an iterator adapter which takes a Generator/Coroutine where type Yield = type Return and where R, the argument to resume, is of the type returned by the wrapped iterator's next(). Though, I've read the rfc for both Generator and Coroutine and I couldn't figure out where R appears in the body of the generator closure because it was always () in the examples. In this doc https://doc.rust-lang.org/std/ops/trait.Coroutine.html arg is glossed over, so I'm curious how it fits in. I can imagine it being quite useful for the case I'm describing here.

@traviscross Can you refer me to some examples of using arg: R where it isn't ()? Is the value of arg intended to move into the closure upon resume somehow?

traviscross

traviscross commented on May 23, 2024

@traviscross

I do like the iterator adapter pattern because it allows me to read and write the code from left to right and top to bottom.

An iterator combinator would probably take as an argument a gen closure that would in turn accept the iterator as an argument. Using the function from above, that might look like this:

for x in [1u8; 513].into_iter().then(rl_encode) {
    //   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    //   ^ Produces an iterator that yields the run-length
    //     encoding of this array.
    println!("{:?}", x);
}

Playground link (with further details)

Coroutines (i.e., R != ()) are further out. Here's a very speculative comparable example using those:

Playground link

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Metadata

Metadata

Assignees

No one assigned

    Labels

    T-libs-apiapi-change-proposalA proposal to add or alter unstable APIs in the standard libraries

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

      Development

      No branches or pull requests

        Participants

        @kennytm@the8472@traviscross@ajwock

        Issue actions

          Combine, an iterator adapter which statefully maps multiple input iterations to a single output iteration · Issue #379 · rust-lang/libs-team