Leo框架系列(一)开篇介绍与Leo启动分析

前言

Leo是一款可以快速构建可直接运行的高性能Go应用程序框架,它提供了很多工具,帮助开发者降低搭建分布式或者微服务系统的复杂度,专注于业务开发。目前已全面应用于七猫免费小说推荐引擎服务中,在这种高要求的场景下也检验了Leo的可靠性。

Leo这个名字来自于我养的一只叫Leo的猫。

Leo的特点

  1. 支持HTTP服务和gRPC服务,同时支持HTTP代理gRPC服务。
  2. 支持Cron定时任务。
  3. 支持Pub/Sub任务。
  4. 服务注册与发现、限流、熔断、重试等微服务措施。
  5. 配置管理。
  6. 高扩展性,所有组件基于接口开发,HTTP、gRPC、Cron、Pub/Sub支持中间件扩展。
  7. 高性能,所有组件仅简单封装,相当于直接运行原框架。
  8. 完善的应用可观测性设计,trace、log、metrics三板斧提高系统可用性。

Getting Started

Quick Start

install protoc-gen-go-leo plugin

go install github.com/go-leo/leo/cmd/protoc-gen-go-leo@latest

Example

Leo的架构

Leo的启动

这里预先定义一个概念:可以跑在Leo上的代码统称为运行实体。比如HTTP服务、gRPC服务、Cron任务、PubSub任务等。

Leo有两种类型运行实体

  • 第一种比较“单纯”,直接运行逻辑,比如进行一个复杂的计算。
  • 第二种是除了执行逻辑外,结束时需要有相关操作的,比如优雅关闭服务、关闭资源或者清理垃圾数据等。

Callable接口

第一种运行实体在Leo里抽象成一个Callable接口。

type Callable interface {
	fmt.Stringer
	Invoke(ctx context.Context) error
}

下面介绍一个简单的Callable,睡眠30秒后退出:

var _ runner.Callable = new(CallDemo)
type CallDemo struct{}
func (c *CallDemo) String() string {
	return "callabledemo"
}
func (c *CallDemo) Invoke(ctx context.Context) error {
	global.Logger().Info("start invoke")
	defer global.Logger().Info("stop invoke")
	global.Logger().Info("will sleep 30s")
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(30 * time.Second):
	}
	return nil
}

Runnable接口

第二种运行实体在Leo里抽象成一个Runnable接口。

type Runnable interface {
	fmt.Stringer
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
}

下面介绍一个简单的Runnable。每秒收集cpu的利用率并上报到io.WriteCloser中。

var _ runner.Runnable = new(RunnableDemo)
type RunnableDemo struct {
	w     io.WriteCloser
	exitC chan struct{}
}
func NewRunnableDemo(w io.WriteCloser) *RunnableDemo {
	return &RunnableDemo{w: w, exitC: make(chan struct{})}
}
func (h *RunnableDemo) String() string { return "RunnableDemo" }
func (h *RunnableDemo) Start(ctx context.Context) error {
	ticker := time.NewTicker(time.Second)
	for {
		select {
		case <-h.exitC:
			ticker.Stop()
			return nil
		case t := <-ticker.C:
			percent, err := cpu.Percent(time.Second, true)
			if err != nil {
				return err
			}
			text := fmt.Sprintf("%s cpu percent is %f\n", t.String(), percent)
			if _, err := h.w.Write([]byte(text)); err != nil {
				return err
			}
		}
	}
}
func (h *RunnableDemo) Stop(ctx context.Context) error {
	close(h.exitC)
	return h.w.Close()
}
  • Start(ctx)方法首先创建一个1秒的Ticker,然后死循环,死循环里有个select代码块,监听两个channel的receive。
    • case <-h.exitC代码块作用是,当h.exitC被关闭时,停止Ticker,返回nil
    • case t := <-h.ticker.C代码块作用是每秒收集一次CPU利用率,并上报到一个io.WriteCloser
  • Stop(ctx)调用close()函数关闭h.exitC,导致Start(ctx)会退出,然后关闭io.WriteCloser

在Leo里,HTTP ServergRPC ServerCron TaskPubSub Task等都实现Callable或者Runnable接口。实现这两种接口,可以极大的扩展Leo的功能,实现当前Leo框架不支持的功能(比如实现一个json rpc服务)。

创建Leo应用

Leo可以跑多种多样的运行实体,下面介绍如何创建一个Leo应用。

func newApp() *leo.App {
	return leo.NewApp(
		leo.ID(),
		leo.Name(),
		leo.Version(),
		leo.Metadata(),
		leo.Logger(),
		leo.Service(),
		leo.HTTP(),
		leo.GRPC(),
		leo.Registrar(),
		leo.Cron(),
		leo.PubSub(),
		leo.Runnable(),
		leo.Callable(),
		leo.Management(),
		leo.ShutdownSignal(),
		leo.RestartSignal(),
	)
}

创建应用需要调用leo.NewApp(opts ...Option)方法来创建,Leo提供了很多Option

  • leo.ID() 应用ID,在一个集群中是不可以重复的。如果不设置这个Option,那么会从LEO_SERVICE_ID环境变量获取,还没有则Leo自己生成一个UUID。
  • leo.Name() 应用名,在一个集群中可以重复。如果不设置这个Option,那么会从LEO_SERVICE_NAME环境变量获取,还没有则使用进程名。
  • leo.Version() 应用版本号。如果不设置这个Option,那么会从LEO_SERVICE_VERSION环境变量获取。
  • leo.Metadata() 应用其他元数据。
  • leo.Logger() 设置日志组件,会在应用运行时打印一些日志信息。
  • leo.Service() 绑定服务,作用是将proto定义的Service实现对象绑定到Leo中,Leo会进一步的将其绑定到gRPC服务或者HTTP服务上。
  • leo.GRPC() 设置gRPC服务配置,比如端口号,中间件、运行参数、TLS等。
  • leo.HTTP() 设置HTTP服务配置,比如端口号、中间件、路由信息,此外还可以设置HTTP服务代理gRPC服务的需要用的gRPC客户端。
  • leo.Registrar() 设置服务注册组件。如果有gRPC或者HTTP运行实体,则会将服务注册到注册中心上。注册时,会带上ID、Name、Version、Metadata等信息。
  • leo.Cron() 设置Cron任务配置。
  • leo.PubSub() 设置PubSub任务配置。
  • leo.Runnable() 添加实现了Runnable接口的运行实体。
  • leo.Callable() 添加实现了Callable接口的运行实体。
  • leo.Management() 设置Leo的management配置,management包含了应用信息查询、配置查询、应用环境变量查询、健康检查、日志、指标收集接口、go的pprof接口、重启与关闭控制、系统信息查询、定时任务与PubSub任务信息接口等。
  • leo.ShutdownSignal() 设置应用监听的关闭信号。
  • leo.RestartSignal() 设置应用监听的重启信号。

运行Leo应用

调用app.Run()运行Leo应用

app.Run()

func (app *App) Run(ctx context.Context) error {
	app.o.Logger.Infof("app %d starting...", os.Getpid())
	defer app.o.Logger.Infof("app %d stopping...", os.Getpid())
	ctx, app.cancel = context.WithCancel(ctx)
	app.eg, ctx = errgroup.WithContext(ctx)
	for _, callable := range app.o.Callables {
		app.call(ctx, callable)
	}
	for _, runnable := range app.o.Runnables {
		app.run(ctx, runnable)
	}
	if app.o.CronOpts != nil && len(app.o.CronOpts.Jobs) > 0 {
		app.run(ctx, app.newCronTask())
	}
	if app.o.SubOpts != nil && len(app.o.SubOpts.Jobs) > 0 {
		app.run(ctx, app.newPubSubTask())
	}
	if app.o.GRPCOpts != nil {
		if err := app.startGRPCServer(ctx); err != nil {
			return err
		}
	}
	if app.o.HttpOpts != nil {
		if err := app.startHTTPServer(ctx); err != nil {
			return err
		}
	}
	if app.o.MgmtOpts != nil {
		if err := app.startManagementServer(ctx); err != nil {
			return err
		}
	}
	if len(app.o.ShutdownSignals)+len(app.o.RestartSignals) > 0 {
		app.listenSignal(ctx)
	}
	return app.wait()
}
  1. 记录运行日志。
  2. 创建一个带有取消功能的总context.Context,作用下文再介绍。
  3. 创建errgroup.Group,作用下文再介绍。
  4. 异步运行各种运行实体
    • 调用app.call()方法异步运行实现Callable接口的运行实体。
    • 调用app.run()方法异步运行实现Runnable接口的运行实体。
  5. 监听关闭和重启两种类型的系统信号。默认的关闭信号有:TERMINTQUIT,默认的重启信号有:HUP

sync.WaitGroup 与 errgroup.Group

先简单的介绍下sync.WaitGrouperrgroup.Group机制。

sync.WaitGroup:

  1. sync.WaitGroup里有个计数器。
  2. Add(n)计数器加上n,在Wait()阻塞之前,可多次调用Add(n)
  3. Done()计数器减去1,一般是在go-routine运行结束前调用。
  4. Wait()会阻塞当前代码的执行。
  5. 正常情况是加多少,减多少,计数器最终为0,这样Wait()会正常释放。
  6. 如果加多了,减少了,会导致Wait()永远阻塞。
  7. 如果加少了,减多了,会导致panic。

errgroup.Group:

  1. errgroup.WithContext()返回一个errgroup.Groupcontext.Context
  2. Go(fn func()error)会开一个go-routine异步执行传入的fn
  3. Wait()会阻塞当前代码的执行,但在阻塞之前,可以多次调用Go(func()error),可实现异步执行多个fn
  4. 如果所有的fn都正常执行完毕,返回nilWait()就会释放,然后取消掉context.Context,返回一个nil
  5. 如果有fn执行异常,返回非nil的错误,errgroup.Group会记录第一个错误,然后取消掉context.Context,但此时Wait()还是会阻塞。但如果所有传入的fn都对<-ctx.Done()进行监听并有退出操作,那么Wait()就会释放,然后返回第一个错误。

Leo的运行过程,则充分的运用了sync.WaitGrouperrgroup.Group机制,异步运行的细节封装在app.call()app.run()两个方法里。

app.call() 与 app.run()

func (app *App) call(ctx context.Context, target runner.Callable) {
	app.wg.Add(1)
	app.eg.Go(func() (err error) {
		defer app.wg.Done()
		app.o.Logger.Infof("%s called", target.String())
		return target.Invoke(ctx)
	})
	runtime.Gosched()
}

app.run():

func (app *App) run(ctx context.Context, target runner.Runnable) {
	app.wg.Add(1)
	app.eg.Go(func() error {
		defer app.wg.Done()
		app.o.Logger.Infof("starting %s", target.String())
		return target.Start(ctx)
	})
	app.eg.Go(func() error {
		<-ctx.Done()
		ctx, cancel := context.WithTimeout(context.Background(), app.o.StopTimeout)
		defer cancel()
		err := target.Stop(ctx)
		app.o.Logger.Infof("%s stopped", target.String())
		return err
	})
	runtime.Gosched()
}
  • app.call()方法与app.run()的前半段是一样的。sync.WaitGroup调用Add()方法计数器加1,errgroup.Group调用Go(func()error)异步执行Callable或者Runnable,在执行完毕后sync.WaitGroupDone()方法计数器减1。
  • app.run()后半段是处理Runnable的停止的,在target.Stop(ctx)之前,<-ctx.Done()会一直阻塞此go-routine,直到context.Context被取消掉,导致<-ctx.Done()的释放,从而target.Stop(ctx)会被执行,Runnable停止。
  • runtime.Gosched()目的是优先执行各个运行实体。

信号处理

如果设置了关闭或者重启信号,那么会监听信号。

func (app *App) listenSignal(ctx context.Context) {
	app.eg.Go(func() error {
		signals := append([]os.Signal{}, app.o.ShutdownSignals...)
		signals = append(signals, app.o.RestartSignals...)
		errC := make(chan error)
		go func() {
			runtime.Gosched()
			app.o.Logger.Info("app wait signals...")
			err := signalx.NewSignalWaiter(signals, 15*time.Second).
				AddHook(app.o.ShutdownHook).
				AddHook(app.o.RestartHook).
				WaitSignals().
				WaitHooksAsyncInvoked().
				WaitUntilTimeout().
				Err()
			errC <- err
			close(errC)
		}()
		select {
		case <-ctx.Done():
			return nil
		case e := <-errC:
			return e
		}
	})
}

信号的处理细节封装在signalx包里,有兴趣的读者可以查阅。
当监听到系统发来的信号时,signalx.SignalWaiter就会返回SignalError错误,基于errgroup.Group机制,context.Context会被取消掉,所有的RunnableCallable都会停止。程序无论是监听到关闭还是重启信号,程序都需要退出,在收到重启信号会额外的开启一个新的进程来运行Leo应用。最终sync.WaitGrouperrgroup.GroupWait()方法会释放,这两个Wait()都在wait()方法里。

app.wait()

func (app *App) wait() error {
	app.eg.Go(func() error {
		app.wg.Wait()
		app.cancel()
		return nil
	})
	err := app.eg.Wait()
	if err == nil {
		return nil
	}
	if !signalx.IsSignal(err, app.o.RestartSignals) {
		return err
	}
	if _, e := processx.StartProcess(); e != nil {
		app.o.Logger.Errorf("failed to restart process, %v", e)
		return err
	}
	app.o.Logger.Infof("restart process success")
	return err
}
  • app.eg.Go(func()error开一个go-routine等待sync.WaitGroupWait()释放,然后会调cancel()函数,取消总的context.Context,从而所有监听<-ctx.Done()的go-routine都会释放。
  • err := app.eg.Wait()等待程序退出。
  • wait()的后半段代码是检查是否收到重启信号,并做重启动作的,重启的代码封装在processx包里,有兴趣的读者可以查阅。

读者可能有疑惑,既然有了errgroup.Group,为什么还要sync.Group?
有这样一个场景,如果所有运行实体都正常运行结束了,Runnable.Start()方法或者Callable.Invoke()方法都返回nil,如果还监听信号,导致程序不退出,这是不合理的。所以,当所有运行实体都运行结束后, 取消掉总的context.Context,监听信号的go-routine也退出,这样进程可以正常退出。

未完待续...