Joyit 2 сар өмнө
parent
commit
7fad451a8a
3 өөрчлөгдсөн 155 нэмэгдсэн , 106 устгасан
  1. 145 93
      hub.go
  2. 4 2
      mapx.go
  3. 6 11
      type.go

+ 145 - 93
hub.go

@@ -241,73 +241,78 @@ func (h *Hub) sendRequest(gd *GetData) (count int) {
 		log.Println(err)
 		return 0
 	}
-	// 如果没有发送到消息,延时100ms重连直到超时
-	for i := 0; i <= gd.Timeout; i += 100 {
-		h.lines.Range(func(id int, line *Line) bool {
-			// 检查连接是否OK
-			if line.state != Connected {
-				return true
-			}
-			// 验证连接是否达到发送数据的要求
-			if h.checkConnectOkFunc != nil && !h.checkConnectOkFunc(line, gd) {
-				return true
-			}
-			if gd.Filter(line) {
-				var id uint16
-				if gd.backchan != nil {
-					id = h.GetID()
-					timeout := gd.Timeout
-					if timeout <= 0 {
-						timeout = h.cf.WriteWait
-					}
-					fn := func(id uint16, conn *Line) func() {
-						return func() {
-							go h.outResponse(&ResponseData{
-								Id:    id,
-								State: config.GET_TIMEOUT,
-								Data:  fmt.Appendf(nil, "[%s] %s %s", config.GET_TIMEOUT_MSG, conn.channel, gd.Cmd),
-								conn:  conn,
-							})
-							// 检查是否已经很久时间没有使用连接了
-							if time.Since(conn.lastRead) > time.Duration(h.cf.PingInterval*3*int(time.Millisecond)) {
-								// 超时关闭当前的连接
-								log.Println("get message timeout", conn.channel)
-								// 有可能连接出现问题,断开并重新连接
-								conn.Close(false)
-								return
-							}
+	doit := func(_ int, line *Line) bool {
+		// 检查连接是否OK
+		if line.state != Connected {
+			return true
+		}
+		// 验证连接是否达到发送数据的要求
+		if h.checkConnectOkFunc != nil && !h.checkConnectOkFunc(line, gd) {
+			return true
+		}
+		if gd.Filter(line) {
+			var id uint16
+			if gd.backchan != nil {
+				id = h.GetID()
+				timeout := gd.Timeout
+				if timeout <= 0 {
+					timeout = h.cf.WriteWait
+				}
+				fn := func(id uint16, conn *Line) func() {
+					return func() {
+						go h.outResponse(&ResponseData{
+							Id:    id,
+							State: config.GET_TIMEOUT,
+							Data:  fmt.Appendf(nil, "[%s] %s %s", config.GET_TIMEOUT_MSG, conn.channel, gd.Cmd),
+							conn:  conn,
+						})
+						// 检查是否已经很久时间没有使用连接了
+						if time.Since(conn.lastRead) > time.Duration(h.cf.PingInterval*3*int(time.Millisecond)) {
+							// 超时关闭当前的连接
+							log.Println("get message timeout", conn.channel)
+							// 有可能连接出现问题,断开并重新连接
+							conn.Close(false)
+							return
 						}
-					}(id, line)
-					// 将要发送的请求缓存
-					gm := &GetMsg{
-						out:   gd.backchan,
-						timer: time.AfterFunc(time.Millisecond*time.Duration(timeout), fn),
 					}
-					h.msgCache.Store(id, gm)
-				}
-				// 组织数据并发送到Connect
-				line.sendRequest <- &RequestData{
-					Id:       id,
-					Cmd:      gd.Cmd,
-					Data:     outData,
-					timeout:  gd.Timeout,
-					backchan: gd.backchan,
-					conn:     line,
-				}
-				if h.cf.PrintMsg {
-					log.Println("[SEND]->", id, line.channel, "["+gd.Cmd+"]", subStr(string(outData), 200))
-				}
-				count++
-				if gd.Max > 0 && count >= gd.Max {
-					return false
+				}(id, line)
+				// 将要发送的请求缓存
+				gm := &GetMsg{
+					out:   gd.backchan,
+					timer: time.AfterFunc(time.Millisecond*time.Duration(timeout), fn),
 				}
+				h.msgCache.Store(id, gm)
 			}
-			return true
-		})
+			// 组织数据并发送到Connect
+			line.sendRequest <- &RequestData{
+				Id:       id,
+				Cmd:      gd.Cmd,
+				Data:     outData,
+				timeout:  gd.Timeout,
+				backchan: gd.backchan,
+				conn:     line,
+			}
+			if h.cf.PrintMsg {
+				log.Println("[SEND]->", id, line.channel, "["+gd.Cmd+"]", subStr(string(outData), 200))
+			}
+			count++
+			if gd.Max > 0 && count >= gd.Max {
+				return false
+			}
+		}
+		return true
+	}
+	// 如果没有发送到消息,延时重连直到超时
+	for i := 0; i <= gd.Timeout; i += 500 {
+		if gd.Rand {
+			h.lines.RandRange(doit, i == 0)
+		} else {
+			h.lines.Range(doit)
+		}
 		if count > 0 {
 			break
 		}
-		time.Sleep(time.Millisecond * 100)
+		time.Sleep(time.Millisecond * 500)
 	}
 	return
 }
@@ -396,22 +401,31 @@ func (h *Hub) requestFromNet(request *RequestData) {
 // 当前调用将会阻塞,直到命令都执行结束,最后返回执行的数量
 // 如果 backFunc 返回为 false 则提前结束
 // 最大数量和超时时间如果为0的话表示使用默认值
-func (h *Hub) GetWithMaxAndTimeout(filter FilterFunc, cmd string, data any, backFunc GetBackFunc, max int, timeout int) (count int) {
-	// 排除空频道
-	if filter == nil {
+func (h *Hub) GetWithStruct(gd *GetData, backFunc GetBackFunc) (count int) {
+	if gd.Filter == nil {
 		return 0
 	}
-	if timeout <= 0 {
-		timeout = h.cf.WriteWait
-	}
-	gd := &GetData{
-		Filter:   filter,
-		Cmd:      cmd,
-		Data:     data,
-		Max:      max,
-		Timeout:  timeout,
-		backchan: make(chan *ResponseData, 32),
-	}
+	if gd.Timeout <= 0 {
+		gd.Timeout = h.cf.WriteWait
+	}
+	if gd.backchan == nil {
+		gd.backchan = make(chan *ResponseData, 32)
+	}
+	// // 排除空频道
+	// if filter == nil {
+	// 	return 0
+	// }
+	// if timeout <= 0 {
+	// 	timeout = h.cf.WriteWait
+	// }
+	// gd := &GetData{
+	// 	Filter:   filter,
+	// 	Cmd:      cmd,
+	// 	Data:     data,
+	// 	Max:      max,
+	// 	Timeout:  timeout,
+	// 	backchan: make(chan *ResponseData, 32),
+	// }
 	sendMax := h.sendRequest(gd)
 	if sendMax <= 0 {
 		return 0
@@ -457,39 +471,77 @@ func (h *Hub) GetWithMaxAndTimeout(filter FilterFunc, cmd string, data any, back
 // 当前调用将会阻塞,直到命令都执行结束,最后返回执行的数量
 // 如果 backFunc 返回为 false 则提前结束
 func (h *Hub) Get(filter FilterFunc, cmd string, data any, backFunc GetBackFunc) (count int) {
-	return h.GetWithMaxAndTimeout(filter, cmd, data, backFunc, 0, 0)
+	return h.GetWithStruct(&GetData{
+		Filter: filter,
+		Cmd:    cmd,
+		Data:   data,
+	}, backFunc)
 }
 
-// 只获取一个频道的数据,阻塞等待到默认超时间隔
-// 如果没有结果将返回 NO_MATCH
-func (h *Hub) GetOne(filter FilterFunc, cmd string, data any) (response *ResponseData) {
-	h.GetWithMaxAndTimeout(filter, cmd, data, func(rp *ResponseData) (ok bool) {
+// 获取一个数据,阻塞等待到超时间隔
+func (h *Hub) GetOneWithStruct(gd *GetData) (response *ResponseData) {
+	if gd.Filter == nil {
+		return &ResponseData{
+			State: config.CONNECT_NO_MATCH,
+			Data:  fmt.Appendf(nil, "[%s] not filter function", config.CONNECT_NO_MATCH_MSG),
+		}
+	}
+	gd.Max = 1
+	h.GetWithStruct(gd, func(rp *ResponseData) (ok bool) {
 		response = rp
 		return false
-	}, 1, 0)
+	})
 	if response == nil {
-		response = &ResponseData{
+		return &ResponseData{
 			State: config.CONNECT_NO_MATCH,
-			Data:  fmt.Appendf(nil, "[%s] %s", config.CONNECT_NO_MATCH_MSG, cmd),
+			Data:  fmt.Appendf(nil, "[%s] %s", config.CONNECT_NO_MATCH_MSG, gd.Cmd),
 		}
 	}
 	return
 }
 
+// 只获取一个频道的数据,阻塞等待到默认超时间隔
+// 如果没有结果将返回 NO_MATCH
+func (h *Hub) GetOne(filter FilterFunc, cmd string, data any) (response *ResponseData) {
+	return h.GetOneWithStruct(&GetData{
+		Filter: filter,
+		Cmd:    cmd,
+		Data:   data,
+		Max:    1,
+	})
+}
+
+func (h *Hub) GetRandOne(filter FilterFunc, cmd string, data any) (response *ResponseData) {
+	return h.GetOneWithStruct(&GetData{
+		Filter: filter,
+		Cmd:    cmd,
+		Data:   data,
+		Max:    1,
+		Rand:   true,
+	})
+}
+
 // 只获取一个频道的数据,阻塞等待到指定超时间隔
 // 如果没有结果将返回 NO_MATCH
 func (h *Hub) GetOneWithTimeout(filter FilterFunc, cmd string, data any, timeout int) (response *ResponseData) {
-	h.GetWithMaxAndTimeout(filter, cmd, data, func(rp *ResponseData) (ok bool) {
-		response = rp
-		return false
-	}, 1, timeout)
-	if response == nil {
-		response = &ResponseData{
-			State: config.CONNECT_NO_MATCH,
-			Data:  fmt.Appendf(nil, "[%s] %s", config.CONNECT_NO_MATCH_MSG, cmd),
-		}
-	}
-	return
+	return h.GetOneWithStruct(&GetData{
+		Filter:  filter,
+		Cmd:     cmd,
+		Data:    data,
+		Max:     1,
+		Timeout: timeout,
+	})
+}
+
+func (h *Hub) GetRandOneWithTimeout(filter FilterFunc, cmd string, data any, timeout int) (response *ResponseData) {
+	return h.GetOneWithStruct(&GetData{
+		Filter:  filter,
+		Cmd:     cmd,
+		Data:    data,
+		Max:     1,
+		Timeout: timeout,
+		Rand:    true,
+	})
 }
 
 // 推送消息出去,不需要返回数据

+ 4 - 2
mapx.go

@@ -127,8 +127,10 @@ func (m *Mapx) Rand() {
 }
 
 // 随机遍历
-func (m *Mapx) RandRange(fn func(i int, v *Line) bool) {
-	m.Rand()
+func (m *Mapx) RandRange(fn func(i int, v *Line) bool, rand bool) {
+	if rand {
+		m.Rand()
+	}
 	m.RLock()
 	defer m.RUnlock()
 	for i, v := range m.randLines {

+ 6 - 11
type.go

@@ -25,12 +25,8 @@ type ConnectStatusFunc func(conn *Line)
 // 频道过滤器函数,如果返回true表示成功匹配
 type FilterFunc func(conn *Line) (ok bool)
 
-// 数据获取函数,为了应对需要延迟获取数据的情况
-// type DataFunc func() ([]byte, error)
-
 // 订阅频道数据结构
 type SubscribeData struct {
-	// Channel  *regexp.Regexp    //频道的正则表达式
 	Filter   FilterFunc        // 频道匹配过滤
 	Cmd      string            // 请求的命令
 	BackFunc SubscribeBackFunc //回调函数,如果状态为 NEXT_SUBSCRIBE 将继续下一个频道调用
@@ -38,13 +34,12 @@ type SubscribeData struct {
 
 // 获取数据使用的数据结构
 type GetData struct {
-	// Channel *regexp.Regexp
-	Filter  FilterFunc // 命令匹配过滤
-	Cmd     string
-	Data    any
-	Max     int // 获取数据的频道最多有几个,如果为0表示没有限制
-	Timeout int // 超时时间(毫秒)
-
+	Filter   FilterFunc // 命令匹配过滤
+	Cmd      string
+	Data     any
+	Max      int                // 获取数据的频道最多有几个,如果为0表示没有限制
+	Timeout  int                // 超时时间(毫秒)
+	Rand     bool               // 是否使用随机的数列
 	backchan chan *ResponseData // 获取响应返回的数据
 }