优化错误捕获,防止意外crash

优化停止服务,释放内存
This commit is contained in:
arraykeys@gmail.com
2018-09-05 15:00:31 +08:00
parent e2f7377259
commit 997f438dfb
23 changed files with 509 additions and 86 deletions

View File

@ -21,6 +21,7 @@ v6.0 enterprise version
10.SOCKS5代理支持级联认证.
11.插件机制,定制功能可以使用插件方式开发了热插拔不需要修改goproxy二进制可以插件so或者
dylib注入.
12.修复了tclient可能意外退出的bug.
v5.4
1.优化了获取本地IP信息导致CPU过高的问题.

View File

@ -10,6 +10,7 @@ import (
"os/exec"
"path"
"path/filepath"
"runtime/debug"
"runtime/pprof"
"time"
@ -57,7 +58,7 @@ func initConfig() (err error) {
//build srvice args
app = kingpin.New("proxy", "happy with proxy")
app.Author("snail").Version("v" + APP_VERSION + " enterprise version")
debug := app.Flag("debug", "debug log output").Default("false").Bool()
isDebug := app.Flag("debug", "debug log output").Default("false").Bool()
daemon := app.Flag("daemon", "run proxy in background").Default("false").Bool()
forever := app.Flag("forever", "run proxy in forever,fail and retry").Default("false").Bool()
logfile := app.Flag("log", "log file path").Default("").String()
@ -120,7 +121,7 @@ func initConfig() (err error) {
httpArgs.LoadBalanceOnlyHA = http.Flag("lb-onlyha", "use only `high availability mode` to choose parent for LB").Default("false").Bool()
httpArgs.RateLimit = http.Flag("rate-limit", "rate limit (bytes/second) of each connection, such as: 100K 1.5M . 0 means no limitation").Short('l').Default("0").String()
httpArgs.BindListen = http.Flag("bind-listen", "using listener binding IP when connect to target").Short('B').Default("false").Bool()
httpArgs.Debug = debug
httpArgs.Debug = isDebug
//########tcp#########
tcp := app.Command("tcp", "proxy on tcp mode")
tcpArgs.Parent = tcp.Flag("parent", "parent address, such as: \"23.32.32.19:28008\"").Default("[]").Short('P').String()
@ -243,7 +244,7 @@ func initConfig() (err error) {
socksArgs.LoadBalanceOnlyHA = socks.Flag("lb-onlyha", "use only `high availability mode` to choose parent for LB").Default("false").Bool()
socksArgs.RateLimit = socks.Flag("rate-limit", "rate limit (bytes/second) of each connection, such as: 100K 1.5M . 0 means no limitation").Short('l').Default("0").String()
socksArgs.BindListen = socks.Flag("bind-listen", "using listener binding IP when connect to target").Short('B').Default("false").Bool()
socksArgs.Debug = debug
socksArgs.Debug = isDebug
//########socks+http(s)#########
sps := app.Command("sps", "proxy on socks+http(s) mode")
@ -283,7 +284,7 @@ func initConfig() (err error) {
spsArgs.LoadBalanceHashTarget = sps.Flag("lb-hashtarget", "use target address to choose parent for LB").Default("false").Bool()
spsArgs.LoadBalanceOnlyHA = sps.Flag("lb-onlyha", "use only `high availability mode` to choose parent for LB").Default("false").Bool()
spsArgs.RateLimit = sps.Flag("rate-limit", "rate limit (bytes/second) of each connection, such as: 100K 1.5M . 0 means no limitation").Short('l').Default("0").String()
spsArgs.Debug = debug
spsArgs.Debug = isDebug
//########dns#########
dns := app.Command("dns", "proxy on dns server mode")
@ -314,8 +315,6 @@ func initConfig() (err error) {
//parse args
serviceName := kingpin.MustParse(app.Parse(os.Args[1:]))
isDebug = *debug
//set kcp config
switch *kcpArgs.Mode {
@ -372,7 +371,7 @@ func initConfig() (err error) {
log := logger.New(os.Stderr, "", logger.Ldate|logger.Ltime)
flags := logger.Ldate
if *debug {
if *isDebug {
flags |= logger.Lshortfile | logger.Lmicroseconds
cpuProfilingFile, _ = os.Create("cpu.prof")
memProfilingFile, _ = os.Create("memory.prof")
@ -417,6 +416,11 @@ func initConfig() (err error) {
}
}
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
for {
if cmd != nil {
cmd.Process.Kill()
@ -436,11 +440,21 @@ func initConfig() (err error) {
scanner := bufio.NewScanner(cmdReader)
scannerStdErr := bufio.NewScanner(cmdReaderStderr)
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
for scanner.Scan() {
fmt.Println(scanner.Text())
}
}()
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
for scannerStdErr.Scan() {
fmt.Println(scannerStdErr.Text())
}
@ -462,7 +476,7 @@ func initConfig() (err error) {
}
if *logfile == "" {
poster()
if *debug {
if *isDebug {
log.Println("[profiling] cpu profiling save to file : cpu.prof")
log.Println("[profiling] memory profiling save to file : memory.prof")
log.Println("[profiling] block profiling save to file : block.prof")

View File

@ -10,6 +10,7 @@ import (
"encoding/binary"
"fmt"
"io"
"runtime/debug"
"math/rand"
"net"
@ -133,7 +134,14 @@ func newConn(m *Mux, dst net.Addr) *Conn {
}
func (c *Conn) start() {
go c.reader()
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
c.reader()
}()
}
func (c *Conn) reader() {

View File

@ -6,6 +6,7 @@ package dst
import (
"fmt"
"runtime/debug"
"net"
"sync"
@ -78,8 +79,14 @@ func NewMux(conn net.PacketConn, packetSize int) *Mux {
}
}
go m.readerLoop()
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
m.readerLoop()
}()
return m
}

View File

@ -6,6 +6,7 @@ package dst
import (
"fmt"
"runtime/debug"
"sync"
@ -52,7 +53,14 @@ func newSendBuffer(m *Mux) *sendBuffer {
scheduler: ratelimit.NewBucketWithRate(schedulerRate, schedulerCapacity),
}
b.cond = sync.NewCond(&b.mut)
go b.writerLoop()
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
b.writerLoop()
}()
return b
}

View File

@ -2,6 +2,8 @@ package mapx
import (
"encoding/json"
"fmt"
"runtime/debug"
"sync"
)
@ -150,7 +152,14 @@ type Tuple struct {
func (m ConcurrentMap) Iter() <-chan Tuple {
chans := snapshot(m)
ch := make(chan Tuple)
go fanIn(chans, ch)
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
fanIn(chans, ch)
}()
return ch
}
@ -162,7 +171,14 @@ func (m ConcurrentMap) IterBuffered() <-chan Tuple {
total += cap(c)
}
ch := make(chan Tuple, total)
go fanIn(chans, ch)
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
fanIn(chans, ch)
}()
return ch
}
@ -176,17 +192,24 @@ func snapshot(m ConcurrentMap) (chans []chan Tuple) {
wg.Add(SHARD_COUNT)
// Foreach shard.
for index, shard := range m {
go func(index int, shard *ConcurrentMapShared) {
// Foreach key, value pair.
shard.RLock()
chans[index] = make(chan Tuple, len(shard.items))
wg.Done()
for key, val := range shard.items {
chans[index] <- Tuple{key, val}
}
shard.RUnlock()
close(chans[index])
}(index, shard)
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
func(index int, shard *ConcurrentMapShared) {
// Foreach key, value pair.
shard.RLock()
chans[index] = make(chan Tuple, len(shard.items))
wg.Done()
for key, val := range shard.items {
chans[index] <- Tuple{key, val}
}
shard.RUnlock()
close(chans[index])
}(index, shard)
}()
}
wg.Wait()
return chans
@ -197,12 +220,19 @@ func fanIn(chans []chan Tuple, out chan Tuple) {
wg := sync.WaitGroup{}
wg.Add(len(chans))
for _, ch := range chans {
go func(ch chan Tuple) {
for t := range ch {
out <- t
}
wg.Done()
}(ch)
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
func(ch chan Tuple) {
for t := range ch {
out <- t
}
wg.Done()
}(ch)
}()
}
wg.Wait()
close(out)
@ -244,19 +274,31 @@ func (m ConcurrentMap) Keys() []string {
count := m.Count()
ch := make(chan string, count)
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
// Foreach shard.
wg := sync.WaitGroup{}
wg.Add(SHARD_COUNT)
for _, shard := range m {
go func(shard *ConcurrentMapShared) {
// Foreach key, value pair.
shard.RLock()
for key := range shard.items {
ch <- key
}
shard.RUnlock()
wg.Done()
}(shard)
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
func(shard *ConcurrentMapShared) {
// Foreach key, value pair.
shard.RLock()
for key := range shard.items {
ch <- key
}
shard.RUnlock()
wg.Done()
}(shard)
}()
}
wg.Wait()
close(ch)

View File

@ -1,8 +1,10 @@
package udputils
import (
"fmt"
logger "log"
"net"
"runtime/debug"
"strings"
"time"
@ -58,6 +60,11 @@ func (s *IOBinder) Clean(fn CleanFn) *IOBinder {
func (s *IOBinder) AliveWithServeConn(srcAddr string, inTCPConn *net.Conn) *IOBinder {
s.inTCPConn = inTCPConn
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
buf := make([]byte, 1)
(*inTCPConn).SetReadDeadline(time.Time{})
if _, err := (*inTCPConn).Read(buf); err != nil {
@ -66,6 +73,11 @@ func (s *IOBinder) AliveWithServeConn(srcAddr string, inTCPConn *net.Conn) *IOBi
}
}()
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
for {
(*inTCPConn).SetWriteDeadline(time.Now().Add(time.Second * 5))
if _, err := (*inTCPConn).Write([]byte{0x00}); err != nil {
@ -82,6 +94,11 @@ func (s *IOBinder) AliveWithServeConn(srcAddr string, inTCPConn *net.Conn) *IOBi
func (s *IOBinder) AliveWithClientConn(srcAddr string, outTCPConn *net.Conn) *IOBinder {
s.outTCPConn = outTCPConn
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
buf := make([]byte, 1)
(*outTCPConn).SetReadDeadline(time.Time{})
if _, err := (*outTCPConn).Read(buf); err != nil {
@ -137,6 +154,11 @@ func (s *IOBinder) Run() (err error) {
return err
}
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
defer func() {
s.clean(srcAddr.String())
}()

View File

@ -1,9 +1,11 @@
package main
import (
"fmt"
"log"
"os"
"os/signal"
"runtime/debug"
"syscall"
"github.com/snail007/goproxy/services"
@ -32,6 +34,11 @@ func Clean(s *services.Service) {
syscall.SIGTERM,
syscall.SIGQUIT)
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
for _ = range signalChan {
log.Println("Received an interrupt, stopping services...")
if s != nil && *s != nil {

View File

@ -11,10 +11,10 @@ import (
"golang.org/x/net/proxy"
services "github.com/snail007/goproxy/services"
"github.com/snail007/goproxy/services/kcpcfg"
"github.com/miekg/dns"
gocache "github.com/pmylund/go-cache"
services "github.com/snail007/goproxy/services"
"github.com/snail007/goproxy/services/kcpcfg"
)
type DNSArgs struct {
@ -58,6 +58,11 @@ func (s *DNS) InitService() (err error) {
s.cache = gocache.New(time.Second*time.Duration(*s.cfg.DNSTTL), time.Second*60)
s.cache.LoadFile(*s.cfg.CacheFile)
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
for {
select {
case <-s.exitSig:
@ -135,6 +140,11 @@ func (s *DNS) Start(args interface{}, log *logger.Logger) (err error) {
}
dns.HandleFunc(".", s.callback)
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
log.Printf("dns server on udp %s", *s.cfg.Local)
err := dns.ListenAndServe(*s.cfg.Local, "udp", nil)
if err != nil {

View File

@ -181,6 +181,11 @@ func (s *HTTP) InitService() (err error) {
return
}
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
//循环检查ssh网络连通性
for {
if s.isStop {
@ -218,6 +223,17 @@ func (s *HTTP) StopService() {
} else {
s.log.Printf("service http(s) stoped")
}
s.basicAuth = utils.BasicAuth{}
s.cfg = HTTPArgs{}
s.checker = utils.Checker{}
s.domainResolver = dnsx.DomainResolver{}
s.lb = nil
s.lockChn = nil
s.log = nil
s.serverChannels = nil
s.sshClient = nil
s.userConns = nil
s = nil
}()
s.isStop = true
if len(*s.cfg.Parent) > 0 {

View File

@ -7,6 +7,7 @@ import (
logger "log"
"math/rand"
"net"
"runtime/debug"
"strconv"
"strings"
"sync"
@ -79,6 +80,14 @@ func (s *MuxBridge) StopService() {
} else {
s.log.Printf("service bridge stoped")
}
s.cfg = MuxBridgeArgs{}
s.clientControlConns = nil
s.l = nil
s.log = nil
s.router = utils.ClientKeyRouter{}
s.sc = nil
s.serverConns = nil
s = nil
}()
s.isStop = true
if s.sc != nil && (*s.sc).Listener != nil {
@ -209,6 +218,11 @@ func (s *MuxBridge) handler(inConn net.Conn) {
group.Set(index, session)
// s.clientControlConns.Set(key, session)
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
for {
if s.isStop {
return
@ -277,10 +291,20 @@ func (s *MuxBridge) callback(inConn net.Conn, serverID, key string) {
die1 := make(chan bool, 1)
die2 := make(chan bool, 1)
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
io.Copy(stream, inConn)
die1 <- true
}()
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
io.Copy(inConn, stream)
die2 <- true
}()

View File

@ -6,6 +6,7 @@ import (
"io"
logger "log"
"net"
"runtime/debug"
"strings"
"time"
@ -106,6 +107,12 @@ func (s *MuxClient) StopService() {
} else {
s.log.Printf("service client stoped")
}
s.cfg = MuxClientArgs{}
s.jumper = nil
s.log = nil
s.sessions = nil
s.udpConns = nil
s = nil
}()
s.isStop = true
for _, sess := range s.sessions.Items() {
@ -297,6 +304,11 @@ func (s *MuxClient) ServeUDP(inConn *smux.Stream, localAddr, ID string) {
}
func (s *MuxClient) UDPRevecive(key, ID string) {
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
s.log.Printf("udp conn %s connected", ID)
v, ok := s.udpConns.Get(key)
if !ok {
@ -322,6 +334,11 @@ func (s *MuxClient) UDPRevecive(key, ID string) {
}
cui.touchtime = time.Now().Unix()
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
cui.conn.SetWriteDeadline(time.Now().Add(time.Millisecond * time.Duration(*s.cfg.Timeout)))
_, err = cui.conn.Write(utils.UDPPacket(cui.srcAddr.String(), buf[:n]))
cui.conn.SetWriteDeadline(time.Time{})
@ -336,6 +353,11 @@ func (s *MuxClient) UDPRevecive(key, ID string) {
func (s *MuxClient) UDPGCDeamon() {
gctime := int64(30)
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
if s.isStop {
return
}
@ -390,10 +412,20 @@ func (s *MuxClient) ServeConn(inConn *smux.Stream, localAddr, ID string) {
die1 := make(chan bool, 1)
die2 := make(chan bool, 1)
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
io.Copy(outConn, snappy.NewReader(inConn))
die1 <- true
}()
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
io.Copy(snappy.NewWriter(inConn), outConn)
die2 <- true
}()

View File

@ -149,6 +149,11 @@ func (s *MuxServerManager) StopService() {
for _, server := range s.servers {
(*server).Clean()
}
s.cfg = MuxServerArgs{}
s.log = nil
s.serverID = ""
s.servers = nil
s = nil
}
func (s *MuxServerManager) CheckArgs() (err error) {
if *s.cfg.CertFile == "" || *s.cfg.KeyFile == "" {
@ -198,6 +203,14 @@ func (s *MuxServer) StopService() {
} else {
s.log.Printf("service server stoped")
}
s.cfg = MuxServerArgs{}
s.jumper = nil
s.lockChn = nil
s.log = nil
s.sc = utils.ServerChannel{}
s.sessions = nil
s.udpConns = nil
s = nil
}()
s.isStop = true
for _, sess := range s.sessions.Items() {
@ -283,10 +296,20 @@ func (s *MuxServer) Start(args interface{}, log *logger.Logger) (err error) {
die1 := make(chan bool, 1)
die2 := make(chan bool, 1)
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
io.Copy(inConn, snappy.NewReader(outConn))
die1 <- true
}()
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
io.Copy(snappy.NewWriter(outConn), inConn)
die2 <- true
}()
@ -377,6 +400,11 @@ func (s *MuxServer) GetConn(index string) (conn net.Conn, err error) {
s.sessions.Set(index, session)
s.log.Printf("session[%s] created", index)
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
for {
if s.isStop {
return
@ -432,6 +460,11 @@ func (s *MuxServer) getParentConn() (conn net.Conn, err error) {
func (s *MuxServer) UDPGCDeamon() {
gctime := int64(30)
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
if s.isStop {
return
}
@ -506,6 +539,11 @@ func (s *MuxServer) UDPSend(data []byte, localAddr, srcAddr *net.UDPAddr) {
}
func (s *MuxServer) UDPRevecive(key, ID string) {
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
s.log.Printf("udp conn %s connected", ID)
var uc *MuxUDPConnItem
defer func() {
@ -533,7 +571,14 @@ func (s *MuxServer) UDPRevecive(key, ID string) {
return
}
uc.touchtime = time.Now().Unix()
go s.sc.UDPListener.WriteToUDP(body, uc.srcAddr)
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
s.sc.UDPListener.WriteToUDP(body, uc.srcAddr)
}()
}
}()
}

View File

@ -189,6 +189,11 @@ func (s *Socks) InitService() (err error) {
return
}
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
//循环检查ssh网络连通性
for {
if s.isStop {
@ -226,6 +231,21 @@ func (s *Socks) StopService() {
} else {
s.log.Printf("service socks stoped")
}
s.basicAuth = utils.BasicAuth{}
s.cfg = SocksArgs{}
s.checker = utils.Checker{}
s.domainResolver = dnsx.DomainResolver{}
s.lb = nil
s.lockChn = nil
s.log = nil
s.sc = nil
s.sshClient = nil
s.udpLocalKey = nil
s.udpParentKey = nil
s.udpRelatedPacketConns = nil
s.udpSC = utils.ServerChannel{}
s.userConns = nil
s = nil
}()
s.isStop = true
if len(*s.cfg.Parent) > 0 {

View File

@ -178,6 +178,19 @@ func (s *SPS) StopService() {
} else {
s.log.Printf("service sps stoped")
}
s.basicAuth = utils.BasicAuth{}
s.cfg = SPSArgs{}
s.domainResolver = dnsx.DomainResolver{}
s.lb = nil
s.localCipher = nil
s.log = nil
s.parentCipher = nil
s.serverChannels = nil
s.udpLocalKey = nil
s.udpParentKey = nil
s.udpRelatedPacketConns = nil
s.userConns = nil
s = nil
}()
for _, sc := range s.serverChannels {
if sc.Listener != nil && *sc.Listener != nil {

View File

@ -92,6 +92,12 @@ func (s *TCP) StopService() {
} else {
s.log.Printf("service tcp stoped")
}
s.cfg = TCPArgs{}
s.jumper = nil
s.log = nil
s.sc = nil
s.userConns = nil
s = nil
}()
s.isStop = true
if s.sc.Listener != nil && *s.sc.Listener != nil {

View File

@ -7,6 +7,7 @@ import (
"net"
"os"
"strconv"
"strings"
"time"
"github.com/snail007/goproxy/services"
@ -72,6 +73,11 @@ func (s *TunnelBridge) StopService() {
} else {
s.log.Printf("service tbridge stoped")
}
s.cfg = TunnelBridgeArgs{}
s.clientControlConns = nil
s.log = nil
s.serverConns = nil
s = nil
}()
s.isStop = true
for _, sess := range s.clientControlConns.Items() {
@ -178,7 +184,7 @@ func (s *TunnelBridge) callback(inConn net.Conn) {
(*item.(*net.Conn)).SetWriteDeadline(time.Now().Add(time.Second * 3))
_, err := (*item.(*net.Conn)).Write(packet)
(*item.(*net.Conn)).SetWriteDeadline(time.Time{})
if err != nil {
if err != nil && strings.Contains(err.Error(), "stream closed") {
s.log.Printf("%s client control conn write signal fail, err: %s, retrying...", key, err)
time.Sleep(time.Second * 3)
continue

View File

@ -7,6 +7,7 @@ import (
logger "log"
"net"
"os"
"runtime/debug"
"strings"
"time"
@ -40,7 +41,7 @@ type ClientUDPConnItem struct {
}
type TunnelClient struct {
cfg TunnelClientArgs
ctrlConn net.Conn
ctrlConn *net.Conn
isStop bool
userConns mapx.ConcurrentMap
log *logger.Logger
@ -96,10 +97,17 @@ func (s *TunnelClient) StopService() {
} else {
s.log.Printf("service tclient stoped")
}
s.cfg = TunnelClientArgs{}
s.ctrlConn = nil
s.jumper = nil
s.log = nil
s.udpConns = nil
s.userConns = nil
s = nil
}()
s.isStop = true
if s.ctrlConn != nil {
s.ctrlConn.Close()
(*s.ctrlConn).Close()
}
for _, c := range s.userConns.Items() {
(*c.(*net.Conn)).Close()
@ -121,27 +129,25 @@ func (s *TunnelClient) Start(args interface{}, log *logger.Logger) (err error) {
return
}
if s.ctrlConn != nil {
s.ctrlConn.Close()
(*s.ctrlConn).Close()
}
s.ctrlConn, err = s.GetInConn(CONN_CLIENT_CONTROL, *s.cfg.Key)
var c net.Conn
c, err = s.GetInConn(CONN_CLIENT_CONTROL, *s.cfg.Key)
if err != nil {
s.log.Printf("control connection err: %s, retrying...", err)
time.Sleep(time.Second * 3)
if s.ctrlConn != nil {
s.ctrlConn.Close()
}
continue
}
s.ctrlConn = &c
for {
if s.isStop {
return
}
var ID, clientLocalAddr, serverID string
err = utils.ReadPacketData(s.ctrlConn, &ID, &clientLocalAddr, &serverID)
err = utils.ReadPacketData(*s.ctrlConn, &ID, &clientLocalAddr, &serverID)
if err != nil {
if s.ctrlConn != nil {
s.ctrlConn.Close()
(*s.ctrlConn).Close()
}
s.log.Printf("read connection signal err: %s, retrying...", err)
break
@ -150,9 +156,23 @@ func (s *TunnelClient) Start(args interface{}, log *logger.Logger) (err error) {
protocol := clientLocalAddr[:3]
localAddr := clientLocalAddr[4:]
if protocol == "udp" {
go s.ServeUDP(localAddr, ID, serverID)
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
s.ServeUDP(localAddr, ID, serverID)
}()
} else {
go s.ServeConn(localAddr, ID, serverID)
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
s.ServeConn(localAddr, ID, serverID)
}()
}
}
}
@ -301,11 +321,23 @@ func (s *TunnelClient) ServeUDP(localAddr, ID, serverID string) {
item = v.(*ClientUDPConnItem)
}
(*item).touchtime = time.Now().Unix()
go (*item).udpConn.Write(body)
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
(*item).udpConn.Write(body)
}()
}
}
func (s *TunnelClient) UDPRevecive(key, ID string) {
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
s.log.Printf("udp conn %s connected", ID)
v, ok := s.udpConns.Get(key)
if !ok {
@ -331,6 +363,11 @@ func (s *TunnelClient) UDPRevecive(key, ID string) {
}
cui.touchtime = time.Now().Unix()
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
(*cui.conn).SetWriteDeadline(time.Now().Add(time.Millisecond * time.Duration(*s.cfg.Timeout)))
_, err = (*cui.conn).Write(utils.UDPPacket(cui.srcAddr.String(), buf[:n]))
(*cui.conn).SetWriteDeadline(time.Time{})
@ -345,6 +382,11 @@ func (s *TunnelClient) UDPRevecive(key, ID string) {
func (s *TunnelClient) UDPGCDeamon() {
gctime := int64(30)
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
if s.isStop {
return
}

View File

@ -128,6 +128,11 @@ func (s *TunnelServerManager) StopService() {
for _, server := range s.servers {
(*server).Clean()
}
s.cfg = TunnelServerArgs{}
s.log = nil
s.serverID = ""
s.servers = nil
s = nil
}
func (s *TunnelServerManager) CheckArgs() (err error) {
if *s.cfg.CertFile == "" || *s.cfg.KeyFile == "" {
@ -172,6 +177,14 @@ func (s *TunnelServer) StopService() {
} else {
s.log.Printf("service server stoped")
}
s.cfg = TunnelServerArgs{}
s.jumper = nil
s.log = nil
s.sc = utils.ServerChannel{}
s.udpConn = nil
s.udpConns = nil
s.userConns = nil
s = nil
}()
s.isStop = true
@ -353,6 +366,11 @@ func (s *TunnelServer) GetConn() (conn net.Conn, err error) {
func (s *TunnelServer) UDPGCDeamon() {
gctime := int64(30)
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
if s.isStop {
return
}
@ -419,6 +437,11 @@ func (s *TunnelServer) UDPSend(data []byte, localAddr, srcAddr *net.UDPAddr) {
}
func (s *TunnelServer) UDPRevecive(key, ID string) {
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
s.log.Printf("udp conn %s connected", ID)
var uc *TunnelUDPConnItem
defer func() {
@ -446,7 +469,14 @@ func (s *TunnelServer) UDPRevecive(key, ID string) {
return
}
uc.touchtime = time.Now().Unix()
go s.sc.UDPListener.WriteToUDP(body, uc.srcAddr)
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
s.sc.UDPListener.WriteToUDP(body, uc.srcAddr)
}()
}
}()
}

View File

@ -72,6 +72,11 @@ func (s *UDP) StopService() {
} else {
s.log.Printf("service udp stoped")
}
s.cfg = UDPArgs{}
s.log = nil
s.p = nil
s.sc = nil
s = nil
}()
s.isStop = true
if s.sc.Listener != nil && *s.sc.Listener != nil {

View File

@ -2,8 +2,10 @@ package lb
import (
"errors"
"fmt"
"log"
"net"
"runtime/debug"
"sync"
"time"
@ -112,6 +114,11 @@ func (b *Backend) StartHeartCheck() {
}
func (b *Backend) startMuxHeartCheck() {
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
for {
if b.isStop {
return
@ -142,6 +149,11 @@ func (b *Backend) startMuxHeartCheck() {
// Monitoring the backend
func (b *Backend) startTCPHeartCheck() {
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
for {
if b.isStop {
return

View File

@ -2,6 +2,8 @@ package mapx
import (
"encoding/json"
"fmt"
"runtime/debug"
"sync"
)
@ -150,7 +152,14 @@ type Tuple struct {
func (m ConcurrentMap) Iter() <-chan Tuple {
chans := snapshot(m)
ch := make(chan Tuple)
go fanIn(chans, ch)
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
fanIn(chans, ch)
}()
return ch
}
@ -162,7 +171,14 @@ func (m ConcurrentMap) IterBuffered() <-chan Tuple {
total += cap(c)
}
ch := make(chan Tuple, total)
go fanIn(chans, ch)
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
fanIn(chans, ch)
}()
return ch
}
@ -176,17 +192,24 @@ func snapshot(m ConcurrentMap) (chans []chan Tuple) {
wg.Add(SHARD_COUNT)
// Foreach shard.
for index, shard := range m {
go func(index int, shard *ConcurrentMapShared) {
// Foreach key, value pair.
shard.RLock()
chans[index] = make(chan Tuple, len(shard.items))
wg.Done()
for key, val := range shard.items {
chans[index] <- Tuple{key, val}
}
shard.RUnlock()
close(chans[index])
}(index, shard)
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
func(index int, shard *ConcurrentMapShared) {
// Foreach key, value pair.
shard.RLock()
chans[index] = make(chan Tuple, len(shard.items))
wg.Done()
for key, val := range shard.items {
chans[index] <- Tuple{key, val}
}
shard.RUnlock()
close(chans[index])
}(index, shard)
}()
}
wg.Wait()
return chans
@ -197,12 +220,19 @@ func fanIn(chans []chan Tuple, out chan Tuple) {
wg := sync.WaitGroup{}
wg.Add(len(chans))
for _, ch := range chans {
go func(ch chan Tuple) {
for t := range ch {
out <- t
}
wg.Done()
}(ch)
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
func(ch chan Tuple) {
for t := range ch {
out <- t
}
wg.Done()
}(ch)
}()
}
wg.Wait()
close(out)
@ -244,19 +274,31 @@ func (m ConcurrentMap) Keys() []string {
count := m.Count()
ch := make(chan string, count)
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
// Foreach shard.
wg := sync.WaitGroup{}
wg.Add(SHARD_COUNT)
for _, shard := range m {
go func(shard *ConcurrentMapShared) {
// Foreach key, value pair.
shard.RLock()
for key := range shard.items {
ch <- key
}
shard.RUnlock()
wg.Done()
}(shard)
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
func(shard *ConcurrentMapShared) {
// Foreach key, value pair.
shard.RLock()
for key := range shard.items {
ch <- key
}
shard.RUnlock()
wg.Done()
}(shard)
}()
}
wg.Wait()
close(ch)

View File

@ -11,6 +11,7 @@ import (
logger "log"
"net"
"net/url"
"runtime/debug"
"strings"
"sync"
"time"
@ -87,11 +88,21 @@ func (c *Checker) Stop() {
}
func (c *Checker) start() {
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
//log.Printf("checker started")
for {
//log.Printf("checker did")
for _, v := range c.data.Items() {
go func(item CheckerItem) {
defer func() {
if e := recover(); e != nil {
fmt.Printf("crashed:%s", string(debug.Stack()))
}
}()
if c.isNeedCheck(item) {
//log.Printf("check %s", item.Host)
var conn net.Conn