Skip to content

Commit 7612ce7

Browse files
absurdfarcedkropachev
authored andcommitted
PYTHON-1366 Handle removal of asyncore in Python 3.12 (datastax#1187)
1 parent 2d4a7c9 commit 7612ce7

17 files changed

+206
-134
lines changed

benchmarks/base.py

Lines changed: 15 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@
2828
sys.path.append(os.path.join(dirname, '..'))
2929

3030
import cassandra
31-
from cassandra.cluster import Cluster
32-
from cassandra.io.asyncorereactor import AsyncoreConnection
31+
from cassandra.cluster import Cluster, get_all_supported_connections_classes
3332

3433
log = logging.getLogger()
3534
handler = logging.StreamHandler()
@@ -48,31 +47,7 @@
4847
'NOTSET': logging.NOTSET,
4948
}
5049

51-
have_libev = False
52-
supported_reactors = [AsyncoreConnection]
53-
try:
54-
from cassandra.io.libevreactor import LibevConnection
55-
have_libev = True
56-
supported_reactors.append(LibevConnection)
57-
except ImportError as exc:
58-
pass
59-
60-
have_asyncio = False
61-
try:
62-
from cassandra.io.asyncioreactor import AsyncioConnection
63-
have_asyncio = True
64-
supported_reactors.append(AsyncioConnection)
65-
except (ImportError, SyntaxError):
66-
pass
67-
68-
have_twisted = False
69-
try:
70-
from cassandra.io.twistedreactor import TwistedConnection
71-
have_twisted = True
72-
supported_reactors.append(TwistedConnection)
73-
except ImportError as exc:
74-
log.exception("Error importing twisted")
75-
pass
50+
supported_reactors = get_all_supported_connections_classes()
7651

7752
KEYSPACE = "testkeyspace" + str(int(time.time()))
7853
TABLE = "testtable"
@@ -214,6 +189,15 @@ def benchmark(thread_class):
214189
log.info(" 99.9th: %0.4fs", request_timer['999percentile'])
215190

216191

192+
def get_connection_class(class_name):
193+
for cls in supported_reactors:
194+
if cls.__name__ == class_name:
195+
return cls
196+
else:
197+
log.error("unavailable reactor class: %s", class_name)
198+
sys.exit(f"{class_name} is not available")
199+
200+
217201
def parse_options():
218202
parser = OptionParser()
219203
parser.add_option('-H', '--hosts', default='127.0.0.1',
@@ -261,23 +245,15 @@ def parse_options():
261245
log.warning("Unknown log level specified: %s; specify one of %s", options.log_level, _log_levels.keys())
262246

263247
if options.asyncore_only:
264-
options.supported_reactors = [AsyncoreConnection]
248+
options.supported_reactors = [get_connection_class("AsyncoreConnection")]
265249
elif options.asyncio_only:
266-
options.supported_reactors = [AsyncioConnection]
250+
options.supported_reactors = [get_connection_class("AsyncioConnection")]
267251
elif options.libev_only:
268-
if not have_libev:
269-
log.error("libev is not available")
270-
sys.exit(1)
271-
options.supported_reactors = [LibevConnection]
252+
options.supported_reactors = [get_connection_class("LibevConnection")]
272253
elif options.twisted_only:
273-
if not have_twisted:
274-
log.error("Twisted is not available")
275-
sys.exit(1)
276-
options.supported_reactors = [TwistedConnection]
254+
options.supported_reactors = [get_connection_class("TwistedConnection")]
277255
else:
278256
options.supported_reactors = supported_reactors
279-
if not have_libev:
280-
log.warning("Not benchmarking libev reactor because libev is not available")
281257

282258
return options, args
283259

cassandra/__init__.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -747,3 +747,20 @@ def __init__(self, op_type=None, rejected_by_coordinator=False):
747747
self.rejected_by_coordinator = rejected_by_coordinator
748748
message = f"[request_error_rate_limit_reached OpType={op_type.name} RejectedByCoordinator={rejected_by_coordinator}]"
749749
Exception.__init__(self, message)
750+
751+
752+
class DependencyException(Exception):
753+
"""
754+
Specific exception class for handling issues with driver dependencies
755+
"""
756+
757+
excs = []
758+
"""
759+
A sequence of child exceptions
760+
"""
761+
762+
def __init__(self, msg, excs=[]):
763+
complete_msg = msg
764+
if excs:
765+
complete_msg += ("The following exceptions were observed: \n" + '\n'.join(str(e) for e in excs))
766+
Exception.__init__(self, complete_msg)

cassandra/cluster.py

Lines changed: 86 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,11 @@
2525
from collections.abc import Mapping
2626
from concurrent.futures import ThreadPoolExecutor, FIRST_COMPLETED, wait as wait_futures
2727
from copy import copy
28-
from functools import partial, wraps
28+
from functools import partial, reduce, wraps
2929
from itertools import groupby, count, chain
3030
import json
3131
import logging
32+
from typing import NamedTuple, Type, Optional
3233
from warnings import warn
3334
from random import random
3435
import re
@@ -45,12 +46,12 @@
4546
from cassandra import (ConsistencyLevel, AuthenticationFailed, InvalidRequest,
4647
OperationTimedOut, UnsupportedOperation,
4748
SchemaTargetType, DriverException, ProtocolVersion,
48-
UnresolvableContactPoints)
49+
UnresolvableContactPoints, DependencyException)
4950
from cassandra.auth import _proxy_execute_key, PlainTextAuthProvider
5051
from cassandra.connection import (ConnectionException, ConnectionShutdown,
5152
ConnectionHeartbeat, ProtocolVersionUnsupported,
5253
EndPoint, DefaultEndPoint, DefaultEndPointFactory,
53-
ContinuousPagingState, SniEndPointFactory, ConnectionBusy)
54+
ContinuousPagingState, SniEndPointFactory, ConnectionBusy, Connection)
5455
from cassandra.cqltypes import UserType
5556
import cassandra.cqltypes as types
5657
from cassandra.encoder import Encoder
@@ -98,12 +99,12 @@
9899

99100
try:
100101
from cassandra.io.twistedreactor import TwistedConnection
101-
except ImportError:
102+
except DependencyException:
102103
TwistedConnection = None
103104

104105
try:
105106
from cassandra.io.eventletreactor import EventletConnection
106-
except (ImportError, AttributeError):
107+
except DependencyException:
107108
# AttributeError was add for handling python 3.12 https://github.com/eventlet/eventlet/issues/812
108109
# TODO: remove it when eventlet issue would be fixed
109110
EventletConnection = None
@@ -113,6 +114,33 @@
113114
except ImportError:
114115
from cassandra.util import WeakSet # NOQA
115116

117+
118+
class ClassImportResult(NamedTuple):
119+
name: str
120+
exception: Optional[Exception]
121+
connection_class: Optional[Type[Connection]]
122+
123+
124+
def _is_gevent_monkey_patched():
125+
if 'gevent.monkey' not in sys.modules:
126+
return False
127+
try:
128+
import gevent.socket
129+
return socket.socket is gevent.socket.socket
130+
except (ModuleNotFoundError, ImportError, AttributeError):
131+
return False
132+
133+
def _try_gevent_import():
134+
if _is_gevent_monkey_patched():
135+
try:
136+
from cassandra.io.geventreactor import GeventConnection
137+
return ClassImportResult(name="GeventConnection", connection_class=GeventConnection, exception=None)
138+
except DependencyException as e:
139+
return ClassImportResult(name="GeventConnection", connection_class=None, exception=e)
140+
else:
141+
return ClassImportResult(name="GeventConnection", connection_class=None, exception=DependencyException("gevent is not patched"))
142+
143+
116144
def _is_eventlet_monkey_patched():
117145
if 'eventlet.patcher' not in sys.modules:
118146
return False
@@ -124,32 +152,63 @@ def _is_eventlet_monkey_patched():
124152
# TODO: remove it when eventlet issue would be fixed
125153
return False
126154

127-
def _is_gevent_monkey_patched():
128-
if 'gevent.monkey' not in sys.modules:
129-
return False
155+
def _try_eventlet_import():
130156
try:
131-
import eventlet.patcher
132-
return eventlet.patcher.is_monkey_patched('socket')
133-
# Another case related to PYTHON-1364
134-
except AttributeError:
135-
return False
157+
from cassandra.io.eventletreactor import EventletConnection
158+
except DependencyException as e:
159+
return ClassImportResult(name="EventletConnection", connection_class=None, exception=e)
160+
if _is_eventlet_monkey_patched():
161+
return ClassImportResult(name="EventletConnection", connection_class=EventletConnection, exception=None)
162+
return ClassImportResult(name="EventletConnection", connection_class=None, exception=DependencyException("eventlet is not patched"))
163+
164+
def _try_libev_import():
165+
try:
166+
from cassandra.io.libevreactor import LibevConnection
167+
return ClassImportResult(name="LibevConnection", connection_class=LibevConnection, exception=None)
168+
except DependencyException as e:
169+
return ClassImportResult(name="LibevConnection", connection_class=None, exception=e)
136170

171+
def _try_asyncore_import():
172+
try:
173+
from cassandra.io.asyncorereactor import AsyncoreConnection
174+
return ClassImportResult(name="AsyncoreConnection", connection_class=AsyncoreConnection, exception=None)
175+
except DependencyException as e:
176+
return ClassImportResult(name="AsyncoreConnection", connection_class=None, exception=e)
137177

138-
# default to gevent when we are monkey patched with gevent, eventlet when
139-
# monkey patched with eventlet, otherwise if libev is available, use that as
140-
# the default because it's fastest. Otherwise, use asyncore.
141-
if _is_gevent_monkey_patched():
142-
from cassandra.io.geventreactor import GeventConnection as DefaultConnection
143-
elif _is_eventlet_monkey_patched():
144-
from cassandra.io.eventletreactor import EventletConnection as DefaultConnection
145-
else:
178+
def _try_twisted_import():
146179
try:
147-
from cassandra.io.libevreactor import LibevConnection as DefaultConnection # NOQA
148-
except ImportError:
149-
try:
150-
from cassandra.io.asyncorereactor import AsyncoreConnection as DefaultConnection # NOQA
151-
except ImportError:
152-
from cassandra.io.asyncioreactor import AsyncioConnection as DefaultConnection # NOQA
180+
from cassandra.io.twistedreactor import TwistedConnection
181+
return ClassImportResult(name="TwistedConnection", connection_class=TwistedConnection, exception=None)
182+
except DependencyException as e:
183+
return ClassImportResult(name="TwistedConnection", connection_class=None, exception=e)
184+
185+
186+
log = logging.getLogger(__name__)
187+
188+
189+
def load_all_connections_classes():
190+
results = []
191+
for try_fn in (_try_gevent_import, _try_eventlet_import, _try_libev_import, _try_asyncore_import, _try_twisted_import):
192+
results.append(try_fn())
193+
return tuple(results)
194+
195+
def get_all_supported_connections_classes():
196+
return [res.connection_class for res in load_all_connections_classes() if res.connection_class]
197+
198+
def get_default_connection_class():
199+
excs = []
200+
for try_fn in (_try_gevent_import, _try_eventlet_import, _try_libev_import, _try_asyncore_import, _try_twisted_import):
201+
res = try_fn()
202+
if res.connection_class:
203+
return res.connection_class, excs
204+
excs.append(res.exception)
205+
return None, tuple(excs)
206+
207+
208+
(conn_class, excs) = get_default_connection_class()
209+
if not conn_class:
210+
raise DependencyException("Unable to load a default connection class", excs)
211+
DefaultConnection = conn_class
153212

154213
# Forces load of utf8 encoding module to avoid deadlock that occurs
155214
# if code that is being imported tries to import the module in a seperate

cassandra/io/asyncorereactor.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,15 @@
3030
except ImportError:
3131
from cassandra.util import WeakSet # noqa
3232

33-
import asyncore
33+
from cassandra import DependencyException
34+
try:
35+
import asyncore
36+
except ModuleNotFoundError:
37+
raise DependencyException(
38+
"Unable to import asyncore module. Note that this module has been removed in Python 3.12 "
39+
"so when using the driver with this version (or anything newer) you will need to use one of the "
40+
"other event loop implementations."
41+
)
3442

3543
from cassandra.connection import Connection, ConnectionShutdown, NONBLOCKING, Timer, TimerManager
3644

cassandra/io/eventletreactor.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,20 @@
1515

1616
# Originally derived from MagnetoDB source:
1717
# https://github.com/stackforge/magnetodb/blob/2015.1.0b1/magnetodb/common/cassandra/io/eventletreactor.py
18-
import eventlet
19-
from eventlet.green import socket
20-
from eventlet.queue import Queue
21-
from greenlet import GreenletExit
18+
from cassandra import DependencyException
19+
20+
try:
21+
import eventlet
22+
from eventlet.green import socket
23+
from eventlet.queue import Queue
24+
except (ModuleNotFoundError, ImportError, AttributeError):
25+
raise DependencyException("Unable to import eventlet module. Try to install it via `pip install eventlet`")
26+
27+
try:
28+
from greenlet import GreenletExit
29+
except (ModuleNotFoundError, ImportError, AttributeError):
30+
raise DependencyException("Unable to import greenlet module. Try to install it via `pip install greenlet`")
31+
2232
import logging
2333
from threading import Event
2434
import time

cassandra/io/geventreactor.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,19 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14-
import gevent
15-
import gevent.event
16-
from gevent.queue import Queue
17-
from gevent import socket
18-
import gevent.ssl
14+
from cassandra import DependencyException
15+
16+
try:
17+
import gevent
18+
import gevent.event
19+
from gevent.queue import Queue
20+
from gevent import socket
21+
import gevent.ssl
22+
except (ImportError, ModuleNotFoundError, AttributeError):
23+
raise DependencyException(
24+
"Unable to import gevent module. This module is optional, but if you want to use GeventConnection you need to "
25+
"install it. Try to install it via `pip install gevent`."
26+
)
1927

2028
import logging
2129
import time

cassandra/io/libevreactor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,13 @@
2121
from threading import Lock, Thread
2222
import time
2323

24-
24+
from cassandra import DependencyException
2525
from cassandra.connection import (Connection, ConnectionShutdown,
2626
NONBLOCKING, Timer, TimerManager)
2727
try:
2828
import cassandra.io.libevwrapper as libev
2929
except ImportError:
30-
raise ImportError(
30+
raise DependencyException(
3131
"The C extension needed to use libev was not found. This "
3232
"probably means that you didn't have the required build dependencies "
3333
"when installing the driver. See "

cassandra/io/twistedreactor.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,19 @@
2222
from threading import Thread, Lock
2323
import weakref
2424

25-
from twisted.internet import reactor, protocol
26-
from twisted.internet.endpoints import connectProtocol, TCP4ClientEndpoint, SSL4ClientEndpoint
27-
from twisted.internet.interfaces import IOpenSSLClientConnectionCreator
28-
from twisted.python.failure import Failure
29-
from zope.interface import implementer
25+
from cassandra import DependencyException
26+
try:
27+
from twisted.internet import reactor, protocol
28+
from twisted.internet.endpoints import connectProtocol, TCP4ClientEndpoint, SSL4ClientEndpoint
29+
from twisted.internet.interfaces import IOpenSSLClientConnectionCreator
30+
from twisted.python.failure import Failure
31+
except (ModuleNotFoundError, ImportError):
32+
raise DependencyException("Unable to import twisted module. Try to install it via `pip install twisted[tls]`")
33+
34+
try:
35+
from zope.interface import implementer
36+
except (ModuleNotFoundError, ImportError):
37+
raise DependencyException("Unable to import zope module. Try to install it via `pip install zope`")
3038

3139
from cassandra.connection import Connection, ConnectionShutdown, Timer, TimerManager, ConnectionException
3240

0 commit comments

Comments
 (0)