소스 검색

add mapx.go

Joyit 2 달 전
부모
커밋
bfe98170fd
6개의 변경된 파일221개의 추가작업 그리고 88개의 파일을 삭제
  1. 10 2
      README.md
  2. 1 1
      examples/client-tcp2.go
  3. 1 1
      examples/client-ws2.go
  4. 1 1
      examples/server.go
  5. 59 83
      hub.go
  6. 149 0
      mapx.go

+ 10 - 2
README.md

@@ -19,15 +19,23 @@
 - 多协议绑定
 - 连接验证
 
+## 优化连接方式
+
+- 高效匹配字符串算法
+- 随机匹配算法
+- 排序算法
+- 支持万级连接数
+
 ## 问题与优化
 
-- 断线时命令在还没有超时的情况下,要等待重连后发送
-- 优化命令匹配的算法,当有数量级的连接时需要考虑
+- 随机频道获取
+- 优化命令匹配的算法,当有数量级的连接时需要考虑;如使用固定 ID,自动生成匹配缓存等方式
 - 建立内存池来分配内存,减少内存碎片
 - 同地址多连接共存,使用不同的连接发送消息,减少延时,提高消息送达可靠性
 
 ## 已经解决的问题
 
+- 断线时命令在还没有超时的情况下,要等待重连后发送
 - 增加 gzip 的功能,只需要压缩数据部分
 - 转发地址定时测试切换回到主服务节点
 - 增加订阅中间件,处理验证登录等问题

+ 1 - 1
examples/client-tcp2.go

@@ -39,7 +39,7 @@ func main() {
 		return true
 	}, func(conn *tinymq.Line) {
 		log.Println("connect state", conn.Channel(), conn.State(), time.Since(conn.Updated()))
-	})
+	}, nil)
 
 	// 订阅频道
 	hub.Subscribe(remoteFilter, "hello", func(request *tinymq.RequestData) (state uint8, result any) {

+ 1 - 1
examples/client-ws2.go

@@ -42,7 +42,7 @@ func main() {
 		return true
 	}, func(conn *tinymq.Line) {
 		log.Println("connect state", conn.Channel(), conn.State(), time.Since(conn.Updated()))
-	})
+	}, nil)
 
 	// 订阅频道
 	hub.Subscribe(remoteFilter, "hello", func(request *tinymq.RequestData) (state uint8, result any) {

+ 1 - 1
examples/server.go

@@ -51,7 +51,7 @@ func main() {
 						return true
 					})
 			}
-		},
+		}, nil,
 	)
 
 	// tcp2协议

+ 59 - 83
hub.go

@@ -8,6 +8,7 @@ import (
 	"log"
 	"math/rand"
 	"net"
+
 	// "regexp"
 	"strconv"
 	"strings"
@@ -34,13 +35,14 @@ func subStr(str string, length int) string {
 
 type Hub struct {
 	sync.Mutex
-	cf         *config.Config
-	globalID   uint16
-	channel    string       // 本地频道信息
-	middle     []MiddleFunc // 中间件
-	connects   sync.Map     // map[*Line]bool(true) //记录当前的连接,方便查找
-	subscribes sync.Map     // [cmd]->[]*SubscribeData   //注册绑定频道的函数,用于响应请求
-	msgCache   sync.Map     //  map[uint16]*GetMsg //请求的回应记录,key为id
+	cf       *config.Config
+	globalID uint16
+	channel  string       // 本地频道信息
+	middle   []MiddleFunc // 中间件
+	// connects   sync.Map     // map[*Line]bool(true) //记录当前的连接,方便查找
+	lines      *Mapx    // 记录当前的连接,统一管理
+	subscribes sync.Map // [cmd]->[]*SubscribeData   //注册绑定频道的函数,用于响应请求
+	msgCache   sync.Map //  map[uint16]*GetMsg //请求的回应记录,key为id
 
 	// 客户端需要用的函数
 	connectHostFunc ConnectHostFunc // 获取对应频道的一个连接地址
@@ -74,9 +76,6 @@ func (h *Hub) convertData(data any) (reqData []byte, err error) {
 		}
 	default:
 		if data != nil {
-			// if s, ok := data.(func() ([]uint8, error)); ok {
-			// 	return s()
-			// }
 			// 自动转换数据为json格式
 			reqData, err = json.Marshal(data)
 			if err != nil {
@@ -90,18 +89,11 @@ func (h *Hub) convertData(data any) (reqData []byte, err error) {
 
 // 清理异常连接
 func (h *Hub) cleanDeadConnect() {
-	h.Lock()
-	defer h.Unlock()
 	now := time.Now().UnixMilli()
-	if now-h.lastCleanDeadConnect > int64(h.cf.CleanDeadConnectWait) {
+	expired := now - int64(h.cf.CleanDeadConnectWait)
+	if h.lastCleanDeadConnect < expired {
 		h.lastCleanDeadConnect = now
-		h.connects.Range(func(key, _ any) bool {
-			line := key.(*Line)
-			if line.state != Connected && now-line.updated.UnixMilli() > int64(h.cf.CleanDeadConnectWait) {
-				h.connects.Delete(key)
-			}
-			return true
-		})
+		h.lines.DeleteInvalidLines(expired)
 	}
 }
 
@@ -157,18 +149,15 @@ func (h *Hub) Subscribe(filter FilterFunc, cmd string, backFunc SubscribeBackFun
 
 // 遍历频道列表
 // 如果 fn 返回 false,则 range 停止迭代
-func (h *Hub) ConnectRange(fn func(line *Line) bool) {
-	h.connects.Range(func(key, _ any) bool {
-		line := key.(*Line)
-		return fn(line)
-	})
+func (h *Hub) ConnectRange(fn func(id int, line *Line) bool) {
+	h.lines.Range(fn)
 }
 
 // 获取当前在线的数量
 func (h *Hub) ConnectNum() int {
 	var count int
-	h.connects.Range(func(key, _ any) bool {
-		if key.(*Line).state == Connected {
+	h.lines.Range(func(id int, line *Line) bool {
+		if line.state == Connected {
 			count++
 		}
 		return true
@@ -179,8 +168,7 @@ func (h *Hub) ConnectNum() int {
 // 获取所有的在线连接频道
 func (h *Hub) AllChannel() []string {
 	cs := make([]string, 0)
-	h.connects.Range(func(key, _ any) bool {
-		line := key.(*Line)
+	h.lines.Range(func(id int, line *Line) bool {
 		if line.state == Connected {
 			cs = append(cs, line.channel)
 		}
@@ -193,11 +181,9 @@ func (h *Hub) AllChannel() []string {
 // 为了避免定义数据结构麻烦,采用|隔开, 频道名|连接开始时间
 func (h *Hub) AllChannelWithStarted() []string {
 	cs := make([]string, 0)
-	h.connects.Range(func(key, _ any) bool {
-		line := key.(*Line)
+	h.lines.Range(func(id int, line *Line) bool {
 		if line.state == Connected {
-			// ti := time.Since(line.started).Milliseconds()
-			cs = append(cs, line.channel+"|"+strconv.FormatInt(line.started.UnixMilli(), 10))
+			cs = append(cs, fmt.Sprintf("%s|%d", line.channel, line.started.UnixMilli()))
 		}
 		return true
 	})
@@ -206,8 +192,7 @@ func (h *Hub) AllChannelWithStarted() []string {
 
 // 获取频道并通过函数过滤,如果返回 false 将终止
 func (h *Hub) ChannelToFunc(fn func(string) bool) {
-	h.connects.Range(func(key, _ any) bool {
-		line := key.(*Line)
+	h.lines.Range(func(id int, line *Line) bool {
 		if line.state == Connected {
 			return fn(line.channel)
 		}
@@ -217,8 +202,7 @@ func (h *Hub) ChannelToFunc(fn func(string) bool) {
 
 // 从 channel 获取连接
 func (h *Hub) ChannelToLine(channel string) (line *Line) {
-	h.connects.Range(func(key, _ any) bool {
-		l := key.(*Line)
+	h.lines.Range(func(id int, l *Line) bool {
 		if l.channel == channel {
 			line = l
 			return false
@@ -257,19 +241,18 @@ func (h *Hub) sendRequest(gd *GetData) (count int) {
 		log.Println(err)
 		return 0
 	}
-	// 增加如果没有发送到消息,延时100ms重连直到超时
-	for i := 0; i < gd.Timeout; i += 100 {
-		h.connects.Range(func(key, _ any) bool {
-			conn := key.(*Line)
+	// 如果没有发送到消息,延时100ms重连直到超时
+	for i := 0; i <= gd.Timeout; i += 100 {
+		h.lines.Range(func(id int, line *Line) bool {
 			// 检查连接是否OK
-			if conn.state != Connected {
+			if line.state != Connected {
 				return true
 			}
 			// 验证连接是否达到发送数据的要求
-			if h.checkConnectOkFunc != nil && !h.checkConnectOkFunc(conn, gd) {
+			if h.checkConnectOkFunc != nil && !h.checkConnectOkFunc(line, gd) {
 				return true
 			}
-			if gd.Filter(conn) {
+			if gd.Filter(line) {
 				var id uint16
 				if gd.backchan != nil {
 					id = h.GetID()
@@ -294,7 +277,7 @@ func (h *Hub) sendRequest(gd *GetData) (count int) {
 								return
 							}
 						}
-					}(id, conn)
+					}(id, line)
 					// 将要发送的请求缓存
 					gm := &GetMsg{
 						out:   gd.backchan,
@@ -303,16 +286,16 @@ func (h *Hub) sendRequest(gd *GetData) (count int) {
 					h.msgCache.Store(id, gm)
 				}
 				// 组织数据并发送到Connect
-				conn.sendRequest <- &RequestData{
+				line.sendRequest <- &RequestData{
 					Id:       id,
 					Cmd:      gd.Cmd,
 					Data:     outData,
 					timeout:  gd.Timeout,
 					backchan: gd.backchan,
-					conn:     conn,
+					conn:     line,
 				}
 				if h.cf.PrintMsg {
-					log.Println("[SEND]->", id, conn.channel, "["+gd.Cmd+"]", subStr(string(outData), 200))
+					log.Println("[SEND]->", id, line.channel, "["+gd.Cmd+"]", subStr(string(outData), 200))
 				}
 				count++
 				if gd.Max > 0 && count >= gd.Max {
@@ -360,7 +343,6 @@ func (h *Hub) requestFromNet(request *RequestData) {
 		// 倒序查找是为了新增的频道响应函数优先执行
 		for i := len(subs) - 1; i >= 0; i-- {
 			rg := subs[i]
-			// if rg.Channel.MatchString(channel) {
 			if rg.Filter(request.conn) {
 				state, data := rg.BackFunc(request)
 				// NEXT_SUBSCRIBE 表示当前的函数没有处理完成,还需要下个注册函数处理
@@ -534,44 +516,40 @@ func (h *Hub) PushWithMax(filter FilterFunc, cmd string, data any, max int) {
 
 // 增加连接
 func (h *Hub) addLine(line *Line) {
-	if _, ok := h.connects.Load(line); ok {
+	if h.lines.Exist(line) {
 		log.Println("connect have exist")
 		// 连接已经存在,直接返回
 		return
 	}
-	// 检查是否有相同的channel,如果有的话将其关闭删除
-	channel := line.channel
-	h.connects.Range(func(key, _ any) bool {
-		conn := key.(*Line)
-		// 删除超时的连接
-		if conn.state != Connected && conn.host == nil && time.Since(conn.lastRead) > time.Duration(h.cf.PingInterval*5*int(time.Millisecond)) {
-			h.connects.Delete(key)
-			return true
-		}
-		if conn.channel == channel {
-			conn.Close(true)
-			h.connects.Delete(key)
-			return false
-		}
-		return true
-	})
-	h.connects.Store(line, true)
+	h.lines.Store(line)
+	// // 检查是否有相同的channel,如果有的话将其关闭删除
+	// channel := line.channel
+	// h.connects.Range(func(key, _ any) bool {
+	// 	conn := key.(*Line)
+	// 	// 删除超时的连接
+	// 	if conn.state != Connected && conn.host == nil && time.Since(conn.lastRead) > time.Duration(h.cf.PingInterval*5*int(time.Millisecond)) {
+	// 		h.connects.Delete(key)
+	// 		return true
+	// 	}
+	// 	if conn.channel == channel {
+	// 		conn.Close(true)
+	// 		h.connects.Delete(key)
+	// 		return false
+	// 	}
+	// 	return true
+	// })
+	// h.connects.Store(line, true)
 }
 
 // 删除连接
-func (h *Hub) removeLine(conn *Line) {
-	conn.Close(true)
-	h.connects.Delete(conn)
+func (h *Hub) removeLine(line *Line) {
+	line.Close(true)
+	h.lines.Delete(line)
 }
 
 // 获取指定连接的连接持续时间
-func (h *Hub) ConnectDuration(conn *Line) time.Duration {
-	t, ok := h.connects.Load(conn)
-	if ok {
-		return time.Since(t.(time.Time))
-	}
-	// 如果不存在直接返回0
-	return time.Duration(0)
+func (h *Hub) ConnectDuration(line *Line) time.Duration {
+	return time.Since(line.started)
 }
 
 // 绑定端口,建立服务
@@ -602,8 +580,7 @@ func (h *Hub) BindForServer(info *HostInfo) (err error) {
 		}
 		// 将连接加入现有连接中
 		done := false
-		h.connects.Range(func(key, _ any) bool {
-			line := key.(*Line)
+		h.lines.Range(func(id int, line *Line) bool {
 			if line.state == Disconnected && line.host == nil && line.ChannelEqualWithoutPrefix(channel) {
 				line.Start(conn, nil)
 				done = true
@@ -747,8 +724,7 @@ func (h *Hub) ConnectToServer(channel string, force bool, host *HostInfo) (err e
 
 	// 将连接加入现有连接中
 	done = false
-	h.connects.Range(func(key, _ any) bool {
-		line := key.(*Line)
+	h.lines.Range(func(id int, line *Line) bool {
 		if line.channel == channel {
 			if line.state == Connected {
 				if force {
@@ -804,8 +780,7 @@ func (h *Hub) checkProxyConnect() {
 	for {
 		<-proxyTicker.C
 		now := time.Now().UnixMilli()
-		h.connects.Range(func(key, _ any) bool {
-			line := key.(*Line)
+		h.lines.Range(func(id int, line *Line) bool {
 			if line.host != nil && line.host.Proxy && now-line.updated.UnixMilli() > int64(h.cf.ProxyTimeout) {
 				host, err := h.connectHostFunc(line.channel, Direct)
 				if err != nil {
@@ -842,6 +817,7 @@ func NewHub(
 		globalID:             uint16(time.Now().UnixNano()) % config.ID_MAX,
 		channel:              channel,
 		middle:               make([]MiddleFunc, 0),
+		lines:                NewMapx(),
 		connectHostFunc:      connectHostFunc,
 		authFunc:             authFunc,
 		checkAuthFunc:        checkAuthFunc,

+ 149 - 0
mapx.go

@@ -0,0 +1,149 @@
+package tinymq
+
+/* 优化连接方式
+
+- 高效匹配字符串算法
+- 随机匹配算法
+- 排序算法
+- 支持万级连接数
+*/
+
+import (
+	"math/rand"
+	"sync"
+)
+
+type Mapx struct {
+	sync.RWMutex
+	id        int // 生成 id 号
+	lineMap   map[int]*Line
+	randLines []*Line
+}
+
+func NewMapx() *Mapx {
+	return &Mapx{
+		lineMap:   make(map[int]*Line),
+		randLines: make([]*Line, 0),
+	}
+}
+
+// 存储连接
+func (m *Mapx) Store(line *Line) {
+	m.Lock()
+	defer m.Unlock()
+	m.id++
+	m.lineMap[m.id] = line
+	m.randLines = append(m.randLines, line)
+	if len(m.randLines) < 3 {
+		return
+	}
+	// 交换一下位置,随机数组
+	i := rand.Intn(len(m.randLines) - 1)
+	t := m.randLines[i]
+	m.randLines[i] = line
+	m.randLines[len(m.randLines)-1] = t
+}
+
+// 删除连接
+func (m *Mapx) Delete(line *Line) {
+	m.Lock()
+	defer m.Unlock()
+	for k, v := range m.lineMap {
+		if v == line {
+			delete(m.lineMap, k)
+			break
+		}
+	}
+	for i, v := range m.randLines {
+		if v == line {
+			l := len(m.randLines) - 1
+			m.randLines[i] = m.randLines[l]
+			m.randLines = m.randLines[0:l]
+			break
+		}
+	}
+}
+
+// ID 方式删除连接
+func (m *Mapx) DeleteById(id int) {
+	m.Lock()
+	defer m.Unlock()
+	line, ok := m.lineMap[id]
+	if !ok {
+		return
+	}
+	delete(m.lineMap, id)
+	for i, v := range m.randLines {
+		if v == line {
+			l := len(m.randLines) - 1
+			m.randLines[i] = m.randLines[l]
+			m.randLines = m.randLines[0:l]
+			break
+		}
+	}
+}
+
+// 删除不可用连接
+func (m *Mapx) DeleteInvalidLines(expired int64) {
+	m.Lock()
+	defer m.Unlock()
+	for id, line := range m.lineMap {
+		if line.state != Connected && line.updated.UnixMilli() < expired {
+			delete(m.lineMap, id)
+		}
+	}
+	for i := len(m.randLines) - 1; i >= 0; i-- {
+		line := m.randLines[i]
+		if line.state != Connected && line.updated.UnixMilli() < expired {
+			l := len(m.randLines) - 1
+			m.randLines[i] = m.randLines[l]
+			m.randLines = m.randLines[0:l]
+		}
+	}
+}
+
+// 循环遍历
+func (m *Mapx) Range(fn func(id int, line *Line) bool) {
+	m.RLock()
+	defer m.RUnlock()
+	for k, v := range m.lineMap {
+		if !fn(k, v) {
+			break
+		}
+	}
+}
+
+// 打乱随机数组
+func (m *Mapx) Rand() {
+	m.Lock()
+	defer m.Unlock()
+	l := len(m.randLines) / 2
+	for i := range l {
+		k := rand.Intn(l) + l
+		v := m.randLines[i]
+		m.randLines[i] = m.randLines[k]
+		m.randLines[k] = v
+	}
+}
+
+// 随机遍历
+func (m *Mapx) RandRange(fn func(i int, v *Line) bool) {
+	m.Rand()
+	m.RLock()
+	defer m.RUnlock()
+	for i, v := range m.randLines {
+		if !fn(i, v) {
+			break
+		}
+	}
+}
+
+// 是否存在
+func (m *Mapx) Exist(line *Line) bool {
+	for _, v := range m.lineMap {
+		if v == line {
+			return true
+		}
+	}
+	return false
+}