Browse Source

add CheckConnectOkFunc

Joyit 8 hours ago
parent
commit
44169c18b7
3 changed files with 77 additions and 56 deletions
  1. 2 0
      README.md
  2. 72 56
      hub.go
  3. 3 0
      type.go

+ 2 - 0
README.md

@@ -21,6 +21,8 @@
 
 ## 问题与优化
 
+- 断线时命令在还没有超时的情况下,要等待重连后发送
+- 优化命令匹配的算法,当有数量级的连接时需要考虑
 - 建立内存池来分配内存,减少内存碎片
 - 同地址多连接共存,使用不同的连接发送消息,减少延时,提高消息送达可靠性
 

+ 72 - 56
hub.go

@@ -52,6 +52,9 @@ type Hub struct {
 	// 连接状态变化时调用的函数
 	connectStatusFunc ConnectStatusFunc
 
+	// 验证发送的数据条件是否满足 (可为空)
+	checkConnectOkFunc CheckConnectOkFunc
+
 	// 上次清理异常连接时间戳
 	lastCleanDeadConnect int64
 }
@@ -254,65 +257,75 @@ func (h *Hub) sendRequest(gd *GetData) (count int) {
 		log.Println(err)
 		return 0
 	}
-	h.connects.Range(func(key, _ any) bool {
-		conn := key.(*Line)
-		// 检查连接是否OK
-		if conn.state != Connected {
-			return true
-		}
-		// if gd.Channel.MatchString(conn.channel) {
-		if gd.Filter(conn) {
-			var id uint16
-			if gd.backchan != nil {
-				id = h.GetID()
-				timeout := gd.Timeout
-				if timeout <= 0 {
-					timeout = h.cf.WriteWait
-				}
-				fn := func(id uint16, conn *Line) func() {
-					return func() {
-						go h.outResponse(&ResponseData{
-							Id:    id,
-							State: config.GET_TIMEOUT,
-							Data:  fmt.Appendf(nil, "[%s] %s %s", config.GET_TIMEOUT_MSG, conn.channel, gd.Cmd),
-							conn:  conn,
-						})
-						// 检查是否已经很久时间没有使用连接了
-						if time.Since(conn.lastRead) > time.Duration(h.cf.PingInterval*3*int(time.Millisecond)) {
-							// 超时关闭当前的连接
-							log.Println("get message timeout", conn.channel)
-							// 有可能连接出现问题,断开并重新连接
-							conn.Close(false)
-							return
+	// 增加如果没有发送到消息,延时100ms重连直到超时
+	for i := 0; i < gd.Timeout; i += 100 {
+		h.connects.Range(func(key, _ any) bool {
+			conn := key.(*Line)
+			// 检查连接是否OK
+			if conn.state != Connected {
+				return true
+			}
+			// 验证连接是否达到发送数据的要求
+			if h.checkConnectOkFunc != nil && !h.checkConnectOkFunc(conn, gd) {
+				return true
+			}
+			if gd.Filter(conn) {
+				var id uint16
+				if gd.backchan != nil {
+					id = h.GetID()
+					timeout := gd.Timeout
+					if timeout <= 0 {
+						timeout = h.cf.WriteWait
+					}
+					fn := func(id uint16, conn *Line) func() {
+						return func() {
+							go h.outResponse(&ResponseData{
+								Id:    id,
+								State: config.GET_TIMEOUT,
+								Data:  fmt.Appendf(nil, "[%s] %s %s", config.GET_TIMEOUT_MSG, conn.channel, gd.Cmd),
+								conn:  conn,
+							})
+							// 检查是否已经很久时间没有使用连接了
+							if time.Since(conn.lastRead) > time.Duration(h.cf.PingInterval*3*int(time.Millisecond)) {
+								// 超时关闭当前的连接
+								log.Println("get message timeout", conn.channel)
+								// 有可能连接出现问题,断开并重新连接
+								conn.Close(false)
+								return
+							}
 						}
+					}(id, conn)
+					// 将要发送的请求缓存
+					gm := &GetMsg{
+						out:   gd.backchan,
+						timer: time.AfterFunc(time.Millisecond*time.Duration(timeout), fn),
 					}
-				}(id, conn)
-				// 将要发送的请求缓存
-				gm := &GetMsg{
-					out:   gd.backchan,
-					timer: time.AfterFunc(time.Millisecond*time.Duration(timeout), fn),
+					h.msgCache.Store(id, gm)
+				}
+				// 组织数据并发送到Connect
+				conn.sendRequest <- &RequestData{
+					Id:       id,
+					Cmd:      gd.Cmd,
+					Data:     outData,
+					timeout:  gd.Timeout,
+					backchan: gd.backchan,
+					conn:     conn,
+				}
+				if h.cf.PrintMsg {
+					log.Println("[SEND]->", id, conn.channel, "["+gd.Cmd+"]", subStr(string(outData), 200))
+				}
+				count++
+				if gd.Max > 0 && count >= gd.Max {
+					return false
 				}
-				h.msgCache.Store(id, gm)
-			}
-			// 组织数据并发送到Connect
-			conn.sendRequest <- &RequestData{
-				Id:       id,
-				Cmd:      gd.Cmd,
-				Data:     outData,
-				timeout:  gd.Timeout,
-				backchan: gd.backchan,
-				conn:     conn,
-			}
-			if h.cf.PrintMsg {
-				log.Println("[SEND]->", id, conn.channel, "["+gd.Cmd+"]", subStr(string(outData), 200))
-			}
-			count++
-			if gd.Max > 0 && count >= gd.Max {
-				return false
 			}
+			return true
+		})
+		if count > 0 {
+			break
 		}
-		return true
-	})
+		time.Sleep(time.Millisecond * 100)
+	}
 	return
 }
 
@@ -407,7 +420,7 @@ func (h *Hub) GetWithMaxAndTimeout(filter FilterFunc, cmd string, data any, back
 		return 0
 	}
 	if timeout <= 0 {
-		timeout = h.cf.ReadWait
+		timeout = h.cf.WriteWait
 	}
 	gd := &GetData{
 		Filter:   filter,
@@ -513,7 +526,7 @@ func (h *Hub) PushWithMax(filter FilterFunc, cmd string, data any, max int) {
 		Cmd:      cmd,
 		Data:     data,
 		Max:      max,
-		Timeout:  h.cf.ReadWait,
+		Timeout:  h.cf.WriteWait,
 		backchan: nil,
 	}
 	h.sendRequest(gd)
@@ -821,6 +834,8 @@ func NewHub(
 	checkAuthFunc CheckAuthFunc,
 	// 连接状态变化时调用的函数
 	connectStatusFunc ConnectStatusFunc,
+	// 验证发送的数据条件是否满足 (可为空)
+	checkConnectOkFunc CheckConnectOkFunc,
 ) (h *Hub) {
 	h = &Hub{
 		cf:                   cf,
@@ -831,6 +846,7 @@ func NewHub(
 		authFunc:             authFunc,
 		checkAuthFunc:        checkAuthFunc,
 		connectStatusFunc:    connectStatusFunc,
+		checkConnectOkFunc:   checkConnectOkFunc,
 		lastCleanDeadConnect: time.Now().UnixMilli(),
 	}
 	go h.checkProxyConnect()

+ 3 - 0
type.go

@@ -174,3 +174,6 @@ type AuthFunc func(proto string, version uint8, channel string, remoteAuth []byt
 
 // 认证合法性函数
 type CheckAuthFunc func(proto string, version uint8, channel string, auth []byte) bool
+
+// 验证发送的数据条件是否满足
+type CheckConnectOkFunc = func(line *Line, data *GetData) bool