Joyit 2 сар өмнө
parent
commit
bc27882ab2
4 өөрчлөгдсөн 85 нэмэгдсэн , 78 устгасан
  1. 2 1
      README.md
  2. 28 10
      config/config.go
  3. 1 4
      examples/server.go
  4. 54 63
      hub.go

+ 2 - 1
README.md

@@ -28,13 +28,14 @@
 
 ## 问题与优化
 
-- 随机频道获取
+- 出现断线没有自动重连的问题
 - 优化命令匹配的算法,当有数量级的连接时需要考虑;如使用固定 ID,自动生成匹配缓存等方式
 - 建立内存池来分配内存,减少内存碎片
 - 同地址多连接共存,使用不同的连接发送消息,减少延时,提高消息送达可靠性
 
 ## 已经解决的问题
 
+- 随机频道获取
 - 断线时命令在还没有超时的情况下,要等待重连后发送
 - 增加 gzip 的功能,只需要压缩数据部分
 - 转发地址定时测试切换回到主服务节点

+ 28 - 10
config/config.go

@@ -4,26 +4,42 @@ const (
 	// 系统错误号定义,最低号为110,最高127
 	MIN_SYSTEM_ERROR_CODE = 110 // 系统信息最小值
 	NEXT_SUBSCRIBE        = 111
-	NEXT_SUBSCRIBE_MSG    = "NEXT SUBSCRIBE"
 	NEXT_MIDDLE           = 112
-	NEXT_MIDDLE_MSG       = "NEXT MIDDLE"
 	CONVERT_FAILED        = 113
-	CONVERT_FAILED_MSG    = "CONVERT FAILED"
 	FORBIDDEN             = 120
-	FORBIDDEN_MSG         = "FORBIDDEN"
 	SYSTEM_ERROR          = 123
-	SYSTEM_ERROR_MSG      = "SYSTEM ERROR"
 	GET_TIMEOUT           = 124
-	GET_TIMEOUT_MSG       = "GET TIMEOUT"
 	CONNECT_NO_MATCH      = 125
-	CONNECT_NO_MATCH_MSG  = "CONNECT NO MATCH"
 	CONNECT_END           = 126
-	CONNECT_END_MSG       = "CONNECT END"
 	NO_MATCH              = 127
-	NO_MATCH_MSG          = "NO MATCH"
 	MAX_SYSTEM_ERROR_CODE = 127 //系统信息最大值
 )
 
+// 转换 id 到对应的消息
+func IdMsg(id int) string {
+	switch id {
+	case NEXT_SUBSCRIBE:
+		return "NEXT SUBSCRIBE"
+	case NEXT_MIDDLE:
+		return "NEXT MIDDLE"
+	case CONVERT_FAILED:
+		return "CONVERT FAILED"
+	case FORBIDDEN:
+		return "FORBIDDEN"
+	case SYSTEM_ERROR:
+		return "SYSTEM ERROR"
+	case GET_TIMEOUT:
+		return "GET TIMEOUT"
+	case CONNECT_NO_MATCH:
+		return "CONNECT NO MATCH"
+	case CONNECT_END:
+		return "CONNECT END"
+	case NO_MATCH:
+		return "NO MATCH"
+	}
+	return "UNKNOWN"
+}
+
 // 定义成功与失败的值
 const STATE_OK = 1
 const STATE_FAILED = 0
@@ -45,7 +61,8 @@ type Config struct {
 	CleanDeadConnectWait int  // 清理异常的连接(ms)
 	PrintPing            bool // 打印连接ping包
 	PrintMsg             bool // 打印数据包
-	ProxyTimeout         int  // 代理方式下,连接超过这个时间将进行一次直连尝试,如为0表示不启用
+	ProxyTimeout         int  // 客户端检测是否使用代理连接,如果是则进行直连尝试(ms)
+	ConnectCheck         int  // 客户端重连检测周期(ms)
 }
 
 // 获取实例好的配置
@@ -61,5 +78,6 @@ func NewConfig() *Config {
 		PrintPing:            false,
 		PrintMsg:             true,
 		ProxyTimeout:         3600 * 1000,
+		ConnectCheck:         600 * 1000,
 	}
 }

+ 1 - 4
examples/server.go

@@ -4,7 +4,6 @@
 package main
 
 import (
-	"errors"
 	"log"
 	"os"
 	"os/signal"
@@ -24,9 +23,7 @@ func main() {
 	var hub *tinymq.Hub
 
 	hub = tinymq.NewHub(cf, localChannel,
-		func(channel string, hostType tinymq.HostType) (hostInfo *tinymq.HostInfo, err error) {
-			return nil, errors.New("not host found")
-		}, func(proto string, version uint8, channel string, remoteAuth []byte) (auth []byte) {
+		nil, func(proto string, version uint8, channel string, remoteAuth []byte) (auth []byte) {
 			// 从 remoteAuth 是否为空来判断是否需要返回信息
 			if len(remoteAuth) <= 0 {
 				// 客户端调用,返回验证信息

+ 54 - 63
hub.go

@@ -44,17 +44,19 @@ type Hub struct {
 	subscribes sync.Map // [cmd]->[]*SubscribeData   //注册绑定频道的函数,用于响应请求
 	msgCache   sync.Map //  map[uint16]*GetMsg //请求的回应记录,key为id
 
-	// 客户端需要用的函数
+	// 客户端需要用的函数(服务端可为空)
 	connectHostFunc ConnectHostFunc // 获取对应频道的一个连接地址
-	authFunc        AuthFunc        // 获取认证信息,用于发送给对方
 
-	// 服务端需要用的函数
+	// 返回认证信息,发送到对方
+	authFunc AuthFunc // 获取认证信息,用于发送给对方
+
+	// 核对发送过来的认证信息
 	checkAuthFunc CheckAuthFunc // 核对认证是否合法
 
 	// 连接状态变化时调用的函数
 	connectStatusFunc ConnectStatusFunc
 
-	// 验证发送数据条件是否满足 (可为空)
+	// 验证发送数据条件是否满足 (可为空)
 	checkConnectOkFunc CheckConnectOkFunc
 
 	// 上次清理异常连接时间戳
@@ -263,7 +265,7 @@ func (h *Hub) sendRequest(gd *GetData) (count int) {
 						go h.outResponse(&ResponseData{
 							Id:    id,
 							State: config.GET_TIMEOUT,
-							Data:  fmt.Appendf(nil, "[%s] %s %s", config.GET_TIMEOUT_MSG, conn.channel, gd.Cmd),
+							Data:  fmt.Appendf(nil, "[%s] %s %s", config.IdMsg(config.GET_TIMEOUT), conn.channel, gd.Cmd),
 							conn:  conn,
 						})
 						// 检查是否已经很久时间没有使用连接了
@@ -368,7 +370,7 @@ func (h *Hub) requestFromNet(request *RequestData) {
 						if err != nil {
 							log.Println(err.Error())
 							state = config.CONVERT_FAILED
-							byteData = fmt.Appendf(nil, "[%s] %s %s", config.CONVERT_FAILED_MSG, request.conn.channel, request.Cmd)
+							byteData = fmt.Appendf(nil, "[%s] %s %s", config.IdMsg(config.CONVERT_FAILED), request.conn.channel, request.Cmd)
 						}
 					}
 				}
@@ -393,7 +395,7 @@ func (h *Hub) requestFromNet(request *RequestData) {
 	request.conn.sendResponse <- &ResponseData{
 		Id:    request.Id,
 		State: config.NO_MATCH,
-		Data:  fmt.Appendf(nil, "[%s] %s %s", config.NO_MATCH_MSG, channel, cmd),
+		Data:  fmt.Appendf(nil, "[%s] %s %s", config.IdMsg(config.NO_MATCH), channel, cmd),
 	}
 }
 
@@ -411,21 +413,6 @@ func (h *Hub) GetWithStruct(gd *GetData, backFunc GetBackFunc) (count int) {
 	if gd.backchan == nil {
 		gd.backchan = make(chan *ResponseData, 32)
 	}
-	// // 排除空频道
-	// if filter == nil {
-	// 	return 0
-	// }
-	// if timeout <= 0 {
-	// 	timeout = h.cf.WriteWait
-	// }
-	// gd := &GetData{
-	// 	Filter:   filter,
-	// 	Cmd:      cmd,
-	// 	Data:     data,
-	// 	Max:      max,
-	// 	Timeout:  timeout,
-	// 	backchan: make(chan *ResponseData, 32),
-	// }
 	sendMax := h.sendRequest(gd)
 	if sendMax <= 0 {
 		return 0
@@ -483,7 +470,7 @@ func (h *Hub) GetOneWithStruct(gd *GetData) (response *ResponseData) {
 	if gd.Filter == nil {
 		return &ResponseData{
 			State: config.CONNECT_NO_MATCH,
-			Data:  fmt.Appendf(nil, "[%s] not filter function", config.CONNECT_NO_MATCH_MSG),
+			Data:  fmt.Appendf(nil, "[%s] not filter function", config.IdMsg(config.CONNECT_NO_MATCH)),
 		}
 	}
 	gd.Max = 1
@@ -494,7 +481,7 @@ func (h *Hub) GetOneWithStruct(gd *GetData) (response *ResponseData) {
 	if response == nil {
 		return &ResponseData{
 			State: config.CONNECT_NO_MATCH,
-			Data:  fmt.Appendf(nil, "[%s] %s", config.CONNECT_NO_MATCH_MSG, gd.Cmd),
+			Data:  fmt.Appendf(nil, "[%s] %s", config.IdMsg(config.CONNECT_NO_MATCH), gd.Cmd),
 		}
 	}
 	return
@@ -574,23 +561,6 @@ func (h *Hub) addLine(line *Line) {
 		return
 	}
 	h.lines.Store(line)
-	// // 检查是否有相同的channel,如果有的话将其关闭删除
-	// channel := line.channel
-	// h.connects.Range(func(key, _ any) bool {
-	// 	conn := key.(*Line)
-	// 	// 删除超时的连接
-	// 	if conn.state != Connected && conn.host == nil && time.Since(conn.lastRead) > time.Duration(h.cf.PingInterval*5*int(time.Millisecond)) {
-	// 		h.connects.Delete(key)
-	// 		return true
-	// 	}
-	// 	if conn.channel == channel {
-	// 		conn.Close(true)
-	// 		h.connects.Delete(key)
-	// 		return false
-	// 	}
-	// 	return true
-	// })
-	// h.connects.Store(line, true)
 }
 
 // 删除连接
@@ -619,7 +589,6 @@ func (h *Hub) BindForServer(info *HostInfo) (err error) {
 			conn.Close()
 			return
 		}
-		// 检查验证是否合法
 		if !h.checkAuthFunc(proto, version, channel, auth) {
 			conn.Close()
 			return
@@ -670,6 +639,9 @@ func (h *Hub) ConnectToServer(channel string, force bool, host *HostInfo) (err e
 		}
 	}
 	if host == nil {
+		if h.connectHostFunc == nil {
+			return errors.New("not connect host func found")
+		}
 		// 获取服务地址等信息
 		host, err = h.connectHostFunc(channel, Both)
 		if err != nil {
@@ -808,6 +780,10 @@ func (h *Hub) ConnectToServer(channel string, force bool, host *HostInfo) (err e
 // 将会一直阻塞直到连接成功
 func (h *Hub) ConnectToServerX(channel string, force bool, host *HostInfo) {
 	if host == nil {
+		if h.connectHostFunc == nil {
+			log.Println("ConnectToServerX: not connect host func found")
+			return
+		}
 		host, _ = h.connectHostFunc(channel, Direct)
 	}
 	for {
@@ -823,29 +799,43 @@ func (h *Hub) ConnectToServerX(channel string, force bool, host *HostInfo) {
 	}
 }
 
-// 检测处理代理连接
-func (h *Hub) checkProxyConnect() {
-	if h.cf.ProxyTimeout <= 0 {
+// 检测处理连接状态,只在客户端有效
+func (h *Hub) checkConnect() {
+	// 检查客户端获取主机地址函数
+	if h.connectHostFunc == nil {
 		return
 	}
 	proxyTicker := time.NewTicker(time.Duration(h.cf.ProxyTimeout * int(time.Millisecond)))
+	connectTicker := time.NewTicker(time.Millisecond * time.Duration(h.cf.ConnectCheck))
 	for {
-		<-proxyTicker.C
-		now := time.Now().UnixMilli()
-		h.lines.Range(func(id int, line *Line) bool {
-			if line.host != nil && line.host.Proxy && now-line.updated.UnixMilli() > int64(h.cf.ProxyTimeout) {
-				host, err := h.connectHostFunc(line.channel, Direct)
-				if err != nil {
-					log.Println("[checkProxyConnect connectHostFunc ERROR]", err)
-					return false
+		select {
+		case <-proxyTicker.C:
+			now := time.Now().UnixMilli()
+			h.lines.Range(func(id int, line *Line) bool {
+				if line.host != nil && line.host.Proxy && now-line.updated.UnixMilli() > int64(h.cf.ProxyTimeout) {
+					host, err := h.connectHostFunc(line.channel, Direct)
+					if err != nil {
+						log.Println("[proxyTicker connectHostFunc ERROR]", err)
+						return false
+					}
+					err = h.ConnectToServer(line.channel, true, host)
+					if err != nil {
+						log.Println("[checkProxyConnect ConnectToServer WARNING]", err)
+					}
 				}
-				err = h.ConnectToServer(line.channel, true, host)
-				if err != nil {
-					log.Println("[checkProxyConnect ConnectToServer WARNING]", err)
+				return true
+			})
+		case <-connectTicker.C:
+			h.lines.Range(func(id int, line *Line) bool {
+				if line.host != nil && line.state == Disconnected {
+					err := h.ConnectToServer(line.channel, true, nil)
+					if err != nil {
+						log.Println("[connectTicker ConnectToServer WARNING]", err)
+					}
 				}
-			}
-			return true
-		})
+				return true
+			})
+		}
 	}
 }
 
@@ -854,14 +844,15 @@ func (h *Hub) checkProxyConnect() {
 func NewHub(
 	cf *config.Config,
 	channel string,
-	// 客户端需要用的函数
+	// 客户端需要用的函数 (服务端可空)
 	connectHostFunc ConnectHostFunc,
+	// 验证函数,获取认证信息,用于发送给对方
 	authFunc AuthFunc,
-	// 服务端需要用的函数
+	// 核对发送过来的认证信息
 	checkAuthFunc CheckAuthFunc,
 	// 连接状态变化时调用的函数
 	connectStatusFunc ConnectStatusFunc,
-	// 验证发送数据条件是否满足 (可为空)
+	// 验证发送数据条件是否满足 (可为空)
 	checkConnectOkFunc CheckConnectOkFunc,
 ) (h *Hub) {
 	h = &Hub{
@@ -877,6 +868,6 @@ func NewHub(
 		checkConnectOkFunc:   checkConnectOkFunc,
 		lastCleanDeadConnect: time.Now().UnixMilli(),
 	}
-	go h.checkProxyConnect()
+	go h.checkConnect()
 	return h
 }