Skip to content

Add creation handler (WIP) #20

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 38 additions & 73 deletions handlers.py
Original file line number Diff line number Diff line change
@@ -1,77 +1,61 @@
import os
import kopf
import kubernetes
import requests
import constants
from math import ceil
from collections import namedtuple
import requests
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can try httpx and see if any side effect

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, actually, it's the old code dependency that I wrote for the prototype. I'll remove it in the next PR. It is being used to poll flower API to fetch queue length. Autoscaling implementation will change entirely.

import kubernetes
import kopf
import constants
from models.celery_custom_resource import celery_custom_resource_from_dict
from kubernetes_utils.worker_deployment_generator import WorkerDeploymentGenerator
from kubernetes_utils.flower_deployment_generator import FlowerDeploymentGenerator

from deployment_utils import (
deploy_celery_workers,
deploy_flower,
expose_flower_service
)
from update_utils import (
from utilities.patching import (
update_all_deployments,
update_worker_deployment,
update_flower_deployment
)


@kopf.on.create('celeryproject.org', 'v1alpha1', 'celery')
def create_fn(spec, name, namespace, logger, **kwargs):
def create_fn(spec, namespace, logger, **kwargs):
"""
Celery custom resource creation handler
"""

# 1. Validation of spec
val, err_msg = validate_spec(spec)
if err_msg:
status = 'Failed validation'
raise kopf.PermanentError(f"{err_msg}. Got {val}")

api = kubernetes.client.CoreV1Api()
apps_api_instance = kubernetes.client.AppsV1Api()

# 2. Deployment for celery workers
worker_deployment = deploy_celery_workers(
apps_api_instance, namespace, spec, logger
try:
celery_cr = celery_custom_resource_from_dict(dict(spec))
except Exception as e:
raise kopf.PermanentError(e)

# deploy worker
worker_deployment = WorkerDeploymentGenerator(
namespace=namespace, celery_cr=celery_cr
).get_worker_deployment()
kopf.adopt(worker_deployment)
apps_api_instance.create_namespaced_deployment(
namespace=namespace,
body=worker_deployment
)

# 3. Deployment for flower
flower_deployment = deploy_flower(
apps_api_instance, namespace, spec, logger
# deploy flower
flower_dep_gen_instance = FlowerDeploymentGenerator(
namespace=namespace, celery_cr=celery_cr
)

# 4. Expose flower service
flower_svc = expose_flower_service(
api, namespace, spec, logger
flower_deployment = flower_dep_gen_instance.get_flower_deployment()
kopf.adopt(flower_deployment)
apps_api_instance.create_namespaced_deployment(
namespace=namespace,
body=flower_deployment
)

children = [
{
'name': worker_deployment.metadata.name,
'replicas': worker_deployment.spec.replicas,
'kind': constants.DEPLOYMENT_KIND,
'type': constants.WORKER_TYPE
},
{
'name': flower_deployment.metadata.name,
'replicas': flower_deployment.spec.replicas,
'kind': constants.DEPLOYMENT_KIND,
'type': constants.FLOWER_TYPE
},
{
'name': flower_svc.metadata.name,
'spec': flower_svc.spec.to_dict(),
'kind': constants.SERVICE_KIND,
'type': constants.FLOWER_TYPE
}
]
# expose service
flower_svc = flower_dep_gen_instance.get_flower_svc()
kopf.adopt(flower_svc)
api.create_namespaced_service(namespace=namespace, body=flower_svc)

# TODO: Decide the return structure
return {
'children': children,
'children_count': len(children),
'children': 3,
'status': constants.STATUS_CREATED
}

Expand Down Expand Up @@ -172,7 +156,7 @@ def get_flower_svc_host(status):


@kopf.timer('celeryproject.org', 'v1alpha1', 'celery',
initial_delay=5, interval=10, idle=10)
initial_delay=50000, interval=10000, idle=10)
def message_queue_length(spec, status, **kwargs):
flower_svc_host = get_flower_svc_host(status)
if not flower_svc_host:
Expand Down Expand Up @@ -242,23 +226,4 @@ def horizontal_autoscale(spec, status, namespace, **kwargs):
return {
'deploymentName': updated_deployment.metadata.name,
'updated_num_of_replicas': updated_num_of_replicas
}


def validate_stuff(spec):
"""
1. If the deployment/svc already exists, k8s throws error
2. Response and spec classes and enums
"""
pass


def validate_spec(spec):
"""
Validates the incoming spec
@returns - True/False, Error Message
"""
# size = spec.get('size')
# if not size:
# return size, "Size must be set"
return None, None
}
Empty file added kubernetes_utils/__init__.py
Empty file.
71 changes: 71 additions & 0 deletions kubernetes_utils/flower_deployment_generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from typing import List
from kubernetes import client as k8s
from kubernetes_utils.pod_generator import PodGenerator
from models.celery_custom_resource import CeleryCustomResource


class FlowerDeploymentGenerator(object):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

explicit inheritance from the object is not needed


def __init__(self, namespace: str, celery_cr: CeleryCustomResource):
self.namespace = namespace
self.celery_cr = celery_cr

def get_flower_deployment(self) -> k8s.V1Deployment:
template = k8s.V1PodTemplateSpec(
metadata=k8s.V1ObjectMeta(
labels={"app": f'{self.celery_cr.app_name}-celery-flower'}
),
spec=self.get_flower_pod().spec
)
deployment_spec = k8s.V1DeploymentSpec(
replicas=self.celery_cr.worker_replicas,
template=template,
selector={'matchLabels': {'app': f'{self.celery_cr.app_name}-celery-flower'}}
)

return k8s.V1Deployment(
api_version="apps/v1",
kind="Deployment",
metadata=k8s.V1ObjectMeta(name=f'{self.celery_cr.app_name}-celery-flower'),
spec=deployment_spec
)

def get_flower_pod(self) -> k8s.V1Pod:
return PodGenerator(
namespace=self.namespace,
image=self.celery_cr.image,
image_pull_policy=self.celery_cr.image_pull_policy,
image_pull_secrets=self.celery_cr.image_pull_secrets,
name=f"{self.celery_cr.app_name}-celery-flower",
container_name='flower',
volume_mounts=self.celery_cr.volume_mounts,
volumes=self.celery_cr.volumes,
init_containers=self.celery_cr.init_containers,
cmds=["flower"],
envs=self.celery_cr.flower_spec.env,
args=self.__get_flower_container_args(
self.celery_cr.celery_app, self.celery_cr.flower_spec.args
),
node_selectors=self.celery_cr.flower_spec.node_selector,
resources=self.celery_cr.flower_spec.resources.to_dict(),
readiness_probe=self.celery_cr.readiness_probe,
liveness_probe=self.celery_cr.liveness_probe
).gen_pod()

def get_flower_svc(self) -> k8s.V1Service:
svc_template = self.celery_cr.flower_spec.service
metadata = {
'name': f'{self.celery_cr.app_name}-celery-flower'
}
return k8s.V1Service(
metadata=metadata,
spec=svc_template.get('spec')
)

def __get_flower_container_args(
self, celery_app: str, args: List[str]
) -> List[str]:
base_args = [f"--app={celery_app}", "flower"]
if args:
base_args.extend(args)
return base_args
Loading