//! This primitive is meant to be used to run one-time initialization. An
//! example use case would be for initializing an FFI library.

-use isize;
-use sync::atomic::{AtomicIsize, Ordering};
-use sync::StaticMutex;
+// A "once" is a relatively simple primitive, and it's also typically provided
+// by the OS as well (see `pthread_once` or `InitOnceExecuteOnce`). The OS
+// primitives, however, tend to have surprising restrictions, such as the Unix
+// one not allowing an argument to be passed to the function.
+//
+// As a result, we end up implementing it ourselves in the standard library.
+// This also gives us the opportunity to optimize the implementation a bit which
+// should help the fast path on call sites. Consequently, let's explain how this
+// primitive works now!
+//
+// So to recap, the guarantees of a Once are that it will call the
+// initialization closure at most once, and it will never return until the one
+// that's running has finished running. This means that we need some form of
+// blocking here while the custom callback is running at the very least.
+// Additionally, we add on the restriction of **poisoning**. Whenever an
+// initialization closure panics, the Once enters a "poisoned" state which means
+// that all future calls will immediately panic as well.
+//
+// So to implement this, one might first reach for a `StaticMutex`, but those
+// unfortunately need to be deallocated (e.g. call `destroy()`) to free memory
+// on all OSes (some of the BSDs allocate memory for mutexes). It also gets a
+// lot harder with poisoning to figure out when the mutex needs to be
+// deallocated because it's not after the closure finishes, but after the first
+// successful closure finishes.
+//
+// All in all, this is instead implemented with atomics and lock-free
+// operations! Whee! Each `Once` has one word of atomic state, and this state is
+// CAS'd on to determine what to do. There are four possible states of a `Once`:
+//
+// * Incomplete - no initialization has run yet, and no thread is currently
+//                using the Once.
+// * Poisoned - some thread has previously attempted to initialize the Once, but
+//              it panicked, so the Once is now poisoned. There are no other
+//              threads currently accessing this Once.
+// * Running - some thread is currently attempting to run initialization. It may
+//             succeed, so all future threads need to wait for it to finish.
+//             Note that this state is accompanied with a payload, described
+//             below.
+// * Complete - initialization has completed and all future calls should finish
+//              immediately.
+//
+// With 4 states we need 2 bits to encode this, and we use the remaining bits
+// in the word we have allocated as a queue of threads waiting for the thread
+// responsible for entering the RUNNING state. This queue is just a linked list
+// of Waiter nodes which is monotonically increasing in size. Each node is
+// allocated on the stack, and whenever the running closure finishes it will
+// consume the entire queue and notify all waiters they should try again.
+//
+// You'll find a few more details in the implementation, but that's the gist of
+// it!
+
+use marker;
+use sync::atomic::{AtomicUsize, AtomicBool, Ordering};
+use thread::{self, Thread};

/// A synchronization primitive which can be used to run a one-time global
/// initialization. Useful for one-time initialization for FFI or related
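As a concrete illustration of the encoding described in the comment block above, here is a minimal, standalone sketch (not part of this patch): the low two bits of the state word carry one of the four states, and the remaining bits carry the head pointer of the waiter queue. The constant names mirror those introduced below; the waiter address is hypothetical.

```rust
// Standalone sketch of the 2-bit state + pointer packing. Assumes `Waiter`
// nodes are at least 4-byte aligned, so a node's address always has its low
// two bits clear.
const STATE_MASK: usize = 0x3;
const RUNNING: usize = 0x2;

fn encode(waiter: usize, state: usize) -> usize {
    assert!(waiter & STATE_MASK == 0); // alignment keeps the low bits free
    waiter | state
}

fn decode(word: usize) -> (usize, usize) {
    (word & !STATE_MASK, word & STATE_MASK)
}

fn main() {
    let fake_waiter_addr = 0x1000; // hypothetical, suitably aligned address
    let word = encode(fake_waiter_addr, RUNNING);
    assert_eq!(decode(word), (fake_waiter_addr, RUNNING));
}
```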
@@ -35,23 +86,62 @@ use sync::StaticMutex;
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub struct Once {
-    mutex: StaticMutex,
-    cnt: AtomicIsize,
-    lock_cnt: AtomicIsize,
+    // This `state` word is actually an encoded version of just a pointer to a
+    // `Waiter`, so we add the `PhantomData` appropriately.
+    state: AtomicUsize,
+    _marker: marker::PhantomData<*mut Waiter>,
+}
+
+// The `PhantomData` of a raw pointer removes these two auto traits, but we
+// enforce both below in the implementation so this should be safe to add.
+#[stable(feature = "rust1", since = "1.0.0")]
+unsafe impl Sync for Once {}
+#[stable(feature = "rust1", since = "1.0.0")]
+unsafe impl Send for Once {}
+
+/// State yielded to the `call_once_force` method which can be used to query
+/// whether the `Once` was previously poisoned or not.
+#[unstable(feature = "once_poison", issue = "31688")]
+pub struct OnceState {
+    poisoned: bool,
}

/// Initialization value for static `Once` values.
#[stable(feature = "rust1", since = "1.0.0")]
pub const ONCE_INIT: Once = Once::new();

+// Four states that a Once can be in, encoded into the lower bits of `state` in
+// the Once structure.
+const INCOMPLETE: usize = 0x0;
+const POISONED: usize = 0x1;
+const RUNNING: usize = 0x2;
+const COMPLETE: usize = 0x3;
+
+// Mask to learn about the state. All other bits are the queue of waiters if
+// this is in the RUNNING state.
+const STATE_MASK: usize = 0x3;
+
+// Representation of a node in the linked list of waiters in the RUNNING state.
+struct Waiter {
+    thread: Option<Thread>,
+    signaled: AtomicBool,
+    next: *mut Waiter,
+}
+
+// Helper struct used to clean up after a closure call with a `Drop`
+// implementation to also run on panic.
+struct Finish {
+    panicked: bool,
+    me: &'static Once,
+}
+
impl Once {
    /// Creates a new `Once` value.
    #[stable(feature = "once_new", since = "1.2.0")]
    pub const fn new() -> Once {
        Once {
-            mutex: StaticMutex::new(),
-            cnt: AtomicIsize::new(0),
-            lock_cnt: AtomicIsize::new(0),
+            state: AtomicUsize::new(INCOMPLETE),
+            _marker: marker::PhantomData,
        }
    }

@@ -68,73 +158,223 @@ impl Once {
    /// be reliably observed by other threads at this point (there is a
    /// happens-before relation between the closure and code executing after the
    /// return).
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use std::sync::{Once, ONCE_INIT};
+    ///
+    /// static mut VAL: usize = 0;
+    /// static INIT: Once = ONCE_INIT;
+    ///
+    /// // Accessing a `static mut` is unsafe much of the time, but if we do so
+    /// // in a synchronized fashion (e.g. write once or read all) then we're
+    /// // good to go!
+    /// //
+    /// // This function will only call `expensive_computation` once, and will
+    /// // otherwise always return the value returned from the first invocation.
+    /// fn get_cached_val() -> usize {
+    ///     unsafe {
+    ///         INIT.call_once(|| {
+    ///             VAL = expensive_computation();
+    ///         });
+    ///         VAL
+    ///     }
+    /// }
+    ///
+    /// fn expensive_computation() -> usize {
+    ///     // ...
+    /// # 2
+    /// }
+    /// ```
+    ///
+    /// # Panics
+    ///
+    /// The closure `f` will only be executed once if this is called
+    /// concurrently amongst many threads. If that closure panics, however, then
+    /// it will *poison* this `Once` instance, causing all future invocations of
+    /// `call_once` to also panic.
+    ///
+    /// This is similar to [poisoning with mutexes][poison].
+    ///
+    /// [poison]: struct.Mutex.html#poisoning
    #[stable(feature = "rust1", since = "1.0.0")]
    pub fn call_once<F>(&'static self, f: F) where F: FnOnce() {
-        // Optimize common path: load is much cheaper than fetch_add.
-        if self.cnt.load(Ordering::SeqCst) < 0 {
+        // Fast path, just see if we've completed initialization.
+        if self.state.load(Ordering::SeqCst) == COMPLETE {
            return
        }

-        // Implementation-wise, this would seem like a fairly trivial primitive.
-        // The stickler part is where our mutexes currently require an
-        // allocation, and usage of a `Once` shouldn't leak this allocation.
-        //
-        // This means that there must be a deterministic destroyer of the mutex
-        // contained within (because it's not needed after the initialization
-        // has run).
-        //
-        // The general scheme here is to gate all future threads once
-        // initialization has completed with a "very negative" count, and to
-        // allow through threads to lock the mutex if they see a non negative
-        // count. For all threads grabbing the mutex, exactly one of them should
-        // be responsible for unlocking the mutex, and this should only be done
-        // once everyone else is done with the mutex.
-        //
-        // This atomicity is achieved by swapping a very negative value into the
-        // shared count when the initialization routine has completed. This will
-        // read the number of threads which will at some point attempt to
-        // acquire the mutex. This count is then squirreled away in a separate
-        // variable, and the last person on the way out of the mutex is then
-        // responsible for destroying the mutex.
-        //
-        // It is crucial that the negative value is swapped in *after* the
-        // initialization routine has completed because otherwise new threads
-        // calling `call_once` will return immediately before the initialization
-        // has completed.
-
-        let prev = self.cnt.fetch_add(1, Ordering::SeqCst);
-        if prev < 0 {
-            // Make sure we never overflow, we'll never have isize::MIN
-            // simultaneous calls to `call_once` to make this value go back to 0
-            self.cnt.store(isize::MIN, Ordering::SeqCst);
+        let mut f = Some(f);
+        self.call_inner(false, &mut |_| f.take().unwrap()());
+    }
+
+    /// Performs the same function as `call_once` except ignores poisoning.
+    ///
+    /// If this `Once` has been poisoned (some initialization panicked) then
+    /// this function will continue to attempt to call initialization functions
+    /// until one of them doesn't panic.
+    ///
+    /// The closure `f` is yielded a structure which can be used to query the
+    /// state of this `Once` (whether initialization has previously panicked or
+    /// not).
+    #[unstable(feature = "once_poison", issue = "31688")]
+    pub fn call_once_force<F>(&'static self, f: F) where F: FnOnce(&OnceState) {
+        // same as above, just with a different parameter to `call_inner`.
+        if self.state.load(Ordering::SeqCst) == COMPLETE {
            return
        }

-        // If the count is negative, then someone else finished the job,
-        // otherwise we run the job and record how many people will try to grab
-        // this lock
-        let guard = self.mutex.lock();
-        if self.cnt.load(Ordering::SeqCst) > 0 {
-            f();
-            let prev = self.cnt.swap(isize::MIN, Ordering::SeqCst);
-            self.lock_cnt.store(prev, Ordering::SeqCst);
+        let mut f = Some(f);
+        self.call_inner(true, &mut |p| {
+            f.take().unwrap()(&OnceState { poisoned: p })
+        });
+    }
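For illustration, here is a sketch of how a caller might use `call_once_force` to recover from poisoning. This is not part of the patch: it needs the unstable `once_poison` feature, and `try_init` is a hypothetical helper.

```rust
#![feature(once_poison)] // unstable at the time of this commit

use std::sync::{Once, ONCE_INIT};

static INIT: Once = ONCE_INIT;

// Hypothetical helper, sketch only: retry initialization even if a previous
// attempt panicked and poisoned the Once.
fn try_init() {
    INIT.call_once_force(|state| {
        if state.poisoned() {
            // an earlier initializer panicked; clean up before retrying
        }
        // ... perform the actual initialization here ...
    });
}

fn main() {
    try_init();
}
```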
+
+    // This is a non-generic function to reduce the monomorphization cost of
+    // using `call_once` (this isn't exactly a trivial or small implementation).
+    //
+    // Additionally, this is tagged with `#[cold]` as it should indeed be cold
+    // and it helps let LLVM know that calls to this function should be off the
+    // fast path. Essentially, this should help generate more straight line code
+    // in LLVM.
+    //
+    // Finally, this takes an `FnMut` instead of an `FnOnce` because there's
+    // currently no way to take an `FnOnce` and call it via virtual dispatch
+    // without some allocation overhead.
+    #[cold]
+    fn call_inner(&'static self,
+                  ignore_poisoning: bool,
+                  mut init: &mut FnMut(bool)) {
+        let mut state = self.state.load(Ordering::SeqCst);
+
+        'outer: loop {
+            match state {
+                // If we're complete, then there's nothing to do, we just
+                // jettison out as we shouldn't run the closure.
+                COMPLETE => return,
+
+                // If we're poisoned and we're not in a mode to ignore
+                // poisoning, then we panic here to propagate the poison.
+                POISONED if !ignore_poisoning => {
+                    panic!("Once instance has previously been poisoned");
+                }
+
+                // Otherwise if we see a poisoned or otherwise incomplete state
+                // we will attempt to move ourselves into the RUNNING state. If
+                // we succeed, then the queue of waiters starts at null (all 0
+                // bits).
+                POISONED |
+                INCOMPLETE => {
+                    let old = self.state.compare_and_swap(state, RUNNING,
+                                                          Ordering::SeqCst);
+                    if old != state {
+                        state = old;
+                        continue
+                    }
+
+                    // Run the initialization routine, letting it know if we're
+                    // poisoned or not. The `Finish` struct is then dropped, and
+                    // the `Drop` implementation here is responsible for waking
+                    // up other waiters both in the normal return and panicking
+                    // case.
+                    let mut complete = Finish {
+                        panicked: true,
+                        me: self,
+                    };
+                    init(state == POISONED);
+                    complete.panicked = false;
+                    return
+                }
+
+                // All other values we find should correspond to the RUNNING
+                // state with an encoded waiter list in the more significant
+                // bits. We attempt to enqueue ourselves by moving us to the
+                // head of the list and bail out if we ever see a state that's
+                // not RUNNING.
+                _ => {
+                    assert!(state & STATE_MASK == RUNNING);
+                    let mut node = Waiter {
+                        thread: Some(thread::current()),
+                        signaled: AtomicBool::new(false),
+                        next: 0 as *mut Waiter,
+                    };
+                    let me = &mut node as *mut Waiter as usize;
+                    assert!(me & STATE_MASK == 0);
+
+                    while state & STATE_MASK == RUNNING {
+                        node.next = (state & !STATE_MASK) as *mut Waiter;
+                        let old = self.state.compare_and_swap(state,
+                                                              me | RUNNING,
+                                                              Ordering::SeqCst);
+                        if old != state {
+                            state = old;
+                            continue
+                        }
+
+                        // Once we've enqueued ourselves, wait in a loop.
+                        // Afterwards reload the state and continue with what we
+                        // were doing from before.
+                        while !node.signaled.load(Ordering::SeqCst) {
+                            thread::park();
+                        }
+                        state = self.state.load(Ordering::SeqCst);
+                        continue 'outer
+                    }
+                }
+            }
        }
-        drop(guard);
+    }
+}
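The comment on `call_inner` describes a common code-size trick. The standalone sketch below (not part of this patch) shows its shape: each generic instantiation of the shim stays tiny, stashing the `FnOnce` in an `Option` so it can be invoked through a `&mut FnMut` trait object, while the non-generic body is compiled only once and shared.

```rust
// Sketch of the generic-shim / non-generic-body pattern used by `call_once`.
fn call<F: FnOnce()>(f: F) {
    // Wrap the FnOnce in an Option so a reusable FnMut can consume it once.
    let mut f = Some(f);
    call_inner(&mut || f.take().unwrap()());
}

#[cold]
fn call_inner(init: &mut FnMut()) {
    // all of the non-trivial logic lives here, monomorphized only once
    init();
}

fn main() {
    call(|| println!("ran exactly once through the shim"));
}
```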

-        // Last one out cleans up after everyone else, no leaks!
-        if self.lock_cnt.fetch_add(-1, Ordering::SeqCst) == 1 {
-            unsafe { self.mutex.destroy() }
+impl Drop for Finish {
+    fn drop(&mut self) {
+        // Swap out our state with however we finished. We should only ever see
+        // an old state which was RUNNING.
+        let queue = if self.panicked {
+            self.me.state.swap(POISONED, Ordering::SeqCst)
+        } else {
+            self.me.state.swap(COMPLETE, Ordering::SeqCst)
+        };
+        assert_eq!(queue & STATE_MASK, RUNNING);
+
+        // Decode the RUNNING to a list of waiters, then walk that entire list
+        // and wake them up. Note that it is crucial that after we store `true`
+        // in the node it can be free'd! As a result we load the `thread` to
+        // signal ahead of time and then unpark it after the store.
+        unsafe {
+            let mut queue = (queue & !STATE_MASK) as *mut Waiter;
+            while !queue.is_null() {
+                let next = (*queue).next;
+                let thread = (*queue).thread.take().unwrap();
+                (*queue).signaled.store(true, Ordering::SeqCst);
+                thread.unpark();
+                queue = next;
+            }
        }
    }
}
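The store-then-unpark ordering above pairs with the park loop in `call_inner`. This self-contained sketch (not from this patch, using public `std` paths rather than the internal crate paths above) shows the same handshake in isolation:

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;

fn main() {
    let signaled = Arc::new(AtomicBool::new(false));
    let flag = signaled.clone();
    let waiter = thread::current();

    let waker = thread::spawn(move || {
        // Store the flag first, then unpark: once the flag is observed, the
        // waiter may return and its node may be freed, so the waker must not
        // touch it afterwards (only the pre-captured Thread handle).
        flag.store(true, Ordering::SeqCst);
        waiter.unpark();
    });

    // park() can wake spuriously, so the flag is re-checked in a loop.
    while !signaled.load(Ordering::SeqCst) {
        thread::park();
    }
    waker.join().unwrap();
}
```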

+impl OnceState {
+    /// Returns whether the associated `Once` has been poisoned.
+    ///
+    /// Once an initialization routine for a `Once` has panicked it will forever
+    /// indicate to future forced initialization routines that it is poisoned.
+    #[unstable(feature = "once_poison", issue = "31688")]
+    pub fn poisoned(&self) -> bool {
+        self.poisoned
+    }
+}
+
#[cfg(test)]
mod tests {
    use prelude::v1::*;

+    use panic;
+    use sync::mpsc::channel;
    use thread;
    use super::Once;
-    use sync::mpsc::channel;

    #[test]
    fn smoke_once() {
@@ -179,4 +419,71 @@ mod tests {
            rx.recv().unwrap();
        }
    }
+
+    #[test]
+    fn poison_bad() {
+        static O: Once = Once::new();
+
+        // poison the once
+        let t = panic::recover(|| {
+            O.call_once(|| panic!());
+        });
+        assert!(t.is_err());
+
+        // poisoning propagates
+        let t = panic::recover(|| {
+            O.call_once(|| {});
+        });
+        assert!(t.is_err());
+
+        // we can subvert poisoning, however
+        let mut called = false;
+        O.call_once_force(|p| {
+            called = true;
+            assert!(p.poisoned())
+        });
+        assert!(called);
+
+        // once any success happens, we stop propagating the poison
+        O.call_once(|| {});
+    }
+
+    #[test]
+    fn wait_for_force_to_finish() {
+        static O: Once = Once::new();
+
+        // poison the once
+        let t = panic::recover(|| {
+            O.call_once(|| panic!());
+        });
+        assert!(t.is_err());
+
+        // make sure someone's waiting inside the once via a force
+        let (tx1, rx1) = channel();
+        let (tx2, rx2) = channel();
+        let t1 = thread::spawn(move || {
+            O.call_once_force(|p| {
+                assert!(p.poisoned());
+                tx1.send(()).unwrap();
+                rx2.recv().unwrap();
+            });
+        });
+
+        rx1.recv().unwrap();
+
+        // put another waiter on the once
+        let t2 = thread::spawn(|| {
+            let mut called = false;
+            O.call_once(|| {
+                called = true;
+            });
+            assert!(!called);
+        });
+
+        tx2.send(()).unwrap();
+
+        assert!(t1.join().is_ok());
+        assert!(t2.join().is_ok());
+    }
}