Featured image of post 安装 php-rdkafka 扩展并使用 Kafka 记录日志

安装 php-rdkafka 扩展并使用 Kafka 记录日志

初步了解使用消息队列

最近项目的用户日志达到了上亿条,之前图方便,直接存储到MySQL,然后大数据的技术让我把这些日志都存储到Kafka


安装

  • 因为我的开发环境是Windows,测试环境用的不是编译安装,生产环境由运维负责维护

  • 得到你的PHP环境

  • Linux

  • 确保有pecl,运行下面的命令,没有报错那么就是已安装

    • pecl help version
    • 执行通过pecl安装
    • sudo pecl install rdkafka

  • 编译安装
    • php-rdkafka依赖php-rdkafka based on librdkafka
    • 找一个目录用于放扩展源码

参考(PHP 安装 Kafka 扩展)

## 前提条件,先确定好自己的环境目录
# phpize 目录
whereis phpize
### phpize: /www/server/php/71/bin/phpize
php -i | grep php.ini
### Configuration File (php.ini) Path => /www/server/php/71/etc
### Loaded Configuration File => /www/server/php/71/etc/php.ini
### 从上面输出找到 php-config 目录: /www/server/php/71/bin/php-config

# 先编译 librdkafka
wget -c https://github.com/edenhill/librdkafka/archive/v0.11.0.tar.gz
tar xvzf v0.11.0.tar.gz
cd librdkafka-0.11.0/
./configure
make 
sudo  make install

# 下载扩展源代码(需要确定 kafka 服务的版本以下载对应的版本)
wget -c https://github.com/arnaud-lb/php-rdkafka/archive/3.0.4.tar.gz
# 解压
tar xvzf 3.0.4.tar.gz
cd php-rdkafka-3.0.4
# 编译 (用前面找到的 phpize 目录)
/usr/bin/phpize
# 用前面找到的 php-config 配置
./configure --with-php-config=/www/server/php/71/bin/php-config --with-rdkafka 

# 安装
$ make all -j 5
$ make install

# 根据前面的 php.ini 路径写入
extension = rdkafka.so
  • Windows
php -v
PHP 7.1.9 (cli) (built: Aug 30 2017 18:33:57) ( NTS MSVC14 (Visual C++ 2015) x64 )
Copyright (c) 1997-2017 The PHP Group
Zend Engine v3.1.0, Copyright (c) 1998-2017 Zend Technologies
  • 去到这里下载对应的动态链接文件(PHP版本,X86,x64, NTS,TS都要对应上)
    https://windows.php.net/downloads/pecl/releases/rdkafka/
  • 解压之后找到两个文件librdkafka.dll, php_rdkafka.dll
  • librdkafka.dll丢进PHP安装根目录,php_rdkafka.dll丢进PHP安装目录下的ext
  • 然后在php.ini加入
php_rdkafka.dll
Warning: PHP Startup: Unable to load dynamic library 'C:\phpstudy_pro\Extensions\php\php7.1.9nts\ext\php_rdkafka.dll' - 找不到指定的模块。
 in Unknown on line 0
  • 一切正常的话,运行php -m你就能看到
D:\www\xxx>php -m
[PHP Modules]
bcmath
calendar
Core
ctype
curl
date
dom
fileinfo
filter
gd
hash
iconv
json
libxml
mbstring
mcrypt
mysqli
mysqlnd
openssl
pcre
PDO
pdo_mysql
pdo_sqlite
Phar
## 此项为 kafka
rdkafka
readline
redis
Reflection
session
SimpleXML
SPL
standard
tokenizer
wddx
xml
xmlreader
xmlwriter
zip
zlib

[Zend Modules]

通过一下命令查看扩展的版本

php --ri rdkafka

# rdkafka

# rdkafka support => enabled
# version => 3.0.1
# build date => May 19 2020 20:02:07
# librdkafka version (runtime) => 0.9.4
# librdkafka version (build) => 0.9.4.0

开始使用

####################################################
# 生产者 producer.php

$max = 100;
$configBrokers = '127.0.0.1:9092';
$configTopic = 'test';

$conf = new \RdKafka\Conf();
// 注册发送消息事件
$conf->setDrMsgCb(function ($kafka, $message) {

    var_dump('msg:', $kafka, $message);
});

// 注册错误发送的事件回调  
$conf->setErrorCb(function ($kafka, $err, $reason) {

    dump('error', $kafka, $err, $reason);
});

// 实例化生产者
$producer = new \RdKafka\Producer($conf);
$producer->addBrokers($configBrokers);

$configObj = new \RdKafka\TopicConf();
$topic = $producer->newTopic($configTopic, $configObj);

// 尝试发送几个消息, 这里注意,发送是异步的
for ($i = 0; $i < $max; ++ $i) {
    // RD_KAFKA_PARTITION_UA 让 kafka 自由选择分区
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "php send " . $i);
}

// 这里必须 poll 消息发送完毕
while (($len = $producer->getOutQLen()) > 0) {

    $producer->poll(1);
}


####################################################
# 消费者 consumer.php

$configBrokers = '127.0.0.1:9092';
$configTopic = 'test';

$consumer = new \RdKafka\Consumer();
$consumer->addBrokers($configBrokers);
$topic = $consumer->newTopic($configTopic);
// 从上一次记录的偏移量消费
$topic->consumeStart(0, RD_KAFKA_OFFSET_END);

while (true) {
    // 连接的超时时间, 如果常驻内存消费, 设置时间长点
    $message = $topic->consume(0, 60*60);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message, Carbon::now()->toDateTimeString(), 'count:' . $count);
        break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            var_dump("No more messages; will wait for more\n");
        break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            var_dump("Timed out\n");
        break;
        default:
            var_dump('error', $message);
        break;
    }
}