Skip to content

Adding Microsoft Azure Blob storage support for chunks #1913

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Dec 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
* [CHANGE] Ingesters now write only normalised tokens to the ring, although they can still read denormalised tokens used by other ingesters. `-ingester.normalise-tokens` is now deprecated, and ignored. If you want to switch back to using denormalised tokens, you need to downgrade to Cortex 0.4.0. Previous versions don't handle claiming tokens from normalised ingesters correctly. #1809
* [FEATURE] The distributor can now drop labels from samples (similar to the removal of the replica label for HA ingestion) per user via the `distributor.drop-label` flag. #1726
* [FEATURE] Added `global` ingestion rate limiter strategy. Deprecated `-distributor.limiter-reload-period` flag. #1766
* [FEATURE] Added support for Microsoft Azure blob storage to be used for storing chunk data. #1913
* [BUGFIX] Fixed unnecessary CAS operations done by the HA tracker when the jitter is enabled. #1861

## 0.4.0 / 2019-12-02

* [CHANGE] The frontend component has been refactored to be easier to re-use. When upgrading the frontend, cache entries will be discarded and re-created with the new protobuf schema. #1734
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Cortex provides horizontally scalable, highly available, multi-tenant, long term
- **Highly available:** When run in a cluster, Cortex can replicate data between machines. This allows you to survive machine failure without gaps in your graphs.
- **Multi-tenant:** Cortex can isolate data and queries from multiple different independent
Prometheus sources in a single cluster, allowing untrusted parties to share the same cluster.
- **Long term storage:** Cortex supports Amazon DynamoDB, Google Bigtable, Cassandra, S3 and GCS for long term storage of metric data. This allows you to durably store data for longer than the lifetime of any single machine, and use this data for long term capacity planning.
- **Long term storage:** Cortex supports Amazon DynamoDB, Google Bigtable, Cassandra, S3, GCS and Microsoft Azure for long term storage of metric data. This allows you to durably store data for longer than the lifetime of any single machine, and use this data for long term capacity planning.

Cortex is a [CNCF](https://cncf.io) sandbox project used in several production systems including [Weave Cloud](https://cloud.weave.works) and [Grafana Cloud](https://grafana.com/cloud).
Cortex is primarily used as a [remote write](https://prometheus.io/docs/operating/configuration/#remote_write) destination for Prometheus, with a Prometheus-compatible query API.
Expand Down
11 changes: 9 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ go 1.12

require (
cloud.google.com/go v0.44.1
github.com/Azure/azure-sdk-for-go v26.3.0+incompatible // indirect
github.com/Azure/go-autorest v11.5.1+incompatible // indirect
github.com/Azure/azure-storage-blob-go v0.8.0
github.com/Azure/go-autorest/autorest v0.9.2 // indirect
github.com/Azure/go-autorest/autorest/adal v0.8.0 // indirect
github.com/Azure/go-autorest/autorest/to v0.3.0 // indirect
github.com/Azure/go-autorest/autorest/validation v0.2.0 // indirect
github.com/Masterminds/squirrel v0.0.0-20161115235646-20f192218cf5
github.com/NYTimes/gziphandler v1.1.1
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4
Expand Down Expand Up @@ -73,6 +76,10 @@ require (
gopkg.in/yaml.v2 v2.2.2
)

replace github.com/Azure/azure-sdk-for-go => github.com/Azure/azure-sdk-for-go v36.2.0+incompatible

replace github.com/Azure/go-autorest => github.com/Azure/go-autorest v13.3.0+incompatible

// Override since git.apache.org is down. The docs say to fetch from github.
replace git.apache.org/thrift.git => github.com/apache/thrift v0.0.0-20180902110319-2566ecd5d999

Expand Down
34 changes: 27 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,37 @@ contrib.go.opencensus.io/exporter/ocagent v0.6.0 h1:Z1n6UAyr0QwM284yUuh5Zd8JlvxU
contrib.go.opencensus.io/exporter/ocagent v0.6.0/go.mod h1:zmKjrJcdo0aYcVS7bmEeSEBLPA9YJp5bjrofdU3pIXs=
github.com/Azure/azure-pipeline-go v0.2.1 h1:OLBdZJ3yvOn2MezlWvbrBMTEUQC72zAftRZOMdj5HYo=
github.com/Azure/azure-pipeline-go v0.2.1/go.mod h1:UGSo8XybXnIGZ3epmeBw7Jdz+HiUVpqIlpz/HKHylF4=
github.com/Azure/azure-sdk-for-go v23.2.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go v26.3.0+incompatible h1:w/tfbWIy9a8SSNJFwcapWeOfknQXDYBVjh5UkuIr+NA=
github.com/Azure/azure-sdk-for-go v26.3.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go v36.2.0+incompatible h1:09cv2WoH0g6jl6m2iT+R9qcIPZKhXEL0sbmLhxP895s=
github.com/Azure/azure-sdk-for-go v36.2.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-storage-blob-go v0.7.0 h1:MuueVOYkufCxJw5YZzF842DY2MBsp+hLuh2apKY0mck=
github.com/Azure/azure-storage-blob-go v0.7.0/go.mod h1:f9YQKtsG1nMisotuTPpO0tjNuEjKRYAcJU8/ydDI++4=
github.com/Azure/azure-storage-blob-go v0.8.0 h1:53qhf0Oxa0nOjgbDeeYPUeyiNmafAFEY95rZLK0Tj6o=
github.com/Azure/azure-storage-blob-go v0.8.0/go.mod h1:lPI3aLPpuLTeUwh1sViKXFxwl2B6teiRqI0deQUvsw0=
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 h1:w+iIsaOQNcT7OZ575w+acHgRric5iCyQh+xv+KJ4HB8=
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8=
github.com/Azure/go-autorest v11.1.2+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
github.com/Azure/go-autorest v11.2.8+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
github.com/Azure/go-autorest v11.5.1+incompatible h1:tdB6TZ8w2B7+F8wD6eTQSXXQo31zKKL55b6uqNDAGKw=
github.com/Azure/go-autorest v11.5.1+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
github.com/Azure/go-autorest v13.3.0+incompatible h1:8Ix0VdeOllBx9jEcZ2Wb1uqWUpE1awmJiaHztwaJCPk=
github.com/Azure/go-autorest v13.3.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
github.com/Azure/go-autorest/autorest v0.9.0/go.mod h1:xyHB1BMZT0cuDHU7I0+g046+BFDTQ8rEZB0s4Yfa6bI=
github.com/Azure/go-autorest/autorest v0.9.2 h1:6AWuh3uWrsZJcNoCHrCF/+g4aKPCU39kaMO6/qrnK/4=
github.com/Azure/go-autorest/autorest v0.9.2/go.mod h1:xyHB1BMZT0cuDHU7I0+g046+BFDTQ8rEZB0s4Yfa6bI=
github.com/Azure/go-autorest/autorest/adal v0.5.0/go.mod h1:8Z9fGy2MpX0PvDjB1pEgQTmVqjGhiHBW7RJJEciWzS0=
github.com/Azure/go-autorest/autorest/adal v0.8.0 h1:CxTzQrySOxDnKpLjFJeZAS5Qrv/qFPkgLjx5bOAi//I=
github.com/Azure/go-autorest/autorest/adal v0.8.0/go.mod h1:Z6vX6WXXuyieHAXwMj0S6HY6e6wcHn37qQMBQlvY3lc=
github.com/Azure/go-autorest/autorest/date v0.1.0/go.mod h1:plvfp3oPSKwf2DNjlBjWF/7vwR+cUD/ELuzDCXwHUVA=
github.com/Azure/go-autorest/autorest/date v0.2.0 h1:yW+Zlqf26583pE43KhfnhFcdmSWlm5Ew6bxipnr/tbM=
github.com/Azure/go-autorest/autorest/date v0.2.0/go.mod h1:vcORJHLJEh643/Ioh9+vPmf1Ij9AEBM5FuBIXLmIy0g=
github.com/Azure/go-autorest/autorest/mocks v0.1.0/go.mod h1:OTyCOPRA2IgIlWxVYxBee2F5Gr4kF2zd2J5cFRaIDN0=
github.com/Azure/go-autorest/autorest/mocks v0.2.0/go.mod h1:OTyCOPRA2IgIlWxVYxBee2F5Gr4kF2zd2J5cFRaIDN0=
github.com/Azure/go-autorest/autorest/mocks v0.3.0 h1:qJumjCaCudz+OcqE9/XtEPfvtOjOmKaui4EOpFI6zZc=
github.com/Azure/go-autorest/autorest/mocks v0.3.0/go.mod h1:a8FDP3DYzQ4RYfVAxAN3SVSiiO77gL2j2ronKKP0syM=
github.com/Azure/go-autorest/autorest/to v0.3.0 h1:zebkZaadz7+wIQYgC7GXaz3Wb28yKYfVkkBKwc38VF8=
github.com/Azure/go-autorest/autorest/to v0.3.0/go.mod h1:MgwOyqaIuKdG4TL/2ywSsIWKAfJfgHDo8ObuUk3t5sA=
github.com/Azure/go-autorest/autorest/validation v0.2.0 h1:15vMO4y76dehZSq7pAaOLQxC6dZYsSrj2GQpflyM/L4=
github.com/Azure/go-autorest/autorest/validation v0.2.0/go.mod h1:3EEqHnBxQGHXRYq3HT1WyXAvT7LLY3tl70hw6tQIbjI=
github.com/Azure/go-autorest/logger v0.1.0 h1:ruG4BSDXONFRrZZJ2GUXDiUyVpayPmb1GnWeHDdaNKY=
github.com/Azure/go-autorest/logger v0.1.0/go.mod h1:oExouG+K6PryycPJfVSxi/koC6LSNgds39diKLz7Vrc=
github.com/Azure/go-autorest/tracing v0.5.0 h1:TRn4WjSnkcSy5AEG3pnbtFSwNtwzjr4VYyQflFE619k=
github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
Expand Down
143 changes: 143 additions & 0 deletions pkg/chunk/azure/blob_storage_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package azure

import (
"bytes"
"context"
"flag"
"fmt"
"net/url"
"strings"
"time"

"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/util"
)

const blobURLFmt = "https://%s.blob.core.windows.net/%s/%s"

// BlobStorageConfig defines the configurable flags that can be defined when using azure blob storage.
type BlobStorageConfig struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By convention we use lowercase dash-separated yaml config options (ie. containerName to container_name). May you fix it, please?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done. Thanks for calling that out.

ContainerName string `yaml:"container_name"`
AccountName string `yaml:"account_name"`
AccountKey string `yaml:"account_key"`
DownloadBufferSize int `yaml:"download_buffer_size"`
UploadBufferSize int `yaml:"upload_buffer_size"`
UploadBufferCount int `yaml:"upload_buffer_count"`
RequestTimeout time.Duration `yaml:"request_timeout"`
MaxRetries int `yaml:"max_retries"`
MinRetryDelay time.Duration `yaml:"min_retry_delay"`
MaxRetryDelay time.Duration `yaml:"max_retry_delay"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
func (c *BlobStorageConfig) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&c.ContainerName, "azure.container-name", "cortex", "Name of the blob container used to store chunks. Defaults to `cortex`. This container must be created before running cortex.")
f.StringVar(&c.AccountName, "azure.account-name", "", "The Microsoft Azure account name to be used")
f.StringVar(&c.AccountKey, "azure.account-key", "", "The Microsoft Azure account key to use.")
f.DurationVar(&c.RequestTimeout, "azure.request-timeout", 30*time.Second, "Timeout for requests made against azure blob storage. Defaults to 30 seconds.")
f.IntVar(&c.DownloadBufferSize, "azure.download-buffer-size", 512000, "Preallocated buffer size for downloads (default is 512KB)")
f.IntVar(&c.UploadBufferSize, "azure.upload-buffer-size", 256000, "Preallocated buffer size for up;oads (default is 256KB)")
f.IntVar(&c.UploadBufferCount, "azure.download-buffer-count", 1, "Number of buffers used to used to upload a chunk. (defaults to 1)")
f.IntVar(&c.MaxRetries, "azure.max-retries", 5, "Number of retries for a request which times out.")
f.DurationVar(&c.MinRetryDelay, "azure.min-retry-delay", 10*time.Millisecond, "Minimum time to wait before retrying a request.")
f.DurationVar(&c.MaxRetryDelay, "azure.max-retry-delay", 500*time.Millisecond, "Maximum time to wait before retrying a request.")
}

// BlobStorage is used to interact with azure blob storage for setting or getting time series chunks.
// Implements ObjectStorage
type BlobStorage struct {
//blobService storage.Serv
cfg *BlobStorageConfig
}

// NewBlobStorage creates a new instance of the BlobStorage struct.
func NewBlobStorage(cfg *BlobStorageConfig) *BlobStorage {
return &BlobStorage{cfg: cfg}
}

// Stop is a no op, as there are no background workers with this driver currently
func (b *BlobStorage) Stop() {}

// GetChunks retrieves the requested data chunks from blob storage.
func (b *BlobStorage) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) {
return util.GetParallelChunks(ctx, chunks, b.getChunk)
}

func (b *BlobStorage) getChunk(ctx context.Context, decodeContext *chunk.DecodeContext, input chunk.Chunk) (chunk.Chunk, error) {
if b.cfg.RequestTimeout > 0 {
// The context will be cancelled with the timeout or when the parent context is cancelled, whichever occurs first.
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, b.cfg.RequestTimeout)
defer cancel()
}

blockBlobURL, err := b.getBlobURL(input.ExternalKey())
if err != nil {
return chunk.Chunk{}, err
}

buf := make([]byte, 0, b.cfg.DownloadBufferSize)

err = azblob.DownloadBlobToBuffer(ctx, blockBlobURL.BlobURL, 0, 0, buf, azblob.DownloadFromBlobOptions{})
if err != nil {
return chunk.Chunk{}, err
}

if err := input.Decode(decodeContext, buf); err != nil {
return chunk.Chunk{}, err
}

return input, nil
}

// PutChunks writes a set of chunks to azure blob storage using block blobs.
func (b *BlobStorage) PutChunks(ctx context.Context, chunks []chunk.Chunk) error {

for _, chunk := range chunks {
buf, err := chunk.Encoded()
if err != nil {
return err
}

blockBlobURL, err := b.getBlobURL(chunk.ExternalKey())
if err != nil {
return err
}

bufferSize := b.cfg.UploadBufferSize
maxBuffers := b.cfg.UploadBufferCount
_, err = azblob.UploadStreamToBlockBlob(ctx, bytes.NewReader(buf), blockBlobURL,
azblob.UploadStreamToBlockBlobOptions{BufferSize: bufferSize, MaxBuffers: maxBuffers})

if err != nil {
return err
}
}
return nil
}

func (b *BlobStorage) getBlobURL(blobID string) (azblob.BlockBlobURL, error) {

blobID = strings.Replace(blobID, ":", "-", -1)

//generate url for new chunk blob
u, err := url.Parse(fmt.Sprintf(blobURLFmt, b.cfg.AccountName, b.cfg.ContainerName, blobID))
if err != nil {
return azblob.BlockBlobURL{}, err
}
credential, err := azblob.NewSharedKeyCredential(b.cfg.AccountName, b.cfg.AccountKey)
if err != nil {
return azblob.BlockBlobURL{}, err
}

return azblob.NewBlockBlobURL(*u, azblob.NewPipeline(credential, azblob.PipelineOptions{
Retry: azblob.RetryOptions{
Policy: azblob.RetryPolicyExponential,
MaxTries: (int32)(b.cfg.MaxRetries),
TryTimeout: b.cfg.RequestTimeout,
RetryDelay: b.cfg.MinRetryDelay,
MaxRetryDelay: b.cfg.MaxRetryDelay,
},
})), nil
}
21 changes: 13 additions & 8 deletions pkg/chunk/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/aws"
"github.com/cortexproject/cortex/pkg/chunk/azure"
"github.com/cortexproject/cortex/pkg/chunk/cache"
"github.com/cortexproject/cortex/pkg/chunk/cassandra"
"github.com/cortexproject/cortex/pkg/chunk/gcp"
Expand All @@ -33,13 +34,14 @@ type StoreLimits interface {

// Config chooses which storage client to use.
type Config struct {
Engine string `yaml:"engine"`
AWSStorageConfig aws.StorageConfig `yaml:"aws"`
GCPStorageConfig gcp.Config `yaml:"bigtable"`
GCSConfig gcp.GCSConfig `yaml:"gcs"`
CassandraStorageConfig cassandra.Config `yaml:"cassandra"`
BoltDBConfig local.BoltDBConfig `yaml:"boltdb"`
FSConfig local.FSConfig `yaml:"filesystem"`
Engine string `yaml:"engine"`
AWSStorageConfig aws.StorageConfig `yaml:"aws"`
AzureStorageConfig azure.BlobStorageConfig `yaml:"azure"`
GCPStorageConfig gcp.Config `yaml:"bigtable"`
GCSConfig gcp.GCSConfig `yaml:"gcs"`
CassandraStorageConfig cassandra.Config `yaml:"cassandra"`
BoltDBConfig local.BoltDBConfig `yaml:"boltdb"`
FSConfig local.FSConfig `yaml:"filesystem"`

IndexCacheValidity time.Duration

Expand All @@ -49,6 +51,7 @@ type Config struct {
// RegisterFlags adds the flags required to configure this flag set.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.AWSStorageConfig.RegisterFlags(f)
cfg.AzureStorageConfig.RegisterFlags(f)
cfg.GCPStorageConfig.RegisterFlags(f)
cfg.GCSConfig.RegisterFlags(f)
cfg.CassandraStorageConfig.RegisterFlags(f)
Expand Down Expand Up @@ -159,6 +162,8 @@ func NewObjectClient(name string, cfg Config, schemaCfg chunk.SchemaConfig) (chu
level.Warn(util.Logger).Log("msg", "ignoring DynamoDB URL path", "path", path)
}
return aws.NewDynamoDBObjectClient(cfg.AWSStorageConfig.DynamoDBConfig, schemaCfg)
case "azure":
return azure.NewBlobStorage(&cfg.AzureStorageConfig), nil
case "gcp":
return gcp.NewBigtableObjectClient(context.Background(), cfg.GCPStorageConfig, schemaCfg)
case "gcp-columnkey", "bigtable", "bigtable-hashed":
Expand All @@ -170,7 +175,7 @@ func NewObjectClient(name string, cfg Config, schemaCfg chunk.SchemaConfig) (chu
case "filesystem":
return local.NewFSObjectClient(cfg.FSConfig)
default:
return nil, fmt.Errorf("Unrecognized storage client %v, choose one of: aws, cassandra, inmemory, gcp, bigtable, bigtable-hashed", name)
return nil, fmt.Errorf("Unrecognized storage client %v, choose one of: aws, azure, cassandra, inmemory, gcp, bigtable, bigtable-hashed", name)
}
}

Expand Down
17 changes: 0 additions & 17 deletions vendor/contrib.go.opencensus.io/exporter/ocagent/.gitignore

This file was deleted.

18 changes: 0 additions & 18 deletions vendor/contrib.go.opencensus.io/exporter/ocagent/.travis.yml

This file was deleted.

24 changes: 0 additions & 24 deletions vendor/contrib.go.opencensus.io/exporter/ocagent/CONTRIBUTING.md

This file was deleted.

Loading