Procházet zdrojové kódy

change hub interface

Joyit před 1 měsícem
rodič
revize
fd25bff185
4 změnil soubory, kde provedl 121 přidání a 87 odebrání
  1. 11 21
      examples/client-tpv2.go
  2. 14 21
      examples/client-wsv2.go
  3. 16 25
      examples/server.go
  4. 80 20
      hub.go

+ 11 - 21
examples/client-tpv2.go

@@ -41,22 +41,16 @@ func main() {
 	})
 
 	// 订阅频道
-	hub.Subscribe(&tinymq.SubscribeData{
-		Channel: regexp.MustCompile("/tinymq/server"),
-		Cmd:     "hello",
-		BackFunc: func(request *tinymq.RequestData) (state uint8, result []byte) {
-			log.Println("[client RECV]<-", string(request.Data))
-			return 1, []byte("tiny client")
-		},
-	})
-	hub.Subscribe(&tinymq.SubscribeData{
-		Channel: regexp.MustCompile("/tinymq/server"),
-		Cmd:     "nodata",
-		BackFunc: func(request *tinymq.RequestData) (state uint8, result []byte) {
-			log.Println("[client RECV]<-", string(request.Data))
-			return 1, nil
-		},
-	})
+	hub.Subscribe(regexp.MustCompile("/tinymq/server"), "hello", func(request *tinymq.RequestData) (state uint8, result []byte) {
+		log.Println("[client RECV]<-", string(request.Data))
+		return 1, []byte("tiny client")
+	},
+	)
+	hub.Subscribe(regexp.MustCompile("/tinymq/server"), "nodata", func(request *tinymq.RequestData) (state uint8, result []byte) {
+		log.Println("[client RECV]<-", string(request.Data))
+		return 1, nil
+	},
+	)
 
 	err := hub.ConnectToServer("/tinymq/server", true)
 	if err != nil {
@@ -64,11 +58,7 @@ func main() {
 	}
 
 	// 获取信息
-	rsp := hub.GetOne(&tinymq.GetData{
-		Channel: regexp.MustCompile("/tinymq/server"),
-		Cmd:     "hello",
-		Data:    []byte("hello from client"),
-	})
+	rsp := hub.GetOne(regexp.MustCompile("/tinymq/server"), "hello", []byte("hello from client"))
 	if rsp.State != config.STATE_OK {
 		log.Println("error state:", rsp.State)
 		return

+ 14 - 21
examples/client-wsv2.go

@@ -44,22 +44,16 @@ func main() {
 	})
 
 	// 订阅频道
-	hub.Subscribe(&tinymq.SubscribeData{
-		Channel: regexp.MustCompile(remoteChannel),
-		Cmd:     "hello",
-		BackFunc: func(request *tinymq.RequestData) (state uint8, result []byte) {
-			log.Println("[client RECV]<-", string(request.Data))
-			return 1, []byte("tiny client")
-		},
-	})
-	hub.Subscribe(&tinymq.SubscribeData{
-		Channel: regexp.MustCompile(remoteChannel),
-		Cmd:     "nodata",
-		BackFunc: func(request *tinymq.RequestData) (state uint8, result []byte) {
-			log.Println("[client RECV]<-", string(request.Data))
-			return 1, nil
-		},
-	})
+	hub.Subscribe(regexp.MustCompile(remoteChannel), "hello", func(request *tinymq.RequestData) (state uint8, result []byte) {
+		log.Println("[client RECV]<-", string(request.Data))
+		return 1, []byte("tiny client")
+	},
+	)
+	hub.Subscribe(regexp.MustCompile(remoteChannel), "nodata", func(request *tinymq.RequestData) (state uint8, result []byte) {
+		log.Println("[client RECV]<-", string(request.Data))
+		return 1, nil
+	},
+	)
 
 	err := hub.ConnectToServer(remoteChannel, true)
 	if err != nil {
@@ -67,17 +61,16 @@ func main() {
 	}
 
 	// 获取信息
-	rsp := hub.GetOne(&tinymq.GetData{
-		Channel: regexp.MustCompile(remoteChannel),
-		Cmd:     "hello",
-		Data:    []byte("hello from client"),
-	})
+	rsp := hub.GetOne(regexp.MustCompile(remoteChannel), "hello", []byte("hello from client"))
 	if rsp.State != config.STATE_OK {
 		log.Println("error state:", rsp.State)
 		return
 	}
 	log.Println("[RESULT]<-", string(rsp.Data))
 
+	time.Sleep(time.Second * 5)
+	hub.Push(regexp.MustCompile(remoteChannel), "hello", []byte(time.Now().GoString()))
+
 	time.Sleep(time.Second * 300)
 	log.Println("client exit")
 }

+ 16 - 25
examples/server.go

@@ -39,21 +39,16 @@ func main() {
 		}, func(conn *tinymq.Line) {
 			log.Println("[Connect state change]", conn.Channel(), conn.State(), conn.Started(), conn.Updated())
 			if conn.State() == tinymq.Connected {
-				go hub.Get(&tinymq.GetData{
-					Channel: regexp.MustCompile(remoteChannel),
-					Cmd:     "hello",
-					Data:    []byte("hello from server push"),
-				}, func(response *tinymq.ResponseData) (ok bool) {
-					log.Println("[hello response]", response.State, string(response.Data))
-					return true
-				})
-				go hub.Get(&tinymq.GetData{
-					Channel: regexp.MustCompile(remoteChannel),
-					Cmd:     "nodata",
-				}, func(response *tinymq.ResponseData) (ok bool) {
-					log.Println("[nodata response]", response.State, string(response.Data))
-					return true
-				})
+				go hub.Get(regexp.MustCompile(remoteChannel), "hello", []byte("hello from server push"),
+					func(response *tinymq.ResponseData) (ok bool) {
+						log.Println("[hello response]", response.State, string(response.Data))
+						return true
+					})
+				go hub.Get(regexp.MustCompile(remoteChannel), "nodata", nil,
+					func(response *tinymq.ResponseData) (ok bool) {
+						log.Println("[nodata response]", response.State, string(response.Data))
+						return true
+					})
 			}
 		},
 	)
@@ -90,22 +85,18 @@ func main() {
 	hub.BindForServer(bindInfo)
 
 	// 订阅频道
-	hub.Subscribe(&tinymq.SubscribeData{
-		Channel: regexp.MustCompile(remoteChannel),
-		Cmd:     "hello",
-		BackFunc: func(request *tinymq.RequestData) (state uint8, result []byte) {
+	hub.Subscribe(regexp.MustCompile(remoteChannel), "hello",
+		func(request *tinymq.RequestData) (state uint8, result []byte) {
 			log.Println("[server RECV]<-", string(request.Data))
 			return 1, []byte("tiny server")
 		},
-	})
-	hub.Subscribe(&tinymq.SubscribeData{
-		Channel: regexp.MustCompile(remoteChannel),
-		Cmd:     "nodata",
-		BackFunc: func(request *tinymq.RequestData) (state uint8, result []byte) {
+	)
+	hub.Subscribe(regexp.MustCompile(remoteChannel), "nodata",
+		func(request *tinymq.RequestData) (state uint8, result []byte) {
 			log.Println("[server RECV]<-", string(request.Data))
 			return 1, nil
 		},
-	})
+	)
 
 	// log.Fatal(http.ListenAndServe(net.JoinHostPort(bindwsv2Info.Bind, strconv.Itoa(int(bindwsv2Info.Port))), nil))
 

+ 80 - 20
hub.go

@@ -6,6 +6,7 @@ import (
 	"log"
 	"math/rand"
 	"net"
+	"regexp"
 	"strconv"
 	"strings"
 	"sync"
@@ -92,11 +93,15 @@ func (h *Hub) GetID() uint16 {
 }
 
 // 注册频道,其中频道为正则表达式字符串
-func (h *Hub) Subscribe(reg *SubscribeData) (err error) {
-	if reg.Channel == nil {
+func (h *Hub) Subscribe(channel *regexp.Regexp, cmd string, backFunc SubscribeBack) (err error) {
+	if channel == nil {
 		return errors.New("channel can not be nil")
 	}
-	cmd := reg.Cmd
+	reg := &SubscribeData{
+		Channel:  channel,
+		Cmd:      cmd,
+		BackFunc: backFunc,
+	}
 	sub, ok := h.subscribes.Load(cmd)
 	if ok {
 		h.subscribes.Store(cmd, append(sub.([]*SubscribeData), reg))
@@ -298,19 +303,25 @@ func (h *Hub) requestFromNet(request *RequestData) {
 // 请求频道并获取数据,采用回调的方式返回结果
 // 当前调用将会阻塞,直到命令都执行结束,最后返回执行的数量
 // 如果 backFunc 返回为 false 则提前结束
-func (h *Hub) Get(gd *GetData, backFunc GetBack) (count int) {
+// 最大数量和超时时间如果为0的话表示使用默认值
+func (h *Hub) GetX(channel *regexp.Regexp, cmd string, data []byte, backFunc GetBack, max int, timeout int) (count int) {
 	// 排除空频道
-	if gd.Channel == nil {
+	if channel == nil {
 		return 0
 	}
-	if gd.Timeout <= 0 {
-		gd.Timeout = h.cf.ReadWait
+	if timeout <= 0 {
+		timeout = h.cf.ReadWait
 	}
-	if gd.backchan == nil {
-		gd.backchan = make(chan *ResponseData, 32)
+	gd := &GetData{
+		Channel:  channel,
+		Cmd:      cmd,
+		Data:     data,
+		Max:      max,
+		Timeout:  timeout,
+		backchan: make(chan *ResponseData, 32),
 	}
-	max := h.sendRequest(gd)
-	if max <= 0 {
+	sendMax := h.sendRequest(gd)
+	if sendMax <= 0 {
 		return 0
 	}
 	// 避免出现异常时线程无法退出
@@ -338,7 +349,7 @@ func (h *Hub) Get(gd *GetData, backFunc GetBack) (count int) {
 			if backFunc != nil && !backFunc(rp) {
 				return
 			}
-			if count >= max {
+			if count >= sendMax {
 				return
 			}
 		case <-timer.C:
@@ -348,14 +359,36 @@ func (h *Hub) Get(gd *GetData, backFunc GetBack) (count int) {
 	// return
 }
 
-// 只获取一个频道的数据,阻塞等待
+// 请求频道并获取数据,采用回调的方式返回结果
+// 当前调用将会阻塞,直到命令都执行结束,最后返回执行的数量
+// 如果 backFunc 返回为 false 则提前结束
+func (h *Hub) Get(channel *regexp.Regexp, cmd string, data []byte, backFunc GetBack) (count int) {
+	return h.GetX(channel, cmd, data, backFunc, 0, 0)
+}
+
+// 只获取一个频道的数据,阻塞等待到默认超时间隔
 // 如果没有结果将返回 NO_MATCH
-func (h *Hub) GetOne(cmd *GetData) (response *ResponseData) {
-	cmd.Max = 1
-	h.Get(cmd, func(rp *ResponseData) (ok bool) {
+func (h *Hub) GetOne(channel *regexp.Regexp, cmd string, data []byte) (response *ResponseData) {
+	h.GetX(channel, cmd, data, func(rp *ResponseData) (ok bool) {
 		response = rp
 		return false
-	})
+	}, 1, 0)
+	if response == nil {
+		response = &ResponseData{
+			State: config.NO_MATCH,
+			Data:  []byte(config.NO_MATCH_MSG),
+		}
+	}
+	return
+}
+
+// 只获取一个频道的数据,阻塞等待到指定超时间隔
+// 如果没有结果将返回 NO_MATCH
+func (h *Hub) GetOneX(channel *regexp.Regexp, cmd string, data []byte, timeout int) (response *ResponseData) {
+	h.GetX(channel, cmd, data, func(rp *ResponseData) (ok bool) {
+		response = rp
+		return false
+	}, 1, timeout)
 	if response == nil {
 		response = &ResponseData{
 			State: config.NO_MATCH,
@@ -366,9 +399,36 @@ func (h *Hub) GetOne(cmd *GetData) (response *ResponseData) {
 }
 
 // 推送消息出去,不需要返回数据
-func (h *Hub) Push(cmd *GetData) {
-	cmd.backchan = nil
-	h.sendRequest(cmd)
+func (h *Hub) Push(channel *regexp.Regexp, cmd string, data []byte) {
+	// 排除空频道
+	if channel == nil {
+		return
+	}
+	gd := &GetData{
+		Channel:  channel,
+		Cmd:      cmd,
+		Data:     data,
+		Timeout:  h.cf.ReadWait,
+		backchan: nil,
+	}
+	h.sendRequest(gd)
+}
+
+// 推送最大对应数量的消息出去,不需要返回数据
+func (h *Hub) PushX(channel *regexp.Regexp, cmd string, data []byte, max int) {
+	// 排除空频道
+	if channel == nil {
+		return
+	}
+	gd := &GetData{
+		Channel:  channel,
+		Cmd:      cmd,
+		Data:     data,
+		Max:      max,
+		Timeout:  h.cf.ReadWait,
+		backchan: nil,
+	}
+	h.sendRequest(gd)
 }
 
 // 增加连接