/usr/share/ganeti/2.16/cluster-merge is in ganeti-2.16 2.16.0~rc2-1build1.
This file is owned by root:root, with mode 0o755.
The actual contents of the file can be viewed below.
#!/usr/bin/python
# Generated file; do not edit.
#
# Copyright (C) 2010, 2012 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Tool to merge two or more clusters together.
The clusters have to run the same version of Ganeti!
"""
# pylint: disable=C0103
# C0103: Invalid name cluster-merge
import logging
import os
import optparse
import shutil
import sys
import tempfile
from ganeti import cli
from ganeti import config
from ganeti import constants
from ganeti import errors
from ganeti import ssh
from ganeti import utils
from ganeti import pathutils
from ganeti import compat
# Strategies for node groups that carry the same name on both clusters
# (selected with --groups).
_GROUPS_MERGE = "merge"
_GROUPS_RENAME = "rename"

# Prefix of the identifier passed as second argument to the ConfigWriter
# Add* calls during the merge; a running counter is appended to it for nodes
# and instances.
_CLUSTERMERGE_ECID = "clustermerge-ecid"

# Instance restart strategies (selected with --restart).
_RESTART_ALL = "all"
_RESTART_UP = "up"
_RESTART_NONE = "none"
_RESTART_CHOICES = (_RESTART_ALL, _RESTART_UP, _RESTART_NONE)

# Handling of differing cluster parameters (selected with
# --parameter-conflicts).
_PARAMS_STRICT = "strict"
_PARAMS_WARN = "warn"
_PARAMS_CHOICES = (_PARAMS_STRICT, _PARAMS_WARN)

PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
                                  action="store", type="int",
                                  dest="pause_period",
                                  help=("Amount of time in seconds watcher"
                                        " should be suspended from running"))

GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
                            choices=(_GROUPS_MERGE, _GROUPS_RENAME),
                            dest="groups",
                            help=("How to handle groups that have the"
                                  " same name (One of: %s/%s)" %
                                  (_GROUPS_MERGE, _GROUPS_RENAME)))

PARAMS_OPT = cli.cli_option("--parameter-conflicts", default=_PARAMS_STRICT,
                            metavar="STRATEGY",
                            choices=_PARAMS_CHOICES,
                            dest="params",
                            help=("How to handle params that have"
                                  " different values (One of: %s/%s)" %
                                  _PARAMS_CHOICES))

# The help text used to read "restarting instances same name", a leftover
# from the --groups option it was copied from.
RESTART_OPT = cli.cli_option("--restart", default=_RESTART_ALL,
                             metavar="STRATEGY",
                             choices=_RESTART_CHOICES,
                             dest="restart",
                             help=("How to handle restarting instances"
                                   " after the merge (One of: %s/%s/%s)" %
                                   _RESTART_CHOICES))

# store_false: passing the flag clears stop_instances, which defaults to True.
SKIP_STOP_INSTANCES_OPT = \
  cli.cli_option("--skip-stop-instances", default=True, action="store_false",
                 dest="stop_instances",
                 help=("Don't stop the instances on the clusters, just check "
                       "that none is running"))
def Flatten(unflattened_list):
  """Flattens a list.

  Only C{list} instances are descended into (recursively); every other
  item, including tuples and other iterables, is kept as a single element.

  @param unflattened_list: A list of unflattened list objects.
  @return: A new, flattened list; the input is not modified

  """
  flattened_list = []

  for item in unflattened_list:
    if isinstance(item, list):
      flattened_list.extend(Flatten(item))
    else:
      flattened_list.append(item)

  return flattened_list
class MergerData(object):
  """Container class to hold data used for merger.

  One instance describes a single cluster that is being merged in.

  """
  def __init__(self, cluster, key_path, nodes, instances, master_node,
               config_path=None):
    """Initialize the container.

    @param cluster: The name of the cluster
    @param key_path: Path to the ssh private key used for authentication
    @param nodes: List of online nodes in the merging cluster
    @param instances: List of instances running on merging cluster
    @param master_node: Name of the master node
    @param config_path: Path to the merging cluster config

    """
    self.cluster = cluster
    self.key_path = key_path
    self.nodes = nodes
    self.instances = instances
    self.master_node = master_node
    self.config_path = config_path
class Merger(object):
  """Handling the merge.

  Typical use: construct, call L{Setup}, then L{Merge}, and finally
  L{Cleanup} to remove the temporary working directory.

  """
  # Instance statuses that count as "running" in _CheckRunningInstances
  RUNNING_STATUSES = compat.UniqueFrozenset([
    constants.INSTST_RUNNING,
    constants.INSTST_ERRORUP,
    ])

  def __init__(self, clusters, pause_period, groups, restart, params,
               stop_instances):
    """Initialize object with sane defaults and infos required.

    @param clusters: The list of clusters to merge in
    @param pause_period: The time watcher shall be disabled for
    @param groups: How to handle group conflicts
    @param restart: How to handle instance restart
    @param params: How to handle differing cluster parameters
                   (L{_PARAMS_STRICT} or L{_PARAMS_WARN})
    @param stop_instances: Indicates whether the instances must be stopped
                           (True) or if the Merger must only check if no
                           instances are running on the mergee clusters (False)
    @raise NotImplementedError: If the L{_RESTART_UP} strategy is requested

    """
    # Reject the unsupported strategy before anything is created on disk
    if restart == _RESTART_UP:
      raise NotImplementedError("Restart strategy '%s' is not implemented" %
                                restart)

    self.merger_data = []
    self.clusters = clusters
    self.pause_period = pause_period
    self.groups = groups
    self.restart = restart
    self.params = params
    self.stop_instances = stop_instances

    (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
    self.ssh_runner = ssh.SshRunner(self.cluster_name)

    # Created last so that a failure above does not leave a directory behind
    # which nobody would clean up (Cleanup needs a constructed object)
    self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")

  def Setup(self):
    """Sets up our end so we can do the merger.

    This method is setting us up as a preparation for the merger.
    It makes the initial contact and gathers information needed.

    For every cluster one L{MergerData} object is appended to
    C{self.merger_data}.

    @raise errors.CommandError: if our own cluster is among the mergees
    @raise errors.RemoteError: for errors in communication/grabbing

    """
    (remote_path, _, _) = ssh.GetUserFiles("root")

    if self.cluster_name in self.clusters:
      raise errors.CommandError("Cannot merge cluster %s with itself" %
                                self.cluster_name)

    # Fetch remotes private key
    for cluster in self.clusters:
      result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
                            ask_key=False)
      if result.failed:
        raise errors.RemoteError("There was an error while grabbing ssh private"
                                 " key from %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

      key_path = utils.PathJoin(self.work_dir, cluster)
      utils.WriteFile(key_path, mode=0o600, data=result.stdout)

      result = self._RunCmd(cluster, "gnt-node list -o name,offline"
                            " --no-headers --separator=,", private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of nodes from %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      nodes_statuses = [line.split(",") for line in result.stdout.splitlines()]
      # Keep only the nodes whose "offline" column reads "N"
      nodes = [node_status[0] for node_status in nodes_statuses
               if node_status[1] == "N"]

      result = self._RunCmd(cluster, "gnt-instance list -o name --no-headers",
                            private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of instances from"
                                 " %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      instances = result.stdout.splitlines()

      path = utils.PathJoin(pathutils.DATA_DIR, "ssconf_%s" %
                            constants.SS_MASTER_NODE)
      result = self._RunCmd(cluster, "cat %s" % path, private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve the master node name from"
                                 " %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      master_node = result.stdout.strip()

      self.merger_data.append(MergerData(cluster, key_path, nodes, instances,
                                         master_node))

  def _PrepareAuthorizedKeys(self):
    """Prepare the authorized_keys on every merging node.

    This method add our public key to remotes authorized_key for further
    communication.

    @raise errors.RemoteError: if the key cannot be appended on a node

    """
    (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
    pub_key = utils.ReadFile(pub_key_file)

    # The heredoc terminator below has to start on a line of its own; without
    # a trailing newline it would be glued to the key itself
    if not pub_key.endswith("\n"):
      pub_key += "\n"

    for data in self.merger_data:
      for node in data.nodes:
        result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
                                     (auth_keys, pub_key)),
                              private_key=data.key_path, max_attempts=3)

        if result.failed:
          raise errors.RemoteError("Unable to add our public key to %s in %s."
                                   " Fail reason: %s; output: %s" %
                                   (node, data.cluster, result.fail_reason,
                                    result.output))

  def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
              strict_host_check=False, private_key=None, batch=True,
              ask_key=False, max_attempts=1):
    """Wrapping SshRunner.Run with default parameters.

    For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.

    @param max_attempts: How often the command is tried before giving up;
                         values below one are treated as one
    @return: the result of the first successful attempt, or of the last
             attempt if all of them failed

    """
    result = None

    # Always run at least once, otherwise there would be no result to return
    for _ in range(max(1, max_attempts)):
      result = self.ssh_runner.Run(hostname=hostname, command=command,
                                   user=user, use_cluster_key=use_cluster_key,
                                   strict_host_check=strict_host_check,
                                   private_key=private_key, batch=batch,
                                   ask_key=ask_key)
      if not result.failed:
        break

    return result

  def _CheckRunningInstances(self):
    """Checks if on the clusters to be merged there are running instances

    @rtype: boolean
    @return: True if there are running instances, False otherwise
    @raise errors.RemoteError: if the instance statuses cannot be retrieved

    """
    for cluster in self.clusters:
      result = self._RunCmd(cluster, "gnt-instance list -o status")
      # A failed query must not be mistaken for "nothing is running"
      if result.failed:
        raise errors.RemoteError("Unable to retrieve instance statuses from"
                                 " %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      if self.RUNNING_STATUSES.intersection(result.output.splitlines()):
        return True

    return False

  def _StopMergingInstances(self):
    """Stop instances on merging clusters.

    @raise errors.RemoteError: if the shutdown command fails on a cluster

    """
    for cluster in self.clusters:
      result = self._RunCmd(cluster, "gnt-instance shutdown --all"
                                     " --force-multiple")

      if result.failed:
        raise errors.RemoteError("Unable to stop instances on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

  def _DisableWatcher(self):
    """Disable watch on all merging clusters, including ourself.

    @raise errors.RemoteError: if the watcher cannot be paused somewhere

    """
    for cluster in ["localhost"] + self.clusters:
      result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
                                     self.pause_period)

      if result.failed:
        raise errors.RemoteError("Unable to pause watcher on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

  def _RemoveMasterIps(self):
    """Removes the master IPs from the master nodes of each cluster.

    @raise errors.RemoteError: if the command fails on a master node

    """
    for data in self.merger_data:
      result = self._RunCmd(data.master_node,
                            "gnt-cluster deactivate-master-ip --yes")

      if result.failed:
        raise errors.RemoteError("Unable to remove master IP on %s."
                                 " Fail reason: %s; output: %s" %
                                 (data.master_node,
                                  result.fail_reason,
                                  result.output))

  def _StopDaemons(self):
    """Stop all daemons on merging nodes.

    @raise errors.RemoteError: if the daemons cannot be stopped on a node

    """
    cmd = "%s stop-all" % pathutils.DAEMON_UTIL
    for data in self.merger_data:
      for node in data.nodes:
        result = self._RunCmd(node, cmd, max_attempts=3)

        if result.failed:
          raise errors.RemoteError("Unable to stop daemons on %s."
                                   " Fail reason: %s; output: %s." %
                                   (node, result.fail_reason, result.output))

  def _FetchRemoteConfig(self):
    """Fetches and stores remote cluster config from the master.

    This step is needed before we can merge the config.

    The config is written below C{self.work_dir} and its location recorded
    in the C{config_path} attribute of the respective L{MergerData}.

    @raise errors.RemoteError: if a remote config cannot be read

    """
    for data in self.merger_data:
      result = self._RunCmd(data.cluster, "cat %s" %
                                          pathutils.CLUSTER_CONF_FILE)

      if result.failed:
        raise errors.RemoteError("Unable to retrieve remote config on %s."
                                 " Fail reason: %s; output %s" %
                                 (data.cluster, result.fail_reason,
                                  result.output))

      data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
                                        data.cluster)
      utils.WriteFile(data.config_path, data=result.stdout)

  # R0201: Method could be a function
  def _KillMasterDaemon(self): # pylint: disable=R0201
    """Kills the local master daemon.

    @raise errors.CommandError: If unable to kill

    """
    result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
    if result.failed:
      raise errors.CommandError("Unable to stop master daemons."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  def _MergeConfig(self):
    """Merges all foreign config into our own config.

    For every fetched remote config the cluster parameters are checked,
    node groups are merged, and all nodes (marked offline) and instances
    are added to the local configuration.

    """
    my_config = config.ConfigWriter(offline=True)
    fake_ec_id = 0 # Needs to be uniq over the whole config merge

    for data in self.merger_data:
      other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
      self._MergeClusterConfigs(my_config, other_config)
      self._MergeNodeGroups(my_config, other_config)

      for node in other_config.GetNodeList():
        node_info = other_config.GetNodeInfo(node)
        # Offline the node, it will be reonlined later at node readd
        node_info.master_candidate = False
        node_info.drained = False
        node_info.offline = True
        my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
        fake_ec_id += 1

      for instance in other_config.GetInstanceList():
        instance_info = other_config.GetInstanceInfo(instance)

        # Update the DRBD port assignments
        # This is a little bit hackish
        for dsk in instance_info.disks:
          if dsk.dev_type in constants.DTS_DRBD:
            port = my_config.AllocatePort()

            # The port is the third element of the logical ID tuple
            logical_id = list(dsk.logical_id)
            logical_id[2] = port
            dsk.logical_id = tuple(logical_id)

        my_config.AddInstance(instance_info,
                              _CLUSTERMERGE_ECID + str(fake_ec_id))
        fake_ec_id += 1

  def _MergeClusterConfigs(self, my_config, other_config):
    """Checks that all relevant cluster parameters are compatible

    Differences are logged. Whether a difference counts as an error depends
    on C{self.params}; parameters listed in C{check_params_strict} and a
    differing set of enabled hypervisors are always errors. Reserved LVs,
    C{prealloc_wipe_disks} and OS parameters unknown to this cluster are
    merged into this cluster's configuration object.

    @param my_config: The configuration of this cluster
    @param other_config: The configuration of the cluster merged in
    @raise errors.ConfigurationError: if at least one error was counted

    """
    my_cluster = my_config.GetClusterInfo()
    other_cluster = other_config.GetClusterInfo()
    err_count = 0

    #
    # Generic checks
    #
    check_params = [
      "beparams",
      "default_iallocator",
      "drbd_usermode_helper",
      "hidden_os",
      "maintain_node_health",
      "master_netdev",
      "ndparams",
      "nicparams",
      "primary_ip_family",
      "tags",
      "uid_pool",
      ]
    # Differences in these are errors regardless of the conflict strategy
    check_params_strict = [
      "volume_group_name",
      ]
    if my_cluster.IsFileStorageEnabled() or \
        other_cluster.IsFileStorageEnabled():
      check_params_strict.append("file_storage_dir")
    if my_cluster.IsSharedFileStorageEnabled() or \
        other_cluster.IsSharedFileStorageEnabled():
      check_params_strict.append("shared_file_storage_dir")
    check_params.extend(check_params_strict)

    params_strict = (self.params == _PARAMS_STRICT)

    for param_name in check_params:
      my_param = getattr(my_cluster, param_name)
      other_param = getattr(other_cluster, param_name)
      if my_param != other_param:
        logging.error("The value (%s) of the cluster parameter %s on %s"
                      " differs to this cluster's value (%s)",
                      other_param, param_name, other_cluster.cluster_name,
                      my_param)
        if params_strict or param_name in check_params_strict:
          err_count += 1

    #
    # Custom checks
    #

    # Check default hypervisor
    my_defhyp = my_cluster.enabled_hypervisors[0]
    other_defhyp = other_cluster.enabled_hypervisors[0]
    if my_defhyp != other_defhyp:
      logging.warning("The default hypervisor (%s) differs on %s, new"
                      " instances will be created with this cluster's"
                      " default hypervisor (%s)", other_defhyp,
                      other_cluster.cluster_name, my_defhyp)

    if (set(my_cluster.enabled_hypervisors) !=
        set(other_cluster.enabled_hypervisors)):
      logging.error("The set of enabled hypervisors (%s) on %s differs to"
                    " this cluster's set (%s)",
                    other_cluster.enabled_hypervisors,
                    other_cluster.cluster_name, my_cluster.enabled_hypervisors)
      err_count += 1

    # Check hypervisor params for hypervisors we care about
    for hyp in my_cluster.enabled_hypervisors:
      for param in my_cluster.hvparams[hyp]:
        my_value = my_cluster.hvparams[hyp][param]
        other_value = other_cluster.hvparams[hyp][param]
        if my_value != other_value:
          logging.error("The value (%s) of the %s parameter of the %s"
                        " hypervisor on %s differs to this cluster's parameter"
                        " (%s)",
                        other_value, param, hyp, other_cluster.cluster_name,
                        my_value)
          if params_strict:
            err_count += 1

    # Check os hypervisor params for hypervisors we care about
    # (set union instead of concatenating .keys(), which only works on lists)
    for os_name in set(my_cluster.os_hvp) | set(other_cluster.os_hvp):
      for hyp in my_cluster.enabled_hypervisors:
        my_os_hvp = self._GetOsHypervisor(my_cluster, os_name, hyp)
        other_os_hvp = self._GetOsHypervisor(other_cluster, os_name, hyp)
        if my_os_hvp != other_os_hvp:
          logging.error("The OS parameters (%s) for the %s OS for the %s"
                        " hypervisor on %s differs to this cluster's parameters"
                        " (%s)",
                        other_os_hvp, os_name, hyp, other_cluster.cluster_name,
                        my_os_hvp)
          if params_strict:
            err_count += 1

    #
    # Warnings
    #
    if my_cluster.modify_etc_hosts != other_cluster.modify_etc_hosts:
      logging.warning("The modify_etc_hosts value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_etc_hosts,
                      other_cluster.cluster_name,
                      my_cluster.modify_etc_hosts)

    if my_cluster.modify_ssh_setup != other_cluster.modify_ssh_setup:
      logging.warning("The modify_ssh_setup value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_ssh_setup,
                      other_cluster.cluster_name,
                      my_cluster.modify_ssh_setup)

    #
    # Actual merging
    #
    my_cluster.reserved_lvs = list(set(my_cluster.reserved_lvs +
                                       other_cluster.reserved_lvs))

    if my_cluster.prealloc_wipe_disks != other_cluster.prealloc_wipe_disks:
      logging.warning("The prealloc_wipe_disks value (%s) on %s differs to this"
                      " cluster's value (%s). The least permissive value (%s)"
                      " will be used", other_cluster.prealloc_wipe_disks,
                      other_cluster.cluster_name,
                      my_cluster.prealloc_wipe_disks, True)
      my_cluster.prealloc_wipe_disks = True

    for os_, osparams in other_cluster.osparams.items():
      if os_ not in my_cluster.osparams:
        my_cluster.osparams[os_] = osparams
      elif my_cluster.osparams[os_] != osparams:
        logging.error("The OS parameters (%s) for the %s OS on %s differs to"
                      " this cluster's parameters (%s)",
                      osparams, os_, other_cluster.cluster_name,
                      my_cluster.osparams[os_])
        if params_strict:
          err_count += 1

    if err_count:
      raise errors.ConfigurationError("Cluster config for %s has incompatible"
                                      " values, please fix and re-run" %
                                      other_cluster.cluster_name)

  # R0201: Method could be a function
  def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable=R0201
    """Looks up the per-OS parameters of one hypervisor.

    @param cluster: Cluster object providing the C{os_hvp} mapping
    @param os_name: Name of the OS
    @param hyp: Name of the hypervisor
    @return: C{cluster.os_hvp[os_name][hyp]}, or None if either the OS or
             the hypervisor has no entry

    """
    if os_name in cluster.os_hvp:
      return cluster.os_hvp[os_name].get(hyp, None)
    else:
      return None

  # R0201: Method could be a function
  def _MergeNodeGroups(self, my_config, other_config):
    """Adds foreign node groups

    ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.

    Groups with a name already used in this cluster are treated according
    to C{self.groups}: renamed by appending the remote cluster's name, or
    merged by moving their nodes into the local group of the same name.

    @param my_config: The configuration of this cluster
    @param other_config: The configuration of the cluster merged in
    @raise errors.CommandError: if there are conflicts but no strategy

    """
    # pylint: disable=R0201
    logging.info("Node group conflict strategy: %s", self.groups)

    my_grps = my_config.GetAllNodeGroupsInfo().values()
    # Explicit list: merged groups are removed from it below
    other_grps = list(other_config.GetAllNodeGroupsInfo().values())

    # Check for node group naming conflicts:
    my_grp_names = set(grp.name for grp in my_grps)
    conflicts = [grp for grp in other_grps if grp.name in my_grp_names]

    if conflicts:
      conflict_names = utils.CommaJoin([g.name for g in conflicts])
      logging.info("Node groups in both local and remote cluster: %s",
                   conflict_names)

      # User hasn't specified how to handle conflicts
      if not self.groups:
        raise errors.CommandError("The following node group(s) are in both"
                                  " clusters, and no merge strategy has been"
                                  " supplied (see the --groups option): %s" %
                                  conflict_names)

      # User wants to rename conflicts
      elif self.groups == _GROUPS_RENAME:
        for grp in conflicts:
          new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
          logging.info("Renaming remote node group from %s to %s"
                       " to resolve conflict", grp.name, new_name)
          grp.name = new_name

      # User wants to merge conflicting groups
      elif self.groups == _GROUPS_MERGE:
        for other_grp in conflicts:
          logging.info("Merging local and remote '%s' groups", other_grp.name)
          # Iterate over a copy, nodes are removed from the group meanwhile
          for node_name in other_grp.members[:]:
            node = other_config.GetNodeInfo(node_name)
            # Access to a protected member of a client class
            # pylint: disable=W0212
            other_config._UnlockedRemoveNodeFromGroup(node)

            # Access to a protected member of a client class
            # pylint: disable=W0212
            my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)

            # Access to a protected member of a client class
            # pylint: disable=W0212
            my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
            node.group = my_grp_uuid
          # Remove from list of groups to add
          other_grps.remove(other_grp)

    for grp in other_grps:
      #TODO: handle node group conflicts
      my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)

  # R0201: Method could be a function
  def _StartMasterDaemon(self, no_vote=False): # pylint: disable=R0201
    """Starts the local master daemon.

    @param no_vote: Should the masterd started without voting? default: False
    @raise errors.CommandError: If unable to start daemon.

    """
    env = {}
    if no_vote:
      env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"

    result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
    if result.failed:
      raise errors.CommandError("Couldn't start ganeti master."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  def _ReaddMergedNodesAndRedist(self):
    """Readds all merging nodes and make sure their config is up-to-date.

    A node which fails to be readded is only logged; the remaining nodes are
    still processed.

    @raise errors.CommandError: If the config redistribution fails.

    """
    for data in self.merger_data:
      for node in data.nodes:
        logging.info("Readding node %s", node)
        result = utils.RunCmd(["gnt-node", "add", "--readd",
                               "--no-ssh-key-check", node])
        if result.failed:
          logging.error("%s failed to be readded. Reason: %s, output: %s",
                        node, result.fail_reason, result.output)

    result = utils.RunCmd(["gnt-cluster", "redist-conf"])
    if result.failed:
      raise errors.CommandError("Redistribution failed. Fail reason: %s;"
                                " output: %s" % (result.fail_reason,
                                                 result.output))

  # R0201: Method could be a function
  def _StartupAllInstances(self): # pylint: disable=R0201
    """Starts up all instances (locally).

    @raise errors.CommandError: If unable to start clusters

    """
    result = utils.RunCmd(["gnt-instance", "startup", "--all",
                           "--force-multiple"])
    if result.failed:
      raise errors.CommandError("Unable to start all instances."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  # R0201: Method could be a function
  # TODO: make this overridable, for some verify errors
  def _VerifyCluster(self): # pylint: disable=R0201
    """Runs gnt-cluster verify to verify the health.

    @raise errors.CommandError: If cluster fails on verification

    """
    result = utils.RunCmd(["gnt-cluster", "verify"])
    if result.failed:
      raise errors.CommandError("Verification of cluster failed."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  def Merge(self):
    """Does the actual merge.

    It runs all the steps in the right order and updates the user about steps
    taken. Also it keeps track of rollback_steps to undo everything.

    @raise errors.GenericError: the error which aborted the merge, re-raised
        after the manual rollback instructions have been logged

    """
    rbsteps = []
    try:
      logging.info("Pre cluster verification")
      self._VerifyCluster()

      logging.info("Prepare authorized_keys")
      rbsteps.append("Remove our key from authorized_keys on nodes:"
                     " %(nodes)s")
      self._PrepareAuthorizedKeys()

      rbsteps.append("Start all instances again on the merging"
                     " clusters: %(clusters)s")
      if self.stop_instances:
        logging.info("Stopping merging instances (takes a while)")
        self._StopMergingInstances()
      logging.info("Checking that no instances are running on the mergees")
      instances_running = self._CheckRunningInstances()
      if instances_running:
        raise errors.CommandError("Some instances are still running on the"
                                  " mergees")
      logging.info("Disable watcher")
      self._DisableWatcher()
      logging.info("Merging config")
      self._FetchRemoteConfig()
      logging.info("Removing master IPs on mergee master nodes")
      self._RemoveMasterIps()
      logging.info("Stop daemons on merging nodes")
      self._StopDaemons()

      logging.info("Stopping master daemon")
      self._KillMasterDaemon()

      rbsteps.append("Restore %s from another master candidate"
                     " and restart master daemon" %
                     pathutils.CLUSTER_CONF_FILE)
      self._MergeConfig()
      self._StartMasterDaemon(no_vote=True)

      # Point of no return, delete rbsteps
      del rbsteps[:]

      logging.warning("We are at the point of no return. Merge can not easily"
                      " be undone after this point.")
      logging.info("Readd nodes")
      self._ReaddMergedNodesAndRedist()

      logging.info("Merge done, restart master daemon normally")
      self._KillMasterDaemon()
      self._StartMasterDaemon()

      if self.restart == _RESTART_ALL:
        logging.info("Starting instances again")
        self._StartupAllInstances()
      else:
        logging.info("Not starting instances again")
      logging.info("Post cluster verification")
      self._VerifyCluster()
    except errors.GenericError as err:
      logging.exception(err)

      if rbsteps:
        nodes = Flatten([data.nodes for data in self.merger_data])
        info = {
          "clusters": self.clusters,
          "nodes": nodes,
          }
        logging.critical("In order to rollback do the following:")
        for step in rbsteps:
          logging.critical(" * %s", step % info)
      else:
        logging.critical("Nothing to rollback.")

      # TODO: Keep track of steps done for a flawless resume?

      # Let the caller see the failure; swallowing it here made the tool
      # exit successfully after a failed merge
      raise

  def Cleanup(self):
    """Clean up our environment.

    This cleans up remote private keys and configs and after that
    deletes the temporary directory.

    """
    shutil.rmtree(self.work_dir)
def main():
  """Main routine.

  Parses the command line, then sets up and runs the merge for the clusters
  given as positional arguments. The merger's working directory is cleaned
  up in any case.

  @return: L{constants.EXIT_SUCCESS}, or L{constants.EXIT_FAILURE} if a
           L{errors.GenericError} was raised during setup or merge

  """
  program = os.path.basename(sys.argv[0])

  # optparse replaces the literal "%prog" itself; as this string is not
  # %-formatted, the former "%%prog" showed up as "%<program>" in the usage
  parser = optparse.OptionParser(usage="%prog [options...] <cluster...>",
                                 prog=program)
  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.VERBOSE_OPT)
  parser.add_option(PAUSE_PERIOD_OPT)
  parser.add_option(GROUPS_OPT)
  parser.add_option(RESTART_OPT)
  parser.add_option(PARAMS_OPT)
  parser.add_option(SKIP_STOP_INSTANCES_OPT)

  (options, args) = parser.parse_args()

  utils.SetupToolLogging(options.debug, options.verbose)

  if not args:
    parser.error("No clusters specified")

  cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
                          options.groups, options.restart, options.params,
                          options.stop_instances)
  try:
    cluster_merger.Setup()
    cluster_merger.Merge()
  except errors.GenericError as err:
    logging.exception(err)
    return constants.EXIT_FAILURE
  finally:
    # Runs on success, on handled errors and on unexpected exceptions alike
    cluster_merger.Cleanup()

  return constants.EXIT_SUCCESS
# Script entry point: the return value of main() becomes the exit status.
if __name__ == "__main__":
  sys.exit(main())