[Kubernetes secret provider] Add cache for the secrets #3822
Changes from all commits
The PR adds a new changelog fragment (32 added lines):

```yaml
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: feature

# Change summary; a 80ish characters long description of the change.
summary: add cache for secrets when using kubernetes secret provider

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: elastic-agent

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/elastic-agent/pull/3822

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/elastic-agent/issues/3594
```
The second file in the diff is the Go source of the kubernetes_secrets context provider. The import block gains "time" (@@ -8,6 +8,7 @@):

```go
    "context"
    "strings"
    "sync"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    k8sclient "k8s.io/client-go/kubernetes"
```
In the contextProviderK8sSecrets struct (@@ -33,6 +34,14 @@), two cache fields are added next to the existing client fields, and a new secretsData type pairs each cached value with its last access time:

```go
    clientMx sync.Mutex
    client   k8sclient.Interface

    secretsCacheMx sync.RWMutex
    secretsCache   map[string]*secretsData
}

type secretsData struct {
    value      string
    lastAccess time.Time
}
```
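The cache behavior is driven by several settings read from p.config in the hunks below (DisableCache, RefreshInterval, TTLDelete, RequestTimeout). The Config struct itself is not part of this excerpt; the following is a minimal sketch of its presumed cache-related shape, with field names taken from their usage and the types and defaults purely illustrative:

```go
package main

import (
	"fmt"
	"time"
)

// cacheConfig is a hypothetical reconstruction of the cache settings the diff
// reads from p.config; the real Config struct lives elsewhere in the provider
// and may differ.
type cacheConfig struct {
	KubeConfig      string        // path to a kubeconfig file, empty for in-cluster
	RefreshInterval time.Duration // how often updateSecrets refreshes cached values
	RequestTimeout  time.Duration // per-request timeout used by fetchSecretWithTimeout
	TTLDelete       time.Duration // entries not accessed for this long are dropped
	DisableCache    bool          // when true, Fetch bypasses the cache entirely
}

func main() {
	// Illustrative values only; the PR's actual defaults are not shown in this hunk.
	cfg := cacheConfig{
		RefreshInterval: time.Minute,
		RequestTimeout:  5 * time.Second,
		TTLDelete:       time.Hour,
	}
	fmt.Printf("%+v\n", cfg)
}
```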
The remaining hunk (@@ -46,67 +55,235 @@) rewrites the body of the provider. ContextProviderBuilder now initializes the cache map, and Fetch either goes through the cache or, when the cache is disabled, validates the key and fetches the secret directly:

```go
        return nil, errors.New(err, "failed to unpack configuration")
    }
    return &contextProviderK8sSecrets{
        logger:       logger,
        config:       &cfg,
        secretsCache: make(map[string]*secretsData),
    }, nil
}

func (p *contextProviderK8sSecrets) Fetch(key string) (string, bool) {
    // key = "kubernetes_secrets.somenamespace.somesecret.value"
    if p.config.DisableCache {
        valid := p.validateKey(key)
        if valid {
            return p.fetchSecretWithTimeout(key)
        } else {
            return "", false
        }
    } else {
        return p.getFromCache(key)
    }
}
```
Run now also starts the background refresh goroutine when the cache is enabled, and getK8sClient is carried over unchanged; the client lookup that Fetch used to do inline (client := p.client, if client == nil) is removed:

```go
// Run initializes the k8s secrets context provider.
func (p *contextProviderK8sSecrets) Run(ctx context.Context, comm corecomp.ContextProviderComm) error {
    client, err := getK8sClientFunc(p.config.KubeConfig, p.config.KubeClientOptions)
    if err != nil {
        p.logger.Debugf("kubernetes_secrets provider skipped, unable to connect: %s", err)
        return nil
    }
    p.clientMx.Lock()
    p.client = client
    p.clientMx.Unlock()

    if !p.config.DisableCache {
        go p.updateSecrets(ctx)
    }

    <-comm.Done()

    p.clientMx.Lock()
    p.client = nil
    p.clientMx.Unlock()
    return comm.Err()
}

func getK8sClient(kubeconfig string, opt kubernetes.KubeClientOptions) (k8sclient.Interface, error) {
    return kubernetes.GetKubernetesClient(kubeconfig, opt)
}
```
updateSecrets runs in its own goroutine and refreshes the cache every RefreshInterval:

```go
// Update the secrets in the cache every RefreshInterval
func (p *contextProviderK8sSecrets) updateSecrets(ctx context.Context) {
    timer := time.NewTimer(p.config.RefreshInterval)
    for {
        select {
        case <-ctx.Done():
            return
        case <-timer.C:
            p.updateCache()
            timer.Reset(p.config.RefreshInterval)
        }
    }
}
```
mergeWithCurrent reconciles a freshly fetched map with whatever is currently cached, dropping entries whose last access is older than TTLDelete and preserving lastAccess for keys that were touched concurrently:

```go
// mergeWithCurrent merges the updated map with the cache map.
// This function needs to be called between the mutex lock for the map.
func (p *contextProviderK8sSecrets) mergeWithCurrent(updatedMap map[string]*secretsData) map[string]*secretsData {
    merged := make(map[string]*secretsData)

    for name, data := range p.secretsCache {
        diff := time.Since(data.lastAccess)
        if diff < p.config.TTLDelete {
            merged[name] = data
        }
    }

    for name, data := range updatedMap {
        // We need to check if the key is already in the new map. If it is, lastAccess cannot be overwritten since
        // it could have been updated when trying to fetch the secret at the same time we are running update cache.
        // In that case, we only update the value.
        if _, ok := merged[name]; ok {
            merged[name].value = data.value
        }
    }

    return merged
}
```
updateCache re-fetches every entry that is still within its TTL, building a fresh map (deleting keys from a Go map does not shrink its backing storage), and then merges the result back under the write lock:

```go
func (p *contextProviderK8sSecrets) updateCache() {
    // deleting entries does not free the memory, so we need to create a new map
    // to place the secrets we want to keep
    cacheTmp := make(map[string]*secretsData)

    // to not hold the lock for long, we copy the current state of the cache map
    copyMap := make(map[string]secretsData)
    p.secretsCacheMx.RLock()
    for name, data := range p.secretsCache {
        copyMap[name] = *data
    }
    p.secretsCacheMx.RUnlock()

    for name, data := range copyMap {
        diff := time.Since(data.lastAccess)
        if diff < p.config.TTLDelete {
            value, ok := p.fetchSecretWithTimeout(name)
            if ok {
                newData := &secretsData{
                    value:      value,
                    lastAccess: data.lastAccess,
                }
                cacheTmp[name] = newData
            }
        }
    }

    // While the cache was updated, it is possible that some secret was added through another go routine.
    // We need to merge the updated map with the current cache map to catch the new entries and avoid
    // loss of data.
    p.secretsCacheMx.Lock()
    p.secretsCache = p.mergeWithCurrent(cacheTmp)
    p.secretsCacheMx.Unlock()
}
```
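Taken together, updateCache and mergeWithCurrent implement TTL-based eviction keyed on lastAccess. A standalone sketch of that eviction rule, using local stand-ins for the provider's types (nothing below is code from the PR):

```go
package main

import (
	"fmt"
	"time"
)

// entry mirrors the role of secretsData: a value plus its last access time.
type entry struct {
	value      string
	lastAccess time.Time
}

// evictExpired keeps only entries accessed within ttl, building a new map the
// same way updateCache does instead of deleting from the old one.
func evictExpired(cache map[string]entry, ttl time.Duration) map[string]entry {
	kept := make(map[string]entry)
	for key, e := range cache {
		if time.Since(e.lastAccess) < ttl {
			kept[key] = e
		}
	}
	return kept
}

func main() {
	cache := map[string]entry{
		"kubernetes_secrets.ns.fresh.value": {value: "a", lastAccess: time.Now()},
		"kubernetes_secrets.ns.stale.value": {value: "b", lastAccess: time.Now().Add(-2 * time.Hour)},
	}
	// With a 1h TTL only the recently accessed entry survives.
	fmt.Println(len(evictExpired(cache, time.Hour))) // 1
}
```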
getFromCache looks the key up, falls back to addToCache on a miss, and then refreshes lastAccess under the write lock before returning the value:

```go
func (p *contextProviderK8sSecrets) getFromCache(key string) (string, bool) {
    p.secretsCacheMx.RLock()
    _, ok := p.secretsCache[key]
    p.secretsCacheMx.RUnlock()

    // if value is still not present in cache, it is possible we haven't tried to fetch it yet
    if !ok {
        value, ok := p.addToCache(key)
        // if it was not possible to fetch the secret, return
        if !ok {
            return value, ok
        }
    }

    p.secretsCacheMx.Lock()
    data, ok := p.secretsCache[key]
    data.lastAccess = time.Now()
    pass := data.value
    p.secretsCacheMx.Unlock()

    return pass, ok
}
```

Review discussion on the locking in this function:

> Reviewer: Why grab the lock inside of the function to set the new value, then only four lines below grab it again to set the lastAccess time? Why not just set …
>
> Author: Because then we would be changing …
>
> Reviewer: To me it's worse to grab a lock, release the lock, grab the lock again, then release it again all in the same path. It should grab the lock, do what it needs to do, and release it.

(cmacknz marked this conversation as resolved.)
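For comparison, a minimal sketch of the single lock/unlock shape the reviewer describes, written as if it sat alongside the code above (this is not code from the PR; unlike the merged version it would hold the cache write lock across the Kubernetes API call on a miss):

```go
// getFromCacheSingleLock is an illustrative alternative: take the write lock once,
// handle the cache miss inside it, and stamp lastAccess before releasing.
func (p *contextProviderK8sSecrets) getFromCacheSingleLock(key string) (string, bool) {
	p.secretsCacheMx.Lock()
	defer p.secretsCacheMx.Unlock()

	data, ok := p.secretsCache[key]
	if !ok {
		if !p.validateKey(key) {
			return "", false
		}
		value, fetched := p.fetchSecretWithTimeout(key)
		if !fetched {
			return "", false
		}
		data = &secretsData{value: value}
		p.secretsCache[key] = data
	}
	data.lastAccess = time.Now()
	return data.value, true
}
```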
validateKey hosts the key checks that previously lived inline in Fetch, now returning a bool instead of the ("", false) pair:

```go
func (p *contextProviderK8sSecrets) validateKey(key string) bool {
    // Make sure the key has the expected format "kubernetes_secrets.somenamespace.somesecret.value"
    tokens := strings.Split(key, ".")
    if len(tokens) > 0 && tokens[0] != "kubernetes_secrets" {
        return false
    }
    if len(tokens) != 4 {
        p.logger.Debugf(
            "not valid secret key: %v. Secrets should be of the following format %v",
            key,
            "kubernetes_secrets.somenamespace.somesecret.value",
        )
        return false
    }
    return true
}
```
addToCache validates the key, fetches the secret, and stores it (with a zero lastAccess, which getFromCache stamps immediately afterwards):

```go
func (p *contextProviderK8sSecrets) addToCache(key string) (string, bool) {
    valid := p.validateKey(key)
    if !valid {
        return "", false
    }

    value, ok := p.fetchSecretWithTimeout(key)
    if ok {
        p.secretsCacheMx.Lock()
        p.secretsCache[key] = &secretsData{value: value}
        p.secretsCacheMx.Unlock()
    }
    return value, ok
}
```
A small Result type carries the fetch outcome over a channel:

```go
type Result struct {
    value string
    ok    bool
}
```
fetchSecretWithTimeout bounds every Kubernetes API call with the configured RequestTimeout:

```go
func (p *contextProviderK8sSecrets) fetchSecretWithTimeout(key string) (string, bool) {
    ctxTimeout, cancel := context.WithTimeout(context.Background(), p.config.RequestTimeout)
    defer cancel()

    resultCh := make(chan Result, 1)
    p.fetchSecret(ctxTimeout, key, resultCh)

    select {
    case <-ctxTimeout.Done():
        p.logger.Errorf("Could not retrieve value for key %v: %v", key, ctxTimeout.Err())
        return "", false
    case result := <-resultCh:
        return result.value, result.ok
    }
}
```
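The same timeout pattern in isolation, for reference: a buffered channel of size one lets the worker always complete its send even if the caller has taken the timeout branch. This standalone sketch is not from the PR and runs the worker in a goroutine, whereas the code above calls fetchSecret synchronously and relies on the API call honoring the context:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// callWithTimeout runs fn under a bounded context and returns whichever comes
// first: fn's result or the deadline.
func callWithTimeout(timeout time.Duration, fn func(context.Context) (string, error)) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	type result struct {
		value string
		err   error
	}
	ch := make(chan result, 1) // buffered so the goroutine never blocks on send
	go func() {
		v, err := fn(ctx)
		ch <- result{value: v, err: err}
	}()

	select {
	case <-ctx.Done():
		return "", ctx.Err()
	case r := <-ch:
		return r.value, r.err
	}
}

func main() {
	_, err := callWithTimeout(50*time.Millisecond, func(ctx context.Context) (string, error) {
		select {
		case <-time.After(time.Second): // simulate a slow API call
			return "value", nil
		case <-ctx.Done():
			return "", ctx.Err()
		}
	})
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true
}
```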
fetchSecret performs the actual Kubernetes lookup and reports through the channel instead of returning a value. Compared with the old code it fixes the secretIntefrace typo and passes the caller's context to Get instead of context.TODO(); the hunk also deletes the old copies of Run and getK8sClient that used to sit at the end of the file, since both now live earlier (see above):

```go
func (p *contextProviderK8sSecrets) fetchSecret(context context.Context, key string, resultCh chan Result) {
    p.clientMx.Lock()
    client := p.client
    p.clientMx.Unlock()
    if client == nil {
        resultCh <- Result{value: "", ok: false}
        return
    }

    tokens := strings.Split(key, ".")
    // key has the format "kubernetes_secrets.somenamespace.somesecret.value"
    // This function is only called from:
    // - addToCache, where we already validated that the key has the right format.
    // - updateCache, where the results are only added to the cache through addToCache
    // Because of this we no longer need to validate the key
    ns := tokens[1]
    secretName := tokens[2]
    secretVar := tokens[3]

    secretInterface := client.CoreV1().Secrets(ns)
    secret, err := secretInterface.Get(context, secretName, metav1.GetOptions{})

    if err != nil {
        p.logger.Errorf("Could not retrieve secret from k8s API: %v", err)
        resultCh <- Result{value: "", ok: false}
        return
    }
    if _, ok := secret.Data[secretVar]; !ok {
        p.logger.Errorf("Could not retrieve value %v for secret %v", secretVar, secretName)
        resultCh <- Result{value: "", ok: false}
        return
    }
    secretString := secret.Data[secretVar]
    resultCh <- Result{value: string(secretString), ok: true}
}
```
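For completeness, the Get call above can be exercised without a cluster using client-go's fake clientset. A hedged sketch follows; the secret name, namespace, and data key are made up, and this is not a test from the PR:

```go
package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	// Seed a fake clientset with a secret shaped like the ones the provider reads.
	client := fake.NewSimpleClientset(&corev1.Secret{
		ObjectMeta: metav1.ObjectMeta{Name: "somesecret", Namespace: "somenamespace"},
		Data:       map[string][]byte{"value": []byte("s3cr3t")},
	})

	// Same call shape as fetchSecret: CoreV1().Secrets(ns).Get(ctx, name, options).
	secret, err := client.CoreV1().Secrets("somenamespace").Get(context.Background(), "somesecret", metav1.GetOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(secret.Data["value"])) // s3cr3t
}
```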