Edit rosbags

Rosbags does not support opening files in read-write mode, but implicitly enforces copy-on-write semantics. Apart from the mapping of reader to writer connections the process is fairly straightforward.

Remove topic

"""Example: Remove topic."""

from __future__ import annotations

from typing import TYPE_CHECKING, cast

from rosbags.interfaces import ConnectionExtRosbag2
from rosbags.rosbag2 import Reader, Writer

if TYPE_CHECKING:
    from pathlib import Path


def remove_topic(src: Path, dst: Path, topic: str) -> None:
    """Remove topic from rosbag2.

    Args:
        src: Source path.
        dst: Destination path.
        topic: Name of topic to remove.

    """
    with Reader(src) as reader, Writer(dst) as writer:
        conn_map = {}
        for conn in reader.connections:
            if conn.topic == topic:
                continue
            ext = cast(ConnectionExtRosbag2, conn.ext)
            conn_map[conn.id] = writer.add_connection(
                conn.topic,
                conn.msgtype,
                serialization_format=ext.serialization_format,
                offered_qos_profiles=ext.offered_qos_profiles,
            )

        rconns = [reader.connections[x] for x in conn_map]
        for conn, timestamp, data in reader.messages(connections=rconns):
            writer.write(conn_map[conn.id], timestamp, data)

Edit timestamps

"""Example: Edit timestamps."""

from __future__ import annotations

from typing import TYPE_CHECKING, cast

from rosbags.interfaces import ConnectionExtRosbag2
from rosbags.rosbag2 import Reader, Writer
from rosbags.typesys import Stores, get_typestore

if TYPE_CHECKING:
    from pathlib import Path


def offset_timestamps(src: Path, dst: Path, offset: int) -> None:
    """Offset timestamps.

    Args:
        src: Source path.
        dst: Destination path.
        offset: Amount of nanoseconds to offset timestamps.

    """
    typestore = get_typestore(Stores.ROS2_FOXY)
    with Reader(src) as reader, Writer(dst) as writer:
        conn_map = {}
        for conn in reader.connections:
            ext = cast(ConnectionExtRosbag2, conn.ext)
            conn_map[conn.id] = writer.add_connection(
                conn.topic,
                conn.msgtype,
                serialization_format=ext.serialization_format,
                offered_qos_profiles=ext.offered_qos_profiles,
            )

        for conn, timestamp, data in reader.messages():
            # Adjust header timestamps, too
            msg = typestore.deserialize_cdr(data, conn.msgtype)
            if head := getattr(msg, 'header', None):
                headstamp = head.stamp.sec * 10**9 + head.stamp.nanosec + offset
                head.stamp.sec = headstamp // 10**9
                head.stamp.nanosec = headstamp % 10**9
                outdata: memoryview | bytes = typestore.serialize_cdr(msg, conn.msgtype)
            else:
                outdata = data

            writer.write(conn_map[conn.id], timestamp + offset, outdata)