|
@@ -56,6 +56,35 @@ type Hub struct {
|
|
|
lastCleanDeadConnect int64
|
|
|
}
|
|
|
|
|
|
+// 转换数据
|
|
|
+func (h *Hub) convData(data any) (reqData []byte, err error) {
|
|
|
+ switch data := data.(type) {
|
|
|
+ case []byte:
|
|
|
+ reqData = data
|
|
|
+ case string:
|
|
|
+ reqData = []byte(data)
|
|
|
+ case func() ([]byte, error):
|
|
|
+ reqData, err = data()
|
|
|
+ if err != nil {
|
|
|
+ log.Println(err.Error())
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ default:
|
|
|
+ if data != nil {
|
|
|
+ // if s, ok := data.(func() ([]uint8, error)); ok {
|
|
|
+ // return s()
|
|
|
+ // }
|
|
|
+ // 自动转换数据为json格式
|
|
|
+ reqData, err = json.Marshal(data)
|
|
|
+ if err != nil {
|
|
|
+ log.Println(err.Error())
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
// 清理异常连接
|
|
|
func (h *Hub) cleanDeadConnect() {
|
|
|
h.Lock()
|
|
@@ -220,6 +249,11 @@ func (h *Hub) outResponse(response *ResponseData) {
|
|
|
// 发送数据到网络接口
|
|
|
// 返回发送的数量
|
|
|
func (h *Hub) sendRequest(gd *GetData) (count int) {
|
|
|
+ outData, err := h.convData(gd.Data)
|
|
|
+ if err != nil {
|
|
|
+ log.Println(err)
|
|
|
+ return 0
|
|
|
+ }
|
|
|
h.connects.Range(func(key, _ any) bool {
|
|
|
conn := key.(*Line)
|
|
|
// 检查连接是否OK
|
|
@@ -264,13 +298,13 @@ func (h *Hub) sendRequest(gd *GetData) (count int) {
|
|
|
conn.sendRequest <- &RequestData{
|
|
|
Id: id,
|
|
|
Cmd: gd.Cmd,
|
|
|
- Data: gd.Data,
|
|
|
+ Data: outData,
|
|
|
timeout: gd.Timeout,
|
|
|
backchan: gd.backchan,
|
|
|
conn: conn,
|
|
|
}
|
|
|
if h.cf.PrintMsg {
|
|
|
- log.Println("[SEND]->", id, conn.channel, "["+gd.Cmd+"]", subStr(string(gd.Data), 200))
|
|
|
+ log.Println("[SEND]->", id, conn.channel, "["+gd.Cmd+"]", subStr(string(outData), 200))
|
|
|
}
|
|
|
count++
|
|
|
if gd.Max > 0 && count >= gd.Max {
|
|
@@ -372,31 +406,13 @@ func (h *Hub) GetWithMaxAndTimeout(filter FilterFunc, cmd string, data any, back
|
|
|
if filter == nil {
|
|
|
return 0
|
|
|
}
|
|
|
- var reqData []byte
|
|
|
- switch data := data.(type) {
|
|
|
- case []byte:
|
|
|
- reqData = data
|
|
|
- case string:
|
|
|
- reqData = []byte(data)
|
|
|
- default:
|
|
|
- if data != nil {
|
|
|
- // 自动转换数据为json格式
|
|
|
- var err error
|
|
|
- reqData, err = json.Marshal(data)
|
|
|
- if err != nil {
|
|
|
- log.Println(err.Error())
|
|
|
- return 0
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
if timeout <= 0 {
|
|
|
timeout = h.cf.ReadWait
|
|
|
}
|
|
|
gd := &GetData{
|
|
|
Filter: filter,
|
|
|
Cmd: cmd,
|
|
|
- Data: reqData,
|
|
|
+ Data: data,
|
|
|
Max: max,
|
|
|
Timeout: timeout,
|
|
|
backchan: make(chan *ResponseData, 32),
|
|
@@ -492,27 +508,10 @@ func (h *Hub) PushWithMax(filter FilterFunc, cmd string, data any, max int) {
|
|
|
if filter == nil {
|
|
|
return
|
|
|
}
|
|
|
- var reqData []byte
|
|
|
- switch data := data.(type) {
|
|
|
- case []byte:
|
|
|
- reqData = data
|
|
|
- case string:
|
|
|
- reqData = []byte(data)
|
|
|
- default:
|
|
|
- if data != nil {
|
|
|
- // 自动转换数据为json格式
|
|
|
- var err error
|
|
|
- reqData, err = json.Marshal(data)
|
|
|
- if err != nil {
|
|
|
- log.Println(err.Error())
|
|
|
- return
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
gd := &GetData{
|
|
|
Filter: filter,
|
|
|
Cmd: cmd,
|
|
|
- Data: reqData,
|
|
|
+ Data: data,
|
|
|
Max: max,
|
|
|
Timeout: h.cf.ReadWait,
|
|
|
backchan: nil,
|