/usr/share/gocode/src/github.com/hashicorp/consul/consul/rpc.go is in golang-github-hashicorp-consul-dev 0.6.4~dfsg-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 | package consul
import (
"crypto/tls"
"fmt"
"io"
"math/rand"
"net"
"strings"
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/yamux"
"github.com/inconshreveable/muxado"
)
type RPCType byte
const (
rpcConsul RPCType = iota
rpcRaft
rpcMultiplex
rpcTLS
rpcMultiplexV2
)
const (
// maxQueryTime is used to bound the limit of a blocking query
maxQueryTime = 600 * time.Second
// defaultQueryTime is the amount of time we block waiting for a change
// if no time is specified. Previously we would wait the maxQueryTime.
defaultQueryTime = 300 * time.Second
// jitterFraction is a the limit to the amount of jitter we apply
// to a user specified MaxQueryTime. We divide the specified time by
// the fraction. So 16 == 6.25% limit of jitter
jitterFraction = 16
// Warn if the Raft command is larger than this.
// If it's over 1MB something is probably being abusive.
raftWarnSize = 1024 * 1024
// enqueueLimit caps how long we will wait to enqueue
// a new Raft command. Something is probably wrong if this
// value is ever reached. However, it prevents us from blocking
// the requesting goroutine forever.
enqueueLimit = 30 * time.Second
)
// listen is used to listen for incoming RPC connections
func (s *Server) listen() {
for {
// Accept a connection
conn, err := s.rpcListener.Accept()
if err != nil {
if s.shutdown {
return
}
s.logger.Printf("[ERR] consul.rpc: failed to accept RPC conn: %v", err)
continue
}
go s.handleConn(conn, false)
metrics.IncrCounter([]string{"consul", "rpc", "accept_conn"}, 1)
}
}
// logConn is a wrapper around memberlist's LogConn so that we format references
// to "from" addresses in a consistent way. This is just a shorter name.
func logConn(conn net.Conn) string {
return memberlist.LogConn(conn)
}
// handleConn is used to determine if this is a Raft or
// Consul type RPC connection and invoke the correct handler
func (s *Server) handleConn(conn net.Conn, isTLS bool) {
// Read a single byte
buf := make([]byte, 1)
if _, err := conn.Read(buf); err != nil {
if err != io.EOF {
s.logger.Printf("[ERR] consul.rpc: failed to read byte: %v %s", err, logConn(conn))
}
conn.Close()
return
}
// Enforce TLS if VerifyIncoming is set
if s.config.VerifyIncoming && !isTLS && RPCType(buf[0]) != rpcTLS {
s.logger.Printf("[WARN] consul.rpc: Non-TLS connection attempted with VerifyIncoming set %s", logConn(conn))
conn.Close()
return
}
// Switch on the byte
switch RPCType(buf[0]) {
case rpcConsul:
s.handleConsulConn(conn)
case rpcRaft:
metrics.IncrCounter([]string{"consul", "rpc", "raft_handoff"}, 1)
s.raftLayer.Handoff(conn)
case rpcMultiplex:
s.handleMultiplex(conn)
case rpcTLS:
if s.rpcTLS == nil {
s.logger.Printf("[WARN] consul.rpc: TLS connection attempted, server not configured for TLS %s", logConn(conn))
conn.Close()
return
}
conn = tls.Server(conn, s.rpcTLS)
s.handleConn(conn, true)
case rpcMultiplexV2:
s.handleMultiplexV2(conn)
default:
s.logger.Printf("[ERR] consul.rpc: unrecognized RPC byte: %v %s", buf[0], logConn(conn))
conn.Close()
return
}
}
// handleMultiplex is used to multiplex a single incoming connection
// using the Muxado multiplexer
func (s *Server) handleMultiplex(conn net.Conn) {
defer conn.Close()
server := muxado.Server(conn)
for {
sub, err := server.Accept()
if err != nil {
if !strings.Contains(err.Error(), "closed") {
s.logger.Printf("[ERR] consul.rpc: multiplex conn accept failed: %v %s", err, logConn(conn))
}
return
}
go s.handleConsulConn(sub)
}
}
// handleMultiplexV2 is used to multiplex a single incoming connection
// using the Yamux multiplexer
func (s *Server) handleMultiplexV2(conn net.Conn) {
defer conn.Close()
conf := yamux.DefaultConfig()
conf.LogOutput = s.config.LogOutput
server, _ := yamux.Server(conn, conf)
for {
sub, err := server.Accept()
if err != nil {
if err != io.EOF {
s.logger.Printf("[ERR] consul.rpc: multiplex conn accept failed: %v %s", err, logConn(conn))
}
return
}
go s.handleConsulConn(sub)
}
}
// handleConsulConn is used to service a single Consul RPC connection
func (s *Server) handleConsulConn(conn net.Conn) {
defer conn.Close()
rpcCodec := msgpackrpc.NewServerCodec(conn)
for {
select {
case <-s.shutdownCh:
return
default:
}
if err := s.rpcServer.ServeRequest(rpcCodec); err != nil {
if err != io.EOF && !strings.Contains(err.Error(), "closed") {
s.logger.Printf("[ERR] consul.rpc: RPC error: %v %s", err, logConn(conn))
metrics.IncrCounter([]string{"consul", "rpc", "request_error"}, 1)
}
return
}
metrics.IncrCounter([]string{"consul", "rpc", "request"}, 1)
}
}
// forward is used to forward to a remote DC or to forward to the local leader
// Returns a bool of if forwarding was performed, as well as any error
func (s *Server) forward(method string, info structs.RPCInfo, args interface{}, reply interface{}) (bool, error) {
// Handle DC forwarding
dc := info.RequestDatacenter()
if dc != s.config.Datacenter {
err := s.forwardDC(method, dc, args, reply)
return true, err
}
// Check if we can allow a stale read
if info.IsRead() && info.AllowStaleRead() {
return false, nil
}
// Handle leader forwarding
if !s.IsLeader() {
err := s.forwardLeader(method, args, reply)
return true, err
}
return false, nil
}
// forwardLeader is used to forward an RPC call to the leader, or fail if no leader
func (s *Server) forwardLeader(method string, args interface{}, reply interface{}) error {
// Get the leader
leader := s.raft.Leader()
if leader == "" {
return structs.ErrNoLeader
}
// Lookup the server
s.localLock.RLock()
server := s.localConsuls[leader]
s.localLock.RUnlock()
// Handle a missing server
if server == nil {
return structs.ErrNoLeader
}
return s.connPool.RPC(s.config.Datacenter, server.Addr, server.Version, method, args, reply)
}
// forwardDC is used to forward an RPC call to a remote DC, or fail if no servers
func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{}) error {
// Bail if we can't find any servers
s.remoteLock.RLock()
servers := s.remoteConsuls[dc]
if len(servers) == 0 {
s.remoteLock.RUnlock()
s.logger.Printf("[WARN] consul.rpc: RPC request for DC '%s', no path found", dc)
return structs.ErrNoDCPath
}
// Select a random addr
offset := rand.Int31() % int32(len(servers))
server := servers[offset]
s.remoteLock.RUnlock()
// Forward to remote Consul
metrics.IncrCounter([]string{"consul", "rpc", "cross-dc", dc}, 1)
return s.connPool.RPC(dc, server.Addr, server.Version, method, args, reply)
}
// globalRPC is used to forward an RPC request to one server in each datacenter.
// This will only error for RPC-related errors. Otherwise, application-level
// errors can be sent in the response objects.
func (s *Server) globalRPC(method string, args interface{},
reply structs.CompoundResponse) error {
errorCh := make(chan error)
respCh := make(chan interface{})
// Make a new request into each datacenter
for dc, _ := range s.remoteConsuls {
go func(dc string) {
rr := reply.New()
if err := s.forwardDC(method, dc, args, &rr); err != nil {
errorCh <- err
return
}
respCh <- rr
}(dc)
}
replies, total := 0, len(s.remoteConsuls)
for replies < total {
select {
case err := <-errorCh:
return err
case rr := <-respCh:
reply.Add(rr)
replies++
}
}
return nil
}
// raftApply is used to encode a message, run it through raft, and return
// the FSM response along with any errors
func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, error) {
buf, err := structs.Encode(t, msg)
if err != nil {
return nil, fmt.Errorf("Failed to encode request: %v", err)
}
// Warn if the command is very large
if n := len(buf); n > raftWarnSize {
s.logger.Printf("[WARN] consul: Attempting to apply large raft entry (%d bytes)", n)
}
future := s.raft.Apply(buf, enqueueLimit)
if err := future.Error(); err != nil {
return nil, err
}
return future.Response(), nil
}
// blockingRPC is used for queries that need to wait for a minimum index. This
// is used to block and wait for changes.
func (s *Server) blockingRPC(queryOpts *structs.QueryOptions, queryMeta *structs.QueryMeta,
watch state.Watch, run func() error) error {
var timeout *time.Timer
var notifyCh chan struct{}
// Fast path right to the non-blocking query.
if queryOpts.MinQueryIndex == 0 {
goto RUN_QUERY
}
// Make sure a watch was given if we were asked to block.
if watch == nil {
panic("no watch given for blocking query")
}
// Restrict the max query time, and ensure there is always one.
if queryOpts.MaxQueryTime > maxQueryTime {
queryOpts.MaxQueryTime = maxQueryTime
} else if queryOpts.MaxQueryTime <= 0 {
queryOpts.MaxQueryTime = defaultQueryTime
}
// Apply a small amount of jitter to the request.
queryOpts.MaxQueryTime += lib.RandomStagger(queryOpts.MaxQueryTime / jitterFraction)
// Setup a query timeout.
timeout = time.NewTimer(queryOpts.MaxQueryTime)
// Setup the notify channel.
notifyCh = make(chan struct{}, 1)
// Ensure we tear down any watches on return.
defer func() {
timeout.Stop()
watch.Clear(notifyCh)
}()
REGISTER_NOTIFY:
// Register the notification channel. This may be done multiple times if
// we haven't reached the target wait index.
watch.Wait(notifyCh)
RUN_QUERY:
// Update the query metadata.
s.setQueryMeta(queryMeta)
// If the read must be consistent we verify that we are still the leader.
if queryOpts.RequireConsistent {
if err := s.consistentRead(); err != nil {
return err
}
}
// Run the query.
metrics.IncrCounter([]string{"consul", "rpc", "query"}, 1)
err := run()
// Check for minimum query time.
if err == nil && queryMeta.Index > 0 && queryMeta.Index <= queryOpts.MinQueryIndex {
select {
case <-notifyCh:
goto REGISTER_NOTIFY
case <-timeout.C:
}
}
return err
}
// setQueryMeta is used to populate the QueryMeta data for an RPC call
func (s *Server) setQueryMeta(m *structs.QueryMeta) {
if s.IsLeader() {
m.LastContact = 0
m.KnownLeader = true
} else {
m.LastContact = time.Now().Sub(s.raft.LastContact())
m.KnownLeader = (s.raft.Leader() != "")
}
}
// consistentRead is used to ensure we do not perform a stale
// read. This is done by verifying leadership before the read.
func (s *Server) consistentRead() error {
defer metrics.MeasureSince([]string{"consul", "rpc", "consistentRead"}, time.Now())
future := s.raft.VerifyLeader()
return future.Error()
}
|