Moby 代码中一个发布订阅模式的实现

Moby 代码中发现一个发布订阅设计的实现,想简单分析一下。这是一个既可以发布消息,也可以发起订阅的组件。发布者和订阅者都持有这个组件的引用,其中发布者负责发布消息,订阅者负责取消息。

组件中通过一个 map 来储存所有的发布者,以及每个发布者对消息的过滤函数。map 的 key 是一个 channel,这里要注意一下 channel 的判等操作,channel 在满足下面条件时,是相等的:1)都是 nil;2)通过同一个 make 函数生成的。 不满足上面条件时,channel 是不相等的。下面的 c1 与 c2 就是不相等的。

c1 := make(chan interface{}, 1)
c2 := make(chan interface{}, 1)

在此组件的实现中,订阅者通过 Subscribe 方法添加订阅,每次添加订阅,Publisher 都会调用 make 方法生成一个 channel,此 channel 存放需要订阅者消费的消息,因为每个订阅者的 channel 都是单独调用 make 生成的,因此每个订阅者都不可能相等,可以将此 channel 理解为每一个订阅者的唯一 ID。

此组件用了一个读写锁 sync.RWMutex,用来保护对 map 的访问,当 map 在结构上有变化时(比如有添加或者删除),则使用写锁;如果是查看订阅者的数量,或者向buf中写消息时(Publish 消息),使用读锁就够了。在这种情况下,可以同时向多个订阅者发布消息(也就是同时向不同订阅者的 channel 中写数据,当存在多个 goroutine 同时发布消息时,也可以同时向一个消费者发送消息,channel 本身就是线程安全的),综上这个实现是线程安全的。

每次发布者发布消息的时候,都需要一个 WaitGroup 来等待该发布者向所有的消费者成功发布消息,使用 sync.Pool 来作为这个WaitGroup 的临时缓存,当没有 goroutine 引用这个 wg 时,可能会回收,但是 sync.Pool 中有时就不需要重新分配一个,节省时间以及减小 GC 压力。

这里稍微介绍一个 sync.Pool 的使用,在某些业务场景下,会大量重复的创建许多对象,这会给 Golang 的 GC 带来压力,此时可以通过 sync.Pool 来缓存对象(内部有一个双端队列来缓存对象),需要对象时候调用 pool.Get() 取对象,用完之后,可以再调用 pool.Put(a interface{}) 重新放回到 pool 中。如果有两个 goroutine A、B 同时访问 pool,A 调用 Get() 取出缓存的对象之后(假设之前调用 Put 放进去一个,且只有这一个),B 调用 Get() 时,Pool 中就没有对象了,此时只能通过初始化 pool 时传入的 New 方法重新初始化一个 Object。在使用 pool 时,用户应该假设 pool 是无状态的(每次返回的都是 empty 对象),即使上次 put 进去一个有状态的 object,但是这个 object 有可能被其他 goroutine 取走了。 我们在 put 对象到 pool 中时,也应该清空后再放进去,pool 是不会给我们清空的。

下面是 Moby 模型示意图,从图中也可以看出 Publisher 结构体类似于桥梁作用(或者称为 proxy?),被发布者和订阅者同时持有,发布者通过 Publish 发布消息,订阅者通过 Subscribe 订阅消息,Subscribe 方法返回一个 channel,订阅者要阻塞在这个 channel 上等待消息。 java-javascript

另外需要说明一下这个是本地发布订阅模式的实现,基于 gRPC 的发布订阅模式,可以参考《gRPC 示例:实现发布订阅模式》,Moby 中订阅发布模式代码实现如下。

package pubsub // import "github.com/docker/docker/pkg/pubsub"

import (
	"sync"
	"time"
)

// 私有全局变量,使用了临时对象池,临时对象池的 New 函数是返回一个指向 sync.WaitGroup 的指针,只有在
// 临时对象池的 Get 方法拿不到任何值(任意一个值时,才调用这里的 New 方法返回一个值)
var wgPool = sync.Pool{New: func() interface{} { return new(sync.WaitGroup) }}

// 新建一个 Publisher,参数是发布消息的超时时间,以及订阅者 channel 的默认大小
// 如果订阅者消息满了,一直写不进去,就靠这个 publishTimeout 来取消发布
func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
	return &Publisher{
		buffer:      buffer,
		timeout:     publishTimeout,
		subscribers: make(map[subscriber]topicFunc),
	}
}

// subscriber 类型是一个阻塞channel,可以传递任意类型,这里实际含义是一个订阅者感兴趣的消息的集合
// 发布者往这个channel中写数据
// 订阅者拿到这个channel之后,从这个channel中读数据
type subscriber chan interface{}
// topicFunc 是一个函数类型,该类型参数可以是任意类型,返回值是true或者false
// 订阅者传递这个函数类型,表示是否对某个变量感兴趣
type topicFunc func(v interface{}) bool

type Publisher struct {
	m           sync.RWMutex
	buffer      int
    timeout     time.Duration
	subscribers map[subscriber]topicFunc
}

// 返回当前发布者的订阅者的人数
func (p *Publisher) Len() int {
	p.m.RLock()
	i := len(p.subscribers)
	p.m.RUnlock()
	return i
}
// 下面三个方法都是添加订阅者,返回一个channel,供订阅者消费,可以注册一个topic函数,或者指定channel buffer大小
func (p *Publisher) Subscribe() chan interface{} {
	return p.SubscribeTopic(nil)
}
// 订阅者带有过滤函数,可以选择订阅的消息的特点
func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
	ch := make(chan interface{}, p.buffer)
	p.m.Lock()
	p.subscribers[ch] = topic
	p.m.Unlock()
	return ch
}
// 订阅者还可以配置自己的 channel 的缓冲区大小
func (p *Publisher) SubscribeTopicWithBuffer(topic topicFunc, buffer int) chan interface{} {
	ch := make(chan interface{}, buffer)
	p.m.Lock()
	p.subscribers[ch] = topic
	p.m.Unlock()
	return ch
}
// Evict 删除一个订阅者
func (p *Publisher) Evict(sub chan interface{}) {
	p.m.Lock()
	_, exists := p.subscribers[sub]
	if exists {
		delete(p.subscribers, sub)
		close(sub)
	}
	p.m.Unlock()
}

// 向所有订阅者发布一个消息
func (p *Publisher) Publish(v interface{}) {
	p.m.RLock()
	if len(p.subscribers) == 0 {
		p.m.RUnlock()
		return
	}
	// 顺序遍历所有的订阅者,如果该订阅者对该消息感兴趣,就向对应的 channel 发消息,
	// 这里向所有订阅者的 channel 发消息是异步的,但是要
	// 等到所有发的消息都成功发送之后,发布才算成功,否则通过 WaitGroup 来阻塞
	wg := wgPool.Get().(*sync.WaitGroup)
	for sub, topic := range p.subscribers {
		wg.Add(1)
		go p.sendTopic(sub, topic, v, wg)
	}
	wg.Wait()
	wgPool.Put(wg)
	p.m.RUnlock()
}
func (p *Publisher) Close() {
	p.m.Lock()
	for sub := range p.subscribers {
		delete(p.subscribers, sub)
		close(sub)
	}
	p.m.Unlock()
}

// 向订阅者发布一个消息
func (p *Publisher) sendTopic(sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup) {
    defer wg.Done()
    	// 对此消息不感兴趣
	if topic != nil && !topic(v) {
		return
	}
	if p.timeout > 0 {
        // 如果设置了超时,就启动一个timer
        timeout := time.NewTimer(p.timeout)
		defer timeout.Stop()

		select {
		case sub <- v:
		case <-timeout.C:
		}
		return
	}

	// 没有设置timer就一直阻塞,直到写入成功
	select {
	case sub <- v:
	default:
	}
}

在分析完实现后,我们看下这个 Publisher 模型的使用,使用比较简单,在下面的代码用,我使用了 Producer 和 Consumer 来命名发布者和订阅者,比较容易理解。

package main

import (
	"fmt"
	"strings"
	"time"
)
// 定义一个发布者
type Producer struct {
	p *Publisher
}
func (p *Producer) Publish(msg string) {
	p.p.Publish(msg)
}
// 定义一个订阅者
type Consumer struct {
	p *Publisher
	c chan interface{}
}
func (c *Consumer) Subscribe() {
	// 只对以 /reg 为前缀的字符串消息感兴趣
	filter := func(msg interface{}) bool {
		str, ok := msg.(string)
		if !ok {return false}
		return strings.HasPrefix(str, "/reg")
	}
	c.c = c.p.SubscribeTopic(filter)
}
// 消费消息
func (c *Consumer) WaitMsg() {
	for m := range c.c {
		fmt.Printf("received msg: %s\n", m)
	}
}

func main() {
	publisher := NewPublisher(time.Second, 10)

	producer := Producer{p:publisher}
	consumer := Consumer{p:publisher}
	consumer.Subscribe()
	go func() {
		consumer.WaitMsg()
	}()

	producer.Publish("/reg abc")
	producer.Publish("def")
	producer.Publish("/reg hij")

	time.Sleep(time.Second)
}

上述代码的输出:

received msg: /reg abc
received msg: /reg hij