Channel通道,类似于go语言的chan,支持多生产者协程和多消费者协程,Swoole底层自动实现了协程的切换和调度
Channel实现原理
- 通道与PHP的Array类似,仅占用内存,没有其他额外的资源申请,所有操作均为内存操作,无IO消耗
- 底层使用PHP引用计数实现,无内存拷贝。即使是传递巨大字符串或数组也不会产生额外性能消耗
方法
- Channel->push :当队列中有其他协程正在等待pop数据时,自动按顺序唤醒一个消费者协程。当队列已满时自动yield让出控制器,等待其他协程消费数据
- Channel->pop:当队列为空时自动yield,等待其他协程生产数据。消费数据后,队列可写入新的数据,自动按顺序唤醒一个生产者协程
连接池
使用Coroutine\Channel
来实现MySQL连接池可以使用defer特性来实现资源的回收,同时可以被协程调度,而且使用channel->pop
方法的时候,可以设置超时,减少一系列的心智负担
代码实现
namespace SwExample;
class MysqlPool
{
private static $instance;
private $pool; //连接池容器,一个channel
private $config;
/**
* @param null $config
* @return MysqlPool
* @desc 获取连接池实例
*/
public static function getInstance($config = null)
{
if (empty(self::$instance)) {
if (empty($config)) {
throw new \RuntimeException("mysql config empty");
}
self::$instance = new static($config);
}
return self::$instance;
}
/**
* MysqlPool constructor.
*
* @param $config
* @desc 初始化,自动创建实例,需要放在workerstart中执行
*/
public function __construct($config)
{
if (empty($this->pool)) {
$this->config = $config;
$this->pool = new chan($config['pool_size']);
for ($i = 0; $i < $config['pool_size']; $i++) {
$mysql = new MySQL();
$res = $mysql->connect($config);
if ($res == false) {
//连接失败,抛异常
throw new \RuntimeException("failed to connect mysql server.");
} else {
//mysql连接存入channel
$this->put($mysql);
}
}
}
}
/**
* @param $mysql
* @desc 放入一个mysql连接入池
*/
public function put($mysql)
{
$this->pool->push($mysql);
}
/**
* @return mixed
* @desc 获取一个连接,当超时,返回一个异常
*/
public function get()
{
$mysql = $this->pool->pop($this->config['pool_get_timeout']);
if (false === $mysql) {
throw new \RuntimeException("get mysql timeout, all mysql connection is used");
}
return $mysql;
}
/**
* @return mixed
* @desc 获取当时连接池可用对象
*/
public function getLength()
{
return $this->pool->length();
}
}
使用
<?php
require '../vendor/autoload.php';
use SwExample\MysqlPool;
$config = [
'host' => '127.0.0.1', //数据库ip
'port' => 3306, //数据库端口
'user' => 'root', //数据库用户名
'password' => 'root', //数据库密码
'database' => 'wordpress', //默认数据库名
'timeout' => 0.5, //数据库连接超时时间
'charset' => 'utf8mb4', //默认字符集
'strict_type' => true, //ture,会自动表数字转为int类型
'pool_size' => '3', //连接池大小
'pool_get_timeout' => 0.5, //当在此时间内未获得到一个连接,会立即返回。(表示所有的连接都已在使用中)
];
//创建http server
$http = new Swoole\Http\Server("0.0.0.0", 9501);
$http->set([
//"daemonize" => true, // 常驻进程模式运行
"worker_num" => 1,
"log_level" => SWOOLE_LOG_ERROR,
]);
$http->on('WorkerStart', function ($serv, $worker_id) use ($config) {
//worker启动时,每个进程都初始化连接池,在onRequest中可以直接使用
try {
MysqlPool::getInstance($config);
} catch (\Exception $e) {
//初始化异常,关闭服务
echo $e->getMessage() . PHP_EOL;
$serv->shutdown();
} catch (\Throwable $throwable) {
//初始化异常,关闭服务
echo $throwable->getMessage() . PHP_EOL;
$serv->shutdown();
}
});
$http->on('request', function ($request, $response) {
//浏览器会自动发起这个请求 避免占用请求
if ($request->server['path_info'] == '/favicon.ico') {
$response->end('');
return;
}
//获取数据库
if ($request->server['path_info'] == '/list') {
go(function () use ($request, $response) {
//从池子中获取一个实例
try {
$pool = MysqlPool::getInstance();
$mysql = $pool->get();
defer(function () use ($mysql) {
//利用defer特性,可以达到协程执行完成,归还$mysql到连接池
//好处是 可能因为业务代码很长,导致乱用或者忘记把资源归还
MysqlPool::getInstance()->put($mysql);
echo "当前可用连接数:" . MysqlPool::getInstance()->getLength() . PHP_EOL;
});
$result = $mysql->query("select * from wp_users");
$response->end(json_encode($result));
} catch (\Exception $e) {
$response->end($e->getMessage());
}
});
return;
}
echo "get request:".date('Y-m-d H:i:s').PHP_EOL;
if ($request->server['path_info'] == '/timeout') {
go(function () use ($request, $response) {
//从池子中获取一个实例
try {
$pool = MysqlPool::getInstance();
$mysql = $pool->get();
defer(function () use ($mysql) {
//协程执行完成,归还$mysql到连接池
MysqlPool::getInstance()->put($mysql);
echo "当前可用连接数:" . MysqlPool::getInstance()->getLength() . PHP_EOL;
});
$result = $mysql->query("select * from wp_users");
\Swoole\Coroutine::sleep(10); //sleep 10秒,模拟耗时操作
$response->end(date('Y-m-d H:i:s').PHP_EOL.json_encode($result));
} catch (\Exception $e) {
$response->end($e->getMessage());
}
});
return;
}
});
$http->start();
访问http://127.0.0.1:9501/list
可以看到正常的结果输出
访问http://127.0.0.1:9501/timeout
演示连接池取和存的过程
模拟timeout, 需要浏览器打开4个tab页面,都请求http://127.0.0.1:9501/timeout
,前三个应该是等10秒出结果,第四个500ms后出超时结果
如果是chrome浏览器,会对完全一样的url做并发请求限制需要加一个随机数,例如http://127.0.0.1:9501/timeout?n=0
、http://127.0.0.1:9501/timeout?n=1
swoole协程序,是有点吊
😮 😮 😮 高能时刻 自动退出
@桑先生 信你个鬼,sw版本?