package tinymq import ( "bytes" "fmt" "regexp" "strings" "time" ) // 中间件函数 // 如果返回为空,表示处理完成,通过 // 如果返回 NEXT_MIDDLE,表示需要下一个中间件函数处理;如果没有下一函数则默认通过 type MiddleFunc func(request *RequestData) (response *ResponseData) // 订阅频道响应函数 type SubscribeBack func(request *RequestData) (state uint8, result []byte) // GET 获取数据的回调函数,如果返回 false 则提前结束 type GetBack func(response *ResponseData) (ok bool) // 线路状态改变时调用 type ConnectStatusFunc func(conn *Line) // 订阅频道数据结构 type SubscribeData struct { Channel *regexp.Regexp //频道的正则表达式 Cmd string // 请求的命令 BackFunc SubscribeBack //回调函数,如果状态为 NEXT_SUBSCRIBE 将继续下一个频道调用 } // 获取数据使用的数据结构 type GetData struct { Channel *regexp.Regexp Cmd string Data []byte Max int // 获取数据的频道最多有几个,如果为0表示没有限制 Timeout int // 超时时间(毫秒) backchan chan *ResponseData // 获取响应返回的数据 } // 连接状态 type ConnectState byte const ( Disconnected ConnectState = iota Connected Closed ) func (t ConnectState) String() string { switch t { case Disconnected: return "Disconnected" case Connected: return "Connected" case Closed: return "Closed" default: return fmt.Sprintf("Unknown ConnectState (%d)", t) } } // 请求数据包 type RequestData struct { Id uint16 Cmd string Data []byte timeout int // 超时时间,单位为毫秒 backchan chan *ResponseData // 返回数据的管道 conn *Line // 将连接传递出去是为了能够让上层找回来 } func (r *RequestData) Conn() *Line { return r.conn } type ResponseData struct { Id uint16 State uint8 Data []byte conn *Line } func (r *ResponseData) Conn() *Line { return r.conn } type PingData struct { Id uint16 } // 请求信息,得到回应通过管道传递信息 type GetMsg struct { out chan *ResponseData timer *time.Timer } // 连接服务结构 type HostInfo struct { Proto string `json:"proto" yaml:"proto"` // 协议 Version uint8 `json:"version" yaml:"version"` // 版本 Host string `json:"host" yaml:"host"` // 连接的IP地址或者域名 Bind string `json:"bind" yaml:"bind"` // 绑定的地址 Port uint16 `json:"port" yaml:"port"` // 连接的端口 Path string `json:"path" yaml:"path"` // 连接的路径 Hash string `json:"hash" yaml:"hash"` // 连接验证使用,格式 method:key Proxy bool `json:"proxy" yaml:"proxy"` // 是否代理 Priority int16 `json:"priority" yaml:"priority"` // 优先级,-1 表示不可用,0 表示最高优先级(为了兼容没有优先级的节点),1-100 表示优先级别,数值越高优先级越高 Errors uint16 `json:"errors" yaml:"errors"` // 连接失败计数,如果成功了则重置为0 Updated time.Time `json:"updated" yaml:"updated"` // 节点信息刷新时间 } // 只输出客户端要连接的信息 func (h *HostInfo) String() string { var b bytes.Buffer b.WriteString(fmt.Sprintf("%s%d://", h.Proto, h.Version)) if h.Hash != "" { b.WriteString(h.Hash + "@") } if strings.Contains(h.Host, ":") { // ipv6 b.WriteString("[" + h.Host + "]") } else { b.WriteString(h.Host) } b.WriteString(fmt.Sprintf(":%d", h.Port)) if h.Path != "" { b.WriteString(h.Path) } param := make([]string, 0) if h.Proxy { param = append(param, "proxy=1") } if h.Priority != 0 { param = append(param, fmt.Sprintf("priority=%d", h.Priority)) } if len(param) > 0 { b.WriteString("?" + strings.Join(param, "&")) } return b.String() } // 获取对应频道的一个连接地址 type ConnectHostFunc func(channel string, proxy bool) (hostInfo *HostInfo, err error) // 获取认证信息 type AuthFunc func(proto string, version uint8, channel string, remoteAuth []byte) (auth []byte) // 认证合法性函数 type CheckAuthFunc func(proto string, version uint8, channel string, auth []byte) bool