summaryrefslogtreecommitdiffstats
path: root/venv/lib/python3.9/site-packages/pydub/audio_segment.py
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.9/site-packages/pydub/audio_segment.py')
-rw-r--r--venv/lib/python3.9/site-packages/pydub/audio_segment.py1399
1 files changed, 1399 insertions, 0 deletions
diff --git a/venv/lib/python3.9/site-packages/pydub/audio_segment.py b/venv/lib/python3.9/site-packages/pydub/audio_segment.py
new file mode 100644
index 00000000..14ea46e0
--- /dev/null
+++ b/venv/lib/python3.9/site-packages/pydub/audio_segment.py
@@ -0,0 +1,1399 @@
+from __future__ import division
+
+import array
+import os
+import subprocess
+from tempfile import TemporaryFile, NamedTemporaryFile
+import wave
+import sys
+import struct
+from .logging_utils import log_conversion, log_subprocess_output
+from .utils import mediainfo_json, fsdecode
+import base64
+from collections import namedtuple
+
+try:
+ from StringIO import StringIO
+except:
+ from io import StringIO
+
+from io import BytesIO
+
+try:
+ from itertools import izip
+except:
+ izip = zip
+
+from .utils import (
+ _fd_or_path_or_tempfile,
+ db_to_float,
+ ratio_to_db,
+ get_encoder_name,
+ get_array_type,
+ audioop,
+)
+from .exceptions import (
+ TooManyMissingFrames,
+ InvalidDuration,
+ InvalidID3TagVersion,
+ InvalidTag,
+ CouldntDecodeError,
+ CouldntEncodeError,
+ MissingAudioParameter,
+)
+
+if sys.version_info >= (3, 0):
+ basestring = str
+ xrange = range
+ StringIO = BytesIO
+
+
+class ClassPropertyDescriptor(object):
+
+ def __init__(self, fget, fset=None):
+ self.fget = fget
+ self.fset = fset
+
+ def __get__(self, obj, klass=None):
+ if klass is None:
+ klass = type(obj)
+ return self.fget.__get__(obj, klass)()
+
+ def __set__(self, obj, value):
+ if not self.fset:
+ raise AttributeError("can't set attribute")
+ type_ = type(obj)
+ return self.fset.__get__(obj, type_)(value)
+
+ def setter(self, func):
+ if not isinstance(func, (classmethod, staticmethod)):
+ func = classmethod(func)
+ self.fset = func
+ return self
+
+
+def classproperty(func):
+ if not isinstance(func, (classmethod, staticmethod)):
+ func = classmethod(func)
+
+ return ClassPropertyDescriptor(func)
+
+
+AUDIO_FILE_EXT_ALIASES = {
+ "m4a": "mp4",
+ "wave": "wav",
+}
+
+WavSubChunk = namedtuple('WavSubChunk', ['id', 'position', 'size'])
+WavData = namedtuple('WavData', ['audio_format', 'channels', 'sample_rate',
+ 'bits_per_sample', 'raw_data'])
+
+
+def extract_wav_headers(data):
+ # def search_subchunk(data, subchunk_id):
+ pos = 12 # The size of the RIFF chunk descriptor
+ subchunks = []
+ while pos + 8 <= len(data) and len(subchunks) < 10:
+ subchunk_id = data[pos:pos + 4]
+ subchunk_size = struct.unpack_from('<I', data[pos + 4:pos + 8])[0]
+ subchunks.append(WavSubChunk(subchunk_id, pos, subchunk_size))
+ if subchunk_id == b'data':
+ # 'data' is the last subchunk
+ break
+ pos += subchunk_size + 8
+
+ return subchunks
+
+
+def read_wav_audio(data, headers=None):
+ if not headers:
+ headers = extract_wav_headers(data)
+
+ fmt = [x for x in headers if x.id == b'fmt ']
+ if not fmt or fmt[0].size < 16:
+ raise CouldntDecodeError("Couldn't find fmt header in wav data")
+ fmt = fmt[0]
+ pos = fmt.position + 8
+ audio_format = struct.unpack_from('<H', data[pos:pos + 2])[0]
+ if audio_format != 1 and audio_format != 0xFFFE:
+ raise CouldntDecodeError("Unknown audio format 0x%X in wav data" %
+ audio_format)
+
+ channels = struct.unpack_from('<H', data[pos + 2:pos + 4])[0]
+ sample_rate = struct.unpack_from('<I', data[pos + 4:pos + 8])[0]
+ bits_per_sample = struct.unpack_from('<H', data[pos + 14:pos + 16])[0]
+
+ data_hdr = headers[-1]
+ if data_hdr.id != b'data':
+ raise CouldntDecodeError("Couldn't find data header in wav data")
+
+ pos = data_hdr.position + 8
+ return WavData(audio_format, channels, sample_rate, bits_per_sample,
+ data[pos:pos + data_hdr.size])
+
+
+def fix_wav_headers(data):
+ headers = extract_wav_headers(data)
+ if not headers or headers[-1].id != b'data':
+ return
+
+ # TODO: Handle huge files in some other way
+ if len(data) > 2**32:
+ raise CouldntDecodeError("Unable to process >4GB files")
+
+ # Set the file size in the RIFF chunk descriptor
+ data[4:8] = struct.pack('<I', len(data) - 8)
+
+ # Set the data size in the data subchunk
+ pos = headers[-1].position
+ data[pos + 4:pos + 8] = struct.pack('<I', len(data) - pos - 8)
+
+
+class AudioSegment(object):
+ """
+ AudioSegments are *immutable* objects representing segments of audio
+ that can be manipulated using python code.
+
+ AudioSegments are slicable using milliseconds.
+ for example:
+ a = AudioSegment.from_mp3(mp3file)
+ first_second = a[:1000] # get the first second of an mp3
+ slice = a[5000:10000] # get a slice from 5 to 10 seconds of an mp3
+ """
+ converter = get_encoder_name() # either ffmpeg or avconv
+
+ # TODO: remove in 1.0 release
+ # maintain backwards compatibility for ffmpeg attr (now called converter)
+ @classproperty
+ def ffmpeg(cls):
+ return cls.converter
+
+ @ffmpeg.setter
+ def ffmpeg(cls, val):
+ cls.converter = val
+
+ DEFAULT_CODECS = {
+ "ogg": "libvorbis"
+ }
+
+ def __init__(self, data=None, *args, **kwargs):
+ self.sample_width = kwargs.pop("sample_width", None)
+ self.frame_rate = kwargs.pop("frame_rate", None)
+ self.channels = kwargs.pop("channels", None)
+
+ audio_params = (self.sample_width, self.frame_rate, self.channels)
+
+ if isinstance(data, array.array):
+ try:
+ data = data.tobytes()
+ except:
+ data = data.tostring()
+
+ # prevent partial specification of arguments
+ if any(audio_params) and None in audio_params:
+ raise MissingAudioParameter("Either all audio parameters or no parameter must be specified")
+
+ # all arguments are given
+ elif self.sample_width is not None:
+ if len(data) % (self.sample_width * self.channels) != 0:
+ raise ValueError("data length must be a multiple of '(sample_width * channels)'")
+
+ self.frame_width = self.channels * self.sample_width
+ self._data = data
+
+ # keep support for 'metadata' until audio params are used everywhere
+ elif kwargs.get('metadata', False):
+ # internal use only
+ self._data = data
+ for attr, val in kwargs.pop('metadata').items():
+ setattr(self, attr, val)
+ else:
+ # normal construction
+ try:
+ data = data if isinstance(data, (basestring, bytes)) else data.read()
+ except(OSError):
+ d = b''
+ reader = data.read(2 ** 31 - 1)
+ while reader:
+ d += reader
+ reader = data.read(2 ** 31 - 1)
+ data = d
+
+ wav_data = read_wav_audio(data)
+ if not wav_data:
+ raise CouldntDecodeError("Couldn't read wav audio from data")
+
+ self.channels = wav_data.channels
+ self.sample_width = wav_data.bits_per_sample // 8
+ self.frame_rate = wav_data.sample_rate
+ self.frame_width = self.channels * self.sample_width
+ self._data = wav_data.raw_data
+ if self.sample_width == 1:
+ # convert from unsigned integers in wav
+ self._data = audioop.bias(self._data, 1, -128)
+
+ # Convert 24-bit audio to 32-bit audio.
+ # (stdlib audioop and array modules do not support 24-bit data)
+ if self.sample_width == 3:
+ byte_buffer = BytesIO()
+
+ # Workaround for python 2 vs python 3. _data in 2.x are length-1 strings,
+ # And in 3.x are ints.
+ pack_fmt = 'BBB' if isinstance(self._data[0], int) else 'ccc'
+
+ # This conversion maintains the 24 bit values. The values are
+ # not scaled up to the 32 bit range. Other conversions could be
+ # implemented.
+ i = iter(self._data)
+ padding = {False: b'\x00', True: b'\xFF'}
+ for b0, b1, b2 in izip(i, i, i):
+ byte_buffer.write(padding[b2 > b'\x7f'[0]])
+ old_bytes = struct.pack(pack_fmt, b0, b1, b2)
+ byte_buffer.write(old_bytes)
+
+ self._data = byte_buffer.getvalue()
+ self.sample_width = 4
+ self.frame_width = self.channels * self.sample_width
+
+ super(AudioSegment, self).__init__(*args, **kwargs)
+
+ @property
+ def raw_data(self):
+ """
+ public access to the raw audio data as a bytestring
+ """
+ return self._data
+
+ def get_array_of_samples(self, array_type_override=None):
+ """
+ returns the raw_data as an array of samples
+ """
+ if array_type_override is None:
+ array_type_override = self.array_type
+ return array.array(array_type_override, self._data)
+
+ @property
+ def array_type(self):
+ return get_array_type(self.sample_width * 8)
+
+ def __len__(self):
+ """
+ returns the length of this audio segment in milliseconds
+ """
+ return round(1000 * (self.frame_count() / self.frame_rate))
+
+ def __eq__(self, other):
+ try:
+ return self._data == other._data
+ except:
+ return False
+
+ def __hash__(self):
+ return hash(AudioSegment) ^ hash((self.channels, self.frame_rate, self.sample_width, self._data))
+
+ def __ne__(self, other):
+ return not (self == other)
+
+ def __iter__(self):
+ return (self[i] for i in xrange(len(self)))
+
+ def __getitem__(self, millisecond):
+ if isinstance(millisecond, slice):
+ if millisecond.step:
+ return (
+ self[i:i + millisecond.step]
+ for i in xrange(*millisecond.indices(len(self)))
+ )
+
+ start = millisecond.start if millisecond.start is not None else 0
+ end = millisecond.stop if millisecond.stop is not None \
+ else len(self)
+
+ start = min(start, len(self))
+ end = min(end, len(self))
+ else:
+ start = millisecond
+ end = millisecond + 1
+
+ start = self._parse_position(start) * self.frame_width
+ end = self._parse_position(end) * self.frame_width
+ data = self._data[start:end]
+
+ # ensure the output is as long as the requester is expecting
+ expected_length = end - start
+ missing_frames = (expected_length - len(data)) // self.frame_width
+ if missing_frames:
+ if missing_frames > self.frame_count(ms=2):
+ raise TooManyMissingFrames(
+ "You should never be filling in "
+ " more than 2 ms with silence here, "
+ "missing frames: %s" % missing_frames)
+ silence = audioop.mul(data[:self.frame_width],
+ self.sample_width, 0)
+ data += (silence * missing_frames)
+
+ return self._spawn(data)
+
+ def get_sample_slice(self, start_sample=None, end_sample=None):
+ """
+ Get a section of the audio segment by sample index.
+
+ NOTE: Negative indices do *not* address samples backword
+ from the end of the audio segment like a python list.
+ This is intentional.
+ """
+ max_val = int(self.frame_count())
+
+ def bounded(val, default):
+ if val is None:
+ return default
+ if val < 0:
+ return 0
+ if val > max_val:
+ return max_val
+ return val
+
+ start_i = bounded(start_sample, 0) * self.frame_width
+ end_i = bounded(end_sample, max_val) * self.frame_width
+
+ data = self._data[start_i:end_i]
+ return self._spawn(data)
+
+ def __add__(self, arg):
+ if isinstance(arg, AudioSegment):
+ return self.append(arg, crossfade=0)
+ else:
+ return self.apply_gain(arg)
+
+ def __radd__(self, rarg):
+ """
+ Permit use of sum() builtin with an iterable of AudioSegments
+ """
+ if rarg == 0:
+ return self
+ raise TypeError("Gains must be the second addend after the "
+ "AudioSegment")
+
+ def __sub__(self, arg):
+ if isinstance(arg, AudioSegment):
+ raise TypeError("AudioSegment objects can't be subtracted from "
+ "each other")
+ else:
+ return self.apply_gain(-arg)
+
+ def __mul__(self, arg):
+ """
+ If the argument is an AudioSegment, overlay the multiplied audio
+ segment.
+
+ If it's a number, just use the string multiply operation to repeat the
+ audio.
+
+ The following would return an AudioSegment that contains the
+ audio of audio_seg eight times
+
+ `audio_seg * 8`
+ """
+ if isinstance(arg, AudioSegment):
+ return self.overlay(arg, position=0, loop=True)
+ else:
+ return self._spawn(data=self._data * arg)
+
+ def _spawn(self, data, overrides={}):
+ """
+ Creates a new audio segment using the metadata from the current one
+ and the data passed in. Should be used whenever an AudioSegment is
+ being returned by an operation that would alters the current one,
+ since AudioSegment objects are immutable.
+ """
+ # accept lists of data chunks
+ if isinstance(data, list):
+ data = b''.join(data)
+
+ if isinstance(data, array.array):
+ try:
+ data = data.tobytes()
+ except:
+ data = data.tostring()
+
+ # accept file-like objects
+ if hasattr(data, 'read'):
+ if hasattr(data, 'seek'):
+ data.seek(0)
+ data = data.read()
+
+ metadata = {
+ 'sample_width': self.sample_width,
+ 'frame_rate': self.frame_rate,
+ 'frame_width': self.frame_width,
+ 'channels': self.channels
+ }
+ metadata.update(overrides)
+ return self.__class__(data=data, metadata=metadata)
+
+ @classmethod
+ def _sync(cls, *segs):
+ channels = max(seg.channels for seg in segs)
+ frame_rate = max(seg.frame_rate for seg in segs)
+ sample_width = max(seg.sample_width for seg in segs)
+
+ return tuple(
+ seg.set_channels(channels).set_frame_rate(frame_rate).set_sample_width(sample_width)
+ for seg in segs
+ )
+
+ def _parse_position(self, val):
+ if val < 0:
+ val = len(self) - abs(val)
+ val = self.frame_count(ms=len(self)) if val == float("inf") else \
+ self.frame_count(ms=val)
+ return int(val)
+
+ @classmethod
+ def empty(cls):
+ return cls(b'', metadata={
+ "channels": 1,
+ "sample_width": 1,
+ "frame_rate": 1,
+ "frame_width": 1
+ })
+
+ @classmethod
+ def silent(cls, duration=1000, frame_rate=11025):
+ """
+ Generate a silent audio segment.
+ duration specified in milliseconds (default duration: 1000ms, default frame_rate: 11025).
+ """
+ frames = int(frame_rate * (duration / 1000.0))
+ data = b"\0\0" * frames
+ return cls(data, metadata={"channels": 1,
+ "sample_width": 2,
+ "frame_rate": frame_rate,
+ "frame_width": 2})
+
+ @classmethod
+ def from_mono_audiosegments(cls, *mono_segments):
+ if not len(mono_segments):
+ raise ValueError("At least one AudioSegment instance is required")
+
+ segs = cls._sync(*mono_segments)
+
+ if segs[0].channels != 1:
+ raise ValueError(
+ "AudioSegment.from_mono_audiosegments requires all arguments are mono AudioSegment instances")
+
+ channels = len(segs)
+ sample_width = segs[0].sample_width
+ frame_rate = segs[0].frame_rate
+
+ frame_count = max(int(seg.frame_count()) for seg in segs)
+ data = array.array(
+ segs[0].array_type,
+ b'\0' * (frame_count * sample_width * channels)
+ )
+
+ for i, seg in enumerate(segs):
+ data[i::channels] = seg.get_array_of_samples()
+
+ return cls(
+ data,
+ channels=channels,
+ sample_width=sample_width,
+ frame_rate=frame_rate,
+ )
+
+ @classmethod
+ def from_file_using_temporary_files(cls, file, format=None, codec=None, parameters=None, start_second=None, duration=None, **kwargs):
+ orig_file = file
+ file, close_file = _fd_or_path_or_tempfile(file, 'rb', tempfile=False)
+
+ if format:
+ format = format.lower()
+ format = AUDIO_FILE_EXT_ALIASES.get(format, format)
+
+ def is_format(f):
+ f = f.lower()
+ if format == f:
+ return True
+ if isinstance(orig_file, basestring):
+ return orig_file.lower().endswith(".{0}".format(f))
+ if isinstance(orig_file, bytes):
+ return orig_file.lower().endswith((".{0}".format(f)).encode('utf8'))
+ return False
+
+ if is_format("wav"):
+ try:
+ obj = cls._from_safe_wav(file)
+ if close_file:
+ file.close()
+ if start_second is None and duration is None:
+ return obj
+ elif start_second is not None and duration is None:
+ return obj[start_second*1000:]
+ elif start_second is None and duration is not None:
+ return obj[:duration*1000]
+ else:
+ return obj[start_second*1000:(start_second+duration)*1000]
+ except:
+ file.seek(0)
+ elif is_format("raw") or is_format("pcm"):
+ sample_width = kwargs['sample_width']
+ frame_rate = kwargs['frame_rate']
+ channels = kwargs['channels']
+ metadata = {
+ 'sample_width': sample_width,
+ 'frame_rate': frame_rate,
+ 'channels': channels,
+ 'frame_width': channels * sample_width
+ }
+ obj = cls(data=file.read(), metadata=metadata)
+ if close_file:
+ file.close()
+ if start_second is None and duration is None:
+ return obj
+ elif start_second is not None and duration is None:
+ return obj[start_second * 1000:]
+ elif start_second is None and duration is not None:
+ return obj[:duration * 1000]
+ else:
+ return obj[start_second * 1000:(start_second + duration) * 1000]
+
+ input_file = NamedTemporaryFile(mode='wb', delete=False)
+ try:
+ input_file.write(file.read())
+ except(OSError):
+ input_file.flush()
+ input_file.close()
+ input_file = NamedTemporaryFile(mode='wb', delete=False, buffering=2 ** 31 - 1)
+ if close_file:
+ file.close()
+ close_file = True
+ file = open(orig_file, buffering=2 ** 13 - 1, mode='rb')
+ reader = file.read(2 ** 31 - 1)
+ while reader:
+ input_file.write(reader)
+ reader = file.read(2 ** 31 - 1)
+ input_file.flush()
+ if close_file:
+ file.close()
+
+ output = NamedTemporaryFile(mode="rb", delete=False)
+
+ conversion_command = [cls.converter,
+ '-y', # always overwrite existing files
+ ]
+
+ # If format is not defined
+ # ffmpeg/avconv will detect it automatically
+ if format:
+ conversion_command += ["-f", format]
+
+ if codec:
+ # force audio decoder
+ conversion_command += ["-acodec", codec]
+
+ conversion_command += [
+ "-i", input_file.name, # input_file options (filename last)
+ "-vn", # Drop any video streams if there are any
+ "-f", "wav" # output options (filename last)
+ ]
+
+ if start_second is not None:
+ conversion_command += ["-ss", str(start_second)]
+
+ if duration is not None:
+ conversion_command += ["-t", str(duration)]
+
+ conversion_command += [output.name]
+
+ if parameters is not None:
+ # extend arguments with arbitrary set
+ conversion_command.extend(parameters)
+
+ log_conversion(conversion_command)
+
+ with open(os.devnull, 'rb') as devnull:
+ p = subprocess.Popen(conversion_command, stdin=devnull, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ p_out, p_err = p.communicate()
+
+ log_subprocess_output(p_out)
+ log_subprocess_output(p_err)
+
+ try:
+ if p.returncode != 0:
+ raise CouldntDecodeError(
+ "Decoding failed. ffmpeg returned error code: {0}\n\nOutput from ffmpeg/avlib:\n\n{1}".format(
+ p.returncode, p_err.decode(errors='ignore') ))
+ obj = cls._from_safe_wav(output)
+ finally:
+ input_file.close()
+ output.close()
+ os.unlink(input_file.name)
+ os.unlink(output.name)
+
+ if start_second is None and duration is None:
+ return obj
+ elif start_second is not None and duration is None:
+ return obj[0:]
+ elif start_second is None and duration is not None:
+ return obj[:duration * 1000]
+ else:
+ return obj[0:duration * 1000]
+
+
+ @classmethod
+ def from_file(cls, file, format=None, codec=None, parameters=None, start_second=None, duration=None, **kwargs):
+ orig_file = file
+ try:
+ filename = fsdecode(file)
+ except TypeError:
+ filename = None
+ file, close_file = _fd_or_path_or_tempfile(file, 'rb', tempfile=False)
+
+ if format:
+ format = format.lower()
+ format = AUDIO_FILE_EXT_ALIASES.get(format, format)
+
+ def is_format(f):
+ f = f.lower()
+ if format == f:
+ return True
+
+ if filename:
+ return filename.lower().endswith(".{0}".format(f))
+
+ return False
+
+ if is_format("wav"):
+ try:
+ if start_second is None and duration is None:
+ return cls._from_safe_wav(file)
+ elif start_second is not None and duration is None:
+ return cls._from_safe_wav(file)[start_second*1000:]
+ elif start_second is None and duration is not None:
+ return cls._from_safe_wav(file)[:duration*1000]
+ else:
+ return cls._from_safe_wav(file)[start_second*1000:(start_second+duration)*1000]
+ except:
+ file.seek(0)
+ elif is_format("raw") or is_format("pcm"):
+ sample_width = kwargs['sample_width']
+ frame_rate = kwargs['frame_rate']
+ channels = kwargs['channels']
+ metadata = {
+ 'sample_width': sample_width,
+ 'frame_rate': frame_rate,
+ 'channels': channels,
+ 'frame_width': channels * sample_width
+ }
+ if start_second is None and duration is None:
+ return cls(data=file.read(), metadata=metadata)
+ elif start_second is not None and duration is None:
+ return cls(data=file.read(), metadata=metadata)[start_second*1000:]
+ elif start_second is None and duration is not None:
+ return cls(data=file.read(), metadata=metadata)[:duration*1000]
+ else:
+ return cls(data=file.read(), metadata=metadata)[start_second*1000:(start_second+duration)*1000]
+
+ conversion_command = [cls.converter,
+ '-y', # always overwrite existing files
+ ]
+
+ # If format is not defined
+ # ffmpeg/avconv will detect it automatically
+ if format:
+ conversion_command += ["-f", format]
+
+ if codec:
+ # force audio decoder
+ conversion_command += ["-acodec", codec]
+
+ read_ahead_limit = kwargs.get('read_ahead_limit', -1)
+ if filename:
+ conversion_command += ["-i", filename]
+ stdin_parameter = None
+ stdin_data = None
+ else:
+ if cls.converter == 'ffmpeg':
+ conversion_command += ["-read_ahead_limit", str(read_ahead_limit),
+ "-i", "cache:pipe:0"]
+ else:
+ conversion_command += ["-i", "-"]
+ stdin_parameter = subprocess.PIPE
+ stdin_data = file.read()
+
+ if codec:
+ info = None
+ else:
+ info = mediainfo_json(orig_file, read_ahead_limit=read_ahead_limit)
+ if info:
+ audio_streams = [x for x in info['streams']
+ if x['codec_type'] == 'audio']
+ # This is a workaround for some ffprobe versions that always say
+ # that mp3/mp4/aac/webm/ogg files contain fltp samples
+ audio_codec = audio_streams[0].get('codec_name')
+ if (audio_streams[0].get('sample_fmt') == 'fltp' and
+ audio_codec in ['mp3', 'mp4', 'aac', 'webm', 'ogg']):
+ bits_per_sample = 16
+ else:
+ bits_per_sample = audio_streams[0]['bits_per_sample']
+ if bits_per_sample == 8:
+ acodec = 'pcm_u8'
+ else:
+ acodec = 'pcm_s%dle' % bits_per_sample
+
+ conversion_command += ["-acodec", acodec]
+
+ conversion_command += [
+ "-vn", # Drop any video streams if there are any
+ "-f", "wav" # output options (filename last)
+ ]
+
+ if start_second is not None:
+ conversion_command += ["-ss", str(start_second)]
+
+ if duration is not None:
+ conversion_command += ["-t", str(duration)]
+
+ conversion_command += ["-"]
+
+ if parameters is not None:
+ # extend arguments with arbitrary set
+ conversion_command.extend(parameters)
+
+ log_conversion(conversion_command)
+
+ p = subprocess.Popen(conversion_command, stdin=stdin_parameter,
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ p_out, p_err = p.communicate(input=stdin_data)
+
+ if p.returncode != 0 or len(p_out) == 0:
+ if close_file:
+ file.close()
+ raise CouldntDecodeError(
+ "Decoding failed. ffmpeg returned error code: {0}\n\nOutput from ffmpeg/avlib:\n\n{1}".format(
+ p.returncode, p_err.decode(errors='ignore') ))
+
+ p_out = bytearray(p_out)
+ fix_wav_headers(p_out)
+ p_out = bytes(p_out)
+ obj = cls(p_out)
+
+ if close_file:
+ file.close()
+
+ if start_second is None and duration is None:
+ return obj
+ elif start_second is not None and duration is None:
+ return obj[0:]
+ elif start_second is None and duration is not None:
+ return obj[:duration * 1000]
+ else:
+ return obj[0:duration * 1000]
+
+ @classmethod
+ def from_mp3(cls, file, parameters=None):
+ return cls.from_file(file, 'mp3', parameters=parameters)
+
+ @classmethod
+ def from_flv(cls, file, parameters=None):
+ return cls.from_file(file, 'flv', parameters=parameters)
+
+ @classmethod
+ def from_ogg(cls, file, parameters=None):
+ return cls.from_file(file, 'ogg', parameters=parameters)
+
+ @classmethod
+ def from_wav(cls, file, parameters=None):
+ return cls.from_file(file, 'wav', parameters=parameters)
+
+ @classmethod
+ def from_raw(cls, file, **kwargs):
+ return cls.from_file(file, 'raw', sample_width=kwargs['sample_width'], frame_rate=kwargs['frame_rate'],
+ channels=kwargs['channels'])
+
+ @classmethod
+ def _from_safe_wav(cls, file):
+ file, close_file = _fd_or_path_or_tempfile(file, 'rb', tempfile=False)
+ file.seek(0)
+ obj = cls(data=file)
+ if close_file:
+ file.close()
+ return obj
+
+ def export(self, out_f=None, format='mp3', codec=None, bitrate=None, parameters=None, tags=None, id3v2_version='4',
+ cover=None):
+ """
+ Export an AudioSegment to a file with given options
+
+ out_f (string):
+ Path to destination audio file. Also accepts os.PathLike objects on
+ python >= 3.6
+
+ format (string)
+ Format for destination audio file.
+ ('mp3', 'wav', 'raw', 'ogg' or other ffmpeg/avconv supported files)
+
+ codec (string)
+ Codec used to encode the destination file.
+
+ bitrate (string)
+ Bitrate used when encoding destination file. (64, 92, 128, 256, 312k...)
+ Each codec accepts different bitrate arguments so take a look at the
+ ffmpeg documentation for details (bitrate usually shown as -b, -ba or
+ -a:b).
+
+ parameters (list of strings)
+ Aditional ffmpeg/avconv parameters
+
+ tags (dict)
+ Set metadata information to destination files
+ usually used as tags. ({title='Song Title', artist='Song Artist'})
+
+ id3v2_version (string)
+ Set ID3v2 version for tags. (default: '4')
+
+ cover (file)
+ Set cover for audio file from image file. (png or jpg)
+ """
+ id3v2_allowed_versions = ['3', '4']
+
+ if format == "raw" and (codec is not None or parameters is not None):
+ raise AttributeError(
+ 'Can not invoke ffmpeg when export format is "raw"; '
+ 'specify an ffmpeg raw format like format="s16le" instead '
+ 'or call export(format="raw") with no codec or parameters')
+
+ out_f, _ = _fd_or_path_or_tempfile(out_f, 'wb+')
+ out_f.seek(0)
+
+ if format == "raw":
+ out_f.write(self._data)
+ out_f.seek(0)
+ return out_f
+
+ # wav with no ffmpeg parameters can just be written directly to out_f
+ easy_wav = format == "wav" and codec is None and parameters is None
+
+ if easy_wav:
+ data = out_f
+ else:
+ data = NamedTemporaryFile(mode="wb", delete=False)
+
+ pcm_for_wav = self._data
+ if self.sample_width == 1:
+ # convert to unsigned integers for wav
+ pcm_for_wav = audioop.bias(self._data, 1, 128)
+
+ wave_data = wave.open(data, 'wb')
+ wave_data.setnchannels(self.channels)
+ wave_data.setsampwidth(self.sample_width)
+ wave_data.setframerate(self.frame_rate)
+ # For some reason packing the wave header struct with
+ # a float in python 2 doesn't throw an exception
+ wave_data.setnframes(int(self.frame_count()))
+ wave_data.writeframesraw(pcm_for_wav)
+ wave_data.close()
+
+ # for easy wav files, we're done (wav data is written directly to out_f)
+ if easy_wav:
+ out_f.seek(0)
+ return out_f
+
+ output = NamedTemporaryFile(mode="w+b", delete=False)
+
+ # build converter command to export
+ conversion_command = [
+ self.converter,
+ '-y', # always overwrite existing files
+ "-f", "wav", "-i", data.name, # input options (filename last)
+ ]
+
+ if codec is None:
+ codec = self.DEFAULT_CODECS.get(format, None)
+
+ if cover is not None:
+ if cover.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.tif', '.tiff')) and format == "mp3":
+ conversion_command.extend(["-i", cover, "-map", "0", "-map", "1", "-c:v", "mjpeg"])
+ else:
+ raise AttributeError(
+ "Currently cover images are only supported by MP3 files. The allowed image formats are: .tif, .jpg, .bmp, .jpeg and .png.")
+
+ if codec is not None:
+ # force audio encoder
+ conversion_command.extend(["-acodec", codec])
+
+ if bitrate is not None:
+ conversion_command.extend(["-b:a", bitrate])
+
+ if parameters is not None:
+ # extend arguments with arbitrary set
+ conversion_command.extend(parameters)
+
+ if tags is not None:
+ if not isinstance(tags, dict):
+ raise InvalidTag("Tags must be a dictionary.")
+ else:
+ # Extend converter command with tags
+ # print(tags)
+ for key, value in tags.items():
+ conversion_command.extend(
+ ['-metadata', '{0}={1}'.format(key, value)])
+
+ if format == 'mp3':
+ # set id3v2 tag version
+ if id3v2_version not in id3v2_allowed_versions:
+ raise InvalidID3TagVersion(
+ "id3v2_version not allowed, allowed versions: %s" % id3v2_allowed_versions)
+ conversion_command.extend([
+ "-id3v2_version", id3v2_version
+ ])
+
+ if sys.platform == 'darwin' and codec == 'mp3':
+ conversion_command.extend(["-write_xing", "0"])
+
+ conversion_command.extend([
+ "-f", format, output.name, # output options (filename last)
+ ])
+
+ log_conversion(conversion_command)
+
+ # read stdin / write stdout
+ with open(os.devnull, 'rb') as devnull:
+ p = subprocess.Popen(conversion_command, stdin=devnull, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ p_out, p_err = p.communicate()
+
+ log_subprocess_output(p_out)
+ log_subprocess_output(p_err)
+
+ if p.returncode != 0:
+ raise CouldntEncodeError(
+ "Encoding failed. ffmpeg/avlib returned error code: {0}\n\nCommand:{1}\n\nOutput from ffmpeg/avlib:\n\n{2}".format(
+ p.returncode, conversion_command, p_err.decode(errors='ignore') ))
+
+ output.seek(0)
+ out_f.write(output.read())
+
+ data.close()
+ output.close()
+
+ os.unlink(data.name)
+ os.unlink(output.name)
+
+ out_f.seek(0)
+ return out_f
+
+ def get_frame(self, index):
+ frame_start = index * self.frame_width
+ frame_end = frame_start + self.frame_width
+ return self._data[frame_start:frame_end]
+
+ def frame_count(self, ms=None):
+ """
+ returns the number of frames for the given number of milliseconds, or
+ if not specified, the number of frames in the whole AudioSegment
+ """
+ if ms is not None:
+ return ms * (self.frame_rate / 1000.0)
+ else:
+ return float(len(self._data) // self.frame_width)
+
+ def set_sample_width(self, sample_width):
+ if sample_width == self.sample_width:
+ return self
+
+ frame_width = self.channels * sample_width
+
+ return self._spawn(
+ audioop.lin2lin(self._data, self.sample_width, sample_width),
+ overrides={'sample_width': sample_width, 'frame_width': frame_width}
+ )
+
+ def set_frame_rate(self, frame_rate):
+ if frame_rate == self.frame_rate:
+ return self
+
+ if self._data:
+ converted, _ = audioop.ratecv(self._data, self.sample_width,
+ self.channels, self.frame_rate,
+ frame_rate, None)
+ else:
+ converted = self._data
+
+ return self._spawn(data=converted,
+ overrides={'frame_rate': frame_rate})
+
+ def set_channels(self, channels):
+ if channels == self.channels:
+ return self
+
+ if channels == 2 and self.channels == 1:
+ fn = audioop.tostereo
+ frame_width = self.frame_width * 2
+ fac = 1
+ converted = fn(self._data, self.sample_width, fac, fac)
+ elif channels == 1 and self.channels == 2:
+ fn = audioop.tomono
+ frame_width = self.frame_width // 2
+ fac = 0.5
+ converted = fn(self._data, self.sample_width, fac, fac)
+ elif channels == 1:
+ channels_data = [seg.get_array_of_samples() for seg in self.split_to_mono()]
+ frame_count = int(self.frame_count())
+ converted = array.array(
+ channels_data[0].typecode,
+ b'\0' * (frame_count * self.sample_width)
+ )
+ for raw_channel_data in channels_data:
+ for i in range(frame_count):
+ converted[i] += raw_channel_data[i] // self.channels
+ frame_width = self.frame_width // self.channels
+ elif self.channels == 1:
+ dup_channels = [self for iChannel in range(channels)]
+ return AudioSegment.from_mono_audiosegments(*dup_channels)
+ else:
+ raise ValueError(
+ "AudioSegment.set_channels only supports mono-to-multi channel and multi-to-mono channel conversion")
+
+ return self._spawn(data=converted,
+ overrides={
+ 'channels': channels,
+ 'frame_width': frame_width})
+
+ def split_to_mono(self):
+ if self.channels == 1:
+ return [self]
+
+ samples = self.get_array_of_samples()
+
+ mono_channels = []
+ for i in range(self.channels):
+ samples_for_current_channel = samples[i::self.channels]
+
+ try:
+ mono_data = samples_for_current_channel.tobytes()
+ except AttributeError:
+ mono_data = samples_for_current_channel.tostring()
+
+ mono_channels.append(
+ self._spawn(mono_data, overrides={"channels": 1, "frame_width": self.sample_width})
+ )
+
+ return mono_channels
+
+ @property
+ def rms(self):
+ return audioop.rms(self._data, self.sample_width)
+
+ @property
+ def dBFS(self):
+ rms = self.rms
+ if not rms:
+ return -float("infinity")
+ return ratio_to_db(self.rms / self.max_possible_amplitude)
+
+ @property
+ def max(self):
+ return audioop.max(self._data, self.sample_width)
+
+ @property
+ def max_possible_amplitude(self):
+ bits = self.sample_width * 8
+ max_possible_val = (2 ** bits)
+
+ # since half is above 0 and half is below the max amplitude is divided
+ return max_possible_val / 2
+
+ @property
+ def max_dBFS(self):
+ return ratio_to_db(self.max, self.max_possible_amplitude)
+
+ @property
+ def duration_seconds(self):
+ return self.frame_rate and self.frame_count() / self.frame_rate or 0.0
+
+ def get_dc_offset(self, channel=1):
+ """
+ Returns a value between -1.0 and 1.0 representing the DC offset of a
+ channel (1 for left, 2 for right).
+ """
+ if not 1 <= channel <= 2:
+ raise ValueError("channel value must be 1 (left) or 2 (right)")
+
+ if self.channels == 1:
+ data = self._data
+ elif channel == 1:
+ data = audioop.tomono(self._data, self.sample_width, 1, 0)
+ else:
+ data = audioop.tomono(self._data, self.sample_width, 0, 1)
+
+ return float(audioop.avg(data, self.sample_width)) / self.max_possible_amplitude
+
+ def remove_dc_offset(self, channel=None, offset=None):
+ """
+ Removes DC offset of given channel. Calculates offset if it's not given.
+ Offset values must be in range -1.0 to 1.0. If channel is None, removes
+ DC offset from all available channels.
+ """
+ if channel and not 1 <= channel <= 2:
+ raise ValueError("channel value must be None, 1 (left) or 2 (right)")
+
+ if offset and not -1.0 <= offset <= 1.0:
+ raise ValueError("offset value must be in range -1.0 to 1.0")
+
+ if offset:
+ offset = int(round(offset * self.max_possible_amplitude))
+
+ def remove_data_dc(data, off):
+ if not off:
+ off = audioop.avg(data, self.sample_width)
+ return audioop.bias(data, self.sample_width, -off)
+
+ if self.channels == 1:
+ return self._spawn(data=remove_data_dc(self._data, offset))
+
+ left_channel = audioop.tomono(self._data, self.sample_width, 1, 0)
+ right_channel = audioop.tomono(self._data, self.sample_width, 0, 1)
+
+ if not channel or channel == 1:
+ left_channel = remove_data_dc(left_channel, offset)
+
+ if not channel or channel == 2:
+ right_channel = remove_data_dc(right_channel, offset)
+
+ left_channel = audioop.tostereo(left_channel, self.sample_width, 1, 0)
+ right_channel = audioop.tostereo(right_channel, self.sample_width, 0, 1)
+
+ return self._spawn(data=audioop.add(left_channel, right_channel,
+ self.sample_width))
+
+ def apply_gain(self, volume_change):
+ return self._spawn(data=audioop.mul(self._data, self.sample_width,
+ db_to_float(float(volume_change))))
+
+ def overlay(self, seg, position=0, loop=False, times=None, gain_during_overlay=None):
+ """
+ Overlay the provided segment on to this segment starting at the
+ specificed position and using the specfied looping beahvior.
+
+ seg (AudioSegment):
+ The audio segment to overlay on to this one.
+
+ position (optional int):
+ The position to start overlaying the provided segment in to this
+ one.
+
+ loop (optional bool):
+ Loop seg as many times as necessary to match this segment's length.
+ Overrides loops param.
+
+ times (optional int):
+ Loop seg the specified number of times or until it matches this
+ segment's length. 1 means once, 2 means twice, ... 0 would make the
+ call a no-op
+ gain_during_overlay (optional int):
+ Changes this segment's volume by the specified amount during the
+ duration of time that seg is overlaid on top of it. When negative,
+ this has the effect of 'ducking' the audio under the overlay.
+ """
+
+ if loop:
+ # match loop=True's behavior with new times (count) mechinism.
+ times = -1
+ elif times is None:
+ # no times specified, just once through
+ times = 1
+ elif times == 0:
+ # it's a no-op, make a copy since we never mutate
+ return self._spawn(self._data)
+
+ output = StringIO()
+
+ seg1, seg2 = AudioSegment._sync(self, seg)
+ sample_width = seg1.sample_width
+ spawn = seg1._spawn
+
+ output.write(seg1[:position]._data)
+
+ # drop down to the raw data
+ seg1 = seg1[position:]._data
+ seg2 = seg2._data
+ pos = 0
+ seg1_len = len(seg1)
+ seg2_len = len(seg2)
+ while times:
+ remaining = max(0, seg1_len - pos)
+ if seg2_len >= remaining:
+ seg2 = seg2[:remaining]
+ seg2_len = remaining
+ # we've hit the end, we're done looping (if we were) and this
+ # is our last go-around
+ times = 1
+
+ if gain_during_overlay:
+ seg1_overlaid = seg1[pos:pos + seg2_len]
+ seg1_adjusted_gain = audioop.mul(seg1_overlaid, self.sample_width,
+ db_to_float(float(gain_during_overlay)))
+ output.write(audioop.add(seg1_adjusted_gain, seg2, sample_width))
+ else:
+ output.write(audioop.add(seg1[pos:pos + seg2_len], seg2,
+ sample_width))
+ pos += seg2_len
+
+ # dec times to break our while loop (eventually)
+ times -= 1
+
+ output.write(seg1[pos:])
+
+ return spawn(data=output)
+
+ def append(self, seg, crossfade=100):
+ seg1, seg2 = AudioSegment._sync(self, seg)
+
+ if not crossfade:
+ return seg1._spawn(seg1._data + seg2._data)
+ elif crossfade > len(self):
+ raise ValueError("Crossfade is longer than the original AudioSegment ({}ms > {}ms)".format(
+ crossfade, len(self)
+ ))
+ elif crossfade > len(seg):
+ raise ValueError("Crossfade is longer than the appended AudioSegment ({}ms > {}ms)".format(
+ crossfade, len(seg)
+ ))
+
+ xf = seg1[-crossfade:].fade(to_gain=-120, start=0, end=float('inf'))
+ xf *= seg2[:crossfade].fade(from_gain=-120, start=0, end=float('inf'))
+
+ output = TemporaryFile()
+
+ output.write(seg1[:-crossfade]._data)
+ output.write(xf._data)
+ output.write(seg2[crossfade:]._data)
+
+ output.seek(0)
+ obj = seg1._spawn(data=output)
+ output.close()
+ return obj
+
+ def fade(self, to_gain=0, from_gain=0, start=None, end=None,
+ duration=None):
+ """
+ Fade the volume of this audio segment.
+
+ to_gain (float):
+ resulting volume_change in db
+
+ start (int):
+ default = beginning of the segment
+ when in this segment to start fading in milliseconds
+
+ end (int):
+ default = end of the segment
+ when in this segment to start fading in milliseconds
+
+ duration (int):
+ default = until the end of the audio segment
+ the duration of the fade
+ """
+ if None not in [duration, end, start]:
+ raise TypeError('Only two of the three arguments, "start", '
+ '"end", and "duration" may be specified')
+
+ # no fade == the same audio
+ if to_gain == 0 and from_gain == 0:
+ return self
+
+ start = min(len(self), start) if start is not None else None
+ end = min(len(self), end) if end is not None else None
+
+ if start is not None and start < 0:
+ start += len(self)
+ if end is not None and end < 0:
+ end += len(self)
+
+ if duration is not None and duration < 0:
+ raise InvalidDuration("duration must be a positive integer")
+
+ if duration:
+ if start is not None:
+ end = start + duration
+ elif end is not None:
+ start = end - duration
+ else:
+ duration = end - start
+
+ from_power = db_to_float(from_gain)
+
+ output = []
+
+ # original data - up until the crossfade portion, as is
+ before_fade = self[:start]._data
+ if from_gain != 0:
+ before_fade = audioop.mul(before_fade,
+ self.sample_width,
+ from_power)
+ output.append(before_fade)
+
+ gain_delta = db_to_float(to_gain) - from_power
+
+ # fades longer than 100ms can use coarse fading (one gain step per ms),
+ # shorter fades will have audible clicks so they use precise fading
+ # (one gain step per sample)
+ if duration > 100:
+ scale_step = gain_delta / duration
+
+ for i in range(duration):
+ volume_change = from_power + (scale_step * i)
+ chunk = self[start + i]
+ chunk = audioop.mul(chunk._data,
+ self.sample_width,
+ volume_change)
+
+ output.append(chunk)
+ else:
+ start_frame = self.frame_count(ms=start)
+ end_frame = self.frame_count(ms=end)
+ fade_frames = end_frame - start_frame
+ scale_step = gain_delta / fade_frames
+
+ for i in range(int(fade_frames)):
+ volume_change = from_power + (scale_step * i)
+ sample = self.get_frame(int(start_frame + i))
+ sample = audioop.mul(sample, self.sample_width, volume_change)
+
+ output.append(sample)
+
+ # original data after the crossfade portion, at the new volume
+ after_fade = self[end:]._data
+ if to_gain != 0:
+ after_fade = audioop.mul(after_fade,
+ self.sample_width,
+ db_to_float(to_gain))
+ output.append(after_fade)
+
+ return self._spawn(data=output)
+
+ def fade_out(self, duration):
+ return self.fade(to_gain=-120, duration=duration, end=float('inf'))
+
+ def fade_in(self, duration):
+ return self.fade(from_gain=-120, duration=duration, start=0)
+
+ def reverse(self):
+ return self._spawn(
+ data=audioop.reverse(self._data, self.sample_width)
+ )
+
+ def _repr_html_(self):
+ src = """
+ <audio controls>
+ <source src="data:audio/mpeg;base64,{base64}" type="audio/mpeg"/>
+ Your browser does not support the audio element.
+ </audio>
+ """
+ fh = self.export()
+ data = base64.b64encode(fh.read()).decode('ascii')
+ return src.format(base64=data)
+
+
+from . import effects