Skip to content

Commit 7ced474

Browse files
authored
Merge pull request #6 from kpp/stream_unfold
Implement stream::unfold
2 parents 1fa72b1 + 1606f3c commit 7ced474

File tree

2 files changed

+57
-12
lines changed

2 files changed

+57
-12
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,14 @@ Stream
4444
- [x] stream::iter
4545
- [x] stream::map
4646
- [x] stream::next
47+
- [ ] stream::poll_fn
4748
- [x] stream::repeat
4849
- [x] stream::skip
4950
- [x] stream::skip_while
5051
- [x] stream::take
5152
- [x] stream::take_while
5253
- [x] stream::then
53-
- [ ] stream::unfold
54+
- [x] stream::unfold
5455
- [x] stream::zip
5556

5657

src/stream.rs

Lines changed: 55 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ pub fn map<St, U, F>(stream: St, f: F) -> impl Stream<Item = U>
3131
F: FnMut(St::Item) -> U,
3232
{
3333
let stream = Box::pin(stream);
34-
futures::stream::unfold((stream, f), async move | (mut stream, mut f)| {
34+
unfold((stream, f), async move | (mut stream, mut f)| {
3535
let item = next(&mut stream).await;
3636
item.map(|item| (f(item), (stream, f)))
3737
})
@@ -43,7 +43,7 @@ pub fn filter<St, Fut, F>(stream: St, f: F) -> impl Stream<Item = St::Item>
4343
Fut: Future<Output = bool>
4444
{
4545
let stream = Box::pin(stream);
46-
futures::stream::unfold((stream, f), async move | (mut stream, mut f)| {
46+
unfold((stream, f), async move | (mut stream, mut f)| {
4747
while let Some(item) = next(&mut stream).await {
4848
let matched = f(&item).await;
4949
if matched {
@@ -62,7 +62,7 @@ pub fn filter_map<St, Fut, F, U>(stream: St, f: F) -> impl Stream<Item = U>
6262
Fut: Future<Output = Option<U>>
6363
{
6464
let stream = Box::pin(stream);
65-
futures::stream::unfold((stream, f), async move | (mut stream, mut f)| {
65+
unfold((stream, f), async move | (mut stream, mut f)| {
6666
while let Some(item) = next(&mut stream).await {
6767
if let Some(item) = f(item).await {
6868
return Some((item, (stream, f)))
@@ -122,7 +122,7 @@ pub fn take<St>(stream: St, n: u64) -> impl Stream<Item = St::Item>
122122
where St: Stream,
123123
{
124124
let stream = Box::pin(stream);
125-
futures::stream::unfold((stream, n), async move | (mut stream, n)| {
125+
unfold((stream, n), async move | (mut stream, n)| {
126126
if n == 0 {
127127
None
128128
} else {
@@ -149,7 +149,7 @@ pub fn flatten<St, SubSt, T>(stream: St) -> impl Stream<Item = T>
149149
St: Stream<Item = SubSt>,
150150
{
151151
let stream = Box::pin(stream);
152-
futures::stream::unfold((Some(stream), None), async move | (mut state_stream, mut state_substream)| {
152+
unfold((Some(stream), None), async move | (mut state_stream, mut state_substream)| {
153153
loop {
154154
if let Some(mut substream) = state_substream.take() {
155155
if let Some(item) = next(&mut substream).await {
@@ -177,7 +177,7 @@ pub fn then<St, F, Fut>(stream: St, f: F) -> impl Stream<Item = St::Item>
177177
Fut: Future<Output = St::Item>
178178
{
179179
let stream = Box::pin(stream);
180-
futures::stream::unfold((stream, f), async move | (mut stream, mut f)| {
180+
unfold((stream, f), async move | (mut stream, mut f)| {
181181
let item = next(&mut stream).await;
182182
if let Some(item) = item {
183183
let new_item = f(item).await;
@@ -192,7 +192,7 @@ pub fn skip<St>(stream: St, n: u64) -> impl Stream<Item = St::Item>
192192
where St: Stream,
193193
{
194194
let stream = Box::pin(stream);
195-
futures::stream::unfold((stream, n), async move | (mut stream, mut n)| {
195+
unfold((stream, n), async move | (mut stream, mut n)| {
196196
while n != 0 {
197197
if let Some(_) = next(&mut stream).await {
198198
n = n - 1;
@@ -215,7 +215,7 @@ pub fn zip<St1, St2>(stream: St1, other: St2) -> impl Stream<Item = (St1::Item,
215215
{
216216
let stream = Box::pin(stream);
217217
let other = Box::pin(other);
218-
futures::stream::unfold((stream, other), async move | (mut stream, mut other)| {
218+
unfold((stream, other), async move | (mut stream, mut other)| {
219219
let left = next(&mut stream).await;
220220
let right = next(&mut other).await;
221221
match (left, right) {
@@ -231,7 +231,7 @@ pub fn chain<St>(stream: St, other: St) -> impl Stream<Item = St::Item>
231231
let stream = Box::pin(stream);
232232
let other = Box::pin(other);
233233
let start_with_first = true;
234-
futures::stream::unfold((stream, other, start_with_first), async move | (mut stream, mut other, start_with_first)| {
234+
unfold((stream, other, start_with_first), async move | (mut stream, mut other, start_with_first)| {
235235
if start_with_first {
236236
if let Some(item) = next(&mut stream).await {
237237
return Some((item, (stream, other, start_with_first)))
@@ -251,7 +251,7 @@ pub fn take_while<St, F, Fut>(stream: St, f: F) -> impl Stream<Item = St::Item>
251251
Fut: Future<Output = bool>,
252252
{
253253
let stream = Box::pin(stream);
254-
futures::stream::unfold((stream, f), async move | (mut stream, mut f)| {
254+
unfold((stream, f), async move | (mut stream, mut f)| {
255255
if let Some(item) = next(&mut stream).await {
256256
if f(&item).await {
257257
Some((item, (stream, f)))
@@ -271,7 +271,7 @@ pub fn skip_while<St, F, Fut>(stream: St, f: F) -> impl Stream<Item = St::Item>
271271
{
272272
let stream = Box::pin(stream);
273273
let should_skip = true;
274-
futures::stream::unfold((stream, f, should_skip), async move | (mut stream, mut f, should_skip)| {
274+
unfold((stream, f, should_skip), async move | (mut stream, mut f, should_skip)| {
275275
while should_skip {
276276
if let Some(item) = next(&mut stream).await {
277277
if f(&item).await {
@@ -305,6 +305,36 @@ pub async fn fold<St, T, F, Fut>(stream: St, init: T, f: F) -> T
305305
acc
306306
}
307307

308+
pub fn unfold<T, F, Fut, It>(init: T, mut f: F) -> impl Stream<Item = It>
309+
where F: FnMut(T) -> Fut,
310+
Fut: Future<Output = Option<(It, T)>>,
311+
{
312+
use core::task::Poll;
313+
enum State<T, Fut> {
314+
Paused(T),
315+
Running(Pin<Box<Fut>>),
316+
}
317+
let mut state = Some(State::Paused(init));
318+
futures::stream::poll_fn(move|waker| -> Poll<Option<It>> {
319+
let mut future = match state.take() {
320+
Some(State::Running(fut)) => fut,
321+
Some(State::Paused(st)) => Box::pin(f(st)),
322+
None => panic!("this stream must not be polled any more"),
323+
};
324+
match future.as_mut().poll(waker) {
325+
Poll::Pending => {
326+
state = Some(State::Running(future));
327+
Poll::Pending
328+
},
329+
Poll::Ready(None) => Poll::Ready(None),
330+
Poll::Ready(Some((item, new_state))) => {
331+
state = Some(State::Paused(new_state));
332+
Poll::Ready(Some(item))
333+
},
334+
}
335+
})
336+
}
337+
308338
#[cfg(test)]
309339
mod tests {
310340
use futures::executor;
@@ -487,4 +517,18 @@ mod tests {
487517

488518
assert_eq!(15, executor::block_on(sum));
489519
}
520+
521+
#[test]
522+
fn test_unfold() {
523+
let stream = unfold(0, |state| {
524+
if state <= 2 {
525+
let next_state = state + 1;
526+
let yielded = state * 2;
527+
ready(Some((yielded, next_state)))
528+
} else {
529+
ready(None)
530+
}
531+
});
532+
assert_eq!(vec![0, 2, 4], executor::block_on(collect::<_, Vec<_>>(stream)));
533+
}
490534
}

0 commit comments

Comments
 (0)