/usr/lib/python2.7/dist-packages/Bcfg2/Server/Plugins/Svn.py is in bcfg2-server 1.4.0~pre2+git141-g6d40dace6358-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 | """ The Svn plugin provides a revision interface for Bcfg2 repos using
Subversion. If PySvn libraries are installed, then it exposes two
additional XML-RPC methods for committing data to the repository and
updating the repository. """
import sys
import Bcfg2.Options
import Bcfg2.Server.Plugin
try:
import pysvn
HAS_SVN = True
except ImportError:
from Bcfg2.Utils import Executor
HAS_SVN = False
class Svn(Bcfg2.Server.Plugin.Version):
"""Svn is a version plugin for dealing with Bcfg2 repos."""
options = Bcfg2.Server.Plugin.Version.options + [
Bcfg2.Options.Option(
cf=("svn", "user"), dest="svn_user", help="SVN username"),
Bcfg2.Options.Option(
cf=("svn", "password"), dest="svn_password", help="SVN password"),
Bcfg2.Options.BooleanOption(
cf=("svn", "always_trust"), dest="svn_trust_ssl",
help="Always trust SSL certs from SVN server")]
if HAS_SVN:
options.append(
Bcfg2.Options.Option(
cf=("svn", "conflict_resolution"),
dest="svn_conflict_resolution",
type=lambda v: v.replace("-", "_"),
# pylint: disable=E1101
choices=dir(pysvn.wc_conflict_choice),
default=pysvn.wc_conflict_choice.postpone,
# pylint: enable=E1101
help="SVN conflict resolution method"))
__author__ = 'bcfg-dev@mcs.anl.gov'
__vcs_metadata_path__ = ".svn"
if HAS_SVN:
__rmi__ = Bcfg2.Server.Plugin.Version.__rmi__ + ['Update', 'Commit']
else:
__vcs_metadata_path__ = ".svn"
def __init__(self, core):
Bcfg2.Server.Plugin.Version.__init__(self, core)
self.revision = None
self.svn_root = None
self.client = None
self.cmd = None
if not HAS_SVN:
self.logger.debug("Svn: PySvn not found, using CLI interface to "
"SVN")
self.cmd = Executor()
else:
self.client = pysvn.Client()
self.debug_log("Svn: Conflicts will be resolved with %s" %
Bcfg2.Options.setup.svn_conflict_resolution)
self.client.callback_conflict_resolver = self.conflict_resolver
if Bcfg2.Options.setup.svn_trust_ssl:
self.client.callback_ssl_server_trust_prompt = \
self.ssl_server_trust_prompt
if (Bcfg2.Options.setup.svn_user and
Bcfg2.Options.setup.svn_password):
self.client.callback_get_login = self.get_login
self.logger.debug("Svn: Initialized svn plugin with SVN directory %s" %
self.vcs_path)
def get_login(self, realm, username, may_save): # pylint: disable=W0613
""" PySvn callback to get credentials for HTTP basic authentication """
self.logger.debug("Svn: Logging in with username: %s" %
Bcfg2.Options.setup.svn_user)
return (True,
Bcfg2.Options.setup.svn_user,
Bcfg2.Options.setup.svn_password,
False)
def ssl_server_trust_prompt(self, trust_dict):
""" PySvn callback to always trust SSL certificates from SVN server """
self.logger.debug("Svn: Trusting SSL certificate from %s, "
"issued by %s for realm %s" %
(trust_dict['hostname'],
trust_dict['issuer_dname'],
trust_dict['realm']))
return True, trust_dict['failures'], False
def conflict_resolver(self, conflict_description):
""" PySvn callback function to resolve conflicts """
self.logger.info("Svn: Resolving conflict for %s with %s" %
(conflict_description['path'],
Bcfg2.Options.setup.svn_conflict_resolution))
return Bcfg2.Options.setup.svn_conflict_resolution, None, False
def get_revision(self):
"""Read svn revision information for the Bcfg2 repository."""
msg = None
if HAS_SVN:
try:
info = self.client.info(Bcfg2.Options.setup.vcs_root)
self.revision = info.revision
self.svn_root = info.url
return str(self.revision.number)
except pysvn.ClientError: # pylint: disable=E1101
msg = "Svn: Failed to get revision: %s" % sys.exc_info()[1]
else:
result = self.cmd.run(["env LC_ALL=C", "svn", "info",
Bcfg2.Options.setup.vcs_root],
shell=True)
if result.success:
self.revision = [line.split(': ')[1]
for line in result.stdout.splitlines()
if line.startswith('Revision:')][-1]
return self.revision
else:
msg = "Failed to read svn info: %s" % result.error
self.revision = None
raise Bcfg2.Server.Plugin.PluginExecutionError(msg)
def Update(self):
'''Svn.Update() => True|False\nUpdate svn working copy\n'''
try:
old_revision = self.revision.number
self.revision = self.client.update(Bcfg2.Options.setup.vcs_root,
recurse=True)[0]
except pysvn.ClientError: # pylint: disable=E1101
err = sys.exc_info()[1]
# try to be smart about the error we got back
details = None
if "callback_ssl_server_trust_prompt" in str(err):
details = "SVN server certificate is not trusted"
elif "callback_get_login" in str(err):
details = "SVN credentials not cached"
if details is None:
self.logger.error("Svn: Failed to update server repository",
exc_info=1)
else:
self.logger.error("Svn: Failed to update server repository: "
"%s" % details)
return False
if old_revision == self.revision.number:
self.logger.debug("repository is current")
else:
self.logger.info("Updated %s from revision %s to %s" %
(Bcfg2.Options.setup.vcs_root, old_revision,
self.revision.number))
return True
def Commit(self):
"""Svn.Commit() => True|False\nCommit svn repository\n"""
# First try to update
if not self.Update():
self.logger.error("Failed to update svn repository, refusing to "
"commit changes")
return False
try:
self.revision = self.client.checkin([Bcfg2.Options.setup.vcs_root],
'Svn: autocommit',
recurse=True)
self.revision = self.client.update(Bcfg2.Options.setup.vcs_root,
recurse=True)[0]
self.logger.info("Svn: Commited changes. At %s" %
self.revision.number)
return True
except pysvn.ClientError: # pylint: disable=E1101
err = sys.exc_info()[1]
# try to be smart about the error we got back
details = None
if "callback_ssl_server_trust_prompt" in str(err):
details = "SVN server certificate is not trusted"
elif "callback_get_login" in str(err):
details = "SVN credentials not cached"
if details is None:
self.logger.error("Svn: Failed to commit changes",
exc_info=1)
else:
self.logger.error("Svn: Failed to commit changes: %s" %
details)
return False
|