@@ -925,6 +925,227 @@ JL_DLLEXPORT int jl_alignment(size_t sz)
925
925
return jl_gc_alignment (sz );
926
926
}
927
927
928
+ // Heartbeat mechanism for Julia's task scheduler
929
+ // ---
930
+ // Start a thread that does not participate in running Julia's tasks. This
931
+ // thread simply sleeps until the heartbeat mechanism is enabled. When
932
+ // enabled, the heartbeat thread enters a loop in which it blocks waiting
933
+ // for the specified heartbeat interval. If, within that interval,
934
+ // `jl_heartbeat()` is *not* called at least once, then the thread calls
935
+ // `jl_print_task_backtraces(0)`.
936
+
937
+ #ifdef JL_HEARTBEAT_THREAD
938
+
939
+ #include <time.h>
940
+
941
+ volatile int heartbeat_enabled ;
942
+ uv_sem_t heartbeat_on_sem , // jl_heartbeat_enable -> thread
943
+ heartbeat_off_sem ; // thread -> jl_heartbeat_enable
944
+ int heartbeat_interval_s ,
945
+ n_loss_reports ,
946
+ reset_reporting_s ;
947
+ int last_report_s , report_interval_s , n_reported ;
948
+ _Atomic(int ) heartbeats ;
949
+
950
+ JL_DLLEXPORT void jl_print_task_backtraces (int show_done ) JL_NOTSAFEPOINT ;
951
+ void jl_heartbeat_threadfun (void * arg );
952
+
953
+ // start the heartbeat thread with heartbeats disabled
954
+ void jl_init_heartbeat (void )
955
+ {
956
+ uv_thread_t uvtid ;
957
+ heartbeat_enabled = 0 ;
958
+ uv_sem_init (& heartbeat_on_sem , 0 );
959
+ uv_sem_init (& heartbeat_off_sem , 0 );
960
+ uv_thread_create (& uvtid , jl_heartbeat_threadfun , NULL );
961
+ uv_thread_detach (& uvtid );
962
+ }
963
+
964
+ // enable/disable heartbeats
965
+ // heartbeat_s: interval within which jl_heartbeat() must be called
966
+ // n_reports: for one heartbeat loss interval, how many times to report
967
+ // reset_reporting_after_s: how long to wait after a heartbeat loss
968
+ // interval and a return to steady heartbeats, before resetting
969
+ // reporting behavior
970
+ //
971
+ // When disabling heartbeats, the heartbeat thread must wake up,
972
+ // find out that heartbeats are now diabled, and reset. For now, we
973
+ // handle this by preventing re-enabling of heartbeats until this
974
+ // completes.
975
+ JL_DLLEXPORT int jl_heartbeat_enable (int heartbeat_s , int n_reports ,
976
+ int reset_reporting_after_s )
977
+ {
978
+ if (heartbeat_s <= 0 ) {
979
+ heartbeat_enabled = 0 ;
980
+ heartbeat_interval_s = n_loss_reports = reset_reporting_s = 0 ;
981
+ }
982
+ else {
983
+ // must disable before enabling
984
+ if (heartbeat_enabled ) {
985
+ return -1 ;
986
+ }
987
+ // heartbeat thread must be ready
988
+ if (uv_sem_trywait (& heartbeat_off_sem ) != 0 ) {
989
+ return -1 ;
990
+ }
991
+
992
+ jl_atomic_store_relaxed (& heartbeats , 0 );
993
+ heartbeat_interval_s = heartbeat_s ;
994
+ n_loss_reports = n_reports ;
995
+ reset_reporting_s = reset_reporting_after_s ;
996
+ last_report_s = 0 ;
997
+ report_interval_s = heartbeat_interval_s ;
998
+ heartbeat_enabled = 1 ;
999
+ uv_sem_post (& heartbeat_on_sem ); // wake the heartbeat thread
1000
+ }
1001
+ return 0 ;
1002
+ }
1003
+
1004
+ // heartbeat
1005
+ JL_DLLEXPORT void jl_heartbeat (void )
1006
+ {
1007
+ jl_atomic_fetch_add (& heartbeats , 1 );
1008
+ }
1009
+
1010
// Sleep the calling thread for `secs` seconds plus `nsecs` nanoseconds.
//
// The two arguments are combined into a single duration before sleeping, so
// `nsecs` may lie outside [0, 1e9) and `secs` may be negative as long as the
// total is positive. A total duration <= 0 returns immediately. If the sleep
// is cut short, it is resumed for the remaining time.
void sleep_for(int secs, int nsecs)
{
    const int64_t ns_per_s = 1000000000;
    int64_t total_ns = (int64_t)secs * ns_per_s + nsecs;
    struct timespec rqtp, rmtp;

    if (total_ns <= 0) {
        return;
    }

    // Normalizing up front keeps tv_sec >= 0 and tv_nsec in [0, 1e9), so
    // nanosleep() cannot reject the request as invalid.
    rqtp.tv_sec = (time_t)(total_ns / ns_per_s);
    rqtp.tv_nsec = (long)(total_ns % ns_per_s);
    rmtp.tv_sec = 0;
    rmtp.tv_nsec = 0;

    // this suspends the thread so we aren't using CPU
    while (nanosleep(&rqtp, &rmtp) != 0) {
        // TODO: else if (errno == EINTR)
        // this could be SIGTERM and we should shutdown but how to find out?
        rqtp = rmtp;
    }
}
1028
+
1029
+ // check for heartbeats and maybe report loss
1030
+ uint8_t check_heartbeats (uint8_t gc_state )
1031
+ {
1032
+ int hb = jl_atomic_exchange (& heartbeats , 0 );
1033
+ uint64_t curr_s = jl_hrtime () / 1e9 ;
1034
+
1035
+ if (hb <= 0 ) {
1036
+ // we didn't get a heartbeat in the last interval; should we report?
1037
+ if (n_reported < n_loss_reports &&
1038
+ curr_s - last_report_s >= report_interval_s ) {
1039
+ jl_task_t * ct = jl_current_task ;
1040
+ jl_ptls_t ptls = ct -> ptls ;
1041
+
1042
+ // exit GC-safe region to report then re-enter
1043
+ jl_gc_safe_leave (ptls , gc_state );
1044
+ jl_safe_printf ("==== heartbeat loss ====\n" );
1045
+ jl_print_task_backtraces (0 );
1046
+ gc_state = jl_gc_safe_enter (ptls );
1047
+
1048
+ // we've reported
1049
+ n_reported ++ ;
1050
+
1051
+ // record the reporting time _after_ the report
1052
+ last_report_s = jl_hrtime () / 1e9 ;
1053
+
1054
+ // double the reporting interval up to a maximum
1055
+ if (report_interval_s < 60 * heartbeat_interval_s ) {
1056
+ report_interval_s *= 2 ;
1057
+ }
1058
+ }
1059
+ // no heartbeats, don't change reporting state
1060
+ return gc_state ;
1061
+ }
1062
+ else {
1063
+ // we got a heartbeat; reset the report count
1064
+ n_reported = 0 ;
1065
+ }
1066
+
1067
+ // reset the reporting interval only once we're steadily getting
1068
+ // heartbeats for the requested reset interval
1069
+ if (curr_s - reset_reporting_s > last_report_s ) {
1070
+ report_interval_s = heartbeat_interval_s ;
1071
+ }
1072
+
1073
+ return gc_state ;
1074
+ }
1075
+
1076
+ // heartbeat thread function
1077
+ void jl_heartbeat_threadfun (void * arg )
1078
+ {
1079
+ int s , ns = 1e9 - 1 , rs ;
1080
+ uint64_t t0 , tchb ;
1081
+
1082
+ // We need a TLS because backtraces are accumulated into ptls->bt_size
1083
+ // and ptls->bt_data, so we need to call jl_adopt_thread().
1084
+ jl_adopt_thread ();
1085
+ jl_task_t * ct = jl_current_task ;
1086
+ jl_ptls_t ptls = ct -> ptls ;
1087
+
1088
+ // Don't hold up GC, this thread doesn't participate.
1089
+ uint8_t gc_state = jl_gc_safe_enter (ptls );
1090
+
1091
+ for (;;) {
1092
+ if (!heartbeat_enabled ) {
1093
+ // post the off semaphore to indicate we're ready to enable
1094
+ uv_sem_post (& heartbeat_off_sem );
1095
+
1096
+ // sleep the thread here; this semaphore is posted in
1097
+ // jl_heartbeat_enable()
1098
+ uv_sem_wait (& heartbeat_on_sem );
1099
+
1100
+ // Set the sleep duration.
1101
+ s = heartbeat_interval_s - 1 ;
1102
+ ns = 1e9 - 1 ;
1103
+ continue ;
1104
+ }
1105
+
1106
+ // heartbeat is enabled; sleep, waiting for the desired interval
1107
+ sleep_for (s , ns );
1108
+
1109
+ // if heartbeats were turned off while we were sleeping, reset
1110
+ if (!heartbeat_enabled ) {
1111
+ continue ;
1112
+ }
1113
+
1114
+ // check if any heartbeats have happened, report as appropriate
1115
+ t0 = jl_hrtime ();
1116
+ gc_state = check_heartbeats (gc_state );
1117
+ tchb = jl_hrtime () - t0 ;
1118
+
1119
+ // adjust the next sleep duration based on how long the heartbeat
1120
+ // check took
1121
+ rs = 1 ;
1122
+ while (tchb > 1e9 ) {
1123
+ rs ++ ;
1124
+ tchb -= 1e9 ;
1125
+ }
1126
+ s = heartbeat_interval_s - rs ;
1127
+ ns = 1e9 - tchb ;
1128
+ }
1129
+ }
1130
+
1131
+ #else // !JL_HEARTBEAT_THREAD
1132
+
1133
// Heartbeat support is compiled out, so there is nothing to initialize.
void jl_init_heartbeat(void)
{
    return;
}
1136
+
1137
+ JL_DLLEXPORT int jl_heartbeat_enable (int heartbeat_s , int n_reports ,
1138
+ int reset_reporting_after_s )
1139
+ {
1140
+ return -1 ;
1141
+ }
1142
+
1143
+ JL_DLLEXPORT void jl_heartbeat (void )
1144
+ {
1145
+ }
1146
+
1147
+ #endif // JL_HEARTBEAT_THREAD
1148
+
928
1149
#ifdef __cplusplus
929
1150
}
930
1151
#endif
0 commit comments