@@ -120,6 +120,9 @@ struct io_wq {
120120 refcount_t refs ;
121121 struct completion done ;
122122
123+ atomic_t worker_refs ;
124+ struct completion worker_done ;
125+
123126 struct hlist_node cpuhp_node ;
124127
125128 pid_t task_pid ;
@@ -189,7 +192,8 @@ static void io_worker_exit(struct io_worker *worker)
189192 raw_spin_unlock_irq (& wqe -> lock );
190193
191194 kfree_rcu (worker , rcu );
192- io_wq_put (wqe -> wq );
195+ if (atomic_dec_and_test (& wqe -> wq -> worker_refs ))
196+ complete (& wqe -> wq -> worker_done );
193197}
194198
195199static inline bool io_wqe_run_queue (struct io_wqe * wqe )
@@ -648,14 +652,15 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
648652 init_completion (& worker -> ref_done );
649653 init_completion (& worker -> started );
650654
651- refcount_inc (& wq -> refs );
655+ atomic_inc (& wq -> worker_refs );
652656
653657 if (index == IO_WQ_ACCT_BOUND )
654658 pid = io_wq_fork_thread (task_thread_bound , worker );
655659 else
656660 pid = io_wq_fork_thread (task_thread_unbound , worker );
657661 if (pid < 0 ) {
658- io_wq_put (wq );
662+ if (atomic_dec_and_test (& wq -> worker_refs ))
663+ complete (& wq -> worker_done );
659664 kfree (worker );
660665 return false;
661666 }
@@ -736,6 +741,7 @@ static int io_wq_manager(void *data)
736741{
737742 struct io_wq * wq = data ;
738743 char buf [TASK_COMM_LEN ];
744+ int node ;
739745
740746 sprintf (buf , "iou-mgr-%d" , wq -> task_pid );
741747 set_task_comm (current , buf );
@@ -753,6 +759,15 @@ static int io_wq_manager(void *data)
753759 } while (!test_bit (IO_WQ_BIT_EXIT , & wq -> state ));
754760
755761 io_wq_check_workers (wq );
762+
763+ rcu_read_lock ();
764+ for_each_node (node )
765+ io_wq_for_each_worker (wq -> wqes [node ], io_wq_worker_wake , NULL );
766+ rcu_read_unlock ();
767+
768+ /* we might not ever have created any workers */
769+ if (atomic_read (& wq -> worker_refs ))
770+ wait_for_completion (& wq -> worker_done );
756771 wq -> manager = NULL ;
757772 io_wq_put (wq );
758773 do_exit (0 );
@@ -796,6 +811,7 @@ static int io_wq_fork_manager(struct io_wq *wq)
796811 if (wq -> manager )
797812 return 0 ;
798813
814+ reinit_completion (& wq -> worker_done );
799815 clear_bit (IO_WQ_BIT_EXIT , & wq -> state );
800816 refcount_inc (& wq -> refs );
801817 current -> flags |= PF_IO_WORKER ;
@@ -1050,6 +1066,9 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
10501066 init_completion (& wq -> done );
10511067 refcount_set (& wq -> refs , 1 );
10521068
1069+ init_completion (& wq -> worker_done );
1070+ atomic_set (& wq -> worker_refs , 0 );
1071+
10531072 ret = io_wq_fork_manager (wq );
10541073 if (!ret )
10551074 return wq ;
@@ -1077,11 +1096,6 @@ static void io_wq_destroy(struct io_wq *wq)
10771096 if (wq -> manager )
10781097 wake_up_process (wq -> manager );
10791098
1080- rcu_read_lock ();
1081- for_each_node (node )
1082- io_wq_for_each_worker (wq -> wqes [node ], io_wq_worker_wake , NULL );
1083- rcu_read_unlock ();
1084-
10851099 spin_lock_irq (& wq -> hash -> wait .lock );
10861100 for_each_node (node ) {
10871101 struct io_wqe * wqe = wq -> wqes [node ];
0 commit comments