Skip to content

Commit 5f7340c

Browse files
authored
Added query-frontend integration test (#2154)
Signed-off-by: Marco Pracucci <[email protected]>
1 parent d6e9f71 commit 5f7340c

File tree

6 files changed

+123
-16
lines changed

6 files changed

+123
-16
lines changed

integration/alertmanager_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func TestAlertmanager(t *testing.T) {
3131
require.NoError(t, s.StartAndWaitReady(alertmanager))
3232
require.NoError(t, alertmanager.WaitSumMetric("cortex_alertmanager_configs", 1))
3333

34-
c, err := e2ecortex.NewClient("", "", alertmanager.Endpoint(80), "user-1")
34+
c, err := e2ecortex.NewClient("", "", alertmanager.HTTPEndpoint(), "user-1")
3535
require.NoError(t, err)
3636

3737
cfg, err := c.GetAlertmanagerConfig(context.Background())

integration/backward_compatibility_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func TestBackwardCompatibilityWithChunksStorage(t *testing.T) {
4848
now := time.Now()
4949
series, expectedVector := generateSeries("series_1", now)
5050

51-
c, err := e2ecortex.NewClient(distributor.Endpoint(80), "", "", "user-1")
51+
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "user-1")
5252
require.NoError(t, err)
5353

5454
res, err := c.Push(series)
@@ -74,7 +74,7 @@ func TestBackwardCompatibilityWithChunksStorage(t *testing.T) {
7474
require.NoError(t, querier.WaitSumMetric("cortex_ring_tokens_total", 512))
7575

7676
// Query the series
77-
c, err := e2ecortex.NewClient(distributor.Endpoint(80), querier.Endpoint(80), "", "user-1")
77+
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "user-1")
7878
require.NoError(t, err)
7979

8080
result, err := c.Query("series_1", now)

integration/e2ecortex/services.go

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@ import (
66
"github.com/cortexproject/cortex/integration/e2e"
77
)
88

9+
const (
10+
HTTPPort = 80
11+
GRPCPort = 9095
12+
)
13+
914
// GetDefaultImage returns the Docker image to use to run Cortex.
1015
func GetDefaultImage() string {
1116
// Get the cortex image from the CORTEX_IMAGE env variable,
@@ -34,8 +39,8 @@ func NewDistributor(name string, consulAddress string, flags map[string]string,
3439
"-ring.store": "consul",
3540
"-consul.hostname": consulAddress,
3641
}, flags))...),
37-
e2e.NewReadinessProbe(80, "/ring", 200),
38-
80,
42+
e2e.NewReadinessProbe(HTTPPort, "/ring", 200),
43+
HTTPPort,
3944
)
4045
}
4146

@@ -49,14 +54,19 @@ func NewQuerier(name string, consulAddress string, flags map[string]string, imag
4954
image,
5055
e2e.NewCommandWithoutEntrypoint("cortex", e2e.BuildArgs(e2e.MergeFlags(map[string]string{
5156
"-target": "querier",
52-
"-log.level": "warn",
57+
"-log.level": "info", // TODO warn
5358
"-distributor.replication-factor": "1",
5459
// Configure the ingesters ring backend
5560
"-ring.store": "consul",
5661
"-consul.hostname": consulAddress,
62+
// Query-frontend worker
63+
"-querier.frontend-client.backoff-min-period": "100ms",
64+
"-querier.frontend-client.backoff-max-period": "100ms",
65+
"-querier.frontend-client.backoff-retries": "1",
66+
"-querier.worker-parallelism": "1",
5767
}, flags))...),
58-
e2e.NewReadinessProbe(80, "/ready", 204),
59-
80,
68+
e2e.NewReadinessProbe(HTTPPort, "/ready", 204),
69+
HTTPPort,
6070
)
6171
}
6272

@@ -81,8 +91,8 @@ func NewIngester(name string, consulAddress string, flags map[string]string, ima
8191
"-ring.store": "consul",
8292
"-consul.hostname": consulAddress,
8393
}, flags))...),
84-
e2e.NewReadinessProbe(80, "/ready", 204),
85-
80,
94+
e2e.NewReadinessProbe(HTTPPort, "/ready", 204),
95+
HTTPPort,
8696
)
8797
}
8898

@@ -99,8 +109,27 @@ func NewTableManager(name string, flags map[string]string, image string) *e2e.HT
99109
"-log.level": "warn",
100110
}, flags))...),
101111
// The table-manager doesn't expose a readiness probe, so we just check if the / returns 404
102-
e2e.NewReadinessProbe(80, "/", 404),
103-
80,
112+
e2e.NewReadinessProbe(HTTPPort, "/", 404),
113+
HTTPPort,
114+
)
115+
}
116+
117+
func NewQueryFrontend(name string, flags map[string]string, image string) *e2e.HTTPService {
118+
if image == "" {
119+
image = GetDefaultImage()
120+
}
121+
122+
return e2e.NewHTTPService(
123+
name,
124+
image,
125+
e2e.NewCommandWithoutEntrypoint("cortex", e2e.BuildArgs(e2e.MergeFlags(map[string]string{
126+
"-target": "query-frontend",
127+
"-log.level": "info", // TODO warn
128+
}, flags))...),
129+
// The query-frontend doesn't expose a readiness probe, so we just check if the / returns 404
130+
e2e.NewReadinessProbe(HTTPPort, "/", 404),
131+
HTTPPort,
132+
GRPCPort,
104133
)
105134
}
106135

@@ -134,7 +163,7 @@ func NewAlertmanager(name string, flags map[string]string, image string) *e2e.HT
134163
"-log.level": "warn",
135164
}, flags))...),
136165
// The alertmanager doesn't expose a readiness probe, so we just check if the / returns 404
137-
e2e.NewReadinessProbe(80, "/", 404),
138-
80,
166+
e2e.NewReadinessProbe(HTTPPort, "/", 404),
167+
HTTPPort,
139168
)
140169
}

integration/ingester_flush_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func TestIngesterFlushWithChunksStorage(t *testing.T) {
4646
require.NoError(t, distributor.WaitSumMetric("cortex_ring_tokens_total", 512))
4747
require.NoError(t, querier.WaitSumMetric("cortex_ring_tokens_total", 512))
4848

49-
c, err := e2ecortex.NewClient(distributor.Endpoint(80), querier.Endpoint(80), "", "user-1")
49+
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "user-1")
5050
require.NoError(t, err)
5151

5252
// Push some series to Cortex.

integration/ingester_hand_over_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func runIngesterHandOverTest(t *testing.T, flags map[string]string, setup func(t
5656
require.NoError(t, distributor.WaitSumMetric("cortex_ring_tokens_total", 512))
5757
require.NoError(t, querier.WaitSumMetric("cortex_ring_tokens_total", 512))
5858

59-
c, err := e2ecortex.NewClient(distributor.Endpoint(80), querier.Endpoint(80), "", "user-1")
59+
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "user-1")
6060
require.NoError(t, err)
6161

6262
// Push some series to Cortex.

integration/query_frontend_test.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package main
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/prometheus/common/model"
8+
"github.com/stretchr/testify/assert"
9+
"github.com/stretchr/testify/require"
10+
11+
"github.com/cortexproject/cortex/integration/e2e"
12+
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
13+
"github.com/cortexproject/cortex/integration/e2ecortex"
14+
)
15+
16+
func TestQueryFrontendWithBlocksStorage(t *testing.T) {
17+
runQueryFrontendTest(t, BlocksStorage, func(t *testing.T, s *e2e.Scenario) {
18+
minio := e2edb.NewMinio(9000, BlocksStorage["-experimental.tsdb.s3.bucket-name"])
19+
require.NoError(t, s.StartAndWaitReady(minio))
20+
})
21+
}
22+
23+
func TestQueryFrontendWithChunksStorage(t *testing.T) {
24+
runQueryFrontendTest(t, ChunksStorage, func(t *testing.T, s *e2e.Scenario) {
25+
dynamo := e2edb.NewDynamoDB()
26+
require.NoError(t, s.StartAndWaitReady(dynamo))
27+
28+
require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml)))
29+
30+
tableManager := e2ecortex.NewTableManager("table-manager", ChunksStorage, "")
31+
require.NoError(t, s.StartAndWaitReady(tableManager))
32+
33+
// Wait until the first table-manager sync has completed, so that we're
34+
// sure the tables have been created.
35+
require.NoError(t, tableManager.WaitSumMetric("cortex_dynamo_sync_tables_seconds", 1))
36+
})
37+
}
38+
39+
func runQueryFrontendTest(t *testing.T, flags map[string]string, setup func(t *testing.T, s *e2e.Scenario)) {
40+
s, err := e2e.NewScenario(networkName)
41+
require.NoError(t, err)
42+
defer s.Close()
43+
44+
consul := e2edb.NewConsul()
45+
require.NoError(t, s.StartAndWaitReady(consul))
46+
47+
setup(t, s)
48+
49+
// Start Cortex components.
50+
queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", flags, "")
51+
ingester := e2ecortex.NewIngester("ingester", consul.NetworkHTTPEndpoint(networkName), flags, "")
52+
querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(networkName), mergeFlags(flags, map[string]string{
53+
"-querier.frontend-address": queryFrontend.NetworkEndpoint(networkName, e2ecortex.GRPCPort),
54+
}), "")
55+
distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(networkName), flags, "")
56+
require.NoError(t, s.StartAndWaitReady(queryFrontend, distributor, querier, ingester))
57+
58+
// Wait until both the distributor and querier have updated the ring.
59+
require.NoError(t, distributor.WaitSumMetric("cortex_ring_tokens_total", 512))
60+
require.NoError(t, querier.WaitSumMetric("cortex_ring_tokens_total", 512))
61+
62+
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "user-1")
63+
require.NoError(t, err)
64+
65+
// Push some series to Cortex.
66+
now := time.Now()
67+
series, expectedVector := generateSeries("series_1", now)
68+
69+
res, err := c.Push(series)
70+
require.NoError(t, err)
71+
require.Equal(t, 200, res.StatusCode)
72+
73+
// Query the series.
74+
result, err := c.Query("series_1", now)
75+
require.NoError(t, err)
76+
require.Equal(t, model.ValVector, result.Type())
77+
assert.Equal(t, expectedVector, result.(model.Vector))
78+
}

0 commit comments

Comments
 (0)