Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion integration/alertmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestAlertmanager(t *testing.T) {
require.NoError(t, s.StartAndWaitReady(alertmanager))
require.NoError(t, alertmanager.WaitSumMetric("cortex_alertmanager_configs", 1))

c, err := e2ecortex.NewClient("", "", alertmanager.Endpoint(80), "user-1")
c, err := e2ecortex.NewClient("", "", alertmanager.HTTPEndpoint(), "user-1")
require.NoError(t, err)

cfg, err := c.GetAlertmanagerConfig(context.Background())
Expand Down
4 changes: 2 additions & 2 deletions integration/backward_compatibility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestBackwardCompatibilityWithChunksStorage(t *testing.T) {
now := time.Now()
series, expectedVector := generateSeries("series_1", now)

c, err := e2ecortex.NewClient(distributor.Endpoint(80), "", "", "user-1")
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

res, err := c.Push(series)
Expand All @@ -74,7 +74,7 @@ func TestBackwardCompatibilityWithChunksStorage(t *testing.T) {
require.NoError(t, querier.WaitSumMetric("cortex_ring_tokens_total", 512))

// Query the series
c, err := e2ecortex.NewClient(distributor.Endpoint(80), querier.Endpoint(80), "", "user-1")
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "user-1")
require.NoError(t, err)

result, err := c.Query("series_1", now)
Expand Down
51 changes: 40 additions & 11 deletions integration/e2ecortex/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ import (
"github.com/cortexproject/cortex/integration/e2e"
)

const (
HTTPPort = 80
GRPCPort = 9095
)

// GetDefaultImage returns the Docker image to use to run Cortex.
func GetDefaultImage() string {
// Get the cortex image from the CORTEX_IMAGE env variable,
Expand Down Expand Up @@ -34,8 +39,8 @@ func NewDistributor(name string, consulAddress string, flags map[string]string,
"-ring.store": "consul",
"-consul.hostname": consulAddress,
}, flags))...),
e2e.NewReadinessProbe(80, "/ring", 200),
80,
e2e.NewReadinessProbe(HTTPPort, "/ring", 200),
HTTPPort,
)
}

Expand All @@ -49,14 +54,19 @@ func NewQuerier(name string, consulAddress string, flags map[string]string, imag
image,
e2e.NewCommandWithoutEntrypoint("cortex", e2e.BuildArgs(e2e.MergeFlags(map[string]string{
"-target": "querier",
"-log.level": "warn",
"-log.level": "info", // TODO warn
"-distributor.replication-factor": "1",
// Configure the ingesters ring backend
"-ring.store": "consul",
"-consul.hostname": consulAddress,
// Query-frontend worker
"-querier.frontend-client.backoff-min-period": "100ms",
"-querier.frontend-client.backoff-max-period": "100ms",
"-querier.frontend-client.backoff-retries": "1",
"-querier.worker-parallelism": "1",
}, flags))...),
e2e.NewReadinessProbe(80, "/ready", 204),
80,
e2e.NewReadinessProbe(HTTPPort, "/ready", 204),
HTTPPort,
)
}

Expand All @@ -81,8 +91,8 @@ func NewIngester(name string, consulAddress string, flags map[string]string, ima
"-ring.store": "consul",
"-consul.hostname": consulAddress,
}, flags))...),
e2e.NewReadinessProbe(80, "/ready", 204),
80,
e2e.NewReadinessProbe(HTTPPort, "/ready", 204),
HTTPPort,
)
}

Expand All @@ -99,8 +109,27 @@ func NewTableManager(name string, flags map[string]string, image string) *e2e.HT
"-log.level": "warn",
}, flags))...),
// The table-manager doesn't expose a readiness probe, so we just check if the / returns 404
e2e.NewReadinessProbe(80, "/", 404),
80,
e2e.NewReadinessProbe(HTTPPort, "/", 404),
HTTPPort,
)
}

func NewQueryFrontend(name string, flags map[string]string, image string) *e2e.HTTPService {
if image == "" {
image = GetDefaultImage()
}

return e2e.NewHTTPService(
name,
image,
e2e.NewCommandWithoutEntrypoint("cortex", e2e.BuildArgs(e2e.MergeFlags(map[string]string{
"-target": "query-frontend",
"-log.level": "info", // TODO warn
}, flags))...),
// The query-frontend doesn't expose a readiness probe, so we just check if the / returns 404
e2e.NewReadinessProbe(HTTPPort, "/", 404),
HTTPPort,
GRPCPort,
)
}

Expand Down Expand Up @@ -134,7 +163,7 @@ func NewAlertmanager(name string, flags map[string]string, image string) *e2e.HT
"-log.level": "warn",
}, flags))...),
// The alertmanager doesn't expose a readiness probe, so we just check if the / returns 404
e2e.NewReadinessProbe(80, "/", 404),
80,
e2e.NewReadinessProbe(HTTPPort, "/", 404),
HTTPPort,
)
}
2 changes: 1 addition & 1 deletion integration/ingester_flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestIngesterFlushWithChunksStorage(t *testing.T) {
require.NoError(t, distributor.WaitSumMetric("cortex_ring_tokens_total", 512))
require.NoError(t, querier.WaitSumMetric("cortex_ring_tokens_total", 512))

c, err := e2ecortex.NewClient(distributor.Endpoint(80), querier.Endpoint(80), "", "user-1")
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "user-1")
require.NoError(t, err)

// Push some series to Cortex.
Expand Down
2 changes: 1 addition & 1 deletion integration/ingester_hand_over_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func runIngesterHandOverTest(t *testing.T, flags map[string]string, setup func(t
require.NoError(t, distributor.WaitSumMetric("cortex_ring_tokens_total", 512))
require.NoError(t, querier.WaitSumMetric("cortex_ring_tokens_total", 512))

c, err := e2ecortex.NewClient(distributor.Endpoint(80), querier.Endpoint(80), "", "user-1")
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "user-1")
require.NoError(t, err)

// Push some series to Cortex.
Expand Down
78 changes: 78 additions & 0 deletions integration/query_frontend_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package main

import (
"testing"
"time"

"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/integration/e2e"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/cortexproject/cortex/integration/e2ecortex"
)

func TestQueryFrontendWithBlocksStorage(t *testing.T) {
runQueryFrontendTest(t, BlocksStorage, func(t *testing.T, s *e2e.Scenario) {
minio := e2edb.NewMinio(9000, BlocksStorage["-experimental.tsdb.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(minio))
})
}

func TestQueryFrontendWithChunksStorage(t *testing.T) {
runQueryFrontendTest(t, ChunksStorage, func(t *testing.T, s *e2e.Scenario) {
dynamo := e2edb.NewDynamoDB()
require.NoError(t, s.StartAndWaitReady(dynamo))

require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml)))

tableManager := e2ecortex.NewTableManager("table-manager", ChunksStorage, "")
require.NoError(t, s.StartAndWaitReady(tableManager))

// Wait until the first table-manager sync has completed, so that we're
// sure the tables have been created.
require.NoError(t, tableManager.WaitSumMetric("cortex_dynamo_sync_tables_seconds", 1))
})
}

func runQueryFrontendTest(t *testing.T, flags map[string]string, setup func(t *testing.T, s *e2e.Scenario)) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

consul := e2edb.NewConsul()
require.NoError(t, s.StartAndWaitReady(consul))

setup(t, s)

// Start Cortex components.
queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", flags, "")
ingester := e2ecortex.NewIngester("ingester", consul.NetworkHTTPEndpoint(networkName), flags, "")
querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(networkName), mergeFlags(flags, map[string]string{
"-querier.frontend-address": queryFrontend.NetworkEndpoint(networkName, e2ecortex.GRPCPort),
}), "")
distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(networkName), flags, "")
require.NoError(t, s.StartAndWaitReady(queryFrontend, distributor, querier, ingester))

// Wait until both the distributor and querier have updated the ring.
require.NoError(t, distributor.WaitSumMetric("cortex_ring_tokens_total", 512))
require.NoError(t, querier.WaitSumMetric("cortex_ring_tokens_total", 512))

c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "user-1")
require.NoError(t, err)

// Push some series to Cortex.
now := time.Now()
series, expectedVector := generateSeries("series_1", now)

res, err := c.Push(series)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

// Query the series.
result, err := c.Query("series_1", now)
require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())
assert.Equal(t, expectedVector, result.(model.Vector))
}