摘要:值得一提的是目前的服务即服务,暂没有其他的服务功能,所以基本上相关的配置指代的就是。会将请求传递给各个中间件,最终最终传递给处理。源码剖析系列目录
作者:bromine
链接:https://www.jianshu.com/p/411...
來源:简书
著作权归作者所有,本文已获得作者授权转载,并对原文进行了重新的排版。
Swoft Github: https://github.com/swoft-clou...
Swoft提供了一个自建RPC(远程方法调用)实现,让你可以方便的调用其他Swoft上的服务。
RPC服务端的初始化RPC有两种启动方式Http伴随启动和RPC多带带启动。值得一提的是目前swoole的tcp服务即RPC服务,暂没有其他的tcp服务功能,所以基本上tcp相关的配置指代的就是RPC。
Http伴随启动Swoft 的 RPC 服务在Http服务启动时候伴随启动
//SwoftHttpServerHttpHttpServer.php
/**
* Http Server
*/
class HttpServer extends AbstractServer
/**
* Start Server
*
* @throws SwoftExceptionRuntimeException
*/
public function start()
{
//code ...
//根据.env配置文件Server区段的TCPABLE字段决定是否启动RPC服务
if ((int)$this->serverSetting["tcpable"] === 1) {
$this->registerRpcEvent();
}
//code ....
}
}
Swoole监听
初始化流程即根据相关注解注册一个swoole监听
//SwoftHttpServerHttpHttpServer.php
/**
* Register rpc event, swoft/rpc-server required
*
* @throws SwoftExceptionRuntimeException
*/
protected function registerRpcEvent()
{
//含有@SwooleListener且type为SwooleEvent::TYPE_PORT的Bean,即RpcEventListener
$swooleListeners = SwooleListenerCollector::getCollector();
if (!isset($swooleListeners[SwooleEvent::TYPE_PORT][0]) || empty($swooleListeners[SwooleEvent::TYPE_PORT][0])) {
throw new RuntimeException("Please use swoft/rpc-server, run "composer require swoft/rpc-server"");
}
//添加swoole RPC相关的tcp监听端口,使用的是.env文件中的TCP区段配置
$this->listen = $this->server->listen($this->tcpSetting["host"], $this->tcpSetting["port"], $this->tcpSetting["type"]);
$tcpSetting = $this->getListenTcpSetting();
$this->listen->set($tcpSetting);
//根据RpcEventListener的相关注解添加监听处理句柄
$swooleRpcPortEvents = $swooleListeners[SwooleEvent::TYPE_PORT][0];
$this->registerSwooleEvents($this->listen, $swooleRpcPortEvents);
}
由于是初版,根据@SwooleListener获取RPC监听Bean的相关处理暂时还有点生硬。
目前swoft中type为SwooleEvent::TYPE_PORT的@SwooleListener只有RpcEventListener一个,如果添加了同类Bean容易出问题,稳定版出的时候应该会有相关优化。
入口从SwoftHttpServerCommandServerCommand换成SwoftRpcServerCommandRpcCommand,流程和Http大同小异,就是swoole的设定监听,仅仅是少了HTTP相关的监听接口和事件而已,此处不再赘述。
RPC请求处理RPC服务器和HTTP服务器的区别仅仅在于与客户端交互报文格式和报文所在的网络层(Swoft的RPC基于TCP层次),运行原理基本相通,都是路由,中间件,RPC Service(对应Http的Controller),你完全可以以Http服务的思路去理解他。
Swoole的RPC-TCP监听设置好后,RPC服务端就可以开始接受请求了。RpcEventListener的负责的工作仅仅是把收到的数据转发给SwoftRpcServerServiceDispatcher分发。Dispatcher会将请求传递给各个Middleware中间件,最终最终传递给HandlerAdapterMiddleware处理。
PackerMiddlewarePackerMiddleware 是RPC中比较重要的一个中间件,负责将TCP请求中数据流解包和数据流封包。
getAttribute(self::ATTRIBUTE_DATA);
$data = $packer->unpack($data);
// 触发一个RpcServerEvent::BEFORE_RECEIVE事件,默认只有一个用于添加请求上下文信息的BeforeReceiveListener
// 利用中间件触发流程关键事件的做法耦合有点高,猜测以后会调整
App::trigger(RpcServerEvent::BEFORE_RECEIVE, null, $data);
//替换解包后的解包到Request中,提供给后续中间件和Handler使用
$request = $request->withAttribute(self::ATTRIBUTE_DATA, $data);
/* @var SwoftRpcServerRpcResponse $response */
$response = $handler->handle($request);
//为Response封包返回给RPC客户端
$serviceResult = $response->getAttribute(HandlerAdapter::ATTRIBUTE);
$serviceResult = $packer->pack($serviceResult);
return $response->withAttribute(HandlerAdapter::ATTRIBUTE, $serviceResult);
}
}
RouterMiddleware
RouterMiddleware负责根据RPC请求的method,version,interface 获取处理的RPC服务类,充当了路由的作用
getAttribute(PackerMiddleware::ATTRIBUTE_DATA);
$method = $data["method"]??"";
$version = $data["version"]??"";
$interface = $data["interface"]??"";
/* @var SwoftRpcServerRouterHandlerMapping $serviceRouter */
$serviceRouter = App::getBean("serviceRouter");
//路由匹配,即向SwoftRpcServerRouterHandlerMapping->$routes获取RPC服务信息
$serviceHandler = $serviceRouter->getHandler($interface, $version, $method);
// deliver service data
$request = $request->withAttribute(self::ATTRIBUTE, $serviceHandler);
return $handler->handle($request);
}
}
Swoft启动阶段会扫描并初始化注解信息(参考注解章节),注解初始化完毕后会触发一个AppEvent::APPLICATION_LOADER事件,此时会将来自@Service的所有RPC的路由信息注册到SwoftRpcServerRouterHandlerMapping->$routes中,用于serviceRouter Bean的路由匹配。
HandlerAdapterMiddlewareHandlerAdapterMiddleware最终转发请求给HandlerAdapter处理,HandlerAdapter会使用刚刚RouterMiddleware匹配到的服务类信息转发请求并封装Response最终返回给ServiceDispatcher,ServiceDispatcher会返回TCP流给客户端然后结束本次请求。
getAttribute(PackerMiddleware::ATTRIBUTE_DATA);
$params = $data["params"] ?? [];
//路由解析出来的,处理该请求的服务Bean和方法
list($serviceClass, $method) = $handler;
$service = App::getBean($serviceClass);
// execute handler with params
$response = PhpHelper::call([$service, $method], $params);
$response = ResponseHelper::formatData($response);
// 构造Response返回客户端
if (! $response instanceof Response) {
$response = (new Response())->withAttribute(self::ATTRIBUTE, $response);
}
return $response;
}
}
RPC客户端的实现
在Bean的属性中声明@Reference,Swoft即会根据@var声明的类型注入相应的RPC客户端实例。
/** * @Reference(name="useraaaaaa") * * @var DemoInterface */ private $demoService;
依赖注入的实现会专门另外用一篇文章多带带解释,这里先看看RPC客户端的相关代码。
远程代理namespace SwoftRpcClientService;
/**
* The proxy of service
*/
class ServiceProxy
{
/**
* @param string $className
* @param string $interfaceClass
*/
public static function loadProxyClass(string $className, string $interfaceClass)
{
$reflectionClass = new ReflectionClass($interfaceClass);
$reflectionMethods = $reflectionClass->getMethods(ReflectionMethod::IS_PUBLIC);
$template = "class $className extends SwoftRpcClientService implements {$interfaceClass} {";
//SwoftRpcClientService::class
// the template of methods
$template .= self::getMethodsTemplate($reflectionMethods);
$template .= "}";
eval($template);
}
//code ...
}
和AOP一样,原理一样是使用了动态代理,更具体的说法是动态远程代理。
RPC动态客户端类实现了客户端声明的Interface类型(如DemoInterface)并继承了SwoftRpcClientService类。
动态类的实现很简单,对于接口显式声明的方法,实际上都是调用SwoftRpcClientService->call()方法。
interface DemoInterface
{
/**
* @param array $ids
* @return array
*/
public function getUsers(array $ids);
}
class 动态生成RPC客户端类 extends SwoftRpcClientService implements AppLibDemoInterface {
public function getUsers ( array $ids ) {
$params = func_get_args();
return $this->call("getUsers", $params);
}
//code ...
}
对于自动生成的defer方法,则是通过魔术方法__call(),调用SwoftRpcClientService->deferCall()
/**
* @param string $name
* @param array $arguments
*
* @return ResultInterface
* @throws RpcClientException
*/
function __call(string $name, array $arguments)
{
$method = $name;
$prefix = self::DEFER_PREFIX;//"defer"
if (strpos($name, $prefix) !== 0) {
throw new RpcClientException(sprintf("the method of %s is not exist! ", $name));
}
if ($name == $prefix) {
$method = array_shift($arguments);
} elseif (strpos($name, $prefix) === 0) {
$method = lcfirst(ltrim($name, $prefix));
}
return $this->deferCall($method, $arguments);
}
我们这里只看具有代表性的call()方法,deferCall()大致相同。
RPC客户端动态类的本质是将客户端的参数和接口信息根据Swoft自己的格式传递给RPC服务端,然后将服务器返回的数据解包取出返回值返回给RPC的调用者,对外伪装成一个普通的对象,屏蔽远程调用操作。
// SwoftRpcClientService.php
/**
* Do call service
*
* @param string $func
* @param array $params
*
* @throws Throwable
* @return mixed
*/
public function call(string $func, array $params)
{
$profileKey = $this->interface . "->" . $func;
//根据@reference的fallback属性获取降级处理句柄,在RPC服务调用失败的时候可以会使用fallback句柄代替
$fallback = $this->getFallbackHandler($func);
try {
$connectPool = $this->getPool();
$circuitBreaker = $this->getBreaker();
/* @var $client AbstractServiceConnection */
$client = $connectPool->getConnection();
//数据封包,和RPC服务端一致
$packer = service_packer();
$type = $this->getPackerName();
$data = $packer->formatData($this->interface, $this->version, $func, $params);
$packData = $packer->pack($data, $type);
//通过熔断器调用接口
$result = $circuitBreaker->call([$client, "send"], [$packData], $fallback);
if ($result === null || $result === false) {
return null;
}
//和defercall不一致这里直接收包,解包
App::profileStart($profileKey);
$result = $client->recv();
App::profileEnd($profileKey);
$connectPool->release($client);
App::debug(sprintf("%s call %s success, data=%", $this->interface, $func, json_encode($data, JSON_UNESCAPED_UNICODE)));
$result = $packer->unpack($result);
$data = $packer->checkData($result);
} catch (Throwable $throwable) {
if (empty($fallback)) {
throw $throwable;
}
//RPC调用失败则调用降级句柄,代替实际RPC服务直接返回
$data = PhpHelper::call($fallback, $params);
}
return $data;
}
熔断器
熔断器的swoft-RPC的另一重要概念,RPC的所有请求都通过熔断器发送。
熔断器使用状态模式实现,熔断器有开启,半开,关闭 3种状态,不同状态下熔断器会持有不同的状态实例,状态根据RPC调用情况切换,熔断器根据持有状态实例的不同,行为也有所不同。
circuitBreaker->serviceName . "服务,连接建立失败(null)");
}
if ($class instanceof Client && $class->isConnected() == false) {
throw new Exception($this->circuitBreaker->serviceName . "服务,当前连接已断开");
}
//调用swoole协程客户端的send()方法发送数据
$data = $class->$method(...$params);
} catch (Exception $e) {
//递增失败计数
if ($this->circuitBreaker->isClose()) {
$this->circuitBreaker->incFailCount();
}
App::error($this->circuitBreaker->serviceName . "服务,当前[关闭状态],服务端调用失败,开始服务降级容错处理,error=" . $e->getMessage());
//RPC调用失败则使用降级接口
$data = $this->circuitBreaker->fallback($fallback);
}
//失败次数过线则切换状态
$failCount = $this->circuitBreaker->getFailCounter();
$switchToFailCount = $this->circuitBreaker->getSwitchToFailCount();
if ($failCount >= $switchToFailCount && $this->circuitBreaker->isClose()) {
App::trace($this->circuitBreaker->serviceName . "服务,当前[关闭状态],服务失败次数达到上限,开始切换为开启状态,failCount=" . $failCount);
$this->circuitBreaker->switchToOpenState();
}
App::trace($this->circuitBreaker->serviceName . "服务,当前[关闭状态],failCount=" . $this->circuitBreaker->getFailCounter());
return $data;
}
}
熔断器开启状态策略
circuitBreaker->fallback();
App::trace($this->getServiceName() . "服务,当前[开启状态],执行服务fallback服务降级容错处理");
$nowTime = time();
if ($this->circuitBreaker->isOpen()
&& $nowTime > $this->circuitBreaker->getSwitchOpenToHalfOpenTime()
) {
$delayTime = $this->circuitBreaker->getDelaySwitchTimer();
// swoole定时器不是严格的,3s容错时间 ,定时切换状态的半开
$switchToHalfStateTime = $nowTime + ($delayTime / 1000) + 3;
App::getTimer()->addAfterTimer("openState", $delayTime, [$this, "delayCallback"]);
$this->circuitBreaker->setSwitchOpenToHalfOpenTime($switchToHalfStateTime);
App::trace($this->getServiceName() . "服务,当前[开启状态],创建延迟触发器,一段时间后状态切换为半开状态");
}
return $data;
}
}
熔断器半开状态策略
半开熔断器是熔断器关闭状态和熔断器开启状态的过度状态,半开熔断器的所有RPC调用都是加锁的,连续成功或者连续失败到阈值后会切换到关闭状态或者开启状态,代码类似,此处不再累述,有兴趣的读者可以自行研究。
Swoft源码剖析系列目录:https://segmentfault.com/a/11...
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/28694.html
摘要:和服务关系最密切的进程是中的进程组,绝大部分业务处理都在该进程中进行。随后触发一个事件各组件通过该事件进行配置文件加载路由注册。事件每个请求到来时仅仅会触发事件。服务器生命周期和服务基本一致,详情参考源码剖析功能实现 作者:bromine链接:https://www.jianshu.com/p/4c0...來源:简书著作权归作者所有,本文已获得作者授权转载,并对原文进行了重新的排版。S...
摘要:作者链接來源简书著作权归作者所有,本文已获得作者授权转载,并对原文进行了重新的排版。同时顺手整理个人对源码的相关理解,希望能够稍微填补学习领域的空白。系列文章只会节选关键代码辅以思路讲解,请自行配合源码阅读。 作者:bromine链接:https://www.jianshu.com/p/2f6...來源:简书著作权归作者所有,本文已获得作者授权转载,并对原文进行了重新的排版。Swoft...
摘要:作者链接來源简书著作权归作者所有,本文已获得作者授权转载,并对原文进行了重新的排版。前言为应用提供一个完整的容器作为依赖管理方案,是功能,模块等功能的实现基础。的依赖注入管理方案基于服务定位器。源码剖析系列目录 作者:bromine链接:https://www.jianshu.com/p/a23...來源:简书著作权归作者所有,本文已获得作者授权转载,并对原文进行了重新的排版。Swof...
摘要:作为定时任务的执行者,通过每唤醒自身一次,然后把执行表遍历一次,挑选当下需要执行的任务,通过投递出去并更新该任务执行表中的状态。 作者:bromine链接:https://www.jianshu.com/p/b44...來源:简书著作权归作者所有,本文已获得作者授权转载,并对原文进行了重新的排版。Swoft Github: https://github.com/swoft-clou.....
摘要:基于扩展实现真正的数据库连接池这种方案中,项目占用的连接数仅仅为。一种是连接暂时不再使用,其占用状态解除,可以从使用者手中交回到空闲队列中这种我们称为连接的归队。源码剖析系列目录 作者:bromine链接:https://www.jianshu.com/p/1a7...來源:简书著作权归作者所有,本文已获得作者授权转载,并对原文进行了重新的排版。Swoft Github: https:...
阅读 3203·2023-04-25 22:16
阅读 3031·2021-10-11 11:11
阅读 3388·2019-08-29 13:26
阅读 765·2019-08-29 12:32
阅读 3583·2019-08-26 11:49
阅读 3356·2019-08-26 10:30
阅读 2173·2019-08-23 17:59
阅读 1746·2019-08-23 17:57