diff --git a/CHANGELOG b/CHANGELOG index 71c2141..95300b5 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,4 +1,11 @@ proxy更新日志 +v5.5 +1.预编译的二进制增加了armv8支持. +2.预编译的mipsle和mips二进制增加了softfloat支持. +3.优化连接HTTP(s)上级代理的CONNECT指令,附带更多的信息. +4.重构了内网穿透的UDP功能,性能大幅度提升,可以愉快的与异地基友玩依赖UDP的局域网游戏了. +5.重构了UDP端口映射,性能大幅度提升. + v5.4 1.优化了获取本地IP信息导致CPU过高的问题. 2.所有服务都增加了--nolog参数,可以关闭日志输出,节省CPU. diff --git a/release.sh b/release.sh index df7b13c..ac0a83c 100755 --- a/release.sh +++ b/release.sh @@ -18,10 +18,16 @@ CGO_ENABLED=0 GOOS=linux GOARCH=arm GOARM=7 go build -o proxy && tar zcfv "${REL CGO_ENABLED=0 GOOS=linux GOARCH=arm64 GOARM=7 go build -o proxy && tar zcfv "${RELEASE}/proxy-linux-arm64-v7.tar.gz" proxy direct blocked CGO_ENABLED=0 GOOS=linux GOARCH=arm GOARM=5 go build -o proxy && tar zcfv "${RELEASE}/proxy-linux-arm-v5.tar.gz" proxy direct blocked CGO_ENABLED=0 GOOS=linux GOARCH=arm64 GOARM=5 go build -o proxy && tar zcfv "${RELEASE}/proxy-linux-arm64-v5.tar.gz" proxy direct blocked +CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -o proxy && tar zcfv "${RELEASE}/proxy-linux-arm64-v8.tar.gz" proxy direct blocked +CGO_ENABLED=0 GOOS=linux GOARCH=arm go build -o proxy && tar zcfv "${RELEASE}/proxy-linux-arm-v8.tar.gz" proxy direct blocked CGO_ENABLED=0 GOOS=linux GOARCH=mips go build -o proxy && tar zcfv "${RELEASE}/proxy-linux-mips.tar.gz" proxy direct blocked CGO_ENABLED=0 GOOS=linux GOARCH=mips64 go build -o proxy && tar zcfv "${RELEASE}/proxy-linux-mips64.tar.gz" proxy direct blocked CGO_ENABLED=0 GOOS=linux GOARCH=mips64le go build -o proxy && tar zcfv "${RELEASE}/proxy-linux-mips64le.tar.gz" proxy direct blocked CGO_ENABLED=0 GOOS=linux GOARCH=mipsle go build -o proxy && tar zcfv "${RELEASE}/proxy-linux-mipsle.tar.gz" proxy direct blocked +CGO_ENABLED=0 GOOS=linux GOARCH=mips GOMIPS=softfloat go build -o proxy && tar zcfv "${RELEASE}/proxy-linux-mips-softfloat.tar.gz" proxy direct blocked +CGO_ENABLED=0 GOOS=linux GOARCH=mips64 GOMIPS=softfloat go build -o proxy && tar zcfv "${RELEASE}/proxy-linux-mips64-softfloat.tar.gz" proxy direct blocked +CGO_ENABLED=0 GOOS=linux GOARCH=mips64le GOMIPS=softfloat go build -o proxy && tar zcfv "${RELEASE}/proxy-linux-mips64le-softfloat.tar.gz" proxy direct blocked +CGO_ENABLED=0 GOOS=linux GOARCH=mipsle GOMIPS=softfloat go build -o proxy && tar zcfv "${RELEASE}/proxy-linux-mipsle-softfloat.tar.gz" proxy direct blocked CGO_ENABLED=0 GOOS=linux GOARCH=ppc64 go build -o proxy && tar zcfv "${RELEASE}/proxy-linux-ppc64.tar.gz" proxy direct blocked CGO_ENABLED=0 GOOS=linux GOARCH=ppc64le go build -o proxy && tar zcfv "${RELEASE}/proxy-linux-ppc64le.tar.gz" proxy direct blocked CGO_ENABLED=0 GOOS=linux GOARCH=s390x go build -o proxy && tar zcfv "${RELEASE}/proxy-linux-s390x.tar.gz" proxy direct blocked diff --git a/services/mux/mux_client.go b/services/mux/mux_client.go index a40914f..e340821 100644 --- a/services/mux/mux_client.go +++ b/services/mux/mux_client.go @@ -6,6 +6,7 @@ import ( "io" logger "log" "net" + "strings" "time" "github.com/golang/snappy" @@ -31,12 +32,21 @@ type MuxClientArgs struct { KCP kcpcfg.KCPConfigArgs Jumper *string } +type ClientUDPConnItem struct { + conn *smux.Stream + touchtime int64 + srcAddr *net.UDPAddr + localAddr *net.UDPAddr + udpConn *net.UDPConn + connid string +} type MuxClient struct { cfg MuxClientArgs isStop bool sessions utils.ConcurrentMap log *logger.Logger jumper *jumper.Jumper + udpConns utils.ConcurrentMap } func NewMuxClient() services.Service { @@ -44,10 +54,12 @@ func NewMuxClient() services.Service { cfg: MuxClientArgs{}, isStop: false, sessions: utils.NewConcurrentMap(), + udpConns: utils.NewConcurrentMap(), } } func (s *MuxClient) InitService() (err error) { + s.UDPGCDeamon() return } @@ -178,7 +190,7 @@ func (s *MuxClient) Start(args interface{}, log *logger.Logger) (err error) { stream.Close() return } - s.log.Printf("worker[%d] signal revecived,server %s stream %s %s", i, serverID, ID, clientLocalAddr) + //s.log.Printf("worker[%d] signal revecived,server %s stream %s %s", i, serverID, ID, clientLocalAddr) protocol := clientLocalAddr[:3] localAddr := clientLocalAddr[4:] if protocol == "udp" { @@ -228,76 +240,130 @@ func (s *MuxClient) getParentConn() (conn net.Conn, err error) { return } func (s *MuxClient) ServeUDP(inConn *smux.Stream, localAddr, ID string) { - + var item *ClientUDPConnItem + var body []byte + var err error + srcAddr := "" + defer func() { + if item != nil { + (*item).conn.Close() + (*item).udpConn.Close() + s.udpConns.Remove(srcAddr) + inConn.Close() + } + }() for { if s.isStop { return } - inConn.SetDeadline(time.Now().Add(time.Millisecond * time.Duration(*s.cfg.Timeout))) - srcAddr, body, err := utils.ReadUDPPacket(inConn) - inConn.SetDeadline(time.Time{}) + srcAddr, body, err = utils.ReadUDPPacket(inConn) if err != nil { - s.log.Printf("udp packet revecived fail, err: %s", err) - s.log.Printf("connection %s released", ID) - inConn.Close() - break - } else { - //s.log.Printf("udp packet revecived:%s,%v", srcAddr, body) - go func() { - defer func() { - if e := recover(); e != nil { - s.log.Printf("client processUDPPacket crashed,err: %s", e) - } - }() - s.processUDPPacket(inConn, srcAddr, localAddr, body) - }() - + if strings.Contains(err.Error(), "n != int(") { + continue + } + if !utils.IsNetDeadlineErr(err) && err != io.EOF { + s.log.Printf("udp packet revecived from bridge fail, err: %s", err) + } + return } - + if v, ok := s.udpConns.Get(srcAddr); !ok { + _srcAddr, _ := net.ResolveUDPAddr("udp", srcAddr) + zeroAddr, _ := net.ResolveUDPAddr("udp", ":") + _localAddr, _ := net.ResolveUDPAddr("udp", localAddr) + c, err := net.DialUDP("udp", zeroAddr, _localAddr) + if err != nil { + s.log.Printf("create local udp conn fail, err : %s", err) + inConn.Close() + return + } + item = &ClientUDPConnItem{ + conn: inConn, + srcAddr: _srcAddr, + localAddr: _localAddr, + udpConn: c, + connid: ID, + } + s.udpConns.Set(srcAddr, item) + s.UDPRevecive(srcAddr, ID) + } else { + item = v.(*ClientUDPConnItem) + } + (*item).touchtime = time.Now().Unix() + go (*item).udpConn.Write(body) + //_, err = (*item).udpConn.Write(body) + // if err != nil { + // s.log.Printf("send udp packet to %s fail, err : %s", item.localAddr, err) + // return + // } } - // } } -func (s *MuxClient) processUDPPacket(inConn *smux.Stream, srcAddr, localAddr string, body []byte) { - dstAddr, err := net.ResolveUDPAddr("udp", localAddr) - if err != nil { - s.log.Printf("can't resolve address: %s", err) - inConn.Close() - return - } - clientSrcAddr := &net.UDPAddr{IP: net.IPv4zero, Port: 0} - conn, err := net.DialUDP("udp", clientSrcAddr, dstAddr) - if err != nil { - s.log.Printf("connect to udp %s fail,ERR:%s", dstAddr.String(), err) - return - } - conn.SetDeadline(time.Now().Add(time.Millisecond * time.Duration(*s.cfg.Timeout))) - _, err = conn.Write(body) - conn.SetDeadline(time.Time{}) - if err != nil { - s.log.Printf("send udp packet to %s fail,ERR:%s", dstAddr.String(), err) - return - } - //s.log.Printf("send udp packet to %s success", dstAddr.String()) - buf := make([]byte, 1024) - conn.SetDeadline(time.Now().Add(time.Millisecond * time.Duration(*s.cfg.Timeout))) - length, _, err := conn.ReadFromUDP(buf) - conn.SetDeadline(time.Time{}) - if err != nil { - s.log.Printf("read udp response from %s fail ,ERR:%s", dstAddr.String(), err) - return - } - respBody := buf[0:length] - //s.log.Printf("revecived udp packet from %s , %v", dstAddr.String(), respBody) - bs := utils.UDPPacket(srcAddr, respBody) - (*inConn).SetDeadline(time.Now().Add(time.Millisecond * time.Duration(*s.cfg.Timeout))) - _, err = (*inConn).Write(bs) - (*inConn).SetDeadline(time.Time{}) - if err != nil { - s.log.Printf("send udp response fail ,ERR:%s", err) - inConn.Close() - return - } - //s.log.Printf("send udp response success ,from:%s ,%d ,%v", dstAddr.String(), len(bs), bs) +func (s *MuxClient) UDPRevecive(key, ID string) { + go func() { + s.log.Printf("udp conn %s connected", ID) + v, ok := s.udpConns.Get(key) + if !ok { + s.log.Printf("[warn] udp conn not exists for %s, connid : %s", key, ID) + return + } + cui := v.(*ClientUDPConnItem) + buf := utils.LeakyBuffer.Get() + defer func() { + utils.LeakyBuffer.Put(buf) + cui.conn.Close() + cui.udpConn.Close() + s.udpConns.Remove(key) + s.log.Printf("udp conn %s released", ID) + }() + for { + n, err := cui.udpConn.Read(buf) + if err != nil { + if !utils.IsNetClosedErr(err) { + s.log.Printf("udp conn read udp packet fail , err: %s ", err) + } + return + } + cui.touchtime = time.Now().Unix() + go func() { + cui.conn.SetWriteDeadline(time.Now().Add(time.Millisecond * time.Duration(*s.cfg.Timeout))) + _, err = cui.conn.Write(utils.UDPPacket(cui.srcAddr.String(), buf[:n])) + cui.conn.SetWriteDeadline(time.Time{}) + if err != nil { + cui.udpConn.Close() + return + } + }() + // _, err = cui.conn.Write(utils.UDPPacket(cui.srcAddr.String(), buf[:n])) + // if err != nil { + // s.log.Printf("send udp packet to bridge fail, err : %s", err) + // return + // } + } + }() +} +func (s *MuxClient) UDPGCDeamon() { + gctime := int64(30) + go func() { + if s.isStop { + return + } + timer := time.NewTicker(time.Second) + for { + <-timer.C + gcKeys := []string{} + s.udpConns.IterCb(func(key string, v interface{}) { + if time.Now().Unix()-v.(*ClientUDPConnItem).touchtime > gctime { + (*(v.(*ClientUDPConnItem).conn)).Close() + (v.(*ClientUDPConnItem).udpConn).Close() + gcKeys = append(gcKeys, key) + s.log.Printf("gc udp conn %s", v.(*ClientUDPConnItem).connid) + } + }) + for _, k := range gcKeys { + s.udpConns.Remove(k) + } + gcKeys = nil + } + }() } func (s *MuxClient) ServeConn(inConn *smux.Stream, localAddr, ID string) { var err error diff --git a/services/mux/mux_server.go b/services/mux/mux_server.go index b9321fe..2a9f70a 100644 --- a/services/mux/mux_server.go +++ b/services/mux/mux_server.go @@ -42,15 +42,20 @@ type MuxServerArgs struct { Jumper *string } -type MuxUDPItem struct { +type MuxUDPPacketItem struct { packet *[]byte localAddr *net.UDPAddr srcAddr *net.UDPAddr } - +type UDPConnItem struct { + conn *net.Conn + touchtime int64 + srcAddr *net.UDPAddr + localAddr *net.UDPAddr + connid string +} type MuxServerManager struct { cfg MuxServerArgs - udpChn chan MuxUDPItem serverID string servers []*services.Service log *logger.Logger @@ -59,7 +64,6 @@ type MuxServerManager struct { func NewMuxServerManager() services.Service { return &MuxServerManager{ cfg: MuxServerArgs{}, - udpChn: make(chan MuxUDPItem, 50000), serverID: utils.Uniqueid(), servers: []*services.Service{}, } @@ -160,23 +164,24 @@ func (s *MuxServerManager) InitService() (err error) { type MuxServer struct { cfg MuxServerArgs - udpChn chan MuxUDPItem sc utils.ServerChannel sessions utils.ConcurrentMap lockChn chan bool isStop bool - udpConn *net.Conn log *logger.Logger jumper *jumper.Jumper + udpConns utils.ConcurrentMap + // writelock *sync.Mutex } func NewMuxServer() services.Service { return &MuxServer{ cfg: MuxServerArgs{}, - udpChn: make(chan MuxUDPItem, 50000), lockChn: make(chan bool, 1), sessions: utils.NewConcurrentMap(), isStop: false, + udpConns: utils.NewConcurrentMap(), + // writelock: &sync.Mutex{}, } } @@ -199,12 +204,9 @@ func (s *MuxServer) StopService() { if s.sc.UDPListener != nil { (*s.sc.UDPListener).Close() } - if s.udpConn != nil { - (*s.udpConn).Close() - } } func (s *MuxServer) InitService() (err error) { - s.UDPConnDeamon() + s.UDPGCDeamon() return } func (s *MuxServer) CheckArgs() (err error) { @@ -242,11 +244,7 @@ func (s *MuxServer) Start(args interface{}, log *logger.Logger) (err error) { s.sc = utils.NewServerChannel(host, p, s.log) if *s.cfg.IsUDP { err = s.sc.ListenUDP(func(packet []byte, localAddr, srcAddr *net.UDPAddr) { - s.udpChn <- MuxUDPItem{ - packet: &packet, - localAddr: localAddr, - srcAddr: srcAddr, - } + s.UDPSend(packet, localAddr, srcAddr) }) if err != nil { return @@ -317,7 +315,9 @@ func (s *MuxServer) GetOutConn() (outConn net.Conn, ID string, err error) { } outConn, err = s.GetConn(fmt.Sprintf("%d", i)) if err != nil { - s.log.Printf("connection err: %s", err) + if !strings.Contains(err.Error(), "can not connect at same time") { + s.log.Printf("connection err: %s", err) + } return } remoteAddr := "tcp:" + *s.cfg.Remote @@ -424,87 +424,111 @@ func (s *MuxServer) getParentConn() (conn net.Conn, err error) { } return } -func (s *MuxServer) UDPConnDeamon() { +func (s *MuxServer) UDPGCDeamon() { + gctime := int64(30) go func() { - defer func() { - if err := recover(); err != nil { - s.log.Printf("udp conn deamon crashed with err : %s \nstack: %s", err, string(debug.Stack())) - } - }() - var outConn net.Conn - var ID string - var err error + if s.isStop { + return + } + timer := time.NewTicker(time.Second) for { - if s.isStop { - return - } - item := <-s.udpChn - RETRY: - if s.isStop { - return - } - if outConn == nil { - for { - if s.isStop { - return - } - outConn, ID, err = s.GetOutConn() - if err != nil { - outConn = nil - utils.CloseConn(&outConn) - s.log.Printf("connect to %s fail, err: %s, retrying...", *s.cfg.Parent, err) - time.Sleep(time.Second * 3) - continue - } else { - go func(outConn net.Conn, ID string) { - if s.udpConn != nil { - (*s.udpConn).Close() - } - s.udpConn = &outConn - for { - if s.isStop { - return - } - outConn.SetDeadline(time.Now().Add(time.Millisecond * time.Duration(*s.cfg.Timeout))) - srcAddrFromConn, body, err := utils.ReadUDPPacket(outConn) - outConn.SetDeadline(time.Time{}) - if err != nil { - s.log.Printf("parse revecived udp packet fail, err: %s ,%v", err, body) - s.log.Printf("UDP deamon connection %s exited", ID) - break - } - //s.log.Printf("udp packet revecived over parent , local:%s", srcAddrFromConn) - _srcAddr := strings.Split(srcAddrFromConn, ":") - if len(_srcAddr) != 2 { - s.log.Printf("parse revecived udp packet fail, addr error : %s", srcAddrFromConn) - continue - } - port, _ := strconv.Atoi(_srcAddr[1]) - dstAddr := &net.UDPAddr{IP: net.ParseIP(_srcAddr[0]), Port: port} - s.sc.UDPListener.SetDeadline(time.Now().Add(time.Millisecond * time.Duration(*s.cfg.Timeout))) - _, err = s.sc.UDPListener.WriteToUDP(body, dstAddr) - s.sc.UDPListener.SetDeadline(time.Time{}) - if err != nil { - s.log.Printf("udp response to local %s fail,ERR:%s", srcAddrFromConn, err) - continue - } - //s.log.Printf("udp response to local %s success , %v", srcAddrFromConn, body) - } - }(outConn, ID) - break - } + <-timer.C + gcKeys := []string{} + s.udpConns.IterCb(func(key string, v interface{}) { + if time.Now().Unix()-v.(*UDPConnItem).touchtime > gctime { + (*(v.(*UDPConnItem).conn)).Close() + gcKeys = append(gcKeys, key) + s.log.Printf("gc udp conn %s", v.(*UDPConnItem).connid) } + }) + for _, k := range gcKeys { + s.udpConns.Remove(k) } - outConn.SetWriteDeadline(time.Now().Add(time.Second)) - _, err = outConn.Write(utils.UDPPacket(item.srcAddr.String(), *item.packet)) - outConn.SetWriteDeadline(time.Time{}) - if err != nil { - utils.CloseConn(&outConn) - outConn = nil - s.log.Printf("write udp packet to %s fail ,flush err:%s ,retrying...", *s.cfg.Parent, err) - goto RETRY - } - //s.log.Printf("write packet %v", *item.packet) + gcKeys = nil + } + }() +} +func (s *MuxServer) UDPSend(data []byte, localAddr, srcAddr *net.UDPAddr) { + var ( + uc *UDPConnItem + key = srcAddr.String() + ID string + err error + outconn net.Conn + ) + v, ok := s.udpConns.Get(key) + if !ok { + for { + outconn, ID, err = s.GetOutConn() + if err != nil && strings.Contains(err.Error(), "can not connect at same time") { + time.Sleep(time.Millisecond * 500) + continue + } else { + break + } + } + if err != nil { + s.log.Printf("connect to %s fail, err: %s", *s.cfg.Parent, err) + return + } + uc = &UDPConnItem{ + conn: &outconn, + srcAddr: srcAddr, + localAddr: localAddr, + connid: ID, + } + s.udpConns.Set(key, uc) + s.UDPRevecive(key, ID) + } else { + uc = v.(*UDPConnItem) + } + go func() { + defer func() { + if e := recover(); e != nil { + (*uc.conn).Close() + s.udpConns.Remove(key) + s.log.Printf("udp sender crashed with error : %s", e) + } + }() + uc.touchtime = time.Now().Unix() + (*uc.conn).SetWriteDeadline(time.Now().Add(time.Millisecond * time.Duration(*s.cfg.Timeout))) + _, err = (*uc.conn).Write(utils.UDPPacket(srcAddr.String(), data)) + (*uc.conn).SetWriteDeadline(time.Time{}) + if err != nil { + s.log.Printf("write udp packet to %s fail ,flush err:%s ", *s.cfg.Parent, err) + } + }() +} +func (s *MuxServer) UDPRevecive(key, ID string) { + go func() { + s.log.Printf("udp conn %s connected", ID) + var uc *UDPConnItem + defer func() { + if uc != nil { + (*uc.conn).Close() + } + s.udpConns.Remove(key) + s.log.Printf("udp conn %s released", ID) + }() + v, ok := s.udpConns.Get(key) + if !ok { + s.log.Printf("[warn] udp conn not exists for %s, connid : %s", key, ID) + return + } + uc = v.(*UDPConnItem) + for { + _, body, err := utils.ReadUDPPacket(*uc.conn) + if err != nil { + if strings.Contains(err.Error(), "n != int(") { + continue + } + if err != io.EOF { + s.log.Printf("udp conn read udp packet fail , err: %s ", err) + } + return + } + uc.touchtime = time.Now().Unix() + go s.sc.UDPListener.WriteToUDP(body, uc.srcAddr) } }() } diff --git a/services/service.go b/services/service.go index d126642..1be1fce 100644 --- a/services/service.go +++ b/services/service.go @@ -39,6 +39,7 @@ func GetService(name string) *ServiceItem { func Stop(name string) { if s, ok := servicesMap.Load(name); ok && s.(*ServiceItem).S != nil { s.(*ServiceItem).S.Clean() + servicesMap.Delete(name) } } func Run(name string, args interface{}) (service *ServiceItem, err error) { diff --git a/services/sps/sps.go b/services/sps/sps.go index 343f69b..1da298b 100644 --- a/services/sps/sps.go +++ b/services/sps/sps.go @@ -345,7 +345,10 @@ func (s *SPS) OutToTCP(inConn *net.Conn) (err error) { isHTTPS = true pb.Write([]byte(fmt.Sprintf("CONNECT %s HTTP/1.1\r\n", address))) } + pb.WriteString(fmt.Sprintf("Host: %s\r\n", address)) + pb.WriteString(fmt.Sprintf("Proxy-Host: %s\r\n", address)) pb.WriteString("Proxy-Connection: Keep-Alive\r\n") + pb.WriteString("Connection: Keep-Alive\r\n") u := "" if *s.cfg.ParentAuth != "" { diff --git a/utils/functions.go b/utils/functions.go index 9cca81a..28064c2 100755 --- a/utils/functions.go +++ b/utils/functions.go @@ -585,6 +585,26 @@ func RemoveProxyHeaders(head []byte) []byte { func InsertProxyHeaders(head []byte, headers string) []byte { return bytes.Replace(head, []byte("\r\n"), []byte("\r\n"+headers), 1) } +func IsNetClosedErr(err error) bool { + return err != nil && strings.Contains(err.Error(), "use of closed network connection") +} +func IsNetTimeoutErr(err error) bool { + if err == nil { + return false + } + e, ok := err.(net.Error) + return ok && e.Timeout() +} +func IsNetDeadlineErr(err error) bool { + return err != nil && strings.Contains(err.Error(), "i/o deadline reached") +} + +func IsNetRefusedErr(err error) bool { + return err != nil && strings.Contains(err.Error(), "connection refused") +} +func IsNetSocketNotConnectedErr(err error) bool { + return err != nil && strings.Contains(err.Error(), "socket is not connected") +} // type sockaddr struct { // family uint16 diff --git a/utils/jumper/jumper.go b/utils/jumper/jumper.go index e66c85a..11fe4eb 100644 --- a/utils/jumper/jumper.go +++ b/utils/jumper/jumper.go @@ -51,7 +51,10 @@ func (j *Jumper) dialHTTPS(address string, timeout time.Duration) (conn net.Conn } pb := new(bytes.Buffer) pb.Write([]byte(fmt.Sprintf("CONNECT %s HTTP/1.1\r\n", address))) + pb.WriteString(fmt.Sprintf("Host: %s\r\n", address)) + pb.WriteString(fmt.Sprintf("Proxy-Host: %s\r\n", address)) pb.WriteString("Proxy-Connection: Keep-Alive\r\n") + pb.WriteString("Connection: Keep-Alive\r\n") if j.proxyURL.User != nil { p, _ := j.proxyURL.User.Password() u := fmt.Sprintf("%s:%s", j.proxyURL.User.Username(), p)