Recently, the user logs in our project have grown to hundreds of millions of records. For convenience we had been storing them directly in MySQL, but new big-data requirements are now pushing us to migrate these logs to Kafka.
Installation
Development Environment Considerations
- Windows for local development
- Non-compiled installation for testing environment
- Production environment maintained by Ops team
Linux Installation
- Ensure pecl is installed: pecl help version
- Install via pecl: sudo pecl install rdkafka
Compile from Source
- Install dependencies (requires librdkafka):
  wget -c https://github.com/edenhill/librdkafka/archive/v0.11.0.tar.gz
  tar xvzf v0.11.0.tar.gz
  cd librdkafka-0.11.0/
  ./configure
  make
  sudo make install
- Install the php-rdkafka extension:
  wget -c https://github.com/arnaud-lb/php-rdkafka/archive/3.0.4.tar.gz
  tar xvzf 3.0.4.tar.gz
  cd php-rdkafka-3.0.4
  /usr/bin/phpize
  ./configure --with-php-config=/www/server/php/71/bin/php-config --with-rdkafka
  make all -j 5
  sudo make install
- Add to php.ini:
  extension = rdkafka.so
Reference: PHP Kafka Extension Installation
Windows Installation
- Download precompiled DLLs matching your PHP version (NTS/TS, x86/x64) from:
  https://windows.php.net/downloads/pecl/releases/rdkafka/
- Place files:
  librdkafka.dll → PHP root directory
  php_rdkafka.dll → PHP ext directory
- Add to php.ini:
  extension=php_rdkafka.dll
- Verify installation:
  php -m | findstr rdkafka
Check Extension Version
php --ri rdkafka
/* Output:
rdkafka
rdkafka support => enabled
version => 3.0.1
librdkafka version (runtime) => 0.9.4
librdkafka version (build) => 0.9.4.0
*/
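The same check can also be done from inside a PHP script, which is handy when the CLI and the web SAPI load different php.ini files. A minimal sketch (the script name is just an example):

<?php
// check_rdkafka.php - quick sanity check that the extension is loaded in this SAPI
if (!extension_loaded('rdkafka')) {
    exit("rdkafka extension is not loaded\n");
}

// phpversion('rdkafka') returns the extension version string, e.g. "3.0.4"
echo 'php-rdkafka version: ', phpversion('rdkafka'), PHP_EOL;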
Basic Usage
Producer Example
<?php
// producer.php
$configBrokers = '127.0.0.1:9092';
$configTopic = 'test';

$conf = new \RdKafka\Conf();
// Delivery report callback: fired once per message after the broker acknowledges it
$conf->setDrMsgCb(function ($kafka, $message) {
    var_dump('Message delivered:', $message);
});
// Global error callback (broker down, all brokers unreachable, etc.)
$conf->setErrorCb(function ($kafka, $err, $reason) {
    echo "Error: $err ($reason)\n";
});

$producer = new \RdKafka\Producer($conf);
$producer->addBrokers($configBrokers);
$topic = $producer->newTopic($configTopic);

for ($i = 0; $i < 100; $i++) {
    // RD_KAFKA_PARTITION_UA lets librdkafka choose the partition
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Log entry: $i");
}

// Drain the internal queue so callbacks run and nothing is lost on exit
while ($producer->getOutQLen() > 0) {
    $producer->poll(1);
}
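Since the goal here is shipping user logs, in practice each entry would likely be produced as JSON, and passing a message key keeps all events for one user on the same partition so they stay ordered. A minimal sketch building on the producer above; the 'user_logs' topic name and the $log fields are assumptions for illustration:

<?php
// produce_user_log.php - hedged sketch, not the project's actual schema
$conf = new \RdKafka\Conf();
$producer = new \RdKafka\Producer($conf);
$producer->addBrokers('127.0.0.1:9092');
$topic = $producer->newTopic('user_logs');

$log = [
    'user_id' => 42,
    'action'  => 'login',
    'ts'      => time(),
];

// The fourth argument is the message key; identical keys map to the same partition
$topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($log), (string) $log['user_id']);

while ($producer->getOutQLen() > 0) {
    $producer->poll(1);
}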
Consumer Example
<?php
// consumer.php
$configBrokers = '127.0.0.1:9092';
$configTopic = 'test';

$consumer = new \RdKafka\Consumer();
$consumer->addBrokers($configBrokers);

$topic = $consumer->newTopic($configTopic);
// Start consuming partition 0 from the end of the log (new messages only)
$topic->consumeStart(0, RD_KAFKA_OFFSET_END);

while (true) {
    // Wait up to 1000 ms for a message from partition 0
    $message = $topic->consume(0, 1000);
    if (null === $message) {
        // Some php-rdkafka versions return null on timeout instead of a message object
        continue;
    }
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            echo "Received: {$message->payload}\n";
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timeout\n";
            break;
        default:
            echo "Error: {$message->err}\n";
            break;
    }
}
Key Notes:
- Producers work asynchronously; use poll() to ensure message delivery
- Consumers can start from RD_KAFKA_OFFSET_END to read new messages only; for resumable consumption with committed offsets, see the consumer-group sketch below
- Always handle consumer timeouts and partition EOF conditions
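For a long-running log consumer it is often more convenient to let Kafka track progress through a consumer group instead of hard-coding partition 0 and RD_KAFKA_OFFSET_END. Below is a sketch using the high-level RdKafka\KafkaConsumer (available in php-rdkafka 3.x built against librdkafka >= 0.9); the group id and topic name are assumptions:

<?php
// group_consumer.php - hedged sketch of the high-level consumer with a consumer group
$conf = new \RdKafka\Conf();
$conf->set('group.id', 'user-log-consumers');        // assumed group id
$conf->set('metadata.broker.list', '127.0.0.1:9092');

// Where to start when the group has no committed offset yet
$topicConf = new \RdKafka\TopicConf();
$topicConf->set('auto.offset.reset', 'smallest');
$conf->setDefaultTopicConf($topicConf);

$consumer = new \RdKafka\KafkaConsumer($conf);
$consumer->subscribe(['test']);

while (true) {
    $message = $consumer->consume(120 * 1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            echo "Received: {$message->payload}\n";
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            // Nothing new yet, keep polling
            break;
        default:
            echo "Error: {$message->errstr()}\n";
            break;
    }
}

Offsets are committed automatically by default, so a restarted consumer in the same group resumes roughly where it left off rather than re-reading or skipping the log.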