optimise nat forwarding in different lan

Signed-off-by: arraykeys@gmail.com <arraykeys@gmail.com>
This commit is contained in:
arraykeys@gmail.com
2018-03-08 18:42:50 +08:00
parent 6f11deab96
commit 4143f14fbd
7 changed files with 166 additions and 105 deletions

View File

@ -1,4 +1,18 @@
proxy更新日志 proxy更新日志
v4.4
1.增加了协议转换sps功能代理协议转换使用的是sps子命令(socks+https的缩写)
sps本身不提供代理功能只是接受代理请求"转换并转发"给已经存在的http(s)代理
或者socks5代理sps可以把已经存在的http(s)代理或者socks5代理转换为一个端口
同时支持http(s)和socks5代理而且http(s)代理支持正向代理和反向代理(SNI),转
换后的SOCKS5代理不支持UDP功能另外对于已经存在的http(s)代理或者socks5代理
支持tls、tcp、kcp三种模式支持链式连接也就是可以多个sps结点层级连接构建
加密通道。
2.增加了对KCP传输参数的配置多达17个参数可以自由的配置对kcp传输效率调优。
3.内网穿透功能server和client增加了--session-count参数可以设置server每个
监听端口到bridge打开的session数量可以设置client到bridge打开的session数量
之前都是1个现在性能提升N倍N就是你自己设置的--session-count这个参数很大
程度上解决了多路复用的拥塞问题v4.4开始默认10个。
v4.3 v4.3
1.优化了参数keygen生成证书逻辑避免证书出现特征。 1.优化了参数keygen生成证书逻辑避免证书出现特征。
2.http(s)和socks代理增加了--dns-address和--dns-ttl参数。 2.http(s)和socks代理增加了--dns-address和--dns-ttl参数。

View File

@ -682,7 +682,7 @@ KCP协议需要-B参数设置一个密码用于加密解密数据
### **6.代理协议转换** ### **6.代理协议转换**
#### **6.1 功能介绍** #### **6.1 功能介绍**
代理协议转换使用的是sps子命令(socks+https的缩写)sps本身不提供代理功能只是接受代理请求"转换并转发"给已经存在的http(s)代理或者socks5代理sps可以把已经存在的http(s)代理或者socks5代理转换为一个端口同时支持http(s)和socks5代理而且http(s)代理支持正向代理和反向代理(SNI)转换后的SOCKS5代理不支持UDP功能另外对于已经存在的http(s)代理或者socks5代理支持tls、tcp、kcp三种模式支持链式连接也就是可以多个sps结点层级连接构建加密通道; 代理协议转换使用的是sps子命令(socks+https的缩写)sps本身不提供代理功能只是接受代理请求"转换并转发"给已经存在的http(s)代理或者socks5代理sps可以把已经存在的http(s)代理或者socks5代理转换为一个端口同时支持http(s)和socks5代理而且http(s)代理支持正向代理和反向代理(SNI)转换后的SOCKS5代理不支持UDP功能另外对于已经存在的http(s)代理或者socks5代理支持tls、tcp、kcp三种模式支持链式连接也就是可以多个sps结点层级连接构建加密通道。
#### **6.2 HTTP(S)转HTTP(S)+SOCKS5** #### **6.2 HTTP(S)转HTTP(S)+SOCKS5**
假设已经存在一个普通的http(s)代理127.0.0.1:8080,现在我们把它转为同时支持http(s)和socks5的普通代理,转换后的本地端口为18080。 假设已经存在一个普通的http(s)代理127.0.0.1:8080,现在我们把它转为同时支持http(s)和socks5的普通代理,转换后的本地端口为18080。

View File

@ -133,6 +133,7 @@ func initConfig() (err error) {
muxServerArgs.Key = muxServer.Flag("k", "client key").Default("default").String() muxServerArgs.Key = muxServer.Flag("k", "client key").Default("default").String()
muxServerArgs.Route = muxServer.Flag("route", "local route to client's network, such as: PROTOCOL://LOCAL_IP:LOCAL_PORT@[CLIENT_KEY]CLIENT_LOCAL_HOST:CLIENT_LOCAL_PORT").Short('r').Default("").Strings() muxServerArgs.Route = muxServer.Flag("route", "local route to client's network, such as: PROTOCOL://LOCAL_IP:LOCAL_PORT@[CLIENT_KEY]CLIENT_LOCAL_HOST:CLIENT_LOCAL_PORT").Short('r').Default("").Strings()
muxServerArgs.IsCompress = muxServer.Flag("c", "compress data when tcp mode").Default("false").Bool() muxServerArgs.IsCompress = muxServer.Flag("c", "compress data when tcp mode").Default("false").Bool()
muxServerArgs.SessionCount = muxServer.Flag("session-count", "session count which connect to bridge").Short('n').Default("10").Int()
//########mux-client######### //########mux-client#########
muxClient := app.Command("client", "proxy on mux client mode") muxClient := app.Command("client", "proxy on mux client mode")
@ -142,6 +143,7 @@ func initConfig() (err error) {
muxClientArgs.Timeout = muxClient.Flag("timeout", "tcp timeout with milliseconds").Short('t').Default("2000").Int() muxClientArgs.Timeout = muxClient.Flag("timeout", "tcp timeout with milliseconds").Short('t').Default("2000").Int()
muxClientArgs.Key = muxClient.Flag("k", "key same with server").Default("default").String() muxClientArgs.Key = muxClient.Flag("k", "key same with server").Default("default").String()
muxClientArgs.IsCompress = muxClient.Flag("c", "compress data when tcp mode").Default("false").Bool() muxClientArgs.IsCompress = muxClient.Flag("c", "compress data when tcp mode").Default("false").Bool()
muxClientArgs.SessionCount = muxClient.Flag("session-count", "session count which connect to bridge").Short('n').Default("10").Int()
//########mux-bridge######### //########mux-bridge#########
muxBridge := app.Command("bridge", "proxy on mux bridge mode") muxBridge := app.Command("bridge", "proxy on mux bridge mode")

View File

@ -38,6 +38,7 @@ type MuxServerArgs struct {
Route *[]string Route *[]string
Mgr *MuxServerManager Mgr *MuxServerManager
IsCompress *bool IsCompress *bool
SessionCount *int
} }
type MuxClientArgs struct { type MuxClientArgs struct {
Parent *string Parent *string
@ -48,6 +49,7 @@ type MuxClientArgs struct {
Key *string Key *string
Timeout *int Timeout *int
IsCompress *bool IsCompress *bool
SessionCount *int
} }
type MuxBridgeArgs struct { type MuxBridgeArgs struct {
Parent *string Parent *string

View File

@ -4,9 +4,11 @@ import (
"bufio" "bufio"
"io" "io"
"log" "log"
"math/rand"
"net" "net"
"snail007/proxy/utils" "snail007/proxy/utils"
"strconv" "strconv"
"strings"
"time" "time"
"github.com/xtaci/smux" "github.com/xtaci/smux"
@ -83,7 +85,6 @@ func (s *MuxBridge) Start(args interface{}) (err error) {
go s.callback(stream, serverID, key) go s.callback(stream, serverID, key)
} }
case CONN_CLIENT: case CONN_CLIENT:
log.Printf("client connection %s connected", key) log.Printf("client connection %s connected", key)
session, err := smux.Client(inConn, nil) session, err := smux.Client(inConn, nil)
if err != nil { if err != nil {
@ -91,11 +92,24 @@ func (s *MuxBridge) Start(args interface{}) (err error) {
log.Printf("client session error,ERR:%s", err) log.Printf("client session error,ERR:%s", err)
return return
} }
s.clientControlConns.Set(key, session) keyInfo := strings.Split(key, "-")
groupKey := keyInfo[0]
index := keyInfo[1]
if !s.clientControlConns.Has(groupKey) {
item := utils.NewConcurrentMap()
s.clientControlConns.Set(groupKey, &item)
}
_group, _ := s.clientControlConns.Get(groupKey)
group := _group.(*utils.ConcurrentMap)
group.Set(index, session)
// s.clientControlConns.Set(key, session)
go func() { go func() {
for { for {
if session.IsClosed() { if session.IsClosed() {
s.clientControlConns.Remove(key) group.Remove(index)
if group.IsEmpty() {
s.clientControlConns.Remove(groupKey)
}
break break
} }
time.Sleep(time.Second * 5) time.Sleep(time.Second * 5)
@ -124,19 +138,23 @@ func (s *MuxBridge) callback(inConn net.Conn, serverID, key string) {
if key == "*" { if key == "*" {
key = s.router.GetKey() key = s.router.GetKey()
} }
session, ok := s.clientControlConns.Get(key) _group, ok := s.clientControlConns.Get(key)
if !ok { if !ok {
log.Printf("client %s session not exists for server stream %s", key, serverID) log.Printf("client %s session not exists for server stream %s", key, serverID)
time.Sleep(time.Second * 3) time.Sleep(time.Second * 3)
continue continue
} }
group := _group.(*utils.ConcurrentMap)
index := group.Keys()[rand.Intn(group.Count())]
log.Printf("select client : %s-%s", key, index)
session, _ := group.Get(index)
stream, err := session.(*smux.Session).OpenStream() stream, err := session.(*smux.Session).OpenStream()
if err != nil { if err != nil {
log.Printf("%s client session open stream %s fail, err: %s, retrying...", key, serverID, err) log.Printf("%s client session open stream %s fail, err: %s, retrying...", key, serverID, err)
time.Sleep(time.Second * 3) time.Sleep(time.Second * 3)
continue continue
} else { } else {
log.Printf("%s server %s stream created", key, serverID) log.Printf("stream %s -> %s created", serverID, key)
die1 := make(chan bool, 1) die1 := make(chan bool, 1)
die2 := make(chan bool, 1) die2 := make(chan bool, 1)
go func() { go func() {

View File

@ -2,6 +2,7 @@ package services
import ( import (
"crypto/tls" "crypto/tls"
"fmt"
"io" "io"
"log" "log"
"net" "net"
@ -45,6 +46,15 @@ func (s *MuxClient) Start(args interface{}) (err error) {
s.CheckArgs() s.CheckArgs()
s.InitService() s.InitService()
log.Printf("proxy on mux client mode, compress %v", *s.cfg.IsCompress) log.Printf("proxy on mux client mode, compress %v", *s.cfg.IsCompress)
for i := 1; i <= *s.cfg.SessionCount; i++ {
log.Printf("session worker[%d] started", i)
go func(i int) {
defer func() {
e := recover()
if e != nil {
log.Printf("session worker crashed: %s", e)
}
}()
for { for {
var _conn tls.Conn var _conn tls.Conn
_conn, err = utils.TlsConnectHost(*s.cfg.Parent, *s.cfg.Timeout, s.cfg.CertBytes, s.cfg.KeyBytes) _conn, err = utils.TlsConnectHost(*s.cfg.Parent, *s.cfg.Timeout, s.cfg.CertBytes, s.cfg.KeyBytes)
@ -54,7 +64,7 @@ func (s *MuxClient) Start(args interface{}) (err error) {
continue continue
} }
conn := net.Conn(&_conn) conn := net.Conn(&_conn)
_, err = conn.Write(utils.BuildPacket(CONN_CLIENT, *s.cfg.Key)) _, err = conn.Write(utils.BuildPacket(CONN_CLIENT, fmt.Sprintf("%s-%d", *s.cfg.Key, i)))
if err != nil { if err != nil {
conn.Close() conn.Close()
log.Printf("connection err: %s, retrying...", err) log.Printf("connection err: %s, retrying...", err)
@ -77,6 +87,12 @@ func (s *MuxClient) Start(args interface{}) (err error) {
break break
} }
go func() { go func() {
defer func() {
e := recover()
if e != nil {
log.Printf("stream handler crashed: %s", e)
}
}()
var ID, clientLocalAddr, serverID string var ID, clientLocalAddr, serverID string
err = utils.ReadPacketData(stream, &ID, &clientLocalAddr, &serverID) err = utils.ReadPacketData(stream, &ID, &clientLocalAddr, &serverID)
if err != nil { if err != nil {
@ -84,7 +100,7 @@ func (s *MuxClient) Start(args interface{}) (err error) {
stream.Close() stream.Close()
return return
} }
log.Printf("signal revecived,server %s stream %s %s", serverID, ID, clientLocalAddr) log.Printf("worker[%d] signal revecived,server %s stream %s %s", i, serverID, ID, clientLocalAddr)
protocol := clientLocalAddr[:3] protocol := clientLocalAddr[:3]
localAddr := clientLocalAddr[4:] localAddr := clientLocalAddr[4:]
if protocol == "udp" { if protocol == "udp" {
@ -96,6 +112,9 @@ func (s *MuxClient) Start(args interface{}) (err error) {
} }
} }
}(i)
}
return
} }
func (s *MuxClient) Clean() { func (s *MuxClient) Clean() {
s.StopService() s.StopService()

View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"io" "io"
"log" "log"
"math/rand"
"net" "net"
"runtime/debug" "runtime/debug"
"snail007/proxy/utils" "snail007/proxy/utils"
@ -20,7 +21,7 @@ type MuxServer struct {
cfg MuxServerArgs cfg MuxServerArgs
udpChn chan MuxUDPItem udpChn chan MuxUDPItem
sc utils.ServerChannel sc utils.ServerChannel
session *smux.Session sessions utils.ConcurrentMap
lockChn chan bool lockChn chan bool
} }
@ -86,6 +87,7 @@ func (s *MuxServerManager) Start(args interface{}) (err error) {
Timeout: s.cfg.Timeout, Timeout: s.cfg.Timeout,
Mgr: s, Mgr: s,
IsCompress: s.cfg.IsCompress, IsCompress: s.cfg.IsCompress,
SessionCount: s.cfg.SessionCount,
}) })
if err != nil { if err != nil {
@ -113,6 +115,7 @@ func NewMuxServer() Service {
cfg: MuxServerArgs{}, cfg: MuxServerArgs{},
udpChn: make(chan MuxUDPItem, 50000), udpChn: make(chan MuxUDPItem, 50000),
lockChn: make(chan bool, 1), lockChn: make(chan bool, 1),
sessions: utils.NewConcurrentMap(),
} }
} }
@ -206,7 +209,7 @@ func (s *MuxServer) Clean() {
} }
func (s *MuxServer) GetOutConn() (outConn net.Conn, ID string, err error) { func (s *MuxServer) GetOutConn() (outConn net.Conn, ID string, err error) {
outConn, err = s.GetConn() outConn, err = s.GetConn(fmt.Sprintf("%d", rand.Intn(*s.cfg.SessionCount)))
if err != nil { if err != nil {
log.Printf("connection err: %s", err) log.Printf("connection err: %s", err)
return return
@ -224,7 +227,7 @@ func (s *MuxServer) GetOutConn() (outConn net.Conn, ID string, err error) {
} }
return return
} }
func (s *MuxServer) GetConn() (conn net.Conn, err error) { func (s *MuxServer) GetConn(index string) (conn net.Conn, err error) {
select { select {
case s.lockChn <- true: case s.lockChn <- true:
default: default:
@ -234,32 +237,35 @@ func (s *MuxServer) GetConn() (conn net.Conn, err error) {
defer func() { defer func() {
<-s.lockChn <-s.lockChn
}() }()
if s.session == nil { var session *smux.Session
_session, ok := s.sessions.Get(index)
if !ok {
var _conn tls.Conn var _conn tls.Conn
_conn, err = utils.TlsConnectHost(*s.cfg.Parent, *s.cfg.Timeout, s.cfg.CertBytes, s.cfg.KeyBytes) _conn, err = utils.TlsConnectHost(*s.cfg.Parent, *s.cfg.Timeout, s.cfg.CertBytes, s.cfg.KeyBytes)
if err != nil { if err != nil {
s.session = nil
return return
} }
c := net.Conn(&_conn) c := net.Conn(&_conn)
_, err = c.Write(utils.BuildPacket(CONN_SERVER, *s.cfg.Key, s.cfg.Mgr.serverID)) _, err = c.Write(utils.BuildPacket(CONN_SERVER, *s.cfg.Key, s.cfg.Mgr.serverID))
if err != nil { if err != nil {
c.Close() c.Close()
s.session = nil
return return
} }
if err == nil { if err == nil {
s.session, err = smux.Client(c, nil) session, err = smux.Client(c, nil)
if err != nil { if err != nil {
s.session = nil
return return
} }
} }
s.sessions.Set(index, session)
log.Printf("session[%s] created", index)
} else {
session = _session.(*smux.Session)
} }
conn, err = s.session.OpenStream() conn, err = session.OpenStream()
if err != nil { if err != nil {
s.session.Close() session.Close()
s.session = nil s.sessions.Remove(index)
} }
return return