This file is indexed.

/usr/lib/python3/dist-packages/os_win/utils/jobutils.py is in python3-os-win 1.2.0-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
# Copyright 2015 Cloudbase Solutions Srl
#
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""
Base Utility class for operations on Hyper-V.
"""

import time

from oslo_log import log as logging

from os_win._i18n import _
from os_win import _utils
from os_win import constants
from os_win import exceptions
from os_win.utils import baseutils
from os_win.utils import win32utils

LOG = logging.getLogger(__name__)


class JobUtils(baseutils.BaseUtilsVirt):

    _CONCRETE_JOB_CLASS = "Msvm_ConcreteJob"

    _DEFAULT_JOB_TERMINATE_TIMEOUT = 15  # seconds
    _KILL_JOB_STATE_CHANGE_REQUEST = 5
    _WBEM_E_NOT_FOUND = 0x80041002

    _completed_job_states = [constants.JOB_STATE_COMPLETED,
                             constants.JOB_STATE_TERMINATED,
                             constants.JOB_STATE_KILLED,
                             constants.JOB_STATE_COMPLETED_WITH_WARNINGS]

    def check_ret_val(self, ret_val, job_path, success_values=[0]):
        """Checks that the job represented by the given arguments succeeded.

        Some Hyper-V operations are not atomic, and will return a reference
        to a job. In this case, this method will wait for the job's
        completion.

        :param ret_val: integer, representing the return value of the job.
            if the value is WMI_JOB_STATUS_STARTED or WMI_JOB_STATE_RUNNING,
            a job_path cannot be None.
        :param job_path: string representing the WMI object path of a
            Hyper-V job.
        :param success_values: list of return values that can be considered
            successful. WMI_JOB_STATUS_STARTED and WMI_JOB_STATE_RUNNING
            values are ignored.
        :raises exceptions.HyperVException: if the given ret_val is
            WMI_JOB_STATUS_STARTED or WMI_JOB_STATE_RUNNING and the state of
            job represented by the given job_path is not
            WMI_JOB_STATE_COMPLETED, or if the given ret_val is not in the
            list of given success_values.
        """
        if ret_val in [constants.WMI_JOB_STATUS_STARTED,
                       constants.WMI_JOB_STATE_RUNNING]:
            return self._wait_for_job(job_path)
        elif ret_val not in success_values:
            raise exceptions.HyperVException(
                _('Operation failed with return value: %s') % ret_val)

    def _wait_for_job(self, job_path):
        """Poll WMI job state and wait for completion."""

        job_wmi_path = job_path.replace('\\', '/')
        job = self._get_wmi_obj(job_wmi_path)

        while job.JobState == constants.WMI_JOB_STATE_RUNNING:
            time.sleep(0.1)
            job = self._get_wmi_obj(job_wmi_path)

        if job.JobState != constants.WMI_JOB_STATE_COMPLETED:
            job_state = job.JobState
            if job.path().Class == "Msvm_ConcreteJob":
                err_sum_desc = job.ErrorSummaryDescription
                err_desc = job.ErrorDescription
                err_code = job.ErrorCode
                data = {'job_state': job_state,
                        'err_sum_desc': err_sum_desc,
                        'err_desc': err_desc,
                        'err_code': err_code}
                raise exceptions.HyperVException(
                    _("WMI job failed with status %(job_state)d. "
                      "Error details: %(err_sum_desc)s - %(err_desc)s - "
                      "Error code: %(err_code)d") % data)
            else:
                (error, ret_val) = job.GetError()
                if not ret_val and error:
                    data = {'job_state': job_state,
                            'error': error}
                    raise exceptions.HyperVException(
                        _("WMI job failed with status %(job_state)d. "
                          "Error details: %(error)s") % data)
                else:
                    raise exceptions.HyperVException(
                        _("WMI job failed with status %d. No error "
                          "description available") % job_state)
        desc = job.Description
        elap = job.ElapsedTime
        LOG.debug("WMI job succeeded: %(desc)s, Elapsed=%(elap)s",
                  {'desc': desc, 'elap': elap})
        return job

    def _get_pending_jobs_affecting_element(self, element,
                                            ignore_error_state=True):
        # Msvm_AffectedJobElement is in fact an association between
        # the affected element and the affecting job.
        mappings = self._conn.Msvm_AffectedJobElement(
            AffectedElement=element.path_())
        pending_jobs = [
            mapping.AffectingElement
            for mapping in mappings
            if (mapping.AffectingElement and not
                self._is_job_completed(mapping.AffectingElement,
                                       ignore_error_state))]
        return pending_jobs

    def _stop_jobs(self, element):
        pending_jobs = self._get_pending_jobs_affecting_element(
            element, ignore_error_state=False)
        for job in pending_jobs:
            try:
                if not job.Cancellable:
                    LOG.debug("Got request to terminate "
                              "non-cancelable job.")
                    continue
                elif job.JobState == constants.JOB_STATE_EXCEPTION:
                    LOG.debug("Attempting to terminate exception state job.")

                job.RequestStateChange(
                    self._KILL_JOB_STATE_CHANGE_REQUEST)
            except exceptions.x_wmi as ex:
                hresult = win32utils.Win32Utils.get_com_error_hresult(
                    ex.com_error)
                # The job may had been completed right before we've
                # attempted to kill it.
                if not hresult == self._WBEM_E_NOT_FOUND:
                    LOG.debug("Failed to stop job. Exception: %s", ex)

        pending_jobs = self._get_pending_jobs_affecting_element(element)
        if pending_jobs:
            LOG.debug("Attempted to terminate jobs "
                      "affecting element %(element)s but "
                      "%(pending_count)s jobs are still pending.",
                      dict(element=element,
                           pending_count=len(pending_jobs)))
            raise exceptions.JobTerminateFailed()

    def _is_job_completed(self, job, ignore_error_state=True):
        return (job.JobState in self._completed_job_states or
                (job.JobState == constants.JOB_STATE_EXCEPTION and
                 ignore_error_state))

    def stop_jobs(self, element, timeout=_DEFAULT_JOB_TERMINATE_TIMEOUT):
        """Stops the Hyper-V jobs associated with the given resource.

        :param element: string representing the path of the Hyper-V resource
            whose jobs will be stopped.
        :param timeout: the maximum amount of time allowed to stop all the
            given resource's jobs.
        :raises exceptions.JobTerminateFailed: if there are still pending jobs
            associated with the given resource and the given timeout amount of
            time has passed.
        """
        @_utils.retry_decorator(exceptions=exceptions.JobTerminateFailed,
                                timeout=timeout, max_retry_count=None)
        def _stop_jobs_with_timeout():
            self._stop_jobs(element)

        _stop_jobs_with_timeout()

    @_utils.retry_decorator(exceptions=exceptions.HyperVException)
    def add_virt_resource(self, virt_resource, parent):
        (job_path, new_resources,
         ret_val) = self._vs_man_svc.AddResourceSettings(
            parent.path_(), [virt_resource.GetText_(1)])
        self.check_ret_val(ret_val, job_path)
        return new_resources

    # modify_virt_resource can fail, especially while setting up the VM's
    # serial port connection. Retrying the operation will yield success.
    @_utils.retry_decorator(exceptions=exceptions.HyperVException)
    def modify_virt_resource(self, virt_resource):
        (job_path, out_set_data,
         ret_val) = self._vs_man_svc.ModifyResourceSettings(
            ResourceSettings=[virt_resource.GetText_(1)])
        self.check_ret_val(ret_val, job_path)

    @_utils.retry_decorator(exceptions=exceptions.HyperVException)
    def remove_virt_resource(self, virt_resource):
        (job, ret_val) = self._vs_man_svc.RemoveResourceSettings(
            ResourceSettings=[virt_resource.path_()])
        self.check_ret_val(ret_val, job)

    def add_virt_feature(self, virt_feature, parent):
        self.add_multiple_virt_features([virt_feature], parent)

    @_utils.retry_decorator(exceptions=exceptions.HyperVException)
    def add_multiple_virt_features(self, virt_features, parent):
        (job_path, out_set_data,
         ret_val) = self._vs_man_svc.AddFeatureSettings(
            parent.path_(), [f.GetText_(1) for f in virt_features])
        self.check_ret_val(ret_val, job_path)

    def remove_virt_feature(self, virt_feature):
        self.remove_multiple_virt_features([virt_feature])

    def remove_multiple_virt_features(self, virt_features):
        (job_path, ret_val) = self._vs_man_svc.RemoveFeatureSettings(
            FeatureSettings=[f.path_() for f in virt_features])
        self.check_ret_val(ret_val, job_path)