From 85178223e029c2b03fcc7734ba17d582feada801 Mon Sep 17 00:00:00 2001 From: "arraykeys@gmail.com" Date: Fri, 14 Sep 2018 18:20:45 +0800 Subject: [PATCH] =?UTF-8?q?5.=E4=BC=98=E5=8C=96=E4=BA=86DST,=E9=98=B2?= =?UTF-8?q?=E6=AD=A2=E6=84=8F=E5=A4=96crash.=206.=E4=BF=AE=E5=A4=8D?= =?UTF-8?q?=E4=BA=86mapx=E7=9A=84Keys()=E6=96=B9=E6=B3=95=E7=9A=84bug?= =?UTF-8?q?=E5=AF=BC=E8=87=B4=E5=86=85=E7=BD=91=E7=A9=BF=E9=80=8Fbridge?= =?UTF-8?q?=E4=B8=8D=E7=A8=B3=E5=AE=9A=E7=9A=84=E9=97=AE=E9=A2=98.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CHANGELOG | 3 +++ config.go | 6 +++--- core/dst/conn.go | 11 ++++++++++- core/dst/mux.go | 2 +- core/dst/sendbuffer.go | 2 +- sdk/android-ios/dns.go | 4 ++-- services/http/http.go | 2 +- services/mux/mux_bridge.go | 34 +++++++++++++++++++++++++--------- services/mux/mux_client.go | 29 +++++++++++++++++++++-------- services/mux/mux_server.go | 16 ++++++++++------ services/tcp/tcp.go | 6 +++--- utils/mapx/map.go | 12 ++++++------ utils/structs.go | 4 ++-- 13 files changed, 88 insertions(+), 43 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index 6b4f7c3..f0fc19a 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -7,6 +7,9 @@ v6.1 4.内网穿透增加了TCPS和TOU协议, TCPS提供了多种自定义加密TCP方式传输. TOU提供了TCP over UDP,多种自定义加密UDP方式传输TCP数据. +5.优化了DST,防止意外crash. +6.修复了mapx的Keys()方法的bug导致内网穿透bridge不稳定的问题. + v6.0 企业版开源啦 本次更新主要是把企业版开源,把企业版代码合并到现在的开源goproxy当中,继续遵循GPLv3,免费开源, diff --git a/config.go b/config.go index 6098db2..5cbfc88 100755 --- a/config.go +++ b/config.go @@ -432,7 +432,7 @@ func initConfig() (err error) { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:%s", e, string(debug.Stack())) } }() for { @@ -456,7 +456,7 @@ func initConfig() (err error) { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:%s", e, string(debug.Stack())) } }() for scanner.Scan() { @@ -466,7 +466,7 @@ func initConfig() (err error) { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:%s", e, string(debug.Stack())) } }() for scannerStdErr.Scan() { diff --git a/core/dst/conn.go b/core/dst/conn.go index e978b75..3a32df7 100644 --- a/core/dst/conn.go +++ b/core/dst/conn.go @@ -137,7 +137,7 @@ func (c *Conn) start() { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:",e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) } }() c.reader() @@ -433,6 +433,12 @@ func (c *Conn) String() string { // Read can be made to time out and return a Error with Timeout() == true // after a fixed time limit; see SetDeadline and SetReadDeadline. func (c *Conn) Read(b []byte) (n int, err error) { + defer func() { + if e := recover(); e != nil { + n = 0 + err = io.EOF + } + }() c.inbufMut.Lock() defer c.inbufMut.Unlock() for c.inbuf.Len() == 0 { @@ -497,6 +503,9 @@ func (c *Conn) Write(b []byte) (n int, err error) { // Close closes the connection. // Any blocked Read or Write operations will be unblocked and return errors. func (c *Conn) Close() error { + defer func() { + _ = recover() + }() c.closeOnce.Do(func() { if debugConnection { log.Println(c, "explicit close start") diff --git a/core/dst/mux.go b/core/dst/mux.go index b548f7b..934deda 100644 --- a/core/dst/mux.go +++ b/core/dst/mux.go @@ -82,7 +82,7 @@ func NewMux(conn net.PacketConn, packetSize int) *Mux { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:%s", e, string(debug.Stack())) } }() m.readerLoop() diff --git a/core/dst/sendbuffer.go b/core/dst/sendbuffer.go index 65c9a47..406dcd5 100644 --- a/core/dst/sendbuffer.go +++ b/core/dst/sendbuffer.go @@ -56,7 +56,7 @@ func newSendBuffer(m *Mux) *sendBuffer { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:%s", e, string(debug.Stack())) } }() b.writerLoop() diff --git a/sdk/android-ios/dns.go b/sdk/android-ios/dns.go index b8edfde..228f750 100644 --- a/sdk/android-ios/dns.go +++ b/sdk/android-ios/dns.go @@ -60,7 +60,7 @@ func (s *DNS) InitService() (err error) { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:%s", e, string(debug.Stack())) } }() for { @@ -142,7 +142,7 @@ func (s *DNS) Start(args interface{}, log *logger.Logger) (err error) { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:%s", e, string(debug.Stack())) } }() log.Printf("dns server on udp %s", *s.cfg.Local) diff --git a/services/http/http.go b/services/http/http.go index 10b53a7..a2dd03a 100644 --- a/services/http/http.go +++ b/services/http/http.go @@ -183,7 +183,7 @@ func (s *HTTP) InitService() (err error) { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:%s", e, string(debug.Stack())) } }() //循环检查ssh网络连通性 diff --git a/services/mux/mux_bridge.go b/services/mux/mux_bridge.go index b44b18a..c3f76c6 100644 --- a/services/mux/mux_bridge.go +++ b/services/mux/mux_bridge.go @@ -131,7 +131,11 @@ func (s *MuxBridge) Start(args interface{}, log *logger.Logger) (err error) { return } s.sc = &sc - s.log.Printf("%s bridge on %s", *s.cfg.LocalType, (*sc.Listener).Addr()) + if *s.cfg.LocalType == "tou" { + s.log.Printf("%s bridge on %s", *s.cfg.LocalType, sc.UDPListener.LocalAddr()) + } else { + s.log.Printf("%s bridge on %s", *s.cfg.LocalType, (*sc.Listener).Addr()) + } return } func (s *MuxBridge) Clean() { @@ -211,20 +215,25 @@ func (s *MuxBridge) handler(inConn net.Conn) { index := keyInfo[1] s.l.Lock() defer s.l.Unlock() + var group *mapx.ConcurrentMap if !s.clientControlConns.Has(groupKey) { - item := mapx.NewConcurrentMap() - s.clientControlConns.Set(groupKey, &item) + _g := mapx.NewConcurrentMap() + group = &_g + s.clientControlConns.Set(groupKey, group) + //s.log.Printf("init client session group %s", groupKey) + } else { + _group, _ := s.clientControlConns.Get(groupKey) + group = _group.(*mapx.ConcurrentMap) } - _group, _ := s.clientControlConns.Get(groupKey) - group := _group.(*mapx.ConcurrentMap) if v, ok := group.Get(index); ok { v.(*smux.Session).Close() } group.Set(index, session) + //s.log.Printf("set client session %s to group %s,grouplen:%d", index, groupKey, group.Count()) go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:%s", e, string(debug.Stack())) } }() for { @@ -236,10 +245,12 @@ func (s *MuxBridge) handler(inConn net.Conn) { defer s.l.Unlock() if sess, ok := group.Get(index); ok && sess.(*smux.Session).IsClosed() { group.Remove(index) + //s.log.Printf("client session %s removed from group %s, grouplen:%d", key, groupKey, group.Count()) s.log.Printf("client connection %s released", key) } if group.IsEmpty() { s.clientControlConns.Remove(groupKey) + //s.log.Printf("client session group %s removed", groupKey) } break } @@ -263,6 +274,7 @@ func (s *MuxBridge) callback(inConn net.Conn, serverID, key string) { if key == "*" { key = s.router.GetKey() } + //s.log.Printf("server get client session %s", key) _group, ok := s.clientControlConns.Get(key) if !ok { s.log.Printf("client %s session not exists for server stream %s, retrying...", key, serverID) @@ -270,8 +282,12 @@ func (s *MuxBridge) callback(inConn net.Conn, serverID, key string) { continue } group := _group.(*mapx.ConcurrentMap) - keys := group.Keys() + keys := []string{} + group.IterCb(func(key string, v interface{}) { + keys = append(keys, key) + }) keysLen := len(keys) + //s.log.Printf("client session %s , len:%d , keysLen: %d", key, group.Count(), keysLen) i := 0 if keysLen > 0 { i = rand.Intn(keysLen) @@ -297,7 +313,7 @@ func (s *MuxBridge) callback(inConn net.Conn, serverID, key string) { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:%s", e, string(debug.Stack())) } }() io.Copy(stream, inConn) @@ -306,7 +322,7 @@ func (s *MuxBridge) callback(inConn net.Conn, serverID, key string) { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:%s", e, string(debug.Stack())) } }() io.Copy(inConn, stream) diff --git a/services/mux/mux_client.go b/services/mux/mux_client.go index 662748a..9e9ab6a 100644 --- a/services/mux/mux_client.go +++ b/services/mux/mux_client.go @@ -8,6 +8,7 @@ import ( "net" "runtime/debug" "strings" + "sync" "time" "github.com/golang/snappy" @@ -145,7 +146,7 @@ func (s *MuxClient) Start(args interface{}, log *logger.Logger) (err error) { defer func() { e := recover() if e != nil { - s.log.Printf("session worker crashed: %s", e) + s.log.Printf("session worker crashed: %s\nstack:%s", e, string(debug.Stack())) } }() for { @@ -159,7 +160,16 @@ func (s *MuxClient) Start(args interface{}, log *logger.Logger) (err error) { continue } conn.SetDeadline(time.Now().Add(time.Millisecond * time.Duration(*s.cfg.Timeout))) - _, err = conn.Write(utils.BuildPacket(CONN_CLIENT, fmt.Sprintf("%s-%d", *s.cfg.Key, i))) + g := sync.WaitGroup{} + g.Add(1) + go func() { + defer func() { + _ = recover() + g.Done() + }() + _, err = conn.Write(utils.BuildPacket(CONN_CLIENT, fmt.Sprintf("%s-%d", *s.cfg.Key, i))) + }() + g.Wait() conn.SetDeadline(time.Time{}) if err != nil { conn.Close() @@ -316,14 +326,17 @@ func (s *MuxClient) ServeUDP(inConn *smux.Stream, localAddr, ID string) { item = v.(*ClientUDPConnItem) } (*item).touchtime = time.Now().Unix() - go (*item).udpConn.Write(body) + go func() { + defer func() { _ = recover() }() + (*item).udpConn.Write(body) + }() } } func (s *MuxClient) UDPRevecive(key, ID string) { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:%s", e, string(debug.Stack())) } }() s.log.Printf("udp conn %s connected", ID) @@ -353,7 +366,7 @@ func (s *MuxClient) UDPRevecive(key, ID string) { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:%s", e, string(debug.Stack())) } }() cui.conn.SetWriteDeadline(time.Now().Add(time.Millisecond * time.Duration(*s.cfg.Timeout))) @@ -372,7 +385,7 @@ func (s *MuxClient) UDPGCDeamon() { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:%s", e, string(debug.Stack())) } }() if s.isStop { @@ -431,7 +444,7 @@ func (s *MuxClient) ServeConn(inConn *smux.Stream, localAddr, ID string) { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:%s", e, string(debug.Stack())) } }() io.Copy(outConn, snappy.NewReader(inConn)) @@ -440,7 +453,7 @@ func (s *MuxClient) ServeConn(inConn *smux.Stream, localAddr, ID string) { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:%s", e, string(debug.Stack())) } }() io.Copy(snappy.NewWriter(inConn), outConn) diff --git a/services/mux/mux_server.go b/services/mux/mux_server.go index f2be351..caa1bc5 100644 --- a/services/mux/mux_server.go +++ b/services/mux/mux_server.go @@ -139,6 +139,10 @@ func (s *MuxServerManager) Start(args interface{}, log *logger.Logger) (err erro KCP: s.cfg.KCP, ParentType: s.cfg.ParentType, Jumper: s.cfg.Jumper, + TCPSMethod: s.cfg.TCPSMethod, + TCPSPassword: s.cfg.TCPSPassword, + TOUMethod: s.cfg.TOUMethod, + TOUPassword: s.cfg.TOUPassword, }, log) if err != nil { @@ -299,7 +303,7 @@ func (s *MuxServer) Start(args interface{}, log *logger.Logger) (err error) { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:%s", e, string(debug.Stack())) } }() io.Copy(inConn, snappy.NewReader(outConn)) @@ -308,7 +312,7 @@ func (s *MuxServer) Start(args interface{}, log *logger.Logger) (err error) { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:%s", e, string(debug.Stack())) } }() io.Copy(snappy.NewWriter(outConn), inConn) @@ -403,7 +407,7 @@ func (s *MuxServer) GetConn(index string) (conn net.Conn, err error) { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:%s", e, string(debug.Stack())) } }() for { @@ -475,7 +479,7 @@ func (s *MuxServer) UDPGCDeamon() { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:%s", e, string(debug.Stack())) } }() if s.isStop { @@ -554,7 +558,7 @@ func (s *MuxServer) UDPRevecive(key, ID string) { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:%s", e, string(debug.Stack())) } }() s.log.Printf("udp conn %s connected", ID) @@ -587,7 +591,7 @@ func (s *MuxServer) UDPRevecive(key, ID string) { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:%s", e, string(debug.Stack())) } }() s.sc.UDPListener.WriteToUDP(body, uc.srcAddr) diff --git a/services/tcp/tcp.go b/services/tcp/tcp.go index 4b92c3c..58c0c5a 100644 --- a/services/tcp/tcp.go +++ b/services/tcp/tcp.go @@ -256,7 +256,7 @@ func (s *TCP) UDPRevecive(key string) { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:%s", e, string(debug.Stack())) } }() s.log.Printf("udp conn %s connected", key) @@ -286,7 +286,7 @@ func (s *TCP) UDPRevecive(key string) { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:%s", e, string(debug.Stack())) } }() (*cui.conn).SetWriteDeadline(time.Now().Add(time.Millisecond * time.Duration(*s.cfg.Timeout))) @@ -305,7 +305,7 @@ func (s *TCP) UDPGCDeamon() { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:%s", e, string(debug.Stack())) } }() if s.isStop { diff --git a/utils/mapx/map.go b/utils/mapx/map.go index 07a3171..df89c31 100644 --- a/utils/mapx/map.go +++ b/utils/mapx/map.go @@ -155,7 +155,7 @@ func (m ConcurrentMap) Iter() <-chan Tuple { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:%s", e, string(debug.Stack())) } }() fanIn(chans, ch) @@ -174,7 +174,7 @@ func (m ConcurrentMap) IterBuffered() <-chan Tuple { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:%s", e, string(debug.Stack())) } }() fanIn(chans, ch) @@ -195,7 +195,7 @@ func snapshot(m ConcurrentMap) (chans []chan Tuple) { go func(index int, shard *ConcurrentMapShared) { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:%s", e, string(debug.Stack())) } }() // Foreach key, value pair. @@ -221,7 +221,7 @@ func fanIn(chans []chan Tuple, out chan Tuple) { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:%s", e, string(debug.Stack())) } }() func(ch chan Tuple) { @@ -274,7 +274,7 @@ func (m ConcurrentMap) Keys() []string { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:%s", e, string(debug.Stack())) } }() // Foreach shard. @@ -284,7 +284,7 @@ func (m ConcurrentMap) Keys() []string { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:%s", e, string(debug.Stack())) } }() func(shard *ConcurrentMapShared) { diff --git a/utils/structs.go b/utils/structs.go index 77253ef..24fd8e0 100644 --- a/utils/structs.go +++ b/utils/structs.go @@ -90,7 +90,7 @@ func (c *Checker) start() { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:%s", e, string(debug.Stack())) } }() //log.Printf("checker started") @@ -100,7 +100,7 @@ func (c *Checker) start() { go func(item CheckerItem) { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:%s", e, string(debug.Stack())) } }() if c.isNeedCheck(item) {