- 新增 pending 缓冲区,publish 时若无订阅者则暂存消息
- subscribe 时自动将缓冲消息投入 channel,解决服务重启后恢复任务丢失的问题
- 去除 broadcast 5ms 超时导致的消息丢失
- chan bool 改为 chan struct{},RWMutex 改为 Mutex
- 新增 broker_test.go,12 个单元测试覆盖核心场景(含 -race)
- 为 client_test.go 中的无限循环 demo 添加 t.Skip()
239 lines
4.9 KiB
Go
239 lines
4.9 KiB
Go
package queue
|
||
|
||
import (
|
||
"fmt"
|
||
"sync"
|
||
"sync/atomic"
|
||
"testing"
|
||
"time"
|
||
)
|
||
|
||
// recv 从 channel 读取一条消息,超时则返回 nil。
|
||
func recv(ch <-chan any, timeout time.Duration) any {
|
||
select {
|
||
case v := <-ch:
|
||
return v
|
||
case <-time.After(timeout):
|
||
return nil
|
||
}
|
||
}
|
||
|
||
// TestPublishSubscribe 基本收发
|
||
func TestPublishSubscribe(t *testing.T) {
|
||
b := NewBroker()
|
||
defer b.close()
|
||
|
||
ch, err := b.subscribe("job-a")
|
||
if err != nil {
|
||
t.Fatalf("subscribe: %v", err)
|
||
}
|
||
|
||
if err := b.publish("job-a", "hello"); err != nil {
|
||
t.Fatalf("publish: %v", err)
|
||
}
|
||
|
||
got := recv(ch, time.Second)
|
||
if got != "hello" {
|
||
t.Fatalf("want %q, got %v", "hello", got)
|
||
}
|
||
}
|
||
|
||
// TestPendingBuffer 先发布再订阅,消息不能丢失(队列核心保证)
|
||
func TestPendingBuffer(t *testing.T) {
|
||
b := NewBroker()
|
||
defer b.close()
|
||
|
||
// 先 publish,此时无订阅者
|
||
for i := 0; i < 3; i++ {
|
||
if err := b.publish("job-b", i); err != nil {
|
||
t.Fatalf("publish %d: %v", i, err)
|
||
}
|
||
}
|
||
|
||
// 再 subscribe,应收到全部缓冲消息
|
||
ch, _ := b.subscribe("job-b")
|
||
|
||
for want := 0; want < 3; want++ {
|
||
got := recv(ch, time.Second)
|
||
if got != want {
|
||
t.Fatalf("pending msg[%d]: want %d, got %v", want, want, got)
|
||
}
|
||
}
|
||
}
|
||
|
||
// TestPendingThenNormal 缓冲消息先于后续消息到达,顺序正确
|
||
func TestPendingThenNormal(t *testing.T) {
|
||
b := NewBroker()
|
||
defer b.close()
|
||
|
||
b.publish("job-c", "buffered")
|
||
|
||
ch, _ := b.subscribe("job-c")
|
||
b.publish("job-c", "live")
|
||
|
||
msgs := []any{recv(ch, time.Second), recv(ch, time.Second)}
|
||
if msgs[0] != "buffered" || msgs[1] != "live" {
|
||
t.Fatalf("order wrong: %v", msgs)
|
||
}
|
||
}
|
||
|
||
// TestMultipleSubscribers 同一 topic 多个订阅者都能收到消息
|
||
func TestMultipleSubscribers(t *testing.T) {
|
||
b := NewBroker()
|
||
defer b.close()
|
||
|
||
ch1, _ := b.subscribe("broadcast")
|
||
ch2, _ := b.subscribe("broadcast")
|
||
|
||
b.publish("broadcast", "msg")
|
||
|
||
if recv(ch1, time.Second) != "msg" {
|
||
t.Fatal("ch1 did not receive message")
|
||
}
|
||
if recv(ch2, time.Second) != "msg" {
|
||
t.Fatal("ch2 did not receive message")
|
||
}
|
||
}
|
||
|
||
// TestUnsubscribe 取消订阅后不再收到消息
|
||
func TestUnsubscribe(t *testing.T) {
|
||
b := NewBroker()
|
||
defer b.close()
|
||
|
||
ch, _ := b.subscribe("job-d")
|
||
b.unsubscribe("job-d", ch)
|
||
|
||
b.publish("job-d", "should not arrive")
|
||
|
||
if got := recv(ch, 100*time.Millisecond); got != nil {
|
||
t.Fatalf("after unsubscribe, still got %v", got)
|
||
}
|
||
}
|
||
|
||
// TestClosedPublish 关闭后 publish 返回错误
|
||
func TestClosedPublish(t *testing.T) {
|
||
b := NewBroker()
|
||
b.close()
|
||
|
||
if err := b.publish("x", "msg"); err == nil {
|
||
t.Fatal("expected error after close, got nil")
|
||
}
|
||
}
|
||
|
||
// TestClosedSubscribe 关闭后 subscribe 返回错误
|
||
func TestClosedSubscribe(t *testing.T) {
|
||
b := NewBroker()
|
||
b.close()
|
||
|
||
if _, err := b.subscribe("x"); err == nil {
|
||
t.Fatal("expected error after close, got nil")
|
||
}
|
||
}
|
||
|
||
// TestConcurrentPublish 并发发布不丢消息,无 data race
|
||
func TestConcurrentPublish(t *testing.T) {
|
||
b := NewBroker()
|
||
defer b.close()
|
||
b.setConditions(100)
|
||
|
||
const n = 50
|
||
ch, _ := b.subscribe("concurrent")
|
||
|
||
var wg sync.WaitGroup
|
||
for i := 0; i < n; i++ {
|
||
wg.Add(1)
|
||
go func(i int) {
|
||
defer wg.Done()
|
||
b.publish("concurrent", i)
|
||
}(i)
|
||
}
|
||
wg.Wait()
|
||
|
||
var count int32
|
||
done := make(chan struct{})
|
||
go func() {
|
||
for recv(ch, 200*time.Millisecond) != nil {
|
||
atomic.AddInt32(&count, 1)
|
||
}
|
||
close(done)
|
||
}()
|
||
<-done
|
||
|
||
if int(count) != n {
|
||
t.Fatalf("want %d messages, got %d", n, count)
|
||
}
|
||
}
|
||
|
||
// TestSetConditions 容量设置生效(channel 满时不丢已缓冲的消息)
|
||
func TestSetConditions(t *testing.T) {
|
||
b := NewBroker()
|
||
defer b.close()
|
||
b.setConditions(5)
|
||
|
||
ch, _ := b.subscribe("cap-test")
|
||
|
||
for i := 0; i < 5; i++ {
|
||
if err := b.publish("cap-test", i); err != nil {
|
||
t.Fatalf("publish %d: %v", i, err)
|
||
}
|
||
}
|
||
|
||
for want := 0; want < 5; want++ {
|
||
got := recv(ch, time.Second)
|
||
if got != want {
|
||
t.Fatalf("msg[%d]: want %d, got %v", want, want, got)
|
||
}
|
||
}
|
||
}
|
||
|
||
// TestClientWrapper Client 封装与 Broker 行为一致
|
||
func TestClientWrapper(t *testing.T) {
|
||
c := NewClient()
|
||
defer c.Close()
|
||
c.SetConditions(10)
|
||
|
||
ch, err := c.Subscribe("wrap")
|
||
if err != nil {
|
||
t.Fatalf("Subscribe: %v", err)
|
||
}
|
||
|
||
c.Publish("wrap", "ok")
|
||
|
||
got := recv(ch, time.Second)
|
||
if got != "ok" {
|
||
t.Fatalf("want %q got %v", "ok", got)
|
||
}
|
||
}
|
||
|
||
// TestGetPayload GetPayload 在 channel 关闭时返回 nil 而非阻塞
|
||
func TestGetPayload(t *testing.T) {
|
||
c := NewClient()
|
||
ch, _ := c.Subscribe("gp")
|
||
c.Publish("gp", "payload")
|
||
|
||
got := c.GetPayload(ch)
|
||
if got != "payload" {
|
||
t.Fatalf("want %q got %v", "payload", got)
|
||
}
|
||
}
|
||
|
||
// TestPendingBufferMultipleTopics 多个 topic 的缓冲互不干扰
|
||
func TestPendingBufferMultipleTopics(t *testing.T) {
|
||
b := NewBroker()
|
||
defer b.close()
|
||
|
||
for i := 0; i < 5; i++ {
|
||
topic := fmt.Sprintf("topic-%d", i)
|
||
b.publish(topic, i*10)
|
||
}
|
||
|
||
for i := 0; i < 5; i++ {
|
||
topic := fmt.Sprintf("topic-%d", i)
|
||
ch, _ := b.subscribe(topic)
|
||
got := recv(ch, time.Second)
|
||
if got != i*10 {
|
||
t.Fatalf("topic-%d: want %d, got %v", i, i*10, got)
|
||
}
|
||
}
|
||
}
|