Skip to content

Commit 8b65d2b

Browse files
committed
NCBC-3807: ConfigPushHandler unit tests / Handle server version regressed edge case.
Motivation: Test coverage and edge case discovery for ConfigPushHandler Modifications: * ConfigHandler doesn't blow up when attempting to log configs with Trace enabled. * ConfigPushHandler mitigates looping if server returns version older than what was pushed. * Basic unit tests for ConfigPushHandler (more to come) Change-Id: If82713f43f62353e9f99816bfe8f240eeb928081 Reviewed-on: https://review.couchbase.org/c/couchbase-net-client/+/211964 Tested-by: Build Bot <[email protected]> Reviewed-by: Jeffry Morris <[email protected]>
1 parent 1ed9446 commit 8b65d2b

File tree

3 files changed

+177
-2
lines changed

3 files changed

+177
-2
lines changed

src/Couchbase/Core/Configuration/Server/ConfigHandler.cs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,8 +232,15 @@ public void Publish(BucketConfig config)
232232
if (_logger.IsEnabled(LogLevel.Trace))
233233
{
234234
// Only log if debug logging is enabled to avoid serialization cost
235-
_logger.LogDebug(LoggingEvents.ConfigEvent,
236-
JsonSerializer.Serialize(config, InternalSerializationContext.Default.BucketConfig));
235+
try
236+
{
237+
_logger.LogDebug(LoggingEvents.ConfigEvent,
238+
JsonSerializer.Serialize(config, InternalSerializationContext.Default.BucketConfig));
239+
}
240+
catch (Exception ex)
241+
{
242+
_logger.LogWarning(ex, "Failed to deserialize config");
243+
}
237244
}
238245

239246
if (_configQueue?.Completion.IsCompleted ?? true)

src/Couchbase/Core/Configuration/Server/ConfigPushHandler.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ public ConfigPushHandler(BucketBase bucket, ClusterContext context, ILogger logg
5656
private async Task ProcessConfigPushesAsync(CancellationToken cancellationToken)
5757
{
5858
ConfigVersion lastSkippedVersion = new(0, 0);
59+
long skipsWithNoPublish = 0;
5960
while (!cancellationToken.IsCancellationRequested)
6061
{
6162
// Don't release this semaphore, it is released whenever a new config is received.
@@ -143,6 +144,7 @@ private async Task ProcessConfigPushesAsync(CancellationToken cancellationToken)
143144
// therefore, not-null means this is a new config that must be published to all subscribers.
144145
LogConfigPublished(_redactor.SystemData(_bucket.Name), fetchedBucketConfig.ConfigVersion,
145146
effectiveVersion);
147+
Interlocked.Exchange(ref skipsWithNoPublish, 0);
146148
_context.PublishConfig(fetchedBucketConfig);
147149
}
148150
else if (fetchedVersion < pushedVersion)
@@ -163,6 +165,12 @@ private async Task ProcessConfigPushesAsync(CancellationToken cancellationToken)
163165
{
164166
// a new version came in while we were publishing.
165167
LogAttemptedButSkipped(nextVersion, fetchedVersion);
168+
var skips = Interlocked.Increment(ref skipsWithNoPublish);
169+
if (skips > 100)
170+
{
171+
await Task.Delay(10).ConfigureAwait(false);
172+
}
173+
166174
TryReleaseNewVersionSemaphore();
167175
}
168176
}
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
#nullable enable
2+
using System;
3+
using System.Collections.Generic;
4+
using System.Runtime.CompilerServices;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
using Couchbase.Core;
8+
using Couchbase.Core.Bootstrapping;
9+
using Couchbase.Core.Configuration.Server;
10+
using Couchbase.Core.DI;
11+
using Couchbase.Core.Diagnostics.Tracing;
12+
using Couchbase.Core.IO.Operations;
13+
using Couchbase.Core.Logging;
14+
using Couchbase.Core.Retry;
15+
using Couchbase.Test.Common.Utils;
16+
using Couchbase.UnitTests.Utils;
17+
using Microsoft.Extensions.Logging;
18+
using Moq;
19+
using Xunit;
20+
using Xunit.Abstractions;
21+
22+
namespace Couchbase.UnitTests.Core.Configuration.Server;
23+
24+
public class ConfigPushHandlerTests
25+
{
26+
private readonly ITestOutputHelper _outputHelper;
27+
28+
public ConfigPushHandlerTests(ITestOutputHelper outputHelper)
29+
{
30+
_outputHelper = outputHelper;
31+
}
32+
[Fact]
33+
public async Task ConfigPushHandler_ServerVersionRegressed()
34+
{
35+
// server pushes (1,3), but returns (1,1)
36+
bool publishedAnything = false;
37+
var initialBucketConfig = new BucketConfig() { RevEpoch = 1, Rev = 2 };
38+
var versionPublished = new ConfigVersion(0, 0);
39+
var mockBucket = CreateBucketMock(initialConfig: initialBucketConfig, onPublish: bc =>
40+
{
41+
publishedAnything = true;
42+
Assert.NotNull(bc);
43+
versionPublished = bc!.ConfigVersion;
44+
});
45+
ClusterContext mockContext = mockBucket.Context;
46+
var mockNode = new Mock<IClusterNode>();
47+
BucketConfig getClusterMapResult = new BucketConfig() { RevEpoch = 1, Rev = 1 };
48+
getClusterMapResult.OnDeserialized();
49+
IReadOnlyCollection<HostEndpointWithPort> endpoints = new List<HostEndpointWithPort>();
50+
mockNode.Setup(x => x.GetClusterMap(It.IsAny<ConfigVersion?>(), It.IsAny<CancellationToken>()))
51+
.Returns(Task.FromResult(getClusterMapResult));
52+
mockNode.SetupGet(x => x.KeyEndPoints).Returns(endpoints);
53+
mockContext.Nodes.Add(mockNode.Object);
54+
mockBucket.Nodes.Add(mockNode.Object);
55+
mockContext.RegisterBucket(mockBucket);
56+
mockContext.Start();
57+
var logger = new TestOutputLogger(_outputHelper, nameof(ConfigPushHandler_ServerVersionRegressed));
58+
var redactor = new TypedRedactor(RedactionLevel.None);
59+
using var configPushHandler = new ConfigPushHandler(mockBucket, mockContext, logger, redactor);
60+
var pushedVersion = new ConfigVersion(1, 3);
61+
configPushHandler.ProcessConfigPush(pushedVersion);
62+
// while server is returning older version, do not publish
63+
await Task.Delay(100);
64+
Assert.NotEqual(versionPublished, pushedVersion);
65+
66+
// update the version of the config that is returned. This should result in a publish.
67+
getClusterMapResult.Rev = 3;
68+
getClusterMapResult.OnDeserialized();
69+
await Task.Delay(100);
70+
Assert.Equal(versionPublished, pushedVersion);
71+
}
72+
73+
[Fact]
74+
public async Task ConfigPushHandler_BasicAdvance()
75+
{
76+
// server pushes (1,2), and returns (1,2)
77+
bool publishedAnything = false;
78+
var initialBucketConfig = new BucketConfig() { RevEpoch = 1, Rev = 1 };
79+
var versionPublished = new ConfigVersion(0, 0);
80+
var mockBucket = CreateBucketMock(initialConfig: initialBucketConfig, onPublish: bc =>
81+
{
82+
publishedAnything = true;
83+
Assert.NotNull(bc);
84+
versionPublished = bc!.ConfigVersion;
85+
});
86+
87+
ClusterContext mockContext = mockBucket.Context;
88+
var mockNode = new Mock<IClusterNode>();
89+
BucketConfig getClusterMapResult = new BucketConfig() { RevEpoch = 1, Rev = 2 };
90+
getClusterMapResult.OnDeserialized();
91+
IReadOnlyCollection<HostEndpointWithPort> endpoints = new List<HostEndpointWithPort>();
92+
mockNode.Setup(x => x.GetClusterMap(It.IsAny<ConfigVersion?>(), It.IsAny<CancellationToken>()))
93+
.Returns(Task.FromResult(getClusterMapResult));
94+
mockNode.SetupGet(x => x.KeyEndPoints).Returns(endpoints);
95+
mockContext.Nodes.Add(mockNode.Object);
96+
mockBucket.Nodes.Add(mockNode.Object);
97+
mockContext.RegisterBucket(mockBucket);
98+
mockContext.Start();
99+
var logger = new TestOutputLogger(_outputHelper, nameof(ConfigPushHandler_ServerVersionRegressed));
100+
var redactor = new TypedRedactor(RedactionLevel.None);
101+
using var configPushHandler = new ConfigPushHandler(mockBucket, mockContext, logger, redactor);
102+
configPushHandler.ProcessConfigPush(new ConfigVersion(1, 2));
103+
await Task.Delay(100);
104+
Assert.True(publishedAnything);
105+
Assert.Equal(expected: getClusterMapResult.ConfigVersion, actual: versionPublished);
106+
}
107+
108+
private BucketBase CreateBucketMock(
109+
string bucketName = "default",
110+
BucketConfig? initialConfig = null,
111+
Action<BucketConfig?>? onPublish = null,
112+
[CallerMemberName] string caller = "CreateBucketMock")
113+
{
114+
onPublish ??= _ => { };
115+
initialConfig ??= new();
116+
Action doNothing = () => { };
117+
var mockCluster = new Mock<ICluster>(MockBehavior.Strict);
118+
var mockConfigHandler = new Mock<IConfigHandler>(MockBehavior.Strict);
119+
mockConfigHandler.Setup(ch => ch.Publish(It.IsAny<BucketConfig>())).Callback(onPublish);
120+
mockConfigHandler.Setup(ch => ch.Subscribe(It.IsAny<IConfigUpdateEventSink>())).Callback(doNothing);
121+
mockConfigHandler.Setup(ch => ch.Start(It.IsAny<bool>())).Callback(doNothing);
122+
var clusterOptions = new ClusterOptions().WithLogging(new TestOutputLoggerFactory(_outputHelper));
123+
clusterOptions.AddClusterService(mockConfigHandler.Object);
124+
var mock = new Mock<BucketBase>(
125+
bucketName,
126+
new ClusterContext(mockCluster.Object, new CancellationTokenSource(), clusterOptions),
127+
new Mock<IScopeFactory>().Object,
128+
new Mock<IRetryOrchestrator>().Object,
129+
new TestOutputLogger(_outputHelper, nameof(ConfigPushHandlerTests)),
130+
new TypedRedactor(RedactionLevel.None),
131+
new Mock<IBootstrapperFactory>().Object,
132+
NoopRequestTracer.Instance,
133+
new Mock<IOperationConfigurator>().Object,
134+
new BestEffortRetryStrategy(),
135+
initialConfig);
136+
137+
mock.SetupGet(it => it.Name).Returns(bucketName);
138+
mock.Setup(it => it.ConfigUpdatedAsync(It.IsAny<BucketConfig>()))
139+
.Returns((BucketConfig bc) =>
140+
{
141+
_outputHelper.WriteLine("Config Published: bucket={}, version={}", bucketName, bc.ConfigVersion);
142+
return Task.CompletedTask;
143+
});
144+
return mock.Object;
145+
}
146+
147+
internal class MockConfigUpdatedSink : IConfigUpdateEventSink
148+
{
149+
public Action<BucketConfig> ConfigUpdatedAction { get; set; } = _ => { };
150+
151+
public Task ConfigUpdatedAsync(BucketConfig newConfig)
152+
{
153+
ConfigUpdatedAction(newConfig);
154+
return Task.CompletedTask;
155+
}
156+
157+
public string Name => nameof(MockConfigUpdatedSink);
158+
public IEnumerable<IClusterNode> ClusterNodes { get; set; }
159+
}
160+
}

0 commit comments

Comments
 (0)