From 02513a94497528c978a83f0a678ae3b9cbba9e9d Mon Sep 17 00:00:00 2001 From: "arraykeys@gmail.com" Date: Wed, 19 Sep 2018 11:34:34 +0800 Subject: [PATCH] =?UTF-8?q?fix=20#146=20server=20channel=E7=BB=9F=E4=B8=80?= =?UTF-8?q?=E4=BD=BF=E7=94=A8core?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README_ZH.md | 4 +- core/cs/server/server.go | 10 +- services/http/http.go | 9 +- services/mux/mux_server.go | 7 +- services/socks/socks.go | 15 +- services/sps/sps.go | 9 +- services/tcp/tcp.go | 7 +- services/tunnel/tunnel_bridge.go | 5 +- services/tunnel/tunnel_server.go | 13 +- services/udp/udp.go | 17 +-- utils/serve-channel.go | 230 ------------------------------- 11 files changed, 52 insertions(+), 274 deletions(-) delete mode 100644 utils/serve-channel.go diff --git a/README_ZH.md b/README_ZH.md index cd0611e..86c687e 100644 --- a/README_ZH.md +++ b/README_ZH.md @@ -525,7 +525,7 @@ HTTP(S)代理支持上级负载均衡,多个上级重复-P参数即可. #### **1.14.3 使用目标地址选择上级** -`proxy http --lb-hashtarget --lb-method=leasttime -T tcp -P 1.1.1.1:33080 -P 2.1.1.1:33080 -P 3.1.1.1:33080 -t tcp -p :33080` +`proxy http --lb-hashtarget --lb-method=hash -T tcp -P 1.1.1.1:33080 -P 2.1.1.1:33080 -P 3.1.1.1:33080 -t tcp -p :33080` ### **1.15 限速** @@ -991,7 +991,7 @@ SOCKS代理支持上级负载均衡,多个上级重复-P参数即可. #### **5.12.3 使用目标地址选择上级** -`proxy socks --lb-hashtarget --lb-method=leasttime -T tcp -P 1.1.1.1:33080 -P 2.1.1.1:33080 -P 3.1.1.1:33080 -p :33080 -t tcp` +`proxy socks --lb-hashtarget --lb-method=hash -T tcp -P 1.1.1.1:33080 -P 2.1.1.1:33080 -P 3.1.1.1:33080 -p :33080 -t tcp` #### **5.13 限速** diff --git a/core/cs/server/server.go b/core/cs/server/server.go index 2a3a25d..fd83c75 100644 --- a/core/cs/server/server.go +++ b/core/cs/server/server.go @@ -120,7 +120,7 @@ func (s *ServerChannel) listenTLS(ip string, port int, certBytes, keyBytes, caCe config.ClientCAs = clientCertPool config.ClientAuth = tls.RequireAndVerifyClientCert } - _ln, err := tls.Listen("tcp", fmt.Sprintf("%s:%d", ip, port), config) + _ln, err := tls.Listen("tcp", net.JoinHostPort(ip, fmt.Sprintf("%d", port)), config) if err == nil { ln = &_ln } @@ -141,7 +141,7 @@ func (s *ServerChannel) ListenTCPS(method, password string, compress bool, fn fu } func (s *ServerChannel) ListenTCP(fn func(conn net.Conn)) (err error) { var l net.Listener - l, err = net.Listen("tcp", fmt.Sprintf("%s:%d", s.ip, s.port)) + l, err = net.Listen("tcp", net.JoinHostPort(s.ip, fmt.Sprintf("%d", s.port))) if err == nil { s.Listener = &l go func() { @@ -172,7 +172,7 @@ func (s *ServerChannel) ListenTCP(fn func(conn net.Conn)) (err error) { } return } -func (s *ServerChannel) ListenUDP(fn func(packet []byte, localAddr, srcAddr *net.UDPAddr)) (err error) { +func (s *ServerChannel) ListenUDP(fn func(listener *net.UDPConn, packet []byte, localAddr, srcAddr *net.UDPAddr)) (err error) { addr := &net.UDPAddr{IP: net.ParseIP(s.ip), Port: s.port} l, err := net.ListenUDP("udp", addr) if err == nil { @@ -194,7 +194,7 @@ func (s *ServerChannel) ListenUDP(fn func(packet []byte, localAddr, srcAddr *net s.log.Printf("udp data handler crashed , err : %s , \ntrace:%s", e, string(debug.Stack())) } }() - fn(packet, addr, srcAddr) + fn(s.UDPListener, packet, addr, srcAddr) }() } else { s.errAcceptHandler(err) @@ -207,7 +207,7 @@ func (s *ServerChannel) ListenUDP(fn func(packet []byte, localAddr, srcAddr *net return } func (s *ServerChannel) ListenKCP(config kcpcfg.KCPConfigArgs, fn func(conn net.Conn), log *logger.Logger) (err error) { - lis, err := kcp.ListenWithOptions(fmt.Sprintf("%s:%d", s.ip, s.port), config.Block, *config.DataShard, *config.ParityShard) + lis, err := kcp.ListenWithOptions(net.JoinHostPort(s.ip, fmt.Sprintf("%d", s.port)), config.Block, *config.DataShard, *config.ParityShard) if err == nil { if err = lis.SetDSCP(*config.DSCP); err != nil { log.Println("SetDSCP:", err) diff --git a/services/http/http.go b/services/http/http.go index a2dd03a..5e5008a 100644 --- a/services/http/http.go +++ b/services/http/http.go @@ -12,6 +12,7 @@ import ( "strings" "time" + server "github.com/snail007/goproxy/core/cs/server" "github.com/snail007/goproxy/core/lib/kcpcfg" "github.com/snail007/goproxy/services" "github.com/snail007/goproxy/utils/datasize" @@ -83,7 +84,7 @@ type HTTP struct { lockChn chan bool domainResolver dnsx.DomainResolver isStop bool - serverChannels []*utils.ServerChannel + serverChannels []*server.ServerChannel userConns mapx.ConcurrentMap log *logger.Logger lb *lb.Group @@ -96,7 +97,7 @@ func NewHTTP() services.Service { basicAuth: utils.BasicAuth{}, lockChn: make(chan bool, 1), isStop: false, - serverChannels: []*utils.ServerChannel{}, + serverChannels: []*server.ServerChannel{}, userConns: mapx.NewConcurrentMap(), } } @@ -273,11 +274,11 @@ func (s *HTTP) Start(args interface{}, log *logger.Logger) (err error) { if addr != "" { host, port, _ := net.SplitHostPort(addr) p, _ := strconv.Atoi(port) - sc := utils.NewServerChannel(host, p, s.log) + sc := server.NewServerChannel(host, p, s.log) if *s.cfg.LocalType == "tcp" { err = sc.ListenTCP(s.callback) } else if *s.cfg.LocalType == "tls" { - err = sc.ListenTls(s.cfg.CertBytes, s.cfg.KeyBytes, s.cfg.CaCertBytes, s.callback) + err = sc.ListenTLS(s.cfg.CertBytes, s.cfg.KeyBytes, s.cfg.CaCertBytes, s.callback) } else if *s.cfg.LocalType == "kcp" { err = sc.ListenKCP(s.cfg.KCP, s.callback, s.log) } diff --git a/services/mux/mux_server.go b/services/mux/mux_server.go index caa1bc5..5d5fb2f 100644 --- a/services/mux/mux_server.go +++ b/services/mux/mux_server.go @@ -13,6 +13,7 @@ import ( "time" clienttransport "github.com/snail007/goproxy/core/cs/client" + server "github.com/snail007/goproxy/core/cs/server" "github.com/snail007/goproxy/core/lib/kcpcfg" encryptconn "github.com/snail007/goproxy/core/lib/transport/encrypt" "github.com/snail007/goproxy/services" @@ -56,7 +57,7 @@ type MuxServerArgs struct { } type MuxServer struct { cfg MuxServerArgs - sc utils.ServerChannel + sc server.ServerChannel sessions mapx.ConcurrentMap lockChn chan bool isStop bool @@ -212,7 +213,7 @@ func (s *MuxServer) StopService() { s.jumper = nil s.lockChn = nil s.log = nil - s.sc = utils.ServerChannel{} + s.sc = server.ServerChannel{} s.sessions = nil s.udpConns = nil s = nil @@ -264,7 +265,7 @@ func (s *MuxServer) Start(args interface{}, log *logger.Logger) (err error) { } host, port, _ := net.SplitHostPort(*s.cfg.Local) p, _ := strconv.Atoi(port) - s.sc = utils.NewServerChannel(host, p, s.log) + s.sc = server.NewServerChannel(host, p, s.log) if *s.cfg.IsUDP { err = s.sc.ListenUDP(func(listener *net.UDPConn, packet []byte, localAddr, srcAddr *net.UDPAddr) { s.UDPSend(packet, localAddr, srcAddr) diff --git a/services/socks/socks.go b/services/socks/socks.go index 14abee8..5f84431 100644 --- a/services/socks/socks.go +++ b/services/socks/socks.go @@ -12,8 +12,9 @@ import ( "strings" "time" - "github.com/snail007/goproxy/services" + server "github.com/snail007/goproxy/core/cs/server" "github.com/snail007/goproxy/core/lib/kcpcfg" + "github.com/snail007/goproxy/services" "github.com/snail007/goproxy/utils" "github.com/snail007/goproxy/utils/conncrypt" "github.com/snail007/goproxy/utils/datasize" @@ -80,8 +81,8 @@ type Socks struct { basicAuth utils.BasicAuth sshClient *ssh.Client lockChn chan bool - udpSC utils.ServerChannel - sc *utils.ServerChannel + udpSC server.ServerChannel + sc *server.ServerChannel domainResolver dnsx.DomainResolver isStop bool userConns mapx.ConcurrentMap @@ -191,7 +192,7 @@ func (s *Socks) InitService() (err error) { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:",e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) } }() //循环检查ssh网络连通性 @@ -243,7 +244,7 @@ func (s *Socks) StopService() { s.udpLocalKey = nil s.udpParentKey = nil s.udpRelatedPacketConns = nil - s.udpSC = utils.ServerChannel{} + s.udpSC = server.ServerChannel{} s.userConns = nil s = nil }() @@ -283,11 +284,11 @@ func (s *Socks) Start(args interface{}, log *logger.Logger) (err error) { if len(*s.cfg.Parent) > 0 { s.log.Printf("use %s parent %v [ %s ]", *s.cfg.ParentType, *s.cfg.Parent, strings.ToUpper(*s.cfg.LoadBalanceMethod)) } - sc := utils.NewServerChannelHost(*s.cfg.Local, s.log) + sc := server.NewServerChannelHost(*s.cfg.Local, s.log) if *s.cfg.LocalType == "tcp" { err = sc.ListenTCP(s.socksConnCallback) } else if *s.cfg.LocalType == "tls" { - err = sc.ListenTls(s.cfg.CertBytes, s.cfg.KeyBytes, s.cfg.CaCertBytes, s.socksConnCallback) + err = sc.ListenTLS(s.cfg.CertBytes, s.cfg.KeyBytes, s.cfg.CaCertBytes, s.socksConnCallback) } else if *s.cfg.LocalType == "kcp" { err = sc.ListenKCP(s.cfg.KCP, s.socksConnCallback, s.log) } diff --git a/services/sps/sps.go b/services/sps/sps.go index 589cbeb..b1e5c91 100644 --- a/services/sps/sps.go +++ b/services/sps/sps.go @@ -15,6 +15,7 @@ import ( "strings" "time" + "github.com/snail007/goproxy/core/cs/server" "github.com/snail007/goproxy/core/lib/kcpcfg" "github.com/snail007/goproxy/services" "github.com/snail007/goproxy/utils" @@ -78,7 +79,7 @@ type SPS struct { cfg SPSArgs domainResolver dnsx.DomainResolver basicAuth utils.BasicAuth - serverChannels []*utils.ServerChannel + serverChannels []*server.ServerChannel userConns mapx.ConcurrentMap log *logger.Logger localCipher *ss.Cipher @@ -93,7 +94,7 @@ func NewSPS() services.Service { return &SPS{ cfg: SPSArgs{}, basicAuth: utils.BasicAuth{}, - serverChannels: []*utils.ServerChannel{}, + serverChannels: []*server.ServerChannel{}, userConns: mapx.NewConcurrentMap(), udpRelatedPacketConns: mapx.NewConcurrentMap(), } @@ -230,12 +231,12 @@ func (s *SPS) Start(args interface{}, log *logger.Logger) (err error) { if addr != "" { host, port, _ := net.SplitHostPort(addr) p, _ := strconv.Atoi(port) - sc := utils.NewServerChannel(host, p, s.log) + sc := server.NewServerChannel(host, p, s.log) s.serverChannels = append(s.serverChannels, &sc) if *s.cfg.LocalType == "tcp" { err = sc.ListenTCP(s.callback) } else if *s.cfg.LocalType == "tls" { - err = sc.ListenTls(s.cfg.CertBytes, s.cfg.KeyBytes, s.cfg.CaCertBytes, s.callback) + err = sc.ListenTLS(s.cfg.CertBytes, s.cfg.KeyBytes, s.cfg.CaCertBytes, s.callback) } else if *s.cfg.LocalType == "tcp" { err = sc.ListenKCP(s.cfg.KCP, s.callback, s.log) } diff --git a/services/tcp/tcp.go b/services/tcp/tcp.go index 58c0c5a..c6ae279 100644 --- a/services/tcp/tcp.go +++ b/services/tcp/tcp.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "github.com/snail007/goproxy/core/cs/server" "github.com/snail007/goproxy/core/lib/kcpcfg" "github.com/snail007/goproxy/services" "github.com/snail007/goproxy/utils" @@ -43,7 +44,7 @@ type UDPConnItem struct { } type TCP struct { cfg TCPArgs - sc *utils.ServerChannel + sc *server.ServerChannel isStop bool userConns mapx.ConcurrentMap log *logger.Logger @@ -131,12 +132,12 @@ func (s *TCP) Start(args interface{}, log *logger.Logger) (err error) { s.log.Printf("use %s parent %v", *s.cfg.ParentType, *s.cfg.Parent) host, port, _ := net.SplitHostPort(*s.cfg.Local) p, _ := strconv.Atoi(port) - sc := utils.NewServerChannel(host, p, s.log) + sc := server.NewServerChannel(host, p, s.log) if *s.cfg.LocalType == "tcp" { err = sc.ListenTCP(s.callback) } else if *s.cfg.LocalType == "tls" { - err = sc.ListenTls(s.cfg.CertBytes, s.cfg.KeyBytes, nil, s.callback) + err = sc.ListenTLS(s.cfg.CertBytes, s.cfg.KeyBytes, nil, s.callback) } else if *s.cfg.LocalType == "kcp" { err = sc.ListenKCP(s.cfg.KCP, s.callback, s.log) } diff --git a/services/tunnel/tunnel_bridge.go b/services/tunnel/tunnel_bridge.go index 6dbc21b..5cdc7f3 100644 --- a/services/tunnel/tunnel_bridge.go +++ b/services/tunnel/tunnel_bridge.go @@ -10,6 +10,7 @@ import ( "strings" "time" + "github.com/snail007/goproxy/core/cs/server" "github.com/snail007/goproxy/services" "github.com/snail007/goproxy/utils" "github.com/snail007/goproxy/utils/mapx" @@ -98,9 +99,9 @@ func (s *TunnelBridge) Start(args interface{}, log *logger.Logger) (err error) { } host, port, _ := net.SplitHostPort(*s.cfg.Local) p, _ := strconv.Atoi(port) - sc := utils.NewServerChannel(host, p, s.log) + sc := server.NewServerChannel(host, p, s.log) - err = sc.ListenTls(s.cfg.CertBytes, s.cfg.KeyBytes, nil, s.callback) + err = sc.ListenTLS(s.cfg.CertBytes, s.cfg.KeyBytes, nil, s.callback) if err != nil { return } diff --git a/services/tunnel/tunnel_server.go b/services/tunnel/tunnel_server.go index 32fc8f0..57995a7 100644 --- a/services/tunnel/tunnel_server.go +++ b/services/tunnel/tunnel_server.go @@ -12,6 +12,7 @@ import ( "strings" "time" + "github.com/snail007/goproxy/core/cs/server" "github.com/snail007/goproxy/services" "github.com/snail007/goproxy/utils" "github.com/snail007/goproxy/utils/jumper" @@ -38,7 +39,7 @@ type TunnelServerArgs struct { } type TunnelServer struct { cfg TunnelServerArgs - sc utils.ServerChannel + sc server.ServerChannel isStop bool udpConn *net.Conn userConns mapx.ConcurrentMap @@ -180,7 +181,7 @@ func (s *TunnelServer) StopService() { s.cfg = TunnelServerArgs{} s.jumper = nil s.log = nil - s.sc = utils.ServerChannel{} + s.sc = server.ServerChannel{} s.udpConn = nil s.udpConns = nil s.userConns = nil @@ -233,7 +234,7 @@ func (s *TunnelServer) Start(args interface{}, log *logger.Logger) (err error) { } host, port, _ := net.SplitHostPort(*s.cfg.Local) p, _ := strconv.Atoi(port) - s.sc = utils.NewServerChannel(host, p, s.log) + s.sc = server.NewServerChannel(host, p, s.log) if *s.cfg.IsUDP { err = s.sc.ListenUDP(func(listener *net.UDPConn, packet []byte, localAddr, srcAddr *net.UDPAddr) { s.UDPSend(packet, localAddr, srcAddr) @@ -368,7 +369,7 @@ func (s *TunnelServer) UDPGCDeamon() { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:",e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) } }() if s.isStop { @@ -439,7 +440,7 @@ func (s *TunnelServer) UDPRevecive(key, ID string) { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:",e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) } }() s.log.Printf("udp conn %s connected", ID) @@ -472,7 +473,7 @@ func (s *TunnelServer) UDPRevecive(key, ID string) { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:",e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) } }() s.sc.UDPListener.WriteToUDP(body, uc.srcAddr) diff --git a/services/udp/udp.go b/services/udp/udp.go index 9ac55e9..b50f885 100644 --- a/services/udp/udp.go +++ b/services/udp/udp.go @@ -11,6 +11,7 @@ import ( "strings" "time" + "github.com/snail007/goproxy/core/cs/server" "github.com/snail007/goproxy/services" "github.com/snail007/goproxy/utils" "github.com/snail007/goproxy/utils/mapx" @@ -30,7 +31,7 @@ type UDPArgs struct { type UDP struct { p mapx.ConcurrentMap cfg UDPArgs - sc *utils.ServerChannel + sc *server.ServerChannel isStop bool log *logger.Logger outUDPConnCtxMap mapx.ConcurrentMap @@ -121,7 +122,7 @@ func (s *UDP) Start(args interface{}, log *logger.Logger) (err error) { } host, port, _ := net.SplitHostPort(*s.cfg.Local) p, _ := strconv.Atoi(port) - sc := utils.NewServerChannel(host, p, s.log) + sc := server.NewServerChannel(host, p, s.log) s.sc = &sc err = sc.ListenUDP(s.callback) if err != nil { @@ -173,7 +174,7 @@ func (s *UDP) OutToUDPGCDeamon() { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:",e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) } }() if s.isStop { @@ -215,7 +216,7 @@ func (s *UDP) OutToUDP(packet []byte, localAddr, srcAddr *net.UDPAddr) { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:",e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) } }() s.log.Printf("udp conn %s <--> %s connected", srcAddr.String(), localAddr.String()) @@ -237,7 +238,7 @@ func (s *UDP) OutToUDP(packet []byte, localAddr, srcAddr *net.UDPAddr) { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:",e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) } }() (*(s.sc).UDPListener).SetWriteDeadline(time.Now().Add(time.Millisecond * time.Duration(*s.cfg.Timeout))) @@ -274,7 +275,7 @@ func (s *UDP) UDPGCDeamon() { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:",e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) } }() if s.isStop { @@ -351,7 +352,7 @@ func (s *UDP) UDPRevecive(key string) { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:",e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) } }() s.log.Printf("udp conn %s connected", key) @@ -384,7 +385,7 @@ func (s *UDP) UDPRevecive(key string) { go func() { defer func() { if e := recover(); e != nil { - fmt.Printf("crashed, err: %s\nstack:",e, string(debug.Stack())) + fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack())) } }() s.sc.UDPListener.WriteToUDP(body, uc.srcAddr) diff --git a/utils/serve-channel.go b/utils/serve-channel.go deleted file mode 100644 index 84bb578..0000000 --- a/utils/serve-channel.go +++ /dev/null @@ -1,230 +0,0 @@ -package utils - -import ( - "crypto/tls" - "crypto/x509" - "errors" - "fmt" - logger "log" - "net" - "runtime/debug" - "strconv" - - "github.com/snail007/goproxy/core/lib/kcpcfg" - - kcp "github.com/xtaci/kcp-go" -) - -type ServerChannel struct { - ip string - port int - Listener *net.Listener - UDPListener *net.UDPConn - errAcceptHandler func(err error) - log *logger.Logger -} - -func NewServerChannel(ip string, port int, log *logger.Logger) ServerChannel { - return ServerChannel{ - ip: ip, - port: port, - log: log, - errAcceptHandler: func(err error) { - log.Printf("accept error , ERR:%s", err) - }, - } -} -func NewServerChannelHost(host string, log *logger.Logger) ServerChannel { - h, port, _ := net.SplitHostPort(host) - p, _ := strconv.Atoi(port) - return ServerChannel{ - ip: h, - port: p, - log: log, - errAcceptHandler: func(err error) { - log.Printf("accept error , ERR:%s", err) - }, - } -} -func (sc *ServerChannel) SetErrAcceptHandler(fn func(err error)) { - sc.errAcceptHandler = fn -} -func (sc *ServerChannel) ListenTls(certBytes, keyBytes, caCertBytes []byte, fn func(conn net.Conn)) (err error) { - sc.Listener, err = sc.listenTls(sc.ip, sc.port, certBytes, keyBytes, caCertBytes) - if err == nil { - go func() { - defer func() { - if e := recover(); e != nil { - sc.log.Printf("ListenTls crashed , err : %s , \ntrace:%s", e, string(debug.Stack())) - } - }() - for { - var conn net.Conn - conn, err = (*sc.Listener).Accept() - if err == nil { - go func() { - defer func() { - if e := recover(); e != nil { - sc.log.Printf("tls connection handler crashed , err : %s , \ntrace:%s", e, string(debug.Stack())) - } - }() - fn(conn) - }() - } else { - sc.errAcceptHandler(err) - (*sc.Listener).Close() - break - } - } - }() - } - return -} -func (sc *ServerChannel) listenTls(ip string, port int, certBytes, keyBytes, caCertBytes []byte) (ln *net.Listener, err error) { - - var cert tls.Certificate - cert, err = tls.X509KeyPair(certBytes, keyBytes) - if err != nil { - return - } - clientCertPool := x509.NewCertPool() - caBytes := certBytes - if caCertBytes != nil { - caBytes = caCertBytes - } - ok := clientCertPool.AppendCertsFromPEM(caBytes) - if !ok { - err = errors.New("failed to parse root certificate") - } - config := &tls.Config{ - ClientCAs: clientCertPool, - Certificates: []tls.Certificate{cert}, - ClientAuth: tls.RequireAndVerifyClientCert, - } - _ln, err := tls.Listen("tcp", fmt.Sprintf("%s:%d", ip, port), config) - if err == nil { - ln = &_ln - } - return -} -func (sc *ServerChannel) ListenTCP(fn func(conn net.Conn)) (err error) { - var l net.Listener - l, err = net.Listen("tcp", fmt.Sprintf("%s:%d", sc.ip, sc.port)) - if err == nil { - sc.Listener = &l - go func() { - defer func() { - if e := recover(); e != nil { - sc.log.Printf("ListenTCP crashed , err : %s , \ntrace:%s", e, string(debug.Stack())) - } - }() - for { - var conn net.Conn - conn, err = (*sc.Listener).Accept() - if err == nil { - go func() { - defer func() { - if e := recover(); e != nil { - sc.log.Printf("tcp connection handler crashed , err : %s , \ntrace:%s", e, string(debug.Stack())) - } - }() - fn(conn) - }() - } else { - sc.errAcceptHandler(err) - break - } - } - }() - } - return -} -func (sc *ServerChannel) ListenUDP(fn func(listener *net.UDPConn, packet []byte, localAddr, srcAddr *net.UDPAddr)) (err error) { - addr := &net.UDPAddr{IP: net.ParseIP(sc.ip), Port: sc.port} - l, err := net.ListenUDP("udp", addr) - if err == nil { - sc.UDPListener = l - go func() { - defer func() { - if e := recover(); e != nil { - sc.log.Printf("ListenUDP crashed , err : %s , \ntrace:%s", e, string(debug.Stack())) - } - }() - for { - var buf = make([]byte, 2048) - n, srcAddr, err := (*sc.UDPListener).ReadFromUDP(buf) - if err == nil { - packet := buf[0:n] - go func() { - defer func() { - if e := recover(); e != nil { - sc.log.Printf("udp data handler crashed , err : %s , \ntrace:%s", e, string(debug.Stack())) - } - }() - fn(l, packet, addr, srcAddr) - }() - } else { - sc.errAcceptHandler(err) - break - } - } - }() - } - return -} - -func (sc *ServerChannel) ListenKCP(config kcpcfg.KCPConfigArgs, fn func(conn net.Conn), log *logger.Logger) (err error) { - lis, err := kcp.ListenWithOptions(fmt.Sprintf("%s:%d", sc.ip, sc.port), config.Block, *config.DataShard, *config.ParityShard) - if err == nil { - if err = lis.SetDSCP(*config.DSCP); err != nil { - log.Println("SetDSCP:", err) - return - } - if err = lis.SetReadBuffer(*config.SockBuf); err != nil { - log.Println("SetReadBuffer:", err) - return - } - if err = lis.SetWriteBuffer(*config.SockBuf); err != nil { - log.Println("SetWriteBuffer:", err) - return - } - sc.Listener = new(net.Listener) - *sc.Listener = lis - go func() { - defer func() { - if e := recover(); e != nil { - sc.log.Printf("ListenKCP crashed , err : %s , \ntrace:%s", e, string(debug.Stack())) - } - }() - for { - //var conn net.Conn - conn, err := lis.AcceptKCP() - if err == nil { - go func() { - defer func() { - if e := recover(); e != nil { - sc.log.Printf("kcp connection handler crashed , err : %s , \ntrace:%s", e, string(debug.Stack())) - } - }() - conn.SetStreamMode(true) - conn.SetWriteDelay(true) - conn.SetNoDelay(*config.NoDelay, *config.Interval, *config.Resend, *config.NoCongestion) - conn.SetMtu(*config.MTU) - conn.SetWindowSize(*config.SndWnd, *config.RcvWnd) - conn.SetACKNoDelay(*config.AckNodelay) - if *config.NoComp { - fn(conn) - } else { - cconn := NewCompStream(conn) - fn(cconn) - } - }() - } else { - sc.errAcceptHandler(err) - break - } - } - }() - } - return -}