From 4143f14fbd12f4c306f34e96dbfbd715540cf40e Mon Sep 17 00:00:00 2001 From: "arraykeys@gmail.com" Date: Thu, 8 Mar 2018 18:42:50 +0800 Subject: [PATCH] optimise nat forwarding in different lan Signed-off-by: arraykeys@gmail.com --- CHANGELOG | 14 +++++ README_ZH.md | 2 +- config.go | 2 + services/args.go | 44 ++++++++-------- services/mux_bridge.go | 28 ++++++++-- services/mux_client.go | 115 ++++++++++++++++++++++++----------------- services/mux_server.go | 66 ++++++++++++----------- 7 files changed, 166 insertions(+), 105 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index 36e12be..b784585 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,4 +1,18 @@ proxy更新日志 +v4.4 +1.增加了协议转换sps功能,代理协议转换使用的是sps子命令(socks+https的缩写), +sps本身不提供代理功能,只是接受代理请求"转换并转发"给已经存在的http(s)代理 +或者socks5代理;sps可以把已经存在的http(s)代理或者socks5代理转换为一个端口 +同时支持http(s)和socks5代理,而且http(s)代理支持正向代理和反向代理(SNI),转 +换后的SOCKS5代理不支持UDP功能;另外对于已经存在的http(s)代理或者socks5代理, +支持tls、tcp、kcp三种模式,支持链式连接,也就是可以多个sps结点层级连接构建 +加密通道。 +2.增加了对KCP传输参数的配置,多达17个参数可以自由的配置对kcp传输效率调优。 +3.内网穿透功能,server和client增加了--session-count参数,可以设置server每个 +监听端口到bridge打开的session数量,可以设置client到bridge打开的session数量, +之前都是1个,现在性能提升N倍,N就是你自己设置的--session-count,这个参数很大 +程度上解决了多路复用的拥塞问题,v4.4开始默认10个。 + v4.3 1.优化了参数keygen生成证书逻辑,避免证书出现特征。 2.http(s)和socks代理增加了--dns-address和--dns-ttl参数。 diff --git a/README_ZH.md b/README_ZH.md index bc714d7..9fada3c 100644 --- a/README_ZH.md +++ b/README_ZH.md @@ -682,7 +682,7 @@ KCP协议需要-B参数设置一个密码用于加密解密数据 ### **6.代理协议转换** #### **6.1 功能介绍** -代理协议转换使用的是sps子命令(socks+https的缩写),sps本身不提供代理功能,只是接受代理请求"转换并转发"给已经存在的http(s)代理或者socks5代理;sps可以把已经存在的http(s)代理或者socks5代理转换为一个端口同时支持http(s)和socks5代理,而且http(s)代理支持正向代理和反向代理(SNI),转换后的SOCKS5代理不支持UDP功能;另外对于已经存在的http(s)代理或者socks5代理,支持tls、tcp、kcp三种模式,支持链式连接,也就是可以多个sps结点层级连接构建加密通道;。 +代理协议转换使用的是sps子命令(socks+https的缩写),sps本身不提供代理功能,只是接受代理请求"转换并转发"给已经存在的http(s)代理或者socks5代理;sps可以把已经存在的http(s)代理或者socks5代理转换为一个端口同时支持http(s)和socks5代理,而且http(s)代理支持正向代理和反向代理(SNI),转换后的SOCKS5代理不支持UDP功能;另外对于已经存在的http(s)代理或者socks5代理,支持tls、tcp、kcp三种模式,支持链式连接,也就是可以多个sps结点层级连接构建加密通道。 #### **6.2 HTTP(S)转HTTP(S)+SOCKS5** 假设已经存在一个普通的http(s)代理:127.0.0.1:8080,现在我们把它转为同时支持http(s)和socks5的普通代理,转换后的本地端口为18080。 diff --git a/config.go b/config.go index cbf8452..56c0de4 100755 --- a/config.go +++ b/config.go @@ -133,6 +133,7 @@ func initConfig() (err error) { muxServerArgs.Key = muxServer.Flag("k", "client key").Default("default").String() muxServerArgs.Route = muxServer.Flag("route", "local route to client's network, such as: PROTOCOL://LOCAL_IP:LOCAL_PORT@[CLIENT_KEY]CLIENT_LOCAL_HOST:CLIENT_LOCAL_PORT").Short('r').Default("").Strings() muxServerArgs.IsCompress = muxServer.Flag("c", "compress data when tcp mode").Default("false").Bool() + muxServerArgs.SessionCount = muxServer.Flag("session-count", "session count which connect to bridge").Short('n').Default("10").Int() //########mux-client######### muxClient := app.Command("client", "proxy on mux client mode") @@ -142,6 +143,7 @@ func initConfig() (err error) { muxClientArgs.Timeout = muxClient.Flag("timeout", "tcp timeout with milliseconds").Short('t').Default("2000").Int() muxClientArgs.Key = muxClient.Flag("k", "key same with server").Default("default").String() muxClientArgs.IsCompress = muxClient.Flag("c", "compress data when tcp mode").Default("false").Bool() + muxClientArgs.SessionCount = muxClient.Flag("session-count", "session count which connect to bridge").Short('n').Default("10").Int() //########mux-bridge######### muxBridge := app.Command("bridge", "proxy on mux bridge mode") diff --git a/services/args.go b/services/args.go index 73f263a..dd23686 100644 --- a/services/args.go +++ b/services/args.go @@ -25,29 +25,31 @@ const ( ) type MuxServerArgs struct { - Parent *string - CertFile *string - KeyFile *string - CertBytes []byte - KeyBytes []byte - Local *string - IsUDP *bool - Key *string - Remote *string - Timeout *int - Route *[]string - Mgr *MuxServerManager - IsCompress *bool + Parent *string + CertFile *string + KeyFile *string + CertBytes []byte + KeyBytes []byte + Local *string + IsUDP *bool + Key *string + Remote *string + Timeout *int + Route *[]string + Mgr *MuxServerManager + IsCompress *bool + SessionCount *int } type MuxClientArgs struct { - Parent *string - CertFile *string - KeyFile *string - CertBytes []byte - KeyBytes []byte - Key *string - Timeout *int - IsCompress *bool + Parent *string + CertFile *string + KeyFile *string + CertBytes []byte + KeyBytes []byte + Key *string + Timeout *int + IsCompress *bool + SessionCount *int } type MuxBridgeArgs struct { Parent *string diff --git a/services/mux_bridge.go b/services/mux_bridge.go index e2feac1..f452b48 100644 --- a/services/mux_bridge.go +++ b/services/mux_bridge.go @@ -4,9 +4,11 @@ import ( "bufio" "io" "log" + "math/rand" "net" "snail007/proxy/utils" "strconv" + "strings" "time" "github.com/xtaci/smux" @@ -83,7 +85,6 @@ func (s *MuxBridge) Start(args interface{}) (err error) { go s.callback(stream, serverID, key) } case CONN_CLIENT: - log.Printf("client connection %s connected", key) session, err := smux.Client(inConn, nil) if err != nil { @@ -91,11 +92,24 @@ func (s *MuxBridge) Start(args interface{}) (err error) { log.Printf("client session error,ERR:%s", err) return } - s.clientControlConns.Set(key, session) + keyInfo := strings.Split(key, "-") + groupKey := keyInfo[0] + index := keyInfo[1] + if !s.clientControlConns.Has(groupKey) { + item := utils.NewConcurrentMap() + s.clientControlConns.Set(groupKey, &item) + } + _group, _ := s.clientControlConns.Get(groupKey) + group := _group.(*utils.ConcurrentMap) + group.Set(index, session) + // s.clientControlConns.Set(key, session) go func() { for { if session.IsClosed() { - s.clientControlConns.Remove(key) + group.Remove(index) + if group.IsEmpty() { + s.clientControlConns.Remove(groupKey) + } break } time.Sleep(time.Second * 5) @@ -124,19 +138,23 @@ func (s *MuxBridge) callback(inConn net.Conn, serverID, key string) { if key == "*" { key = s.router.GetKey() } - session, ok := s.clientControlConns.Get(key) + _group, ok := s.clientControlConns.Get(key) if !ok { log.Printf("client %s session not exists for server stream %s", key, serverID) time.Sleep(time.Second * 3) continue } + group := _group.(*utils.ConcurrentMap) + index := group.Keys()[rand.Intn(group.Count())] + log.Printf("select client : %s-%s", key, index) + session, _ := group.Get(index) stream, err := session.(*smux.Session).OpenStream() if err != nil { log.Printf("%s client session open stream %s fail, err: %s, retrying...", key, serverID, err) time.Sleep(time.Second * 3) continue } else { - log.Printf("%s server %s stream created", key, serverID) + log.Printf("stream %s -> %s created", serverID, key) die1 := make(chan bool, 1) die2 := make(chan bool, 1) go func() { diff --git a/services/mux_client.go b/services/mux_client.go index fa45a78..c41f6e6 100644 --- a/services/mux_client.go +++ b/services/mux_client.go @@ -2,6 +2,7 @@ package services import ( "crypto/tls" + "fmt" "io" "log" "net" @@ -45,57 +46,75 @@ func (s *MuxClient) Start(args interface{}) (err error) { s.CheckArgs() s.InitService() log.Printf("proxy on mux client mode, compress %v", *s.cfg.IsCompress) - for { - var _conn tls.Conn - _conn, err = utils.TlsConnectHost(*s.cfg.Parent, *s.cfg.Timeout, s.cfg.CertBytes, s.cfg.KeyBytes) - if err != nil { - log.Printf("connection err: %s, retrying...", err) - time.Sleep(time.Second * 3) - continue - } - conn := net.Conn(&_conn) - _, err = conn.Write(utils.BuildPacket(CONN_CLIENT, *s.cfg.Key)) - if err != nil { - conn.Close() - log.Printf("connection err: %s, retrying...", err) - time.Sleep(time.Second * 3) - continue - } - session, err := smux.Server(conn, nil) - if err != nil { - log.Printf("session err: %s, retrying...", err) - conn.Close() - time.Sleep(time.Second * 3) - continue - } - for { - stream, err := session.AcceptStream() - if err != nil { - log.Printf("accept stream err: %s, retrying...", err) - session.Close() - time.Sleep(time.Second * 3) - break - } - go func() { - var ID, clientLocalAddr, serverID string - err = utils.ReadPacketData(stream, &ID, &clientLocalAddr, &serverID) - if err != nil { - log.Printf("read stream signal err: %s", err) - stream.Close() - return - } - log.Printf("signal revecived,server %s stream %s %s", serverID, ID, clientLocalAddr) - protocol := clientLocalAddr[:3] - localAddr := clientLocalAddr[4:] - if protocol == "udp" { - s.ServeUDP(stream, localAddr, ID) - } else { - s.ServeConn(stream, localAddr, ID) + for i := 1; i <= *s.cfg.SessionCount; i++ { + log.Printf("session worker[%d] started", i) + go func(i int) { + defer func() { + e := recover() + if e != nil { + log.Printf("session worker crashed: %s", e) } }() - } - } + for { + var _conn tls.Conn + _conn, err = utils.TlsConnectHost(*s.cfg.Parent, *s.cfg.Timeout, s.cfg.CertBytes, s.cfg.KeyBytes) + if err != nil { + log.Printf("connection err: %s, retrying...", err) + time.Sleep(time.Second * 3) + continue + } + conn := net.Conn(&_conn) + _, err = conn.Write(utils.BuildPacket(CONN_CLIENT, fmt.Sprintf("%s-%d", *s.cfg.Key, i))) + if err != nil { + conn.Close() + log.Printf("connection err: %s, retrying...", err) + time.Sleep(time.Second * 3) + continue + } + session, err := smux.Server(conn, nil) + if err != nil { + log.Printf("session err: %s, retrying...", err) + conn.Close() + time.Sleep(time.Second * 3) + continue + } + for { + stream, err := session.AcceptStream() + if err != nil { + log.Printf("accept stream err: %s, retrying...", err) + session.Close() + time.Sleep(time.Second * 3) + break + } + go func() { + defer func() { + e := recover() + if e != nil { + log.Printf("stream handler crashed: %s", e) + } + }() + var ID, clientLocalAddr, serverID string + err = utils.ReadPacketData(stream, &ID, &clientLocalAddr, &serverID) + if err != nil { + log.Printf("read stream signal err: %s", err) + stream.Close() + return + } + log.Printf("worker[%d] signal revecived,server %s stream %s %s", i, serverID, ID, clientLocalAddr) + protocol := clientLocalAddr[:3] + localAddr := clientLocalAddr[4:] + if protocol == "udp" { + s.ServeUDP(stream, localAddr, ID) + } else { + s.ServeConn(stream, localAddr, ID) + } + }() + } + } + }(i) + } + return } func (s *MuxClient) Clean() { s.StopService() diff --git a/services/mux_server.go b/services/mux_server.go index 7699b7b..d7c961b 100644 --- a/services/mux_server.go +++ b/services/mux_server.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "log" + "math/rand" "net" "runtime/debug" "snail007/proxy/utils" @@ -17,11 +18,11 @@ import ( ) type MuxServer struct { - cfg MuxServerArgs - udpChn chan MuxUDPItem - sc utils.ServerChannel - session *smux.Session - lockChn chan bool + cfg MuxServerArgs + udpChn chan MuxUDPItem + sc utils.ServerChannel + sessions utils.ConcurrentMap + lockChn chan bool } type MuxServerManager struct { @@ -74,18 +75,19 @@ func (s *MuxServerManager) Start(args interface{}) (err error) { remote = fmt.Sprintf("127.0.0.1%s", remote) } err = server.Start(MuxServerArgs{ - CertBytes: s.cfg.CertBytes, - KeyBytes: s.cfg.KeyBytes, - Parent: s.cfg.Parent, - CertFile: s.cfg.CertFile, - KeyFile: s.cfg.KeyFile, - Local: &local, - IsUDP: &IsUDP, - Remote: &remote, - Key: &KEY, - Timeout: s.cfg.Timeout, - Mgr: s, - IsCompress: s.cfg.IsCompress, + CertBytes: s.cfg.CertBytes, + KeyBytes: s.cfg.KeyBytes, + Parent: s.cfg.Parent, + CertFile: s.cfg.CertFile, + KeyFile: s.cfg.KeyFile, + Local: &local, + IsUDP: &IsUDP, + Remote: &remote, + Key: &KEY, + Timeout: s.cfg.Timeout, + Mgr: s, + IsCompress: s.cfg.IsCompress, + SessionCount: s.cfg.SessionCount, }) if err != nil { @@ -110,9 +112,10 @@ func (s *MuxServerManager) InitService() { func NewMuxServer() Service { return &MuxServer{ - cfg: MuxServerArgs{}, - udpChn: make(chan MuxUDPItem, 50000), - lockChn: make(chan bool, 1), + cfg: MuxServerArgs{}, + udpChn: make(chan MuxUDPItem, 50000), + lockChn: make(chan bool, 1), + sessions: utils.NewConcurrentMap(), } } @@ -206,7 +209,7 @@ func (s *MuxServer) Clean() { } func (s *MuxServer) GetOutConn() (outConn net.Conn, ID string, err error) { - outConn, err = s.GetConn() + outConn, err = s.GetConn(fmt.Sprintf("%d", rand.Intn(*s.cfg.SessionCount))) if err != nil { log.Printf("connection err: %s", err) return @@ -224,7 +227,7 @@ func (s *MuxServer) GetOutConn() (outConn net.Conn, ID string, err error) { } return } -func (s *MuxServer) GetConn() (conn net.Conn, err error) { +func (s *MuxServer) GetConn(index string) (conn net.Conn, err error) { select { case s.lockChn <- true: default: @@ -234,32 +237,35 @@ func (s *MuxServer) GetConn() (conn net.Conn, err error) { defer func() { <-s.lockChn }() - if s.session == nil { + var session *smux.Session + _session, ok := s.sessions.Get(index) + if !ok { var _conn tls.Conn _conn, err = utils.TlsConnectHost(*s.cfg.Parent, *s.cfg.Timeout, s.cfg.CertBytes, s.cfg.KeyBytes) if err != nil { - s.session = nil return } c := net.Conn(&_conn) _, err = c.Write(utils.BuildPacket(CONN_SERVER, *s.cfg.Key, s.cfg.Mgr.serverID)) if err != nil { c.Close() - s.session = nil return } if err == nil { - s.session, err = smux.Client(c, nil) + session, err = smux.Client(c, nil) if err != nil { - s.session = nil return } } + s.sessions.Set(index, session) + log.Printf("session[%s] created", index) + } else { + session = _session.(*smux.Session) } - conn, err = s.session.OpenStream() + conn, err = session.OpenStream() if err != nil { - s.session.Close() - s.session = nil + session.Close() + s.sessions.Remove(index) } return