/usr/share/pyshared/mastertickets/model.py is in trac-mastertickets 3.0.2+20111224-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 | # Created by Noah Kantrowitz on 2007-07-04.
# Copyright (c) 2007 Noah Kantrowitz. All rights reserved.
import copy
from datetime import datetime
from trac.ticket.model import Ticket
from trac.util.compat import set, sorted
from trac.util.datefmt import utc, to_utimestamp
class TicketLinks(object):
"""A model for the ticket links used MasterTickets."""
def __init__(self, env, tkt, db=None):
self.env = env
if not isinstance(tkt, Ticket):
tkt = Ticket(self.env, tkt)
self.tkt = tkt
db = db or self.env.get_db_cnx()
cursor = db.cursor()
cursor.execute('SELECT dest FROM mastertickets WHERE source=%s ORDER BY dest', (self.tkt.id,))
self.blocking = set([int(num) for num, in cursor])
self._old_blocking = copy.copy(self.blocking)
cursor.execute('SELECT source FROM mastertickets WHERE dest=%s ORDER BY source', (self.tkt.id,))
self.blocked_by = set([int(num) for num, in cursor])
self._old_blocked_by = copy.copy(self.blocked_by)
def save(self, author, comment='', when=None, db=None):
"""Save new links."""
if when is None:
when = datetime.now(utc)
when_ts = to_utimestamp(when)
handle_commit = False
if db is None:
db = self.env.get_db_cnx()
handle_commit = True
cursor = db.cursor()
new_blocking = set(int(n) for n in self.blocking)
new_blocked_by = set(int(n) for n in self.blocked_by)
to_check = [
# new, old, field
(new_blocking, self._old_blocking, 'blockedby', ('source', 'dest')),
(new_blocked_by, self._old_blocked_by, 'blocking', ('dest', 'source')),
]
for new_ids, old_ids, field, sourcedest in to_check:
for n in new_ids | old_ids:
update_field = None
if n in new_ids and n not in old_ids:
# New ticket added
cursor.execute('INSERT INTO mastertickets (%s, %s) VALUES (%%s, %%s)'%sourcedest, (self.tkt.id, n))
update_field = lambda lst: lst.append(str(self.tkt.id))
elif n not in new_ids and n in old_ids:
# Old ticket removed
cursor.execute('DELETE FROM mastertickets WHERE %s=%%s AND %s=%%s'%sourcedest, (self.tkt.id, n))
update_field = lambda lst: lst.remove(str(self.tkt.id))
if update_field is not None:
cursor.execute('SELECT value FROM ticket_custom WHERE ticket=%s AND name=%s',
(n, str(field)))
old_value = (cursor.fetchone() or ('',))[0]
new_value = [x.strip() for x in old_value.split(',') if x.strip()]
update_field(new_value)
new_value = ', '.join(sorted(new_value, key=lambda x: int(x)))
cursor.execute('INSERT INTO ticket_change (ticket, time, author, field, oldvalue, newvalue) VALUES (%s, %s, %s, %s, %s, %s)',
(n, when_ts, author, field, old_value, new_value))
if comment:
cursor.execute('INSERT INTO ticket_change (ticket, time, author, field, oldvalue, newvalue) VALUES (%s, %s, %s, %s, %s, %s)',
(n, when_ts, author, 'comment', '', '(In #%s) %s'%(self.tkt.id, comment)))
cursor.execute('UPDATE ticket_custom SET value=%s WHERE ticket=%s AND name=%s',
(new_value, n, field))
# refresh the changetime to prevent concurrent edits
cursor.execute('UPDATE ticket SET changetime=%s WHERE id=%s', (when_ts,n))
if not cursor.rowcount:
cursor.execute('INSERT INTO ticket_custom (ticket, name, value) VALUES (%s, %s, %s)',
(n, field, new_value))
# cursor.execute('DELETE FROM mastertickets WHERE source=%s OR dest=%s', (self.tkt.id, self.tkt.id))
# data = []
# for tkt in self.blocking:
# if isinstance(tkt, Ticket):
# tkt = tkt.id
# data.append((self.tkt.id, tkt))
# for tkt in self.blocked_by:
# if isisntance(tkt, Ticket):
# tkt = tkt.id
# data.append((tkt, self.tkt.id))
#
# cursor.executemany('INSERT INTO mastertickets (source, dest) VALUES (%s, %s)', data)
if handle_commit:
db.commit()
def __nonzero__(self):
return bool(self.blocking) or bool(self.blocked_by)
def __repr__(self):
def l(arr):
arr2 = []
for tkt in arr:
if isinstance(tkt, Ticket):
tkt = tkt.id
arr2.append(str(tkt))
return '[%s]'%','.join(arr2)
return '<mastertickets.model.TicketLinks #%s blocking=%s blocked_by=%s>'% \
(self.tkt.id, l(getattr(self, 'blocking', [])), l(getattr(self, 'blocked_by', [])))
@staticmethod
def walk_tickets(env, tkt_ids):
"""Return an iterable of all links reachable directly above or below those ones."""
def visit(tkt, memo, next_fn):
if tkt in memo:
return False
links = TicketLinks(env, tkt)
memo[tkt] = links
for n in next_fn(links):
visit(n, memo, next_fn)
memo1 = {}
memo2 = {}
for id in tkt_ids:
visit(id, memo1, lambda links: links.blocking)
visit(id, memo2, lambda links: links.blocked_by)
memo1.update(memo2)
return memo1.itervalues()
|