Skip to content
27 changes: 27 additions & 0 deletions asv_bench/benchmarks/rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,33 @@ def time_quantile(self, constructor, window, dtype, percentile, interpolation):
self.roll.quantile(percentile, interpolation=interpolation)


class Rank:
    """
    Benchmark ``rank`` on rolling windows.

    The parameter grid covers the constructor, the window size, the input
    dtype and the ``pct``, ``ascending`` and ``method`` arguments passed to
    ``rank``. The ``percentile`` parameter is the boolean forwarded as ``pct``.
    """

    params = (
        ["DataFrame", "Series"],
        [10, 1000],
        ["int", "float"],
        [True, False],
        [True, False],
        ["min", "max", "average"],
    )
    param_names = [
        "constructor",
        "window",
        "dtype",
        "percentile",
        "ascending",
        "method",
    ]

    def setup(self, constructor, window, dtype, percentile, ascending, method):
        """Build the rolling object that ``time_rank`` measures."""
        N = 10**5
        # Scale before casting: ``np.random.random`` draws from [0, 1), so
        # casting the raw draws to "int" would truncate every value to 0 and
        # the benchmark would only ever rank fully tied data.
        arr = (100 * np.random.random(N)).astype(dtype)
        self.roll = getattr(pd, constructor)(arr).rolling(window)

    def time_rank(self, constructor, window, dtype, percentile, ascending, method):
        """Time a single ``rank`` call on the prepared rolling object."""
        self.roll.rank(pct=percentile, ascending=ascending, method=method)


class PeakMemFixedWindowMinMax:

params = ["min", "max"]
Expand Down
2 changes: 2 additions & 0 deletions doc/source/reference/window.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ Rolling window functions
Rolling.aggregate
Rolling.quantile
Rolling.sem
Rolling.rank

.. _api.functions_window:

Expand Down Expand Up @@ -75,6 +76,7 @@ Expanding window functions
Expanding.aggregate
Expanding.quantile
Expanding.sem
Expanding.rank

.. _api.functions_ewm:

Expand Down
15 changes: 15 additions & 0 deletions doc/source/whatsnew/v1.4.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,21 @@ Multithreaded CSV reading with a new CSV Engine based on pyarrow
:func:`pandas.read_csv` now accepts ``engine="pyarrow"`` (requires at least ``pyarrow`` 0.17.0) as an argument, allowing for faster csv parsing on multicore machines
with pyarrow installed. See the :doc:`I/O docs </user_guide/io>` for more info. (:issue:`23697`)

.. _whatsnew_140.enhancements.window_rank:

Rank function for rolling and expanding windows
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Added ``rank`` function to :class:`Rolling` and :class:`Expanding`. The new function supports the ``method``, ``ascending``, and ``pct`` flags of :meth:`DataFrame.rank`. The ``method`` argument supports ``min``, ``max``, and ``average`` ranking methods.
Example:

.. ipython:: python

s = pd.Series([1, 4, 2, 3, 5, 3])
s.rolling(3).rank()

s.rolling(3).rank(method="max")

.. _whatsnew_140.enhancements.other:

Other enhancements
Expand Down
8 changes: 8 additions & 0 deletions pandas/_libs/algos.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,11 @@ from pandas._libs.util cimport numeric


cdef numeric kth_smallest_c(numeric* arr, Py_ssize_t k, Py_ssize_t n) nogil

# Tie-breaking strategies used when ranking equal values. Declared in the
# .pxd so that other Cython modules can ``cimport`` the enum.
cdef enum TiebreakEnumType:
    TIEBREAK_AVERAGE
    TIEBREAK_MIN,
    TIEBREAK_MAX
    TIEBREAK_FIRST
    TIEBREAK_FIRST_DESCENDING
    TIEBREAK_DENSE
7 changes: 0 additions & 7 deletions pandas/_libs/algos.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,6 @@ cdef:
float64_t NaN = <float64_t>np.NaN
int64_t NPY_NAT = get_nat()

cdef enum TiebreakEnumType:
TIEBREAK_AVERAGE
TIEBREAK_MIN,
TIEBREAK_MAX
TIEBREAK_FIRST
TIEBREAK_FIRST_DESCENDING
TIEBREAK_DENSE

tiebreakers = {
"average": TIEBREAK_AVERAGE,
Expand Down
25 changes: 23 additions & 2 deletions pandas/_libs/src/skiplist.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,30 @@ PANDAS_INLINE double skiplist_get(skiplist_t *skp, int i, int *ret) {
return node->value;
}

// Returns the lowest rank among all elements equal to `value`.
// `skiplist_insert`, by contrast, reports the highest rank of that group.
PANDAS_INLINE int skiplist_min_rank(skiplist_t *skp, double value) {
    node_t *cur = skp->head;
    int lvl = skp->maxlevels;
    int skipped = 0;  // sum of the widths of every link followed so far

    // Walk from the topmost level down to level 0, moving right on each
    // level for as long as `_node_cmp` reports a positive result for the
    // next node.
    while (lvl-- > 0) {
        for (; _node_cmp(cur->next[lvl], value) > 0; cur = cur->next[lvl]) {
            skipped += cur->width[lvl];
        }
    }

    // Ranks are 1-based.
    return skipped + 1;
}

// Returns the rank of the inserted element. When there are duplicates,
// `rank` is the highest of the group, i.e. the 'max' method of
// https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.rank.html
PANDAS_INLINE int skiplist_insert(skiplist_t *skp, double value) {
node_t *node, *prevnode, *newnode, *next_at_level;
int *steps_at_level;
int size, steps, level;
int size, steps, level, rank = 0;
node_t **chain;

chain = skp->tmp_chain;
Expand All @@ -197,6 +217,7 @@ PANDAS_INLINE int skiplist_insert(skiplist_t *skp, double value) {
next_at_level = node->next[level];
while (_node_cmp(next_at_level, value) >= 0) {
steps_at_level[level] += node->width[level];
rank += node->width[level];
node = next_at_level;
next_at_level = node->next[level];
}
Expand Down Expand Up @@ -230,7 +251,7 @@ PANDAS_INLINE int skiplist_insert(skiplist_t *skp, double value) {

++(skp->size);

return 1;
return rank + 1;
}

PANDAS_INLINE int skiplist_remove(skiplist_t *skp, double value) {
Expand Down
11 changes: 11 additions & 0 deletions pandas/_libs/window/aggregations.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ from typing import (

import numpy as np

from pandas._typing import WindowingRankType

def roll_sum(
values: np.ndarray, # const float64_t[:]
start: np.ndarray, # np.ndarray[np.int64]
Expand Down Expand Up @@ -63,6 +65,15 @@ def roll_quantile(
quantile: float, # float64_t
interpolation: Literal["linear", "lower", "higher", "nearest", "midpoint"],
) -> np.ndarray: ... # np.ndarray[float]
def roll_rank(
    values: np.ndarray,  # const float64_t[:]
    start: np.ndarray,  # np.ndarray[np.int64]
    end: np.ndarray,  # np.ndarray[np.int64]
    minp: int,  # int64_t
    percentile: bool,
    method: WindowingRankType,
    ascending: bool,
) -> np.ndarray: ... # np.ndarray[float]
def roll_apply(
obj: object,
start: np.ndarray, # np.ndarray[np.int64]
Expand Down
124 changes: 122 additions & 2 deletions pandas/_libs/window/aggregations.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import cython
from libc.math cimport round
from libcpp.deque cimport deque

from pandas._libs.algos cimport TiebreakEnumType

import numpy as np

cimport numpy as cnp
Expand Down Expand Up @@ -50,6 +52,8 @@ cdef extern from "../src/skiplist.h":
double skiplist_get(skiplist_t*, int, int*) nogil
int skiplist_insert(skiplist_t*, double) nogil
int skiplist_remove(skiplist_t*, double) nogil
int skiplist_rank(skiplist_t*, double) nogil
int skiplist_min_rank(skiplist_t*, double) nogil

cdef:
float32_t MINfloat32 = np.NINF
Expand Down Expand Up @@ -795,7 +799,7 @@ def roll_median_c(const float64_t[:] values, ndarray[int64_t] start,
val = values[j]
if notnan(val):
nobs += 1
err = skiplist_insert(sl, val) != 1
err = skiplist_insert(sl, val) == -1
if err:
break

Expand All @@ -806,7 +810,7 @@ def roll_median_c(const float64_t[:] values, ndarray[int64_t] start,
val = values[j]
if notnan(val):
nobs += 1
err = skiplist_insert(sl, val) != 1
err = skiplist_insert(sl, val) == -1
if err:
break

Expand Down Expand Up @@ -1139,6 +1143,122 @@ def roll_quantile(const float64_t[:] values, ndarray[int64_t] start,
return output


# Maps the ``method`` string accepted by ``roll_rank`` to its tie-breaking
# enum member.
# NOTE(review): a reviewer asked whether this mapping can be unified with the
# equivalent ``tiebreakers`` mapping in algos.pyx.
rolling_rank_tiebreakers = {
    "average": TiebreakEnumType.TIEBREAK_AVERAGE,
    "min": TiebreakEnumType.TIEBREAK_MIN,
    "max": TiebreakEnumType.TIEBREAK_MAX,
}


def roll_rank(const float64_t[:] values, ndarray[int64_t] start,
              ndarray[int64_t] end, int64_t minp, bint percentile,
              str method, bint ascending) -> np.ndarray:
    """
    Rank the most recently inserted observation of each window.

    O(N log(window)) implementation using skip list

    derived from roll_quantile

    Parameters
    ----------
    values : const float64_t[:]
        Input data.
    start, end : ndarray[int64_t]
        Window bounds; window ``i`` covers ``values[start[i]:end[i]]``.
    minp : int64_t
        Minimum number of non-NaN observations a window needs; windows
        with fewer observations produce NaN.
    percentile : bint
        If True, the rank is divided by the number of non-NaN observations
        in the window.
    method : str
        Tie-breaking method: "average", "min" or "max".
    ascending : bint
        If False, values are negated before being ranked.

    Returns
    -------
    np.ndarray
        float64 array with one entry per element of ``values``.

    Raises
    ------
    ValueError
        If ``method`` is not one of the supported methods.
    MemoryError
        If the skip list cannot be allocated or an insertion fails.
    """
    cdef:
        Py_ssize_t i, j, s, e, N = len(values)
        float64_t rank_min = 0, rank = 0
        int64_t nobs = 0, win
        float64_t val
        skiplist_t *skiplist
        float64_t[::1] output
        TiebreakEnumType rank_type

    try:
        rank_type = rolling_rank_tiebreakers[method]
    except KeyError:
        raise ValueError(f"Method '{method}' is not supported")

    is_monotonic_increasing_bounds = is_monotonic_increasing_start_end_bounds(
        start, end
    )
    # we use the Fixed/Variable Indexer here as the
    # actual skiplist ops outweigh any window computation costs
    output = np.empty(N, dtype=np.float64)

    win = (end - start).max()
    if win == 0:
        output[:] = NaN
        return np.asarray(output)
    skiplist = skiplist_init(<int>win)
    if skiplist == NULL:
        raise MemoryError("skiplist_init failed")

    # The try/finally guarantees the skip list is released even when a
    # MemoryError is raised part-way through the loop.
    try:
        with nogil:
            for i in range(N):
                s = start[i]
                e = end[i]

                if i == 0 or not is_monotonic_increasing_bounds:
                    if not is_monotonic_increasing_bounds:
                        # Bounds may move backwards, so rebuild the skip
                        # list from scratch for every window.
                        nobs = 0
                        skiplist_destroy(skiplist)
                        skiplist = skiplist_init(<int>win)
                        if skiplist == NULL:
                            raise MemoryError("skiplist_init failed")

                    # setup
                    for j in range(s, e):
                        val = values[j] if ascending else -values[j]
                        if notnan(val):
                            nobs += 1
                            rank = skiplist_insert(skiplist, val)
                            if rank == -1:
                                raise MemoryError("skiplist_insert failed")
                            if rank_type == TiebreakEnumType.TIEBREAK_AVERAGE:
                                # The average rank of `val` is the sum of
                                # the ranks of all instances of `val` in the
                                # skip list divided by the number of
                                # instances. The sum of consecutive integers
                                # from 1 to N is N * (N + 1) / 2.
                                # The sum of the ranks is the sum of
                                # integers from the lowest rank to the
                                # highest rank, which is the sum of integers
                                # from 1 to the highest rank minus the sum
                                # of integers from 1 to one less than the
                                # lowest rank.
                                rank_min = skiplist_min_rank(skiplist, val)
                                rank = (((rank * (rank + 1) / 2)
                                         - ((rank_min - 1) * rank_min / 2))
                                        / (rank - rank_min + 1))
                            elif rank_type == TiebreakEnumType.TIEBREAK_MIN:
                                rank = skiplist_min_rank(skiplist, val)
                            # TIEBREAK_MAX: `skiplist_insert` already
                            # returned the highest rank of the tied group.
                        else:
                            rank = NaN

                else:
                    # calculate deletes
                    for j in range(start[i - 1], s):
                        val = values[j] if ascending else -values[j]
                        if notnan(val):
                            skiplist_remove(skiplist, val)
                            nobs -= 1

                    # calculate adds
                    # NOTE(review): if this range is empty, `rank` keeps the
                    # value computed for the previous window -- confirm that
                    # this is the intended result for such windows.
                    for j in range(end[i - 1], e):
                        val = values[j] if ascending else -values[j]
                        if notnan(val):
                            nobs += 1
                            rank = skiplist_insert(skiplist, val)
                            if rank == -1:
                                raise MemoryError("skiplist_insert failed")
                            if rank_type == TiebreakEnumType.TIEBREAK_AVERAGE:
                                # Same average-of-tied-ranks formula as in
                                # the setup branch.
                                rank_min = skiplist_min_rank(skiplist, val)
                                rank = (((rank * (rank + 1) / 2)
                                         - ((rank_min - 1) * rank_min / 2))
                                        / (rank - rank_min + 1))
                            elif rank_type == TiebreakEnumType.TIEBREAK_MIN:
                                rank = skiplist_min_rank(skiplist, val)
                        else:
                            rank = NaN
                if nobs >= minp:
                    output[i] = rank / nobs if percentile else rank
                else:
                    output[i] = NaN
    finally:
        # `skiplist` is NULL only when re-initialisation failed above.
        if skiplist != NULL:
            skiplist_destroy(skiplist)

    return np.asarray(output)


def roll_apply(object obj,
ndarray[int64_t] start, ndarray[int64_t] end,
int64_t minp,
Expand Down
3 changes: 3 additions & 0 deletions pandas/_typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,3 +219,6 @@
PositionalIndexer = Union[ScalarIndexer, SequenceIndexer]
PositionalIndexerTuple = Tuple[PositionalIndexer, PositionalIndexer]
PositionalIndexer2D = Union[PositionalIndexer, PositionalIndexerTuple]

# Tie-breaking methods accepted by the windowed (rolling/expanding) ``rank``
WindowingRankType = Literal["average", "min", "max"]
Loading