Go语言实战(七)并发模式

runner

type Runner struct {
	interrupt chan os.Signal   // 任务中断
	timeout   <-chan time.Time // 单向通道,只能接收,不能写入
	complete  chan error       // 任务结束,有可能返回error
	tasks     []func(int)      // 需要执行的任务列表
}

var (
	ErrTimeout   = errors.New("received timeout")
	ErrInterrupt = errors.New("received interrupt")
)
  • tasks设计为数组函数,是一个非常优雅的地方,这样需要执行的逻辑可以放到task函数里面实现
  • timeout是一个单向的通道,只能写入,即runner初始化的时候已经指定好了超时时间,无法再次修改
  • signal是什么,作用是什么? golang的 signal ; unix-signal 操作系统层面的一些信号标识
  • 先声明错误,这样效率会更高一些

创建runner

func NewRunner(t time.Duration) *Runner {
	return &Runner{
		timeout:   time.After(t),
		interrupt: make(chan os.Signal, 1),
		complete:  make(chan error),
	}
}

有了任务的处理,我们想添加任务怎么做?接收一个tasks列表,将其append到runner里面;注意tasks是一个数组,需要解开。

func (r *Runner) AddTask(tasks ...func(int)) {
	r.tasks = append(r.tasks, tasks...)
}

如何判断任务是否被打断? 若signal发出来信号,那么会传递到interrupt的通道里面,可以根据runner里面的interrupt通道里面是否存在数据信息来进行处理

func (r *Runner) gotInterrupt() bool {
	select {
	case <-r.interrupt:
		signal.Stop(r.interrupt)
		return true
	default:
		return false
	}
}

将任务开启起来,遍历tasks;开始运行,返回对应的错误

func (r *Runner) run() error {
	for i, task := range r.tasks {
		if r.gotInterrupt() {
			return ErrInterrupt
		}
		task(i)
	}
	return nil
}

Start任务,主任务

func (r *Runner) Start() error {
  // 接收操作系统interrupt信号; Ctrl+c
	signal.Notify(r.interrupt, os.Interrupt)

	go func() {
		r.complete <- r.run() //将run出来的结果返回给complete
	}()

	select {
	case err := <-r.complete:
		return err
	case <-r.timeout:
		return ErrTimeout
	}
}

测试方法

func TestRunner(t *testing.T) {
	r := NewRunner(3 * time.Second)

	r.AddTask(CreateTask(), CreateTask(), CreateTask())

	if err := r.Start(); err != nil {
		switch err {
		case ErrTimeout:
			log.Println("Terminating due to timeout")
			os.Exit(1)
		case ErrInterrupt:
			log.Println("Terminating due to interrupt")
			os.Exit(2)
		}
	}
	log.Println("process done")
}

func CreateTask() func(int) {
	return func(id int) {
		log.Printf("task %d", id)
		time.Sleep(time.Duration(id) * time.Second)
	}
}