Source code for librosa.segment

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Temporal segmentation
=====================

Recurrence and self-similarity
------------------------------
.. autosummary::
    :toctree: generated/

    cross_similarity
    recurrence_matrix
    recurrence_to_lag
    lag_to_recurrence
    timelag_filter
    path_enhance

Temporal clustering
-------------------
.. autosummary::
    :toctree: generated/

    agglomerative
    subsegment
"""

from decorator import decorator

import numpy as np
import scipy
import scipy.signal
import scipy.ndimage

import sklearn
import sklearn.cluster
import sklearn.feature_extraction
import sklearn.neighbors

from ._cache import cache
from . import util
from .filters import diagonal_filter
from .util.exceptions import ParameterError
from typing import Any, Callable, Optional, TypeVar, Union, overload
from typing_extensions import Literal
from ._typing import _WindowSpec, _FloatLike_co

__all__ = [
    "cross_similarity",
    "recurrence_matrix",
    "recurrence_to_lag",
    "lag_to_recurrence",
    "timelag_filter",
    "agglomerative",
    "subsegment",
    "path_enhance",
]


@overload
def cross_similarity(
    data: np.ndarray,
    data_ref: np.ndarray,
    *,
    k: Optional[int] = ...,
    metric: str = ...,
    sparse: Literal[False] = ...,
    mode: str = ...,
    bandwidth: Optional[Union[np.ndarray, _FloatLike_co, str]] = None,
    full: bool = False,
) -> np.ndarray:
    ...


@overload
def cross_similarity(
    data: np.ndarray,
    data_ref: np.ndarray,
    *,
    k: Optional[int] = ...,
    metric: str = ...,
    sparse: Literal[True] = ...,
    mode: str = ...,
    bandwidth: Optional[Union[np.ndarray, _FloatLike_co, str]] = None,
    full: bool = False,
) -> scipy.sparse.csc_matrix:
    ...


[docs]@cache(level=30) def cross_similarity( data: np.ndarray, data_ref: np.ndarray, *, k: Optional[int] = None, metric: str = "euclidean", sparse: bool = False, mode: str = "connectivity", bandwidth: Optional[Union[np.ndarray, _FloatLike_co, str]] = None, full: bool = False, ) -> Union[np.ndarray, scipy.sparse.csc_matrix]: """Compute cross-similarity from one data sequence to a reference sequence. The output is a matrix ``xsim``, where ``xsim[i, j]`` is non-zero if ``data_ref[..., i]`` is a k-nearest neighbor of ``data[..., j]``. Parameters ---------- data : np.ndarray [shape=(..., d, n)] A feature matrix for the comparison sequence. If the data has more than two dimensions (e.g., for multi-channel inputs), the leading dimensions are flattened prior to comparison. For example, a stereo input with shape `(2, d, n)` is automatically reshaped to `(2 * d, n)`. data_ref : np.ndarray [shape=(..., d, n_ref)] A feature matrix for the reference sequence If the data has more than two dimensions (e.g., for multi-channel inputs), the leading dimensions are flattened prior to comparison. For example, a stereo input with shape `(2, d, n_ref)` is automatically reshaped to `(2 * d, n_ref)`. k : int > 0 [scalar] or None the number of nearest-neighbors for each sample Default: ``k = 2 * ceil(sqrt(n_ref))``, or ``k = 2`` if ``n_ref <= 3`` metric : str Distance metric to use for nearest-neighbor calculation. See `sklearn.neighbors.NearestNeighbors` for details. sparse : bool [scalar] if False, returns a dense type (ndarray) if True, returns a sparse type (scipy.sparse.csc_matrix) mode : str, {'connectivity', 'distance', 'affinity'} If 'connectivity', a binary connectivity matrix is produced. If 'distance', then a non-zero entry contains the distance between points. If 'affinity', then non-zero entries are mapped to ``exp( - distance(i, j) / bandwidth)`` where ``bandwidth`` is as specified below. bandwidth : None, float > 0, ndarray, or str str options include ``{'med_k_scalar', 'mean_k', 'gmean_k', 'mean_k_avg', 'gmean_k_avg', 'mean_k_avg_and_pair'}`` If ndarray is supplied, use ndarray as bandwidth for each i,j pair. If using ``mode='affinity'``, this can be used to set the bandwidth on the affinity kernel. If no value is provided or ``None``, default to ``'med_k_scalar'``. If ``bandwidth='med_k_scalar'``, bandwidth is set automatically to the median distance to the k'th nearest neighbor of each ``data[:, i]``. If ``bandwidth='mean_k'``, bandwidth is estimated for each sample-pair (i, j) by taking the arithmetic mean between distances to the k-th nearest neighbor for sample i and sample j. If ``bandwidth='gmean_k'``, bandwidth is estimated for each sample-pair (i, j) by taking the geometric mean between distances to the k-th nearest neighbor for sample i and j [#z]_. If ``bandwidth='mean_k_avg'``, bandwidth is estimated for each sample-pair (i, j) by taking the arithmetic mean between the average distances to the first k-th nearest neighbors for sample i and sample j. This is similar to the approach in Wang et al. (2014) [#w]_ but does not include the distance between i and j. If ``bandwidth='gmean_k_avg'``, bandwidth is estimated for each sample-pair (i, j) by taking the geometric mean between the average distances to the first k-th nearest neighbors for sample i and sample j. If ``bandwidth='mean_k_avg_and_pair'``, bandwidth is estimated for each sample-pair (i, j) by taking the arithmetic mean between three terms: the average distances to the first k-th nearest neighbors for sample i and sample j respectively, as well as the distance between i and j. This is similar to the approach in Wang et al. (2014). [#w]_ .. [#z] Zelnik-Manor, Lihi, and Pietro Perona. (2004). "Self-tuning spectral clustering." Advances in neural information processing systems 17. .. [#w] Wang, Bo, et al. (2014). "Similarity network fusion for aggregating data types on a genomic scale." Nat Methods 11, 333–337. https://doi.org/10.1038/nmeth.2810 full : bool If using ``mode ='affinity'`` or ``mode='distance'``, this option can be used to compute the full affinity or distance matrix as opposed a sparse matrix with only none-zero terms for the first k-neighbors of each sample. This option has no effect when using ``mode='connectivity'``. When using ``mode='distance'``, setting ``full=True`` will ignore ``k`` and ``width``. When using ``mode='affinity'``, setting ``full=True`` will use ``k`` exclusively for bandwidth estimation, and ignore ``width``. Returns ------- xsim : np.ndarray or scipy.sparse.csc_matrix, [shape=(n_ref, n)] Cross-similarity matrix See Also -------- recurrence_matrix recurrence_to_lag librosa.feature.stack_memory sklearn.neighbors.NearestNeighbors scipy.spatial.distance.cdist Notes ----- This function caches at level 30. Examples -------- Find nearest neighbors in CQT space between two sequences >>> hop_length = 1024 >>> y_ref, sr = librosa.load(librosa.ex('pistachio')) >>> y_comp, sr = librosa.load(librosa.ex('pistachio'), offset=10) >>> chroma_ref = librosa.feature.chroma_cqt(y=y_ref, sr=sr, hop_length=hop_length) >>> chroma_comp = librosa.feature.chroma_cqt(y=y_comp, sr=sr, hop_length=hop_length) >>> # Use time-delay embedding to get a cleaner recurrence matrix >>> x_ref = librosa.feature.stack_memory(chroma_ref, n_steps=10, delay=3) >>> x_comp = librosa.feature.stack_memory(chroma_comp, n_steps=10, delay=3) >>> xsim = librosa.segment.cross_similarity(x_comp, x_ref) Or fix the number of nearest neighbors to 5 >>> xsim = librosa.segment.cross_similarity(x_comp, x_ref, k=5) Use cosine similarity instead of Euclidean distance >>> xsim = librosa.segment.cross_similarity(x_comp, x_ref, metric='cosine') Use an affinity matrix instead of binary connectivity >>> xsim_aff = librosa.segment.cross_similarity(x_comp, x_ref, metric='cosine', mode='affinity') Plot the feature and recurrence matrices >>> import matplotlib.pyplot as plt >>> fig, ax = plt.subplots(ncols=2, sharex=True, sharey=True) >>> imgsim = librosa.display.specshow(xsim, x_axis='s', y_axis='s', ... hop_length=hop_length, ax=ax[0]) >>> ax[0].set(title='Binary cross-similarity (symmetric)') >>> imgaff = librosa.display.specshow(xsim_aff, x_axis='s', y_axis='s', ... cmap='magma_r', hop_length=hop_length, ax=ax[1]) >>> ax[1].set(title='Cross-affinity') >>> ax[1].label_outer() >>> fig.colorbar(imgsim, ax=ax[0], orientation='horizontal', ticks=[0, 1]) >>> fig.colorbar(imgaff, ax=ax[1], orientation='horizontal') """ data_ref = np.atleast_2d(data_ref) data = np.atleast_2d(data) if not np.allclose(data_ref.shape[:-1], data.shape[:-1]): raise ParameterError( f"data_ref.shape={data_ref.shape} and data.shape={data.shape} do not match on leading dimension(s)" ) # swap data axes so the feature axis is last data_ref = np.swapaxes(data_ref, -1, 0) n_ref = data_ref.shape[0] # Use F-ordering for reshape to preserve leading axis data_ref = data_ref.reshape((n_ref, -1), order="F") data = np.swapaxes(data, -1, 0) n = data.shape[0] data = data.reshape((n, -1), order="F") if mode not in ["connectivity", "distance", "affinity"]: raise ParameterError( ( f"Invalid mode='{mode}'. Must be one of " "['connectivity', 'distance', 'affinity']" ) ) if k is None: k = min(n_ref, 2 * np.ceil(np.sqrt(n_ref))) k = int(k) # using k for bandwidth estimation also and decouple k for full mode bandwidth_k = k if full and (mode != "connectivity"): k = n # Build the neighbor search object # `auto` mode does not work with some choices of metric. Rather than special-case # those here, we instead use a fall-back to brute force if auto fails. try: knn = sklearn.neighbors.NearestNeighbors( n_neighbors=min(n_ref, k), metric=metric, algorithm="auto" ) except ValueError: knn = sklearn.neighbors.NearestNeighbors( n_neighbors=min(n_ref, k), metric=metric, algorithm="brute" ) knn.fit(data_ref) # Get the knn graph if mode == "affinity": # sklearn's nearest neighbor doesn't support affinity, # so we use distance here and then do the conversion post-hoc kng_mode = "distance" else: kng_mode = mode xsim = knn.kneighbors_graph(X=data, mode=kng_mode).tolil() if not full: # Retain only the top-k links per point for i in range(n): # Get the links from point i links = xsim[i].nonzero()[1] # Order them ascending idx = links[np.argsort(xsim[i, links].toarray())][0] # Everything past the kth closest gets squashed xsim[i, idx[k:]] = 0 # Convert a compressed sparse row (CSR) format xsim = xsim.tocsr() xsim.eliminate_zeros() if mode == "connectivity": xsim = xsim.astype(bool) elif mode == "affinity": aff_bandwidth = __affinity_bandwidth(xsim, bandwidth, bandwidth_k) xsim.data[:] = np.exp(xsim.data / (-1 * aff_bandwidth)) # Transpose to n_ref by n xsim = xsim.T if not sparse: xsim = xsim.toarray() return xsim
@overload def recurrence_matrix( data: np.ndarray, *, k: Optional[int] = ..., width: int = ..., metric: str = ..., sym: bool = ..., sparse: Literal[True] = ..., mode: str = ..., bandwidth: Optional[Union[np.ndarray, _FloatLike_co, str]] = ..., self: bool = ..., axis: int = ..., full: bool = False, ) -> scipy.sparse.csc_matrix: ... @overload def recurrence_matrix( data: np.ndarray, *, k: Optional[int] = ..., width: int = ..., metric: str = ..., sym: bool = ..., sparse: Literal[False] = ..., mode: str = ..., bandwidth: Optional[Union[np.ndarray, _FloatLike_co, str]] = ..., self: bool = ..., axis: int = ..., full: bool = False, ) -> np.ndarray: ...
[docs]@cache(level=30) def recurrence_matrix( data: np.ndarray, *, k: Optional[int] = None, width: int = 1, metric: str = "euclidean", sym: bool = False, sparse: bool = False, mode: str = "connectivity", bandwidth: Optional[Union[np.ndarray, _FloatLike_co, str]] = None, self: bool = False, axis: int = -1, full: bool = False, ) -> Union[np.ndarray, scipy.sparse.csc_matrix]: """Compute a recurrence matrix from a data matrix. ``rec[i, j]`` is non-zero if ``data[..., i]`` is a k-nearest neighbor of ``data[..., j]`` and ``|i - j| >= width`` The specific value of ``rec[i, j]`` can have several forms, governed by the ``mode`` parameter below: - Connectivity: ``rec[i, j] = 1 or 0`` indicates that frames ``i`` and ``j`` are repetitions - Affinity: ``rec[i, j] > 0`` measures how similar frames ``i`` and ``j`` are. This is also known as a (sparse) self-similarity matrix. - Distance: ``rec[i, j] > 0`` measures how distant frames ``i`` and ``j`` are. This is also known as a (sparse) self-distance matrix. The general term *recurrence matrix* can refer to any of the three forms above. Parameters ---------- data : np.ndarray [shape=(..., d, n)] A feature matrix. If the data has more than two dimensions (e.g., for multi-channel inputs), the leading dimensions are flattened prior to comparison. For example, a stereo input with shape `(2, d, n)` is automatically reshaped to `(2 * d, n)`. k : int > 0 [scalar] or None the number of nearest-neighbors for each sample Default: ``k = 2 * ceil(sqrt(t - 2 * width + 1))``, or ``k = 2`` if ``t <= 2 * width + 1`` width : int >= 1 [scalar] only link neighbors ``(data[..., i], data[..., j])`` if ``|i - j| >= width`` ``width`` cannot exceed the length of the data. metric : str Distance metric to use for nearest-neighbor calculation. See `sklearn.neighbors.NearestNeighbors` for details. sym : bool [scalar] set ``sym=True`` to only link mutual nearest-neighbors sparse : bool [scalar] if False, returns a dense type (ndarray) if True, returns a sparse type (scipy.sparse.csc_matrix) mode : str, {'connectivity', 'distance', 'affinity'} If 'connectivity', a binary connectivity matrix is produced. If 'distance', then a non-zero entry contains the distance between points. If 'affinity', then non-zero entries are mapped to ``exp( - distance(i, j) / bandwidth)`` where ``bandwidth`` is as specified below. bandwidth : None, float > 0, ndarray, or str str options include ``{'med_k_scalar', 'mean_k', 'gmean_k', 'mean_k_avg', 'gmean_k_avg', 'mean_k_avg_and_pair'}`` If ndarray is supplied, use ndarray as bandwidth for each i,j pair. If using ``mode='affinity'``, the ``bandwidth`` option can be used to set the bandwidth on the affinity kernel. If no value is provided or ``None``, default to ``'med_k_scalar'``. If ``bandwidth='med_k_scalar'``, a scalar bandwidth is set to the median distance of the k-th nearest neighbor for all samples. If ``bandwidth='mean_k'``, bandwidth is estimated for each sample-pair (i, j) by taking the arithmetic mean between distances to the k-th nearest neighbor for sample i and sample j. If ``bandwidth='gmean_k'``, bandwidth is estimated for each sample-pair (i, j) by taking the geometric mean between distances to the k-th nearest neighbor for sample i and j [#z]_. If ``bandwidth='mean_k_avg'``, bandwidth is estimated for each sample-pair (i, j) by taking the arithmetic mean between the average distances to the first k-th nearest neighbors for sample i and sample j. This is similar to the approach in Wang et al. (2014) [#w]_ but does not include the distance between i and j. If ``bandwidth='gmean_k_avg'``, bandwidth is estimated for each sample-pair (i, j) by taking the geometric mean between the average distances to the first k-th nearest neighbors for sample i and sample j. If ``bandwidth='mean_k_avg_and_pair'``, bandwidth is estimated for each sample-pair (i, j) by taking the arithmetic mean between three terms: the average distances to the first k-th nearest neighbors for sample i and sample j respectively, as well as the distance between i and j. This is similar to the approach in Wang et al. (2014). [#w]_ .. [#z] Zelnik-Manor, Lihi, and Pietro Perona. (2004). "Self-tuning spectral clustering." Advances in neural information processing systems 17. .. [#w] Wang, Bo, et al. (2014). "Similarity network fusion for aggregating data types on a genomic scale." Nat Methods 11, 333–337. https://doi.org/10.1038/nmeth.2810 self : bool If ``True``, then the main diagonal is populated with self-links: 0 if ``mode='distance'``, and 1 otherwise. If ``False``, the main diagonal is left empty. axis : int The axis along which to compute recurrence. By default, the last index (-1) is taken. full : bool If using ``mode ='affinity'`` or ``mode='distance'``, this option can be used to compute the full affinity or distance matrix as opposed a sparse matrix with only none-zero terms for the first k-neighbors of each sample. This option has no effect when using ``mode='connectivity'``. When using ``mode='distance'``, setting ``full=True`` will ignore ``k`` and ``width``. When using ``mode='affinity'``, setting ``full=True`` will use ``k`` exclusively for bandwidth estimation, and ignore ``width``. Returns ------- rec : np.ndarray or scipy.sparse.csc_matrix, [shape=(t, t)] Recurrence matrix See Also -------- sklearn.neighbors.NearestNeighbors scipy.spatial.distance.cdist librosa.feature.stack_memory recurrence_to_lag Notes ----- This function caches at level 30. Examples -------- Find nearest neighbors in CQT space >>> y, sr = librosa.load(librosa.ex('nutcracker')) >>> hop_length = 1024 >>> chroma = librosa.feature.chroma_cqt(y=y, sr=sr, hop_length=hop_length) >>> # Use time-delay embedding to get a cleaner recurrence matrix >>> chroma_stack = librosa.feature.stack_memory(chroma, n_steps=10, delay=3) >>> R = librosa.segment.recurrence_matrix(chroma_stack) Or fix the number of nearest neighbors to 5 >>> R = librosa.segment.recurrence_matrix(chroma_stack, k=5) Suppress neighbors within +- 7 frames >>> R = librosa.segment.recurrence_matrix(chroma_stack, width=7) Use cosine similarity instead of Euclidean distance >>> R = librosa.segment.recurrence_matrix(chroma_stack, metric='cosine') Require mutual nearest neighbors >>> R = librosa.segment.recurrence_matrix(chroma_stack, sym=True) Use an affinity matrix instead of binary connectivity >>> R_aff = librosa.segment.recurrence_matrix(chroma_stack, metric='cosine', ... mode='affinity') Plot the feature and recurrence matrices >>> import matplotlib.pyplot as plt >>> fig, ax = plt.subplots(ncols=2, sharex=True, sharey=True) >>> imgsim = librosa.display.specshow(R, x_axis='s', y_axis='s', ... hop_length=hop_length, ax=ax[0]) >>> ax[0].set(title='Binary recurrence (symmetric)') >>> imgaff = librosa.display.specshow(R_aff, x_axis='s', y_axis='s', ... hop_length=hop_length, cmap='magma_r', ax=ax[1]) >>> ax[1].set(title='Affinity recurrence') >>> ax[1].label_outer() >>> fig.colorbar(imgsim, ax=ax[0], orientation='horizontal', ticks=[0, 1]) >>> fig.colorbar(imgaff, ax=ax[1], orientation='horizontal') """ data = np.atleast_2d(data) # Swap observations to the first dimension and flatten the rest data = np.swapaxes(data, axis, 0) t = data.shape[0] # Use F-ordering here to preserve leading axis layout data = data.reshape((t, -1), order="F") if width < 1 or width >= (t - 1) // 2: raise ParameterError( "width={} must be at least 1 and at most (data.shape[{}] - 1) // 2={}".format( width, axis, (t - 1) // 2 ) ) if mode not in ["connectivity", "distance", "affinity"]: raise ParameterError( ( f"Invalid mode='{mode}'. Must be one of " "['connectivity', 'distance', 'affinity']" ) ) if k is None: k = 2 * np.ceil(np.sqrt(t - 2 * width + 1)) k = int(k) # using k for bandwidth estimation also and decouple k for full mode bandwidth_k = k if full and (mode != "connectivity"): k = t # Build the neighbor search object try: knn = sklearn.neighbors.NearestNeighbors( n_neighbors=min(t - 1, k + 2 * width), metric=metric, algorithm="auto" ) except ValueError: knn = sklearn.neighbors.NearestNeighbors( n_neighbors=min(t - 1, k + 2 * width), metric=metric, algorithm="brute" ) knn.fit(data) # Get the knn graph if mode == "affinity": kng_mode = "distance" else: kng_mode = mode rec = knn.kneighbors_graph(mode=kng_mode).tolil() if not full: # Remove connections within width for diag in range(-width + 1, width): rec.setdiag(0, diag) # Retain only the top-k links per point for i in range(t): # Get the links from point i links = rec[i].nonzero()[1] # Order them ascending idx = links[np.argsort(rec[i, links].toarray())][0] # Everything past the kth closest gets squashed rec[i, idx[k:]] = 0 if self: if mode == "connectivity": rec.setdiag(1) elif mode == "affinity": # we need to keep the self-loop in here, but not mess up the # bandwidth estimation # # using negative distances here preserves the structure without changing # the statistics of the data rec.setdiag(-1) else: rec.setdiag(0) # symmetrize if sym: # Note: this operation produces a CSR (compressed sparse row) matrix! # This is why we have to do it after filling the diagonal in self-mode rec = rec.minimum(rec.T) rec = rec.tocsr() rec.eliminate_zeros() if mode == "connectivity": rec = rec.astype(bool) elif mode == "affinity": # Set all the negatives back to 0 # Negatives are temporarily inserted above to preserve the sparsity structure # of the matrix without corrupting the bandwidth calculations rec.data[rec.data < 0] = 0.0 aff_bandwidth = __affinity_bandwidth(rec, bandwidth, bandwidth_k) rec.data[:] = np.exp(rec.data / (-1 * aff_bandwidth)) # Transpose to be column-major rec = rec.T if not sparse: rec = rec.toarray() return rec
_ArrayOrSparseMatrix = TypeVar( "_ArrayOrSparseMatrix", bound=Union[np.ndarray, scipy.sparse.spmatrix] )
[docs]def recurrence_to_lag( rec: _ArrayOrSparseMatrix, *, pad: bool = True, axis: int = -1 ) -> _ArrayOrSparseMatrix: """Convert a recurrence matrix into a lag matrix. ``lag[i, j] == rec[i+j, j]`` This transformation turns diagonal structures in the recurrence matrix into horizontal structures in the lag matrix. These horizontal structures can be used to infer changes in the repetition structure of a piece, e.g., the beginning of a new section as done in [#]_. .. [#] Serra, J., Müller, M., Grosche, P., & Arcos, J. L. (2014). Unsupervised music structure annotation by time series structure features and segment similarity. IEEE Transactions on Multimedia, 16(5), 1229-1240. Parameters ---------- rec : np.ndarray, or scipy.sparse.spmatrix [shape=(n, n)] A (binary) recurrence matrix, as returned by `recurrence_matrix` pad : bool If False, ``lag`` matrix is square, which is equivalent to assuming that the signal repeats itself indefinitely. If True, ``lag`` is padded with ``n`` zeros, which eliminates the assumption of repetition. axis : int The axis to keep as the ``time`` axis. The alternate axis will be converted to lag coordinates. Returns ------- lag : np.ndarray The recurrence matrix in (lag, time) (if ``axis=1``) or (time, lag) (if ``axis=0``) coordinates Raises ------ ParameterError : if ``rec`` is non-square See Also -------- recurrence_matrix lag_to_recurrence util.shear Examples -------- >>> y, sr = librosa.load(librosa.ex('nutcracker')) >>> hop_length = 1024 >>> chroma = librosa.feature.chroma_cqt(y=y, sr=sr, hop_length=hop_length) >>> chroma_stack = librosa.feature.stack_memory(chroma, n_steps=10, delay=3) >>> recurrence = librosa.segment.recurrence_matrix(chroma_stack) >>> lag_pad = librosa.segment.recurrence_to_lag(recurrence, pad=True) >>> lag_nopad = librosa.segment.recurrence_to_lag(recurrence, pad=False) >>> import matplotlib.pyplot as plt >>> fig, ax = plt.subplots(nrows=2, sharex=True) >>> librosa.display.specshow(lag_pad, x_axis='time', y_axis='lag', ... hop_length=hop_length, ax=ax[0]) >>> ax[0].set(title='Lag (zero-padded)') >>> ax[0].label_outer() >>> librosa.display.specshow(lag_nopad, x_axis='time', y_axis='lag', ... hop_length=hop_length, ax=ax[1]) >>> ax[1].set(title='Lag (no padding)') """ axis = np.abs(axis) if rec.ndim != 2 or rec.shape[0] != rec.shape[1]: raise ParameterError(f"non-square recurrence matrix shape: {rec.shape}") sparse = scipy.sparse.issparse(rec) if sparse: # suppress type check here, mypy doesn't know about issparse fmt = rec.format # type: ignore t = rec.shape[axis] if pad: if sparse: padding = np.asarray([[1, 0]], dtype=rec.dtype).swapaxes(axis, 0) if axis == 0: rec_fmt = "csr" else: rec_fmt = "csc" rec = scipy.sparse.kron(padding, rec, format=rec_fmt) else: padding = np.array([(0, 0), (0, 0)]) padding[(1 - axis), :] = [0, t] # Suppress type check, mypy doesn't know that rec is an ndarray here rec = np.pad(rec, padding, mode="constant") # type: ignore lag: _ArrayOrSparseMatrix = util.shear(rec, factor=-1, axis=axis) if sparse: # Suppress type check, mypy doesn't know # that lag is sparse here lag = lag.asformat(fmt) # type: ignore return lag
[docs]def lag_to_recurrence( lag: _ArrayOrSparseMatrix, *, axis: int = -1 ) -> _ArrayOrSparseMatrix: """Convert a lag matrix into a recurrence matrix. Parameters ---------- lag : np.ndarray or scipy.sparse.spmatrix A lag matrix, as produced by ``recurrence_to_lag`` axis : int The axis corresponding to the time dimension. The alternate axis will be interpreted in lag coordinates. Returns ------- rec : np.ndarray or scipy.sparse.spmatrix [shape=(n, n)] A recurrence matrix in (time, time) coordinates For sparse matrices, format will match that of ``lag``. Raises ------ ParameterError : if ``lag`` does not have the correct shape See Also -------- recurrence_to_lag Examples -------- >>> y, sr = librosa.load(librosa.ex('nutcracker')) >>> hop_length = 1024 >>> chroma = librosa.feature.chroma_cqt(y=y, sr=sr, hop_length=hop_length) >>> chroma_stack = librosa.feature.stack_memory(chroma, n_steps=10, delay=3) >>> recurrence = librosa.segment.recurrence_matrix(chroma_stack) >>> lag_pad = librosa.segment.recurrence_to_lag(recurrence, pad=True) >>> lag_nopad = librosa.segment.recurrence_to_lag(recurrence, pad=False) >>> rec_pad = librosa.segment.lag_to_recurrence(lag_pad) >>> rec_nopad = librosa.segment.lag_to_recurrence(lag_nopad) >>> import matplotlib.pyplot as plt >>> fig, ax = plt.subplots(nrows=2, ncols=2, sharex=True) >>> librosa.display.specshow(lag_pad, x_axis='s', y_axis='lag', ... hop_length=hop_length, ax=ax[0, 0]) >>> ax[0, 0].set(title='Lag (zero-padded)') >>> ax[0, 0].label_outer() >>> librosa.display.specshow(lag_nopad, x_axis='s', y_axis='time', ... hop_length=hop_length, ax=ax[0, 1]) >>> ax[0, 1].set(title='Lag (no padding)') >>> ax[0, 1].label_outer() >>> librosa.display.specshow(rec_pad, x_axis='s', y_axis='time', ... hop_length=hop_length, ax=ax[1, 0]) >>> ax[1, 0].set(title='Recurrence (with padding)') >>> librosa.display.specshow(rec_nopad, x_axis='s', y_axis='time', ... hop_length=hop_length, ax=ax[1, 1]) >>> ax[1, 1].set(title='Recurrence (without padding)') >>> ax[1, 1].label_outer() """ if axis not in [0, 1, -1]: raise ParameterError(f"Invalid target axis: {axis}") axis = np.abs(axis) if lag.ndim != 2 or ( lag.shape[0] != lag.shape[1] and lag.shape[1 - axis] != 2 * lag.shape[axis] ): raise ParameterError(f"Invalid lag matrix shape: {lag.shape}") # Since lag must be 2-dimensional, abs(axis) = axis t = lag.shape[axis] rec = util.shear(lag, factor=+1, axis=axis) sub_slice = [slice(None)] * rec.ndim sub_slice[1 - axis] = slice(t) rec_slice: _ArrayOrSparseMatrix = rec[tuple(sub_slice)] return rec_slice
_F = TypeVar("_F", bound=Callable[..., Any])
[docs]def timelag_filter(function: _F, pad: bool = True, index: int = 0) -> _F: """Apply a filter in the time-lag domain. This is primarily useful for adapting image filters to operate on `recurrence_to_lag` output. Using `timelag_filter` is equivalent to the following sequence of operations: >>> data_tl = librosa.segment.recurrence_to_lag(data) >>> data_filtered_tl = function(data_tl) >>> data_filtered = librosa.segment.lag_to_recurrence(data_filtered_tl) Parameters ---------- function : callable The filtering function to wrap, e.g., `scipy.ndimage.median_filter` pad : bool Whether to zero-pad the structure feature matrix index : int >= 0 If ``function`` accepts input data as a positional argument, it should be indexed by ``index`` Returns ------- wrapped_function : callable A new filter function which applies in time-lag space rather than time-time space. Examples -------- Apply a 31-bin median filter to the diagonal of a recurrence matrix. With default, parameters, this corresponds to a time window of about 0.72 seconds. >>> y, sr = librosa.load(librosa.ex('nutcracker'), duration=30) >>> chroma = librosa.feature.chroma_cqt(y=y, sr=sr) >>> chroma_stack = librosa.feature.stack_memory(chroma, n_steps=3, delay=3) >>> rec = librosa.segment.recurrence_matrix(chroma_stack) >>> from scipy.ndimage import median_filter >>> diagonal_median = librosa.segment.timelag_filter(median_filter) >>> rec_filtered = diagonal_median(rec, size=(1, 31), mode='mirror') Or with affinity weights >>> rec_aff = librosa.segment.recurrence_matrix(chroma_stack, mode='affinity') >>> rec_aff_fil = diagonal_median(rec_aff, size=(1, 31), mode='mirror') >>> import matplotlib.pyplot as plt >>> fig, ax = plt.subplots(nrows=2, ncols=2, sharex=True, sharey=True) >>> librosa.display.specshow(rec, y_axis='s', x_axis='s', ax=ax[0, 0]) >>> ax[0, 0].set(title='Raw recurrence matrix') >>> ax[0, 0].label_outer() >>> librosa.display.specshow(rec_filtered, y_axis='s', x_axis='s', ax=ax[0, 1]) >>> ax[0, 1].set(title='Filtered recurrence matrix') >>> ax[0, 1].label_outer() >>> librosa.display.specshow(rec_aff, x_axis='s', y_axis='s', ... cmap='magma_r', ax=ax[1, 0]) >>> ax[1, 0].set(title='Raw affinity matrix') >>> librosa.display.specshow(rec_aff_fil, x_axis='s', y_axis='s', ... cmap='magma_r', ax=ax[1, 1]) >>> ax[1, 1].set(title='Filtered affinity matrix') >>> ax[1, 1].label_outer() """ def __my_filter(wrapped_f, *args, **kwargs): """Wrap the filter with lag conversions""" # Map the input data into time-lag space args = list(args) args[index] = recurrence_to_lag(args[index], pad=pad) # Apply the filtering function result = wrapped_f(*args, **kwargs) # Map back into time-time and return return lag_to_recurrence(result) return decorator(__my_filter, function) # type: ignore
[docs]@cache(level=30) def subsegment( data: np.ndarray, frames: np.ndarray, *, n_segments: int = 4, axis: int = -1 ) -> np.ndarray: """Sub-divide a segmentation by feature clustering. Given a set of frame boundaries (``frames``), and a data matrix (``data``), each successive interval defined by ``frames`` is partitioned into ``n_segments`` by constrained agglomerative clustering. .. note:: If an interval spans fewer than ``n_segments`` frames, then each frame becomes a sub-segment. Parameters ---------- data : np.ndarray Data matrix to use in clustering frames : np.ndarray [shape=(n_boundaries,)], dtype=int, non-negative] Array of beat or segment boundaries, as provided by `librosa.beat.beat_track`, `librosa.onset.onset_detect`, or `agglomerative`. n_segments : int > 0 Maximum number of frames to sub-divide each interval. axis : int Axis along which to apply the segmentation. By default, the last index (-1) is taken. Returns ------- boundaries : np.ndarray [shape=(n_subboundaries,)] List of sub-divided segment boundaries See Also -------- agglomerative : Temporal segmentation librosa.onset.onset_detect : Onset detection librosa.beat.beat_track : Beat tracking Notes ----- This function caches at level 30. Examples -------- Load audio, detect beat frames, and subdivide in twos by CQT >>> y, sr = librosa.load(librosa.ex('choice'), duration=10) >>> tempo, beats = librosa.beat.beat_track(y=y, sr=sr, hop_length=512) >>> beat_times = librosa.frames_to_time(beats, sr=sr, hop_length=512) >>> cqt = np.abs(librosa.cqt(y, sr=sr, hop_length=512)) >>> subseg = librosa.segment.subsegment(cqt, beats, n_segments=2) >>> subseg_t = librosa.frames_to_time(subseg, sr=sr, hop_length=512) >>> import matplotlib.pyplot as plt >>> fig, ax = plt.subplots() >>> librosa.display.specshow(librosa.amplitude_to_db(cqt, ... ref=np.max), ... y_axis='cqt_hz', x_axis='time', ax=ax) >>> lims = ax.get_ylim() >>> ax.vlines(beat_times, lims[0], lims[1], color='lime', alpha=0.9, ... linewidth=2, label='Beats') >>> ax.vlines(subseg_t, lims[0], lims[1], color='linen', linestyle='--', ... linewidth=1.5, alpha=0.5, label='Sub-beats') >>> ax.legend() >>> ax.set(title='CQT + Beat and sub-beat markers') """ frames = util.fix_frames(frames, x_min=0, x_max=data.shape[axis], pad=True) if n_segments < 1: raise ParameterError("n_segments must be a positive integer") boundaries = [] idx_slices = [slice(None)] * data.ndim for seg_start, seg_end in zip(frames[:-1], frames[1:]): idx_slices[axis] = slice(seg_start, seg_end) boundaries.extend( seg_start + agglomerative( data[tuple(idx_slices)], min(seg_end - seg_start, n_segments), axis=axis ) ) return np.array(boundaries)
[docs]def agglomerative( data: np.ndarray, k: int, *, clusterer: Optional[sklearn.cluster.AgglomerativeClustering] = None, axis: int = -1, ) -> np.ndarray: """Bottom-up temporal segmentation. Use a temporally-constrained agglomerative clustering routine to partition ``data`` into ``k`` contiguous segments. Parameters ---------- data : np.ndarray data to cluster k : int > 0 [scalar] number of segments to produce clusterer : sklearn.cluster.AgglomerativeClustering, optional An optional AgglomerativeClustering object. If `None`, a constrained Ward object is instantiated. axis : int axis along which to cluster. By default, the last axis (-1) is chosen. Returns ------- boundaries : np.ndarray [shape=(k,)] left-boundaries (frame numbers) of detected segments. This will always include `0` as the first left-boundary. See Also -------- sklearn.cluster.AgglomerativeClustering Examples -------- Cluster by chroma similarity, break into 20 segments >>> y, sr = librosa.load(librosa.ex('nutcracker'), duration=15) >>> chroma = librosa.feature.chroma_cqt(y=y, sr=sr) >>> bounds = librosa.segment.agglomerative(chroma, 20) >>> bound_times = librosa.frames_to_time(bounds, sr=sr) >>> bound_times array([ 0. , 0.65 , 1.091, 1.927, 2.438, 2.902, 3.924, 4.783, 5.294, 5.712, 6.13 , 7.314, 8.522, 8.916, 9.66 , 10.844, 11.238, 12.028, 12.492, 14.095]) Plot the segmentation over the chromagram >>> import matplotlib.pyplot as plt >>> import matplotlib.transforms as mpt >>> fig, ax = plt.subplots() >>> trans = mpt.blended_transform_factory( ... ax.transData, ax.transAxes) >>> librosa.display.specshow(chroma, y_axis='chroma', x_axis='time', ax=ax) >>> ax.vlines(bound_times, 0, 1, color='linen', linestyle='--', ... linewidth=2, alpha=0.9, label='Segment boundaries', ... transform=trans) >>> ax.legend() >>> ax.set(title='Power spectrogram') """ # Make sure we have at least two dimensions data = np.atleast_2d(data) # Swap data index to position 0 data = np.swapaxes(data, axis, 0) # Flatten the features n = data.shape[0] data = data.reshape((n, -1), order="F") if clusterer is None: # Connect the temporal connectivity graph grid = sklearn.feature_extraction.image.grid_to_graph(n_x=n, n_y=1, n_z=1) # Instantiate the clustering object clusterer = sklearn.cluster.AgglomerativeClustering( n_clusters=k, connectivity=grid, memory=cache.memory ) # Fit the model clusterer.fit(data) # Find the change points from the labels boundaries = [0] boundaries.extend(list(1 + np.nonzero(np.diff(clusterer.labels_))[0].astype(int))) return np.asarray(boundaries)
[docs]def path_enhance( R: np.ndarray, n: int, *, window: _WindowSpec = "hann", max_ratio: float = 2.0, min_ratio: Optional[float] = None, n_filters: int = 7, zero_mean: bool = False, clip: bool = True, **kwargs: Any, ) -> np.ndarray: """Multi-angle path enhancement for self- and cross-similarity matrices. This function convolves multiple diagonal smoothing filters with a self-similarity (or recurrence) matrix R, and aggregates the result by an element-wise maximum. Technically, the output is a matrix R_smooth such that:: R_smooth[i, j] = max_theta (R * filter_theta)[i, j] where `*` denotes 2-dimensional convolution, and ``filter_theta`` is a smoothing filter at orientation theta. This is intended to provide coherent temporal smoothing of self-similarity matrices when there are changes in tempo. Smoothing filters are generated at evenly spaced orientations between min_ratio and max_ratio. This function is inspired by the multi-angle path enhancement of [#]_, but differs by modeling tempo differences in the space of similarity matrices rather than re-sampling the underlying features prior to generating the self-similarity matrix. .. [#] Müller, Meinard and Frank Kurth. "Enhancing similarity matrices for music audio analysis." 2006 IEEE International Conference on Acoustics Speech and Signal Processing Proceedings. Vol. 5. IEEE, 2006. .. note:: if using recurrence_matrix to construct the input similarity matrix, be sure to include the main diagonal by setting ``self=True``. Otherwise, the diagonal will be suppressed, and this is likely to produce discontinuities which will pollute the smoothing filter response. Parameters ---------- R : np.ndarray The self- or cross-similarity matrix to be smoothed. Note: sparse inputs are not supported. If the recurrence matrix is multi-dimensional, e.g. `shape=(c, n, n)`, then enhancement is conducted independently for each leading channel. n : int > 0 The length of the smoothing filter window : window specification The type of smoothing filter to use. See `filters.get_window` for more information on window specification formats. max_ratio : float > 0 The maximum tempo ratio to support min_ratio : float > 0 The minimum tempo ratio to support. If not provided, it will default to ``1/max_ratio`` n_filters : int >= 1 The number of different smoothing filters to use, evenly spaced between ``min_ratio`` and ``max_ratio``. If ``min_ratio = 1/max_ratio`` (the default), using an odd number of filters will ensure that the main diagonal (ratio=1) is included. zero_mean : bool By default, the smoothing filters are non-negative and sum to one (i.e. are averaging filters). If ``zero_mean=True``, then the smoothing filters are made to sum to zero by subtracting a constant value from the non-diagonal coordinates of the filter. This is primarily useful for suppressing blocks while enhancing diagonals. clip : bool If True, the smoothed similarity matrix will be thresholded at 0, and will not contain negative entries. **kwargs : additional keyword arguments Additional arguments to pass to `scipy.ndimage.convolve` Returns ------- R_smooth : np.ndarray, shape=R.shape The smoothed self- or cross-similarity matrix See Also -------- librosa.filters.diagonal_filter recurrence_matrix Examples -------- Use a 51-frame diagonal smoothing filter to enhance paths in a recurrence matrix >>> y, sr = librosa.load(librosa.ex('nutcracker')) >>> hop_length = 2048 >>> chroma = librosa.feature.chroma_cqt(y=y, sr=sr, hop_length=hop_length) >>> chroma_stack = librosa.feature.stack_memory(chroma, n_steps=10, delay=3) >>> rec = librosa.segment.recurrence_matrix(chroma_stack, mode='affinity', self=True) >>> rec_smooth = librosa.segment.path_enhance(rec, 51, window='hann', n_filters=7) Plot the recurrence matrix before and after smoothing >>> import matplotlib.pyplot as plt >>> fig, ax = plt.subplots(ncols=2, sharex=True, sharey=True) >>> img = librosa.display.specshow(rec, x_axis='s', y_axis='s', ... hop_length=hop_length, ax=ax[0]) >>> ax[0].set(title='Unfiltered recurrence') >>> imgpe = librosa.display.specshow(rec_smooth, x_axis='s', y_axis='s', ... hop_length=hop_length, ax=ax[1]) >>> ax[1].set(title='Multi-angle enhanced recurrence') >>> ax[1].label_outer() >>> fig.colorbar(img, ax=ax[0], orientation='horizontal') >>> fig.colorbar(imgpe, ax=ax[1], orientation='horizontal') """ if min_ratio is None: min_ratio = 1.0 / max_ratio elif min_ratio > max_ratio: raise ParameterError( f"min_ratio={min_ratio} cannot exceed max_ratio={max_ratio}" ) R_smooth = None for ratio in np.logspace( np.log2(min_ratio), np.log2(max_ratio), num=n_filters, base=2 ): kernel = diagonal_filter(window, n, slope=ratio, zero_mean=zero_mean) # Expand leading dimensions to match R # This way, if R has shape, eg, [2, 3, n, n] # the expanded kernel will have shape [1, 1, m, m] # The following is valid for numpy >= 1.18 # kernel = np.expand_dims(kernel, axis=list(np.arange(R.ndim - kernel.ndim))) # This is functionally equivalent, but works on numpy 1.17 shape = [1] * R.ndim shape[-2:] = kernel.shape kernel = np.reshape(kernel, shape) if R_smooth is None: R_smooth = scipy.ndimage.convolve(R, kernel, **kwargs) else: # Compute the point-wise maximum in-place np.maximum( R_smooth, scipy.ndimage.convolve(R, kernel, **kwargs), out=R_smooth ) if clip: # Clip the output in-place np.clip(R_smooth, 0, None, out=R_smooth) # type: ignore return np.asanyarray(R_smooth)
def __affinity_bandwidth( rec: scipy.sparse.csr_matrix, bw_mode: Optional[Union[np.ndarray, _FloatLike_co, str]], k: int, ) -> Union[float, np.ndarray]: # rec should be a csr_matrix # the api allows users to specify a scalar bandwidth directly, besides the string based options. if isinstance(bw_mode, np.ndarray): bandwidth = bw_mode # check if bw is the right size if bandwidth.shape != rec.shape: raise ParameterError( f"Invalid matrix bandwidth shape: {bandwidth.shape}." f"Should be {rec.shape}." ) if (bandwidth <= 0).any(): raise ParameterError( "Invalid bandwidth. All entries must be strictly positive." ) return np.array(bandwidth[rec.nonzero()]) elif isinstance(bw_mode, (int, float)): scalar_bandwidth = float(bw_mode) if scalar_bandwidth <= 0: raise ParameterError( f"Invalid scalar bandwidth={scalar_bandwidth}. Must be strictly positive." ) return scalar_bandwidth if bw_mode is None: bw_mode = "med_k_scalar" if bw_mode not in [ "med_k_scalar", "mean_k", "gmean_k", "mean_k_avg", "gmean_k_avg", "mean_k_avg_and_pair", ]: raise ParameterError( f"Invalid bandwidth='{bw_mode}'. Must be either a positive scalar or one of " "['med_k_scalar', 'mean_k', 'gmean_k', 'mean_k_avg', 'gmean_k_avg', 'mean_k_avg_and_pair']" ) # build a list of list that stores the distances to k nearest neighbors for all t points. t = rec.shape[0] knn_dists = [] for i in range(t): # Get the links from point i links = rec[i].nonzero()[1] # catch empty dists lists in knn_dists if len(links) == 0: # Disconnected vertices are only a problem for point-wise bandwidth estimation if bw_mode not in ["med_k_scalar"]: raise ParameterError(f"The sample at time point {i} has no neighbors") else: # If we have no links, then there's no distance # shove a nan in here knn_dists.append(np.array([np.nan])) else: # Compute k nearest neighbors' distance and sort ascending knn_dist_row = np.sort(rec[i, links].toarray()[0])[:k] knn_dists.append(knn_dist_row) # take the last element of each list for the distance to kth neighbor dist_to_k = np.asarray([dists[-1] for dists in knn_dists]) avg_dist_to_first_ks = np.asarray([np.mean(dists) for dists in knn_dists]) if bw_mode == "med_k_scalar": if not np.any(np.isfinite(dist_to_k)): raise ParameterError("Cannot estimate bandwidth from an empty graph") return float(np.nanmedian(dist_to_k)) if bw_mode in ["mean_k", "gmean_k"]: # building bandwidth components (sigma) using sparse matrix structures and indices sigma_i_data = np.empty_like(rec.data) sigma_j_data = np.empty_like(rec.data) for row in range(t): sigma_i_data[rec.indptr[row] : rec.indptr[row + 1]] = dist_to_k[row] col_idx = rec.indices[rec.indptr[row] : rec.indptr[row + 1]] sigma_j_data[rec.indptr[row] : rec.indptr[row + 1]] = dist_to_k[col_idx] if bw_mode == "mean_k": out = np.array((sigma_i_data + sigma_j_data) / 2) elif bw_mode == "gmean_k": out = np.array((sigma_i_data * sigma_j_data) ** 0.5) if bw_mode in ["mean_k_avg", "gmean_k_avg", "mean_k_avg_and_pair"]: # building bandwidth components (sigma) using sparse matrix structures and indices sigma_i_data = np.empty_like(rec.data) sigma_j_data = np.empty_like(rec.data) for row in range(t): sigma_i_data[rec.indptr[row] : rec.indptr[row + 1]] = avg_dist_to_first_ks[ row ] col_idx = rec.indices[rec.indptr[row] : rec.indptr[row + 1]] sigma_j_data[rec.indptr[row] : rec.indptr[row + 1]] = avg_dist_to_first_ks[ col_idx ] if bw_mode == "mean_k_avg": out = np.array((sigma_i_data + sigma_j_data) / 2) elif bw_mode == "gmean_k_avg": out = np.array((sigma_i_data * sigma_j_data) ** 0.5) elif bw_mode == "mean_k_avg_and_pair": out = np.array((sigma_i_data + sigma_j_data + rec.data) / 3) return out