Improve daemon #4169

Merged: 12 commits, Nov 1, 2017
36 changes: 19 additions & 17 deletions misc/incremental_checker.py
@@ -114,7 +114,7 @@ def get_commits_starting_at(repo_folder_path: str, start_commit: str) -> List[Tu
return get_commits(repo_folder_path, '{0}^..HEAD'.format(start_commit))


-def get_nth_commit(repo_folder_path, n: int) -> Tuple[str, str]:
+def get_nth_commit(repo_folder_path: str, n: int) -> Tuple[str, str]:
print("Fetching last {} commits (or all, if there are fewer commits than n)".format(n))
return get_commits(repo_folder_path, '-{}'.format(n))[0]

@@ -156,19 +156,13 @@ def run_mypy(target_file_path: Optional[str],
return runtime, output


-def start_daemon(mypy_cache_path: str, verbose: bool) -> None:
-    stdout, stderr, status = execute(DAEMON_CMD + ["status"], fail_on_error=False)
-    if status:
-        cmd = DAEMON_CMD + ["start", "--", "--cache-dir", mypy_cache_path]
-        if verbose:
-            cmd.extend(["-v", "-v"])
-        execute(cmd)
+def start_daemon(mypy_cache_path: str) -> None:
+    cmd = DAEMON_CMD + ["restart", "--", "--cache-dir", mypy_cache_path]
+    execute(cmd)


def stop_daemon() -> None:
-    stdout, stderr, status = execute(DAEMON_CMD + ["status"], fail_on_error=False)
-    if status == 0:
-        execute(DAEMON_CMD + ["stop"])
+    execute(DAEMON_CMD + ["stop"])
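For context, the simplification works because `dmypy restart` itself stops any running daemon before starting a new one, so the checker no longer needs to query `status` first. A rough standalone sketch of the resulting lifecycle, assuming `DAEMON_CMD` matches the script's existing `python3 -m mypy.dmypy` command prefix and `execute` its subprocess helper (both defined elsewhere in this file):

```python
# Sketch only: stripped-down stand-ins for this script's helpers.
import subprocess
from typing import List

DAEMON_CMD = ["python3", "-m", "mypy.dmypy"]  # assumed to match the real definition


def execute(cmd: List[str]) -> None:
    # The real helper also captures output; this version just raises on failure.
    subprocess.run(cmd, check=True)


# 'restart' stops a running daemon (if any) and then starts a fresh one,
# so a separate status check before starting is unnecessary.
execute(DAEMON_CMD + ["restart", "--", "--cache-dir", "/tmp/mypy_cache"])
execute(DAEMON_CMD + ["stop"])
```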


def load_cache(incremental_cache_path: str = CACHE_PATH) -> JsonDict:
@@ -221,7 +215,8 @@ def test_incremental(commits: List[Tuple[str, str]],
mypy_cache_path: str,
*,
mypy_script: Optional[str] = None,
-                     daemon: bool = False) -> None:
+                     daemon: bool = False,
+                     exit_on_error: bool = False) -> None:
"""Runs incremental mode on all `commits` to verify the output matches the expected output.

This function runs mypy on the `target_file_path` inside the `temp_repo_path`. The
@@ -242,6 +237,8 @@ def test_incremental(commits: List[Tuple[str, str]],
print_offset(expected_output, 8)
print(" Actual output: ({:.3f} sec):".format(runtime))
print_offset(output, 8)
+            if exit_on_error:
+                break
else:
print(" Output matches expected result!")
print(" Incremental: {:.3f} sec".format(runtime))
@@ -257,7 +254,7 @@ def test_repo(target_repo_url: str, temp_repo_path: str,
target_file_path: Optional[str],
mypy_path: str, incremental_cache_path: str, mypy_cache_path: str,
range_type: str, range_start: str, branch: str,
-              params: Optional[Namespace] = None) -> None:
+              params: Namespace) -> None:
"""Tests incremental mode against the repo specified in `target_repo_url`.

This algorithm runs in five main stages:
@@ -290,7 +287,7 @@ def test_repo(target_repo_url: str, temp_repo_path: str,
else:
raise RuntimeError("Invalid option: {}".format(range_type))
commits = get_commits_starting_at(temp_repo_path, start_commit)
-    if params is not None and params.sample:
+    if params.sample:
seed = params.seed or base64.urlsafe_b64encode(os.urandom(15)).decode('ascii')
random.seed(seed)
commits = random.sample(commits, params.sample)
@@ -304,18 +301,21 @@

# Stage 4: Rewind and re-run mypy (with incremental mode enabled)
    if params.daemon:
-        start_daemon(mypy_cache_path, False)
+        print('Starting daemon')
+        start_daemon(mypy_cache_path)
    test_incremental(commits, cache, temp_repo_path, target_file_path, mypy_cache_path,
-                     mypy_script=params.mypy_script, daemon=params.daemon)
+                     mypy_script=params.mypy_script, daemon=params.daemon,
+                     exit_on_error=params.exit_on_error)

# Stage 5: Remove temp files, stop daemon
cleanup(temp_repo_path, mypy_cache_path)
if params.daemon:
+        print('Stopping daemon')
stop_daemon()


def main() -> None:
-    help_factory = (lambda prog: RawDescriptionHelpFormatter(prog=prog, max_help_position=32))
+    help_factory = (lambda prog: RawDescriptionHelpFormatter(prog=prog, max_help_position=32))  # type: Any
parser = ArgumentParser(
prog='incremental_checker',
description=__doc__,
@@ -330,6 +330,8 @@ def main() -> None:
help="the repo to clone and run tests on")
parser.add_argument("-f", "--file-path", default=MYPY_TARGET_FILE, metavar="FILE",
help="the name of the file or directory to typecheck")
+    parser.add_argument("-x", "--exit-on-error", action='store_true',
+                        help="Exits as soon as an error occurs")
parser.add_argument("--cache-path", default=CACHE_PATH, metavar="DIR",
help="sets a custom location to store cache data")
parser.add_argument("--branch", default=None, metavar="NAME",
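With the new option wired through, a daemon-mode run that aborts on the first mismatch might be launched as below. This is a hypothetical invocation: the `last 100` range arguments and the `--daemon` flag come from the script's pre-existing interface, not from this diff.

```python
# Hypothetical driver, kept in Python for consistency; equivalent to running
# the script from a shell. Only -x/--exit-on-error is introduced by this PR.
import subprocess

subprocess.run(
    ["python3", "misc/incremental_checker.py", "last", "100",
     "--daemon", "--exit-on-error"],
    check=True,
)
```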
30 changes: 19 additions & 11 deletions mypy/build.py
@@ -357,7 +357,7 @@ def default_lib_path(data_dir: str,


def cache_meta_from_dict(meta: Dict[str, Any], data_json: str) -> CacheMeta:
-    sentinel = None  # type: Any  # the values will be post-validated below
+    sentinel = None  # type: Any  # Values to be validated by the caller
return CacheMeta(
meta.get('id', sentinel),
meta.get('path', sentinel),
@@ -1408,6 +1408,7 @@ class State:
meta = None # type: Optional[CacheMeta]
data = None # type: Optional[str]
tree = None # type: Optional[MypyFile]
+    is_from_saved_cache = False  # True if the tree came from the in-memory cache
dependencies = None # type: List[str]
suppressed = None # type: List[str] # Suppressed/missing dependencies
priorities = None # type: Dict[str, int]
@@ -1442,9 +1443,6 @@ class State:
# Whether to ignore all errors
ignore_all = False

-    # Whether this module was found to have errors
-    has_errors = False

def __init__(self,
id: Optional[str],
path: Optional[str],
@@ -1621,7 +1619,6 @@ def mark_interface_stale(self, *, on_errors: bool = False) -> None:
"""Marks this module as having a stale public interface, and discards the cache data."""
self.meta = None
self.externally_same = False
-        self.has_errors = on_errors
Review comment (Collaborator): Why is this no longer needed?

Reply (Member, author): I introduced it in the first daemon PR but it turns out I don't need a separate flag, I can just check `self.meta is None`.

if not on_errors:
self.manager.stale_modules.add(self.id)

@@ -1947,7 +1944,7 @@ def preserve_cache(graph: Graph) -> SavedCache:
saved_cache = {}
for id, state in graph.items():
assert state.id == id
-        if state.meta is not None and state.tree is not None and not state.has_errors:
+        if state.meta is not None and state.tree is not None:
saved_cache[id] = (state.meta, state.tree)
return saved_cache
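The review exchange above pins down the invariant that makes this simplification safe: `mark_interface_stale()` discards `self.meta` (including when a module has errors), so `meta is not None` already implies both "no errors" and "interface fresh". A minimal sketch of that invariant, using simplified stand-in types rather than the real `State`/`CacheMeta` classes:

```python
# Simplified illustration; attribute names mirror build.py, but the real
# State class carries far more machinery.
from typing import Optional


class FakeState:
    def __init__(self) -> None:
        self.meta = object()  # type: Optional[object]  # stands in for CacheMeta
        self.tree = object()  # stands in for the parsed MypyFile

    def mark_interface_stale(self, *, on_errors: bool = False) -> None:
        # Dropping the cache metadata doubles as the error flag:
        # a module with errors never keeps its meta.
        self.meta = None


def should_preserve(state: FakeState) -> bool:
    # The new preserve_cache() condition, minus the old has_errors check.
    return state.meta is not None and state.tree is not None


state = FakeState()
assert should_preserve(state)
state.mark_interface_stale(on_errors=True)
assert not should_preserve(state)
```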

@@ -2256,16 +2253,27 @@ def order_ascc(graph: Graph, ascc: AbstractSet[str], pri_max: int = PRI_ALL) ->


def process_fresh_scc(graph: Graph, scc: List[str], manager: BuildManager) -> None:
-    """Process the modules in one SCC from their cached data."""
-    # TODO: Clean this up, it's ugly.
+    """Process the modules in one SCC from their cached data.
+
+    This involves loading the tree from JSON and then doing various cleanups.
+
+    If the tree is loaded from memory ('saved_cache') it's even quicker.
+    """
    saved_cache = manager.saved_cache
+    # Check that all nodes are available for loading from memory.
    if all(id in saved_cache for id in scc):
-        trees = {id: saved_cache[id][1] for id in scc}
-        if all(trees.values()):
+        deps = set(dep for id in scc for dep in graph[id].dependencies if dep in graph)
+        # Check that all dependencies were loaded from memory.
+        # If not, some dependency was reparsed but the interface hash
+        # wasn't changed -- in that case we can't reuse the tree.
+        if all(graph[dep].is_from_saved_cache for dep in deps):
+            trees = {id: saved_cache[id][1] for id in scc}
            for id, tree in trees.items():
                manager.add_stats(reused_trees=1)
                manager.trace("Reusing saved tree %s" % id)
-                graph[id].tree = tree
+                st = graph[id]
+                st.tree = tree  # This is never overwritten.
+                st.is_from_saved_cache = True
                manager.modules[id] = tree
            return
    for id in scc:
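The extra dependency check guards a subtle case: a dependency can be reparsed without its interface hash changing, so the SCC's own cache entries still look fresh even though cross-references into the dependency's old tree would no longer match the tree actually registered in `manager.modules`. A hedged sketch of just the reuse decision, modeling the graph as plain dictionaries instead of mypy's `State` objects:

```python
# Illustrative model of the reuse check in process_fresh_scc(); not mypy code.
from typing import Dict, List, Set


class Node:
    def __init__(self, dependencies: List[str], is_from_saved_cache: bool) -> None:
        self.dependencies = dependencies
        self.is_from_saved_cache = is_from_saved_cache


def can_reuse_saved_trees(graph: Dict[str, Node], scc: List[str],
                          saved_cache: Set[str]) -> bool:
    # Every SCC member needs a tree in the in-memory saved cache...
    if not all(mod in saved_cache for mod in scc):
        return False
    # ...and every in-graph dependency must itself have been loaded from the
    # saved cache; a reparsed dependency means its old tree object is gone.
    deps = {dep for mod in scc for dep in graph[mod].dependencies if dep in graph}
    return all(graph[dep].is_from_saved_cache for dep in deps)


graph = {
    'a': Node(['b'], is_from_saved_cache=False),
    'b': Node([], is_from_saved_cache=False),  # reparsed, same interface hash
}
print(can_reuse_saved_trees(graph, ['a'], saved_cache={'a'}))  # False
```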
114 changes: 68 additions & 46 deletions mypy/dmypy.py
@@ -34,6 +34,8 @@
subparsers = parser.add_subparsers()

start_parser = subparsers.add_parser('start', help="Start daemon")
+start_parser.add_argument('--log-file', metavar='FILE', type=str,
+                          help="Direct daemon stdout/stderr to FILE")
start_parser.add_argument('flags', metavar='FLAG', nargs='*', type=str,
help="Regular mypy flags (precede with --)")

@@ -45,6 +47,8 @@

restart_parser = subparsers.add_parser('restart',
help="Restart daemon (stop or kill followed by start)")
+restart_parser.add_argument('--log-file', metavar='FILE', type=str,
+                            help="Direct daemon stdout/stderr to FILE")
restart_parser.add_argument('flags', metavar='FLAG', nargs='*', type=str,
help="Regular mypy flags (precede with --)")

@@ -103,7 +107,7 @@ def do_start(args: argparse.Namespace) -> None:
try:
pid, sockname = get_status()
except SystemExit as err:
-        if daemonize(Server(args.flags).serve):
+        if daemonize(Server(args.flags).serve, args.log_file):
sys.exit(1)
wait_for_server()
else:
@@ -169,7 +173,7 @@ def do_restart(args: argparse.Namespace) -> None:
sys.exit("Status: %s" % str(response))
else:
print("Daemon stopped")
-    if daemonize(Server(args.flags).serve):
+    if daemonize(Server(args.flags).serve, args.log_file):
sys.exit(1)
wait_for_server()

@@ -333,7 +337,7 @@ def read_status() -> Dict[str, object]:
return data


-def daemonize(func: Callable[[], None]) -> int:
+def daemonize(func: Callable[[], None], log_file: Optional[str] = None) -> int:
"""Arrange to call func() in a grandchild of the current process.

Return 0 for success, exit status for failure, negative if
@@ -368,6 +372,11 @@ def daemonize(func: Callable[[], None]) -> int:
# Child is done, exit to parent.
os._exit(0)
# Grandchild: run the server.
+        if log_file:
+            sys.stdout = sys.stderr = open(log_file, 'a', buffering=1)
+            fd = sys.stdout.fileno()
+            os.dup2(fd, 2)
+            os.dup2(fd, 1)
func()
finally:
# Make sure we never get back into the caller.
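The added block redirects at two levels: rebinding `sys.stdout`/`sys.stderr` covers Python-level writes, while the `dup2` calls repoint file descriptors 1 and 2 so that output from C extensions and inherited child processes also reaches the log. A standalone sketch of the same pattern (not the daemon code itself; the log path is illustrative):

```python
# Minimal sketch of the two-level output redirection used above.
import os
import sys


def redirect_output_to(log_file: str) -> None:
    # Append mode, line-buffered, matching the daemon's open() arguments.
    log = open(log_file, 'a', buffering=1)
    # Level 1: the Python-level stream objects.
    sys.stdout = sys.stderr = log
    # Level 2: the OS-level descriptors, so writes that bypass the sys
    # module (C extensions, subprocesses inheriting fds 1/2) land here too.
    fd = log.fileno()
    os.dup2(fd, 2)
    os.dup2(fd, 1)


redirect_output_to('/tmp/daemon.log')
print("this line ends up in the log file")
```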
@@ -490,43 +499,28 @@ def cmd_recheck(self) -> Dict[str, object]:
return {'error': "Command 'recheck' is only valid after a 'check' command"}
return self.check(self.last_sources)

-    last_mananager = None  # type: Optional[mypy.build.BuildManager]
+    # Needed by tests.
+    last_manager = None  # type: Optional[mypy.build.BuildManager]

def check(self, sources: List[mypy.build.BuildSource],
alt_lib_path: Optional[str] = None) -> Dict[str, Any]:
-        # TODO: Move stats handling code to make the logic here less cluttered.
-        bound_gc_callback = self.gc_callback
-        self.gc_start_time = None  # type: Optional[float]
-        self.gc_time = 0.0
-        self.gc_calls = 0
-        self.gc_collected = 0
-        self.gc_uncollectable = 0
-        t0 = time.time()
-        try:
-            gc.callbacks.append(bound_gc_callback)
-            # saved_cache is mutated in place.
-            res = mypy.build.build(sources, self.options,
-                                   saved_cache=self.saved_cache,
-                                   alt_lib_path=alt_lib_path)
-            msgs = res.errors
-            self.last_manager = res.manager  # type: Optional[mypy.build.BuildManager]
-        except mypy.errors.CompileError as err:
-            msgs = err.messages
-            self.last_manager = None
-        finally:
-            while bound_gc_callback in gc.callbacks:
-                gc.callbacks.remove(bound_gc_callback)
-            t1 = time.time()
+        self.last_manager = None
+        with GcLogger() as gc_result:
+            try:
+                # saved_cache is mutated in place.
+                res = mypy.build.build(sources, self.options,
+                                       saved_cache=self.saved_cache,
+                                       alt_lib_path=alt_lib_path)
+                msgs = res.errors
+                self.last_manager = res.manager  # type: Optional[mypy.build.BuildManager]
+            except mypy.errors.CompileError as err:
+                msgs = err.messages
        if msgs:
            msgs.append("")
            response = {'out': "\n".join(msgs), 'err': "", 'status': 1}
        else:
            response = {'out': "", 'err': "", 'status': 0}
-        response['build_time'] = t1 - t0
-        response['gc_time'] = self.gc_time
-        response['gc_calls'] = self.gc_calls
-        response['gc_collected'] = self.gc_collected
-        response['gc_uncollectable'] = self.gc_uncollectable
+        response.update(gc_result.get_stats())
response.update(get_meminfo())
if self.last_manager is not None:
response.update(self.last_manager.stats_summary())
@@ -537,20 +531,6 @@ def cmd_hang(self) -> Dict[str, object]:
time.sleep(100)
return {}

-    def gc_callback(self, phase: str, info: Mapping[str, int]) -> None:
-        if phase == 'start':
-            assert self.gc_start_time is None, "Start phase out of sequence"
-            self.gc_start_time = time.time()
-        elif phase == 'stop':
-            assert self.gc_start_time is not None, "Stop phase out of sequence"
-            self.gc_calls += 1
-            self.gc_time += time.time() - self.gc_start_time
-            self.gc_start_time = None
-            self.gc_collected += info['collected']
-            self.gc_uncollectable += info['uncollectable']
-        else:
-            assert False, "Unrecognized gc phase (%r)" % (phase,)


# Misc utilities.

@@ -570,6 +550,48 @@ def receive(sock: socket.socket) -> Any:
return data


+class GcLogger:

Review comment (Collaborator): This doesn't seem to depend on anything else so this could be easily moved to a new module.

Reply (Member, author): I'll move it as soon as there's a need for it outside this module.

+    """Context manager to log GC stats and overall time."""
+
+    def __enter__(self) -> 'GcLogger':
+        self.gc_start_time = None  # type: Optional[float]
+        self.gc_time = 0.0
+        self.gc_calls = 0
+        self.gc_collected = 0
+        self.gc_uncollectable = 0
+        gc.callbacks.append(self.gc_callback)
+        self.start_time = time.time()
+        return self
+
+    def gc_callback(self, phase: str, info: Mapping[str, int]) -> None:
+        if phase == 'start':
+            assert self.gc_start_time is None, "Start phase out of sequence"
+            self.gc_start_time = time.time()
+        elif phase == 'stop':
+            assert self.gc_start_time is not None, "Stop phase out of sequence"
+            self.gc_calls += 1
+            self.gc_time += time.time() - self.gc_start_time
+            self.gc_start_time = None
+            self.gc_collected += info['collected']
+            self.gc_uncollectable += info['uncollectable']
+        else:
+            assert False, "Unrecognized gc phase (%r)" % (phase,)
+
+    def __exit__(self, *args: object) -> None:
+        while self.gc_callback in gc.callbacks:
+            gc.callbacks.remove(self.gc_callback)
+
+    def get_stats(self) -> Dict[str, float]:
+        end_time = time.time()
+        result = {}
+        result['gc_time'] = self.gc_time
+        result['gc_calls'] = self.gc_calls
+        result['gc_collected'] = self.gc_collected
+        result['gc_uncollectable'] = self.gc_uncollectable
+        result['build_time'] = end_time - self.start_time
+        return result
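Usage is a `with` block around the work being measured; `get_stats()` can be called after the block exits because the logger object outlives it, which is exactly how `check()` uses it above. A sketch, assuming the `GcLogger` class shown above is in scope, with a hypothetical `churn()` workload:

```python
import gc


def churn() -> None:
    # Hypothetical workload: build reference cycles so the collector runs.
    for _ in range(100):
        cycle = []
        cycle.append(cycle)
    gc.collect()


with GcLogger() as gc_result:
    churn()

stats = gc_result.get_stats()
# Keys mirror the response fields assembled in check(): gc_time, gc_calls,
# gc_collected, gc_uncollectable, and the overall build_time.
print(stats)
```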


MiB = 2**20

