@@ -120,6 +120,9 @@ struct io_wq {
 	refcount_t refs;
 	struct completion done;
 
+	atomic_t worker_refs;
+	struct completion worker_done;
+
 	struct hlist_node cpuhp_node;
 
 	pid_t task_pid;
@@ -189,7 +192,8 @@ static void io_worker_exit(struct io_worker *worker)
 	raw_spin_unlock_irq(&wqe->lock);
 
 	kfree_rcu(worker, rcu);
-	io_wq_put(wqe->wq);
+	if (atomic_dec_and_test(&wqe->wq->worker_refs))
+		complete(&wqe->wq->worker_done);
 }
 
 static inline bool io_wqe_run_queue(struct io_wqe *wqe)
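
Note: atomic_dec_and_test() returns true only for the caller that takes the counter to zero, so exactly one exiting worker fires complete() and wakes whoever is blocked in wait_for_completion(). A minimal userspace sketch of the same hand-off, using C11 atomics and a POSIX semaphore in place of the kernel's atomic_t and struct completion (the names below are illustrative, not from the patch):

#include <pthread.h>
#include <semaphore.h>
#include <stdatomic.h>
#include <stdio.h>

static atomic_int worker_refs;	/* stands in for wq->worker_refs */
static sem_t worker_done;	/* stands in for wq->worker_done */

static void *worker(void *arg)
{
	/* ... worker runs its work items ... */
	/* last one out signals, like atomic_dec_and_test() + complete() */
	if (atomic_fetch_sub(&worker_refs, 1) == 1)
		sem_post(&worker_done);
	return NULL;
}

int main(void)
{
	pthread_t tids[4];
	int i;

	sem_init(&worker_done, 0, 0);
	for (i = 0; i < 4; i++) {
		atomic_fetch_add(&worker_refs, 1);	/* atomic_inc() analogue */
		pthread_create(&tids[i], NULL, worker, NULL);
	}
	/* manager side: only wait if a worker was ever accounted */
	if (atomic_load(&worker_refs))
		sem_wait(&worker_done);
	for (i = 0; i < 4; i++)
		pthread_join(tids[i], NULL);
	puts("all workers exited");
	return 0;
}
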
@@ -648,14 +652,15 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
 	init_completion(&worker->ref_done);
 	init_completion(&worker->started);
 
-	refcount_inc(&wq->refs);
+	atomic_inc(&wq->worker_refs);
 
 	if (index == IO_WQ_ACCT_BOUND)
 		pid = io_wq_fork_thread(task_thread_bound, worker);
 	else
 		pid = io_wq_fork_thread(task_thread_unbound, worker);
 	if (pid < 0) {
-		io_wq_put(wq);
+		if (atomic_dec_and_test(&wq->worker_refs))
+			complete(&wq->worker_done);
 		kfree(worker);
 		return false;
 	}
@@ -736,6 +741,7 @@ static int io_wq_manager(void *data)
 {
 	struct io_wq *wq = data;
 	char buf[TASK_COMM_LEN];
+	int node;
 
 	sprintf(buf, "iou-mgr-%d", wq->task_pid);
 	set_task_comm(current, buf);
@@ -753,6 +759,15 @@ static int io_wq_manager(void *data)
 	} while (!test_bit(IO_WQ_BIT_EXIT, &wq->state));
 
 	io_wq_check_workers(wq);
+
+	rcu_read_lock();
+	for_each_node(node)
+		io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
+	rcu_read_unlock();
+
+	/* we might not ever have created any workers */
+	if (atomic_read(&wq->worker_refs))
+		wait_for_completion(&wq->worker_done);
 	wq->manager = NULL;
 	io_wq_put(wq);
 	do_exit(0);
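
Note: completions are sticky, so a wait_for_completion() that begins after the final complete() returns immediately; the atomic_read() guard only covers the case where no worker was ever created and nothing would ever signal worker_done. A hypothetical kernel-module sketch of the same create/exit/wait flow, including the error-path rollback from create_io_worker() (the demo_* names are invented for illustration, not part of the patch):

#include <linux/module.h>
#include <linux/kthread.h>
#include <linux/completion.h>
#include <linux/atomic.h>
#include <linux/delay.h>
#include <linux/err.h>

static atomic_t demo_refs;
static struct completion demo_done;

static int demo_worker(void *data)
{
	msleep(10);				/* stand-in for real work */
	if (atomic_dec_and_test(&demo_refs))	/* as in io_worker_exit() */
		complete(&demo_done);
	return 0;
}

static int __init demo_init(void)
{
	struct task_struct *t;
	int i;

	init_completion(&demo_done);
	atomic_set(&demo_refs, 0);
	for (i = 0; i < 4; i++) {
		atomic_inc(&demo_refs);
		t = kthread_run(demo_worker, NULL, "demo-worker/%d", i);
		/* roll back the ref on fork failure, as create_io_worker() does */
		if (IS_ERR(t) && atomic_dec_and_test(&demo_refs))
			complete(&demo_done);
	}
	/* we might not ever have created any workers */
	if (atomic_read(&demo_refs))
		wait_for_completion(&demo_done);
	pr_info("all demo workers exited\n");
	return 0;
}

static void __exit demo_exit(void)
{
}

module_init(demo_init);
module_exit(demo_exit);
MODULE_LICENSE("GPL");
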
@@ -796,6 +811,7 @@ static int io_wq_fork_manager(struct io_wq *wq)
 	if (wq->manager)
 		return 0;
 
+	reinit_completion(&wq->worker_done);
 	clear_bit(IO_WQ_BIT_EXIT, &wq->state);
 	refcount_inc(&wq->refs);
 	current->flags |= PF_IO_WORKER;
@@ -1050,6 +1066,9 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 	init_completion(&wq->done);
 	refcount_set(&wq->refs, 1);
 
+	init_completion(&wq->worker_done);
+	atomic_set(&wq->worker_refs, 0);
+
 	ret = io_wq_fork_manager(wq);
 	if (!ret)
 		return wq;
@@ -1077,11 +1096,6 @@ static void io_wq_destroy(struct io_wq *wq)
 	if (wq->manager)
 		wake_up_process(wq->manager);
 
-	rcu_read_lock();
-	for_each_node(node)
-		io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
-	rcu_read_unlock();
-
 	spin_lock_irq(&wq->hash->wait.lock);
 	for_each_node(node) {
 		struct io_wqe *wqe = wq->wqes[node];