no message

This commit is contained in:
arraykeys
2017-10-02 15:58:12 +08:00
parent db729915ad
commit 644ec6891d
5 changed files with 124 additions and 69 deletions

View File

@ -12,7 +12,8 @@ import (
type ServerConn struct { type ServerConn struct {
ClientLocalAddr string //tcp:2.2.22:333@ID ClientLocalAddr string //tcp:2.2.22:333@ID
Conn *net.Conn // Conn *net.Conn
Conn *utils.HeartbeatReadWriter
} }
type TunnelBridge struct { type TunnelBridge struct {
cfg TunnelBridgeArgs cfg TunnelBridgeArgs
@ -116,9 +117,13 @@ func (s *TunnelBridge) Start(args interface{}) (err error) {
switch connType { switch connType {
case CONN_SERVER: case CONN_SERVER:
hb := utils.NewHeartbeatReadWriter(&inConn, 3, func(err error, hb *utils.HeartbeatReadWriter) {
log.Printf("%s conn %s from server released", key, ID)
s.serverConns.Remove(ID)
})
addr := clientLocalAddr + "@" + ID addr := clientLocalAddr + "@" + ID
s.serverConns.Set(ID, ServerConn{ s.serverConns.Set(ID, ServerConn{
Conn: &inConn, Conn: &hb,
ClientLocalAddr: addr, ClientLocalAddr: addr,
}) })
for { for {
@ -146,13 +151,14 @@ func (s *TunnelBridge) Start(args interface{}) (err error) {
} }
serverConn := serverConnItem.(ServerConn).Conn serverConn := serverConnItem.(ServerConn).Conn
// hw := utils.NewHeartbeatReadWriter(&inConn, 3, func(err error, hw *utils.HeartbeatReadWriter) { // hw := utils.NewHeartbeatReadWriter(&inConn, 3, func(err error, hw *utils.HeartbeatReadWriter) {
// log.Printf("hw err %s", err) // log.Printf("%s conn %s from client released", key, ID)
// hw.Close() // hw.Close()
// }) // })
// utils.IoBind(*serverConn, &hw, func(isSrcErr bool, err error) { utils.IoBind(serverConn, inConn, func(isSrcErr bool, err error) {
utils.IoBind(*serverConn, inConn, func(isSrcErr bool, err error) { // utils.IoBind(serverConn, inConn, func(isSrcErr bool, err error) {
utils.CloseConn(serverConn) serverConn.Close()
utils.CloseConn(&inConn) utils.CloseConn(&inConn)
// hw.Close()
s.serverConns.Remove(ID) s.serverConns.Remove(ID)
log.Printf("conn %s released", ID) log.Printf("conn %s released", ID)
}, func(i int, b bool) {}, 0) }, func(i int, b bool) {}, 0)

View File

@ -142,10 +142,14 @@ func (s *TunnelClient) ServeUDP(localAddr, ID string) {
log.Printf("connection %s released", ID) log.Printf("connection %s released", ID)
utils.CloseConn(&inConn) utils.CloseConn(&inConn)
break break
} } else if err != nil {
//log.Printf("udp packet revecived:%s,%v", srcAddr, body) log.Printf("udp packet revecived fail, err: %s", err)
} else {
log.Printf("udp packet revecived:%s,%v", srcAddr, body)
go s.processUDPPacket(&inConn, srcAddr, localAddr, body) go s.processUDPPacket(&inConn, srcAddr, localAddr, body)
} }
}
// } // }
} }
func (s *TunnelClient) processUDPPacket(inConn *net.Conn, srcAddr, localAddr string, body []byte) { func (s *TunnelClient) processUDPPacket(inConn *net.Conn, srcAddr, localAddr string, body []byte) {
@ -168,21 +172,22 @@ func (s *TunnelClient) processUDPPacket(inConn *net.Conn, srcAddr, localAddr str
return return
} }
//log.Printf("send udp packet to %s success", dstAddr.String()) //log.Printf("send udp packet to %s success", dstAddr.String())
buf := make([]byte, 512) buf := make([]byte, 1024)
len, _, err := conn.ReadFromUDP(buf) length, _, err := conn.ReadFromUDP(buf)
if err != nil { if err != nil {
log.Printf("read udp response from %s fail ,ERR:%s", dstAddr.String(), err) log.Printf("read udp response from %s fail ,ERR:%s", dstAddr.String(), err)
return return
} }
respBody := buf[0:len] respBody := buf[0:length]
//log.Printf("revecived udp packet from %s , %v", dstAddr.String(), respBody) log.Printf("revecived udp packet from %s , %v", dstAddr.String(), respBody)
_, err = (*inConn).Write(utils.UDPPacket(srcAddr, respBody)) bs := utils.UDPPacket(srcAddr, respBody)
_, err = (*inConn).Write(bs)
if err != nil { if err != nil {
log.Printf("send udp response fail ,ERR:%s", err) log.Printf("send udp response fail ,ERR:%s", err)
utils.CloseConn(inConn) utils.CloseConn(inConn)
return return
} }
//log.Printf("send udp response success ,from:%s", dstAddr.String()) log.Printf("send udp response success ,from:%s ,%d ,%v", dstAddr.String(), len(bs), bs)
} }
func (s *TunnelClient) ServeConn(localAddr, ID string) { func (s *TunnelClient) ServeConn(localAddr, ID string) {
var inConn, outConn net.Conn var inConn, outConn net.Conn

View File

@ -1,7 +1,6 @@
package services package services
import ( import (
"bufio"
"bytes" "bytes"
"crypto/tls" "crypto/tls"
"encoding/binary" "encoding/binary"
@ -80,8 +79,9 @@ func (s *TunnelServer) Start(args interface{}) (err error) {
} }
}() }()
var outConn net.Conn var outConn net.Conn
var ID string
for { for {
outConn, err = s.GetOutConn("") outConn, ID, err = s.GetOutConn("")
if err != nil { if err != nil {
utils.CloseConn(&outConn) utils.CloseConn(&outConn)
log.Printf("connect to %s fail, err: %s, retrying...", *s.cfg.Parent, err) log.Printf("connect to %s fail, err: %s, retrying...", *s.cfg.Parent, err)
@ -91,13 +91,18 @@ func (s *TunnelServer) Start(args interface{}) (err error) {
break break
} }
} }
utils.IoBind(inConn, outConn, func(isSrcErr bool, err error) { hb := utils.NewHeartbeatReadWriter(&outConn, 3, func(err error, hb *utils.HeartbeatReadWriter) {
log.Printf("%s conn %s to bridge released", *s.cfg.Key, ID)
hb.Close()
})
utils.IoBind(inConn, &hb, func(isSrcErr bool, err error) {
//utils.IoBind(inConn, outConn, func(isSrcErr bool, err error) {
utils.CloseConn(&outConn) utils.CloseConn(&outConn)
utils.CloseConn(&inConn) utils.CloseConn(&inConn)
log.Printf("%s conn %s - %s - %s - %s released", *s.cfg.Key, inConn.RemoteAddr(), inConn.LocalAddr(), outConn.LocalAddr(), outConn.RemoteAddr()) log.Printf("%s conn %s released", *s.cfg.Key, ID)
}, func(i int, b bool) {}, 0) }, func(i int, b bool) {}, 0)
log.Printf("%s conn %s - %s - %s - %s created", *s.cfg.Key, inConn.RemoteAddr(), inConn.LocalAddr(), outConn.LocalAddr(), outConn.RemoteAddr()) log.Printf("%s conn %s created", *s.cfg.Key, ID)
}) })
if err != nil { if err != nil {
return return
@ -109,7 +114,7 @@ func (s *TunnelServer) Start(args interface{}) (err error) {
func (s *TunnelServer) Clean() { func (s *TunnelServer) Clean() {
s.StopService() s.StopService()
} }
func (s *TunnelServer) GetOutConn(id string) (outConn net.Conn, err error) { func (s *TunnelServer) GetOutConn(id string) (outConn net.Conn, ID string, err error) {
outConn, err = s.GetConn() outConn, err = s.GetConn()
if err != nil { if err != nil {
log.Printf("connection err: %s", err) log.Printf("connection err: %s", err)
@ -117,8 +122,10 @@ func (s *TunnelServer) GetOutConn(id string) (outConn net.Conn, err error) {
} }
keyBytes := []byte(*s.cfg.Key) keyBytes := []byte(*s.cfg.Key)
keyLength := uint16(len(keyBytes)) keyLength := uint16(len(keyBytes))
IDBytes := []byte(utils.NewUniqueID().String()) ID = utils.NewUniqueID().String()
IDBytes := []byte(ID)
if id != "" { if id != "" {
ID = id
IDBytes = []byte(id) IDBytes = []byte(id)
} }
IDLength := uint16(len(IDBytes)) IDLength := uint16(len(IDBytes))
@ -159,6 +166,8 @@ func (s *TunnelServer) UDPConnDeamon() {
} }
}() }()
var outConn net.Conn var outConn net.Conn
var hb utils.HeartbeatReadWriter
var ID string
var cmdChn = make(chan bool, 1) var cmdChn = make(chan bool, 1)
var err error var err error
@ -167,7 +176,7 @@ func (s *TunnelServer) UDPConnDeamon() {
RETRY: RETRY:
if outConn == nil { if outConn == nil {
for { for {
outConn, err = s.GetOutConn("") outConn, ID, err = s.GetOutConn("")
if err != nil { if err != nil {
cmdChn <- true cmdChn <- true
outConn = nil outConn = nil
@ -176,19 +185,23 @@ func (s *TunnelServer) UDPConnDeamon() {
time.Sleep(time.Second * 3) time.Sleep(time.Second * 3)
continue continue
} else { } else {
go func(outConn net.Conn) { hb = utils.NewHeartbeatReadWriter(&outConn, 3, func(err error, hb *utils.HeartbeatReadWriter) {
log.Printf("%s conn %s to bridge released", *s.cfg.Key, ID)
hb.Close()
})
go func(outConn net.Conn, hb utils.HeartbeatReadWriter, ID string) {
go func() { go func() {
<-cmdChn <-cmdChn
outConn.Close() outConn.Close()
}() }()
for { for {
srcAddrFromConn, body, err := utils.ReadUDPPacket(bufio.NewReader(outConn)) srcAddrFromConn, body, err := utils.ReadUDPPacket(&hb)
if err == io.EOF || err == io.ErrUnexpectedEOF { if err == io.EOF || err == io.ErrUnexpectedEOF {
log.Printf("udp connection deamon exited, %s -> %s", outConn.LocalAddr(), outConn.RemoteAddr()) log.Printf("UDP deamon connection %s exited", ID)
break break
} }
if err != nil { if err != nil {
log.Printf("parse revecived udp packet fail, err: %s", err) log.Printf("parse revecived udp packet fail, err: %s ,%v", err, body)
continue continue
} }
//log.Printf("udp packet revecived over parent , local:%s", srcAddrFromConn) //log.Printf("udp packet revecived over parent , local:%s", srcAddrFromConn)
@ -204,25 +217,27 @@ func (s *TunnelServer) UDPConnDeamon() {
log.Printf("udp response to local %s fail,ERR:%s", srcAddrFromConn, err) log.Printf("udp response to local %s fail,ERR:%s", srcAddrFromConn, err)
continue continue
} }
//log.Printf("udp response to local %s success", srcAddrFromConn) //log.Printf("udp response to local %s success , %v", srcAddrFromConn, body)
} }
}(outConn) }(outConn, hb, ID)
break break
} }
} }
} }
outConn.SetWriteDeadline(time.Now().Add(time.Second)) outConn.SetWriteDeadline(time.Now().Add(time.Second))
writer := bufio.NewWriter(outConn) _, err = hb.Write(utils.UDPPacket(item.srcAddr.String(), *item.packet))
writer.Write(utils.UDPPacket(item.srcAddr.String(), *item.packet)) // writer := bufio.NewWriter(outConn)
err := writer.Flush() // writer.Write(utils.UDPPacket(item.srcAddr.String(), *item.packet))
// err := writer.Flush()
outConn.SetWriteDeadline(time.Time{})
if err != nil { if err != nil {
utils.CloseConn(&outConn) utils.CloseConn(&outConn)
outConn = nil outConn = nil
log.Printf("write udp packet to %s fail ,flush err:%s ,retrying...", *s.cfg.Parent, err) log.Printf("write udp packet to %s fail ,flush err:%s ,retrying...", *s.cfg.Parent, err)
goto RETRY goto RETRY
} }
outConn.SetWriteDeadline(time.Time{})
//log.Printf("write packet %v", *item.packet) log.Printf("write packet %v", *item.packet)
} }
}() }()
} }

View File

@ -1,6 +1,7 @@
package utils package utils
import ( import (
"bufio"
"bytes" "bytes"
"crypto/tls" "crypto/tls"
"crypto/x509" "crypto/x509"
@ -264,6 +265,7 @@ func UDPPacket(srcAddr string, packet []byte) []byte {
addrBytes := []byte(srcAddr) addrBytes := []byte(srcAddr)
addrLength := uint16(len(addrBytes)) addrLength := uint16(len(addrBytes))
bodyLength := uint16(len(packet)) bodyLength := uint16(len(packet))
//log.Printf("build packet : addr len %d, body len %d", addrLength, bodyLength)
pkg := new(bytes.Buffer) pkg := new(bytes.Buffer)
binary.Write(pkg, binary.LittleEndian, addrLength) binary.Write(pkg, binary.LittleEndian, addrLength)
binary.Write(pkg, binary.LittleEndian, addrBytes) binary.Write(pkg, binary.LittleEndian, addrBytes)
@ -271,8 +273,8 @@ func UDPPacket(srcAddr string, packet []byte) []byte {
binary.Write(pkg, binary.LittleEndian, packet) binary.Write(pkg, binary.LittleEndian, packet)
return pkg.Bytes() return pkg.Bytes()
} }
func ReadUDPPacket(reader io.Reader) (srcAddr string, packet []byte, err error) { func ReadUDPPacket(_reader io.Reader) (srcAddr string, packet []byte, err error) {
// reader := bufio.NewReader(_reader) reader := bufio.NewReader(_reader)
var addrLength uint16 var addrLength uint16
var bodyLength uint16 var bodyLength uint16
err = binary.Read(reader, binary.LittleEndian, &addrLength) err = binary.Read(reader, binary.LittleEndian, &addrLength)
@ -285,12 +287,14 @@ func ReadUDPPacket(reader io.Reader) (srcAddr string, packet []byte, err error)
return return
} }
if n != int(addrLength) { if n != int(addrLength) {
err = fmt.Errorf("n != int(addrLength), %d,%d", n, addrLength)
return return
} }
srcAddr = string(_srcAddr) srcAddr = string(_srcAddr)
err = binary.Read(reader, binary.LittleEndian, &bodyLength) err = binary.Read(reader, binary.LittleEndian, &bodyLength)
if err != nil { if err != nil {
return return
} }
packet = make([]byte, bodyLength) packet = make([]byte, bodyLength)
@ -299,6 +303,7 @@ func ReadUDPPacket(reader io.Reader) (srcAddr string, packet []byte, err error)
return return
} }
if n != int(bodyLength) { if n != int(bodyLength) {
err = fmt.Errorf("n != int(bodyLength), %d,%d", n, bodyLength)
return return
} }
return return

View File

@ -470,11 +470,15 @@ type HeartbeatData struct {
} }
type HeartbeatReadWriter struct { type HeartbeatReadWriter struct {
conn *net.Conn conn *net.Conn
rchn chan HeartbeatData // rchn chan HeartbeatData
l *sync.Mutex l *sync.Mutex
dur int dur int
errHandler func(err error, hb *HeartbeatReadWriter) errHandler func(err error, hb *HeartbeatReadWriter)
once *sync.Once once *sync.Once
datachn chan byte
// rbuf bytes.Buffer
// signal chan bool
rerrchn chan error
} }
func NewHeartbeatReadWriter(conn *net.Conn, dur int, fn func(err error, hb *HeartbeatReadWriter)) (hrw HeartbeatReadWriter) { func NewHeartbeatReadWriter(conn *net.Conn, dur int, fn func(err error, hb *HeartbeatReadWriter)) (hrw HeartbeatReadWriter) {
@ -482,9 +486,13 @@ func NewHeartbeatReadWriter(conn *net.Conn, dur int, fn func(err error, hb *Hear
conn: conn, conn: conn,
l: &sync.Mutex{}, l: &sync.Mutex{},
dur: dur, dur: dur,
rchn: make(chan HeartbeatData, 10000), // rchn: make(chan HeartbeatData, 10000),
// signal: make(chan bool, 1),
errHandler: fn, errHandler: fn,
datachn: make(chan byte, 4*1024),
once: &sync.Once{}, once: &sync.Once{},
rerrchn: make(chan error, 1),
// rbuf: bytes.Buffer{},
} }
hrw.heartbeat() hrw.heartbeat()
hrw.reader() hrw.reader()
@ -499,15 +507,25 @@ func (rw *HeartbeatReadWriter) reader() {
//log.Printf("heartbeat read started") //log.Printf("heartbeat read started")
for { for {
n, data, err := rw.read() n, data, err := rw.read()
log.Printf("n:%d , data:%s ,err:%s", n, string(data), err) if n == -1 {
if n >= 0 { continue
rw.rchn <- HeartbeatData{ }
Data: data, //log.Printf("n:%d , data:%s ,err:%s", n, string(data), err)
Error: err, if err == nil {
N: n, //fmt.Printf("write data %s\n", string(data))
for _, b := range data {
rw.datachn <- b
} }
} }
if err != nil { if err != nil {
//log.Printf("heartbeat reader err: %s", err)
select {
case rw.rerrchn <- err:
default:
}
rw.once.Do(func() {
rw.errHandler(err, rw)
})
break break
} }
} }
@ -515,13 +533,6 @@ func (rw *HeartbeatReadWriter) reader() {
}() }()
} }
func (rw *HeartbeatReadWriter) read() (n int, data []byte, err error) { func (rw *HeartbeatReadWriter) read() (n int, data []byte, err error) {
defer func() {
if err != nil {
rw.once.Do(func() {
rw.errHandler(err, rw)
})
}
}()
var typ uint8 var typ uint8
err = binary.Read((*rw.conn), binary.LittleEndian, &typ) err = binary.Read((*rw.conn), binary.LittleEndian, &typ)
if err != nil { if err != nil {
@ -534,13 +545,18 @@ func (rw *HeartbeatReadWriter) read() (n int, data []byte, err error) {
} }
var dataLength uint32 var dataLength uint32
binary.Read((*rw.conn), binary.LittleEndian, &dataLength) binary.Read((*rw.conn), binary.LittleEndian, &dataLength)
data = make([]byte, dataLength) _data := make([]byte, dataLength)
// log.Printf("dataLength:%d , data:%s", dataLength, string(data)) // log.Printf("dataLength:%d , data:%s", dataLength, string(data))
n, err = (*rw.conn).Read(data) n, err = (*rw.conn).Read(_data)
//log.Printf("n:%d , data:%s ,err:%s", n, string(data), err) //log.Printf("n:%d , data:%s ,err:%s", n, string(data), err)
if err != nil { if err != nil {
return return
} }
if uint32(n) != dataLength {
err = fmt.Errorf("read short data body")
return
}
data = _data[:n]
return return
} }
func (rw *HeartbeatReadWriter) heartbeat() { func (rw *HeartbeatReadWriter) heartbeat() {
@ -555,13 +571,11 @@ func (rw *HeartbeatReadWriter) heartbeat() {
_, err := (*rw.conn).Write([]byte{0}) _, err := (*rw.conn).Write([]byte{0})
rw.l.Unlock() rw.l.Unlock()
if err != nil { if err != nil {
if rw.errHandler != nil {
//log.Printf("heartbeat err: %s", err) //log.Printf("heartbeat err: %s", err)
rw.once.Do(func() { rw.once.Do(func() {
rw.errHandler(err, rw) rw.errHandler(err, rw)
}) })
break break
}
} else { } else {
// log.Printf("heartbeat send ok") // log.Printf("heartbeat send ok")
} }
@ -571,12 +585,18 @@ func (rw *HeartbeatReadWriter) heartbeat() {
}() }()
} }
func (rw *HeartbeatReadWriter) Read(p []byte) (n int, err error) { func (rw *HeartbeatReadWriter) Read(p []byte) (n int, err error) {
item := <-rw.rchn data := make([]byte, cap(p))
//log.Println(item) for i := 0; i < cap(p); i++ {
if item.N > 0 { data[i] = <-rw.datachn
copy(p, item.Data) n++
//fmt.Printf("read %d %v\n", i, data[:n])
if len(rw.datachn) == 0 {
n = i + 1
copy(p, data[:n])
return
} }
return item.N, item.Error }
return
} }
func (rw *HeartbeatReadWriter) Write(p []byte) (n int, err error) { func (rw *HeartbeatReadWriter) Write(p []byte) (n int, err error) {
defer rw.l.Unlock() defer rw.l.Unlock()
@ -585,6 +605,10 @@ func (rw *HeartbeatReadWriter) Write(p []byte) (n int, err error) {
binary.Write(pkg, binary.LittleEndian, uint8(1)) binary.Write(pkg, binary.LittleEndian, uint8(1))
binary.Write(pkg, binary.LittleEndian, uint32(len(p))) binary.Write(pkg, binary.LittleEndian, uint32(len(p)))
binary.Write(pkg, binary.LittleEndian, p) binary.Write(pkg, binary.LittleEndian, p)
n, err = (*rw.conn).Write(pkg.Bytes()) bs := pkg.Bytes()
n, err = (*rw.conn).Write(bs)
if err == nil {
n = len(p)
}
return return
} }