Skip to content

Free port auto resolving for TarantoolServer and AppServer #348

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Sep 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 4 additions & 41 deletions dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,35 +46,6 @@
from listeners import StatisticsWatcher


class TcpPortDispatcher:
""" Helper class holds available and occupied TCP port ranges. This ranges
intended to distributes between workers.
"""
def __init__(self, range_count):
lowest_port = 3000
highest_port = 59999
port_count = highest_port - lowest_port + 1
range_size = port_count // range_count

self.available_ranges = set()
for i in range(range_count):
start_port = lowest_port + i * range_size
end_port = start_port + range_size - 1
tcp_port_range = (start_port, end_port)
self.available_ranges.add(tcp_port_range)

self.acquired_ranges = dict()

def acquire_range(self, _id):
tcp_port_range = self.available_ranges.pop()
self.acquired_ranges[_id] = tcp_port_range
return tcp_port_range

def release_range(self, _id):
tcp_port_range = self.acquired_ranges.pop(_id)
self.available_ranges.add(tcp_port_range)


class Dispatcher:
"""Run specified count of worker processes ('max_workers_cnt' arg), pass
task IDs (via 'task_queue'), receive results and output (via
Expand Down Expand Up @@ -136,8 +107,6 @@ def __init__(self, task_groups, max_workers_cnt, randomize):
self.worker_id_to_pid = dict()

self.randomize = randomize
self.tcp_port_dispatcher = TcpPortDispatcher(
range_count=max_workers_cnt)

def terminate_all_workers(self):
for process in self.processes:
Expand Down Expand Up @@ -235,10 +204,7 @@ def add_worker(self):
# find_nonempty_task_queue_disp()
if self.workers_cnt >= self.max_workers_cnt:
return False
tcp_port_range = self.tcp_port_dispatcher.acquire_range(
self.worker_next_id)
process = task_queue_disp.add_worker(self.worker_next_id,
tcp_port_range)
process = task_queue_disp.add_worker(self.worker_next_id)
self.processes.append(process)
self.pids.append(process.pid)
self.pid_to_worker_id[process.pid] = self.worker_next_id
Expand All @@ -255,7 +221,6 @@ def del_worker(self, worker_id):
task_queue_disp = self.get_task_queue_disp(worker_id)
task_queue_disp.del_worker(worker_id)
self.workers_cnt -= 1
self.tcp_port_dispatcher.release_range(worker_id)

self.pids.remove(pid)
del self.worker_id_to_pid[worker_id]
Expand Down Expand Up @@ -412,24 +377,22 @@ def __init__(self, key, task_group, randomize):
self.done = False
self.done_task_ids = set()

def _run_worker(self, worker_id, tcp_port_range):
def _run_worker(self, worker_id):
"""Entry function for worker processes."""
os.environ['TEST_RUN_WORKER_ID'] = str(worker_id)
os.environ['TEST_RUN_TCP_PORT_START'] = str(tcp_port_range[0])
os.environ['TEST_RUN_TCP_PORT_END'] = str(tcp_port_range[1])
color_stdout.queue = self.result_queue
worker = self.gen_worker(worker_id)
sampler.set_queue(self.result_queue, worker_id, worker.name)
worker.run_all(self.task_queue, self.result_queue)

def add_worker(self, worker_id, tcp_port_range):
def add_worker(self, worker_id):
# Note: each of our workers should consume only one None, but for the
# case of abnormal circumstances we listen for processes termination
# (method 'check_for_dead_processes') and for time w/o output from
# workers (class 'HangWatcher').
self.task_queue.put(None) # 'stop worker' marker

entry = functools.partial(self._run_worker, worker_id, tcp_port_range)
entry = functools.partial(self._run_worker, worker_id)

self.worker_ids.add(worker_id)
process = multiprocessing.Process(target=entry)
Expand Down
8 changes: 4 additions & 4 deletions lib/app_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from lib.tarantool_server import Test
from lib.tarantool_server import TarantoolServer
from lib.tarantool_server import TarantoolStartError
from lib.utils import find_port
from lib.utils import format_process
from lib.utils import signame
from lib.utils import warn_unix_socket
Expand All @@ -33,7 +32,7 @@ def timeout_handler(server_process, test_timeout):


def run_server(execs, cwd, server, logfile, retval, test_id):
os.putenv("LISTEN", server.iproto)
os.putenv("LISTEN", server.listen_uri)
server.process = Popen(execs, stdout=PIPE, stderr=PIPE, cwd=cwd)
sampler.register_process(server.process.pid, test_id, server.name)
test_timeout = Options().args.test_timeout
Expand Down Expand Up @@ -113,6 +112,7 @@ def __init__(self, _ini=None, test_suite=None):
self.lua_libs = ini['lua_libs']
self.name = 'app_server'
self.process = None
self.localhost = '127.0.0.1'
self.use_unix_sockets_iproto = ini['use_unix_sockets_iproto']

@property
Expand Down Expand Up @@ -156,9 +156,9 @@ def deploy(self, vardir=None, silent=True, need_init=True):
if self.use_unix_sockets_iproto:
path = os.path.join(self.vardir, self.name + ".i")
warn_unix_socket(path)
self.iproto = path
self.listen_uri = path
else:
self.iproto = str(find_port())
self.listen_uri = self.localhost + ':0'
shutil.copy(os.path.join(self.TEST_RUN_DIR, 'test_run.lua'),
self.vardir)

Expand Down
6 changes: 6 additions & 0 deletions lib/box_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@

import errno
import ctypes
import re
import socket

from lib.tarantool_connection import TarantoolConnection

# monkey patch tarantool and msgpack
from lib.utils import check_libs
from lib.utils import warn_unix_socket
check_libs()

from tarantool import Connection as tnt_connection # noqa: E402
Expand All @@ -41,6 +43,10 @@
class BoxConnection(TarantoolConnection):
def __init__(self, host, port):
super(BoxConnection, self).__init__(host, port)
if self.host == 'unix/' or re.search(r'^/', str(self.port)):
warn_unix_socket(self.port)
host = None

self.py_con = tnt_connection(host, port, connect_now=False,
socket_timeout=100)
self.py_con.error = False
Expand Down
5 changes: 0 additions & 5 deletions lib/inspector.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from gevent.server import StreamServer

from lib.utils import bytes_to_str
from lib.utils import find_port
from lib.utils import prefix_each_line
from lib.utils import str_to_bytes
from lib.colorer import color_stdout
Expand Down Expand Up @@ -52,10 +51,6 @@ class TarantoolInspector(StreamServer):
"""

def __init__(self, host, port):
# When specific port range was acquired for current worker, don't allow
# OS set port for us that isn't from specified range.
if port == 0:
port = find_port()
super(TarantoolInspector, self).__init__((host, port))
self.parser = None

Expand Down
14 changes: 8 additions & 6 deletions lib/preprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def __init__(self, suite_ini, default_server, create_server, params={},
self.curcon = [self.connections['default']]
nmsp = Namespace()
setattr(nmsp, 'admin', default_server.admin.uri)
setattr(nmsp, 'listen', default_server.iproto.uri)
setattr(nmsp, 'listen', default_server.listen_uri)
setattr(self.environ, 'default', nmsp)
# for propagating 'current_test' to non-default servers
self.default_server_no_connect = kwargs.get(
Expand Down Expand Up @@ -259,7 +259,6 @@ def server_create(self, ctype, sname, opts):
if 'rpl_master' in opts:
temp.rpl_master = self.servers[opts['rpl_master']]
temp.vardir = self.suite_ini['vardir']
temp.use_unix_sockets = self.suite_ini['use_unix_sockets']
temp.use_unix_sockets_iproto = \
self.suite_ini['use_unix_sockets_iproto']
temp.inspector_port = int(self.suite_ini.get(
Expand All @@ -283,13 +282,13 @@ def server_create(self, ctype, sname, opts):
copy_to
))
nmsp = Namespace()
setattr(nmsp, 'admin', temp.admin.port)
setattr(nmsp, 'listen', temp.iproto.port)
setattr(nmsp, 'admin', temp.admin.uri)
setattr(nmsp, 'listen', temp.listen_uri)
if temp.rpl_master:
setattr(nmsp, 'master', temp.rpl_master.iproto.port)
setattr(nmsp, 'master', temp.rpl_master.iproto.uri)
setattr(self.environ, sname, nmsp)
if 'return_listen_uri' in opts and opts['return_listen_uri'] == 'True':
return self.servers[sname].iproto.uri
return self.servers[sname].listen_uri

def server_deploy(self, ctype, sname, opts):
self.servers[sname].install()
Expand Down Expand Up @@ -343,6 +342,9 @@ def server_restart(self, ctype, sname, opts):
# remove proxy
self.server_stop('stop', 'proxy', {})

def server_get_iproto_uri(self, ctype, sname, opts):
return self.servers[sname].iproto.uri

def server(self, ctype, sname, opts):
attr = 'server_%s' % ctype
if hasattr(self, attr):
Expand Down
Loading