hub.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696
  1. package tinymq
  2. import (
  3. "errors"
  4. "fmt"
  5. "log"
  6. "math/rand"
  7. "net"
  8. "regexp"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "time"
  13. "git.me9.top/git/tinymq/config"
  14. "git.me9.top/git/tinymq/conn"
  15. "git.me9.top/git/tinymq/conn/tpv2"
  16. "git.me9.top/git/tinymq/conn/wsv2"
  17. )
  18. // 类似一个插座的功能,管理多个连接
  19. // 一个hub即可以是客户端,同时也可以是服务端
  20. // 为了简化流程和让通讯更加迅速,不再重发和缓存结果,采用超时的方式告诉应用层。
  21. // 截取部分字符串
  22. func subStr(str string, length int) string {
  23. if len(str) <= length {
  24. return str
  25. }
  26. return str[0:length] + "..."
  27. }
  28. type Hub struct {
  29. sync.Mutex
  30. cf *config.Config
  31. globalID uint16
  32. channel string // 本地频道信息
  33. middle []MiddleFunc // 中间件
  34. connects sync.Map // map[*Line]bool(true) //记录当前的连接,方便查找
  35. subscribes sync.Map // [cmd]->[]*SubscribeData //注册绑定频道的函数,用于响应请求
  36. msgCache sync.Map // map[uint16]*GetMsg //请求的回应记录,key为id
  37. // 客户端需要用的函数
  38. connectHostFunc ConnectHostFunc // 获取对应频道的一个连接地址
  39. authFunc AuthFunc // 获取认证信息,用于发送给对方
  40. // 服务端需要用的函数
  41. checkAuthFunc CheckAuthFunc // 核对认证是否合法
  42. // 连接状态变化时调用的函数
  43. connectStatusFunc ConnectStatusFunc
  44. // 上次清理异常连接时间戳
  45. lastCleanDeadConnect int64
  46. }
  47. // 清理异常连接
  48. func (h *Hub) cleanDeadConnect() {
  49. h.Lock()
  50. defer h.Unlock()
  51. now := time.Now().UnixMilli()
  52. if now-h.lastCleanDeadConnect > int64(h.cf.CleanDeadConnectWait) {
  53. h.lastCleanDeadConnect = now
  54. h.connects.Range(func(key, _ any) bool {
  55. line := key.(*Line)
  56. if line.state != Connected && now-line.updated.UnixMilli() > int64(h.cf.CleanDeadConnectWait) {
  57. h.connects.Delete(key)
  58. }
  59. return true
  60. })
  61. }
  62. }
  63. // 获取通讯消息ID号
  64. func (h *Hub) GetID() uint16 {
  65. h.Lock()
  66. defer h.Unlock()
  67. h.globalID++
  68. if h.globalID <= 0 || h.globalID >= config.ID_MAX {
  69. h.globalID = 1
  70. }
  71. for {
  72. // 检查是否在请求队列中存在对应的id
  73. if _, ok := h.msgCache.Load(h.globalID); ok {
  74. h.globalID++
  75. if h.globalID <= 0 || h.globalID >= config.ID_MAX {
  76. h.globalID = 1
  77. }
  78. } else {
  79. break
  80. }
  81. }
  82. return h.globalID
  83. }
  84. // 添加中间件
  85. func (h *Hub) UseMiddle(middleFunc MiddleFunc) {
  86. h.middle = append(h.middle, middleFunc)
  87. }
  88. // 注册频道,其中频道为正则表达式字符串
  89. func (h *Hub) Subscribe(channel *regexp.Regexp, cmd string, backFunc SubscribeBack) (err error) {
  90. if channel == nil {
  91. return errors.New("channel can not be nil")
  92. }
  93. reg := &SubscribeData{
  94. Channel: channel,
  95. Cmd: cmd,
  96. BackFunc: backFunc,
  97. }
  98. sub, ok := h.subscribes.Load(cmd)
  99. if ok {
  100. h.subscribes.Store(cmd, append(sub.([]*SubscribeData), reg))
  101. return
  102. }
  103. regs := make([]*SubscribeData, 1)
  104. regs[0] = reg
  105. h.subscribes.Store(cmd, regs)
  106. return
  107. }
  108. // 获取当前在线的数量
  109. func (h *Hub) ConnectNum() int {
  110. var count int
  111. h.connects.Range(func(key, _ any) bool {
  112. if key.(*Line).state == Connected {
  113. count++
  114. }
  115. return true
  116. })
  117. return count
  118. }
  119. // 获取所有的在线连接频道
  120. func (h *Hub) AllChannel() []string {
  121. cs := make([]string, 0)
  122. h.connects.Range(func(key, _ any) bool {
  123. line := key.(*Line)
  124. if line.state == Connected {
  125. cs = append(cs, line.channel)
  126. }
  127. return true
  128. })
  129. return cs
  130. }
  131. // 获取所有连接频道和连接时长
  132. // 为了避免定义数据结构麻烦,采用|隔开
  133. func (h *Hub) AllChannelTime() []string {
  134. cs := make([]string, 0)
  135. h.connects.Range(func(key, value any) bool {
  136. line := key.(*Line)
  137. if line.state == Connected {
  138. ti := time.Since(value.(time.Time)).Milliseconds()
  139. cs = append(cs, line.channel+"|"+strconv.FormatInt(ti, 10))
  140. }
  141. return true
  142. })
  143. return cs
  144. }
  145. // 获取频道并通过函数过滤,如果返回 false 将终止
  146. func (h *Hub) ChannelToFunc(fn func(string) bool) {
  147. h.connects.Range(func(key, _ any) bool {
  148. line := key.(*Line)
  149. if line.state == Connected {
  150. return fn(line.channel)
  151. }
  152. return true
  153. })
  154. }
  155. // 从 channel 获取连接
  156. func (h *Hub) ChannelToLine(channel string) (line *Line) {
  157. h.connects.Range(func(key, _ any) bool {
  158. l := key.(*Line)
  159. if l.channel == channel {
  160. line = l
  161. return false
  162. }
  163. return true
  164. })
  165. return
  166. }
  167. // 返回请求结果
  168. func (h *Hub) outResponse(response *ResponseData) {
  169. defer recover() //避免管道已经关闭而引起panic
  170. id := response.Id
  171. t, ok := h.msgCache.Load(id)
  172. if ok {
  173. // 删除数据缓存
  174. h.msgCache.Delete(id)
  175. gm := t.(*GetMsg)
  176. // 停止定时器
  177. if !gm.timer.Stop() {
  178. select {
  179. case <-gm.timer.C:
  180. default:
  181. }
  182. }
  183. // 回应数据到上层
  184. gm.out <- response
  185. }
  186. }
  187. // 发送数据到网络接口
  188. // 返回发送的数量
  189. func (h *Hub) sendRequest(gd *GetData) (count int) {
  190. h.connects.Range(func(key, _ any) bool {
  191. conn := key.(*Line)
  192. // 检查连接是否OK
  193. if conn.state != Connected {
  194. return true
  195. }
  196. if gd.Channel.MatchString(conn.channel) {
  197. var id uint16
  198. if gd.backchan != nil {
  199. id = h.GetID()
  200. timeout := gd.Timeout
  201. if timeout <= 0 {
  202. timeout = h.cf.WriteWait
  203. }
  204. fn := func(id uint16, conn *Line) func() {
  205. return func() {
  206. go h.outResponse(&ResponseData{
  207. Id: id,
  208. State: config.GET_TIMEOUT,
  209. Data: []byte(config.GET_TIMEOUT_MSG),
  210. conn: conn,
  211. })
  212. // 检查是否已经很久时间没有使用连接了
  213. if time.Since(conn.lastRead) > time.Duration(h.cf.PingInterval*3)*time.Millisecond {
  214. // 超时关闭当前的连接
  215. log.Println("get message timeout", conn.channel)
  216. // 有可能连接出现问题,断开并重新连接
  217. conn.Close(false)
  218. return
  219. }
  220. }
  221. }(id, conn)
  222. // 将要发送的请求缓存
  223. gm := &GetMsg{
  224. out: gd.backchan,
  225. timer: time.AfterFunc(time.Millisecond*time.Duration(timeout), fn),
  226. }
  227. h.msgCache.Store(id, gm)
  228. }
  229. // 组织数据并发送到Connect
  230. conn.sendRequest <- &RequestData{
  231. Id: id,
  232. Cmd: gd.Cmd,
  233. Data: gd.Data,
  234. timeout: gd.Timeout,
  235. backchan: gd.backchan,
  236. conn: conn,
  237. }
  238. log.Println("[SEND]->", conn.channel, "["+gd.Cmd+"]", subStr(string(gd.Data), 200))
  239. count++
  240. if gd.Max > 0 && count >= gd.Max {
  241. return false
  242. }
  243. }
  244. return true
  245. })
  246. return
  247. }
  248. // 执行网络发送过来的命令
  249. func (h *Hub) requestFromNet(request *RequestData) {
  250. cmd := request.Cmd
  251. channel := request.conn.channel
  252. log.Println("[REQU]<-", channel, "["+cmd+"]", subStr(string(request.Data), 200))
  253. // 执行中间件
  254. for _, mdFunc := range h.middle {
  255. rsp := mdFunc(request)
  256. if rsp != nil {
  257. if request.Id != 0 {
  258. rsp.Id = request.Id
  259. request.conn.sendResponse <- rsp
  260. }
  261. return
  262. }
  263. }
  264. sub, ok := h.subscribes.Load(cmd)
  265. if ok {
  266. subs := sub.([]*SubscribeData)
  267. // 倒序查找是为了新增的频道响应函数优先执行
  268. for i := len(subs) - 1; i >= 0; i-- {
  269. rg := subs[i]
  270. if rg.Channel.MatchString(channel) {
  271. state, data := rg.BackFunc(request)
  272. // NEXT_SUBSCRIBE 表示当前的函数没有处理完成,还需要下个注册函数处理
  273. if state == config.NEXT_SUBSCRIBE {
  274. continue
  275. }
  276. // 如果id为0表示不需要回应
  277. if request.Id != 0 {
  278. request.conn.sendResponse <- &ResponseData{
  279. Id: request.Id,
  280. State: state,
  281. Data: data,
  282. }
  283. log.Println("[RESP]->", channel, "["+cmd+"]", state, subStr(string(data), 200))
  284. }
  285. return
  286. }
  287. }
  288. }
  289. log.Println("[not match command]", channel, cmd)
  290. // 返回没有匹配的消息
  291. request.conn.sendResponse <- &ResponseData{
  292. Id: request.Id,
  293. State: config.NO_MATCH,
  294. Data: []byte(config.NO_MATCH_MSG),
  295. }
  296. }
  297. // 请求频道并获取数据,采用回调的方式返回结果
  298. // 当前调用将会阻塞,直到命令都执行结束,最后返回执行的数量
  299. // 如果 backFunc 返回为 false 则提前结束
  300. // 最大数量和超时时间如果为0的话表示使用默认值
  301. func (h *Hub) GetX(channel *regexp.Regexp, cmd string, data []byte, backFunc GetBack, max int, timeout int) (count int) {
  302. // 排除空频道
  303. if channel == nil {
  304. return 0
  305. }
  306. if timeout <= 0 {
  307. timeout = h.cf.ReadWait
  308. }
  309. gd := &GetData{
  310. Channel: channel,
  311. Cmd: cmd,
  312. Data: data,
  313. Max: max,
  314. Timeout: timeout,
  315. backchan: make(chan *ResponseData, 32),
  316. }
  317. sendMax := h.sendRequest(gd)
  318. if sendMax <= 0 {
  319. return 0
  320. }
  321. // 避免出现异常时线程无法退出
  322. timer := time.NewTimer(time.Millisecond * time.Duration(gd.Timeout+h.cf.WriteWait*2))
  323. defer func() {
  324. if !timer.Stop() {
  325. select {
  326. case <-timer.C:
  327. default:
  328. }
  329. }
  330. close(gd.backchan)
  331. }()
  332. for {
  333. select {
  334. case rp := <-gd.backchan:
  335. if rp == nil || rp.conn == nil {
  336. // 可能是已经退出了
  337. return
  338. }
  339. ch := rp.conn.channel
  340. log.Println("[RECV]<-", ch, "["+gd.Cmd+"]", rp.State, subStr(string(rp.Data), 200))
  341. count++
  342. // 如果这里返回为false这跳出循环
  343. if backFunc != nil && !backFunc(rp) {
  344. return
  345. }
  346. if count >= sendMax {
  347. return
  348. }
  349. case <-timer.C:
  350. return
  351. }
  352. }
  353. // return
  354. }
  355. // 请求频道并获取数据,采用回调的方式返回结果
  356. // 当前调用将会阻塞,直到命令都执行结束,最后返回执行的数量
  357. // 如果 backFunc 返回为 false 则提前结束
  358. func (h *Hub) Get(channel *regexp.Regexp, cmd string, data []byte, backFunc GetBack) (count int) {
  359. return h.GetX(channel, cmd, data, backFunc, 0, 0)
  360. }
  361. // 只获取一个频道的数据,阻塞等待到默认超时间隔
  362. // 如果没有结果将返回 NO_MATCH
  363. func (h *Hub) GetOne(channel *regexp.Regexp, cmd string, data []byte) (response *ResponseData) {
  364. h.GetX(channel, cmd, data, func(rp *ResponseData) (ok bool) {
  365. response = rp
  366. return false
  367. }, 1, 0)
  368. if response == nil {
  369. response = &ResponseData{
  370. State: config.NO_MATCH,
  371. Data: []byte(config.NO_MATCH_MSG),
  372. }
  373. }
  374. return
  375. }
  376. // 只获取一个频道的数据,阻塞等待到指定超时间隔
  377. // 如果没有结果将返回 NO_MATCH
  378. func (h *Hub) GetOneX(channel *regexp.Regexp, cmd string, data []byte, timeout int) (response *ResponseData) {
  379. h.GetX(channel, cmd, data, func(rp *ResponseData) (ok bool) {
  380. response = rp
  381. return false
  382. }, 1, timeout)
  383. if response == nil {
  384. response = &ResponseData{
  385. State: config.NO_MATCH,
  386. Data: []byte(config.NO_MATCH_MSG),
  387. }
  388. }
  389. return
  390. }
  391. // 推送消息出去,不需要返回数据
  392. func (h *Hub) Push(channel *regexp.Regexp, cmd string, data []byte) {
  393. // 排除空频道
  394. if channel == nil {
  395. return
  396. }
  397. gd := &GetData{
  398. Channel: channel,
  399. Cmd: cmd,
  400. Data: data,
  401. Timeout: h.cf.ReadWait,
  402. backchan: nil,
  403. }
  404. h.sendRequest(gd)
  405. }
  406. // 推送最大对应数量的消息出去,不需要返回数据
  407. func (h *Hub) PushX(channel *regexp.Regexp, cmd string, data []byte, max int) {
  408. // 排除空频道
  409. if channel == nil {
  410. return
  411. }
  412. gd := &GetData{
  413. Channel: channel,
  414. Cmd: cmd,
  415. Data: data,
  416. Max: max,
  417. Timeout: h.cf.ReadWait,
  418. backchan: nil,
  419. }
  420. h.sendRequest(gd)
  421. }
  422. // 增加连接
  423. func (h *Hub) addLine(line *Line) {
  424. if _, ok := h.connects.Load(line); ok {
  425. log.Println("connect have exist")
  426. // 连接已经存在,直接返回
  427. return
  428. }
  429. // 检查是否有相同的channel,如果有的话将其关闭删除
  430. channel := line.channel
  431. h.connects.Range(func(key, _ any) bool {
  432. conn := key.(*Line)
  433. // 删除超时的连接
  434. if conn.state != Connected && conn.host == nil && time.Since(conn.lastRead) > time.Duration(h.cf.PingInterval*5)*time.Millisecond {
  435. h.connects.Delete(key)
  436. return true
  437. }
  438. if conn.channel == channel {
  439. conn.Close(true)
  440. h.connects.Delete(key)
  441. return false
  442. }
  443. return true
  444. })
  445. h.connects.Store(line, true)
  446. }
  447. // 删除连接
  448. func (h *Hub) removeLine(conn *Line) {
  449. conn.Close(true)
  450. h.connects.Delete(conn)
  451. }
  452. // 获取指定连接的连接持续时间
  453. func (h *Hub) ConnectDuration(conn *Line) time.Duration {
  454. t, ok := h.connects.Load(conn)
  455. if ok {
  456. return time.Since(t.(time.Time))
  457. }
  458. // 如果不存在直接返回0
  459. return time.Duration(0)
  460. }
  461. // 绑定端口,建立服务
  462. // 需要程序运行时调用
  463. func (h *Hub) BindForServer(info *HostInfo) (err error) {
  464. doConnectFunc := func(conn conn.Connect) {
  465. proto, version, channel, auth, err := conn.ReadAuthInfo()
  466. if err != nil {
  467. log.Println("[BindForServer ReadAuthInfo ERROR]", err)
  468. conn.Close()
  469. return
  470. }
  471. if version != info.Version || proto != info.Proto {
  472. log.Println("wrong version or protocol: ", version, proto)
  473. conn.Close()
  474. return
  475. }
  476. // 检查验证是否合法
  477. if !h.checkAuthFunc(proto, version, channel, auth) {
  478. conn.Close()
  479. return
  480. }
  481. // 发送频道信息
  482. if err := conn.WriteAuthInfo(h.channel, h.authFunc(proto, version, channel, auth)); err != nil {
  483. log.Println("[WriteAuthInfo ERROR]", err)
  484. conn.Close()
  485. return
  486. }
  487. // 将连接加入现有连接中
  488. done := false
  489. h.connects.Range(func(key, _ any) bool {
  490. line := key.(*Line)
  491. if line.state == Disconnected && line.channel == channel && line.host == nil {
  492. line.Start(conn, nil)
  493. done = true
  494. return false
  495. }
  496. return true
  497. })
  498. // 新建一个连接
  499. if !done {
  500. line := NewConnect(h.cf, h, channel, conn, nil)
  501. h.addLine(line)
  502. }
  503. }
  504. if info.Version == wsv2.VERSION && info.Proto == wsv2.PROTO {
  505. bind := ""
  506. if info.Bind != "" {
  507. bind = net.JoinHostPort(info.Bind, strconv.Itoa(int(info.Port)))
  508. }
  509. return wsv2.Server(h.cf, bind, info.Path, info.Hash, doConnectFunc)
  510. } else if info.Version == tpv2.VERSION && info.Proto == tpv2.PROTO {
  511. return tpv2.Server(h.cf, net.JoinHostPort(info.Bind, strconv.Itoa(int(info.Port))), info.Hash, doConnectFunc)
  512. }
  513. return errors.New("not connect protocol and version found")
  514. }
  515. // 新建一个连接,不同的连接协议由底层自己选择
  516. // channel: 要连接的频道信息,需要能表达频道关键信息的部分
  517. func (h *Hub) ConnectToServer(channel string, force bool) (err error) {
  518. // 检查当前channel是否已经存在
  519. if !force {
  520. line := h.ChannelToLine(channel)
  521. if line != nil && line.state == Connected {
  522. err = fmt.Errorf("[ConnectToServer ERROR] existed channel: %s", channel)
  523. return
  524. }
  525. }
  526. // 获取服务地址等信息
  527. host, err := h.connectHostFunc(channel)
  528. if err != nil {
  529. return err
  530. }
  531. var conn conn.Connect
  532. addr := net.JoinHostPort(host.Host, strconv.Itoa(int(host.Port)))
  533. if host.Version == wsv2.VERSION && host.Proto == wsv2.PROTO {
  534. conn, err = wsv2.Client(h.cf, addr, host.Path, host.Hash)
  535. } else if host.Version == tpv2.VERSION && host.Proto == tpv2.PROTO {
  536. conn, err = tpv2.Client(h.cf, addr, host.Hash)
  537. } else {
  538. return fmt.Errorf("not correct protocol and version found in: %+v", host)
  539. }
  540. if err != nil {
  541. log.Println("[Client ERROR]", host.Proto, err)
  542. host.Errors++
  543. host.Updated = time.Now()
  544. return err
  545. }
  546. // 发送验证信息
  547. if err := conn.WriteAuthInfo(h.channel, h.authFunc(host.Proto, host.Version, channel, nil)); err != nil {
  548. log.Println("[WriteAuthInfo ERROR]", err)
  549. conn.Close()
  550. host.Errors++
  551. host.Updated = time.Now()
  552. return err
  553. }
  554. // 接收频道信息
  555. proto, version, channel2, auth, err := conn.ReadAuthInfo()
  556. if err != nil {
  557. log.Println("[ConnectToServer ReadAuthInfo ERROR]", err)
  558. conn.Close()
  559. host.Errors++
  560. host.Updated = time.Now()
  561. return err
  562. }
  563. // 检查版本和协议是否一致
  564. if version != host.Version || proto != host.Proto {
  565. err = fmt.Errorf("[version or protocol wrong ERROR] %d, %s", version, proto)
  566. log.Println(err)
  567. conn.Close()
  568. host.Errors++
  569. host.Updated = time.Now()
  570. return err
  571. }
  572. // 检查频道名称是否匹配
  573. if !strings.Contains(channel2, channel) {
  574. err = fmt.Errorf("[channel ERROR] want %s, get %s", channel, channel2)
  575. log.Println(err)
  576. conn.Close()
  577. host.Errors++
  578. host.Updated = time.Now()
  579. return err
  580. }
  581. // 检查验证是否合法
  582. if !h.checkAuthFunc(proto, version, channel, auth) {
  583. err = fmt.Errorf("[checkAuthFunc ERROR] in proto: %s, version: %d, channel: %s, auth: %s", proto, version, channel, string(auth))
  584. log.Println(err)
  585. conn.Close()
  586. host.Errors++
  587. host.Updated = time.Now()
  588. return err
  589. }
  590. // 更新服务主机信息
  591. host.Errors = 0
  592. host.Updated = time.Now()
  593. // 将连接加入现有连接中
  594. done := false
  595. h.connects.Range(func(key, _ any) bool {
  596. line := key.(*Line)
  597. if line.channel == channel {
  598. if line.state == Connected {
  599. if !force {
  600. err = fmt.Errorf("[connectToServer ERROR] channel already connected: %s", channel)
  601. log.Println(err)
  602. return false
  603. }
  604. return true
  605. }
  606. line.Start(conn, host)
  607. done = true
  608. return false
  609. }
  610. return true
  611. })
  612. if err != nil {
  613. return err
  614. }
  615. // 新建一个连接
  616. if !done {
  617. line := NewConnect(h.cf, h, channel, conn, host)
  618. h.addLine(line)
  619. }
  620. return nil
  621. }
  622. // 重试方式连接服务
  623. // 将会一直阻塞直到连接成功
  624. func (h *Hub) ConnectToServerX(channel string, force bool) {
  625. for {
  626. err := h.ConnectToServer(channel, force)
  627. if err == nil {
  628. return
  629. }
  630. log.Println("[ConnectToServer ERROR, try it again]", err)
  631. // 产生一个随机数避免刹间重连过载
  632. r := rand.New(rand.NewSource(time.Now().UnixNano()))
  633. time.Sleep(time.Duration(r.Intn(h.cf.ConnectTimeout)+(h.cf.ConnectTimeout/2)) * time.Millisecond)
  634. }
  635. }
  636. // 建立一个集线器
  637. // connectFunc 用于监听连接状态的函数,可以为nil
  638. func NewHub(
  639. cf *config.Config,
  640. channel string,
  641. // 客户端需要用的函数
  642. connectHostFunc ConnectHostFunc,
  643. authFunc AuthFunc,
  644. // 服务端需要用的函数
  645. checkAuthFunc CheckAuthFunc,
  646. // 连接状态变化时调用的函数
  647. connectStatusFunc ConnectStatusFunc,
  648. ) (h *Hub) {
  649. h = &Hub{
  650. cf: cf,
  651. channel: channel,
  652. middle: make([]MiddleFunc, 0),
  653. connectHostFunc: connectHostFunc,
  654. authFunc: authFunc,
  655. checkAuthFunc: checkAuthFunc,
  656. connectStatusFunc: connectStatusFunc,
  657. lastCleanDeadConnect: time.Now().UnixMilli(),
  658. }
  659. return h
  660. }