package tinymq import ( "bytes" "errors" "fmt" "regexp" "strconv" // "regexp" "strings" "time" ) // 中间件函数 // 如果返回为空,表示处理完成,通过 // 如果返回 NEXT_MIDDLE,表示需要下一个中间件函数处理;如果没有下一函数则默认通过 type MiddleFunc func(request *RequestData) (response *ResponseData) // 订阅频道响应函数 type SubscribeBackFunc func(request *RequestData) (state uint8, result any) // GET 获取数据的回调函数,如果返回 false 则提前结束 type GetBackFunc func(response *ResponseData) (ok bool) // 线路状态改变时调用 type ConnectStatusFunc func(conn *Line) // 频道过滤器函数,如果返回true表示成功匹配 type FilterFunc func(conn *Line) (ok bool) // 通过过滤函数获取一个频道信息 type FilterToChannelFunc func(filter FilterFunc) (channel string) // 订阅频道数据结构 type SubscribeData struct { Filter FilterFunc // 频道匹配过滤 Cmd string // 请求的命令 BackFunc SubscribeBackFunc //回调函数,如果状态为 NEXT_SUBSCRIBE 将继续下一个频道调用 } // 获取数据使用的数据结构 type GetData struct { Filter FilterFunc // 命令匹配过滤 Cmd string Data any Max int // 获取数据的频道最多有几个,如果为0表示没有限制 Timeout int // 超时时间(毫秒) Rand bool // 是否使用随机的数列 backchan chan *ResponseData // 获取响应返回的数据 } // 连接状态 type ConnectState byte const ( Disconnected ConnectState = iota Connected Closed ) func (t ConnectState) String() string { switch t { case Disconnected: return "Disconnected" case Connected: return "Connected" case Closed: return "Closed" default: return fmt.Sprintf("Unknown ConnectState (%d)", t) } } // 主机类型 type HostType byte const ( Direct HostType = iota Proxy Both ) // 请求数据包 type RequestData struct { Id uint16 Cmd string Data []byte timeout int // 超时时间,单位为毫秒 backchan chan *ResponseData // 返回数据的管道 conn *Line // 将连接传递出去是为了能够让上层找回来 } func (r *RequestData) Conn() *Line { return r.conn } type ResponseData struct { Id uint16 State uint8 Data []byte conn *Line } func (r *ResponseData) Conn() *Line { return r.conn } type PingData struct { Id uint16 } // 请求信息,得到回应通过管道传递信息 type GetMsg struct { out chan *ResponseData timer *time.Timer } // 连接服务结构 type HostInfo struct { Proto string `json:"proto" yaml:"proto"` // 协议 Version uint8 `json:"version" yaml:"version"` // 版本 Host string `json:"host" yaml:"host"` // 连接的IP地址或者域名 Bind string `json:"bind" yaml:"bind"` // 绑定的地址 Port uint16 `json:"port" yaml:"port"` // 连接的端口 Path string `json:"path" yaml:"path"` // 连接的路径 Hash string `json:"hash" yaml:"hash"` // 连接验证使用,格式 method:key Proxy bool `json:"proxy" yaml:"proxy"` // 是否代理 Priority int16 `json:"priority" yaml:"priority"` // 优先级,-1 表示不可用,0 表示最高优先级(为了兼容没有优先级的节点),1-100 表示优先级别,数值越高优先级越高 Errors uint16 `json:"errors" yaml:"errors"` // 连接失败计数,如果成功了则重置为0 Updated time.Time `json:"updated" yaml:"updated"` // 节点信息刷新时间 } // 从 url 中解析信息 // url 格式:ws2://xor:s^7mv7L!Mrn8Y!vn@127.0.0.1:14541/wsv2?proxy=1 // 仅支持客户端连接使用 func ParseUrl(url string) (hostInfo *HostInfo, err error) { mx := regexp.MustCompile(`^([a-z]+)([0-9]+)://([^/#\?]+)(/[\w\-/]+)?`).FindStringSubmatch(url) if mx == nil { return nil, errors.New("invalid url") } protocol := mx[1] version, err := strconv.Atoi(mx[2]) if err != nil { return nil, err } host := mx[3] index := strings.Index(host, "@") hash := "" if index >= 0 { hash = host[0:index] host = host[index+1:] } // 检查是否ipv6,解析出ip和端口 index = strings.Index(host, "]:") port := 0 if index > 0 { // ipv6 地址和端口 port, err = strconv.Atoi(host[index+2:]) if err != nil { return nil, err } host = host[1:index] } else { hs := strings.Split(host, ":") if len(hs) == 2 { host = hs[0] port, err = strconv.Atoi(hs[1]) if err != nil { return nil, err } } } path := "" if len(mx) > 4 { path = mx[4] } hostInfo = &HostInfo{ Proto: protocol, Version: uint8(version), Host: host, Port: uint16(port), Hash: hash, Path: path, } // 查找是否代理 url = url[len(mx[0]):] if regexp.MustCompile(`[\?&]proxy=1`).MatchString(url) { hostInfo.Proxy = true } priorityM := regexp.MustCompile(`[\?&]priority=(-?\d+)`).FindStringSubmatch(url) if priorityM != nil { priority, err := strconv.Atoi(priorityM[1]) if err != nil { return nil, err } hostInfo.Priority = int16(priority) } return hostInfo, nil } // 只输出客户端要连接的信息 func (h *HostInfo) Url() string { var b bytes.Buffer b.WriteString(fmt.Sprintf("%s%d://", h.Proto, h.Version)) if h.Hash != "" { b.WriteString(h.Hash + "@") } if strings.Contains(h.Host, ":") { // ipv6 b.WriteString("[" + h.Host + "]") } else { b.WriteString(h.Host) } if h.Port > 0 { b.WriteString(fmt.Sprintf(":%d", h.Port)) } if h.Path != "" { b.WriteString(h.Path) } param := make([]string, 0) if h.Proxy { param = append(param, "proxy=1") } if h.Priority != 0 { param = append(param, fmt.Sprintf("priority=%d", h.Priority)) } if len(param) > 0 { b.WriteString("?" + strings.Join(param, "&")) } return b.String() } // 输出代表一个节点的关键信息 func (h *HostInfo) Key() string { return fmt.Sprintf("%s%d://%s:%d%s", h.Proto, h.Version, h.Host, h.Port, h.Path) } // 获取对应频道的一个连接地址 type ConnectHostFunc func(channel string, hostType HostType) (hostInfo *HostInfo, err error) // 获取认证信息 type AuthFunc func(client bool, proto string, version uint8, channel string, remoteAuth []byte) (auth []byte) // 认证合法性函数 type CheckAuthFunc func(client bool, proto string, version uint8, channel string, auth []byte) bool // 验证发送的数据条件是否满足 type CheckConnectOkFunc = func(line *Line, data *GetData) bool