This file is indexed.

/usr/lib/python3/dist-packages/plainbox/vendor/extcmd/__init__.py is in python3-plainbox 0.25-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
# encoding: UTF-8
# Copyright (c) 2010-2012 Linaro Limited
# Copyright (c) 2013 Canonical Ltd.
#
# Author: Zygmunt Krynicki <zygmunt.krynicki@linaro.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""
:mod:`plainbox.vendor.extcmd` - subprocess with advanced output processing
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Unlike subprocess, which just gives you a lump of output at the end, extcmd
allows you to get callbacks (via a delegate class) on all IO.

Delegates
=========

Each delegate has six methods (on_begin, on_line, on_chunk, on_end,
on_abnormal_end, on_interrupt) but it is possible to simply specify the ones
you are interested in and extcmd will do the right thing automatically. There
is an associated interface, extcmd.IDelegate; if you subclass that class in
your delegates then extcmd will trust you to do everything properly. If you
pass any other object as a delegate then extcmd will wrap your object in
extcmd.SafeDelegate, which provides default implementations of all the
required methods.

To make some common cases easier to work with, extcmd comes with a number of
utility callback delegates: decoding and encoding from bytes to Unicode,
transforming the data, redirecting the output to other streams and even forking
the output so that you can, for example, log and display data at the same time.


Everything is encapsulated in a single module, extcmd::

    >>> from __future__ import with_statement
    >>> import extcmd

Basic IO
========

Since IO is oriented around bytes, the first example will actually focus on
getting the basic IO work-flow right: convert bytes and Unicode at application
boundaries. Sadly Popen does not support that, let's build one that does::

    >>> unicode_popen = extcmd.ExternalCommandWithDelegate(
    ...     extcmd.Decode(
    ...         extcmd.EncodeInPython2(
    ...             extcmd.Redirect())))

So this may be somewhat hard to read but the basic idea looks like this:

We instantiate ExternalCommandWithDelegate; this acts like a subprocess.Popen
metaclass, as the return value is something we can use to actually run call()
or check_call(). The only argument to that is a single delegate object. We'll
use three of the delegates provided by extcmd here.

The Decode delegate simply decodes all IO from a specified encoding (uses UTF-8
by default). The Encode delegate does the reverse (which is a real no-op but
we'll grow this example in a second and it will be useful to have Unicode
strings then). Lastly the Redirect delegate sends all of stdout/stderr back to
real stdout/stderr (it is also flexible so you can redirect to file or any
other stream but we're using the defaults again).

All those delegates are connected so one delegate gives the output to another
delegate. In practice it looks like this::

    (real data from the process) -> Decode -> Encode -> Redirect

Let's see how that works now:

    >>> returncode = unicode_popen.call(["echo", "zażółć gęsią jaźń"])
    zażółć gęsią jaźń

Well that was boring, but the point here is that it did _not_ crash on any
UnicodeDecodeErrors and I actually used some non-ASCII characters.

One thing worth pointing out is that unlike in subprocess, each call() returns
the process exit code::

    >>> returncode
    0

Using Transform delegate
========================

So now we have the basics. Let's explore further. The Transform delegate
allows one to call a user specified function on each line of the output.

As before we'll build a list of participating delegate objects, we'll start
with the Decode delegate, then the Transform delegate, the Encode and lastly,
Redirect. This will look like this:

    (output from process) -> Decode -> Transform -> Encode -> Redirect

For clarity we'll define the transformation first::

    >>> def transform_fn(stream_name, line):
    ...     return "{0}: {1}".format(stream_name, line)

Then build the actual stack of delegates::

    >>> delegate = extcmd.Decode(
    ...     extcmd.Transform(transform_fn,
    ...         extcmd.EncodeInPython2(
    ...             extcmd.Redirect())))
    >>> transform_popen = extcmd.ExternalCommandWithDelegate(delegate)
    >>> returncode = transform_popen.call(["echo", "hello"])
    stdout: hello

Simple Unicode-aware sed(1)
===========================

Let's build a simple sed(1)-like program. We'll use the 're' module to
actually transform text. Let's import it now::

    >>> import re

Let's define another transformation function:

    >>> def transform_fn(stream_name, line):
    ...     return re.sub("Hello", "Goodbye", line)

And plug it into the stack we've used before:

    >>> delegate = extcmd.Decode(
    ...     extcmd.Transform(transform_fn,
    ...         extcmd.EncodeInPython2(
    ...             extcmd.Redirect())))
    >>> sed_popen = extcmd.ExternalCommandWithDelegate(delegate)
    >>> returncode = sed_popen.call(["echo", "Hello World"])
    Goodbye World

Simple tee(1)
=============

Ok, so one more example, this time tee(1)-like program. This pattern can be
used to build various kinds of programs where many consumers get to see the
data that was read.

We'll use one more delegate this time, the extcmd.Chain (which is, in
retrospect, rather unfortunately named, as it's really a "fork" while
regular delegates build a chain themselves).

So this example will save everything written to stdout to a log file, while
still displaying it back to the user::

    >>> delegate = extcmd.Chain([
    ...     extcmd.Decode(
    ...         extcmd.EncodeInPython2(
    ...             extcmd.Redirect())),
    ...     extcmd.Redirect(
    ...         stdout=open("stdout.log", "wb"),
    ...         close_stdout_on_end=True)
    ... ])
    >>> tee_popen = extcmd.ExternalCommandWithDelegate(delegate)
    >>> returncode = tee_popen.call(['echo', "Hello Tee!"])
    Hello Tee!

So this example is actually more interesting, unlike before we don't decode
_all_ data, only the data that is displayed, the stdout.log file will contain a
verbatim copy of all the bytes that were produced by the called process::

    >>> import os
    >>> assert os.path.exists("stdout.log")
    >>> with open("stdout.log", "rt") as stream:
    ...     stream.read()
    'Hello Tee!\\n'
    >>> os.remove("stdout.log")

Misc stuff
==========

Apart from ExternalCommandWithDelegate there is a base class called
ExternalCommand that simply helps if you want to subclass and override the
call() method.

There is also the check_call() method that behaves exactly as in the subprocess
module, by raising subprocess.CalledProcessError exception on a non-zero return
code

    >>> extcmd.ExternalCommand().check_call(['false'])
    ... # doctest: +IGNORE_EXCEPTION_DETAIL
    Traceback (most recent call last):
    ...
    CalledProcessError: Command '['false']' returned non-zero exit status 1

If you don't use check_call() you can also look at the return code that is
returned from each call(). The returncode is also passed to each delegate that
supports the on_end() method::


    >>> import sys
    >>> class ReturnCode(extcmd.DelegateBase):
    ...     def on_end(self, returncode):
    ...         sys.stdout.write("Return code is %s\\n" % returncode)
    >>> returncode = extcmd.ExternalCommandWithDelegate(
    ...     ReturnCode()).call(['false'])
    Return code is 1
    >>> returncode
    1

Each started program is also passed to the on_begin() method::

    >>> import sys
    >>> class VerboseStart(extcmd.DelegateBase):
    ...     def on_begin(self, args, kwargs):
    ...         sys.stdout.write("Starting %r %r\\n" % (args, kwargs))
    >>> returncode = extcmd.ExternalCommandWithDelegate(
    ...     VerboseStart()).call(['true'])
    Starting (['true'],) {}
"""

# Version of the vendored extcmd module. The layout is presumably
# (major, minor, micro, release level, serial), mirroring sys.version_info
# -- TODO confirm against upstream extcmd.
__version__ = (1, 0, 1, "final", 0)

from queue import Queue
import abc
import errno
import logging
import signal
import subprocess
import sys
import threading
try:
    import posix
except ImportError:
    posix = None


# Module-level logger; used below for debug tracing of process start-up,
# worker thread life-cycle and signal delivery.
_logger = logging.getLogger("plainbox.vendor.extcmd")


class ExternalCommand(object):
    """
    Thin, subclass-friendly wrapper around :class:`subprocess.Popen`.

    The public entry points are :meth:`call()` and :meth:`check_call()`;
    subclasses typically override :meth:`call()` and reuse :meth:`_popen()`
    to create the child process.
    """

    def call(self, *args, **kwargs):
        """
        Run a sub-command and block until it exits.

        All arguments are handed to :class:`subprocess.Popen` unchanged
        (apart from ``close_fds``, see :meth:`_popen()`).

        :returns: the exit code of the command
        """
        child = self._popen(*args, **kwargs)
        child.wait()
        return child.returncode

    def check_call(self, *args, **kwargs):
        """
        Run a sub-command, wait for it and insist on a zero exit code.

        :returns: the exit code (always 0 when this method returns)
        :raises subprocess.CalledProcessError:
            if the command exits with a non-zero code, exactly like
            :func:`subprocess.check_call()` does
        """
        status = self.call(*args, **kwargs)
        if status == 0:
            return status
        # The command may have been given by keyword or as the first
        # positional argument, just like with subprocess.Popen
        command = kwargs.get("args") or args[0]
        raise subprocess.CalledProcessError(status, command)

    def _popen(self, *args, **kwargs):
        """
        Create the :class:`subprocess.Popen` object for the command.

        When the ``posix`` module is available ``close_fds=True`` is forced
        so that the child does not inherit unrelated file descriptors.
        """
        if posix:
            kwargs.update(close_fds=True)
        return subprocess.Popen(*args, **kwargs)


class IDelegate(metaclass=abc.ABCMeta):
    """
    Abstract interface of delegates usable with ExternalCommandWithDelegate

    Every callback listed here is abstract, a concrete delegate has to provide
    all six of them before it can be instantiated.
    """

    @abc.abstractmethod
    def on_begin(self, args, kwargs):
        """
        Notification that a command is about to be started

        :param args: positional arguments that were given to ``call()``
        :param kwargs: keyword arguments that were given to ``call()``
        """

    @abc.abstractmethod
    def on_line(self, stream_name, line):
        """
        Notification about one line of output

        Used only when the ``CHUNKED_IO`` flag is **not** passed to extcmd,
        in the other case :meth:`on_chunk()` is called instead.

        :param stream_name: name of the stream the line was read from
        :param line: the line that was read
        """

    @abc.abstractmethod
    def on_chunk(self, stream_name, chunk):
        """
        Notification about one chunk of output

        Used only when the ``CHUNKED_IO`` flag is passed to extcmd, in the
        other case :meth:`on_line()` is called instead.

        :param stream_name: name of the stream the chunk was read from
        :param chunk: the chunk that was read
        """

    @abc.abstractmethod
    def on_end(self, returncode):
        """
        Notification that the command has finished

        :param returncode: exit code of the command
        """

    @abc.abstractmethod
    def on_abnormal_end(self, signal_num):
        """
        Notification that the command was killed by a signal

        :param signal_num: number of the signal that killed the command
        """

    @abc.abstractmethod
    def on_interrupt(self):
        """
        Notification that the user has triggered KeyboardInterrupt
        """


class DelegateBase(IDelegate):
    """
    An IDelegate implementation that does nothing

    Every callback required by :class:`IDelegate` is implemented as a no-op,
    which makes this class a convenient base: subclasses only override the
    callbacks they actually care about.
    """

    def on_begin(self, args, kwargs):
        """
        Do nothing when a command begins
        """

    def on_line(self, stream_name, line):
        """
        Do nothing with a line of output
        """

    def on_chunk(self, stream_name, chunk):
        """
        Do nothing with a chunk of output
        """

    def on_end(self, returncode):
        """
        Do nothing when a command ends
        """

    def on_abnormal_end(self, signal_num):
        """
        Do nothing when a command gets killed by a signal
        """

    def on_interrupt(self):
        """
        Do nothing when the user triggers KeyboardInterrupt
        """


class SafeDelegate(IDelegate):
    """
    Adapter that tolerates delegates with missing callbacks

    Useful for delegates of the older type (which may only have the on_line
    method): each callback is passed on to the wrapped object only when that
    object actually has an attribute of that name, otherwise it is silently
    skipped.

    It is automatically used by ExternalCommandWithDelegate, Chain and
    Transform classes
    """

    def __init__(self, delegate):
        """
        Remember the object to wrap.

        :raises TypeError:
            if ``delegate`` already is an :class:`IDelegate` instance, as
            wrapping such an object is pointless
        """
        if isinstance(delegate, IDelegate):
            raise TypeError(
                "Using SafeDelegate with IDelegate subclass makes no sense")
        self._delegate = delegate

    def __repr__(self):
        return "<{0} wrapping {1!r}>".format(
            type(self).__name__, self._delegate)

    def _forward(self, method_name, *call_args):
        """
        Invoke ``method_name`` on the wrapped delegate if it is present
        """
        if hasattr(self._delegate, method_name):
            getattr(self._delegate, method_name)(*call_args)

    def on_begin(self, args, kwargs):
        """
        Pass on_begin() to the wrapped delegate if it supports it
        """
        self._forward("on_begin", args, kwargs)

    def on_line(self, stream_name, line):
        """
        Pass on_line() to the wrapped delegate if it supports it
        """
        self._forward("on_line", stream_name, line)

    def on_chunk(self, stream_name, chunk):
        """
        Pass on_chunk() to the wrapped delegate if it supports it
        """
        self._forward("on_chunk", stream_name, chunk)

    def on_end(self, returncode):
        """
        Pass on_end() to the wrapped delegate if it supports it
        """
        self._forward("on_end", returncode)

    def on_abnormal_end(self, signal_num):
        """
        Pass on_abnormal_end() to the wrapped delegate if it supports it
        """
        self._forward("on_abnormal_end", signal_num)

    def on_interrupt(self):
        """
        Pass on_interrupt() to the wrapped delegate if it supports it
        """
        self._forward("on_interrupt")

    @classmethod
    def wrap_if_needed(cls, delegate):
        """
        Return ``delegate`` itself if it is an :class:`IDelegate`, otherwise
        return it wrapped in this class
        """
        return delegate if isinstance(delegate, IDelegate) else cls(delegate)


# Bit flag for the ``flags`` argument of ExternalCommandWithDelegate. When set
# the output streams are read with ``stream.read(1)`` and handed to the
# delegate through on_chunk(); when clear they are read with readline() and
# handed over through on_line().
CHUNKED_IO = 1


class ExternalCommandWithDelegate(ExternalCommand):
    """
    The actually interesting subclass of ExternalCommand.

    Here both stdout and stderr are unconditionally captured and parsed for
    line-by-line output that is then passed to a helper delegate object.
    When the ``CHUNKED_IO`` flag is used the streams are instead read with
    ``stream.read(1)`` and the data is passed to the delegate's on_chunk().

    This allows writing 'tee' like programs that both display (with possible
    transformations) and store the output stream.

    .. note::
        Technically this class uses threads and queues to communicate which is
        very heavyweight but (yay) works portably for windows. A unix-specific
        subclass implementing this with _just_ poll could be provided with the
        same interface.
    """

    def __init__(self, delegate, killsig=signal.SIGINT, flags=0):
        """
        Set the delegate helper and the behavior flags.

        :param delegate:
            Object receiving the callbacks. Anything that is not an
            :class:`IDelegate` instance is wrapped in :class:`SafeDelegate`,
            so technically it only needs the callbacks it cares about (for
            example just ``on_line()``).
        :param killsig:
            Signal sent to the child process when KeyboardInterrupt is caught
            while waiting for it (``signal.SIGINT`` by default).
        :param flags:
            Bit mask of flags; the only flag tested by this class is
            ``CHUNKED_IO``.
        """
        _logger.debug("ExternalCommandWithDelegate(%r, killsig=%r, flags=%x)",
                      delegate, killsig, flags)
        # Queue carrying (stream_name, data) tuples from the reader threads to
        # the queue worker thread; None is the shutdown marker
        self._queue = Queue()
        self._delegate = SafeDelegate.wrap_if_needed(delegate)
        self._killsig = killsig
        self._flags = flags

    def call(self, *args, **kwargs):
        """
        Invoke the desired sub-process and intercept the output.
        See the description of the class for details.

        The arguments are passed to subprocess.Popen, except that ``stdout``
        and ``stderr`` are always replaced with ``subprocess.PIPE``.

        The delegate gets on_begin() before the process is started, then
        on_line() or on_chunk() for the captured output, and finally either
        on_end() with the exit code or, when the exit code is negative,
        on_abnormal_end() with the signal number.

        :returns: the ``returncode`` of the process

        .. note::
            A very important aspect is that CTRL-C (aka KeyboardInterrupt) will
            KILL the invoked subprocess. This is handled by
            _on_keyboard_interrupt() method.
        """
        # Notify that the process is about to start
        # NOTE: the delegate gets the very same kwargs dict that is modified
        # right below, so a delegate keeping a reference to it will see the
        # stdout/stderr entries too
        self._delegate.on_begin(args, kwargs)
        # Setup stdout/stderr redirection
        kwargs['stdout'] = subprocess.PIPE
        kwargs['stderr'] = subprocess.PIPE
        proc = None
        stdout_reader = None
        stderr_reader = None
        queue_worker = None
        # Stays True unless proc.wait() returns normally, in that case the
        # cleanup code below tries to kill the process
        should_terminate = True
        try:
            # Start the process
            _logger.debug("starting process %r", (args,))
            proc = self._popen(*args, **kwargs)
            _logger.debug("Process created: %r (pid: %d)", proc, proc.pid)
            # Setup all worker threads. By now the pipes have been created and
            # proc.stdout/proc.stderr point to open pipe objects.
            stdout_reader = threading.Thread(
                target=self._read_stream, name='stdout_reader',
                args=(proc.stdout, "stdout"))
            stderr_reader = threading.Thread(
                target=self._read_stream, name='stderr_reader',
                args=(proc.stderr, "stderr"))
            queue_worker = threading.Thread(
                target=self._drain_queue, name='queue_worker')
            # Start all workers
            _logger.debug("Starting thread: %r", queue_worker)
            queue_worker.start()
            _logger.debug("Starting thread: %r", stdout_reader)
            stdout_reader.start()
            _logger.debug("Starting thread: %r", stderr_reader)
            stderr_reader.start()
            # Keep waiting until the process exits; every KeyboardInterrupt
            # sends the kill signal and then goes back to waiting
            while True:
                try:
                    # Wait for the process to finish
                    _logger.debug("Waiting for process to exit")
                    return_code = proc.wait()
                    _logger.debug(
                        "Process did exit with code %d", return_code)
                    # Break out of the endless loop if it does
                    should_terminate = False
                    break
                except KeyboardInterrupt:
                    _logger.debug("KeyboardInterrupt in call()")
                    # On interrupt send a signal to the process
                    self._on_keyboard_interrupt(proc)
                    # And send a notification about this
                    self._delegate.on_interrupt()
        finally:
            do_close = False
            # Don't try to terminate processes that we know have exited.
            # This causes all kinds of ugly issues on Windows.
            if should_terminate:
                # Try to kill the process
                if proc is not None:
                    try:
                        _logger.debug("Calling terminate() on the process")
                        proc.terminate()
                        # SIGKILL is not defined on every platform
                        if hasattr(signal, "SIGKILL"):
                            _logger.debug("Killing the process")
                            proc.send_signal(signal.SIGKILL)
                            _logger.debug("Killing the process again")
                            proc.send_signal(signal.SIGKILL)
                    except OSError as exc:
                        if exc.errno == errno.ESRCH:
                            _logger.debug("The process is already dead")
                        else:
                            _logger.warning("Cannot kill the process: %s", exc)
                            # NOTE(review): the raise below leaves this
                            # finally block right away, so the "if do_close"
                            # branches further down (and the thread joins)
                            # look unreachable on this path -- confirm intent
                            do_close = True
                            raise
            # Wait until all worker threads shut down
            _logger.debug("Joining all threads...")
            if do_close:
                _logger.debug("Closing child stdout")
                proc.stdout.close()
            if stdout_reader is not None and stdout_reader.is_alive():
                _logger.debug("Joining 1/3 %r...", stdout_reader)
                stdout_reader.join()
                _logger.debug("Joined thread: %r", stdout_reader)
            if do_close:
                _logger.debug("Closing child stderr")
                proc.stderr.close()
            if stderr_reader is not None and stderr_reader.is_alive():
                _logger.debug("Joining 2/3 %r...", stderr_reader)
                stderr_reader.join()
                _logger.debug("Joined thread: %r", stderr_reader)
            if queue_worker is not None:
                # Tell the queue worker to shut down
                # (this happens after both readers were joined, so the marker
                # is queued behind all the data they have produced)
                _logger.debug("Telling queue_worker thread to exit")
                self._queue.put(None)
                _logger.debug("Joining 3/3 %r...", queue_worker)
                queue_worker.join()
                _logger.debug("Joined thread: %r", queue_worker)
        # Notify that the process has finished
        if proc.returncode < 0:
            # negative returncode from subprocess is a sign that the process
            # was killed by a signal with that number
            self._delegate.on_abnormal_end(-proc.returncode)
        else:
            self._delegate.on_end(proc.returncode)
        return proc.returncode

    def _on_keyboard_interrupt(self, proc):
        """
        Send the configured kill signal to ``proc``.

        An OSError with errno ESRCH (no such process) is only logged, any
        other OSError is re-raised.
        """
        _logger.debug("Sending signal %s to the process", self._killsig)
        try:
            proc.send_signal(self._killsig)
        except OSError as exc:
            if exc.errno == errno.ESRCH:
                pass
                _logger.debug(
                    "Cannot deliver signal %d, the process gone",
                    self._killsig)
            else:
                raise

    def _read_stream(self, stream, stream_name):
        """
        Body of the reader threads: move data from ``stream`` to the queue.

        Each piece of data is queued as a ``(stream_name, data)`` tuple. The
        data is obtained with ``stream.read(1)`` when the CHUNKED_IO flag is
        set and with ``stream.readline()`` otherwise. The loop ends when a
        read returns empty data or raises IOError or ValueError.
        """
        _logger.debug("_read_stream(%r, %r) entering", stream, stream_name)
        while True:
            try:
                if self._flags & CHUNKED_IO:
                    data = stream.read(1)
                else:
                    data = stream.readline()
            except (IOError, ValueError):
                # Ignore IOError and ValueError that may be raised if
                # the stream was closed this can happen if the process exits
                # very quickly without printing anything and the cleanup code
                # starts to close both of the streams
                break
            else:
                # Empty data means end of the stream
                if len(data) == 0:
                    break
                cmd = (stream_name, data)
                self._queue.put(cmd)
        _logger.debug("_read_stream(%r, %r) exiting", stream, stream_name)

    def _drain_queue(self):
        """
        Body of the queue worker thread: feed queued data to the delegate.

        Each queued ``(stream_name, data)`` tuple is passed to the delegate's
        on_chunk() when the CHUNKED_IO flag is set and to on_line() otherwise.
        The loop ends when ``None`` is taken from the queue.
        """
        _logger.debug("_drain_queue() entering")
        while True:
            args = self._queue.get()
            if args is None:
                break
            if self._flags & CHUNKED_IO:
                self._delegate.on_chunk(*args)
            else:
                self._delegate.on_line(*args)
        _logger.debug("_drain_queue() exiting")


class Chain(IDelegate):
    """
    Delegate that fans every callback out to a list of other delegates.

    Each callback is invoked, in list order, on all the delegates. This makes
    it easy to compose the desired effect out of smaller specialized classes.
    """

    def __init__(self, delegate_list):
        """
        Build a Chain out of ``delegate_list``.

        Each delegate is wrapped in :class:`SafeDelegate` if needed
        """
        self.delegate_list = list(
            map(SafeDelegate.wrap_if_needed, delegate_list))

    def __repr__(self):
        return "<{0} {1!r}>".format(type(self).__name__, self.delegate_list)

    def _broadcast(self, method_name, *call_args):
        """
        Invoke ``method_name`` with ``call_args`` on each delegate, in order
        """
        for member in self.delegate_list:
            getattr(member, method_name)(*call_args)

    def on_begin(self, args, kwargs):
        """
        Invoke on_begin() on each delegate in the list
        """
        self._broadcast("on_begin", args, kwargs)

    def on_line(self, stream_name, line):
        """
        Invoke on_line() on each delegate in the list
        """
        self._broadcast("on_line", stream_name, line)

    def on_chunk(self, stream_name, chunk):
        """
        Invoke on_chunk() on each delegate in the list
        """
        self._broadcast("on_chunk", stream_name, chunk)

    def on_end(self, returncode):
        """
        Invoke on_end() on each delegate in the list
        """
        self._broadcast("on_end", returncode)

    def on_abnormal_end(self, signal_num):
        """
        Invoke on_abnormal_end() on each delegate in the list
        """
        self._broadcast("on_abnormal_end", signal_num)

    def on_interrupt(self):
        """
        Invoke on_interrupt() on each delegate in the list
        """
        self._broadcast("on_interrupt")


class Redirect(DelegateBase):
    """
    Delegate that writes all the output to a pair of streams.
    """

    def __init__(self, stdout=None, stderr=None, close_stdout_on_end=False,
                 close_stderr_on_end=False):
        """
        Configure the destination streams.

        :param stdout:
            stream for data coming from 'stdout', ``sys.stdout`` if left blank
        :param stderr:
            stream for data coming from 'stderr', ``sys.stderr`` if left blank
        :param close_stdout_on_end:
            close the stdout stream when the command ends
        :param close_stderr_on_end:
            close the stderr stream when the command ends
        """
        self._stdout = stdout if stdout else sys.stdout
        self._stderr = stderr if stderr else sys.stderr
        self._close_stdout_on_end = close_stdout_on_end
        self._close_stderr_on_end = close_stderr_on_end

    def __repr__(self):
        return "<{0} stdout:{1!r} stderr:{2!r}>".format(
            type(self).__name__, self._stdout, self._stderr)

    def _write(self, stream_name, data):
        """
        Write ``data`` to the stream selected by ``stream_name``
        """
        assert stream_name == 'stdout' or stream_name == 'stderr'
        sink = self._stdout if stream_name == 'stdout' else self._stderr
        sink.write(data)

    def _close_streams(self):
        """
        Close those of the two streams that were requested to be closed
        """
        closing_plan = (
            (self._close_stdout_on_end, self._stdout),
            (self._close_stderr_on_end, self._stderr),
        )
        for requested, sink in closing_plan:
            if requested:
                sink.close()

    def on_line(self, stream_name, line):
        """
        Copy the line, verbatim, to the matching stream.
        """
        self._write(stream_name, line)

    def on_chunk(self, stream_name, chunk):
        """
        Copy the chunk, verbatim, to the matching stream.
        """
        self._write(stream_name, chunk)

    def on_end(self, returncode):
        """
        Close the output streams (if requested) after a normal exit
        """
        self._close_streams()

    def on_abnormal_end(self, signal_num):
        """
        Close the output streams (if requested) after death by signal
        """
        self._close_streams()


class Transform(DelegateBase):
    """
    Transformation filter for output lines

    Allows to transform each line (or chunk) before being passed down to
    subsequent delegate. All the other callbacks are passed down unchanged.
    The delegate is automatically wrapped in :class:`SafeDelegate` if needed.
    """

    def __init__(self, callback, delegate):
        """
        Set the callback and subsequent delegate.

        :param callback:
            callable invoked as ``callback(stream_name, data)``, its return
            value is what the subsequent delegate gets
        :param delegate:
            subsequent delegate, wrapped in :class:`SafeDelegate` if it is
            not an :class:`IDelegate`
        """
        self._callback = callback
        self._delegate = SafeDelegate.wrap_if_needed(delegate)

    def __repr__(self):
        return "<{0} callback:{1!r} delegate:{2!r}>".format(
            self.__class__.__name__, self._callback, self._delegate)

    def on_line(self, stream_name, line):
        """
        Transform each line by calling callback(stream_name, line) and pass it
        down to the subsequent delegate.
        """
        transformed_line = self._callback(stream_name, line)
        self._delegate.on_line(stream_name, transformed_line)

    def on_chunk(self, stream_name, chunk):
        """
        Transform each chunk by calling callback(stream_name, chunk) and pass
        it down to the subsequent delegate.
        """
        transformed_chunk = self._callback(stream_name, chunk)
        self._delegate.on_chunk(stream_name, transformed_chunk)

    def on_begin(self, args, kwargs):
        """
        Pass on_begin() down to the subsequent delegate.
        """
        self._delegate.on_begin(args, kwargs)

    def on_end(self, returncode):
        """
        Pass on_end() down to the subsequent delegate.
        """
        self._delegate.on_end(returncode)

    def on_abnormal_end(self, signal_num):
        """
        Pass on_abnormal_end() down to the subsequent delegate.
        """
        self._delegate.on_abnormal_end(signal_num)

    def on_interrupt(self):
        """
        Pass on_interrupt() down to the subsequent delegate.

        Without this the no-op inherited from :class:`DelegateBase` would
        swallow the notification and delegates placed behind a Transform
        would never learn about KeyboardInterrupt.
        """
        self._delegate.on_interrupt()


class Decode(Transform):
    """
    Decode output lines with the specified encoding

    Allows to work with Unicode strings on the inside of the application and
    bytes on the outside, as it should be. Especially useful in python 3.

    Lines are decoded in one go. Chunks are decoded incrementally, with one
    decoder per stream, so that a character whose bytes are split across
    several chunks is still decoded correctly.
    """

    def __init__(self, delegate, encoding='UTF-8'):
        """
        Set the subsequent delegate and the encoding to decode from.
        """
        super(Decode, self).__init__(self._decode, delegate)
        self._encoding = encoding
        # stream name -> incremental decoder, filled lazily by on_chunk()
        self._chunk_decoders = {}

    def __repr__(self):
        return "<{0} encoding:{1!r} delegate:{2!r}>".format(
            self.__class__.__name__, self._encoding, self._delegate)

    def on_begin(self, args, kwargs):
        """
        Forget partial characters left over from a previous command and pass
        on_begin() down to the subsequent delegate.
        """
        self._chunk_decoders = {}
        super(Decode, self).on_begin(args, kwargs)

    def on_chunk(self, stream_name, chunk):
        """
        Decode the chunk incrementally and pass the text down to the
        subsequent delegate.

        Bytes that do not form a complete character yet are buffered inside
        the decoder; nothing is passed down until some text is available.
        Decoding each chunk separately would raise UnicodeDecodeError as soon
        as a multi-byte character gets split between two chunks.

        Bytes still buffered when the command ends are not flushed.
        """
        decoder = self._chunk_decoders.get(stream_name)
        if decoder is None:
            # Local import, codecs is only needed in the chunked mode
            import codecs
            decoder = codecs.getincrementaldecoder(self._encoding)()
            self._chunk_decoders[stream_name] = decoder
        text = decoder.decode(chunk)
        if text:
            self._delegate.on_chunk(stream_name, text)

    def _decode(self, stream_name, line):
        """
        Decode each line with the configured encoding
        """
        return line.decode(self._encoding)


class Encode(Transform):
    """
    Turn Unicode output lines into bytes using the specified encoding

    The counterpart of :class:`Decode`: text on the inside of the
    application, bytes on the outside. Especially useful in python 3.
    """

    def __init__(self, delegate, encoding='UTF-8'):
        """
        Set the subsequent delegate and the encoding to encode to.
        """
        super().__init__(self._encode, delegate)
        self._encoding = encoding

    def __repr__(self):
        return "<{0} encoding:{1!r} delegate:{2!r}>".format(
            type(self).__name__, self._encoding, self._delegate)

    def _encode(self, stream_name, line):
        """
        Encode each line with the configured encoding
        """
        encoded = line.encode(self._encoding)
        return encoded


class EncodeInPython2(Encode):
    """
    Encode Unicode strings to byte strings, but only in python2

    This class is kind of awkward but it solves one interesting problem in the
    python3 transition: stdout/stderr are opened in text mode by default
    (unless redirected), so on python3 Unicode strings, not byte strings, have
    to be written to those streams.
    """

    def _encode(self, stream_name, line):
        """
        Encode each line with the configured encoding when running on
        python2, return it untouched on any other version
        """
        running_on_python2 = sys.version_info[0] == 2
        if not running_on_python2:
            return line
        return line.encode(self._encoding)