import numpy as np
import warnings
import numba
import scipy.sparse as sp
import itertools
import numpy.ma as ma
import time
class Timer:
    """
    Context manager that prints how long the enclosed block took to run.

    Parameters
    ----------
    name : str
        Label used as the prefix of the printed message.

    Examples
    --------
    >>> with Timer("sorting"):  # doctest: +SKIP
    ...     sorted(range(10))
    sorting took 0.00 s
    """

    def __init__(self, name):
        self._name = name

    def __enter__(self):
        # perf_counter() is monotonic, so the measured interval cannot be
        # distorted (or turn negative) when the system clock is adjusted.
        self._start = time.perf_counter()
        return self

    def __exit__(self, *args):
        # Returns None, so an exception raised inside the block still propagates.
        interval = time.perf_counter() - self._start
        print(f"{self._name} took {interval:.2f} s")
class InvalidValuesWarning(UserWarning):
    """Warning category used in this module for inputs with invalid values (NaN/Inf entries, empty rankings)."""
class Labels:
    """
    Abstract base class for ground truth labels.

    It defines no behaviour of its own; concrete label containers derive from it.
    """
@numba.njit(parallel=True)
def fast_lookup(A_indptr, A_cols, A_data, B):
    """
    Numba-accelerated lookup of CSR values for a dense block of column indices.

    `A_indptr`, `A_cols` and `A_data` describe a CSR matrix. For every entry of `B`
    the stored value at that column is returned; columns that are not stored yield 0.0.
    Columns are located with ``np.searchsorted``, which presumes the searched column
    indices are in ascending order.

    If the CSR matrix has exactly ``len(B)`` rows, row ``i`` of `B` is looked up in
    row ``i`` of the matrix. Otherwise every row of `B` is searched against the whole
    `A_cols` / `A_data` arrays (NOTE(review): this looks intended for a single-row
    matrix shared by all rows of `B` -- confirm with callers).

    Returns
    -------
    float32 array with the same shape as `B`.
    """
    # Entries that are never matched keep the default label of 0.0
    out = np.zeros(B.shape, dtype=np.float32)
    if len(A_indptr) - 1 == len(B):
        for r in numba.prange(B.shape[0]):
            lo, hi = A_indptr[r], A_indptr[r + 1]
            row_cols = A_cols[lo:hi]
            row_vals = A_data[lo:hi]
            first = np.searchsorted(row_cols, B[r])
            last = np.searchsorted(row_cols, B[r], side='right')
            # left and right insertion points differ exactly when the column is present
            hit = (first != last)
            out[r][hit] = row_vals[first[hit]]
    else:
        for r in numba.prange(B.shape[0]):
            first = np.searchsorted(A_cols, B[r])
            last = np.searchsorted(A_cols, B[r], side='right')
            hit = (first != last)
            out[r][hit] = A_data[first[hit]]
    return out
class BinaryLabels(Labels):
    """
    Represents binary ground truth data (e.g., 1 indicating relevance).

    Labels are kept in ``self._labels`` as a SciPy CSR matrix with one row per context.
    """

    # When True, `from_matrix` rejects any stored value other than 0 and 1.
    binary = True

    @classmethod
    def from_positive_indices(cls, indices):
        """
        Construct a binary labels instance from sparse data where only positive items are specified.

        Parameters
        ----------
        indices : array_like, one row per context (e.g., user or query)
            Specifies positive indices for each sample. Must be 1D or 2D, but row lengths can differ.

        Raises
        ------
        ValueError
            if `indices` is of an unsupported type or contains values that cannot be
            read as indices. Duplicate or negative indices are not explicitly
            rejected by this method.

        Examples
        --------
        >>> BinaryLabels.from_positive_indices([[1,2], [2]]) # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
        <rankereval.data.BinaryLabels...>
        """
        sp_matrix = cls._check_values(SparseMatrix.from_nonzero_indices(indices).tocsr(), binary=True)
        return cls()._init(sp_matrix)

    @classmethod
    def from_matrix(cls, labels):
        """
        Construct a binary labels instance from dense or sparse matrix where each item's label is specified.

        Parameters
        ----------
        labels : 1D or 2D array, one row per context (e.g., user or query)
            Contains binary labels for each item. Labels must be in {0, 1}.

        Raises
        ------
        ValueError
            if `labels` is of invalid shape, type or non-binary.

        Examples
        --------
        >>> BinaryLabels.from_matrix([[0, 1, 1], [0, 0, 1]]) # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
        <rankereval.data.BinaryLabels...>
        """
        sp_matrix = cls._check_values(SparseMatrix.from_matrix(labels).tocsr(), binary=cls.binary)
        return cls()._init(sp_matrix)

    def get_labels_for(self, ranking, k=None):
        """
        Look up the label of every item in the top-`k` positions of each ranking.

        Parameters
        ----------
        ranking : object providing ``__len__`` and ``get_top_k(k)``
            Rankings to fetch labels for.
        k : int, optional
            Cut-off forwarded to ``ranking.get_top_k``.

        Returns
        -------
        numpy.ma.MaskedArray
            float32 labels (0.0 for items without a stored label), masked with the
            mask returned by ``ranking.get_top_k``.

        Raises
        ------
        ValueError
            if there are fewer rankings than label rows, or if there is more than
            one label row and the number of rankings differs from it.
        """
        n_label_rows = self._labels.shape[0]
        n_ranking_rows = len(ranking)

        if n_ranking_rows < n_label_rows:
            raise ValueError(
                f"Gold labels contain {n_label_rows} rows, but input rankings only have {n_ranking_rows} rows")
        # Only a single label row can be shared by all rankings. With several label
        # rows and a different number of rankings, the lookup would search the
        # concatenated rows as if they were one and return meaningless labels.
        if n_label_rows > 1 and n_ranking_rows != n_label_rows:
            raise ValueError(
                f"Gold labels contain {n_label_rows} rows, but input rankings have {n_ranking_rows} rows")

        # The lookup is a binary search over each row's column indices, so they must
        # be ascending. Use a sorted copy so the stored order (which `as_rankings`
        # relies on for ties) is left untouched.
        labels = self._labels
        if not labels.has_sorted_indices:
            labels = labels.sorted_indices()

        indices, mask = ranking.get_top_k(k)
        retrieved = fast_lookup(labels.indptr, labels.indices, labels.data, indices)
        return ma.masked_array(retrieved, mask=mask)

    @staticmethod
    @numba.njit
    def _numba_is_binary(data):
        """Return True if every value in `data` equals 0 or 1."""
        for v in data:
            if v != 0 and v != 1:
                return False
        return True

    def as_rankings(self):
        """Build rankings by using a copy of the stored label matrix as scores (empty rows do not warn)."""
        return Rankings.from_scores(self._labels.tocsr(copy=True), warn_empty=False)

    def _init(self, labels):
        """Attach the CSR label matrix and return ``self`` so construction can be chained."""
        self._labels = labels
        return self

    @classmethod
    def _check_values(cls, matrix, binary=True):
        """
        Validate the stored values of `matrix` and return it unchanged.

        Raises
        ------
        ValueError
            if `binary` is set and a stored value is neither 0 nor 1, or if any
            stored value is NaN or Inf.
        """
        if binary and not cls._numba_is_binary(matrix.data):
            raise ValueError("Matrix may only contain 0 and 1 entries.")
        if np.any(~np.isfinite(matrix.data)):
            raise ValueError("Input contains NaN or Inf entries")
        return matrix

    def labels_to_list(self):
        """Return the stored label values as one list per row."""
        return self._labels.tolil().data.tolist()

    def indices_to_list(self):
        """Return the column indices of the stored labels as one list per row."""
        return self._labels.tolil().rows.tolist()

    def get_n_positives(self, n_rankings):
        """
        Return the number of stored entries in each label row.

        If there is exactly one label row, its count is repeated `n_rankings` times.
        """
        n_label_rows = self._labels.shape[0]
        n_pos = self._labels.getnnz(axis=1)
        if n_label_rows == 1:
            n_pos = np.tile(n_pos, n_rankings)
        return n_pos

    def __str__(self):
        return str(self.indices_to_list())
class NumericLabels(BinaryLabels):
    """
    Represents numeric ground truth data (e.g., relevance labels from 1-5).

    Everything is inherited from `BinaryLabels`; only the `binary` flag differs.
    """

    # Turns off the {0, 1} value restriction applied by `from_matrix`.
    binary = False
class SparseMatrix(object):
    """
    Stores sparse matrix data in unsorted CSR format (i.e., column indices in each row are unsorted).

    Attributes are ``idx_ptr`` (row pointer), ``col_idx`` (column indices),
    ``data`` (values) and ``shape``.
    """

    def __init__(self, idx_ptr, col_idx, data, shape=None):
        """
        Copy the three CSR arrays; when `shape` is omitted it is inferred as
        ``(len(idx_ptr) - 1, max(col_idx) + 1)`` (0 columns if there are no entries).
        """
        self.idx_ptr = idx_ptr.copy()
        self.col_idx = col_idx.copy()
        self.data = data.copy()
        if shape:
            self.shape = shape
        else:
            n_cols = col_idx.max() + 1 if len(col_idx) else 0
            self.shape = (len(idx_ptr) - 1, n_cols)

    @staticmethod
    def _shift_min_to_one(values):
        """
        Return a shifted copy of `values` whose smallest finite entry equals 1.

        This keeps genuine zeros from being treated as "not stored" by the sparse
        format. Without any finite entry there is nothing to anchor the shift, so
        `values` is returned as is. The input is never modified.
        """
        finite = values[np.isfinite(values)]
        if finite.size == 0:
            return values
        return values + (1 - finite.min())

    @classmethod
    def from_values(cls, data, keep_zeros=False):
        """
        Build a matrix from per-row values; the j-th value of a row lands in column j.

        Lists (flat or nested, rows may differ in length) go through `from_lil`,
        anything else through `from_matrix`.
        """
        if not isinstance(data, list):
            return cls.from_matrix(data, keep_zeros=keep_zeros)
        if len(data) == 0 or np.ndim(data[0]) == 0:
            # a flat list is a single row
            data = [data]
        idx = [list(range(len(r))) for r in data]
        return cls.from_lil(idx, data, keep_zeros=keep_zeros)

    @classmethod
    def from_nonzero_indices(cls, indices):
        """
        Build a matrix from the positions of its entries.

        For SciPy sparse input the stored structure is taken over; otherwise
        `indices` is passed to `from_lil` as rows of column indices.
        """
        if sp.issparse(indices):
            x = indices.tocsr()
            return cls(x.indptr, x.indices, x.data, x.shape)
        return cls.from_lil(indices)

    @classmethod
    def from_matrix(cls, matrix, keep_zeros=False):
        """
        Build a matrix from a dense array, (nested) list or SciPy sparse matrix.

        Parameters
        ----------
        matrix : numpy.ndarray, list or scipy sparse matrix
            1D or 2D numeric input. The caller's array is never modified.
        keep_zeros : bool
            For dense input, shift all values so the smallest finite one becomes 1,
            which makes every entry (including zeros) a stored entry.

        Raises
        ------
        ValueError
            for masked arrays, non-numeric data, arrays that are not 1D/2D, or
            unsupported input types.
        """
        # Masked arrays are ndarrays too, so they must be rejected first.
        if np.ma.isMaskedArray(matrix):
            raise ValueError("Masked arrays not supported.")
        elif isinstance(matrix, (np.ndarray, list)):
            if isinstance(matrix, list):
                matrix = np.asarray(matrix, dtype=object).astype(np.float32)
            matrix = np.atleast_2d(matrix)
            if not np.issubdtype(matrix.dtype, np.number) or np.issubdtype(matrix.dtype, np.bool_):
                raise ValueError("Input must be numeric")
            elif matrix.ndim != 2:
                raise ValueError("Input arrays need to be 1D or 2D.")
            if keep_zeros:
                # Out-of-place: `matrix` may still be (a view of) the caller's array.
                matrix = cls._shift_min_to_one(matrix)
            x = sp.csr_matrix(matrix)
            if not keep_zeros:
                x.eliminate_zeros()
        elif sp.issparse(matrix):
            x = matrix.tocsr()
        else:
            raise ValueError("Input type not supported.")
        return cls(x.indptr, x.indices, x.data, x.shape)

    @classmethod
    def from_lil(cls, rows, data=None, dtype=np.float32, keep_zeros=False):
        """
        Build a matrix from rows of column indices and (optionally) matching values.

        Parameters
        ----------
        rows : list or numpy.ndarray
            Column indices, one sequence per row; a flat sequence is one row.
        data : list of sequences, optional
            Values aligned with `rows`. Defaults to all ones.
        dtype : numpy dtype
            Type of the stored values.
        keep_zeros : bool
            If set, the supplied `data` is shifted so its smallest finite value
            becomes 1, and zeros are not removed afterwards.

        Raises
        ------
        ValueError
            if `rows` has an unsupported type, values cannot be converted, or a row
            of `data` differs in length from the matching row of `rows`.
        """
        if not isinstance(rows, (list, np.ndarray)):
            raise ValueError("Invalid input type.")
        if len(rows) == 0 or np.ndim(rows[0]) == 0:
            rows = [rows]
        row_lengths = [len(x) for x in rows]
        idx_ptr = np.asarray([0] + row_lengths, dtype=int).cumsum()
        n_entries = idx_ptr[-1]
        try:
            col_idx = np.fromiter(itertools.chain.from_iterable(rows), dtype=int, count=n_entries)
            if data is None:
                data = np.ones_like(col_idx, dtype=dtype)
            else:
                # np.fromiter(count=...) silently ignores surplus items, so the
                # alignment has to be verified row by row before flattening.
                if [len(x) for x in data] != row_lengths:
                    raise ValueError("rows and data need to have same length")
                data = np.fromiter(itertools.chain.from_iterable(data), dtype=dtype, count=n_entries)
                if keep_zeros:
                    data = cls._shift_min_to_one(data)
        except TypeError as err:
            raise ValueError("Invalid values in input.") from err
        instance = cls(idx_ptr, col_idx, data)
        if not keep_zeros:
            instance.eliminate_zeros()
        return instance

    def max_nnz_row_values(self):
        """Returns maximum number of non-zero entries in any row (0 if there are no rows)."""
        row_nnz = np.diff(self.idx_ptr)
        return row_nnz.max() if row_nnz.size else 0

    def count_empty_rows(self):
        """Return the number of rows without any stored entry."""
        return (np.diff(self.idx_ptr) == 0).sum()

    def sort(self):
        """Reorder the entries of every row in place by descending value (stable)."""
        self._numba_sort(self.idx_ptr, self.col_idx, self.data)

    def intersection(self, other):
        """Keep, row by row, only the entries whose column also appears in `other`."""
        self._setop(other, True)

    def difference(self, other):
        """Drop, row by row, the entries whose column appears in `other`."""
        self._setop(other, False)

    def isfinite(self):
        """Return True if no stored value is NaN or Inf."""
        return np.all(np.isfinite(self.data))

    def remove_infinite(self):
        """Remove entries whose value is NaN or Inf."""
        if not self.isfinite():
            self.data[~np.isfinite(self.data)] = 0
            self.eliminate_zeros()

    def eliminate_zeros(self):
        """Remove entries whose stored value is zero."""
        csr = self.tocsr()
        csr.eliminate_zeros()
        self.data, self.col_idx, self.idx_ptr = csr.data, csr.indices, csr.indptr

    def _setop(self, other, mode):
        """
        Shared implementation of `intersection` (``mode=True``) and `difference`
        (``mode=False``): zero out the affected entries, then prune them.

        Raises
        ------
        ValueError
            if the two matrices differ in their number of rows.
        """
        if self.shape[0] != other.shape[0]:
            raise ValueError("Matrices need to have the same number of rows!")
        # Membership is tested by binary search over each row of `other`, which
        # needs ascending column indices. `other` is stored unsorted, so search a
        # sorted copy and leave `other` itself untouched.
        other_sorted = other.tocsr().sorted_indices()
        self._numba_setop(self.idx_ptr, self.col_idx, self.data,
                          other_sorted.indptr, other_sorted.indices, mode)
        self.eliminate_zeros()

    def tocsr(self):
        """Return a SciPy CSR matrix over the stored arrays (requested without copying)."""
        return sp.csr_matrix((self.data, self.col_idx, self.idx_ptr), copy=False, shape=self.shape)

    def tolil(self):
        """Return the column indices as one Python list per row, in stored order."""
        return [self.col_idx[start:end].tolist()
                for start, end in zip(self.idx_ptr[:-1], self.idx_ptr[1:])]

    def todense(self):
        """Return the matrix as a dense numpy array."""
        return np.asarray(self.tocsr().todense())

    @staticmethod
    @numba.njit(parallel=True)
    def _numba_sort(idx_ptr, col_idx, data):
        """In-place, per-row stable sort of `data` (descending) with `col_idx` kept aligned."""
        for i in numba.prange(len(idx_ptr) - 1):
            start, end = idx_ptr[i], idx_ptr[i+1]
            args = (-data[start:end]).argsort(kind="mergesort")
            data[start:end] = data[start:end][args]
            col_idx[start:end] = col_idx[start:end][args]

    @staticmethod
    @numba.njit(parallel=True)
    def _numba_setop(self_idx_ptr, self_col_idx, self_data, other_idx_ptr, other_col_idx, intersect):
        """
        Zero the values of entries that must be removed by a row-wise set operation.

        With `intersect` set, entries absent from the matching row of `other` are
        zeroed; otherwise entries present in it are zeroed.
        """
        for i in numba.prange(len(self_idx_ptr) - 1):
            ss, se = self_idx_ptr[i], self_idx_ptr[i+1]
            os, oe = other_idx_ptr[i], other_idx_ptr[i+1]
            left_idx = np.searchsorted(other_col_idx[os:oe], self_col_idx[ss:se])
            right_idx = np.searchsorted(other_col_idx[os:oe], self_col_idx[ss:se], side='right')
            # equal insertion points <=> the column does not occur in `other`
            if intersect:
                found = (left_idx == right_idx)
            else:
                found = (left_idx != right_idx)
            self_data[ss:se][found] = 0

    def __str__(self):
        return str((self.idx_ptr, self.col_idx, self.data))
class Rankings(object):
    """
    Represents (predicted) rankings to be evaluated.

    The ranked item indices are held in ``self.indices`` as a `SparseMatrix`
    whose rows list the items of each ranking in ranked order.
    """

    def __init__(self, indices, valid_items=None, invalid_items=None, warn_empty=True):
        """
        Parameters
        ----------
        indices : SparseMatrix
            Ranked item indices, one row per ranking. Filtered in place.
        valid_items : array_like or sparse matrix, optional
            If given, only these items are kept in each ranking.
        invalid_items : array_like or sparse matrix, optional
            If given, these items are removed from each ranking.
        warn_empty : bool
            Whether to warn when rankings end up empty.

        Warns
        -----
        InvalidValuesWarning
            if NaN/Inf entries are dropped, or (with `warn_empty`) if any ranking is empty.
        """
        if valid_items is not None:
            valid_items = SparseMatrix.from_nonzero_indices(valid_items)
            indices.intersection(valid_items)
        if invalid_items is not None:
            invalid_items = SparseMatrix.from_nonzero_indices(invalid_items)
            indices.difference(invalid_items)
        if not indices.isfinite():
            warnings.warn("Input contains NaN or Inf entries which will be ignored.",
                          InvalidValuesWarning)
            indices.remove_infinite()
        n_empty_rows = indices.count_empty_rows()
        if n_empty_rows and warn_empty:
            # The message deliberately omits the matrix contents: rendering them
            # would require building the full dense matrix just for a warning.
            warnings.warn(f"Input rankings have {n_empty_rows} empty rankings (rows). "
                          + "These will impact the mean scores.",
                          InvalidValuesWarning)
        self.indices = indices

    @classmethod
    def from_ranked_indices(cls, indices, valid_items=None, invalid_items=None):
        """
        Construct a rankings instance from data where item indices are specified in ranked order.

        Parameters
        ----------
        indices : array_like, one row per ranking
            Indices of items after ranking. Must be 1D or 2D, but row lengths can differ.
        valid_items : array_like, one row per ranking
            Indices of valid items (e.g., candidate set). Invalid items will be discarded from ranking.
        invalid_items : array_like, one row per ranking
            Indices of items to discard from ranking.

        Raises
        ------
        ValueError
            if `indices` or `valid_items` of invalid shape or type.

        Examples
        --------
        >>> Rankings.from_ranked_indices([[5, 2], [4, 3, 1]]) # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
        <rankereval.data.Rankings...>
        """
        indices = SparseMatrix.from_lil(indices)
        return cls(indices, valid_items, invalid_items)

    @classmethod
    def from_scores(cls, raw_scores, valid_items=None, invalid_items=None, warn_empty=True):
        """
        Construct a rankings instance from raw scores where each item's score is specified.
        Items will be ranked in descending order (higher scores meaning better).

        Parameters
        ----------
        raw_scores : array_like, one row per ranking
            Contains raw scores for each item. Must be 1D or 2D, but row lengths can differ.
        valid_items : array_like, one row per ranking
            Indices of valid items (e.g., candidate set). Invalid items will be discarded from ranking.
        invalid_items : array_like, one row per ranking
            Indices of items to discard from ranking.
        warn_empty : bool
            Whether to warn about rankings that end up empty.

        Raises
        ------
        ValueError
            if `raw_scores` or `valid_items` of invalid shape or type.

        Warns
        ------
        InvalidValuesWarning
            if `raw_scores` contains non-finite values.

        Examples
        --------
        >>> Rankings.from_scores([[0.1, 0.5, 0.2], [0.4, 0.2, 0.5]]) # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
        <rankereval.data.Rankings...>
        """
        # keep_zeros=True so that items scored exactly 0 still take part in the ranking
        indices = SparseMatrix.from_values(raw_scores, keep_zeros=True)
        indices.sort()
        return cls(indices, valid_items, invalid_items, warn_empty=warn_empty)

    def __str__(self):
        return str(self.indices)

    def __len__(self):
        """Number of rankings (rows)."""
        return self.indices.shape[0]

    def to_list(self):
        """Return the ranked item indices as one list per ranking."""
        return self.indices.tolil()

    def get_top_k(self, k=None):
        """
        Return the first `k` items of every ranking as a dense array plus mask.

        Parameters
        ----------
        k : int, optional
            Number of positions to return; defaults to the length of the longest ranking.

        Returns
        -------
        (indices, mask) : tuple of arrays of shape ``(len(self), k)``
            ``mask`` is True at positions beyond the end of a ranking; ``indices``
            holds 0 at those positions.
        """
        if k is None:
            k = self.indices.max_nnz_row_values()
        return self._csr_to_dense_masked(self.indices.idx_ptr, self.indices.col_idx, (len(self), k))

    @staticmethod
    @numba.njit
    def _csr_to_dense_masked(idx_ptr, col_idx, shape):
        """Left-align each CSR row into a dense array of `shape`, truncating long rows and masking the padding."""
        condensed = np.zeros(shape, dtype=col_idx.dtype)
        mask = np.ones(shape, dtype=np.bool_)
        for i in range(len(idx_ptr) - 1):
            start, end = idx_ptr[i], idx_ptr[i+1]
            length = min(end - start, shape[1])
            condensed[i][:length] = col_idx[start:start+length]
            mask[i][:length] = False
        return condensed, mask
class TopKMixin:
    """Mixin offering a partial (top-k) descending sort over the rows of a 2D array."""

    @staticmethod
    def topk(x, k, return_scores=False):
        """
        Return the column indices of the `k` largest entries of every row of `x`.

        Results are ordered by descending value; equal values among the selected
        entries are ordered by ascending column index.

        Parameters
        ----------
        x : 2D numpy.ndarray
            Scores, one row per ranking.
        k : int
            Number of entries to select. Values larger than the number of columns
            are clamped to it; 0 yields arrays with zero columns.
        return_scores : bool
            If set, also return the selected values.

        Returns
        -------
        top_idx, or ``(top_scores, top_idx)`` if `return_scores` is set.

        Raises
        ------
        ValueError
            if `k` is negative.
        """
        if k < 0:
            raise ValueError("k must be non-negative")
        # argpartition rejects kth >= number of columns, so asking for more
        # entries than exist simply means "all of them".
        k = min(k, x.shape[-1])
        if k == 0:
            empty_shape = x.shape[:-1] + (0,)
            top_idx = np.empty(empty_shape, dtype=np.intp)
            if not return_scores:
                return top_idx
            return np.empty(empty_shape, dtype=x.dtype), top_idx

        # partition into k largest elements first; sorting the selected column
        # indices makes the tie order below independent of the partition order
        index_array = np.sort(np.argpartition(-x, kth=k-1, axis=-1)[:, :k])
        top_k_partition = np.take_along_axis(x, index_array, axis=-1)

        # stable argsort in descending order: sort the reversed rows ascending,
        # reverse the result and map the positions back to the unreversed rows
        top_idx_local = top_k_partition.shape[1] - 1
        top_idx_local -= np.fliplr(np.argsort(np.fliplr(top_k_partition), axis=-1, kind='stable'))

        # sort the top partition
        top_idx = np.take_along_axis(index_array, top_idx_local, axis=-1)
        if not return_scores:
            return top_idx
        top_scores = np.take_along_axis(top_k_partition, top_idx_local, axis=-1)
        return top_scores, top_idx
class DenseRankings(Rankings, TopKMixin):
    """
    Data structure where rankings have the same length (approximately).

    Rankings are stored as a dense 2D index array (``self.indices``) together
    with a boolean array of the same shape (``self.mask``) that is True at
    positions which do not hold a valid item.
    """

    def __init__(self, indices, mask=None, warn_empty=True):
        """
        Parameters
        ----------
        indices : 2D numpy.ndarray
            Item indices in ranked order, one row per ranking.
        mask : 2D boolean numpy.ndarray, optional
            True marks invalid positions. If omitted, no position is masked.
        warn_empty : bool
            Whether to warn about rankings without any valid position.

        Warns
        -----
        InvalidValuesWarning
            if `warn_empty` is set and at least one ranking is fully masked.
        """
        if mask is None:
            mask = np.zeros(np.shape(indices), dtype=bool)
        n_empty_rows = ((~mask).sum(axis=1) == 0).sum()
        if n_empty_rows and warn_empty:
            warnings.warn(f"Input rankings have {n_empty_rows} empty rankings (rows). "
                          + "These will impact the mean scores.",
                          InvalidValuesWarning)
        self.indices = indices
        self.mask = mask

    @classmethod
    def _verify_input(cls, arr, dtype=np.floating):
        """
        Validate `arr` and return it as a 2D array of the requested kind.

        For floating input, NaN/Inf entries are replaced by ``-inf`` in a copy
        (the caller's array is left untouched) and a warning is issued.

        Raises
        ------
        ValueError
            if `arr` is not a numpy array, not 1D/2D, or not of type `dtype`.
        TypeError
            if `dtype` is neither a floating nor an integer type.
        """
        if not isinstance(arr, np.ndarray):
            raise ValueError("Input needs to be a numpy matrix")
        arr = np.asarray(np.atleast_2d(arr))
        if arr.ndim != 2:
            raise ValueError("Input arrays need to be 1D or 2D.")
        elif not np.issubdtype(arr.dtype, dtype):
            raise ValueError(f"Input array needs to be of type {dtype}")
        if np.issubdtype(dtype, np.floating):
            finite = np.isfinite(arr)
            if not np.all(finite):
                warnings.warn("Input contains NaN or Inf entries which will be ignored.",
                              InvalidValuesWarning)
                arr = arr.copy()
                # -np.inf rather than np.NINF, which no longer exists in NumPy 2.0
                arr[~finite] = -np.inf
        elif not np.issubdtype(dtype, np.integer):
            raise TypeError("dtype argument must be floating or int")
        return arr

    @staticmethod
    def _membership_mask(items, shape):
        """
        Return a boolean array of `shape` that is True at (row, column) for every
        column index listed in the matching row of `items`.

        Column indices outside ``[0, shape[1])`` are ignored.

        Raises
        ------
        ValueError
            if `items` does not have ``shape[0]`` rows.
        """
        listed = SparseMatrix.from_nonzero_indices(items)
        if listed.shape[0] != shape[0]:
            raise ValueError("Matrices need to have the same number of rows!")
        member = np.zeros(shape, dtype=bool)
        row_ids = np.repeat(np.arange(shape[0]), np.diff(listed.idx_ptr))
        cols = np.asarray(listed.col_idx)
        in_range = (cols >= 0) & (cols < shape[1])
        member[row_ids[in_range], cols[in_range]] = True
        return member

    @classmethod
    def from_ranked_indices(cls, indices, valid_items=None, invalid_items=None):
        """
        Construct rankings from a dense integer array of item indices in ranked order.

        Set indices to -1 (or any other negative value) to indicate invalid index

        Raises
        ------
        ValueError
            if `indices` is not a 1D/2D integer numpy array.
        NotImplementedError
            if `valid_items` or `invalid_items` is given.
        """
        indices = cls._verify_input(indices, dtype=np.integer)
        if valid_items is not None or invalid_items is not None:
            raise NotImplementedError("Not implemented yet")
        mask = (indices < 0)
        return cls(indices, mask)

    @classmethod
    def from_scores(cls, raw_scores, valid_items=None, invalid_items=None, warn_empty=True, k_max=None):
        """
        Construct rankings from a dense array of scores (higher is better).

        Parameters
        ----------
        raw_scores : 1D or 2D floating numpy.ndarray, one row per ranking
            Score of every item. Not modified.
        valid_items : array_like, one row per ranking, optional
            If given, items not listed are excluded from the ranking.
        invalid_items : array_like, one row per ranking, optional
            If given, listed items are excluded from the ranking.
        warn_empty : bool
            Whether to warn about rankings without any valid item.
        k_max : int, optional
            If given, only the `k_max` best positions are kept per ranking.

        Warns
        -----
        InvalidValuesWarning
            if `raw_scores` contains NaN or Inf entries.
        """
        scores = cls._verify_input(raw_scores, dtype=np.floating)
        if valid_items is not None or invalid_items is not None:
            # the exclusions below write into the scores, so work on a private copy
            scores = scores.copy()
        if valid_items is not None:
            scores[~cls._membership_mask(valid_items, scores.shape)] = -np.inf
        if invalid_items is not None:
            scores[cls._membership_mask(invalid_items, scores.shape)] = -np.inf

        # excluded and originally non-finite items are all -inf at this point
        mask = ~np.isfinite(scores)
        if k_max is None or k_max >= scores.shape[1]:
            # full stable descending sort; -inf entries end up last
            sorted_idx = np.argsort(-scores, axis=1, kind="stable")
        else:
            sorted_idx = cls.topk(scores, k_max)
        mask = np.take_along_axis(mask, sorted_idx, axis=1)
        return cls(sorted_idx, mask, warn_empty=warn_empty)

    def get_top_k(self, k=None):
        """
        Return the first `k` columns of the index array and of the mask
        (all columns if `k` is None).
        """
        if k is None:
            k = self.indices.shape[1]
        indices = self.indices[:, :k]
        mask = self.mask[:, :k]
        return indices, mask

    def to_list(self):
        """Return the index array as nested lists (masked positions are included)."""
        return self.indices.tolist()