queue/broker.go
2023-04-12 15:53:13 +08:00

154 lines
3.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package queue
import (
"errors"
"sync"
"time"
)
type Broker struct {
exit chan bool // 关闭消息队列通道
capacity int // 消息队列的容量
topics map[string][]chan any // key topic value queue, 一个topic可以有多个订阅者一个订阅者对应着一个通道
sync.RWMutex // 同步锁
}
// 设置消息容量
// @description 控制消息队列的大小
func (b *Broker) setConditions(capacity int) {
b.capacity = capacity
}
// 关闭消息队列
func (b *Broker) close() {
select {
case <-b.exit:
return
default:
close(b.exit)
b.Lock()
b.topics = make(map[string][]chan any)
b.Unlock()
}
return
}
// 消息推送
// @param topic 订阅的主题
// @param msg 传递的消息
func (b *Broker) publish(topic string, pub any) error {
select {
case <-b.exit:
return errors.New("broker closed")
default:
}
b.RLock()
subscribers, ok := b.topics[topic]
b.RUnlock()
if !ok {
return nil
}
b.broadcast(pub, subscribers)
return nil
}
// 消息广播
// @description 对推送的消息进行广播,保证每一个订阅者都可以收到
func (b *Broker) broadcast(msg any, subscribers []chan any) {
count := len(subscribers)
concurrency := 1
switch {
case count > 1000:
concurrency = 3
case count > 100:
concurrency = 2
default:
concurrency = 1
}
pub := func(start int) {
//采用Timer 而不是使用time.After 原因time.After会产生内存泄漏 在计时器触发之前垃圾回收器不会回收Timer
idleDuration := 5 * time.Millisecond
idleTimeout := time.NewTimer(idleDuration)
defer idleTimeout.Stop()
for j := start; j < count; j += concurrency {
if !idleTimeout.Stop() {
select {
case <-idleTimeout.C:
default:
}
}
idleTimeout.Reset(idleDuration)
select {
case subscribers[j] <- msg:
case <-idleTimeout.C:
case <-b.exit:
return
}
}
}
for i := 0; i < concurrency; i++ {
go pub(i)
}
}
// 消息订阅
// @description 传入订阅的主题,即可完成订阅
// @param topic 订阅的主题
// @return sub 通道用来接收数据
func (b *Broker) subscribe(topic string) (<-chan any, error) {
select {
case <-b.exit:
return nil, errors.New("broker closed")
default:
}
ch := make(chan any, b.capacity)
b.Lock()
b.topics[topic] = append(b.topics[topic], ch)
b.Unlock()
return ch, nil
}
// 取消订阅
// @param topic 订阅的主题
// @param sub 消息订阅的通道
func (b *Broker) unsubscribe(topic string, sub <-chan any) error {
select {
case <-b.exit:
return errors.New("broker closed")
default:
}
b.RLock()
subscribers, ok := b.topics[topic]
b.RUnlock()
if !ok {
return nil
}
// delete subscriber
b.Lock()
var newSubs []chan any
for _, subscriber := range subscribers {
if subscriber == sub {
continue
}
newSubs = append(newSubs, subscriber)
}
b.topics[topic] = newSubs
b.Unlock()
return nil
}
func NewBroker() *Broker {
return &Broker{
exit: make(chan bool),
topics: make(map[string][]chan any),
}
}