This file is indexed.

/usr/share/gocode/src/github.com/streadway/amqp/consumers.go is in golang-github-streadway-amqp-dev 0.0~git20150820.0.f4879ba-6.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Source code and contact info at http://github.com/streadway/amqp

package amqp

import (
	"fmt"
	"os"
	"sync"
	"sync/atomic"
)

// consumerSeq is the process-wide counter that keeps generated consumer tags
// distinct. It is only ever advanced atomically.
var consumerSeq uint64

// uniqueConsumerTag returns a consumer tag of the form "ctag-<command>-<n>",
// where <command> is os.Args[0] and <n> is the next value of consumerSeq.
//
// Consumer tags are sent as AMQP short strings, which cannot exceed 255
// bytes. When the command name would push the tag past that limit, or when
// os.Args is empty, a short fixed name is substituted; the sequence number
// alone is enough to keep tags unique within the process.
func uniqueConsumerTag() string {
	const (
		tagLengthMax = 0xFF // largest AMQP short string, in bytes
		prefix       = "ctag-"
		fallbackName = "streadway/amqp"
	)

	suffix := fmt.Sprintf("-%d", atomic.AddUint64(&consumerSeq, 1))

	name := fallbackName
	if len(os.Args) > 0 && len(prefix)+len(os.Args[0])+len(suffix) <= tagLengthMax {
		name = os.Args[0]
	}

	return prefix + name + suffix
}

// consumerBuffers maps a consumer tag to the channel on which deliveries
// for that consumer are handed in.
type consumerBuffers map[string]chan *Delivery

// consumers is a mutex-guarded registry that manages the
// consumerTag -> ingress consumerBuffer mapping.
type consumers struct {
	// The embedded mutex is taken by every method before chans is touched.
	sync.Mutex
	// chans holds one ingress channel per registered consumer tag.
	chans consumerBuffers
}

// makeConsumers returns a registry with an empty, ready-to-use tag map.
func makeConsumers() *consumers {
	registry := new(consumers)
	registry.chans = consumerBuffers{}
	return registry
}

// bufferDeliveries relays every delivery received on in to out, preserving
// arrival order, and closes out once in has been closed and everything
// received has been forwarded.
//
// A delivery is passed straight through when out can accept it immediately.
// Otherwise it is parked in an in-memory queue, and the function keeps
// receiving from in while it waits for out, so a sender on in is not held up
// by a slow receiver on out. The queue has no upper bound.
func bufferDeliveries(in chan *Delivery, out chan Delivery) {
	var queue []*Delivery
	var queueIn = in

	for delivery := range in {
		select {
		case out <- *delivery:
			// delivered immediately while the consumer chan can receive
		default:
			queue = append(queue, delivery)
		}

		for len(queue) > 0 {
			select {
			case out <- *queue[0]:
				// Clear the slot before reslicing: the backing array would
				// otherwise keep the forwarded delivery reachable.
				queue[0] = nil
				queue = queue[1:]
			case delivery, open := <-queueIn:
				if open {
					queue = append(queue, delivery)
				} else {
					// stop receiving to drain the queue; a nil channel is
					// never selected
					queueIn = nil
				}
			}
		}

		// Fully drained: let go of the backing array grown during the burst.
		queue = nil
	}

	close(out)
}

// add registers consumer under tag. A fresh ingress channel is created for
// the tag and a bufferDeliveries goroutine is started to relay from it to
// consumer. If the tag was already registered, the previous ingress channel
// is closed before being replaced.
func (me *consumers) add(tag string, consumer chan Delivery) {
	me.Lock()
	defer me.Unlock()

	ingress := make(chan *Delivery)

	if stale, exists := me.chans[tag]; exists {
		close(stale)
	}
	me.chans[tag] = ingress

	go bufferDeliveries(ingress, consumer)
}

// close unregisters tag and closes its ingress channel. It reports whether
// the tag was registered; an unknown tag is left as a no-op.
func (me *consumers) close(tag string) (found bool) {
	me.Lock()
	defer me.Unlock()

	var ingress chan *Delivery
	if ingress, found = me.chans[tag]; !found {
		return false
	}

	delete(me.chans, tag)
	close(ingress)

	return true
}

// closeAll closes the ingress channel of every registered consumer and
// leaves the registry holding a new, empty map.
func (me *consumers) closeAll() {
	me.Lock()
	defer me.Unlock()

	// Swap in the empty map first, then close what was registered.
	registered := me.chans
	me.chans = make(consumerBuffers)

	for _, ingress := range registered {
		close(ingress)
	}
}

// send hands msg to the ingress channel registered under tag and reports
// whether such a consumer exists. Nothing is sent for an unknown tag.
//
// The channel send happens while the mutex is held, so every other
// operation on this registry waits until the receiver of that ingress
// channel has accepted the message.
func (me *consumers) send(tag string, msg *Delivery) bool {
	me.Lock()
	defer me.Unlock()

	if ingress, ok := me.chans[tag]; ok {
		ingress <- msg
		return true
	}

	return false
}