diff --git a/podman_compose.py b/podman_compose.py index df914c7d..0f741cc0 100755 --- a/podman_compose.py +++ b/podman_compose.py @@ -7,6 +7,8 @@ # https://docs.docker.com/compose/django/ # https://docs.docker.com/compose/wordpress/ # TODO: podman pod logs --color -n -f pod_testlogs +from __future__ import annotations + import argparse import asyncio.subprocess import getpass @@ -42,16 +44,12 @@ # helper functions -def is_str(string_object): - return isinstance(string_object, str) - - -def is_dict(dict_object): - return isinstance(dict_object, dict) - - def is_list(list_object): - return not is_str(list_object) and not is_dict(list_object) and hasattr(list_object, "__iter__") + return ( + not isinstance(list_object, str) + and not isinstance(list_object, dict) + and hasattr(list_object, "__iter__") + ) # identity filter @@ -263,8 +261,8 @@ def rec_subs(value, subs_dict): """ do bash-like substitution in value and if list of dictionary do that recursively """ - if is_dict(value): - if 'environment' in value and is_dict(value['environment']): + if isinstance(value, dict): + if 'environment' in value and isinstance(value['environment'], dict): # Load service's environment variables subs_dict = subs_dict.copy() svc_envs = {k: v for k, v in value['environment'].items() if k not in subs_dict} @@ -275,7 +273,7 @@ def rec_subs(value, subs_dict): subs_dict.update(svc_envs) value = {k: rec_subs(v, subs_dict) for k, v in value.items()} - elif is_str(value): + elif isinstance(value, str): def convert(m): if m.group("escaped") is not None: @@ -301,9 +299,10 @@ def norm_as_list(src): given a dictionary {key1:value1, key2: None} or list return a list of ["key1=value1", "key2"] """ + dst: list[str] if src is None: dst = [] - elif is_dict(src): + elif isinstance(src, dict): dst = [(f"{k}={v}" if v is not None else k) for k, v in src.items()] elif is_list(src): dst = list(src) @@ -319,13 +318,13 @@ def norm_as_dict(src): """ if src is None: dst = {} - elif is_dict(src): + elif isinstance(src, dict): dst = dict(src) elif is_list(src): dst = [i.split("=", 1) for i in src if i] dst = [(a if len(a) == 2 else (a[0], None)) for a in dst] dst = dict(dst) - elif is_str(src): + elif isinstance(src, str): key, value = src.split("=", 1) if "=" in src else (src, None) dst = {key: value} else: @@ -334,7 +333,7 @@ def norm_as_dict(src): def norm_ulimit(inner_value): - if is_dict(inner_value): + if isinstance(inner_value, dict): if not inner_value.keys() & {"soft", "hard"}: raise ValueError("expected at least one soft or hard limit") soft = inner_value.get("soft", inner_value.get("hard", None)) @@ -468,7 +467,7 @@ def mount_desc_to_mount_args(compose, mount_desc, srv_name, cnt_name): # pylint def ulimit_to_ulimit_args(ulimit, podman_args): if ulimit is not None: # ulimit can be a single value, i.e. ulimit: host - if is_str(ulimit): + if isinstance(ulimit, str): podman_args.extend(["--ulimit", ulimit]) # or a dictionary or list: else: @@ -535,7 +534,7 @@ def mount_desc_to_volume_args(compose, mount_desc, srv_name, cnt_name): # pylin def get_mnt_dict(compose, cnt, volume): srv_name = cnt["_service"] basedir = compose.dirname - if is_str(volume): + if isinstance(volume, str): volume = parse_short_mount(volume, basedir) return fix_mount_dict(compose, volume, srv_name) @@ -571,7 +570,7 @@ def get_secret_args(compose, cnt, secret, podman_is_building=False): podman_is_building: True if we are preparing arguments for an invocation of "podman build" False if we are preparing for something else like "podman run" """ - secret_name = secret if is_str(secret) else secret.get("source", None) + secret_name = secret if isinstance(secret, str) else secret.get("source", None) if not secret_name or secret_name not in compose.declared_secrets.keys(): raise ValueError(f'ERROR: undeclared secret: "{secret}", service: {cnt["_service"]}') declared_secret = compose.declared_secrets[secret_name] @@ -580,11 +579,17 @@ def get_secret_args(compose, cnt, secret, podman_is_building=False): dest_file = "" secret_opts = "" - secret_target = None if is_str(secret) else secret.get("target", None) - secret_uid = None if is_str(secret) else secret.get("uid", None) - secret_gid = None if is_str(secret) else secret.get("gid", None) - secret_mode = None if is_str(secret) else secret.get("mode", None) - secret_type = None if is_str(secret) else secret.get("type", None) + secret_target = None + secret_uid = None + secret_gid = None + secret_mode = None + secret_type = None + if isinstance(secret, dict): + secret_target = secret.get("target", None) + secret_uid = secret.get("uid", None) + secret_gid = secret.get("gid", None) + secret_mode = secret.get("mode", None) + secret_type = secret.get("type", None) if source_file: # assemble path for source file first, because we need it for all cases @@ -827,7 +832,7 @@ def get_network_create_args(net_desc, proj_name, net_name): if net_desc.get("enable_ipv6", None): args.append("--ipv6") - if is_dict(ipam_config_ls): + if isinstance(ipam_config_ls, dict): ipam_config_ls = [ipam_config_ls] for ipam_config in ipam_config_ls: subnet = ipam_config.get("subnet", None) @@ -852,13 +857,13 @@ async def assert_cnt_nets(compose, cnt): if net and not net.startswith("bridge"): return cnt_nets = cnt.get("networks", None) - if cnt_nets and is_dict(cnt_nets): + if cnt_nets and isinstance(cnt_nets, dict): cnt_nets = list(cnt_nets.keys()) cnt_nets = norm_as_list(cnt_nets or compose.default_net) for net in cnt_nets: net_desc = compose.networks[net] or {} is_ext = net_desc.get("external", None) - ext_desc = is_ext if is_dict(is_ext) else {} + ext_desc = is_ext if isinstance(is_ext, dict) else {} default_net_name = default_network_name_for_project(compose, net, is_ext) net_name = ext_desc.get("name", None) or net_desc.get("name", None) or default_net_name try: @@ -917,7 +922,7 @@ def get_net_args(compose, cnt): ip_assignments = 0 if cnt.get("_aliases", None): aliases.extend(cnt.get("_aliases", None)) - if cnt_nets and is_dict(cnt_nets): + if cnt_nets and isinstance(cnt_nets, dict): prioritized_cnt_nets = [] # cnt_nets is {net_key: net_value, ...} for net_key, net_value in cnt_nets.items(): @@ -945,7 +950,7 @@ def get_net_args(compose, cnt): for net in cnt_nets: net_desc = compose.networks[net] or {} is_ext = net_desc.get("external", None) - ext_desc = is_ext if is_dict(is_ext) else {} + ext_desc = is_ext if isinstance(is_ext, str) else {} default_net_name = default_network_name_for_project(compose, net, is_ext) net_name = ext_desc.get("name", None) or net_desc.get("name", None) or default_net_name net_names.append(net_name) @@ -981,7 +986,7 @@ def get_net_args(compose, cnt): for net_, net_config_ in multiple_nets.items(): net_desc = compose.networks[net_] or {} is_ext = net_desc.get("external", None) - ext_desc = is_ext if is_dict(is_ext) else {} + ext_desc = is_ext if isinstance(is_ext, str) else {} default_net_name = default_network_name_for_project(compose, net_, is_ext) net_name = ext_desc.get("name", None) or net_desc.get("name", None) or default_net_name @@ -1075,10 +1080,10 @@ async def container_to_args(compose, cnt, detached=True): for item in norm_as_list(cnt.get("dns_search", None)): podman_args.extend(["--dns-search", item]) env_file = cnt.get("env_file", []) - if is_str(env_file) or is_dict(env_file): + if isinstance(env_file, (dict, str)): env_file = [env_file] for i in env_file: - if is_str(i): + if isinstance(i, str): i = {"path": i} path = i["path"] required = i.get("required", True) @@ -1096,7 +1101,7 @@ async def container_to_args(compose, cnt, detached=True): for e in env: podman_args.extend(["-e", e]) tmpfs_ls = cnt.get("tmpfs", []) - if is_str(tmpfs_ls): + if isinstance(tmpfs_ls, str): tmpfs_ls = [tmpfs_ls] for i in tmpfs_ls: podman_args.extend(["--tmpfs", i]) @@ -1178,7 +1183,7 @@ async def container_to_args(compose, cnt, detached=True): podman_args.extend(["--init-path", cnt["init-path"]]) entrypoint = cnt.get("entrypoint", None) if entrypoint is not None: - if is_str(entrypoint): + if isinstance(entrypoint, str): entrypoint = shlex.split(entrypoint) podman_args.extend(["--entrypoint", json.dumps(entrypoint)]) platform = cnt.get("platform", None) @@ -1189,7 +1194,7 @@ async def container_to_args(compose, cnt, detached=True): # WIP: healthchecks are still work in progress healthcheck = cnt.get("healthcheck", None) or {} - if not is_dict(healthcheck): + if not isinstance(healthcheck, dict): raise ValueError("'healthcheck' must be a key-value mapping") healthcheck_disable = healthcheck.get("disable", False) healthcheck_test = healthcheck.get("test", None) @@ -1197,7 +1202,7 @@ async def container_to_args(compose, cnt, detached=True): healthcheck_test = ["NONE"] if healthcheck_test: # If it's a string, it's equivalent to specifying CMD-SHELL - if is_str(healthcheck_test): + if isinstance(healthcheck_test, str): # podman does not add shell to handle command with whitespace podman_args.extend([ "--healthcheck-command", @@ -1259,7 +1264,7 @@ async def container_to_args(compose, cnt, detached=True): podman_args.append(cnt["image"]) # command, ..etc. command = cnt.get("command", None) if command is not None: - if is_str(command): + if isinstance(command, str): podman_args.extend(shlex.split(command)) else: podman_args.extend([str(i) for i in command]) @@ -1302,9 +1307,9 @@ def flat_deps(services, with_extends=False): deps.add(ext) continue deps_ls = srv.get("depends_on", None) or [] - if is_str(deps_ls): + if isinstance(deps_ls, str): deps_ls = [deps_ls] - elif is_dict(deps_ls): + elif isinstance(deps_ls, dict): deps_ls = list(deps_ls.keys()) deps.update(deps_ls) # parse link to get service name and remove alias @@ -1467,7 +1472,7 @@ async def volume_ls(self): def normalize_service(service, sub_dir=""): if "build" in service: build = service["build"] - if is_str(build): + if isinstance(build, str): service["build"] = {"context": build} if sub_dir and "build" in service: build = service["build"] @@ -1482,19 +1487,19 @@ def normalize_service(service, sub_dir=""): context = "." service["build"]["context"] = context if "build" in service and "additional_contexts" in service["build"]: - if is_dict(build["additional_contexts"]): + if isinstance(build["additional_contexts"], dict): new_additional_contexts = [] for k, v in build["additional_contexts"].items(): new_additional_contexts.append(f"{k}={v}") build["additional_contexts"] = new_additional_contexts for key in ("command", "entrypoint"): if key in service: - if is_str(service[key]): + if isinstance(service[key], str): service[key] = shlex.split(service[key]) for key in ("env_file", "security_opt", "volumes"): if key not in service: continue - if is_str(service[key]): + if isinstance(service[key], str): service[key] = [service[key]] if "security_opt" in service: sec_ls = service["security_opt"] @@ -1507,12 +1512,12 @@ def normalize_service(service, sub_dir=""): service[key] = norm_as_dict(service[key]) if "extends" in service: extends = service["extends"] - if is_str(extends): + if isinstance(extends, str): extends = {"service": extends} service["extends"] = extends if "depends_on" in service: deps = service["depends_on"] - if is_str(deps): + if isinstance(deps, str): deps = [deps] if is_list(deps): deps_dict = {} @@ -1535,9 +1540,9 @@ def normalize(compose): def normalize_service_final(service: dict, project_dir: str) -> dict: if "build" in service: build = service["build"] - context = build if is_str(build) else build.get("context", ".") + context = build if isinstance(build, str) else build.get("context", ".") context = os.path.normpath(os.path.join(project_dir, context)) - if not is_dict(service["build"]): + if not isinstance(service["build"], dict): service["build"] = {} service["build"]["context"] = context return service @@ -1551,7 +1556,7 @@ def normalize_final(compose: dict, project_dir: str) -> dict: def clone(value): - return value.copy() if is_list(value) or is_dict(value) else value + return value.copy() if is_list(value) or isinstance(value, dict) else value def rec_merge_one(target, source): @@ -1589,7 +1594,7 @@ def rec_merge_one(target, source): value.extend(value2) else: value.extend(value2) - elif is_dict(value2): + elif isinstance(value2, dict): rec_merge_one(value, value2) else: target[key] = value2 @@ -1609,7 +1614,7 @@ def resolve_extends(services, service_names, environ): for name in service_names: service = services[name] ext = service.get("extends", {}) - if is_str(ext): + if isinstance(ext, str): ext = {"service": ext} from_service_name = ext.get("service", None) if not from_service_name: @@ -1663,16 +1668,16 @@ def dotenv_to_dict(dotenv_path): class PodmanCompose: def __init__(self): - self.podman = None + self.podman: Podman self.podman_version = None self.environ = {} self.exit_code = None self.commands = {} - self.global_args = None + self.global_args = argparse.Namespace() self.project_name = None self.dirname = None self.pods = None - self.containers = None + self.containers = [] self.vols = None self.networks = {} self.default_net = "default" @@ -1694,7 +1699,7 @@ def __init__(self): ] def assert_services(self, services): - if is_str(services): + if isinstance(services, str): services = [services] given = set(services or []) missing = given - self.all_services @@ -1918,7 +1923,9 @@ def _parse_compose_file(self): allnets = set() for name, srv in services.items(): srv_nets = srv.get("networks", None) or self.default_net - srv_nets = list(srv_nets.keys()) if is_dict(srv_nets) else norm_as_list(srv_nets) + srv_nets = ( + list(srv_nets.keys()) if isinstance(srv_nets, dict) else norm_as_list(srv_nets) + ) allnets.update(srv_nets) given_nets = set(nets.keys()) missing_nets = allnets - given_nets @@ -1948,7 +1955,8 @@ def _parse_compose_file(self): container_names_by_service = {} self.services = services for service_name, service_desc in services.items(): - replicas = try_int(service_desc.get("deploy", {}).get("replicas", "1")) + replicas = try_int(service_desc.get("deploy", {}).get("replicas"), fallback=1) + container_names_by_service[service_name] = [] for num in range(1, replicas + 1): name0 = f"{project_name}_{service_name}_{num}" @@ -2592,7 +2600,7 @@ def get_volume_names(compose, cnt): srv_name = cnt["_service"] ls = [] for volume in cnt.get("volumes", []): - if is_str(volume): + if isinstance(volume, str): volume = parse_short_mount(volume, basedir) volume = fix_mount_dict(compose, volume, srv_name) mount_type = volume["type"]