/usr/share/pyshared/audioread/ffdec.py is in python-audioread 1.0.1-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# This file is part of audioread.
# Copyright 2012, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""Read audio data using the ffmpeg command line tools via a UNIX
pipe.
"""
import subprocess
import re
import threading
import select
import time
from . import DecodeError
class FFmpegError(DecodeError):
    """Base class for the errors raised by the FFmpeg decoding backend."""
class CommunicationError(FFmpegError):
    """FFmpeg produced output that could not be parsed."""
class UnsupportedError(FFmpegError):
    """FFmpeg was unable to decode the file."""
class NotInstalledError(FFmpegError):
    """The ffmpeg binary could not be found."""
class ReadTimeoutError(FFmpegError):
    """A read from the ffmpeg command-line tool timed out."""
class ReaderThread(threading.Thread):
    """Background thread that drains a filehandle.

    Continuously reading the handle keeps the buffer of the underlying
    input stream from filling up. Every chunk that is read is kept, in
    order, in the ``data`` list.
    """

    # A dedicated thread doing blocking reads looks slightly hacky, but
    # it is the simplest dependable approach: select() only reports that
    # *something* can be read, not how much, so it is a poor fit for
    # draining a stream efficiently.

    def __init__(self, fh, blocksize=1024):
        """Prepare (but do not start) a reader for the filehandle `fh`.

        `blocksize` is the size passed to each ``fh.read()`` call.
        """
        super(ReaderThread, self).__init__()
        # Daemon thread: it must never keep the interpreter alive.
        self.daemon = True
        self.data = []
        self.blocksize = blocksize
        self.fh = fh

    def run(self):
        """Read from the filehandle until it returns an empty result,
        appending every chunk to ``self.data``.
        """
        chunk = self.fh.read(self.blocksize)
        while chunk:
            self.data.append(chunk)
            chunk = self.fh.read(self.blocksize)
class FFmpegAudioFile(object):
    """An audio file decoded by the ffmpeg command-line utility.

    Constructing an instance launches the decoder as a subprocess that
    writes raw signed 16-bit little-endian PCM (``-f s16le``) to its
    stdout. The stream parameters are parsed from the tool's stderr and
    exposed as attributes:

    * ``samplerate`` -- sample rate in Hz (0 if not reported)
    * ``channels`` -- number of channels (0 if not reported)
    * ``duration`` -- length in seconds (0 if not reported)

    Iterating over the object yields blocks of PCM data. The object is
    also a context manager that closes the subprocess on exit.
    """

    def __init__(self, filename):
        """Start decoding `filename` and read its stream parameters.

        Raises NotInstalledError if the decoder executable cannot be
        launched. Errors raised while reading the stream information
        (see `_get_info`) propagate to the caller.
        """
        try:
            self.proc = subprocess.Popen(
                ['avconv', '-i', filename, '-f', 's16le', '-'],
                stdout=subprocess.PIPE, stderr=subprocess.PIPE
            )
        except OSError:
            raise NotInstalledError()

        # Read relevant information from stderr.
        self._get_info()

        # Start a separate thread to read the rest of the data from
        # stderr, so that the tool never blocks on a full stderr pipe.
        self.stderr_reader = ReaderThread(self.proc.stderr)
        self.stderr_reader.start()

    def read_data(self, block_size=4096, timeout=10.0):
        """Read blocks of raw PCM data from the file.

        This is a generator yielding the results of successive
        ``read(block_size)`` calls on the decoder's stdout until an
        empty result signals the end of the stream.

        `timeout` is the number of seconds to wait for stdout to become
        ready before each read; if nothing arrives in that time,
        ReadTimeoutError is raised with the stderr output collected so
        far in its message.
        """
        # Read from stdout on this thread.
        start_time = time.time()
        while True:
            # Wait for data to be available or a timeout.
            rready, _, xready = select.select((self.proc.stdout,),
                                              (), (self.proc.stdout,),
                                              timeout)
            end_time = time.time()
            if not rready and not xready:
                if end_time - start_time >= timeout:
                    # Nothing interesting has happened for a while --
                    # FFmpeg is probably hanging.
                    raise ReadTimeoutError(
                        'ffmpeg output: %s' % self._stderr_text()
                    )
                else:
                    # Keep waiting.
                    continue
            # Activity seen: restart the timeout window.
            start_time = end_time

            data = self.proc.stdout.read(block_size)
            if not data:
                break
            yield data

    def _stderr_text(self):
        """Return the stderr output collected so far as a native string.

        The pipe is read in binary mode, so the collected chunks are
        bytes; on Python 3 they must be joined as bytes and decoded
        before they can be interpolated into an error message.
        """
        output = b''.join(self.stderr_reader.data)
        if not isinstance(output, str):
            output = output.decode('utf8', 'ignore')
        return output

    def _get_info(self):
        """Reads the tool's output from its stderr stream, extracts the
        relevant information, and parses it.

        Lines are consumed until the one describing the audio stream is
        seen. Raises CommunicationError if stderr ends first, IOError if
        the tool reports that the file does not exist, and
        UnsupportedError if it reports invalid data.
        """
        out_parts = []
        while True:
            line = self.proc.stderr.readline()
            if not line:
                # EOF and data not found.
                raise CommunicationError("stream info not found")

            # In Python 3, result of reading from stderr is bytes.
            if isinstance(line, bytes):
                line = line.decode('utf8', 'ignore')

            # Lower-cased so the substring tests below (and the regexes
            # in _parse_info) can use lower-case literals.
            line = line.strip().lower()

            if 'no such file' in line:
                raise IOError('file not found')
            elif 'invalid data found' in line:
                raise UnsupportedError()
            elif 'duration:' in line:
                out_parts.append(line)
            elif 'audio:' in line:
                # The audio stream line is the last one needed.
                out_parts.append(line)
                self._parse_info(''.join(out_parts))
                break

    def _parse_info(self, s):
        """Given relevant data from the ffmpeg output, set audio
        parameter fields on this object.

        `s` is the lower-cased text of the "duration:" and "audio:"
        lines. Sets ``samplerate``, ``channels`` and ``duration``; a
        field that cannot be found is set to 0.
        """
        # Sample rate.
        match = re.search(r'(\d+) hz', s)
        if match:
            self.samplerate = int(match.group(1))
        else:
            self.samplerate = 0

        # Channel count: the field following the sample rate.
        match = re.search(r'hz, ([^,]+),', s)
        if match:
            mode = match.group(1)
            if mode == 'stereo':
                self.channels = 2
            else:
                # Either "N channels" or a "front.lfe" layout such as
                # "5.1", whose channel count is the sum of both parts.
                layout = re.match(r'(\d+)(?:\.(\d+))?', mode)
                if layout:
                    self.channels = sum(
                        int(part) for part in layout.groups() if part
                    )
                else:
                    # Named layouts without digits (e.g. "mono").
                    self.channels = 1
        else:
            self.channels = 0

        # Duration, as hours:minutes:seconds.tenths (further fractional
        # digits are ignored).
        match = re.search(
            r'duration: (\d+):(\d+):(\d+)\.(\d)', s
        )
        if match:
            hours, minutes, seconds, tenths = map(int, match.groups())
            self.duration = (hours * 60 * 60 +
                             minutes * 60 +
                             seconds +
                             float(tenths) / 10)
        else:
            # No duration found.
            self.duration = 0

    def close(self):
        """Close the ffmpeg process used to perform the decoding.

        Does nothing if the process was never started or already has a
        return code.
        """
        if hasattr(self, 'proc') and self.proc.returncode is None:
            self.proc.terminate()

            # Flush the stdout buffer (stderr already flushed).
            stdout_reader = ReaderThread(self.proc.stdout)
            stdout_reader.start()

            self.proc.wait()

    def __del__(self):
        self.close()

    # Iteration.
    def __iter__(self):
        """Iterate over blocks of PCM data with the default settings."""
        return self.read_data()

    # Context manager.
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the decoder; exceptions are never suppressed."""
        self.close()
        return False
|