Skip to content

Allow for async refresh_api_key_hook methods. #359

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

iciclespider
Copy link

Rather than creating an issue, I created this draft pull request to demonstrate what the issue is.

My specific use case is I have implemented the ability to obtain AWS EKS credentials which expire every 15 minutes using an AWS async client. I suspect that more often than not, refreshing credentials will involve i/o and would benefit from async support.

@iciclespider iciclespider force-pushed the async-refresh_api_key_hook branch from ea60e73 to e6f704b Compare May 20, 2025 21:46
@tomplus
Copy link
Owner

tomplus commented May 22, 2025

Thanks for your PR. Could you provide more details about your EKS cluster? I think EKS uses "exec" in kubeconfig and it should work out of the box. Does kubectl work for you?

@iciclespider
Copy link
Author

iciclespider commented May 23, 2025

Yes, kubectl works for me, however the clusters I want to access are not configured for it.

What I have implemented is essentially replicating what the kubectl exec is doing, but in pure python. The EKS clusters I am accessing are dynamically discovered and do not exist in my kubectl configuration. The follow program shows how this works using hard coded values instead of being dynamically discovered.

This is just one example of having to perform I/O to refresh an api token. In my opinion, in general such token refreshes are more likely than not going to involve performing I/O.

import aiobotocore.session
import asyncio
import atexit
import base64
import kubernetes_asyncio
import os
import tempfile
import time


async def main():
    aws_session = aiobotocore.session.AioSession()
    async with aws_session.create_client('eks') as eks:
        response = await eks.describe_cluster(name='enablement')
        configuration = EksConfiguration(aws_session, response['cluster'])
        async with kubernetes_asyncio.client.ApiClient(configuration) as api:
            v1 = kubernetes_asyncio.client.CoreV1Api(api)
            response = await v1.list_node()
            for node in response.items:
                print(node.metadata.name)


class EksConfiguration(kubernetes_asyncio.client.Configuration):
    def __init__(self, aws_session, cluster):
        super(EksConfiguration, self).__init__(
            cluster['endpoint'],
            {'BearerToken': True},
            {'BearerToken': 'Bearer'},
            ssl_ca_cert=temp_file(
                base64.b64decode(cluster['certificateAuthority']['data']).decode()
            ),
        )
        self.aws_session = aws_session
        self.cluster_name = cluster['name']
        self.token_expires = 0
        self.refresh_api_key_hook = self.refresh_eks_token

    @staticmethod
    async def refresh_eks_token(self):
        if time.time() < self.token_expires:
            return
        async with self.aws_session.create_client('sts') as sts:
            sts.meta.events.register(
                'before-sign.sts.GetCallerIdentity', self.inject_cluster_name_header
            )
            url = await sts.generate_presigned_url(
                ClientMethod='get_caller_identity',
                ExpiresIn=15 * 60,  # max 15 minutes
                HttpMethod='GET',
            )
        self.api_key[
            'BearerToken'
        ] = f"k8s-aws-v1.{base64.urlsafe_b64encode(url.encode()).decode().rstrip('=')}"
        self.token_expires = time.time() + (10 * 60)  # refresh every 10 minutes

    def inject_cluster_name_header(self, request, **_kwargs):
        request.headers['x-k8s-aws-id'] = self.cluster_name


temp_files = []

def temp_file(content):
    if not temp_files:
        atexit.register(cleanup_temp_files)
    name = tempfile.mkstemp()[1]
    temp_files.append(name)
    with open(name, "w") as fd:
        fd.write(content)
    return name

def cleanup_temp_files():
    for temp_file in temp_files:
        try:
            os.remove(temp_file)
        except OSError:
            pass
    temp_files.clear()


if __name__ == '__main__':
    asyncio.run(main())

@iciclespider iciclespider force-pushed the async-refresh_api_key_hook branch from e6f704b to e70b107 Compare May 23, 2025 03:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants