Source code for noob.node.return_

"""
Special Return sink that tube runners use to return values from :meth:`.TubeRunner.process`
"""

from collections import defaultdict
from typing import Any

from pydantic import PrivateAttr

from noob.event import MetaSignal
from noob.node.base import Node, Slot
from noob.node.spec import NodeSpecification
from noob.types import Epoch, EventMap


[docs] class Return(Node): """ Special sink node that returns values from a tube runner's `process` method """ stateful: bool = False _args: tuple | None = None _kwargs: dict = PrivateAttr(default_factory=lambda: defaultdict(list)) _seen_epochs: set[tuple[Epoch, str]] = set()
[docs] def process(self, *args: Any, __events: EventMap, **kwargs: Any) -> MetaSignal: """ Store the incoming value to retrieve later with :meth:`.get` """ if self._args is None: self._args = args else: self._args += args for key, val in kwargs.items(): if (__events[key]["epoch"], key) in self._seen_epochs: continue self._kwargs[key].append((__events[key]["epoch"], val)) self._seen_epochs.add((__events[key]["epoch"], key)) return MetaSignal.NoEvent
[docs] def get(self, keep: bool) -> Any | None: """ Get the stored value from the process call, clearing it. """ if self._kwargs: # sort by epoch and flatten if only one value received kwargs = {} for key, val in self._kwargs.items(): if len(val) == 1: kwargs[key] = val[0][1] else: kwargs[key] = [item[1] for item in sorted(val, key=lambda i: i[0])] else: kwargs = {} try: # FIXME: what a nightmare - make all of these derive from the spec if self._args and self.spec is not None and isinstance(self.spec.depends, str): return self._args[0] elif self._args and kwargs: return self._args, kwargs elif self._args: return self._args elif kwargs: return kwargs else: return None finally: if not keep: self._args = None self._kwargs = defaultdict(list) self._seen_epochs = set()
[docs] @classmethod def get_slots(cls, spec: NodeSpecification | None = None) -> dict[str, Slot]: if spec is None or not spec.depends: raise ValueError("Return nodes must have a specification that defines what they return") if isinstance(spec.depends, str): return {} slots = {} for dep in spec.depends: if isinstance(dep, str): continue name = list(dep.keys())[0] slots[name] = Slot(name=name, annotation=Any, required=False) return slots