/usr/share/pyshared/rpm/transaction.py is in python-rpm 4.11.1-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
#!/usr/bin/python
import rpm
from rpm._rpm import ts as TransactionSetCore
# TODO: migrate relevant documentation from C-side
class TransactionSet(TransactionSetCore):
    """Python-level wrapper around the C-implemented transaction set.

    Layers backwards-compatible convenience methods on top of
    ``TransactionSetCore``: setters that return the previous value,
    ``addInstall``/``addErase`` that accept several argument types, and
    ``run``/``check``/``hdrCheck``/``hdrFromFdno`` variants that convert
    the core results into the legacy return values and exceptions.
    """

    # Problem filter handed to the core run() by run(); replaced through
    # setProbFilter().
    _probFilter = 0

    def _wrapSetGet(self, attr, val):
        """Set attribute *attr* to *val* and return its previous value."""
        oval = getattr(self, attr)
        setattr(self, attr, val)
        return oval

    def setVSFlags(self, flags):
        """Set ``_vsflags`` to *flags* and return the previous value."""
        return self._wrapSetGet('_vsflags', flags)

    def getVSFlags(self):
        """Return the current value of ``_vsflags``."""
        return self._vsflags

    def setColor(self, color):
        """Set ``_color`` to *color* and return the previous value."""
        return self._wrapSetGet('_color', color)

    def setPrefColor(self, color):
        """Set ``_prefcolor`` to *color* and return the previous value."""
        return self._wrapSetGet('_prefcolor', color)

    def setFlags(self, flags):
        """Set ``_flags`` to *flags* and return the previous value."""
        return self._wrapSetGet('_flags', flags)

    def setProbFilter(self, ignoreSet):
        """Set the problem filter used by run() and return the old one."""
        return self._wrapSetGet('_probFilter', ignoreSet)

    def parseSpec(self, specfile):
        """Return ``_rpmb.spec(specfile)`` for the given spec file."""
        # Imported lazily so the build module is only needed when a spec
        # is actually parsed.
        import _rpmb
        return _rpmb.spec(specfile)

    def getKeys(self):
        """Return the keys of all transaction elements.

        Returns a tuple of ``te.Key()`` for every element obtained by
        iterating over this transaction set, or None when there are no
        elements.
        """
        keys = tuple(te.Key() for te in self)
        # Backwards compatibility goo - WTH does this return a *tuple* ?!
        # An empty result is reported as None rather than ().
        return keys or None

    def addInstall(self, item, key, how="u"):
        """Add a package to the transaction for install or upgrade.

        *item* may be a path (string), an open file object, or anything
        else, which is passed to the core as the header unchanged.
        *key* is forwarded to the core addInstall() as-is.
        *how* must be "u" (upgrade) or "i" (install).

        Raises ValueError for an invalid *how* and rpm.error when the
        core refuses to add the package.
        """
        # Validate first so that bad arguments do not trigger any file
        # I/O or header parsing.
        if how not in ('u', 'i'):
            raise ValueError('how argument must be "u" or "i"')
        upgrade = (how == "u")

        if isinstance(item, basestring):
            f = open(item)
            try:
                header = self.hdrFromFdno(f)
            finally:
                # Close the file we opened even if reading the header
                # raised, so the descriptor is not leaked.
                f.close()
        elif isinstance(item, file):
            # Caller owns the file object; it is left open.
            header = self.hdrFromFdno(item)
        else:
            header = item

        if not TransactionSetCore.addInstall(self, header, key, upgrade):
            raise rpm.error("adding package to transaction failed")

    def addErase(self, item):
        """Add package(s) to the transaction for removal.

        *item* may be an rpm.hdr, an rpm.mi (every header it yields is
        added), an int (looked up via RPMDBI_PACKAGES) or a string
        (looked up via RPMDBI_LABEL).

        Raises TypeError for any other type and rpm.error when the core
        addErase() fails for a header.
        """
        hdrs = []
        if isinstance(item, rpm.hdr):
            hdrs = [item]
        elif isinstance(item, rpm.mi):
            hdrs = item
        elif isinstance(item, int):
            hdrs = self.dbMatch(rpm.RPMDBI_PACKAGES, item)
        elif isinstance(item, basestring):
            hdrs = self.dbMatch(rpm.RPMDBI_LABEL, item)
        else:
            raise TypeError("invalid type %s" % type(item))

        for h in hdrs:
            if not TransactionSetCore.addErase(self, h):
                raise rpm.error("package not installed")

        # garbage collection should take care but just in case...
        if isinstance(hdrs, rpm.mi):
            del hdrs

    def run(self, callback, data):
        """Run the transaction using the stored problem filter.

        *callback* and *data* are forwarded to the core run().

        Returns None when the core returns 0, a list of
        ``(message, (type, _str, _num))`` tuples built from problems()
        when it returns a positive value, and an empty list when it
        returns a negative value.
        """
        rc = TransactionSetCore.run(self, callback, data, self._probFilter)

        # crazy backwards compatibility goo: None for ok, list of problems
        # if transaction didn't complete and empty list if it completed
        # with errors
        if rc == 0:
            return None

        res = []
        if rc > 0:
            for prob in self.problems():
                item = ("%s" % prob, (prob.type, prob._str, prob._num))
                res.append(item)
        return res

    def check(self, *args, **kwds):
        """Run the core check() and return legacy dependency tuples.

        All arguments are forwarded to the core check().  Each conflict
        or requires problem reported by problems() becomes a tuple
        ``((name, version, release), (needname, needver), needflags,
        sense, key)``; problems of other types are skipped.
        """
        TransactionSetCore.check(self, *args, **kwds)

        # compatibility: munge problem strings into dependency tuples of doom
        res = []
        for p in self.problems():
            # is it anything we need to care about?
            if p.type == rpm.RPMPROB_CONFLICT:
                sense = rpm.RPMDEP_SENSE_CONFLICTS
            elif p.type == rpm.RPMPROB_REQUIRES:
                sense = rpm.RPMDEP_SENSE_REQUIRES
            else:
                continue

            # strip arch, split to name, version, release
            nevr = p.altNEVR.rsplit('.', 1)[0]
            n, v, r = nevr.rsplit('-', 2)

            # extract the dependency information
            needs = p._str.split()
            needname = needs[0]
            needflags = rpm.RPMSENSE_ANY
            if len(needs) == 3:
                # "name op version": translate the operator characters
                # into the corresponding sense flag bits.
                needop = needs[1]
                if '<' in needop:
                    needflags |= rpm.RPMSENSE_LESS
                if '=' in needop:
                    needflags |= rpm.RPMSENSE_EQUAL
                if '>' in needop:
                    needflags |= rpm.RPMSENSE_GREATER
                needver = needs[2]
            else:
                needver = ""

            res.append(((n, v, r), (needname, needver), needflags, sense, p.key))

        return res

    def hdrCheck(self, blob):
        """Check a header blob via the core hdrCheck().

        Returns None on RPMRC_OK.  Raises rpm.error with a fixed message
        for RPMRC_NOKEY and RPMRC_NOTTRUSTED, and with the message
        returned by the core for any other non-OK result.
        """
        res, msg = TransactionSetCore.hdrCheck(self, blob)
        # generate backwards compatibly broken exceptions
        if res == rpm.RPMRC_NOKEY:
            raise rpm.error("public key not available")
        elif res == rpm.RPMRC_NOTTRUSTED:
            raise rpm.error("public key not trusted")
        elif res != rpm.RPMRC_OK:
            raise rpm.error(msg)

    def hdrFromFdno(self, fd):
        """Read a package header from *fd* via the core hdrFromFdno().

        Returns the header on RPMRC_OK.  Raises rpm.error for
        RPMRC_NOKEY, RPMRC_NOTTRUSTED and any other non-OK result.
        """
        res, h = TransactionSetCore.hdrFromFdno(self, fd)
        # generate backwards compatibly broken exceptions
        if res == rpm.RPMRC_NOKEY:
            raise rpm.error("public key not available")
        elif res == rpm.RPMRC_NOTTRUSTED:
            raise rpm.error("public key not trusted")
        elif res != rpm.RPMRC_OK:
            raise rpm.error("error reading package header")

        return h
|