diff --git a/services/mux_bridge.go b/services/mux_bridge.go index 153a341..789a6d9 100644 --- a/services/mux_bridge.go +++ b/services/mux_bridge.go @@ -15,13 +15,16 @@ import ( type MuxBridge struct { cfg MuxBridgeArgs clientControlConns utils.ConcurrentMap + router utils.ClientKeyRouter } func NewMuxBridge() Service { - return &MuxBridge{ + b := &MuxBridge{ cfg: MuxBridgeArgs{}, clientControlConns: utils.NewConcurrentMap(), } + b.router = utils.NewClientKeyRouter(&b.clientControlConns, 50000) + return b } func (s *MuxBridge) InitService() { @@ -118,6 +121,9 @@ func (s *MuxBridge) callback(inConn net.Conn, serverID, key string) { if try == 0 { break } + if key == "*" { + key = s.router.GetKey() + } session, ok := s.clientControlConns.Get(key) if !ok { log.Printf("client %s session not exists for server stream %s", key, serverID) diff --git a/utils/structs.go b/utils/structs.go index 93c81db..2d830ba 100644 --- a/utils/structs.go +++ b/utils/structs.go @@ -773,3 +773,45 @@ func (cm *ConnManager) RemoveAll() { cm.Remove(k) } } + +type ClientKeyRouter struct { + keyChan chan string + ctrl *ConcurrentMap + lock *sync.Mutex +} + +func NewClientKeyRouter(ctrl *ConcurrentMap, size int) ClientKeyRouter { + return ClientKeyRouter{ + keyChan: make(chan string, size), + ctrl: ctrl, + lock: &sync.Mutex{}, + } +} +func (c *ClientKeyRouter) GetKey() string { + defer c.lock.Unlock() + c.lock.Lock() + if len(c.keyChan) == 0 { + EXIT: + for _, k := range c.ctrl.Keys() { + select { + case c.keyChan <- k: + default: + goto EXIT + } + } + } + for { + if len(c.keyChan) == 0 { + return "*" + } + select { + case key := <-c.keyChan: + if c.ctrl.Has(key) { + return key + } + default: + return "*" + } + } + +}