server channel统一使用core
This commit is contained in:
arraykeys@gmail.com
2018-09-19 11:34:34 +08:00
parent 0859452475
commit 02513a9449
11 changed files with 52 additions and 274 deletions

View File

@ -525,7 +525,7 @@ HTTP(S)代理支持上级负载均衡,多个上级重复-P参数即可.
#### **1.14.3 使用目标地址选择上级**
`proxy http --lb-hashtarget --lb-method=leasttime -T tcp -P 1.1.1.1:33080 -P 2.1.1.1:33080 -P 3.1.1.1:33080 -t tcp -p :33080`
`proxy http --lb-hashtarget --lb-method=hash -T tcp -P 1.1.1.1:33080 -P 2.1.1.1:33080 -P 3.1.1.1:33080 -t tcp -p :33080`
### **1.15 限速**
@ -991,7 +991,7 @@ SOCKS代理支持上级负载均衡,多个上级重复-P参数即可.
#### **5.12.3 使用目标地址选择上级**
`proxy socks --lb-hashtarget --lb-method=leasttime -T tcp -P 1.1.1.1:33080 -P 2.1.1.1:33080 -P 3.1.1.1:33080 -p :33080 -t tcp`
`proxy socks --lb-hashtarget --lb-method=hash -T tcp -P 1.1.1.1:33080 -P 2.1.1.1:33080 -P 3.1.1.1:33080 -p :33080 -t tcp`
#### **5.13 限速**

View File

@ -120,7 +120,7 @@ func (s *ServerChannel) listenTLS(ip string, port int, certBytes, keyBytes, caCe
config.ClientCAs = clientCertPool
config.ClientAuth = tls.RequireAndVerifyClientCert
}
_ln, err := tls.Listen("tcp", fmt.Sprintf("%s:%d", ip, port), config)
_ln, err := tls.Listen("tcp", net.JoinHostPort(ip, fmt.Sprintf("%d", port)), config)
if err == nil {
ln = &_ln
}
@ -141,7 +141,7 @@ func (s *ServerChannel) ListenTCPS(method, password string, compress bool, fn fu
}
func (s *ServerChannel) ListenTCP(fn func(conn net.Conn)) (err error) {
var l net.Listener
l, err = net.Listen("tcp", fmt.Sprintf("%s:%d", s.ip, s.port))
l, err = net.Listen("tcp", net.JoinHostPort(s.ip, fmt.Sprintf("%d", s.port)))
if err == nil {
s.Listener = &l
go func() {
@ -172,7 +172,7 @@ func (s *ServerChannel) ListenTCP(fn func(conn net.Conn)) (err error) {
}
return
}
func (s *ServerChannel) ListenUDP(fn func(packet []byte, localAddr, srcAddr *net.UDPAddr)) (err error) {
func (s *ServerChannel) ListenUDP(fn func(listener *net.UDPConn, packet []byte, localAddr, srcAddr *net.UDPAddr)) (err error) {
addr := &net.UDPAddr{IP: net.ParseIP(s.ip), Port: s.port}
l, err := net.ListenUDP("udp", addr)
if err == nil {
@ -194,7 +194,7 @@ func (s *ServerChannel) ListenUDP(fn func(packet []byte, localAddr, srcAddr *net
s.log.Printf("udp data handler crashed , err : %s , \ntrace:%s", e, string(debug.Stack()))
}
}()
fn(packet, addr, srcAddr)
fn(s.UDPListener, packet, addr, srcAddr)
}()
} else {
s.errAcceptHandler(err)
@ -207,7 +207,7 @@ func (s *ServerChannel) ListenUDP(fn func(packet []byte, localAddr, srcAddr *net
return
}
func (s *ServerChannel) ListenKCP(config kcpcfg.KCPConfigArgs, fn func(conn net.Conn), log *logger.Logger) (err error) {
lis, err := kcp.ListenWithOptions(fmt.Sprintf("%s:%d", s.ip, s.port), config.Block, *config.DataShard, *config.ParityShard)
lis, err := kcp.ListenWithOptions(net.JoinHostPort(s.ip, fmt.Sprintf("%d", s.port)), config.Block, *config.DataShard, *config.ParityShard)
if err == nil {
if err = lis.SetDSCP(*config.DSCP); err != nil {
log.Println("SetDSCP:", err)

View File

@ -12,6 +12,7 @@ import (
"strings"
"time"
server "github.com/snail007/goproxy/core/cs/server"
"github.com/snail007/goproxy/core/lib/kcpcfg"
"github.com/snail007/goproxy/services"
"github.com/snail007/goproxy/utils/datasize"
@ -83,7 +84,7 @@ type HTTP struct {
lockChn chan bool
domainResolver dnsx.DomainResolver
isStop bool
serverChannels []*utils.ServerChannel
serverChannels []*server.ServerChannel
userConns mapx.ConcurrentMap
log *logger.Logger
lb *lb.Group
@ -96,7 +97,7 @@ func NewHTTP() services.Service {
basicAuth: utils.BasicAuth{},
lockChn: make(chan bool, 1),
isStop: false,
serverChannels: []*utils.ServerChannel{},
serverChannels: []*server.ServerChannel{},
userConns: mapx.NewConcurrentMap(),
}
}
@ -273,11 +274,11 @@ func (s *HTTP) Start(args interface{}, log *logger.Logger) (err error) {
if addr != "" {
host, port, _ := net.SplitHostPort(addr)
p, _ := strconv.Atoi(port)
sc := utils.NewServerChannel(host, p, s.log)
sc := server.NewServerChannel(host, p, s.log)
if *s.cfg.LocalType == "tcp" {
err = sc.ListenTCP(s.callback)
} else if *s.cfg.LocalType == "tls" {
err = sc.ListenTls(s.cfg.CertBytes, s.cfg.KeyBytes, s.cfg.CaCertBytes, s.callback)
err = sc.ListenTLS(s.cfg.CertBytes, s.cfg.KeyBytes, s.cfg.CaCertBytes, s.callback)
} else if *s.cfg.LocalType == "kcp" {
err = sc.ListenKCP(s.cfg.KCP, s.callback, s.log)
}

View File

@ -13,6 +13,7 @@ import (
"time"
clienttransport "github.com/snail007/goproxy/core/cs/client"
server "github.com/snail007/goproxy/core/cs/server"
"github.com/snail007/goproxy/core/lib/kcpcfg"
encryptconn "github.com/snail007/goproxy/core/lib/transport/encrypt"
"github.com/snail007/goproxy/services"
@ -56,7 +57,7 @@ type MuxServerArgs struct {
}
type MuxServer struct {
cfg MuxServerArgs
sc utils.ServerChannel
sc server.ServerChannel
sessions mapx.ConcurrentMap
lockChn chan bool
isStop bool
@ -212,7 +213,7 @@ func (s *MuxServer) StopService() {
s.jumper = nil
s.lockChn = nil
s.log = nil
s.sc = utils.ServerChannel{}
s.sc = server.ServerChannel{}
s.sessions = nil
s.udpConns = nil
s = nil
@ -264,7 +265,7 @@ func (s *MuxServer) Start(args interface{}, log *logger.Logger) (err error) {
}
host, port, _ := net.SplitHostPort(*s.cfg.Local)
p, _ := strconv.Atoi(port)
s.sc = utils.NewServerChannel(host, p, s.log)
s.sc = server.NewServerChannel(host, p, s.log)
if *s.cfg.IsUDP {
err = s.sc.ListenUDP(func(listener *net.UDPConn, packet []byte, localAddr, srcAddr *net.UDPAddr) {
s.UDPSend(packet, localAddr, srcAddr)

View File

@ -12,8 +12,9 @@ import (
"strings"
"time"
"github.com/snail007/goproxy/services"
server "github.com/snail007/goproxy/core/cs/server"
"github.com/snail007/goproxy/core/lib/kcpcfg"
"github.com/snail007/goproxy/services"
"github.com/snail007/goproxy/utils"
"github.com/snail007/goproxy/utils/conncrypt"
"github.com/snail007/goproxy/utils/datasize"
@ -80,8 +81,8 @@ type Socks struct {
basicAuth utils.BasicAuth
sshClient *ssh.Client
lockChn chan bool
udpSC utils.ServerChannel
sc *utils.ServerChannel
udpSC server.ServerChannel
sc *server.ServerChannel
domainResolver dnsx.DomainResolver
isStop bool
userConns mapx.ConcurrentMap
@ -243,7 +244,7 @@ func (s *Socks) StopService() {
s.udpLocalKey = nil
s.udpParentKey = nil
s.udpRelatedPacketConns = nil
s.udpSC = utils.ServerChannel{}
s.udpSC = server.ServerChannel{}
s.userConns = nil
s = nil
}()
@ -283,11 +284,11 @@ func (s *Socks) Start(args interface{}, log *logger.Logger) (err error) {
if len(*s.cfg.Parent) > 0 {
s.log.Printf("use %s parent %v [ %s ]", *s.cfg.ParentType, *s.cfg.Parent, strings.ToUpper(*s.cfg.LoadBalanceMethod))
}
sc := utils.NewServerChannelHost(*s.cfg.Local, s.log)
sc := server.NewServerChannelHost(*s.cfg.Local, s.log)
if *s.cfg.LocalType == "tcp" {
err = sc.ListenTCP(s.socksConnCallback)
} else if *s.cfg.LocalType == "tls" {
err = sc.ListenTls(s.cfg.CertBytes, s.cfg.KeyBytes, s.cfg.CaCertBytes, s.socksConnCallback)
err = sc.ListenTLS(s.cfg.CertBytes, s.cfg.KeyBytes, s.cfg.CaCertBytes, s.socksConnCallback)
} else if *s.cfg.LocalType == "kcp" {
err = sc.ListenKCP(s.cfg.KCP, s.socksConnCallback, s.log)
}

View File

@ -15,6 +15,7 @@ import (
"strings"
"time"
"github.com/snail007/goproxy/core/cs/server"
"github.com/snail007/goproxy/core/lib/kcpcfg"
"github.com/snail007/goproxy/services"
"github.com/snail007/goproxy/utils"
@ -78,7 +79,7 @@ type SPS struct {
cfg SPSArgs
domainResolver dnsx.DomainResolver
basicAuth utils.BasicAuth
serverChannels []*utils.ServerChannel
serverChannels []*server.ServerChannel
userConns mapx.ConcurrentMap
log *logger.Logger
localCipher *ss.Cipher
@ -93,7 +94,7 @@ func NewSPS() services.Service {
return &SPS{
cfg: SPSArgs{},
basicAuth: utils.BasicAuth{},
serverChannels: []*utils.ServerChannel{},
serverChannels: []*server.ServerChannel{},
userConns: mapx.NewConcurrentMap(),
udpRelatedPacketConns: mapx.NewConcurrentMap(),
}
@ -230,12 +231,12 @@ func (s *SPS) Start(args interface{}, log *logger.Logger) (err error) {
if addr != "" {
host, port, _ := net.SplitHostPort(addr)
p, _ := strconv.Atoi(port)
sc := utils.NewServerChannel(host, p, s.log)
sc := server.NewServerChannel(host, p, s.log)
s.serverChannels = append(s.serverChannels, &sc)
if *s.cfg.LocalType == "tcp" {
err = sc.ListenTCP(s.callback)
} else if *s.cfg.LocalType == "tls" {
err = sc.ListenTls(s.cfg.CertBytes, s.cfg.KeyBytes, s.cfg.CaCertBytes, s.callback)
err = sc.ListenTLS(s.cfg.CertBytes, s.cfg.KeyBytes, s.cfg.CaCertBytes, s.callback)
} else if *s.cfg.LocalType == "tcp" {
err = sc.ListenKCP(s.cfg.KCP, s.callback, s.log)
}

View File

@ -9,6 +9,7 @@ import (
"strings"
"time"
"github.com/snail007/goproxy/core/cs/server"
"github.com/snail007/goproxy/core/lib/kcpcfg"
"github.com/snail007/goproxy/services"
"github.com/snail007/goproxy/utils"
@ -43,7 +44,7 @@ type UDPConnItem struct {
}
type TCP struct {
cfg TCPArgs
sc *utils.ServerChannel
sc *server.ServerChannel
isStop bool
userConns mapx.ConcurrentMap
log *logger.Logger
@ -131,12 +132,12 @@ func (s *TCP) Start(args interface{}, log *logger.Logger) (err error) {
s.log.Printf("use %s parent %v", *s.cfg.ParentType, *s.cfg.Parent)
host, port, _ := net.SplitHostPort(*s.cfg.Local)
p, _ := strconv.Atoi(port)
sc := utils.NewServerChannel(host, p, s.log)
sc := server.NewServerChannel(host, p, s.log)
if *s.cfg.LocalType == "tcp" {
err = sc.ListenTCP(s.callback)
} else if *s.cfg.LocalType == "tls" {
err = sc.ListenTls(s.cfg.CertBytes, s.cfg.KeyBytes, nil, s.callback)
err = sc.ListenTLS(s.cfg.CertBytes, s.cfg.KeyBytes, nil, s.callback)
} else if *s.cfg.LocalType == "kcp" {
err = sc.ListenKCP(s.cfg.KCP, s.callback, s.log)
}

View File

@ -10,6 +10,7 @@ import (
"strings"
"time"
"github.com/snail007/goproxy/core/cs/server"
"github.com/snail007/goproxy/services"
"github.com/snail007/goproxy/utils"
"github.com/snail007/goproxy/utils/mapx"
@ -98,9 +99,9 @@ func (s *TunnelBridge) Start(args interface{}, log *logger.Logger) (err error) {
}
host, port, _ := net.SplitHostPort(*s.cfg.Local)
p, _ := strconv.Atoi(port)
sc := utils.NewServerChannel(host, p, s.log)
sc := server.NewServerChannel(host, p, s.log)
err = sc.ListenTls(s.cfg.CertBytes, s.cfg.KeyBytes, nil, s.callback)
err = sc.ListenTLS(s.cfg.CertBytes, s.cfg.KeyBytes, nil, s.callback)
if err != nil {
return
}

View File

@ -12,6 +12,7 @@ import (
"strings"
"time"
"github.com/snail007/goproxy/core/cs/server"
"github.com/snail007/goproxy/services"
"github.com/snail007/goproxy/utils"
"github.com/snail007/goproxy/utils/jumper"
@ -38,7 +39,7 @@ type TunnelServerArgs struct {
}
type TunnelServer struct {
cfg TunnelServerArgs
sc utils.ServerChannel
sc server.ServerChannel
isStop bool
udpConn *net.Conn
userConns mapx.ConcurrentMap
@ -180,7 +181,7 @@ func (s *TunnelServer) StopService() {
s.cfg = TunnelServerArgs{}
s.jumper = nil
s.log = nil
s.sc = utils.ServerChannel{}
s.sc = server.ServerChannel{}
s.udpConn = nil
s.udpConns = nil
s.userConns = nil
@ -233,7 +234,7 @@ func (s *TunnelServer) Start(args interface{}, log *logger.Logger) (err error) {
}
host, port, _ := net.SplitHostPort(*s.cfg.Local)
p, _ := strconv.Atoi(port)
s.sc = utils.NewServerChannel(host, p, s.log)
s.sc = server.NewServerChannel(host, p, s.log)
if *s.cfg.IsUDP {
err = s.sc.ListenUDP(func(listener *net.UDPConn, packet []byte, localAddr, srcAddr *net.UDPAddr) {
s.UDPSend(packet, localAddr, srcAddr)

View File

@ -11,6 +11,7 @@ import (
"strings"
"time"
"github.com/snail007/goproxy/core/cs/server"
"github.com/snail007/goproxy/services"
"github.com/snail007/goproxy/utils"
"github.com/snail007/goproxy/utils/mapx"
@ -30,7 +31,7 @@ type UDPArgs struct {
type UDP struct {
p mapx.ConcurrentMap
cfg UDPArgs
sc *utils.ServerChannel
sc *server.ServerChannel
isStop bool
log *logger.Logger
outUDPConnCtxMap mapx.ConcurrentMap
@ -121,7 +122,7 @@ func (s *UDP) Start(args interface{}, log *logger.Logger) (err error) {
}
host, port, _ := net.SplitHostPort(*s.cfg.Local)
p, _ := strconv.Atoi(port)
sc := utils.NewServerChannel(host, p, s.log)
sc := server.NewServerChannel(host, p, s.log)
s.sc = &sc
err = sc.ListenUDP(s.callback)
if err != nil {

View File

@ -1,230 +0,0 @@
package utils
import (
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
logger "log"
"net"
"runtime/debug"
"strconv"
"github.com/snail007/goproxy/core/lib/kcpcfg"
kcp "github.com/xtaci/kcp-go"
)
type ServerChannel struct {
ip string
port int
Listener *net.Listener
UDPListener *net.UDPConn
errAcceptHandler func(err error)
log *logger.Logger
}
func NewServerChannel(ip string, port int, log *logger.Logger) ServerChannel {
return ServerChannel{
ip: ip,
port: port,
log: log,
errAcceptHandler: func(err error) {
log.Printf("accept error , ERR:%s", err)
},
}
}
func NewServerChannelHost(host string, log *logger.Logger) ServerChannel {
h, port, _ := net.SplitHostPort(host)
p, _ := strconv.Atoi(port)
return ServerChannel{
ip: h,
port: p,
log: log,
errAcceptHandler: func(err error) {
log.Printf("accept error , ERR:%s", err)
},
}
}
func (sc *ServerChannel) SetErrAcceptHandler(fn func(err error)) {
sc.errAcceptHandler = fn
}
func (sc *ServerChannel) ListenTls(certBytes, keyBytes, caCertBytes []byte, fn func(conn net.Conn)) (err error) {
sc.Listener, err = sc.listenTls(sc.ip, sc.port, certBytes, keyBytes, caCertBytes)
if err == nil {
go func() {
defer func() {
if e := recover(); e != nil {
sc.log.Printf("ListenTls crashed , err : %s , \ntrace:%s", e, string(debug.Stack()))
}
}()
for {
var conn net.Conn
conn, err = (*sc.Listener).Accept()
if err == nil {
go func() {
defer func() {
if e := recover(); e != nil {
sc.log.Printf("tls connection handler crashed , err : %s , \ntrace:%s", e, string(debug.Stack()))
}
}()
fn(conn)
}()
} else {
sc.errAcceptHandler(err)
(*sc.Listener).Close()
break
}
}
}()
}
return
}
func (sc *ServerChannel) listenTls(ip string, port int, certBytes, keyBytes, caCertBytes []byte) (ln *net.Listener, err error) {
var cert tls.Certificate
cert, err = tls.X509KeyPair(certBytes, keyBytes)
if err != nil {
return
}
clientCertPool := x509.NewCertPool()
caBytes := certBytes
if caCertBytes != nil {
caBytes = caCertBytes
}
ok := clientCertPool.AppendCertsFromPEM(caBytes)
if !ok {
err = errors.New("failed to parse root certificate")
}
config := &tls.Config{
ClientCAs: clientCertPool,
Certificates: []tls.Certificate{cert},
ClientAuth: tls.RequireAndVerifyClientCert,
}
_ln, err := tls.Listen("tcp", fmt.Sprintf("%s:%d", ip, port), config)
if err == nil {
ln = &_ln
}
return
}
func (sc *ServerChannel) ListenTCP(fn func(conn net.Conn)) (err error) {
var l net.Listener
l, err = net.Listen("tcp", fmt.Sprintf("%s:%d", sc.ip, sc.port))
if err == nil {
sc.Listener = &l
go func() {
defer func() {
if e := recover(); e != nil {
sc.log.Printf("ListenTCP crashed , err : %s , \ntrace:%s", e, string(debug.Stack()))
}
}()
for {
var conn net.Conn
conn, err = (*sc.Listener).Accept()
if err == nil {
go func() {
defer func() {
if e := recover(); e != nil {
sc.log.Printf("tcp connection handler crashed , err : %s , \ntrace:%s", e, string(debug.Stack()))
}
}()
fn(conn)
}()
} else {
sc.errAcceptHandler(err)
break
}
}
}()
}
return
}
func (sc *ServerChannel) ListenUDP(fn func(listener *net.UDPConn, packet []byte, localAddr, srcAddr *net.UDPAddr)) (err error) {
addr := &net.UDPAddr{IP: net.ParseIP(sc.ip), Port: sc.port}
l, err := net.ListenUDP("udp", addr)
if err == nil {
sc.UDPListener = l
go func() {
defer func() {
if e := recover(); e != nil {
sc.log.Printf("ListenUDP crashed , err : %s , \ntrace:%s", e, string(debug.Stack()))
}
}()
for {
var buf = make([]byte, 2048)
n, srcAddr, err := (*sc.UDPListener).ReadFromUDP(buf)
if err == nil {
packet := buf[0:n]
go func() {
defer func() {
if e := recover(); e != nil {
sc.log.Printf("udp data handler crashed , err : %s , \ntrace:%s", e, string(debug.Stack()))
}
}()
fn(l, packet, addr, srcAddr)
}()
} else {
sc.errAcceptHandler(err)
break
}
}
}()
}
return
}
func (sc *ServerChannel) ListenKCP(config kcpcfg.KCPConfigArgs, fn func(conn net.Conn), log *logger.Logger) (err error) {
lis, err := kcp.ListenWithOptions(fmt.Sprintf("%s:%d", sc.ip, sc.port), config.Block, *config.DataShard, *config.ParityShard)
if err == nil {
if err = lis.SetDSCP(*config.DSCP); err != nil {
log.Println("SetDSCP:", err)
return
}
if err = lis.SetReadBuffer(*config.SockBuf); err != nil {
log.Println("SetReadBuffer:", err)
return
}
if err = lis.SetWriteBuffer(*config.SockBuf); err != nil {
log.Println("SetWriteBuffer:", err)
return
}
sc.Listener = new(net.Listener)
*sc.Listener = lis
go func() {
defer func() {
if e := recover(); e != nil {
sc.log.Printf("ListenKCP crashed , err : %s , \ntrace:%s", e, string(debug.Stack()))
}
}()
for {
//var conn net.Conn
conn, err := lis.AcceptKCP()
if err == nil {
go func() {
defer func() {
if e := recover(); e != nil {
sc.log.Printf("kcp connection handler crashed , err : %s , \ntrace:%s", e, string(debug.Stack()))
}
}()
conn.SetStreamMode(true)
conn.SetWriteDelay(true)
conn.SetNoDelay(*config.NoDelay, *config.Interval, *config.Resend, *config.NoCongestion)
conn.SetMtu(*config.MTU)
conn.SetWindowSize(*config.SndWnd, *config.RcvWnd)
conn.SetACKNoDelay(*config.AckNodelay)
if *config.NoComp {
fn(conn)
} else {
cconn := NewCompStream(conn)
fn(cconn)
}
}()
} else {
sc.errAcceptHandler(err)
break
}
}
}()
}
return
}