333 Telemetry collection is configurable down to the Python module level

Leading to it is Injection #81 (Use structured logging to emit telemetry inline).

This is a large project. The goal is to replace our custom logging framework for all the scientific data with a logging-like telemetry pipeline that emits snapshot events. Essentially, this means replacing tbp.monty/src/tbp/monty/frameworks/loggers/graph_matching_loggers.py and everything that interacts with it. That is, instead of collecting data at hardcoded step, episode, or epoch boundaries, we would emit it inline with appropriate event metadata, so that any application reading the telemetry snapshot stream can construct whatever it needs for data analysis and visualization purposes.

The Current Reality Tree Undesirable Effect (103 Platform internal representations are difficult to visualize) contains more context on the problem we are running into.

I have only thought through the design of this at a very high level. Essentially, what I’m thinking is something analogous to the logger interface, but for telemetry.

So, where we have logger:

logger = logging.getLogger(__name__)

# ...

logger.error(...)
logger.warning(...)
logger.info(...)
logger.debug(...)

We would have an analogous pipeline (using logging as the implementation detail) that would emit telemetry events:

telemetry = monty.getTelemetry(__name__)

# ...

telemetry.snapshot(...) # some scientific data
telemetry.snapshot(...) # some scientific data

This way, we could reuse all the existing logging infrastructure to select, handle, and filter scientific data emitted by the application.
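As a sketch of what that reuse could look like: a standard logging.Filter can select telemetry by schema, alongside the usual level-based selection. The SchemaFilter class, the "telemetry_event" record attribute, and the schema prefix below are all assumptions from this proposal, not existing tbp.monty code.

```python
import logging

class SchemaFilter(logging.Filter):
    """Pass only records carrying a telemetry event with a matching schema prefix.

    Hypothetical helper for illustration; assumes the emitting side stuffs the
    event into the record as the "telemetry_event" attribute.
    """

    def __init__(self, prefix: str):
        super().__init__()
        self.prefix = prefix

    def filter(self, record: logging.LogRecord) -> bool:
        event = getattr(record, "telemetry_event", None)
        return event is not None and event.schema_id.startswith(self.prefix)

# Ordinary logging machinery then applies to scientific data:
handler = logging.StreamHandler()
handler.setLevel(logging.INFO)  # standard level-based selection
handler.addFilter(SchemaFilter("monty.env_interface"))  # plus schema-based selection
```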

Aside from the emit side of things, this project also requires rethinking and redoing all of the live and offline visualizations.

This is a large topic. I think we’ll need an RFC to keep everyone on the same page. Before we start going through all the details, I want to see if this is of interest first.

Regarding the splitting of the work: I’m not yet sure if it can be split up. However, if we were to try…

One constraint on what we’d merge into tbp.monty is that we do not want to include dead code. So, practically speaking, if there’s a PR with an unused class, I think it is very unlikely to get merged.

If the work were split, I think we might end up needing to add new telemetry without removing the old way. This can then be done in parts: for example, all learning modules now (also) emit telemetry in the new way; then all sensor modules, environments, habitat, mujoco… I’m not sure what the best way of breaking it up will be, but there’s probably something that will make sense. The guiding principle, however, would be to always add a full implementation to a portion of Monty, as opposed to a partial implementation that completes no part of Monty.

The reason I think we might want to emit both telemetry types is that tools rely on the current way telemetry is aggregated. Even with all the new telemetry in place, there is a tail of tools that would need to be updated. Only once those tools are updated (to be enumerated), would we then fully transition to the new telemetry.


I started hacking something together; work in progress, needs more refining, not functional yet.

Calling, for example, telemetry.snapshot(logging.INFO, event) forwards the event as a LogRecord with the TelemetryEvent instance stuffed into the record’s extra dict. A QueueHandler attached to that logger puts the record onto a process-level queue.Queue without blocking the emitting thread.

On the other side, a QueueListener drains that queue on a single background thread and fans out each record to its registered handlers. One of those handlers is a TelemetryBroker, which maintains a dict of subscriber queues keyed by schema_id. For each incoming record it puts the event onto whichever consumer queues have subscribed to that schema. Other handlers (plotter, disk save, out-of-band dispatcher, etc.) receive the same record independently.

A consumer calls broker.subscribe("monty.some.schema_id") which returns a queue.Queue. It drains that queue however suits it, whether synchronously at a checkpoint, or on its own thread blocking on q.get(). Level filtering inherited from logging means a handler or consumer can ignore everything below a chosen level without any change to emitting code.
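For the synchronous case, a checkpoint drain might look like the following (a sketch; drain_at_checkpoint is a hypothetical helper, not part of the prototype):

```python
import queue

def drain_at_checkpoint(q: queue.Queue) -> list:
    """Collect whatever events have accumulated so far, without blocking."""
    events = []
    while True:
        try:
            events.append(q.get_nowait())
        except queue.Empty:
            return events  # queue exhausted; process the batch at the checkpoint
```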

# kw_only=True (Python 3.10+) avoids the default/non-default field-ordering
# errors the original declaration order would raise.
from dataclasses import dataclass, field
from typing import ClassVar
import time

@dataclass(kw_only=True)
class TelemetryEvent:
    """Base for all telemetry snapshot events."""
    schema_id: ClassVar[str]
    schema_version: ClassVar[int] = 1

    emitter: str  # __name__ of the emitting module
    episode: int
    step: int
    timestamp: float = field(default_factory=time.monotonic)
    mode: str = "eval"  # "train" / "eval"

@dataclass(kw_only=True)
class EpisodeStepEvent(TelemetryEvent):
    """Example event for `MontyExperiment.env_interface.step`."""
    schema_id: ClassVar[str] = "monty.env_interface.step"
    observations: Observations
    state: ProprioceptiveState

class Telemetry:
    def __init__(self, logger: logging.Logger):
        self._logger = logger

    def snapshot(self, level: int, event: TelemetryEvent):
        """Emit a structured telemetry event at the specified level.

        Args:
            level: logging level (logging.DEBUG, logging.INFO, etc.)
            event: The TelemetryEvent dataclass instance.
        """
        self._logger.log(
            level,
            event.schema_id,  # msg is the schema id for text sinks
            extra={"telemetry_event": event},
            stacklevel=2,  # reports the stack frame that called `snapshot()`
        )

class TelemetryBroker:
    """Sits between QueueListener and consumers.

    Registered as a handler with QueueListener. Fans out each event to whichever
    consumer queues have subscribed to that schema_id.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._subscriptions: dict[str, list[queue.Queue]] = defaultdict(list)

    def subscribe(self, schema_id: str, maxsize: int = 0) -> queue.Queue:
        """Returns a queue that will receive all events matching schema_id."""
        q = queue.Queue(maxsize=maxsize)
        with self._lock:
            self._subscriptions[schema_id].append(q)
        return q

    def unsubscribe(self, schema_id: str, q: queue.Queue):
        with self._lock:
            self._subscriptions[schema_id].remove(q)

    def emit(self, record: logging.LogRecord) -> None:
        event = record.__dict__.get("telemetry_event")
        if event is None:
            return
        with self._lock:
            queues = list(self._subscriptions.get(event.schema_id, []))
        for q in queues:
            try:
                q.put_nowait(event)
            except queue.Full:
                pass  # drop or log; don't block the listener thread

    def handle(self, record: logging.LogRecord):
        self.emit(record)

    # QueueListener expects a logging.Handler-like object
    def createLock(self):
        pass

    def acquire(self):
        pass

    def release(self):
        pass

#--------------------------------------------------------------------------------------

class MontyBase(Monty):
    [...]

    def __init__(
        [...]
    ):
        [...]
        self.broker = TelemetryBroker()
        self._telemetry_queue: queue.Queue
        self._telemetry_listener: QueueListener
        self._configure_telemetry(handlers=[self.broker])

    def _configure_telemetry(self, handlers: Sequence[logging.Handler]):
        self._telemetry_queue = queue.Queue()  # TODO: maxsize?

        # QueueListener fans out to real handlers on its own thread
        self._telemetry_listener = QueueListener(
            self._telemetry_queue,
            *handlers,
            respect_handler_level=True,  # abide by each handler's own level filter
        )
        self._telemetry_listener.start()

    def _shutdown_telemetry(self):
        if self._telemetry_listener is not None:
            self._telemetry_listener.stop()
        # Unblock any consumers waiting on q.get()
        for queues in self.broker._subscriptions.values():
            for q in queues:
                q.put(None)

    def getTelemetry(self, name: str) -> Telemetry:
        logger = logging.getLogger(f"telemetry.{name}")
        if not logger.handlers:
            if self._telemetry_queue is None:
                raise RuntimeError(
                    "configure_telemetry() must be called before getTelemetry()"
                )
            logger.addHandler(logging.handlers.QueueHandler(self._telemetry_queue))
            logger.setLevel(logging.DEBUG)  # emit everything; per-handler levels do the filtering
            logger.propagate = False
        return Telemetry(logger)

#--------------------------------------------------------------------------------------

# Example of consumer running on its own thread (doesn't compile, just a quick example)
def live_plotter_consumer(q: queue.Queue) -> None:
    while True:
        event: EpisodeStepEvent = q.get()  # blocks until an event arrives
        if event is None:  # sentinel for shutdown
            break
        live_plotter.show_observations(
            *live_plotter.hardcoded_assumptions(event.observations, model),
            event.step
        )

env_step_q = broker.subscribe("monty.env_interface.step")
threading.Thread(target=live_plotter_consumer, args=(env_step_q,), daemon=True).start()

Very nice. The overall approach looks great. I’m sure I’ll pick at some details in PRs, but the shape of things looks good. Thank you for putting this together.

A follow-up thought… one of the things that might take some time is designing events and how to reassemble them into all the visualizations we currently have. I wonder if there is a shim that would fit into the telemetry infrastructure you’re setting up and do the aggregation of telemetry like what is currently done in the post_episode logger code in tbp.monty/src/tbp/monty/frameworks/loggers/graph_matching_loggers.py (at commit 7f242b5). This would be somewhat like what you demonstrated with the live plotter code. I’m thinking there’d be some “legacy” handler; the experiment could call post_episode (which eventually ends up calling log_episode/report_episode and such), and it would output what the current monty_handlers.py and wandb_handlers.py output. Does that make sense? I wouldn’t worry about it right now, but it’s something to consider after you feel you’ve got the telemetry set up the way you like it.

First, congratulations! Next, of course, some quibbles…

The word [Tt]elemetry is quite overloaded. Even in the context of software systems, it can refer to a general approach or to specific packages. For example, in the Elixir world, it commonly means either the telemetry or the opentelemetry-erlang library:

Telemetry is a lightweight library for dynamic dispatching of events, with a focus on metrics and instrumentation. Any Erlang or Elixir library can use telemetry to emit events. Application code and other libraries can then hook into those events and run custom handlers.

Note: this library is agnostic to tooling and therefore is not directly related to OpenTelemetry. For OpenTelemetry in the Erlang VM, see opentelemetry-erlang, and check opentelemetry_telemetry to connect both libraries.

FWIW, I suspect that the Cloud Native Computing Foundation (CNCF)'s OpenTelemetry system is going to be the Golden Path for this sort of thing. Basically, it checks all the usual boxes for licensing, industry buy-in, etc. (:-):

OpenTelemetry is an open source observability framework for cloud native software. It provides a single set of APIs, libraries, agents, and collector services to capture distributed traces and metrics from your application.

OpenTelemetry is an open source observability framework created when CNCF merged the OpenTracing and OpenCensus projects. OpenTracing offers “consistent, expressive, vendor-neutral APIs for popular platforms” while the Google-created OpenCensus project acts as a “collection of language-specific libraries for instrumenting an application, collecting stats (metrics), and exporting data to a supported backend.” Under OpenTelemetry, the projects create a “complete telemetry system [that is] suitable for monitoring microservices and other types of modern, distributed systems — and [is] compatible with most major OSS and commercial backends.” It is the “second most active” CNCF project. In October 2020, AWS announced the public preview of its distro for OpenTelemetry.

OpenTelemetry has a number of useful-sounding affordances, including access to the Prometheus suite. As the Goog puts it:

Prometheus is an open-source systems and service monitoring system that collects and stores metrics as time-series data, specializing in multi-dimensional data models and high-performance querying (PromQL). It commonly monitors Kubernetes and microservices using a pull model via HTTP, scraping metrics from targets to enable powerful alerting and dashboarding.

Wikipedia sez:

Prometheus is a free software application for event monitoring and alerting. It records metrics in a time series database built using an HTTP pull model, supporting high dimensionality through key-value label pairs, flexible queries, and real-time alerting. The project is written in Go and licensed under the Apache 2.0 License, with source code available on GitHub.

Please consider making OpenTelemetry a supported target for Monty’s nascent telemetry offerings. For extra credit, provide a way to access and/or request the telemetry via the Model Context Protocol (MCP). And a pony…

The discussed system is for internal Monty comms with native Python types, whereas OTel is for publishing standardized serialized data to the outside world. Different use cases :wink: One could write an OpenTelemetryHandler(logging.Handler) class to connect these internals to OTel via e.g. self._configure_telemetry(handlers=[self.broker, OpenTelemetryHandler()]), should the need arise.
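To make that bridge concrete, here is a minimal sketch of such a handler. Everything in it is an assumption: the OpenTelemetryHandler name, the exporter callable (standing in for a real OpenTelemetry SDK emitter), and the payload shape.

```python
import dataclasses
import logging

class OpenTelemetryHandler(logging.Handler):
    """Sketch of a bridge from the internal telemetry stream to OTel.

    The exporter is a plain callable so the sketch stays dependency-free; a
    real implementation would hand the payload to the OpenTelemetry SDK.
    """

    def __init__(self, exporter):
        super().__init__()
        self._exporter = exporter

    def emit(self, record: logging.LogRecord) -> None:
        event = getattr(record, "telemetry_event", None)
        if event is None:
            return  # not a telemetry record; other handlers may still want it
        self._exporter({
            "schema_id": event.schema_id,
            "schema_version": event.schema_version,
            "body": dataclasses.asdict(event),  # serialize for the outside world
        })
```

Registering it would then look like the configuration call quoted above, e.g. self._configure_telemetry(handlers=[self.broker, OpenTelemetryHandler(exporter)]).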

Definitely. I suppose visualizations would likely need their own threading.Thread, so they can block while waiting for telemetry events. I’ll try a couple different approaches with the live plotter to see what feels right.

Yeah I’ve been looking at that as well, but I haven’t settled yet. I was kinda thinking about a step aggregator handler that would subscribe to step-level telemetry, then emit episode-level telemetry, which could itself be aggregated by an experiment-level handler, which could then be emitted out to monty_handlers-like and wandb_handlers-like consumers. Something along those lines. But first I wanna dial in the telemetry core using live plotter as the lab rat.
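A rough sketch of that chaining idea, with all names (StepAggregator, on_episode) assumed rather than taken from the prototype:

```python
import queue

class StepAggregator:
    """Folds step-level telemetry into episode-level summaries (sketch).

    Drains a broker subscription queue; whenever the episode number changes,
    hands the accumulated steps to an episode-level callback, which could in
    turn emit episode-level telemetry for an experiment-level aggregator.
    """

    def __init__(self, step_queue: queue.Queue, on_episode):
        self._q = step_queue
        self._on_episode = on_episode
        self._steps = []
        self._episode = None

    def drain(self) -> None:
        """Consume all currently queued step events without blocking."""
        while True:
            try:
                event = self._q.get_nowait()
            except queue.Empty:
                return
            if self._episode is not None and event.episode != self._episode:
                self.flush()  # episode boundary detected from the event metadata
            self._episode = event.episode
            self._steps.append(event)

    def flush(self) -> None:
        """Emit the pending episode summary (also call once at end of run)."""
        if self._steps:
            self._on_episode(self._episode, list(self._steps))
        self._steps.clear()
```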


A little progress report: I put together a telemetry core prototype and managed to get it working with the live plotter for observations and proprioceptive state:

Tested it with

python run.py experiment=base_config_10distinctobj_dist_agent experiment.config.show_sensor_output=true

I initially wanted to run the plotter in a separate thread, but it turns out matplotlib and Habitat both hate running in a thread, so right now it’s not threaded. The snapshot is emitted after MontyObjectRecognitionExperiment.env_interface.step, and then the live plotter consumes the snapshot in a blocking manner on the main thread.

I created ThreadedTelemetryConsumer and MultiprocessTelemetryConsumer classes as part of my shenanigans. I think the plotter could eventually live in its own multiprocessing.Process, but further data separation is needed so as to keep pickling to a minimum.

MultiprocessTelemetryConsumer is itself a ThreadedTelemetryConsumer; its thread pickles incoming events and sends them to the mp.Process.
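Since the WIP code isn’t posted, here is a hedged reconstruction of what those two classes could look like. The class names match the description above; everything else (constructor signatures, sentinel handling) is my assumption.

```python
import multiprocessing as mp
import queue
import threading

class ThreadedTelemetryConsumer:
    """Drains a subscription queue on a dedicated thread (sketch)."""

    def __init__(self, q: queue.Queue, callback):
        self._q = q
        self._callback = callback
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()

    def _run(self):
        while True:
            event = self._q.get()  # blocks until an event or sentinel arrives
            if event is None:      # None is the shutdown sentinel
                break
            self._callback(event)

    def stop(self):
        self._q.put(None)
        self._thread.join()

class MultiprocessTelemetryConsumer(ThreadedTelemetryConsumer):
    """Its thread pickles events into an mp.Queue read by a child process."""

    def __init__(self, q: queue.Queue, target):
        self._mp_queue = mp.Queue()  # mp.Queue.put pickles each event
        self._process = mp.Process(
            target=target, args=(self._mp_queue,), daemon=True
        )
        super().__init__(q, self._mp_queue.put)

    def start(self):
        self._process.start()
        super().start()
```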

Will keep at it


Sounds like good progress @AgentRev.

I want to highlight that getting the live plotter to work is a nice demonstration of telemetry working, but keep in mind that the telemetry change is distinct from converting the live plotter to consume the telemetry format.

For example, it may be more useful to use “off-the-shelf” technology like https://rerun.io/ rather than continuing with custom visualizations.

Yeah I was just using the plotter as a convenient guinea pig, before moving on to graph matching loggers.
