Leo框架系列(二)gRPC服务、服务注册与发现
导航
前言
在Go语言生态中,gRPC的使用场景相当的广泛,已经有与HTTP分庭抗礼的趋势。七猫推荐引擎全部服务都使用gRPC来传输数据与远程调用。Leo对gRPC进行了适配,简化配置,方便gRPC的启动与关闭。
开启gRPC功能
开启gRPC功能,只需要两步:
第一步,生产pb文件:
protoc \
--proto_path=. \
--go_out=. \
--go_opt=module=pkgname \
--go-grpc_out=. \
--go-grpc_opt=module=pkgname \
--go-leo_out=. \
--go-leo_opt=module=pkgname \
*.proto
第二步,在NewApp()
时候加上leo.Service()
和leo.GRPC(&leo.GRPCOptions{})
配置项:
app := leo.NewApp(
leo.Service(helloworld.GreeterServiceDesc(new(GreeterService))),
leo.GRPC(&leo.GRPCOptions{
Port: 9090,
UnaryServerInterceptors: []grpc.UnaryServerInterceptor{},
TLSConf: nil,
GRPCServerOptions: []grpc.ServerOption{},
}),
// ...
)
app.Run(ctx)
leo.Service()
的参数是一个闭包,此闭包是Leo的protoc插件生成的,返回四个值,其中两个值是gRPC服务必要的:
ServiceImpl
业务服务的具体实现(比如GreeterService
)。GRPCDesc
gRPC proto文件里定义的服务描述,用来绑定gRPC服务。
leo.GRPC()
的参数是gRPC一些可选配置:
Port
端口号。UnaryServerInterceptors
gRPC服务中间件,对gRPC的请求进行拦截和扩展。Leo封装了许多gRPC常用的中间件,见middleware包。TLSConf
指定gRPC服务需要使用安全的网络连接。GRPCServerOptions
gRPC服务的其他参数设置,比如Keepalive参数、连接数、缓冲区大小等。
最后调app.Run()
就可将gRPC服务运行起来了。完整示例见grpc-demo
启动gRPC流程
Leo开启了gRPC功能,那么Leo内部是如何启动gRPC服务的呢?
app.Run()
方法里,调用app.startGRPCServer()
方法启动gRPC服务。
func (app *App) Run(ctx context.Context) error {
//...
if app.o.GRPCOpts != nil {
if err := app.startGRPCServer(ctx); err != nil {
return err
}
}
//...
}
func (app *App) startGRPCServer(ctx context.Context) error {
srv, err := app.newGRPCServer()
// ...
app.run(ctx, srv)
if app.o.Registrar == nil {
return nil
}
serviceInfo, err := app.newServiceInfo(registry.TransportGRPC, app.o.GRPCOpts.Port)
// ...
app.run(ctx, ®istrar{Registrar: app.o.Registrar, ServiceInfo: serviceInfo, Logger: app.o.Logger})
return nil
}
func (app *App) newGRPCServer() (*grpcserver.Server, error) {
if app.o.ServiceImpl == nil || app.o.GRPCDesc == nil{return nil, err}
// ...
lis, err := net.Listen("tcp", net.JoinHostPort("", strconv.Itoa(grpcOpts.Port)))
// ...
grpcOpts.Port = netx.ExtractPort(lis.Addr())
// ...
srv := server.New(lis, grpcserver.Service{Impl: app.o.ServiceImpl, Desc: app.o.GRPCDesc}, opts...)
return srv, nil
}
第一步,创建grpc Server:
app.newGRPCServer()
方法创建一个gRPC的服务srv
:- 判断是否配置了
ServiceImpl
和GRPCDesc
,如果没有其中任何一项,报错并返回。 - 调用
net.Listen()
监听网络端口,如果端口号未配置,则系统会随机选一个可用的端口,然后将端口号回填到grpcOpts
中。 server.New()
创建一个gRPC服务并返回。
- 判断是否配置了
第二步,注册grpc服务:
- 如果设置了
Registrar
服务注册器,则会调用app.newServiceInfo()
方法创建一个serviceInfo
。 - 将
Registrar
包装成registrar
。 - 调用
app.run(ctx, runnable)
启动服务注册器,将服务注册到注册中心中。
Server
Server
是Leo对原生gRPC服务的简单包装,方便Leo对其进行启动与管理。
type Server struct {
o *options // 可选参数
lis net.Listener // 网络端口监听接口
service Service // 包含业务Service的实现和描述信息
gRPCSrv *grpc.Server // 原生的grpc服务
startOnce sync.Once
stopOnce sync.Once
healthSrv *health.Server // 健康检查服务
}
func New(lis net.Listener, service Service, opts ...Option) *Server {
o := new(options)
// ...
serverOptions := []grpc.ServerOption{
//...
}
// ...
serverOptions = append(serverOptions, grpc.ChainUnaryInterceptor(o.unaryInterceptors...))
gRPCSrv := grpc.NewServer(serverOptions...)
healthSrv := health.NewServer()
healthSrv.SetServingStatus(service.Desc.ServiceName, grpc_health_v1.HealthCheckResponse_SERVING)
grpc_health_v1.RegisterHealthServer(gRPCSrv, healthSrv)
reflection.Register(gRPCSrv)
gRPCSrv.RegisterService(service.Desc, service.Impl)
srv := &Server{
//...
}
return srv
}
- 创建
options
并初始化 - 合并gRPC服务参数与中间件
- 调用
grpc.NewServer()
创建原生的grpc服务gRPCSrv
- 调用
health.NewServer()
创建原生的grpc的健康检查服务healthSrv
,并初始化当前服务的状态为SERVING
reflection.Register(gRPCSrv)
为gRPC服务注册一个反射服务,反射服务是gRPC原生支持的一个功能,可以在线实时查询gRPC的服务信息和泛化调用(参考grpcurl)。gRPCSrv.RegisterService(service.Desc, service.Impl)
绑定原生的gRPC服务和业务Service的实现。- 创建
Server
并返回。
Start()
启动gRPC服务
func (s *Server) Start(_ context.Context) error {
if s.lis == nil {
return errors.New("net listener is nil")
}
err := errors.New("server already started")
s.startOnce.Do(func() {
err = nil
s.healthSrv.Resume()
err = s.gRPCSrv.Serve(s.lis)
})
return err
}
- 如果没有设置网络监听器
net.Listener
,则报错返回 - 一个服务如果重复启动和关闭,可能会造成一些未知的问题,所以这里避免这些问题,
startOnce
和stopOnce
来保证只能启动一次和关闭一次。 s.healthSrv.Resume()
将健康状态设置成SERVING
状态。s.gRPCSrv.Serve(s.lis)
启动gRPC服务并开始接受请求。
Stop()
关闭gRPC服务。
func (s *Server) Stop(_ context.Context) error {
err := errors.New("server already stopped")
s.stopOnce.Do(func() {
err = nil
s.healthSrv.Shutdown()
s.gRPCSrv.GracefulStop()
})
return err
}
s.healthSrv.Shutdown()
将健康检查状态设置成NOT_SERVING
s.gRPCSrv.GracefulStop()
优雅关闭服务。
服务注册
如果需要Leo开启服务注册功能,只需要两步。
第一步,创建Registrar
:
registrar, err := factory.NewRegistrar(uri)
第二步,在NewApp()
时候加上leo.Registrar(registrar)
配置项:
app := leo.NewApp(
leo.Registrar(registrar),
//...
)
上文提到的启动gRPC流程的第二步就是注册服务,当启动gRPC服务和HTTP服务时候,创建一个ServiceInfo
,其携带的服务信息将被注册到注册中心中。
ServiceInfo
type ServiceInfo struct {
// 服务ID
ID string
// 服务名
Name string
// 传输协议 "HTTP" 或者 "gRPC".
Transport string
// 主机IP地址
Host string
// 端口号
Port int
// 服务其他元数据
Metadata map[string]string
// 服务版本号
Version string
}
func (app *App) newServiceInfo(transport string, port int) (*registry.ServiceInfo, error) {
host, err := netx.GlobalUnicastIPString()
// ...
id := app.o.ID + "_" + transport + "_" + strconv.Itoa(port)
serviceInfo := ®istry.ServiceInfo{
// ...
}
return serviceInfo, nil
}
newServiceInfo()
创建ServiceInfo
:
netx.GlobalUnicastIPString()
获取一个公开的单播地址。- 为了区分gRPC和HTTP两种传输协议,服务ID会拼接传输协议名和端口号。
- 创建
registry.ServiceInfo
并返回。
Registrar服务注册器
type Registrar interface {
Register(ctx context.Context, service *ServiceInfo) error
Deregister(ctx context.Context, service *ServiceInfo) error
}
var _ runner.Runnable = new(registrar)
type registrar struct {
Registrar registry.Registrar
ServiceInfo *registry.ServiceInfo
Logger log.Logger
}
func (rr *registrar) String() string {
return "registrar"
}
func (rr *registrar) Start(ctx context.Context) error {
return rr.Registrar.Register(ctx, rr.ServiceInfo)
}
func (rr *registrar) Stop(ctx context.Context) error {
return rr.Registrar.Deregister(ctx, rr.ServiceInfo)
}
Registrar
是服务注册接口,registrar
是将Registrar
包装成runner.Runnable
,方便Registrar
启动与停止。Register()
方法在服务启动时,将服务注册到注册中心上。Deregister()
方法在服务关闭时,从注册中心中将服务注销。
服务发现
Leo暂时未对gRPC客户端进行封装,但封装了gRPC客户端的Resolver
,Resolver
可以基于Leo的服务发现组件Discovery
解析出具体的目标服务地址。
Discovery
type Discovery interface {
Scheme() string
GetService(ctx context.Context, service *ServiceInfo) ([]*ServiceInfo, error)
Watch(ctx context.Context, service *ServiceInfo) (<-chan []*ServiceInfo, error)
StopWatch(ctx context.Context, service *ServiceInfo) error
}
Scheme()
返回此服务发现组件支持的scheme(比如consul、nacos等等)。GetService()
方法查询注册中心中符合条件的服务列表。Watch()
方法监听注册中心中服务的变化情况,如果服务有变化,会将最新结果发送到<-chan []*ServiceInfo
里。StopWatch()
方法停止监听。
ResolverBuilder服务名解析器创建者
ResolverBuilder
实现了gRPC原生的resolver.Builder
,其目的是通过Build
方法创建一个Resolver
服务名解析器。
var _ resolver.Builder = new(ResolverBuilder)
type ResolverBuilder struct {
discovery registry.Discovery
}
// 创建Builder需要传入服务发现组件Discovery
func NewResolverBuilder(discovery registry.Discovery) resolver.Builder {
return &ResolverBuilder{discovery: discovery}
}
func (rb *ResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
// 如果在创建客户端时,url里的scheme与`Discovery`的scheme不匹配,则返回错误。
if target.URL.Scheme != rb.discovery.Scheme() {
return nil, fmt.Errorf("target schema is %s, but discovery schema is %s", target.URL.Scheme, rb.discovery.Scheme())
}
// 基于url创建ServiceInfo
uri := target.URL
serviceInfo := registry.ServiceInfoFromURL(uri, registry.TransportGRPC)
ctx, cancel := context.WithCancel(context.Background())
// 创建Resolver服务名解析器
r := &Resolver{
ctx: ctx,
cancelFunc: cancel,
serviceInfo: serviceInfo,
cc: cc,
opts: opts,
discovery: rb.discovery,
}
// 开始服务名解析,异步监听注册中心中服务的变化。
if err := r.Start(); err != nil {
return nil, err
}
return r, nil
}
func (rb *ResolverBuilder) Scheme() string {
return rb.discovery.Scheme()
}
Resolver服务名解析器:
var _ resolver.Resolver = new(Resolver)
type Resolver struct {
// ctx和cancelFunc,可以监听解析器Close信号,以便goroutine可以正常退出
ctx context.Context
cancelFunc context.CancelFunc
// cc和opts原生gRPC框架传入的,可以控制客户端连接的状态和行为
cc resolver.ClientConn
opts resolver.BuildOptions
// Leo服务发现组件
discovery registry.Discovery
// 当前需要被解析的服务
serviceInfo *registry.ServiceInfo
}
func (r *Resolver) Start() error {
service, err := r.discovery.GetService(r.ctx, r.serviceInfo)
if err != nil {
return err
}
if err := r.update(service); err != nil {
return err
}
eventC, err := r.discovery.Watch(r.ctx, r.serviceInfo)
if err != nil {
return err
}
go func() {
for {
select {
case <-r.ctx.Done():
return
case service := <-eventC:
if err := r.update(service); err != nil {
global.Logger().Errorf("failed to resolve, %v", err)
}
}
}
}()
return nil
}
func (r *Resolver) update(service []*registry.ServiceInfo) error {
if len(service) <= 0 {
return nil
}
addresses := r.convertService(service)
return r.cc.UpdateState(resolver.State{Addresses: addresses})
}
func (r *Resolver) convertService(service []*registry.ServiceInfo) []resolver.Address {
addresses := make([]resolver.Address, 0, len(service))
for _, service := range service {
attr := &attributes.Attributes{}
address := resolver.Address{
Addr: net.JoinHostPort(service.Host, strconv.Itoa(service.Port)),
ServerName: r.serviceInfo.Name,
Attributes: attr,
}
addresses = append(addresses, address)
}
sort.Slice(addresses, func(i, j int) bool {
return strings.Compare(addresses[i].Addr, addresses[j].Addr) > 0
})
return addresses
}
Start()
方法封装了服务名解析逻辑代码
- 调用
discovery.GetService()
来查询服务列表 - 如果查询失败,则就返回错误
- 如果查询成功,就会更新grpc连接状态(即地址)
update()
方法封装了更新gRPC连接的地址,底层是调cc.UpdateState()
来完成的,在更新之前,会将[]*registry.ServiceInfo
转成[]resolver.Address
。convertService()
函数就是转换函数,注意在转换后,会对[]resolver.Address
进行排序,原因是给负载均衡提供一个幂等的输入,防止负载均衡失效。
- 然后调用
discovery.Watch()
方法开始监听,开启goroutine异步监听服务变化,如果有变化,更新grpc连接状态。
未完待续...