Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,14 @@ public String getCollectionUniqueId() {
}

@Override
public CollectionRoutingMap tryCombine(List<ImmutablePair<PartitionKeyRange, IServerIdentity>> ranges) {
public CollectionRoutingMap tryCombine(
List<ImmutablePair<PartitionKeyRange, IServerIdentity>> ranges,
String changeFeedIfNoneMatch) {
return null;
}

@Override
public String getChangeFeedNextIfNoneMatch() {
return null;
}
};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.implementation.caches;

import com.azure.cosmos.implementation.DocumentCollection;
import com.azure.cosmos.implementation.PartitionKeyRange;
import com.azure.cosmos.implementation.RxDocumentClientImpl;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.lang.tuple.ImmutablePair;
import com.azure.cosmos.implementation.routing.CollectionRoutingMap;
import com.azure.cosmos.implementation.routing.InMemoryCollectionRoutingMap;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedResponse;
import org.mockito.Mockito;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import java.util.Arrays;
import java.util.HashMap;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;

public class RxPartitionKeyRangeCacheTest {
private RxDocumentClientImpl client;
private RxCollectionCache collectionCache;
private RxPartitionKeyRangeCache cache;

@BeforeClass(groups = "unit")
public void before_test() {
client = Mockito.mock(RxDocumentClientImpl.class);
collectionCache = Mockito.mock(RxCollectionCache.class);
cache = new RxPartitionKeyRangeCache(client, collectionCache);
}

@Test(groups = "unit")
public void getRoutingMapUsesChangeFeedNextIfNoneMatchWhenNotEmpty() {
String collectionRid = "collection1";
String changeFeedToken = "token1";

PartitionKeyRange range1 = new PartitionKeyRange();
range1.setId("0");
range1.setMinInclusive(PartitionKeyRange.MINIMUM_INCLUSIVE_EFFECTIVE_PARTITION_KEY);
range1.setMaxExclusive(PartitionKeyRange.MAXIMUM_EXCLUSIVE_EFFECTIVE_PARTITION_KEY);

CollectionRoutingMap previousRoutingMap = InMemoryCollectionRoutingMap
.tryCreateCompleteRoutingMap(Arrays.asList(ImmutablePair.of(range1, null)), collectionRid, changeFeedToken);

DocumentCollection collection = new DocumentCollection();
collection.setResourceId(collectionRid);
collection.setSelfLink("dbs/db1/colls/coll1");

FeedResponse<PartitionKeyRange> response = Mockito.mock(FeedResponse.class);
when(response.getResults()).thenReturn(Arrays.asList(range1));
when(response.getContinuationToken()).thenReturn("newToken");

when(collectionCache.resolveCollectionAsync(any(), any()))
.thenReturn(Mono.just(new Utils.ValueHolder<>(collection)));

when(client.readPartitionKeyRanges(eq(collection.getSelfLink()), any(CosmosQueryRequestOptions.class)))
.thenReturn(Flux.just(response));

StepVerifier.create(cache.tryLookupAsync(null, collectionRid, previousRoutingMap, new HashMap<>()))
.expectNextMatches(routingMapHolder ->
routingMapHolder != null &&
routingMapHolder.v != null &&
changeFeedToken.equals(previousRoutingMap.getChangeFeedNextIfNoneMatch()))
.verifyComplete();
}

@Test(groups = "unit")
public void getRoutingMapWithEmptyChangeFeedNextIfNoneMatch() {
String collectionRid = "collection1";

PartitionKeyRange range1 = new PartitionKeyRange();
range1.setId("0");
range1.setMinInclusive(PartitionKeyRange.MINIMUM_INCLUSIVE_EFFECTIVE_PARTITION_KEY);
range1.setMaxExclusive(PartitionKeyRange.MAXIMUM_EXCLUSIVE_EFFECTIVE_PARTITION_KEY);

CollectionRoutingMap previousRoutingMap = InMemoryCollectionRoutingMap
.tryCreateCompleteRoutingMap(
Arrays.asList(ImmutablePair.of(range1, null)),
collectionRid,
null);

DocumentCollection collection = new DocumentCollection();
collection.setResourceId(collectionRid);
collection.setSelfLink("dbs/db1/colls/coll1");

FeedResponse<PartitionKeyRange> response = Mockito.mock(FeedResponse.class);
when(response.getResults()).thenReturn(Arrays.asList(range1));
when(response.getContinuationToken()).thenReturn("newToken");

when(collectionCache.resolveCollectionAsync(any(), any()))
.thenReturn(Mono.just(new Utils.ValueHolder<>(collection)));

when(client.readPartitionKeyRanges(eq(collection.getSelfLink()), any(CosmosQueryRequestOptions.class)))
.thenReturn(Flux.just(response));

StepVerifier.create(cache.tryLookupAsync(null, collectionRid, previousRoutingMap, new HashMap<>()))
.expectNextMatches(routingMapHolder ->
routingMapHolder != null &&
routingMapHolder.v != null &&
previousRoutingMap.getChangeFeedNextIfNoneMatch() == null)
.verifyComplete();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ public void before_AddressResolverTest() throws Exception {
this.routingMapCollection1BeforeSplit =
InMemoryCollectionRoutingMap.tryCreateCompleteRoutingMap(
rangesBeforeSplit1,
collection1.getResourceId());
collection1.getResourceId(),
"1");

List<ImmutablePair<PartitionKeyRange, IServerIdentity>> rangesAfterSplit1 =
new ArrayList<>();
Expand All @@ -127,7 +128,7 @@ public void before_AddressResolverTest() throws Exception {

addPartitionKeyRangeFunc.apply(rangesAfterSplit1);

this.routingMapCollection1AfterSplit = InMemoryCollectionRoutingMap.tryCreateCompleteRoutingMap(rangesAfterSplit1, collection1.getResourceId());
this.routingMapCollection1AfterSplit = InMemoryCollectionRoutingMap.tryCreateCompleteRoutingMap(rangesAfterSplit1, collection1.getResourceId(), "2");

List<ImmutablePair<PartitionKeyRange, IServerIdentity>> rangesBeforeSplit2 =
new ArrayList<>();
Expand All @@ -141,7 +142,7 @@ public void before_AddressResolverTest() throws Exception {
addPartitionKeyRangeFunc.apply(rangesBeforeSplit2);


this.routingMapCollection2BeforeSplit = InMemoryCollectionRoutingMap.tryCreateCompleteRoutingMap(rangesBeforeSplit2, collection2.getResourceId());
this.routingMapCollection2BeforeSplit = InMemoryCollectionRoutingMap.tryCreateCompleteRoutingMap(rangesBeforeSplit2, collection2.getResourceId(), "2");

List<ImmutablePair<PartitionKeyRange, IServerIdentity>> rangesAfterSplit2 =
new ArrayList<>();
Expand All @@ -162,7 +163,7 @@ public void before_AddressResolverTest() throws Exception {
addPartitionKeyRangeFunc.apply(rangesAfterSplit2);


this.routingMapCollection2AfterSplit = InMemoryCollectionRoutingMap.tryCreateCompleteRoutingMap(rangesAfterSplit2, collection2.getResourceId());
this.routingMapCollection2AfterSplit = InMemoryCollectionRoutingMap.tryCreateCompleteRoutingMap(rangesAfterSplit2, collection2.getResourceId(), "2");
}

private void TestCacheRefreshWhileRouteByPartitionKey(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

package com.azure.cosmos.implementation.query;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.implementation.Resource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ static ServerIdentityImp of(int value) {
@Test(groups = { "unit" })
public void collectionRoutingMap() {
InMemoryCollectionRoutingMap routingMap = InMemoryCollectionRoutingMap
.tryCreateCompleteRoutingMap(Arrays.asList(
.tryCreateCompleteRoutingMap(
Arrays.asList(
new ImmutablePair<>(
new PartitionKeyRange("2", "0000000050", "0000000070"), ServerIdentityImp.of(2)),
new ImmutablePair<>(new PartitionKeyRange("0", "", "0000000030"),
Expand All @@ -41,8 +42,11 @@ public void collectionRoutingMap() {
new PartitionKeyRange("1", "0000000030", "0000000050"), ServerIdentityImp.of(1)),
new ImmutablePair<>(new PartitionKeyRange("3", "0000000070", "FF"),
ServerIdentityImp.of(3))),
StringUtils.EMPTY);
StringUtils.EMPTY,
"2");

assertThat(routingMap.getChangeFeedNextIfNoneMatch()).isNotNull();
assertThat(routingMap.getChangeFeedNextIfNoneMatch()).isEqualTo("2");
assertThat("0").isEqualTo(routingMap.getOrderedPartitionKeyRanges().get(0).getId());
assertThat("1").isEqualTo(routingMap.getOrderedPartitionKeyRanges().get(1).getId());
assertThat("2").isEqualTo(routingMap.getOrderedPartitionKeyRanges().get(2).getId());
Expand Down Expand Up @@ -94,25 +98,30 @@ public void invalidRoutingMap() {
ServerIdentityImp.of(2)),
new ImmutablePair<>(new PartitionKeyRange("2", "0000000025", "0000000035"),
ServerIdentityImp.of(2))),
StringUtils.EMPTY);
StringUtils.EMPTY,
"2");
}

@Test(groups = { "unit" })
public void incompleteRoutingMap() {
InMemoryCollectionRoutingMap routingMap = InMemoryCollectionRoutingMap
.tryCreateCompleteRoutingMap(Arrays.asList(
.tryCreateCompleteRoutingMap(
Arrays.asList(
new ImmutablePair<>(new PartitionKeyRange("2", "", "0000000030"),
ServerIdentityImp.of(2)),
new ImmutablePair<>(new PartitionKeyRange("3", "0000000031", "FF"),
ServerIdentityImp.of(2))),
StringUtils.EMPTY);
StringUtils.EMPTY,
"2");

assertThat(routingMap).isNull();

routingMap = InMemoryCollectionRoutingMap.tryCreateCompleteRoutingMap(Arrays.asList(
routingMap = InMemoryCollectionRoutingMap.tryCreateCompleteRoutingMap(
Arrays.asList(
new ImmutablePair<>(new PartitionKeyRange("2", "", "0000000030"), ServerIdentityImp.of(2)),
new ImmutablePair<>(new PartitionKeyRange("3", "0000000030", "FF"), ServerIdentityImp.of(2))),
StringUtils.EMPTY);
StringUtils.EMPTY,
"2");

assertThat(routingMap).isNotNull();
}
Expand All @@ -125,7 +134,8 @@ public void goneRanges() {
new ImmutablePair(new PartitionKeyRange("2", "", "0000000030", ImmutableList.of("1", "0")), null),
new ImmutablePair(new PartitionKeyRange("3", "0000000030", "0000000032", ImmutableList.of("5")), null),
new ImmutablePair(new PartitionKeyRange("4", "0000000032", "FF"), null)),
StringUtils.EMPTY);
StringUtils.EMPTY,
"2");

assertThat(routingMap.isGone("1")).isTrue();
assertThat(routingMap.isGone("0")).isTrue();
Expand Down Expand Up @@ -168,7 +178,9 @@ public void tryCombineRanges() {
"0000000070",
"FF"),
null)
), StringUtils.EMPTY);
),
StringUtils.EMPTY,
"2");

CollectionRoutingMap newRoutingMap = routingMap.tryCombine(
ImmutableList.<ImmutablePair<PartitionKeyRange, IServerIdentity>>of(
Expand All @@ -189,7 +201,7 @@ public void tryCombineRanges() {
ImmutableList.of("0")
),
null)
));
), null);

assertThat(newRoutingMap).isNotNull();

Expand Down Expand Up @@ -230,7 +242,7 @@ public void tryCombineRanges() {
ImmutableList.of("0", "5")
),
null)
));
), null);

assertThat(newRoutingMap).isNotNull();

Expand All @@ -244,7 +256,7 @@ public void tryCombineRanges() {
ImmutableList.of("0", "4", "6")
),
null)
));
), "2");

assertThat(newRoutingMap).isNull();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public MockRoutingMapProvider(Collection<PartitionKeyRange> ranges) {
pairs.add(new ImmutablePair<>(range, null));
}

this.routingMap = InMemoryCollectionRoutingMap.tryCreateCompleteRoutingMap(pairs, StringUtils.EMPTY);
this.routingMap = InMemoryCollectionRoutingMap.tryCreateCompleteRoutingMap(pairs, StringUtils.EMPTY, null);
}

@Override
Expand Down Expand Up @@ -71,7 +71,7 @@ public MockIRoutingMapProvider(List<PartitionKeyRange> ranges) {
pairs.add(new ImmutablePair<>(range, null));
}

this.routingMap = InMemoryCollectionRoutingMap.tryCreateCompleteRoutingMap(pairs, StringUtils.EMPTY);
this.routingMap = InMemoryCollectionRoutingMap.tryCreateCompleteRoutingMap(pairs, StringUtils.EMPTY, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void readPartitionKeyRanges() throws Exception {

FeedResponseListValidator<PartitionKeyRange> validator = new FeedResponseListValidator.Builder<PartitionKeyRange>()
.totalSize(1)
.numberOfPages(1)
.numberOfPages(2) // when using changeFeed to get the pkRanges, first page is empty with continuationToken
.build();
validateQuerySuccess(feedObservable, validator, FEED_TIMEOUT);
}
Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* Changed to use lower casing http header names for gateway response. - [46736](https://github.com/Azure/azure-sdk-for-java/pull/46736)
* Improved resilience around several completion events for an ssl handshake. - [PR 46734](https://github.com/Azure/azure-sdk-for-java/pull/46734)
* Added `MetadataThrottlingRetryPolicy` for `PartitionKeyRange` `RequestRateTooLargeException` handling. - [PR 46823](https://github.com/Azure/azure-sdk-for-java/pull/46823)
* Changed to use incremental change feed to get partition key ranges. - [46810](https://github.com/Azure/azure-sdk-for-java/pull/46810)

### 4.74.0 (2025-09-05)

Expand Down
Loading
Loading