This repository was archived by the owner on Jun 10, 2022. It is now read-only.

Conversation

tongsq
Contributor

@tongsq tongsq commented Nov 24, 2017

When the consumer restarts, the first message in each partition is lost: I sent 5 messages but only received 4. My Kafka version is 1.0.0.
@nmred nmred merged commit d9e421c into weiboad:master Nov 30, 2017
@lcobucci
Contributor

lcobucci commented Dec 10, 2017

@nmred while working on #147 I found out that this patch is actually wrong: as you can see in the Travis logs, the second execution of the consumer fetches more than the 30 messages sent by the producers. This was fixed by reverting this PR.

@tongsq is it possible that you have the consumer offset reset option set to latest (the default) instead of earliest? (@nmred feel free to correct me.) As far as I can tell, that option defines the behaviour when the consumer group doesn't have an offset registered in Kafka: latest starts the group at the latest offset of the topic when its first consumer starts, whereas earliest uses offset 0 as the group's starting point. You can configure that option using $consumerConfig->setOffsetReset('earliest');
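The configuration lcobucci describes can be sketched as follows, using the setters that appear later in this thread; the broker address, group ID, and topic name are placeholders, not values from this issue:

```php
<?php
// Sketch of the offsetReset setting discussed above (placeholder values).
use Kafka\ConsumerConfig;

$config = ConsumerConfig::getInstance();
$config->setMetadataBrokerList('127.0.0.1:9092'); // placeholder broker list
$config->setGroupId('my-group');                  // placeholder group ID
$config->setTopics(['my-topic']);                 // placeholder topic

// Only applies when the group has no committed offset yet:
// 'latest' (default): a brand-new group starts at the end of the topic,
//   so messages produced before the first consumer started are skipped.
// 'earliest': a brand-new group starts from the beginning of the log.
$config->setOffsetReset('earliest');
```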

@noname007
Contributor

noname007 commented Jan 29, 2018

earliest does not mean the consumer starts from offset 0; it resumes from the offset stored on the offset manager server.

@noname007
Contributor

I hit this problem on a production machine. I've pasted the log messages below.

@noname007
Contributor

@lcobucci

@noname007
Contributor

noname007 commented Jan 29, 2018

[2018-01-29 02:32:05] general_approval_logger.DEBUG: Start sync metadata request
[2018-01-29 02:32:05] general_approval_logger.DEBUG: Start sync metadata request params:["mytopic-2","mytopic"]
[2018-01-29 02:32:05] general_approval_logger.DEBUG: Start Request ClientId: kafka-php ApiKey: MetadataRequest  ApiVersion: 0
[2018-01-29 02:32:06] general_approval_logger.DEBUG: Start Request ClientId: kafka-php ApiKey: GroupCoordinatorRequest  ApiVersion: 0
[2018-01-29 02:32:07] general_approval_logger.DEBUG: Start Request ClientId: kafka-php ApiKey: JoinGroupRequest  ApiVersion: 1
[2018-01-29 02:32:07] general_approval_logger.DEBUG: Join group start, params:{"group_id":"mygroupid","session_timeout":30000,"rebalance_timeout":30000,"member_id":"","data":[{"protocol_name":"range","version":0,"subscription":["mytopic-2","mytopic"],"user_data":""}]}
[2018-01-29 02:32:14] general_approval_logger.DEBUG: Join group sucess, params: {"errorCode":0,"generationId":156,"groupProtocol":"range","leaderId":"kafka-php-9e451968-46b0-4222-ad6f-1a6f381f774b","memberId":"kafka-php-9e451968-46b0-4222-ad6f-1a6f381f774b","members":[{"memberId":"kafka-php-9e451968-46b0-4222-ad6f-1a6f381f774b","memberMeta":{"version":0,"topics":["mytopic-2","mytopic"],"userData":""}}]}
[2018-01-29 02:32:15] general_approval_logger.DEBUG: Start Request ClientId: kafka-php ApiKey: SyncGroupRequest  ApiVersion: 0
[2018-01-29 02:32:15] general_approval_logger.DEBUG: Sync group start, params:{"group_id":"mygroupid","generation_id":156,"member_id":"kafka-php-9e451968-46b0-4222-ad6f-1a6f381f774b","data":[{"version":0,"member_id":"kafka-php-9e451968-46b0-4222-ad6f-1a6f381f774b","assignments":{"mytopic-2":{"topic_name":"mytopic-2","partitions":[2,1,0]},"mytopic":{"topic_name":"mytopic","partitions":[2,1,0]}}}]}
[2018-01-29 02:32:15] general_approval_logger.DEBUG: Sync group sucess, params: {"errorCode":0,"partitionAssignments":[{"topicName":"mytopic-2","partitions":[2,1,0]},{"topicName":"mytopic","partitions":[2,1,0]}],"version":0,"userData":""}
[2018-01-29 02:32:16] general_approval_logger.DEBUG: Start Request ClientId: kafka-php ApiKey: OffsetRequest  ApiVersion: 0
[2018-01-29 02:32:16] general_approval_logger.DEBUG: Start Request ClientId: kafka-php ApiKey: OffsetRequest  ApiVersion: 0
[2018-01-29 02:32:16] general_approval_logger.DEBUG: Start Request ClientId: kafka-php ApiKey: OffsetRequest  ApiVersion: 0
[2018-01-29 02:32:16] general_approval_logger.DEBUG: Start Request ClientId: kafka-php ApiKey: HeartbeatRequest  ApiVersion: 0
[2018-01-29 02:32:17] general_approval_logger.DEBUG: Start Request ClientId: kafka-php ApiKey: OffsetFetchRequest  ApiVersion: 1
[2018-01-29 02:32:17] general_approval_logger.DEBUG: Start Request ClientId: kafka-php ApiKey: OffsetRequest  ApiVersion: 0
[2018-01-29 02:32:17] general_approval_logger.DEBUG: Start Request ClientId: kafka-php ApiKey: OffsetRequest  ApiVersion: 0
[2018-01-29 02:32:17] general_approval_logger.DEBUG: Start Request ClientId: kafka-php ApiKey: OffsetRequest  ApiVersion: 0
[2018-01-29 02:32:17] general_approval_logger.DEBUG: Start Request ClientId: kafka-php ApiKey: HeartbeatRequest  ApiVersion: 0
[2018-01-29 02:32:17] general_approval_logger.DEBUG: Get current fetch offset sucess, result: [{"topicName":"mytopic-2","partitions":[{"partition":0,"errorCode":0,"metadata":"","offset":695186},{"partition":1,"errorCode":0,"metadata":"","offset":691431},{"partition":2,"errorCode":0,"metadata":"","offset":696159}]},{"topicName":"mytopic","partitions":[{"partition":0,"errorCode":0,"metadata":"","offset":9649},{"partition":1,"errorCode":0,"metadata":"","offset":9631},{"partition":2,"errorCode":0,"metadata":"","offset":9749}]}]
[2018-01-29 02:32:18] general_approval_logger.DEBUG: Start Request ClientId: kafka-php ApiKey: OffsetRequest  ApiVersion: 0
[2018-01-29 02:32:18] general_approval_logger.DEBUG: Start Request ClientId: kafka-php ApiKey: OffsetRequest  ApiVersion: 0
[2018-01-29 02:32:18] general_approval_logger.DEBUG: Start Request ClientId: kafka-php ApiKey: OffsetRequest  ApiVersion: 0
[2018-01-29 02:32:18] general_approval_logger.DEBUG: Fetch message start, params:{"max_wait_time":100,"replica_id":-1,"min_bytes":"1000","data":[{"topic_name":"mytopic-2","partitions":[{"partition_id":2,"offset":696159,"max_bytes":65536}]},{"topic_name":"mytopic","partitions":[{"partition_id":1,"offset":9631,"max_bytes":65536}]}]}
[2018-01-29 02:32:18] general_approval_logger.DEBUG: Start Request ClientId: kafka-php ApiKey: FetchRequest  ApiVersion: 2
[2018-01-29 02:32:18] general_approval_logger.DEBUG: Fetch message start, params:{"max_wait_time":100,"replica_id":-1,"min_bytes":"1000","data":[{"topic_name":"mytopic-2","partitions":[{"partition_id":1,"offset":691431,"max_bytes":65536}]},{"topic_name":"mytopic","partitions":[{"partition_id":0,"offset":9649,"max_bytes":65536}]}]}
[2018-01-29 02:32:18] general_approval_logger.DEBUG: Start Request ClientId: kafka-php ApiKey: FetchRequest  ApiVersion: 2
[2018-01-29 02:32:18] general_approval_logger.DEBUG: Fetch message start, params:{"max_wait_time":100,"replica_id":-1,"min_bytes":"1000","data":[{"topic_name":"mytopic-2","partitions":[{"partition_id":0,"offset":695187,"max_bytes":65536}]},{"topic_name":"mytopic","partitions":[{"partition_id":2,"offset":9749,"max_bytes":65536}]}]}
[2018-01-29 02:32:18] general_approval_logger.DEBUG: Start Request ClientId: kafka-php ApiKey: FetchRequest  ApiVersion: 2
[2018-01-29 02:32:18] general_approval_logger.DEBUG: Start Request ClientId: kafka-php ApiKey: OffsetFetchRequest  ApiVersion: 1
[2018-01-29 02:32:18] general_approval_logger.DEBUG: Start Request ClientId: kafka-php ApiKey: HeartbeatRequest  ApiVersion: 0
[2018-01-29 02:32:18] general_approval_logger.DEBUG: Get current fetch offset sucess, result: [{"topicName":"mytopic-2","partitions":[{"partition":0,"errorCode":0,"metadata":"","offset":695186},{"partition":1,"errorCode":0,"metadata":"","offset":691431},{"partition":2,"errorCode":0,"metadata":"","offset":696159}]},{"topicName":"mytopic","partitions":[{"partition":0,"errorCode":0,"metadata":"","offset":9649},{"partition":1,"errorCode":0,"metadata":"","offset":9631},{"partition":2,"errorCode":0,"metadata":"","offset":9749}]}]
[2018-01-29 02:32:18] general_approval_logger.DEBUG: Fetch success, result:{"throttleTime":0,"topics":[{"topicName":"mytopic-2","partitions":[{"partition":0,"errorCode":0,"highwaterMarkOffset":695189,"messageSetSize":4822,"messages":[{"offset":695187,"size":2481,"message":{"crc":2855610672,"magic":1,"attr":0,"timestamp":1517164335013,"key":"","value":

@noname007
Contributor

Here the committed offset fetched for partition 0 of mytopic-2 is 695186:

[2018-01-29 02:32:17] general_approval_logger.DEBUG: Get current fetch offset sucess, result: [{"topicName":"mytopic-2","partitions":[{"partition":0,"errorCode":0,"metadata":"","offset":695186},{"partition":1,"errorCode":0,"metadata":"","offset":691431},{"partition":2,"errorCode":0,"metadata":"","offset":696159}]},{"topicName":"mytopic","partitions":[{"partition":0,"errorCode":0,"metadata":"","offset":9649},{"partition":1,"errorCode":0,"metadata":"","offset":9631},{"partition":2,"errorCode":0,"metadata":"","offset":9749}]}]

but the fetch request then used 695187:

[2018-01-29 02:32:18] general_approval_logger.DEBUG: Fetch message start, params:{"max_wait_time":100,"replica_id":-1,"min_bytes":"1000","data":[{"topic_name":"mytopic-2","partitions":[{"partition_id":0,"offset":695187,"max_bytes":65536}]},{"topic_name":"mytopic","partitions":[{"partition_id":2,"offset":9749,"max_bytes":65536}]}]}
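The one-message discrepancy shown above matches Kafka's offset convention: a committed offset is already the offset of the next message to consume, so adding 1 before fetching skips exactly one message per partition. A minimal arithmetic sketch, using the numbers from these logs:

```php
<?php
// Kafka convention: the committed offset is the NEXT offset to consume,
// so a restarted consumer should fetch starting at the committed offset
// itself, not at committed + 1.
$committed    = 695186;         // offset returned by OffsetFetchRequest
$correctFetch = $committed;      // 695186: first unread message
$buggyFetch   = $committed + 1;  // 695187: skips one message

echo ($buggyFetch - $correctFetch) . " message skipped per partition\n";
```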

@noname007
Contributor

noname007 commented Jan 29, 2018

I replaced the real topic name and group ID with fake values in the pasted logs.

@lcobucci
Contributor

@noname007 can you please send a PR with a failing functional test that reproduces this? The changes in this PR were causing the current tests to fail (incorrectly).

@noname007
Contributor

noname007 commented Jan 29, 2018

Hmm... how do I write one? That feels a bit hard for me...

@lcobucci
Contributor

lcobucci commented Jan 29, 2018

@noname007 it's not that hard, and we're here to help you (in the worst case you'll learn new things).

In https://github.com/weiboad/kafka-php/tree/be0c26199eb89f0f3a032b0fb271b1dd2656155c/tests/Functional you will find the tests I wrote (which are quite basic); you can use them as a base.

What you'll need to think about is how to reproduce that wrong behaviour (without changing anything in the src folder), for example:

  • how many messages do you need to produce?
  • do you need to stop the consumer and start it again?

The great thing about adding new functional tests is that they are executed against the most important Kafka versions (0.9+), so we'll be able to guarantee 100% that this bug never comes back.

Thanks for your contribution!

@lcobucci
Contributor

By the way, we're executing the tests using Docker. If you don't have it installed or don't have any experience with it, you can simply create the PR and push new commits to the branch; Travis will execute everything and validate your code (check .travis.yml if you want to run things on your own environment as well).

@noname007
Contributor

noname007 commented Jan 29, 2018

On my production machine I use the libuv extension and Kafka version 0.10.2.

Does the Docker setup use the libuv extension? On my own computer, which doesn't use libuv, I couldn't reproduce the problem.

@noname007
Contributor

Do I just add test code to test/? Is that all the work, or do I also need to modify travis.yml?

@lcobucci
Contributor

On my production machine I use the libuv extension and Kafka version 0.10.2.

Does the Docker setup use the libuv extension? On my own computer, which doesn't use libuv, I couldn't reproduce the problem.

@noname007 the Docker container doesn't use libuv, but that shouldn't necessarily be a problem (unless amphp doesn't cover things properly or we are using it in the wrong way). I'll check this...

Do I just add test code to test/? Is that all the work, or do I also need to modify travis.yml?

That's the idea: isolate the failure in a functional test (test/Functional) and send the PR =)

@noname007
Contributor

noname007 commented Jan 29, 2018

@lcobucci thanks for your help, I'll give it a try.

@lcobucci
Contributor

@noname007 just to give you an update, I've executed the current test suite using libuv and didn't have any issue (Kafka 1.0.0 and PHP 7.1.12).

@noname007
Contributor

Hi, I hit this problem again.

➜  ~ php -m
[PHP Modules]
bcmath
bz2
Core
ctype
curl
date
dom
exif
fileinfo
filter
gd
hash
iconv
json
libxml
mbstring
mcrypt
mysqlnd
openssl
pcre
PDO
pdo_mysql
pdo_sqlite
Phar
posix
Reflection
session
shmop
SimpleXML
sockets
SPL
sqlite3
standard
swoole
tokenizer
wddx
xml
xmlreader
xmlwriter
Zend OPcache
zip
zlib

[Zend Modules]
Zend OPcache

PHP 7.0.22 (cli) (built: Dec 10 2017 23:46:57) ( NTS )
Copyright (c) 1997-2017 The PHP Group
Zend Engine v3.0.0, Copyright (c) 1998-2017 Zend Technologies
    with Zend OPcache v7.0.22, Copyright (c) 1999-2017, by Zend Technologies



ev version 1.0.5, which I compiled manually.

@noname007
Contributor

noname007 commented Jan 30, 2018

I found that when I throw an exception inside the callback passed to Consumer::start, this problem appears...



        $config = ConsumerConfig::getInstance();
        $config->setClientId('xxxxxxx');
        $config->setTopics($topics);
        $config->setMetadataBrokerList($list);
        $config->setGroupId($group_id);
        $config->setOffsetReset('earliest');
        $config->setConsumeMode(ConsumerConfig::CONSUME_BEFORE_COMMIT_OFFSET);

        $consumer = new Consumer();

        /**
         * @var Logger $logger
         */
        $logger = app()->make(LoggerInterface::class);
        if ($logger->getHandlers()) {
            $logger->setHandlers([]);
        }

        $log_file_name = './' . date('Ymd') . '.log';

        $handler = (new \Monolog\Handler\StreamHandler($log_file_name))
            ->setFormatter(new \Monolog\Formatter\LineFormatter(null, null, true, true));

        $bh = new BufferHandler($handler, 100, LogLevel::DEBUG, true, true);

        $logger->pushHandler($bh);

        $consumer->setLogger($logger);

        echo '[start] at ' . date('Y-m-d H:i:s') . ' log at: ' . $log_file_name;
        $consumer->start(function ($topic, $part, $message) use ($logger, $bh) {
            // throwing from the consume callback triggers the problem
            throw new \RuntimeException('simulated consumer failure');
        });
