/usr/share/gocode/src/github.com/mitchellh/prefixedio/reader.go is in golang-github-mitchellh-prefixedio-dev 0.0~git20151214.0.6e69540-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
package prefixedio
import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"sync"
"time"
)
// Reader wraps a single io.Reader carrying line-oriented data and splits
// it into one io.Reader stream per registered prefix.
//
// A line ends at the '\n' character.
//
// Nothing is consumed from the underlying reader until the first Read on
// any prefix reader; from then on it is read continuously until EOF or an
// error. A line that matches no registered prefix is delivered to the
// prefix that matched most recently (initially the "" prefix); if that
// prefix is not registered the line is dropped. Register every prefix you
// care about before the first Read.
//
// Each prefix is backed by an unbuffered pipe, so a prefix reader that is
// not being drained stalls delivery to every other prefix. Keep reading
// from all registered prefixes to avoid deadlock.
type Reader struct {
	// FlushTimeout is how long to wait for the next byte before handing
	// out the partial line collected so far. Zero selects 100ms.
	FlushTimeout time.Duration

	// r is the multiplexed source stream.
	r io.Reader

	// once ensures the background read goroutine is started only once.
	once sync.Once

	// l guards done and prefixes.
	l sync.Mutex
	// done is set when the background read goroutine has finished.
	done bool
	// prefixes maps each registered prefix to the write side of its pipe.
	prefixes map[string]*io.PipeWriter
}
// NewReader returns a Reader that demultiplexes the data read from r.
// It returns an error if r is nil.
func NewReader(r io.Reader) (*Reader, error) {
	if r == nil {
		return nil, errors.New("Reader must not be nil")
	}

	demux := &Reader{r: r}
	return demux, nil
}
// Prefix registers p and returns an io.Reader that yields the data of
// every line dispatched to that prefix.
//
// The prefix itself is stripped from matching lines; the line delimiter
// is kept. Data arrives line by line, although a partial line can be
// delivered when the flush timeout elapses before the delimiter is seen.
//
// The empty prefix "" receives lines seen before any other prefix has
// matched, which makes it usable as a default reader.
//
// Registering the same prefix twice is an error. If the underlying reader
// has already been fully consumed, the write side of the new pipe is
// closed right away.
func (r *Reader) Prefix(p string) (io.Reader, error) {
	r.l.Lock()
	defer r.l.Unlock()

	// Looking up a key in a nil map is safe, so check for duplicates first
	// and only allocate the map when something is actually stored.
	if _, exists := r.prefixes[p]; exists {
		return nil, fmt.Errorf("Prefix already registered: %s", p)
	}
	if r.prefixes == nil {
		r.prefixes = make(map[string]*io.PipeWriter)
	}

	readSide, writeSide := io.Pipe()
	r.prefixes[p] = writeSide

	// The read goroutine has already exited and will never close this
	// writer, so close it here.
	if r.done {
		writeSide.Close()
	}

	out := &prefixReader{
		r:  r,
		pr: readSide,
	}
	return out, nil
}
// init launches the background goroutine that reads the underlying
// reader and routes lines to their prefix pipes.
//
// Only the first call has any effect; later calls are no-ops.
func (r *Reader) init() {
	start := func() {
		go r.read()
	}
	r.once.Do(start)
}
// read runs in its own goroutine (started once via init). It pulls bytes
// from the underlying reader, assembles them into lines and forwards each
// line to the pipe of the prefix it belongs to.
//
// A line is dispatched when '\n' is seen, when no byte has arrived for
// FlushTimeout (100ms if unset), or when the underlying reader reports
// EOF. When several registered prefixes match a line, the longest one
// wins. A line matching no prefix goes to the most recently used prefix
// (initially ""), and is dropped if that prefix is not registered.
//
// On exit the Reader is marked done and every pipe writer is closed: with
// the read error if it was something other than io.EOF, otherwise cleanly
// so that readers see EOF. Data collected but not yet dispatched when a
// non-EOF error occurs is discarded.
func (r *Reader) read() {
	var err error
	var lastPrefix string
	buf := bufio.NewReader(r.r)

	// The blocking ReadByte calls happen in a helper goroutine so that the
	// loop below can also wait on the flush timer. Both channels are
	// unbuffered, so a byte is always received before a later error.
	byteCh := make(chan byte)
	doneCh := make(chan error)
	go func() {
		defer close(doneCh)
		for {
			b, err := buf.ReadByte()
			if err != nil {
				doneCh <- err
				return
			}
			byteCh <- b
		}
	}()

	// Figure out how long we wait without data before flushing.
	ft := r.FlushTimeout
	if ft == 0 {
		ft = 100 * time.Millisecond
	}

	// Reuse a single timer instead of calling time.After for every byte,
	// which would allocate a fresh timer on each iteration.
	timer := time.NewTimer(ft)
	defer timer.Stop()

	// line is reused across iterations so capacity gained while reading a
	// long line is kept.
	line := make([]byte, 0, 80)
	for {
		line = line[:0]
		for collecting := true; collecting; {
			// Re-arm the timer. If it already fired, drain the stale
			// value (non-blocking) so it cannot trigger a bogus flush.
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
			timer.Reset(ft)

			select {
			case b := <-byteCh:
				line = append(line, b)
				collecting = b != '\n'
			case err = <-doneCh:
				collecting = false
			case <-timer.C:
				collecting = false
			}
		}

		// A real read error (not EOF) aborts immediately; it is reported
		// to all pipes below.
		if err != nil && err != io.EOF {
			break
		}

		// Nothing to route when the timer fired with an empty buffer.
		if len(line) > 0 {
			r.l.Lock()

			// Pick the longest registered prefix that matches, so the
			// result does not depend on map iteration order when prefixes
			// overlap. The length comparison also skips the "" prefix.
			var prefix string
			for p := range r.prefixes {
				if len(p) > len(prefix) && bytes.HasPrefix(line, []byte(p)) {
					prefix = p
				}
			}

			payload := line
			if prefix != "" {
				payload = line[len(prefix):]
			} else {
				prefix = lastPrefix
			}

			if pw, ok := r.prefixes[prefix]; ok {
				lastPrefix = prefix

				// PipeWriter.Write blocks until all of payload has been
				// consumed or the pipe is closed, so one call suffices.
				// The lock stays held meanwhile, which means Prefix blocks
				// until this line has been read. A write error only means
				// the pipe is closed; the data is dropped on purpose.
				_, _ = pw.Write(payload)
			}

			r.l.Unlock()
		}

		if err == io.EOF {
			break
		}
	}

	r.l.Lock()
	defer r.l.Unlock()

	// Mark us done so that Prefix closes any reader created from now on.
	r.done = true

	// Close every writer so the readers return EOF, or the read error if
	// we stopped because of one.
	failed := err != nil && err != io.EOF
	for _, pw := range r.prefixes {
		if failed {
			pw.CloseWithError(err)
		} else {
			pw.Close()
		}
	}
}
// prefixReader is the io.Reader handed out by Reader.Prefix for a single
// prefix.
type prefixReader struct {
	r  *Reader   // Reader this prefix was registered on
	pr io.Reader // read side of this prefix's pipe
}

// Read makes sure the parent Reader's background goroutine has been
// started, then reads from this prefix's pipe into p.
func (rd *prefixReader) Read(p []byte) (int, error) {
	rd.r.init()

	n, err := rd.pr.Read(p)
	return n, err
}
|