Найти последнее сохраненное зафиксированное смещение в низкоуровневом потребителе

Привет следующий код для моего потребителя,

    $conf->set('group.id','commonqueue2');
$conf->set('offset.store.method', 'broker');

$rk = new \RdKafka\Consumer($conf);
$rk->addBrokers($kafka_servers);

$topicConf = new \RdKafka\TopicConf();

$topicConf->set('auto.commit.interval.ms',1000);
$topicConf->set('offset.store.method', 'broker');
$topicConf->set('auto.offset.reset', 'smallest');

$topic = $rk->newTopic("registration17", $topicConf);

$topic->consumeStart(0,RD_KAFKA_OFFSET_STORED);

while (true) {

$message = $topic->consume(0,1000);

//store offset to broker
$topic->offsetStore($message->partition,$message->offset);

$message_offset = $message->offset;
echo $message_data = $message->payload;

}//end of while loop

В вышеупомянутой программе, как я могу найти последнее сохраненное смещение перед выполнением следующего кода

$topic->consumeStart(0,RD_KAFKA_OFFSET_STORED);

0

Решение

Есть 2 способа получить последние смещения:
1. От зоопарка Shell
Используя coomand get / consumer // offsets // num_of_partitions
Для помощи используйте эту ссылку: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper

2. Чтобы получить последнее смещение, мы даже можем использовать команду: bin / kafka-run-class.sh kafka.tools.ConsumerOffsetChecker —group —zookeeper: —topic

0

Другие решения

Других решений пока нет …