Skip to content

Commit

Permalink
Refactor in order to be able to get more data about the operations ru…
Browse files Browse the repository at this point in the history
…n in the pipeline (#4)

* Add function to get the counts of all stages of the pipeline

* Update README, fix tuple -> Tuple, add checks that zero values don't end the stream
  • Loading branch information
simw authored Nov 7, 2023
1 parent 4b2bea4 commit 8ae76d2
Show file tree
Hide file tree
Showing 6 changed files with 402 additions and 95 deletions.
71 changes: 70 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,72 @@
# pipedata

A framework for building pipelines for data processing
Chained operations in Python, applied to data processing.

## Installation

```bash
pip install pipedata
```

## An Example

### Core Framework

The core framework provides the building blocks for chaining operations.

Running a stream:
```py
from pipedata.core import StreamStart


result = (
StreamStart(range(10))
.filter(lambda x: x % 2 == 0)
.map(lambda x: x ^ 2)
.map_tuple(lambda x: x, 2)
.to_list()
)
print(result)
#> [(2, 0), (6, 4), (10,)]
```

Creating a chain and then using it:
```py
import json
from pipedata.core import ChainStart, Stream, StreamStart


chain = (
ChainStart()
.filter(lambda x: x % 2 == 0)
.map(lambda x: x ^ 2)
.map_tuple(lambda x: sum(x), 2)
)
print(Stream(range(10), chain).to_list())
#> [2, 10, 10]
print(json.dumps(chain.get_counts(), indent=4))
#> [
#> {
#> "name": "_identity",
#> "inputs": 10,
#> "outputs": 10
#> },
#> {
#> "name": "<lambda>",
#> "inputs": 10,
#> "outputs": 5
#> },
#> {
#> "name": "<lambda>",
#> "inputs": 5,
#> "outputs": 5
#> },
#> {
#> "name": "<lambda>",
#> "inputs": 5,
#> "outputs": 3
#> }
#> ]
print(StreamStart(range(10)).flat_map(chain).to_list())
#> [2, 10, 10]
```
7 changes: 4 additions & 3 deletions src/pipedata/core/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from .chain import Chain
from .stream import Stream, start_stream
from .chain import Chain, ChainStart
from .stream import Stream, StreamStart

__all__ = [
"Chain",
"ChainStart",
"Stream",
"start_stream",
"StreamStart",
]
161 changes: 135 additions & 26 deletions src/pipedata/core/chain.py
Original file line number Diff line number Diff line change
@@ -1,83 +1,192 @@
from __future__ import annotations

from typing import Callable, Generic, Iterator, Optional, Tuple, TypeVar
from dataclasses import dataclass
from typing import (
Any,
Callable,
Dict,
Generic,
Iterator,
List,
Optional,
Tuple,
TypeVar,
Union,
cast,
overload,
)

from pipedata.core.itertools import take_next, take_up_to_n

TStart = TypeVar("TStart")
TEnd = TypeVar("TEnd")
TNewEnd = TypeVar("TNewEnd")
TOther = TypeVar("TOther")


def _identity(input_iterator: Iterator[TEnd]) -> Iterator[TEnd]:
while element := take_next(input_iterator):
while (element := take_next(input_iterator)) is not None:
yield element


class Chain(Generic[TStart, TEnd]):
def __init__(self, action: Callable[[Iterator[TStart]], Iterator[TEnd]]) -> None:
self._action = action
class CountingIterator(Iterator[TStart]):
def __init__(self, iterator: Iterator[TStart]) -> None:
self._iterator = iterator
self._count = 0

def __iter__(self) -> Iterator[TStart]:
return self

def __next__(self) -> TStart:
self._count += 1
try:
return next(self._iterator)
except StopIteration as err:
self._count -= 1
raise StopIteration from err

def get_count(self) -> int:
return self._count


class CountedFunc(Generic[TStart, TEnd]):
def __init__(
self,
func: Callable[[Iterator[TStart]], Iterator[TEnd]],
) -> None:
self._func = func
self._counting_input: Optional[CountingIterator[TStart]] = None
self._counting_output: Optional[CountingIterator[TEnd]] = None

@property
def __name__(self) -> str: # noqa: A003
return self._func.__name__

def __call__(self, input_iterator: Iterator[TStart]) -> Iterator[TEnd]:
self._counting_input = CountingIterator(input_iterator)
self._counting_output = CountingIterator(self._func(self._counting_input))
return self._counting_output

@classmethod
def start(cls) -> Chain[TEnd, TEnd]:
return Chain(_identity)
def get_counts(self) -> Tuple[int, int]:
return (
0 if self._counting_input is None else self._counting_input.get_count(),
0 if self._counting_output is None else self._counting_output.get_count(),
)

def __call__(self, previous_step: Iterator[TStart]) -> Iterator[TEnd]:
return self._action(previous_step)

@dataclass
class StepCount:
name: str
inputs: int
outputs: int


class Chain(Generic[TStart, TEnd]):
@overload
def __init__(
self,
previous_steps: Chain[TStart, TOther],
func: Callable[[Iterator[TOther]], Iterator[TEnd]],
):
...

@overload
def __init__(
self,
previous_steps: None,
func: Callable[[Iterator[TStart]], Iterator[TEnd]],
):
...

def __init__(
self,
previous_steps: Optional[Chain[TStart, TOther]],
func: Union[
Callable[[Iterator[TOther]], Iterator[TEnd]],
Callable[[Iterator[TStart]], Iterator[TEnd]],
],
) -> None:
self._previous_steps = previous_steps
self._func = CountedFunc(func)

def __call__(self, input_iterator: Iterator[TStart]) -> Iterator[TEnd]:
if self._previous_steps is None:
func = cast(CountedFunc[TStart, TEnd], self._func)
return func(input_iterator)

return self._func(self._previous_steps(input_iterator)) # type: ignore

def flat_map(
self, func: Callable[[Iterator[TEnd]], Iterator[TNewEnd]]
) -> Chain[TStart, TNewEnd]:
self, func: Callable[[Iterator[TEnd]], Iterator[TOther]]
) -> Chain[TStart, TOther]:
"""
Output zero or more elements from one or more input elements.
This is a fully general operation, that can arbitrarily transform the
stream of elements. It is the most powerful operation, and all the
other operations are implemented in terms of it.
"""

def new_action(previous_step: Iterator[TStart]) -> Iterator[TNewEnd]:
return func(self._action(previous_step))

return Chain(new_action)
return Chain(self, func)

def filter(self, func: Callable[[TEnd], bool]) -> Chain[TStart, TEnd]: # noqa: A003
"""
Remove elements from the stream that do not pass the filter function.
"""

def new_action(previous_step: Iterator[TEnd]) -> Iterator[TEnd]:
while element := take_next(previous_step):
while (element := take_next(previous_step)) is not None:
if func(element) is True:
yield element

new_action.__name__ = func.__name__
return self.flat_map(new_action)

def map( # noqa: A003
self, func: Callable[[TEnd], TNewEnd]
) -> Chain[TStart, TNewEnd]:
self, func: Callable[[TEnd], TOther]
) -> Chain[TStart, TOther]:
"""
Return a single transformed element from each input element.
"""

def new_action(previous_step: Iterator[TEnd]) -> Iterator[TNewEnd]:
while element := take_next(previous_step):
def new_action(previous_step: Iterator[TEnd]) -> Iterator[TOther]:
while (element := take_next(previous_step)) is not None:
yield func(element)

new_action.__name__ = func.__name__
return self.flat_map(new_action)

def map_tuple(
self, func: Callable[[Tuple[TEnd, ...]], TNewEnd], n: Optional[int] = None
) -> Chain[TStart, TNewEnd]:
self, func: Callable[[Tuple[TEnd, ...]], TOther], n: Optional[int] = None
) -> Chain[TStart, TOther]:
"""
Return a single transformed element from (up to) n input elements.
If n is None, then apply the function to all the elements, and return
an iterator of 1 element.
"""

def new_action(previous_step: Iterator[TEnd]) -> Iterator[TNewEnd]:
def new_action(previous_step: Iterator[TEnd]) -> Iterator[TOther]:
while elements := take_up_to_n(previous_step, n):
yield func(elements)

new_action.__name__ = func.__name__
return self.flat_map(new_action)

def get_counts(self) -> List[Dict[str, Any]]:
step_counts = []
if self._previous_steps is not None:
step_counts = self._previous_steps.get_counts()

inputs, outputs = self._func.get_counts()
step_counts.append(
{
"name": self._func.__name__,
"inputs": inputs,
"outputs": outputs,
}
)
return step_counts


class ChainStart(Chain[TOther, TOther]):
def __init__(self) -> None:
super().__init__(None, _identity)
13 changes: 9 additions & 4 deletions src/pipedata/core/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import functools
import itertools
from typing import (
Any,
Callable,
Dict,
Iterable,
Iterator,
List,
Expand All @@ -13,7 +15,7 @@
overload,
)

from .chain import Chain
from .chain import Chain, ChainStart

TStart = TypeVar("TStart")
TEnd = TypeVar("TEnd")
Expand Down Expand Up @@ -69,7 +71,10 @@ def to_list(
) -> List[TEnd]:
return list(itertools.islice(self, stop))

def get_counts(self) -> List[Dict[str, Any]]:
return self._chain.get_counts()

def start_stream(items: Iterable[TStart]) -> Stream[TStart]:
chain = Chain[TStart, TStart].start()
return Stream(items, chain)

class StreamStart(Stream[TEnd]):
def __init__(self, items: Iterable[TEnd]) -> None:
super().__init__(items, ChainStart[TEnd]())
Loading

0 comments on commit 8ae76d2

Please sign in to comment.