/usr/lib/python2.7/dist-packages/jenkinsapi/queue.py is in python-jenkinsapi 0.2.16-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 | """
Queue module for jenkinsapi
"""
from jenkinsapi.jenkinsbase import JenkinsBase
from jenkinsapi.custom_exceptions import UnknownQueueItem
import logging
log = logging.getLogger(__name__)
class Queue(JenkinsBase):
"""
Class that represents the Jenkins queue
"""
def __init__(self, baseurl, jenkins_obj):
"""
Init the Jenkins queue object
:param baseurl: basic url for the queue
:param jenkins_obj: ref to the jenkins obj
"""
self.jenkins = jenkins_obj
JenkinsBase.__init__(self, baseurl)
def __str__(self):
return self.baseurl
def get_jenkins_obj(self):
return self.jenkins
def iteritems(self):
for item in self._data['items']:
yield item['id'], QueueItem(self.jenkins, **item)
def iterkeys(self):
for item in self._data['items']:
yield item['id']
def iterivalues(self):
for item in self._data['items']:
yield QueueItem(self.jenkins, **item)
def keys(self):
return list(self.iterkeys())
def values(self):
return list(self.itervalues())
def __len__(self):
return len(self._data['items'])
def __getitem__(self, item_id):
self_as_dict = dict(self.iteritems())
if item_id in self_as_dict:
return self_as_dict[item_id]
else:
raise UnknownQueueItem(item_id)
def get_queue_items_for_job(self, job_name):
if not job_name:
return [QueueItem(self.jenkins, **item) for item in self._data['items']]
else:
return [QueueItem(self.jenkins, **item) for item in self._data['items']
if item['task']['name'] == job_name]
def delete_item(self, queue_item):
self.delete_item_by_id(queue_item.id)
def delete_item_by_id(self, item_id):
deleteurl = '%s/cancelItem?id=%s' % (self.baseurl, item_id)
self.get_jenkins_obj().requester.post_url(deleteurl)
class QueueItem(object):
"""
Flexible class to handle queue items. If the Jenkins API changes this support
those changes
"""
def __init__(self, jenkins, **kwargs):
self.jenkins = jenkins
self.__dict__.update(kwargs)
def get_job(self):
"""
Return the job associated with this queue item
"""
return self.jenkins[self.task['name']]
def get_parameters(self):
"""returns parameters of queue item"""
actions = getattr(self, 'actions', [])
for action in actions:
if type(action) is dict and 'parameters' in action:
parameters = action['parameters']
return dict([(x['name'], x['value']) for x in parameters])
return []
def __repr__(self):
return "<%s.%s %s>" % (self.__class__.__module__, self.__class__.__name__, str(self))
def __str__(self):
return "%s #%i" % (self.task['name'], self.id)
|