@@ -317,6 +317,8 @@ def read_to_queue(self, work_queue, exit_event, error_queue, warning_queue, timi
317
317
break
318
318
except Full :
319
319
pass
320
+ else :
321
+ break
320
322
timing_queue .put (('reader_wait' , time .time () - timePoint ))
321
323
timePoint = time .time ()
322
324
@@ -810,7 +812,7 @@ def abort_import(pools, exit_event, interrupt_event):
810
812
worker .terminate ()
811
813
worker .join (.1 )
812
814
else :
813
- print ("\n Terminate signal seen, aborting" )
815
+ print ("\n Terminate signal seen, aborting gracefully " )
814
816
interrupt_event .set ()
815
817
exit_event .set ()
816
818
@@ -873,6 +875,9 @@ def import_tables(options, sources):
873
875
interrupt_event = multiprocessing .Event ()
874
876
875
877
timing_queue = SimpleQueue ()
878
+
879
+ errors = []
880
+ warnings = []
876
881
timingSums = {}
877
882
878
883
pools = []
@@ -882,6 +887,24 @@ def import_tables(options, sources):
882
887
# - setup KeyboardInterupt handler
883
888
signal .signal (signal .SIGINT , lambda a , b : abort_import (pools , exit_event , interrupt_event ))
884
889
890
+ # - queue draining
891
+ def drainQueues ():
892
+ # error_queue
893
+ while not error_queue .empty ():
894
+ errors .append (error_queue .get ())
895
+
896
+ # warning_queue
897
+ while not warning_queue .empty ():
898
+ warnings .append (warning_queue .get ())
899
+
900
+ # timing_queue
901
+ while not timing_queue .empty ():
902
+ key , value = timing_queue .get ()
903
+ if not key in timingSums :
904
+ timingSums [key ] = value
905
+ else :
906
+ timingSums [key ] += value
907
+
885
908
# - setup dbs and tables
886
909
887
910
# create missing dbs
@@ -942,7 +965,6 @@ def import_tables(options, sources):
942
965
# - read the tables options.clients at a time
943
966
readers = []
944
967
pools .append (readers )
945
- filesLeft = len (sources )
946
968
fileIter = iter (sources )
947
969
try :
948
970
while not exit_event .is_set ():
@@ -959,40 +981,36 @@ def import_tables(options, sources):
959
981
)
960
982
readers .append (reader )
961
983
reader .start ()
962
- filesLeft -= 1
963
984
964
- # drain the timing queue
965
- while not timing_queue .empty ():
966
- key , value = timing_queue .get ()
967
- if not key in timingSums :
968
- timingSums [key ] = value
969
- else :
970
- timingSums [key ] += value
985
+ # drain the queues
986
+ drainQueues ()
971
987
972
988
# reap completed tasks
973
989
for reader in readers [:]:
974
990
if not reader .is_alive ():
975
991
readers .remove (reader )
976
- if filesLeft and len (readers ) == options .clients :
992
+ if len (readers ) == options .clients :
977
993
time .sleep (.05 )
978
994
except StopIteration :
979
995
pass # ran out of new tables
980
996
981
997
# - wait for the last batch of readers to complete
982
998
while readers :
983
- # drain the timing queue
984
- while not timing_queue .empty ():
985
- key , value = timing_queue .get ()
986
- if not key in timingSums :
987
- timingSums [key ] = value
988
- else :
989
- timingSums [key ] += value
999
+ # drain the queues
1000
+ drainQueues ()
1001
+
1002
+ # drain the work queue to prevent readers from stalling on exit
1003
+ if exit_event .is_set ():
1004
+ try :
1005
+ while True :
1006
+ work_queue .get (timeout = 0.1 )
1007
+ except Empty : pass
990
1008
991
1009
# watch the readers
992
1010
for reader in readers [:]:
993
- if exit_event . is_set () :
994
- reader .terminate () # kill it abruptly
995
- reader . join ( .1 )
1011
+ try :
1012
+ reader .join ( .1 )
1013
+ except Exception : pass
996
1014
if not reader .is_alive ():
997
1015
readers .remove (reader )
998
1016
@@ -1018,7 +1036,7 @@ def import_tables(options, sources):
1018
1036
try :
1019
1037
writer .terminate ()
1020
1038
except Exception : pass
1021
-
1039
+
1022
1040
# - stop the progress bar
1023
1041
if progressBar :
1024
1042
done_event .set ()
@@ -1028,10 +1046,8 @@ def import_tables(options, sources):
1028
1046
if progressBar .is_alive ():
1029
1047
progressBar .terminate ()
1030
1048
1031
- # - drain the error_queue
1032
- errors = []
1033
- while not error_queue .empty ():
1034
- errors .append (error_queue .get ())
1049
+ # - drain queues
1050
+ drainQueues ()
1035
1051
1036
1052
# - final reporting
1037
1053
if not options .quiet :
@@ -1054,33 +1070,35 @@ def import_tables(options, sources):
1054
1070
finally :
1055
1071
signal .signal (signal .SIGINT , signal .SIG_DFL )
1056
1072
1073
+ drainQueues ()
1074
+
1075
+ for error in errors :
1076
+ print ("%s" % error .message , file = sys .stderr )
1077
+ if options .debug and error .traceback :
1078
+ print (" Traceback:\n %s" % error .traceback , file = sys .stderr )
1079
+ if len (error .file ) == 4 :
1080
+ print (" In file: %s" % error .file , file = sys .stderr )
1081
+
1082
+ for warning in warnings :
1083
+ print ("%s" % warning [1 ], file = sys .stderr )
1084
+ if options .debug :
1085
+ print ("%s traceback: %s" % (warning [0 ].__name__ , warning [2 ]), file = sys .stderr )
1086
+ if len (warning ) == 4 :
1087
+ print ("In file: %s" % warning [3 ], file = sys .stderr )
1088
+
1057
1089
if interrupt_event .is_set ():
1058
1090
raise RuntimeError ("Interrupted" )
1059
-
1060
- if len (errors ) != 0 :
1061
- for error in errors :
1062
- print ("%s" % error .message , file = sys .stderr )
1063
- if options .debug and error .traceback :
1064
- print (" Traceback:\n %s" % error .traceback , file = sys .stderr )
1065
- if len (error .file ) == 4 :
1066
- print (" In file: %s" % error .file , file = sys .stderr )
1091
+ if errors :
1067
1092
raise RuntimeError ("Errors occurred during import" )
1068
-
1069
- if not warning_queue .empty ():
1070
- while not warning_queue .empty ():
1071
- warning = warning_queue .get ()
1072
- print ("%s" % warning [1 ], file = sys .stderr )
1073
- if options .debug :
1074
- print ("%s traceback: %s" % (warning [0 ].__name__ , warning [2 ]), file = sys .stderr )
1075
- if len (warning ) == 4 :
1076
- print ("In file: %s" % warning [3 ], file = sys .stderr )
1093
+ if warnings :
1077
1094
raise RuntimeError ("Warnings occurred during import" )
1078
1095
1079
- def import_directory (options ):
1096
+ def import_directory (options , files_ignored = None ):
1080
1097
# Scan for all files, make sure no duplicated tables with different formats
1081
1098
dbs = False
1082
1099
sources = {} # (db, table) => {file:, format:, db:, table:, info:}
1083
- files_ignored = []
1100
+ if files_ignored is None :
1101
+ files_ignored = []
1084
1102
for root , dirs , files in os .walk (options .directory ):
1085
1103
if not dbs :
1086
1104
files_ignored .extend ([os .path .join (root , f ) for f in files ])
@@ -1104,7 +1122,7 @@ def import_directory(options):
1104
1122
table , ext = os .path .splitext (filename )
1105
1123
table = os .path .basename (table )
1106
1124
1107
- if ext not in [ ".json" , ".csv" , ".info" ] :
1125
+ if ext not in ( ".json" , ".csv" , ".info" ) :
1108
1126
files_ignored .append (os .path .join (root , filename ))
1109
1127
elif ext == ".info" :
1110
1128
pass # Info files are included based on the data files
@@ -1137,7 +1155,13 @@ def import_directory(options):
1137
1155
except OSError :
1138
1156
files_ignored .append (os .path .join (root , f ))
1139
1157
1140
- tableType = JsonSourceFile if ext == ".json" else CsvSourceFile
1158
+ tableType = None
1159
+ if ext == ".json" :
1160
+ tableType = JsonSourceFile
1161
+ elif ext == ".csv" :
1162
+ tableType = CsvSourceFile
1163
+ else :
1164
+ raise Exception ("The table type is not recognised: %s" % ext )
1141
1165
sources [(db , table )] = tableType (
1142
1166
source = path ,
1143
1167
db = db , table = table ,
0 commit comments