package tinymq import ( "log" "net" "strings" "time" "git.me9.top/git/tinymq/config" "git.me9.top/git/tinymq/conn" ) // 建立一个虚拟连接,包括生成命令,发送命令和响应命令 type Line struct { cf *config.Config hub *Hub conn conn.Connect channel string // 连接对端的频道 state ConnectState host *HostInfo // 如果有值说明是客户端,就是主动连接端 pingID uint16 // 只有客户端使用 pingWrongCount uint8 // 记录 ping id 反馈错误次数,超过3次则重新连接 // 当前连接的管道 sendRequest chan *RequestData // 发送请求数据 sendResponse chan *ResponseData // 发送回应包 pingRequest chan *PingData // Ping请求 closeConnect chan bool // 关闭连接信号,true表示外部进行关闭,false表示连接已经出问题或超时关闭 lastRead time.Time // 记录最后一次读到数据的时间,在客户端如果超时则重连 started time.Time // 开始时间 updated time.Time // 更新时间 } // 获取开始时间 func (c *Line) Started() time.Time { return c.started } // 获取更新时间 func (c *Line) Updated() time.Time { return c.updated } // 获取当前连接状态 func (c *Line) State() ConnectState { return c.state } // 获取频道名 func (c *Line) Channel() string { return c.channel } // 获取远程的地址 func (c *Line) RemoteAddr() net.Addr { if c.state == Connected { return c.conn.RemoteAddr() } else { return nil } } // 获取本地的地址 func (c *Line) LocalAddr() net.Addr { if c.state == Connected { return c.conn.LocalAddr() } else { return nil } } // 获取通讯消息ID号 func (c *Line) getPingID() uint16 { c.pingID++ if c.pingID <= 0 || c.pingID >= config.ID_MAX { c.pingID = 1 } return c.pingID } // 设置频道名 func (c *Line) SetChannelName(name string) { if strings.Contains(name, "@") { c.channel = name } else { if inx := strings.Index(c.channel, "@"); inx >= 0 { c.channel = name + c.channel[inx:] } else { c.channel = name + "@" + c.channel } } } // 读信息循环通道,采用新线程 func (c *Line) readPump() { for { msgType, id, cmd, state, data, err := c.conn.ReadMessage(c.cf.LongReadWait) if err != nil { if !strings.Contains(err.Error(), "EOF") { log.Println("[readPump ERROR]", err) } c.Close(false) return } // 记录最后读到数据的时间 c.lastRead = time.Now() switch msgType { case conn.PingMsg: // ping或pong包 c.pingRequest <- &PingData{ Id: id, } case conn.RequestMsg: // 请求数据包 go c.hub.requestFromNet(&RequestData{ Id: id, Cmd: cmd, Data: data, conn: c, }) case conn.ResponseMsg: // 网络回应数据包 go c.hub.outResponse(&ResponseData{ Id: id, State: state & 0x7F, Data: data, conn: c, }) } } } // 检查管道并处理不同的消息,新go程调用 // 为了防止多线程的冲突,主要的处理都在这里进行 func (c *Line) writePump() { pingTicker := time.NewTicker(time.Duration(c.cf.PingInterval) * time.Millisecond) // 定义恢复函数 defer func() { pingTicker.Stop() c.conn.Close() // 检查是否需要重新连接 if c.host != nil && c.state != Closed && c.state != Connected { go func() { c.host.Errors++ c.host.Updated = time.Now() time.Sleep(time.Second) c.hub.ConnectToServerX(c.channel, false) }() } }() // 清空closeConnect c.cleanClose() // 开始处理信息循环 for { select { case request := <-c.sendRequest: // 发送请求包 err := c.conn.WriteRequest(request.Id, request.Cmd, request.Data) if err != nil { log.Println(err) return } case response := <-c.sendResponse: // 接收到的响应包 // 发送响应数据 err := c.conn.WriteResponse(response.Id, response.State, response.Data) if err != nil { log.Println(err) return } case ping := <-c.pingRequest: // 发送ping包到网络 // 只有服务器端需要回应ping包 if c.host == nil { err := c.conn.WritePing(ping.Id) if err != nil { log.Println("[ping ERROR]", err) return } } else { // 检查 ping id 是否正确 if c.pingID == ping.Id { c.pingWrongCount = 0 } else { c.pingWrongCount++ if c.pingWrongCount > 3 { log.Println("[wrong ping id]", ping.Id) c.Close(false) return } } } case <-pingTicker.C: // 检查是否已经很久时间没有使用连接了 dr := time.Since(c.lastRead) if dr > time.Duration(c.cf.PingInterval*3)*time.Millisecond { // 超时关闭当前的连接 log.Println("Connect timeout and stop it", c.channel) // 有可能连接出现问题,断开并重新连接 c.Close(false) return } // 只需要客户端发送 if c.host != nil { // 发送ping包 if dr > time.Duration(c.cf.PingInterval) { // 发送ping数据包 err := c.conn.WritePing(c.getPingID()) if err != nil { log.Println(err) return } } } case <-c.closeConnect: c.cleanClose() // 退出循环 return } } } // 关闭连接 func (c *Line) Close(quick bool) { defer recover() //避免管道已经关闭而引起panic c.conn.Close() c.closeConnect <- quick c.updated = time.Now() if quick { if c.state != Closed { c.state = Closed c.hub.connectStatusFunc(c) } c.hub.removeLine(c) } else { if c.state != Disconnected { c.state = Disconnected c.hub.connectStatusFunc(c) go c.hub.cleanDeadConnect() } } } // 清空余留下来的管道消息 func (c *Line) cleanClose() { for { select { case <-c.closeConnect: default: return } } } // 连接开始运行 func (c *Line) Start(conn conn.Connect, host *HostInfo) { c.updated = time.Now() c.conn = conn c.host = host go c.readPump() go c.writePump() c.state = Connected c.hub.connectStatusFunc(c) } // 请求入口函数 func NewConnect( cf *config.Config, hub *Hub, channel string, conn conn.Connect, host *HostInfo, ) *Line { cc := &Line{ cf: cf, channel: channel, hub: hub, sendRequest: make(chan *RequestData, 32), sendResponse: make(chan *ResponseData, 32), pingRequest: make(chan *PingData, 5), closeConnect: make(chan bool, 5), lastRead: time.Now(), // 避免默认为0时被清理 started: time.Now(), } cc.Start(conn, host) return cc }