فهرست منبع

change mod name

Joyit 1 ماه پیش
والد
کامیت
c3e78f5f89
22فایلهای تغییر یافته به همراه2746 افزوده شده و 1 حذف شده
  1. 53 0
      .gitignore
  2. 25 1
      README.md
  3. 51 0
      config/config.go
  4. 38 0
      conn/README.md
  5. 31 0
      conn/conn.go
  6. 21 0
      conn/tpv2/README.md
  7. 465 0
      conn/tpv2/tpv2.go
  8. 29 0
      conn/util/crc8.go
  9. 128 0
      conn/util/encrypt.go
  10. 3 0
      conn/wsv2/README.md
  11. 392 0
      conn/wsv2/wsv2.go
  12. 14 0
      examples/README.md
  13. 80 0
      examples/client-tpv2.go
  14. 83 0
      examples/client-wsv2.go
  15. 79 0
      examples/echo-client.go
  16. 131 0
      examples/echo-server.go
  17. 119 0
      examples/server.go
  18. 7 0
      go.mod
  19. 5 0
      go.sum
  20. 609 0
      hub.go
  21. 290 0
      line.go
  22. 93 0
      type.go

+ 53 - 0
.gitignore

@@ -0,0 +1,53 @@
+# These are some examples of commonly ignored file patterns.
+# You should customize this list as applicable to your project.
+# Learn more about .gitignore:
+#     https://www.atlassian.com/git/tutorials/saving-changes/gitignore
+
+# Node artifact files
+node_modules/
+dist/
+
+# Compiled Java class files
+*.class
+
+# Compiled Python bytecode
+*.py[cod]
+
+# Log files
+*.log
+
+# Package files
+*.jar
+
+# Maven
+target/
+dist/
+
+# JetBrains IDE
+.idea/
+
+# Unit test reports
+TEST*.xml
+
+# Generated by MacOS
+.DS_Store
+
+# Generated by Windows
+Thumbs.db
+
+# Applications
+*.app
+*.exe
+*.war
+
+# Large media files
+*.mp4
+*.tiff
+*.avi
+*.flv
+*.mov
+*.wmv
+
+tmp/
+build/
+work/

+ 25 - 1
README.md

@@ -1 +1,25 @@
- world
+# 一款简单的可以跨平台 mq 架构设计
+
+采用多对多的结构,可能方便切换不同的连接方式和不同的连接地址。
+
+## 设计原则
+
+简单,安全,自动重连。
+三层架构:
+
+- 通信连接层,实现不同的通信方式,如 tcp, websocket, udp 等
+- 连接聚合层,管理不同的连接,重连等
+- 用户接口
+
+## 需要的功能
+
+- 同地址多连接协议
+- 自动重连
+- 多协议绑定
+- 连接验证
+
+## 问题与优化
+
+- 建立内存池来分配内存,减少内存碎片
+- 同地址多连接共存,使用不同的连接发送消息,减少延时,提高消息送达可靠性
+- 转发地址定时测试切换回到主服务节点

+ 51 - 0
config/config.go

@@ -0,0 +1,51 @@
+package config
+
+const (
+	// 系统错误号定义,最低号为110,最高127
+	MIN_SYSTEM_ERROR_CODE = 110 // 系统信息最小值
+	NEXT_SUBSCRIBE        = 111
+	NEXT_SUBSCRIBE_MSG    = "NEXT SUBSCRIBE"
+	SYSTEM_ERROR          = 123
+	SYSTEM_ERROR_MSG      = "SYSTEM ERROR"
+	GET_TIMEOUT           = 125
+	GET_TIMEOUT_MSG       = "GET TIMEOUT"
+	CONNECT_END           = 126
+	CONNECT_END_MSG       = "CONNECT END"
+	NO_MATCH              = 127
+	NO_MATCH_MSG          = "NO MATCH"
+	MAX_SYSTEM_ERROR_CODE = 127 //系统信息最大值
+)
+
+// 定义成功与失败的值
+const STATE_OK = 1
+const STATE_FAILED = 0
+
+const (
+	// ID 号最高值,高于这个值的ID号为系统内部使用
+	ID_MAX = 65500
+	// 验证ID
+	ID_AUTH = 65502
+)
+
+// 全局配置
+type Config struct {
+	ConnectTimeout       int // 没有收到数据包的连接超时时间(ms)
+	PingInterval         int // 发送ping包的时间间隔,前提是没有收到数据包(ms)
+	WriteWait            int // 写入网络数据等待最长时间(ms)
+	ReadWait             int // 获取命令数据超时时间 (ms)
+	LongReadWait         int // 长时间等待读取数据的超时时间(ms)
+	CleanDeadConnectWait int // 清理异常的连接(ms)
+}
+
+// 获取实例好的配置
+func NewConfig() *Config {
+	// 配置基础的数据
+	return &Config{
+		ConnectTimeout:       60 * 1000,
+		PingInterval:         61 * 1000,
+		WriteWait:            60 * 1000,
+		ReadWait:             30 * 1000,
+		LongReadWait:         150 * 1000,
+		CleanDeadConnectWait: 3600 * 1000,
+	}
+}

+ 38 - 0
conn/README.md

@@ -0,0 +1,38 @@
+# 连接实现类
+
+先实现 tcp 和 websocket 方式的线路连接。
+
+## 数据结构
+
+id 号为 0 表示不需要回应的消息
+id 号大于 65500 为内部使用。
+
+### 如何验证合法性
+
+建立连接后发送第一个数据包,包括下面的内容:
+
+id(uint16)+version(uint8)+proto(string)+channel(string)+auth([]byte)
+
+这个数据包必须是除混淆包(如果有的话)之后的第一个包,如果解析不成功则直接断开连接。
+proto 的字符长度不能超过 255
+channel 的字符长度不能超过 65535
+auth 对应的是应用层的认证,如果没有的话可以为空,具体的认证方式由应用层决定。
+auth 的数据结构由应用层自己决定。
+
+### Ping
+
+id(uint16)
+
+### 请求包
+
+id(uint16)+cmdLen(uint8)+cmd(string)+data([]byte)
+
+### 响应包
+
+id(uint16)+state(uint8)+data([]byte)
+
+其中 state 字节的最高位 1,需要 &0x7F 处理。
+
+### 频道
+
+id(65502)+channel(string)

+ 31 - 0
conn/conn.go

@@ -0,0 +1,31 @@
+package conn
+
+import (
+	"net"
+)
+
+// 消息类型
+type MsgType byte
+
+const (
+	PingMsg MsgType = iota
+	RequestMsg
+	ResponseMsg
+)
+
+// 连接接口,代表一个原始的连接
+type Connect interface {
+	WriteAuthInfo(channel string, auth []byte) (err error)
+	ReadAuthInfo() (proto string, version uint8, channel string, auth []byte, err error)
+	// WriteChannel(data []byte) error
+	WriteRequest(id uint16, cmd string, data []byte) error
+	WriteResponse(id uint16, state uint8, data []byte) error
+	WritePing(id uint16) error
+	ReadMessage(deadline int) (msgType MsgType, id uint16, cmd string, state uint8, data []byte, err error)
+	RemoteAddr() net.Addr
+	LocalAddr() net.Addr
+	Close() error
+}
+
+// 服务请求连接
+type ServerConnectFunc func(conn Connect)

+ 21 - 0
conn/tpv2/README.md

@@ -0,0 +1,21 @@
+# 简单的 mq 连接协议
+
+改进原来的 tinymq 框架,由于用的地方多,独立为通用库。
+V1 版本参考链接:https://docs.google.com/document/d/1JvDb0WhO-hMSOY69qq_12aOXBPifcTmQJp5kBkBXFe0/edit?usp=sharing
+
+V2 版本与 V1 版本不兼容,参考 V1 版本来完善新的协议。
+
+## 数据包长度
+
+长度 2 字节,长度的值不包括本身和 CRC,采用网络的大端格式(下同);频道名称类似网址,如/tinymq/,最后一个字节为 CRC8,用于检查数据是否正确,CRC 只是检查 ID、命令或响应码和数据部分,没有包括长度部分。
+
+如果需要发送的数据大于等于 0xFFFF,则原来的长度值为 0xFFFF,长度的后面增加一个 4 个字节的长度值表示当前数据包的实际长度。
+
+## 交换加密密钥
+
+根据加密协议的不同,iv 的长度也不同
+iv 在连接成功后第一时间发送
+
+## 关于混淆
+
+暂时还没有混淆的功能,等有时间再处理。

+ 465 - 0
conn/tpv2/tpv2.go

@@ -0,0 +1,465 @@
+package tpv2
+
+import (
+	"crypto/rand"
+	"encoding/binary"
+	"errors"
+	"fmt"
+	"io"
+	"log"
+	"net"
+	"strings"
+	"time"
+
+	"git.me9.top/git/tinymq/config"
+	"git.me9.top/git/tinymq/conn"
+	"git.me9.top/git/tinymq/conn/util"
+)
+
+const VERSION uint8 = 2
+const PROTO string = "tp"
+
+// 数据包的最大长度
+const MAX_LENGTH = 0xFFFF
+const MAX2_LENGTH = 0x1FFFFFFF // 500 M,避免申请过大内存
+
+type TpConnectV2 struct {
+	cf     *config.Config
+	conn   net.Conn
+	cipher *util.Cipher // 记录当前的加解密类
+}
+
+// 服务端
+// hash 格式 encryptMethod:encryptKey
+func Server(cf *config.Config, bind string, hash string, fn conn.ServerConnectFunc) (err error) {
+	var ci *util.CipherInfo
+	var encryptKey string
+	if hash != "" {
+		i := strings.Index(hash, ":")
+		if i <= 0 {
+			err = errors.New("hash is invalid")
+			return
+		}
+		encryptMethod := hash[0:i]
+		encryptKey = hash[i+1:]
+
+		if c, ok := util.CipherMethod[encryptMethod]; ok {
+			ci = c
+		} else {
+			return errors.New("Unsupported encryption method: " + encryptMethod)
+		}
+	}
+
+	log.Printf("Listening and serving tcp on %s\n", bind)
+	l, err := net.Listen("tcp", bind)
+	if err != nil {
+		log.Println("[tpv2 Server ERROR]", err)
+		return
+	}
+	go func(l net.Listener) {
+		defer l.Close()
+		for {
+			conn, err := l.Accept()
+			if err != nil {
+				log.Println("[accept ERROR]", err)
+				return
+			}
+			go func(conn net.Conn) {
+				if ci == nil {
+					c := &TpConnectV2{
+						cf:   cf,
+						conn: conn,
+					}
+					fn(c)
+					return
+				}
+				var eiv []byte
+				var div []byte
+				if ci.IvLen > 0 {
+					// 服务端 IV
+					eiv = make([]byte, ci.IvLen)
+					_, err = rand.Read(eiv)
+					if err != nil {
+						log.Println("[tpv2 Server rand.Read ERROR]", err)
+						return
+					}
+					// 发送 IV
+					conn.SetWriteDeadline(time.Now().Add(time.Duration(cf.WriteWait) * time.Millisecond))
+					if _, err := conn.Write(eiv); err != nil {
+						log.Println("[tpv2 Server conn.Write ERROR]", err)
+						return
+					}
+
+					// 读取 IV
+					err = conn.SetReadDeadline(time.Now().Add(time.Millisecond * time.Duration(cf.ReadWait)))
+					if err != nil {
+						log.Println("[tpv2 Server SetReadDeadline ERROR]", err)
+						return
+					}
+					div = make([]byte, ci.IvLen)
+					_, err := io.ReadFull(conn, div)
+					if err != nil {
+						log.Println("[tpv2 Server ReadFull ERROR]", err)
+						return
+					}
+				}
+				cipher, err := util.NewCipher(ci, encryptKey, eiv, div)
+				if err != nil {
+					log.Println("[tpv2 NewCipher ERROR]", err)
+					return
+				}
+				// 初始化
+				c := &TpConnectV2{
+					cf:     cf,
+					conn:   conn,
+					cipher: cipher,
+				}
+				fn(c)
+			}(conn)
+		}
+	}(l)
+	return
+}
+
+// 客户端,新建一个连接
+func Client(cf *config.Config, addr string, hash string) (conn.Connect, error) {
+	// 没有加密的情况
+	if hash == "" {
+		conn, err := net.DialTimeout("tcp", addr, time.Duration(cf.ConnectTimeout)*time.Millisecond)
+		if err != nil {
+			return nil, err
+		}
+		c := &TpConnectV2{
+			cf:   cf,
+			conn: conn,
+		}
+		return c, nil
+	}
+	i := strings.Index(hash, ":")
+	if i <= 0 {
+		return nil, errors.New("hash is invalid")
+	}
+	encryptMethod := hash[0:i]
+	encryptKey := hash[i+1:]
+	ci, ok := util.CipherMethod[encryptMethod]
+	if !ok {
+		return nil, errors.New("Unsupported encryption method: " + encryptMethod)
+	}
+	conn, err := net.DialTimeout("tcp", addr, time.Duration(cf.ConnectTimeout)*time.Millisecond)
+	if err != nil {
+		return nil, err
+	}
+	var eiv []byte
+	var div []byte
+	if ci.IvLen > 0 {
+		// 客户端 IV
+		eiv = make([]byte, ci.IvLen)
+		_, err = rand.Read(eiv)
+		if err != nil {
+			log.Println("[tpv2 Client rand.Read ERROR]", err)
+			return nil, err
+		}
+		// 发送 IV
+		conn.SetWriteDeadline(time.Now().Add(time.Duration(cf.WriteWait) * time.Millisecond))
+		if _, err := conn.Write(eiv); err != nil {
+			log.Println("[tpv2 Client conn.Write ERROR]", err)
+			return nil, err
+		}
+
+		// 读取 IV
+		err = conn.SetReadDeadline(time.Now().Add(time.Millisecond * time.Duration(cf.ReadWait)))
+		if err != nil {
+			log.Println("[tpv2 Client SetReadDeadline ERROR]", err)
+			return nil, err
+		}
+		div = make([]byte, ci.IvLen)
+		_, err := io.ReadFull(conn, div)
+		if err != nil {
+			log.Println("[tpv2 Client ReadFull ERROR]", err)
+			return nil, err
+		}
+	}
+	cipher, err := util.NewCipher(ci, encryptKey, eiv, div)
+	if err != nil {
+		log.Println("[tpv2 NewCipher ERROR]", err)
+		return nil, err
+	}
+	// 初始化
+	c := &TpConnectV2{
+		cf:     cf,
+		conn:   conn,
+		cipher: cipher,
+	}
+	return c, nil
+}
+
+// 发送数据到网络
+// 如果有加密函数的话会直接修改源数据
+func (c *TpConnectV2) writeMessage(buf []byte) (err error) {
+	if len(buf) > MAX2_LENGTH {
+		return fmt.Errorf("data length more than %d", MAX2_LENGTH)
+	}
+	if c.cipher != nil {
+		c.cipher.Encrypt(buf, buf)
+	}
+	c.conn.SetWriteDeadline(time.Now().Add(time.Duration(c.cf.WriteWait) * time.Millisecond))
+	for {
+		n, err := c.conn.Write(buf)
+		if err != nil {
+			return err
+		}
+		if n < len(buf) {
+			buf = buf[n:]
+		} else {
+			return nil
+		}
+	}
+}
+
+// 申请内存并写入数据长度信息
+// 还多申请一个字节用于保存crc
+func (c *TpConnectV2) writeDataLen(dlen int) (buf []byte, start int) {
+	if dlen >= MAX_LENGTH {
+		buf = make([]byte, dlen+2+4+1)
+		start = 2 + 4
+		binary.BigEndian.PutUint16(buf[:2], MAX_LENGTH)
+		binary.BigEndian.PutUint32(buf[2:6], uint32(dlen))
+	} else {
+		buf = make([]byte, dlen+2+1)
+		start = 2
+		binary.BigEndian.PutUint16(buf[:2], uint16(dlen))
+	}
+	return
+}
+
+// 发送Auth信息
+// 建立连接后第一个发送的消息
+func (c *TpConnectV2) WriteAuthInfo(channel string, auth []byte) (err error) {
+	protoLen := len(PROTO)
+	channelLen := len(channel)
+	if channelLen > 0xFFFF {
+		return errors.New("length of channel over")
+	}
+	dlen := 2 + 1 + 1 + protoLen + 2 + channelLen + len(auth)
+	buf, start := c.writeDataLen(dlen)
+	index := start
+	binary.BigEndian.PutUint16(buf[index:index+2], config.ID_AUTH)
+	index += 2
+	buf[index] = VERSION
+	index++
+	buf[index] = byte(protoLen)
+	index++
+	copy(buf[index:index+protoLen], []byte(PROTO))
+	index += protoLen
+	binary.BigEndian.PutUint16(buf[index:index+2], uint16(channelLen))
+	index += 2
+	copy(buf[index:index+channelLen], []byte(channel))
+	index += channelLen
+	copy(buf[index:], auth)
+	buf[start+dlen] = util.CRC8(buf[start : start+dlen])
+	return c.writeMessage(buf)
+}
+
+// 从连接中读取信息
+func (c *TpConnectV2) readMessage(deadline int) ([]byte, error) {
+	buf := make([]byte, 2)
+	err := c.conn.SetReadDeadline(time.Now().Add(time.Millisecond * time.Duration(deadline)))
+	if err != nil {
+		return nil, err
+	}
+	// 读取数据流长度
+	_, err = io.ReadFull(c.conn, buf)
+	if err != nil {
+		return nil, err
+	}
+	// 将读出来的数据进行解密
+	if c.cipher != nil {
+		c.cipher.Decrypt(buf, buf)
+	}
+	dlen := uint32(binary.BigEndian.Uint16(buf))
+	if dlen < 2 {
+		return nil, errors.New("length is less to 2")
+	}
+	if dlen >= MAX_LENGTH {
+		// 数据包比较大,通过后面的4位长度来表示实际长度
+		buf = make([]byte, 4)
+		_, err := io.ReadFull(c.conn, buf)
+		if err != nil {
+			return nil, err
+		}
+		if c.cipher != nil {
+			c.cipher.Decrypt(buf, buf)
+		}
+		dlen = binary.BigEndian.Uint32(buf)
+		if dlen < MAX_LENGTH || dlen > MAX2_LENGTH {
+			return nil, errors.New("wrong length in read message")
+		}
+	}
+	// 读取指定长度的数据
+	buf = make([]byte, dlen+1) // 最后一个是crc的值
+	_, err = io.ReadFull(c.conn, buf)
+	if err != nil {
+		return nil, err
+	}
+	if c.cipher != nil {
+		c.cipher.Decrypt(buf, buf)
+	}
+	// 检查CRC8
+	if util.CRC8(buf[:dlen]) != buf[dlen] {
+		return nil, errors.New("CRC error")
+	}
+	return buf[:dlen], nil
+}
+
+// 获取Auth信息
+// id(uint16)+version(uint8)+proto(string)+channel(string)+auth([]byte)
+func (c *TpConnectV2) ReadAuthInfo() (proto string, version uint8, channel string, auth []byte, err error) {
+	defer func() {
+		if r := recover(); r != nil {
+			err = fmt.Errorf("recovered from panic: %v", r)
+			return
+		}
+	}()
+	msg, err := c.readMessage(c.cf.ReadWait)
+	if err != nil {
+		return
+	}
+	msgLen := len(msg)
+	if msgLen < 4 {
+		err = errors.New("message length less than 4")
+		return
+	}
+	start := 0
+	id := binary.BigEndian.Uint16(msg[start : start+2])
+	if id != config.ID_AUTH {
+		err = fmt.Errorf("wrong message id: %d", id)
+		return
+	}
+	start += 2
+
+	version = msg[start]
+	if version != VERSION {
+		err = fmt.Errorf("require version %d, get version: %d", VERSION, version)
+		return
+	}
+	start++
+	protoLen := int(msg[start])
+	if protoLen < 2 {
+		err = errors.New("wrong proto length")
+		return
+	}
+	start++
+	proto = string(msg[start : start+protoLen])
+	if proto != PROTO {
+		err = fmt.Errorf("wrong proto: %s", proto)
+		return
+	}
+	start += protoLen
+	channelLen := int(binary.BigEndian.Uint16(msg[start : start+2]))
+	if channelLen < 2 {
+		err = errors.New("wrong channel length")
+		return
+	}
+	start += 2
+	channel = string(msg[start : start+channelLen])
+	start += channelLen
+	auth = msg[start:]
+	return
+}
+
+// 发送请求数据包到网络
+func (c *TpConnectV2) WriteRequest(id uint16, cmd string, data []byte) error {
+	// 为了区分请求还是响应包,命令字符串不能超过127个字节,如果超过则截断
+	cmdLen := len(cmd)
+	if cmdLen > 0x7F {
+		return errors.New("command length is more than 0x7F")
+	}
+	dlen := 2 + 1 + cmdLen + len(data)
+	buf, start := c.writeDataLen(dlen)
+	index := start
+	binary.BigEndian.PutUint16(buf[index:index+2], id)
+	index += 2
+	buf[index] = byte(cmdLen)
+	index++
+	copy(buf[index:index+cmdLen], cmd)
+	index += cmdLen
+	copy(buf[index:], data)
+	buf[start+dlen] = util.CRC8(buf[start : start+dlen])
+	return c.writeMessage(buf)
+}
+
+// 发送响应数据包到网络
+// 网络格式:[id, stateCode, data]
+func (c *TpConnectV2) WriteResponse(id uint16, state uint8, data []byte) error {
+	dlen := 2 + 1 + len(data)
+	buf, start := c.writeDataLen(dlen)
+	index := start
+	binary.BigEndian.PutUint16(buf[index:index+2], id)
+	index += 2
+	buf[index] = state | 0x80
+	index++
+	copy(buf[index:], data)
+	buf[start+dlen] = util.CRC8(buf[start : start+dlen])
+	return c.writeMessage(buf)
+}
+
+// 发送ping包
+func (c *TpConnectV2) WritePing(id uint16) error {
+	dlen := 2
+	buf, start := c.writeDataLen(dlen)
+	index := start
+	binary.BigEndian.PutUint16(buf[index:index+2], id)
+	// index += 2
+	buf[start+dlen] = util.CRC8(buf[start : start+dlen])
+	return c.writeMessage(buf)
+}
+
+// 获取信息
+func (c *TpConnectV2) ReadMessage(deadline int) (msgType conn.MsgType, id uint16, cmd string, state uint8, data []byte, err error) {
+	msg, err := c.readMessage(deadline)
+	if err != nil {
+		return
+	}
+	msgLen := len(msg)
+	id = binary.BigEndian.Uint16(msg[0:2])
+	// ping信息
+	if msgLen == 2 {
+		msgType = conn.PingMsg
+		return
+	}
+
+	if id > config.ID_MAX {
+		err = fmt.Errorf("wrong message id: %d", id)
+		return
+	}
+
+	cmdx := msg[2]
+	if (cmdx & 0x80) == 0 {
+		// 请求包
+		msgType = conn.RequestMsg
+		cmdLen := int(cmdx)
+		cmd = string(msg[3 : cmdLen+3])
+		data = msg[cmdLen+3:]
+		return
+	} else {
+		// 响应数据包
+		msgType = conn.ResponseMsg
+		state = cmdx & 0x7F
+		data = msg[3:]
+		return
+	}
+}
+
+// 获取远程的地址
+func (c *TpConnectV2) RemoteAddr() net.Addr {
+	return c.conn.RemoteAddr()
+}
+
+// 获取本地的地址
+func (c *TpConnectV2) LocalAddr() net.Addr {
+	return c.conn.LocalAddr()
+}
+
+func (c *TpConnectV2) Close() error {
+	return c.conn.Close()
+}

+ 29 - 0
conn/util/crc8.go

@@ -0,0 +1,29 @@
+package util
+
+// CRC8 验证表格
+// http://blog.csdn.net/zjli321/article/details/52998468
+var crcTable = [...]byte{
+	0x00, 0x31, 0x62, 0x53, 0xc4, 0xf5, 0xa6, 0x97, 0xb9, 0x88, 0xdb, 0xea, 0x7d, 0x4c, 0x1f, 0x2e,
+	0x43, 0x72, 0x21, 0x10, 0x87, 0xb6, 0xe5, 0xd4, 0xfa, 0xcb, 0x98, 0xa9, 0x3e, 0x0f, 0x5c, 0x6d,
+	0x86, 0xb7, 0xe4, 0xd5, 0x42, 0x73, 0x20, 0x11, 0x3f, 0x0e, 0x5d, 0x6c, 0xfb, 0xca, 0x99, 0xa8,
+	0xc5, 0xf4, 0xa7, 0x96, 0x01, 0x30, 0x63, 0x52, 0x7c, 0x4d, 0x1e, 0x2f, 0xb8, 0x89, 0xda, 0xeb,
+	0x3d, 0x0c, 0x5f, 0x6e, 0xf9, 0xc8, 0x9b, 0xaa, 0x84, 0xb5, 0xe6, 0xd7, 0x40, 0x71, 0x22, 0x13,
+	0x7e, 0x4f, 0x1c, 0x2d, 0xba, 0x8b, 0xd8, 0xe9, 0xc7, 0xf6, 0xa5, 0x94, 0x03, 0x32, 0x61, 0x50,
+	0xbb, 0x8a, 0xd9, 0xe8, 0x7f, 0x4e, 0x1d, 0x2c, 0x02, 0x33, 0x60, 0x51, 0xc6, 0xf7, 0xa4, 0x95,
+	0xf8, 0xc9, 0x9a, 0xab, 0x3c, 0x0d, 0x5e, 0x6f, 0x41, 0x70, 0x23, 0x12, 0x85, 0xb4, 0xe7, 0xd6,
+	0x7a, 0x4b, 0x18, 0x29, 0xbe, 0x8f, 0xdc, 0xed, 0xc3, 0xf2, 0xa1, 0x90, 0x07, 0x36, 0x65, 0x54,
+	0x39, 0x08, 0x5b, 0x6a, 0xfd, 0xcc, 0x9f, 0xae, 0x80, 0xb1, 0xe2, 0xd3, 0x44, 0x75, 0x26, 0x17,
+	0xfc, 0xcd, 0x9e, 0xaf, 0x38, 0x09, 0x5a, 0x6b, 0x45, 0x74, 0x27, 0x16, 0x81, 0xb0, 0xe3, 0xd2,
+	0xbf, 0x8e, 0xdd, 0xec, 0x7b, 0x4a, 0x19, 0x28, 0x06, 0x37, 0x64, 0x55, 0xc2, 0xf3, 0xa0, 0x91,
+	0x47, 0x76, 0x25, 0x14, 0x83, 0xb2, 0xe1, 0xd0, 0xfe, 0xcf, 0x9c, 0xad, 0x3a, 0x0b, 0x58, 0x69,
+	0x04, 0x35, 0x66, 0x57, 0xc0, 0xf1, 0xa2, 0x93, 0xbd, 0x8c, 0xdf, 0xee, 0x79, 0x48, 0x1b, 0x2a,
+	0xc1, 0xf0, 0xa3, 0x92, 0x05, 0x34, 0x67, 0x56, 0x78, 0x49, 0x1a, 0x2b, 0xbc, 0x8d, 0xde, 0xef,
+	0x82, 0xb3, 0xe0, 0xd1, 0x46, 0x77, 0x24, 0x15, 0x3b, 0x0a, 0x59, 0x68, 0xff, 0xce, 0x9d, 0xac,
+}
+
+func CRC8(data []byte) (crc byte) {
+	for _, v := range data {
+		crc = crcTable[crc^v]
+	}
+	return
+}

+ 128 - 0
conn/util/encrypt.go

@@ -0,0 +1,128 @@
+// 加密算法库
+package util
+
+import (
+	"crypto/cipher"
+	"crypto/md5"
+)
+
+const (
+	XOR_KEY_LEN = 32
+	XOR_IV_LEN  = 32
+)
+
+// 定义一些加密方法
+var CipherMethod = map[string]*CipherInfo{
+	"plain": {0, 0, NewPlainStream, NewPlainStream},
+	"xor":   {XOR_KEY_LEN, XOR_IV_LEN, NewXorStream, NewXorStream},
+}
+
+// 普通的xor加密,没有破解的难度,只是简单的让明文不可见而已
+type XorStreamCipher struct {
+	key   [XOR_KEY_LEN]byte
+	index byte // 加解密的位置指示
+}
+
+func (c *XorStreamCipher) XORKeyStream(dst, src []byte) {
+	l := len(src)
+	for i := range l {
+		dst[i] = src[i] ^ c.key[c.index]
+		c.index++
+		if c.index >= XOR_KEY_LEN {
+			c.index = 0
+		}
+	}
+}
+
+func NewXorStream(key, iv []byte) (cipher.Stream, error) {
+	var c XorStreamCipher
+	copy(c.key[:], key[:XOR_KEY_LEN])
+	for i := range XOR_KEY_LEN {
+		c.key[i] = c.key[i] ^ iv[i]
+	}
+	return &c, nil
+}
+
+// 没有加密模式,考虑到部分通讯管道已经是加密的状态,故增加不加密的方式,提高速度
+type PlainStreamCipher struct{}
+
+func (c *PlainStreamCipher) XORKeyStream(dst, src []byte) {
+	if &dst != &src {
+		copy(dst, src)
+	}
+}
+
+func NewPlainStream(key, iv []byte) (cipher.Stream, error) {
+	var c PlainStreamCipher
+	return &c, nil
+}
+
+func Md5Sum(d []byte) []byte {
+	h := md5.New()
+	h.Write(d)
+	return h.Sum(nil)
+}
+
+// 转字符串密码为byte数组类型
+func EvpBytesToKey(password string, keyLen int) (key []byte) {
+	if keyLen <= 1 {
+		return
+	}
+	const md5Len = 16
+
+	cnt := (keyLen-1)/md5Len + 1
+	m := make([]byte, cnt*md5Len)
+	copy(m, Md5Sum([]byte(password)))
+
+	// Repeatedly call md5 until bytes generated is enough.
+	// Each call to md5 uses data: prev md5 sum + password.
+	d := make([]byte, md5Len+len(password))
+	start := 0
+	for i := 1; i < cnt; i++ {
+		start += md5Len
+		copy(d, m[start-md5Len:start])
+		copy(d[md5Len:], password)
+		copy(m[start:], Md5Sum(d))
+	}
+	return m[:keyLen]
+}
+
+// 预先准备好的加密算法
+type CipherInfo struct {
+	KeyLen           int
+	IvLen            int
+	NewEncryptStream func(key, iv []byte) (cipher.Stream, error)
+	NewDecryptStream func(key, iv []byte) (cipher.Stream, error)
+}
+
+// 建立一个cipher对加密算法统一管理
+type Cipher struct {
+	enc cipher.Stream
+	dec cipher.Stream
+}
+
+func (c *Cipher) Encrypt(dst, src []byte) {
+	c.enc.XORKeyStream(dst, src)
+}
+
+func (c *Cipher) Decrypt(dst, src []byte) {
+	c.dec.XORKeyStream(dst, src)
+}
+
+// 新建 cipher
+func NewCipher(ci *CipherInfo, pwd string, eiv []byte, div []byte) (c *Cipher, err error) {
+	key := EvpBytesToKey(pwd, ci.KeyLen)
+	enc, err := ci.NewEncryptStream(key, eiv)
+	if err != nil {
+		return nil, err
+	}
+	dec, err := ci.NewDecryptStream(key, div)
+	if err != nil {
+		return nil, err
+	}
+	c = &Cipher{
+		enc: enc,
+		dec: dec,
+	}
+	return
+}

+ 3 - 0
conn/wsv2/README.md

@@ -0,0 +1,3 @@
+# Websocket 方式连接
+
+暂时只使用固定协议和版本号的方式,有时间和精力再改进。

+ 392 - 0
conn/wsv2/wsv2.go

@@ -0,0 +1,392 @@
+package wsv2
+
+import (
+	"crypto/rand"
+	"encoding/binary"
+	"errors"
+	"fmt"
+	"log"
+	"net"
+	"net/http"
+	"net/url"
+	"strings"
+	"time"
+
+	"git.me9.top/git/tinymq/config"
+	"git.me9.top/git/tinymq/conn"
+	"git.me9.top/git/tinymq/conn/util"
+	"github.com/gorilla/websocket"
+)
+
+const VERSION uint8 = 2
+const PROTO string = "ws"
+
+type WsConnectV2 struct {
+	cf     *config.Config
+	conn   *websocket.Conn
+	cipher *util.Cipher // 记录当前的加解密类,可以保证在没有ssl的情况下数据安全
+}
+
+var upgrader = websocket.Upgrader{} // use default options
+
+// websocket 服务
+// 如果有绑定参数,则进行绑定操作代码
+func Server(cf *config.Config, bind string, path string, hash string, fn conn.ServerConnectFunc) (err error) {
+	var ci *util.CipherInfo
+	var encryptKey string
+	if hash != "" {
+		i := strings.Index(hash, ":")
+		if i <= 0 {
+			return errors.New("hash is invalid")
+		}
+		encryptMethod := hash[0:i]
+		encryptKey = hash[i+1:]
+
+		if c, ok := util.CipherMethod[encryptMethod]; ok {
+			ci = c
+		} else {
+			return errors.New("Unsupported encryption method: " + encryptMethod)
+		}
+	}
+	http.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
+		conn, err := upgrader.Upgrade(w, r, nil)
+		if err != nil {
+			log.Println("[wsv2 Server Upgrade ERROR]", err)
+			return
+		}
+		if ci == nil {
+			ws := &WsConnectV2{
+				cf:   cf,
+				conn: conn,
+			}
+			fn(ws)
+			return
+		}
+		var eiv []byte
+		var div []byte
+		if ci.IvLen > 0 {
+			// 服务端 IV
+			eiv = make([]byte, ci.IvLen)
+			_, err = rand.Read(eiv)
+			if err != nil {
+				log.Println("[wsv2 Server rand.Read ERROR]", err)
+				return
+			}
+			// 发送 IV
+			conn.SetWriteDeadline(time.Now().Add(time.Duration(cf.WriteWait) * time.Millisecond))
+			if err := conn.WriteMessage(websocket.BinaryMessage, eiv); err != nil {
+				log.Println("[wsv2 Server conn.Write ERROR]", err)
+				return
+			}
+
+			// 读取 IV
+			err = conn.SetReadDeadline(time.Now().Add(time.Millisecond * time.Duration(cf.ReadWait)))
+			if err != nil {
+				log.Println("[wsv2 Server SetReadDeadline ERROR]", err)
+				return
+			}
+			_, div, err = conn.ReadMessage()
+			if err != nil {
+				log.Println("[wsv2 Server ReadFull ERROR]", err)
+				return
+			}
+		}
+		cipher, err := util.NewCipher(ci, encryptKey, eiv, div)
+		if err != nil {
+			log.Println("[wsv2 NewCipher ERROR]", err)
+			return
+		}
+		ws := &WsConnectV2{
+			cf:     cf,
+			conn:   conn,
+			cipher: cipher,
+		}
+		fn(ws)
+	})
+
+	if bind != "" {
+		go func() (err error) {
+			defer func() {
+				if err != nil {
+					log.Fatal(err)
+				}
+			}()
+			log.Printf("Listening and serving Websocket on %s\n", bind)
+			// 暂时使用全局的方式,后面有需求再修改
+			// 而且还没有 https 方式的绑定
+			// 需要在前端增加其他的服务进行转换
+			err = http.ListenAndServe(bind, nil)
+			return
+		}()
+	}
+
+	return
+}
+
+// 客户端,新建一个连接
+func Client(cf *config.Config, addr string, path string, hash string) (conn.Connect, error) {
+	u := url.URL{Scheme: "ws", Host: addr, Path: path}
+	// 没有加密的情况
+	if hash == "" {
+		conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
+		if err != nil {
+			return nil, err
+		}
+		ws := &WsConnectV2{
+			cf:   cf,
+			conn: conn,
+		}
+		return ws, nil
+	}
+	i := strings.Index(hash, ":")
+	if i <= 0 {
+		return nil, errors.New("hash is invalid")
+	}
+	encryptMethod := hash[0:i]
+	encryptKey := hash[i+1:]
+	ci, ok := util.CipherMethod[encryptMethod]
+	if !ok {
+		return nil, errors.New("Unsupported encryption method: " + encryptMethod)
+	}
+	conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
+	if err != nil {
+		return nil, err
+	}
+	var eiv []byte
+	var div []byte
+	if ci.IvLen > 0 {
+		// 客户端 IV
+		eiv = make([]byte, ci.IvLen)
+		_, err = rand.Read(eiv)
+		if err != nil {
+			log.Println("[wsv2 Client rand.Read ERROR]", err)
+			return nil, err
+		}
+		// 发送 IV
+		conn.SetWriteDeadline(time.Now().Add(time.Duration(cf.WriteWait) * time.Millisecond))
+		if err := conn.WriteMessage(websocket.BinaryMessage, eiv); err != nil {
+			log.Println("[wsv2 Client conn.Write ERROR]", err)
+			return nil, err
+		}
+
+		// 读取 IV
+		err = conn.SetReadDeadline(time.Now().Add(time.Millisecond * time.Duration(cf.ReadWait)))
+		if err != nil {
+			log.Println("[wsv2 Client SetReadDeadline ERROR]", err)
+			return nil, err
+		}
+		_, div, err = conn.ReadMessage()
+		if err != nil {
+			log.Println("[wsv2 Client ReadFull ERROR]", err)
+			return nil, err
+		}
+	}
+	cipher, err := util.NewCipher(ci, encryptKey, eiv, div)
+	if err != nil {
+		log.Println("[wsv2 NewCipher ERROR]", err)
+		return nil, err
+	}
+	ws := &WsConnectV2{
+		cf:     cf,
+		conn:   conn,
+		cipher: cipher,
+	}
+	return ws, nil
+}
+
+// 发送数据到网络
+// 如果有加密函数的话会直接修改源数据
+func (c *WsConnectV2) writeMessage(buf []byte) (err error) {
+	if c.cipher != nil {
+		c.cipher.Encrypt(buf, buf)
+	}
+	c.conn.SetWriteDeadline(time.Now().Add(time.Millisecond * time.Duration(c.cf.WriteWait)))
+	return c.conn.WriteMessage(websocket.BinaryMessage, buf)
+}
+
+// 发送Auth信息
+// 建立连接后第一个发送的消息
+func (c *WsConnectV2) WriteAuthInfo(channel string, auth []byte) (err error) {
+	protoLen := len(PROTO)
+	channelLen := len(channel)
+	if channelLen > 0xFFFF {
+		return errors.New("length of channel over")
+	}
+	dlen := 2 + 1 + 1 + protoLen + 2 + channelLen + len(auth)
+	start := 0
+	buf := make([]byte, dlen)
+	binary.BigEndian.PutUint16(buf[start:start+2], config.ID_AUTH)
+	start += 2
+	buf[start] = VERSION
+	start++
+	buf[start] = byte(protoLen)
+	start++
+	copy(buf[start:start+protoLen], []byte(PROTO))
+	start += protoLen
+	binary.BigEndian.PutUint16(buf[start:start+2], uint16(channelLen))
+	start += 2
+	copy(buf[start:start+channelLen], []byte(channel))
+	start += channelLen
+	copy(buf[start:], auth)
+	return c.writeMessage(buf)
+}
+
+// 获取Auth信息
+// id(uint16)+version(uint8)+proto(string)+channel(string)+auth([]byte)
+func (c *WsConnectV2) ReadAuthInfo() (proto string, version uint8, channel string, auth []byte, err error) {
+	defer func() {
+		if r := recover(); r != nil {
+			err = fmt.Errorf("recovered from panic: %v", r)
+			return
+		}
+	}()
+	err = c.conn.SetReadDeadline(time.Now().Add(time.Millisecond * time.Duration(c.cf.ReadWait)))
+	if err != nil {
+		return
+	}
+	_, msg, err := c.conn.ReadMessage()
+	if err != nil {
+		return
+	}
+	msgLen := len(msg)
+	if msgLen < 4 {
+		err = errors.New("message length less than 4")
+		return
+	}
+	// 将读出来的数据进行解密
+	if c.cipher != nil {
+		c.cipher.Decrypt(msg, msg)
+	}
+	start := 0
+	id := binary.BigEndian.Uint16(msg[start : start+2])
+	if id != config.ID_AUTH {
+		err = fmt.Errorf("wrong message id: %d", id)
+		return
+	}
+	start += 2
+
+	version = msg[start]
+	if version != VERSION {
+		err = fmt.Errorf("require version %d, get version: %d", VERSION, version)
+		return
+	}
+	start++
+	protoLen := int(msg[start])
+	if protoLen < 2 {
+		err = errors.New("wrong proto length")
+		return
+	}
+	start++
+	proto = string(msg[start : start+protoLen])
+	if proto != PROTO {
+		err = fmt.Errorf("wrong proto: %s", proto)
+		return
+	}
+	start += protoLen
+	channelLen := int(binary.BigEndian.Uint16(msg[start : start+2]))
+	if channelLen < 2 {
+		err = errors.New("wrong channel length")
+		return
+	}
+	start += 2
+	channel = string(msg[start : start+channelLen])
+	start += channelLen
+	auth = msg[start:]
+	return
+}
+
+// 发送请求数据包到网络
+func (c *WsConnectV2) WriteRequest(id uint16, cmd string, data []byte) error {
+	// 为了区分请求还是响应包,命令字符串不能超过127个字节,如果超过则报错
+	cmdLen := len(cmd)
+	if cmdLen > 0x7F {
+		return errors.New("length of command more than 0x7F")
+	}
+	dlen := 2 + 1 + cmdLen + len(data)
+	buf := make([]byte, dlen) // 申请内存
+	binary.BigEndian.PutUint16(buf[0:2], id)
+	buf[2] = byte(cmdLen)
+	copy(buf[3:], cmd)
+	copy(buf[3+cmdLen:], data)
+	return c.writeMessage(buf)
+}
+
+// 发送响应数据包到网络
+// 网络格式:[id, stateCode, data]
+func (c *WsConnectV2) WriteResponse(id uint16, state uint8, data []byte) error {
+	dlen := 2 + 1 + len(data)
+	buf := make([]byte, dlen)
+	binary.BigEndian.PutUint16(buf[0:2], id)
+	buf[2] = state | 0x80
+	copy(buf[3:], data)
+	return c.writeMessage(buf)
+}
+
+// 发送ping包
+func (c *WsConnectV2) WritePing(id uint16) error {
+	buf := make([]byte, 2)
+	binary.BigEndian.PutUint16(buf[0:2], id)
+	return c.writeMessage(buf)
+}
+
+// 获取信息
+func (c *WsConnectV2) ReadMessage(deadline int) (msgType conn.MsgType, id uint16, cmd string, state uint8, data []byte, err error) {
+	err = c.conn.SetReadDeadline(time.Now().Add(time.Millisecond * time.Duration(deadline)))
+	if err != nil {
+		return
+	}
+	_, msg, err := c.conn.ReadMessage()
+	if err != nil {
+		return
+	}
+	msgLen := len(msg)
+	if msgLen < 2 {
+		err = errors.New("message length less than 2")
+		return
+	}
+	// 将读出来的数据进行解密
+	if c.cipher != nil {
+		c.cipher.Decrypt(msg, msg)
+	}
+	id = binary.BigEndian.Uint16(msg[0:2])
+	// ping信息
+	if msgLen == 2 {
+		msgType = conn.PingMsg
+		return
+	}
+
+	if id > config.ID_MAX {
+		err = fmt.Errorf("wrong message id: %d", id)
+		return
+	}
+
+	cmdx := msg[2]
+	if (cmdx & 0x80) == 0 {
+		// 请求包
+		msgType = conn.RequestMsg
+		cmdLen := int(cmdx)
+		cmd = string(msg[3 : cmdLen+3])
+		data = msg[cmdLen+3:]
+		return
+	} else {
+		// 响应数据包
+		msgType = conn.ResponseMsg
+		state = cmdx & 0x7F
+		data = msg[3:]
+		return
+	}
+}
+
+// 获取远程的地址
+func (c *WsConnectV2) RemoteAddr() net.Addr {
+	return c.conn.RemoteAddr()
+}
+
+// 获取本地的地址
+func (c *WsConnectV2) LocalAddr() net.Addr {
+	return c.conn.LocalAddr()
+}
+
+func (c *WsConnectV2) Close() error {
+	return c.conn.Close()
+}

+ 14 - 0
examples/README.md

@@ -0,0 +1,14 @@
+# 一个简单测试程序
+
+启动服务端
+
+```
+go run server.go
+```
+
+启动客户端
+
+```
+go run client-tpv2.go
+go run client-wsv2.go
+```

+ 80 - 0
examples/client-tpv2.go

@@ -0,0 +1,80 @@
+//go:build ignore
+// +build ignore
+
+package main
+
+import (
+	"log"
+	"regexp"
+	"time"
+
+	"git.me9.top/git/tinymq"
+	"git.me9.top/git/tinymq/config"
+)
+
+func main() {
+	cf := config.NewConfig()
+	channel := "/tinymq/client/tpv2"
+	host := &tinymq.HostInfo{
+		Proto:   "tp",
+		Version: 2,
+		Host:    "127.0.0.1",
+		Port:    34222,
+		Hash:    "xor:1qaz2wsx3",
+	}
+
+	hub := tinymq.NewHub(cf, channel, func(channel string) (hostInfo *tinymq.HostInfo, err error) {
+		return host, nil
+	}, func(proto string, version uint8, channel string, remoteAuth []byte) (auth []byte) {
+		// 从 remoteAuth 是否为空来判断是否需要返回信息
+		if len(remoteAuth) <= 0 {
+			// 客户端调用,返回验证信息
+			return []byte("tinymq")
+		} else {
+			// 服务端调用,返回验证token,或者其他信息
+			return nil
+		}
+	}, func(proto string, version uint8, channel string, auth []byte) bool {
+		return string(auth) == "tinymq"
+	}, func(conn *tinymq.Line) {
+		log.Println("connect state", conn.State())
+	})
+
+	// 订阅频道
+	hub.Subscribe(&tinymq.SubscribeData{
+		Channel: regexp.MustCompile("/tinymq/server"),
+		Cmd:     "hello",
+		BackFunc: func(request *tinymq.RequestData) (state uint8, result []byte) {
+			log.Println("[client RECV]<-", string(request.Data))
+			return 1, []byte("tiny client")
+		},
+	})
+	hub.Subscribe(&tinymq.SubscribeData{
+		Channel: regexp.MustCompile("/tinymq/server"),
+		Cmd:     "nodata",
+		BackFunc: func(request *tinymq.RequestData) (state uint8, result []byte) {
+			log.Println("[client RECV]<-", string(request.Data))
+			return 1, nil
+		},
+	})
+
+	err := hub.ConnectToServer("/tinymq/server", true)
+	if err != nil {
+		log.Fatalln("[client ConnectToServer ERROR]", err)
+	}
+
+	// 获取信息
+	rsp := hub.GetOne(&tinymq.GetData{
+		Channel: regexp.MustCompile("/tinymq/server"),
+		Cmd:     "hello",
+		Data:    []byte("hello from client"),
+	})
+	if rsp.State != config.STATE_OK {
+		log.Println("error state:", rsp.State)
+		return
+	}
+	log.Println("[RESULT]<-", string(rsp.Data))
+
+	time.Sleep(time.Second * 300)
+	log.Println("client exit")
+}

+ 83 - 0
examples/client-wsv2.go

@@ -0,0 +1,83 @@
+//go:build ignore
+// +build ignore
+
+package main
+
+import (
+	"log"
+	"regexp"
+	"time"
+
+	"git.me9.top/git/tinymq"
+	"git.me9.top/git/tinymq/config"
+)
+
+func main() {
+	cf := config.NewConfig()
+	localChannel := "/tinymq/client/wsv2"
+	remoteChannel := "/tinymq/server"
+	host := &tinymq.HostInfo{
+		Proto:   "ws",
+		Version: 2,
+		Host:    "127.0.0.1",
+		Port:    34211,
+		// Path:    "/tinymq",
+		Path: "/tinymq-xor",
+		Hash: "xor:1qaz2wsx3edc",
+	}
+
+	hub := tinymq.NewHub(cf, localChannel, func(channel string) (hostInfo *tinymq.HostInfo, err error) {
+		return host, nil
+	}, func(proto string, version uint8, channel string, remoteAuth []byte) (auth []byte) {
+		// 从 remoteAuth 是否为空来判断是否需要返回信息
+		if len(remoteAuth) <= 0 {
+			// 客户端调用,返回验证信息
+			return []byte("tinymq")
+		} else {
+			// 服务端调用,返回验证token,或者其他信息
+			return nil
+		}
+	}, func(proto string, cversion uint8, hannel string, auth []byte) bool {
+		return string(auth) == "tinymq"
+	}, func(conn *tinymq.Line) {
+		log.Println("connect state", conn.Channel(), conn.State())
+	})
+
+	// 订阅频道
+	hub.Subscribe(&tinymq.SubscribeData{
+		Channel: regexp.MustCompile(remoteChannel),
+		Cmd:     "hello",
+		BackFunc: func(request *tinymq.RequestData) (state uint8, result []byte) {
+			log.Println("[client RECV]<-", string(request.Data))
+			return 1, []byte("tiny client")
+		},
+	})
+	hub.Subscribe(&tinymq.SubscribeData{
+		Channel: regexp.MustCompile(remoteChannel),
+		Cmd:     "nodata",
+		BackFunc: func(request *tinymq.RequestData) (state uint8, result []byte) {
+			log.Println("[client RECV]<-", string(request.Data))
+			return 1, nil
+		},
+	})
+
+	err := hub.ConnectToServer(remoteChannel, true)
+	if err != nil {
+		log.Fatalln("[client ConnectToServer ERROR]", err)
+	}
+
+	// 获取信息
+	rsp := hub.GetOne(&tinymq.GetData{
+		Channel: regexp.MustCompile(remoteChannel),
+		Cmd:     "hello",
+		Data:    []byte("hello from client"),
+	})
+	if rsp.State != config.STATE_OK {
+		log.Println("error state:", rsp.State)
+		return
+	}
+	log.Println("[RESULT]<-", string(rsp.Data))
+
+	time.Sleep(time.Second * 300)
+	log.Println("client exit")
+}

+ 79 - 0
examples/echo-client.go

@@ -0,0 +1,79 @@
+//go:build ignore
+// +build ignore
+
+package main
+
+import (
+	"flag"
+	"log"
+	"net/url"
+	"os"
+	"os/signal"
+	"time"
+
+	"github.com/gorilla/websocket"
+)
+
+var addr = flag.String("addr", "127.0.0.1:10808", "http service address")
+
+func main() {
+	flag.Parse()
+	log.SetFlags(0)
+
+	interrupt := make(chan os.Signal, 1)
+	signal.Notify(interrupt, os.Interrupt)
+
+	u := url.URL{Scheme: "ws", Host: *addr, Path: "/echo"}
+	log.Printf("connecting to %s", u.String())
+
+	c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
+	if err != nil {
+		log.Fatal("dial:", err)
+	}
+	defer c.Close()
+
+	done := make(chan struct{})
+
+	go func() {
+		defer close(done)
+		for {
+			_, message, err := c.ReadMessage()
+			if err != nil {
+				log.Println("read:", err)
+				return
+			}
+			log.Printf("recv: %s", message)
+		}
+	}()
+
+	ticker := time.NewTicker(time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-done:
+			return
+		case t := <-ticker.C:
+			err := c.WriteMessage(websocket.TextMessage, []byte(t.String()))
+			if err != nil {
+				log.Println("write:", err)
+				return
+			}
+		case <-interrupt:
+			log.Println("interrupt")
+
+			// Cleanly close the connection by sending a close message and then
+			// waiting (with timeout) for the server to close the connection.
+			err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
+			if err != nil {
+				log.Println("write close:", err)
+				return
+			}
+			select {
+			case <-done:
+			case <-time.After(time.Second):
+			}
+			return
+		}
+	}
+}

+ 131 - 0
examples/echo-server.go

@@ -0,0 +1,131 @@
+//go:build ignore
+// +build ignore
+
+package main
+
+import (
+	"flag"
+	"html/template"
+	"log"
+	"net/http"
+
+	"github.com/gorilla/websocket"
+)
+
+var addr = flag.String("addr", "127.0.0.1:10808", "http service address")
+
+var upgrader = websocket.Upgrader{} // use default options
+
+func echo(w http.ResponseWriter, r *http.Request) {
+	c, err := upgrader.Upgrade(w, r, nil)
+	if err != nil {
+		log.Print("upgrade:", err)
+		return
+	}
+	defer c.Close()
+	for {
+		mt, message, err := c.ReadMessage()
+		if err != nil {
+			log.Println("read:", err)
+			break
+		}
+		log.Printf("recv: %s", message)
+		err = c.WriteMessage(mt, message)
+		if err != nil {
+			log.Println("write:", err)
+			break
+		}
+	}
+}
+
+func home(w http.ResponseWriter, r *http.Request) {
+	homeTemplate.Execute(w, "ws://"+r.Host+"/echo")
+}
+
+func main() {
+	flag.Parse()
+	log.SetFlags(0)
+	http.HandleFunc("/echo", echo)
+	http.HandleFunc("/", home)
+	log.Fatal(http.ListenAndServe(*addr, nil))
+}
+
+var homeTemplate = template.Must(template.New("").Parse(`
+<!DOCTYPE html>
+<html>
+<head>
+<meta charset="utf-8">
+<script>  
+window.addEventListener("load", function(evt) {
+
+    var output = document.getElementById("output");
+    var input = document.getElementById("input");
+    var ws;
+
+    var print = function(message) {
+        var d = document.createElement("div");
+        d.textContent = message;
+        output.appendChild(d);
+        output.scroll(0, output.scrollHeight);
+    };
+
+    document.getElementById("open").onclick = function(evt) {
+        if (ws) {
+            return false;
+        }
+        ws = new WebSocket("{{.}}");
+        ws.onopen = function(evt) {
+            print("OPEN");
+        }
+        ws.onclose = function(evt) {
+            print("CLOSE");
+            ws = null;
+        }
+        ws.onmessage = function(evt) {
+            print("RESPONSE: " + evt.data);
+        }
+        ws.onerror = function(evt) {
+            print("ERROR: " + evt.data);
+        }
+        return false;
+    };
+
+    document.getElementById("send").onclick = function(evt) {
+        if (!ws) {
+            return false;
+        }
+        print("SEND: " + input.value);
+        ws.send(input.value);
+        return false;
+    };
+
+    document.getElementById("close").onclick = function(evt) {
+        if (!ws) {
+            return false;
+        }
+        ws.close();
+        return false;
+    };
+
+});
+</script>
+</head>
+<body>
+<table>
+<tr><td valign="top" width="50%">
+<p>Click "Open" to create a connection to the server, 
+"Send" to send a message to the server and "Close" to close the connection. 
+You can change the message and send multiple times.
+<p>
+<form>
+<button id="open">Open</button>
+<button id="close">Close</button>
+<p><input id="input" type="text" value="Hello world!">
+<button id="send">Send</button>
+</form>
+</td><td valign="top" width="50%">
+<div id="output" style="max-height: 70vh;overflow-y: scroll;"></div>
+</td></tr></table>
+</body>
+</html>
+`))

+ 119 - 0
examples/server.go

@@ -0,0 +1,119 @@
+//go:build ignore
+// +build ignore
+
+package main
+
+import (
+	"errors"
+	"log"
+	"os"
+	"os/signal"
+	"regexp"
+	"syscall"
+
+	"git.me9.top/git/tinymq"
+	"git.me9.top/git/tinymq/config"
+)
+
+func main() {
+	cf := config.NewConfig()
+	localChannel := "/tinymq/server"
+	remoteChannel := "/tinymq/client"
+
+	var hub *tinymq.Hub
+
+	hub = tinymq.NewHub(cf, localChannel,
+		func(channel string) (hostInfo *tinymq.HostInfo, err error) {
+			return nil, errors.New("not host found")
+		}, func(proto string, version uint8, channel string, remoteAuth []byte) (auth []byte) {
+			// 从 remoteAuth 是否为空来判断是否需要返回信息
+			if len(remoteAuth) <= 0 {
+				// 客户端调用,返回验证信息
+				return []byte("tinymq")
+			} else {
+				// 服务端调用,返回验证token,或者其他信息
+				return nil
+			}
+		}, func(proto string, version uint8, channel string, auth []byte) bool {
+			return string(auth) == "tinymq"
+		}, func(conn *tinymq.Line) {
+			log.Println("[Connect state change]", conn.Channel(), conn.State(), conn.Started(), conn.Updated())
+			if conn.State() == tinymq.Connected {
+				go hub.Get(&tinymq.GetData{
+					Channel: regexp.MustCompile(remoteChannel),
+					Cmd:     "hello",
+					Data:    []byte("hello from server push"),
+				}, func(response *tinymq.ResponseData) (ok bool) {
+					log.Println("[hello response]", response.State, string(response.Data))
+					return true
+				})
+				go hub.Get(&tinymq.GetData{
+					Channel: regexp.MustCompile(remoteChannel),
+					Cmd:     "nodata",
+				}, func(response *tinymq.ResponseData) (ok bool) {
+					log.Println("[nodata response]", response.State, string(response.Data))
+					return true
+				})
+			}
+		},
+	)
+
+	// tpv2协议
+	bindTpv2Info := &tinymq.HostInfo{
+		Proto:   "tp",
+		Version: 2,
+		Bind:    "127.0.0.1",
+		Port:    34222,
+		Hash:    "xor:1qaz2wsx3",
+	}
+	hub.BindForServer(bindTpv2Info)
+
+	// wsv2协议
+	bindwsv2Info := &tinymq.HostInfo{
+		Proto:   "ws",
+		Version: 2,
+		Bind:    "127.0.0.1",
+		Port:    34211,
+		Path:    "/tinymq-xor",
+		Hash:    "xor:1qaz2wsx3edc",
+	}
+	hub.BindForServer(bindwsv2Info)
+
+	// wsv2协议,没有加密算法
+	bindInfo := &tinymq.HostInfo{
+		Proto:   "ws",
+		Version: 2,
+		// Bind:      "127.0.0.1",
+		Port: 34211,
+		Path: "/tinymq",
+	}
+	hub.BindForServer(bindInfo)
+
+	// 订阅频道
+	hub.Subscribe(&tinymq.SubscribeData{
+		Channel: regexp.MustCompile(remoteChannel),
+		Cmd:     "hello",
+		BackFunc: func(request *tinymq.RequestData) (state uint8, result []byte) {
+			log.Println("[server RECV]<-", string(request.Data))
+			return 1, []byte("tiny server")
+		},
+	})
+	hub.Subscribe(&tinymq.SubscribeData{
+		Channel: regexp.MustCompile(remoteChannel),
+		Cmd:     "nodata",
+		BackFunc: func(request *tinymq.RequestData) (state uint8, result []byte) {
+			log.Println("[server RECV]<-", string(request.Data))
+			return 1, nil
+		},
+	})
+
+	// log.Fatal(http.ListenAndServe(net.JoinHostPort(bindwsv2Info.Bind, strconv.Itoa(int(bindwsv2Info.Port))), nil))
+
+	// 初始化一个channel
+	exit := make(chan os.Signal, 3)
+	//notify方法用来监听收到的信号
+	signal.Notify(exit, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+	sig := <-exit
+	log.Println("[Exist with]", sig.String())
+}

+ 7 - 0
go.mod

@@ -0,0 +1,7 @@
+module git.me9.top/git/tinymq
+
+go 1.24.2
+
+require (
+	github.com/gorilla/websocket v1.5.3
+)

+ 5 - 0
go.sum

@@ -0,0 +1,5 @@
+github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
+github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
+gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=

+ 609 - 0
hub.go

@@ -0,0 +1,609 @@
+package tinymq
+
+import (
+	"errors"
+	"fmt"
+	"log"
+	"math/rand"
+	"net"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
+
+	"git.me9.top/git/tinymq/config"
+	"git.me9.top/git/tinymq/conn"
+	"git.me9.top/git/tinymq/conn/tpv2"
+	"git.me9.top/git/tinymq/conn/wsv2"
+)
+
+// 类似一个插座的功能,管理多个连接
+// 一个hub即可以是客户端,同时也可以是服务端
+// 为了简化流程和让通讯更加迅速,不再重发和缓存结果,采用超时的方式告诉应用层。
+
+// 截取部分字符串
+func subStr(str string, length int) string {
+	if len(str) <= length {
+		return str
+	}
+	return str[0:length] + "..."
+}
+
+type Hub struct {
+	sync.Mutex
+	cf         *config.Config
+	globalID   uint16
+	channel    string   // 本地频道信息
+	connects   sync.Map // map[*Line]bool(true) //记录当前的连接,方便查找
+	subscribes sync.Map // [cmd]->[]*SubscribeData   //注册绑定频道的函数,用于响应请求
+	msgCache   sync.Map //  map[uint16]*GetMsg //请求的回应记录,key为id
+
+	// 客户端需要用的函数
+	connectHostFunc ConnectHostFunc // 获取对应频道的一个连接地址
+	authFunc        AuthFunc        // 获取认证信息,用于发送给对方
+
+	// 服务端需要用的函数
+	checkAuthFunc CheckAuthFunc // 核对认证是否合法
+
+	// 连接状态变化时调用的函数
+	connectStatusFunc ConnectStatusFunc
+
+	// 上次清理异常连接时间戳
+	lastCleanDeadConnect int64
+}
+
+// 清理异常连接
+func (h *Hub) cleanDeadConnect() {
+	h.Lock()
+	defer h.Unlock()
+	now := time.Now().UnixMilli()
+	if now-h.lastCleanDeadConnect > int64(h.cf.CleanDeadConnectWait) {
+		h.lastCleanDeadConnect = now
+		h.connects.Range(func(key, _ any) bool {
+			line := key.(*Line)
+			if line.state != Connected && now-line.updated.UnixMilli() > int64(h.cf.CleanDeadConnectWait) {
+				h.connects.Delete(key)
+			}
+			return true
+		})
+	}
+}
+
+// 获取通讯消息ID号
+func (h *Hub) GetID() uint16 {
+	h.Lock()
+	defer h.Unlock()
+	h.globalID++
+	if h.globalID <= 0 || h.globalID >= config.ID_MAX {
+		h.globalID = 1
+	}
+	for {
+		// 检查是否在请求队列中存在对应的id
+		if _, ok := h.msgCache.Load(h.globalID); ok {
+			h.globalID++
+			if h.globalID <= 0 || h.globalID >= config.ID_MAX {
+				h.globalID = 1
+			}
+		} else {
+			break
+		}
+	}
+	return h.globalID
+}
+
+// 注册频道,其中频道为正则表达式字符串
+func (h *Hub) Subscribe(reg *SubscribeData) (err error) {
+	if reg.Channel == nil {
+		return errors.New("channel can not be nil")
+	}
+	cmd := reg.Cmd
+	sub, ok := h.subscribes.Load(cmd)
+	if ok {
+		h.subscribes.Store(cmd, append(sub.([]*SubscribeData), reg))
+		return
+	}
+	regs := make([]*SubscribeData, 1)
+	regs[0] = reg
+	h.subscribes.Store(cmd, regs)
+	return
+}
+
+// 获取当前在线的数量
+func (h *Hub) ConnectNum() int {
+	var count int
+	h.connects.Range(func(key, _ any) bool {
+		if key.(*Line).state == Connected {
+			count++
+		}
+		return true
+	})
+	return count
+}
+
+// 获取所有的在线连接频道
+func (h *Hub) AllChannel() []string {
+	cs := make([]string, 0)
+	h.connects.Range(func(key, _ any) bool {
+		line := key.(*Line)
+		if line.state == Connected {
+			cs = append(cs, line.channel)
+		}
+		return true
+	})
+	return cs
+}
+
+// 获取所有连接频道和连接时长
+// 为了避免定义数据结构麻烦,采用|隔开
+func (h *Hub) AllChannelTime() []string {
+	cs := make([]string, 0)
+	h.connects.Range(func(key, value any) bool {
+		line := key.(*Line)
+		if line.state == Connected {
+			ti := time.Since(value.(time.Time)).Milliseconds()
+			cs = append(cs, line.channel+"|"+strconv.FormatInt(ti, 10))
+		}
+		return true
+	})
+	return cs
+}
+
+// 获取频道并通过函数过滤,如果返回 false 将终止
+func (h *Hub) ChannelToFunc(fn func(string) bool) {
+	h.connects.Range(func(key, _ any) bool {
+		line := key.(*Line)
+		if line.state == Connected {
+			return fn(line.channel)
+		}
+		return true
+	})
+}
+
+// 从 channel 获取连接
+func (h *Hub) ChannelToLine(channel string) (line *Line) {
+	h.connects.Range(func(key, _ any) bool {
+		l := key.(*Line)
+		if l.channel == channel {
+			line = l
+			return false
+		}
+		return true
+	})
+	return
+}
+
+// 返回请求结果
+func (h *Hub) outResponse(response *ResponseData) {
+	defer recover() //避免管道已经关闭而引起panic
+	id := response.Id
+	t, ok := h.msgCache.Load(id)
+	if ok {
+		// 删除数据缓存
+		h.msgCache.Delete(id)
+		gm := t.(*GetMsg)
+		// 停止定时器
+		if !gm.timer.Stop() {
+			select {
+			case <-gm.timer.C:
+			default:
+			}
+		}
+		// 回应数据到上层
+		gm.out <- response
+	}
+}
+
+// 发送数据到网络接口
+// 返回发送的数量
+func (h *Hub) sendRequest(gd *GetData) (count int) {
+	h.connects.Range(func(key, _ any) bool {
+		conn := key.(*Line)
+		// 检查连接是否OK
+		if conn.state != Connected {
+			return true
+		}
+		if gd.Channel.MatchString(conn.channel) {
+			var id uint16
+			if gd.backchan != nil {
+				id = h.GetID()
+				timeout := gd.Timeout
+				if timeout <= 0 {
+					timeout = h.cf.WriteWait
+				}
+				fn := func(id uint16, conn *Line) func() {
+					return func() {
+						go h.outResponse(&ResponseData{
+							Id:    id,
+							State: config.GET_TIMEOUT,
+							Data:  []byte(config.GET_TIMEOUT_MSG),
+							conn:  conn,
+						})
+						// 检查是否已经很久时间没有使用连接了
+						if time.Since(conn.lastRead) > time.Duration(h.cf.PingInterval*3)*time.Millisecond {
+							// 超时关闭当前的连接
+							log.Println("get message timeout", conn.channel)
+							// 有可能连接出现问题,断开并重新连接
+							conn.Close(false)
+							return
+						}
+					}
+				}(id, conn)
+				// 将要发送的请求缓存
+				gm := &GetMsg{
+					out:   gd.backchan,
+					timer: time.AfterFunc(time.Millisecond*time.Duration(timeout), fn),
+				}
+				h.msgCache.Store(id, gm)
+			}
+			// 组织数据并发送到Connect
+			conn.sendRequest <- &RequestData{
+				Id:       id,
+				Cmd:      gd.Cmd,
+				Data:     gd.Data,
+				timeout:  gd.Timeout,
+				backchan: gd.backchan,
+				conn:     conn,
+			}
+			log.Println("[SEND]->", conn.channel, "["+gd.Cmd+"]", subStr(string(gd.Data), 200))
+			count++
+			if gd.Max > 0 && count >= gd.Max {
+				return false
+			}
+		}
+		return true
+	})
+	return
+}
+
+// 执行网络发送过来的命令
+func (h *Hub) requestFromNet(request *RequestData) {
+	cmd := request.Cmd
+	channel := request.conn.channel
+	log.Println("[REQU]<-", channel, "["+cmd+"]", subStr(string(request.Data), 200))
+	sub, ok := h.subscribes.Load(cmd)
+	if ok {
+		subs := sub.([]*SubscribeData)
+		// 倒序查找是为了新增的频道响应函数优先执行
+		for i := len(subs) - 1; i >= 0; i-- {
+			rg := subs[i]
+			if rg.Channel.MatchString(channel) {
+				state, data := rg.BackFunc(request)
+				// NEXT_SUBSCRIBE 表示当前的函数没有处理完成,还需要下个注册函数处理
+				if state == config.NEXT_SUBSCRIBE {
+					continue
+				}
+				// 如果id为0表示不需要回应
+				if request.Id != 0 {
+					request.conn.sendResponse <- &ResponseData{
+						Id:    request.Id,
+						State: state,
+						Data:  data,
+					}
+					log.Println("[RESP]->", channel, "["+cmd+"]", state, subStr(string(data), 200))
+				}
+				return
+			}
+		}
+	}
+
+	log.Println("[not match command]", channel, cmd)
+	// 返回没有匹配的消息
+	request.conn.sendResponse <- &ResponseData{
+		Id:    request.Id,
+		State: config.NO_MATCH,
+		Data:  []byte(config.NO_MATCH_MSG),
+	}
+}
+
+// 请求频道并获取数据,采用回调的方式返回结果
+// 当前调用将会阻塞,直到命令都执行结束,最后返回执行的数量
+// 如果 backFunc 返回为 false 则提前结束
+func (h *Hub) Get(gd *GetData, backFunc GetBack) (count int) {
+	// 排除空频道
+	if gd.Channel == nil {
+		return 0
+	}
+	if gd.Timeout <= 0 {
+		gd.Timeout = h.cf.ReadWait
+	}
+	if gd.backchan == nil {
+		gd.backchan = make(chan *ResponseData, 32)
+	}
+	max := h.sendRequest(gd)
+	if max <= 0 {
+		return 0
+	}
+	// 避免出现异常时线程无法退出
+	timer := time.NewTimer(time.Millisecond * time.Duration(gd.Timeout+h.cf.WriteWait*2))
+	defer func() {
+		if !timer.Stop() {
+			select {
+			case <-timer.C:
+			default:
+			}
+		}
+		close(gd.backchan)
+	}()
+	for {
+		select {
+		case rp := <-gd.backchan:
+			if rp == nil || rp.conn == nil {
+				// 可能是已经退出了
+				return
+			}
+			ch := rp.conn.channel
+			log.Println("[RECV]<-", ch, "["+gd.Cmd+"]", rp.State, subStr(string(rp.Data), 200))
+			count++
+			// 如果这里返回为false这跳出循环
+			if backFunc != nil && !backFunc(rp) {
+				return
+			}
+			if count >= max {
+				return
+			}
+		case <-timer.C:
+			return
+		}
+	}
+	// return
+}
+
+// 只获取一个频道的数据,阻塞等待
+// 如果没有结果将返回 NO_MATCH
+func (h *Hub) GetOne(cmd *GetData) (response *ResponseData) {
+	cmd.Max = 1
+	h.Get(cmd, func(rp *ResponseData) (ok bool) {
+		response = rp
+		return false
+	})
+	if response == nil {
+		response = &ResponseData{
+			State: config.NO_MATCH,
+			Data:  []byte(config.NO_MATCH_MSG),
+		}
+	}
+	return
+}
+
+// 推送消息出去,不需要返回数据
+func (h *Hub) Push(cmd *GetData) {
+	cmd.backchan = nil
+	h.sendRequest(cmd)
+}
+
+// 增加连接
+func (h *Hub) addLine(line *Line) {
+	if _, ok := h.connects.Load(line); ok {
+		log.Println("connect have exist")
+		// 连接已经存在,直接返回
+		return
+	}
+	// 检查是否有相同的channel,如果有的话将其关闭删除
+	channel := line.channel
+	h.connects.Range(func(key, _ any) bool {
+		conn := key.(*Line)
+		// 删除超时的连接
+		if conn.state != Connected && conn.host == nil && time.Since(conn.lastRead) > time.Duration(h.cf.PingInterval*5)*time.Millisecond {
+			h.connects.Delete(key)
+			return true
+		}
+		if conn.channel == channel {
+			conn.Close(true)
+			h.connects.Delete(key)
+			return false
+		}
+		return true
+	})
+	h.connects.Store(line, true)
+}
+
+// 删除连接
+func (h *Hub) removeLine(conn *Line) {
+	conn.Close(true)
+	h.connects.Delete(conn)
+}
+
+// 获取指定连接的连接持续时间
+func (h *Hub) ConnectDuration(conn *Line) time.Duration {
+	t, ok := h.connects.Load(conn)
+	if ok {
+		return time.Since(t.(time.Time))
+	}
+	// 如果不存在直接返回0
+	return time.Duration(0)
+}
+
+// 绑定端口,建立服务
+// 需要程序运行时调用
+func (h *Hub) BindForServer(info *HostInfo) (err error) {
+	doConnectFunc := func(conn conn.Connect) {
+		proto, version, channel, auth, err := conn.ReadAuthInfo()
+		if err != nil {
+			log.Println("[BindForServer ReadAuthInfo ERROR]", err)
+			conn.Close()
+			return
+		}
+		if version != info.Version || proto != info.Proto {
+			log.Println("wrong version or protocol: ", version, proto)
+			conn.Close()
+			return
+		}
+		// 检查验证是否合法
+		if !h.checkAuthFunc(proto, version, channel, auth) {
+			conn.Close()
+			return
+		}
+		// 发送频道信息
+		if err := conn.WriteAuthInfo(h.channel, h.authFunc(proto, version, channel, auth)); err != nil {
+			log.Println("[WriteAuthInfo ERROR]", err)
+			conn.Close()
+			return
+		}
+		// 将连接加入现有连接中
+		done := false
+		h.connects.Range(func(key, _ any) bool {
+			line := key.(*Line)
+			if line.state == Disconnected && line.channel == channel && line.host == nil {
+				line.Start(conn, nil)
+				done = true
+				return false
+			}
+			return true
+		})
+		// 新建一个连接
+		if !done {
+			line := NewConnect(h.cf, h, channel, conn, nil)
+			h.addLine(line)
+		}
+	}
+	if info.Version == wsv2.VERSION && info.Proto == wsv2.PROTO {
+		bind := ""
+		if info.Bind != "" {
+			bind = net.JoinHostPort(info.Bind, strconv.Itoa(int(info.Port)))
+		}
+		return wsv2.Server(h.cf, bind, info.Path, info.Hash, doConnectFunc)
+	} else if info.Version == tpv2.VERSION && info.Proto == tpv2.PROTO {
+		return tpv2.Server(h.cf, net.JoinHostPort(info.Bind, strconv.Itoa(int(info.Port))), info.Hash, doConnectFunc)
+	}
+	return errors.New("not connect protocol and version found")
+}
+
+// 新建一个连接,不同的连接协议由底层自己选择
+// channel: 要连接的频道信息,需要能表达频道关键信息的部分
+func (h *Hub) ConnectToServer(channel string, force bool) (err error) {
+	// 检查当前channel是否已经存在
+	if !force {
+		line := h.ChannelToLine(channel)
+		if line != nil && line.state == Connected {
+			err = fmt.Errorf("[ConnectToServer ERROR] existed channel: %s", channel)
+			return
+		}
+	}
+	// 获取服务地址等信息
+	host, err := h.connectHostFunc(channel)
+	if err != nil {
+		return err
+	}
+	var conn conn.Connect
+	addr := net.JoinHostPort(host.Host, strconv.Itoa(int(host.Port)))
+	if host.Version == wsv2.VERSION && host.Proto == wsv2.PROTO {
+		conn, err = wsv2.Client(h.cf, addr, host.Path, host.Hash)
+	} else if host.Version == tpv2.VERSION && host.Proto == tpv2.PROTO {
+		conn, err = tpv2.Client(h.cf, addr, host.Hash)
+	} else {
+		return fmt.Errorf("not correct protocol and version found in: %+v", host)
+	}
+	if err != nil {
+		log.Println("[Client ERROR]", host.Proto, err)
+		host.Errors++
+		host.Updated = time.Now()
+		return err
+	}
+	// 发送验证信息
+	if err := conn.WriteAuthInfo(h.channel, h.authFunc(host.Proto, host.Version, channel, nil)); err != nil {
+		log.Println("[WriteAuthInfo ERROR]", err)
+		conn.Close()
+		host.Errors++
+		host.Updated = time.Now()
+		return err
+	}
+	// 接收频道信息
+	proto, version, channel2, _, err := conn.ReadAuthInfo()
+	if err != nil {
+		log.Println("[ConnectToServer ReadAuthInfo ERROR]", err)
+		conn.Close()
+		host.Errors++
+		host.Updated = time.Now()
+		return err
+	}
+	// 检查版本和协议是否一致
+	if version != host.Version || proto != host.Proto {
+		err = fmt.Errorf("[version or protocol wrong ERROR] %d, %s", version, proto)
+		log.Println(err)
+		conn.Close()
+		host.Errors++
+		host.Updated = time.Now()
+		return err
+	}
+	// 检查频道名称是否匹配
+	if !strings.Contains(channel2, channel) {
+		err = fmt.Errorf("[channel ERROR] want %s, get %s", channel, channel2)
+		log.Println(err)
+		conn.Close()
+		host.Errors++
+		host.Updated = time.Now()
+		return err
+	}
+	// 更新服务主机信息
+	host.Errors = 0
+	host.Updated = time.Now()
+
+	// 将连接加入现有连接中
+	done := false
+	h.connects.Range(func(key, _ any) bool {
+		line := key.(*Line)
+		if line.channel == channel {
+			if line.state == Connected {
+				if !force {
+					err = fmt.Errorf("[connectToServer ERROR] channel already connected: %s", channel)
+					log.Println(err)
+					return false
+				}
+				return true
+			}
+			line.Start(conn, host)
+			done = true
+			return false
+		}
+		return true
+	})
+	if err != nil {
+		return err
+	}
+	// 新建一个连接
+	if !done {
+		line := NewConnect(h.cf, h, channel, conn, host)
+		h.addLine(line)
+	}
+	return nil
+}
+
+// 重试方式连接服务
+// 将会一直阻塞直到连接成功
+func (h *Hub) ConnectToServerX(channel string, force bool) {
+	for {
+		err := h.ConnectToServer(channel, force)
+		if err == nil {
+			return
+		}
+		log.Println("[ConnectToServer ERROR, try it again]", err)
+		// 产生一个随机数避免刹间重连过载
+		r := rand.New(rand.NewSource(time.Now().UnixNano()))
+		time.Sleep(time.Duration(r.Intn(h.cf.ConnectTimeout)+(h.cf.ConnectTimeout/2)) * time.Millisecond)
+	}
+}
+
+// 建立一个集线器
+// connectFunc 用于监听连接状态的函数,可以为nil
+func NewHub(
+	cf *config.Config,
+	channel string,
+	// 客户端需要用的函数
+	connectHostFunc ConnectHostFunc,
+	authFunc AuthFunc,
+	// 服务端需要用的函数
+	checkAuthFunc CheckAuthFunc,
+	// 连接状态变化时调用的函数
+	connectStatusFunc ConnectStatusFunc,
+) (h *Hub) {
+	h = &Hub{
+		cf:                   cf,
+		channel:              channel,
+		connectHostFunc:      connectHostFunc,
+		authFunc:             authFunc,
+		checkAuthFunc:        checkAuthFunc,
+		connectStatusFunc:    connectStatusFunc,
+		lastCleanDeadConnect: time.Now().UnixMilli(),
+	}
+	return h
+}

+ 290 - 0
line.go

@@ -0,0 +1,290 @@
+package tinymq
+
+import (
+	"log"
+	"net"
+	"strings"
+	"time"
+
+	"git.me9.top/git/tinymq/config"
+	"git.me9.top/git/tinymq/conn"
+)
+
+// 建立一个虚拟连接,包括生成命令,发送命令和响应命令
+
+type Line struct {
+	cf             *config.Config
+	hub            *Hub
+	conn           conn.Connect
+	channel        string // 连接对端的频道
+	state          ConnectState
+	host           *HostInfo // 如果有值说明是客户端,就是主动连接端
+	pingID         uint16    // 只有客户端使用
+	pingWrongCount uint8     // 记录 ping id 反馈错误次数,超过3次则重新连接
+
+	// 当前连接的管道
+	sendRequest  chan *RequestData  // 发送请求数据
+	sendResponse chan *ResponseData // 发送回应包
+	pingRequest  chan *PingData     // Ping请求
+	closeConnect chan bool          // 关闭连接信号,true表示外部进行关闭,false表示连接已经出问题或超时关闭
+
+	lastRead time.Time // 记录最后一次读到数据的时间,在客户端如果超时则重连
+
+	started time.Time // 开始时间
+	updated time.Time // 更新时间
+}
+
+// 获取开始时间
+func (c *Line) Started() time.Time {
+	return c.started
+}
+
+// 获取更新时间
+func (c *Line) Updated() time.Time {
+	return c.updated
+}
+
+// 获取当前连接状态
+func (c *Line) State() ConnectState {
+	return c.state
+}
+
+// 获取频道名
+func (c *Line) Channel() string {
+	return c.channel
+}
+
+// 获取远程的地址
+func (c *Line) RemoteAddr() net.Addr {
+	if c.state == Connected {
+		return c.conn.RemoteAddr()
+	} else {
+		return nil
+	}
+}
+
+// 获取本地的地址
+func (c *Line) LocalAddr() net.Addr {
+	if c.state == Connected {
+		return c.conn.LocalAddr()
+	} else {
+		return nil
+	}
+}
+
+// 获取通讯消息ID号
+func (c *Line) getPingID() uint16 {
+	c.pingID++
+	if c.pingID <= 0 || c.pingID >= config.ID_MAX {
+		c.pingID = 1
+	}
+	return c.pingID
+}
+
+// 设置频道名
+func (c *Line) SetChannelName(name string) {
+	if strings.Contains(name, "@") {
+		c.channel = name
+	} else {
+		if inx := strings.Index(c.channel, "@"); inx >= 0 {
+			c.channel = name + c.channel[inx:]
+		} else {
+			c.channel = name + "@" + c.channel
+		}
+	}
+}
+
+// 读信息循环通道,采用新线程
+func (c *Line) readPump() {
+	for {
+		msgType, id, cmd, state, data, err := c.conn.ReadMessage(c.cf.LongReadWait)
+		if err != nil {
+			if !strings.Contains(err.Error(), "EOF") {
+				log.Println("[readPump ERROR]", err)
+			}
+			c.Close(false)
+			return
+		}
+		// 记录最后读到数据的时间
+		c.lastRead = time.Now()
+		switch msgType {
+		case conn.PingMsg:
+			// ping或pong包
+			c.pingRequest <- &PingData{
+				Id: id,
+			}
+		case conn.RequestMsg:
+			// 请求数据包
+			go c.hub.requestFromNet(&RequestData{
+				Id:   id,
+				Cmd:  cmd,
+				Data: data,
+				conn: c,
+			})
+		case conn.ResponseMsg:
+			// 网络回应数据包
+			go c.hub.outResponse(&ResponseData{
+				Id:    id,
+				State: state & 0x7F,
+				Data:  data,
+				conn:  c,
+			})
+		}
+	}
+}
+
+// 检查管道并处理不同的消息,新go程调用
+// 为了防止多线程的冲突,主要的处理都在这里进行
+func (c *Line) writePump() {
+	pingTicker := time.NewTicker(time.Duration(c.cf.PingInterval) * time.Millisecond)
+	// 定义恢复函数
+	defer func() {
+		pingTicker.Stop()
+		c.conn.Close()
+		// 检查是否需要重新连接
+		if c.host != nil && c.state != Closed && c.state != Connected {
+			go func() {
+				c.host.Errors++
+				c.host.Updated = time.Now()
+
+				time.Sleep(time.Second)
+				c.hub.ConnectToServerX(c.channel, false)
+			}()
+		}
+	}()
+	// 清空closeConnect
+	c.cleanClose()
+	// 开始处理信息循环
+	for {
+		select {
+		case request := <-c.sendRequest: // 发送请求包
+			err := c.conn.WriteRequest(request.Id, request.Cmd, request.Data)
+			if err != nil {
+				log.Println(err)
+				return
+			}
+		case response := <-c.sendResponse: // 接收到的响应包
+			// 发送响应数据
+			err := c.conn.WriteResponse(response.Id, response.State, response.Data)
+			if err != nil {
+				log.Println(err)
+				return
+			}
+		case ping := <-c.pingRequest: // 发送ping包到网络
+			// 只有服务器端需要回应ping包
+			if c.host == nil {
+				err := c.conn.WritePing(ping.Id)
+				if err != nil {
+					log.Println("[ping ERROR]", err)
+					return
+				}
+			} else {
+				// 检查 ping id 是否正确
+				if c.pingID == ping.Id {
+					c.pingWrongCount = 0
+				} else {
+					c.pingWrongCount++
+					if c.pingWrongCount > 3 {
+						log.Println("[wrong ping id]", ping.Id)
+						c.Close(false)
+						return
+					}
+				}
+			}
+		case <-pingTicker.C:
+			// 检查是否已经很久时间没有使用连接了
+			dr := time.Since(c.lastRead)
+			if dr > time.Duration(c.cf.PingInterval*3)*time.Millisecond {
+				// 超时关闭当前的连接
+				log.Println("Connect timeout and stop it", c.channel)
+				// 有可能连接出现问题,断开并重新连接
+				c.Close(false)
+				return
+			}
+			// 只需要客户端发送
+			if c.host != nil {
+				// 发送ping包
+				if dr > time.Duration(c.cf.PingInterval) {
+					// 发送ping数据包
+					err := c.conn.WritePing(c.getPingID())
+					if err != nil {
+						log.Println(err)
+						return
+					}
+				}
+			}
+		case <-c.closeConnect:
+			c.cleanClose()
+			// 退出循环
+			return
+		}
+	}
+}
+
+// 关闭连接
+func (c *Line) Close(quick bool) {
+	defer recover() //避免管道已经关闭而引起panic
+	c.conn.Close()
+	c.closeConnect <- quick
+	c.updated = time.Now()
+	if quick {
+		if c.state != Closed {
+			c.state = Closed
+			c.hub.connectStatusFunc(c)
+		}
+		c.hub.removeLine(c)
+	} else {
+		if c.state != Disconnected {
+			c.state = Disconnected
+			c.hub.connectStatusFunc(c)
+			go c.hub.cleanDeadConnect()
+		}
+	}
+}
+
+// 清空余留下来的管道消息
+func (c *Line) cleanClose() {
+	for {
+		select {
+		case <-c.closeConnect:
+		default:
+			return
+		}
+	}
+}
+
+// 连接开始运行
+func (c *Line) Start(conn conn.Connect, host *HostInfo) {
+	c.updated = time.Now()
+	c.conn = conn
+	c.host = host
+	go c.readPump()
+	go c.writePump()
+	c.state = Connected
+	c.hub.connectStatusFunc(c)
+}
+
+// 请求入口函数
+func NewConnect(
+	cf *config.Config,
+	hub *Hub,
+	channel string,
+	conn conn.Connect,
+	host *HostInfo,
+) *Line {
+	cc := &Line{
+		cf:      cf,
+		channel: channel,
+		hub:     hub,
+
+		sendRequest:  make(chan *RequestData, 32),
+		sendResponse: make(chan *ResponseData, 32),
+		pingRequest:  make(chan *PingData, 5),
+		closeConnect: make(chan bool, 5),
+
+		lastRead: time.Now(), // 避免默认为0时被清理
+		started:  time.Now(),
+	}
+	cc.Start(conn, host)
+	return cc
+}

+ 93 - 0
type.go

@@ -0,0 +1,93 @@
+package tinymq
+
+import (
+	"regexp"
+	"time"
+)
+
+type SubscribeBack func(request *RequestData) (state uint8, result []byte)
+type GetBack func(response *ResponseData) (ok bool)
+type ConnectStatusFunc func(conn *Line) // 线路状态改变时调用
+
+// 订阅频道数据结构
+type SubscribeData struct {
+	Channel  *regexp.Regexp //频道的正则表达式
+	Cmd      string         // 请求的命令
+	BackFunc SubscribeBack  //回调函数,如果状态为 NEXT_SUBSCRIBE 将继续下一个频道调用
+}
+
+// 获取数据使用的数据结构
+type GetData struct {
+	Channel *regexp.Regexp
+	Cmd     string
+	Data    []byte
+	Max     int // 获取数据的频道最多有几个,如果为0表示没有限制
+	Timeout int // 超时时间(毫秒)
+
+	backchan chan *ResponseData // 获取响应返回的数据
+}
+
+// 连接状态
+type ConnectState byte
+
+const (
+	Disconnected ConnectState = iota
+	Connected
+	Closed
+)
+
+// 请求数据包
+type RequestData struct {
+	Id   uint16
+	Cmd  string
+	Data []byte
+
+	timeout  int                // 超时时间,单位为毫秒
+	backchan chan *ResponseData // 返回数据的管道
+	conn     *Line              // 将连接传递出去是为了能够让上层找回来
+}
+
+func (r *RequestData) Conn() *Line {
+	return r.conn
+}
+
+type ResponseData struct {
+	Id    uint16
+	State uint8
+	Data  []byte
+
+	conn *Line
+}
+
+type PingData struct {
+	Id uint16
+}
+
+// 请求信息,得到回应通过管道传递信息
+type GetMsg struct {
+	out   chan *ResponseData
+	timer *time.Timer
+}
+
+// 连接服务结构
+type HostInfo struct {
+	Proto   string    `json:"proto" yaml:"proto"`     // 协议
+	Version uint8     `json:"version" yaml:"version"` // 版本
+	Host    string    `json:"host" yaml:"host"`       // 连接的IP地址或者域名
+	Bind    string    `json:"bind" yaml:"bind"`       // 绑定的地址
+	Port    uint16    `json:"port" yaml:"port"`       // 连接的端口
+	Path    string    `json:"path" yaml:"path"`       // 连接的路径
+	Hash    string    `json:"hash" yaml:"hash"`       // 连接验证使用,格式 method:key
+	Proxy   bool      `json:"proxy" yaml:"proxy"`     // 是否代理转发
+	Errors  uint16    `json:"errors" yaml:"errors"`   // 连接失败计数,如果成功了则重置为0
+	Updated time.Time `json:"updated" yaml:"updated"` // 节点信息刷新时间
+}
+
+// 获取对应频道的一个连接地址
+type ConnectHostFunc func(channel string) (hostInfo *HostInfo, err error)
+
+// 获取认证信息
+type AuthFunc func(proto string, version uint8, channel string, remoteAuth []byte) (auth []byte)
+
+// 认证合法性函数
+type CheckAuthFunc func(proto string, version uint8, channel string, auth []byte) bool