Source code for noob.testing.nodes

import asyncio
import random
import string
from collections.abc import Generator, Iterator
from datetime import datetime
from itertools import count, cycle
from time import sleep
from typing import Annotated as A
from typing import Any

from faker import Faker
from pydantic import Field

from noob import Name, NodeSpecification, process_method
from noob.event import MetaSignal
from noob.node import Node, Signal, Slot
from noob.types import Epoch, EventMap


def count_source(
    limit: int = 10000, start: int = 0
) -> Generator[A[int, Name("index")], None, None]:
    """
    Yield consecutive integers beginning at ``start``.

    Args:
        limit: Exclusive upper bound on the yielded values. A value of ``0``
            disables the bound, so the source never finishes.
        start: First integer to yield.
    """
    unbounded = limit == 0
    for index in count(start=start):
        # A bounded source stops as soon as the counter reaches the limit.
        if not unbounded and index >= limit:
            return
        yield index
def letter_source() -> Generator[A[str, Name("letter")]]:
    """Yield the lowercase ASCII letters in order, starting over at ``a`` forever."""
    alphabet = cycle(string.ascii_lowercase)
    yield from alphabet
def word_source() -> Generator[A[str, Name("word")]]:
    """Yield an endless stream of Faker words, skipping any word of length one or less."""
    generator = Faker()
    while True:
        candidate = generator.word()
        if len(candidate) <= 1:
            continue
        yield candidate
def multi_words_source(n: int) -> Generator[A[list[str], Name("multi_words")]]:
    """
    Yield lists of ``n`` Faker words, forever.

    Every word is drawn through ``fake.unique`` on a single ``Faker`` instance
    that lives for the whole generator.

    Args:
        n: Number of words in each yielded list.
    """
    # NOTE(review): the unique proxy presumably remembers words across yielded
    # lists, so a long run could exhaust the word pool - confirm against Faker.
    fake = Faker()
    while True:
        words = []
        for _ in range(n):
            words.append(fake.unique.word())
        yield words
def sporadic_word(every: int = 3) -> Generator[A[str, Name("word")] | None, None, None]:
    """
    Yield a Faker word on every ``every``-th iteration and ``None`` on the others.

    Iterations are numbered from 1, so with the default the output pattern is
    ``None, None, word, None, None, word, ...``.

    Args:
        every: Period, in iterations, between emitted words.
    """
    fake = Faker()
    for tick in count(1):
        # Only ask Faker for a word on the iterations that actually emit one.
        yield fake.word() if tick % every == 0 else None
def word_counts() -> Generator[tuple[A[str, Name("word")], A[list[int], Name("counts")]]]:
    """
    Yield ``(word, counts)`` pairs forever.

    ``word`` comes from Faker's unique proxy and ``counts`` is a list of between
    two and five random integers, each in the range 1 to 100 inclusive.
    """
    fake = Faker()
    while True:
        # Draw the list size first, then the word, then the counts.
        size = random.randint(2, 5)
        word = fake.unique.word()
        counts = [random.randint(1, 100) for _ in range(size)]
        yield word, counts
def multiply(left: int, right: int = 2) -> int:
    """
    Multiply ``left`` by ``right``.

    Return value purposely unnamed, to be used as `{nodename}.value`
    """
    product = left * right
    return product
def divide(numerator: int, denominator: int = 5) -> A[float, Name("ratio")]:
    """Return ``numerator`` divided by ``denominator`` using true division."""
    ratio = numerator / denominator
    return ratio
def concat(strings: list[str]) -> str:
    """Join ``strings`` end to end with no separator."""
    joined = "".join(strings)
    return joined
def multi_concat(**kwargs: list[str]) -> str:
    """
    Concatenate every string of every keyword argument into a single string.

    Values are consumed in keyword order, and the strings inside each value are
    joined in their own order.
    """
    joined_values = ["".join(strings) for strings in kwargs.values()]
    return "".join(joined_values)
def exclaim(string: str, hype: int = 1) -> str:
    """Append ``hype`` exclamation marks to ``string``."""
    suffix = "!" * hype
    return string + suffix
def repeat(string: str, times: int) -> str:
    """Return ``string`` repeated ``times`` times."""
    repeated = string * times
    return repeated
def dictify(key: str, items: list[Any]) -> dict[str, Any]:
    """Wrap ``items`` in a one-entry dictionary stored under ``key``."""
    wrapped: dict[str, Any] = {}
    wrapped[key] = items
    return wrapped
def error(value: Any) -> None:
    """
    Raise unconditionally, ignoring ``value``.

    Raises:
        ValueError: Always.
    """
    message = "This node just emits errors"
    raise ValueError(message)
class CountSource(Node):
    """Node that emits consecutive integers from ``start`` up to, but excluding, ``limit``."""

    # Exclusive upper bound on the emitted values.
    limit: int = 1000
    # First value emitted.
    start: int = 0

    def process(self) -> Generator[A[int, Name("index")], None, None]:
        """Yield ``start``, ``start + 1``, ... and stop before reaching ``limit``."""
        for index in count(start=self.start):
            if index >= self.limit:
                break
            yield index
class UnannotatedGenerator(Node):
    """Counting node whose ``process`` generator carries no return annotation."""

    # Exclusive upper bound on the emitted values.
    limit: int = 1000
    # First value emitted.
    start: int = 0

    def process(self):  # noqa: ANN201
        """Yield ``start``, ``start + 1``, ... and stop before reaching ``limit``."""
        for index in count(start=self.start):
            if index >= self.limit:
                break
            yield index
class Multiply(Node):
    """Node form of :func:`multiply` whose output is named ``product``."""

    def process(self, left: int, right: int = 2) -> A[int, Name("product")]:
        """Return ``left`` times ``right`` by delegating to :func:`multiply`."""
        product = multiply(left=left, right=right)
        return product
class VolumeProcess:
    """Plain class with a ``process`` method that computes a box volume."""

    def __init__(self, height: int = 2):
        """Store the fixed ``height`` used for every volume computed."""
        self.height = height

    def process(self, width: int, depth: int) -> A[int, Name("volume")]:
        """Return ``height * width * depth``."""
        base_area = multiply(left=width, right=depth)
        return self.height * base_area
class Volume:
    """Plain class whose ``volume`` method is marked with ``process_method``."""

    def __init__(self, height: int = 2):
        """Store the fixed ``height`` used for every volume computed."""
        self.height = height

    @process_method
    def volume(self, width: int, depth: int) -> A[int, Name("volume")]:
        """Return ``height * width * depth``."""
        base_area = multiply(left=width, right=depth)
        return self.height * base_area
class Now:
    """Capture the local time at construction and report it on demand."""

    def __init__(self):
        """Record ``datetime.now()`` once; later calls reuse this timestamp."""
        self.now = datetime.now()

    @process_method
    def print(self, prefix: str = "Now: ") -> A[str, Name("timestamp")]:
        """Return ``prefix`` followed by the stored timestamp in ISO format."""
        stamp = self.now.isoformat()
        return f"{prefix}{stamp}"
class CountSourceDecor:
    """Counting source built on a plain class with a ``process_method`` generator."""

    def __init__(self, start: int = 0) -> None:
        """Create the unbounded counter, beginning at ``start``."""
        self.gen = count(start=start)

    @process_method
    def process(self) -> Generator[A[int, Name("count")], None, None]:
        """Yield values from the counter created in ``__init__``."""
        counter = self.gen
        yield from counter
def input_party(
    one: int,
    two: float,
    three: str,
    four: bool,
    five: list,
    six: dict,
    seven: set,
) -> A[bool, Name("works")]:
    """Accept seven differently typed arguments, ignore them all, and return ``True``."""
    works = True
    return works
def long_add(value: float, sleep_for: float = 0.25) -> float:
    """
    Block for ``sleep_for`` seconds, then return ``value + 1``.

    Args:
        value: Number to increment.
        sleep_for: Seconds to sleep before computing the result.
    """
    sleep(sleep_for)
    incremented = value + 1
    return incremented
async def number_to_letter(number: int, offset: int = 0) -> str:
    """
    Map ``number + offset`` onto a lowercase ASCII letter, wrapping past ``z``.

    The coroutine first awaits a random delay of less than a tenth of a second.
    """
    alphabet = string.ascii_lowercase
    delay = random.random() / 10
    await asyncio.sleep(delay)
    position = (number + offset) % len(alphabet)
    return alphabet[position]
class NumberToLetterCls:
    """Class form of the number-to-letter mapping, with the offset fixed at construction."""

    def __init__(self, offset: int = 0):
        """Store the ``offset`` added to every number before it is mapped."""
        self.offset = offset

    async def process(self, number: int) -> str:
        """
        Map ``number + offset`` onto a lowercase ASCII letter, wrapping past ``z``.

        The coroutine first awaits a random delay of less than a tenth of a second.
        """
        alphabet = string.ascii_lowercase
        delay = random.random() / 10
        await asyncio.sleep(delay)
        position = (number + self.offset) % len(alphabet)
        return alphabet[position]
async def async_error(value: Any) -> None:
    """
    Just raise an error!

    Raises:
        ValueError: Always, regardless of ``value``.
    """
    message = "This is the error that should be raised"
    raise ValueError(message)
class StatefulMultiply:
    """Multiplier that also scales by an internal counter which grows on every call."""

    def __init__(self, start: int = 0) -> None:
        """Remember ``start`` and use it as the initial counter value."""
        self.start = start
        self.current = start

    def process(self, left: float, right: float = 1) -> float:
        """Return ``left * right * current``, then advance ``current`` by one."""
        multiplier = self.current
        result = left * right * multiplier
        # Only bump the counter once the product has been computed.
        self.current = multiplier + 1
        return result
def fast_forward(generator: count, n: int = 1) -> tuple[A[int, Name("next")]]:
    """
    Advance ``generator`` by ``n`` steps and return the last value it produced.

    The counter is advanced in place, so the caller's object is mutated.

    Args:
        generator: Counter to advance.
        n: Number of values to consume.

    Returns:
        The ``n``-th value taken from ``generator``. When ``n`` is zero or
        negative nothing is consumed and ``0`` is returned, the same fallback
        ``jump`` uses.
    """
    # NOTE(review): the annotation declares a 1-tuple but a bare int is
    # returned - confirm which shape the framework expects before changing it.
    # Seed the result so a non-positive ``n`` no longer hits an unbound local.
    val = 0
    for _ in range(n):
        val = next(generator)
    return val
def jump(generator: count, n: int = 1) -> tuple[A[count, Name("skirttt")], A[int, Name("next")]]:
    """
    Advance ``generator`` by ``n`` steps and return it with the last value consumed.

    Returns:
        ``(generator, value)`` where ``generator`` is the same, now advanced,
        object and ``value`` is the last item taken from it, or ``0`` when no
        step was taken.
    """
    last = 0
    for _ in range(n):
        last = next(generator)
    advanced = (generator, last)
    return advanced
def rewind(generator: count, n: int = 1) -> A[count, Name("skrittt")]:
    """
    Purposely designed to mutate the input and return a new object

    One value is consumed from ``generator``; the returned counter is a fresh
    ``count`` that starts ``n`` below that consumed value.
    """
    current = next(generator)
    return count(current - n)
def zip_iter(*args: Iterator) -> tuple[Any, ...]:
    """Take one item from each iterator in ``args`` and return the items as a tuple."""
    # Kept as a generator expression so the failure mode on an exhausted
    # iterator stays exactly as before.
    heads = (next(iterator) for iterator in args)
    return tuple(heads)
def increment(
    iterator: Iterator[int], increment: int = 1
) -> tuple[A[Iterator[int], Name("iterator")], A[int, Name("value")]]:
    """
    Advance ``iterator`` by ``increment`` steps and return it with the last value.

    The iterator is advanced in place, so the caller's object is mutated.

    Args:
        iterator: Iterator to advance.
        increment: Number of items to consume.

    Returns:
        ``(iterator, value)`` where ``iterator`` is the same object that was
        passed in and ``value`` is the last item consumed. When ``increment``
        is zero or negative nothing is consumed and ``value`` is ``0``, the
        same fallback ``jump`` uses.
    """
    # Seed the result so a non-positive ``increment`` no longer hits an
    # unbound local.
    value = 0
    for _ in range(increment):
        value = next(iterator)
    return iterator, value
def passthrough(value: Any, *args: Any, **kwargs: Any) -> Any:
    """Return ``(value, args, kwargs)`` exactly as they were received."""
    received = (value, args, kwargs)
    return received
class InitCounter(Node):
    """Count how many times we have been initialized and deinitialized"""

    def __init__(self, *args: Any, **kwargs: Any):
        """Forward all arguments to the parent class, then zero both counters."""
        super().__init__(*args, **kwargs)
        self._inits = 0
        self._deinits = 0

    def process(self) -> tuple[A[int, Name("inits")], A[int, Name("deinits")]]:
        """Return the current ``(inits, deinits)`` counts after a short pause."""
        # sleep to just not have this flood the networking modules.
        sleep(0.01)
        counts = (self._inits, self._deinits)
        return counts

    def init(self) -> None:
        """Record one more initialization."""
        self._inits = self._inits + 1

    def deinit(self) -> None:
        """Record one more deinitialization."""
        self._deinits = self._deinits + 1
def inject_epoch(epoch: Epoch) -> Epoch:
    """Return the ``epoch`` argument unchanged."""
    received = epoch
    return received
def inject_eventmap(special_value: Any, events: EventMap) -> EventMap:
    """Return the ``events`` argument unchanged; ``special_value`` is accepted but unused."""
    received = events
    return received
def just_wait(value: Any, wait_for: float = 0.5) -> Any:
    """
    Block for ``wait_for`` seconds, then return ``value`` unchanged.

    Args:
        value: Object handed back to the caller.
        wait_for: Seconds to sleep before returning.
    """
    sleep(wait_for)
    result = value
    return result
def switch() -> (
    Generator[tuple[A[str, Name("fruits")], A[str, Name("vegetables")], A[str, Name("minerals")]]]
):
    """
    Yield in a cycle from different signals, emitting NoEvent in the others

    Each pass yields three tuples in turn: a fruit, then a vegetable, then a
    mineral, with ``MetaSignal.NoEvent`` filling the two unused positions.
    """
    skip = MetaSignal.NoEvent
    fruits = cycle(["apple", "banana", "cherry"])
    vegetables = cycle(["daikon", "eggplant", "fiddlehead"])
    minerals = cycle(["galaxite", "halite", "iolite"])
    # The three cycles never end, so this loop never finishes either.
    for fruit, vegetable, mineral in zip(fruits, vegetables, minerals):
        yield fruit, skip, skip
        yield skip, vegetable, skip
        yield skip, skip, mineral
[docs] def this_or_that( this: Any | None = None, that: Any | None = None, the_other: Any | None = None ) -> dict: """Optional inputs!""" ret = {} if this is not None: ret["this"] = this if that is not None: ret["that"] = that if the_other is not None: ret["the_other"] = the_other return ret
class DynamicSignals(Node):
    """Node whose signals and slots are dynamic"""

    # Names of the signals to expose.
    signals_: list[str] = Field(default_factory=list)
    # Names of the slots to expose.
    slots_: list[str] = Field(default_factory=list)

    def process(self, **kwargs: Any) -> Any | None:
        """Return the values of all keyword arguments as a tuple, in call order."""
        return tuple(kwargs.values())

    @classmethod
    def get_signals(cls, spec: NodeSpecification | None = None) -> dict[str, Signal]:
        """
        Build one ``Any``-typed signal per name listed under ``signals_`` in ``spec.params``.

        Returns an empty mapping when ``spec`` or its ``params`` is missing or empty.
        """
        if spec and spec.params:
            names = spec.params.get("signals_", [])
            return {name: Signal(name=name, annotation=Any) for name in names}
        return {}

    @classmethod
    def get_slots(cls, spec: NodeSpecification | None = None) -> dict[str, Slot]:
        """
        Build one required, ``Any``-typed slot per name listed under ``slots_`` in ``spec.params``.

        Returns an empty mapping when ``spec`` or its ``params`` is missing or empty.
        """
        if spec and spec.params:
            names = spec.params.get("slots_", [])
            return {name: Slot(name=name, annotation=Any, required=True) for name in names}
        return {}