/usr/share/gocode/src/github.com/weaveworks/mesh/local_peer.go is in golang-github-weaveworks-mesh-dev 0+git20161024.3dd75b1-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
package mesh
import (
"encoding/gob"
"fmt"
"net"
"sync"
"time"
)
// localPeer is the only "active" peer in the mesh. It extends Peer with
// additional behaviors, mostly to retrieve and manage connection state.
//
// The embedded RWMutex is taken by the accessor and mutator helpers in this
// file (e.g. getConnections, addConnection, setShortID) around the embedded
// Peer's connections, Version and ShortID fields.
type localPeer struct {
// Lock used by the helpers in this file; see the type comment.
sync.RWMutex
// The Peer record describing this node itself.
*Peer
// Owning router; consulted for routes, gossip, peer garbage collection
// and the connection limit.
router *Router
// Send-only end of the channel that actorLoop drains.
actionChan chan<- localPeerAction
}
// localPeerAction is the actor closure used by localPeer: values are sent on
// localPeer.actionChan and invoked one at a time by actorLoop.
type localPeerAction func()
// newLocalPeer builds the localPeer for this node and starts its actor
// goroutine before returning it.
//
// The embedded Peer is created with a random UID, version 0 and a random
// short ID. The action channel is buffered with ChannelSize slots; its
// receive end is handed to actorLoop and its send end is kept on the
// returned value.
func newLocalPeer(name PeerName, nickName string, router *Router) *localPeer {
	actions := make(chan localPeerAction, ChannelSize)
	self := newPeer(name, nickName, randomPeerUID(), 0, randomPeerShortID())
	local := &localPeer{Peer: self, router: router, actionChan: actions}
	// The actor loop runs for as long as the process does; nothing in this
	// function provides a way to stop it.
	go local.actorLoop(actions)
	return local
}
// getConnections returns all the connections that the local peer is aware
// of, as a freshly allocated set. The set is a snapshot taken under the
// read lock; later changes to the peer's connections are not reflected in
// it, and the caller may modify it freely.
func (peer *localPeer) getConnections() connectionSet {
	peer.RLock()
	defer peer.RUnlock()
	// The final size is known once the lock is held, so size the set up
	// front instead of letting it grow while copying.
	connections := make(connectionSet, len(peer.connections))
	for _, conn := range peer.connections {
		connections[conn] = struct{}{}
	}
	return connections
}
// ConnectionTo looks up the connection to the named peer. The boolean
// result reports whether such a connection is currently registered.
//
// TODO(pb): Weave Net invokes router.Ourself.ConnectionTo;
// it may be better to provide that on Router directly.
func (peer *localPeer) ConnectionTo(name PeerName) (Connection, bool) {
	peer.RLock()
	defer peer.RUnlock()
	// A comma-ok map lookup cannot be written directly as the operand of a
	// two-value return, hence the intermediate locals.
	c, ok := peer.connections[name]
	return c, ok
}
// ConnectionsTo collects the known connections to the named peers, in the
// order the names are given. Names without a registered connection are
// skipped, so the result may be shorter than names. An empty names slice
// yields nil.
//
// TODO(pb): Weave Net invokes router.Ourself.ConnectionsTo;
// it may be better to provide that on Router directly.
func (peer *localPeer) ConnectionsTo(names []PeerName) []Connection {
	if len(names) == 0 {
		return nil
	}
	peer.RLock()
	defer peer.RUnlock()
	matched := make([]Connection, 0, len(names))
	for i := range names {
		// A miss is not an error: it could simply be the result of a race
		// with the connection being added or removed.
		if c, ok := peer.connections[names[i]]; ok {
			matched = append(matched, c)
		}
	}
	return matched
}
// createConnection dials a new TCP (IPv4) connection from localAddr to
// peerAddr and hands it to startLocalConnection. If acceptNewPeer is
// false, peerAddr must already be a member of the mesh.
//
// It returns an error, without dialing, when the connection limit has been
// reached or either address fails to resolve; it returns the dial error if
// the TCP connection cannot be established. A nil result only means the
// connection was handed off.
func (peer *localPeer) createConnection(localAddr string, peerAddr string, acceptNewPeer bool, logger Logger) error {
	const network = "tcp4"

	var err error
	if err = peer.checkConnectionLimit(); err != nil {
		return err
	}

	var src, dst *net.TCPAddr
	if src, err = net.ResolveTCPAddr(network, localAddr); err != nil {
		return err
	}
	if dst, err = net.ResolveTCPAddr(network, peerAddr); err != nil {
		return err
	}

	socket, err := net.DialTCP(network, src, dst)
	if err != nil {
		return err
	}

	// The remote peer's identity is not known yet, hence the nil remote.
	// NOTE(review): the meaning of the trailing true/false flags is defined
	// by newRemoteConnection, which is not visible here.
	pending := newRemoteConnection(peer.Peer, nil, peerAddr, true, false)
	startLocalConnection(pending, socket, peer.router, acceptNewPeer, logger)
	return nil
}
// ACTOR client API
// doAddConnection asks the actor loop to run handleAddConnection for conn
// and waits for its result. Synchronous: it blocks until the action has
// been queued, executed, and its error (possibly nil) handed back.
func (peer *localPeer) doAddConnection(conn ourConnection, isRestartedPeer bool) error {
	reply := make(chan error)
	var add localPeerAction = func() {
		reply <- peer.handleAddConnection(conn, isRestartedPeer)
	}
	peer.actionChan <- add
	return <-reply
}
// doConnectionEstablished queues a handleConnectionEstablished call for
// conn on the actor loop. Asynchronous: it returns once the action has been
// placed on the action channel, without waiting for it to run.
func (peer *localPeer) doConnectionEstablished(conn ourConnection) {
	var established localPeerAction = func() {
		peer.handleConnectionEstablished(conn)
	}
	peer.actionChan <- established
}
// doDeleteConnection asks the actor loop to run handleDeleteConnection for
// conn. Synchronous: it blocks until the action has been queued and has
// finished executing.
func (peer *localPeer) doDeleteConnection(conn ourConnection) {
	done := make(chan struct{})
	var remove localPeerAction = func() {
		peer.handleDeleteConnection(conn)
		// Pure completion signal; the value carries no information.
		done <- struct{}{}
	}
	peer.actionChan <- remove
	<-done
}
// encode writes the embedded Peer to enc while holding the read lock, so
// that the lock-guarded mutators in this file cannot run concurrently with
// the encoding.
func (peer *localPeer) encode(enc *gob.Encoder) {
	peer.RLock()
	defer peer.RUnlock()
	// Call the embedded Peer's encode explicitly; an unqualified
	// peer.encode would resolve to this method and recurse.
	embedded := peer.Peer
	embedded.encode(enc)
}
// ACTOR server
// actorLoop is the body of the localPeer actor goroutine. It never returns:
// it runs each action received on actionChan, one at a time, and calls
// router.sendAllGossip every gossipInterval.
func (peer *localPeer) actorLoop(actionChan <-chan localPeerAction) {
	gossipTick := time.Tick(gossipInterval)
	for {
		select {
		case <-gossipTick:
			peer.router.sendAllGossip()
		case run := <-actionChan:
			run()
		}
	}
}
// handleAddConnection registers conn as the local peer's connection to
// conn.Remote().
//
// It panics if conn does not originate from this peer or has no remote
// peer. If a different connection to the same remote name is already
// registered, conn.breakTie decides which one survives; when conn does not
// win outright, an error is returned and conn is not registered. An error
// is also returned when the connection limit has been reached. On success
// the connection is stored, routes are recalculated and a topology update
// including the remote peer is broadcast.
//
// NOTE(review): peer.connections is read here without taking the lock;
// presumably this is only ever run on the actor goroutine — confirm.
func (peer *localPeer) handleAddConnection(conn ourConnection, isRestartedPeer bool) error {
if peer.Peer != conn.getLocal() {
panic("Attempt made to add connection to peer where peer is not the source of connection")
}
if conn.Remote() == nil {
panic("Attempt made to add connection to peer with unknown remote peer")
}
toName := conn.Remote().Name
// Built up front because every duplicate-handling branch below uses it.
dupErr := fmt.Errorf("Multiple connections to %s added to %s", conn.Remote(), peer.String())
// Duplicate handling is deliberately non symmetrical: the outcome depends
// on which of the two connections breakTie is invoked on.
if dupConn, found := peer.connections[toName]; found {
if dupConn == conn {
// This exact connection is already registered; nothing to do.
return nil
}
dupOurConn := dupConn.(ourConnection)
switch conn.breakTie(dupOurConn) {
case tieBreakWon:
// The new connection wins: drop the old one and carry on adding conn.
dupOurConn.shutdown(dupErr)
peer.handleDeleteConnection(dupOurConn)
case tieBreakLost:
// The existing connection wins: reject the new one.
return dupErr
case tieBreakTied:
// oh good grief. Sod it, just kill both of them.
// The old one is shut down here; the new one is rejected via the error.
dupOurConn.shutdown(dupErr)
peer.handleDeleteConnection(dupOurConn)
return dupErr
}
}
// Checked after duplicate handling, so a replaced connection has already
// been removed from the count.
if err := peer.checkConnectionLimit(); err != nil {
return err
}
// Must be sampled before addConnection: the second result of Unicast is
// used below to tell a known peer from a new one.
_, isConnectedPeer := peer.router.Routes.Unicast(toName)
peer.addConnection(conn)
switch {
case isRestartedPeer:
conn.logf("connection added (restarted peer)")
peer.router.sendAllGossipDown(conn)
case isConnectedPeer:
// No gossip is sent down the connection in this case.
conn.logf("connection added")
default:
conn.logf("connection added (new peer)")
peer.router.sendAllGossipDown(conn)
}
peer.router.Routes.recalculate()
peer.broadcastPeerUpdate(conn.Remote())
return nil
}
// handleConnectionEstablished marks conn as fully established.
//
// It panics if conn does not originate from this peer, or if conn has no
// remote peer (the same explicit guard handleAddConnection and
// handleDeleteConnection apply, instead of an opaque nil-pointer panic).
// If conn is not the connection currently registered for its remote peer,
// conn is shut down and nothing else happens. Otherwise the local peer's
// version is bumped, routes are recalculated and a topology update is
// broadcast.
//
// NOTE(review): peer.connections is read here without taking the lock;
// presumably this is only ever run on the actor goroutine — confirm.
func (peer *localPeer) handleConnectionEstablished(conn ourConnection) {
	if peer.Peer != conn.getLocal() {
		panic("Peer informed of active connection where peer is not the source of connection")
	}
	remote := conn.Remote()
	if remote == nil {
		panic("Peer informed of active connection to peer with unknown remote peer")
	}
	// Only the connection registered by handleAddConnection may be
	// promoted; anything else is stale or unknown.
	if dupConn, found := peer.connections[remote.Name]; !found || conn != dupConn {
		conn.shutdown(fmt.Errorf("Cannot set unknown connection active"))
		return
	}
	peer.connectionEstablished(conn)
	conn.logf("connection fully established")
	peer.router.Routes.recalculate()
	peer.broadcastPeerUpdate()
}
// handleDeleteConnection removes conn from the local peer's connections.
//
// It panics if conn does not originate from this peer or has no remote
// peer. If conn is not the connection currently registered for its remote
// peer, the call is a no-op. Otherwise the connection is removed, peers are
// garbage collected, routes are recalculated and a topology update is
// broadcast.
//
// NOTE(review): peer.connections is read here without taking the lock;
// presumably this is only ever run on the actor goroutine — confirm.
func (peer *localPeer) handleDeleteConnection(conn ourConnection) {
	switch {
	case peer.Peer != conn.getLocal():
		panic("Attempt made to delete connection from peer where peer is not the source of connection")
	case conn.Remote() == nil:
		panic("Attempt made to delete connection to peer with unknown remote peer")
	}

	remoteName := conn.Remote().Name
	registered, known := peer.connections[remoteName]
	if !known || registered != conn {
		// Already gone, or superseded by a different connection.
		return
	}

	peer.deleteConnection(conn)
	conn.logf("connection deleted")

	// Garbage collection has to come first so that the update we send out
	// never mentions unreachable peers (which can cause looping).
	peer.router.Peers.GarbageCollect()
	peer.router.Routes.recalculate()
	peer.broadcastPeerUpdate()
}
// helpers
// broadcastPeerUpdate asks the router to broadcast a topology update
// covering the given peers plus the local peer itself. It does nothing when
// the local peer has no router.
func (peer *localPeer) broadcastPeerUpdate(peers ...*Peer) {
	// Some tests run without a router. This should be fixed so that the
	// relevant part of Router can be easily run in the context of a test,
	// but that will involve significant reworking of tests.
	if peer.router == nil {
		return
	}
	updated := append(peers, peer.Peer)
	peer.router.broadcastTopologyUpdate(updated)
}
// checkConnectionLimit reports an error when the number of registered
// connections has reached the router's ConnLimit. A ConnLimit of zero
// disables the check.
func (peer *localPeer) checkConnectionLimit() error {
	limit := peer.router.ConnLimit
	if limit == 0 {
		// No limit configured.
		return nil
	}
	if peer.connectionCount() < limit {
		return nil
	}
	return fmt.Errorf("Connection limit reached (%v)", limit)
}
// addConnection stores conn under its remote peer's name, replacing any
// existing entry for that name, and bumps the local peer's version. Both
// happen under the write lock.
func (peer *localPeer) addConnection(conn Connection) {
	peer.Lock()
	defer peer.Unlock()
	remoteName := conn.Remote().Name
	peer.connections[remoteName] = conn
	peer.Version++
}
// deleteConnection removes the entry keyed by conn's remote peer name and
// bumps the local peer's version, under the write lock. The version is
// bumped even if no entry existed for that name.
func (peer *localPeer) deleteConnection(conn Connection) {
	peer.Lock()
	defer peer.Unlock()
	remoteName := conn.Remote().Name
	delete(peer.connections, remoteName)
	peer.Version++
}
// connectionEstablished bumps the local peer's version under the write
// lock. The conn argument is not used.
func (peer *localPeer) connectionEstablished(conn Connection) {
	peer.Lock()
	defer peer.Unlock()
	// Version lives on the embedded Peer.
	peer.Peer.Version++
}
// connectionCount returns the number of connections currently registered
// on the local peer, read under the read lock.
func (peer *localPeer) connectionCount() int {
	peer.RLock()
	defer peer.RUnlock()
	count := len(peer.connections)
	return count
}
// setShortID replaces the local peer's short ID and bumps its version,
// both under the write lock.
func (peer *localPeer) setShortID(shortID PeerShortID) {
	peer.Lock()
	defer peer.Unlock()
	// Both fields live on the embedded Peer.
	self := peer.Peer
	self.ShortID = shortID
	self.Version++
}
// setVersionBeyond ensures the local peer's version is strictly greater
// than version. If it already is, nothing changes and false is returned;
// otherwise the version is set to version+1 and true is returned. The
// check and the update happen under the write lock.
//
// NOTE(review): version+1 wraps around to 0 when version is the maximum
// uint64 value.
func (peer *localPeer) setVersionBeyond(version uint64) bool {
	peer.Lock()
	defer peer.Unlock()
	if peer.Version > version {
		return false
	}
	peer.Version = version + 1
	return true
}
|