Ошибка сокета чтения тайм-аута для установки кластера kafka

Я хочу настроить тип кластера kafka для трех похожих приложений, имеющих одинаковые очереди, такие как AppA -> {TopicX, TopicY, TopicZ}, AppB -> {TopicX, TopicZ}, AppC -> {TopicX, TopicY}. Производитель и потребитель будут зависеть от приложения.
Я настроил кластер kafka с тремя брокерами, имеющими разделы 1,2,3 в трех разных конфигурационных файлах с разными портами. Затем запустите кафку сервер (кластер)

Я использую Kafka PHP обертку http://github.com/nmred/kafka-php

Таким образом, я использовал код производителя для приложения A, как

       $producer->setRequireAck(-1);
$producer->setMessages("TopicX", 0, array(json_encode($this->data)));
$producer->send();

И использовал код производителя для приложения B, как

       $producer->setRequireAck(-1);
$producer->setMessages("TopicX", 1, array(json_encode($this->data)));
$producer->send();

И так далее.

Затем я сделал свои потребительские скрипты для трех приложений, таких как

        $queues = array("TopicX", "TopicY", "TopicZ");
while(true) {
foreach($queues as $queue) {
$consumer = \Kafka\Consumer::getInstance('localhost:2181');
$consumer->setGroup('testgroup');
$consumer->setPartition($queue, 0);
$result = $consumer->fetch();
}
}

Но когда я пытаюсь выполнить потребительский скрипт для любого приложения, я получаю сообщение об ошибке вроде

«Тайм-аут чтения сокета при чтении 750437 байт с 750323 байт»

Я просто не знаю, как я могу решить эту проблему, я пытался изменить некоторые параметры конфигурации kafka, как

 zookeeper.connection.timeout.ms=24000         # Initially 6000
replica.socket.timeout.ms=15000                      # Not exists in default file

но это не сработало.

0

Решение

Вы фактически уничтожаете потребителя Kafka, объявляя его в цикле foreach, выводите его из цикла, используете цикл для установки разделов, а затем извлекаете результаты и различаете источники:

$queues = array("TopicX", "TopicY", "TopicZ");
$consumer = \Kafka\Consumer::getInstance('localhost:2181');
$consumer->setGroup('testgroup');

foreach($queues as $queue) {
$consumer->setPartition($queue, 0);
}

while(true) {
$result = $consumer->fetch();
foreach ($result as $topicName => $topic) {
foreach ($topic as $partId => $partition) {
var_dump($partition->getHighOffset());
foreach ($partition as $message) {
var_dump((string)$message);
}
}
}
}

Посмотрите github README проекта kafka php, чтобы увидеть пример.

1

Другие решения

Других решений пока нет …