From 3617419a37ea079fcb6866cf262a257c0c60e9f9 Mon Sep 17 00:00:00 2001 From: Karlen Simonyan Date: Mon, 18 Dec 2017 20:54:51 +0100 Subject: [PATCH 1/7] More optimal LINQ usage --- src/KafkaNET.Library/Messages/BufferedMessageSet.cs | 2 +- src/KafkaNET.Library/Producers/Sync/SyncProducerPool.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/KafkaNET.Library/Messages/BufferedMessageSet.cs b/src/KafkaNET.Library/Messages/BufferedMessageSet.cs index 878bb84..824e7b1 100644 --- a/src/KafkaNET.Library/Messages/BufferedMessageSet.cs +++ b/src/KafkaNET.Library/Messages/BufferedMessageSet.cs @@ -311,7 +311,7 @@ private bool MaybeComputeNext() private MessageAndOffset MakeNextOuter() { - if (topIterPosition >= this.Messages.Count()) + if (!this.Messages.Skip(topIterPosition).Any()) { return AllDone(); } diff --git a/src/KafkaNET.Library/Producers/Sync/SyncProducerPool.cs b/src/KafkaNET.Library/Producers/Sync/SyncProducerPool.cs index 2e61c0f..a7dcb2f 100644 --- a/src/KafkaNET.Library/Producers/Sync/SyncProducerPool.cs +++ b/src/KafkaNET.Library/Producers/Sync/SyncProducerPool.cs @@ -91,7 +91,7 @@ public override string ToString() if (Config.ZooKeeper != null) sb.AppendFormat("\t Broker zookeeper: {0} \t", this.Config.ZooKeeper.ZkConnect); - sb.Append(string.Join(",", this.syncProducers.Select(r => string.Format("BrokerID:{0} syncProducerCount:{1} ", r.Key, r.Value.Producers.Count())).ToArray())); + sb.Append(string.Join(",", this.syncProducers.Select(r => string.Format("BrokerID:{0} syncProducerCount:{1} ", r.Key, r.Value.Producers.Count)).ToArray())); return sb.ToString(); } public void AddProducer(Broker broker) From 020d9e75de52c91d94bb364c91816d937eb9191c Mon Sep 17 00:00:00 2001 From: Karlen Simonyan Date: Mon, 18 Dec 2017 20:56:54 +0100 Subject: [PATCH 2/7] Do not lock on 'this'. Use inner object. --- src/KafkaNET.Library/Consumers/Consumer.cs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/KafkaNET.Library/Consumers/Consumer.cs b/src/KafkaNET.Library/Consumers/Consumer.cs index 1bd1a14..3fcbfbc 100644 --- a/src/KafkaNET.Library/Consumers/Consumer.cs +++ b/src/KafkaNET.Library/Consumers/Consumer.cs @@ -52,6 +52,9 @@ public class Consumer : IConsumer private KafkaConnection connection; internal long CreatedTimeInUTC; + + private readonly object _lockOp = new object(); + public ConsumerConfiguration Config { get @@ -137,7 +140,7 @@ public FetchResponse Fetch(FetchRequest request) try { Logger.Debug("Fetch is waiting for send lock"); - lock (this) + lock (_lockOp) { Logger.Debug("Fetch acquired send lock. Begin send"); return connection.Send(request); @@ -212,7 +215,7 @@ public OffsetResponse GetOffsetsBefore(OffsetRequest request) { try { - lock (this) + lock (_lockOp) { return connection.Send(request); } @@ -240,7 +243,7 @@ public IEnumerable GetMetaData(TopicMetadataRequest request) { try { - lock (this) + lock (_lockOp) { return connection.Send(request); } @@ -273,7 +276,7 @@ protected virtual void Dispose(bool disposing) { if (connection != null) { - lock (this) + lock (_lockOp) { if (connection != null) { From ff6b17319ec0cc80c3ccff92e600e7ec315492b9 Mon Sep 17 00:00:00 2001 From: Karlen Simonyan Date: Mon, 18 Dec 2017 21:03:25 +0100 Subject: [PATCH 3/7] Fix recursion in Dispose() method --- .../JavaEventServerPerfTestHelper.cs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/KafkaNET.Library/Examples/ProduceJavaEventServer/JavaEventServerPerfTestHelper.cs b/src/KafkaNET.Library/Examples/ProduceJavaEventServer/JavaEventServerPerfTestHelper.cs index 201684d..d128371 100644 --- a/src/KafkaNET.Library/Examples/ProduceJavaEventServer/JavaEventServerPerfTestHelper.cs +++ b/src/KafkaNET.Library/Examples/ProduceJavaEventServer/JavaEventServerPerfTestHelper.cs @@ -252,6 +252,8 @@ private class BatchedMessages : IDisposable int limit = 0; int size = 0; + bool _disposed; + internal BatchedMessages(int limit) { this.stream = new MemoryStream(limit); @@ -326,9 +328,13 @@ internal byte[] GetData() public void Dispose() { - if (stream != null) - stream.Dispose(); - this.Dispose(); + if (!_disposed) + { + if (stream != null) + stream.Dispose(); + + _disposed = true; + } } private uint ReverseBytes(uint value) From b5103faf510135a590e0d5b3e40e8015b33fbbad Mon Sep 17 00:00:00 2001 From: Karlen Simonyan Date: Mon, 18 Dec 2017 21:08:52 +0100 Subject: [PATCH 4/7] More readonly fields usage --- .../Consumers/ConsumerIterator.cs | 6 ++-- .../Consumers/KafkaMessageStream.cs | 4 +-- .../Consumers/ZookeeperConsumerConnector.cs | 8 +++--- .../ConsumeGroup/ConsumerGroupHelper.cs | 4 +-- .../ConsumeGroupMonitorHelper.cs | 2 +- .../JavaEventServerPerfTestHelper.cs | 10 +++---- ...roducePerfTestKafkaSimpleManagerWrapper.cs | 6 ++-- .../Exceptions/TimeStampTooSmallException.cs | 2 +- .../Helper/KafkaSimpleManager.cs | 28 +++++++++---------- .../Partitioning/BrokerPartitionInfo.cs | 2 +- src/KafkaNET.Library/Producers/Producer.cs | 2 +- src/KafkaNET.Library/Utils/Crc32Hasher.cs | 4 +-- .../Listeners/ZKRebalancerListener.cs | 6 ++-- .../Listeners/ZkPartitionLeaderListener.cs | 4 +-- 14 files changed, 44 insertions(+), 44 deletions(-) diff --git a/src/KafkaNET.Library/Consumers/ConsumerIterator.cs b/src/KafkaNET.Library/Consumers/ConsumerIterator.cs index 99c0345..e4d0451 100644 --- a/src/KafkaNET.Library/Consumers/ConsumerIterator.cs +++ b/src/KafkaNET.Library/Consumers/ConsumerIterator.cs @@ -49,9 +49,9 @@ public class ConsumerIterator : IConsumerIterator private FetchedDataChunk currentDataChunk = null; private TData nextItem; private long consumedOffset = -1; - private SemaphoreSlim makeNextSemaphore = new SemaphoreSlim(1, 1); - private string topic; - private IDecoder decoder; + private readonly SemaphoreSlim makeNextSemaphore = new SemaphoreSlim(1, 1); + private readonly string topic; + private readonly IDecoder decoder; /// /// Initializes a new instance of the class. diff --git a/src/KafkaNET.Library/Consumers/KafkaMessageStream.cs b/src/KafkaNET.Library/Consumers/KafkaMessageStream.cs index 4249afe..a5bcec5 100644 --- a/src/KafkaNET.Library/Consumers/KafkaMessageStream.cs +++ b/src/KafkaNET.Library/Consumers/KafkaMessageStream.cs @@ -35,9 +35,9 @@ public class KafkaMessageStream : IKafkaMessageStream public IConsumerIterator iterator { get; private set; } - private string topic; + private readonly string topic; - private IDecoder decoder; + private readonly IDecoder decoder; internal KafkaMessageStream(string topic, BlockingCollection queue, int consumerTimeoutMs, IDecoder decoder) { diff --git a/src/KafkaNET.Library/Consumers/ZookeeperConsumerConnector.cs b/src/KafkaNET.Library/Consumers/ZookeeperConsumerConnector.cs index 4d7f651..18d14db 100644 --- a/src/KafkaNET.Library/Consumers/ZookeeperConsumerConnector.cs +++ b/src/KafkaNET.Library/Consumers/ZookeeperConsumerConnector.cs @@ -54,11 +54,11 @@ public class ZookeeperConsumerConnector : KafkaClientBase, IZookeeperConsumerCon private readonly KafkaScheduler scheduler = new KafkaScheduler(); private readonly IDictionary> topicRegistry = new ConcurrentDictionary>(); private readonly IDictionary, BlockingCollection> queues = new Dictionary, BlockingCollection>(); - private List stopAsyncRebalancing = new List(); + private readonly List stopAsyncRebalancing = new List(); private volatile bool disposed; - private EventHandler consumerRebalanceHandler; - private EventHandler zkSessionDisconnectedHandler; - private EventHandler zkSessionExpiredHandler; + private readonly EventHandler consumerRebalanceHandler; + private readonly EventHandler zkSessionDisconnectedHandler; + private readonly EventHandler zkSessionExpiredHandler; /// /// Initializes a new instance of the class. diff --git a/src/KafkaNET.Library/Examples/ConsumeGroup/ConsumerGroupHelper.cs b/src/KafkaNET.Library/Examples/ConsumeGroup/ConsumerGroupHelper.cs index 302a82c..224ae13 100644 --- a/src/KafkaNET.Library/Examples/ConsumeGroup/ConsumerGroupHelper.cs +++ b/src/KafkaNET.Library/Examples/ConsumeGroup/ConsumerGroupHelper.cs @@ -116,8 +116,8 @@ internal class ConsumerGroupHelperUnit internal static log4net.ILog Logger = log4net.LogManager.GetLogger(typeof(ConsumerGroupHelperUnit)); internal int ThreadID; internal ConsumerConfiguration configSettings; - ConsumeGroupHelperOptions cgOptions; - AutoResetEvent resetEvent; + readonly ConsumeGroupHelperOptions cgOptions; + readonly AutoResetEvent resetEvent; internal int Count = -1; internal int consumedTotalCount = 0; internal ConsumerGroupHelperUnit(int threadID, ConsumeGroupHelperOptions cg, AutoResetEvent e, int c) diff --git a/src/KafkaNET.Library/Examples/ConsumeGroupMonitor/ConsumeGroupMonitorHelper.cs b/src/KafkaNET.Library/Examples/ConsumeGroupMonitor/ConsumeGroupMonitorHelper.cs index b551df9..f3a3951 100644 --- a/src/KafkaNET.Library/Examples/ConsumeGroupMonitor/ConsumeGroupMonitorHelper.cs +++ b/src/KafkaNET.Library/Examples/ConsumeGroupMonitor/ConsumeGroupMonitorHelper.cs @@ -128,7 +128,7 @@ internal class ConsumeGroupMonitorUnit public string group; public string topic; - private DateTime startTime = DateTime.UtcNow; + private readonly DateTime startTime = DateTime.UtcNow; private SortedDictionary latestOffsetDictLastValue = null; private SortedDictionary latestCommitedDictLastValue = null; private SortedDictionary latestOffsetDictFirstValue = null; diff --git a/src/KafkaNET.Library/Examples/ProduceJavaEventServer/JavaEventServerPerfTestHelper.cs b/src/KafkaNET.Library/Examples/ProduceJavaEventServer/JavaEventServerPerfTestHelper.cs index d128371..8d132ed 100644 --- a/src/KafkaNET.Library/Examples/ProduceJavaEventServer/JavaEventServerPerfTestHelper.cs +++ b/src/KafkaNET.Library/Examples/ProduceJavaEventServer/JavaEventServerPerfTestHelper.cs @@ -247,9 +247,9 @@ private void RunOneThread(object parameter) private class BatchedMessages : IDisposable { - MemoryStream stream; - BinaryWriter writer; - int limit = 0; + readonly MemoryStream stream; + readonly BinaryWriter writer; + readonly int limit = 0; int size = 0; bool _disposed; @@ -363,8 +363,8 @@ internal ResponseStatus() { } private class Message { - byte[] key; - byte[] val; + readonly byte[] key; + readonly byte[] val; internal Message(byte[] key, byte[] val) { diff --git a/src/KafkaNET.Library/Examples/ProducePerfTest/ProducePerfTestKafkaSimpleManagerWrapper.cs b/src/KafkaNET.Library/Examples/ProducePerfTest/ProducePerfTestKafkaSimpleManagerWrapper.cs index 8b7f49f..83e30c7 100644 --- a/src/KafkaNET.Library/Examples/ProducePerfTest/ProducePerfTestKafkaSimpleManagerWrapper.cs +++ b/src/KafkaNET.Library/Examples/ProducePerfTest/ProducePerfTestKafkaSimpleManagerWrapper.cs @@ -25,12 +25,12 @@ internal class ProducePerfTestKafkaSimpleManagerWrapper private static volatile ProducePerfTestKafkaSimpleManagerWrapper instance; private static object syncRoot = new Object(); - private object lockForDictionaryChange = new Object(); + private readonly object lockForDictionaryChange = new Object(); private KafkaSimpleManagerConfiguration config; private KafkaSimpleManager kafkaSimpleManage; - private ProducerConfiguration producerConfigTemplate; + private readonly ProducerConfiguration producerConfigTemplate; private int correlationIDGetProducer = 0; - private string clientId = "KafkaSimpleManagerProducerWrapper"; + private readonly string clientId = "KafkaSimpleManagerProducerWrapper"; private ProducePerfTestKafkaSimpleManagerWrapper() { diff --git a/src/KafkaNET.Library/Exceptions/TimeStampTooSmallException.cs b/src/KafkaNET.Library/Exceptions/TimeStampTooSmallException.cs index fad7827..a3cd5d2 100644 --- a/src/KafkaNET.Library/Exceptions/TimeStampTooSmallException.cs +++ b/src/KafkaNET.Library/Exceptions/TimeStampTooSmallException.cs @@ -12,7 +12,7 @@ namespace Kafka.Client.Exceptions public class TimeStampTooSmallException : Exception { - private long offsetTime; + private readonly long offsetTime; public TimeStampTooSmallException() diff --git a/src/KafkaNET.Library/Helper/KafkaSimpleManager.cs b/src/KafkaNET.Library/Helper/KafkaSimpleManager.cs index 90aa80a..b3b5cb4 100644 --- a/src/KafkaNET.Library/Helper/KafkaSimpleManager.cs +++ b/src/KafkaNET.Library/Helper/KafkaSimpleManager.cs @@ -65,30 +65,30 @@ public class KafkaSimpleManager : IDisposable public KafkaSimpleManagerConfiguration Config { get; private set; } - private ConcurrentDictionary TopicLockProduce = new ConcurrentDictionary(); - private ConcurrentDictionary TopicPartitionLockConsume = new ConcurrentDictionary(); + private readonly ConcurrentDictionary TopicLockProduce = new ConcurrentDictionary(); + private readonly ConcurrentDictionary TopicPartitionLockConsume = new ConcurrentDictionary(); //topic --> TopicMetadata - private ConcurrentDictionary TopicMetadatas = new ConcurrentDictionary(); + private readonly ConcurrentDictionary TopicMetadatas = new ConcurrentDictionary(); //topic --> TopicMetadatasLastUpdateTime - private ConcurrentDictionary TopicMetadatasLastUpdateTime = new ConcurrentDictionary(); + private readonly ConcurrentDictionary TopicMetadatasLastUpdateTime = new ConcurrentDictionary(); //topic --> partitionid --> - private ConcurrentDictionary>> TopicMetadataPartitionsLeaders = new ConcurrentDictionary>>(); - private ConcurrentDictionary> TopicOffsetEarliest = new ConcurrentDictionary>(); - private ConcurrentDictionary> TopicOffsetLatest = new ConcurrentDictionary>(); - private object topicProducerLock = new object(); + private readonly ConcurrentDictionary>> TopicMetadataPartitionsLeaders = new ConcurrentDictionary>>(); + private readonly ConcurrentDictionary> TopicOffsetEarliest = new ConcurrentDictionary>(); + private readonly ConcurrentDictionary> TopicOffsetLatest = new ConcurrentDictionary>(); + private readonly object topicProducerLock = new object(); //topic --> partitionid --> - private ConcurrentDictionary>> TopicPartitionsLeaderProducers = new ConcurrentDictionary>>(); - private ConcurrentDictionary> TopicProducersWithPartitionerClass = new ConcurrentDictionary>(); + private readonly ConcurrentDictionary>> TopicPartitionsLeaderProducers = new ConcurrentDictionary>>(); + private readonly ConcurrentDictionary> TopicProducersWithPartitionerClass = new ConcurrentDictionary>(); //topic --> partitionid --> - private ConcurrentDictionary> TopicPartitionsLeaderConsumers = new ConcurrentDictionary>(); + private readonly ConcurrentDictionary> TopicPartitionsLeaderConsumers = new ConcurrentDictionary>(); #region SyncProducerPool for metadata. private volatile bool disposed = false; // the pool of syncProducer for TopicMetaData requests, which retrieve PartitionMetaData, including leaders and ISRs. private volatile SyncProducerPool syncProducerPoolForMetaData = null; - private object syncProducerPoolForMetadataLock = new object(); - private Random random = new Random(); - private Random randomForGetCachedProducer = new Random(); + private readonly object syncProducerPoolForMetadataLock = new object(); + private readonly Random random = new Random(); + private readonly Random randomForGetCachedProducer = new Random(); #endregion public KafkaSimpleManager(KafkaSimpleManagerConfiguration config) diff --git a/src/KafkaNET.Library/Producers/Partitioning/BrokerPartitionInfo.cs b/src/KafkaNET.Library/Producers/Partitioning/BrokerPartitionInfo.cs index bcfe9cf..9fc56a1 100644 --- a/src/KafkaNET.Library/Producers/Partitioning/BrokerPartitionInfo.cs +++ b/src/KafkaNET.Library/Producers/Partitioning/BrokerPartitionInfo.cs @@ -42,7 +42,7 @@ public class BrokerPartitionInfo : IBrokerPartitionInfo private readonly IDictionary topicPartitionInfoLastUpdateTime = new Dictionary(); private readonly object updateLock = new object(); - private int topicMetaDataRefreshIntervalMS; + private readonly int topicMetaDataRefreshIntervalMS; private readonly ZooKeeperClient zkClient; public BrokerPartitionInfo(ISyncProducerPool syncProducerPool, IDictionary cache, IDictionary lastUpdateTime, int topicMetaDataRefreshIntervalMS, ZooKeeperClient zkClient) diff --git a/src/KafkaNET.Library/Producers/Producer.cs b/src/KafkaNET.Library/Producers/Producer.cs index d640ba9..68ae791 100644 --- a/src/KafkaNET.Library/Producers/Producer.cs +++ b/src/KafkaNET.Library/Producers/Producer.cs @@ -45,7 +45,7 @@ public class Producer : KafkaClientBase, IProducer private readonly object shuttingDownLock = new object(); private readonly IDictionary topicPartitionInfo = new Dictionary(); private readonly IDictionary topicPartitionInfoLastUpdateTime = new Dictionary(); - private SyncProducerPool syncProducerPool; + private readonly SyncProducerPool syncProducerPool; public Producer(ICallbackHandler callbackHandler) { diff --git a/src/KafkaNET.Library/Utils/Crc32Hasher.cs b/src/KafkaNET.Library/Utils/Crc32Hasher.cs index 10ad384..d615ac8 100644 --- a/src/KafkaNET.Library/Utils/Crc32Hasher.cs +++ b/src/KafkaNET.Library/Utils/Crc32Hasher.cs @@ -29,8 +29,8 @@ public class Crc32Hasher : HashAlgorithm internal const UInt32 DefaultSeed = 0xffffffff; private UInt32 hash; - private UInt32 seed; - private UInt32[] table; + private readonly UInt32 seed; + private readonly UInt32[] table; private static UInt32[] defaultTable; public Crc32Hasher() diff --git a/src/KafkaNET.Library/ZooKeeperIntegration/Listeners/ZKRebalancerListener.cs b/src/KafkaNET.Library/ZooKeeperIntegration/Listeners/ZKRebalancerListener.cs index ec7eae8..0e22f4c 100644 --- a/src/KafkaNET.Library/ZooKeeperIntegration/Listeners/ZKRebalancerListener.cs +++ b/src/KafkaNET.Library/ZooKeeperIntegration/Listeners/ZKRebalancerListener.cs @@ -43,9 +43,9 @@ internal class ZKRebalancerListener : IZooKeeperChildListener public event EventHandler ConsumerRebalance; - private IDictionary> oldPartitionsPerTopicMap = new Dictionary>(); - private IDictionary> oldConsumersPerTopicMap = new Dictionary>(); - private IDictionary> topicRegistry; + private readonly IDictionary> oldPartitionsPerTopicMap = new Dictionary>(); + private readonly IDictionary> oldConsumersPerTopicMap = new Dictionary>(); + private readonly IDictionary> topicRegistry; private readonly IDictionary, BlockingCollection> queues; private readonly string consumerIdString; private readonly object syncLock = new object(); diff --git a/src/KafkaNET.Library/ZooKeeperIntegration/Listeners/ZkPartitionLeaderListener.cs b/src/KafkaNET.Library/ZooKeeperIntegration/Listeners/ZkPartitionLeaderListener.cs index 2a55e6c..2141ef8 100644 --- a/src/KafkaNET.Library/ZooKeeperIntegration/Listeners/ZkPartitionLeaderListener.cs +++ b/src/KafkaNET.Library/ZooKeeperIntegration/Listeners/ZkPartitionLeaderListener.cs @@ -13,8 +13,8 @@ internal class ZkPartitionLeaderListener : IZooKeeperDataListener { public static ILog Logger { get { return LogManager.GetLogger(typeof(ZkPartitionLeaderListener)); } } - private ZKRebalancerListener _rebalancer; - private Dictionary _partitionLeaderMap; + private readonly ZKRebalancerListener _rebalancer; + private readonly Dictionary _partitionLeaderMap; public ZkPartitionLeaderListener(ZKRebalancerListener rebalancer, Dictionary partitionLeaderMap = null) { From 69d57c33fa1c875bd071c21c50497e6d00da5a0d Mon Sep 17 00:00:00 2001 From: Karlen Simonyan Date: Mon, 18 Dec 2017 21:11:13 +0100 Subject: [PATCH 5/7] Fix redundant catch blocks --- .../JavaEventServerPerfTestHelper.cs | 31 +++++++------------ 1 file changed, 12 insertions(+), 19 deletions(-) diff --git a/src/KafkaNET.Library/Examples/ProduceJavaEventServer/JavaEventServerPerfTestHelper.cs b/src/KafkaNET.Library/Examples/ProduceJavaEventServer/JavaEventServerPerfTestHelper.cs index 8d132ed..97b1fd4 100644 --- a/src/KafkaNET.Library/Examples/ProduceJavaEventServer/JavaEventServerPerfTestHelper.cs +++ b/src/KafkaNET.Library/Examples/ProduceJavaEventServer/JavaEventServerPerfTestHelper.cs @@ -274,6 +274,11 @@ internal bool PutMessage(byte[] val) internal bool PutMessage(byte[] key, byte[] val) { + if (val == null) + { + return false; + } + if (key == null) { if (size + 8 + val.Length > limit) return false; @@ -283,29 +288,17 @@ internal bool PutMessage(byte[] key, byte[] val) if (size + 8 + key.Length + val.Length > limit) return false; } - if (val == null) - { - return false; - } - - try + if (key == null) { - if (key == null) - { - writer.Write(0); - } - else - { - writer.Write(ReverseBytes((uint)key.Length)); - writer.Write(key); - } - writer.Write(ReverseBytes((uint)val.Length)); - writer.Write(val); + writer.Write(0); } - catch (Exception ex) + else { - throw ex; + writer.Write(ReverseBytes((uint)key.Length)); + writer.Write(key); } + writer.Write(ReverseBytes((uint)val.Length)); + writer.Write(val); if (key == null) { From cef04110cee444bcfced70f7badfbece76158854 Mon Sep 17 00:00:00 2001 From: Karlen Simonyan Date: Mon, 18 Dec 2017 21:12:53 +0100 Subject: [PATCH 6/7] Remove unused format arguments --- .../Examples/Arguments/KafkaNETExampleCommandVerb.cs | 4 ++-- src/KafkaNET.Library/Utils/KafkaConsoleUtil.cs | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/KafkaNET.Library/Examples/Arguments/KafkaNETExampleCommandVerb.cs b/src/KafkaNET.Library/Examples/Arguments/KafkaNETExampleCommandVerb.cs index aba0df7..14d968d 100644 --- a/src/KafkaNET.Library/Examples/Arguments/KafkaNETExampleCommandVerb.cs +++ b/src/KafkaNET.Library/Examples/Arguments/KafkaNETExampleCommandVerb.cs @@ -24,9 +24,9 @@ internal static string GetUsage() { StringBuilder sb = new StringBuilder(); sb.AppendFormat("{0} is one utility tool to try KafkaNET.Library.\r\n", AssemblyName); - sb.AppendFormat("Usage:\r\n", AssemblyName); + sb.Append("Usage:\r\n"); sb.AppendFormat("{0} ArgumentsAndOpitons \r\n", AssemblyName); - sb.AppendFormat("Valid verbs includes: \r\n\r\n"); + sb.Append("Valid verbs includes: \r\n\r\n"); sb.AppendFormat("\t{0,-30} {1}\r\n\r\n", KafkaNETExampleType.Topic.ToString().ToLowerInvariant(), new TopicHelperArguments().GetUsage(true)); sb.AppendFormat("\t{0,-30} {1}\r\n\r\n", KafkaNETExampleType.ConsumeSimple.ToString().ToLowerInvariant(), new ConsumeDataHelperArguments().GetUsage(true)); sb.AppendFormat("\t{0,-30} {1}\r\n\r\n", KafkaNETExampleType.ConsumeGroup.ToString().ToLowerInvariant(), new ConsumeGroupMonitorHelperOptions().GetUsage(true)); diff --git a/src/KafkaNET.Library/Utils/KafkaConsoleUtil.cs b/src/KafkaNET.Library/Utils/KafkaConsoleUtil.cs index 0730bd4..a6c070d 100644 --- a/src/KafkaNET.Library/Utils/KafkaConsoleUtil.cs +++ b/src/KafkaNET.Library/Utils/KafkaConsoleUtil.cs @@ -71,8 +71,7 @@ public static void DumpDataToFile(bool dumpDataAsUTF8, bool dumpOriginalData, St sw.Flush(); fs.Write(v.Message.Payload, 0, v.Message.Payload.Length); fs.Flush(); - sw.WriteLine("\r\n==Binary END==", - offsetBase + i, payload.Count, v.Message.Payload.Length); + sw.WriteLine("\r\n==Binary END=="); i++; totalCountOriginal++; if (totalCountOriginal >= count && count > 0) From 2ee75d46377309fc625a62bcfdd0afc5db57848e Mon Sep 17 00:00:00 2001 From: Karlen Simonyan Date: Mon, 18 Dec 2017 21:19:52 +0100 Subject: [PATCH 7/7] Fix the argument exceptions parameter names usage --- src/KafkaNET.Library/Consumers/ZookeeperConsumerConnector.cs | 4 ++-- src/KafkaNET.Library/ZooKeeperIntegration/ZooKeeperClient.cs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/KafkaNET.Library/Consumers/ZookeeperConsumerConnector.cs b/src/KafkaNET.Library/Consumers/ZookeeperConsumerConnector.cs index 18d14db..d141474 100644 --- a/src/KafkaNET.Library/Consumers/ZookeeperConsumerConnector.cs +++ b/src/KafkaNET.Library/Consumers/ZookeeperConsumerConnector.cs @@ -78,7 +78,7 @@ public ZookeeperConsumerConnector(ConsumerConfiguration config, { if (string.IsNullOrEmpty(config.GroupId)) { - throw new ArgumentNullException("GroupId of ConsumerConfiguration should not be empty."); + throw new ArgumentException("GroupId of ConsumerConfiguration should not be empty.", nameof(config)); } Logger.Info("Enter ZookeeperConsumerConnector ..."); try @@ -555,7 +555,7 @@ private IDictionary>> Consume(IDi if (topicCountDict == null) { - throw new ArgumentNullException(); + throw new ArgumentNullException(nameof(topicCountDict)); } var dirs = new ZKGroupDirs(this.config.GroupId); diff --git a/src/KafkaNET.Library/ZooKeeperIntegration/ZooKeeperClient.cs b/src/KafkaNET.Library/ZooKeeperIntegration/ZooKeeperClient.cs index 305ef66..439fb90 100644 --- a/src/KafkaNET.Library/ZooKeeperIntegration/ZooKeeperClient.cs +++ b/src/KafkaNET.Library/ZooKeeperIntegration/ZooKeeperClient.cs @@ -952,7 +952,7 @@ private string Create(string path, object data, CreateMode mode) { if (path == null) { - throw new ArgumentNullException("Path must not be null"); + throw new ArgumentNullException(nameof(path), "Path must not be null"); } byte[] bytes = data == null ? null : this.serializer.Serialize(data);