269 lines
6.0 KiB
Go
269 lines
6.0 KiB
Go
// Copyright 2014 The DST Authors. All rights reserved.
|
|
// Use of this source code is governed by an MIT-style
|
|
// license that can be found in the LICENSE file.
|
|
|
|
package dst
|
|
|
|
import (
|
|
"fmt"
|
|
"runtime/debug"
|
|
|
|
"sync"
|
|
|
|
"github.com/juju/ratelimit"
|
|
)
|
|
|
|
/*
|
|
sendWindow
|
|
v
|
|
[S|S|S|S|Q|Q|Q|Q| | | | | | | | | ]
|
|
^ ^writeSlot
|
|
sendSlot
|
|
*/
|
|
type sendBuffer struct {
|
|
mux *Mux // we send packets here
|
|
scheduler *ratelimit.Bucket // sets send rate for packets
|
|
|
|
sendWindow int // maximum number of outstanding non-acked packets
|
|
packetRate int // target pps
|
|
|
|
send packetList // buffered packets
|
|
sendSlot int // buffer slot from which to send next packet
|
|
|
|
lost packetList // list of packets reported lost by timeout
|
|
lostSlot int // next lost packet to resend
|
|
|
|
closed bool
|
|
closing bool
|
|
mut sync.Mutex
|
|
cond *sync.Cond
|
|
}
|
|
|
|
const (
|
|
schedulerRate = 1e6
|
|
schedulerCapacity = schedulerRate / 40
|
|
)
|
|
|
|
// newSendBuffer creates a new send buffer with a zero window.
|
|
// SetRateAndWindow() must be called to set an initial packet rate and send
|
|
// window before using.
|
|
func newSendBuffer(m *Mux) *sendBuffer {
|
|
b := &sendBuffer{
|
|
mux: m,
|
|
scheduler: ratelimit.NewBucketWithRate(schedulerRate, schedulerCapacity),
|
|
}
|
|
b.cond = sync.NewCond(&b.mut)
|
|
go func() {
|
|
defer func() {
|
|
if e := recover(); e != nil {
|
|
fmt.Printf("crashed, err: %s\nstack:", e, string(debug.Stack()))
|
|
}
|
|
}()
|
|
b.writerLoop()
|
|
}()
|
|
return b
|
|
}
|
|
|
|
// Write puts a new packet in send buffer and schedules a send. Blocks when
|
|
// the window size is or would be exceeded.
|
|
func (b *sendBuffer) Write(pkt packet) error {
|
|
b.mut.Lock()
|
|
defer b.mut.Unlock()
|
|
|
|
for b.send.Full() || b.send.Len() >= b.sendWindow {
|
|
if b.closing {
|
|
return ErrClosedConn
|
|
}
|
|
if debugConnection {
|
|
log.Println(b, "Write blocked")
|
|
}
|
|
b.cond.Wait()
|
|
}
|
|
if !b.send.Append(pkt) {
|
|
panic("bug: append failed")
|
|
}
|
|
b.cond.Broadcast()
|
|
return nil
|
|
}
|
|
|
|
// Acknowledge removes packets with lower sequence numbers from the loss list
|
|
// or send buffer.
|
|
func (b *sendBuffer) Acknowledge(seq sequenceNo) {
|
|
b.mut.Lock()
|
|
|
|
if cut := b.lost.CutLessSeq(seq); cut > 0 {
|
|
if debugConnection {
|
|
log.Println(b, "cut", cut, "from loss list")
|
|
}
|
|
// Next resend should always start with the first packet, regardless
|
|
// of what we might already have resent previously.
|
|
b.lostSlot = 0
|
|
b.cond.Broadcast()
|
|
}
|
|
|
|
if cut := b.send.CutLessSeq(seq); cut > 0 {
|
|
if debugConnection {
|
|
log.Println(b, "cut", cut, "from send list")
|
|
}
|
|
b.sendSlot -= cut
|
|
b.cond.Broadcast()
|
|
}
|
|
|
|
b.mut.Unlock()
|
|
}
|
|
|
|
func (b *sendBuffer) NegativeAck(seq sequenceNo) {
|
|
b.mut.Lock()
|
|
|
|
pkts := b.send.PopSequence(seq)
|
|
if cut := len(pkts); cut > 0 {
|
|
b.lost.AppendAll(pkts)
|
|
if debugConnection {
|
|
log.Println(b, "cut", cut, "from send list, adding to loss list")
|
|
log.Println(seq, pkts)
|
|
}
|
|
b.sendSlot -= cut
|
|
b.lostSlot = 0
|
|
b.cond.Broadcast()
|
|
}
|
|
|
|
b.mut.Unlock()
|
|
}
|
|
|
|
// ScheduleResend arranges for a resend of all currently unacknowledged
|
|
// packets.
|
|
func (b *sendBuffer) ScheduleResend() {
|
|
b.mut.Lock()
|
|
|
|
if b.sendSlot > 0 {
|
|
// There are packets that have been sent but not acked. Move them from
|
|
// the send buffer to the loss list for retransmission.
|
|
if debugConnection {
|
|
log.Println(b, "scheduled resend from send list", b.sendSlot)
|
|
}
|
|
|
|
// Append the packets to the loss list and rewind the send buffer
|
|
b.lost.AppendAll(b.send.All()[:b.sendSlot])
|
|
b.send.Cut(b.sendSlot)
|
|
b.sendSlot = 0
|
|
b.cond.Broadcast()
|
|
}
|
|
|
|
if b.lostSlot > 0 {
|
|
// Also resend whatever was already in the loss list
|
|
if debugConnection {
|
|
log.Println(b, "scheduled resend from loss list", b.lostSlot)
|
|
}
|
|
b.lostSlot = 0
|
|
b.cond.Broadcast()
|
|
}
|
|
|
|
b.mut.Unlock()
|
|
}
|
|
|
|
// SetWindowAndRate sets the window size (in packets) and packet rate (in
|
|
// packets per second) to use when sending.
|
|
func (b *sendBuffer) SetWindowAndRate(sendWindow, packetRate int) {
|
|
b.mut.Lock()
|
|
if debugConnection {
|
|
log.Println(b, "new window & rate", sendWindow, packetRate)
|
|
}
|
|
b.packetRate = packetRate
|
|
b.sendWindow = sendWindow
|
|
if b.sendWindow > b.send.Cap() {
|
|
b.send.Resize(b.sendWindow)
|
|
b.cond.Broadcast()
|
|
}
|
|
b.mut.Unlock()
|
|
}
|
|
|
|
// Stop stops the send buffer from any doing further sending, but waits for
|
|
// the current buffers to be drained.
|
|
func (b *sendBuffer) Stop() {
|
|
b.mut.Lock()
|
|
|
|
if b.closed || b.closing {
|
|
return
|
|
}
|
|
|
|
b.closing = true
|
|
for b.lost.Len() > 0 || b.send.Len() > 0 {
|
|
b.cond.Wait()
|
|
}
|
|
|
|
b.closed = true
|
|
b.cond.Broadcast()
|
|
b.mut.Unlock()
|
|
}
|
|
|
|
// CrashStop stops the send buffer from any doing further sending, without
|
|
// waiting for buffers to drain.
|
|
func (b *sendBuffer) CrashStop() {
|
|
b.mut.Lock()
|
|
|
|
if b.closed || b.closing {
|
|
return
|
|
}
|
|
|
|
b.closing = true
|
|
b.closed = true
|
|
b.cond.Broadcast()
|
|
b.mut.Unlock()
|
|
}
|
|
|
|
func (b *sendBuffer) String() string {
|
|
return fmt.Sprintf("sendBuffer@%p", b)
|
|
}
|
|
|
|
func (b *sendBuffer) writerLoop() {
|
|
if debugConnection {
|
|
log.Println(b, "writer() starting")
|
|
defer log.Println(b, "writer() exiting")
|
|
}
|
|
|
|
b.scheduler.Take(schedulerCapacity)
|
|
for {
|
|
var pkt packet
|
|
b.mut.Lock()
|
|
for b.lostSlot >= b.sendWindow ||
|
|
(b.sendSlot == b.send.Len() && b.lostSlot == b.lost.Len()) {
|
|
if b.closed {
|
|
b.mut.Unlock()
|
|
return
|
|
}
|
|
|
|
if debugConnection {
|
|
log.Println(b, "writer() paused", b.lostSlot, b.sendSlot, b.sendWindow, b.lost.Len())
|
|
}
|
|
b.cond.Wait()
|
|
}
|
|
|
|
if b.lostSlot < b.lost.Len() {
|
|
pkt = b.lost.All()[b.lostSlot]
|
|
pkt.hdr.timestamp = timestampMicros()
|
|
b.lostSlot++
|
|
|
|
if debugConnection {
|
|
log.Println(b, "resend", b.lostSlot, b.lost.Len(), b.sendWindow, pkt.hdr.connID, pkt.hdr.sequenceNo)
|
|
}
|
|
} else if b.sendSlot < b.send.Len() {
|
|
pkt = b.send.All()[b.sendSlot]
|
|
pkt.hdr.timestamp = timestampMicros()
|
|
b.sendSlot++
|
|
|
|
if debugConnection {
|
|
log.Println(b, "send", b.sendSlot, b.send.Len(), b.sendWindow, pkt.hdr.connID, pkt.hdr.sequenceNo)
|
|
}
|
|
}
|
|
|
|
b.cond.Broadcast()
|
|
packetRate := b.packetRate
|
|
b.mut.Unlock()
|
|
|
|
if pkt.dst != nil {
|
|
b.scheduler.Wait(schedulerRate / int64(packetRate))
|
|
b.mux.write(pkt)
|
|
}
|
|
}
|
|
}
|