/usr/lib/python2.7/dist-packages/calabash/common.py is in python-calabash 0.0.3-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- coding: utf-8 -*-
import re
from calabash.pipeline import pipe
@pipe
def echo(item):
    """
    Emit `item` as a one-element stream.

    Iterating the result gives exactly the same values as ``iter([item])``;
    this spelling simply reads better in a pipeline.

    >>> list(echo(1))
    [1]
    >>> list(echo('hello'))
    ['hello']
    """
    yield item
@pipe
def cat(*args, **kwargs):
    r"""
    Read a file line by line. All arguments are handed to `open()` unchanged.

    The file is opened when iteration starts and is closed as soon as the
    last line has been consumed or the generator itself is closed, instead of
    whenever the file object happens to be garbage-collected.

    >>> src_file = __file__.replace('.pyc', '.py')
    >>> for line in cat(src_file):
    ...     if line.startswith('def cat'):
    ...         print repr(line)
    'def cat(*args, **kwargs):\n'
    """
    # The `with` block guarantees the handle is released even when the
    # consumer stops iterating early or an error propagates through us.
    with open(*args, **kwargs) as source:
        for line in source:
            yield line
@pipe
def curl(url):
    """
    Download `url` and yield the response body one line at a time.

    The connection is closed once the body has been read to the end, and also
    if iteration is abandoned or fails part-way through.

    >>> UNLICENSE = 'http://unlicense.org/UNLICENSE'
    >>> for line in curl(UNLICENSE): # doctest: +SKIP
    ...     print line,
    This is free and unencumbered software released into the public domain.
    ...
    """
    import urllib2

    response = urllib2.urlopen(url)
    try:
        while True:
            chunk = response.readline()
            if not chunk:
                # A falsy (empty) line marks the end of the response.
                break
            yield chunk
    finally:
        response.close()
@pipe
def grep(stdin, pattern_src):
    """
    Keep only the strings from stdin that the regex `pattern_src` matches.

    Matching is done with :func:`re.search`, so the pattern may hit anywhere
    in the string unless it is explicitly anchored.

    >>> list(iter(['cat', 'cabbage', 'conundrum', 'cathedral']) | grep(r'^ca'))
    ['cat', 'cabbage', 'cathedral']
    """
    # Compile once and keep the bound search method for the whole stream.
    matches = re.compile(pattern_src).search
    for candidate in stdin:
        if not matches(candidate):
            continue
        yield candidate
@pipe
def sed(stdin, pattern_src, replacement, exclusive=False):
    """
    Substitute the first match of a regex on each line of stdin.

    Each line is searched with :func:`re.search`; the first match (and only
    the first) is replaced by `replacement`, which is expanded as a template
    against that match, so group references such as ``\\1`` work.

    >>> list(iter(['cat', 'cabbage']) | sed(r'^ca', 'fu'))
    ['fut', 'fubbage']

    A line that does not match is passed through untouched. Pass
    `exclusive=True` to drop non-matching lines instead::

    >>> list(iter(['cat', 'nomatch']) | sed(r'^ca', 'fu'))
    ['fut', 'nomatch']
    >>> list(iter(['cat', 'nomatch']) | sed(r'^ca', 'fu', exclusive=True))
    ['fut']
    """
    find = re.compile(pattern_src).search
    for text in stdin:
        hit = find(text)
        if hit is None:
            # No match: forward the line as-is unless the caller asked for
            # matching lines only.
            if not exclusive:
                yield text
            continue
        before = text[:hit.start()]
        after = text[hit.end():]
        yield before + hit.expand(replacement) + after
@pipe
def pretty_printer(stdin, **kwargs):
    """
    Pretty-print every item from stdin, then forward it unchanged.

    Any keyword arguments are passed on to :func:`pprint.pprint` for each
    item.

    >>> for item in iter([{'a': 1}, ['b', 'c', 3]]) | pretty_printer():
    ...     pass
    {'a': 1}
    ['b', 'c', 3]
    """
    from pprint import pprint as show

    for entry in stdin:
        show(entry, **kwargs)
        yield entry
@pipe
def map(stdin, func):
    """
    Yield ``func(item)`` for every item read from stdin, in order.

    >>> list(xrange(5) | map(lambda x: x + 2))
    [2, 3, 4, 5, 6]
    """
    for value in stdin:
        transformed = func(value)
        yield transformed
@pipe
def filter(stdin, predicate):
    """
    Forward the items from stdin for which `predicate(item)` is truthy.

    Items that fail the predicate are dropped; the rest keep their order.

    >>> list(xrange(5) | filter(lambda x: x % 2 == 0))
    [0, 2, 4]
    """
    for candidate in stdin:
        if not predicate(candidate):
            continue
        yield candidate
@pipe
def sh(stdin, command=None, check_success=False):
    r"""
    Run a shell command, send it input, and produce its output.

    `command` may be a single string, which is split with
    :func:`shlex.split`, or an already-split argument sequence. When only one
    positional argument is supplied it is taken to be the command and no input
    is sent to it.

    Every item from stdin is written to the command's standard input, which
    is then closed; the command's standard output is yielded line by line.
    If `check_success` is true and the command exits with a non-zero status,
    :class:`subprocess.CalledProcessError` is raised once the command has
    finished.

    >>> print ''.join(echo("h\ne\nl\nl\no") | sh('sort -u'))
    e
    h
    l
    o
    <BLANKLINE>
    >>> for line in sh('echo Hello World'):
    ...     print line,
    Hello World
    >>> for line in sh('false', check_success=True):
    ...     print line, # doctest: +ELLIPSIS
    Traceback (most recent call last):
    ...
    CalledProcessError: Command '['false']' returned non-zero exit status 1
    """
    import subprocess
    import shlex

    if command is None:
        # Called with a single argument: it is the command, not an input
        # stream, so swap it over and feed the process nothing.
        stdin, command = (), stdin
    if isinstance(command, basestring):
        command = shlex.split(command)

    # Named `proc` (not `pipe`) so the module-level `pipe` decorator is not
    # shadowed inside this function.
    proc = subprocess.Popen(command,
                            stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE)
    try:
        # NOTE: all input is written before any output is read, so a command
        # that produces a lot of output while still consuming a large input
        # can stall on a full OS pipe buffer.
        try:
            for line in stdin:
                proc.stdin.write(line)
        finally:
            # Always signal end-of-input, even if the upstream iterator or a
            # write failed; otherwise the child may wait for input forever and
            # the wait() below would never return.
            proc.stdin.close()
        for line in proc.stdout:
            yield line
    finally:
        try:
            # If the consumer stopped early, swallow whatever output is left
            # so the child is not blocked on a full pipe and can exit with
            # its real status.
            for _unread in proc.stdout:
                pass
        finally:
            proc.stdout.close()
            result = proc.wait()
        # Kept inside `finally` so a failing command is reported as
        # CalledProcessError even when feeding or reading it raised first.
        if check_success and result != 0:
            raise subprocess.CalledProcessError(result, command)
|