/usr/lib/python2.7/dist-packages/tooz/coordination.py is in python-tooz 1.40.0-4.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 | # -*- coding: utf-8 -*-
#
# Copyright (C) 2016 Red Hat, Inc.
# Copyright (C) 2013-2014 eNovance Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import collections
import enum
import logging
import threading
from oslo_utils import excutils
from oslo_utils import netutils
from oslo_utils import timeutils
import six
from stevedore import driver
import tooz
LOG = logging.getLogger(__name__)
TOOZ_BACKENDS_NAMESPACE = "tooz.backends"
class Characteristics(enum.Enum):
"""Attempts to describe the characteristic that a driver supports."""
DISTRIBUTED_ACROSS_THREADS = 'DISTRIBUTED_ACROSS_THREADS'
"""Coordinator components when used by multiple **threads** work
the same as if those components were only used by a single thread."""
DISTRIBUTED_ACROSS_PROCESSES = 'DISTRIBUTED_ACROSS_PROCESSES'
"""Coordinator components when used by multiple **processes** work
the same as if those components were only used by a single thread."""
DISTRIBUTED_ACROSS_HOSTS = 'DISTRIBUTED_ACROSS_HOSTS'
"""Coordinator components when used by multiple **hosts** work
the same as if those components were only used by a single thread."""
NON_TIMEOUT_BASED = 'NON_TIMEOUT_BASED'
"""The driver has the following property:
* Its operations are not based on the timeout of other clients, but on some
other more robust mechanisms.
"""
LINEARIZABLE = 'LINEARIZABLE'
"""The driver has the following properties:
* Ensures each operation must take place before its
completion time.
* Any operation invoked subsequently must take place
after the invocation and by extension, after the original operation
itself.
"""
SEQUENTIAL = 'SEQUENTIAL'
"""The driver has the following properties:
* Operations can take effect before or after completion – but all
operations retain the constraint that operations from any given process
must take place in that processes order.
"""
CAUSAL = 'CAUSAL'
"""The driver has the following properties:
* Does **not** have to enforce the order of every
operation from a process, perhaps, only causally related operations
must occur in order.
"""
SERIALIZABLE = 'SERIALIZABLE'
"""The driver has the following properties:
* The history of **all** operations is equivalent to
one that took place in some single atomic order but with unknown
invocation and completion times - it places no bounds on
time or order.
"""
SAME_VIEW_UNDER_PARTITIONS = 'SAME_VIEW_UNDER_PARTITIONS'
"""When a client is connected to a server and that server is partitioned
from a group of other servers it will (somehow) have the same view of
data as a client connected to a server on the other side of the
partition (typically this is accomplished by write availability being
lost and therefore nothing can change).
"""
SAME_VIEW_ACROSS_CLIENTS = 'SAME_VIEW_ACROSS_CLIENTS'
"""A client connected to one server will *always* have the same view
every other client will have (no matter what server those other
clients are connected to). Typically this is a sacrifice in
write availability because before a write can be acknowledged it must
be acknowledged by *all* servers in a cluster (so that all clients
that are connected to those servers read the exact *same* thing).
"""
class Hooks(list):
def run(self, *args, **kwargs):
return list(map(lambda cb: cb(*args, **kwargs), self))
class Event(object):
"""Base class for events."""
class MemberJoinedGroup(Event):
"""A member joined a group event."""
def __init__(self, group_id, member_id):
self.group_id = group_id
self.member_id = member_id
class MemberLeftGroup(Event):
"""A member left a group event."""
def __init__(self, group_id, member_id):
self.group_id = group_id
self.member_id = member_id
class LeaderElected(Event):
"""A leader as been elected."""
def __init__(self, group_id, member_id):
self.group_id = group_id
self.member_id = member_id
class Heart(object):
"""Coordination drivers main liveness pump (its heart)."""
def __init__(self, driver, thread_cls=threading.Thread,
event_cls=threading.Event):
self._thread_cls = thread_cls
self._dead = event_cls()
self._finished = event_cls()
self._finished.set()
self._runner = None
self._driver = driver
self._beats = 0
@property
def beats(self):
"""How many times the heart has beaten."""
return self._beats
def is_alive(self):
"""Returns if the heart is beating."""
return not (self._runner is None
or not self._runner.is_alive()
or self._finished.is_set())
@excutils.forever_retry_uncaught_exceptions
def _beat_forever_until_stopped(self):
"""Inner beating loop."""
try:
while not self._dead.is_set():
with timeutils.StopWatch() as w:
wait_until_next_beat = self._driver.heartbeat()
ran_for = w.elapsed()
if ran_for > wait_until_next_beat:
LOG.warning(
"Heartbeating took too long to execute (it ran for"
" %0.2f seconds which is %0.2f seconds longer than"
" the next heartbeat idle time). This may cause"
" timeouts (in locks, leadership, ...) to"
" happen (which will not end well).", ran_for,
ran_for - wait_until_next_beat)
self._beats += 1
# NOTE(harlowja): use the event object for waiting and
# not a sleep function since doing that will allow this code
# to terminate early if stopped via the stop() method vs
# having to wait until the sleep function returns.
self._dead.wait(wait_until_next_beat)
finally:
self._finished.set()
def start(self, thread_cls=None):
"""Starts the heart beating thread (noop if already started)."""
if not self.is_alive():
self._finished.clear()
self._dead.clear()
self._beats = 0
if thread_cls is None:
thread_cls = self._thread_cls
self._runner = thread_cls(target=self._beat_forever_until_stopped)
self._runner.daemon = True
self._runner.start()
def stop(self):
"""Requests the heart beating thread to stop beating."""
self._dead.set()
def wait(self, timeout=None):
"""Wait up to given timeout for the heart beating thread to stop."""
self._finished.wait(timeout)
return self._finished.is_set()
@six.add_metaclass(abc.ABCMeta)
class CoordinationDriver(object):
requires_beating = False
"""
Usage requirement that if true requires that the :py:meth:`~.heartbeat`
be called periodically (at a given rate) to avoid locks, sessions and
other from being automatically closed/discarded by the coordinators
backing store.
"""
CHARACTERISTICS = ()
"""
Tuple of :py:class:`~tooz.coordination.Characteristics` introspectable
enum member(s) that can be used to interogate how this driver works.
"""
def __init__(self):
self._started = False
self._hooks_join_group = collections.defaultdict(Hooks)
self._hooks_leave_group = collections.defaultdict(Hooks)
self._hooks_elected_leader = collections.defaultdict(Hooks)
# A cache for group members
self._group_members = collections.defaultdict(set)
self.requires_beating = (
CoordinationDriver.heartbeat != self.__class__.heartbeat
)
self.heart = Heart(self)
def _has_hooks_for_group(self, group_id):
return (len(self._hooks_join_group[group_id]) +
len(self._hooks_leave_group[group_id]))
@staticmethod
def run_watchers(timeout=None):
"""Run the watchers callback.
This may also activate :py:meth:`.run_elect_coordinator` (depending
on driver implementation).
"""
raise tooz.NotImplemented
@staticmethod
def run_elect_coordinator():
"""Try to leader elect this coordinator & activate hooks on success."""
raise tooz.NotImplemented
@abc.abstractmethod
def watch_join_group(self, group_id, callback):
"""Call a function when group_id sees a new member joined.
The callback functions will be executed when `run_watchers` is
called.
:param group_id: The group id to watch
:param callback: The function to execute when a member joins this group
"""
self._hooks_join_group[group_id].append(callback)
@abc.abstractmethod
def unwatch_join_group(self, group_id, callback):
"""Stop executing a function when a group_id sees a new member joined.
:param group_id: The group id to unwatch
:param callback: The function that was executed when a member joined
this group
"""
try:
self._hooks_join_group[group_id].remove(callback)
except ValueError:
raise WatchCallbackNotFound(group_id, callback)
if (not self._has_hooks_for_group(group_id) and
group_id in self._group_members):
del self._group_members[group_id]
@abc.abstractmethod
def watch_leave_group(self, group_id, callback):
"""Call a function when group_id sees a new member leaving.
The callback functions will be executed when `run_watchers` is
called.
:param group_id: The group id to watch
:param callback: The function to execute when a member leaves this
group
"""
self._hooks_leave_group[group_id].append(callback)
@abc.abstractmethod
def unwatch_leave_group(self, group_id, callback):
"""Stop executing a function when a group_id sees a new member leaving.
:param group_id: The group id to unwatch
:param callback: The function that was executed when a member left
this group
"""
try:
self._hooks_leave_group[group_id].remove(callback)
except ValueError:
raise WatchCallbackNotFound(group_id, callback)
if (not self._has_hooks_for_group(group_id) and
group_id in self._group_members):
del self._group_members[group_id]
@abc.abstractmethod
def watch_elected_as_leader(self, group_id, callback):
"""Call a function when member gets elected as leader.
The callback functions will be executed when `run_watchers` is
called.
:param group_id: The group id to watch
:param callback: The function to execute when a member leaves this
group
"""
self._hooks_elected_leader[group_id].append(callback)
@abc.abstractmethod
def unwatch_elected_as_leader(self, group_id, callback):
"""Call a function when member gets elected as leader.
The callback functions will be executed when `run_watchers` is
called.
:param group_id: The group id to watch
:param callback: The function to execute when a member leaves this
group
"""
try:
self._hooks_elected_leader[group_id].remove(callback)
except ValueError:
raise WatchCallbackNotFound(group_id, callback)
if not self._hooks_elected_leader[group_id]:
del self._hooks_elected_leader[group_id]
@staticmethod
def stand_down_group_leader(group_id):
"""Stand down as the group leader if we are.
:param group_id: The group where we don't want to be a leader anymore
"""
raise tooz.NotImplemented
@property
def is_started(self):
return self._started
def start(self, start_heart=False):
"""Start the service engine.
If needed, the establishment of a connection to the servers
is initiated.
"""
if self._started:
raise ToozError(
"Can not start a driver which has not been stopped")
self._start()
if self.requires_beating and start_heart:
self.heart.start()
self._started = True
def _start(self):
pass
def stop(self):
"""Stop the service engine.
If needed, the connection to servers is closed and the client will
disappear from all joined groups.
"""
if not self._started:
raise ToozError("Can not stop a driver which has not been started")
if self.heart.is_alive():
self.heart.stop()
self.heart.wait()
self._stop()
self._started = False
def _stop(self):
pass
@staticmethod
def create_group(group_id):
"""Request the creation of a group asynchronously.
:param group_id: the id of the group to create
:type group_id: str
:returns: None
:rtype: CoordAsyncResult
"""
raise tooz.NotImplemented
@staticmethod
def get_groups():
"""Return the list composed by all groups ids asynchronously.
:returns: the list of all created group ids
:rtype: CoordAsyncResult
"""
raise tooz.NotImplemented
@staticmethod
def join_group(group_id, capabilities=b""):
"""Join a group and establish group membership asynchronously.
:param group_id: the id of the group to join
:type group_id: str
:param capabilities: the capabilities of the joined member
:type capabilities: object (typically str)
:returns: None
:rtype: CoordAsyncResult
"""
raise tooz.NotImplemented
@staticmethod
def leave_group(group_id):
"""Leave a group asynchronously.
:param group_id: the id of the group to leave
:type group_id: str
:returns: None
:rtype: CoordAsyncResult
"""
raise tooz.NotImplemented
@staticmethod
def delete_group(group_id):
"""Delete a group asynchronously.
:param group_id: the id of the group to leave
:type group_id: str
:returns: Result
:rtype: CoordAsyncResult
"""
raise tooz.NotImplemented
@staticmethod
def get_members(group_id):
"""Return the list of all members ids of the specified group.
:returns: list of all created group ids
:rtype: CoordAsyncResult
"""
raise tooz.NotImplemented
@staticmethod
def get_member_capabilities(group_id, member_id):
"""Return the capabilities of a member asynchronously.
:param group_id: the id of the group of the member
:type group_id: str
:param member_id: the id of the member
:type member_id: str
:returns: capabilities of a member
:rtype: CoordAsyncResult
"""
raise tooz.NotImplemented
@staticmethod
def get_member_info(group_id, member_id):
"""Return the statistics and capabilities of a member asynchronously.
:param group_id: the id of the group of the member
:type group_id: str
:param member_id: the id of the member
:type member_id: str
:returns: capabilities and statistics of a member
:rtype: CoordAsyncResult
"""
raise tooz.NotImplemented
@staticmethod
def update_capabilities(group_id, capabilities):
"""Update member capabilities in the specified group.
:param group_id: the id of the group of the current member
:type group_id: str
:param capabilities: the capabilities of the updated member
:type capabilities: object (typically str)
:returns: None
:rtype: CoordAsyncResult
"""
raise tooz.NotImplemented
@staticmethod
def get_leader(group_id):
"""Return the leader for a group.
:param group_id: the id of the group:
:returns: the leader
:rtype: CoordAsyncResult
"""
raise tooz.NotImplemented
@staticmethod
def get_lock(name):
"""Return a distributed lock.
This is a exclusive lock, a second call to acquire() will block or
return False.
:param name: The lock name that is used to identify it across all
nodes.
"""
raise tooz.NotImplemented
@staticmethod
def heartbeat():
"""Update member status to indicate it is still alive.
Method to run once in a while to be sure that the member is not dead
and is still an active member of a group.
:return: The number of seconds to wait before sending a new heartbeat.
"""
pass
@six.add_metaclass(abc.ABCMeta)
class CoordAsyncResult(object):
"""Representation of an asynchronous task.
Every call API returns an CoordAsyncResult object on which the result or
the status of the task can be requested.
"""
@abc.abstractmethod
def get(self, timeout=10):
"""Retrieve the result of the corresponding asynchronous call.
:param timeout: block until the timeout expire.
:type timeout: float
"""
@abc.abstractmethod
def done(self):
"""Returns True if the task is done, False otherwise."""
class _RunWatchersMixin(object):
"""Mixin to share the *mostly* common ``run_watchers`` implementation."""
def run_watchers(self, timeout=None):
with timeutils.StopWatch(duration=timeout) as w:
known_groups = self.get_groups().get(
timeout=w.leftover(return_none=True))
result = []
for group_id in known_groups:
try:
group_members_fut = self.get_members(group_id)
group_members = group_members_fut.get(
timeout=w.leftover(return_none=True))
except GroupNotCreated:
group_members = set()
else:
group_members = set(group_members)
if (group_id in self._joined_groups and
self._member_id not in group_members):
self._joined_groups.discard(group_id)
old_group_members = self._group_members.get(group_id, set())
for member_id in (group_members - old_group_members):
result.extend(
self._hooks_join_group[group_id].run(
MemberJoinedGroup(group_id, member_id)))
for member_id in (old_group_members - group_members):
result.extend(
self._hooks_leave_group[group_id].run(
MemberLeftGroup(group_id, member_id)))
self._group_members[group_id] = group_members
return result
def get_coordinator(backend_url, member_id,
characteristics=frozenset(), **kwargs):
"""Initialize and load the backend.
:param backend_url: the backend URL to use
:type backend: str
:param member_id: the id of the member
:type member_id: str
:param characteristics: set
:type characteristics: set of :py:class:`.Characteristics` that will
be matched to the requested driver (this **will**
become a **required** parameter in a future tooz
version)
:param kwargs: additional coordinator options (these take precedence over
options of the **same** name found in the ``backend_url``
arguments query string)
"""
parsed_url = netutils.urlsplit(backend_url)
parsed_qs = six.moves.urllib.parse.parse_qs(parsed_url.query)
if kwargs:
options = {}
for (k, v) in six.iteritems(kwargs):
options[k] = [v]
for (k, v) in six.iteritems(parsed_qs):
if k not in options:
options[k] = v
else:
options = parsed_qs
d = driver.DriverManager(
namespace=TOOZ_BACKENDS_NAMESPACE,
name=parsed_url.scheme,
invoke_on_load=True,
invoke_args=(member_id, parsed_url, options)).driver
characteristics = set(characteristics)
driver_characteristics = set(getattr(d, 'CHARACTERISTICS', set()))
missing_characteristics = characteristics - driver_characteristics
if missing_characteristics:
raise ToozDriverChosenPoorly("Desired characteristics %s"
" is not a strict subset of driver"
" characteristics %s, %s"
" characteristics were not found"
% (characteristics,
driver_characteristics,
missing_characteristics))
return d
class ToozError(Exception):
"""Exception raised when an internal error occurs.
Raised for instance in case of server internal error.
:ivar cause: the cause of the exception being raised, when not none this
will itself be an exception instance, this is useful for
creating a chain of exceptions for versions of python where
this is not yet implemented/supported natively.
"""
def __init__(self, message, cause=None):
super(ToozError, self).__init__(message)
self.cause = cause
class ToozDriverChosenPoorly(ToozError):
"""Raised when a driver does not match desired characteristics."""
class ToozConnectionError(ToozError):
"""Exception raised when the client cannot connect to the server."""
class OperationTimedOut(ToozError):
"""Exception raised when an operation times out."""
class LockAcquireFailed(ToozError):
"""Exception raised when a lock acquire fails in a context manager."""
class GroupNotCreated(ToozError):
"""Exception raised when the caller request an nonexistent group."""
def __init__(self, group_id):
self.group_id = group_id
super(GroupNotCreated, self).__init__(
"Group %s does not exist" % group_id)
class GroupAlreadyExist(ToozError):
"""Exception raised trying to create an already existing group."""
def __init__(self, group_id):
self.group_id = group_id
super(GroupAlreadyExist, self).__init__(
"Group %s already exists" % group_id)
class MemberAlreadyExist(ToozError):
"""Exception raised trying to join a group already joined."""
def __init__(self, group_id, member_id):
self.group_id = group_id
self.member_id = member_id
super(MemberAlreadyExist, self).__init__(
"Member %s has already joined %s" %
(member_id, group_id))
class MemberNotJoined(ToozError):
"""Exception raised trying to access a member not in a group."""
def __init__(self, group_id, member_id):
self.group_id = group_id
self.member_id = member_id
super(MemberNotJoined, self).__init__("Member %s has not joined %s" %
(member_id, group_id))
class GroupNotEmpty(ToozError):
"Exception raised when the caller try to delete a group with members."
def __init__(self, group_id):
self.group_id = group_id
super(GroupNotEmpty, self).__init__("Group %s is not empty" % group_id)
class WatchCallbackNotFound(ToozError):
"""Exception raised when unwatching a group.
Raised when the caller tries to unwatch a group with a callback that
does not exist.
"""
def __init__(self, group_id, callback):
self.group_id = group_id
self.callback = callback
super(WatchCallbackNotFound, self).__init__(
'Callback %s is not registered on group %s' %
(callback.__name__, group_id))
class SerializationError(ToozError):
"Exception raised when serialization or deserialization breaks."
def raise_with_cause(exc_cls, message, *args, **kwargs):
"""Helper to raise + chain exceptions (when able) and associate a *cause*.
**For internal usage only.**
NOTE(harlowja): Since in py3.x exceptions can be chained (due to
:pep:`3134`) we should try to raise the desired exception with the given
*cause*.
:param exc_cls: the :py:class:`~tooz.coordination.ToozError` class
to raise.
:param message: the text/str message that will be passed to
the exceptions constructor as its first positional
argument.
:param args: any additional positional arguments to pass to the
exceptions constructor.
:param kwargs: any additional keyword arguments to pass to the
exceptions constructor.
"""
if not issubclass(exc_cls, ToozError):
raise ValueError("Subclass of tooz error is required")
excutils.raise_with_cause(exc_cls, message, *args, **kwargs)
|