This file is indexed.

/usr/lib/python3/dist-packages/pytestsalt/salt/engines/pytest_engine.py is in python3-pytestsalt 2018.1.13-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# -*- coding: utf-8 -*-
'''
    :codeauthor: :email:`Pedro Algarvio (pedro@algarvio.me)`
    :copyright: Copyright 2015 by the SaltStack Team, see AUTHORS for more details.
    :license: Apache 2.0, see LICENSE for more details.


    pytestsalt.engines.pytest_engine
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    Simple salt engine which will setup a socket to accept connections allowing us to know
    when a daemon is up and running
'''

# Import python libs
from __future__ import absolute_import, print_function
import sys
import errno
import socket
import logging

# Import salt libs
import salt.utils.event

# Import 3rd-party libs
from tornado import gen
from tornado import ioloop
from tornado import netutil

log = logging.getLogger(__name__)

__virtualname__ = 'pytest'


def __virtual__():
    return 'pytest_port' in __opts__  # pylint: disable=undefined-variable


def start():
    pytest_engine = PyTestEngine(__opts__)  # pylint: disable=undefined-variable
    pytest_engine.start()


class PyTestEngine(object):
    def __init__(self, opts):
        self.opts = opts
        self.sock = None

    def start(self):
        self.io_loop = ioloop.IOLoop()
        self.io_loop.make_current()
        self.io_loop.add_callback(self._start)
        self.io_loop.start()

    @gen.coroutine
    def _start(self):
        if self.opts['__role'] == 'minion':
            yield self.listen_to_minion_connected_event()
        else:
            self.io_loop.spawn_callback(self.fire_master_started_event)

        port = int(self.opts['pytest_port'])
        log.info('Starting Pytest Engine(role=%s) on port %s', self.opts['__role'], port)
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.setblocking(0)
        # bind the socket to localhost on the config provided port
        self.sock.bind(('localhost', port))
        # become a server socket
        self.sock.listen(5)
        netutil.add_accept_handler(
            self.sock,
            self.handle_connection,
            io_loop=self.io_loop,
        )

    def handle_connection(self, connection, address):
        log.warning('Accepted connection from %s. Role: %s', address, self.opts['__role'])
        # We just need to know that the daemon running the engine is alive...
        try:
            connection.shutdown(socket.SHUT_RDWR)  # pylint: disable=no-member
            connection.close()
        except socket.error as exc:
            if not sys.platform.startswith('darwin'):
                raise
            try:
                if exc.errno != errno.ENOTCONN:
                    raise
            except AttributeError:
                # This is not macOS !?
                pass

    @gen.coroutine
    def listen_to_minion_connected_event(self):
        log.info('Listening for minion connected event...')
        minion_start_event_match = 'salt/minion/{0}/start'.format(self.opts['id'])
        event_bus = salt.utils.event.get_master_event(self.opts, self.opts['sock_dir'], listen=True)
        event_bus.subscribe(minion_start_event_match)
        while True:
            event = event_bus.get_event(full=True, no_block=True)
            if event is not None and event['tag'] == minion_start_event_match:
                log.info('Got minion connected event: %s', event)
                break
            yield gen.sleep(0.25)

    @gen.coroutine
    def fire_master_started_event(self):
        log.info('Firing salt-master started event...')
        event_bus = salt.utils.event.get_master_event(self.opts, self.opts['sock_dir'], listen=False)
        master_start_event_tag = 'salt/master/{0}/start'.format(self.opts['id'])
        load = {'id': self.opts['id'], 'tag': master_start_event_tag, 'data': {}}
        # One minute should be more than enough to fire these events every second in order
        # for pytest-salt to pickup that the master is running
        timeout = 60
        while True:
            timeout -= 1
            event_bus.fire_event(load, master_start_event_tag, timeout=500)
            if timeout <= 0:
                break
            yield gen.sleep(1)