This commit is contained in:
arraykeys@gmail.com
2018-09-04 15:09:08 +08:00
parent 1d7eee0e69
commit 4d1c9ffa1f
8 changed files with 287 additions and 157 deletions

View File

@ -1,4 +1,11 @@
proxy更新日志
v5.5
1.预编译的二进制增加了armv8支持.
2.预编译的mipsle和mips二进制增加了softfloat支持.
3.优化连接HTTP(s)上级代理的CONNECT指令,附带更多的信息.
4.重构了内网穿透的UDP功能,性能大幅度提升,可以愉快的与异地基友玩依赖UDP的局域网游戏了.
5.重构了UDP端口映射,性能大幅度提升.
v5.4
1.优化了获取本地IP信息导致CPU过高的问题.
2.所有服务都增加了--nolog参数,可以关闭日志输出,节省CPU.

View File

@ -18,10 +18,16 @@ CGO_ENABLED=0 GOOS=linux GOARCH=arm GOARM=7 go build -o proxy && tar zcfv "${REL
CGO_ENABLED=0 GOOS=linux GOARCH=arm64 GOARM=7 go build -o proxy && tar zcfv "${RELEASE}/proxy-linux-arm64-v7.tar.gz" proxy direct blocked
CGO_ENABLED=0 GOOS=linux GOARCH=arm GOARM=5 go build -o proxy && tar zcfv "${RELEASE}/proxy-linux-arm-v5.tar.gz" proxy direct blocked
CGO_ENABLED=0 GOOS=linux GOARCH=arm64 GOARM=5 go build -o proxy && tar zcfv "${RELEASE}/proxy-linux-arm64-v5.tar.gz" proxy direct blocked
CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -o proxy && tar zcfv "${RELEASE}/proxy-linux-arm64-v8.tar.gz" proxy direct blocked
CGO_ENABLED=0 GOOS=linux GOARCH=arm go build -o proxy && tar zcfv "${RELEASE}/proxy-linux-arm-v8.tar.gz" proxy direct blocked
CGO_ENABLED=0 GOOS=linux GOARCH=mips go build -o proxy && tar zcfv "${RELEASE}/proxy-linux-mips.tar.gz" proxy direct blocked
CGO_ENABLED=0 GOOS=linux GOARCH=mips64 go build -o proxy && tar zcfv "${RELEASE}/proxy-linux-mips64.tar.gz" proxy direct blocked
CGO_ENABLED=0 GOOS=linux GOARCH=mips64le go build -o proxy && tar zcfv "${RELEASE}/proxy-linux-mips64le.tar.gz" proxy direct blocked
CGO_ENABLED=0 GOOS=linux GOARCH=mipsle go build -o proxy && tar zcfv "${RELEASE}/proxy-linux-mipsle.tar.gz" proxy direct blocked
CGO_ENABLED=0 GOOS=linux GOARCH=mips GOMIPS=softfloat go build -o proxy && tar zcfv "${RELEASE}/proxy-linux-mips-softfloat.tar.gz" proxy direct blocked
CGO_ENABLED=0 GOOS=linux GOARCH=mips64 GOMIPS=softfloat go build -o proxy && tar zcfv "${RELEASE}/proxy-linux-mips64-softfloat.tar.gz" proxy direct blocked
CGO_ENABLED=0 GOOS=linux GOARCH=mips64le GOMIPS=softfloat go build -o proxy && tar zcfv "${RELEASE}/proxy-linux-mips64le-softfloat.tar.gz" proxy direct blocked
CGO_ENABLED=0 GOOS=linux GOARCH=mipsle GOMIPS=softfloat go build -o proxy && tar zcfv "${RELEASE}/proxy-linux-mipsle-softfloat.tar.gz" proxy direct blocked
CGO_ENABLED=0 GOOS=linux GOARCH=ppc64 go build -o proxy && tar zcfv "${RELEASE}/proxy-linux-ppc64.tar.gz" proxy direct blocked
CGO_ENABLED=0 GOOS=linux GOARCH=ppc64le go build -o proxy && tar zcfv "${RELEASE}/proxy-linux-ppc64le.tar.gz" proxy direct blocked
CGO_ENABLED=0 GOOS=linux GOARCH=s390x go build -o proxy && tar zcfv "${RELEASE}/proxy-linux-s390x.tar.gz" proxy direct blocked

View File

@ -6,6 +6,7 @@ import (
"io"
logger "log"
"net"
"strings"
"time"
"github.com/golang/snappy"
@ -31,12 +32,21 @@ type MuxClientArgs struct {
KCP kcpcfg.KCPConfigArgs
Jumper *string
}
type ClientUDPConnItem struct {
conn *smux.Stream
touchtime int64
srcAddr *net.UDPAddr
localAddr *net.UDPAddr
udpConn *net.UDPConn
connid string
}
type MuxClient struct {
cfg MuxClientArgs
isStop bool
sessions utils.ConcurrentMap
log *logger.Logger
jumper *jumper.Jumper
udpConns utils.ConcurrentMap
}
func NewMuxClient() services.Service {
@ -44,10 +54,12 @@ func NewMuxClient() services.Service {
cfg: MuxClientArgs{},
isStop: false,
sessions: utils.NewConcurrentMap(),
udpConns: utils.NewConcurrentMap(),
}
}
func (s *MuxClient) InitService() (err error) {
s.UDPGCDeamon()
return
}
@ -178,7 +190,7 @@ func (s *MuxClient) Start(args interface{}, log *logger.Logger) (err error) {
stream.Close()
return
}
s.log.Printf("worker[%d] signal revecived,server %s stream %s %s", i, serverID, ID, clientLocalAddr)
//s.log.Printf("worker[%d] signal revecived,server %s stream %s %s", i, serverID, ID, clientLocalAddr)
protocol := clientLocalAddr[:3]
localAddr := clientLocalAddr[4:]
if protocol == "udp" {
@ -228,76 +240,130 @@ func (s *MuxClient) getParentConn() (conn net.Conn, err error) {
return
}
func (s *MuxClient) ServeUDP(inConn *smux.Stream, localAddr, ID string) {
var item *ClientUDPConnItem
var body []byte
var err error
srcAddr := ""
defer func() {
if item != nil {
(*item).conn.Close()
(*item).udpConn.Close()
s.udpConns.Remove(srcAddr)
inConn.Close()
}
}()
for {
if s.isStop {
return
}
inConn.SetDeadline(time.Now().Add(time.Millisecond * time.Duration(*s.cfg.Timeout)))
srcAddr, body, err := utils.ReadUDPPacket(inConn)
inConn.SetDeadline(time.Time{})
srcAddr, body, err = utils.ReadUDPPacket(inConn)
if err != nil {
s.log.Printf("udp packet revecived fail, err: %s", err)
s.log.Printf("connection %s released", ID)
if strings.Contains(err.Error(), "n != int(") {
continue
}
if !utils.IsNetDeadlineErr(err) && err != io.EOF {
s.log.Printf("udp packet revecived from bridge fail, err: %s", err)
}
return
}
if v, ok := s.udpConns.Get(srcAddr); !ok {
_srcAddr, _ := net.ResolveUDPAddr("udp", srcAddr)
zeroAddr, _ := net.ResolveUDPAddr("udp", ":")
_localAddr, _ := net.ResolveUDPAddr("udp", localAddr)
c, err := net.DialUDP("udp", zeroAddr, _localAddr)
if err != nil {
s.log.Printf("create local udp conn fail, err : %s", err)
inConn.Close()
break
return
}
item = &ClientUDPConnItem{
conn: inConn,
srcAddr: _srcAddr,
localAddr: _localAddr,
udpConn: c,
connid: ID,
}
s.udpConns.Set(srcAddr, item)
s.UDPRevecive(srcAddr, ID)
} else {
//s.log.Printf("udp packet revecived:%s,%v", srcAddr, body)
go func() {
defer func() {
if e := recover(); e != nil {
s.log.Printf("client processUDPPacket crashed,err: %s", e)
}
}()
s.processUDPPacket(inConn, srcAddr, localAddr, body)
}()
}
item = v.(*ClientUDPConnItem)
}
(*item).touchtime = time.Now().Unix()
go (*item).udpConn.Write(body)
//_, err = (*item).udpConn.Write(body)
// if err != nil {
// s.log.Printf("send udp packet to %s fail, err : %s", item.localAddr, err)
// return
// }
}
func (s *MuxClient) processUDPPacket(inConn *smux.Stream, srcAddr, localAddr string, body []byte) {
dstAddr, err := net.ResolveUDPAddr("udp", localAddr)
if err != nil {
s.log.Printf("can't resolve address: %s", err)
inConn.Close()
}
func (s *MuxClient) UDPRevecive(key, ID string) {
go func() {
s.log.Printf("udp conn %s connected", ID)
v, ok := s.udpConns.Get(key)
if !ok {
s.log.Printf("[warn] udp conn not exists for %s, connid : %s", key, ID)
return
}
clientSrcAddr := &net.UDPAddr{IP: net.IPv4zero, Port: 0}
conn, err := net.DialUDP("udp", clientSrcAddr, dstAddr)
cui := v.(*ClientUDPConnItem)
buf := utils.LeakyBuffer.Get()
defer func() {
utils.LeakyBuffer.Put(buf)
cui.conn.Close()
cui.udpConn.Close()
s.udpConns.Remove(key)
s.log.Printf("udp conn %s released", ID)
}()
for {
n, err := cui.udpConn.Read(buf)
if err != nil {
s.log.Printf("connect to udp %s fail,ERR:%s", dstAddr.String(), err)
if !utils.IsNetClosedErr(err) {
s.log.Printf("udp conn read udp packet fail , err: %s ", err)
}
return
}
conn.SetDeadline(time.Now().Add(time.Millisecond * time.Duration(*s.cfg.Timeout)))
_, err = conn.Write(body)
conn.SetDeadline(time.Time{})
cui.touchtime = time.Now().Unix()
go func() {
cui.conn.SetWriteDeadline(time.Now().Add(time.Millisecond * time.Duration(*s.cfg.Timeout)))
_, err = cui.conn.Write(utils.UDPPacket(cui.srcAddr.String(), buf[:n]))
cui.conn.SetWriteDeadline(time.Time{})
if err != nil {
s.log.Printf("send udp packet to %s fail,ERR:%s", dstAddr.String(), err)
cui.udpConn.Close()
return
}
//s.log.Printf("send udp packet to %s success", dstAddr.String())
buf := make([]byte, 1024)
conn.SetDeadline(time.Now().Add(time.Millisecond * time.Duration(*s.cfg.Timeout)))
length, _, err := conn.ReadFromUDP(buf)
conn.SetDeadline(time.Time{})
if err != nil {
s.log.Printf("read udp response from %s fail ,ERR:%s", dstAddr.String(), err)
}()
// _, err = cui.conn.Write(utils.UDPPacket(cui.srcAddr.String(), buf[:n]))
// if err != nil {
// s.log.Printf("send udp packet to bridge fail, err : %s", err)
// return
// }
}
}()
}
func (s *MuxClient) UDPGCDeamon() {
gctime := int64(30)
go func() {
if s.isStop {
return
}
respBody := buf[0:length]
//s.log.Printf("revecived udp packet from %s , %v", dstAddr.String(), respBody)
bs := utils.UDPPacket(srcAddr, respBody)
(*inConn).SetDeadline(time.Now().Add(time.Millisecond * time.Duration(*s.cfg.Timeout)))
_, err = (*inConn).Write(bs)
(*inConn).SetDeadline(time.Time{})
if err != nil {
s.log.Printf("send udp response fail ,ERR:%s", err)
inConn.Close()
return
timer := time.NewTicker(time.Second)
for {
<-timer.C
gcKeys := []string{}
s.udpConns.IterCb(func(key string, v interface{}) {
if time.Now().Unix()-v.(*ClientUDPConnItem).touchtime > gctime {
(*(v.(*ClientUDPConnItem).conn)).Close()
(v.(*ClientUDPConnItem).udpConn).Close()
gcKeys = append(gcKeys, key)
s.log.Printf("gc udp conn %s", v.(*ClientUDPConnItem).connid)
}
//s.log.Printf("send udp response success ,from:%s ,%d ,%v", dstAddr.String(), len(bs), bs)
})
for _, k := range gcKeys {
s.udpConns.Remove(k)
}
gcKeys = nil
}
}()
}
func (s *MuxClient) ServeConn(inConn *smux.Stream, localAddr, ID string) {
var err error

View File

@ -42,15 +42,20 @@ type MuxServerArgs struct {
Jumper *string
}
type MuxUDPItem struct {
type MuxUDPPacketItem struct {
packet *[]byte
localAddr *net.UDPAddr
srcAddr *net.UDPAddr
}
type UDPConnItem struct {
conn *net.Conn
touchtime int64
srcAddr *net.UDPAddr
localAddr *net.UDPAddr
connid string
}
type MuxServerManager struct {
cfg MuxServerArgs
udpChn chan MuxUDPItem
serverID string
servers []*services.Service
log *logger.Logger
@ -59,7 +64,6 @@ type MuxServerManager struct {
func NewMuxServerManager() services.Service {
return &MuxServerManager{
cfg: MuxServerArgs{},
udpChn: make(chan MuxUDPItem, 50000),
serverID: utils.Uniqueid(),
servers: []*services.Service{},
}
@ -160,23 +164,24 @@ func (s *MuxServerManager) InitService() (err error) {
type MuxServer struct {
cfg MuxServerArgs
udpChn chan MuxUDPItem
sc utils.ServerChannel
sessions utils.ConcurrentMap
lockChn chan bool
isStop bool
udpConn *net.Conn
log *logger.Logger
jumper *jumper.Jumper
udpConns utils.ConcurrentMap
// writelock *sync.Mutex
}
func NewMuxServer() services.Service {
return &MuxServer{
cfg: MuxServerArgs{},
udpChn: make(chan MuxUDPItem, 50000),
lockChn: make(chan bool, 1),
sessions: utils.NewConcurrentMap(),
isStop: false,
udpConns: utils.NewConcurrentMap(),
// writelock: &sync.Mutex{},
}
}
@ -199,12 +204,9 @@ func (s *MuxServer) StopService() {
if s.sc.UDPListener != nil {
(*s.sc.UDPListener).Close()
}
if s.udpConn != nil {
(*s.udpConn).Close()
}
}
func (s *MuxServer) InitService() (err error) {
s.UDPConnDeamon()
s.UDPGCDeamon()
return
}
func (s *MuxServer) CheckArgs() (err error) {
@ -242,11 +244,7 @@ func (s *MuxServer) Start(args interface{}, log *logger.Logger) (err error) {
s.sc = utils.NewServerChannel(host, p, s.log)
if *s.cfg.IsUDP {
err = s.sc.ListenUDP(func(packet []byte, localAddr, srcAddr *net.UDPAddr) {
s.udpChn <- MuxUDPItem{
packet: &packet,
localAddr: localAddr,
srcAddr: srcAddr,
}
s.UDPSend(packet, localAddr, srcAddr)
})
if err != nil {
return
@ -317,7 +315,9 @@ func (s *MuxServer) GetOutConn() (outConn net.Conn, ID string, err error) {
}
outConn, err = s.GetConn(fmt.Sprintf("%d", i))
if err != nil {
if !strings.Contains(err.Error(), "can not connect at same time") {
s.log.Printf("connection err: %s", err)
}
return
}
remoteAddr := "tcp:" + *s.cfg.Remote
@ -424,87 +424,111 @@ func (s *MuxServer) getParentConn() (conn net.Conn, err error) {
}
return
}
func (s *MuxServer) UDPConnDeamon() {
func (s *MuxServer) UDPGCDeamon() {
gctime := int64(30)
go func() {
defer func() {
if err := recover(); err != nil {
s.log.Printf("udp conn deamon crashed with err : %s \nstack: %s", err, string(debug.Stack()))
if s.isStop {
return
}
timer := time.NewTicker(time.Second)
for {
<-timer.C
gcKeys := []string{}
s.udpConns.IterCb(func(key string, v interface{}) {
if time.Now().Unix()-v.(*UDPConnItem).touchtime > gctime {
(*(v.(*UDPConnItem).conn)).Close()
gcKeys = append(gcKeys, key)
s.log.Printf("gc udp conn %s", v.(*UDPConnItem).connid)
}
})
for _, k := range gcKeys {
s.udpConns.Remove(k)
}
gcKeys = nil
}
}()
var outConn net.Conn
var ID string
var err error
}
func (s *MuxServer) UDPSend(data []byte, localAddr, srcAddr *net.UDPAddr) {
var (
uc *UDPConnItem
key = srcAddr.String()
ID string
err error
outconn net.Conn
)
v, ok := s.udpConns.Get(key)
if !ok {
for {
if s.isStop {
return
}
item := <-s.udpChn
RETRY:
if s.isStop {
return
}
if outConn == nil {
for {
if s.isStop {
return
}
outConn, ID, err = s.GetOutConn()
if err != nil {
outConn = nil
utils.CloseConn(&outConn)
s.log.Printf("connect to %s fail, err: %s, retrying...", *s.cfg.Parent, err)
time.Sleep(time.Second * 3)
outconn, ID, err = s.GetOutConn()
if err != nil && strings.Contains(err.Error(), "can not connect at same time") {
time.Sleep(time.Millisecond * 500)
continue
} else {
go func(outConn net.Conn, ID string) {
if s.udpConn != nil {
(*s.udpConn).Close()
break
}
s.udpConn = &outConn
for {
if s.isStop {
}
if err != nil {
s.log.Printf("connect to %s fail, err: %s", *s.cfg.Parent, err)
return
}
outConn.SetDeadline(time.Now().Add(time.Millisecond * time.Duration(*s.cfg.Timeout)))
srcAddrFromConn, body, err := utils.ReadUDPPacket(outConn)
outConn.SetDeadline(time.Time{})
uc = &UDPConnItem{
conn: &outconn,
srcAddr: srcAddr,
localAddr: localAddr,
connid: ID,
}
s.udpConns.Set(key, uc)
s.UDPRevecive(key, ID)
} else {
uc = v.(*UDPConnItem)
}
go func() {
defer func() {
if e := recover(); e != nil {
(*uc.conn).Close()
s.udpConns.Remove(key)
s.log.Printf("udp sender crashed with error : %s", e)
}
}()
uc.touchtime = time.Now().Unix()
(*uc.conn).SetWriteDeadline(time.Now().Add(time.Millisecond * time.Duration(*s.cfg.Timeout)))
_, err = (*uc.conn).Write(utils.UDPPacket(srcAddr.String(), data))
(*uc.conn).SetWriteDeadline(time.Time{})
if err != nil {
s.log.Printf("parse revecived udp packet fail, err: %s ,%v", err, body)
s.log.Printf("UDP deamon connection %s exited", ID)
break
}
//s.log.Printf("udp packet revecived over parent , local:%s", srcAddrFromConn)
_srcAddr := strings.Split(srcAddrFromConn, ":")
if len(_srcAddr) != 2 {
s.log.Printf("parse revecived udp packet fail, addr error : %s", srcAddrFromConn)
continue
}
port, _ := strconv.Atoi(_srcAddr[1])
dstAddr := &net.UDPAddr{IP: net.ParseIP(_srcAddr[0]), Port: port}
s.sc.UDPListener.SetDeadline(time.Now().Add(time.Millisecond * time.Duration(*s.cfg.Timeout)))
_, err = s.sc.UDPListener.WriteToUDP(body, dstAddr)
s.sc.UDPListener.SetDeadline(time.Time{})
if err != nil {
s.log.Printf("udp response to local %s fail,ERR:%s", srcAddrFromConn, err)
continue
}
//s.log.Printf("udp response to local %s success , %v", srcAddrFromConn, body)
}
}(outConn, ID)
break
}
}
}
outConn.SetWriteDeadline(time.Now().Add(time.Second))
_, err = outConn.Write(utils.UDPPacket(item.srcAddr.String(), *item.packet))
outConn.SetWriteDeadline(time.Time{})
if err != nil {
utils.CloseConn(&outConn)
outConn = nil
s.log.Printf("write udp packet to %s fail ,flush err:%s ,retrying...", *s.cfg.Parent, err)
goto RETRY
}
//s.log.Printf("write packet %v", *item.packet)
s.log.Printf("write udp packet to %s fail ,flush err:%s ", *s.cfg.Parent, err)
}
}()
}
func (s *MuxServer) UDPRevecive(key, ID string) {
go func() {
s.log.Printf("udp conn %s connected", ID)
var uc *UDPConnItem
defer func() {
if uc != nil {
(*uc.conn).Close()
}
s.udpConns.Remove(key)
s.log.Printf("udp conn %s released", ID)
}()
v, ok := s.udpConns.Get(key)
if !ok {
s.log.Printf("[warn] udp conn not exists for %s, connid : %s", key, ID)
return
}
uc = v.(*UDPConnItem)
for {
_, body, err := utils.ReadUDPPacket(*uc.conn)
if err != nil {
if strings.Contains(err.Error(), "n != int(") {
continue
}
if err != io.EOF {
s.log.Printf("udp conn read udp packet fail , err: %s ", err)
}
return
}
uc.touchtime = time.Now().Unix()
go s.sc.UDPListener.WriteToUDP(body, uc.srcAddr)
}
}()
}

View File

@ -39,6 +39,7 @@ func GetService(name string) *ServiceItem {
func Stop(name string) {
if s, ok := servicesMap.Load(name); ok && s.(*ServiceItem).S != nil {
s.(*ServiceItem).S.Clean()
servicesMap.Delete(name)
}
}
func Run(name string, args interface{}) (service *ServiceItem, err error) {

View File

@ -345,7 +345,10 @@ func (s *SPS) OutToTCP(inConn *net.Conn) (err error) {
isHTTPS = true
pb.Write([]byte(fmt.Sprintf("CONNECT %s HTTP/1.1\r\n", address)))
}
pb.WriteString(fmt.Sprintf("Host: %s\r\n", address))
pb.WriteString(fmt.Sprintf("Proxy-Host: %s\r\n", address))
pb.WriteString("Proxy-Connection: Keep-Alive\r\n")
pb.WriteString("Connection: Keep-Alive\r\n")
u := ""
if *s.cfg.ParentAuth != "" {

View File

@ -585,6 +585,26 @@ func RemoveProxyHeaders(head []byte) []byte {
func InsertProxyHeaders(head []byte, headers string) []byte {
return bytes.Replace(head, []byte("\r\n"), []byte("\r\n"+headers), 1)
}
func IsNetClosedErr(err error) bool {
return err != nil && strings.Contains(err.Error(), "use of closed network connection")
}
func IsNetTimeoutErr(err error) bool {
if err == nil {
return false
}
e, ok := err.(net.Error)
return ok && e.Timeout()
}
func IsNetDeadlineErr(err error) bool {
return err != nil && strings.Contains(err.Error(), "i/o deadline reached")
}
func IsNetRefusedErr(err error) bool {
return err != nil && strings.Contains(err.Error(), "connection refused")
}
func IsNetSocketNotConnectedErr(err error) bool {
return err != nil && strings.Contains(err.Error(), "socket is not connected")
}
// type sockaddr struct {
// family uint16

View File

@ -51,7 +51,10 @@ func (j *Jumper) dialHTTPS(address string, timeout time.Duration) (conn net.Conn
}
pb := new(bytes.Buffer)
pb.Write([]byte(fmt.Sprintf("CONNECT %s HTTP/1.1\r\n", address)))
pb.WriteString(fmt.Sprintf("Host: %s\r\n", address))
pb.WriteString(fmt.Sprintf("Proxy-Host: %s\r\n", address))
pb.WriteString("Proxy-Connection: Keep-Alive\r\n")
pb.WriteString("Connection: Keep-Alive\r\n")
if j.proxyURL.User != nil {
p, _ := j.proxyURL.User.Password()
u := fmt.Sprintf("%s:%s", j.proxyURL.User.Username(), p)