Leo框架系列(二)gRPC服务、服务注册与发现

导航

前言

在Go语言生态中,gRPC的使用场景相当的广泛,已经有与HTTP分庭抗礼的趋势。七猫推荐引擎全部服务都使用gRPC来传输数据与远程调用。Leo对gRPC进行了适配,简化配置,方便gRPC的启动与关闭。

开启gRPC功能

开启gRPC功能,只需要两步:
第一步,生产pb文件:

protoc \
  --proto_path=. \
  --go_out=. \
  --go_opt=module=pkgname \
  --go-grpc_out=. \
  --go-grpc_opt=module=pkgname \
  --go-leo_out=. \
  --go-leo_opt=module=pkgname \
  *.proto

第二步,在NewApp()时候加上leo.Service()leo.GRPC(&leo.GRPCOptions{})配置项:

app := leo.NewApp(
	leo.Service(helloworld.GreeterServiceDesc(new(GreeterService))),
	leo.GRPC(&leo.GRPCOptions{
		Port:                    9090,
		UnaryServerInterceptors: []grpc.UnaryServerInterceptor{},
		TLSConf:                 nil,
		GRPCServerOptions:       []grpc.ServerOption{},
	}),
	// ...
)
app.Run(ctx)

leo.Service()的参数是一个闭包,此闭包是Leo的protoc插件生成的,返回四个值,其中两个值是gRPC服务必要的:

  • ServiceImpl 业务服务的具体实现(比如GreeterService)。
  • GRPCDesc gRPC proto文件里定义的服务描述,用来绑定gRPC服务。

leo.GRPC()的参数是gRPC一些可选配置:

  • Port 端口号。
  • UnaryServerInterceptors gRPC服务中间件,对gRPC的请求进行拦截和扩展。Leo封装了许多gRPC常用的中间件,见middleware包。
  • TLSConf 指定gRPC服务需要使用安全的网络连接。
  • GRPCServerOptions gRPC服务的其他参数设置,比如Keepalive参数、连接数、缓冲区大小等。

最后调app.Run()就可将gRPC服务运行起来了。完整示例见grpc-demo

启动gRPC流程

Leo开启了gRPC功能,那么Leo内部是如何启动gRPC服务的呢?

app.Run()方法里,调用app.startGRPCServer()方法启动gRPC服务。

func (app *App) Run(ctx context.Context) error {
	//...
	if app.o.GRPCOpts != nil {
		if err := app.startGRPCServer(ctx); err != nil {
			return err
		}
	}
	//...
}

func (app *App) startGRPCServer(ctx context.Context) error {
	srv, err := app.newGRPCServer()
	// ...
	app.run(ctx, srv)
	if app.o.Registrar == nil {
		return nil
	}
	serviceInfo, err := app.newServiceInfo(registry.TransportGRPC, app.o.GRPCOpts.Port)
	// ...
	app.run(ctx, &registrar{Registrar: app.o.Registrar, ServiceInfo: serviceInfo, Logger: app.o.Logger})
	return nil
}
func (app *App) newGRPCServer() (*grpcserver.Server, error) {
	if app.o.ServiceImpl == nil || app.o.GRPCDesc == nil{return nil, err}
	// ...
	lis, err := net.Listen("tcp", net.JoinHostPort("", strconv.Itoa(grpcOpts.Port)))
	// ...
	grpcOpts.Port = netx.ExtractPort(lis.Addr())
	// ...
	srv := server.New(lis, grpcserver.Service{Impl: app.o.ServiceImpl, Desc: app.o.GRPCDesc}, opts...)
	return srv, nil
}

第一步,创建grpc Server:

  • app.newGRPCServer()方法创建一个gRPC的服务srv
    • 判断是否配置了ServiceImplGRPCDesc,如果没有其中任何一项,报错并返回。
    • 调用net.Listen()监听网络端口,如果端口号未配置,则系统会随机选一个可用的端口,然后将端口号回填到grpcOpts中。
    • server.New()创建一个gRPC服务并返回。

第二步,注册grpc服务:

  • 如果设置了Registrar服务注册器,则会调用app.newServiceInfo()方法创建一个serviceInfo
  • Registrar包装成registrar
  • 调用app.run(ctx, runnable)启动服务注册器,将服务注册到注册中心中。

Server

Server是Leo对原生gRPC服务的简单包装,方便Leo对其进行启动与管理。

type Server struct {
	o         *options     // 可选参数
	lis       net.Listener // 网络端口监听接口
	service   Service      // 包含业务Service的实现和描述信息
	gRPCSrv   *grpc.Server // 原生的grpc服务
	startOnce sync.Once
	stopOnce  sync.Once
	healthSrv *health.Server // 健康检查服务
}
func New(lis net.Listener, service Service, opts ...Option) *Server {
	o := new(options)
	// ...
	serverOptions := []grpc.ServerOption{
		//...
	}
	// ...
	serverOptions = append(serverOptions, grpc.ChainUnaryInterceptor(o.unaryInterceptors...))
	gRPCSrv := grpc.NewServer(serverOptions...)

	healthSrv := health.NewServer()
	healthSrv.SetServingStatus(service.Desc.ServiceName, grpc_health_v1.HealthCheckResponse_SERVING)
	grpc_health_v1.RegisterHealthServer(gRPCSrv, healthSrv)

	reflection.Register(gRPCSrv)
	gRPCSrv.RegisterService(service.Desc, service.Impl)
	srv := &Server{
		//...
	}
	return srv
}
  • 创建options并初始化
  • 合并gRPC服务参数与中间件
  • 调用grpc.NewServer()创建原生的grpc服务gRPCSrv
  • 调用health.NewServer()创建原生的grpc的健康检查服务healthSrv,并初始化当前服务的状态为SERVING
  • reflection.Register(gRPCSrv)为gRPC服务注册一个反射服务,反射服务是gRPC原生支持的一个功能,可以在线实时查询gRPC的服务信息和泛化调用(参考grpcurl)。
  • gRPCSrv.RegisterService(service.Desc, service.Impl)绑定原生的gRPC服务和业务Service的实现。
  • 创建Server并返回。

Start()

启动gRPC服务

func (s *Server) Start(_ context.Context) error {
	if s.lis == nil {
		return errors.New("net listener is nil")
	}
	err := errors.New("server already started")
	s.startOnce.Do(func() {
		err = nil
		s.healthSrv.Resume()
		err = s.gRPCSrv.Serve(s.lis)
	})
	return err
}
  • 如果没有设置网络监听器net.Listener,则报错返回
  • 一个服务如果重复启动和关闭,可能会造成一些未知的问题,所以这里避免这些问题,startOncestopOnce来保证只能启动一次和关闭一次。
  • s.healthSrv.Resume()将健康状态设置成SERVING状态。
  • s.gRPCSrv.Serve(s.lis) 启动gRPC服务并开始接受请求。

Stop()

关闭gRPC服务。

func (s *Server) Stop(_ context.Context) error {
	err := errors.New("server already stopped")
	s.stopOnce.Do(func() {
		err = nil
		s.healthSrv.Shutdown()
		s.gRPCSrv.GracefulStop()
	})
	return err
}
  • s.healthSrv.Shutdown()将健康检查状态设置成NOT_SERVING
  • s.gRPCSrv.GracefulStop()优雅关闭服务。

服务注册

如果需要Leo开启服务注册功能,只需要两步。
第一步,创建Registrar:

registrar, err := factory.NewRegistrar(uri)

第二步,在NewApp()时候加上leo.Registrar(registrar)配置项:

app := leo.NewApp(
	leo.Registrar(registrar),
	//...
)

上文提到的启动gRPC流程的第二步就是注册服务,当启动gRPC服务和HTTP服务时候,创建一个ServiceInfo,其携带的服务信息将被注册到注册中心中。

ServiceInfo

type ServiceInfo struct {
	// 服务ID
	ID string
	// 服务名
	Name string
	// 传输协议 "HTTP" 或者 "gRPC".
	Transport string
	// 主机IP地址
	Host string
	// 端口号
	Port int
	// 服务其他元数据
	Metadata map[string]string
	// 服务版本号
	Version string
}
func (app *App) newServiceInfo(transport string, port int) (*registry.ServiceInfo, error) {
	host, err := netx.GlobalUnicastIPString()
	// ...
	id := app.o.ID + "_" + transport + "_" + strconv.Itoa(port)
	serviceInfo := &registry.ServiceInfo{
		// ...
	}
	return serviceInfo, nil
}

newServiceInfo()创建ServiceInfo:

  • netx.GlobalUnicastIPString() 获取一个公开的单播地址
  • 为了区分gRPC和HTTP两种传输协议,服务ID会拼接传输协议名和端口号。
  • 创建registry.ServiceInfo并返回。

Registrar服务注册器

type Registrar interface {
	Register(ctx context.Context, service *ServiceInfo) error
	Deregister(ctx context.Context, service *ServiceInfo) error
}
var _ runner.Runnable = new(registrar)
type registrar struct {
	Registrar   registry.Registrar
	ServiceInfo *registry.ServiceInfo
	Logger      log.Logger
}
func (rr *registrar) String() string {
	return "registrar"
}
func (rr *registrar) Start(ctx context.Context) error {
	return rr.Registrar.Register(ctx, rr.ServiceInfo)
}
func (rr *registrar) Stop(ctx context.Context) error {
	return rr.Registrar.Deregister(ctx, rr.ServiceInfo)
}
  • Registrar是服务注册接口,registrar是将Registrar包装成runner.Runnable,方便Registrar启动与停止。
  • Register()方法在服务启动时,将服务注册到注册中心上。
  • Deregister()方法在服务关闭时,从注册中心中将服务注销。

Leo内置了ConsulNacos两种注册器,详细可阅读ConsulNacos

服务发现

Leo暂时未对gRPC客户端进行封装,但封装了gRPC客户端的Resolver,Resolver可以基于Leo的服务发现组件Discovery解析出具体的目标服务地址。

Discovery

type Discovery interface {
	Scheme() string
	GetService(ctx context.Context, service *ServiceInfo) ([]*ServiceInfo, error)
	Watch(ctx context.Context, service *ServiceInfo) (<-chan []*ServiceInfo, error)
	StopWatch(ctx context.Context, service *ServiceInfo) error
}
  • Scheme() 返回此服务发现组件支持的scheme(比如consul、nacos等等)。
  • GetService()方法查询注册中心中符合条件的服务列表。
  • Watch()方法监听注册中心中服务的变化情况,如果服务有变化,会将最新结果发送到<-chan []*ServiceInfo里。
  • StopWatch()方法停止监听。

Leo内置了ConsulNacos两种服务发现组件,详细可阅读ConsulNacos

ResolverBuilder服务名解析器创建者

ResolverBuilder实现了gRPC原生的resolver.Builder,其目的是通过Build方法创建一个Resolver服务名解析器。

var _ resolver.Builder = new(ResolverBuilder)
type ResolverBuilder struct {
	discovery registry.Discovery
}
// 创建Builder需要传入服务发现组件Discovery
func NewResolverBuilder(discovery registry.Discovery) resolver.Builder {
	return &ResolverBuilder{discovery: discovery}
}
func (rb *ResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	// 如果在创建客户端时,url里的scheme与`Discovery`的scheme不匹配,则返回错误。
	if target.URL.Scheme != rb.discovery.Scheme() {
		return nil, fmt.Errorf("target schema is %s, but discovery schema is %s", target.URL.Scheme, rb.discovery.Scheme())
	}
	// 基于url创建ServiceInfo
	uri := target.URL
	serviceInfo := registry.ServiceInfoFromURL(uri, registry.TransportGRPC)
	ctx, cancel := context.WithCancel(context.Background())
	// 创建Resolver服务名解析器
	r := &Resolver{
		ctx:         ctx,
		cancelFunc:  cancel,
		serviceInfo: serviceInfo,
		cc:          cc,
		opts:        opts,
		discovery:   rb.discovery,
	}
	// 开始服务名解析,异步监听注册中心中服务的变化。
	if err := r.Start(); err != nil {
		return nil, err
	}
	return r, nil
}
func (rb *ResolverBuilder) Scheme() string {
	return rb.discovery.Scheme()
}

Resolver服务名解析器:

var _ resolver.Resolver = new(Resolver)
type Resolver struct {
	// ctx和cancelFunc,可以监听解析器Close信号,以便goroutine可以正常退出
	ctx         context.Context
	cancelFunc  context.CancelFunc
	// cc和opts原生gRPC框架传入的,可以控制客户端连接的状态和行为
	cc          resolver.ClientConn
	opts        resolver.BuildOptions
	// Leo服务发现组件
	discovery   registry.Discovery
	// 当前需要被解析的服务
	serviceInfo *registry.ServiceInfo
}
func (r *Resolver) Start() error {
	service, err := r.discovery.GetService(r.ctx, r.serviceInfo)
	if err != nil {
		return err
	}
	if err := r.update(service); err != nil {
		return err
	}
	eventC, err := r.discovery.Watch(r.ctx, r.serviceInfo)
	if err != nil {
		return err
	}
	go func() {
		for {
			select {
			case <-r.ctx.Done():
				return
			case service := <-eventC:
				if err := r.update(service); err != nil {
					global.Logger().Errorf("failed to resolve, %v", err)
				}
			}
		}
	}()
	return nil
}
func (r *Resolver) update(service []*registry.ServiceInfo) error {
	if len(service) <= 0 {
		return nil
	}
	addresses := r.convertService(service)
	return r.cc.UpdateState(resolver.State{Addresses: addresses})
}
func (r *Resolver) convertService(service []*registry.ServiceInfo) []resolver.Address {
	addresses := make([]resolver.Address, 0, len(service))
	for _, service := range service {
		attr := &attributes.Attributes{}
		address := resolver.Address{
			Addr:       net.JoinHostPort(service.Host, strconv.Itoa(service.Port)),
			ServerName: r.serviceInfo.Name,
			Attributes: attr,
		}
		addresses = append(addresses, address)
	}
	sort.Slice(addresses, func(i, j int) bool {
		return strings.Compare(addresses[i].Addr, addresses[j].Addr) > 0
	})
	return addresses
}

Start()方法封装了服务名解析逻辑代码

  • 调用discovery.GetService()来查询服务列表
  • 如果查询失败,则就返回错误
  • 如果查询成功,就会更新grpc连接状态(即地址)
    • update()方法封装了更新gRPC连接的地址,底层是调cc.UpdateState()来完成的,在更新之前,会将[]*registry.ServiceInfo转成[]resolver.Address
    • convertService()函数就是转换函数,注意在转换后,会对[]resolver.Address进行排序,原因是给负载均衡提供一个幂等的输入,防止负载均衡失效。
  • 然后调用discovery.Watch()方法开始监听,开启goroutine异步监听服务变化,如果有变化,更新grpc连接状态。

未完待续...

展示评论