Install php-rdkafka Extension and Use Kafka for Logging

A Preliminary Understanding of Using Message Queues

Recently, the number of user log records in our project reached the hundreds of millions. For convenience, we had been storing them directly in MySQL. Big data requirements are now pushing us to migrate these logs to Kafka.


Installation

Development Environment Considerations

  • Windows for local development
  • Non-compiled installation for testing environment
  • Production environment maintained by Ops team

Linux Installation

  1. Ensure pecl is installed:
    pecl help version
    
  2. Install via pecl (the build needs librdkafka and its headers, so install librdkafka first):
    sudo pecl install rdkafka
    

Compile from Source

  1. Install dependencies (requires librdkafka):

    wget -c https://github.com/edenhill/librdkafka/archive/v0.11.0.tar.gz
    tar xvzf v0.11.0.tar.gz
    cd librdkafka-0.11.0/
    ./configure
    make 
    sudo make install
    
  2. Install php-rdkafka extension:

    wget -c https://github.com/arnaud-lb/php-rdkafka/archive/3.0.4.tar.gz
    tar xvzf 3.0.4.tar.gz
    cd php-rdkafka-3.0.4
    /www/server/php/71/bin/phpize
    ./configure --with-php-config=/www/server/php/71/bin/php-config --with-rdkafka
    make all -j 5
    sudo make install
    
  3. Add to php.ini:

    extension = rdkafka.so
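
If it is unclear which php.ini is actually being read, PHP can report it. The following is a minimal sketch that uses only core PHP functions; the helper name `iniLocations` is made up for illustration.

```php
<?php
// Hypothetical helper: report where PHP looks for its configuration and extensions.
function iniLocations(): array
{
    return [
        // Path of the loaded php.ini, or false when no php.ini was loaded.
        'php_ini'       => php_ini_loaded_file(),
        // Directory in which PHP looks for extension files such as rdkafka.so.
        'extension_dir' => ini_get('extension_dir'),
    ];
}

print_r(iniLocations());
```

The CLI and the web server (for example PHP-FPM) often load different php.ini files, so it may be worth running this in both contexts before editing.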
    

Reference: PHP Kafka Extension Installation

Windows Installation

  1. Download precompiled DLLs matching your PHP version (NTS/TS, x86/x64) from:
    https://windows.php.net/downloads/pecl/releases/rdkafka/

  2. Place files:

    • librdkafka.dll → PHP root directory
    • php_rdkafka.dll → PHP ext directory

  3. Add to php.ini:

    extension=php_rdkafka.dll
    
  4. Verify installation:

    php -m | findstr rdkafka
    

Check Extension Version

php --ri rdkafka

/* Output:
rdkafka
rdkafka support => enabled
version => 3.0.1
librdkafka version (runtime) => 0.9.4
librdkafka version (build) => 0.9.4.0
*/
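
The same check can be made from inside a script, which helps when the web server and the CLI use different configurations. This is a small sketch built on the core functions `extension_loaded()` and `phpversion()`; the helper name `rdkafkaStatus` is an assumption, not part of the extension.

```php
<?php
// Hypothetical helper: summarize whether the rdkafka extension is usable.
function rdkafkaStatus(): array
{
    $loaded = extension_loaded('rdkafka');

    return [
        'loaded'  => $loaded,
        // Only ask for the version when the extension is present.
        'version' => $loaded ? phpversion('rdkafka') : null,
    ];
}

$status = rdkafkaStatus();
echo $status['loaded']
    ? "rdkafka {$status['version']} is loaded\n"
    : "rdkafka is NOT loaded; check php.ini and the extension directory\n";
```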

Basic Usage

Producer Example

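<?php
// producer.php
$configBrokers = '127.0.0.1:9092';
$configTopic = 'test';

$conf = new \RdKafka\Conf();

// Delivery report callback: invoked from poll() once per produced message.
$conf->setDrMsgCb(function ($kafka, $message) {
    if ($message->err) {
        echo 'Delivery failed: ', rd_kafka_err2str($message->err), "\n";
    } else {
        var_dump('Message delivered:', $message);
    }
});

// Error callback: $err is a librdkafka error code, $reason a readable description.
$conf->setErrorCb(function ($kafka, $err, $reason) {
    echo 'Error: ', rd_kafka_err2str($err), " ($reason)\n";
});

$producer = new \RdKafka\Producer($conf);
$producer->addBrokers($configBrokers);

$topic = $producer->newTopic($configTopic);

for ($i = 0; $i < 100; $i++) {
    // RD_KAFKA_PARTITION_UA lets librdkafka choose the partition; 0 means no message flags.
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Log entry: $i");
    $producer->poll(0); // serve queued callbacks without blocking
}

// produce() only enqueues; keep polling until the outgoing queue is empty.
while ($producer->getOutQLen() > 0) {
    $producer->poll(1);
}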
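
Real log entries usually carry more than a plain string. The sketch below shows one possible way to serialize an entry as JSON before handing it to produce(). The field names (`user_id`, `action`, `context`, `ts`) and the helper `buildLogPayload` are purely illustrative assumptions, not a format this project prescribes.

```php
<?php
// Hypothetical log-entry layout; the field names are illustrative only.
function buildLogPayload(int $userId, string $action, array $context = [], ?int $timestamp = null): string
{
    $entry = [
        'user_id' => $userId,
        'action'  => $action,
        'context' => $context,
        'ts'      => $timestamp ?? time(),
    ];

    $json = json_encode($entry, JSON_UNESCAPED_UNICODE);
    if ($json === false) {
        // json_encode() returns false on failure, e.g. for invalid UTF-8 input.
        throw new RuntimeException('Cannot encode log entry: ' . json_last_error_msg());
    }

    return $json;
}

$payload = buildLogPayload(42, 'login', ['ip' => '203.0.113.7'], 1700000000);
echo $payload, "\n";

// With the producer from the example above, the payload could be sent as:
// $topic->produce(RD_KAFKA_PARTITION_UA, 0, $payload, (string) 42);
```

Passing the user ID as the optional fourth argument (the message key) should let the partitioner keep one user's entries together, though that depends on the partitioner configured in librdkafka.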

Consumer Example

<?php
// consumer.php
$configBrokers = '127.0.0.1:9092';
$configTopic = 'test';

$consumer = new \RdKafka\Consumer();
$consumer->addBrokers($configBrokers);
$topic = $consumer->newTopic($configTopic);

// Read partition 0, starting at the end of the log (new messages only).
$topic->consumeStart(0, RD_KAFKA_OFFSET_END);

while (true) {
    // Wait up to 1000 ms for a message on partition 0.
    $message = $topic->consume(0, 1000);

    // consume() may return null when nothing arrived within the timeout.
    if ($message === null) {
        echo "Timeout\n";
        continue;
    }

    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            echo "Received: {$message->payload}\n";
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timeout\n";
            break;
        default:
            echo "Error: {$message->errstr()}\n";
            break;
    }
}
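
The branching in the consumer loop can also be pulled into a small function, which makes it testable without a broker. This is only a sketch: `classifyMessage` and its return values are invented for illustration, and the numeric fallbacks mirror librdkafka's error codes so the snippet also runs where the extension is not loaded.

```php
<?php
// Fallback values so the sketch runs without the extension;
// with rdkafka loaded, the extension's own constants are used.
defined('RD_KAFKA_RESP_ERR_NO_ERROR')       || define('RD_KAFKA_RESP_ERR_NO_ERROR', 0);
defined('RD_KAFKA_RESP_ERR__PARTITION_EOF') || define('RD_KAFKA_RESP_ERR__PARTITION_EOF', -191);
defined('RD_KAFKA_RESP_ERR__TIMED_OUT')     || define('RD_KAFKA_RESP_ERR__TIMED_OUT', -185);

// Hypothetical helper: map a consume() result to one of
// 'process', 'idle' (nothing to do right now) or 'fail'.
function classifyMessage($message): string
{
    if ($message === null) {
        return 'idle'; // consume() returned nothing within the timeout
    }

    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            return 'process';
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            return 'idle';
        default:
            return 'fail';
    }
}

// Stand-in object; real code would pass the result of $topic->consume().
$ok = new stdClass();
$ok->err = RD_KAFKA_RESP_ERR_NO_ERROR;
echo classifyMessage($ok), "\n";
echo classifyMessage(null), "\n";
```

Inside the loop, the caller would then process the payload on 'process', continue on 'idle', and log or abort on 'fail'.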

Key Notes:

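  1. Producers work asynchronously: produce() only enqueues a message, so keep calling poll() until getOutQLen() reaches 0 before the script exits, or queued messages may be lost
  2. Consumers can start from RD_KAFKA_OFFSET_END to read new messages only; use RD_KAFKA_OFFSET_BEGINNING to replay the partition from the start
  3. Always handle consumer timeouts and partition EOF conditions, and check for a null return from consume() before reading $message->err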
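
One caveat with a plain poll loop is that it never ends if the broker is unreachable. The sketch below bounds the wait with a deadline. `drainQueue` and `FakeProducer` are hypothetical names introduced here; the only calls assumed on the real producer are getOutQLen() and poll(), which the producer example already uses.

```php
<?php
// Hypothetical helper: poll until the producer's outgoing queue is empty or
// $maxWaitMs has elapsed. Returns true when the queue emptied in time.
// $producer only needs getOutQLen() and poll(), as \RdKafka\Producer provides.
function drainQueue($producer, int $maxWaitMs = 10000, int $pollMs = 50): bool
{
    $deadline = microtime(true) + $maxWaitMs / 1000;

    while ($producer->getOutQLen() > 0) {
        if (microtime(true) >= $deadline) {
            return false; // give up; messages are still queued
        }
        $producer->poll($pollMs); // serves delivery and error callbacks
    }

    return true;
}

// Stand-in producer so the sketch runs without a broker:
// each poll() pretends that one queued message was delivered.
class FakeProducer
{
    private $queued;

    public function __construct(int $queued)
    {
        $this->queued = $queued;
    }

    public function getOutQLen(): int
    {
        return $this->queued;
    }

    public function poll(int $timeoutMs): int
    {
        if ($this->queued > 0) {
            $this->queued--;
        }
        return 1;
    }
}

var_dump(drainQueue(new FakeProducer(3)));
```

A false return is the caller's cue to log the failure or retry later rather than exit silently with messages still in the queue.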