145
145
146
146
from collections .abc import MutableMapping
147
147
from collections import ChainMap as _ChainMap
148
+ import contextlib
149
+ from dataclasses import dataclass , field
148
150
import functools
149
151
import io
150
152
import itertools
151
153
import os
152
154
import re
153
155
import sys
156
+ from typing import Iterable
154
157
155
158
__all__ = ("NoSectionError" , "DuplicateOptionError" , "DuplicateSectionError" ,
156
159
"NoOptionError" , "InterpolationError" , "InterpolationDepthError" ,
@@ -302,15 +305,33 @@ def __init__(self, option, section, rawval):
302
305
class ParsingError (Error ):
303
306
"""Raised when a configuration file does not follow legal syntax."""
304
307
305
- def __init__ (self , source ):
308
+ def __init__ (self , source , * args ):
306
309
super ().__init__ (f'Source contains parsing errors: { source !r} ' )
307
310
self .source = source
308
311
self .errors = []
309
312
self .args = (source , )
313
+ if args :
314
+ self .append (* args )
310
315
311
316
def append (self , lineno , line ):
312
317
self .errors .append ((lineno , line ))
313
- self .message += '\n \t [line %2d]: %s' % (lineno , line )
318
+ self .message += '\n \t [line %2d]: %s' % (lineno , repr (line ))
319
+
320
+ def combine (self , others ):
321
+ for other in others :
322
+ for error in other .errors :
323
+ self .append (* error )
324
+ return self
325
+
326
+ @staticmethod
327
+ def _raise_all (exceptions : Iterable ['ParsingError' ]):
328
+ """
329
+ Combine any number of ParsingErrors into one and raise it.
330
+ """
331
+ exceptions = iter (exceptions )
332
+ with contextlib .suppress (StopIteration ):
333
+ raise next (exceptions ).combine (exceptions )
334
+
314
335
315
336
316
337
class MissingSectionHeaderError (ParsingError ):
@@ -517,6 +538,55 @@ def _interpolate_some(self, parser, option, accum, rest, section, map,
517
538
"found: %r" % (rest ,))
518
539
519
540
541
+ @dataclass
542
+ class _ReadState :
543
+ elements_added : set [str ] = field (default_factory = set )
544
+ cursect : dict [str , str ] | None = None
545
+ sectname : str | None = None
546
+ optname : str | None = None
547
+ lineno : int = 0
548
+ indent_level : int = 0
549
+ errors : list [ParsingError ] = field (default_factory = list )
550
+
551
+
552
+ @dataclass
553
+ class _Prefixes :
554
+ full : Iterable [str ]
555
+ inline : Iterable [str ]
556
+
557
+
558
+ class _Line (str ):
559
+
560
+ def __new__ (cls , val , * args , ** kwargs ):
561
+ return super ().__new__ (cls , val )
562
+
563
+ def __init__ (self , val , prefixes : _Prefixes ):
564
+ self .prefixes = prefixes
565
+
566
+ @functools .cached_property
567
+ def clean (self ):
568
+ return self ._strip_full () and self ._strip_inline ()
569
+
570
+ @property
571
+ def has_comments (self ):
572
+ return self .strip () != self .clean
573
+
574
+ def _strip_inline (self ):
575
+ """
576
+ Search for the earliest prefix at the beginning of the line or following a space.
577
+ """
578
+ matcher = re .compile (
579
+ '|' .join (fr'(^|\s)({ re .escape (prefix )} )' for prefix in self .prefixes .inline )
580
+ # match nothing if no prefixes
581
+ or '(?!)'
582
+ )
583
+ match = matcher .search (self )
584
+ return self [:match .start () if match else None ].strip ()
585
+
586
+ def _strip_full (self ):
587
+ return '' if any (map (self .strip ().startswith , self .prefixes .full )) else True
588
+
589
+
520
590
class RawConfigParser (MutableMapping ):
521
591
"""ConfigParser that does not do interpolation."""
522
592
@@ -583,8 +653,10 @@ def __init__(self, defaults=None, dict_type=_default_dict,
583
653
else :
584
654
self ._optcre = re .compile (self ._OPT_TMPL .format (delim = d ),
585
655
re .VERBOSE )
586
- self ._comment_prefixes = tuple (comment_prefixes or ())
587
- self ._inline_comment_prefixes = tuple (inline_comment_prefixes or ())
656
+ self ._prefixes = _Prefixes (
657
+ full = tuple (comment_prefixes or ()),
658
+ inline = tuple (inline_comment_prefixes or ()),
659
+ )
588
660
self ._strict = strict
589
661
self ._allow_no_value = allow_no_value
590
662
self ._empty_lines_in_values = empty_lines_in_values
@@ -975,147 +1047,117 @@ def _read(self, fp, fpname):
975
1047
in an otherwise empty line or may be entered in lines holding values or
976
1048
section names. Please note that comments get stripped off when reading configuration files.
977
1049
"""
978
- elements_added = set ()
979
- cursect = None # None, or a dictionary
980
- sectname = None
981
- optname = None
982
- lineno = 0
983
- indent_level = 0
984
- e = None # None, or an exception
985
1050
986
1051
try :
987
- for lineno , line in enumerate (fp , start = 1 ):
988
- comment_start = sys .maxsize
989
- # strip inline comments
990
- inline_prefixes = {p : - 1 for p in self ._inline_comment_prefixes }
991
- while comment_start == sys .maxsize and inline_prefixes :
992
- next_prefixes = {}
993
- for prefix , index in inline_prefixes .items ():
994
- index = line .find (prefix , index + 1 )
995
- if index == - 1 :
996
- continue
997
- next_prefixes [prefix ] = index
998
- if index == 0 or (index > 0 and line [index - 1 ].isspace ()):
999
- comment_start = min (comment_start , index )
1000
- inline_prefixes = next_prefixes
1001
- # strip full line comments
1002
- for prefix in self ._comment_prefixes :
1003
- if line .strip ().startswith (prefix ):
1004
- comment_start = 0
1005
- break
1006
- if comment_start == sys .maxsize :
1007
- comment_start = None
1008
- value = line [:comment_start ].strip ()
1009
- if not value :
1010
- if self ._empty_lines_in_values :
1011
- # add empty line to the value, but only if there was no
1012
- # comment on the line
1013
- if (comment_start is None and
1014
- cursect is not None and
1015
- optname and
1016
- cursect [optname ] is not None ):
1017
- cursect [optname ].append ('' ) # newlines added at join
1018
- else :
1019
- # empty line marks end of value
1020
- indent_level = sys .maxsize
1021
- continue
1022
- # continuation line?
1023
- first_nonspace = self .NONSPACECRE .search (line )
1024
- cur_indent_level = first_nonspace .start () if first_nonspace else 0
1025
- if (cursect is not None and optname and
1026
- cur_indent_level > indent_level ):
1027
- if cursect [optname ] is None :
1028
- raise MultilineContinuationError (fpname , lineno , line )
1029
- cursect [optname ].append (value )
1030
- # a section header or option header?
1031
- else :
1032
- if self ._allow_unnamed_section and cursect is None :
1033
- sectname = UNNAMED_SECTION
1034
- cursect = self ._dict ()
1035
- self ._sections [sectname ] = cursect
1036
- self ._proxies [sectname ] = SectionProxy (self , sectname )
1037
- elements_added .add (sectname )
1038
-
1039
- indent_level = cur_indent_level
1040
- # is it a section header?
1041
- mo = self .SECTCRE .match (value )
1042
- if mo :
1043
- sectname = mo .group ('header' )
1044
- if sectname in self ._sections :
1045
- if self ._strict and sectname in elements_added :
1046
- raise DuplicateSectionError (sectname , fpname ,
1047
- lineno )
1048
- cursect = self ._sections [sectname ]
1049
- elements_added .add (sectname )
1050
- elif sectname == self .default_section :
1051
- cursect = self ._defaults
1052
- else :
1053
- cursect = self ._dict ()
1054
- self ._sections [sectname ] = cursect
1055
- self ._proxies [sectname ] = SectionProxy (self , sectname )
1056
- elements_added .add (sectname )
1057
- # So sections can't start with a continuation line
1058
- optname = None
1059
- # no section header?
1060
- elif cursect is None :
1061
- raise MissingSectionHeaderError (fpname , lineno , line )
1062
- # an option line?
1063
- else :
1064
- indent_level = cur_indent_level
1065
- # is it a section header?
1066
- mo = self .SECTCRE .match (value )
1067
- if mo :
1068
- sectname = mo .group ('header' )
1069
- if sectname in self ._sections :
1070
- if self ._strict and sectname in elements_added :
1071
- raise DuplicateSectionError (sectname , fpname ,
1072
- lineno )
1073
- cursect = self ._sections [sectname ]
1074
- elements_added .add (sectname )
1075
- elif sectname == self .default_section :
1076
- cursect = self ._defaults
1077
- else :
1078
- cursect = self ._dict ()
1079
- self ._sections [sectname ] = cursect
1080
- self ._proxies [sectname ] = SectionProxy (self , sectname )
1081
- elements_added .add (sectname )
1082
- # So sections can't start with a continuation line
1083
- optname = None
1084
- # no section header in the file?
1085
- elif cursect is None :
1086
- raise MissingSectionHeaderError (fpname , lineno , line )
1087
- # an option line?
1088
- else :
1089
- mo = self ._optcre .match (value )
1090
- if mo :
1091
- optname , vi , optval = mo .group ('option' , 'vi' , 'value' )
1092
- if not optname :
1093
- e = self ._handle_error (e , fpname , lineno , line )
1094
- optname = self .optionxform (optname .rstrip ())
1095
- if (self ._strict and
1096
- (sectname , optname ) in elements_added ):
1097
- raise DuplicateOptionError (sectname , optname ,
1098
- fpname , lineno )
1099
- elements_added .add ((sectname , optname ))
1100
- # This check is fine because the OPTCRE cannot
1101
- # match if it would set optval to None
1102
- if optval is not None :
1103
- optval = optval .strip ()
1104
- cursect [optname ] = [optval ]
1105
- else :
1106
- # valueless option handling
1107
- cursect [optname ] = None
1108
- else :
1109
- # a non-fatal parsing error occurred. set up the
1110
- # exception but keep going. the exception will be
1111
- # raised at the end of the file and will contain a
1112
- # list of all bogus lines
1113
- e = self ._handle_error (e , fpname , lineno , line )
1052
+ ParsingError ._raise_all (self ._read_inner (fp , fpname ))
1114
1053
finally :
1115
1054
self ._join_multiline_values ()
1116
- # if any parsing errors occurred, raise an exception
1117
- if e :
1118
- raise e
1055
+
1056
+ def _read_inner (self , fp , fpname ):
1057
+ st = _ReadState ()
1058
+
1059
+ Line = functools .partial (_Line , prefixes = self ._prefixes )
1060
+ for st .lineno , line in enumerate (map (Line , fp ), start = 1 ):
1061
+ if not line .clean :
1062
+ if self ._empty_lines_in_values :
1063
+ # add empty line to the value, but only if there was no
1064
+ # comment on the line
1065
+ if (not line .has_comments and
1066
+ st .cursect is not None and
1067
+ st .optname and
1068
+ st .cursect [st .optname ] is not None ):
1069
+ st .cursect [st .optname ].append ('' ) # newlines added at join
1070
+ else :
1071
+ # empty line marks end of value
1072
+ st .indent_level = sys .maxsize
1073
+ continue
1074
+
1075
+ first_nonspace = self .NONSPACECRE .search (line )
1076
+ st .cur_indent_level = first_nonspace .start () if first_nonspace else 0
1077
+
1078
+ if self ._handle_continuation_line (st , line , fpname ):
1079
+ continue
1080
+
1081
+ self ._handle_rest (st , line , fpname )
1082
+
1083
+ return st .errors
1084
+
1085
+ def _handle_continuation_line (self , st , line , fpname ):
1086
+ # continuation line?
1087
+ is_continue = (st .cursect is not None and st .optname and
1088
+ st .cur_indent_level > st .indent_level )
1089
+ if is_continue :
1090
+ if st .cursect [st .optname ] is None :
1091
+ raise MultilineContinuationError (fpname , st .lineno , line )
1092
+ st .cursect [st .optname ].append (line .clean )
1093
+ return is_continue
1094
+
1095
+ def _handle_rest (self , st , line , fpname ):
1096
+ # a section header or option header?
1097
+ if self ._allow_unnamed_section and st .cursect is None :
1098
+ st .sectname = UNNAMED_SECTION
1099
+ st .cursect = self ._dict ()
1100
+ self ._sections [st .sectname ] = st .cursect
1101
+ self ._proxies [st .sectname ] = SectionProxy (self , st .sectname )
1102
+ st .elements_added .add (st .sectname )
1103
+
1104
+ st .indent_level = st .cur_indent_level
1105
+ # is it a section header?
1106
+ mo = self .SECTCRE .match (line .clean )
1107
+
1108
+ if not mo and st .cursect is None :
1109
+ raise MissingSectionHeaderError (fpname , st .lineno , line )
1110
+
1111
+ self ._handle_header (st , mo , fpname ) if mo else self ._handle_option (st , line , fpname )
1112
+
1113
+ def _handle_header (self , st , mo , fpname ):
1114
+ st .sectname = mo .group ('header' )
1115
+ if st .sectname in self ._sections :
1116
+ if self ._strict and st .sectname in st .elements_added :
1117
+ raise DuplicateSectionError (st .sectname , fpname ,
1118
+ st .lineno )
1119
+ st .cursect = self ._sections [st .sectname ]
1120
+ st .elements_added .add (st .sectname )
1121
+ elif st .sectname == self .default_section :
1122
+ st .cursect = self ._defaults
1123
+ else :
1124
+ st .cursect = self ._dict ()
1125
+ self ._sections [st .sectname ] = st .cursect
1126
+ self ._proxies [st .sectname ] = SectionProxy (self , st .sectname )
1127
+ st .elements_added .add (st .sectname )
1128
+ # So sections can't start with a continuation line
1129
+ st .optname = None
1130
+
1131
+ def _handle_option (self , st , line , fpname ):
1132
+ # an option line?
1133
+ st .indent_level = st .cur_indent_level
1134
+
1135
+ mo = self ._optcre .match (line .clean )
1136
+ if not mo :
1137
+ # a non-fatal parsing error occurred. set up the
1138
+ # exception but keep going. the exception will be
1139
+ # raised at the end of the file and will contain a
1140
+ # list of all bogus lines
1141
+ st .errors .append (ParsingError (fpname , st .lineno , line ))
1142
+ return
1143
+
1144
+ st .optname , vi , optval = mo .group ('option' , 'vi' , 'value' )
1145
+ if not st .optname :
1146
+ st .errors .append (ParsingError (fpname , st .lineno , line ))
1147
+ st .optname = self .optionxform (st .optname .rstrip ())
1148
+ if (self ._strict and
1149
+ (st .sectname , st .optname ) in st .elements_added ):
1150
+ raise DuplicateOptionError (st .sectname , st .optname ,
1151
+ fpname , st .lineno )
1152
+ st .elements_added .add ((st .sectname , st .optname ))
1153
+ # This check is fine because the OPTCRE cannot
1154
+ # match if it would set optval to None
1155
+ if optval is not None :
1156
+ optval = optval .strip ()
1157
+ st .cursect [st .optname ] = [optval ]
1158
+ else :
1159
+ # valueless option handling
1160
+ st .cursect [st .optname ] = None
1119
1161
1120
1162
def _join_multiline_values (self ):
1121
1163
defaults = self .default_section , self ._defaults
@@ -1135,12 +1177,6 @@ def _read_defaults(self, defaults):
1135
1177
for key , value in defaults .items ():
1136
1178
self ._defaults [self .optionxform (key )] = value
1137
1179
1138
- def _handle_error (self , exc , fpname , lineno , line ):
1139
- if not exc :
1140
- exc = ParsingError (fpname )
1141
- exc .append (lineno , repr (line ))
1142
- return exc
1143
-
1144
1180
def _unify_values (self , section , vars ):
1145
1181
"""Create a sequence of lookups with 'vars' taking priority over
1146
1182
the 'section' which takes priority over the DEFAULTSECT.
0 commit comments