Parcourir la source

change channel filter

Joyit il y a 3 semaines
Parent
commit
8e468aa68d
6 fichiers modifiés avec 79 ajouts et 52 suppressions
  1. 7 6
      examples/client-tcp2.go
  2. 7 6
      examples/client-ws2.go
  3. 6 6
      examples/server.go
  4. 18 0
      filter.go
  5. 29 27
      hub.go
  6. 12 7
      type.go

+ 7 - 6
examples/client-tcp2.go

@@ -5,7 +5,6 @@ package main
 
 import (
 	"log"
-	"regexp"
 	"time"
 
 	"git.me9.top/git/tinymq"
@@ -14,7 +13,9 @@ import (
 
 func main() {
 	cf := config.NewConfig()
-	channel := "/tinymq/client/tcp2"
+	localChannel := "/tinymq/client/tcp2"
+	remoteChannel := "/tinymq/server"
+	remoteFilter := tinymq.StrChannelFilter(remoteChannel)
 	host := &tinymq.HostInfo{
 		Proto:   "tcp",
 		Version: 2,
@@ -23,7 +24,7 @@ func main() {
 		Hash:    "xor:1qaz2wsx3",
 	}
 
-	hub := tinymq.NewHub(cf, channel, func(channel string, hostType tinymq.HostType) (hostInfo *tinymq.HostInfo, err error) {
+	hub := tinymq.NewHub(cf, localChannel, func(channel string, hostType tinymq.HostType) (hostInfo *tinymq.HostInfo, err error) {
 		return host, nil
 	}, func(proto string, version uint8, channel string, remoteAuth []byte) (auth []byte) {
 		// 从 remoteAuth 是否为空来判断是否需要返回信息
@@ -41,12 +42,12 @@ func main() {
 	})
 
 	// 订阅频道
-	hub.Subscribe(regexp.MustCompile("/tinymq/server"), "hello", func(request *tinymq.RequestData) (state uint8, result any) {
+	hub.Subscribe(remoteFilter, "hello", func(request *tinymq.RequestData) (state uint8, result any) {
 		log.Println("[client RECV]<-", string(request.Data))
 		return 1, "tiny client"
 	},
 	)
-	hub.Subscribe(regexp.MustCompile("/tinymq/server"), "nodata", func(request *tinymq.RequestData) (state uint8, result any) {
+	hub.Subscribe(remoteFilter, "nodata", func(request *tinymq.RequestData) (state uint8, result any) {
 		log.Println("[client RECV]<-", string(request.Data))
 		return 1, nil
 	},
@@ -58,7 +59,7 @@ func main() {
 	}
 
 	// 获取信息
-	rsp := hub.GetOne(regexp.MustCompile("/tinymq/server"), "hello", []byte("hello from client, hello from client, hello from client"))
+	rsp := hub.GetOne(remoteFilter, "hello", []byte("hello from client, hello from client, hello from client"))
 	if rsp.State != config.STATE_OK {
 		log.Println("error state:", rsp.State)
 		return

+ 7 - 6
examples/client-ws2.go

@@ -5,7 +5,6 @@ package main
 
 import (
 	"log"
-	"regexp"
 	"time"
 
 	"git.me9.top/git/tinymq"
@@ -16,6 +15,8 @@ func main() {
 	cf := config.NewConfig()
 	localChannel := "/tinymq/client/ws2"
 	remoteChannel := "/tinymq/server"
+	remoteFilter := tinymq.StrChannelFilter(remoteChannel)
+
 	host := &tinymq.HostInfo{
 		Proto:   "ws",
 		Version: 2,
@@ -44,12 +45,12 @@ func main() {
 	})
 
 	// 订阅频道
-	hub.Subscribe(regexp.MustCompile(remoteChannel), "hello", func(request *tinymq.RequestData) (state uint8, result any) {
+	hub.Subscribe(remoteFilter, "hello", func(request *tinymq.RequestData) (state uint8, result any) {
 		log.Println("[client RECV]<-", string(request.Data))
 		return 1, "tiny client"
 	},
 	)
-	hub.Subscribe(regexp.MustCompile(remoteChannel), "nodata", func(request *tinymq.RequestData) (state uint8, result any) {
+	hub.Subscribe(remoteFilter, "nodata", func(request *tinymq.RequestData) (state uint8, result any) {
 		log.Println("[client RECV]<-", string(request.Data))
 		return 1, nil
 	},
@@ -61,14 +62,14 @@ func main() {
 	}
 
 	// 获取信息
-	rsp := hub.GetOne(regexp.MustCompile(remoteChannel), "hello", []byte("hello from client,hello from client,hello from client"))
+	rsp := hub.GetOne(remoteFilter, "hello", []byte("hello from client,hello from client,hello from client"))
 	if rsp.State != config.STATE_OK {
 		log.Println("error state:", rsp.State)
 		return
 	}
 	log.Println("[RESULT]<-", string(rsp.Data))
 	// 获取长数据
-	rsp = hub.GetOne(regexp.MustCompile(remoteChannel), "bigdata", nil)
+	rsp = hub.GetOne(remoteFilter, "bigdata", nil)
 	if rsp.State != config.STATE_OK {
 		log.Println("error state:", rsp.State)
 		return
@@ -77,7 +78,7 @@ func main() {
 	}
 
 	time.Sleep(time.Second * 5)
-	hub.Push(regexp.MustCompile(remoteChannel), "push", []byte(time.Now().GoString()))
+	hub.Push(remoteFilter, "push", []byte(time.Now().GoString()))
 
 	time.Sleep(time.Second * 300)
 	log.Println("client exit")

Fichier diff supprimé car celui-ci est trop grand
+ 6 - 6
examples/server.go


+ 18 - 0
filter.go

@@ -0,0 +1,18 @@
+package tinymq
+
+import "regexp"
+
+// 正则频道过滤器
+func RegChannelFilter(channel *regexp.Regexp) FilterFunc {
+	return func(conn *Line) (ok bool) {
+		return channel.MatchString(conn.channel)
+	}
+}
+
+// 字符串频道过滤器
+func StrChannelFilter(channel string) FilterFunc {
+	c := regexp.MustCompile(channel)
+	return func(conn *Line) (ok bool) {
+		return c.MatchString(conn.channel)
+	}
+}

+ 29 - 27
hub.go

@@ -8,7 +8,7 @@ import (
 	"log"
 	"math/rand"
 	"net"
-	"regexp"
+	// "regexp"
 	"strconv"
 	"strings"
 	"sync"
@@ -103,12 +103,12 @@ func (h *Hub) UseMiddle(middleFunc MiddleFunc) {
 }
 
 // 注册频道,其中频道为正则表达式字符串
-func (h *Hub) Subscribe(channel *regexp.Regexp, cmd string, backFunc SubscribeBack) (err error) {
-	if channel == nil {
-		return errors.New("channel can not be nil")
+func (h *Hub) Subscribe(filter FilterFunc, cmd string, backFunc SubscribeBackFunc) (err error) {
+	if filter == nil {
+		return errors.New("filter function can not be nil")
 	}
 	reg := &SubscribeData{
-		Channel:  channel,
+		Filter:   filter,
 		Cmd:      cmd,
 		BackFunc: backFunc,
 	}
@@ -226,7 +226,8 @@ func (h *Hub) sendRequest(gd *GetData) (count int) {
 		if conn.state != Connected {
 			return true
 		}
-		if gd.Channel.MatchString(conn.channel) {
+		// if gd.Channel.MatchString(conn.channel) {
+		if gd.Filter(conn) {
 			var id uint16
 			if gd.backchan != nil {
 				id = h.GetID()
@@ -239,7 +240,7 @@ func (h *Hub) sendRequest(gd *GetData) (count int) {
 						go h.outResponse(&ResponseData{
 							Id:    id,
 							State: config.GET_TIMEOUT,
-							Data:  fmt.Appendf(nil, "[%s] %s %s", config.GET_TIMEOUT_MSG, gd.Channel.String(), gd.Cmd),
+							Data:  fmt.Appendf(nil, "[%s] %s %s", config.GET_TIMEOUT_MSG, conn.channel, gd.Cmd),
 							conn:  conn,
 						})
 						// 检查是否已经很久时间没有使用连接了
@@ -312,7 +313,8 @@ func (h *Hub) requestFromNet(request *RequestData) {
 		// 倒序查找是为了新增的频道响应函数优先执行
 		for i := len(subs) - 1; i >= 0; i-- {
 			rg := subs[i]
-			if rg.Channel.MatchString(channel) {
+			// if rg.Channel.MatchString(channel) {
+			if rg.Filter(request.conn) {
 				state, data := rg.BackFunc(request)
 				// NEXT_SUBSCRIBE 表示当前的函数没有处理完成,还需要下个注册函数处理
 				if state == config.NEXT_SUBSCRIBE {
@@ -365,7 +367,11 @@ func (h *Hub) requestFromNet(request *RequestData) {
 // 当前调用将会阻塞,直到命令都执行结束,最后返回执行的数量
 // 如果 backFunc 返回为 false 则提前结束
 // 最大数量和超时时间如果为0的话表示使用默认值
-func (h *Hub) GetWithMaxAndTimeout(channel *regexp.Regexp, cmd string, data any, backFunc GetBack, max int, timeout int) (count int) {
+func (h *Hub) GetWithMaxAndTimeout(filter FilterFunc, cmd string, data any, backFunc GetBackFunc, max int, timeout int) (count int) {
+	// 排除空频道
+	if filter == nil {
+		return 0
+	}
 	var reqData []byte
 	switch data := data.(type) {
 	case []byte:
@@ -384,15 +390,11 @@ func (h *Hub) GetWithMaxAndTimeout(channel *regexp.Regexp, cmd string, data any,
 		}
 	}
 
-	// 排除空频道
-	if channel == nil {
-		return 0
-	}
 	if timeout <= 0 {
 		timeout = h.cf.ReadWait
 	}
 	gd := &GetData{
-		Channel:  channel,
+		Filter:   filter,
 		Cmd:      cmd,
 		Data:     reqData,
 		Max:      max,
@@ -443,21 +445,21 @@ func (h *Hub) GetWithMaxAndTimeout(channel *regexp.Regexp, cmd string, data any,
 // 请求频道并获取数据,采用回调的方式返回结果
 // 当前调用将会阻塞,直到命令都执行结束,最后返回执行的数量
 // 如果 backFunc 返回为 false 则提前结束
-func (h *Hub) Get(channel *regexp.Regexp, cmd string, data any, backFunc GetBack) (count int) {
-	return h.GetWithMaxAndTimeout(channel, cmd, data, backFunc, 0, 0)
+func (h *Hub) Get(filter FilterFunc, cmd string, data any, backFunc GetBackFunc) (count int) {
+	return h.GetWithMaxAndTimeout(filter, cmd, data, backFunc, 0, 0)
 }
 
 // 只获取一个频道的数据,阻塞等待到默认超时间隔
 // 如果没有结果将返回 NO_MATCH
-func (h *Hub) GetOne(channel *regexp.Regexp, cmd string, data any) (response *ResponseData) {
-	h.GetWithMaxAndTimeout(channel, cmd, data, func(rp *ResponseData) (ok bool) {
+func (h *Hub) GetOne(filter FilterFunc, cmd string, data any) (response *ResponseData) {
+	h.GetWithMaxAndTimeout(filter, cmd, data, func(rp *ResponseData) (ok bool) {
 		response = rp
 		return false
 	}, 1, 0)
 	if response == nil {
 		response = &ResponseData{
 			State: config.CONNECT_NO_MATCH,
-			Data:  fmt.Appendf(nil, "[%s] %s %s", config.CONNECT_NO_MATCH_MSG, channel.String(), cmd),
+			Data:  fmt.Appendf(nil, "[%s] %s", config.CONNECT_NO_MATCH_MSG, cmd),
 		}
 	}
 	return
@@ -465,29 +467,29 @@ func (h *Hub) GetOne(channel *regexp.Regexp, cmd string, data any) (response *Re
 
 // 只获取一个频道的数据,阻塞等待到指定超时间隔
 // 如果没有结果将返回 NO_MATCH
-func (h *Hub) GetOneWithTimeout(channel *regexp.Regexp, cmd string, data any, timeout int) (response *ResponseData) {
-	h.GetWithMaxAndTimeout(channel, cmd, data, func(rp *ResponseData) (ok bool) {
+func (h *Hub) GetOneWithTimeout(filter FilterFunc, cmd string, data any, timeout int) (response *ResponseData) {
+	h.GetWithMaxAndTimeout(filter, cmd, data, func(rp *ResponseData) (ok bool) {
 		response = rp
 		return false
 	}, 1, timeout)
 	if response == nil {
 		response = &ResponseData{
 			State: config.CONNECT_NO_MATCH,
-			Data:  fmt.Appendf(nil, "[%s] %s %s", config.CONNECT_NO_MATCH_MSG, channel.String(), cmd),
+			Data:  fmt.Appendf(nil, "[%s] %s", config.CONNECT_NO_MATCH_MSG, cmd),
 		}
 	}
 	return
 }
 
 // 推送消息出去,不需要返回数据
-func (h *Hub) Push(channel *regexp.Regexp, cmd string, data any) {
-	h.PushWithMax(channel, cmd, data, 0)
+func (h *Hub) Push(filter FilterFunc, cmd string, data any) {
+	h.PushWithMax(filter, cmd, data, 0)
 }
 
 // 推送最大对应数量的消息出去,不需要返回数据
-func (h *Hub) PushWithMax(channel *regexp.Regexp, cmd string, data any, max int) {
+func (h *Hub) PushWithMax(filter FilterFunc, cmd string, data any, max int) {
 	// 排除空频道
-	if channel == nil {
+	if filter == nil {
 		return
 	}
 	var reqData []byte
@@ -508,7 +510,7 @@ func (h *Hub) PushWithMax(channel *regexp.Regexp, cmd string, data any, max int)
 		}
 	}
 	gd := &GetData{
-		Channel:  channel,
+		Filter:   filter,
 		Cmd:      cmd,
 		Data:     reqData,
 		Max:      max,

+ 12 - 7
type.go

@@ -3,7 +3,7 @@ package tinymq
 import (
 	"bytes"
 	"fmt"
-	"regexp"
+	// "regexp"
 	"strings"
 	"time"
 )
@@ -14,24 +14,29 @@ import (
 type MiddleFunc func(request *RequestData) (response *ResponseData)
 
 // 订阅频道响应函数
-type SubscribeBack func(request *RequestData) (state uint8, result any)
+type SubscribeBackFunc func(request *RequestData) (state uint8, result any)
 
 // GET 获取数据的回调函数,如果返回 false 则提前结束
-type GetBack func(response *ResponseData) (ok bool)
+type GetBackFunc func(response *ResponseData) (ok bool)
 
 // 线路状态改变时调用
 type ConnectStatusFunc func(conn *Line)
 
+// 频道过滤器函数,如果返回true表示成功匹配
+type FilterFunc func(conn *Line) (ok bool)
+
 // 订阅频道数据结构
 type SubscribeData struct {
-	Channel  *regexp.Regexp //频道的正则表达式
-	Cmd      string         // 请求的命令
-	BackFunc SubscribeBack  //回调函数,如果状态为 NEXT_SUBSCRIBE 将继续下一个频道调用
+	// Channel  *regexp.Regexp    //频道的正则表达式
+	Filter   FilterFunc        // 频道匹配过滤
+	Cmd      string            // 请求的命令
+	BackFunc SubscribeBackFunc //回调函数,如果状态为 NEXT_SUBSCRIBE 将继续下一个频道调用
 }
 
 // 获取数据使用的数据结构
 type GetData struct {
-	Channel *regexp.Regexp
+	// Channel *regexp.Regexp
+	Filter  FilterFunc // 命令匹配过滤
 	Cmd     string
 	Data    []byte
 	Max     int // 获取数据的频道最多有几个,如果为0表示没有限制

Certains fichiers n'ont pas été affichés car il y a eu trop de fichiers modifiés dans ce diff