/usr/share/gocode/src/github.com/hashicorp/memberlist/queue.go is in golang-github-hashicorp-memberlist-dev 0.1.0-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
package memberlist
import (
"sort"
"sync"
)
// TransmitLimitedQueue is used to queue messages to broadcast to
// the cluster (via gossip) but limits the number of transmits per
// message. It also prioritizes messages with lower transmit counts
// (hence newer messages).
//
// The embedded sync.Mutex guards bcQueue; QueueBroadcast, GetBroadcasts,
// NumQueued, Reset and Prune each hold it for the duration of the call.
type TransmitLimitedQueue struct {
	// NumNodes returns the number of nodes in the cluster. This is
	// used to determine the retransmit count, which is calculated
	// based on the log of this. GetBroadcasts calls it whenever the
	// queue is non-empty, so it must be set before messages are fetched.
	NumNodes func() int

	// RetransmitMult is the multiplier used to determine the maximum
	// number of retransmissions attempted.
	RetransmitMult int

	sync.Mutex
	// bcQueue holds the pending broadcasts. New entries are appended at
	// the tail, and GetBroadcasts re-sorts it in descending transmit
	// order, so the least-transmitted entries sit at the end.
	bcQueue limitedBroadcasts
}
// limitedBroadcast pairs a queued Broadcast with the number of times it
// has been handed out by GetBroadcasts.
type limitedBroadcast struct {
	transmits int       // Number of transmissions attempted.
	b         Broadcast // The broadcast being tracked.
}
// limitedBroadcasts is the slice type backing the queue. It implements
// sort.Interface (Len, Less, Swap) ordered by transmit count.
type limitedBroadcasts []*limitedBroadcast
// Broadcast is something that can be broadcasted via gossip to
// the memberlist cluster.
//
// The queue in this file invokes these methods while its mutex is held.
type Broadcast interface {
	// Invalidates checks if enqueuing the current broadcast
	// invalidates a previous broadcast. QueueBroadcast calls it on the
	// incoming broadcast, passing an already-queued broadcast as b, and
	// removes that queued entry when it returns true.
	Invalidates(b Broadcast) bool

	// Message returns a byte form of the message.
	Message() []byte

	// Finished is invoked when the message will no longer
	// be broadcast, either due to invalidation or to the
	// transmit limit being reached. Reset and Prune also invoke it on
	// the entries they discard.
	Finished()
}
// QueueBroadcast is used to enqueue a broadcast. Every queued broadcast
// that b invalidates has Finished invoked on it and is removed; b is then
// appended at the tail with a transmit count of zero.
//
// Invalidates and Finished are invoked while the queue's lock is held.
func (q *TransmitLimitedQueue) QueueBroadcast(b Broadcast) {
	q.Lock()
	defer q.Unlock()

	// Filter in place, reusing the backing array. Each entry is examined
	// exactly once, so runs of adjacent invalidated entries are all
	// removed. Advancing an index after shifting the tail left would skip
	// the entry that moved into the vacated slot.
	kept := q.bcQueue[:0]
	for _, queued := range q.bcQueue {
		if b.Invalidates(queued.b) {
			queued.b.Finished()
			continue
		}
		kept = append(kept, queued)
	}

	// Clear the vacated tail so removed entries are not kept alive by the
	// backing array.
	for i := len(kept); i < len(q.bcQueue); i++ {
		q.bcQueue[i] = nil
	}

	// Append to the queue
	q.bcQueue = append(kept, &limitedBroadcast{0, b})
}
// GetBroadcasts gathers queued messages to send, staying within limit
// bytes in total and charging overhead bytes for each message on top of
// its own length.
//
// The queue is walked from the tail towards the head. A message that does
// not fit in the remaining budget is skipped and stays queued. Every
// message returned has its transmit count incremented; once that count
// reaches the retransmit limit the broadcast is finished and removed.
// The result is nil when the queue is empty or nothing fits.
func (q *TransmitLimitedQueue) GetBroadcasts(overhead, limit int) [][]byte {
	q.Lock()
	defer q.Unlock()

	// Nothing queued is the common case; skip the limit computation.
	if len(q.bcQueue) == 0 {
		return nil
	}

	maxTransmits := retransmitLimit(q.RetransmitMult, q.NumNodes())

	var (
		used int
		out  [][]byte
	)
	for idx := len(q.bcQueue) - 1; idx >= 0; idx-- {
		entry := q.bcQueue[idx]
		payload := entry.b.Message()

		// Skip anything that would push us past the byte budget.
		cost := overhead + len(payload)
		if used+cost > limit {
			continue
		}
		used += cost
		out = append(out, payload)

		// Keep the entry while it still has transmits left.
		entry.transmits++
		if entry.transmits < maxTransmits {
			continue
		}

		// Limit reached: retire the entry by moving the tail element
		// into its slot. The tail has already been visited, so it is
		// not processed twice.
		entry.b.Finished()
		last := len(q.bcQueue) - 1
		q.bcQueue[idx] = q.bcQueue[last]
		q.bcQueue[last] = nil
		q.bcQueue = q.bcQueue[:last]
	}

	// Transmit counts changed, so restore the queue ordering.
	if len(out) > 0 {
		q.bcQueue.Sort()
	}
	return out
}
// NumQueued reports how many broadcasts are currently waiting in the
// queue. The count is read while holding the queue's lock.
func (q *TransmitLimitedQueue) NumQueued() int {
	q.Lock()
	count := len(q.bcQueue)
	q.Unlock()
	return count
}
// Reset drops every queued message. Finished is invoked on each queued
// broadcast, in queue order, before the queue is set back to nil.
func (q *TransmitLimitedQueue) Reset() {
	q.Lock()
	defer q.Unlock()

	pending := q.bcQueue
	for i := range pending {
		pending[i].b.Finished()
	}
	q.bcQueue = nil
}
// Prune will retain the maxRetain latest messages, and the rest
// will be discarded. This can be used to prevent unbounded queue sizes.
//
// The entries kept are the last maxRetain elements of the queue; Finished
// is invoked on every discarded broadcast. A negative maxRetain is treated
// as zero, and a queue that already holds maxRetain or fewer entries is
// left untouched.
func (q *TransmitLimitedQueue) Prune(maxRetain int) {
	q.Lock()
	defer q.Unlock()

	// A negative limit would otherwise walk past the end of the queue.
	if maxRetain < 0 {
		maxRetain = 0
	}

	// Do nothing if the queue already fits within the limit
	n := len(q.bcQueue)
	if n <= maxRetain {
		return
	}

	// Invalidate the messages we will be removing
	drop := n - maxRetain
	for i := 0; i < drop; i++ {
		q.bcQueue[i].b.Finished()
	}

	// Move the retained messages to the front, then clear the vacated
	// tail so the discarded entries are not kept alive by the backing
	// array.
	copy(q.bcQueue, q.bcQueue[drop:])
	for i := maxRetain; i < n; i++ {
		q.bcQueue[i] = nil
	}
	q.bcQueue = q.bcQueue[:maxRetain]
}
// Len reports the number of queued entries; part of sort.Interface.
func (b limitedBroadcasts) Len() int {
	size := len(b)
	return size
}
// Less orders entries by ascending transmit count; part of
// sort.Interface.
func (b limitedBroadcasts) Less(i, j int) bool {
	left, right := b[i], b[j]
	return right.transmits > left.transmits
}
// Swap exchanges the entries at positions i and j; part of
// sort.Interface.
func (b limitedBroadcasts) Swap(i, j int) {
	atJ, atI := b[j], b[i]
	b[i] = atJ
	b[j] = atI
}
// Sort orders the slice in place by descending transmit count, so the
// entries with the fewest transmits end up at the tail.
func (b limitedBroadcasts) Sort() {
	descending := sort.Reverse(b)
	sort.Sort(descending)
}
|