"""Select axis labels (columns or index) of a data frame."""
import operator
from typing import Any, Callable, Optional, Sequence
import typing
try:
from typing import Literal
except ImportError:
from typing_extensions import Literal
import numpy as np
import pandas as pd
Indices = "Indices"
AnyDataframe = "AnyDataframe"
if typing.TYPE_CHECKING:
from .types import AnyDataframe, Indices
class Selection:
    """Container for selection along a data frame axis with combination logic.

    A selection is described by two optional lists of integer positions
    into the axis labels:

    - ``included``: positions to keep, in output order. ``None`` means
      "every position of the axis".
    - ``excluded``: positions to drop. ``None`` means "drop nothing".

    Selections can be combined with ``&``, ``|`` and ``~``.
    """

    def __init__(
        self,
        included: Optional["Indices"] = None,
        excluded: Optional["Indices"] = None,
        *,
        mask: Optional[Sequence[int]] = None,
    ):
        """
        If ``mask`` is passed, ``included`` and ``excluded`` must be ``None``!

        Parameters
        ----------
        included:
            List of indices included in the selection.
        excluded:
            List of indices excluded from the selection.
        mask
            Boolean array that will be converted to list of included
            indices: All indices with corresponding truthy/non-zero value
            will be included in the selection.

        Raises
        ------
        ValueError
            If ``mask`` is combined with ``included`` or ``excluded``.
        """
        if mask is not None:
            if included is not None:
                raise ValueError("included indices and mask cannot be passed together")
            if excluded is not None:
                raise ValueError("excluded indices and mask cannot be passed together")
            included = np.nonzero(mask)[0].tolist()
        self.included: Optional["Indices"] = included
        self.excluded: Optional["Indices"] = excluded

    def apply(self, axis: Literal["columns", "index"], df: "AnyDataframe"):
        """Return the labels of ``df`` along ``axis`` picked by this selection.

        Positions are taken from ``included`` (or all positions when it is
        ``None``) in order, skipping every position listed in ``excluded``.
        """
        labels = getattr(df, axis)
        included = self.included
        if included is None:
            included = range(len(labels))
        excluded = set(self.excluded) if self.excluded is not None else set()
        return labels[[i for i in included if i not in excluded]]

    def __and__(self, other: "Selection") -> "Selection":
        """Intersect the included positions and merge the exclusions."""
        included = _combine_nones(self.included, other.included, intersect_indices)
        excluded = _combine_nones(self.excluded, other.excluded, union_indices)
        if included is not None and excluded is not None:
            # Set lookup keeps the filtering linear, as in ``apply``.
            dropped = set(excluded)
            included = [i for i in included if i not in dropped]
        return Selection(included, excluded)

    def __or__(self, other: "Selection") -> "Selection":
        """Merge the included positions and intersect the exclusions."""
        included = _combine_nones(self.included, other.included, union_indices)
        excluded = _combine_nones(self.excluded, other.excluded, intersect_indices)
        if included is not None and excluded is not None:
            # An explicitly included position must not stay excluded.
            kept = set(included)
            excluded = [i for i in excluded if i not in kept]
        return Selection(included, excluded)

    def __invert__(self) -> "Selection":
        """Swap the roles of included and excluded positions."""
        return Selection(self.excluded, self.included)
# Utilities to collect and combine column selections
def _combine_nones(a: Optional[Indices], b: Optional[Indices], fn_both:Callable[[Indices, Indices], Indices]) -> Optional[Indices]:
if a is None and b is None:
return None
if a is not None and b is None:
return a
if a is None and b is not None:
return b
return fn_both(a, b)
def intersect_indices(left: "Indices", right: "Indices") -> "Indices":
    """Return the items of ``right`` that also occur in ``left``.

    The result follows the order of ``right`` (repetitions in ``right`` are
    kept). Items must be hashable because membership is tested against a
    set built once from ``left`` instead of scanning the list per item.
    """
    lookup = set(left)
    return [i for i in right if i in lookup]
def union_indices(left: "Indices", right: "Indices") -> "Indices":
    """Return ``left`` followed by the items of ``right`` not found in ``left``.

    The order of ``left`` takes precedence; items of ``right`` are appended
    in their own order (repetitions inside ``right`` are kept). Items must
    be hashable because membership is tested against a set built once from
    ``left`` instead of scanning the list per item.
    """
    seen = set(left)
    return left + [i for i in right if i not in seen]
# Column selection operator closures
class BaseOp:
    """Interface shared by all selection operator closures."""

    def __call__(self, axis: Literal["columns", "index"], df: "AnyDataframe") -> "Selection":
        """Evaluate the operator against ``df`` along ``axis``.

        Sub-classes have to override this; the base implementation always
        raises ``NotImplementedError``.
        """
        raise NotImplementedError("Must be implemented in sub-class.")

    def _pprint(self, axis: Literal["columns", "index"]) -> str:
        """Render the operator as ``axis`` directly followed by ``str(self)``."""
        return "{}{}".format(axis, self)
class LabelSelectionOp(BaseOp):
    """Explicitly select labels."""

    def __init__(self, labels, level=None):
        """Store the labels to select.

        Parameters
        ----------
        labels:
            A slice, a tuple/list of labels, or a single label. Lists are
            converted to tuples and single labels are wrapped in a tuple.
        level:
            Optional index level whose values are matched instead of the
            full labels.
        """
        if isinstance(labels, list):
            labels = tuple(labels)
        elif not isinstance(labels, (slice, tuple)):
            # Convert "scalar" values to some iterable
            labels = (labels,)
        self.labels = labels
        self.level = level

    def __call__(self, axis, df):
        """Return a ``Selection`` with the positions of the matching labels.

        For a tuple, the positions are collected label by label in the
        order the labels were given. For a slice, every position from the
        first occurrence of ``start`` up to and including the last
        consecutive occurrence of ``stop`` is collected; ``step`` is not
        evaluated here.
        """
        labels = getattr(df, axis)
        idx = np.arange(len(labels))
        if self.level is None:
            cands = labels
        else:
            cands = labels.get_level_values(self.level)
        indices = []
        if isinstance(self.labels, tuple):
            for lbl in self.labels:
                indices.extend(idx[cands == lbl])
        elif isinstance(self.labels, slice):
            # NOTE: We need to make this more complex because we also need
            #       to treat situation with multiple repetitions of the same
            #       value, e.g., cases of multi-index levels.
            in_slice = self.labels.start is None
            reached_slice_stop = False
            for i, lbl in enumerate(cands):
                if not in_slice and lbl == self.labels.start:
                    in_slice = True
                if reached_slice_stop and lbl != self.labels.stop:
                    # We stepped over the end of the slice.
                    break
                if in_slice:
                    indices.append(i)
                if self.labels.stop is not None and lbl == self.labels.stop:
                    reached_slice_stop = True
        else:
            # This should never be reached because of the argument processing
            # in __init__.
            raise ValueError(f"Unexpected type for self.labels: {type(self.labels)}: {self.labels!r}")
        return Selection(indices)

    def __str__(self):
        """Render as ``[labels]``, prefixed with ``(level=...)`` if a level is set."""
        if isinstance(self.labels, slice):
            def fmt(bound):
                # Open slice bounds are rendered as empty strings.
                return repr(bound) if bound is not None else ''

            items = [fmt(self.labels.start), fmt(self.labels.stop)]
            if self.labels.step:
                items.append(repr(self.labels.step))
            pp_labels = ':'.join(items)
        else:
            pp_labels = ', '.join(repr(l) for l in self.labels)
        # Compare against None: level 0 is a valid level and must be shown.
        if self.level is not None:
            return f'(level={self.level})[{pp_labels}]'
        return f'[{pp_labels}]'
class LabelPredicateOp(BaseOp):
    """Select labels by a predicate, e.g. ``startswith``."""

    def __init__(self, meth, args, kwargs, level=None):
        """Store the name of the ``.str`` method and its call arguments.

        Parameters
        ----------
        meth:
            Name of the method looked up on the labels' ``.str`` accessor.
        args, kwargs:
            Positional and keyword arguments passed to that method.
        level:
            Optional index level whose values are tested instead of the
            full labels.
        """
        self.meth = meth
        self.args = args
        self.kwargs = kwargs
        self.level = level

    def __str__(self):
        """Render as ``.meth(args)``, prefixed with ``(level=...)`` if a level is set."""
        def pp(a):
            if isinstance(a, tuple):
                return [repr(i) for i in a]
            elif isinstance(a, dict):
                return [f'{k}={v!r}' for k, v in a.items()]
            return [repr(a)]

        pp_args = ', '.join(pp(self.args) + pp(self.kwargs))
        # Compare against None: level 0 is a valid level and must be shown.
        if self.level is not None:
            return f'(level={self.level}).{self.meth}({pp_args})'
        return f'.{self.meth}({pp_args})'

    def __call__(self, axis, df: AnyDataframe) -> Selection:
        """Evaluate the predicate on the labels and select the truthy positions."""
        labels = getattr(df, axis)
        if self.level is None:
            str_accessor = labels.str
        else:
            str_accessor = labels.get_level_values(self.level).str
        meth = getattr(str_accessor, self.meth)
        mask = meth(*self.args, **self.kwargs)
        return Selection(mask=mask)
class EllipsisOp(BaseOp):
    """Select every label of the axis (i.e. all columns or all rows)."""

    def __call__(self, axis, df: AnyDataframe) -> Selection:
        """Return a selection built from an all-true mask over the axis labels."""
        n_labels = len(getattr(df, axis))
        select_all = np.ones(n_labels, dtype=bool)
        return Selection(mask=select_all)

    def __str__(self):
        return '...'
class BinaryOp(BaseOp):
    """Combine two selection operators with a binary operator.

    Used to implement, e.g.::

        sel_1 | sel_2
    """

    def __init__(self, left: BaseOp, right: BaseOp, op: Callable[[Any, Any], Any]):
        """Store both operand operators and the callable combining their results."""
        self.left, self.right, self.op = left, right, op

    def __str__(self):
        name = getattr(self.op, '__name__', str(self.op))
        return f'({self.left}) {name} ({self.right})'

    def _pprint(self, axis: str) -> str:
        """Render both operands for ``axis`` joined by the operator symbol.

        ``and_`` and ``or_`` are shown as ``&`` and ``|``; any other
        operator is shown by its name.
        """
        raw_name = getattr(self.op, '__name__', str(self.op))
        symbols = {"and_": "&", "or_": "|"}
        symbol = symbols.get(raw_name, raw_name)
        lhs = self.left._pprint(axis)
        rhs = self.right._pprint(axis)
        return f"{lhs} {symbol} {rhs}"

    def __call__(self, axis, df: AnyDataframe) -> Selection:
        """Evaluate the left, then the right operand and combine the results."""
        return self.op(self.left(axis, df), self.right(axis, df))
class UnaryOp(BaseOp):
    """Apply unary operator on selection operator.

    Used to implement, e.g., negation::

        ~sel
    """

    def __init__(self, wrapped: BaseOp, op: Callable[[Any], Any]):
        """Store the wrapped operator and the callable applied to its result."""
        self.wrapped, self.op = wrapped, op

    def __str__(self):
        name = getattr(self.op, '__name__', str(self.op))
        return f'{name}({self.wrapped})'

    def _pprint(self, axis: str) -> str:
        """Render the wrapped operator for ``axis`` prefixed by the operator.

        ``invert`` is shown as a bare ``~`` prefix, any other operator as
        ``name(...)``. A wrapped binary operator is always parenthesized.
        """
        name = getattr(self.op, '__name__', str(self.op))
        if name == "invert":
            symbol, opening, closing = "~", "", ""
        else:
            symbol, opening, closing = name, "(", ")"
        # If we wrap a binary operator, add parentheses around it
        if isinstance(self.wrapped, BinaryOp):
            opening, closing = "(", ")"
        inner = self.wrapped._pprint(axis)
        return f"{symbol}{opening}{inner}{closing}"

    def __call__(self, axis, df: AnyDataframe) -> Selection:
        """Evaluate the wrapped operator and apply the unary operator to it."""
        return self.op(self.wrapped(axis, df))
class DtypesOp(BaseOp):
    """Select columns by dtype."""

    def __init__(self, dtypes: Sequence, sample_size: int = 10):
        """Store the accepted dtypes.

        Parameters
        ----------
        dtypes:
            Dtypes to select; a column matches if it matches any of them.
        sample_size:
            Maximum number of rows sampled to test columns for ``str`` /
            ``bytes`` content.
        """
        self.dtypes = dtypes
        self.sample_size = sample_size

    def __str__(self):
        dtypes = [
            getattr(t, "__name__", str(t))
            for t in self.dtypes
        ]
        if len(dtypes) == 1:
            return f'.dtype == {dtypes[0]}'
        return f'.dtype.isin({{{", ".join(dtypes)}}})'

    def __call__(self, axis, df):
        """Return a selection of the columns matching any of the dtypes.

        ``str`` and ``bytes`` (given as type or as type name) are detected
        by testing every value of a random row sample with ``isinstance``;
        all other dtypes are compared against ``df.dtypes``.

        Raises
        ------
        ValueError
            If ``axis`` is not ``"columns"``.
        """
        if axis != "columns":
            raise ValueError("Selection by dtype is only supported for column selection.")
        labels = getattr(df, axis)
        mask = np.zeros(len(labels), dtype=bool)
        for dtype in self.dtypes:
            for typ in (str, bytes):
                if dtype in (typ, typ.__name__):
                    sample = df.sample(min(len(df), self.sample_size))
                    # ``DataFrame.applymap`` is deprecated in favour of
                    # ``DataFrame.map`` (pandas >= 2.1). Check the class, not
                    # the instance, so a column named "map" cannot shadow it.
                    if hasattr(pd.DataFrame, "map"):
                        elementwise = sample.map
                    else:
                        elementwise = sample.applymap
                    mask |= (
                        elementwise(lambda i: isinstance(i, typ))
                        .agg("all")
                        .values
                    )
                    break
            else:
                # Neither ``str`` nor ``bytes``: compare the column dtypes.
                mask |= (df.dtypes == dtype).values
        return Selection(mask=mask)
# Objects to create, compose, and evaluate column selection operators
class OpComposerBase:
    """Base-class for composing column/row selection operations.

    This class wraps around the actual operation and overloads the relevant
    operators (``+``, ``&``, ``|``, and ``~``) and defers the evaluation of
    the operators until called (by the context data-frame in ``.loc[]``).
    """

    def __init__(self, axis: Literal["columns", "index"], op):
        """Store the axis name and the wrapped operation."""
        self.axis = axis
        # NOTE(review): a bare ``Selection`` offers neither ``_pprint`` nor
        # ``__call__(axis, df)``, so this fallback looks unusable by
        # ``__str__``/``__call__`` -- confirm the intended default.
        self.op = op or Selection()

    def __str__(self):
        # The axis is abbreviated to its upper-cased first letter.
        return self.op._pprint(self.axis[0].upper())

    def get_other_op(self, other):
        """Get/create a wrapped operation for composing operations.

        Accepted operands:

        - another composer: its wrapped operation is used,
        - a list of labels or a single string label: label selection,
        - ``...``: selection of all labels,
        - a type or ``numpy.dtype``: selection by dtype.

        Raises
        ------
        ValueError
            If ``other`` is of any other type.
        """
        if isinstance(other, OpComposerBase):
            return other.op
        # Assume label selection
        if isinstance(other, list):
            return LabelSelectionOp(other)
        if isinstance(other, str):
            return LabelSelectionOp([other])
        if other is ...:
            return EllipsisOp()
        if isinstance(other, (type, np.dtype)):
            return DtypesOp((other,))
        raise ValueError(f"Cannot convert argument of type {type(other)!r} to selection operator")

    def __and__(self, other):
        return OpComposerBase(self.axis, BinaryOp(
            self.op,
            self.get_other_op(other),
            op=operator.and_,
        ))

    def __rand__(self, other):
        return OpComposerBase(self.axis, BinaryOp(
            self.get_other_op(other),
            self.op,
            op=operator.and_,
        ))

    def __or__(self, other):
        return OpComposerBase(self.axis, BinaryOp(
            self.op,
            self.get_other_op(other),
            op=operator.or_,
        ))

    def __ror__(self, other):
        return OpComposerBase(self.axis, BinaryOp(
            self.get_other_op(other),
            self.op,
            op=operator.or_,
        ))

    def __add__(self, other):
        # ``+`` is an alias for ``|``.
        return self | other

    def __radd__(self, other):
        # Reflected ``+``: ``other`` is the left operand. Dispatch straight
        # to the reflected ``|`` instead of relying on ``other.__or__``
        # returning ``NotImplemented`` first.
        return self.__ror__(other)

    def __invert__(self):
        return OpComposerBase(self.axis, UnaryOp(
            self.op,
            op=operator.invert,
        ))

    def __call__(self, df: "AnyDataframe") -> pd.Index:
        """Evaluate the wrapped operations."""
        selection = self.op(self.axis, df)
        return selection.apply(self.axis, df)
class LabelComposer(OpComposerBase):
    """Compose callable to select columns by name.

    Columns can be selected by name (``composer[...]``) or by one of the
    string predicates

    - ``startswith``
    - ``endswith``
    - ``contains``
    - ``match``

    which are passed through to ``pd.Series.str``.
    """
    # TODO: Implement ``C.lower()...``

    def __init__(self, axis, op=None, level=None):
        """Store the axis, an optional wrapped operation and an optional level."""
        super().__init__(axis, op)
        self.level = level

    def _get_op_composer(self, op):
        """Wrap ``op`` in a composer bound to this composer's axis."""
        return OpComposerBase(self.axis, op)

    def __getitem__(self, labels):
        selection_op = LabelSelectionOp(labels, self.level)
        return self._get_op_composer(selection_op)

    def startswith(self, *args, **kwargs):
        predicate = LabelPredicateOp("startswith", args, kwargs, self.level)
        return self._get_op_composer(predicate)

    def endswith(self, *args, **kwargs):
        predicate = LabelPredicateOp("endswith", args, kwargs, self.level)
        return self._get_op_composer(predicate)

    def contains(self, *args, **kwargs):
        predicate = LabelPredicateOp("contains", args, kwargs, self.level)
        return self._get_op_composer(predicate)

    def match(self, *args, **kwargs):
        predicate = LabelPredicateOp("match", args, kwargs, self.level)
        return self._get_op_composer(predicate)
class LeveledComposer:
    """Compose callable to access multi-level index labels."""

    def __init__(self, axis):
        """Remember the axis the level composers are created for."""
        self.axis = axis

    def __getitem__(self, level):
        """Return a label composer for this axis restricted to ``level``."""
        composer = LabelComposer(self.axis, level=level)
        return composer
class DtypeComposer:
    """Compose callable to select columns by dtype."""

    def __init__(self, axis, sample_size=10):
        """Store the axis and the sample size handed to each dtype operation."""
        self.axis, self.sample_size = axis, sample_size

    def __eq__(self, dtype):
        """Build a composer selecting the columns of exactly this ``dtype``."""
        single_dtype_op = DtypesOp((dtype,), self.sample_size)
        return OpComposerBase(self.axis, single_dtype_op)

    def isin(self, dtypes):
        """Build a composer selecting the columns matching any of ``dtypes``."""
        multi_dtype_op = DtypesOp(dtypes, self.sample_size)
        return OpComposerBase(self.axis, multi_dtype_op)
class SelectionComposerBase(LabelComposer):
    """Base class to compose callable to select or sort axis labels (index and columns)."""

    def __init__(self, axis, op=None):
        """Set up label composition for ``axis`` plus per-level access via ``levels``."""
        super().__init__(axis, op=op)
        # ``levels[...]`` yields a label composer restricted to one level.
        levels = LeveledComposer(self.axis)
        self.levels = levels
class IndexSelectionComposer(SelectionComposerBase):
    """Compose callable to select or sort index labels.

    .. note::
        Use :class:`ColumnSelectionComposer` (``C``) if you want to select
        columns.

    Use the global instance like::

        # Move rows x, z to the top
        from pandas_paddles import I
        df.loc[I["x", "z"] | ...]

    Other use-cases:

    - Select slices of rows::

        df.loc[I["B":"E"] | I["P":"S"]]

    - Select all rows with index starting with ``"PRE"``::

        df.loc[I.startswith("PRE")]
        # or just move them to the top and keep the remaining rows in
        # the data frame
        df.loc[I.startswith("PRE") | ...]

    - Access the level of a multi-index with::

        I.levels[0]
        I.levels["level-name"]

    Selections can be combined with ``&`` (intersection) and ``|`` or ``+``
    (union). In intersections, the right-most order takes precedence, while
    it's the left-most for unions, e.g. the following will select all
    rows with first-level label "b" starting with the rows with
    second-level labels "Y" and "Z" followed by all other second-level
    labels with first-level "b"::

        I.levels[0]["b"] & (I.levels[1]["Y"] | ...)

    Inversion (negation) of selections is possible with ``~``, e.g. to
    select all but first-level label "b"::

        ~I.levels[0]["b"]

    This can also be applied to composed selections::

        ~(I.levels[0]["b"] | I.levels[1]["X", "Y"])
    """

    def __init__(self, op=None):
        """Create a composer bound to the ``"index"`` axis."""
        super().__init__("index", op=op)
class ColumnSelectionComposer(SelectionComposerBase):
    """Compose callable to select or sort columns.

    This acts as global entrypoint.

    Use the global instance like::

        # Move columns x, z to left
        from pandas_paddles import C
        df.loc[:, C["x", "z"] | ...]

    Other use-cases:

    - Select slices of columns, e.g., when handling Excel-like named columns
      (A, B, ...)::

        df.loc[:, C["B":"E"] | C["P":"S"]]

    - Select by dtype::

        df.loc[:, C.dtype == str]
        df.loc[:, C.dtype == int]
        df.loc[:, C.dtype.isin((str, int))]

      Note that for "non-trivial" dtypes (i.e. those stored in
      ``object``-typed columns, e.g. ``str``), a subsample of the dataframe
      is tested explicitly. The sample-size can be set with
      :attr:`~ColumnSelectionComposer.sample_size`.

    - Select all columns starting with ``"PRE"``::

        df.loc[:, C.startswith("PRE")]
        # or just move them to the left and keep the remaining columns in
        # the data frame
        df.loc[:, C.startswith("PRE") | ...]

    - Access the level of a multi-index with::

        C.levels[0]
        C.levels["level-name"]

    Selections can be combined with ``&`` (intersection) and ``|`` or ``+``
    (union). In intersections, the right-most order takes precedence, while
    it's the left-most for unions, e.g. the following will select all
    columns with first-level label "b" starting with the columns with
    second-level labels "Y" and "Z" followed by all other second-level
    labels with first-level "b"::

        C.levels[0]["b"] & (C.levels[1]["Y"] | ...)

    Inversion (negation) of selections is possible with ``~``, e.g. to
    select all but first-level label "b"::

        ~C.levels[0]["b"]

    This can also be applied to composed selections::

        ~(C.levels[0]["b"] | C.levels[1]["X", "Y"])
    """

    def __init__(self, op=None, sample_size=None):
        """Create a composer bound to the ``"columns"`` axis.

        ``sample_size`` is only applied when it is not ``None``; otherwise
        the dtype composer keeps its own default.
        """
        super().__init__("columns", op=op)
        self.dtype = DtypeComposer(self.axis)
        if sample_size is None:
            return
        self.sample_size = sample_size

    @property
    def sample_size(self):
        """Sample size for dtype determination of object-typed columns."""
        dtype_composer = self.dtype
        return dtype_composer.sample_size

    @sample_size.setter
    def sample_size(self, val):
        # The value lives on the dtype composer; this property only forwards.
        dtype_composer = self.dtype
        dtype_composer.sample_size = val