diff --git a/Gopkg.lock b/Gopkg.lock index fb6af533fd5..283b2ff2737 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -910,12 +910,12 @@ revision = "a72b379d632eab4b49e4f4b2c765cfebf0a74796" [[projects]] - digest = "1:8e79da55d2a6a92374cc7ca5ce1a7739fe0a13ca04b6f8c806a50f1873138eff" + digest = "1:30bef1faf227592dcae9ced04b457efe84298d059b9a8398630d848c0f507e89" name = "github.com/sercand/kuberesolver" packages = ["."] pruneopts = "UT" - revision = "aa801ca262949d887bbe0bae3f6f731ac82c26f6" - version = "v1.0.0" + revision = "f0a61d5e8ca1bcc7a607d6de3dfd51467791db88" + version = "v2.1.0" [[projects]] digest = "1:d867dfa6751c8d7a435821ad3b736310c2ed68945d05b50fb9d23aee0540c8cc" @@ -990,7 +990,7 @@ [[projects]] branch = "master" - digest = "1:0b1e3d99a4914c79245fb7d8992db23596ed79848c078307db97a099a1899e62" + digest = "1:cc0ef0c583b7dcaf4168af51727f59e72b9638a76cb7e40e05c3b7b4b24cd37f" name = "github.com/weaveworks/common" packages = [ "aws", @@ -1010,7 +1010,7 @@ "user", ] pruneopts = "UT" - revision = "c1808abf9c462ba088ef5c764053a316a58cde24" + revision = "87611edc252e21e7de58c08270384450b126b48a" [[projects]] digest = "1:efac30de93ca1ff38050f46dc34f1338ebc8778de488f919f79ad9e6188719d3" diff --git a/vendor/github.com/sercand/kuberesolver/LICENSE b/vendor/github.com/sercand/kuberesolver/LICENSE new file mode 100644 index 00000000000..5ae44e510f2 --- /dev/null +++ b/vendor/github.com/sercand/kuberesolver/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2017 Sercan Degirmenci + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. \ No newline at end of file diff --git a/vendor/github.com/sercand/kuberesolver/README.md b/vendor/github.com/sercand/kuberesolver/README.md index ba3a8044e42..4035f6202c2 100644 --- a/vendor/github.com/sercand/kuberesolver/README.md +++ b/vendor/github.com/sercand/kuberesolver/README.md @@ -2,17 +2,27 @@ Grpc Client-Side Load Balancer with Kubernetes name resolver ```go -//New balancer for default namespace -balancer := kuberesolver.New() -//Dials with RoundRobin lb and kubernetes name resolver. if url schema is not 'kubernetes' than uses dns -cc, err := balancer.Dial("kubernetes://service-name:portname", opts...) -// or, add balancer as dial option bu this does not fallback to dns if schema is not 'kubernetes' -cc, err := grpc.Dial("kubernetes://service-name:portname", balancer.DialOption(), opts...) +// Register kuberesolver to grpc +kuberesolver.RegisterInCluster() +// is same as +resolver.Register(kuberesolver.NewBuilder(nil)) +// you can bring your own k8s client, below is default behaviour +client, err := kuberesolver.NewInClusterK8sClient() +resolver.Register(kuberesolver.NewBuilder(client)) + +// USAGE: +// if schema is 'kubernetes' then grpc will use kuberesolver to resolve addresses +cc, err := grpc.Dial("kubernetes:///service-name.namespace:portname", opts...) ``` -An url can be one of the following + +An url can be one of the following, [grpc naming docs](https://github.com/grpc/grpc/blob/master/doc/naming.md) ``` -kubernetes://service-name:portname uses kubernetes api to fetch endpoints and port names -kubernetes://service-name:8080 uses kubernetes api to fetch endpoints but uses given port -dns://service-name:8080 does not use lb -service-name:8080 +kubernetes:///service-name:8080 +kubernetes:///service-name:portname +kubernetes:///service-name.namespace:8080 + +kubernetes://namespace/service-name:8080 +kubernetes://service-name:8080/ +kubernetes://service-name.namespace:8080/ + ``` diff --git a/vendor/github.com/sercand/kuberesolver/balancer.go b/vendor/github.com/sercand/kuberesolver/balancer.go deleted file mode 100644 index d03162de067..00000000000 --- a/vendor/github.com/sercand/kuberesolver/balancer.go +++ /dev/null @@ -1,135 +0,0 @@ -package kuberesolver - -import ( - "errors" - "fmt" - "net/url" - "strconv" - "strings" - - "google.golang.org/grpc" - "google.golang.org/grpc/grpclog" - "google.golang.org/grpc/naming" -) - -type Balancer struct { - Namespace string - client *k8sClient - resolvers []*kubeResolver -} - -type TargetUrlType int32 - -const ( - TargetTypeDNS TargetUrlType = 0 - TargetTypeKubernetes TargetUrlType = 1 - kubernetesSchema = "kubernetes" - dnsSchema = "dns" -) - -type targetInfo struct { - urlType TargetUrlType - target string - port string - resolveByPortName bool - useFirstPort bool -} - -func parseTarget(target string) (targetInfo, error) { - u, err := url.Parse(target) - if err != nil { - return targetInfo{}, err - } - ti := targetInfo{} - if u.Scheme == kubernetesSchema { - ti.urlType = TargetTypeKubernetes - spl := strings.Split(u.Host, ":") - if len(spl) == 2 { - ti.target = spl[0] - ti.port = spl[1] - ti.useFirstPort = false - if _, err := strconv.Atoi(ti.port); err != nil { - ti.resolveByPortName = true - } else { - ti.resolveByPortName = false - } - } else { - ti.target = spl[0] - ti.useFirstPort = true - } - } else if u.Scheme == dnsSchema { - ti.urlType = TargetTypeDNS - ti.target = u.Host - } else { - ti.urlType = TargetTypeDNS - ti.target = target - } - return ti, nil -} - -//Resolver returns Resolver for grpc -func (b *Balancer) Resolver() naming.Resolver { - return newResolver(b.client, b.Namespace) -} - -//DialOption returns grpc.DialOption with RoundRobin balancer and resolver -func (b *Balancer) DialOption() grpc.DialOption { - rs := newResolver(b.client, b.Namespace) - return grpc.WithBalancer(grpc.RoundRobin(rs)) -} - -// Dial calls grpc.Dial, also parses target and uses load balancer if necessary -func (b *Balancer) Dial(target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { - pt, err := parseTarget(target) - if err != nil { - return nil, err - } - switch pt.urlType { - case TargetTypeKubernetes: - if b.client == nil { - return nil, errors.New("application is not running inside kubernetes") - } - grpclog.Printf("kuberesolver: using kubernetes resolver target=%s", pt.target) - rs := newResolver(b.client, b.Namespace) - b.resolvers = append(b.resolvers, rs) - opts := append(opts, grpc.WithBalancer(grpc.RoundRobin(rs))) - return grpc.Dial(target, opts...) - case TargetTypeDNS: - return grpc.Dial(pt.target, opts...) - default: - return nil, errors.New("Unknown target type") - } -} - -func (b *Balancer) Healthy() error { - for _, r := range b.resolvers { - if r.watcher != nil { - if len(r.watcher.endpoints) == 0 { - return fmt.Errorf("target does not have endpoints") - } - } - } - return nil -} - -// IsInCluster returns true if application is running inside kubernetes cluster -func (b *Balancer) IsInCluster() bool { - return b.client != nil -} - -// New creates a Balancer with "default" namespace -func New() *Balancer { - return NewWithNamespace("default") -} - -// NewWithNamespace creates a Balancer with given namespace. -func NewWithNamespace(namespace string) *Balancer { - client, err := newInClusterClient() - if err != nil { - grpclog.Printf("kuberesolver: application is not running inside kubernetes") - } - return &Balancer{ - Namespace: namespace, - client: client, - } -} diff --git a/vendor/github.com/sercand/kuberesolver/builder.go b/vendor/github.com/sercand/kuberesolver/builder.go new file mode 100644 index 00000000000..14c2d6bc61e --- /dev/null +++ b/vendor/github.com/sercand/kuberesolver/builder.go @@ -0,0 +1,255 @@ +package kuberesolver + +import ( + "fmt" + "io" + "net" + "strconv" + "strings" + "sync" + "time" + + "golang.org/x/net/context" + "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/resolver" +) + +const ( + kubernetesSchema = "kubernetes" + defaultFreq = time.Minute * 30 +) + +type targetInfo struct { + serviceName string + serviceNamespace string + port string + resolveByPortName bool + useFirstPort bool +} + +// RegisterInCluster registers the kuberesolver builder to grpc +func RegisterInCluster() { + RegisterInClusterWithSchema(kubernetesSchema) +} + +func RegisterInClusterWithSchema(schema string) { + resolver.Register(NewBuilder(nil, schema)) +} + +// NewBuilder creates a kubeBuilder which is used to factory Kuberesolvers. +func NewBuilder(client K8sClient, schema string) resolver.Builder { + return &kubeBuilder{ + k8sClient: client, + schema: schema, + } +} + +type kubeBuilder struct { + k8sClient K8sClient + schema string +} + +func parseResolverTarget(target resolver.Target) (targetInfo, error) { + // kubernetes://default/service:port + end := target.Endpoint + snamespace := target.Authority + // kubernetes://service.default:port/ + if end == "" { + end = target.Authority + snamespace = "default" + } + // kubernetes:///service:port + // kubernetes://service:port/ + if snamespace == "" { + snamespace = "default" + } + + ti := targetInfo{} + if end == "" { + return targetInfo{}, fmt.Errorf("target(%q) is empty", target) + } + var name string + var port string + if strings.LastIndex(end, ":") < 0 { + name = end + port = "" + ti.useFirstPort = true + } else { + var err error + name, port, err = net.SplitHostPort(end) + if err != nil { + return targetInfo{}, fmt.Errorf("target endpoint='%s' is invalid. grpc target is %#v, err=%v", end, target, err) + } + } + + namesplit := strings.SplitN(name, ".", 2) + sname := name + if len(namesplit) == 2 { + sname = namesplit[0] + snamespace = namesplit[1] + } + ti.serviceName = sname + ti.serviceNamespace = snamespace + ti.port = port + if !ti.useFirstPort { + if _, err := strconv.Atoi(ti.port); err != nil { + ti.resolveByPortName = true + } else { + ti.resolveByPortName = false + } + } + return ti, nil +} + +// Build creates a new resolver for the given target. +// +// gRPC dial calls Build synchronously, and fails if the returned error is +// not nil. +func (b *kubeBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) { + if b.k8sClient == nil { + if cl, err := NewInClusterK8sClient(); err == nil { + b.k8sClient = cl + } else { + return nil, err + } + } + ti, err := parseResolverTarget(target) + if err != nil { + return nil, err + } + ctx, cancel := context.WithCancel(context.Background()) + r := &kResolver{ + target: ti, + ctx: ctx, + cancel: cancel, + cc: cc, + rn: make(chan struct{}, 1), + k8sClient: b.k8sClient, + t: time.NewTimer(defaultFreq), + freq: defaultFreq, + } + go until(func() { + r.wg.Add(1) + err := r.watch() + if err != nil && err != io.EOF { + grpclog.Errorf("kuberesolver: watching ended with error='%v', will reconnect again", err) + } + }, time.Second, ctx.Done()) + return r, nil +} + +// Scheme returns the scheme supported by this resolver. +// Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md. +func (b *kubeBuilder) Scheme() string { + return b.schema +} + +type kResolver struct { + target targetInfo + ctx context.Context + cancel context.CancelFunc + cc resolver.ClientConn + // rn channel is used by ResolveNow() to force an immediate resolution of the target. + rn chan struct{} + k8sClient K8sClient + // wg is used to enforce Close() to return after the watcher() goroutine has finished. + wg sync.WaitGroup + t *time.Timer + freq time.Duration +} + +// ResolveNow will be called by gRPC to try to resolve the target name again. +// It's just a hint, resolver can ignore this if it's not necessary. +func (k *kResolver) ResolveNow(resolver.ResolveNowOption) { + select { + case k.rn <- struct{}{}: + default: + } +} + +// Close closes the resolver. +func (k *kResolver) Close() { + k.cancel() + k.wg.Wait() +} + +func (k *kResolver) makeAddresses(e Endpoints) ([]resolver.Address, string) { + var newAddrs []resolver.Address + for _, subset := range e.Subsets { + port := "" + if k.target.useFirstPort { + port = strconv.Itoa(subset.Ports[0].Port) + } else if k.target.resolveByPortName { + for _, p := range subset.Ports { + if p.Name == k.target.port { + port = strconv.Itoa(p.Port) + break + } + } + } else { + port = k.target.port + } + + if len(port) == 0 { + port = strconv.Itoa(subset.Ports[0].Port) + } + + for _, address := range subset.Addresses { + sname := k.target.serviceName + if address.TargetRef != nil { + sname = address.TargetRef.Name + } + newAddrs = append(newAddrs, resolver.Address{ + Type: resolver.Backend, + Addr: net.JoinHostPort(address.IP, port), + ServerName: sname, + Metadata: nil, + }) + } + } + return newAddrs, "" +} + +func (k *kResolver) handle(e Endpoints) { + result, _ := k.makeAddresses(e) + // k.cc.NewServiceConfig(sc) + if len(result) > 0 { + k.cc.NewAddress(result) + } +} + +func (k *kResolver) resolve() { + e, err := getEndpoints(k.k8sClient, k.target.serviceNamespace, k.target.serviceName) + if err == nil { + k.handle(e) + } else { + grpclog.Errorf("kuberesolver: lookup endpoints failed: %v", err) + } + // Next lookup should happen after an interval defined by k.freq. + k.t.Reset(k.freq) +} + +func (k *kResolver) watch() error { + defer k.wg.Done() + // watch endpoints lists existing endpoints at start + sw, err := watchEndpoints(k.k8sClient, k.target.serviceNamespace, k.target.serviceName) + if err != nil { + return err + } + for { + select { + case <-k.ctx.Done(): + return nil + case <-k.t.C: + k.resolve() + case <-k.rn: + k.resolve() + case up, hasMore := <-sw.ResultChan(): + if hasMore { + k.handle(up.Object) + } else { + return nil + } + } + } +} diff --git a/vendor/github.com/sercand/kuberesolver/kubernetes.go b/vendor/github.com/sercand/kuberesolver/kubernetes.go index 737880ee4aa..18925eae794 100644 --- a/vendor/github.com/sercand/kuberesolver/kubernetes.go +++ b/vendor/github.com/sercand/kuberesolver/kubernetes.go @@ -3,11 +3,14 @@ package kuberesolver import ( "crypto/tls" "crypto/x509" + "encoding/json" "fmt" "io/ioutil" "net" "net/http" + "net/url" "os" + "strings" "time" ) @@ -16,13 +19,23 @@ const ( serviceAccountCACert = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" ) +// K8sClient is minimal kubernetes client interface +type K8sClient interface { + Do(req *http.Request) (*http.Response, error) + GetRequest(url string) (*http.Request, error) + Host() string +} + type k8sClient struct { host string token string httpClient *http.Client } -func (kc *k8sClient) getRequest(url string) (*http.Request, error) { +func (kc *k8sClient) GetRequest(url string) (*http.Request, error) { + if !strings.HasPrefix(url, kc.host) { + url = fmt.Sprintf("%s/%s", kc.host, url) + } req, err := http.NewRequest("GET", url, nil) if err != nil { return nil, err @@ -37,12 +50,12 @@ func (kc *k8sClient) Do(req *http.Request) (*http.Response, error) { return kc.httpClient.Do(req) } -/* -KUBERNETES_SERVICE_PORT=443 -KUBERNETES_SERVICE_PORT_HTTPS=443 -KUBERNETES_SERVICE_HOST=10.0.0.1 -*/ -func newInClusterClient() (*k8sClient, error) { +func (kc *k8sClient) Host() string { + return kc.host +} + +// NewInClusterK8sClient creates K8sClient if it is inside Kubernetes +func NewInClusterK8sClient() (K8sClient, error) { host, port := os.Getenv("KUBERNETES_SERVICE_HOST"), os.Getenv("KUBERNETES_SERVICE_PORT") if len(host) == 0 || len(port) == 0 { return nil, fmt.Errorf("unable to load in-cluster configuration, KUBERNETES_SERVICE_HOST and KUBERNETES_SERVICE_PORT must be defined") @@ -69,3 +82,56 @@ func newInClusterClient() (*k8sClient, error) { httpClient: httpClient, }, nil } + +// NewInsecureK8sClient creates an insecure k8s client which is suitable +// to connect kubernetes api behind proxy +func NewInsecureK8sClient(apiURL string) K8sClient { + return &k8sClient{ + host: apiURL, + httpClient: http.DefaultClient, + } +} + +func getEndpoints(client K8sClient, namespace, targetName string) (Endpoints, error) { + u, err := url.Parse(fmt.Sprintf("%s/api/v1/namespaces/%s/endpoints/%s", + client.Host(), namespace, targetName)) + if err != nil { + return Endpoints{}, err + } + req, err := client.GetRequest(u.String()) + if err != nil { + return Endpoints{}, err + } + resp, err := client.Do(req) + if err != nil { + return Endpoints{}, err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return Endpoints{}, fmt.Errorf("invalid response code %d", resp.StatusCode) + } + result := Endpoints{} + err = json.NewDecoder(resp.Body).Decode(&result) + return result, err +} + +func watchEndpoints(client K8sClient, namespace, targetName string) (watchInterface, error) { + u, err := url.Parse(fmt.Sprintf("%s/api/v1/watch/namespaces/%s/endpoints/%s", + client.Host(), namespace, targetName)) + if err != nil { + return nil, err + } + req, err := client.GetRequest(u.String()) + if err != nil { + return nil, err + } + resp, err := client.Do(req) + if err != nil { + return nil, err + } + if resp.StatusCode != http.StatusOK { + defer resp.Body.Close() + return nil, fmt.Errorf("invalid response code %d", resp.StatusCode) + } + return newStreamWatcher(resp.Body), nil +} diff --git a/vendor/github.com/sercand/kuberesolver/models.go b/vendor/github.com/sercand/kuberesolver/models.go index 3cf13fa6eea..167e3613e73 100644 --- a/vendor/github.com/sercand/kuberesolver/models.go +++ b/vendor/github.com/sercand/kuberesolver/models.go @@ -23,8 +23,10 @@ type Endpoints struct { } type Metadata struct { - Name string `json:"name"` - ResourceVersion string `json:"resourceVersion"` + Name string `json:"name"` + Namespace string `json:"namespace"` + ResourceVersion string `json:"resourceVersion"` + Labels map[string]string `json:"labels"` } type Subset struct { @@ -33,9 +35,15 @@ type Subset struct { } type Address struct { - IP string `json:"ip"` + IP string `json:"ip"` + TargetRef *ObjectReference `json:"targetRef,omitempty"` } +type ObjectReference struct { + Kind string `json:"kind"` + Name string `json:"name"` + Namespace string `json:"namespace"` +} type Port struct { Name string `json:"name"` Port int `json:"port"` diff --git a/vendor/github.com/sercand/kuberesolver/resolver.go b/vendor/github.com/sercand/kuberesolver/resolver.go deleted file mode 100644 index 1c6a92d4cb6..00000000000 --- a/vendor/github.com/sercand/kuberesolver/resolver.go +++ /dev/null @@ -1,86 +0,0 @@ -package kuberesolver - -import ( - "fmt" - "io/ioutil" - "net/http" - "net/url" - "time" - - "google.golang.org/grpc/grpclog" - "google.golang.org/grpc/naming" -) - -// kubeResolver resolves service names using Kubernetes endpoints. -type kubeResolver struct { - k8sClient *k8sClient - namespace string - watcher *watcher -} - -// NewResolver returns a new Kubernetes resolver. -func newResolver(client *k8sClient, namespace string) *kubeResolver { - if namespace == "" { - namespace = "default" - } - return &kubeResolver{client, namespace, nil} -} - -// Resolve creates a Kubernetes watcher for the named target. -func (r *kubeResolver) Resolve(target string) (naming.Watcher, error) { - pt, err := parseTarget(target) - if err != nil { - return nil, err - } - resultChan := make(chan watchResult) - stopCh := make(chan struct{}) - wtarget := pt.target - go until(func() { - err := r.watch(wtarget, stopCh, resultChan) - if err != nil { - grpclog.Printf("kuberesolver: watching ended with error='%v', will reconnect again", err) - } - }, time.Second, stopCh) - - r.watcher = &watcher{ - target: pt, - endpoints: make(map[string]interface{}), - stopCh: stopCh, - result: resultChan, - } - return r.watcher, nil -} - -func (r *kubeResolver) watch(target string, stopCh <-chan struct{}, resultCh chan<- watchResult) error { - u, err := url.Parse(fmt.Sprintf("%s/api/v1/watch/namespaces/%s/endpoints/%s", - r.k8sClient.host, r.namespace, target)) - if err != nil { - return err - } - req, err := r.k8sClient.getRequest(u.String()) - if err != nil { - return err - } - resp, err := r.k8sClient.Do(req) - if err != nil { - return err - } - if resp.StatusCode != http.StatusOK { - defer resp.Body.Close() - rbody, _ := ioutil.ReadAll(resp.Body) - return fmt.Errorf("invalid response code %d: %s", resp.StatusCode, rbody) - } - sw := newStreamWatcher(resp.Body) - for { - select { - case <-stopCh: - return nil - case up, more := <-sw.ResultChan(): - if more { - resultCh <- watchResult{err: nil, ep: &up} - } else { - return nil - } - } - } -} diff --git a/vendor/github.com/sercand/kuberesolver/stream.go b/vendor/github.com/sercand/kuberesolver/stream.go index 51aab49b5db..30a600ccde9 100644 --- a/vendor/github.com/sercand/kuberesolver/stream.go +++ b/vendor/github.com/sercand/kuberesolver/stream.go @@ -79,9 +79,9 @@ func (sw *streamWatcher) receive() { case io.EOF: // watch closed normally case io.ErrUnexpectedEOF: - grpclog.Printf("kuberesolver: Unexpected EOF during watch stream event decoding: %v", err) + grpclog.Infof("kuberesolver: Unexpected EOF during watch stream event decoding: %v", err) default: - grpclog.Printf("kuberesolver: Unable to decode an event from the watch stream: %v", err) + grpclog.Infof("kuberesolver: Unable to decode an event from the watch stream: %v", err) } return } diff --git a/vendor/github.com/sercand/kuberesolver/util.go b/vendor/github.com/sercand/kuberesolver/util.go index e0726092002..8262a268851 100644 --- a/vendor/github.com/sercand/kuberesolver/util.go +++ b/vendor/github.com/sercand/kuberesolver/util.go @@ -30,6 +30,6 @@ func until(f func(), period time.Duration, stopCh <-chan struct{}) { func handleCrash() { if r := recover(); r != nil { callers := string(debug.Stack()) - grpclog.Printf("kuberesolver: recovered from panic: %#v (%v)\n%v", r, r, callers) + grpclog.Errorf("kuberesolver: recovered from panic: %#v (%v)\n%v", r, r, callers) } } diff --git a/vendor/github.com/sercand/kuberesolver/watcher.go b/vendor/github.com/sercand/kuberesolver/watcher.go deleted file mode 100644 index 42437bd482b..00000000000 --- a/vendor/github.com/sercand/kuberesolver/watcher.go +++ /dev/null @@ -1,95 +0,0 @@ -package kuberesolver - -import ( - "net" - "strconv" - "sync" - - "google.golang.org/grpc/grpclog" - "google.golang.org/grpc/naming" -) - -type watchResult struct { - ep *Event - err error -} - -// A Watcher provides name resolution updates from Kubernetes endpoints -// identified by name. -type watcher struct { - target targetInfo - endpoints map[string]interface{} - stopCh chan struct{} - result chan watchResult - sync.Mutex - stopped bool -} - -// Close closes the watcher, cleaning up any open connections. -func (w *watcher) Close() { - close(w.stopCh) -} - -// Next updates the endpoints for the name being watched. -func (w *watcher) Next() ([]*naming.Update, error) { - updates := make([]*naming.Update, 0) - updatedEndpoints := make(map[string]interface{}) - var ep Event - - select { - case <-w.stopCh: - w.Lock() - if !w.stopped { - w.stopped = true - } - w.Unlock() - return updates, nil - case r := <-w.result: - if r.err == nil { - ep = *r.ep - } else { - return updates, r.err - } - } - for _, subset := range ep.Object.Subsets { - port := "" - if w.target.useFirstPort { - port = strconv.Itoa(subset.Ports[0].Port) - } else if w.target.resolveByPortName { - for _, p := range subset.Ports { - if p.Name == w.target.port { - port = strconv.Itoa(p.Port) - break - } - } - } else { - port = w.target.port - } - - if len(port) == 0 { - port = strconv.Itoa(subset.Ports[0].Port) - } - for _, address := range subset.Addresses { - endpoint := net.JoinHostPort(address.IP, port) - updatedEndpoints[endpoint] = nil - } - } - - // Create updates to add new endpoints. - for addr, md := range updatedEndpoints { - if _, ok := w.endpoints[addr]; !ok { - updates = append(updates, &naming.Update{naming.Add, addr, md}) - grpclog.Printf("kuberesolver: %s ADDED to %s", addr, w.target.target) - } - } - - // Create updates to delete old endpoints. - for addr := range w.endpoints { - if _, ok := updatedEndpoints[addr]; !ok { - updates = append(updates, &naming.Update{naming.Delete, addr, nil}) - grpclog.Printf("kuberesolver: %s DELETED from %s", addr, w.target.target) - } - } - w.endpoints = updatedEndpoints - return updates, nil -} diff --git a/vendor/github.com/weaveworks/common/httpgrpc/server/server.go b/vendor/github.com/weaveworks/common/httpgrpc/server/server.go index b2676016a4e..28a0aabec04 100644 --- a/vendor/github.com/weaveworks/common/httpgrpc/server/server.go +++ b/vendor/github.com/weaveworks/common/httpgrpc/server/server.go @@ -17,6 +17,7 @@ import ( "github.com/sercand/kuberesolver" "golang.org/x/net/context" "google.golang.org/grpc" + "google.golang.org/grpc/balancer/roundrobin" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/logging" @@ -71,10 +72,15 @@ type Client struct { // ParseURL deals with direct:// style URLs, as well as kubernetes:// urls. // For backwards compatibility it treats URLs without schems as kubernetes://. -func ParseURL(unparsed string) (string, []grpc.DialOption, error) { +func ParseURL(unparsed string) (string, error) { + // if it has :///, this is the kuberesolver v2 URL. Return it as it is. + if strings.Contains(unparsed, ":///") { + return unparsed, nil + } + parsed, err := url.Parse(unparsed) if err != nil { - return "", nil, err + return "", err } scheme, host := parsed.Scheme, parsed.Host @@ -84,12 +90,12 @@ func ParseURL(unparsed string) (string, []grpc.DialOption, error) { switch scheme { case "direct": - return host, nil, err + return host, err case "kubernetes": host, port, err := net.SplitHostPort(host) if err != nil { - return "", nil, err + return "", err } parts := strings.SplitN(host, ".", 3) service, namespace, domain := parts[0], "default", "" @@ -100,31 +106,31 @@ func ParseURL(unparsed string) (string, []grpc.DialOption, error) { if len(parts) > 2 { domain = domain + "." + parts[2] } - balancer := kuberesolver.NewWithNamespace(namespace) address := fmt.Sprintf("kubernetes://%s%s:%s", service, domain, port) - dialOptions := []grpc.DialOption{balancer.DialOption()} - return address, dialOptions, nil + return address, nil default: - return "", nil, fmt.Errorf("unrecognised scheme: %s", parsed.Scheme) + return "", fmt.Errorf("unrecognised scheme: %s", parsed.Scheme) } } // NewClient makes a new Client, given a kubernetes service address. func NewClient(address string) (*Client, error) { - address, dialOptions, err := ParseURL(address) + kuberesolver.RegisterInCluster() + + address, err := ParseURL(address) if err != nil { return nil, err } - dialOptions = append( - dialOptions, + dialOptions := []grpc.DialOption{ + grpc.WithBalancerName(roundrobin.Name), grpc.WithInsecure(), grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), middleware.ClientUserHeaderInterceptor, )), - ) + } conn, err := grpc.Dial(address, dialOptions...) if err != nil { diff --git a/vendor/github.com/weaveworks/common/instrument/instrument.go b/vendor/github.com/weaveworks/common/instrument/instrument.go index 2994544c365..2b6168d48b1 100644 --- a/vendor/github.com/weaveworks/common/instrument/instrument.go +++ b/vendor/github.com/weaveworks/common/instrument/instrument.go @@ -152,7 +152,9 @@ func CollectedRequest(ctx context.Context, method string, col Collector, toStatu col.After(method, toStatusCode(err), start) if err != nil { - ext.Error.Set(sp, true) + if err != context.Canceled { + ext.Error.Set(sp, true) + } sp.LogFields(otlog.Error(err)) } sp.Finish() diff --git a/vendor/github.com/weaveworks/common/logging/level.go b/vendor/github.com/weaveworks/common/logging/level.go index e9d60e63537..fba3e51dfb6 100644 --- a/vendor/github.com/weaveworks/common/logging/level.go +++ b/vendor/github.com/weaveworks/common/logging/level.go @@ -40,7 +40,21 @@ func (l *Level) String() string { return l.s } -// Set updates the value of the allowed level. +// UnmarshalYAML implements yaml.Unmarshaler. +func (l *Level) UnmarshalYAML(unmarshal func(interface{}) error) error { + var level string + if err := unmarshal(&level); err != nil { + return err + } + return l.Set(level) +} + +// MarshalYAML implements yaml.Marshaler. +func (l Level) MarshalYAML() (interface{}, error) { + return l.String(), nil +} + +// Set updates the value of the allowed level. Implments flag.Value. func (l *Level) Set(s string) error { switch s { case "debug": diff --git a/vendor/github.com/weaveworks/common/middleware/grpc_logging.go b/vendor/github.com/weaveworks/common/middleware/grpc_logging.go index bc9c9f37d51..75328eafc88 100644 --- a/vendor/github.com/weaveworks/common/middleware/grpc_logging.go +++ b/vendor/github.com/weaveworks/common/middleware/grpc_logging.go @@ -31,7 +31,11 @@ func (s GRPCServerLog) UnaryServerInterceptor(ctx context.Context, req interface if s.WithRequest { entry = entry.WithField("request", req) } - entry.WithField(errorKey, err).Warnln(gRPC) + if err == context.Canceled { + entry.WithField(errorKey, err).Debugln(gRPC) + } else { + entry.WithField(errorKey, err).Warnln(gRPC) + } } else { entry.Debugf("%s (success)", gRPC) } @@ -44,7 +48,11 @@ func (s GRPCServerLog) StreamServerInterceptor(srv interface{}, ss grpc.ServerSt err := handler(srv, ss) entry := user.LogWith(ss.Context(), s.Log).WithFields(logging.Fields{"method": info.FullMethod, "duration": time.Since(begin)}) if err != nil { - entry.WithField(errorKey, err).Warnln(gRPC) + if err == context.Canceled { + entry.WithField(errorKey, err).Debugln(gRPC) + } else { + entry.WithField(errorKey, err).Warnln(gRPC) + } } else { entry.Debugf("%s (success)", gRPC) } diff --git a/vendor/github.com/weaveworks/common/middleware/logging.go b/vendor/github.com/weaveworks/common/middleware/logging.go index 148a87e73d8..06f816e8fd1 100644 --- a/vendor/github.com/weaveworks/common/middleware/logging.go +++ b/vendor/github.com/weaveworks/common/middleware/logging.go @@ -63,8 +63,9 @@ func dumpRequest(req *http.Request) ([]byte, error) { // Exclude some headers for security, or just that we don't need them when debugging err := req.Header.WriteSubset(&b, map[string]bool{ - "Cookie": true, - "X-Csrf-Token": true, + "Cookie": true, + "X-Csrf-Token": true, + "Authorization": true, }) if err != nil { return nil, err diff --git a/vendor/github.com/weaveworks/common/server/server.go b/vendor/github.com/weaveworks/common/server/server.go index 5d882afb2fa..17a55b47d3b 100644 --- a/vendor/github.com/weaveworks/common/server/server.go +++ b/vendor/github.com/weaveworks/common/server/server.go @@ -9,7 +9,7 @@ import ( "time" "github.com/gorilla/mux" - "github.com/grpc-ecosystem/go-grpc-middleware" + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" otgrpc "github.com/opentracing-contrib/go-grpc" "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" @@ -26,25 +26,31 @@ import ( // Config for a Server type Config struct { - MetricsNamespace string - HTTPListenPort int - GRPCListenPort int + MetricsNamespace string `yaml:"-"` + HTTPListenPort int `yaml:"http_listen_port"` + GRPCListenPort int `yaml:"grpc_listen_port"` - RegisterInstrumentation bool - ExcludeRequestInLog bool + RegisterInstrumentation bool `yaml:"-"` + ExcludeRequestInLog bool `yaml:"-"` - ServerGracefulShutdownTimeout time.Duration - HTTPServerReadTimeout time.Duration - HTTPServerWriteTimeout time.Duration - HTTPServerIdleTimeout time.Duration + ServerGracefulShutdownTimeout time.Duration `yaml:"graceful_shutdown_timeout"` + HTTPServerReadTimeout time.Duration `yaml:"http_server_read_timeout"` + HTTPServerWriteTimeout time.Duration `yaml:"http_server_write_timeout"` + HTTPServerIdleTimeout time.Duration `yaml:"http_server_idle_timeout"` - GRPCOptions []grpc.ServerOption - GRPCMiddleware []grpc.UnaryServerInterceptor - GRPCStreamMiddleware []grpc.StreamServerInterceptor - HTTPMiddleware []middleware.Interface + GRPCOptions []grpc.ServerOption `yaml:"-"` + GRPCMiddleware []grpc.UnaryServerInterceptor `yaml:"-"` + GRPCStreamMiddleware []grpc.StreamServerInterceptor `yaml:"-"` + HTTPMiddleware []middleware.Interface `yaml:"-"` - LogLevel logging.Level - Log logging.Interface + GPRCServerMaxRecvMsgSize int `yaml:"grpc_server_max_recv_msg_size"` + GRPCServerMaxSendMsgSize int `yaml:"grpc_server_max_send_msg_size"` + GPRCServerMaxConcurrentStreams uint `yaml:"grpc_server_max_concurrent_streams"` + + LogLevel logging.Level `yaml:"log_level"` + Log logging.Interface `yaml:"-"` + + PathPrefix string `yaml:"http_path_prefix"` } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -56,6 +62,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.HTTPServerReadTimeout, "server.http-read-timeout", 30*time.Second, "Read timeout for HTTP server") f.DurationVar(&cfg.HTTPServerWriteTimeout, "server.http-write-timeout", 30*time.Second, "Write timeout for HTTP server") f.DurationVar(&cfg.HTTPServerIdleTimeout, "server.http-idle-timeout", 120*time.Second, "Idle timeout for HTTP server") + f.IntVar(&cfg.GPRCServerMaxRecvMsgSize, "server.grpc-max-recv-msg-size-bytes", 4*1024*1024, "Limit on the size of a gRPC message this server can receive (bytes).") + f.IntVar(&cfg.GRPCServerMaxSendMsgSize, "server.grpc-max-send-msg-size-bytes", 4*1024*1024, "Limit on the size of a gRPC message this server can send (bytes).") + f.UintVar(&cfg.GPRCServerMaxConcurrentStreams, "server.grpc-max-concurrent-streams", 100, "Limit on the number of concurrent streams for gRPC calls (0 = unlimited)") + f.StringVar(&cfg.PathPrefix, "server.path-prefix", "", "Base path to serve all API routes from (e.g. /v1/)") cfg.LogLevel.RegisterFlags(f) } @@ -103,6 +113,8 @@ func New(cfg Config) (*Server, error) { log = logging.NewLogrus(cfg.LogLevel) } + log.WithField("http", httpListener.Addr()).WithField("grpc", grpcListener.Addr()).Infof("server listening on addresses") + // Setup gRPC server serverLog := middleware.GRPCServerLog{ WithRequest: !cfg.ExcludeRequestInLog, @@ -129,12 +141,20 @@ func New(cfg Config) (*Server, error) { grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( grpcStreamMiddleware..., )), + grpc.MaxRecvMsgSize(cfg.GPRCServerMaxRecvMsgSize), + grpc.MaxSendMsgSize(cfg.GRPCServerMaxSendMsgSize), + grpc.MaxConcurrentStreams(uint32(cfg.GPRCServerMaxConcurrentStreams)), } grpcOptions = append(grpcOptions, cfg.GRPCOptions...) grpcServer := grpc.NewServer(grpcOptions...) // Setup HTTP server router := mux.NewRouter() + if cfg.PathPrefix != "" { + // Expect metrics and pprof handlers to be prefixed with server's path prefix. + // e.g. /loki/metrics or /loki/debug/pprof + router = router.PathPrefix(cfg.PathPrefix).Subrouter() + } if cfg.RegisterInstrumentation { RegisterInstrumentation(router) }