diff --git a/Control/Concurrent/Async/Internal.hs b/Control/Concurrent/Async/Internal.hs index e099fd9..3048164 100644 --- a/Control/Concurrent/Async/Internal.hs +++ b/Control/Concurrent/Async/Internal.hs @@ -1,5 +1,5 @@ {-# LANGUAGE CPP, MagicHash, UnboxedTuples, RankNTypes, - ExistentialQuantification #-} + ExistentialQuantification, ConstraintKinds, KindSignatures #-} #if __GLASGOW_HASKELL__ >= 701 {-# LANGUAGE Trustworthy #-} #endif @@ -55,7 +55,19 @@ import Data.IORef import GHC.Exts import GHC.IO hiding (finally, onException) -import GHC.Conc (ThreadId(..)) +import GHC.Conc (ThreadId(..), labelThread) + +#ifdef DEBUG_AUTO_LABEL +import qualified GHC.Stack +#else +import qualified GHC.Exts +#endif + +#ifdef DEBUG_AUTO_LABEL +type DebugCallStack = GHC.Stack.HasCallStack +#else +type DebugCallStack = () :: GHC.Exts.Constraint +#endif -- ----------------------------------------------------------------------------- -- STM Async API @@ -95,40 +107,53 @@ compareAsyncs (Async t1 _) (Async t2 _) = compare t1 t2 -- (see module-level documentation for details). -- -- __Use 'withAsync' style functions wherever you can instead!__ -async :: IO a -> IO (Async a) +async :: + DebugCallStack => + IO a -> IO (Async a) async = inline asyncUsing rawForkIO -- | Like 'async' but using 'forkOS' internally. -asyncBound :: IO a -> IO (Async a) +asyncBound :: + DebugCallStack => + IO a -> IO (Async a) asyncBound = asyncUsing forkOS -- | Like 'async' but using 'forkOn' internally. -asyncOn :: Int -> IO a -> IO (Async a) +asyncOn :: + DebugCallStack => + Int -> IO a -> IO (Async a) asyncOn = asyncUsing . rawForkOn -- | Like 'async' but using 'forkIOWithUnmask' internally. The child -- thread is passed a function that can be used to unmask asynchronous -- exceptions. -asyncWithUnmask :: ((forall b . IO b -> IO b) -> IO a) -> IO (Async a) +asyncWithUnmask :: + DebugCallStack => + ((forall b . IO b -> IO b) -> IO a) -> IO (Async a) asyncWithUnmask actionWith = asyncUsing rawForkIO (actionWith unsafeUnmask) -- | Like 'asyncOn' but using 'forkOnWithUnmask' internally. The -- child thread is passed a function that can be used to unmask -- asynchronous exceptions. -asyncOnWithUnmask :: Int -> ((forall b . IO b -> IO b) -> IO a) -> IO (Async a) +asyncOnWithUnmask :: + DebugCallStack => + Int -> ((forall b . IO b -> IO b) -> IO a) -> IO (Async a) asyncOnWithUnmask cpu actionWith = asyncUsing (rawForkOn cpu) (actionWith unsafeUnmask) -asyncUsing :: (IO () -> IO ThreadId) - -> IO a -> IO (Async a) +asyncUsing :: + DebugCallStack => + (IO () -> IO ThreadId) -> IO a -> IO (Async a) asyncUsing doFork = \action -> do var <- newEmptyTMVarIO + let action_plus = debugLabelMe >> action -- t <- forkFinally action (\r -> atomically $ putTMVar var r) -- slightly faster: t <- mask $ \restore -> - doFork $ try (restore action) >>= atomically . putTMVar var + doFork $ try (restore action_plus) >>= atomically . putTMVar var return (Async t (readTMVar var)) + -- | Spawn an asynchronous action in a separate thread, and pass its -- @Async@ handle to the supplied function. When the function returns -- or throws an exception, 'uninterruptibleCancel' is called on the @Async@. @@ -144,41 +169,51 @@ asyncUsing doFork = \action -> do -- to `withAsync` returns, so nesting many `withAsync` calls requires -- linear memory. -- -withAsync :: IO a -> (Async a -> IO b) -> IO b +withAsync :: + DebugCallStack => + IO a -> (Async a -> IO b) -> IO b withAsync = inline withAsyncUsing rawForkIO -- | Like 'withAsync' but uses 'forkOS' internally. -withAsyncBound :: IO a -> (Async a -> IO b) -> IO b +withAsyncBound :: + DebugCallStack => + IO a -> (Async a -> IO b) -> IO b withAsyncBound = withAsyncUsing forkOS -- | Like 'withAsync' but uses 'forkOn' internally. -withAsyncOn :: Int -> IO a -> (Async a -> IO b) -> IO b +withAsyncOn :: + DebugCallStack => + Int -> IO a -> (Async a -> IO b) -> IO b withAsyncOn = withAsyncUsing . rawForkOn -- | Like 'withAsync' but uses 'forkIOWithUnmask' internally. The -- child thread is passed a function that can be used to unmask -- asynchronous exceptions. -withAsyncWithUnmask - :: ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b +withAsyncWithUnmask :: + DebugCallStack => + ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b withAsyncWithUnmask actionWith = withAsyncUsing rawForkIO (actionWith unsafeUnmask) -- | Like 'withAsyncOn' but uses 'forkOnWithUnmask' internally. The -- child thread is passed a function that can be used to unmask -- asynchronous exceptions -withAsyncOnWithUnmask - :: Int -> ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b +withAsyncOnWithUnmask :: + DebugCallStack => + Int -> ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b withAsyncOnWithUnmask cpu actionWith = withAsyncUsing (rawForkOn cpu) (actionWith unsafeUnmask) -withAsyncUsing :: (IO () -> IO ThreadId) - -> IO a -> (Async a -> IO b) -> IO b +withAsyncUsing :: + DebugCallStack => + (IO () -> IO ThreadId) -> IO a -> (Async a -> IO b) -> IO b -- The bracket version works, but is slow. We can do better by -- hand-coding it: withAsyncUsing doFork = \action inner -> do var <- newEmptyTMVarIO mask $ \restore -> do - t <- doFork $ try (restore action) >>= atomically . putTMVar var + let action_plus = debugLabelMe >> action + t <- doFork $ try (restore action_plus) >>= atomically . putTMVar var let a = Async t (readTMVar var) r <- restore (inner a) `catchAll` \e -> do uninterruptibleCancel a @@ -504,6 +539,7 @@ linkOnly linkOnly shouldThrow a = do me <- myThreadId void $ forkRepeat $ do + myThreadId >>= flip labelThread ("linkOnly " ++ show (asyncThreadId a) ++ " -> " ++ show me) r <- waitCatch a case r of Left e | shouldThrow e -> throwTo me (ExceptionInLinkedThread a e) @@ -554,11 +590,15 @@ isCancel e -- > withAsync right $ \b -> -- > waitEither a b -- -race :: IO a -> IO b -> IO (Either a b) +race :: + DebugCallStack => + IO a -> IO b -> IO (Either a b) -- | Like 'race', but the result is ignored. -- -race_ :: IO a -> IO b -> IO () +race_ :: + DebugCallStack => + IO a -> IO b -> IO () -- | Run two @IO@ actions concurrently, and return both results. If @@ -570,19 +610,25 @@ race_ :: IO a -> IO b -> IO () -- > withAsync left $ \a -> -- > withAsync right $ \b -> -- > waitBoth a b -concurrently :: IO a -> IO b -> IO (a,b) +concurrently :: + DebugCallStack => + IO a -> IO b -> IO (a,b) -- | Run two @IO@ actions concurrently. If both of them end with @Right@, -- return both results. If one of then ends with @Left@, interrupt the other -- action and return the @Left@. -- -concurrentlyE :: IO (Either e a) -> IO (Either e b) -> IO (Either e (a, b)) +concurrentlyE :: + DebugCallStack => + IO (Either e a) -> IO (Either e b) -> IO (Either e (a, b)) -- | 'concurrently', but ignore the result values -- -- @since 2.1.1 -concurrently_ :: IO a -> IO b -> IO () +concurrently_ :: + DebugCallStack => + IO a -> IO b -> IO () #define USE_ASYNC_VERSIONS 0 @@ -643,9 +689,11 @@ concurrentlyE left right = concurrently' left right (collect []) Left ex -> throwIO ex Right r -> collect (r:xs) m -concurrently' :: IO a -> IO b - -> (IO (Either SomeException (Either a b)) -> IO r) - -> IO r +concurrently' :: + DebugCallStack => + IO a -> IO b + -> (IO (Either SomeException (Either a b)) -> IO r) + -> IO r concurrently' left right collect = do done <- newEmptyMVar mask $ \restore -> do @@ -684,6 +732,7 @@ concurrently' left right collect = do -- putMVar. when (count' > 0) $ void $ forkIO $ do + myThreadId >>= flip labelThread "concurrent stop" throwTo rid AsyncCancelled throwTo lid AsyncCancelled -- ensure the children are really dead @@ -721,7 +770,9 @@ concurrently_ left right = concurrently' left right (collect 0) -- for each element of the @Traversable@, so running this on large -- inputs without care may lead to resource exhaustion (of memory, -- file descriptors, or other limited resources). -mapConcurrently :: Traversable t => (a -> IO b) -> t a -> IO (t b) +mapConcurrently :: + DebugCallStack => + Traversable t => (a -> IO b) -> t a -> IO (t b) mapConcurrently f = runConcurrently . traverse (Concurrently . f) -- | `forConcurrently` is `mapConcurrently` with its arguments flipped @@ -729,29 +780,39 @@ mapConcurrently f = runConcurrently . traverse (Concurrently . f) -- > pages <- forConcurrently ["url1", "url2", "url3"] $ \url -> getURL url -- -- @since 2.1.0 -forConcurrently :: Traversable t => t a -> (a -> IO b) -> IO (t b) +forConcurrently :: + DebugCallStack => + Traversable t => t a -> (a -> IO b) -> IO (t b) forConcurrently = flip mapConcurrently -- | `mapConcurrently_` is `mapConcurrently` with the return value discarded; -- a concurrent equivalent of 'mapM_'. -mapConcurrently_ :: F.Foldable f => (a -> IO b) -> f a -> IO () +mapConcurrently_ :: + DebugCallStack => + F.Foldable f => (a -> IO b) -> f a -> IO () mapConcurrently_ f = runConcurrently . F.foldMap (Concurrently . void . f) -- | `forConcurrently_` is `forConcurrently` with the return value discarded; -- a concurrent equivalent of 'forM_'. -forConcurrently_ :: F.Foldable f => f a -> (a -> IO b) -> IO () +forConcurrently_ :: + DebugCallStack => + F.Foldable f => f a -> (a -> IO b) -> IO () forConcurrently_ = flip mapConcurrently_ -- | Perform the action in the given number of threads. -- -- @since 2.1.1 -replicateConcurrently :: Int -> IO a -> IO [a] +replicateConcurrently :: + DebugCallStack => + Int -> IO a -> IO [a] replicateConcurrently cnt = runConcurrently . sequenceA . replicate cnt . Concurrently -- | Same as 'replicateConcurrently', but ignore the results. -- -- @since 2.1.1 -replicateConcurrently_ :: Int -> IO a -> IO () +replicateConcurrently_ :: + DebugCallStack => + Int -> IO a -> IO () replicateConcurrently_ cnt = runConcurrently . F.fold . replicate cnt . Concurrently . void -- ----------------------------------------------------------------------------- @@ -845,14 +906,16 @@ instance (Semigroup a, Monoid a) => Monoid (ConcurrentlyE e a) where -- | Fork a thread that runs the supplied action, and if it raises an -- exception, re-runs the action. The thread terminates only when the -- action runs to completion without raising an exception. -forkRepeat :: IO a -> IO ThreadId +forkRepeat :: + DebugCallStack => + IO a -> IO ThreadId forkRepeat action = mask $ \restore -> let go = do r <- tryAll (restore action) case r of Left _ -> go _ -> return () - in forkIO go + in forkIO (debugLabelMe >> go) catchAll :: IO a -> (SomeException -> IO a) -> IO a catchAll = catch @@ -864,11 +927,29 @@ tryAll = try -- handler: saves a bit of time when we will be installing our own -- exception handler. {-# INLINE rawForkIO #-} -rawForkIO :: IO () -> IO ThreadId -rawForkIO (IO action) = IO $ \ s -> - case (fork# action s) of (# s1, tid #) -> (# s1, ThreadId tid #) +rawForkIO :: + DebugCallStack => + IO () -> IO ThreadId +rawForkIO action = IO $ \ s -> + case (fork# action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #) + where + (IO action_plus) = debugLabelMe >> action {-# INLINE rawForkOn #-} -rawForkOn :: Int -> IO () -> IO ThreadId -rawForkOn (I# cpu) (IO action) = IO $ \ s -> - case (forkOn# cpu action s) of (# s1, tid #) -> (# s1, ThreadId tid #) +rawForkOn :: + DebugCallStack => + Int -> IO () -> IO ThreadId +rawForkOn (I# cpu) action = IO $ \ s -> + case (forkOn# cpu action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #) + where + (IO action_plus) = debugLabelMe >> action + +debugLabelMe :: + DebugCallStack => + IO () +debugLabelMe = +#ifdef DEBUG_AUTO_LABEL + myThreadId >>= flip labelThread (GHC.Stack.prettyCallStack callStack) +#else + pure () +#endif diff --git a/async.cabal b/async.cabal index 68286ba..9d28abb 100644 --- a/async.cabal +++ b/async.cabal @@ -63,6 +63,16 @@ source-repository head type: git location: https://github.com/simonmar/async.git +flag debug-auto-label + description: + Strictly for debugging as it might have a non-negligible overhead. + + Enabling this flag will auto-label the threads spawned by @async@. Use it to + find where are unlabelled threads spawned in your program (be it your code or + dependency code). + default: False + manual: True + library default-language: Haskell2010 other-extensions: CPP, MagicHash, RankNTypes, UnboxedTuples @@ -73,6 +83,8 @@ library build-depends: base >= 4.3 && < 4.22, hashable >= 1.1.2.0 && < 1.6, stm >= 2.2 && < 2.6 + if flag(debug-auto-label) + cpp-options: -DDEBUG_AUTO_LABEL test-suite test-async default-language: Haskell2010