Edit message definition

Some ROS messages definitons differ between ROS1 and ROS2, most notably std_msgs/msg/Header. Even withing the same version of ROS, message definitions can evolve over time. As it is such a common case, rosbags handles std_msgs/msg/Header automatically when converting between rosbag1 and rosbag2. For all other types it is up to the end user to convert between message definitions afterwards.

Update CameraInfo

"""Example: Edit message definition.

Between ROS1 and ROS2 the CameraInfo message definition has changed.
The D, K, R, and P field names have been changed from upper case to
lower case. This example shows how to downgrade the messages in a
rosbag1 to the proper message definition.

"""

from __future__ import annotations

from typing import TYPE_CHECKING, cast

from rosbags.highlevel.anyreader import AnyReader
from rosbags.rosbag1 import Writer
from rosbags.typesys import Stores, get_types_from_msg, get_typestore

if TYPE_CHECKING:
    from pathlib import Path

    from rosbags.interfaces import ConnectionExtRosbag1
    from rosbags.typesys.stores.ros2_foxy import sensor_msgs__msg__CameraInfo

# Noetic camera info message definition, taken from:
# http://docs.ros.org/en/noetic/api/sensor_msgs/html/msg/CameraInfo.html
CAMERAINFO_DEFINITION = """
std_msgs/Header header
uint32 height
uint32 width
string distortion_model
float64[] D
float64[9] K
float64[9] R
float64[12] P
uint32 binning_x
uint32 binning_y
sensor_msgs/RegionOfInterest roi
"""


def downgrade_camerainfo_to_rosbag1(src: Path, dst: Path) -> None:
    """Edit message definitions in a rosbag1.

    Args:
        src: Rosbag1 source path.
        dst: Destination path.
        topic: Name of topic to remove.

    """
    typename = 'sensor_msgs/msg/CameraInfo'
    typestore = get_typestore(Stores.EMPTY)
    typestore.register(
        get_types_from_msg(CAMERAINFO_DEFINITION, typename),
    )
    CameraInfo = typestore.types[typename]  # noqa: N806

    with AnyReader([src]) as reader, Writer(dst) as writer:
        conn_map = {}

        for conn in reader.connections:
            ext = cast('ConnectionExtRosbag1', conn.ext)

            # Use updated message definition and md5sum for CameraInfo.
            if conn.msgtype == 'sensor_msgs/msg/CameraInfo':
                from_typestore = typestore
            else:
                from_typestore = reader.typestore

            conn_map[conn.id] = writer.add_connection(
                conn.topic,
                conn.msgtype,
                typestore=from_typestore,
                callerid=ext.callerid,
                latching=ext.latching,
            )

        for conn, timestamp, data in reader.messages():
            wconn = conn_map[conn.id]

            if conn.msgtype == 'sensor_msgs/msg/CameraInfo':
                msg = cast('sensor_msgs__msg__CameraInfo', reader.deserialize(data, conn.msgtype))
                converted_msg = CameraInfo(
                    header=msg.header,
                    height=msg.height,
                    width=msg.width,
                    distortion_model=msg.distortion_model,
                    # Map lower case names to upper case.
                    D=msg.d,
                    K=msg.k,
                    R=msg.r,
                    P=msg.p,
                    binning_x=msg.binning_x,
                    binning_y=msg.binning_y,
                    roi=msg.roi,
                )
                outdata: memoryview | bytes = typestore.serialize_ros1(converted_msg, wconn.msgtype)
            else:
                outdata = data

            writer.write(wconn, timestamp, outdata)