Skip to content

Commit 3775e19

Browse files
authored
event: add FeedOf[T] (#26310)
This PR adds a new type event.FeedOf[T], which is like event.Feed but parameterized over the channel element type. Performance is unchanged, and it still uses reflect. But unlike Feed, the generic version doesn't need to type-check interface{} arguments. All panic cases are gone from the API.
1 parent 3315bad commit 3775e19

File tree

2 files changed

+449
-0
lines changed

2 files changed

+449
-0
lines changed

event/feedof.go

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
// Copyright 2022 The go-ethereum Authors
2+
// This file is part of the go-ethereum library.
3+
//
4+
// The go-ethereum library is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Lesser General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
//
9+
// The go-ethereum library is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Lesser General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Lesser General Public License
15+
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
16+
17+
//go:build go1.18
18+
// +build go1.18
19+
20+
package event
21+
22+
import (
23+
"reflect"
24+
"sync"
25+
)
26+
27+
// FeedOf implements one-to-many subscriptions where the carrier of events is a channel.
28+
// Values sent to a Feed are delivered to all subscribed channels simultaneously.
29+
//
30+
// The zero value is ready to use.
31+
type FeedOf[T any] struct {
32+
once sync.Once // ensures that init only runs once
33+
sendLock chan struct{} // sendLock has a one-element buffer and is empty when held.It protects sendCases.
34+
removeSub chan chan<- T // interrupts Send
35+
sendCases caseList // the active set of select cases used by Send
36+
37+
// The inbox holds newly subscribed channels until they are added to sendCases.
38+
mu sync.Mutex
39+
inbox caseList
40+
}
41+
42+
func (f *FeedOf[T]) init() {
43+
f.removeSub = make(chan chan<- T)
44+
f.sendLock = make(chan struct{}, 1)
45+
f.sendLock <- struct{}{}
46+
f.sendCases = caseList{{Chan: reflect.ValueOf(f.removeSub), Dir: reflect.SelectRecv}}
47+
}
48+
49+
// Subscribe adds a channel to the feed. Future sends will be delivered on the channel
50+
// until the subscription is canceled.
51+
//
52+
// The channel should have ample buffer space to avoid blocking other subscribers. Slow
53+
// subscribers are not dropped.
54+
func (f *FeedOf[T]) Subscribe(channel chan<- T) Subscription {
55+
f.once.Do(f.init)
56+
57+
chanval := reflect.ValueOf(channel)
58+
sub := &feedOfSub[T]{feed: f, channel: channel, err: make(chan error, 1)}
59+
60+
// Add the select case to the inbox.
61+
// The next Send will add it to f.sendCases.
62+
f.mu.Lock()
63+
defer f.mu.Unlock()
64+
cas := reflect.SelectCase{Dir: reflect.SelectSend, Chan: chanval}
65+
f.inbox = append(f.inbox, cas)
66+
return sub
67+
}
68+
69+
func (f *FeedOf[T]) remove(sub *feedOfSub[T]) {
70+
// Delete from inbox first, which covers channels
71+
// that have not been added to f.sendCases yet.
72+
f.mu.Lock()
73+
index := f.inbox.find(sub.channel)
74+
if index != -1 {
75+
f.inbox = f.inbox.delete(index)
76+
f.mu.Unlock()
77+
return
78+
}
79+
f.mu.Unlock()
80+
81+
select {
82+
case f.removeSub <- sub.channel:
83+
// Send will remove the channel from f.sendCases.
84+
case <-f.sendLock:
85+
// No Send is in progress, delete the channel now that we have the send lock.
86+
f.sendCases = f.sendCases.delete(f.sendCases.find(sub.channel))
87+
f.sendLock <- struct{}{}
88+
}
89+
}
90+
91+
// Send delivers to all subscribed channels simultaneously.
92+
// It returns the number of subscribers that the value was sent to.
93+
func (f *FeedOf[T]) Send(value T) (nsent int) {
94+
rvalue := reflect.ValueOf(value)
95+
96+
f.once.Do(f.init)
97+
<-f.sendLock
98+
99+
// Add new cases from the inbox after taking the send lock.
100+
f.mu.Lock()
101+
f.sendCases = append(f.sendCases, f.inbox...)
102+
f.inbox = nil
103+
f.mu.Unlock()
104+
105+
// Set the sent value on all channels.
106+
for i := firstSubSendCase; i < len(f.sendCases); i++ {
107+
f.sendCases[i].Send = rvalue
108+
}
109+
110+
// Send until all channels except removeSub have been chosen. 'cases' tracks a prefix
111+
// of sendCases. When a send succeeds, the corresponding case moves to the end of
112+
// 'cases' and it shrinks by one element.
113+
cases := f.sendCases
114+
for {
115+
// Fast path: try sending without blocking before adding to the select set.
116+
// This should usually succeed if subscribers are fast enough and have free
117+
// buffer space.
118+
for i := firstSubSendCase; i < len(cases); i++ {
119+
if cases[i].Chan.TrySend(rvalue) {
120+
nsent++
121+
cases = cases.deactivate(i)
122+
i--
123+
}
124+
}
125+
if len(cases) == firstSubSendCase {
126+
break
127+
}
128+
// Select on all the receivers, waiting for them to unblock.
129+
chosen, recv, _ := reflect.Select(cases)
130+
if chosen == 0 /* <-f.removeSub */ {
131+
index := f.sendCases.find(recv.Interface())
132+
f.sendCases = f.sendCases.delete(index)
133+
if index >= 0 && index < len(cases) {
134+
// Shrink 'cases' too because the removed case was still active.
135+
cases = f.sendCases[:len(cases)-1]
136+
}
137+
} else {
138+
cases = cases.deactivate(chosen)
139+
nsent++
140+
}
141+
}
142+
143+
// Forget about the sent value and hand off the send lock.
144+
for i := firstSubSendCase; i < len(f.sendCases); i++ {
145+
f.sendCases[i].Send = reflect.Value{}
146+
}
147+
f.sendLock <- struct{}{}
148+
return nsent
149+
}
150+
151+
type feedOfSub[T any] struct {
152+
feed *FeedOf[T]
153+
channel chan<- T
154+
errOnce sync.Once
155+
err chan error
156+
}
157+
158+
func (sub *feedOfSub[T]) Unsubscribe() {
159+
sub.errOnce.Do(func() {
160+
sub.feed.remove(sub)
161+
close(sub.err)
162+
})
163+
}
164+
165+
func (sub *feedOfSub[T]) Err() <-chan error {
166+
return sub.err
167+
}

0 commit comments

Comments
 (0)