/usr/share/gocode/src/github.com/hashicorp/memberlist/state.go is in golang-github-hashicorp-memberlist-dev 0.1.0-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
package memberlist
import (
"bytes"
"fmt"
"math"
"math/rand"
"net"
"sync/atomic"
"time"
"github.com/armon/go-metrics"
)
type nodeStateType int
const (
stateAlive nodeStateType = iota
stateSuspect
stateDead
)
// Node represents a node in the cluster.
type Node struct {
Name string
Addr net.IP
Port uint16
Meta []byte // Metadata from the delegate for this node.
PMin uint8 // Minimum protocol version this understands
PMax uint8 // Maximum protocol version this understands
PCur uint8 // Current version node is speaking
DMin uint8 // Min protocol version for the delegate to understand
DMax uint8 // Max protocol version for the delegate to understand
DCur uint8 // Current version delegate is speaking
}
// Address returns the host:port form of a node's address, suitable for use
// with a transport.
func (n *Node) Address() string {
return joinHostPort(n.Addr.String(), n.Port)
}
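// joinHostPort is defined in util.go rather than in this file. As a
// hedged sketch of that helper (editor's note, assuming the upstream
// definition): it defers to net.JoinHostPort so that IPv6 addresses
// are bracketed correctly.
//
//	func joinHostPort(host string, port uint16) string {
//		return net.JoinHostPort(host, strconv.Itoa(int(port)))
//	}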
// NodeState is used to manage our state view of another node
type nodeState struct {
Node
Incarnation uint32 // Last known incarnation number
State nodeStateType // Current state
StateChange time.Time // Time last state change happened
}
// Address returns the host:port form of a node's address, suitable for use
// with a transport.
func (n *nodeState) Address() string {
return n.Node.Address()
}
// ackHandler is used to register handlers for incoming acks and nacks.
type ackHandler struct {
ackFn func([]byte, time.Time)
nackFn func()
timer *time.Timer
}
// NoPingResponseError is used to indicate a 'ping' packet was
// successfully issued but no response was received
type NoPingResponseError struct {
node string
}
func (f NoPingResponseError) Error() string {
return fmt.Sprintf("No response from node %s", f.node)
}
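// As a hedged usage sketch (editor's example, not part of the original
// file): callers of Ping can distinguish a timed-out probe from
// transport errors with a type assertion on this error type.
//
//	if _, err := m.Ping(name, addr); err != nil {
//		if _, ok := err.(NoPingResponseError); ok {
//			// ping was sent successfully but never acknowledged
//		}
//	}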
// schedule is used to ensure the background probe, push/pull, and
// gossip ticks are performed periodically. This function is safe to
// call multiple times. If the memberlist is already scheduled, then
// it won't do anything.
func (m *Memberlist) schedule() {
m.tickerLock.Lock()
defer m.tickerLock.Unlock()
// If we already have tickers, then don't do anything, since we're
// scheduled
if len(m.tickers) > 0 {
return
}
// Create the stop tick channel, a blocking channel. We close this
// when we should stop the tickers.
stopCh := make(chan struct{})
// Create a new probeTicker
if m.config.ProbeInterval > 0 {
t := time.NewTicker(m.config.ProbeInterval)
go m.triggerFunc(m.config.ProbeInterval, t.C, stopCh, m.probe)
m.tickers = append(m.tickers, t)
}
// Create a push pull ticker if needed
if m.config.PushPullInterval > 0 {
go m.pushPullTrigger(stopCh)
}
// Create a gossip ticker if needed
if m.config.GossipInterval > 0 && m.config.GossipNodes > 0 {
t := time.NewTicker(m.config.GossipInterval)
go m.triggerFunc(m.config.GossipInterval, t.C, stopCh, m.gossip)
m.tickers = append(m.tickers, t)
}
// If we made any tickers, then record the stopTick channel for
// later.
if len(m.tickers) > 0 {
m.stopTick = stopCh
}
}
// triggerFunc is used to trigger a function call on each tick
// received until the stop channel is closed.
func (m *Memberlist) triggerFunc(stagger time.Duration, C <-chan time.Time, stop <-chan struct{}, f func()) {
// Use a random stagger to avoid synchronizing
randStagger := time.Duration(uint64(rand.Int63()) % uint64(stagger))
select {
case <-time.After(randStagger):
case <-stop:
return
}
for {
select {
case <-C:
f()
case <-stop:
return
}
}
}
// pushPullTrigger is used to periodically trigger a push/pull until
// the stop channel is closed. We don't use triggerFunc since the
// push/pull timer is dynamically scaled based on cluster size to
// avoid network saturation
func (m *Memberlist) pushPullTrigger(stop <-chan struct{}) {
interval := m.config.PushPullInterval
// Use a random stagger to avoid synchronizing
randStagger := time.Duration(uint64(rand.Int63()) % uint64(interval))
select {
case <-time.After(randStagger):
case <-stop:
return
}
// Tick using a dynamic timer
for {
tickTime := pushPullScale(interval, m.estNumNodes())
select {
case <-time.After(tickTime):
m.pushPull()
case <-stop:
return
}
}
}
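// pushPullScale is defined in util.go. A hedged sketch consistent with
// the upstream helper (editor's note, not part of this file): below a
// threshold cluster size the interval is returned unchanged, and above
// it the interval grows logarithmically with the node count, keeping
// total state-sync traffic roughly constant as the cluster grows.
//
//	const pushPullScaleThreshold = 32
//
//	func pushPullScale(interval time.Duration, n int) time.Duration {
//		if n <= pushPullScaleThreshold {
//			return interval
//		}
//		mult := math.Ceil(math.Log2(float64(n))-math.Log2(pushPullScaleThreshold)) + 1.0
//		return time.Duration(mult) * interval
//	}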
// deschedule is used to stop the background maintenance. This is safe
// to call multiple times.
func (m *Memberlist) deschedule() {
m.tickerLock.Lock()
defer m.tickerLock.Unlock()
// If we have no tickers, then we aren't scheduled.
if len(m.tickers) == 0 {
return
}
// Close the stop channel so all the ticker listeners stop.
close(m.stopTick)
// Explicitly stop all the tickers themselves so they don't take
// up any more resources, and get rid of the list.
for _, t := range m.tickers {
t.Stop()
}
m.tickers = nil
}
// probe is used to perform a single round of failure detection against
// the next node in the probe rotation
func (m *Memberlist) probe() {
// Track the number of indexes we've considered probing
numCheck := 0
START:
m.nodeLock.RLock()
// Make sure we don't wrap around infinitely
if numCheck >= len(m.nodes) {
m.nodeLock.RUnlock()
return
}
// Handle the wrap around case
if m.probeIndex >= len(m.nodes) {
m.nodeLock.RUnlock()
m.resetNodes()
m.probeIndex = 0
numCheck++
goto START
}
// Determine if we should probe this node
skip := false
var node nodeState
node = *m.nodes[m.probeIndex]
if node.Name == m.config.Name {
skip = true
} else if node.State == stateDead {
skip = true
}
// Potentially skip
m.nodeLock.RUnlock()
m.probeIndex++
if skip {
numCheck++
goto START
}
// Probe the specific node
m.probeNode(&node)
}
// probeNode handles a single round of failure checking on a node.
func (m *Memberlist) probeNode(node *nodeState) {
defer metrics.MeasureSince([]string{"memberlist", "probeNode"}, time.Now())
// We use our health awareness to scale the overall probe interval, so we
// slow down if we detect problems. The ticker that calls us can handle
// us running over the base interval, and will skip missed ticks.
probeInterval := m.awareness.ScaleTimeout(m.config.ProbeInterval)
if probeInterval > m.config.ProbeInterval {
metrics.IncrCounter([]string{"memberlist", "degraded", "probe"}, 1)
}
// Prepare a ping message and setup an ack handler.
ping := ping{SeqNo: m.nextSeqNo(), Node: node.Name}
ackCh := make(chan ackMessage, m.config.IndirectChecks+1)
nackCh := make(chan struct{}, m.config.IndirectChecks+1)
m.setProbeChannels(ping.SeqNo, ackCh, nackCh, probeInterval)
// Send a ping to the node. If this node looks like it's suspect or dead,
// also tack on a suspect message so that it has a chance to refute as
// soon as possible.
deadline := time.Now().Add(probeInterval)
addr := node.Address()
if node.State == stateAlive {
if err := m.encodeAndSendMsg(addr, pingMsg, &ping); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send ping: %s", err)
return
}
} else {
var msgs [][]byte
if buf, err := encode(pingMsg, &ping); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to encode ping message: %s", err)
return
} else {
msgs = append(msgs, buf.Bytes())
}
s := suspect{Incarnation: node.Incarnation, Node: node.Name, From: m.config.Name}
if buf, err := encode(suspectMsg, &s); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to encode suspect message: %s", err)
return
} else {
msgs = append(msgs, buf.Bytes())
}
compound := makeCompoundMessage(msgs)
if err := m.rawSendMsgPacket(addr, &node.Node, compound.Bytes()); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send compound ping and suspect message to %s: %s", addr, err)
return
}
}
// Mark the sent time here, which should be after any pre-processing and
// system calls to do the actual send. This probably under-reports a bit,
// but it's the best we can do.
sent := time.Now()
// Arrange for our self-awareness to get updated. At this point we've
// sent the ping, so any return statement means the probe succeeded
// which will improve our health until we get to the failure scenarios
// at the end of this function, which will alter this delta variable
// accordingly.
awarenessDelta := -1
defer func() {
m.awareness.ApplyDelta(awarenessDelta)
}()
// Wait for response or round-trip-time.
select {
case v := <-ackCh:
if v.Complete == true {
if m.config.Ping != nil {
rtt := v.Timestamp.Sub(sent)
m.config.Ping.NotifyPingComplete(&node.Node, rtt, v.Payload)
}
return
}
// As an edge case, if we get a timeout, we need to re-enqueue it
// here to break out of the select below.
if v.Complete == false {
ackCh <- v
}
case <-time.After(m.config.ProbeTimeout):
// Note that we don't scale this timeout based on awareness and
// the health score. That's because we don't really expect waiting
// longer to help get UDP through. Since health does extend the
// probe interval it will give the TCP fallback more time, which
// is more active in dealing with lost packets, and it gives more
// time to wait for indirect acks/nacks.
m.logger.Printf("[DEBUG] memberlist: Failed ping: %v (timeout reached)", node.Name)
}
// Get some random live nodes.
m.nodeLock.RLock()
kNodes := kRandomNodes(m.config.IndirectChecks, m.nodes, func(n *nodeState) bool {
return n.Name == m.config.Name ||
n.Name == node.Name ||
n.State != stateAlive
})
m.nodeLock.RUnlock()
// Attempt an indirect ping.
expectedNacks := 0
ind := indirectPingReq{SeqNo: ping.SeqNo, Target: node.Addr, Port: node.Port, Node: node.Name}
for _, peer := range kNodes {
// We only expect nack to be sent from peers who understand
// version 4 of the protocol.
if ind.Nack = peer.PMax >= 4; ind.Nack {
expectedNacks++
}
if err := m.encodeAndSendMsg(peer.Address(), indirectPingMsg, &ind); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send indirect ping: %s", err)
}
}
// Also make an attempt to contact the node directly over TCP. This
// helps prevent confused clients who get isolated from UDP traffic
// but can still speak TCP (which also means they can possibly report
// misinformation to other nodes via anti-entropy), avoiding flapping in
// the cluster.
//
// This is a little unusual because we will attempt a TCP ping to any
// member who understands version 3 of the protocol, regardless of
// which protocol version we are speaking. That's why we've included a
// config option to turn this off if desired.
fallbackCh := make(chan bool, 1)
if (!m.config.DisableTcpPings) && (node.PMax >= 3) {
go func() {
defer close(fallbackCh)
didContact, err := m.sendPingAndWaitForAck(node.Address(), ping, deadline)
if err != nil {
m.logger.Printf("[ERR] memberlist: Failed fallback ping: %s", err)
} else {
fallbackCh <- didContact
}
}()
} else {
close(fallbackCh)
}
// Wait for the acks or timeout. Note that we don't check the fallback
// channel here because we want to issue a warning below if that's the
// *only* way we hear back from the peer, so we have to let this time
// out first to allow the normal UDP-based acks to come in.
select {
case v := <-ackCh:
if v.Complete == true {
return
}
}
// Finally, poll the fallback channel. The timeouts are set such that
// the channel will have something or be closed without having to wait
// any additional time here.
for didContact := range fallbackCh {
if didContact {
m.logger.Printf("[WARN] memberlist: Was able to connect to %s but other probes failed, network may be misconfigured", node.Name)
return
}
}
// Update our self-awareness based on the results of this failed probe.
// If we don't have peers who will send nacks then we penalize for any
// failed probe as a simple health metric. If we do have peers to nack
// verify, then we can use that as a more sophisticated measure of self-
// health because we assume them to be working, and they can help us
// decide if the probed node was really dead or if it was something wrong
// with ourselves.
awarenessDelta = 0
if expectedNacks > 0 {
if nackCount := len(nackCh); nackCount < expectedNacks {
awarenessDelta += (expectedNacks - nackCount)
}
} else {
awarenessDelta += 1
}
// No acks received from target, suspect it as failed.
m.logger.Printf("[INFO] memberlist: Suspect %s has failed, no acks received", node.Name)
s := suspect{Incarnation: node.Incarnation, Node: node.Name, From: m.config.Name}
m.suspectNode(&s)
}
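// The knobs used above map onto public Config fields. A hedged usage
// sketch from a client's perspective (editor's example; the field and
// function names match this package's public API):
//
//	conf := memberlist.DefaultLANConfig()
//	conf.ProbeInterval = 2 * time.Second       // base failure-detection period, scaled by awareness
//	conf.ProbeTimeout = 500 * time.Millisecond // UDP ack wait; deliberately not scaled by health
//	conf.IndirectChecks = 3                    // peers asked to ping the target on our behalf
//	conf.DisableTcpPings = false               // keep the TCP fallback described above
//	list, err := memberlist.Create(conf)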
// Ping initiates a ping to the node with the specified name.
func (m *Memberlist) Ping(node string, addr net.Addr) (time.Duration, error) {
// Prepare a ping message and setup an ack handler.
ping := ping{SeqNo: m.nextSeqNo(), Node: node}
ackCh := make(chan ackMessage, m.config.IndirectChecks+1)
m.setProbeChannels(ping.SeqNo, ackCh, nil, m.config.ProbeInterval)
// Send a ping to the node.
if err := m.encodeAndSendMsg(addr.String(), pingMsg, &ping); err != nil {
return 0, err
}
// Mark the sent time here, which should be after any pre-processing and
// system calls to do the actual send. This probably under-reports a bit,
// but it's the best we can do.
sent := time.Now()
// Wait for response or timeout.
select {
case v := <-ackCh:
if v.Complete == true {
return v.Timestamp.Sub(sent), nil
}
case <-time.After(m.config.ProbeTimeout):
// Timeout, return an error below.
}
m.logger.Printf("[DEBUG] memberlist: Failed UDP ping: %v (timeout reached)", node)
return 0, NoPingResponseError{ping.Node}
}
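// A hedged usage sketch for Ping (editor's example): the name must
// match the target's memberlist name, since the remote side refuses
// to ack pings addressed to a different node.
//
//	addr := &net.UDPAddr{IP: target.Addr, Port: int(target.Port)}
//	if rtt, err := m.Ping(target.Name, addr); err == nil {
//		fmt.Printf("round-trip to %s: %s\n", target.Name, rtt)
//	}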
// resetNodes is used when the tick wraps around. It will reap the
// dead nodes and shuffle the node list.
func (m *Memberlist) resetNodes() {
m.nodeLock.Lock()
defer m.nodeLock.Unlock()
// Move dead nodes, but respect gossip to the dead interval
deadIdx := moveDeadNodes(m.nodes, m.config.GossipToTheDeadTime)
// Deregister the dead nodes
for i := deadIdx; i < len(m.nodes); i++ {
delete(m.nodeMap, m.nodes[i].Name)
m.nodes[i] = nil
}
// Trim the nodes to exclude the dead nodes
m.nodes = m.nodes[0:deadIdx]
// Update numNodes after we've trimmed the dead nodes
atomic.StoreUint32(&m.numNodes, uint32(deadIdx))
// Shuffle live nodes
shuffleNodes(m.nodes)
}
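// moveDeadNodes is defined in util.go. A hedged sketch consistent with
// the upstream helper (editor's note): it swaps dead nodes whose state
// change is older than gossipToTheDeadTime to the tail of the slice
// and returns the index where that dead region begins.
//
//	func moveDeadNodes(nodes []*nodeState, gossipToTheDeadTime time.Duration) int {
//		numDead := 0
//		n := len(nodes)
//		for i := 0; i < n-numDead; i++ {
//			if nodes[i].State != stateDead {
//				continue
//			}
//			// Respect the gossip to the dead interval
//			if time.Since(nodes[i].StateChange) <= gossipToTheDeadTime {
//				continue
//			}
//			// Move this node to the end
//			nodes[i], nodes[n-numDead-1] = nodes[n-numDead-1], nodes[i]
//			numDead++
//			i--
//		}
//		return n - numDead
//	}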
// gossip is invoked every GossipInterval period to broadcast our gossip
// messages to a few random nodes.
func (m *Memberlist) gossip() {
defer metrics.MeasureSince([]string{"memberlist", "gossip"}, time.Now())
// Get some random live, suspect, or recently dead nodes
m.nodeLock.RLock()
kNodes := kRandomNodes(m.config.GossipNodes, m.nodes, func(n *nodeState) bool {
if n.Name == m.config.Name {
return true
}
switch n.State {
case stateAlive, stateSuspect:
return false
case stateDead:
return time.Since(n.StateChange) > m.config.GossipToTheDeadTime
default:
return true
}
})
m.nodeLock.RUnlock()
// Compute the bytes available
bytesAvail := m.config.UDPBufferSize - compoundHeaderOverhead
if m.config.EncryptionEnabled() {
bytesAvail -= encryptOverhead(m.encryptionVersion())
}
for _, node := range kNodes {
// Get any pending broadcasts
msgs := m.getBroadcasts(compoundOverhead, bytesAvail)
if len(msgs) == 0 {
return
}
addr := node.Address()
if len(msgs) == 1 {
// Send single message as is
if err := m.rawSendMsgPacket(addr, &node.Node, msgs[0]); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err)
}
} else {
// Otherwise create and send a compound message
compound := makeCompoundMessage(msgs)
if err := m.rawSendMsgPacket(addr, &node.Node, compound.Bytes()); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err)
}
}
}
}
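// kRandomNodes is defined in util.go. A hedged sketch consistent with
// the upstream helper (editor's note): it samples random offsets, up
// to 3*n attempts, collecting at most k distinct nodes for which the
// filter returns false (true means exclude).
//
//	func kRandomNodes(k int, nodes []*nodeState, filterFn func(*nodeState) bool) []*nodeState {
//		n := len(nodes)
//		kNodes := make([]*nodeState, 0, k)
//	OUTER:
//		for i := 0; i < 3*n && len(kNodes) < k; i++ {
//			node := nodes[randomOffset(n)]
//			if filterFn != nil && filterFn(node) {
//				continue OUTER
//			}
//			for j := 0; j < len(kNodes); j++ {
//				if node == kNodes[j] {
//					continue OUTER
//				}
//			}
//			kNodes = append(kNodes, node)
//		}
//		return kNodes
//	}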
// pushPull is invoked periodically to randomly perform a complete state
// exchange. Used to ensure a high level of convergence, but is also
// reasonably expensive as the entire state of this node is exchanged
// with the other node.
func (m *Memberlist) pushPull() {
// Get a random live node
m.nodeLock.RLock()
nodes := kRandomNodes(1, m.nodes, func(n *nodeState) bool {
return n.Name == m.config.Name ||
n.State != stateAlive
})
m.nodeLock.RUnlock()
// If no nodes, bail
if len(nodes) == 0 {
return
}
node := nodes[0]
// Attempt a push pull
if err := m.pushPullNode(node.Address(), false); err != nil {
m.logger.Printf("[ERR] memberlist: Push/Pull with %s failed: %s", node.Name, err)
}
}
// pushPullNode does a complete state exchange with a specific node.
func (m *Memberlist) pushPullNode(addr string, join bool) error {
defer metrics.MeasureSince([]string{"memberlist", "pushPullNode"}, time.Now())
// Attempt to send and receive with the node
remote, userState, err := m.sendAndReceiveState(addr, join)
if err != nil {
return err
}
if err := m.mergeRemoteState(join, remote, userState); err != nil {
return err
}
return nil
}
// verifyProtocol verifies that all the remote nodes can speak with our
// nodes and vice versa on both the core protocol as well as the
// delegate protocol level.
//
// The verification works by finding the maximum of the minimum
// understood versions and the minimum of the maximum understood
// versions, for both the protocol and the delegate. In other words,
// it finds the common denominator of protocol and delegate version
// ranges for the entire cluster.
//
// After this, it goes through the entire cluster (local and remote) and
// verifies that everyone's speaking protocol versions satisfy this range.
// If this passes, it means that every node can understand each other.
func (m *Memberlist) verifyProtocol(remote []pushNodeState) error {
m.nodeLock.RLock()
defer m.nodeLock.RUnlock()
// Maximum minimum understood and minimum maximum understood for both
// the protocol and delegate versions. We use this to verify everyone
// can be understood.
var maxpmin, minpmax uint8
var maxdmin, mindmax uint8
minpmax = math.MaxUint8
mindmax = math.MaxUint8
for _, rn := range remote {
// If the node isn't alive, then skip it
if rn.State != stateAlive {
continue
}
// Skip nodes that don't have versions set, it just means
// their version is zero.
if len(rn.Vsn) == 0 {
continue
}
if rn.Vsn[0] > maxpmin {
maxpmin = rn.Vsn[0]
}
if rn.Vsn[1] < minpmax {
minpmax = rn.Vsn[1]
}
if rn.Vsn[3] > maxdmin {
maxdmin = rn.Vsn[3]
}
if rn.Vsn[4] < mindmax {
mindmax = rn.Vsn[4]
}
}
for _, n := range m.nodes {
// Ignore non-alive nodes
if n.State != stateAlive {
continue
}
if n.PMin > maxpmin {
maxpmin = n.PMin
}
if n.PMax < minpmax {
minpmax = n.PMax
}
if n.DMin > maxdmin {
maxdmin = n.DMin
}
if n.DMax < mindmax {
mindmax = n.DMax
}
}
// Now that we definitively know the minimum and maximum understood
// version that satisfies the whole cluster, we verify that every
// node in the cluster satisfies this.
for _, n := range remote {
var nPCur, nDCur uint8
if len(n.Vsn) > 0 {
nPCur = n.Vsn[2]
nDCur = n.Vsn[5]
}
if nPCur < maxpmin || nPCur > minpmax {
return fmt.Errorf(
"Node '%s' protocol version (%d) is incompatible: [%d, %d]",
n.Name, nPCur, maxpmin, minpmax)
}
if nDCur < maxdmin || nDCur > mindmax {
return fmt.Errorf(
"Node '%s' delegate protocol version (%d) is incompatible: [%d, %d]",
n.Name, nDCur, maxdmin, mindmax)
}
}
for _, n := range m.nodes {
nPCur := n.PCur
nDCur := n.DCur
if nPCur < maxpmin || nPCur > minpmax {
return fmt.Errorf(
"Node '%s' protocol version (%d) is incompatible: [%d, %d]",
n.Name, nPCur, maxpmin, minpmax)
}
if nDCur < maxdmin || nDCur > mindmax {
return fmt.Errorf(
"Node '%s' delegate protocol version (%d) is incompatible: [%d, %d]",
n.Name, nDCur, maxdmin, mindmax)
}
}
return nil
}
// nextSeqNo returns a usable sequence number in a thread safe way
func (m *Memberlist) nextSeqNo() uint32 {
return atomic.AddUint32(&m.sequenceNum, 1)
}
// nextIncarnation returns the next incarnation number in a thread safe way
func (m *Memberlist) nextIncarnation() uint32 {
return atomic.AddUint32(&m.incarnation, 1)
}
// skipIncarnation adds the positive offset to the incarnation number.
func (m *Memberlist) skipIncarnation(offset uint32) uint32 {
return atomic.AddUint32(&m.incarnation, offset)
}
// estNumNodes is used to get the current estimate of the number of nodes
func (m *Memberlist) estNumNodes() int {
return int(atomic.LoadUint32(&m.numNodes))
}
type ackMessage struct {
Complete bool
Payload []byte
Timestamp time.Time
}
// setProbeChannels is used to attach the ackCh to receive a message when an ack
// with a given sequence number is received. The `complete` field of the message
// will be false on timeout. Any nack messages will cause an empty struct to be
// passed to the nackCh, which can be nil if not needed.
func (m *Memberlist) setProbeChannels(seqNo uint32, ackCh chan ackMessage, nackCh chan struct{}, timeout time.Duration) {
// Create handler functions for acks and nacks
ackFn := func(payload []byte, timestamp time.Time) {
select {
case ackCh <- ackMessage{true, payload, timestamp}:
default:
}
}
nackFn := func() {
select {
case nackCh <- struct{}{}:
default:
}
}
// Add the handlers
ah := &ackHandler{ackFn, nackFn, nil}
m.ackLock.Lock()
m.ackHandlers[seqNo] = ah
m.ackLock.Unlock()
// Set up a reaping routine
ah.timer = time.AfterFunc(timeout, func() {
m.ackLock.Lock()
delete(m.ackHandlers, seqNo)
m.ackLock.Unlock()
select {
case ackCh <- ackMessage{false, nil, time.Now()}:
default:
}
})
}
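// A hedged sketch of the expected lifecycle (editor's example): the
// caller registers the channels, sends its ping, then receives from
// the ack channel; a Complete=false message arrives if the reaper
// fires first, so the receive always terminates within the timeout.
//
//	ackCh := make(chan ackMessage, 1)
//	m.setProbeChannels(seqNo, ackCh, nil, timeout)
//	// ... send the ping over the transport ...
//	if v := <-ackCh; v.Complete {
//		// acked within the timeout; v.Timestamp gives the receive time
//	}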
// setAckHandler is used to attach a handler to be invoked when an ack with a
// given sequence number is received. If a timeout is reached, the handler is
// deleted. This is used for indirect pings so does not configure a function
// for nacks.
func (m *Memberlist) setAckHandler(seqNo uint32, ackFn func([]byte, time.Time), timeout time.Duration) {
// Add the handler
ah := &ackHandler{ackFn, nil, nil}
m.ackLock.Lock()
m.ackHandlers[seqNo] = ah
m.ackLock.Unlock()
// Set up a reaping routine
ah.timer = time.AfterFunc(timeout, func() {
m.ackLock.Lock()
delete(m.ackHandlers, seqNo)
m.ackLock.Unlock()
})
}
// Invokes an ack handler if any is associated, and reaps the handler immediately
func (m *Memberlist) invokeAckHandler(ack ackResp, timestamp time.Time) {
m.ackLock.Lock()
ah, ok := m.ackHandlers[ack.SeqNo]
delete(m.ackHandlers, ack.SeqNo)
m.ackLock.Unlock()
if !ok {
return
}
ah.timer.Stop()
ah.ackFn(ack.Payload, timestamp)
}
// Invokes nack handler if any is associated.
func (m *Memberlist) invokeNackHandler(nack nackResp) {
m.ackLock.Lock()
ah, ok := m.ackHandlers[nack.SeqNo]
m.ackLock.Unlock()
if !ok || ah.nackFn == nil {
return
}
ah.nackFn()
}
// refute gossips an alive message in response to incoming information that we
// are suspect or dead. It will make sure the incarnation number beats the given
// accusedInc value, or you can supply 0 to just get the next incarnation number.
// This alters the node state that's passed in so this MUST be called while the
// nodeLock is held.
func (m *Memberlist) refute(me *nodeState, accusedInc uint32) {
// Make sure the incarnation number beats the accusation.
inc := m.nextIncarnation()
if accusedInc >= inc {
inc = m.skipIncarnation(accusedInc - inc + 1)
}
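// Worked example (editor's note): if nextIncarnation returned 8 but
// the accusation carries incarnation 10, accusedInc-inc+1 = 3, so
// skipIncarnation advances the counter to 11, which strictly beats
// the accusation.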
me.Incarnation = inc
// Decrease our health because we are being asked to refute a problem.
m.awareness.ApplyDelta(1)
// Format and broadcast an alive message.
a := alive{
Incarnation: inc,
Node: me.Name,
Addr: me.Addr,
Port: me.Port,
Meta: me.Meta,
Vsn: []uint8{
me.PMin, me.PMax, me.PCur,
me.DMin, me.DMax, me.DCur,
},
}
m.encodeAndBroadcast(me.Addr.String(), aliveMsg, a)
}
// aliveNode is invoked by the network layer when we get a message about a
// live node.
func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {
m.nodeLock.Lock()
defer m.nodeLock.Unlock()
state, ok := m.nodeMap[a.Node]
// It is possible that during a Leave(), there is already an aliveMsg
// in-queue to be processed but blocked by the locks above. If we let
// that aliveMsg process, it'll cause us to re-join the cluster. This
// ensures that we don't.
if m.leave && a.Node == m.config.Name {
return
}
// Invoke the Alive delegate if any. This can be used to filter out
// alive messages based on custom logic. For example, using a cluster name.
// Using a merge delegate is not enough, as it is possible for passive
// cluster merging to still occur.
if m.config.Alive != nil {
node := &Node{
Name: a.Node,
Addr: a.Addr,
Port: a.Port,
Meta: a.Meta,
PMin: a.Vsn[0],
PMax: a.Vsn[1],
PCur: a.Vsn[2],
DMin: a.Vsn[3],
DMax: a.Vsn[4],
DCur: a.Vsn[5],
}
if err := m.config.Alive.NotifyAlive(node); err != nil {
m.logger.Printf("[WARN] memberlist: ignoring alive message for '%s': %s",
a.Node, err)
return
}
}
// Check if we've never seen this node before, and if not, then
// store this node in our node map.
if !ok {
state = &nodeState{
Node: Node{
Name: a.Node,
Addr: a.Addr,
Port: a.Port,
Meta: a.Meta,
},
State: stateDead,
}
// Add to map
m.nodeMap[a.Node] = state
// Get a random offset. This is important to ensure
// the failure detection bound is low on average. If all
// nodes did an append, failure detection bound would be
// very high.
n := len(m.nodes)
offset := randomOffset(n)
// Add at the end and swap with the node at the offset
m.nodes = append(m.nodes, state)
m.nodes[offset], m.nodes[n] = m.nodes[n], m.nodes[offset]
// Update numNodes after we've added a new node
atomic.AddUint32(&m.numNodes, 1)
}
// Check if this address is different than the existing node
if !bytes.Equal([]byte(state.Addr), a.Addr) || state.Port != a.Port {
m.logger.Printf("[ERR] memberlist: Conflicting address for %s. Mine: %v:%d Theirs: %v:%d",
state.Name, state.Addr, state.Port, net.IP(a.Addr), a.Port)
// Inform the conflict delegate if provided
if m.config.Conflict != nil {
other := Node{
Name: a.Node,
Addr: a.Addr,
Port: a.Port,
Meta: a.Meta,
}
m.config.Conflict.NotifyConflict(&state.Node, &other)
}
return
}
// Bail if the incarnation number is older, and this is not about us
isLocalNode := state.Name == m.config.Name
if a.Incarnation <= state.Incarnation && !isLocalNode {
return
}
// Bail if strictly less and this is about us
if a.Incarnation < state.Incarnation && isLocalNode {
return
}
// Clear out any suspicion timer that may be in effect.
delete(m.nodeTimers, a.Node)
// Store the old state and meta data
oldState := state.State
oldMeta := state.Meta
// If this is us we need to refute, otherwise re-broadcast
if !bootstrap && isLocalNode {
// Compute the version vector
versions := []uint8{
state.PMin, state.PMax, state.PCur,
state.DMin, state.DMax, state.DCur,
}
// If the Incarnation is the same, we need special handling, since it
// is possible for the following situation to happen:
// 1) Start with configuration C, join cluster
// 2) Hard fail / Kill / Shutdown
// 3) Restart with configuration C', join cluster
//
// In this case, other nodes and the local node see the same incarnation,
// but the values may not be the same. For this reason, we always
// need to do an equality check for this Incarnation. In most cases,
// we just ignore, but we may need to refute.
//
if a.Incarnation == state.Incarnation &&
bytes.Equal(a.Meta, state.Meta) &&
bytes.Equal(a.Vsn, versions) {
return
}
m.refute(state, a.Incarnation)
m.logger.Printf("[WARN] memberlist: Refuting an alive message")
} else {
m.encodeBroadcastNotify(a.Node, aliveMsg, a, notify)
// Update protocol versions if it arrived
if len(a.Vsn) > 0 {
state.PMin = a.Vsn[0]
state.PMax = a.Vsn[1]
state.PCur = a.Vsn[2]
state.DMin = a.Vsn[3]
state.DMax = a.Vsn[4]
state.DCur = a.Vsn[5]
}
// Update the state and incarnation number
state.Incarnation = a.Incarnation
state.Meta = a.Meta
if state.State != stateAlive {
state.State = stateAlive
state.StateChange = time.Now()
}
}
// Update metrics
metrics.IncrCounter([]string{"memberlist", "msg", "alive"}, 1)
// Notify the delegate of any relevant updates
if m.config.Events != nil {
if oldState == stateDead {
// if Dead -> Alive, notify of join
m.config.Events.NotifyJoin(&state.Node)
} else if !bytes.Equal(oldMeta, state.Meta) {
// if Meta changed, trigger an update notification
m.config.Events.NotifyUpdate(&state.Node)
}
}
}
// suspectNode is invoked by the network layer when we get a message
// about a suspect node
func (m *Memberlist) suspectNode(s *suspect) {
m.nodeLock.Lock()
defer m.nodeLock.Unlock()
state, ok := m.nodeMap[s.Node]
// If we've never heard about this node before, ignore it
if !ok {
return
}
// Ignore old incarnation numbers
if s.Incarnation < state.Incarnation {
return
}
// See if there's a suspicion timer we can confirm. If the info is new
// to us we will go ahead and re-gossip it. This allows for multiple
// independent confirmations to flow even when a node probes a node
// that's already suspect.
if timer, ok := m.nodeTimers[s.Node]; ok {
if timer.Confirm(s.From) {
m.encodeAndBroadcast(s.Node, suspectMsg, s)
}
return
}
// Ignore non-alive nodes
if state.State != stateAlive {
return
}
// If this is us we need to refute, otherwise re-broadcast
if state.Name == m.config.Name {
m.refute(state, s.Incarnation)
m.logger.Printf("[WARN] memberlist: Refuting a suspect message (from: %s)", s.From)
return // Do not mark ourselves suspect
} else {
m.encodeAndBroadcast(s.Node, suspectMsg, s)
}
// Update metrics
metrics.IncrCounter([]string{"memberlist", "msg", "suspect"}, 1)
// Update the state
state.Incarnation = s.Incarnation
state.State = stateSuspect
changeTime := time.Now()
state.StateChange = changeTime
// Setup a suspicion timer. Given that we don't have any known phase
// relationship with our peers, we set up k such that we hit the nominal
// timeout two probe intervals short of what we expect given the suspicion
// multiplier.
k := m.config.SuspicionMult - 2
// If there aren't enough nodes to give the expected confirmations, just
// set k to 0 to say that we don't expect any. Note we subtract 2 from n
// here to take out ourselves and the node being probed.
n := m.estNumNodes()
if n-2 < k {
k = 0
}
// Compute the timeouts based on the size of the cluster.
min := suspicionTimeout(m.config.SuspicionMult, n, m.config.ProbeInterval)
max := time.Duration(m.config.SuspicionMaxTimeoutMult) * min
fn := func(numConfirmations int) {
m.nodeLock.Lock()
state, ok := m.nodeMap[s.Node]
timeout := ok && state.State == stateSuspect && state.StateChange == changeTime
m.nodeLock.Unlock()
if timeout {
if k > 0 && numConfirmations < k {
metrics.IncrCounter([]string{"memberlist", "degraded", "timeout"}, 1)
}
m.logger.Printf("[INFO] memberlist: Marking %s as failed, suspect timeout reached (%d peer confirmations)",
state.Name, numConfirmations)
d := dead{Incarnation: state.Incarnation, Node: state.Name, From: m.config.Name}
m.deadNode(&d)
}
}
m.nodeTimers[s.Node] = newSuspicion(s.From, k, min, max, fn)
}
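// suspicionTimeout is defined in util.go. A hedged sketch consistent
// with the upstream helper (editor's note): the minimum timeout is the
// suspicion multiplier times a log-scaled node count times the probe
// interval, so larger clusters wait longer for confirmations.
//
//	func suspicionTimeout(suspicionMult, n int, interval time.Duration) time.Duration {
//		nodeScale := math.Max(1.0, math.Log10(math.Max(1.0, float64(n))))
//		// multiply by 1000 and divide at the end to keep precision,
//		// since time.Duration is an integer count of nanoseconds
//		return time.Duration(suspicionMult) * time.Duration(nodeScale*1000) * interval / 1000
//	}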
// deadNode is invoked by the network layer when we get a message
// about a dead node
func (m *Memberlist) deadNode(d *dead) {
m.nodeLock.Lock()
defer m.nodeLock.Unlock()
state, ok := m.nodeMap[d.Node]
// If we've never heard about this node before, ignore it
if !ok {
return
}
// Ignore old incarnation numbers
if d.Incarnation < state.Incarnation {
return
}
// Clear out any suspicion timer that may be in effect.
delete(m.nodeTimers, d.Node)
// Ignore if node is already dead
if state.State == stateDead {
return
}
// Check if this is us
if state.Name == m.config.Name {
// If we are not leaving we need to refute
if !m.leave {
m.refute(state, d.Incarnation)
m.logger.Printf("[WARN] memberlist: Refuting a dead message (from: %s)", d.From)
return // Do not mark ourselves dead
}
// If we are leaving, we broadcast and wait
m.encodeBroadcastNotify(d.Node, deadMsg, d, m.leaveBroadcast)
} else {
m.encodeAndBroadcast(d.Node, deadMsg, d)
}
// Update metrics
metrics.IncrCounter([]string{"memberlist", "msg", "dead"}, 1)
// Update the state
state.Incarnation = d.Incarnation
state.State = stateDead
state.StateChange = time.Now()
// Notify of death
if m.config.Events != nil {
m.config.Events.NotifyLeave(&state.Node)
}
}
// mergeState is invoked by the network layer when we get a Push/Pull
// state transfer
func (m *Memberlist) mergeState(remote []pushNodeState) {
for _, r := range remote {
switch r.State {
case stateAlive:
a := alive{
Incarnation: r.Incarnation,
Node: r.Name,
Addr: r.Addr,
Port: r.Port,
Meta: r.Meta,
Vsn: r.Vsn,
}
m.aliveNode(&a, nil, false)
case stateDead:
// If the remote node believes a node is dead, we prefer to
// suspect that node instead of declaring it dead instantly
fallthrough
case stateSuspect:
s := suspect{Incarnation: r.Incarnation, Node: r.Name, From: m.config.Name}
m.suspectNode(&s)
}
}
}
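// End to end, the state machine in this file is driven through the
// public API. A hedged usage sketch (editor's example; the names match
// this package's public API, and Create starts schedule() internally):
//
//	conf := memberlist.DefaultLANConfig()
//	conf.Name = "node-a"
//	list, err := memberlist.Create(conf)
//	if err != nil {
//		panic(err)
//	}
//	if _, err := list.Join([]string{"10.0.0.2"}); err != nil {
//		panic(err)
//	}
//	for _, member := range list.Members() {
//		fmt.Printf("%s %s\n", member.Name, member.Address())
//	}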