I listen to a lot of music (~4k h/y) and managing thousands of songs is a bit of a challenging. To improve my listening experience I want to better organize my collection. One thing I want to do is sort my songs, as then I can do cool things with dynamic playlists. For example, I want to listen to songs I don't like as much, but I don't want back-to-back Ls.
When sorting the songs I want to continuously answer the dead simple question "which is better A or B?" As I have moved enough songs through playlists for one life-time. Additionally if I were to listen to each song once all the way through then sorting would take 2 weeks. As such, whilst most (maybe all) of Python's stdlib ordering functions support objects which only define __lt__, things like sorted don't support saving state part way through. I like listening to music but being wired for 2 weeks straight, stressing about any outage, doesn't sound like a fun experience. (And unfortunately my server is experiencing sporadic hard freezes.)
As such I have have 4 constraints:
Work well with unstable comparisons.
Only ask which is better --
__lt__.Support saving and resuming partial sorts.
Defer
__lt__comparison to the consumer of the sort algorithm allowing for use in a web app.NOTE: I have one user (me), so threading is not a concern.
I have solved each:
I have tested the rough outline of the algorithm (binary tree binning and 'heap' merging) through simulation. And the performance was good.
The simulation was somewhat flawed as a first quarter value could sometimes be found in the third or fourth quarter. However the overall delta from the correct value was very good at ~3%. If I decreased the randomness of the perceived value the list would just be sorted in perfect order. So basically any algorithm will probably be alright.
Only using
__lt__is simple.I've added some fairly basic metaprogramming in
Sortto support loading and dumping values to JSON by defining the type annotations on the class. So for the most part saving and resuming is justsort.dumps()andSort.loads(...).The complicated part is having an algorithm which supports resuming from an arbitrary save point. As such the very basic function
bisect.bisecthas to balloon up in size and complexity.At first I was going to define an
advancemethod to get the next comparison. However, I found the__iter__like pattern unintuitive to model within a class. And I found I kept wanting to pass acmpcallback as a quick hack.As such I moved to the coroutine pattern to generate the values. The pattern is fairly rare. In Python coroutines are probably 99% the
asyncbespoke flavour. Or at least 99% of the results seem to beasyncones when googling. As such I would appreciate a more 'high level' opinion on the approach. Have I shoehorned a solution where a better approach could exist?Additionally I found coroutines a little annoying to interact with. So I've defined a couple of methods to split the values from the output and be stored in a mutated list. Very dirty, but really seems like the simplest solution to the problem. I kept writing the same 'groupby' like code to split the coroutine in two. Sometimes twice in the same method.
The entire approach is very mutable, and so is a lot harder to reason with. So again, nothing more than a high level critique could be interesting.
I'm going to be honest I haven't properly tested or documented the code. I'm only really writing the code to facilitate ranking songs, and is mostly a write once and discard project. The approach is somewhat novel and could be a learning experience for a better approach. Here's a brief explanation of each sort class.
Helpers:
Sortis the base class to help cut down code size of the subclasses.SortValuesis aSortcomposing a list.SortValuesCachecaches values to ensure no data loss.Sort.run__valuescan't be saved/loaded.SortBisectis a reimplementation ofbisect.SortFirstsgets the first item of eachSort-- useful for building the 'heap' in theSortMergeclasses.def sort_firsts(*sorts: Iterator[T]) -> tuple[tuple[T, Iterator[T]], ...]: return tuple( (value, sort) for sort in sorts if (value := next(sort, None)) is not None )Sortedoverwritesfrom_listto have a simple standardized way to sort the items.- If the list has 15 or less items we
SortMergeAll.from_list. - Otherwise we delegate to
SortBin.from_list.
- If the list has 15 or less items we
Merge:
SortMergeSourcebuilds the 'heap' and then delegates toSortMerge.SortMergeis aheapq.mergeexcept using a binary tree not a heap. (I'm too lazy to write a heap)SortMergeAlloverwrites the defaultfrom_listimplementation fromSortMergeto be stable across saving and loading. (Again I'm too lazy to rewrite the iterator approach)
Binary Tree Binning:
SortBin_builds the binary tree and bins forSortBin.SortBin:- Has a binary tree. (I found a very small tree of size 7 to be the most effective)
- Bin values by going through the (small) binary tree.
- Sort the bins by the provided
Sortclass. (By defaultSorted) - Merge the sorted
Sorts into one output.
TL;DR: If the amount of items we want to sort is 15 or fewer, then we perform a very basic heapq.merge using a binary tree rather than a heap. Otherwise We use a small (size 7) binary tree to bin values, which we then recursively sort using both the heapq.merge and binary tree binning algorithms.
from __future__ import annotations import enum import functools import json import math import random from typing import Any, Callable, ClassVar, Generator, Generic, Iterable, Iterator, Literal, ParamSpec, Self, TypeAlias, TypeVar, TypedDict, cast T = TypeVar("T") T0 = TypeVar("T0") T1 = TypeVar("T1") T_contra = TypeVar("T_contra", contravariant=True) TIter = TypeVar("TIter", bound=Iterator[Any]) P = ParamSpec("P") class DSort(TypedDict): __name__: str kwargs: dict[str, Any] class JSONSortDecoder(json.JSONDecoder): def __init__(self, **kwargs: Any) -> None: kwargs.setdefault("object_hook", type(self).object_hook) # type: ignore super().__init__(**kwargs) def object_hook(d: dict[str, Any]) -> Any: # type: ignore if ("__name__" in d and "kwargs" in d ): cls = Sort.MODELS[d["__name__"]] if d["kwargs"] is None: return cls else: return cls(**d["kwargs"]) return d class JSONSortEncoder(json.JSONEncoder): def default(self, o: Any) -> Any: if isinstance(o, Sort): return o._dump() if issubclass(o, Sort): return {"__name__": o.__name__, "kwargs": None} return super().default(o) class SortState(enum.Enum): COMPARE = enum.auto() VALUE = enum.auto() ASortRunValue: TypeAlias = Generator[tuple[Literal[SortState.VALUE], T], None, None] ASortRunCompare: TypeAlias = Generator[tuple[Literal[SortState.COMPARE], tuple[T, ...]], bool, None] ASortRun_: TypeAlias = Generator[tuple[Literal[SortState.COMPARE], tuple[T0, ...]] | tuple[Literal[SortState.VALUE], T1] | tuple[SortState, tuple[T0, ...] | T1], bool | None, None] ASortRun: TypeAlias = ASortRun_[T, T] class Sort(Generic[T0, T1]): MODELS: ClassVar[dict[str, type[Sort[Any, Any]]]] = {} def __init_subclass__(cls) -> None: super().__init_subclass__() cls.MODELS[cls.__name__] = cls # type: ignore def __repr__(self) -> str: args = ", ".join( f"{key}={getattr(self, key)!r}" for key in self.__annotations__ ) return f"{type(self).__module__}.{type(self).__qualname__}({args})" def _dump_value(self) -> dict[str, Any]: return { key: getattr(self, key) for key in self.__annotations__ } def _dump(self) -> DSort: return {"__name__": type(self).__name__, "kwargs": self._dump_value()} def dumps(self) -> str: return json.dumps(self, cls=JSONSortEncoder) @classmethod def loads(cls, data: str) -> Self: return json.loads(data, cls=JSONSortDecoder) @classmethod def from_list(cls, data: list[T]) -> Sort[T, T]: raise NotImplementedError(f"{cls.__module__}.{cls.__qualname__}.from_list has not been defined") @functools.cached_property def run(self) -> ASortRun_[T0, T1]: raise NotImplementedError(f"{type(self).__name__}.run has not been defined") def _run__reset(self) -> ASortRun_[T0, T1]: def inner() -> ASortRun_[T0, T1]: yield obj # type: ignore raise ValueError(f"Cannot use {type(self).__name__}.run after using {type(self).__name__}.run__group") it = self.run obj = object() self.run = inner() obj_ = next(self.run) assert obj is obj_ return it @functools.cached_property def run__group(self) -> Iterator[tuple[Literal[SortState.VALUE], ASortRunValue[T1]] | tuple[Literal[SortState.COMPARE], ASortRunCompare[T0]]]: def inner() -> ASortRun_[T0, T1]: nonlocal state, value assert state is not None STATE = state while state is STATE: ret = yield state, value try: state, value = it.send(ret) except StopIteration: state = None return it = self._run__reset() state: SortState | None try: state, value = next(it) except StopIteration: return while state is not None: # type: ignore yield state, inner() # type: ignore @functools.cached_property def run__values(self) -> list[T1]: return [] @functools.cached_property def run__compare(self) -> ASortRunCompare[T0]: output = self.run__values for state, obj in self.run__group: if state is SortState.VALUE: for _, item in obj: output.append(item) # type: ignore else: yield from obj # type: ignore class SortBisect(Sort[T, int]): values: list[T] value: T lo: int hi: int def __init__( self, values: list[T], value: T, lo: int = 0, hi: int | None = None, ) -> None: self.values = list(values) self.value = value self.lo = lo self.hi = len(self.values) if hi is None else hi @functools.cached_property def run(self) -> ASortRun_[T, int]: while self.lo < self.hi: mid = (self.lo + self.hi) // 2 is_wanted = yield SortState.COMPARE, (self.value, self.values[mid]) if is_wanted: self.hi = mid else: self.lo = mid + 1 yield SortState.VALUE, self.lo def update_heaps(self, *heaps: tuple[list[T0], Callable[[], T0]]) -> ASortRunCompare[T]: yield from self.run__compare i, = self.run__values self.values.insert(i, self.value) for heap, value in heaps: heap.insert(i, value()) class SortFirsts(Sort[T0, T1]): sorts: list[Sort[T0, T1]] values: list[T1] def __init__( self, sorts: list[Sort[T0, T1]], values: list[T1], ) -> None: self.sorts = sorts self.values = values @classmethod def from_list(cls, data: list[T]) -> SortFirsts[T, T]: return cls([SortValues(data)], []) # type: ignore @functools.cached_property def run(self) -> ASortRun_[T0, T1]: for sort in self.sorts[len(self.values):]: try: state, value = next(sort.run) except StopIteration: continue while state is not SortState.VALUE: ret = yield state, value try: state, value = sort.run.send(ret) except StopIteration: pass self.values.append(value) # type: ignore class SortValues(Sort[T, T]): data: Iterable[T] def __init__(self, data: Iterable[T]) -> None: self.data = iter(data) @classmethod def from_list(cls, data: list[T0]) -> SortValues[T0]: return cls(data) # type: ignore def __repr__(self) -> str: return f"<{type(self).__name__}>" def _dump_value(self) -> dict[str, Any]: return {"data": list(self.data)} @functools.cached_property def run(self) -> ASortRun_[Any, T]: for value in self.data: yield SortState.VALUE, value class SortValuesCache(Sort[T0, T1]): sort: Sort[T0, T1] values: list[T1] repeat_values: bool def __init__(self, sort: Sort[T0, T1], values: list[T1], repeat_values: bool = False) -> None: self.sort = sort self.values = values self.repeat_values = repeat_values @classmethod def from_list(cls, data: list[T]) -> SortValuesCache[T, T]: return SortValuesCache(SortValues(data), [], False) @functools.cached_property def run(self) -> ASortRun_[T0, T1]: for value in self.values: yield SortState.VALUE, value sort = self.sort.run try: state, value = next(sort) while True: if state is SortState.VALUE: self.values.append(value) # type: ignore input = yield state, value state, value = sort.send(input) except StopIteration: pass class SortMerge(Sort[T, T]): heap: list[tuple[T, int, Sort[T, T]]] bisect: SortBisect[T] | None def __init__(self, heap: list[tuple[T, int, Sort[T, T]]], bisect: SortBisect[T] | None = None) -> None: self.heap = heap self.bisect = bisect @classmethod def from_list(cls, data: list[T0]) -> Sort[T0, T0]: # return cls.from_sources(*[ # SortValues([d]) # for d in data # ]) return cls.from_intense(data) @classmethod def from_sources(cls, *sources: Sort[T, T]) -> SortMergeSource[T]: return SortMergeSource(SortFirsts(list(sources), []), None, [], cls) @classmethod def from_intense(cls, values: list[T]) -> Sort[T, T]: def inner(lo: int, hi: int) -> Sort[T, T]: if lo + 1 == hi: return SortValues([values[lo]]) mid = (lo + hi) // 2 return cls.from_sources( inner(lo, mid), inner(mid, hi), ) if not len(values): return SortValues([]) # type: ignore return inner(0, len(values)) @functools.cached_property def run(self) -> ASortRun[T]: SENTINEL = object() heap = self.heap if self.bisect is not None: yield from self.bisect.update_heaps((heap, lambda: heap.pop(0))) # type: ignore # Type "bool | None" cannot be assigned to type "bool" self.bisect = None while heap: yield SortState.VALUE, heap[0][0] new_value: T = cast(T, SENTINEL) try: state, value = next(heap[0][2].run) except StopIteration: pass else: while state is not SortState.VALUE: ret = yield state, value try: state, value = heap[0][2].run.send(ret) except StopIteration: break else: new_value = value # type: ignore if new_value is SENTINEL: heap.pop(0) else: heap[0][0] = new_value # type: ignore self.bisect = SortBisect([v for v, _, _ in heap[1:]], new_value) yield from self.bisect.update_heaps((heap, lambda: heap.pop(0))) # type: ignore # Type "bool | None" cannot be assigned to type "bool" self.bisect = None class SortMergeAll(SortMerge["T"]): @classmethod def from_list(cls, data: list[T0]) -> Sort[T0, T0]: return cls.from_sources(*[ SortValues([d]) for d in data ]) class SortMergeSource(Sort[T, T]): sources: SortFirsts[T, T] bisect: SortBisect[T] | None heap: list[tuple[T, int, Sort[T, T]]] sort: SortMerge[T] | None sort_type: type[SortMerge[T]] def __init__( self, sources: SortFirsts[T, T], bisect: SortBisect[T] | None, heap: Iterable[tuple[T, int, Sort[T, T]]], sort_type: type[SortMerge[T]], sort: SortMerge[T] | None = None, ) -> None: self.sources = sources self.bisect = bisect self.heap = list(heap) self.sort = sort self.sort_type = sort_type @classmethod def from_list(cls, data: list[T0], sort_type: type[SortMerge[T0]] = SortMerge) -> SortMergeSource[T0]: return cls(SortFirsts([SortValues(data)], []), None, [], sort_type) # type: ignore def _dump_value(self) -> dict[str, Any]: return {"sources": self.sources, "bisect": self.bisect, "heap": self.heap, "sort": self.sort, "sort_type": self.sort_type} def _dump(self) -> DSort: # return super()._dump() if self.sort is None: return super()._dump() else: return self.sort._dump() @functools.cached_property def run(self) -> ASortRun_[T, T]: if self.sort is None: yield from self.sources.run heap: list[tuple[T, int, Sort[T, T]]] heap = [list(a) for a in zip(self.sources.values, range(len(self.sources.sorts)), self.sources.sorts)] # type: ignore heap = heap[len(self.heap):][::-1] heap_ = self.heap if self.bisect is None and heap: self.bisect = SortBisect([v for v, _, _ in heap_], heap[-1][0]) while self.bisect is not None: yield from self.bisect.update_heaps((heap_, heap.pop)) # type: ignore if heap: self.bisect = SortBisect(self.bisect.values, heap[-1][0]) else: self.bisect = None by_id = dict(zip(self.sources.values, self.sources.sorts)) # type: ignore self.sources = SortValues.from_list([]) # type: ignore heap_ = [ [value, i, by_id[value]] for value, i, _ in heap_ ] self.sort = self.sort_type(heap_) # type: ignore yield from self.sort.run class SortBin_(Sort[T, T]): data: list[T] tree: SortValuesCache[T, T] bins: list[list[T]] bisect: SortBisect[T] | None def __init__( self, data: list[T], tree: SortValuesCache[T, T], bins: list[list[T]], bisect: SortBisect[T] | None = None, ) -> None: self.data = data self.tree = tree self.bins = bins self.bisect = bisect @classmethod def from_list( cls, data: list[T0], *, sort: Sort[T0, T0] = SortValues, # type: ignore depth: int = 2, ) -> SortBin_[T0]: assert 1 <= depth # Sigma(2 ** n) amount = 2 ** (depth + 1) - 1 ## random.seed(42401) # needed for Sorted to be stable indexes = random.sample(range(len(data)), min(amount, len(data))) return cls( # type: ignore data, SortValuesCache( sort.from_list([ data.pop(i) for i in sorted(indexes, reverse=True) ]), [], False, ), [[] for _ in range(amount + 1)], ) @functools.cached_property def run(self) -> ASortRunCompare[T]: # type: ignore yield from self.tree.run__compare output = self.tree.run__values if (self.bisect is None and self.data ): self.bisect = SortBisect(output, self.data.pop()) while self.bisect is not None: yield from self.bisect.run__compare output_ = self.bisect.run__values self.bins[output_[0]].append(self.bisect.value) if self.data: self.bisect = SortBisect(output, self.data.pop()) else: self.bisect = None class SortBin(Sort[T, T]): bins: SortBin_[T] sort: type[Sort[T, T]] mergers: list[SortValuesCache[T, T]] merge: Sort[T, T] | None def __init__( self, bins: SortBin_[T], sort: type[Sort[T, T]], mergers: list[SortValuesCache[T, T]], merge: Sort[T, T] | None = None, ) -> None: self.bins = bins self.sort = sort self.mergers = mergers self.merge = merge @classmethod def from_list( cls, data: list[T0], *, sort: Sort[T0, T0] = SortValues, # type: ignore sort_tree: Sort[T0, T0] | None = None, # type: ignore depth: int = 2, ) -> SortBin[T0]: return cls( # type: ignore SortBin_.from_list(data, sort=sort if sort_tree is None else sort_tree, depth=depth), # type: ignore sort, # type: ignore [], None, ) @functools.cached_property def run(self) -> ASortRun_[T, T]: if self.merge is None: yield from self.bins.run # type: ignore if self.mergers: yield from self.mergers[-1].run__compare # type: ignore for i in range(len(self.mergers), len(self.bins.bins)): values = self.bins.bins[i] + ([] if len(tree := self.bins.tree.values) <= i else [tree[i]]) self.mergers.append(SortValuesCache(self.sort.from_list(values), [], False)) yield from self.mergers[-1].run__compare # type: ignore self.merge = SortMerge.from_sources(*[ SortValues(m.values) for m in self.mergers ]) yield from self.merge.run class Sorted(Sort[T, T]): _sort: SortValuesCache[T, T] def __init__( self, _sort: SortValuesCache[T, T], ) -> None: self._sort = _sort @classmethod def from_list( cls, data: list[T0], *, depth: int | None = 2, ) -> Sort[T0, T0]: if len(data) <= 15: sort = SortMergeAll.from_list(data) elif len(data) <= 15: # still looking into whether intense is good sort = SortMerge.from_intense(data) else: if depth is None: depth = int(math.log(len(data), 2)) depth = max(2, depth - 4) sort = SortBin.from_list(data, depth=depth, sort=Sorted) # type: ignore return sort @classmethod def from_sort(cls, sort: Sort[T0, T0]) -> Sorted[T0]: return cls(SortValuesCache(sort, [], False)) # type: ignore @functools.cached_property def run(self) -> ASortRun_[T, T]: yield from self._sort.run @property def values(self) -> list[T]: return self._sort.values def sort(self, cmp: Callable[[tuple[T, ...]], bool]) -> list[T]: it = self.run__compare try: _, values = next(it) while True: _, values = it.send(cmp(values)) except StopIteration: pass return self._sort.values ################################ # # # Some non-library 'test' code # # # ################################ # Please note the code is just here as a 'here's what I was using to # check the algorithms work' demonstration. # # Given the lack of pytest and just poor quality of the code below. # I don't really want a review from here and below. # # Note: please uncomment the `## random.seed(42401)` comment left over # in the above code to ensure Sorted is stable. # SortMerge is _not_ stable and I'd prefer to use a simpler # algorithm than figure out another PITA algorithm. def gen_test_object(sort_type: type[Sort[Any, Any]]) -> Sorted[int]: random.seed(42401) if issubclass(sort_type, SortValues): return Sorted.from_sort(SortValues.from_list([1, 6, 0, 2, 5, 3, 4])) if issubclass(sort_type, SortValuesCache): return Sorted.from_sort(SortValuesCache.from_list([1, 6, 0, 2, 5, 3, 4])) if issubclass(sort_type, SortMergeSource): return Sorted.from_sort(SortMergeSource(SortFirsts([SortValues([1, 6, 0, 2]), SortValues([5, 3, 4])], []), None, [], SortMergeAll)) # type: ignore if issubclass(sort_type, SortMergeAll): return Sorted.from_sort(SortMergeAll.from_list([1, 6, 0, 2, 5, 3, 4])) if issubclass(sort_type, SortMerge): return Sorted.from_sort(SortMerge.from_list([1, 6, 0, 2, 5, 3, 4])) if issubclass(sort_type, SortBin_): return Sorted.from_sort(SortBin_.from_list([1, 6, 0, 2, 5, 3, 4])) # type: ignore if issubclass(sort_type, SortBin): n = 100 return Sorted.from_sort(SortBin.from_list(random.sample(range(n), k=n), sort_tree=SortMergeAll)) # type: ignore if issubclass(sort_type, Sorted): n = 100 return Sorted.from_sort(Sorted.from_list(random.sample(range(n), k=n))) raise TypeError(f"Unknown type: {sort_type.__name__}") def cmp__int(vs: tuple[int, ...]) -> bool: assert 2 == len(vs) return vs[0] < vs[1] def test_reload(sort: Sort[T, T], cmp: Callable[[tuple[T, ...]], bool]) -> bool: dump_a = sort.dumps() sort_a = SortValuesCache[T, T].loads(dump_a) sort_b = SortValuesCache[T, T].loads(dump_a) dumps: list[str] = [dump_a] values: list[tuple[int, ...]] = [] it_a = sort_a.run__compare it_b = sort_b.run__compare i = 0 try: _, values_a = next(it_a) _, values_b = next(it_b) values.append(values_a) # type: ignore while True: if (values_a != values_b or sort_a.values != sort_b.values ): print(i, values_a, values_b, "\n", sort_a.values, "\n", sort_b.values) # for value in values: # print(value) # print(sort_a) # print(sort_b) # print(sort_a.dumps() != sort_b.dumps()) # for dump in dumps[-1:] + [sort_b.dumps(), sort_a.dumps()]: # # print(dump) # print(pretty_fmt(json.loads(dump)), "\n") return False dumps.append(sort_b.dumps()) sort_b = SortValuesCache[T, T].loads(dumps[-1]) it_b = sort_b.run__compare _, values_b = next(it_b) values.append(values_a) # type: ignore assert values_a == values_b wanted = cmp(values_a) _, values_a = it_a.send(wanted) _, values_b = it_b.send(wanted) i += 1 except StopIteration: pass return True class StrReprless(str): def __repr__(self) -> str: return self.__str__() def pretty_fmt(obj: Any) -> Any: if (isinstance(obj, dict) and "__name__" in obj and "kwargs" in obj ): name = obj["__name__"] # type: ignore if (kwargs := obj["kwargs"]) is None: # type: ignore return f"{name}" else: kwargs_ = ", ".join( f"{key}={pretty_fmt(value)}" for key, value in kwargs.items() # type: ignore ) return StrReprless(f"{name}({kwargs_})") if isinstance(obj, dict): obj = {key: pretty_fmt(value) for key, value in obj.items()} # type: ignore if isinstance(obj, list): obj = [pretty_fmt(value) for value in obj] # type: ignore return obj def main(): SORT_TYPES = (SortValues, SortValuesCache, SortMergeSource, SortMerge, SortMergeAll, SortBin_, SortBin, Sorted) # SORT_TYPES = () for sort_type in SORT_TYPES: sort_a = gen_test_object(sort_type) sort_b = gen_test_object(sort_type) print(f"{sort_type.__name__:<15} {str(test_reload(sort_a, cmp__int)):<5}", sort_b.sort(cmp__int)) if __name__ == "__main__": try: main() except KeyboardInterrupt: raise SystemExit(1) from None
SortBisect:values,value,loandhi. Converting from JSON to pickle theSortBisectclass wouldn't change, as all of the JSON handling is handled in theSortbase class. Have I misunderstood you? \$\endgroup\$