diff --git a/CHANGELOG b/CHANGELOG index b784585..f510159 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,4 +1,8 @@ proxy更新日志 +v4.5 +1.优化了mux内网穿透连接管理逻辑,增强了稳定性. +2.mux内网穿透增加了tcp和kcp协议支持,之前是tls,现在支持三种协议tcp,tls,kcp. + v4.4 1.增加了协议转换sps功能,代理协议转换使用的是sps子命令(socks+https的缩写), sps本身不提供代理功能,只是接受代理请求"转换并转发"给已经存在的http(s)代理 diff --git a/README.md b/README.md index c3a9e5c..3eb6ab6 100644 --- a/README.md +++ b/README.md @@ -148,7 +148,7 @@ If the installation fails or your VPS is not a linux64 system, please follow the Download address: https://github.com/snail007/goproxy/releases ```shell cd /root/proxy/ -wget https://github.com/snail007/goproxy/releases/download/v4.4/proxy-linux-amd64.tar.gz +wget https://github.com/snail007/goproxy/releases/download/v4.5/proxy-linux-amd64.tar.gz ``` #### **2.Download the automatic installation script** ```shell diff --git a/README_ZH.md b/README_ZH.md index e3e2974..5a20d8e 100644 --- a/README_ZH.md +++ b/README_ZH.md @@ -146,7 +146,7 @@ curl -L https://raw.githubusercontent.com/snail007/goproxy/master/install_auto.s 下载地址:https://github.com/snail007/goproxy/releases ```shell cd /root/proxy/ -wget https://github.com/snail007/goproxy/releases/download/v4.4/proxy-linux-amd64.tar.gz +wget https://github.com/snail007/goproxy/releases/download/v4.5/proxy-linux-amd64.tar.gz ``` #### **2.下载自动安装脚本** ```shell diff --git a/config.go b/config.go index c105bb4..f980254 100755 --- a/config.go +++ b/config.go @@ -126,31 +126,34 @@ func initConfig() (err error) { //########mux-server######### muxServer := app.Command("server", "proxy on mux server mode") muxServerArgs.Parent = muxServer.Flag("parent", "parent address, such as: \"23.32.32.19:28008\"").Default("").Short('P').String() + muxServerArgs.ParentType = muxServer.Flag("parent-type", "parent protocol type ").Default("tls").Short('T').Enum("tls", "tcp", "kcp") muxServerArgs.CertFile = muxServer.Flag("cert", "cert file for tls").Short('C').Default("proxy.crt").String() muxServerArgs.KeyFile = muxServer.Flag("key", "key file for tls").Short('K').Default("proxy.key").String() - muxServerArgs.Timeout = muxServer.Flag("timeout", "tcp timeout with milliseconds").Short('t').Default("2000").Int() + muxServerArgs.Timeout = muxServer.Flag("timeout", "tcp timeout with milliseconds").Short('i').Default("2000").Int() muxServerArgs.IsUDP = muxServer.Flag("udp", "proxy on udp mux server mode").Default("false").Bool() muxServerArgs.Key = muxServer.Flag("k", "client key").Default("default").String() muxServerArgs.Route = muxServer.Flag("route", "local route to client's network, such as: PROTOCOL://LOCAL_IP:LOCAL_PORT@[CLIENT_KEY]CLIENT_LOCAL_HOST:CLIENT_LOCAL_PORT").Short('r').Default("").Strings() - muxServerArgs.IsCompress = muxServer.Flag("c", "compress data when tcp mode").Default("false").Bool() + muxServerArgs.IsCompress = muxServer.Flag("c", "compress data when tcp|tls mode").Default("false").Bool() muxServerArgs.SessionCount = muxServer.Flag("session-count", "session count which connect to bridge").Short('n').Default("10").Int() //########mux-client######### muxClient := app.Command("client", "proxy on mux client mode") muxClientArgs.Parent = muxClient.Flag("parent", "parent address, such as: \"23.32.32.19:28008\"").Default("").Short('P').String() + muxClientArgs.ParentType = muxClient.Flag("parent-type", "parent protocol type ").Default("tls").Short('T').Enum("tls", "tcp", "kcp") muxClientArgs.CertFile = muxClient.Flag("cert", "cert file for tls").Short('C').Default("proxy.crt").String() muxClientArgs.KeyFile = muxClient.Flag("key", "key file for tls").Short('K').Default("proxy.key").String() - muxClientArgs.Timeout = muxClient.Flag("timeout", "tcp timeout with milliseconds").Short('t').Default("2000").Int() + muxClientArgs.Timeout = muxClient.Flag("timeout", "tcp timeout with milliseconds").Short('i').Default("2000").Int() muxClientArgs.Key = muxClient.Flag("k", "key same with server").Default("default").String() - muxClientArgs.IsCompress = muxClient.Flag("c", "compress data when tcp mode").Default("false").Bool() + muxClientArgs.IsCompress = muxClient.Flag("c", "compress data when tcp|tls mode").Default("false").Bool() muxClientArgs.SessionCount = muxClient.Flag("session-count", "session count which connect to bridge").Short('n').Default("10").Int() //########mux-bridge######### muxBridge := app.Command("bridge", "proxy on mux bridge mode") muxBridgeArgs.CertFile = muxBridge.Flag("cert", "cert file for tls").Short('C').Default("proxy.crt").String() muxBridgeArgs.KeyFile = muxBridge.Flag("key", "key file for tls").Short('K').Default("proxy.key").String() - muxBridgeArgs.Timeout = muxBridge.Flag("timeout", "tcp timeout with milliseconds").Short('t').Default("2000").Int() + muxBridgeArgs.Timeout = muxBridge.Flag("timeout", "tcp timeout with milliseconds").Short('i').Default("2000").Int() muxBridgeArgs.Local = muxBridge.Flag("local", "local ip:port to listen").Short('p').Default(":33080").String() + muxBridgeArgs.LocalType = muxBridge.Flag("local-type", "local protocol type ").Default("tls").Short('t').Enum("tls", "tcp", "kcp") //########tunnel-server######### tunnelServer := app.Command("tserver", "proxy on tunnel server mode") @@ -269,6 +272,9 @@ func initConfig() (err error) { httpArgs.KCP = kcpArgs socksArgs.KCP = kcpArgs spsArgs.KCP = kcpArgs + muxBridgeArgs.KCP = kcpArgs + muxServerArgs.KCP = kcpArgs + muxClientArgs.KCP = kcpArgs flags := log.Ldate if *debug { diff --git a/install_auto.sh b/install_auto.sh index 6a5b874..e25d8d7 100755 --- a/install_auto.sh +++ b/install_auto.sh @@ -5,7 +5,7 @@ if [ -e /tmp/proxy ]; then fi mkdir /tmp/proxy cd /tmp/proxy -wget https://github.com/snail007/goproxy/releases/download/v4.4/proxy-linux-amd64.tar.gz +wget https://github.com/snail007/goproxy/releases/download/v4.5/proxy-linux-amd64.tar.gz # #install proxy tar zxvf proxy-linux-amd64.tar.gz diff --git a/main.go b/main.go index f01ec8c..1547971 100644 --- a/main.go +++ b/main.go @@ -8,7 +8,7 @@ import ( "syscall" ) -const APP_VERSION = "4.4" +const APP_VERSION = "4.5" func main() { err := initConfig() diff --git a/release.sh b/release.sh index 3125887..5009a87 100755 --- a/release.sh +++ b/release.sh @@ -1,5 +1,5 @@ #!/bin/bash -VER="4.4" +VER="4.5" RELEASE="release-${VER}" rm -rf .cert mkdir .cert diff --git a/services/args.go b/services/args.go index dd23686..39496e1 100644 --- a/services/args.go +++ b/services/args.go @@ -26,6 +26,7 @@ const ( type MuxServerArgs struct { Parent *string + ParentType *string CertFile *string KeyFile *string CertBytes []byte @@ -39,9 +40,11 @@ type MuxServerArgs struct { Mgr *MuxServerManager IsCompress *bool SessionCount *int + KCP kcpcfg.KCPConfigArgs } type MuxClientArgs struct { Parent *string + ParentType *string CertFile *string KeyFile *string CertBytes []byte @@ -50,16 +53,18 @@ type MuxClientArgs struct { Timeout *int IsCompress *bool SessionCount *int + KCP kcpcfg.KCPConfigArgs } type MuxBridgeArgs struct { - Parent *string CertFile *string KeyFile *string CertBytes []byte KeyBytes []byte Local *string + LocalType *string Timeout *int IsCompress *bool + KCP kcpcfg.KCPConfigArgs } type TunnelServerArgs struct { Parent *string diff --git a/services/mux_bridge.go b/services/mux_bridge.go index 0b097ce..adf361d 100644 --- a/services/mux_bridge.go +++ b/services/mux_bridge.go @@ -9,6 +9,7 @@ import ( "snail007/proxy/utils" "strconv" "strings" + "sync" "time" "github.com/xtaci/smux" @@ -18,12 +19,14 @@ type MuxBridge struct { cfg MuxBridgeArgs clientControlConns utils.ConcurrentMap router utils.ClientKeyRouter + l *sync.Mutex } func NewMuxBridge() Service { b := &MuxBridge{ cfg: MuxBridgeArgs{}, clientControlConns: utils.NewConcurrentMap(), + l: &sync.Mutex{}, } b.router = utils.NewClientKeyRouter(&b.clientControlConns, 50000) return b @@ -36,7 +39,9 @@ func (s *MuxBridge) CheckArgs() { if *s.cfg.CertFile == "" || *s.cfg.KeyFile == "" { log.Fatalf("cert and key file required") } - s.cfg.CertBytes, s.cfg.KeyBytes = utils.TlsBytes(*s.cfg.CertFile, *s.cfg.KeyFile) + if *s.cfg.LocalType == TYPE_TLS { + s.cfg.CertBytes, s.cfg.KeyBytes = utils.TlsBytes(*s.cfg.CertFile, *s.cfg.KeyFile) + } } func (s *MuxBridge) StopService() { @@ -48,91 +53,100 @@ func (s *MuxBridge) Start(args interface{}) (err error) { host, port, _ := net.SplitHostPort(*s.cfg.Local) p, _ := strconv.Atoi(port) sc := utils.NewServerChannel(host, p) - - err = sc.ListenTls(s.cfg.CertBytes, s.cfg.KeyBytes, func(inConn net.Conn) { - reader := bufio.NewReader(inConn) - - var err error - var connType uint8 - var key string - err = utils.ReadPacket(reader, &connType, &key) - if err != nil { - log.Printf("read error,ERR:%s", err) - return - } - switch connType { - case CONN_SERVER: - var serverID string - err = utils.ReadPacketData(reader, &serverID) - if err != nil { - log.Printf("read error,ERR:%s", err) - return - } - log.Printf("server connection %s %s connected", serverID, key) - session, err := smux.Server(inConn, nil) - if err != nil { - utils.CloseConn(&inConn) - log.Printf("server session error,ERR:%s", err) - return - } - for { - stream, err := session.AcceptStream() - if err != nil { - session.Close() - utils.CloseConn(&inConn) - return - } - go s.callback(stream, serverID, key) - } - case CONN_CLIENT: - log.Printf("client connection %s connected", key) - session, err := smux.Client(inConn, nil) - if err != nil { - utils.CloseConn(&inConn) - log.Printf("client session error,ERR:%s", err) - return - } - keyInfo := strings.Split(key, "-") - if len(keyInfo) != 2 { - utils.CloseConn(&inConn) - log.Printf("client key format error,key:%s", key) - return - } - groupKey := keyInfo[0] - index := keyInfo[1] - if !s.clientControlConns.Has(groupKey) { - item := utils.NewConcurrentMap() - s.clientControlConns.Set(groupKey, &item) - } - _group, _ := s.clientControlConns.Get(groupKey) - group := _group.(*utils.ConcurrentMap) - group.Set(index, session) - // s.clientControlConns.Set(key, session) - go func() { - for { - if session.IsClosed() { - group.Remove(index) - if group.IsEmpty() { - s.clientControlConns.Remove(groupKey) - } - break - } - time.Sleep(time.Second * 5) - } - }() - //log.Printf("set client session,key: %s", key) - } - - }) + if *s.cfg.LocalType == TYPE_TCP { + err = sc.ListenTCP(s.handler) + } else if *s.cfg.LocalType == TYPE_TLS { + err = sc.ListenTls(s.cfg.CertBytes, s.cfg.KeyBytes, s.handler) + } else if *s.cfg.LocalType == TYPE_KCP { + err = sc.ListenKCP(s.cfg.KCP, s.handler) + } if err != nil { return } - log.Printf("proxy on mux bridge mode %s", (*sc.Listener).Addr()) + log.Printf("%s bridge on %s", *s.cfg.LocalType, (*sc.Listener).Addr()) return } func (s *MuxBridge) Clean() { s.StopService() } +func (s *MuxBridge) handler(inConn net.Conn) { + reader := bufio.NewReader(inConn) + + var err error + var connType uint8 + var key string + err = utils.ReadPacket(reader, &connType, &key) + if err != nil { + log.Printf("read error,ERR:%s", err) + return + } + switch connType { + case CONN_SERVER: + var serverID string + err = utils.ReadPacketData(reader, &serverID) + if err != nil { + log.Printf("read error,ERR:%s", err) + return + } + log.Printf("server connection %s %s connected", serverID, key) + session, err := smux.Server(inConn, nil) + if err != nil { + utils.CloseConn(&inConn) + log.Printf("server session error,ERR:%s", err) + return + } + for { + stream, err := session.AcceptStream() + if err != nil { + session.Close() + utils.CloseConn(&inConn) + return + } + go s.callback(stream, serverID, key) + } + case CONN_CLIENT: + log.Printf("client connection %s connected", key) + session, err := smux.Client(inConn, nil) + if err != nil { + utils.CloseConn(&inConn) + log.Printf("client session error,ERR:%s", err) + return + } + keyInfo := strings.Split(key, "-") + if len(keyInfo) != 2 { + utils.CloseConn(&inConn) + log.Printf("client key format error,key:%s", key) + return + } + groupKey := keyInfo[0] + index := keyInfo[1] + if !s.clientControlConns.Has(groupKey) { + item := utils.NewConcurrentMap() + s.clientControlConns.Set(groupKey, &item) + } + _group, _ := s.clientControlConns.Get(groupKey) + group := _group.(*utils.ConcurrentMap) + s.l.Lock() + defer s.l.Unlock() + group.Set(index, session) + // s.clientControlConns.Set(key, session) + go func() { + for { + if session.IsClosed() { + s.l.Lock() + defer s.l.Unlock() + if sess, ok := group.Get(index); ok && sess.(*smux.Session).IsClosed() { + group.Remove(index) + } + break + } + time.Sleep(time.Second * 5) + } + }() + //log.Printf("set client session,key: %s", key) + } + +} func (s *MuxBridge) callback(inConn net.Conn, serverID, key string) { try := 20 for { @@ -145,12 +159,22 @@ func (s *MuxBridge) callback(inConn net.Conn, serverID, key string) { } _group, ok := s.clientControlConns.Get(key) if !ok { - log.Printf("client %s session not exists for server stream %s", key, serverID) + log.Printf("client %s session not exists for server stream %s, retrying...", key, serverID) time.Sleep(time.Second * 3) continue } group := _group.(*utils.ConcurrentMap) - index := group.Keys()[rand.Intn(group.Count())] + keys := group.Keys() + keysLen := len(keys) + i := 0 + if keysLen > 0 { + i = rand.Intn(keysLen) + } else { + log.Printf("client %s session empty for server stream %s, retrying...", key, serverID) + time.Sleep(time.Second * 3) + continue + } + index := keys[i] log.Printf("select client : %s-%s", key, index) session, _ := group.Get(index) stream, err := session.(*smux.Session).OpenStream() diff --git a/services/mux_client.go b/services/mux_client.go index c41f6e6..79af08c 100644 --- a/services/mux_client.go +++ b/services/mux_client.go @@ -36,7 +36,9 @@ func (s *MuxClient) CheckArgs() { if *s.cfg.CertFile == "" || *s.cfg.KeyFile == "" { log.Fatalf("cert and key file required") } - s.cfg.CertBytes, s.cfg.KeyBytes = utils.TlsBytes(*s.cfg.CertFile, *s.cfg.KeyFile) + if *s.cfg.ParentType == "tls" { + s.cfg.CertBytes, s.cfg.KeyBytes = utils.TlsBytes(*s.cfg.CertFile, *s.cfg.KeyFile) + } } func (s *MuxClient) StopService() { @@ -45,8 +47,12 @@ func (s *MuxClient) Start(args interface{}) (err error) { s.cfg = args.(MuxClientArgs) s.CheckArgs() s.InitService() - log.Printf("proxy on mux client mode, compress %v", *s.cfg.IsCompress) - for i := 1; i <= *s.cfg.SessionCount; i++ { + log.Printf("%s client on %s", *s.cfg.ParentType, *s.cfg.Parent) + count := 1 + if *s.cfg.SessionCount > 0 { + count = *s.cfg.SessionCount + } + for i := 1; i <= count; i++ { log.Printf("session worker[%d] started", i) go func(i int) { defer func() { @@ -56,14 +62,12 @@ func (s *MuxClient) Start(args interface{}) (err error) { } }() for { - var _conn tls.Conn - _conn, err = utils.TlsConnectHost(*s.cfg.Parent, *s.cfg.Timeout, s.cfg.CertBytes, s.cfg.KeyBytes) + conn, err := s.getParentConn() if err != nil { log.Printf("connection err: %s, retrying...", err) time.Sleep(time.Second * 3) continue } - conn := net.Conn(&_conn) _, err = conn.Write(utils.BuildPacket(CONN_CLIENT, fmt.Sprintf("%s-%d", *s.cfg.Key, i))) if err != nil { conn.Close() @@ -119,7 +123,20 @@ func (s *MuxClient) Start(args interface{}) (err error) { func (s *MuxClient) Clean() { s.StopService() } - +func (s *MuxClient) getParentConn() (conn net.Conn, err error) { + if *s.cfg.ParentType == "tls" { + var _conn tls.Conn + _conn, err = utils.TlsConnectHost(*s.cfg.Parent, *s.cfg.Timeout, s.cfg.CertBytes, s.cfg.KeyBytes) + if err == nil { + conn = net.Conn(&_conn) + } + } else if *s.cfg.ParentType == "kcp" { + conn, err = utils.ConnectKCPHost(*s.cfg.Parent, s.cfg.KCP) + } else { + conn, err = utils.ConnectHost(*s.cfg.Parent, *s.cfg.Timeout) + } + return +} func (s *MuxClient) ServeUDP(inConn *smux.Stream, localAddr, ID string) { for { diff --git a/services/mux_server.go b/services/mux_server.go index d7c961b..643443b 100644 --- a/services/mux_server.go +++ b/services/mux_server.go @@ -43,7 +43,7 @@ func (s *MuxServerManager) Start(args interface{}) (err error) { s.cfg = args.(MuxServerArgs) s.CheckArgs() if *s.cfg.Parent != "" { - log.Printf("use tls parent %s", *s.cfg.Parent) + log.Printf("use %s parent %s", *s.cfg.ParentType, *s.cfg.Parent) } else { log.Fatalf("parent required") } @@ -88,6 +88,8 @@ func (s *MuxServerManager) Start(args interface{}) (err error) { Mgr: s, IsCompress: s.cfg.IsCompress, SessionCount: s.cfg.SessionCount, + KCP: s.cfg.KCP, + ParentType: s.cfg.ParentType, }) if err != nil { @@ -105,7 +107,9 @@ func (s *MuxServerManager) CheckArgs() { if *s.cfg.CertFile == "" || *s.cfg.KeyFile == "" { log.Fatalf("cert and key file required") } - s.cfg.CertBytes, s.cfg.KeyBytes = utils.TlsBytes(*s.cfg.CertFile, *s.cfg.KeyFile) + if *s.cfg.ParentType == "tls" { + s.cfg.CertBytes, s.cfg.KeyBytes = utils.TlsBytes(*s.cfg.CertFile, *s.cfg.KeyFile) + } } func (s *MuxServerManager) InitService() { } @@ -152,7 +156,7 @@ func (s *MuxServer) Start(args interface{}) (err error) { if err != nil { return } - log.Printf("proxy on udp mux server mode %s", (*s.sc.UDPListener).LocalAddr()) + log.Printf("server on %s", (*s.sc.UDPListener).LocalAddr()) } else { err = s.sc.ListenTCP(func(inConn net.Conn) { defer func() { @@ -201,7 +205,7 @@ func (s *MuxServer) Start(args interface{}) (err error) { if err != nil { return } - log.Printf("proxy on mux server mode %s, compress %v", (*s.sc.Listener).Addr(), *s.cfg.IsCompress) + log.Printf("%s server on %s", *s.cfg.ParentType, (*s.sc.Listener).Addr()) } return } @@ -209,7 +213,11 @@ func (s *MuxServer) Clean() { } func (s *MuxServer) GetOutConn() (outConn net.Conn, ID string, err error) { - outConn, err = s.GetConn(fmt.Sprintf("%d", rand.Intn(*s.cfg.SessionCount))) + i := 1 + if *s.cfg.SessionCount > 0 { + i = rand.Intn(*s.cfg.SessionCount) + } + outConn, err = s.GetConn(fmt.Sprintf("%d", i)) if err != nil { log.Printf("connection err: %s", err) return @@ -240,12 +248,11 @@ func (s *MuxServer) GetConn(index string) (conn net.Conn, err error) { var session *smux.Session _session, ok := s.sessions.Get(index) if !ok { - var _conn tls.Conn - _conn, err = utils.TlsConnectHost(*s.cfg.Parent, *s.cfg.Timeout, s.cfg.CertBytes, s.cfg.KeyBytes) + var c net.Conn + c, err = s.getParentConn() if err != nil { return } - c := net.Conn(&_conn) _, err = c.Write(utils.BuildPacket(CONN_SERVER, *s.cfg.Key, s.cfg.Mgr.serverID)) if err != nil { c.Close() @@ -266,10 +273,34 @@ func (s *MuxServer) GetConn(index string) (conn net.Conn, err error) { if err != nil { session.Close() s.sessions.Remove(index) + } else { + go func() { + for { + if session.IsClosed() { + s.sessions.Remove(index) + break + } + time.Sleep(time.Second * 5) + } + }() } return } +func (s *MuxServer) getParentConn() (conn net.Conn, err error) { + if *s.cfg.ParentType == "tls" { + var _conn tls.Conn + _conn, err = utils.TlsConnectHost(*s.cfg.Parent, *s.cfg.Timeout, s.cfg.CertBytes, s.cfg.KeyBytes) + if err == nil { + conn = net.Conn(&_conn) + } + } else if *s.cfg.ParentType == "kcp" { + conn, err = utils.ConnectKCPHost(*s.cfg.Parent, s.cfg.KCP) + } else { + conn, err = utils.ConnectHost(*s.cfg.Parent, *s.cfg.Timeout) + } + return +} func (s *MuxServer) UDPConnDeamon() { go func() { defer func() {