diff --git a/CHANGELOG.md b/CHANGELOG.md index 26603845680..a0531b47280 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,11 @@ ## master / unreleased +* [CHANGE] Experimental blocks storage: removed support for transferring blocks between ingesters on shutdown. When running the Cortex blocks storage, ingesters are expected to run with a persistent disk. The following metrics have been removed: #2996 + * `cortex_ingester_sent_files` + * `cortex_ingester_received_files` + * `cortex_ingester_received_bytes_total` + * `cortex_ingester_sent_bytes_total` * [ENHANCEMENT] Query-tee: added a small tolerance to floating point sample values comparison. #2994 ## 1.3.0 in progress diff --git a/development/tsdb-blocks-storage-s3-single-binary/config/cortex.yaml b/development/tsdb-blocks-storage-s3-single-binary/config/cortex.yaml index cff78c4e765..eb1fdac8bb7 100644 --- a/development/tsdb-blocks-storage-s3-single-binary/config/cortex.yaml +++ b/development/tsdb-blocks-storage-s3-single-binary/config/cortex.yaml @@ -13,8 +13,6 @@ ingester_client: use_gzip_compression: true ingester: - max_transfer_retries: 1 - lifecycler: # We want to start immediately. join_after: 0 diff --git a/development/tsdb-blocks-storage-s3/config/cortex.yaml b/development/tsdb-blocks-storage-s3/config/cortex.yaml index 43449801e3f..7baaea81d87 100644 --- a/development/tsdb-blocks-storage-s3/config/cortex.yaml +++ b/development/tsdb-blocks-storage-s3/config/cortex.yaml @@ -13,8 +13,6 @@ ingester_client: use_gzip_compression: true ingester: - max_transfer_retries: 1 - lifecycler: # We want to start immediately.
join_after: 0 diff --git a/docs/architecture.md b/docs/architecture.md index 2ca49b2e69d..24d59a7f656 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -153,17 +153,18 @@ Incoming series are not immediately written to the storage but kept in memory an Ingesters contain a **lifecycler** which manages the lifecycle of an ingester and stores the **ingester state** in the [hash ring](#the-hash-ring). Each ingester could be in one of the following states: -1. `PENDING` is an ingester's state when it just started and is waiting for a hand-over from another ingester that is `LEAVING`. If no hand-over occurs within the configured timeout period ("auto-join timeout", configurable via `-ingester.join-after` option), the ingester will join the ring with a new set of random tokens (ie. during a scale up). When hand-over process starts, state changes to `JOINING`. - -2. `JOINING` is an ingester's state in two situations. First, ingester will switch to a `JOINING` state from `PENDING` state after auto-join timeout. In this case, ingester will generate tokens, store them into the ring, optionally observe the ring for token conflicts and then move to `ACTIVE` state. Second, ingester will also switch into a `JOINING` state as a result of another `LEAVING` ingester initiating a hand-over process with `PENDING` (which then switches to `JOINING` state). `JOINING` ingester then receives series and tokens from `LEAVING` ingester, and if everything goes well, `JOINING` ingester switches to `ACTIVE` state. If hand-over process fails, `JOINING` ingester will move back to `PENDING` state and either wait for another hand-over or auto-join timeout. - -3. `ACTIVE` is an ingester's state when it is fully initialized. It may receive both write and read requests for tokens it owns. - -4. `LEAVING` is an ingester's state when it is shutting down. It cannot receive write requests anymore, while it could still receive read requests for series it has in memory. 
While in this state, the ingester may look for a `PENDING` ingester to start a hand-over process with, used to transfer the state from `LEAVING` ingester to the `PENDING` one, during a rolling update (`PENDING` ingester moves to `JOINING` state during hand-over process). If there is no new ingester to accept hand-over, ingester in `LEAVING` state will flush data to storage instead. - -5. `UNHEALTHY` is an ingester's state when it has failed to heartbeat to the ring's KV Store. While in this state, distributors skip the ingester while building the replication set for incoming series and the ingester does not receive write or read requests. - -For more information about the hand-over process, please check out the [Ingester hand-over](guides/ingester-handover.md) documentation. +- **`PENDING`**
+ The ingester has just started. While in this state, the ingester receives neither write nor read requests, and could be waiting for a time series data transfer from another ingester if running the chunks storage and the [hand-over](guides/ingesters-rolling-updates.md#chunks-storage-with-wal-disabled-hand-over) is enabled. +- **`JOINING`**
+ The ingester is starting up and joining the ring. While in this state the ingester receives neither write nor read requests. The ingester joins the ring using tokens received from a leaving ingester as part of the [hand-over](guides/ingesters-rolling-updates.md#chunks-storage-with-wal-disabled-hand-over) process (if enabled); otherwise it loads tokens from disk (if `-ingester.tokens-file-path` is configured) or generates a set of new random ones. Finally, the ingester optionally observes the ring for token conflicts and, once any conflict is resolved, moves to the `ACTIVE` state. +- **`ACTIVE`**
+ The ingester is up and running. While in this state the ingester can receive both write and read requests. +- **`LEAVING`**
+ The ingester is shutting down and leaving the ring. While in this state the ingester doesn't receive write requests anymore, but it can still receive read requests. +- **`UNHEALTHY`**
+ The ingester has failed to heartbeat to the ring's KV Store. While in this state, distributors skip the ingester while building the replication set for incoming series and the ingester does not receive write or read requests. + +_The ingester states are internally used for different purposes, including the series hand-over process supported by the chunks storage. For more information about it, please check out the [Ingester hand-over](guides/ingesters-rolling-updates.md#chunks-storage-with-wal-disabled-hand-over) documentation._ Ingesters are **semi-stateful**. diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 53e627059f2..5bd445efdcf 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -475,9 +475,8 @@ blocks_storage: # CLI flag: -experimental.blocks-storage.tsdb.wal-compression-enabled [wal_compression_enabled: | default = false] - # If true, and transfer of blocks on shutdown fails or is disabled, - # incomplete blocks are flushed to storage instead. If false, incomplete - # blocks will be reused after restart, and uploaded when finished. + # True to flush blocks to storage on shutdown. If false, incomplete blocks + # will be reused after restart. # CLI flag: -experimental.blocks-storage.tsdb.flush-blocks-on-shutdown [flush_blocks_on_shutdown: | default = false] diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index f7bb21b7913..c1fc61a32a6 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -502,9 +502,8 @@ blocks_storage: # CLI flag: -experimental.blocks-storage.tsdb.wal-compression-enabled [wal_compression_enabled: | default = false] - # If true, and transfer of blocks on shutdown fails or is disabled, - # incomplete blocks are flushed to storage instead. If false, incomplete - # blocks will be reused after restart, and uploaded when finished. + # True to flush blocks to storage on shutdown.
If false, incomplete blocks + # will be reused after restart. # CLI flag: -experimental.blocks-storage.tsdb.flush-blocks-on-shutdown [flush_blocks_on_shutdown: | default = false] diff --git a/docs/configuration/arguments.md b/docs/configuration/arguments.md index 5396e311c50..88ef0ce2841 100644 --- a/docs/configuration/arguments.md +++ b/docs/configuration/arguments.md @@ -306,11 +306,11 @@ It also talks to a KVStore and has it's own copies of the same flags used by the - `-ingester.join-after` - How long to wait in PENDING state during the [hand-over process](../guides/ingester-handover.md). (default 0s) + How long to wait in PENDING state during the [hand-over process](../guides/ingesters-rolling-updates.md#chunks-storage-with-wal-disabled-hand-over) (supported only by the chunks storage). (default 0s) - `-ingester.max-transfer-retries` - How many times a LEAVING ingester tries to find a PENDING ingester during the [hand-over process](../guides/ingester-handover.md). Each attempt takes a second or so. Negative value or zero disables hand-over process completely. (default 10) + How many times a LEAVING ingester tries to find a PENDING ingester during the [hand-over process](../guides/ingesters-rolling-updates.md#chunks-storage-with-wal-disabled-hand-over) (supported only by the chunks storage). Negative value or zero disables hand-over process completely. (default 10) - `-ingester.normalise-tokens` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 025d33f48aa..3c693b68840 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -543,7 +543,8 @@ lifecycler: [availability_zone: | default = ""] # Number of times to try and transfer chunks before falling back to flushing. -# Negative value or zero disables hand-over. +# Negative value or zero disables hand-over. This feature is supported only by +# the chunks storage. 
# CLI flag: -ingester.max-transfer-retries [max_transfer_retries: | default = 10] @@ -3263,9 +3264,8 @@ tsdb: # CLI flag: -experimental.blocks-storage.tsdb.wal-compression-enabled [wal_compression_enabled: | default = false] - # If true, and transfer of blocks on shutdown fails or is disabled, incomplete - # blocks are flushed to storage instead. If false, incomplete blocks will be - # reused after restart, and uploaded when finished. + # True to flush blocks to storage on shutdown. If false, incomplete blocks + # will be reused after restart. # CLI flag: -experimental.blocks-storage.tsdb.flush-blocks-on-shutdown [flush_blocks_on_shutdown: | default = false] diff --git a/docs/configuration/single-process-config-blocks-gossip-1.yaml b/docs/configuration/single-process-config-blocks-gossip-1.yaml index 3a163b076fd..ea874767c50 100644 --- a/docs/configuration/single-process-config-blocks-gossip-1.yaml +++ b/docs/configuration/single-process-config-blocks-gossip-1.yaml @@ -29,9 +29,6 @@ ingester_client: use_gzip_compression: true ingester: - # Disable blocks transfers on ingesters shutdown or rollout. - max_transfer_retries: 0 - lifecycler: # The address to advertise for this ingester. Will be autodiscovered by # looking up address on eth0 or en0; can be specified if this fails. diff --git a/docs/configuration/single-process-config-blocks-gossip-2.yaml b/docs/configuration/single-process-config-blocks-gossip-2.yaml index 9746fcf0b3a..ff4441aef48 100644 --- a/docs/configuration/single-process-config-blocks-gossip-2.yaml +++ b/docs/configuration/single-process-config-blocks-gossip-2.yaml @@ -29,9 +29,6 @@ ingester_client: use_gzip_compression: true ingester: - # Disable blocks transfers on ingesters shutdown or rollout. - max_transfer_retries: 0 - lifecycler: # The address to advertise for this ingester. Will be autodiscovered by # looking up address on eth0 or en0; can be specified if this fails. 
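The `-ingester.max-transfer-retries` semantics documented above (retry the search for a `PENDING` ingester up to the configured number of times, with zero or a negative value disabling hand-over so the ingester falls back to flushing) can be sketched as follows. This is an illustrative Go sketch, not the actual Cortex lifecycler code; `handOverOrFlush` and `findPending` are hypothetical stand-ins for the real ring lookup and transfer logic:

```go
package main

import "fmt"

// handOverOrFlush sketches the LEAVING ingester's fallback behaviour:
// try up to maxTransferRetries times to find a PENDING ingester; a zero
// or negative value disables hand-over entirely, so the ingester flushes
// its chunks to the long-term storage instead.
func handOverOrFlush(maxTransferRetries int, findPending func() bool) string {
	for i := 0; i < maxTransferRetries; i++ {
		if findPending() {
			return "transferred" // hand-over to the PENDING ingester
		}
	}
	return "flushed" // no PENDING ingester found, or hand-over disabled
}

func main() {
	// Zero retries: hand-over is disabled, so the ingester flushes immediately.
	fmt.Println(handOverOrFlush(0, func() bool { return true }))
	// Default of 10 retries with a PENDING ingester available.
	fmt.Println(handOverOrFlush(10, func() bool { return true }))
}
```

Note how disabling hand-over (`0` or a negative value) short-circuits the loop entirely, which is why the flushing path is taken even when a `PENDING` ingester exists.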
diff --git a/docs/configuration/single-process-config-blocks-tls.yaml b/docs/configuration/single-process-config-blocks-tls.yaml index 06eacd21706..a8fd02d8b62 100644 --- a/docs/configuration/single-process-config-blocks-tls.yaml +++ b/docs/configuration/single-process-config-blocks-tls.yaml @@ -37,9 +37,6 @@ ingester_client: tls_ca_path: "root.crt" ingester: - # Disable blocks transfers on ingesters shutdown or rollout. - max_transfer_retries: 0 - lifecycler: # The address to advertise for this ingester. Will be autodiscovered by # looking up address on eth0 or en0; can be specified if this fails. diff --git a/docs/configuration/single-process-config-blocks.yaml b/docs/configuration/single-process-config-blocks.yaml index f8804a892ae..843510a7281 100644 --- a/docs/configuration/single-process-config-blocks.yaml +++ b/docs/configuration/single-process-config-blocks.yaml @@ -28,9 +28,6 @@ ingester_client: use_gzip_compression: true ingester: - # Disable blocks transfers on ingesters shutdown or rollout. - max_transfer_retries: 0 - lifecycler: # The address to advertise for this ingester. Will be autodiscovered by # looking up address on eth0 or en0; can be specified if this fails. diff --git a/docs/guides/ingester-handover.md b/docs/guides/ingester-handover.md deleted file mode 100644 index c2263db4e1b..00000000000 --- a/docs/guides/ingester-handover.md +++ /dev/null @@ -1,49 +0,0 @@ ---- -title: "Ingester Hand-over" -linkTitle: "Ingester Hand-over" -weight: 102 -slug: ingester-handover ---- - -The [ingester](architecture.md#ingester) holds several hours of sample -data in memory. When we want to shut down an ingester, either for -software version update or to drain a node for maintenance, this data -must not be discarded. - -Each ingester goes through different states in its lifecycle. When -working normally, the state is `ACTIVE`. - -On start-up, an ingester first goes into state `PENDING`. 
After a -[short time](../configuration/arguments.md#ingester), if nothing happens, it adds -itself to the ring and goes into state ACTIVE. - -A running ingester is notified to shut down by Unix signal -`SIGINT`. On receipt of this signal it goes into state `LEAVING` and -looks for an ingester in state `PENDING`. If it finds one, that -ingester goes into state `JOINING` and the leaver transfers all its -in-memory data over to the joiner. On successful transfer the leaver -removes itself from the ring and exits and the joiner changes to -`ACTIVE`, taking over ownership of the leaver's -[ring tokens](../architecture.md#hashing). - -If a leaving ingester does not find a pending ingester after [several attempts](../configuration/arguments.md#ingester), it will flush all of its chunks to -the backing database, then remove itself from the ring and exit. This -may take tens of minutes to complete. - -During hand-over, neither the leaving nor joining ingesters will -accept new samples. Distributors are aware of this, and "spill" the -samples to the next ingester in the ring. This creates a set of extra -"spilled" chunks which will idle out and flush after hand-over is -complete. The sudden increase in flush queue can be alarming! - -The following metrics can be used to observe this process: - - - `cortex_member_ring_tokens_owned` - how many tokens each ingester thinks it owns - - `cortex_ring_tokens_owned` - how many tokens each ingester is seen to own by other components - - `cortex_ring_member_ownership_percent` same as `cortex_ring_tokens_owned` but expressed as a percentage - - `cortex_ring_members` - how many ingesters can be seen in each state, by other components - - `cortex_ingester_sent_chunks` - number of chunks sent by leaving ingester - - `cortex_ingester_received_chunks` - number of chunks received by joining ingester - -You can see the current state of the ring via http browser request to -`/ring` on a distributor. 
diff --git a/docs/guides/ingesters-rolling-updates.md b/docs/guides/ingesters-rolling-updates.md new file mode 100644 index 00000000000..82a6fe254a1 --- /dev/null +++ b/docs/guides/ingesters-rolling-updates.md @@ -0,0 +1,90 @@ +--- +title: "Ingesters rolling updates" +linkTitle: "Ingesters rolling updates" +weight: 102 +slug: ingesters-rolling-updates +--- + +Cortex [ingesters](architecture.md#ingester) are semi-stateful. +A running ingester holds several hours of time series data in memory, before it's flushed to the long-term storage. +When an ingester shuts down, because of a rolling update or maintenance, the in-memory data must not be discarded, in order to avoid any data loss. + +In this document we describe the techniques employed to safely handle rolling updates, based on different setups: + +- [Blocks storage](#blocks-storage) +- [Chunks storage with WAL enabled](#chunks-storage-with-wal-enabled) +- [Chunks storage with WAL disabled](#chunks-storage-with-wal-disabled-hand-over) + +## Blocks storage + +The Cortex [blocks storage](../blocks-storage/_index.md) requires ingesters to run with a persistent disk where the TSDB WAL and blocks are stored (eg. a StatefulSet when deployed on Kubernetes). + +During a rolling update, the leaving ingester closes the open TSDBs, synchronizes the data to disk (`fsync`) and releases the disk resources. +The new ingester, which is expected to reuse the same disk as the leaving one, will replay the TSDB WAL on startup in order to load back into memory the time series that have not been compacted into a block yet. + +_The blocks storage doesn't support the series [hand-over](#chunks-storage-with-wal-disabled-hand-over)._ + +## Chunks storage + +The Cortex chunks storage optionally supports a write-ahead log (WAL). +The rolling update procedure for a Cortex cluster running the chunks storage depends on whether the WAL is enabled or not.
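Both the blocks storage and the chunks storage with WAL enabled rely on the ingester reusing the same disk across restarts. On Kubernetes this is typically achieved with a StatefulSet and a volume claim template. The fragment below is an illustrative sketch only: the names, image tag, mount path, and storage size are placeholder assumptions, not values taken from this repository:

```yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: ingester
spec:
  serviceName: ingester
  replicas: 3
  selector:
    matchLabels:
      name: ingester
  template:
    metadata:
      labels:
        name: ingester
    spec:
      containers:
        - name: ingester
          image: cortexproject/cortex:latest
          args: ["-target=ingester"]
          volumeMounts:
            - name: ingester-data
              mountPath: /data  # TSDB (or chunks) WAL lives here
  volumeClaimTemplates:
    - metadata:
        name: ingester-data
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 100Gi
```

With a claim template like this, a rescheduled `ingester-N` pod reattaches the same persistent volume, so the WAL replay described above can restore the in-memory series.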
+ +### Chunks storage with WAL enabled + +Similarly to the blocks storage, when Cortex is running the chunks storage with WAL enabled, it requires ingesters to run with a persistent disk where the WAL is stored (eg. a StatefulSet when deployed on Kubernetes). + +During a rolling update, the leaving ingester closes the WAL, synchronizes the data to disk (`fsync`) and releases the disk resources. +The new ingester, which is expected to reuse the same disk as the leaving one, will replay the WAL on startup in order to load back into memory the time series data. + +_For more information about the WAL, please refer to [Ingesters with WAL](../production/ingesters-with-wal.md)._ + +### Chunks storage with WAL disabled (hand-over) + +When Cortex is running the chunks storage with WAL disabled, Cortex supports on-the-fly series hand-over between a leaving ingester and a joining one. + +The hand-over is based on the ingester states stored in the ring. Each ingester could be in one of the following **states**: + +- `PENDING` +- `JOINING` +- `ACTIVE` +- `LEAVING` + +On startup, an ingester goes into the **`PENDING`** state. +In this state, the ingester is waiting for a hand-over from another ingester that is `LEAVING`. +If no hand-over occurs within the configured timeout period ("auto-join timeout", configurable via the `-ingester.join-after` option), the ingester will join the ring with a new set of random tokens (eg. during a scale up) and will switch its state to `ACTIVE`. + +When a running ingester in the **`ACTIVE`** state is notified to shut down via the `SIGINT` or `SIGTERM` Unix signal, the ingester switches to the `LEAVING` state. In this state it cannot receive write requests anymore, but it can still receive read requests for series it has in memory. + +A **`LEAVING`** ingester looks for a `PENDING` ingester to start a hand-over process with. +If it finds one, that ingester goes into the `JOINING` state and the leaver transfers all its in-memory data over to the joiner.
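The state transitions described above can be summarized as a small transition table. This is an illustrative Go sketch of the documented behaviour, not the actual Cortex lifecycler implementation; the `State`, `Event`, and `next` names are invented for the example:

```go
package main

import "fmt"

// State and Event model the ring states and the documented transitions.
type State string
type Event string

const (
	PENDING State = "PENDING"
	JOINING State = "JOINING"
	ACTIVE  State = "ACTIVE"
	LEAVING State = "LEAVING"
)

const (
	AutoJoinTimeout Event = "auto-join timeout" // -ingester.join-after expired
	HandOverStarted Event = "hand-over started" // picked by a LEAVING ingester
	HandOverDone    Event = "hand-over done"    // series and tokens transferred
	HandOverFailed  Event = "hand-over failed"
	ShutdownSignal  Event = "SIGINT/SIGTERM"
)

// next returns the state an ingester moves to on the given event,
// or the current state if the event doesn't apply to it.
func next(s State, e Event) State {
	switch {
	case s == PENDING && e == AutoJoinTimeout:
		return ACTIVE // joins the ring with a new set of random tokens
	case s == PENDING && e == HandOverStarted:
		return JOINING
	case s == JOINING && e == HandOverDone:
		return ACTIVE // takes over the leaver's ring tokens
	case s == JOINING && e == HandOverFailed:
		return PENDING // waits for another hand-over or the auto-join timeout
	case s == ACTIVE && e == ShutdownSignal:
		return LEAVING
	}
	return s
}

func main() {
	fmt.Println(next(PENDING, AutoJoinTimeout)) // scale-up path
	fmt.Println(next(ACTIVE, ShutdownSignal))   // rolling-update path
}
```

The `JOINING → PENDING` fallback on a failed hand-over mirrors the behaviour previously documented in `docs/architecture.md`, where a `JOINING` ingester waits for another hand-over or the auto-join timeout.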
+ On successful transfer the leaver removes itself from the ring and exits, while the joiner changes its state to `ACTIVE`, taking over ownership of the leaver's [ring tokens](../architecture.md#hashing). As soon as the joiner switches its state to `ACTIVE`, it will start receiving both write requests from distributors and queries from queriers. + +If the `LEAVING` ingester does not find a `PENDING` ingester after `-ingester.max-transfer-retries` retries, it will flush all of its chunks to the long-term storage, then remove itself from the ring and exit. Flushing the chunks to the storage may take several minutes to complete. + +#### Higher number of series / chunks during rolling updates + +During hand-over, neither the leaving nor joining ingesters will +accept new samples. Distributors are aware of this, and "spill" the +samples to the next ingester in the ring. This creates a set of extra +"spilled" series and chunks which will idle out and flush after hand-over is +complete. + +#### Observability + +The following metrics can be used to observe this process: + +- **`cortex_member_ring_tokens_owned`**
+ How many tokens each ingester thinks it owns. +- **`cortex_ring_tokens_owned`**
+ How many tokens each ingester is seen to own by other components. +- **`cortex_ring_member_ownership_percent`**
+ Same as `cortex_ring_tokens_owned` but expressed as a percentage. +- **`cortex_ring_members`**
+ How many ingesters can be seen in each state, by other components. +- **`cortex_ingester_sent_chunks`**
+ Number of chunks sent by leaving ingester. +- **`cortex_ingester_received_chunks`**
+ Number of chunks received by joining ingester. + +You can see the current state of the ring via http browser request to +`/ring` on a distributor. diff --git a/integration/ingester_hand_over_test.go b/integration/ingester_hand_over_test.go index 91dd61c9cfc..3132450c21c 100644 --- a/integration/ingester_hand_over_test.go +++ b/integration/ingester_hand_over_test.go @@ -16,13 +16,6 @@ import ( "github.com/cortexproject/cortex/integration/e2ecortex" ) -func TestIngesterHandOverWithBlocksStorage(t *testing.T) { - runIngesterHandOverTest(t, BlocksStorageFlags, func(t *testing.T, s *e2e.Scenario) { - minio := e2edb.NewMinio(9000, BlocksStorageFlags["-experimental.blocks-storage.s3.bucket-name"]) - require.NoError(t, s.StartAndWaitReady(minio)) - }) -} - func TestIngesterHandOverWithChunksStorage(t *testing.T) { runIngesterHandOverTest(t, ChunksStorageFlags, func(t *testing.T, s *e2e.Scenario) { dynamo := e2edb.NewDynamoDB() @@ -77,9 +70,7 @@ func runIngesterHandOverTest(t *testing.T, flags map[string]string, setup func(t assert.Equal(t, expectedVector, result.(model.Vector)) // Ensure 1st ingester metrics are tracked correctly. - if flags["-store.engine"] != blocksStorageEngine { - require.NoError(t, ingester1.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_chunks_created_total")) - } + require.NoError(t, ingester1.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_chunks_created_total")) // Start ingester-2. 
ingester2 := e2ecortex.NewIngester("ingester-2", consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{ diff --git a/pkg/ingester/client/cortex.pb.go b/pkg/ingester/client/cortex.pb.go index 027b6b2c97d..17212cef68b 100644 --- a/pkg/ingester/client/cortex.pb.go +++ b/pkg/ingester/client/cortex.pb.go @@ -1567,41 +1567,6 @@ func (m *TimeSeriesFile) GetData() []byte { return nil } -type TransferTSDBResponse struct { -} - -func (m *TransferTSDBResponse) Reset() { *m = TransferTSDBResponse{} } -func (*TransferTSDBResponse) ProtoMessage() {} -func (*TransferTSDBResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_893a47d0a749d749, []int{30} -} -func (m *TransferTSDBResponse) XXX_Unmarshal(b []byte) error { - return m.Unmarshal(b) -} -func (m *TransferTSDBResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - if deterministic { - return xxx_messageInfo_TransferTSDBResponse.Marshal(b, m, deterministic) - } else { - b = b[:cap(b)] - n, err := m.MarshalToSizedBuffer(b) - if err != nil { - return nil, err - } - return b[:n], nil - } -} -func (m *TransferTSDBResponse) XXX_Merge(src proto.Message) { - xxx_messageInfo_TransferTSDBResponse.Merge(m, src) -} -func (m *TransferTSDBResponse) XXX_Size() int { - return m.Size() -} -func (m *TransferTSDBResponse) XXX_DiscardUnknown() { - xxx_messageInfo_TransferTSDBResponse.DiscardUnknown(m) -} - -var xxx_messageInfo_TransferTSDBResponse proto.InternalMessageInfo - func init() { proto.RegisterEnum("cortex.MatchType", MatchType_name, MatchType_value) proto.RegisterEnum("cortex.WriteRequest_SourceEnum", WriteRequest_SourceEnum_name, WriteRequest_SourceEnum_value) @@ -1636,106 +1601,104 @@ func init() { proto.RegisterType((*Metric)(nil), "cortex.Metric") proto.RegisterType((*LabelMatcher)(nil), "cortex.LabelMatcher") proto.RegisterType((*TimeSeriesFile)(nil), "cortex.TimeSeriesFile") - proto.RegisterType((*TransferTSDBResponse)(nil), "cortex.TransferTSDBResponse") } func init() { 
proto.RegisterFile("cortex.proto", fileDescriptor_893a47d0a749d749) } var fileDescriptor_893a47d0a749d749 = []byte{ - // 1488 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x57, 0x4b, 0x6f, 0xdb, 0xc6, - 0x16, 0x26, 0xad, 0x87, 0xad, 0x23, 0x59, 0xa6, 0xc7, 0x8e, 0xa3, 0x28, 0xf7, 0x52, 0xc9, 0x00, - 0xc9, 0x35, 0xee, 0xbd, 0x71, 0x52, 0x07, 0x69, 0xbd, 0x68, 0x11, 0xc8, 0x89, 0xec, 0xa8, 0xb5, - 0x64, 0x67, 0x24, 0x37, 0x6d, 0x81, 0x42, 0xa0, 0xa5, 0xb1, 0x4d, 0x94, 0xa4, 0x14, 0x3e, 0x8a, - 0x7a, 0x51, 0xa0, 0x40, 0x97, 0x5d, 0x34, 0xcb, 0xfe, 0x84, 0xae, 0xbb, 0x29, 0xba, 0xed, 0x2a, - 0xcb, 0x2c, 0x83, 0x2e, 0x82, 0xc6, 0xd9, 0x74, 0x19, 0xf4, 0x17, 0x14, 0xf3, 0x20, 0x45, 0xd2, - 0x52, 0x9b, 0x3e, 0xb2, 0xe3, 0x9c, 0xc7, 0x37, 0x67, 0xbe, 0x39, 0xe7, 0xcc, 0x21, 0x94, 0xfa, - 0x43, 0xd7, 0xa7, 0x9f, 0xad, 0x8d, 0xdc, 0xa1, 0x3f, 0x44, 0x79, 0xb1, 0xaa, 0x5e, 0x3b, 0x32, - 0xfd, 0xe3, 0xe0, 0x60, 0xad, 0x3f, 0xb4, 0xaf, 0x1f, 0x0d, 0x8f, 0x86, 0xd7, 0xb9, 0xfa, 0x20, - 0x38, 0xe4, 0x2b, 0xbe, 0xe0, 0x5f, 0xc2, 0x0d, 0xff, 0xaa, 0x42, 0xe9, 0x81, 0x6b, 0xfa, 0x94, - 0xd0, 0x87, 0x01, 0xf5, 0x7c, 0xd4, 0x06, 0xf0, 0x4d, 0x9b, 0x7a, 0xd4, 0x35, 0xa9, 0x57, 0x51, - 0x2f, 0x65, 0x56, 0x8b, 0xeb, 0x68, 0x4d, 0x6e, 0xd5, 0x35, 0x6d, 0xda, 0xe1, 0x9a, 0xcd, 0xea, - 0xe3, 0x67, 0x35, 0xe5, 0xa7, 0x67, 0x35, 0xb4, 0xe7, 0x52, 0xc3, 0xb2, 0x86, 0xfd, 0x6e, 0xe4, - 0x45, 0x62, 0x08, 0xe8, 0x2d, 0xc8, 0x77, 0x86, 0x81, 0xdb, 0xa7, 0x95, 0x99, 0x4b, 0xea, 0x6a, - 0x79, 0xbd, 0x16, 0x62, 0xc5, 0x77, 0x5d, 0x13, 0x26, 0x0d, 0x27, 0xb0, 0x49, 0xde, 0xe3, 0xdf, - 0x68, 0x03, 0xe6, 0x6c, 0xea, 0x1b, 0x03, 0xc3, 0x37, 0x2a, 0x19, 0x1e, 0xc6, 0x4a, 0xe8, 0xda, - 0xa2, 0xbe, 0x6b, 0xf6, 0x5b, 0x52, 0xbb, 0x99, 0x7d, 0xfc, 0xac, 0xa6, 0x92, 0xc8, 0x1a, 0xd7, - 0x00, 0xc6, 0x78, 0x68, 0x16, 0x32, 0xf5, 0xbd, 0xa6, 0xa6, 0xa0, 0x39, 0xc8, 0x92, 0xfd, 0x9d, - 0x86, 0xa6, 0xe2, 0x05, 0x98, 0x97, 0xbb, 0x7b, 0xa3, 0xa1, 0xe3, 
0x51, 0xfc, 0x0e, 0x14, 0x09, - 0x35, 0x06, 0x21, 0x07, 0x6b, 0x30, 0xfb, 0x30, 0x88, 0x13, 0xb0, 0x1c, 0xee, 0x7c, 0x3f, 0xa0, - 0xee, 0x89, 0x34, 0x23, 0xa1, 0x11, 0xbe, 0x0d, 0x25, 0xe1, 0x2e, 0xe0, 0xd0, 0x75, 0x98, 0x75, - 0xa9, 0x17, 0x58, 0x7e, 0xe8, 0x7f, 0x2e, 0xe5, 0x2f, 0xec, 0x48, 0x68, 0x85, 0xbf, 0x51, 0xa1, - 0x14, 0x87, 0x46, 0xff, 0x07, 0xe4, 0xf9, 0x86, 0xeb, 0xf7, 0x38, 0x93, 0xbe, 0x61, 0x8f, 0x7a, - 0x36, 0x03, 0x53, 0x57, 0x33, 0x44, 0xe3, 0x9a, 0x6e, 0xa8, 0x68, 0x79, 0x68, 0x15, 0x34, 0xea, - 0x0c, 0x92, 0xb6, 0x33, 0xdc, 0xb6, 0x4c, 0x9d, 0x41, 0xdc, 0xf2, 0x06, 0xcc, 0xd9, 0x86, 0xdf, - 0x3f, 0xa6, 0xae, 0x27, 0x49, 0x8d, 0x8e, 0xb6, 0x63, 0x1c, 0x50, 0xab, 0x25, 0x94, 0x24, 0xb2, - 0xc2, 0x4d, 0x98, 0x4f, 0x04, 0x8d, 0x36, 0x5e, 0x31, 0x41, 0xd8, 0xad, 0x28, 0xf1, 0x54, 0xc0, - 0x8f, 0x54, 0x58, 0xe2, 0x58, 0x1d, 0xdf, 0xa5, 0x86, 0x1d, 0x21, 0xde, 0x86, 0x62, 0xff, 0x38, - 0x70, 0x3e, 0x49, 0x40, 0x9e, 0x3f, 0x0b, 0x79, 0x87, 0x19, 0x49, 0xdc, 0xb8, 0x47, 0x2a, 0xa4, - 0x99, 0x3f, 0x11, 0xd2, 0x4d, 0x40, 0xfc, 0xdc, 0xef, 0x1b, 0x56, 0x40, 0xbd, 0x90, 0xfd, 0x7f, - 0x03, 0x58, 0x4c, 0xda, 0x73, 0x0c, 0x9b, 0x72, 0xd6, 0x0b, 0xa4, 0xc0, 0x25, 0x6d, 0xc3, 0xa6, - 0x78, 0x03, 0x96, 0x12, 0x4e, 0xf2, 0x18, 0x97, 0xa1, 0x24, 0xbc, 0x3e, 0xe5, 0x72, 0x7e, 0x8e, - 0x02, 0x29, 0x5a, 0x63, 0x53, 0xbc, 0x04, 0x8b, 0x3b, 0x21, 0x4c, 0xb8, 0x1b, 0xbe, 0x25, 0x63, - 0x90, 0x42, 0x89, 0x56, 0x83, 0xe2, 0x38, 0x86, 0x10, 0x0c, 0xa2, 0x20, 0x3c, 0x8c, 0x40, 0xdb, - 0xf7, 0xa8, 0xdb, 0xf1, 0x0d, 0x3f, 0x82, 0xfa, 0x5e, 0x85, 0xc5, 0x98, 0x50, 0x42, 0x5d, 0x81, - 0xb2, 0xe9, 0x1c, 0x51, 0xcf, 0x37, 0x87, 0x4e, 0xcf, 0x35, 0x7c, 0x71, 0x24, 0x95, 0xcc, 0x47, - 0x52, 0x62, 0xf8, 0x94, 0x9d, 0xda, 0x09, 0xec, 0x5e, 0xc4, 0xa2, 0xba, 0x9a, 0x25, 0x05, 0x27, - 0xb0, 0x05, 0x79, 0x2c, 0x25, 0x8d, 0x91, 0xd9, 0x4b, 0x21, 0x65, 0x38, 0x92, 0x66, 0x8c, 0xcc, - 0x66, 0x02, 0x6c, 0x0d, 0x96, 0xdc, 0xc0, 0xa2, 0x69, 0xf3, 0x2c, 0x37, 0x5f, 0x64, 0xaa, 0x84, - 0x3d, 
0xfe, 0x18, 0x96, 0x58, 0xe0, 0xcd, 0xbb, 0xc9, 0xd0, 0xcf, 0xc3, 0x6c, 0xe0, 0x51, 0xb7, - 0x67, 0x0e, 0xe4, 0x35, 0xe4, 0xd9, 0xb2, 0x39, 0x40, 0xd7, 0x20, 0xcb, 0x3b, 0x03, 0x0b, 0xb3, - 0xb8, 0x7e, 0x21, 0xbc, 0xec, 0x33, 0x87, 0x27, 0xdc, 0x0c, 0x6f, 0x03, 0x62, 0x2a, 0x2f, 0x89, - 0xfe, 0x06, 0xe4, 0x3c, 0x26, 0x90, 0x29, 0x77, 0x31, 0x8e, 0x92, 0x8a, 0x84, 0x08, 0x4b, 0xfc, - 0x9d, 0x0a, 0xba, 0x68, 0x3f, 0xde, 0xd6, 0xd0, 0x8d, 0xd7, 0x8c, 0xf7, 0xba, 0x6b, 0x77, 0x03, - 0x4a, 0x61, 0x55, 0xf6, 0x3c, 0xea, 0xcb, 0xfa, 0x3d, 0x37, 0xa9, 0x7e, 0x3d, 0x52, 0x0c, 0x4d, - 0x3b, 0xd4, 0xc7, 0x4d, 0xa8, 0x4d, 0x8d, 0x59, 0x52, 0x71, 0x15, 0xf2, 0x36, 0x37, 0x91, 0x5c, - 0x94, 0x93, 0xbd, 0x96, 0x48, 0x2d, 0xae, 0xc0, 0x8a, 0x84, 0x0a, 0xdb, 0x6f, 0x98, 0x7b, 0x2d, - 0x38, 0x7f, 0x46, 0x23, 0xc1, 0xd7, 0x63, 0xad, 0x5c, 0xfd, 0xbd, 0x56, 0x1e, 0x6b, 0xe2, 0x3f, - 0xaa, 0xb0, 0x90, 0x2a, 0x7d, 0xc6, 0xd5, 0xa1, 0x3b, 0xb4, 0x65, 0x52, 0xc5, 0xd3, 0xa2, 0xcc, - 0xe4, 0x4d, 0x29, 0x6e, 0x0e, 0xe2, 0x79, 0x33, 0x93, 0xc8, 0x9b, 0xdb, 0x90, 0xe7, 0x35, 0x14, - 0xb6, 0xbf, 0xc5, 0x04, 0x7d, 0x7b, 0x86, 0xe9, 0x6e, 0x2e, 0xcb, 0x97, 0xad, 0xc4, 0x45, 0xf5, - 0x81, 0x31, 0xf2, 0xa9, 0x4b, 0xa4, 0x1b, 0xfa, 0x1f, 0xe4, 0x45, 0xeb, 0xa9, 0x64, 0x39, 0xc0, - 0x7c, 0x08, 0x10, 0xef, 0x4e, 0xd2, 0x04, 0x7f, 0xad, 0x42, 0x4e, 0x84, 0xfe, 0xba, 0x92, 0xa2, - 0x0a, 0x73, 0xd4, 0xe9, 0x0f, 0x07, 0xa6, 0x73, 0xc4, 0x6b, 0x31, 0x47, 0xa2, 0x35, 0x42, 0xb2, - 0x46, 0x58, 0xd1, 0x95, 0x64, 0x21, 0x54, 0x60, 0xa5, 0xeb, 0x1a, 0x8e, 0x77, 0x48, 0x5d, 0x1e, - 0x58, 0x94, 0x01, 0xf8, 0x73, 0x80, 0x31, 0xdf, 0x31, 0x9e, 0xd4, 0xbf, 0xc6, 0xd3, 0x1a, 0xcc, - 0x7a, 0x86, 0x3d, 0xb2, 0xa2, 0x86, 0x1c, 0x65, 0x54, 0x87, 0x8b, 0x25, 0x53, 0xa1, 0x11, 0xbe, - 0x05, 0x85, 0x08, 0x9a, 0x45, 0x1e, 0xb5, 0xde, 0x12, 0xe1, 0xdf, 0x68, 0x19, 0x72, 0xbc, 0xb1, - 0x72, 0x22, 0x4a, 0x44, 0x2c, 0x70, 0x1d, 0xf2, 0x02, 0x6f, 0xac, 0x17, 0xcd, 0x4d, 0x2c, 0x58, - 0x53, 0x9e, 0xc0, 0x62, 0xd1, 0x1f, 0x53, 
0x88, 0xeb, 0x30, 0x9f, 0xa8, 0x89, 0xc4, 0x23, 0xa9, - 0xbe, 0xd2, 0x23, 0xf9, 0xd5, 0x0c, 0x94, 0x93, 0x99, 0x8c, 0x6e, 0x41, 0xd6, 0x3f, 0x19, 0x89, - 0x68, 0xca, 0xeb, 0x97, 0x27, 0xe7, 0xbb, 0x5c, 0x76, 0x4f, 0x46, 0x94, 0x70, 0x73, 0xd6, 0xf6, - 0x45, 0xa5, 0x89, 0xb7, 0x47, 0x24, 0x2f, 0x08, 0x11, 0xeb, 0xfb, 0x8c, 0x9a, 0x63, 0x6a, 0x8d, - 0xf8, 0xa5, 0x16, 0x08, 0xff, 0x66, 0xb2, 0xc0, 0x31, 0xfd, 0x4a, 0x4e, 0xc8, 0xd8, 0x37, 0x3e, - 0x01, 0x18, 0x83, 0xa3, 0x22, 0xcc, 0xee, 0xb7, 0xdf, 0x6b, 0xef, 0x3e, 0x68, 0x6b, 0x0a, 0x5b, - 0xdc, 0xd9, 0xdd, 0x6f, 0x77, 0x1b, 0x44, 0x53, 0x51, 0x01, 0x72, 0xdb, 0xf5, 0xfd, 0xed, 0x86, - 0x36, 0x83, 0xe6, 0xa1, 0x70, 0xaf, 0xd9, 0xe9, 0xee, 0x6e, 0x93, 0x7a, 0x4b, 0xcb, 0x20, 0x04, - 0x65, 0xae, 0x19, 0xcb, 0xb2, 0xcc, 0xb5, 0xb3, 0xdf, 0x6a, 0xd5, 0xc9, 0x87, 0x5a, 0x8e, 0x0d, - 0x54, 0xcd, 0xf6, 0xd6, 0xae, 0x96, 0x47, 0x25, 0x98, 0xeb, 0x74, 0xeb, 0xdd, 0x46, 0xa7, 0xd1, - 0xd5, 0x66, 0x71, 0x13, 0xf2, 0x62, 0xeb, 0xbf, 0x9d, 0x45, 0xb8, 0x07, 0xa5, 0x38, 0xe5, 0xe8, - 0x4a, 0x82, 0xd5, 0x08, 0x8e, 0xab, 0x63, 0x2c, 0x86, 0xf9, 0x23, 0xe8, 0x4b, 0xe5, 0x4f, 0x86, - 0x0b, 0x65, 0xfe, 0x7c, 0xa9, 0x42, 0x79, 0x9c, 0xf6, 0x5b, 0xa6, 0x45, 0xff, 0x89, 0x2e, 0x53, - 0x85, 0xb9, 0x43, 0xd3, 0xa2, 0x3c, 0x06, 0xb1, 0x5d, 0xb4, 0x9e, 0x58, 0x95, 0x2b, 0xb0, 0x1c, - 0x56, 0x65, 0xb7, 0x73, 0x77, 0x33, 0xac, 0xc9, 0xff, 0xbe, 0x0b, 0x85, 0xe8, 0x68, 0xec, 0xa6, - 0x1a, 0xf7, 0xf7, 0xeb, 0x3b, 0x9a, 0xc2, 0x6e, 0xaa, 0xbd, 0xdb, 0xed, 0x89, 0xa5, 0x8a, 0x16, - 0xa0, 0x48, 0x1a, 0xdb, 0x8d, 0x0f, 0x7a, 0xad, 0x7a, 0xf7, 0xce, 0x3d, 0x6d, 0x86, 0x5d, 0x9d, - 0x10, 0xb4, 0x77, 0xa5, 0x2c, 0xb3, 0xfe, 0x43, 0x1e, 0xe6, 0xc2, 0xd8, 0x59, 0x76, 0xee, 0x05, - 0xde, 0x31, 0x5a, 0x9e, 0x34, 0x8d, 0x57, 0xcf, 0xa5, 0xa4, 0xb2, 0x43, 0x28, 0xe8, 0x4d, 0xc8, - 0xf1, 0x01, 0x0e, 0x4d, 0x1c, 0x88, 0xab, 0x93, 0xc7, 0x5c, 0xac, 0xa0, 0xbb, 0x50, 0x8c, 0x0d, - 0x7e, 0x53, 0xbc, 0x2f, 0x26, 0xa4, 0xc9, 0x19, 0x11, 0x2b, 0x37, 0x54, 0x74, 
0x0f, 0x8a, 0xb1, - 0xb9, 0x0b, 0x55, 0x13, 0xc9, 0x94, 0x98, 0xe0, 0xc6, 0x58, 0x13, 0x06, 0x35, 0xac, 0xa0, 0x06, - 0xc0, 0x78, 0xe4, 0x42, 0x17, 0x12, 0xc6, 0xf1, 0xd9, 0xac, 0x5a, 0x9d, 0xa4, 0x8a, 0x60, 0x36, - 0xa1, 0x10, 0x0d, 0x1c, 0xa8, 0x32, 0x61, 0x06, 0x11, 0x20, 0xd3, 0xa7, 0x13, 0xac, 0xa0, 0x2d, - 0x28, 0xd5, 0x2d, 0xeb, 0x55, 0x60, 0xaa, 0x71, 0x8d, 0x97, 0xc6, 0xb1, 0xa2, 0xe7, 0x37, 0xfd, - 0xc6, 0xa3, 0xab, 0xc9, 0xe6, 0x33, 0x6d, 0x70, 0xa9, 0xfe, 0xe7, 0x0f, 0xed, 0xa2, 0xdd, 0xba, - 0xb0, 0x90, 0x7a, 0xec, 0x91, 0x9e, 0xf2, 0x4e, 0xcd, 0x07, 0xd5, 0xda, 0x54, 0x7d, 0x84, 0xda, - 0x82, 0x72, 0xf2, 0x71, 0x42, 0xd3, 0xfe, 0x02, 0xaa, 0xd1, 0x6e, 0x53, 0x5e, 0x33, 0x65, 0x95, - 0xe5, 0x4b, 0x29, 0x5e, 0x55, 0x68, 0xe5, 0x2c, 0x18, 0x2b, 0xf8, 0xea, 0xbf, 0xd2, 0x58, 0xf1, - 0x1a, 0x64, 0x48, 0x9b, 0x6f, 0x3f, 0x79, 0xae, 0x2b, 0x4f, 0x9f, 0xeb, 0xca, 0xcb, 0xe7, 0xba, - 0xfa, 0xc5, 0xa9, 0xae, 0x7e, 0x7b, 0xaa, 0xab, 0x8f, 0x4f, 0x75, 0xf5, 0xc9, 0xa9, 0xae, 0xfe, - 0x7c, 0xaa, 0xab, 0xbf, 0x9c, 0xea, 0xca, 0xcb, 0x53, 0x5d, 0x7d, 0xf4, 0x42, 0x57, 0x9e, 0xbc, - 0xd0, 0x95, 0xa7, 0x2f, 0x74, 0xe5, 0xa3, 0x7c, 0xdf, 0x32, 0xa9, 0xe3, 0x1f, 0xe4, 0xf9, 0xaf, - 0xf6, 0xcd, 0xdf, 0x02, 0x00, 0x00, 0xff, 0xff, 0xee, 0xad, 0x00, 0xad, 0xb1, 0x0f, 0x00, 0x00, + // 1457 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x57, 0xcb, 0x6f, 0x1b, 0x55, + 0x17, 0x9f, 0x1b, 0x3f, 0x12, 0x1f, 0x3b, 0xce, 0xe4, 0x26, 0x6d, 0x5d, 0x57, 0xdf, 0xb8, 0xbd, + 0x52, 0xfb, 0x45, 0xdf, 0x47, 0xd3, 0x92, 0xaa, 0x90, 0x05, 0xa8, 0x72, 0x5a, 0x27, 0x35, 0xc4, + 0x4e, 0x7a, 0xed, 0x50, 0x40, 0x42, 0xd6, 0xc4, 0xbe, 0x49, 0x46, 0xcc, 0x8c, 0xdd, 0x79, 0x20, + 0xb2, 0x40, 0x42, 0x62, 0xc9, 0x82, 0x2e, 0xf9, 0x13, 0x58, 0xb3, 0x61, 0xcf, 0xaa, 0xcb, 0x2e, + 0x2b, 0x16, 0x15, 0x4d, 0x37, 0xec, 0xa8, 0xf8, 0x0b, 0xd0, 0x7d, 0xcc, 0x78, 0xc6, 0x75, 0xa0, + 0x3c, 0xba, 0x9b, 0x7b, 0x1e, 0xbf, 0x7b, 0xee, 0xef, 0x9e, 0x73, 
0xee, 0x19, 0x28, 0xf5, 0x87, + 0x5e, 0xc0, 0x3e, 0x5f, 0x1d, 0x79, 0xc3, 0x60, 0x88, 0xf3, 0x72, 0x55, 0xbd, 0x7a, 0x68, 0x05, + 0x47, 0xe1, 0xfe, 0x6a, 0x7f, 0xe8, 0x5c, 0x3b, 0x1c, 0x1e, 0x0e, 0xaf, 0x09, 0xf5, 0x7e, 0x78, + 0x20, 0x56, 0x62, 0x21, 0xbe, 0xa4, 0x1b, 0xf9, 0x0d, 0x41, 0xe9, 0xbe, 0x67, 0x05, 0x8c, 0xb2, + 0x07, 0x21, 0xf3, 0x03, 0xdc, 0x06, 0x08, 0x2c, 0x87, 0xf9, 0xcc, 0xb3, 0x98, 0x5f, 0x41, 0x17, + 0x33, 0x2b, 0xc5, 0x35, 0xbc, 0xaa, 0xb6, 0xea, 0x5a, 0x0e, 0xeb, 0x08, 0xcd, 0x46, 0xf5, 0xd1, + 0xd3, 0x9a, 0xf6, 0xd3, 0xd3, 0x1a, 0xde, 0xf5, 0x98, 0x69, 0xdb, 0xc3, 0x7e, 0x37, 0xf6, 0xa2, + 0x09, 0x04, 0xfc, 0x36, 0xe4, 0x3b, 0xc3, 0xd0, 0xeb, 0xb3, 0xca, 0xcc, 0x45, 0xb4, 0x52, 0x5e, + 0xab, 0x45, 0x58, 0xc9, 0x5d, 0x57, 0xa5, 0x49, 0xc3, 0x0d, 0x1d, 0x9a, 0xf7, 0xc5, 0x37, 0x5e, + 0x87, 0x39, 0x87, 0x05, 0xe6, 0xc0, 0x0c, 0xcc, 0x4a, 0x46, 0x84, 0x71, 0x36, 0x72, 0x6d, 0xb1, + 0xc0, 0xb3, 0xfa, 0x2d, 0xa5, 0xdd, 0xc8, 0x3e, 0x7a, 0x5a, 0x43, 0x34, 0xb6, 0x26, 0x35, 0x80, + 0x31, 0x1e, 0x9e, 0x85, 0x4c, 0x7d, 0xb7, 0xa9, 0x6b, 0x78, 0x0e, 0xb2, 0x74, 0x6f, 0xbb, 0xa1, + 0x23, 0xb2, 0x00, 0xf3, 0x6a, 0x77, 0x7f, 0x34, 0x74, 0x7d, 0x46, 0xde, 0x85, 0x22, 0x65, 0xe6, + 0x20, 0xe2, 0x60, 0x15, 0x66, 0x1f, 0x84, 0x49, 0x02, 0x96, 0xa3, 0x9d, 0xef, 0x85, 0xcc, 0x3b, + 0x56, 0x66, 0x34, 0x32, 0x22, 0xb7, 0xa0, 0x24, 0xdd, 0x25, 0x1c, 0xbe, 0x06, 0xb3, 0x1e, 0xf3, + 0x43, 0x3b, 0x88, 0xfc, 0xcf, 0x4c, 0xf8, 0x4b, 0x3b, 0x1a, 0x59, 0x91, 0x6f, 0x11, 0x94, 0x92, + 0xd0, 0xf8, 0x0d, 0xc0, 0x7e, 0x60, 0x7a, 0x41, 0x4f, 0x30, 0x19, 0x98, 0xce, 0xa8, 0xe7, 0x70, + 0x30, 0xb4, 0x92, 0xa1, 0xba, 0xd0, 0x74, 0x23, 0x45, 0xcb, 0xc7, 0x2b, 0xa0, 0x33, 0x77, 0x90, + 0xb6, 0x9d, 0x11, 0xb6, 0x65, 0xe6, 0x0e, 0x92, 0x96, 0xd7, 0x61, 0xce, 0x31, 0x83, 0xfe, 0x11, + 0xf3, 0x7c, 0x45, 0x6a, 0x7c, 0xb4, 0x6d, 0x73, 0x9f, 0xd9, 0x2d, 0xa9, 0xa4, 0xb1, 0x15, 0x69, + 0xc2, 0x7c, 0x2a, 0x68, 0xbc, 0xfe, 0x8a, 0x09, 0xc2, 0x6f, 0x45, 0x4b, 0xa6, 0x02, 0x79, 0x88, + 0x60, 
0x49, 0x60, 0x75, 0x02, 0x8f, 0x99, 0x4e, 0x8c, 0x78, 0x0b, 0x8a, 0xfd, 0xa3, 0xd0, 0xfd, + 0x34, 0x05, 0x79, 0xee, 0x65, 0xc8, 0xdb, 0xdc, 0x48, 0xe1, 0x26, 0x3d, 0x26, 0x42, 0x9a, 0xf9, + 0x0b, 0x21, 0xdd, 0x00, 0x2c, 0xce, 0xfd, 0x81, 0x69, 0x87, 0xcc, 0x8f, 0xd8, 0xff, 0x0f, 0x80, + 0xcd, 0xa5, 0x3d, 0xd7, 0x74, 0x98, 0x60, 0xbd, 0x40, 0x0b, 0x42, 0xd2, 0x36, 0x1d, 0x46, 0xd6, + 0x61, 0x29, 0xe5, 0xa4, 0x8e, 0x71, 0x09, 0x4a, 0xd2, 0xeb, 0x33, 0x21, 0x17, 0xe7, 0x28, 0xd0, + 0xa2, 0x3d, 0x36, 0x25, 0x4b, 0xb0, 0xb8, 0x1d, 0xc1, 0x44, 0xbb, 0x91, 0x9b, 0x2a, 0x06, 0x25, + 0x54, 0x68, 0x35, 0x28, 0x8e, 0x63, 0x88, 0xc0, 0x20, 0x0e, 0xc2, 0x27, 0x18, 0xf4, 0x3d, 0x9f, + 0x79, 0x9d, 0xc0, 0x0c, 0x62, 0xa8, 0x1f, 0x10, 0x2c, 0x26, 0x84, 0x0a, 0xea, 0x32, 0x94, 0x2d, + 0xf7, 0x90, 0xf9, 0x81, 0x35, 0x74, 0x7b, 0x9e, 0x19, 0xc8, 0x23, 0x21, 0x3a, 0x1f, 0x4b, 0xa9, + 0x19, 0x30, 0x7e, 0x6a, 0x37, 0x74, 0x7a, 0x31, 0x8b, 0x68, 0x25, 0x4b, 0x0b, 0x6e, 0xe8, 0x48, + 0xf2, 0x78, 0x4a, 0x9a, 0x23, 0xab, 0x37, 0x81, 0x94, 0x11, 0x48, 0xba, 0x39, 0xb2, 0x9a, 0x29, + 0xb0, 0x55, 0x58, 0xf2, 0x42, 0x9b, 0x4d, 0x9a, 0x67, 0x85, 0xf9, 0x22, 0x57, 0xa5, 0xec, 0xc9, + 0x27, 0xb0, 0xc4, 0x03, 0x6f, 0xde, 0x49, 0x87, 0x7e, 0x0e, 0x66, 0x43, 0x9f, 0x79, 0x3d, 0x6b, + 0xa0, 0xae, 0x21, 0xcf, 0x97, 0xcd, 0x01, 0xbe, 0x0a, 0x59, 0xd1, 0x19, 0x78, 0x98, 0xc5, 0xb5, + 0xf3, 0xd1, 0x65, 0xbf, 0x74, 0x78, 0x2a, 0xcc, 0xc8, 0x16, 0x60, 0xae, 0xf2, 0xd3, 0xe8, 0x6f, + 0x42, 0xce, 0xe7, 0x02, 0x95, 0x72, 0x17, 0x92, 0x28, 0x13, 0x91, 0x50, 0x69, 0x49, 0xbe, 0x47, + 0x60, 0xc8, 0xf6, 0xe3, 0x6f, 0x0e, 0xbd, 0x64, 0xcd, 0xf8, 0xaf, 0xbb, 0x76, 0xd7, 0xa1, 0x14, + 0x55, 0x65, 0xcf, 0x67, 0x81, 0xaa, 0xdf, 0x33, 0xd3, 0xea, 0xd7, 0xa7, 0xc5, 0xc8, 0xb4, 0xc3, + 0x02, 0xd2, 0x84, 0xda, 0xa9, 0x31, 0x2b, 0x2a, 0xae, 0x40, 0xde, 0x11, 0x26, 0x8a, 0x8b, 0x72, + 0xba, 0xd7, 0x52, 0xa5, 0x25, 0x15, 0x38, 0xab, 0xa0, 0xa2, 0xf6, 0x1b, 0xe5, 0x5e, 0x0b, 0xce, + 0xbd, 0xa4, 0x51, 0xe0, 0x6b, 0x89, 0x56, 
0x8e, 0xfe, 0xa8, 0x95, 0x27, 0x9a, 0xf8, 0x8f, 0x08, + 0x16, 0x26, 0x4a, 0x9f, 0x73, 0x75, 0xe0, 0x0d, 0x1d, 0x95, 0x54, 0xc9, 0xb4, 0x28, 0x73, 0x79, + 0x53, 0x89, 0x9b, 0x83, 0x64, 0xde, 0xcc, 0xa4, 0xf2, 0xe6, 0x16, 0xe4, 0x45, 0x0d, 0x45, 0xed, + 0x6f, 0x31, 0x45, 0xdf, 0xae, 0x69, 0x79, 0x1b, 0xcb, 0xea, 0x65, 0x2b, 0x09, 0x51, 0x7d, 0x60, + 0x8e, 0x02, 0xe6, 0x51, 0xe5, 0x86, 0xff, 0x0f, 0x79, 0xd9, 0x7a, 0x2a, 0x59, 0x01, 0x30, 0x1f, + 0x01, 0x24, 0xbb, 0x93, 0x32, 0x21, 0xdf, 0x20, 0xc8, 0xc9, 0xd0, 0x5f, 0x57, 0x52, 0x54, 0x61, + 0x8e, 0xb9, 0xfd, 0xe1, 0xc0, 0x72, 0x0f, 0x45, 0x2d, 0xe6, 0x68, 0xbc, 0xc6, 0x58, 0xd5, 0x08, + 0x2f, 0xba, 0x92, 0x2a, 0x84, 0x0a, 0x9c, 0xed, 0x7a, 0xa6, 0xeb, 0x1f, 0x30, 0x4f, 0x04, 0x16, + 0x67, 0x00, 0xf9, 0x02, 0x60, 0xcc, 0x77, 0x82, 0x27, 0xf4, 0xf7, 0x78, 0x5a, 0x85, 0x59, 0xdf, + 0x74, 0x46, 0x76, 0xdc, 0x90, 0xe3, 0x8c, 0xea, 0x08, 0xb1, 0x62, 0x2a, 0x32, 0x22, 0x37, 0xa1, + 0x10, 0x43, 0xf3, 0xc8, 0xe3, 0xd6, 0x5b, 0xa2, 0xe2, 0x1b, 0x2f, 0x43, 0x4e, 0x34, 0x56, 0x41, + 0x44, 0x89, 0xca, 0x05, 0xa9, 0x43, 0x5e, 0xe2, 0x8d, 0xf5, 0xb2, 0xb9, 0xc9, 0x05, 0x6f, 0xca, + 0x53, 0x58, 0x2c, 0x06, 0x63, 0x0a, 0x49, 0x1d, 0xe6, 0x53, 0x35, 0x91, 0x7a, 0x24, 0xd1, 0x2b, + 0x3d, 0x92, 0x5f, 0xcf, 0x40, 0x39, 0x9d, 0xc9, 0xf8, 0x26, 0x64, 0x83, 0xe3, 0x91, 0x8c, 0xa6, + 0xbc, 0x76, 0x69, 0x7a, 0xbe, 0xab, 0x65, 0xf7, 0x78, 0xc4, 0xa8, 0x30, 0xe7, 0x6d, 0x5f, 0x56, + 0x9a, 0x7c, 0x7b, 0x64, 0xf2, 0x82, 0x14, 0xf1, 0xbe, 0xcf, 0xa9, 0x39, 0x62, 0xf6, 0x48, 0x5c, + 0x6a, 0x81, 0x8a, 0x6f, 0x2e, 0x0b, 0x5d, 0x2b, 0xa8, 0xe4, 0xa4, 0x8c, 0x7f, 0x93, 0x63, 0x80, + 0x31, 0x38, 0x2e, 0xc2, 0xec, 0x5e, 0xfb, 0xfd, 0xf6, 0xce, 0xfd, 0xb6, 0xae, 0xf1, 0xc5, 0xed, + 0x9d, 0xbd, 0x76, 0xb7, 0x41, 0x75, 0x84, 0x0b, 0x90, 0xdb, 0xaa, 0xef, 0x6d, 0x35, 0xf4, 0x19, + 0x3c, 0x0f, 0x85, 0xbb, 0xcd, 0x4e, 0x77, 0x67, 0x8b, 0xd6, 0x5b, 0x7a, 0x06, 0x63, 0x28, 0x0b, + 0xcd, 0x58, 0x96, 0xe5, 0xae, 0x9d, 0xbd, 0x56, 0xab, 0x4e, 0x3f, 0xd2, 0x73, 
0x7c, 0xa0, 0x6a, + 0xb6, 0x37, 0x77, 0xf4, 0x3c, 0x2e, 0xc1, 0x5c, 0xa7, 0x5b, 0xef, 0x36, 0x3a, 0x8d, 0xae, 0x3e, + 0x4b, 0x9a, 0x90, 0x97, 0x5b, 0xff, 0xe3, 0x2c, 0x22, 0x3d, 0x28, 0x25, 0x29, 0xc7, 0x97, 0x53, + 0xac, 0xc6, 0x70, 0x42, 0x9d, 0x60, 0x31, 0xca, 0x1f, 0x49, 0xdf, 0x44, 0xfe, 0x64, 0x84, 0x50, + 0xe5, 0xcf, 0x57, 0x08, 0xca, 0xe3, 0xb4, 0xdf, 0xb4, 0x6c, 0xf6, 0x6f, 0x74, 0x99, 0x2a, 0xcc, + 0x1d, 0x58, 0x36, 0x13, 0x31, 0xc8, 0xed, 0xe2, 0xf5, 0xb4, 0xaa, 0xfc, 0xdf, 0x7b, 0x50, 0x88, + 0x8f, 0xc0, 0x6f, 0xa4, 0x71, 0x6f, 0xaf, 0xbe, 0xad, 0x6b, 0xfc, 0x46, 0xda, 0x3b, 0xdd, 0x9e, + 0x5c, 0x22, 0xbc, 0x00, 0x45, 0xda, 0xd8, 0x6a, 0x7c, 0xd8, 0x6b, 0xd5, 0xbb, 0xb7, 0xef, 0xea, + 0x33, 0xfc, 0x8a, 0xa4, 0xa0, 0xbd, 0xa3, 0x64, 0x99, 0xb5, 0x5f, 0x73, 0x30, 0x17, 0xc5, 0xc8, + 0xb3, 0x70, 0x37, 0xf4, 0x8f, 0xf0, 0xf2, 0xb4, 0xa9, 0xbb, 0x7a, 0x66, 0x42, 0xaa, 0x3a, 0x81, + 0x86, 0xdf, 0x82, 0x9c, 0x18, 0xd4, 0xf0, 0xd4, 0xc1, 0xb7, 0x3a, 0x7d, 0x9c, 0x25, 0x1a, 0xbe, + 0x03, 0xc5, 0xc4, 0x80, 0x77, 0x8a, 0xf7, 0x85, 0x94, 0x34, 0x3d, 0x0b, 0x12, 0xed, 0x3a, 0xc2, + 0x77, 0xa1, 0x98, 0x98, 0xaf, 0x70, 0x35, 0x95, 0x34, 0xa9, 0x49, 0x6d, 0x8c, 0x35, 0x65, 0x20, + 0x23, 0x1a, 0x6e, 0x00, 0x8c, 0x47, 0x2b, 0x7c, 0x3e, 0x65, 0x9c, 0x9c, 0xc1, 0xaa, 0xd5, 0x69, + 0xaa, 0x18, 0x66, 0x03, 0x0a, 0xf1, 0x60, 0x81, 0x2b, 0x53, 0x66, 0x0d, 0x09, 0x72, 0xfa, 0x14, + 0x42, 0x34, 0xbc, 0x09, 0xa5, 0xba, 0x6d, 0xbf, 0x0a, 0x4c, 0x35, 0xa9, 0xf1, 0x27, 0x71, 0xec, + 0xf8, 0x99, 0x9d, 0x7c, 0xcb, 0xf1, 0x95, 0x74, 0x93, 0x39, 0x6d, 0x40, 0xa9, 0xfe, 0xf7, 0x4f, + 0xed, 0xe2, 0xdd, 0xba, 0xb0, 0x30, 0xf1, 0xa8, 0x63, 0x63, 0xc2, 0x7b, 0x62, 0x0e, 0xa8, 0xd6, + 0x4e, 0xd5, 0xc7, 0xa8, 0x2d, 0x28, 0xa7, 0x1f, 0x21, 0x7c, 0xda, 0xb4, 0x5f, 0x8d, 0x77, 0x3b, + 0xe5, 0xd5, 0xd2, 0x56, 0xd0, 0xc6, 0x3b, 0x8f, 0x9f, 0x19, 0xda, 0x93, 0x67, 0x86, 0xf6, 0xe2, + 0x99, 0x81, 0xbe, 0x3c, 0x31, 0xd0, 0x77, 0x27, 0x06, 0x7a, 0x74, 0x62, 0xa0, 0xc7, 0x27, 0x06, + 0xfa, 0xf9, 0xc4, 
0x40, 0xbf, 0x9c, 0x18, 0xda, 0x8b, 0x13, 0x03, 0x3d, 0x7c, 0x6e, 0x68, 0x8f, + 0x9f, 0x1b, 0xda, 0x93, 0xe7, 0x86, 0xf6, 0x71, 0xbe, 0x6f, 0x5b, 0xcc, 0x0d, 0xf6, 0xf3, 0xe2, + 0x47, 0xf8, 0xc6, 0xef, 0x01, 0x00, 0x00, 0xff, 0xff, 0xd5, 0x01, 0x6d, 0x44, 0x4f, 0x0f, 0x00, + 0x00, } func (x MatchType) String() string { @@ -2648,27 +2611,6 @@ func (this *TimeSeriesFile) Equal(that interface{}) bool { } return true } -func (this *TransferTSDBResponse) Equal(that interface{}) bool { - if that == nil { - return this == nil - } - - that1, ok := that.(*TransferTSDBResponse) - if !ok { - that2, ok := that.(TransferTSDBResponse) - if ok { - that1 = &that2 - } else { - return false - } - } - if that1 == nil { - return this == nil - } else if this == nil { - return false - } - return true -} func (this *WriteRequest) GoString() string { if this == nil { return "nil" @@ -3042,15 +2984,6 @@ func (this *TimeSeriesFile) GoString() string { s = append(s, "}") return strings.Join(s, "") } -func (this *TransferTSDBResponse) GoString() string { - if this == nil { - return "nil" - } - s := make([]string, 0, 4) - s = append(s, "&client.TransferTSDBResponse{") - s = append(s, "}") - return strings.Join(s, "") -} func valueToGoStringCortex(v interface{}, typ string) string { rv := reflect.ValueOf(v) if rv.IsNil() { @@ -3083,8 +3016,6 @@ type IngesterClient interface { MetricsMetadata(ctx context.Context, in *MetricsMetadataRequest, opts ...grpc.CallOption) (*MetricsMetadataResponse, error) // TransferChunks allows leaving ingester (client) to stream chunks directly to joining ingesters (server). 
TransferChunks(ctx context.Context, opts ...grpc.CallOption) (Ingester_TransferChunksClient, error) - // TransferTSDB transfers all files of a tsdb to a joining ingester - TransferTSDB(ctx context.Context, opts ...grpc.CallOption) (Ingester_TransferTSDBClient, error) } type ingesterClient struct { @@ -3233,40 +3164,6 @@ func (x *ingesterTransferChunksClient) CloseAndRecv() (*TransferChunksResponse, return m, nil } -func (c *ingesterClient) TransferTSDB(ctx context.Context, opts ...grpc.CallOption) (Ingester_TransferTSDBClient, error) { - stream, err := c.cc.NewStream(ctx, &_Ingester_serviceDesc.Streams[2], "/cortex.Ingester/TransferTSDB", opts...) - if err != nil { - return nil, err - } - x := &ingesterTransferTSDBClient{stream} - return x, nil -} - -type Ingester_TransferTSDBClient interface { - Send(*TimeSeriesFile) error - CloseAndRecv() (*TransferTSDBResponse, error) - grpc.ClientStream -} - -type ingesterTransferTSDBClient struct { - grpc.ClientStream -} - -func (x *ingesterTransferTSDBClient) Send(m *TimeSeriesFile) error { - return x.ClientStream.SendMsg(m) -} - -func (x *ingesterTransferTSDBClient) CloseAndRecv() (*TransferTSDBResponse, error) { - if err := x.ClientStream.CloseSend(); err != nil { - return nil, err - } - m := new(TransferTSDBResponse) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - // IngesterServer is the server API for Ingester service. type IngesterServer interface { Push(context.Context, *WriteRequest) (*WriteResponse, error) @@ -3280,8 +3177,6 @@ type IngesterServer interface { MetricsMetadata(context.Context, *MetricsMetadataRequest) (*MetricsMetadataResponse, error) // TransferChunks allows leaving ingester (client) to stream chunks directly to joining ingesters (server). 
TransferChunks(Ingester_TransferChunksServer) error - // TransferTSDB transfers all files of a tsdb to a joining ingester - TransferTSDB(Ingester_TransferTSDBServer) error } // UnimplementedIngesterServer can be embedded to have forward compatible implementations. @@ -3318,9 +3213,6 @@ func (*UnimplementedIngesterServer) MetricsMetadata(ctx context.Context, req *Me func (*UnimplementedIngesterServer) TransferChunks(srv Ingester_TransferChunksServer) error { return status.Errorf(codes.Unimplemented, "method TransferChunks not implemented") } -func (*UnimplementedIngesterServer) TransferTSDB(srv Ingester_TransferTSDBServer) error { - return status.Errorf(codes.Unimplemented, "method TransferTSDB not implemented") -} func RegisterIngesterServer(s *grpc.Server, srv IngesterServer) { s.RegisterService(&_Ingester_serviceDesc, srv) @@ -3517,32 +3409,6 @@ func (x *ingesterTransferChunksServer) Recv() (*TimeSeriesChunk, error) { return m, nil } -func _Ingester_TransferTSDB_Handler(srv interface{}, stream grpc.ServerStream) error { - return srv.(IngesterServer).TransferTSDB(&ingesterTransferTSDBServer{stream}) -} - -type Ingester_TransferTSDBServer interface { - SendAndClose(*TransferTSDBResponse) error - Recv() (*TimeSeriesFile, error) - grpc.ServerStream -} - -type ingesterTransferTSDBServer struct { - grpc.ServerStream -} - -func (x *ingesterTransferTSDBServer) SendAndClose(m *TransferTSDBResponse) error { - return x.ServerStream.SendMsg(m) -} - -func (x *ingesterTransferTSDBServer) Recv() (*TimeSeriesFile, error) { - m := new(TimeSeriesFile) - if err := x.ServerStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - var _Ingester_serviceDesc = grpc.ServiceDesc{ ServiceName: "cortex.Ingester", HandlerType: (*IngesterServer)(nil), @@ -3591,11 +3457,6 @@ var _Ingester_serviceDesc = grpc.ServiceDesc{ Handler: _Ingester_TransferChunks_Handler, ClientStreams: true, }, - { - StreamName: "TransferTSDB", - Handler: _Ingester_TransferTSDB_Handler, - 
ClientStreams: true, - }, }, Metadata: "cortex.proto", } @@ -4768,29 +4629,6 @@ func (m *TimeSeriesFile) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } -func (m *TransferTSDBResponse) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalToSizedBuffer(dAtA[:size]) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *TransferTSDBResponse) MarshalTo(dAtA []byte) (int, error) { - size := m.Size() - return m.MarshalToSizedBuffer(dAtA[:size]) -} - -func (m *TransferTSDBResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { - i := len(dAtA) - _ = i - var l int - _ = l - return len(dAtA) - i, nil -} - func encodeVarintCortex(dAtA []byte, offset int, v uint64) int { offset -= sovCortex(v) base := offset @@ -5308,15 +5146,6 @@ func (m *TimeSeriesFile) Size() (n int) { return n } -func (m *TransferTSDBResponse) Size() (n int) { - if m == nil { - return 0 - } - var l int - _ = l - return n -} - func sovCortex(x uint64) (n int) { return (math_bits.Len64(x|1) + 6) / 7 } @@ -5716,15 +5545,6 @@ func (this *TimeSeriesFile) String() string { }, "") return s } -func (this *TransferTSDBResponse) String() string { - if this == nil { - return "nil" - } - s := strings.Join([]string{`&TransferTSDBResponse{`, - `}`, - }, "") - return s -} func valueToStringCortex(v interface{}) string { rv := reflect.ValueOf(v) if rv.IsNil() { @@ -8827,59 +8647,6 @@ func (m *TimeSeriesFile) Unmarshal(dAtA []byte) error { } return nil } -func (m *TransferTSDBResponse) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowCortex - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - 
return fmt.Errorf("proto: TransferTSDBResponse: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: TransferTSDBResponse: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - default: - iNdEx = preIndex - skippy, err := skipCortex(dAtA[iNdEx:]) - if err != nil { - return err - } - if skippy < 0 { - return ErrInvalidLengthCortex - } - if (iNdEx + skippy) < 0 { - return ErrInvalidLengthCortex - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} func skipCortex(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 diff --git a/pkg/ingester/client/cortex.proto b/pkg/ingester/client/cortex.proto index ef52580b731..2c3d703ce72 100644 --- a/pkg/ingester/client/cortex.proto +++ b/pkg/ingester/client/cortex.proto @@ -23,9 +23,6 @@ service Ingester { // TransferChunks allows leaving ingester (client) to stream chunks directly to joining ingesters (server). 
rpc TransferChunks(stream TimeSeriesChunk) returns (TransferChunksResponse) {}; - - // TransferTSDB transfers all files of a tsdb to a joining ingester - rpc TransferTSDB(stream TimeSeriesFile) returns (TransferTSDBResponse) {}; } message WriteRequest { @@ -192,5 +189,3 @@ message TimeSeriesFile { string filename = 3; bytes data = 4; } - -message TransferTSDBResponse {} diff --git a/pkg/ingester/client/cortex_mock_test.go b/pkg/ingester/client/cortex_mock_test.go index a6a0dc51f8f..65774e73e2e 100644 --- a/pkg/ingester/client/cortex_mock_test.go +++ b/pkg/ingester/client/cortex_mock_test.go @@ -59,8 +59,3 @@ func (m *IngesterServerMock) TransferChunks(s Ingester_TransferChunksServer) err args := m.Called(s) return args.Error(0) } - -func (m *IngesterServerMock) TransferTSDB(s Ingester_TransferTSDBServer) error { - args := m.Called(s) - return args.Error(0) -} diff --git a/pkg/ingester/client/cortex_util.go b/pkg/ingester/client/cortex_util.go index 433ff141fdc..2f917c03d70 100644 --- a/pkg/ingester/client/cortex_util.go +++ b/pkg/ingester/client/cortex_util.go @@ -20,14 +20,6 @@ func SendTimeSeriesChunk(s Ingester_TransferChunksClient, m *TimeSeriesChunk) er }) } -// SendTimeSeriesFile wraps the stream's Send() checking if the context is done -// before calling Send(). -func SendTimeSeriesFile(s Ingester_TransferTSDBClient, m *TimeSeriesFile) error { - return sendWithContextErrChecking(s.Context(), func() error { - return s.Send(m) - }) -} - func sendWithContextErrChecking(ctx context.Context, send func() error) error { // If the context has been canceled or its deadline exceeded, we should return it // instead of the cryptic error the Send() will return. 
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 2b6cafaa78e..bc5db6787be 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -88,7 +88,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.LifecyclerConfig.RegisterFlags(f) cfg.WALConfig.RegisterFlags(f) - f.IntVar(&cfg.MaxTransferRetries, "ingester.max-transfer-retries", 10, "Number of times to try and transfer chunks before falling back to flushing. Negative value or zero disables hand-over.") + f.IntVar(&cfg.MaxTransferRetries, "ingester.max-transfer-retries", 10, "Number of times to try and transfer chunks before falling back to flushing. Negative value or zero disables hand-over. This feature is supported only by the chunks storage.") f.DurationVar(&cfg.FlushCheckPeriod, "ingester.flush-period", 1*time.Minute, "Period with which to attempt to flush chunks.") f.DurationVar(&cfg.RetainPeriod, "ingester.retain-period", 5*time.Minute, "Period chunks will remain in memory after flushing.") diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index efa22a9db14..b193094b5f4 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -5,7 +5,6 @@ import ( "fmt" "io/ioutil" "math" - "math/rand" "net/http" "os" "path/filepath" @@ -898,21 +897,3 @@ func benchmarkIngesterPush(b *testing.B, limits validation.Limits, errorsExpecte } } - -func TestRemoveEmptyDir(t *testing.T) { - - // remove dir that dne - require.NoError(t, removeEmptyDir(fmt.Sprintf("%v", rand.Int63()))) - - // remove empty dir - dir, err := ioutil.TempDir("", "TestRemoveEmptyDir") - require.NoError(t, err) - require.NoError(t, removeEmptyDir(dir)) - - // remove non-empty dir - dir, err = ioutil.TempDir("", "TestRemoveEmptyDir") - require.NoError(t, err) - - ioutil.WriteFile(filepath.Join(dir, "tempfile"), []byte("hello world"), 0777) - require.NotNil(t, removeEmptyDir(dir)) -} diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index 
a638b2d8417..c4a1ea0a964 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -126,13 +126,6 @@ type TSDBState struct { // Value used by shipper as external label. shipperIngesterID string - // Keeps count of in-flight requests - inflightWriteReqs sync.WaitGroup - - // Used to run only once operations at shutdown, during the blocks/wal - // transferring to a joining ingester - transferOnce sync.Once - subservices *services.Manager tsdbMetrics *tsdbMetrics @@ -391,21 +384,11 @@ func (i *Ingester) v2Push(ctx context.Context, req *client.WriteRequest) (*clien // Ensure the ingester shutdown procedure hasn't started i.userStatesMtx.RLock() - if i.stopped { i.userStatesMtx.RUnlock() return nil, fmt.Errorf("ingester stopping") } - - // Keep track of in-flight requests, in order to safely start blocks transfer - // (at shutdown) only once all in-flight write requests have completed. - // It's important to increase the number of in-flight requests within the lock - // (even if sync.WaitGroup is thread-safe), otherwise there's a race condition - // with the TSDB transfer, which - after the stopped flag is set to true - waits - // until all in-flight requests to reach zero. - i.TSDBState.inflightWriteReqs.Add(1) i.userStatesMtx.RUnlock() - defer i.TSDBState.inflightWriteReqs.Done() // Given metadata is a best-effort approach, and we don't halt on errors // process it before samples. Otherwise, we risk returning an error before ingestion. 
diff --git a/pkg/ingester/lifecycle_test.go b/pkg/ingester/lifecycle_test.go index fab64527f86..906bda83e93 100644 --- a/pkg/ingester/lifecycle_test.go +++ b/pkg/ingester/lifecycle_test.go @@ -2,12 +2,8 @@ package ingester import ( "context" - "errors" "io" - "io/ioutil" "math" - "os" - "path/filepath" "testing" "time" @@ -24,7 +20,6 @@ import ( "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/ring/kv/consul" "github.com/cortexproject/cortex/pkg/ring/testutils" - "github.com/cortexproject/cortex/pkg/storage/backend/s3" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/test" @@ -95,7 +90,7 @@ func TestIngesterRestart(t *testing.T) { }) } -func TestIngesterTransfer(t *testing.T) { +func TestIngesterChunksTransfer(t *testing.T) { limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) require.NoError(t, err) @@ -267,76 +262,6 @@ func (i ingesterClientAdapater) TransferChunks(ctx context.Context, _ ...grpc.Ca return stream, nil } -type ingesterTransferTSDBStreamMock struct { - ctx context.Context - reqs chan *client.TimeSeriesFile - resp chan *client.TransferTSDBResponse - err chan error - - grpc.ServerStream - grpc.ClientStream -} - -func (s *ingesterTransferTSDBStreamMock) Send(tsc *client.TimeSeriesFile) error { - s.reqs <- tsc - return nil -} - -func (s *ingesterTransferTSDBStreamMock) CloseAndRecv() (*client.TransferTSDBResponse, error) { - close(s.reqs) - select { - case resp := <-s.resp: - return resp, nil - case err := <-s.err: - return nil, err - } -} - -func (s *ingesterTransferTSDBStreamMock) SendAndClose(resp *client.TransferTSDBResponse) error { - s.resp <- resp - return nil -} - -func (s *ingesterTransferTSDBStreamMock) ErrorAndClose(err error) { - s.err <- err -} - -func (s *ingesterTransferTSDBStreamMock) Recv() (*client.TimeSeriesFile, error) { - req, ok := <-s.reqs - if !ok { - return nil, io.EOF - } - return 
req, nil -} - -func (s *ingesterTransferTSDBStreamMock) Context() context.Context { - return s.ctx -} - -func (*ingesterTransferTSDBStreamMock) SendMsg(m interface{}) error { - return nil -} - -func (*ingesterTransferTSDBStreamMock) RecvMsg(m interface{}) error { - return nil -} - -func (i ingesterClientAdapater) TransferTSDB(ctx context.Context, _ ...grpc.CallOption) (client.Ingester_TransferTSDBClient, error) { - stream := &ingesterTransferTSDBStreamMock{ - ctx: ctx, - reqs: make(chan *client.TimeSeriesFile), - resp: make(chan *client.TransferTSDBResponse), - err: make(chan error), - } - go func() { - err := i.ingester.TransferTSDB(stream) - if err != nil { - stream.ErrorAndClose(err) - } - }() - return stream, nil -} - func (i ingesterClientAdapater) Close() error { return nil } @@ -401,124 +326,3 @@ func TestIngesterFlush(t *testing.T) { }, }, res) } - -func TestV2IngesterTransfer(t *testing.T) { - scenarios := map[string]struct { - failedTransfers int - }{ - "transfer succeeded at first attempt": { - failedTransfers: 0, - }, - "transfer failed at first attempt, then succeeded": { - failedTransfers: 1, - }, - } - - // We run the same under different scenarios - for name, scenario := range scenarios { - t.Run(name, func(t *testing.T) { - limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) - require.NoError(t, err) - - dir1, err := ioutil.TempDir("", "tsdb") - require.NoError(t, err) - dir2, err := ioutil.TempDir("", "tsdb") - require.NoError(t, err) - require.NoError(t, os.Remove(dir2)) // remove the destination dir so there isn't a move conflict - - // Add a spurious file to the ing1 TSDB directory, in order to ensure - // the transfer successfully work anyway. - ioutil.WriteFile(filepath.Join(dir1, ".DS_Store"), []byte("{}"), 0644) - - // Start the first ingester, and get it into ACTIVE state. 
- cfg1 := defaultIngesterTestConfig() - cfg1.BlocksStorageEnabled = true - cfg1.BlocksStorageConfig.TSDB.Dir = dir1 - cfg1.BlocksStorageConfig.S3 = s3.Config{ - Endpoint: "dummy", - BucketName: "dummy", - SecretAccessKey: flagext.Secret{Value: "dummy"}, - AccessKeyID: "dummy", - } - cfg1.LifecyclerConfig.ID = "ingester1" - cfg1.LifecyclerConfig.Addr = "ingester1" - cfg1.LifecyclerConfig.JoinAfter = 0 * time.Second - cfg1.MaxTransferRetries = 10 - ing1, err := New(cfg1, defaultClientTestConfig(), limits, nil, nil) - require.NoError(t, err) - require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing1)) - - test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() interface{} { - return ing1.lifecycler.GetState() - }) - - // Now write a sample to this ingester - req, expectedResponse, _ := mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "foo"}}, 456, 123000) - ctx := user.InjectOrgID(context.Background(), userID) - _, err = ing1.Push(ctx, req) - require.NoError(t, err) - - // Start a second ingester, but let it go into PENDING - cfg2 := defaultIngesterTestConfig() - cfg2.BlocksStorageEnabled = true - cfg2.BlocksStorageConfig.TSDB.Dir = dir2 - cfg2.BlocksStorageConfig.S3 = s3.Config{ - Endpoint: "dummy", - BucketName: "dummy", - SecretAccessKey: flagext.Secret{Value: "dummy"}, - AccessKeyID: "dummy", - } - cfg2.LifecyclerConfig.RingConfig.KVStore.Mock = cfg1.LifecyclerConfig.RingConfig.KVStore.Mock - cfg2.LifecyclerConfig.ID = "ingester2" - cfg2.LifecyclerConfig.Addr = "ingester2" - cfg2.LifecyclerConfig.JoinAfter = 100 * time.Second - ing2, err := New(cfg2, defaultClientTestConfig(), limits, nil, nil) - require.NoError(t, err) - require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing2)) - - // Let ing1 send blocks/wal to ing2 - ingesterClientFactoryCount := 0 - - ing1.cfg.ingesterClientFactory = func(addr string, _ client.Config) (client.HealthAndIngesterClient, error) { - if ingesterClientFactoryCount++; 
ingesterClientFactoryCount <= scenario.failedTransfers { - return nil, errors.New("mocked error simulating the case the leaving ingester is unable to connect to the joining ingester") - } - - return ingesterClientAdapater{ - ingester: ing2, - }, nil - } - - // Now stop the first ingester, and wait for the second ingester to become ACTIVE. - require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing1)) - test.Poll(t, 10*time.Second, ring.ACTIVE, func() interface{} { - return ing2.lifecycler.GetState() - }) - - // And check the second ingester has the sample - matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "foo") - require.NoError(t, err) - - request, err := client.ToQueryRequest(model.TimeFromUnix(0), model.TimeFromUnix(200), []*labels.Matcher{matcher}) - require.NoError(t, err) - - response, err := ing2.Query(ctx, request) - require.NoError(t, err) - assert.Equal(t, expectedResponse, response) - - // Check we can send the same sample again to the new ingester and get the same result - req, _, _ = mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "foo"}}, 456, 123000) - _, err = ing2.Push(ctx, req) - require.NoError(t, err) - response, err = ing2.Query(ctx, request) - require.NoError(t, err) - assert.Equal(t, expectedResponse, response) - - // Assert the data is in the expected location of dir2 - files, err := ioutil.ReadDir(dir2) - require.NoError(t, err) - require.Equal(t, 1, len(files)) - require.Equal(t, "1", files[0].Name()) - }) - } -} diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index b052c3b996c..88618ccf28f 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -38,13 +38,9 @@ type ingesterMetrics struct { walReplayDuration prometheus.Gauge walCorruptionsTotal prometheus.Counter - // Chunks / blocks transfer. + // Chunks transfer. 
sentChunks prometheus.Counter receivedChunks prometheus.Counter - sentFiles prometheus.Counter - receivedFiles prometheus.Counter - receivedBytes prometheus.Counter - sentBytes prometheus.Counter // Chunks flushing. flushSeriesInProgress prometheus.Gauge @@ -145,22 +141,6 @@ func newIngesterMetrics(r prometheus.Registerer, createMetricsConflictingWithTSD Name: "cortex_ingester_received_chunks", Help: "The total number of chunks received by this ingester whilst joining", }), - sentFiles: promauto.With(r).NewCounter(prometheus.CounterOpts{ - Name: "cortex_ingester_sent_files", - Help: "The total number of files sent by this ingester whilst leaving.", - }), - receivedFiles: promauto.With(r).NewCounter(prometheus.CounterOpts{ - Name: "cortex_ingester_received_files", - Help: "The total number of files received by this ingester whilst joining", - }), - receivedBytes: promauto.With(r).NewCounter(prometheus.CounterOpts{ - Name: "cortex_ingester_received_bytes_total", - Help: "The total number of bytes received by this ingester whilst joining", - }), - sentBytes: promauto.With(r).NewCounter(prometheus.CounterOpts{ - Name: "cortex_ingester_sent_bytes_total", - Help: "The total number of bytes sent by this ingester whilst leaving", - }), // Chunks flushing. 
flushSeriesInProgress: promauto.With(r).NewGauge(prometheus.GaugeOpts{ diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go index 383f00d78f6..b9b87bda7e0 100644 --- a/pkg/ingester/transfer.go +++ b/pkg/ingester/transfer.go @@ -5,18 +5,12 @@ import ( "context" "fmt" "io" - "io/ioutil" "os" - "path/filepath" - "sync" "time" "github.com/go-kit/kit/log/level" - "github.com/oklog/ulid" "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" - "github.com/thanos-io/thanos/pkg/shipper" "github.com/weaveworks/common/user" "github.com/cortexproject/cortex/pkg/chunk/encoding" @@ -214,143 +208,6 @@ func (i *Ingester) transfer(ctx context.Context, xfer func() error) error { return nil } -// TransferTSDB receives all the file chunks from another ingester, and writes them to tsdb directories -func (i *Ingester) TransferTSDB(stream client.Ingester_TransferTSDBServer) error { - fromIngesterID := "" - - xfer := func() error { - - // Validate the final directory is empty, if it exists and is empty delete it so a move can succeed - err := removeEmptyDir(i.cfg.BlocksStorageConfig.TSDB.Dir) - if err != nil { - return errors.Wrap(err, "remove existing TSDB directory") - } - - tmpDir, err := ioutil.TempDir("", "tsdb_xfer") - if err != nil { - return errors.Wrap(err, "unable to create temporary directory to store transferred TSDB blocks") - } - defer os.RemoveAll(tmpDir) - - bytesXfer := 0 - filesXfer := 0 - - files := make(map[string]*os.File) - defer func() { - for _, f := range files { - if err := f.Close(); err != nil { - level.Warn(util.Logger).Log("msg", "failed to close xfer file", "err", err) - } - } - }() - for { - f, err := stream.Recv() - if err == io.EOF { - break - } - if err != nil { - return errors.Wrap(err, "TransferTSDB: Recv") - } - if fromIngesterID == "" { - fromIngesterID = f.FromIngesterId - level.Info(util.Logger).Log("msg", "processing TransferTSDB request", "from_ingester", fromIngesterID) - - // 
Before transfer, make sure 'from' ingester is in correct state to call ClaimTokensFor later - err := i.checkFromIngesterIsInLeavingState(stream.Context(), fromIngesterID) - if err != nil { - return errors.Wrap(err, "TransferTSDB: checkFromIngesterIsInLeavingState") - } - } - bytesXfer += len(f.Data) - - createfile := func(f *client.TimeSeriesFile) (*os.File, error) { - dir := filepath.Join(tmpDir, filepath.Dir(f.Filename)) - if err := os.MkdirAll(dir, 0777); err != nil { - return nil, errors.Wrap(err, "TransferTSDB: MkdirAll") - } - file, err := os.Create(filepath.Join(tmpDir, f.Filename)) - if err != nil { - return nil, errors.Wrap(err, "TransferTSDB: Create") - } - - _, err = file.Write(f.Data) - return file, errors.Wrap(err, "TransferTSDB: Write") - } - - // Create or get existing open file - file, ok := files[f.Filename] - if !ok { - file, err = createfile(f) - if err != nil { - return errors.Wrapf(err, "unable to create file %s to store incoming TSDB block", f) - } - filesXfer++ - files[f.Filename] = file - } else { - - // Write to existing file - if _, err := file.Write(f.Data); err != nil { - return errors.Wrap(err, "TransferTSDB: Write") - } - } - } - - if err := i.lifecycler.ClaimTokensFor(stream.Context(), fromIngesterID); err != nil { - return errors.Wrap(err, "TransferTSDB: ClaimTokensFor") - } - - i.metrics.receivedBytes.Add(float64(bytesXfer)) - i.metrics.receivedFiles.Add(float64(filesXfer)) - level.Info(util.Logger).Log("msg", "Total xfer", "from_ingester", fromIngesterID, "files", filesXfer, "bytes", bytesXfer) - - // Move the tmpdir to the final location - err = os.Rename(tmpDir, i.cfg.BlocksStorageConfig.TSDB.Dir) - if err != nil { - return errors.Wrap(err, fmt.Sprintf("unable to move received TSDB blocks from %s to %s", tmpDir, i.cfg.BlocksStorageConfig.TSDB.Dir)) - } - - // At this point all TSDBs have been received, so we can proceed loading TSDBs in memory. - // This is required because of two reasons: - // 1. 
No WAL replay performance penalty once the ingester switches to ACTIVE state - // 2. If a query is received on user X, for which the TSDB has been transferred, before - // the first series is ingested, if we don't open the TSDB the query will return an - // empty result (because the TSDB is opened only on first push or transfer) - userIDs, err := ioutil.ReadDir(i.cfg.BlocksStorageConfig.TSDB.Dir) - if err != nil { - return errors.Wrap(err, fmt.Sprintf("unable to list TSDB users in %s", i.cfg.BlocksStorageConfig.TSDB.Dir)) - } - - for _, user := range userIDs { - userID := user.Name() - - level.Info(util.Logger).Log("msg", fmt.Sprintf("Loading TSDB for user %s", userID)) - _, err = i.getOrCreateTSDB(userID, true) - - if err != nil { - level.Error(util.Logger).Log("msg", fmt.Sprintf("Unable to load TSDB for user %s", userID), "err", err) - } else { - level.Info(util.Logger).Log("msg", fmt.Sprintf("Loaded TSDB for user %s", userID)) - } - } - - return nil - } - - if err := i.transfer(stream.Context(), xfer); err != nil { - return err - } - - // Close the stream last, as this is what tells the "from" ingester that - // it's OK to shut down. - if err := stream.SendAndClose(&client.TransferTSDBResponse{}); err != nil { - level.Error(util.Logger).Log("msg", "Error closing TransferTSDB stream", "from_ingester", fromIngesterID, "err", err) - return err - } - level.Info(util.Logger).Log("msg", "Successfully transferred tsdbs", "from_ingester", fromIngesterID) - - return nil -} - // The passed wireChunks slice is for re-use. func toWireChunks(descs []*desc, wireChunks []client.Chunk) ([]client.Chunk, error) { if cap(wireChunks) < len(descs) { @@ -408,6 +265,12 @@ func fromWireChunks(wireChunks []client.Chunk) ([]*desc, error) { // TransferOut finds an ingester in PENDING state and transfers our chunks to it. // Called as part of the ingester shutdown process. 
func (i *Ingester) TransferOut(ctx context.Context) error { + // The blocks storage doesn't support transferring blocks. + if i.cfg.BlocksStorageEnabled { + level.Info(util.Logger).Log("msg", "transfer between a LEAVING ingester and a PENDING one is not supported for the blocks storage") + return ring.ErrTransferDisabled + } + if i.cfg.MaxTransferRetries <= 0 { return ring.ErrTransferDisabled } @@ -438,10 +301,6 @@ func (i *Ingester) TransferOut(ctx context.Context) error { } func (i *Ingester) transferOut(ctx context.Context) error { - if i.cfg.BlocksStorageEnabled { - return i.v2TransferOut(ctx) - } - userStatesCopy := i.userStates.cp() if len(userStatesCopy) == 0 { level.Info(util.Logger).Log("msg", "nothing to transfer") @@ -512,102 +371,6 @@ func (i *Ingester) transferOut(ctx context.Context) error { return nil } -func (i *Ingester) v2TransferOut(ctx context.Context) error { - // Skip TSDB transfer if there are no DBs - i.userStatesMtx.RLock() - skip := len(i.TSDBState.dbs) == 0 - i.userStatesMtx.RUnlock() - - if skip { - level.Info(util.Logger).Log("msg", "the ingester has nothing to transfer") - return nil - } - - // This transfer function may be called multiple times in case of error, - // until the max number of retries is reached. For this reason, we run - // some initialization only once. - i.TSDBState.transferOnce.Do(func() { - // In order to transfer TSDB WAL without closing the TSDB itself - which is a - // pre-requisite to continue serving read requests while transferring - we need - // to make sure no more series will be written to the TSDB. For this reason, we - // wait until all in-flight write requests have been completed. No new write - // requests will be accepted because the "stopped" flag has already been set.
- level.Info(util.Logger).Log("msg", "waiting for in-flight write requests to complete") - - // Do not use the parent context cause we don't want to interrupt while waiting - // for in-flight requests to complete if the parent context is cancelled, given - // this logic run only once. - waitCtx, waitCancel := context.WithTimeout(context.Background(), 10*time.Second) - defer waitCancel() - - if err := util.WaitGroup(waitCtx, &i.TSDBState.inflightWriteReqs); err != nil { - level.Warn(util.Logger).Log("msg", "timeout expired while waiting in-flight write requests to complete, transfer will continue anyway", "err", err) - } - - // Before beginning transfer, we need to make sure no WAL compaction will occur. - // If there's an on-going compaction, the DisableCompactions() will wait until - // completed. - level.Info(util.Logger).Log("msg", "disabling compaction on all TSDBs") - - i.userStatesMtx.RLock() - wg := &sync.WaitGroup{} - wg.Add(len(i.TSDBState.dbs)) - - for _, userDB := range i.TSDBState.dbs { - go func(db *userTSDB) { - defer wg.Done() - db.DisableCompactions() - }(userDB) - } - - i.userStatesMtx.RUnlock() - wg.Wait() - }) - - // Look for a joining ingester to transfer blocks and WAL to - targetIngester, err := i.findTargetIngester(ctx) - if err != nil { - return errors.Wrap(err, "cannot find ingester to transfer blocks to") - } - - level.Info(util.Logger).Log("msg", "begin transferring TSDB blocks and WAL to joining ingester", "to_ingester", targetIngester.Addr) - c, err := i.cfg.ingesterClientFactory(targetIngester.Addr, i.clientConfig) - if err != nil { - return err - } - defer c.Close() - - ctx = user.InjectOrgID(ctx, "-1") - stream, err := c.TransferTSDB(ctx) - if err != nil { - return errors.Wrap(err, "TransferTSDB() has failed") - } - - // Grab a list of all blocks that need to be shipped - blocks, err := unshippedBlocks(i.cfg.BlocksStorageConfig.TSDB.Dir) - if err != nil { - return err - } - - for user, blockIDs := range blocks { - // Transfer the 
users TSDB - // TODO(thor) transferring users can be done concurrently - i.transferUser(ctx, stream, i.cfg.BlocksStorageConfig.TSDB.Dir, i.lifecycler.ID, user, blockIDs) - } - - _, err = stream.CloseAndRecv() - if err != nil { - return errors.Wrap(err, "CloseAndRecv") - } - - // The transfer out has been successfully completed. Now we should close - // all open TSDBs: the Close() will wait until all on-going read operations - // will be completed. - i.closeAllTSDB() - - return nil -} - // findTargetIngester finds an ingester in PENDING state. func (i *Ingester) findTargetIngester(ctx context.Context) (*ring.IngesterDesc, error) { ringDesc, err := i.lifecycler.KVStore.Get(ctx, i.lifecycler.RingKey) @@ -624,176 +387,3 @@ func (i *Ingester) findTargetIngester(ctx context.Context) (*ring.IngesterDesc, return &ingesters[0], nil } - -// unshippedBlocks returns a ulid list of blocks that haven't been shipped -func unshippedBlocks(dir string) (map[string][]string, error) { - userIDs, err := ioutil.ReadDir(dir) - if err != nil { - return nil, errors.Wrap(err, "unable to list the directory containing TSDB blocks") - } - - blocks := make(map[string][]string, len(userIDs)) - for _, user := range userIDs { - userID := user.Name() - userDir := filepath.Join(dir, userID) - - // Ensure the user dir is actually a directory. There may be spurious files - // in the storage, especially when using Minio in the local development environment. - if stat, err := os.Stat(userDir); err == nil && !stat.IsDir() { - level.Warn(util.Logger).Log("msg", "skipping entry while transferring TSDB blocks because not a directory", "path", userDir) - continue - } - - // Seed the map with the userID to ensure we transfer the WAL, even if all blocks are shipped. 
- blocks[userID] = []string{} - - blockIDs, err := ioutil.ReadDir(userDir) - if err != nil { - return nil, err - } - - m, err := shipper.ReadMetaFile(userDir) - if err != nil { - if !os.IsNotExist(err) { - return nil, err - } - - // If the meta file doesn't exit, it means the first sync for this - // user didn't occur yet, so we're going to consider all blocks unshipped. - m = &shipper.Meta{} - } - - shipped := make(map[string]bool) - for _, u := range m.Uploaded { - shipped[u.String()] = true - } - - for _, blockID := range blockIDs { - _, err := ulid.Parse(blockID.Name()) - if err != nil { - continue - } - - if _, ok := shipped[blockID.Name()]; !ok { - blocks[userID] = append(blocks[userID], blockID.Name()) - } - } - } - - return blocks, nil -} - -func (i *Ingester) transferUser(ctx context.Context, stream client.Ingester_TransferTSDBClient, dir, ingesterID, userID string, blocks []string) { - level.Info(util.Logger).Log("msg", "transferring user blocks", "user", userID) - // Transfer all blocks - for _, blk := range blocks { - err := filepath.Walk(filepath.Join(dir, userID, blk), func(path string, info os.FileInfo, err error) error { - if err != nil { - return nil - } - - if info.IsDir() { - return nil - } - - b, err := ioutil.ReadFile(path) - if err != nil { - return err - } - - p, err := filepath.Rel(dir, path) - if err != nil { - return err - } - - if err := batchSend(1024*1024, b, stream, &client.TimeSeriesFile{ - FromIngesterId: ingesterID, - UserId: userID, - Filename: p, - }, i.metrics.sentBytes); err != nil { - return err - } - - i.metrics.sentFiles.Add(1) - return nil - }) - if err != nil { - level.Warn(util.Logger).Log("msg", "failed to transfer all user blocks", "err", err) - } - } - - // Transfer WAL - level.Info(util.Logger).Log("msg", "transferring user WAL", "user", userID) - err := filepath.Walk(filepath.Join(dir, userID, "wal"), func(path string, info os.FileInfo, err error) error { - if err != nil { - return nil - } - - if info.IsDir() { - 
return nil - } - - b, err := ioutil.ReadFile(path) - if err != nil { - return err - } - - p, err := filepath.Rel(dir, path) - if err != nil { - return err - } - - if err := batchSend(1024*1024, b, stream, &client.TimeSeriesFile{ - FromIngesterId: ingesterID, - UserId: userID, - Filename: p, - }, i.metrics.sentBytes); err != nil { - return err - } - - i.metrics.sentFiles.Add(1) - return nil - }) - - if err != nil { - level.Warn(util.Logger).Log("msg", "failed to transfer user WAL", "err", err) - } - - level.Info(util.Logger).Log("msg", "user blocks and WAL transfer completed", "user", userID) -} - -func batchSend(batch int, b []byte, stream client.Ingester_TransferTSDBClient, tsfile *client.TimeSeriesFile, sentBytes prometheus.Counter) error { - // Split file into smaller blocks for xfer - i := 0 - for ; i+batch < len(b); i += batch { - tsfile.Data = b[i : i+batch] - err := client.SendTimeSeriesFile(stream, tsfile) - if err != nil { - return err - } - sentBytes.Add(float64(len(tsfile.Data))) - } - - // Send final data - if i < len(b) { - tsfile.Data = b[i:] - err := client.SendTimeSeriesFile(stream, tsfile) - if err != nil { - return err - } - sentBytes.Add(float64(len(tsfile.Data))) - } - - return nil -} - -func removeEmptyDir(dir string) error { - if _, err := os.Stat(dir); err != nil { - if os.IsNotExist(err) { - return nil - } - return err - } - - return os.Remove(dir) -} diff --git a/pkg/ingester/transfer_test.go b/pkg/ingester/transfer_test.go deleted file mode 100644 index 54df0d81289..00000000000 --- a/pkg/ingester/transfer_test.go +++ /dev/null @@ -1,244 +0,0 @@ -package ingester - -import ( - "context" - "crypto/rand" - "fmt" - "io/ioutil" - rnd "math/rand" - "os" - "path/filepath" - "strings" - "testing" - "time" - - "github.com/oklog/ulid" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/testutil" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - 
"github.com/thanos-io/thanos/pkg/shipper" - "google.golang.org/grpc" - - "github.com/cortexproject/cortex/pkg/ingester/client" -) - -type testUserTSDB struct { - userID string - shipPercent int - numBlocks int - meta *shipper.Meta - unshipped []string -} - -func createTSDB(t *testing.T, dir string, users []*testUserTSDB) { - createAndWrite := func(t *testing.T, path string) { - f, err := os.Create(path) - require.NoError(t, err) - defer f.Close() - _, err = f.Write([]byte("a man a plan a canal panama")) - require.NoError(t, err) - } - - for _, user := range users { - userDir := filepath.Join(dir, user.userID) - - err := os.MkdirAll(userDir, 0777) - require.NoError(t, err) - - // Generate blocks. - for i := 0; i < user.numBlocks; i++ { - u, err := ulid.New(uint64(time.Now().Unix()*1000), rand.Reader) - require.NoError(t, err) - - blockDir := filepath.Join(userDir, u.String()) - require.NoError(t, os.MkdirAll(filepath.Join(blockDir, "chunks"), 0777)) - - for i := 0; i < 2; i++ { - createAndWrite(t, filepath.Join(blockDir, "chunks", fmt.Sprintf("00000%v", i))) - } - - meta := []string{"index", "meta.json", "tombstones"} - for _, name := range meta { - createAndWrite(t, filepath.Join(blockDir, name)) - } - - // Record if this block is to be "shipped" - if rnd.Intn(100) < user.shipPercent { - user.meta.Uploaded = append(user.meta.Uploaded, u) - } else { - user.unshipped = append(user.unshipped, u.String()) - } - } - - // Generate WAL. 
- require.NoError(t, os.MkdirAll(filepath.Join(userDir, "wal", "checkpoint.000419"), 0777)) - createAndWrite(t, filepath.Join(userDir, "wal", "000001")) - createAndWrite(t, filepath.Join(userDir, "wal", "checkpoint.000419", "000000")) - - require.NoError(t, shipper.WriteMetaFile(nil, filepath.Join(dir, user.userID), user.meta)) - } -} - -func TestUnshippedBlocks(t *testing.T) { - dir, err := ioutil.TempDir("", "tsdb") - require.NoError(t, err) - - // Validate empty dir - blks, err := unshippedBlocks(dir) - require.NoError(t, err) - require.Empty(t, blks) - - /* - Create three user dirs - One of them has some blocks shipped, - One of them has all blocks shipped, - One of them has no blocks shipped, - */ - users := []*testUserTSDB{ - { - userID: "0", - shipPercent: 70, - numBlocks: 10, - meta: &shipper.Meta{ - Version: shipper.MetaVersion1, - }, - unshipped: []string{}, - }, - { - userID: "1", - shipPercent: 100, - numBlocks: 10, - meta: &shipper.Meta{ - Version: shipper.MetaVersion1, - }, - unshipped: []string{}, - }, - { - userID: "2", - shipPercent: 0, - numBlocks: 10, - meta: &shipper.Meta{ - Version: shipper.MetaVersion1, - }, - unshipped: []string{}, - }, - } - - createTSDB(t, dir, users) - - blks, err = unshippedBlocks(dir) - require.NoError(t, err) - for _, u := range users { - _, ok := blks[u.userID] - require.True(t, ok) - } - - // Validate the unshipped blocks against the returned list - for _, user := range users { - require.ElementsMatch(t, user.unshipped, blks[user.userID]) - } -} - -type MockTransferTSDBClient struct { - Dir string - - grpc.ClientStream -} - -func (m *MockTransferTSDBClient) Send(f *client.TimeSeriesFile) error { - dir, _ := filepath.Split(f.Filename) - if err := os.MkdirAll(filepath.Join(m.Dir, dir), 0777); err != nil { - return err - } - if _, err := os.Create(filepath.Join(m.Dir, f.Filename)); err != nil { - return err - } - return nil -} - -func (m *MockTransferTSDBClient) CloseAndRecv() (*client.TransferTSDBResponse, error) { - 
return &client.TransferTSDBResponse{}, nil -} - -func (m *MockTransferTSDBClient) Context() context.Context { - return context.Background() -} - -func TestTransferUser(t *testing.T) { - reg := prometheus.NewPedanticRegistry() - - // Create an ingester without starting it (not needed). - i, cleanup, err := newIngesterMockWithTSDBStorage(defaultIngesterTestConfig(), reg) - require.NoError(t, err) - defer cleanup() - - // Create a fake TSDB on disk. - dir, err := ioutil.TempDir("", "tsdb") - require.NoError(t, err) - - createTSDB(t, dir, []*testUserTSDB{ - { - userID: "0", - shipPercent: 0, - numBlocks: 3, - meta: &shipper.Meta{ - Version: shipper.MetaVersion1, - }, - }, - }) - - blks, err := unshippedBlocks(dir) - require.NoError(t, err) - - xfer, err := ioutil.TempDir("", "xfer") - require.NoError(t, err) - m := &MockTransferTSDBClient{ - Dir: xfer, - } - i.transferUser(context.Background(), m, dir, "test", "0", blks["0"]) - - var original []string - var xferfiles []string - err = filepath.Walk(xfer, func(path string, info os.FileInfo, err error) error { - p, _ := filepath.Rel(xfer, path) - xferfiles = append(xferfiles, p) - return nil - }) - require.NoError(t, err) - - err = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { - if info.Name() == "thanos.shipper.json" { - return nil - } - p, _ := filepath.Rel(dir, path) - original = append(original, p) - return nil - }) - require.NoError(t, err) - - require.Equal(t, original, xferfiles) - - // Assert exported metrics (3 blocks, 5 files per block, 2 files WAL). - metricNames := []string{ - "cortex_ingester_sent_files", - "cortex_ingester_received_files", - "cortex_ingester_received_bytes_total", - "cortex_ingester_sent_bytes_total", - } - - assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` - # HELP cortex_ingester_sent_files The total number of files sent by this ingester whilst leaving. 
- # TYPE cortex_ingester_sent_files counter - cortex_ingester_sent_files 17 - # HELP cortex_ingester_received_files The total number of files received by this ingester whilst joining - # TYPE cortex_ingester_received_files counter - cortex_ingester_received_files 0 - # HELP cortex_ingester_received_bytes_total The total number of bytes received by this ingester whilst joining - # TYPE cortex_ingester_received_bytes_total counter - cortex_ingester_received_bytes_total 0 - # HELP cortex_ingester_sent_bytes_total The total number of bytes sent by this ingester whilst leaving - # TYPE cortex_ingester_sent_bytes_total counter - cortex_ingester_sent_bytes_total 459 - `), metricNames...)) -} diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index 61ff7e3e33e..9928942688b 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -172,7 +172,7 @@ func (cfg *TSDBConfig) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.HeadCompactionIdleTimeout, "experimental.blocks-storage.tsdb.head-compaction-idle-timeout", 1*time.Hour, "If TSDB head is idle for this duration, it is compacted. 0 means disabled.") f.IntVar(&cfg.StripeSize, "experimental.blocks-storage.tsdb.stripe-size", 16384, "The number of shards of series to use in TSDB (must be a power of 2). Reducing this will decrease memory footprint, but can negatively impact performance.") f.BoolVar(&cfg.WALCompressionEnabled, "experimental.blocks-storage.tsdb.wal-compression-enabled", false, "True to enable TSDB WAL compression.") - f.BoolVar(&cfg.FlushBlocksOnShutdown, "experimental.blocks-storage.tsdb.flush-blocks-on-shutdown", false, "If true, and transfer of blocks on shutdown fails or is disabled, incomplete blocks are flushed to storage instead. If false, incomplete blocks will be reused after restart, and uploaded when finished.") + f.BoolVar(&cfg.FlushBlocksOnShutdown, "experimental.blocks-storage.tsdb.flush-blocks-on-shutdown", false, "True to flush blocks to storage on shutdown. 
If false, incomplete blocks will be reused after restart.") } // Validate the config.
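For operators, the practical effect of this change is that `ingester.max_transfer_retries` no longer does anything when running the blocks storage: ingesters must keep their local TSDB on a persistent disk, and may optionally flush incomplete blocks to object storage on shutdown. A minimal sketch of the resulting ingester config, modeled on the development `cortex.yaml` files updated above — the `flush_blocks_on_shutdown` key name and its nesting are inferred from the `-experimental.blocks-storage.tsdb.flush-blocks-on-shutdown` flag and should be checked against the config reference:

```yaml
# max_transfer_retries removed: blocks are no longer handed over on
# shutdown, so run ingesters with a persistent disk instead.
ingester:
  lifecycler:
    # We want to start immediately.
    join_after: 0

# Optional: flush incomplete blocks to storage on shutdown rather than
# reusing them after restart (key name inferred from the CLI flag).
blocks_storage:
  tsdb:
    flush_blocks_on_shutdown: true
```

Leaving `flush_blocks_on_shutdown` at its default (`false`) keeps the pre-existing behavior of reusing incomplete blocks after a restart.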