/usr/share/gocode/src/github.com/streadway/amqp/consumers.go is in golang-github-streadway-amqp-dev 0.0~git20150820.0.f4879ba-6.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 | // Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Source code and contact info at http://github.com/streadway/amqp
package amqp
import (
"fmt"
"os"
"sync"
"sync/atomic"
)
var consumerSeq uint64
func uniqueConsumerTag() string {
return fmt.Sprintf("ctag-%s-%d", os.Args[0], atomic.AddUint64(&consumerSeq, 1))
}
type consumerBuffers map[string]chan *Delivery
// Concurrent type that manages the consumerTag ->
// ingress consumerBuffer mapping
type consumers struct {
sync.Mutex
chans consumerBuffers
}
func makeConsumers() *consumers {
return &consumers{chans: make(consumerBuffers)}
}
func bufferDeliveries(in chan *Delivery, out chan Delivery) {
var queue []*Delivery
var queueIn = in
for delivery := range in {
select {
case out <- *delivery:
// delivered immediately while the consumer chan can receive
default:
queue = append(queue, delivery)
}
for len(queue) > 0 {
select {
case out <- *queue[0]:
queue = queue[1:]
case delivery, open := <-queueIn:
if open {
queue = append(queue, delivery)
} else {
// stop receiving to drain the queue
queueIn = nil
}
}
}
}
close(out)
}
// On key conflict, close the previous channel.
func (me *consumers) add(tag string, consumer chan Delivery) {
me.Lock()
defer me.Unlock()
if prev, found := me.chans[tag]; found {
close(prev)
}
in := make(chan *Delivery)
go bufferDeliveries(in, consumer)
me.chans[tag] = in
}
func (me *consumers) close(tag string) (found bool) {
me.Lock()
defer me.Unlock()
ch, found := me.chans[tag]
if found {
delete(me.chans, tag)
close(ch)
}
return found
}
func (me *consumers) closeAll() {
me.Lock()
defer me.Unlock()
for _, ch := range me.chans {
close(ch)
}
me.chans = make(consumerBuffers)
}
// Sends a delivery to a the consumer identified by `tag`.
// If unbuffered channels are used for Consume this method
// could block all deliveries until the consumer
// receives on the other end of the channel.
func (me *consumers) send(tag string, msg *Delivery) bool {
me.Lock()
defer me.Unlock()
buffer, found := me.chans[tag]
if found {
buffer <- msg
}
return found
}
|