# Source code for noob.node.return_
"""
Special Return sink that tube runners use to return values from :meth:`.TubeRunner.process`
"""
from collections import defaultdict
from typing import Any
from pydantic import PrivateAttr
from noob.event import MetaSignal
from noob.node.base import Node, Slot
from noob.node.spec import NodeSpecification
from noob.types import Epoch, EventMap
[docs]
class Return(Node):
    """
    Special sink node that returns values from a tube runner's `process` method.

    Positional inputs are accumulated in a tuple and keyword inputs are
    accumulated per key as ``(epoch, value)`` pairs. :meth:`.get` hands the
    collected values back and, unless asked to keep them, resets the node.
    """

    stateful: bool = False

    # Positional values received so far; ``None`` until the first `process` call.
    _args: tuple | None = None
    # Keyword values received so far, stored per key as ``(epoch, value)`` pairs.
    _kwargs: dict = PrivateAttr(default_factory=lambda: defaultdict(list))
    # ``(epoch, key)`` pairs that were already stored, so repeats can be dropped.
    # Declared with a factory (like ``_kwargs``) so every instance owns its set.
    _seen_epochs: set[tuple[Epoch, str]] = PrivateAttr(default_factory=set)

    def process(self, *args: Any, __events: EventMap, **kwargs: Any) -> MetaSignal:
        """
        Store the incoming values to retrieve later with :meth:`.get`.

        Args:
            *args: Positional values, appended to any that are already stored.
            __events: Mapping indexed by keyword name; the ``"epoch"`` entry of
                each looked-up item is stored next to the value.
            **kwargs: Keyword values. A value is ignored when the same
                ``(epoch, key)`` pair has already been stored.

        Returns:
            Always ``MetaSignal.NoEvent``.
        """
        # NOTE(review): this parameter is declared inside a class body, so Python
        # name-mangles it to ``_Return__events`` - confirm callers pass it under
        # that name.
        self._args = args if self._args is None else self._args + args

        for key, val in kwargs.items():
            epoch = __events[key]["epoch"]
            marker = (epoch, key)
            if marker in self._seen_epochs:
                # this key was already recorded for this epoch
                continue
            self._kwargs[key].append((epoch, val))
            self._seen_epochs.add(marker)
        return MetaSignal.NoEvent

    def get(self, keep: bool) -> Any | None:
        """
        Get the values stored by previous :meth:`.process` calls.

        Args:
            keep: When false, the stored positional values, keyword values and
                seen ``(epoch, key)`` pairs are reset after the result is built.
                When true, they are left in place.

        Returns:
            * the first positional value on its own, when positional values were
              stored and ``spec.depends`` is a single string
            * ``(args, kwargs)`` when both positional and keyword values exist
            * the tuple of positional values when only those exist
            * the dict of keyword values when only those exist
            * ``None`` when nothing was stored

            In the keyword dict, a key that received exactly one value maps to
            that value directly; otherwise it maps to the list of its values
            sorted by epoch.
        """
        # sort by epoch and flatten if only one value received
        kwargs = {}
        for key, val in self._kwargs.items():
            if len(val) == 1:
                kwargs[key] = val[0][1]
            else:
                kwargs[key] = [item[1] for item in sorted(val, key=lambda i: i[0])]

        try:
            # FIXME: what a nightmare - make all of these derive from the spec
            if not self._args:
                return kwargs or None
            if self.spec is not None and isinstance(self.spec.depends, str):
                return self._args[0]
            if kwargs:
                return self._args, kwargs
            return self._args
        finally:
            # runs after the return value has been evaluated, so the caller
            # still receives the values collected before the reset
            if not keep:
                self._args = None
                self._kwargs = defaultdict(list)
                self._seen_epochs = set()

    @classmethod
    def get_slots(cls, spec: NodeSpecification | None = None) -> dict[str, Slot]:
        """
        Build the input slots of a return node from its specification.

        Args:
            spec: Specification whose ``depends`` declares what is returned.

        Returns:
            An empty dict when ``spec.depends`` is a single string. Otherwise one
            optional, ``Any``-annotated :class:`.Slot` per non-string entry of
            ``spec.depends``, named after the first key of that entry. String
            entries are skipped.

        Raises:
            ValueError: If ``spec`` is ``None`` or its ``depends`` is empty.
        """
        if spec is None or not spec.depends:
            raise ValueError(
                "Return nodes must have a specification that defines what they return"
            )
        if isinstance(spec.depends, str):
            return {}

        # non-string entries are expected to be mappings; the first key names the slot
        names = (list(dep.keys())[0] for dep in spec.depends if not isinstance(dep, str))
        return {name: Slot(name=name, annotation=Any, required=False) for name in names}