| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258 |
- package tinymq
- import (
- "bytes"
- "errors"
- "fmt"
- "regexp"
- "strconv"
- // "regexp"
- "strings"
- "time"
- )
- // 中间件函数
- // 如果返回为空,表示处理完成,通过
- // 如果返回 NEXT_MIDDLE,表示需要下一个中间件函数处理;如果没有下一函数则默认通过
- type MiddleFunc func(request *RequestData) (response *ResponseData)
- // 订阅频道响应函数
- type SubscribeBackFunc func(request *RequestData) (state uint8, result any)
- // GET 获取数据的回调函数,如果返回 false 则提前结束
- type GetBackFunc func(response *ResponseData) (ok bool)
- // 线路状态改变时调用
- type ConnectStatusFunc func(conn *Line)
- // 频道过滤器函数,如果返回true表示成功匹配
- type FilterFunc func(conn *Line) (ok bool)
- // 通过过滤函数获取一个频道信息
- type FilterToChannelFunc func(filter FilterFunc) (channel string)
- // 订阅频道数据结构
- type SubscribeData struct {
- Filter FilterFunc // 频道匹配过滤
- Cmd string // 请求的命令
- BackFunc SubscribeBackFunc //回调函数,如果状态为 NEXT_SUBSCRIBE 将继续下一个频道调用
- }
- // 获取数据使用的数据结构
- type GetData struct {
- Filter FilterFunc // 命令匹配过滤
- Cmd string
- Data any
- Max int // 获取数据的频道最多有几个,如果为0表示没有限制
- Timeout int // 超时时间(毫秒)
- Rand bool // 是否使用随机的数列
- backchan chan *ResponseData // 获取响应返回的数据
- }
- // 连接状态
- type ConnectState byte
- const (
- Disconnected ConnectState = iota
- Connected
- Closed
- )
- func (t ConnectState) String() string {
- switch t {
- case Disconnected:
- return "Disconnected"
- case Connected:
- return "Connected"
- case Closed:
- return "Closed"
- default:
- return fmt.Sprintf("Unknown ConnectState (%d)", t)
- }
- }
- // 主机类型
- type HostType byte
- const (
- Direct HostType = iota
- Proxy
- Both
- )
- // 请求数据包
- type RequestData struct {
- Id uint16
- Cmd string
- Data []byte
- timeout int // 超时时间,单位为毫秒
- backchan chan *ResponseData // 返回数据的管道
- conn *Line // 将连接传递出去是为了能够让上层找回来
- }
- func (r *RequestData) Conn() *Line {
- return r.conn
- }
- type ResponseData struct {
- Id uint16
- State uint8
- Data []byte
- conn *Line
- }
- func (r *ResponseData) Conn() *Line {
- return r.conn
- }
- type PingData struct {
- Id uint16
- }
- // 请求信息,得到回应通过管道传递信息
- type GetMsg struct {
- out chan *ResponseData
- timer *time.Timer
- }
- // 连接服务结构
- type HostInfo struct {
- Proto string `json:"proto" yaml:"proto"` // 协议
- Version uint8 `json:"version" yaml:"version"` // 版本
- Host string `json:"host" yaml:"host"` // 连接的IP地址或者域名
- Bind string `json:"bind,omitempty" yaml:"bind"` // 绑定的地址
- Port uint16 `json:"port,omitempty" yaml:"port"` // 连接的端口
- Path string `json:"path,omitempty" yaml:"path"` // 连接的路径
- Hash string `json:"hash,omitempty" yaml:"hash"` // 连接验证使用,格式 method:key
- Proxy bool `json:"proxy,omitempty" yaml:"proxy"` // 是否代理
- Nat bool `json:"nat,omitempty" yaml:"nat"` // 是否是前端nat的方式处理
- Priority int16 `json:"priority,omitempty" yaml:"priority"` // 优先级,-1 表示不可用,0 表示最高优先级(为了兼容没有优先级的节点),1-100 表示优先级别,数值越高优先级越高
- Errors uint16 `json:"errors,omitempty" yaml:"errors"` // 连接失败计数,如果成功了则重置为0
- Updated time.Time `json:"updated,omitempty" yaml:"updated"` // 节点信息刷新时间
- }
- // 从 url 中解析信息
- // url 格式:ws2://xor:s^7mv7L!Mrn8Y!vn@127.0.0.1:14541/wsv2?proxy=1
- // 仅支持客户端连接使用
- func ParseUrl(url string) (hostInfo *HostInfo, err error) {
- mx := regexp.MustCompile(`^([a-z]+)([0-9]+)://([^/#\?]+)(/[\w\-/]+)?`).FindStringSubmatch(url)
- if mx == nil {
- return nil, errors.New("invalid url")
- }
- protocol := mx[1]
- version, err := strconv.Atoi(mx[2])
- if err != nil {
- return nil, err
- }
- host := mx[3]
- index := strings.Index(host, "@")
- hash := ""
- if index >= 0 {
- hash = host[0:index]
- host = host[index+1:]
- }
- // 检查是否ipv6,解析出ip和端口
- index = strings.Index(host, "]:")
- port := 0
- if index > 0 {
- // ipv6 地址和端口
- port, err = strconv.Atoi(host[index+2:])
- if err != nil {
- return nil, err
- }
- host = host[1:index]
- } else {
- hs := strings.Split(host, ":")
- if len(hs) == 2 {
- host = hs[0]
- port, err = strconv.Atoi(hs[1])
- if err != nil {
- return nil, err
- }
- }
- }
- path := ""
- if len(mx) > 4 {
- path = mx[4]
- }
- hostInfo = &HostInfo{
- Proto: protocol,
- Version: uint8(version),
- Host: host,
- Port: uint16(port),
- Hash: hash,
- Path: path,
- }
- // 查找是否代理
- url = url[len(mx[0]):]
- if regexp.MustCompile(`[\?&]proxy=1`).MatchString(url) {
- hostInfo.Proxy = true
- }
- if regexp.MustCompile(`[\?&]nat=1`).MatchString(url) {
- hostInfo.Nat = true
- }
- priorityM := regexp.MustCompile(`[\?&]priority=(-?\d+)`).FindStringSubmatch(url)
- if priorityM != nil {
- priority, err := strconv.Atoi(priorityM[1])
- if err != nil {
- return nil, err
- }
- hostInfo.Priority = int16(priority)
- }
- return hostInfo, nil
- }
- // 只输出客户端要连接的信息
- func (h *HostInfo) Url() string {
- var b bytes.Buffer
- b.WriteString(fmt.Sprintf("%s%d://", h.Proto, h.Version))
- if h.Hash != "" {
- b.WriteString(h.Hash + "@")
- }
- if strings.Contains(h.Host, ":") {
- // ipv6
- b.WriteString("[" + h.Host + "]")
- } else {
- b.WriteString(h.Host)
- }
- if h.Port > 0 {
- b.WriteString(fmt.Sprintf(":%d", h.Port))
- }
- if h.Path != "" {
- b.WriteString(h.Path)
- }
- param := make([]string, 0)
- if h.Proxy {
- param = append(param, "proxy=1")
- }
- if h.Nat {
- param = append(param, "nat=1")
- }
- if h.Priority != 0 {
- param = append(param, fmt.Sprintf("priority=%d", h.Priority))
- }
- if len(param) > 0 {
- b.WriteString("?" + strings.Join(param, "&"))
- }
- return b.String()
- }
- // 输出代表一个节点的关键信息
- func (h *HostInfo) Key() string {
- return fmt.Sprintf("%s%d://%s:%d%s", h.Proto, h.Version, h.Host, h.Port, h.Path)
- }
- // 获取对应频道的一个连接地址
- type ConnectHostFunc func(channel string, hostType HostType) (hostInfo *HostInfo, err error)
- // 获取认证信息
- type AuthFunc func(client bool, proto string, version uint8, channel string, remoteAuth []byte) (auth []byte)
- // 认证合法性函数
- type CheckAuthFunc func(client bool, proto string, version uint8, channel string, auth []byte) bool
- // 验证发送的数据条件是否满足
- type CheckConnectOkFunc = func(line *Line, data *GetData) bool
|