package queue import ( "fmt" "testing" "time" ) const topic = "Golang梦工厂" // 一个topic 测试 func TestOnceTopic(t *testing.T) { m := NewClient() defer m.Close() m.SetConditions(10) ch, err := m.Subscribe(topic) if err != nil { fmt.Println("subscribe failed") return } go OncePub(m) OnceSub(ch, m) } // 定时推送 func OncePub(c *Client) { t := time.NewTicker(10 * time.Second) defer t.Stop() for { select { case <-t.C: err := c.Publish(topic, "test messs") if err != nil { fmt.Println("pub message failed") } default: } } } // 接受订阅消息 func OnceSub(m <-chan interface{}, c *Client) { for { val := c.GetPayload(m) fmt.Printf("get message is %s\n", val) } } //多个topic测试 func TestManyTopic(t *testing.T) { m := NewClient() defer m.Close() m.SetConditions(10) top := "" for i := 0; i < 10; i++ { top = fmt.Sprintf("Golang梦工厂_%02d", i) go Sub(m, top) } ManyPub(m) } func ManyPub(c *Client) { t := time.NewTicker(10 * time.Second) defer t.Stop() for { select { case <-t.C: for i := 0; i < 10; i++ { //多个topic 推送不同的消息 top := fmt.Sprintf("Golang梦工厂_%02d", i) payload := fmt.Sprintf("asong真帅_%02d", i) err := c.Publish(top, payload) if err != nil { fmt.Println("pub message failed") } } default: } } } func Sub(c *Client, top string) { ch, err := c.Subscribe(top) if err != nil { fmt.Printf("sub top:%s failed\n", top) } for { val := c.GetPayload(ch) if val != nil { fmt.Printf("%s get message is %s\n", top, val) } } }