Joyit před 1 měsícem
rodič
revize
d688aaaac4
7 změnil soubory, kde provedl 73 přidání a 26 odebrání
  1. 5 1
      README.md
  2. 2 0
      config/config.go
  3. 2 2
      examples/client-tcp2.go
  4. 2 2
      examples/client-ws2.go
  5. 1 1
      examples/server.go
  6. 41 8
      hub.go
  7. 20 12
      type.go

+ 5 - 1
README.md

@@ -1,6 +1,7 @@
 # 一款简单的可以跨平台 mq 架构设计
 
 提供服务端和客户端代码,采用多对多的结构,可能方便切换不同的连接方式和不同的连接地址。
+如果直连优先,系统会定时尝试切换连接,直到直连成功。
 
 ## 设计原则
 
@@ -20,7 +21,10 @@
 
 ## 问题与优化
 
-- 增加订阅中间件,处理验证登录等问题
 - 建立内存池来分配内存,减少内存碎片
 - 同地址多连接共存,使用不同的连接发送消息,减少延时,提高消息送达可靠性
 - 转发地址定时测试切换回到主服务节点
+
+## 已经解决的问题
+
+- 增加订阅中间件,处理验证登录等问题

+ 2 - 0
config/config.go

@@ -41,6 +41,7 @@ type Config struct {
 	CleanDeadConnectWait int  // 清理异常的连接(ms)
 	PrintPing            bool // 打印连接ping包
 	PrintMsg             bool // 打印数据包
+	ProxyTimeout         int  // 代理方式下,连接超过这个时间将进行一次直连尝试,如为0表示不启用
 }
 
 // 获取实例好的配置
@@ -55,5 +56,6 @@ func NewConfig() *Config {
 		CleanDeadConnectWait: 3600 * 1000,
 		PrintPing:            true,
 		PrintMsg:             true,
+		ProxyTimeout:         3600 * 1000,
 	}
 }

+ 2 - 2
examples/client-tcp2.go

@@ -23,7 +23,7 @@ func main() {
 		Hash:    "xor:1qaz2wsx3",
 	}
 
-	hub := tinymq.NewHub(cf, channel, func(channel string) (hostInfo *tinymq.HostInfo, err error) {
+	hub := tinymq.NewHub(cf, channel, func(channel string, proxy bool) (hostInfo *tinymq.HostInfo, err error) {
 		return host, nil
 	}, func(proto string, version uint8, channel string, remoteAuth []byte) (auth []byte) {
 		// 从 remoteAuth 是否为空来判断是否需要返回信息
@@ -52,7 +52,7 @@ func main() {
 	},
 	)
 
-	err := hub.ConnectToServer("/tinymq/server", true)
+	err := hub.ConnectToServer("/tinymq/server", true, nil)
 	if err != nil {
 		log.Fatalln("[client ConnectToServer ERROR]", err)
 	}

+ 2 - 2
examples/client-ws2.go

@@ -26,7 +26,7 @@ func main() {
 		Hash: "xor:1qaz2wsx3edc",
 	}
 
-	hub := tinymq.NewHub(cf, localChannel, func(channel string) (hostInfo *tinymq.HostInfo, err error) {
+	hub := tinymq.NewHub(cf, localChannel, func(channel string, proxy bool) (hostInfo *tinymq.HostInfo, err error) {
 		return host, nil
 	}, func(proto string, version uint8, channel string, remoteAuth []byte) (auth []byte) {
 		// 从 remoteAuth 是否为空来判断是否需要返回信息
@@ -55,7 +55,7 @@ func main() {
 	},
 	)
 
-	err := hub.ConnectToServer(remoteChannel, true)
+	err := hub.ConnectToServer(remoteChannel, true, nil)
 	if err != nil {
 		log.Fatalln("[client ConnectToServer ERROR]", err)
 	}

+ 1 - 1
examples/server.go

@@ -24,7 +24,7 @@ func main() {
 	var hub *tinymq.Hub
 
 	hub = tinymq.NewHub(cf, localChannel,
-		func(channel string) (hostInfo *tinymq.HostInfo, err error) {
+		func(channel string, proxy bool) (hostInfo *tinymq.HostInfo, err error) {
 			return nil, errors.New("not host found")
 		}, func(proto string, version uint8, channel string, remoteAuth []byte) (auth []byte) {
 			// 从 remoteAuth 是否为空来判断是否需要返回信息

+ 41 - 8
hub.go

@@ -565,7 +565,7 @@ func (h *Hub) BindForServer(info *HostInfo) (err error) {
 
 // 新建一个连接,不同的连接协议由底层自己选择
 // channel: 要连接的频道信息,需要能表达频道关键信息的部分
-func (h *Hub) ConnectToServer(channel string, force bool) (err error) {
+func (h *Hub) ConnectToServer(channel string, force bool, host *HostInfo) (err error) {
 	// 检查当前channel是否已经存在
 	if !force {
 		line := h.ChannelToLine(channel)
@@ -574,10 +574,12 @@ func (h *Hub) ConnectToServer(channel string, force bool) (err error) {
 			return
 		}
 	}
-	// 获取服务地址等信息
-	host, err := h.connectHostFunc(channel)
-	if err != nil {
-		return err
+	if host == nil {
+		// 获取服务地址等信息
+		host, err = h.connectHostFunc(channel, true)
+		if err != nil {
+			return err
+		}
 	}
 
 	var conn conn.Connect
@@ -683,12 +685,13 @@ func (h *Hub) ConnectToServer(channel string, force bool) (err error) {
 		line := key.(*Line)
 		if line.channel == channel {
 			if line.state == Connected {
-				if !force {
+				if force {
+					line.Close(true)
+				} else {
 					err = fmt.Errorf("[connectToServer ERROR] channel already connected: %s", channel)
 					log.Println(err)
 					return false
 				}
-				return true
 			}
 			line.Start(conn, host)
 			done = true
@@ -710,11 +713,13 @@ func (h *Hub) ConnectToServer(channel string, force bool) (err error) {
 // 重试方式连接服务
 // 将会一直阻塞直到连接成功
 func (h *Hub) ConnectToServerX(channel string, force bool) {
+	host, _ := h.connectHostFunc(channel, false)
 	for {
-		err := h.ConnectToServer(channel, force)
+		err := h.ConnectToServer(channel, force, host)
 		if err == nil {
 			return
 		}
+		host = nil
 		log.Println("[ConnectToServer ERROR, try it again]", err)
 		// 产生一个随机数避免刹间重连过载
 		r := rand.New(rand.NewSource(time.Now().UnixNano()))
@@ -722,6 +727,33 @@ func (h *Hub) ConnectToServerX(channel string, force bool) {
 	}
 }
 
+// 检测处理代理连接
+func (h *Hub) checkProxyConnect() {
+	if h.cf.ProxyTimeout <= 0 {
+		return
+	}
+	proxyTicker := time.NewTicker(time.Duration(h.cf.ProxyTimeout * int(time.Millisecond)))
+	for {
+		<-proxyTicker.C
+		now := time.Now().UnixMilli()
+		h.connects.Range(func(key, _ any) bool {
+			line := key.(*Line)
+			if line.host != nil && line.host.Proxy && now-line.updated.UnixMilli() > int64(h.cf.ProxyTimeout) {
+				host, err := h.connectHostFunc(line.channel, false)
+				if err != nil {
+					log.Println("[checkProxyConnect connectHostFunc ERROR]", err)
+					return false
+				}
+				err = h.ConnectToServer(line.channel, true, host)
+				if err != nil {
+					log.Println("[checkProxyConnect ConnectToServer WARNING]", err)
+				}
+			}
+			return true
+		})
+	}
+}
+
 // 建立一个集线器
 // connectFunc 用于监听连接状态的函数,可以为nil
 func NewHub(
@@ -745,5 +777,6 @@ func NewHub(
 		connectStatusFunc:    connectStatusFunc,
 		lastCleanDeadConnect: time.Now().UnixMilli(),
 	}
+	go h.checkProxyConnect()
 	return h
 }

+ 20 - 12
type.go

@@ -101,16 +101,17 @@ type GetMsg struct {
 
 // 连接服务结构
 type HostInfo struct {
-	Proto   string    `json:"proto" yaml:"proto"`     // 协议
-	Version uint8     `json:"version" yaml:"version"` // 版本
-	Host    string    `json:"host" yaml:"host"`       // 连接的IP地址或者域名
-	Bind    string    `json:"bind" yaml:"bind"`       // 绑定的地址
-	Port    uint16    `json:"port" yaml:"port"`       // 连接的端口
-	Path    string    `json:"path" yaml:"path"`       // 连接的路径
-	Hash    string    `json:"hash" yaml:"hash"`       // 连接验证使用,格式 method:key
-	Proxy   bool      `json:"proxy" yaml:"proxy"`     // 是否代理转发
-	Errors  uint16    `json:"errors" yaml:"errors"`   // 连接失败计数,如果成功了则重置为0
-	Updated time.Time `json:"updated" yaml:"updated"` // 节点信息刷新时间
+	Proto    string    `json:"proto" yaml:"proto"`       // 协议
+	Version  uint8     `json:"version" yaml:"version"`   // 版本
+	Host     string    `json:"host" yaml:"host"`         // 连接的IP地址或者域名
+	Bind     string    `json:"bind" yaml:"bind"`         // 绑定的地址
+	Port     uint16    `json:"port" yaml:"port"`         // 连接的端口
+	Path     string    `json:"path" yaml:"path"`         // 连接的路径
+	Hash     string    `json:"hash" yaml:"hash"`         // 连接验证使用,格式 method:key
+	Proxy    bool      `json:"proxy" yaml:"proxy"`       // 是否代理
+	Priority int16     `json:"priority" yaml:"priority"` // 优先级,-1 表示不可用,0 表示最高优先级(为了兼容没有优先级的节点),1-100 表示优先级别,数值越高优先级越高
+	Errors   uint16    `json:"errors" yaml:"errors"`     // 连接失败计数,如果成功了则重置为0
+	Updated  time.Time `json:"updated" yaml:"updated"`   // 节点信息刷新时间
 }
 
 // 只输出客户端要连接的信息
@@ -130,14 +131,21 @@ func (h *HostInfo) String() string {
 	if h.Path != "" {
 		b.WriteString(h.Path)
 	}
+	param := make([]string, 0)
 	if h.Proxy {
-		b.WriteString("?proxy=1")
+		param = append(param, "proxy=1")
+	}
+	if h.Priority != 0 {
+		param = append(param, fmt.Sprintf("priority=%d", h.Priority))
+	}
+	if len(param) > 0 {
+		b.WriteString("?" + strings.Join(param, "&"))
 	}
 	return b.String()
 }
 
 // 获取对应频道的一个连接地址
-type ConnectHostFunc func(channel string) (hostInfo *HostInfo, err error)
+type ConnectHostFunc func(channel string, proxy bool) (hostInfo *HostInfo, err error)
 
 // 获取认证信息
 type AuthFunc func(proto string, version uint8, channel string, remoteAuth []byte) (auth []byte)