commit ae0e0992773d7d85e1c393adc48152d8656faf88 Author: what-00 Date: Wed Apr 12 15:53:13 2023 +0800 first commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..adf8f72 --- /dev/null +++ b/.gitignore @@ -0,0 +1,23 @@ +# ---> Go +# If you prefer the allow list template instead of the deny list, see community template: +# https://github.com/github/gitignore/blob/main/community/Golang/Go.AllowList.gitignore +# +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Dependency directories (remove the comment below to include it) +# vendor/ + +# Go workspace file +go.work + diff --git a/README.md b/README.md new file mode 100644 index 0000000..1a70f0c --- /dev/null +++ b/README.md @@ -0,0 +1,2 @@ +# go-queue + diff --git a/broker.go b/broker.go new file mode 100644 index 0000000..8fa394d --- /dev/null +++ b/broker.go @@ -0,0 +1,153 @@ +package queue + +import ( + "errors" + "sync" + "time" +) + +type Broker struct { + exit chan bool // 关闭消息队列通道 + capacity int // 消息队列的容量 + topics map[string][]chan any // key: topic value : queue, 一个topic可以有多个订阅者,一个订阅者对应着一个通道 + sync.RWMutex // 同步锁 +} + +// 设置消息容量 +// @description 控制消息队列的大小 +func (b *Broker) setConditions(capacity int) { + b.capacity = capacity +} + +// 关闭消息队列 +func (b *Broker) close() { + select { + case <-b.exit: + return + default: + close(b.exit) + b.Lock() + b.topics = make(map[string][]chan any) + b.Unlock() + } + return +} + +// 消息推送 +// @param topic 订阅的主题 +// @param msg 传递的消息 +func (b *Broker) publish(topic string, pub any) error { + select { + case <-b.exit: + return errors.New("broker closed") + default: + } + + b.RLock() + subscribers, ok := b.topics[topic] + b.RUnlock() + if !ok { + return nil + } + + b.broadcast(pub, subscribers) + return nil +} + +// 消息广播 +// @description 对推送的消息进行广播,保证每一个订阅者都可以收到 +func (b *Broker) broadcast(msg any, subscribers []chan any) { + count := len(subscribers) + concurrency := 1 + + switch { + case count > 1000: + concurrency = 3 + case count > 100: + concurrency = 2 + default: + concurrency = 1 + } + + pub := func(start int) { + //采用Timer 而不是使用time.After 原因:time.After会产生内存泄漏 在计时器触发之前,垃圾回收器不会回收Timer + idleDuration := 5 * time.Millisecond + idleTimeout := time.NewTimer(idleDuration) + defer idleTimeout.Stop() + for j := start; j < count; j += concurrency { + if !idleTimeout.Stop() { + select { + case <-idleTimeout.C: + default: + } + } + idleTimeout.Reset(idleDuration) + select { + case subscribers[j] <- msg: + case <-idleTimeout.C: + case <-b.exit: + return + } + } + } + for i := 0; i < concurrency; i++ { + go pub(i) + } +} + +// 消息订阅 +// @description 传入订阅的主题,即可完成订阅 +// @param topic 订阅的主题 +// @return sub 通道用来接收数据 +func (b *Broker) subscribe(topic string) (<-chan any, error) { + select { + case <-b.exit: + return nil, errors.New("broker closed") + default: + } + + ch := make(chan any, b.capacity) + b.Lock() + b.topics[topic] = append(b.topics[topic], ch) + b.Unlock() + return ch, nil +} + +// 取消订阅 +// @param topic 订阅的主题 +// @param sub 消息订阅的通道 +func (b *Broker) unsubscribe(topic string, sub <-chan any) error { + select { + case <-b.exit: + return errors.New("broker closed") + default: + } + + b.RLock() + subscribers, ok := b.topics[topic] + b.RUnlock() + + if !ok { + return nil + } + // delete subscriber + b.Lock() + var newSubs []chan any + for _, subscriber := range subscribers { + if subscriber == sub { + continue + } + newSubs = append(newSubs, subscriber) + } + + b.topics[topic] = newSubs + b.Unlock() + return nil +} + +func NewBroker() *Broker { + return &Broker{ + exit: make(chan bool), + topics: make(map[string][]chan any), + } +} diff --git a/client.go b/client.go new file mode 100644 index 0000000..90491a3 --- /dev/null +++ b/client.go @@ -0,0 +1,53 @@ +package queue + +type Client struct { + bro *Broker +} + +// 设置消息容量 +// @description 控制消息队列的大小 +func (c *Client) SetConditions(capacity int) { + c.bro.setConditions(capacity) +} + +// 消息推送 +// @param topic 订阅的主题 +// @param msg 传递的消息 +func (c *Client) Publish(topic string, msg any) error { + return c.bro.publish(topic, msg) +} + +// 消息订阅 +// @description 传入订阅的主题,即可完成订阅 +// @param topic 订阅的主题 +// @return sub 通道用来接收数据 +func (c *Client) Subscribe(topic string) (sub <-chan any, err error) { + return c.bro.subscribe(topic) +} + +// 取消订阅 +// @param topic 订阅的主题 +// @param sub 消息订阅的通道 +func (c *Client) Unsubscribe(topic string, sub <-chan any) error { + return c.bro.unsubscribe(topic, sub) +} + +// 关闭消息队列 +func (c *Client) Close() { + c.bro.close() +} + +func (c *Client) GetPayload(sub <-chan any) any { + for val := range sub { + if val != nil { + return val + } + } + return nil +} + +func NewClient() *Client { + return &Client{ + bro: NewBroker(), + } +} diff --git a/client_test.go b/client_test.go new file mode 100644 index 0000000..5521b42 --- /dev/null +++ b/client_test.go @@ -0,0 +1,95 @@ +package queue + +import ( + "fmt" + "testing" + "time" +) + +const topic = "Golang梦工厂" + +// 一个topic 测试 +func TestOnceTopic(t *testing.T) { + m := NewClient() + defer m.Close() + m.SetConditions(10) + ch, err := m.Subscribe(topic) + if err != nil { + fmt.Println("subscribe failed") + return + } + go OncePub(m) + OnceSub(ch, m) +} + +// 定时推送 +func OncePub(c *Client) { + t := time.NewTicker(10 * time.Second) + defer t.Stop() + for { + select { + case <-t.C: + err := c.Publish(topic, "test messs") + if err != nil { + fmt.Println("pub message failed") + } + default: + + } + } +} + +// 接受订阅消息 +func OnceSub(m <-chan interface{}, c *Client) { + for { + val := c.GetPayload(m) + fmt.Printf("get message is %s\n", val) + } +} + +//多个topic测试 +func TestManyTopic(t *testing.T) { + m := NewClient() + defer m.Close() + m.SetConditions(10) + top := "" + for i := 0; i < 10; i++ { + top = fmt.Sprintf("Golang梦工厂_%02d", i) + go Sub(m, top) + } + ManyPub(m) +} + +func ManyPub(c *Client) { + t := time.NewTicker(10 * time.Second) + defer t.Stop() + for { + select { + case <-t.C: + for i := 0; i < 10; i++ { + //多个topic 推送不同的消息 + top := fmt.Sprintf("Golang梦工厂_%02d", i) + payload := fmt.Sprintf("asong真帅_%02d", i) + err := c.Publish(top, payload) + if err != nil { + fmt.Println("pub message failed") + } + } + default: + + } + } +} + +func Sub(c *Client, top string) { + ch, err := c.Subscribe(top) + if err != nil { + fmt.Printf("sub top:%s failed\n", top) + } + for { + val := c.GetPayload(ch) + if val != nil { + fmt.Printf("%s get message is %s\n", top, val) + } + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..a8fc207 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module git.fsdpf.net/go/queue + +go 1.18