diff --git a/kafka/vendor/socketpair.py b/kafka/vendor/socketpair.py index b55e629ee..54d908767 100644 --- a/kafka/vendor/socketpair.py +++ b/kafka/vendor/socketpair.py @@ -53,6 +53,23 @@ def socketpair(family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0): raise finally: lsock.close() + + # Authenticating avoids using a connection from something else + # able to connect to {host}:{port} instead of us. + # We expect only AF_INET and AF_INET6 families. + try: + if ( + ssock.getsockname() != csock.getpeername() + or csock.getsockname() != ssock.getpeername() + ): + raise ConnectionError("Unexpected peer connection") + except: + # getsockname() and getpeername() can fail + # if either socket isn't connected. + ssock.close() + csock.close() + raise + return (ssock, csock) socket.socketpair = socketpair