优化服务stop方法,方便sdk开发.

Signed-off-by: arraykeys@gmail.com <arraykeys@gmail.com>
This commit is contained in:
arraykeys@gmail.com
2018-04-10 18:32:17 +08:00
parent 2086966a89
commit 68deae6bf8
16 changed files with 429 additions and 436 deletions

View File

@ -2,7 +2,11 @@ proxy更新日志
v4.7 v4.7
1.优化了bridge的日志,增加了client和server的掉线日志. 1.优化了bridge的日志,增加了client和server的掉线日志.
2.优化了sps读取http(s)代理响应的缓冲大小,同时优化了CONNECT请求, 2.优化了sps读取http(s)代理响应的缓冲大小,同时优化了CONNECT请求,
避免了某些代理服务器返回过多数据导致不能正常通讯的问题. 避免了某些代理服务器返回过多数据导致不能正常通讯的问题.
3.去除了鸡肋连接池功能.
4.增加了gomobile sdk,对安卓/IOS提供支持.
5.优化了所有服务代码,方便对sdk提供支持.
v4.6 v4.6

View File

@ -299,17 +299,30 @@ func Start(argsStr string) (errStr string) {
} }
//regist services and run service //regist services and run service
services.Regist("http", services.NewHTTP(), httpArgs) switch serviceName {
services.Regist("tcp", services.NewTCP(), tcpArgs) case "http":
services.Regist("udp", services.NewUDP(), udpArgs) services.Regist("http", services.NewHTTP(), httpArgs)
services.Regist("tserver", services.NewTunnelServerManager(), tunnelServerArgs) case "tcp":
services.Regist("tclient", services.NewTunnelClient(), tunnelClientArgs) services.Regist("tcp", services.NewTCP(), tcpArgs)
services.Regist("tbridge", services.NewTunnelBridge(), tunnelBridgeArgs) case "udp":
services.Regist("server", services.NewMuxServerManager(), muxServerArgs) services.Regist("udp", services.NewUDP(), udpArgs)
services.Regist("client", services.NewMuxClient(), muxClientArgs) case "tserver":
services.Regist("bridge", services.NewMuxBridge(), muxBridgeArgs) services.Regist("tserver", services.NewTunnelServerManager(), tunnelServerArgs)
services.Regist("socks", services.NewSocks(), socksArgs) case "tclient":
services.Regist("sps", services.NewSPS(), spsArgs) services.Regist("tclient", services.NewTunnelClient(), tunnelClientArgs)
case "tbridge":
services.Regist("tbridge", services.NewTunnelBridge(), tunnelBridgeArgs)
case "server":
services.Regist("server", services.NewMuxServerManager(), muxServerArgs)
case "client":
services.Regist("client", services.NewMuxClient(), muxClientArgs)
case "bridge":
services.Regist("bridge", services.NewMuxBridge(), muxBridgeArgs)
case "socks":
services.Regist("socks", services.NewSocks(), socksArgs)
case "sps":
services.Regist("sps", services.NewSPS(), spsArgs)
}
service, err = services.Run(serviceName) service, err = services.Run(serviceName)
if err != nil { if err != nil {

View File

@ -16,22 +16,26 @@ import (
) )
type HTTP struct { type HTTP struct {
outPool utils.OutPool outPool utils.OutConn
cfg HTTPArgs cfg HTTPArgs
checker utils.Checker checker utils.Checker
basicAuth utils.BasicAuth basicAuth utils.BasicAuth
sshClient *ssh.Client sshClient *ssh.Client
lockChn chan bool lockChn chan bool
domainResolver utils.DomainResolver domainResolver utils.DomainResolver
isStop bool
serverChannels []*utils.ServerChannel
} }
func NewHTTP() Service { func NewHTTP() Service {
return &HTTP{ return &HTTP{
outPool: utils.OutPool{}, outPool: utils.OutConn{},
cfg: HTTPArgs{}, cfg: HTTPArgs{},
checker: utils.Checker{}, checker: utils.Checker{},
basicAuth: utils.BasicAuth{}, basicAuth: utils.BasicAuth{},
lockChn: make(chan bool, 1), lockChn: make(chan bool, 1),
isStop: false,
serverChannels: []*utils.ServerChannel{},
} }
} }
func (s *HTTP) CheckArgs() (err error) { func (s *HTTP) CheckArgs() (err error) {
@ -102,6 +106,9 @@ func (s *HTTP) InitService() (err error) {
go func() { go func() {
//循环检查ssh网络连通性 //循环检查ssh网络连通性
for { for {
if s.isStop {
return
}
conn, err := utils.ConnectHost(s.Resolve(*s.cfg.Parent), *s.cfg.Timeout*2) conn, err := utils.ConnectHost(s.Resolve(*s.cfg.Parent), *s.cfg.Timeout*2)
if err == nil { if err == nil {
conn.SetDeadline(time.Now().Add(time.Millisecond * time.Duration(*s.cfg.Timeout))) conn.SetDeadline(time.Now().Add(time.Millisecond * time.Duration(*s.cfg.Timeout)))
@ -127,8 +134,26 @@ func (s *HTTP) InitService() (err error) {
return return
} }
func (s *HTTP) StopService() { func (s *HTTP) StopService() {
if s.outPool.Pool != nil { defer func() {
s.outPool.Pool.ReleaseAll() e := recover()
if e != nil {
log.Printf("stop http(s) service crashed,%s", e)
} else {
log.Printf("service http(s) stoped,%s", e)
}
}()
s.isStop = true
s.checker.Stop()
if s.sshClient != nil {
s.sshClient.Close()
}
for _, sc := range s.serverChannels {
if sc.Listener != nil && *sc.Listener != nil {
(*sc.Listener).Close()
}
if sc.UDPListener != nil {
(*sc.UDPListener).Close()
}
} }
} }
func (s *HTTP) Start(args interface{}) (err error) { func (s *HTTP) Start(args interface{}) (err error) {
@ -159,6 +184,7 @@ func (s *HTTP) Start(args interface{}) (err error) {
return return
} }
log.Printf("%s http(s) proxy on %s", *s.cfg.LocalType, (*sc.Listener).Addr()) log.Printf("%s http(s) proxy on %s", *s.cfg.LocalType, (*sc.Listener).Addr())
s.serverChannels = append(s.serverChannels, &sc)
} }
} }
return return
@ -224,19 +250,18 @@ func (s *HTTP) OutToTCP(useProxy bool, address string, inConn *net.Conn, req *ut
return return
} }
var outConn net.Conn var outConn net.Conn
var _outConn interface{}
tryCount := 0 tryCount := 0
maxTryCount := 5 maxTryCount := 5
for { for {
if s.isStop {
return
}
if useProxy { if useProxy {
if *s.cfg.ParentType == "ssh" { if *s.cfg.ParentType == "ssh" {
outConn, err = s.getSSHConn(address) outConn, err = s.getSSHConn(address)
} else { } else {
// log.Printf("%v", s.outPool) // log.Printf("%v", s.outPool)
_outConn, err = s.outPool.Pool.Get() outConn, err = s.outPool.Get()
if err == nil {
outConn = _outConn.(net.Conn)
}
} }
} else { } else {
outConn, err = utils.ConnectHost(s.Resolve(address), *s.cfg.Timeout) outConn, err = utils.ConnectHost(s.Resolve(address), *s.cfg.Timeout)
@ -283,7 +308,7 @@ func (s *HTTP) getSSHConn(host string) (outConn net.Conn, err interface{}) {
maxTryCount := 1 maxTryCount := 1
tryCount := 0 tryCount := 0
RETRY: RETRY:
if tryCount >= maxTryCount { if tryCount >= maxTryCount || s.isStop {
return return
} }
wait := make(chan bool, 1) wait := make(chan bool, 1)
@ -340,7 +365,7 @@ func (s *HTTP) InitOutConnPool() {
if *s.cfg.ParentType == TYPE_TLS || *s.cfg.ParentType == TYPE_TCP || *s.cfg.ParentType == TYPE_KCP { if *s.cfg.ParentType == TYPE_TLS || *s.cfg.ParentType == TYPE_TCP || *s.cfg.ParentType == TYPE_KCP {
//dur int, isTLS bool, certBytes, keyBytes []byte, //dur int, isTLS bool, certBytes, keyBytes []byte,
//parent string, timeout int, InitialCap int, MaxCap int //parent string, timeout int, InitialCap int, MaxCap int
s.outPool = utils.NewOutPool( s.outPool = utils.NewOutConn(
*s.cfg.CheckParentInterval, *s.cfg.CheckParentInterval,
*s.cfg.ParentType, *s.cfg.ParentType,
s.cfg.KCP, s.cfg.KCP,

View File

@ -21,6 +21,8 @@ type MuxBridge struct {
clientControlConns utils.ConcurrentMap clientControlConns utils.ConcurrentMap
router utils.ClientKeyRouter router utils.ClientKeyRouter
l *sync.Mutex l *sync.Mutex
isStop bool
sc *utils.ServerChannel
} }
func NewMuxBridge() Service { func NewMuxBridge() Service {
@ -28,6 +30,7 @@ func NewMuxBridge() Service {
cfg: MuxBridgeArgs{}, cfg: MuxBridgeArgs{},
clientControlConns: utils.NewConcurrentMap(), clientControlConns: utils.NewConcurrentMap(),
l: &sync.Mutex{}, l: &sync.Mutex{},
isStop: false,
} }
b.router = utils.NewClientKeyRouter(&b.clientControlConns, 50000) b.router = utils.NewClientKeyRouter(&b.clientControlConns, 50000)
return b return b
@ -50,7 +53,23 @@ func (s *MuxBridge) CheckArgs() (err error) {
return return
} }
func (s *MuxBridge) StopService() { func (s *MuxBridge) StopService() {
defer func() {
e := recover()
if e != nil {
log.Printf("stop bridge service crashed,%s", e)
} else {
log.Printf("service bridge stoped,%s", e)
}
}()
s.isStop = true
if s.sc != nil && (*s.sc).Listener != nil {
(*(*s.sc).Listener).Close()
}
for _, g := range s.clientControlConns.Items() {
for _, session := range g.(utils.ConcurrentMap).Items() {
(session.(*smux.Session)).Close()
}
}
} }
func (s *MuxBridge) Start(args interface{}) (err error) { func (s *MuxBridge) Start(args interface{}) (err error) {
s.cfg = args.(MuxBridgeArgs) s.cfg = args.(MuxBridgeArgs)
@ -74,6 +93,7 @@ func (s *MuxBridge) Start(args interface{}) (err error) {
if err != nil { if err != nil {
return return
} }
s.sc = &sc
log.Printf("%s bridge on %s", *s.cfg.LocalType, (*sc.Listener).Addr()) log.Printf("%s bridge on %s", *s.cfg.LocalType, (*sc.Listener).Addr())
return return
} }
@ -111,6 +131,9 @@ func (s *MuxBridge) handler(inConn net.Conn) {
return return
} }
for { for {
if s.isStop {
return
}
stream, err := session.AcceptStream() stream, err := session.AcceptStream()
if err != nil { if err != nil {
session.Close() session.Close()
@ -118,7 +141,14 @@ func (s *MuxBridge) handler(inConn net.Conn) {
log.Printf("server connection %s %s released", serverID, key) log.Printf("server connection %s %s released", serverID, key)
return return
} }
go s.callback(stream, serverID, key) go func() {
defer func() {
if e := recover(); e != nil {
log.Printf("bridge callback crashed,err: %s", e)
}
}()
s.callback(stream, serverID, key)
}()
} }
case CONN_CLIENT: case CONN_CLIENT:
log.Printf("client connection %s connected", key) log.Printf("client connection %s connected", key)
@ -151,6 +181,9 @@ func (s *MuxBridge) handler(inConn net.Conn) {
// s.clientControlConns.Set(key, session) // s.clientControlConns.Set(key, session)
go func() { go func() {
for { for {
if s.isStop {
return
}
if session.IsClosed() { if session.IsClosed() {
s.l.Lock() s.l.Lock()
defer s.l.Unlock() defer s.l.Unlock()
@ -173,6 +206,9 @@ func (s *MuxBridge) handler(inConn net.Conn) {
func (s *MuxBridge) callback(inConn net.Conn, serverID, key string) { func (s *MuxBridge) callback(inConn net.Conn, serverID, key string) {
try := 20 try := 20
for { for {
if s.isStop {
return
}
try-- try--
if try == 0 { if try == 0 {
break break

View File

@ -14,12 +14,16 @@ import (
) )
type MuxClient struct { type MuxClient struct {
cfg MuxClientArgs cfg MuxClientArgs
isStop bool
sessions utils.ConcurrentMap
} }
func NewMuxClient() Service { func NewMuxClient() Service {
return &MuxClient{ return &MuxClient{
cfg: MuxClientArgs{}, cfg: MuxClientArgs{},
isStop: false,
sessions: utils.NewConcurrentMap(),
} }
} }
@ -47,7 +51,18 @@ func (s *MuxClient) CheckArgs() (err error) {
return return
} }
func (s *MuxClient) StopService() { func (s *MuxClient) StopService() {
defer func() {
e := recover()
if e != nil {
log.Printf("stop client service crashed,%s", e)
} else {
log.Printf("service client stoped,%s", e)
}
}()
s.isStop = true
for _, sess := range s.sessions.Items() {
sess.(*smux.Session).Close()
}
} }
func (s *MuxClient) Start(args interface{}) (err error) { func (s *MuxClient) Start(args interface{}) (err error) {
s.cfg = args.(MuxClientArgs) s.cfg = args.(MuxClientArgs)
@ -63,7 +78,8 @@ func (s *MuxClient) Start(args interface{}) (err error) {
count = *s.cfg.SessionCount count = *s.cfg.SessionCount
} }
for i := 1; i <= count; i++ { for i := 1; i <= count; i++ {
log.Printf("session worker[%d] started", i) key := fmt.Sprintf("worker[%d]", i)
log.Printf("session %s started", key)
go func(i int) { go func(i int) {
defer func() { defer func() {
e := recover() e := recover()
@ -72,6 +88,9 @@ func (s *MuxClient) Start(args interface{}) (err error) {
} }
}() }()
for { for {
if s.isStop {
return
}
conn, err := s.getParentConn() conn, err := s.getParentConn()
if err != nil { if err != nil {
log.Printf("connection err: %s, retrying...", err) log.Printf("connection err: %s, retrying...", err)
@ -94,7 +113,14 @@ func (s *MuxClient) Start(args interface{}) (err error) {
time.Sleep(time.Second * 3) time.Sleep(time.Second * 3)
continue continue
} }
if _sess, ok := s.sessions.Get(key); ok {
_sess.(*smux.Session).Close()
}
s.sessions.Set(key, session)
for { for {
if s.isStop {
return
}
stream, err := session.AcceptStream() stream, err := session.AcceptStream()
if err != nil { if err != nil {
log.Printf("accept stream err: %s, retrying...", err) log.Printf("accept stream err: %s, retrying...", err)
@ -153,6 +179,9 @@ func (s *MuxClient) getParentConn() (conn net.Conn, err error) {
func (s *MuxClient) ServeUDP(inConn *smux.Stream, localAddr, ID string) { func (s *MuxClient) ServeUDP(inConn *smux.Stream, localAddr, ID string) {
for { for {
if s.isStop {
return
}
inConn.SetDeadline(time.Now().Add(time.Millisecond * time.Duration(*s.cfg.Timeout))) inConn.SetDeadline(time.Now().Add(time.Millisecond * time.Duration(*s.cfg.Timeout)))
srcAddr, body, err := utils.ReadUDPPacket(inConn) srcAddr, body, err := utils.ReadUDPPacket(inConn)
inConn.SetDeadline(time.Time{}) inConn.SetDeadline(time.Time{})
@ -163,7 +192,15 @@ func (s *MuxClient) ServeUDP(inConn *smux.Stream, localAddr, ID string) {
break break
} else { } else {
//log.Printf("udp packet revecived:%s,%v", srcAddr, body) //log.Printf("udp packet revecived:%s,%v", srcAddr, body)
go s.processUDPPacket(inConn, srcAddr, localAddr, body) go func() {
defer func() {
if e := recover(); e != nil {
log.Printf("client processUDPPacket crashed,err: %s", e)
}
}()
s.processUDPPacket(inConn, srcAddr, localAddr, body)
}()
} }
} }
@ -216,6 +253,9 @@ func (s *MuxClient) ServeConn(inConn *smux.Stream, localAddr, ID string) {
var outConn net.Conn var outConn net.Conn
i := 0 i := 0
for { for {
if s.isStop {
return
}
i++ i++
outConn, err = utils.ConnectHost(localAddr, *s.cfg.Timeout) outConn, err = utils.ConnectHost(localAddr, *s.cfg.Timeout)
if err == nil || i == 3 { if err == nil || i == 3 {

View File

@ -23,13 +23,15 @@ type MuxServer struct {
sc utils.ServerChannel sc utils.ServerChannel
sessions utils.ConcurrentMap sessions utils.ConcurrentMap
lockChn chan bool lockChn chan bool
isStop bool
udpConn *net.Conn
} }
type MuxServerManager struct { type MuxServerManager struct {
cfg MuxServerArgs cfg MuxServerArgs
udpChn chan MuxUDPItem udpChn chan MuxUDPItem
sc utils.ServerChannel
serverID string serverID string
servers []*Service
} }
func NewMuxServerManager() Service { func NewMuxServerManager() Service {
@ -37,8 +39,10 @@ func NewMuxServerManager() Service {
cfg: MuxServerArgs{}, cfg: MuxServerArgs{},
udpChn: make(chan MuxUDPItem, 50000), udpChn: make(chan MuxUDPItem, 50000),
serverID: utils.Uniqueid(), serverID: utils.Uniqueid(),
servers: []*Service{},
} }
} }
func (s *MuxServerManager) Start(args interface{}) (err error) { func (s *MuxServerManager) Start(args interface{}) (err error) {
s.cfg = args.(MuxServerArgs) s.cfg = args.(MuxServerArgs)
if err = s.CheckArgs(); err != nil { if err = s.CheckArgs(); err != nil {
@ -100,6 +104,7 @@ func (s *MuxServerManager) Start(args interface{}) (err error) {
if err != nil { if err != nil {
return return
} }
s.servers = append(s.servers, &server)
} }
return return
} }
@ -107,6 +112,9 @@ func (s *MuxServerManager) Clean() {
s.StopService() s.StopService()
} }
func (s *MuxServerManager) StopService() { func (s *MuxServerManager) StopService() {
for _, server := range s.servers {
(*server).Clean()
}
} }
func (s *MuxServerManager) CheckArgs() (err error) { func (s *MuxServerManager) CheckArgs() (err error) {
if *s.cfg.CertFile == "" || *s.cfg.KeyFile == "" { if *s.cfg.CertFile == "" || *s.cfg.KeyFile == "" {
@ -131,6 +139,7 @@ func NewMuxServer() Service {
udpChn: make(chan MuxUDPItem, 50000), udpChn: make(chan MuxUDPItem, 50000),
lockChn: make(chan bool, 1), lockChn: make(chan bool, 1),
sessions: utils.NewConcurrentMap(), sessions: utils.NewConcurrentMap(),
isStop: false,
} }
} }
@ -140,6 +149,29 @@ type MuxUDPItem struct {
srcAddr *net.UDPAddr srcAddr *net.UDPAddr
} }
func (s *MuxServer) StopService() {
defer func() {
e := recover()
if e != nil {
log.Printf("stop server service crashed,%s", e)
} else {
log.Printf("service server stoped,%s", e)
}
}()
s.isStop = true
for _, sess := range s.sessions.Items() {
sess.(*smux.Session).Close()
}
if s.sc.Listener != nil {
(*s.sc.Listener).Close()
}
if s.sc.UDPListener != nil {
(*s.sc.UDPListener).Close()
}
if s.udpConn != nil {
(*s.udpConn).Close()
}
}
func (s *MuxServer) InitService() (err error) { func (s *MuxServer) InitService() (err error) {
s.UDPConnDeamon() s.UDPConnDeamon()
return return
@ -185,6 +217,9 @@ func (s *MuxServer) Start(args interface{}) (err error) {
var outConn net.Conn var outConn net.Conn
var ID string var ID string
for { for {
if s.isStop {
return
}
outConn, ID, err = s.GetOutConn() outConn, ID, err = s.GetOutConn()
if err != nil { if err != nil {
utils.CloseConn(&outConn) utils.CloseConn(&outConn)
@ -228,7 +263,7 @@ func (s *MuxServer) Start(args interface{}) (err error) {
return return
} }
func (s *MuxServer) Clean() { func (s *MuxServer) Clean() {
s.StopService()
} }
func (s *MuxServer) GetOutConn() (outConn net.Conn, ID string, err error) { func (s *MuxServer) GetOutConn() (outConn net.Conn, ID string, err error) {
i := 1 i := 1
@ -286,10 +321,16 @@ func (s *MuxServer) GetConn(index string) (conn net.Conn, err error) {
return return
} }
} }
if _sess, ok := s.sessions.Get(index); ok {
_sess.(*smux.Session).Close()
}
s.sessions.Set(index, session) s.sessions.Set(index, session)
log.Printf("session[%s] created", index) log.Printf("session[%s] created", index)
go func() { go func() {
for { for {
if s.isStop {
return
}
if session.IsClosed() { if session.IsClosed() {
s.sessions.Remove(index) s.sessions.Remove(index)
break break
@ -332,10 +373,19 @@ func (s *MuxServer) UDPConnDeamon() {
var ID string var ID string
var err error var err error
for { for {
if s.isStop {
return
}
item := <-s.udpChn item := <-s.udpChn
RETRY: RETRY:
if s.isStop {
return
}
if outConn == nil { if outConn == nil {
for { for {
if s.isStop {
return
}
outConn, ID, err = s.GetOutConn() outConn, ID, err = s.GetOutConn()
if err != nil { if err != nil {
outConn = nil outConn = nil
@ -345,10 +395,14 @@ func (s *MuxServer) UDPConnDeamon() {
continue continue
} else { } else {
go func(outConn net.Conn, ID string) { go func(outConn net.Conn, ID string) {
go func() { if s.udpConn != nil {
// outConn.Close() (*s.udpConn).Close()
}() }
s.udpConn = &outConn
for { for {
if s.isStop {
return
}
outConn.SetDeadline(time.Now().Add(time.Millisecond * time.Duration(*s.cfg.Timeout))) outConn.SetDeadline(time.Now().Add(time.Millisecond * time.Duration(*s.cfg.Timeout)))
srcAddrFromConn, body, err := utils.ReadUDPPacket(outConn) srcAddrFromConn, body, err := utils.ReadUDPPacket(outConn)
outConn.SetDeadline(time.Time{}) outConn.SetDeadline(time.Time{})

View File

@ -18,12 +18,18 @@ type ServiceItem struct {
var servicesMap = map[string]*ServiceItem{} var servicesMap = map[string]*ServiceItem{}
func Regist(name string, s Service, args interface{}) { func Regist(name string, s Service, args interface{}) {
servicesMap[name] = &ServiceItem{ servicesMap[name] = &ServiceItem{
S: s, S: s,
Args: args, Args: args,
Name: name, Name: name,
} }
} }
func Stop(name string) {
if s, ok := servicesMap[name]; ok && s.S != nil {
s.S.Clean()
}
}
func Run(name string, args ...interface{}) (service *ServiceItem, err error) { func Run(name string, args ...interface{}) (service *ServiceItem, err error) {
service, ok := servicesMap[name] service, ok := servicesMap[name]
if ok { if ok {

View File

@ -23,7 +23,9 @@ type Socks struct {
sshClient *ssh.Client sshClient *ssh.Client
lockChn chan bool lockChn chan bool
udpSC utils.ServerChannel udpSC utils.ServerChannel
sc *utils.ServerChannel
domainResolver utils.DomainResolver domainResolver utils.DomainResolver
isStop bool
} }
func NewSocks() Service { func NewSocks() Service {
@ -32,6 +34,7 @@ func NewSocks() Service {
checker: utils.Checker{}, checker: utils.Checker{},
basicAuth: utils.BasicAuth{}, basicAuth: utils.BasicAuth{},
lockChn: make(chan bool, 1), lockChn: make(chan bool, 1),
isStop: false,
} }
} }
@ -103,6 +106,9 @@ func (s *Socks) InitService() (err error) {
go func() { go func() {
//循环检查ssh网络连通性 //循环检查ssh网络连通性
for { for {
if s.isStop {
return
}
conn, err := utils.ConnectHost(s.Resolve(*s.cfg.Parent), *s.cfg.Timeout*2) conn, err := utils.ConnectHost(s.Resolve(*s.cfg.Parent), *s.cfg.Timeout*2)
if err == nil { if err == nil {
conn.SetDeadline(time.Now().Add(time.Millisecond * time.Duration(*s.cfg.Timeout))) conn.SetDeadline(time.Now().Add(time.Millisecond * time.Duration(*s.cfg.Timeout)))
@ -136,12 +142,25 @@ func (s *Socks) InitService() (err error) {
return return
} }
func (s *Socks) StopService() { func (s *Socks) StopService() {
defer func() {
e := recover()
if e != nil {
log.Printf("stop socks service crashed,%s", e)
} else {
log.Printf("service socks stoped,%s", e)
}
}()
s.isStop = true
s.checker.Stop()
if s.sshClient != nil { if s.sshClient != nil {
s.sshClient.Close() s.sshClient.Close()
} }
if s.udpSC.UDPListener != nil { if s.udpSC.UDPListener != nil {
s.udpSC.UDPListener.Close() s.udpSC.UDPListener.Close()
} }
if s.sc != nil && (*s.sc).Listener != nil {
(*(*s.sc).Listener).Close()
}
} }
func (s *Socks) Start(args interface{}) (err error) { func (s *Socks) Start(args interface{}) (err error) {
//start() //start()
@ -166,6 +185,7 @@ func (s *Socks) Start(args interface{}) (err error) {
if err != nil { if err != nil {
return return
} }
s.sc = &sc
log.Printf("%s socks proxy on %s", *s.cfg.LocalType, (*sc.Listener).Addr()) log.Printf("%s socks proxy on %s", *s.cfg.LocalType, (*sc.Listener).Addr())
return return
} }
@ -457,6 +477,9 @@ func (s *Socks) proxyTCP(inConn *net.Conn, methodReq socks.MethodsRequest, reque
return return
} }
for { for {
if s.isStop {
return
}
if *s.cfg.Always { if *s.cfg.Always {
outConn, err = s.getOutConn(methodReq.Bytes(), request.Bytes(), request.Addr()) outConn, err = s.getOutConn(methodReq.Bytes(), request.Bytes(), request.Addr())
} else { } else {
@ -563,7 +586,7 @@ func (s *Socks) getOutConn(methodBytes, reqBytes []byte, host string) (outConn n
maxTryCount := 1 maxTryCount := 1
tryCount := 0 tryCount := 0
RETRY: RETRY:
if tryCount >= maxTryCount { if tryCount >= maxTryCount || s.isStop {
return return
} }
wait := make(chan bool, 1) wait := make(chan bool, 1)

View File

@ -17,17 +17,19 @@ import (
) )
type SPS struct { type SPS struct {
outPool utils.OutPool outPool utils.OutConn
cfg SPSArgs cfg SPSArgs
domainResolver utils.DomainResolver domainResolver utils.DomainResolver
basicAuth utils.BasicAuth basicAuth utils.BasicAuth
serverChannels []*utils.ServerChannel
} }
func NewSPS() Service { func NewSPS() Service {
return &SPS{ return &SPS{
outPool: utils.OutPool{}, outPool: utils.OutConn{},
cfg: SPSArgs{}, cfg: SPSArgs{},
basicAuth: utils.BasicAuth{}, basicAuth: utils.BasicAuth{},
serverChannels: []*utils.ServerChannel{},
} }
} }
func (s *SPS) CheckArgs() (err error) { func (s *SPS) CheckArgs() (err error) {
@ -66,7 +68,7 @@ func (s *SPS) InitOutConnPool() {
if *s.cfg.ParentType == TYPE_TLS || *s.cfg.ParentType == TYPE_TCP || *s.cfg.ParentType == TYPE_KCP { if *s.cfg.ParentType == TYPE_TLS || *s.cfg.ParentType == TYPE_TCP || *s.cfg.ParentType == TYPE_KCP {
//dur int, isTLS bool, certBytes, keyBytes []byte, //dur int, isTLS bool, certBytes, keyBytes []byte,
//parent string, timeout int, InitialCap int, MaxCap int //parent string, timeout int, InitialCap int, MaxCap int
s.outPool = utils.NewOutPool( s.outPool = utils.NewOutConn(
0, 0,
*s.cfg.ParentType, *s.cfg.ParentType,
s.cfg.KCP, s.cfg.KCP,
@ -80,8 +82,21 @@ func (s *SPS) InitOutConnPool() {
} }
func (s *SPS) StopService() { func (s *SPS) StopService() {
if s.outPool.Pool != nil { defer func() {
s.outPool.Pool.ReleaseAll() e := recover()
if e != nil {
log.Printf("stop sps service crashed,%s", e)
} else {
log.Printf("service sps stoped,%s", e)
}
}()
for _, sc := range s.serverChannels {
if sc.Listener != nil && *sc.Listener != nil {
(*sc.Listener).Close()
}
if sc.UDPListener != nil {
(*sc.UDPListener).Close()
}
} }
} }
func (s *SPS) Start(args interface{}) (err error) { func (s *SPS) Start(args interface{}) (err error) {
@ -109,6 +124,7 @@ func (s *SPS) Start(args interface{}) (err error) {
return return
} }
log.Printf("%s http(s)+socks proxy on %s", s.cfg.Protocol(), (*sc.Listener).Addr()) log.Printf("%s http(s)+socks proxy on %s", s.cfg.Protocol(), (*sc.Listener).Addr())
s.serverChannels = append(s.serverChannels, &sc)
} }
} }
return return
@ -207,11 +223,7 @@ func (s *SPS) OutToTCP(inConn *net.Conn) (err error) {
} }
//connect to parent //connect to parent
var outConn net.Conn var outConn net.Conn
var _outConn interface{} outConn, err = s.outPool.Get()
_outConn, err = s.outPool.Pool.Get()
if err == nil {
outConn = _outConn.(net.Conn)
}
if err != nil { if err != nil {
log.Printf("connect to %s , err:%s", *s.cfg.Parent, err) log.Printf("connect to %s , err:%s", *s.cfg.Parent, err)
utils.CloseConn(inConn) utils.CloseConn(inConn)

View File

@ -14,14 +14,17 @@ import (
) )
type TCP struct { type TCP struct {
outPool utils.OutPool outPool utils.OutConn
cfg TCPArgs cfg TCPArgs
sc *utils.ServerChannel
isStop bool
} }
func NewTCP() Service { func NewTCP() Service {
return &TCP{ return &TCP{
outPool: utils.OutPool{}, outPool: utils.OutConn{},
cfg: TCPArgs{}, cfg: TCPArgs{},
isStop: false,
} }
} }
func (s *TCP) CheckArgs() (err error) { func (s *TCP) CheckArgs() (err error) {
@ -46,8 +49,20 @@ func (s *TCP) InitService() (err error) {
return return
} }
func (s *TCP) StopService() { func (s *TCP) StopService() {
if s.outPool.Pool != nil { defer func() {
s.outPool.Pool.ReleaseAll() e := recover()
if e != nil {
log.Printf("stop tcp service crashed,%s", e)
} else {
log.Printf("service tcp stoped,%s", e)
}
}()
s.isStop = true
if s.sc.Listener != nil && *s.sc.Listener != nil {
(*s.sc.Listener).Close()
}
if s.sc.UDPListener != nil {
(*s.sc.UDPListener).Close()
} }
} }
func (s *TCP) Start(args interface{}) (err error) { func (s *TCP) Start(args interface{}) (err error) {
@ -74,6 +89,7 @@ func (s *TCP) Start(args interface{}) (err error) {
return return
} }
log.Printf("%s proxy on %s", s.cfg.Protocol(), (*sc.Listener).Addr()) log.Printf("%s proxy on %s", s.cfg.Protocol(), (*sc.Listener).Addr())
s.sc = &sc
return return
} }
@ -106,11 +122,7 @@ func (s *TCP) callback(inConn net.Conn) {
} }
func (s *TCP) OutToTCP(inConn *net.Conn) (err error) { func (s *TCP) OutToTCP(inConn *net.Conn) (err error) {
var outConn net.Conn var outConn net.Conn
var _outConn interface{} outConn, err = s.outPool.Get()
_outConn, err = s.outPool.Pool.Get()
if err == nil {
outConn = _outConn.(net.Conn)
}
if err != nil { if err != nil {
log.Printf("connect to %s , err:%s", *s.cfg.Parent, err) log.Printf("connect to %s , err:%s", *s.cfg.Parent, err)
utils.CloseConn(inConn) utils.CloseConn(inConn)
@ -129,6 +141,9 @@ func (s *TCP) OutToTCP(inConn *net.Conn) (err error) {
func (s *TCP) OutToUDP(inConn *net.Conn) (err error) { func (s *TCP) OutToUDP(inConn *net.Conn) (err error) {
log.Printf("conn created , remote : %s ", (*inConn).RemoteAddr()) log.Printf("conn created , remote : %s ", (*inConn).RemoteAddr())
for { for {
if s.isStop {
return
}
srcAddr, body, err := utils.ReadUDPPacket(bufio.NewReader(*inConn)) srcAddr, body, err := utils.ReadUDPPacket(bufio.NewReader(*inConn))
if err == io.EOF || err == io.ErrUnexpectedEOF { if err == io.EOF || err == io.ErrUnexpectedEOF {
//log.Printf("connection %s released", srcAddr) //log.Printf("connection %s released", srcAddr)
@ -178,7 +193,7 @@ func (s *TCP) InitOutConnPool() {
if *s.cfg.ParentType == TYPE_TLS || *s.cfg.ParentType == TYPE_TCP || *s.cfg.ParentType == TYPE_KCP { if *s.cfg.ParentType == TYPE_TLS || *s.cfg.ParentType == TYPE_TCP || *s.cfg.ParentType == TYPE_KCP {
//dur int, isTLS bool, certBytes, keyBytes []byte, //dur int, isTLS bool, certBytes, keyBytes []byte,
//parent string, timeout int, InitialCap int, MaxCap int //parent string, timeout int, InitialCap int, MaxCap int
s.outPool = utils.NewOutPool( s.outPool = utils.NewOutConn(
*s.cfg.CheckParentInterval, *s.cfg.CheckParentInterval,
*s.cfg.ParentType, *s.cfg.ParentType,
s.cfg.KCP, s.cfg.KCP,

View File

@ -18,8 +18,7 @@ type TunnelBridge struct {
cfg TunnelBridgeArgs cfg TunnelBridgeArgs
serverConns utils.ConcurrentMap serverConns utils.ConcurrentMap
clientControlConns utils.ConcurrentMap clientControlConns utils.ConcurrentMap
// cmServer utils.ConnManager isStop bool
// cmClient utils.ConnManager
} }
func NewTunnelBridge() Service { func NewTunnelBridge() Service {
@ -27,8 +26,7 @@ func NewTunnelBridge() Service {
cfg: TunnelBridgeArgs{}, cfg: TunnelBridgeArgs{},
serverConns: utils.NewConcurrentMap(), serverConns: utils.NewConcurrentMap(),
clientControlConns: utils.NewConcurrentMap(), clientControlConns: utils.NewConcurrentMap(),
// cmServer: utils.NewConnManager(), isStop: false,
// cmClient: utils.NewConnManager(),
} }
} }
@ -44,7 +42,21 @@ func (s *TunnelBridge) CheckArgs() (err error) {
return return
} }
func (s *TunnelBridge) StopService() { func (s *TunnelBridge) StopService() {
defer func() {
e := recover()
if e != nil {
log.Printf("stop tbridge service crashed,%s", e)
} else {
log.Printf("service tbridge stoped,%s", e)
}
}()
s.isStop = true
for _, sess := range s.clientControlConns.Items() {
(*sess.(*net.Conn)).Close()
}
for _, sess := range s.serverConns.Items() {
(*sess.(ServerConn).Conn).Close()
}
} }
func (s *TunnelBridge) Start(args interface{}) (err error) { func (s *TunnelBridge) Start(args interface{}) (err error) {
s.cfg = args.(TunnelBridgeArgs) s.cfg = args.(TunnelBridgeArgs)
@ -85,6 +97,9 @@ func (s *TunnelBridge) Start(args interface{}) (err error) {
Conn: &inConn, Conn: &inConn,
}) })
for { for {
if s.isStop {
return
}
item, ok := s.clientControlConns.Get(key) item, ok := s.clientControlConns.Get(key)
if !ok { if !ok {
log.Printf("client %s control conn not exists", key) log.Printf("client %s control conn not exists", key)

View File

@ -14,12 +14,14 @@ type TunnelClient struct {
cfg TunnelClientArgs cfg TunnelClientArgs
// cm utils.ConnManager // cm utils.ConnManager
ctrlConn net.Conn ctrlConn net.Conn
isStop bool
} }
func NewTunnelClient() Service { func NewTunnelClient() Service {
return &TunnelClient{ return &TunnelClient{
cfg: TunnelClientArgs{}, cfg: TunnelClientArgs{},
// cm: utils.NewConnManager(), // cm: utils.NewConnManager(),
isStop: false,
} }
} }
@ -42,7 +44,18 @@ func (s *TunnelClient) CheckArgs() (err error) {
return return
} }
func (s *TunnelClient) StopService() { func (s *TunnelClient) StopService() {
// s.cm.RemoveAll() defer func() {
e := recover()
if e != nil {
log.Printf("stop tclient service crashed,%s", e)
} else {
log.Printf("service tclient stoped,%s", e)
}
}()
s.isStop = true
if s.ctrlConn != nil {
s.ctrlConn.Close()
}
} }
func (s *TunnelClient) Start(args interface{}) (err error) { func (s *TunnelClient) Start(args interface{}) (err error) {
s.cfg = args.(TunnelClientArgs) s.cfg = args.(TunnelClientArgs)
@ -55,8 +68,9 @@ func (s *TunnelClient) Start(args interface{}) (err error) {
log.Printf("proxy on tunnel client mode") log.Printf("proxy on tunnel client mode")
for { for {
//close all conn if s.isStop {
// s.cm.Remove(*s.cfg.Key) return
}
if s.ctrlConn != nil { if s.ctrlConn != nil {
s.ctrlConn.Close() s.ctrlConn.Close()
} }
@ -71,6 +85,9 @@ func (s *TunnelClient) Start(args interface{}) (err error) {
continue continue
} }
for { for {
if s.isStop {
return
}
var ID, clientLocalAddr, serverID string var ID, clientLocalAddr, serverID string
err = utils.ReadPacketData(s.ctrlConn, &ID, &clientLocalAddr, &serverID) err = utils.ReadPacketData(s.ctrlConn, &ID, &clientLocalAddr, &serverID)
if err != nil { if err != nil {
@ -121,6 +138,9 @@ func (s *TunnelClient) ServeUDP(localAddr, ID, serverID string) {
var err error var err error
// for { // for {
for { for {
if s.isStop {
return
}
// s.cm.RemoveOne(*s.cfg.Key, ID) // s.cm.RemoveOne(*s.cfg.Key, ID)
inConn, err = s.GetInConn(CONN_CLIENT, *s.cfg.Key, ID, serverID) inConn, err = s.GetInConn(CONN_CLIENT, *s.cfg.Key, ID, serverID)
if err != nil { if err != nil {
@ -136,6 +156,9 @@ func (s *TunnelClient) ServeUDP(localAddr, ID, serverID string) {
log.Printf("conn %s created", ID) log.Printf("conn %s created", ID)
for { for {
if s.isStop {
return
}
srcAddr, body, err := utils.ReadUDPPacket(inConn) srcAddr, body, err := utils.ReadUDPPacket(inConn)
if err == io.EOF || err == io.ErrUnexpectedEOF { if err == io.EOF || err == io.ErrUnexpectedEOF {
log.Printf("connection %s released", ID) log.Printf("connection %s released", ID)
@ -192,6 +215,9 @@ func (s *TunnelClient) ServeConn(localAddr, ID, serverID string) {
var inConn, outConn net.Conn var inConn, outConn net.Conn
var err error var err error
for { for {
if s.isStop {
return
}
inConn, err = s.GetInConn(CONN_CLIENT, *s.cfg.Key, ID, serverID) inConn, err = s.GetInConn(CONN_CLIENT, *s.cfg.Key, ID, serverID)
if err != nil { if err != nil {
utils.CloseConn(&inConn) utils.CloseConn(&inConn)
@ -205,6 +231,9 @@ func (s *TunnelClient) ServeConn(localAddr, ID, serverID string) {
i := 0 i := 0
for { for {
if s.isStop {
return
}
i++ i++
outConn, err = utils.ConnectHost(localAddr, *s.cfg.Timeout) outConn, err = utils.ConnectHost(localAddr, *s.cfg.Timeout)
if err == nil || i == 3 { if err == nil || i == 3 {

View File

@ -14,17 +14,18 @@ import (
) )
type TunnelServer struct { type TunnelServer struct {
cfg TunnelServerArgs cfg TunnelServerArgs
udpChn chan UDPItem udpChn chan UDPItem
sc utils.ServerChannel sc utils.ServerChannel
isStop bool
udpConn *net.Conn
} }
type TunnelServerManager struct { type TunnelServerManager struct {
cfg TunnelServerArgs cfg TunnelServerArgs
udpChn chan UDPItem udpChn chan UDPItem
sc utils.ServerChannel
serverID string serverID string
// cm utils.ConnManager servers []*Service
} }
func NewTunnelServerManager() Service { func NewTunnelServerManager() Service {
@ -32,7 +33,7 @@ func NewTunnelServerManager() Service {
cfg: TunnelServerArgs{}, cfg: TunnelServerArgs{},
udpChn: make(chan UDPItem, 50000), udpChn: make(chan UDPItem, 50000),
serverID: utils.Uniqueid(), serverID: utils.Uniqueid(),
// cm: utils.NewConnManager(), servers: []*Service{},
} }
} }
func (s *TunnelServerManager) Start(args interface{}) (err error) { func (s *TunnelServerManager) Start(args interface{}) (err error) {
@ -89,6 +90,7 @@ func (s *TunnelServerManager) Start(args interface{}) (err error) {
if err != nil { if err != nil {
return return
} }
s.servers = append(s.servers, &server)
} }
return return
} }
@ -96,7 +98,9 @@ func (s *TunnelServerManager) Clean() {
s.StopService() s.StopService()
} }
func (s *TunnelServerManager) StopService() { func (s *TunnelServerManager) StopService() {
// s.cm.RemoveAll() for _, server := range s.servers {
(*server).Clean()
}
} }
func (s *TunnelServerManager) CheckArgs() (err error) { func (s *TunnelServerManager) CheckArgs() (err error) {
if *s.cfg.CertFile == "" || *s.cfg.KeyFile == "" { if *s.cfg.CertFile == "" || *s.cfg.KeyFile == "" {
@ -137,6 +141,7 @@ func NewTunnelServer() Service {
return &TunnelServer{ return &TunnelServer{
cfg: TunnelServerArgs{}, cfg: TunnelServerArgs{},
udpChn: make(chan UDPItem, 50000), udpChn: make(chan UDPItem, 50000),
isStop: false,
} }
} }
@ -146,6 +151,27 @@ type UDPItem struct {
srcAddr *net.UDPAddr srcAddr *net.UDPAddr
} }
func (s *TunnelServer) StopService() {
defer func() {
e := recover()
if e != nil {
log.Printf("stop server service crashed,%s", e)
} else {
log.Printf("service server stoped,%s", e)
}
}()
s.isStop = true
if s.sc.Listener != nil {
(*s.sc.Listener).Close()
}
if s.sc.UDPListener != nil {
(*s.sc.UDPListener).Close()
}
if s.udpConn != nil {
(*s.udpConn).Close()
}
}
func (s *TunnelServer) InitService() (err error) { func (s *TunnelServer) InitService() (err error) {
s.UDPConnDeamon() s.UDPConnDeamon()
return return
@ -191,6 +217,9 @@ func (s *TunnelServer) Start(args interface{}) (err error) {
var outConn net.Conn var outConn net.Conn
var ID string var ID string
for { for {
if s.isStop {
return
}
outConn, ID, err = s.GetOutConn(CONN_SERVER) outConn, ID, err = s.GetOutConn(CONN_SERVER)
if err != nil { if err != nil {
utils.CloseConn(&outConn) utils.CloseConn(&outConn)
@ -259,10 +288,19 @@ func (s *TunnelServer) UDPConnDeamon() {
// var cmdChn = make(chan bool, 1000) // var cmdChn = make(chan bool, 1000)
var err error var err error
for { for {
if s.isStop {
return
}
item := <-s.udpChn item := <-s.udpChn
RETRY: RETRY:
if s.isStop {
return
}
if outConn == nil { if outConn == nil {
for { for {
if s.isStop {
return
}
outConn, ID, err = s.GetOutConn(CONN_SERVER) outConn, ID, err = s.GetOutConn(CONN_SERVER)
if err != nil { if err != nil {
// cmdChn <- true // cmdChn <- true
@ -273,11 +311,14 @@ func (s *TunnelServer) UDPConnDeamon() {
continue continue
} else { } else {
go func(outConn net.Conn, ID string) { go func(outConn net.Conn, ID string) {
go func() { if s.udpConn != nil {
// <-cmdChn (*s.udpConn).Close()
// outConn.Close() }
}() s.udpConn = &outConn
for { for {
if s.isStop {
return
}
srcAddrFromConn, body, err := utils.ReadUDPPacket(outConn) srcAddrFromConn, body, err := utils.ReadUDPPacket(outConn)
if err == io.EOF || err == io.ErrUnexpectedEOF { if err == io.EOF || err == io.ErrUnexpectedEOF {
log.Printf("UDP deamon connection %s exited", ID) log.Printf("UDP deamon connection %s exited", ID)

View File

@ -17,15 +17,17 @@ import (
type UDP struct { type UDP struct {
p utils.ConcurrentMap p utils.ConcurrentMap
outPool utils.OutPool outPool utils.OutConn
cfg UDPArgs cfg UDPArgs
sc *utils.ServerChannel sc *utils.ServerChannel
isStop bool
} }
func NewUDP() Service { func NewUDP() Service {
return &UDP{ return &UDP{
outPool: utils.OutPool{}, outPool: utils.OutConn{},
p: utils.NewConcurrentMap(), p: utils.NewConcurrentMap(),
isStop: false,
} }
} }
func (s *UDP) CheckArgs() (err error) { func (s *UDP) CheckArgs() (err error) {
@ -52,8 +54,20 @@ func (s *UDP) InitService() (err error) {
return return
} }
func (s *UDP) StopService() { func (s *UDP) StopService() {
if s.outPool.Pool != nil { defer func() {
s.outPool.Pool.ReleaseAll() e := recover()
if e != nil {
log.Printf("stop udp service crashed,%s", e)
} else {
log.Printf("service udp stoped,%s", e)
}
}()
s.isStop = true
if s.sc.Listener != nil && *s.sc.Listener != nil {
(*s.sc.Listener).Close()
}
if s.sc.UDPListener != nil {
(*s.sc.UDPListener).Close()
} }
} }
func (s *UDP) Start(args interface{}) (err error) { func (s *UDP) Start(args interface{}) (err error) {
@ -105,7 +119,7 @@ func (s *UDP) GetConn(connKey string) (conn net.Conn, isNew bool, err error) {
isNew = !s.p.Has(connKey) isNew = !s.p.Has(connKey)
var _conn interface{} var _conn interface{}
if isNew { if isNew {
_conn, err = s.outPool.Pool.Get() _conn, err = s.outPool.Get()
if err != nil { if err != nil {
return nil, false, err return nil, false, err
} }
@ -138,6 +152,9 @@ func (s *UDP) OutToTCP(packet []byte, localAddr, srcAddr *net.UDPAddr) (err erro
}() }()
log.Printf("conn %d created , local: %s", connKey, srcAddr.String()) log.Printf("conn %d created , local: %s", connKey, srcAddr.String())
for { for {
if s.isStop {
return
}
srcAddrFromConn, body, err := utils.ReadUDPPacket(bufio.NewReader(conn)) srcAddrFromConn, body, err := utils.ReadUDPPacket(bufio.NewReader(conn))
if err == io.EOF || err == io.ErrUnexpectedEOF { if err == io.EOF || err == io.ErrUnexpectedEOF {
//log.Printf("connection %d released", connKey) //log.Printf("connection %d released", connKey)
@ -216,7 +233,7 @@ func (s *UDP) InitOutConnPool() {
if *s.cfg.ParentType == TYPE_TLS || *s.cfg.ParentType == TYPE_TCP { if *s.cfg.ParentType == TYPE_TLS || *s.cfg.ParentType == TYPE_TCP {
//dur int, isTLS bool, certBytes, keyBytes []byte, //dur int, isTLS bool, certBytes, keyBytes []byte,
//parent string, timeout int, InitialCap int, MaxCap int //parent string, timeout int, InitialCap int, MaxCap int
s.outPool = utils.NewOutPool( s.outPool = utils.NewOutConn(
*s.cfg.CheckParentInterval, *s.cfg.CheckParentInterval,
*s.cfg.ParentType, *s.cfg.ParentType,
kcpcfg.KCPConfigArgs{}, kcpcfg.KCPConfigArgs{},

View File

@ -1,145 +0,0 @@
package utils
import (
"log"
"sync"
"time"
)
//ConnPool to use
type ConnPool interface {
Get() (conn interface{}, err error)
Put(conn interface{})
ReleaseAll()
Len() (length int)
}
type poolConfig struct {
Factory func() (interface{}, error)
IsActive func(interface{}) bool
Release func(interface{})
InitialCap int
MaxCap int
}
func NewConnPool(poolConfig poolConfig) (pool ConnPool, err error) {
p := netPool{
config: poolConfig,
conns: make(chan interface{}, poolConfig.MaxCap),
lock: &sync.Mutex{},
}
//log.Printf("pool MaxCap:%d", poolConfig.MaxCap)
if poolConfig.MaxCap > 0 {
err = p.initAutoFill(false)
if err == nil {
p.initAutoFill(true)
}
}
return &p, nil
}
type netPool struct {
conns chan interface{}
lock *sync.Mutex
config poolConfig
}
func (p *netPool) initAutoFill(async bool) (err error) {
var worker = func() (err error) {
for {
//log.Printf("pool fill: %v , len: %d", p.Len() <= p.config.InitialCap/2, p.Len())
if p.Len() <= p.config.InitialCap/2 {
p.lock.Lock()
errN := 0
for i := 0; i < p.config.InitialCap; i++ {
c, err := p.config.Factory()
if err != nil {
errN++
if async {
continue
} else {
p.lock.Unlock()
return err
}
}
select {
case p.conns <- c:
default:
p.config.Release(c)
break
}
if p.Len() >= p.config.InitialCap {
break
}
}
if errN > 0 {
log.Printf("fill conn pool fail , ERRN:%d", errN)
}
p.lock.Unlock()
}
if !async {
return
}
time.Sleep(time.Second * 2)
}
}
if async {
go worker()
} else {
err = worker()
}
return
}
func (p *netPool) Get() (conn interface{}, err error) {
// defer func() {
// log.Printf("pool len : %d", p.Len())
// }()
p.lock.Lock()
defer p.lock.Unlock()
// for {
select {
case conn = <-p.conns:
if p.config.IsActive(conn) {
return
}
p.config.Release(conn)
default:
conn, err = p.config.Factory()
if err != nil {
return nil, err
}
return conn, nil
}
// }
return
}
func (p *netPool) Put(conn interface{}) {
if conn == nil {
return
}
p.lock.Lock()
defer p.lock.Unlock()
if !p.config.IsActive(conn) {
p.config.Release(conn)
}
select {
case p.conns <- conn:
default:
p.config.Release(conn)
}
}
func (p *netPool) ReleaseAll() {
p.lock.Lock()
defer p.lock.Unlock()
close(p.conns)
for c := range p.conns {
p.config.Release(c)
}
p.conns = make(chan interface{}, p.config.InitialCap)
}
func (p *netPool) Len() (length int) {
return len(p.conns)
}

View File

@ -4,7 +4,6 @@ import (
"bytes" "bytes"
"crypto/tls" "crypto/tls"
"encoding/base64" "encoding/base64"
"encoding/binary"
"errors" "errors"
"fmt" "fmt"
"io" "io"
@ -28,6 +27,7 @@ type Checker struct {
directMap ConcurrentMap directMap ConcurrentMap
interval int64 interval int64
timeout int timeout int
isStop bool
} }
type CheckerItem struct { type CheckerItem struct {
IsHTTPS bool IsHTTPS bool
@ -48,6 +48,7 @@ func NewChecker(timeout int, interval int64, blockedFile, directFile string) Che
data: NewConcurrentMap(), data: NewConcurrentMap(),
interval: interval, interval: interval,
timeout: timeout, timeout: timeout,
isStop: false,
} }
ch.blockedMap = ch.loadMap(blockedFile) ch.blockedMap = ch.loadMap(blockedFile)
ch.directMap = ch.loadMap(directFile) ch.directMap = ch.loadMap(directFile)
@ -81,6 +82,9 @@ func (c *Checker) loadMap(f string) (dataMap ConcurrentMap) {
} }
return return
} }
func (c *Checker) Stop() {
c.isStop = true
}
func (c *Checker) start() { func (c *Checker) start() {
go func() { go func() {
//log.Printf("checker started") //log.Printf("checker started")
@ -107,6 +111,9 @@ func (c *Checker) start() {
}(v.(CheckerItem)) }(v.(CheckerItem))
} }
time.Sleep(time.Second * time.Duration(c.interval)) time.Sleep(time.Second * time.Duration(c.interval))
if c.isStop {
return
}
} }
}() }()
} }
@ -498,8 +505,7 @@ func (req *HTTPRequest) addPortIfNot() (newHost string) {
return return
} }
type OutPool struct { type OutConn struct {
Pool ConnPool
dur int dur int
typ string typ string
certBytes []byte certBytes []byte
@ -510,8 +516,8 @@ type OutPool struct {
timeout int timeout int
} }
func NewOutPool(dur int, typ string, kcp kcpcfg.KCPConfigArgs, certBytes, keyBytes, caCertBytes []byte, address string, timeout int, InitialCap int, MaxCap int) (op OutPool) { func NewOutConn(dur int, typ string, kcp kcpcfg.KCPConfigArgs, certBytes, keyBytes, caCertBytes []byte, address string, timeout int, InitialCap int, MaxCap int) (op OutConn) {
op = OutPool{ return OutConn{
dur: dur, dur: dur,
typ: typ, typ: typ,
certBytes: certBytes, certBytes: certBytes,
@ -521,36 +527,8 @@ func NewOutPool(dur int, typ string, kcp kcpcfg.KCPConfigArgs, certBytes, keyByt
address: address, address: address,
timeout: timeout, timeout: timeout,
} }
var err error
op.Pool, err = NewConnPool(poolConfig{
IsActive: func(conn interface{}) bool { return true },
Release: func(conn interface{}) {
if conn != nil {
conn.(net.Conn).SetDeadline(time.Now().Add(time.Millisecond))
conn.(net.Conn).Close()
// log.Println("conn released")
}
},
InitialCap: InitialCap,
MaxCap: MaxCap,
Factory: func() (conn interface{}, err error) {
conn, err = op.getConn()
return
},
})
if err != nil {
log.Fatalf("init conn pool fail ,%s", err)
} else {
if InitialCap > 0 {
log.Printf("init conn pool success")
op.initPoolDeamon()
} else {
log.Printf("conn pool closed")
}
}
return
} }
func (op *OutPool) getConn() (conn interface{}, err error) { func (op *OutConn) Get() (conn net.Conn, err error) {
if op.typ == "tls" { if op.typ == "tls" {
var _conn tls.Conn var _conn tls.Conn
_conn, err = TlsConnectHost(op.address, op.timeout, op.certBytes, op.keyBytes, op.caCertBytes) _conn, err = TlsConnectHost(op.address, op.timeout, op.certBytes, op.keyBytes, op.caCertBytes)
@ -565,176 +543,6 @@ func (op *OutPool) getConn() (conn interface{}, err error) {
return return
} }
func (op *OutPool) initPoolDeamon() {
go func() {
if op.dur <= 0 {
return
}
log.Printf("pool deamon started")
for {
time.Sleep(time.Second * time.Duration(op.dur))
conn, err := op.getConn()
if err != nil {
log.Printf("pool deamon err %s , release pool", err)
op.Pool.ReleaseAll()
} else {
conn.(net.Conn).SetDeadline(time.Now().Add(time.Millisecond))
conn.(net.Conn).Close()
}
}
}()
}
type HeartbeatData struct {
Data []byte
N int
Error error
}
type HeartbeatReadWriter struct {
conn *net.Conn
// rchn chan HeartbeatData
l *sync.Mutex
dur int
errHandler func(err error, hb *HeartbeatReadWriter)
once *sync.Once
datachn chan byte
// rbuf bytes.Buffer
// signal chan bool
rerrchn chan error
}
func NewHeartbeatReadWriter(conn *net.Conn, dur int, fn func(err error, hb *HeartbeatReadWriter)) (hrw HeartbeatReadWriter) {
hrw = HeartbeatReadWriter{
conn: conn,
l: &sync.Mutex{},
dur: dur,
// rchn: make(chan HeartbeatData, 10000),
// signal: make(chan bool, 1),
errHandler: fn,
datachn: make(chan byte, 4*1024),
once: &sync.Once{},
rerrchn: make(chan error, 1),
// rbuf: bytes.Buffer{},
}
hrw.heartbeat()
hrw.reader()
return
}
func (rw *HeartbeatReadWriter) Close() {
CloseConn(rw.conn)
}
func (rw *HeartbeatReadWriter) reader() {
go func() {
//log.Printf("heartbeat read started")
for {
n, data, err := rw.read()
if n == -1 {
continue
}
//log.Printf("n:%d , data:%s ,err:%s", n, string(data), err)
if err == nil {
//fmt.Printf("write data %s\n", string(data))
for _, b := range data {
rw.datachn <- b
}
}
if err != nil {
//log.Printf("heartbeat reader err: %s", err)
select {
case rw.rerrchn <- err:
default:
}
rw.once.Do(func() {
rw.errHandler(err, rw)
})
break
}
}
//log.Printf("heartbeat read exited")
}()
}
func (rw *HeartbeatReadWriter) read() (n int, data []byte, err error) {
var typ uint8
err = binary.Read((*rw.conn), binary.LittleEndian, &typ)
if err != nil {
return
}
if typ == 0 {
// log.Printf("heartbeat revecived")
n = -1
return
}
var dataLength uint32
binary.Read((*rw.conn), binary.LittleEndian, &dataLength)
_data := make([]byte, dataLength)
// log.Printf("dataLength:%d , data:%s", dataLength, string(data))
n, err = (*rw.conn).Read(_data)
//log.Printf("n:%d , data:%s ,err:%s", n, string(data), err)
if err != nil {
return
}
if uint32(n) != dataLength {
err = fmt.Errorf("read short data body")
return
}
data = _data[:n]
return
}
func (rw *HeartbeatReadWriter) heartbeat() {
go func() {
//log.Printf("heartbeat started")
for {
if rw.conn == nil || *rw.conn == nil {
//log.Printf("heartbeat err: conn nil")
break
}
rw.l.Lock()
_, err := (*rw.conn).Write([]byte{0})
rw.l.Unlock()
if err != nil {
//log.Printf("heartbeat err: %s", err)
rw.once.Do(func() {
rw.errHandler(err, rw)
})
break
} else {
// log.Printf("heartbeat send ok")
}
time.Sleep(time.Second * time.Duration(rw.dur))
}
//log.Printf("heartbeat exited")
}()
}
func (rw *HeartbeatReadWriter) Read(p []byte) (n int, err error) {
data := make([]byte, cap(p))
for i := 0; i < cap(p); i++ {
data[i] = <-rw.datachn
n++
//fmt.Printf("read %d %v\n", i, data[:n])
if len(rw.datachn) == 0 {
n = i + 1
copy(p, data[:n])
return
}
}
return
}
func (rw *HeartbeatReadWriter) Write(p []byte) (n int, err error) {
defer rw.l.Unlock()
rw.l.Lock()
pkg := new(bytes.Buffer)
binary.Write(pkg, binary.LittleEndian, uint8(1))
binary.Write(pkg, binary.LittleEndian, uint32(len(p)))
binary.Write(pkg, binary.LittleEndian, p)
bs := pkg.Bytes()
n, err = (*rw.conn).Write(bs)
if err == nil {
n = len(p)
}
return
}
type ConnManager struct { type ConnManager struct {
pool ConcurrentMap pool ConcurrentMap
l *sync.Mutex l *sync.Mutex