package queue import ( "errors" "sync" "time" ) type Broker struct { exit chan bool // 关闭消息队列通道 capacity int // 消息队列的容量 topics map[string][]chan any // key: topic value : queue, 一个topic可以有多个订阅者,一个订阅者对应着一个通道 sync.RWMutex // 同步锁 } // 设置消息容量 // @description 控制消息队列的大小 func (b *Broker) setConditions(capacity int) { b.capacity = capacity } // 关闭消息队列 func (b *Broker) close() { select { case <-b.exit: return default: close(b.exit) b.Lock() b.topics = make(map[string][]chan any) b.Unlock() } return } // 消息推送 // @param topic 订阅的主题 // @param msg 传递的消息 func (b *Broker) publish(topic string, pub any) error { select { case <-b.exit: return errors.New("broker closed") default: } b.RLock() subscribers, ok := b.topics[topic] b.RUnlock() if !ok { return nil } b.broadcast(pub, subscribers) return nil } // 消息广播 // @description 对推送的消息进行广播,保证每一个订阅者都可以收到 func (b *Broker) broadcast(msg any, subscribers []chan any) { count := len(subscribers) concurrency := 1 switch { case count > 1000: concurrency = 3 case count > 100: concurrency = 2 default: concurrency = 1 } pub := func(start int) { //采用Timer 而不是使用time.After 原因:time.After会产生内存泄漏 在计时器触发之前,垃圾回收器不会回收Timer idleDuration := 5 * time.Millisecond idleTimeout := time.NewTimer(idleDuration) defer idleTimeout.Stop() for j := start; j < count; j += concurrency { if !idleTimeout.Stop() { select { case <-idleTimeout.C: default: } } idleTimeout.Reset(idleDuration) select { case subscribers[j] <- msg: case <-idleTimeout.C: case <-b.exit: return } } } for i := 0; i < concurrency; i++ { go pub(i) } } // 消息订阅 // @description 传入订阅的主题,即可完成订阅 // @param topic 订阅的主题 // @return sub 通道用来接收数据 func (b *Broker) subscribe(topic string) (<-chan any, error) { select { case <-b.exit: return nil, errors.New("broker closed") default: } ch := make(chan any, b.capacity) b.Lock() b.topics[topic] = append(b.topics[topic], ch) b.Unlock() return ch, nil } // 取消订阅 // @param topic 订阅的主题 // @param sub 消息订阅的通道 func (b *Broker) unsubscribe(topic string, sub <-chan any) error { select { case <-b.exit: return errors.New("broker closed") default: } b.RLock() subscribers, ok := b.topics[topic] b.RUnlock() if !ok { return nil } // delete subscriber b.Lock() var newSubs []chan any for _, subscriber := range subscribers { if subscriber == sub { continue } newSubs = append(newSubs, subscriber) } b.topics[topic] = newSubs b.Unlock() return nil } func NewBroker() *Broker { return &Broker{ exit: make(chan bool), topics: make(map[string][]chan any), } }