rpc-pigeon 中采用插件设计,插件之间低耦合设计。
pkg 目录下包含 core 和 plugins,core 包含插件中共用的组件服务,plugins 包含各个插件,如下:
pkg
├──core
│ └── log
└──plugins
├── httpserver
├── kvs
├── metrics
├── pool
├── proxy
└── plugin.go
- httpserver : api server 插件,提供 api服务;
- kvs : k8s 插件,提供自动虚拟化服务;
- metrics : 指标数据监控插件,提供指标数据采集、聚合服务;
- pool : gRPC、TCP 等连接池插件,提供资源连接管理等服务;
- proxy:gRPC、HTTP 方向代理插件,提供方向代理服务;
各插件通过注册方式进行运行,如:
var httpPlugin, gRPCPlugin proxy.Plugin gRPCPluginChan := registerProxy(gRPCPlugin.GRPCServer) // 注册 gRPC Server httpPluginChan := registerProxy(httpPlugin.HttpServer) // 注册 HTTP API Server var metricsPlugin metrics.Plugin metricsPluginChan := registerProxy(metricsPlugin.ShowMetrics) // 注册 Metrics 数据服务 <-gRPCPluginChan <-httpPluginChan <-metricsPluginChanPool 结构体定义
type Pool struct { clients chan *Client connCurrent int32 // 当前连接数 capacity int32 // 容量 size int32 // 容量大小 (动态变化) idleDur time.Duration // 空闲时间 maxLifeDur time.Duration // 最大连接时间 timeout time.Duration // Pool 的关闭超时时间 factor Factory // gRPC 工厂函数 lock sync.RWMutex // 读写锁 mode int // 连接池 模型 poolRemoteAddr string // 远程连接地址 }将连接维护在 Client 中的通道中,通道的 Size 设置成连接池的大小,处理完请求后进行回收等;
- STRICT_MODE: 在实际创建连接数达上限后,Pool 中没有连接时不会新建连接 ;
- LOOSE_MODE: 在实际创建连接数达上限后,Pool 中没有连接时会新建连接 ;
// grpc dial func grpcDial(address string) (*grpc.ClientConn, error) { ctx, ctxCancel := context.WithTimeout(context.Background(), DialTimeout) defer ctxCancel() gcc, err := grpc.DialContext(ctx, address, grpc.WithCodec(Codec()), grpc.WithInsecure(), grpc.WithBackoffMaxDelay(BackoffMaxDelay), grpc.WithInitialWindowSize(InitialWindowSize), grpc.WithInitialConnWindowSize(InitialConnWindowSize), grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(MaxSendMsgSize)), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(MaxRecvMsgSize)), grpc.WithKeepaliveParams(keepalive.ClientParameters{ Time: KeepAliveTime, Timeout: KeepAliveTimeout, PermitWithoutStream: true, }), ) if err != nil { logging.Log.Error("grpc dial failed !", err) } return gcc, err }通过 grpcDial 构建工厂方法,通过 pool.factor() 完成 gRPC 连接
// 从连接池取出一个连接 func (pool *Pool) Acquire(ctx context.Context) (*Client, error) { if pool.IsClose() { return nil, errors.New("Pool is closed") } var client *Client now := time.Now() select { case <-ctx.Done(): if pool.mode == STRICT_MODE { pool.lock.Lock() defer pool.lock.Unlock() var err error if pool.GetConnCurrent() >= int32(pool.capacity) { err = errors.New("Getting connection client timeout from pool") } else { client, err = pool.createClient() } return client, err } else if pool.mode == LOOSE_MODE { var err error if pool.GetConnCurrent() >= int32(pool.capacity) && pool.GetConnCurrent() <= 5*int32(pool.capacity) { client, err = pool.createClient() pool.clients <- client } return <-pool.clients, err } case client = <-pool.clients: if client != nil && pool.idleDur > 0 && client.timeUsed.Add(pool.idleDur).After(now) { client.timeUsed = now return client, nil } } // 如果连接已经是idle连接,或者是非严格模式下没有获取连接 // 则新建一个连接同时销毁原有idle连接 if client != nil { client.Destory() } client, err := pool.createClient() if err != nil { return nil, err } return client, nil }当 ctx.Done() 结束时(一般是客户端主动 Close 导致连接断开),异常进行回收处理
- pool.mode == STRICT_MODE (STRICT 模式),如果当前连接超过连接池范围则不会新建一个连接;
- pool.mode == LOOSE_MODE (LOOSE 模式),如果当前连接超过连接池范围则会新建一个连接;
代理基本原理:
- 基于TCP启动一个gRPC代理服务网关 ;
- 将gRPC请求的服务拦截到转发代理的一个函数中执行 ;
- 接收客户端的请求,处理业务指标后转发给服务端 ;
- 接收服务端的响应,处理业务指标后转发给客户端 ;
gRPC的客户端将所有的请求都发给 gRPC Server Proxy,这个代理网关实现请求转发,将gRPC Client的请求流转发到gRPC 服务实现的节点上。并将服务处理结果响应返回给客户端 。
在 gRPC 框架代码中的 HandleStream 存在两类服务,一类是已知服务 knownService, 第二类是 unknownService ;
已知服务 knownService 就是 gRPC 服务端代码注册到 gRPC 框架中的服务,叫做已知服务,其他没有注册的服务叫做未知服务,这个未知服务 unknownService 就是我们实现 gRPC 服务代码的关键所在;
要实现 gRPC 服务代理,我们在创建 gRPC 服务 grpc.NewServer 时,传递一个未知服务的 Handler,将未知服务的处理进行接管,然后通过注册的这个 Handler 实现 gRPC 代理转发的逻辑.
GrpcProxyTransport 是自定义的 StreamDirector,将 unknownService 的请求由 GrpcProxyTransport 层处理(这里有实现比较负责的连接池 + 负载均衡 的设计),提供代理和负载的能力;
基于如上描述,gRPC 代理的原理如下:
- 创建 gRPC 服务时,注册一个未知服务处理器 Handler 和一个自定义的编码 Codec 编码和解码,此处使用 proto 标准的 Codec;
- 这个 Handle 给业务方预留一个 director 的接口,用于代理重定向转发的 gRPC 连接获取,这样 proxy 就可以通过 redirector 得到 gRPCServer 的 gRPC 连接。
- Proxy 接收 gRPC 客户端的连接,并使用 gRPC 的 RecvMsg 方法,接收客户端的消息请求 ;
- Proxy 将接收到的 gRPC 客户端消息请求,通过 SendHeader 和 SendMsg 方法发送给 gRPC 服务端 ;
- 同样的方法,RecvMsg 接收 gRPC 服务端的响应消息,使用 SendMsg 发送给 gRPC 客户端。
至此 gRPC 代理服务就完成了消息的转发功能,限流,权限等功能可以通过转发的功能进行拦截处理 ; 如图所示 gRPC Proxy:
gRPC Proxy 流程图:
目前支持 gRPC 均衡策略是:
- 最少连接数 ;
- 随机 ;
// 负载均衡 func balance(pools []*Pool) *Pool { var index, sumSize ,size int if len(pools) == 1 { return pools[index] } // 最小连接数 for k, pool := range pools { pSize := pool.Size() sumSize += int(pool.Size()) if size < pSize { size = int(pSize) index = k } } // 随机 if sumSize == 0 { index = rand.Intn(len(pools)) } return pools[index] }提供 HTTP 方向代理能力,提供路由热更新能力,路由信息在 RouteConfig.yaml 文件中配置,目前只支持对新增路由信息的热更新操作 ;
在 config/RouteConfig.yaml 中定义如下:
route: - path: "/users" method: "get" to: "http://*.*.*.*:8090/users" 字段解释:
- path: HTTP 网关路由地址
- method:请求方法
- to:目标服务地址
可以配置多条记录(保存后即可启动热更新)
支持 HTTP API Restful,提供配置话能力
虚拟化服务目前是通过实现 K8s API 和 Docker API 来完成引擎容器资源自动化部署等(如 容器创建、销毁、自动化配置等等)
K8s 采用的是 InClusterConfig 的方式进行初始化(采用 pod 内调用 K8s API,免去获取 kubeconfig 等配置的麻烦), 对 K8s API 进行了一层封装, 如 kvs/interface.go
//service UnmarshalService(bytes []byte) corev1.Service GetServices(apps ...string) []*corev1.Service DeployService(svc corev1.Service) error //deployment UnmarshalDeployment(bytes []byte) appsv1.Deployment DeployDeployment(deploy appsv1.Deployment) error GetDeployments(apps ...string) []*appsv1.Deployment //configmap UnmarshalConfigMap(bytes []byte) corev1.ConfigMap DeployConfigMap(cm corev1.ConfigMap) error GetConfigMaps(cms ...string) []*corev1.ConfigMap //statefulset //pod GetPodsByLabel(app string) *corev1.PodList PodExecCommand(namespace, podName, command, containerName string) (string, string, error) //resource delete ResDelete(resourceType string, resName string)在 clent.go 集成实现上述功能,即可满足虚拟化服务自动化能力
暂时不支持自动化
- CLUSTER_ENABLED: 是否开启集群部署模式
- CLUSTER_NODE_NUM: 集群节点数量设置
- OMP_ENABLED: 是否开启 OMP (运营管理平台) 支持
- TENANT_ENABLED: 是否开启租户模式支持 (如果 OMP_ENABLED 为true 则默认支持租户模式)
- NETWORK_MODE: 网络模式,值可选为 1:严格匹配ip+port(内网环境), 2:仅匹配端口(如:支持内网和公网环境,设置成 '0.0.0.0' )
- RUN_MODE:运行模式(分为 dev、testing、release, dev 模式会标准输出所有日志,release 则会输出 error 日志)
- HTTP_DEBUG_MODE:HTTP Server 的运行模式
- HTTP_TIME_DURATION: HTTP 请求超时 (单位秒)
- HTTP_SERVER_PORT:HTTP Server 监听端口
- GRPC_PORT:gRPC Server 监听端口
- DIAL_TIMEOUT:连接超时时间 (单位秒)
- KEEPALIVE_TIME:长连接回话时间 (单位秒)
- TENANT_ENABLED: 是否开启多租户模式 (true 或者 false)
- GATEWAY_PROXY_ADDR: 网关接受请求地址
- ENGINE_NAME:引擎名称
- POOL_ENABLED:表示是否启用 (true 或者 false)
- POOL_MODEL:配置连接池模型 (详情查看上述文档中连接池模型)
- GATEWAY_PROXY_PORT: 请求接口
- GRPC_REQUEST_REUSABLE:连接是否可复用 (true 或者 false)
- REQUEST_IDLE_TIME: 等待时间 (单位秒)
- REQUEST_MAX_LIFE: 最大请求生命周期 (单位秒)
- REQUEST_TIMEOUT:请求关闭时间 (单位秒)
- ENGINE_LIST 是一个列表,包含了引擎列表
- SERVER_HOST:是远程服务地址
- ENGINE_GRPC_POOL_SIZE:是连接池大小
- LOG_LEVEL:日志 level,默认为 info
- LOG_MAX_AGE:日志存储最大时长
- LOG_ROTATION_TIME:日志分割时间,默认是1天
按需求来设置(目前暂无使用 DB 和 Cache)
docker-compose -f docker/rpc-pigeon.yaml up 设置 grpc
kubectl label nodes nodeName service.rpc/grpc=granted查看调度策略 labels
kubectl get nodes nodeName --show-labels删除调度策略 label
kubectl label nodes nodeName service.rpc/grpc-kubectl apply -f k8s/rpc-pigeon.yaml分布式部署,rpc-pigeon 和 grpc 服务 是多对多部署即可