diff --git a/handlers.py b/handlers.py index 3602eb2..4ab1992 100644 --- a/handlers.py +++ b/handlers.py @@ -1,17 +1,14 @@ -import os -import kopf -import kubernetes -import requests -import constants from math import ceil from collections import namedtuple +import requests +import kubernetes +import kopf +import constants +from models.celery_custom_resource import celery_custom_resource_from_dict +from kubernetes_utils.worker_deployment_generator import WorkerDeploymentGenerator +from kubernetes_utils.flower_deployment_generator import FlowerDeploymentGenerator -from deployment_utils import ( - deploy_celery_workers, - deploy_flower, - expose_flower_service -) -from update_utils import ( +from utilities.patching import ( update_all_deployments, update_worker_deployment, update_flower_deployment @@ -19,59 +16,46 @@ @kopf.on.create('celeryproject.org', 'v1alpha1', 'celery') -def create_fn(spec, name, namespace, logger, **kwargs): +def create_fn(spec, namespace, logger, **kwargs): """ Celery custom resource creation handler """ - - # 1. Validation of spec - val, err_msg = validate_spec(spec) - if err_msg: - status = 'Failed validation' - raise kopf.PermanentError(f"{err_msg}. Got {val}") - api = kubernetes.client.CoreV1Api() apps_api_instance = kubernetes.client.AppsV1Api() - - # 2. Deployment for celery workers - worker_deployment = deploy_celery_workers( - apps_api_instance, namespace, spec, logger + try: + celery_cr = celery_custom_resource_from_dict(dict(spec)) + except Exception as e: + raise kopf.PermanentError(e) + + # deploy worker + worker_deployment = WorkerDeploymentGenerator( + namespace=namespace, celery_cr=celery_cr + ).get_worker_deployment() + kopf.adopt(worker_deployment) + apps_api_instance.create_namespaced_deployment( + namespace=namespace, + body=worker_deployment ) - # 3. Deployment for flower - flower_deployment = deploy_flower( - apps_api_instance, namespace, spec, logger + # deploy flower + flower_dep_gen_instance = FlowerDeploymentGenerator( + namespace=namespace, celery_cr=celery_cr ) - - # 4. Expose flower service - flower_svc = expose_flower_service( - api, namespace, spec, logger + flower_deployment = flower_dep_gen_instance.get_flower_deployment() + kopf.adopt(flower_deployment) + apps_api_instance.create_namespaced_deployment( + namespace=namespace, + body=flower_deployment ) - children = [ - { - 'name': worker_deployment.metadata.name, - 'replicas': worker_deployment.spec.replicas, - 'kind': constants.DEPLOYMENT_KIND, - 'type': constants.WORKER_TYPE - }, - { - 'name': flower_deployment.metadata.name, - 'replicas': flower_deployment.spec.replicas, - 'kind': constants.DEPLOYMENT_KIND, - 'type': constants.FLOWER_TYPE - }, - { - 'name': flower_svc.metadata.name, - 'spec': flower_svc.spec.to_dict(), - 'kind': constants.SERVICE_KIND, - 'type': constants.FLOWER_TYPE - } - ] + # expose service + flower_svc = flower_dep_gen_instance.get_flower_svc() + kopf.adopt(flower_svc) + api.create_namespaced_service(namespace=namespace, body=flower_svc) + # TODO: Decide the return structure return { - 'children': children, - 'children_count': len(children), + 'children': 3, 'status': constants.STATUS_CREATED } @@ -172,7 +156,7 @@ def get_flower_svc_host(status): @kopf.timer('celeryproject.org', 'v1alpha1', 'celery', - initial_delay=5, interval=10, idle=10) + initial_delay=50000, interval=10000, idle=10) def message_queue_length(spec, status, **kwargs): flower_svc_host = get_flower_svc_host(status) if not flower_svc_host: @@ -242,23 +226,4 @@ def horizontal_autoscale(spec, status, namespace, **kwargs): return { 'deploymentName': updated_deployment.metadata.name, 'updated_num_of_replicas': updated_num_of_replicas - } - - -def validate_stuff(spec): - """ - 1. If the deployment/svc already exists, k8s throws error - 2. Response and spec classes and enums - """ - pass - - -def validate_spec(spec): - """ - Validates the incoming spec - @returns - True/False, Error Message - """ - # size = spec.get('size') - # if not size: - # return size, "Size must be set" - return None, None + } \ No newline at end of file diff --git a/kubernetes_utils/__init__.py b/kubernetes_utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/kubernetes_utils/flower_deployment_generator.py b/kubernetes_utils/flower_deployment_generator.py new file mode 100644 index 0000000..9ab3b2c --- /dev/null +++ b/kubernetes_utils/flower_deployment_generator.py @@ -0,0 +1,71 @@ +from typing import List +from kubernetes import client as k8s +from kubernetes_utils.pod_generator import PodGenerator +from models.celery_custom_resource import CeleryCustomResource + + +class FlowerDeploymentGenerator(object): + + def __init__(self, namespace: str, celery_cr: CeleryCustomResource): + self.namespace = namespace + self.celery_cr = celery_cr + + def get_flower_deployment(self) -> k8s.V1Deployment: + template = k8s.V1PodTemplateSpec( + metadata=k8s.V1ObjectMeta( + labels={"app": f'{self.celery_cr.app_name}-celery-flower'} + ), + spec=self.get_flower_pod().spec + ) + deployment_spec = k8s.V1DeploymentSpec( + replicas=self.celery_cr.worker_replicas, + template=template, + selector={'matchLabels': {'app': f'{self.celery_cr.app_name}-celery-flower'}} + ) + + return k8s.V1Deployment( + api_version="apps/v1", + kind="Deployment", + metadata=k8s.V1ObjectMeta(name=f'{self.celery_cr.app_name}-celery-flower'), + spec=deployment_spec + ) + + def get_flower_pod(self) -> k8s.V1Pod: + return PodGenerator( + namespace=self.namespace, + image=self.celery_cr.image, + image_pull_policy=self.celery_cr.image_pull_policy, + image_pull_secrets=self.celery_cr.image_pull_secrets, + name=f"{self.celery_cr.app_name}-celery-flower", + container_name='flower', + volume_mounts=self.celery_cr.volume_mounts, + volumes=self.celery_cr.volumes, + init_containers=self.celery_cr.init_containers, + cmds=["flower"], + envs=self.celery_cr.flower_spec.env, + args=self.__get_flower_container_args( + self.celery_cr.celery_app, self.celery_cr.flower_spec.args + ), + node_selectors=self.celery_cr.flower_spec.node_selector, + resources=self.celery_cr.flower_spec.resources.to_dict(), + readiness_probe=self.celery_cr.readiness_probe, + liveness_probe=self.celery_cr.liveness_probe + ).gen_pod() + + def get_flower_svc(self) -> k8s.V1Service: + svc_template = self.celery_cr.flower_spec.service + metadata = { + 'name': f'{self.celery_cr.app_name}-celery-flower' + } + return k8s.V1Service( + metadata=metadata, + spec=svc_template.get('spec') + ) + + def __get_flower_container_args( + self, celery_app: str, args: List[str] + ) -> List[str]: + base_args = [f"--app={celery_app}", "flower"] + if args: + base_args.extend(args) + return base_args diff --git a/kubernetes_utils/pod_generator.py b/kubernetes_utils/pod_generator.py new file mode 100644 index 0000000..0b1b7e3 --- /dev/null +++ b/kubernetes_utils/pod_generator.py @@ -0,0 +1,183 @@ +from typing import Dict, List, Optional, Union + +from kubernetes import client as k8s + + +class PodGenerator(object): + """ + Wrapper class to generate a Kubernetes Pod + Class source taken from: https://github.com/apache/airflow repository + + Any configuration that is container specific gets applied to + the first container in the list of containers. + :param image: The docker image + :type image: Optional[str] + :param name: name in the metadata section (not the container name) + :type name: Optional[str] + :param namespace: pod namespace + :type namespace: Optional[str] + :param volume_mounts: list of kubernetes volumes mounts + :type volume_mounts: Optional[List[Union[k8s.V1VolumeMount, dict]]] + :param envs: A dict containing the environment variables + :type envs: Optional[Dict[str, str]] + :param cmds: The command to be run on the first container + :type cmds: Optional[List[str]] + :param args: The arguments to be run on the pod + :type args: Optional[List[str]] + :param labels: labels for the pod metadata + :type labels: Optional[Dict[str, str]] + :param node_selectors: node selectors for the pod + :type node_selectors: Optional[Dict[str, str]] + :param ports: list of ports. Applies to the first container. + :type ports: Optional[List[Union[k8s.V1ContainerPort, dict]]] + :param volumes: Volumes to be attached to the first container + :type volumes: Optional[List[Union[k8s.V1Volume, dict]]] + :param image_pull_policy: Specify a policy to cache or always pull an image + :type image_pull_policy: str + :param restart_policy: The restart policy of the pod + :type restart_policy: str + :param image_pull_secrets: Any image pull secrets to be given to the pod. + If more than one secret is required, provide a comma separated list: + secret_a,secret_b + :type image_pull_secrets: str + :param init_containers: A list of init containers + :type init_containers: Optional[List[k8s.V1Container]] + :param service_account_name: Identity for processes that run in a Pod + :type service_account_name: Optional[str] + :param resources: Resource requirements for the first containers + :type resources: Optional[Union[k8s.V1ResourceRequirements, dict]] + :param readiness_probe: Periodic probe of container service readiness. + :type readiness_probe: Optional[Union[k8s.V1Probe, dict]] + :param liveness_probe: Periodic probe of container service liveness. + :type liveness_probe: Optional[Union[k8s.V1Probe, dict]] + :param annotations: annotations for the pod + :type annotations: Optional[Dict[str, str]] + :param affinity: A dict containing a group of affinity scheduling rules + :type affinity: Optional[dict] + :param hostnetwork: If True enable host networking on the pod + :type hostnetwork: bool + :param tolerations: A list of kubernetes tolerations + :type tolerations: Optional[list] + :param security_context: A dict containing the security context for the pod + :type security_context: Optional[Union[k8s.V1PodSecurityContext, dict]] + :param configmaps: Any configmap refs to envfrom. + If more than one configmap is required, provide a comma separated list + configmap_a,configmap_b + :type configmaps: List[str] + :param dnspolicy: Specify a dnspolicy for the pod + :type dnspolicy: Optional[str] + :param schedulername: Specify a schedulername for the pod + :type schedulername: Optional[str] + :param pod: The fully specified pod. Mutually exclusive with `path_or_string` + :type pod: Optional[kubernetes.client.models.V1Pod] + :param extract_xcom: Whether to bring up a container for xcom + :type extract_xcom: bool + :param priority_class_name: priority class name for the launched Pod + :type priority_class_name: str + """ + + def __init__( + self, + image: Optional[str] = None, + name: Optional[str] = None, + namespace: Optional[str] = None, + volume_mounts: Optional[List[Union[k8s.V1VolumeMount, dict]]] = None, + envs: Optional[Dict[str, str]] = None, + cmds: Optional[List[str]] = None, + args: Optional[List[str]] = None, + container_name: Optional[str] = 'base', + labels: Optional[Dict[str, str]] = None, + node_selectors: Optional[Dict[str, str]] = None, + ports: Optional[List[Union[k8s.V1ContainerPort, dict]]] = None, + volumes: Optional[List[Union[k8s.V1Volume, dict]]] = None, + image_pull_policy: Optional[str] = None, + restart_policy: Optional[str] = None, + image_pull_secrets: Optional[str] = None, + init_containers: Optional[List[k8s.V1Container]] = None, + service_account_name: Optional[str] = None, + resources: Optional[Union[k8s.V1ResourceRequirements, dict]] = None, + readiness_probe: Optional[Union[k8s.V1Probe, dict]] = None, + liveness_probe: Optional[Union[k8s.V1Probe, dict]] = None, + annotations: Optional[Dict[str, str]] = None, + affinity: Optional[dict] = None, + hostnetwork: bool = False, + tolerations: Optional[list] = None, + security_context: Optional[Union[k8s.V1PodSecurityContext, dict]] = None, + configmaps: Optional[List[str]] = None, + dnspolicy: Optional[str] = None, + schedulername: Optional[str] = None, + priority_class_name: Optional[str] = None + ): + + self.pod = k8s.V1Pod() + self.pod.api_version = 'v1' + self.pod.kind = 'Pod' + + # Pod Metadata + self.metadata = k8s.V1ObjectMeta() + self.metadata.labels = labels + self.metadata.name = name + self.metadata.namespace = namespace + self.metadata.annotations = annotations + + # Pod Container + self.container = k8s.V1Container(name=container_name) + self.container.image = image + self.container.env = [] + + if envs: + if isinstance(envs, dict): + for key, val in envs.items(): + self.container.env.append(k8s.V1EnvVar(name=key, value=val)) + elif isinstance(envs, list): + self.container.env.extend(envs) + + configmaps = configmaps or [] + self.container.env_from = [] + for configmap in configmaps: + self.container.env_from.append( + k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name=configmap)) + ) + + self.container.command = cmds or [] + self.container.args = args or [] + self.container.image_pull_policy = image_pull_policy + self.container.ports = ports or [] + self.container.resources = resources + self.container.liveness_probe = liveness_probe + self.container.readiness_probe = readiness_probe + self.container.volume_mounts = volume_mounts or [] + + # Pod Spec + self.spec = k8s.V1PodSpec(containers=[]) + self.spec.security_context = security_context + self.spec.tolerations = tolerations + self.spec.dns_policy = dnspolicy + self.spec.scheduler_name = schedulername + self.spec.host_network = hostnetwork + self.spec.affinity = affinity + self.spec.service_account_name = service_account_name + self.spec.init_containers = init_containers + self.spec.volumes = volumes or [] + self.spec.node_selector = node_selectors + self.spec.restart_policy = restart_policy + self.spec.priority_class_name = priority_class_name + + self.spec.image_pull_secrets = [] + + if image_pull_secrets: + for image_pull_secret in image_pull_secrets.split(','): + self.spec.image_pull_secrets.append(k8s.V1LocalObjectReference(name=image_pull_secret)) + + def gen_pod(self) -> k8s.V1Pod: + """Generates pod""" + result = None + + if result is None: + result = self.pod + result.spec = self.spec + result.metadata = self.metadata + result.spec.containers = [self.container] + + # result.metadata.name = self.make_unique_pod_id(result.metadata.name) + return result \ No newline at end of file diff --git a/templates/config_maps/config_map.yaml b/kubernetes_utils/templates/config_maps/config_map.yaml similarity index 100% rename from templates/config_maps/config_map.yaml rename to kubernetes_utils/templates/config_maps/config_map.yaml diff --git a/templates/deployments/celery_worker_deployment.yaml b/kubernetes_utils/templates/deployments/celery_worker_deployment.yaml similarity index 91% rename from templates/deployments/celery_worker_deployment.yaml rename to kubernetes_utils/templates/deployments/celery_worker_deployment.yaml index 2e9fb04..a2d88b7 100644 --- a/templates/deployments/celery_worker_deployment.yaml +++ b/kubernetes_utils/templates/deployments/celery_worker_deployment.yaml @@ -8,7 +8,7 @@ metadata: namespace: {namespace} spec: minReadySeconds: 10 - replicas: {num_of_workers} + replicas: {worker_replicas} selector: matchLabels: app: {app_name} @@ -25,7 +25,7 @@ spec: containers: - name: {app_name}-celery-worker image: {image} - imagePullPolicy: Never + imagePullPolicy: { image_pull_policy } command: ["celery"] args: - "--app={celery_app}" diff --git a/templates/deployments/flower_deployment.yaml b/kubernetes_utils/templates/deployments/flower_deployment.yaml similarity index 100% rename from templates/deployments/flower_deployment.yaml rename to kubernetes_utils/templates/deployments/flower_deployment.yaml diff --git a/templates/services/flower_service.yaml b/kubernetes_utils/templates/services/flower_service.yaml similarity index 100% rename from templates/services/flower_service.yaml rename to kubernetes_utils/templates/services/flower_service.yaml diff --git a/templates/static/celery-worker-static-deployment.yaml b/kubernetes_utils/templates/static/celery-worker-static-deployment.yaml similarity index 100% rename from templates/static/celery-worker-static-deployment.yaml rename to kubernetes_utils/templates/static/celery-worker-static-deployment.yaml diff --git a/templates/static/flask-example.yaml b/kubernetes_utils/templates/static/flask-example.yaml similarity index 100% rename from templates/static/flask-example.yaml rename to kubernetes_utils/templates/static/flask-example.yaml diff --git a/templates/static/redis-master.yaml b/kubernetes_utils/templates/static/redis-master.yaml similarity index 100% rename from templates/static/redis-master.yaml rename to kubernetes_utils/templates/static/redis-master.yaml diff --git a/kubernetes_utils/worker_deployment_generator.py b/kubernetes_utils/worker_deployment_generator.py new file mode 100644 index 0000000..bc5ec95 --- /dev/null +++ b/kubernetes_utils/worker_deployment_generator.py @@ -0,0 +1,61 @@ +from typing import List +from kubernetes import client as k8s +from kubernetes_utils.pod_generator import PodGenerator +from models.celery_custom_resource import CeleryCustomResource + + +class WorkerDeploymentGenerator(object): + + def __init__(self, namespace: str, celery_cr: CeleryCustomResource): + self.namespace = namespace + self.celery_cr = celery_cr + + def get_worker_deployment(self) -> k8s.V1Deployment: + template = k8s.V1PodTemplateSpec( + metadata=k8s.V1ObjectMeta( + labels={"app": f'{self.celery_cr.app_name}-celery-worker'} + ), + spec=self.get_worker_pod().spec + ) + deployment_spec = k8s.V1DeploymentSpec( + replicas=self.celery_cr.worker_replicas, + template=template, + selector={'matchLabels': {'app': f'{self.celery_cr.app_name}-celery-worker'}} + ) + + return k8s.V1Deployment( + api_version="apps/v1", + kind="Deployment", + metadata=k8s.V1ObjectMeta(name=f'{self.celery_cr.app_name}-celery'), + spec=deployment_spec + ) + + def get_worker_pod(self) -> k8s.V1Pod: + return PodGenerator( + namespace=self.namespace, + image=self.celery_cr.image, + image_pull_policy=self.celery_cr.image_pull_policy, + image_pull_secrets=self.celery_cr.image_pull_secrets, + name=f"{self.celery_cr.app_name}-celery-worker", + container_name='celery', + volume_mounts=self.celery_cr.volume_mounts, + volumes=self.celery_cr.volumes, + init_containers=self.celery_cr.init_containers, + envs=self.celery_cr.worker_spec.env, + cmds=["celery"], + args=self.__get_worker_container_args( + self.celery_cr.celery_app, self.celery_cr.worker_spec.args + ), + node_selectors=self.celery_cr.worker_spec.node_selector, + resources=self.celery_cr.worker_spec.resources.to_dict(), + readiness_probe=self.celery_cr.readiness_probe, + liveness_probe=self.celery_cr.liveness_probe + ).gen_pod() + + def __get_worker_container_args( + self, celery_app: str, args: List[str] + ) -> List[str]: + base_args = [f"--app={celery_app}", "worker"] + if args: + base_args.extend(args) + return base_args diff --git a/models/__init__.py b/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/models/celery_custom_resource.py b/models/celery_custom_resource.py new file mode 100644 index 0000000..1fe81f8 --- /dev/null +++ b/models/celery_custom_resource.py @@ -0,0 +1,198 @@ +# To use this code, make sure you +# +# import json +# +# and then, to convert JSON from a string, do +# +# result = celery_custom_resource_from_dict(json.loads(json_string)) + +from dataclasses import dataclass +from typing import Any, Optional, List, TypeVar, Type, cast, Callable + + +T = TypeVar("T") + + +def from_str(x: Any) -> str: + assert isinstance(x, str) + return x + + +def from_none(x: Any) -> Any: + assert x is None + return x + + +def from_union(fs, x): + for f in fs: + try: + return f(x) + except: + pass + assert False + + +def to_class(c: Type[T], x: Any) -> dict: + assert isinstance(x, c) + return cast(Any, x).to_dict() + + +def from_list(f: Callable[[Any], T], x: Any) -> List[T]: + assert isinstance(x, list) + return [f(y) for y in x] + + +def from_int(x: Any) -> int: + assert isinstance(x, int) and not isinstance(x, bool) + return x + + +def from_dict(x: Any) -> dict: + assert isinstance(x, dict) + return x + + +@dataclass +class ResourceRequirements: + requests: Optional[dict] = None + limits: Optional[dict] = None + + @staticmethod + def from_dict(obj: Any) -> 'ResourceRequirements': + assert isinstance(obj, dict) + requests = from_union([from_dict, from_none], obj.get("requests")) + limits = from_union([from_dict, from_none], obj.get("limits")) + return ResourceRequirements(requests, limits) + + def to_dict(self) -> dict: + result: dict = {} + result["requests"] = from_union([from_dict, from_none], self.requests) + result["limits"] = from_union([from_dict, from_none], self.limits) + return result + + +@dataclass +class FlowerSpecificSpec: + service: Optional[dict] = None + args: Optional[List[str]] = None + node_selector: Optional[dict] = None + env: Optional[List[dict]] = None + resources: Optional[ResourceRequirements] = None + + @staticmethod + def from_dict(obj: Any) -> 'FlowerSpecificSpec': + assert isinstance(obj, dict) + service = from_union([from_dict, from_none], obj.get("service")) + args = from_union([lambda x: from_list(from_str, x), from_none], obj.get("args")) + node_selector = from_union([from_dict, from_none], obj.get("nodeSelector")) + env = from_union([lambda x: from_list(from_dict, x), from_none], obj.get("env")) + resources = from_union([ResourceRequirements.from_dict, from_none], obj.get("resources")) + return FlowerSpecificSpec(service, args, node_selector, env, resources) + + def to_dict(self) -> dict: + result: dict = {} + result["service"] = from_union([from_dict, from_none], self.service) + result["args"] = from_union([lambda x: from_list(from_str, x), from_none], self.args) + result["nodeSelector"] = from_union([from_dict, from_none], self.node_selector) + result["env"] = from_union([lambda x: from_list(from_dict, x), from_none], self.env) + result["resources"] = from_union([lambda x: to_class(ResourceRequirements, x), from_none], self.resources) + return result + + +@dataclass +class WorkerSpecificSpec: + args: Optional[List[str]] = None + node_selector: Optional[dict] = None + env: Optional[List[dict]] = None + resources: Optional[ResourceRequirements] = None + + @staticmethod + def from_dict(obj: Any) -> 'WorkerSpecificSpec': + assert isinstance(obj, dict) + args = from_union([lambda x: from_list(from_str, x), from_none], obj.get("args")) + node_selector = from_union([from_dict, from_none], obj.get("nodeSelector")) + env = from_union([lambda x: from_list(from_dict, x), from_none], obj.get("env")) + resources = from_union([ResourceRequirements.from_dict, from_none], obj.get("resources")) + return WorkerSpecificSpec(args, node_selector, env, resources) + + def to_dict(self) -> dict: + result: dict = {} + result["args"] = from_union([lambda x: from_list(from_str, x), from_none], self.args) + result["nodeSelector"] = from_union([from_dict, from_none], self.node_selector) + result["env"] = from_union([lambda x: from_list(from_dict, x), from_none], self.env) + result["resources"] = from_union([lambda x: to_class(ResourceRequirements, x), from_none], self.resources) + return result + + +@dataclass +class CeleryCustomResource: + app_name: str + celery_app: str + celery_version: str + image: str + worker_spec: WorkerSpecificSpec + flower_spec: FlowerSpecificSpec + image_pull_policy: Optional[str] = None + image_pull_secrets: Optional[List[dict]] = None + worker_replicas: Optional[int] = None + flower_replicas: Optional[int] = None + init_containers: Optional[List[Any]] = None + volume_mounts: Optional[List[Any]] = None + volumes: Optional[List[Any]] = None + liveness_probe: Optional[dict] = None + readiness_probe: Optional[dict] = None + + @staticmethod + def from_dict(obj: Any) -> 'CeleryCustomResource': + assert isinstance(obj, dict) + app_name = from_str(obj.get("appName")) + celery_app = from_str(obj.get("celeryApp")) + celery_version = from_str(obj.get("celeryVersion")) + image = from_str(obj.get("image")) + image_pull_policy = from_union([from_str, from_none], obj.get("imagePullPolicy")) + image_pull_secrets = from_union( + [lambda x: from_list(from_dict, x), from_none], obj.get("imagePullSecrets")) + worker_replicas = from_union([from_int, from_none], obj.get("workerReplicas")) + flower_replicas = from_union([from_int, from_none], obj.get("flowerReplicas")) + worker_spec = from_union([WorkerSpecificSpec.from_dict, from_none], obj.get("workerSpec")) + flower_spec = from_union([FlowerSpecificSpec.from_dict, from_none], obj.get("flowerSpec")) + init_containers = from_union( + [lambda x: from_list(lambda x: x, x), from_none], obj.get("initContainers")) + volume_mounts = from_union( + [lambda x: from_list(lambda x: x, x), from_none], obj.get("volumeMounts")) + volumes = from_union([lambda x: from_list(lambda x: x, x), from_none], obj.get("volumes")) + liveness_probe = from_union([from_dict, from_none], obj.get("livenessProbe")) + readiness_probe = from_union([from_dict, from_none], obj.get("readinessProbe")) + return CeleryCustomResource( + app_name, celery_app, celery_version, + image, worker_spec, flower_spec, image_pull_policy, + image_pull_secrets, worker_replicas, flower_replicas, + init_containers, volume_mounts, volumes, liveness_probe, readiness_probe + ) + + def to_dict(self) -> dict: + result: dict = {} + result["appName"] = from_str(self.app_name) + result["celeryApp"] = from_str(self.celery_app) + result["celeryVersion"] = from_str(self.celery_version) + result["image"] = from_str(self.image) + result["imagePullPolicy"] = from_union([from_str, from_none], self.image_pull_policy) + result["imagePullSecrets"] = from_union([lambda x: from_list(from_dict), from_none], self.image_pull_secrets) + result["workerReplicas"] = from_union([from_int, from_none], self.worker_replicas) + result["flowerReplicas"] = from_union([from_int, from_none], self.flower_replicas) + result["workerSpec"] = from_union([lambda x: to_class(WorkerSpecificSpec, x), from_none], self.worker_spec) + result["flowerSpec"] = from_union([lambda x: to_class(FlowerSpecificSpec, x), from_none], self.flower_spec) + result["initContainers"] = from_union([lambda x: from_list(lambda x: x, x), from_none], self.init_containers) + result["volumeMounts"] = from_union([lambda x: from_list(lambda x: x, x), from_none], self.volume_mounts) + result["volumes"] = from_union([lambda x: from_list(lambda x: x, x), from_none], self.volumes) + result["livenessProbe"] = from_union([from_dict, from_none], self.liveness_probe) + result["readinessProbe"] = from_union([from_dict, from_none], self.readiness_probe) + return result + + +def celery_custom_resource_from_dict(s: Any) -> CeleryCustomResource: + return CeleryCustomResource.from_dict(s) + + +def celery_custom_resource_to_dict(x: CeleryCustomResource) -> Any: + return to_class(CeleryCustomResource, x) diff --git a/requirements.txt b/requirements.txt index 49a4a93..77242e6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -aiohttp==3.7.4 +aiohttp==3.6.2 aiojobs==0.2.2 amqp==2.6.0 appnope==0.1.0 @@ -24,9 +24,9 @@ ipython-genutils==0.2.0 iso8601==0.1.12 itsdangerous==1.1.0 jedi==0.17.0 -Jinja2==2.11.3 +Jinja2==2.11.2 kombu==4.6.10 -kopf==0.27 +kopf==1.30 kubernetes==11.0.0 MarkupSafe==1.1.1 multidict==4.7.6 @@ -46,7 +46,7 @@ PyYAML==5.3.1 redis==3.5.3 requests==2.23.0 requests-oauthlib==1.3.0 -rsa==4.1 +rsa==4.0 six==1.15.0 tornado==6.0.4 traitlets==4.3.3 diff --git a/utilities/__init__.py b/utilities/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/deployment_utils.py b/utilities/deployments.py similarity index 93% rename from deployment_utils.py rename to utilities/deployments.py index 8658244..0b6ce34 100644 --- a/deployment_utils.py +++ b/utilities/deployments.py @@ -6,7 +6,7 @@ def deploy_celery_workers(apps_api, namespace, spec, logger): path = os.path.join( os.path.dirname(__file__), - 'templates/deployments/celery_worker_deployment.yaml' + '../kubernetes/templates/deployments/celery_worker_deployment.yaml' ) tmpl = open(path, 'rt').read() @@ -47,7 +47,7 @@ def deploy_celery_workers(apps_api, namespace, spec, logger): def deploy_flower(apps_api, namespace, spec, logger): path = os.path.join( os.path.dirname(__file__), - 'templates/deployments/flower_deployment.yaml' + '../kubernetes/templates/deployments/flower_deployment.yaml' ) tmpl = open(path, 'rt').read() @@ -83,7 +83,7 @@ def deploy_flower(apps_api, namespace, spec, logger): def expose_flower_service(api, namespace, spec, logger): path = os.path.join( os.path.dirname(__file__), - 'templates/services/flower_service.yaml' + '../kubernetes/templates/services/flower_service.yaml' ) tmpl = open(path, 'rt').read() diff --git a/update_utils.py b/utilities/patching.py similarity index 100% rename from update_utils.py rename to utilities/patching.py