Register custom message types
Out of the box rosbags only supports the message types that ship with a default ROS2 distribution. If you want to (de)serialize custom messages you need to add them to the type system manually.
From rosbag1
"""Example: Register rosbag1 types."""
from __future__ import annotations
from typing import TYPE_CHECKING
from rosbags.rosbag1 import Reader
from rosbags.typesys import Stores, get_types_from_msg, get_typestore
if TYPE_CHECKING:
from pathlib import Path
def process_bag(src: Path) -> None:
"""Register contained messages types before processing bag.
Args:
src: Bag to process.
"""
typestore = get_typestore(Stores.EMPTY)
with Reader(src) as reader:
typs = {}
for conn in reader.connections:
typs.update(get_types_from_msg(conn.msgdef, conn.msgtype))
typestore.register(typs)
# Now all message types used in the bag are registered
# for conn, timestamp, data in reader.messages():
# ...
From definition string
"""Example: Register type from definition string."""
from rosbags.serde import serialize_cdr
from rosbags.typesys import Stores, get_types_from_msg, get_typestore
# Your custom message definition
STRIDX_MSG = """
string string
uint32 index
"""
typestore = get_typestore(Stores.ROS2_FOXY)
typestore.register(get_types_from_msg(STRIDX_MSG, 'custom_msgs/msg/StrIdx'))
StrIdx = typestore.types['custom_msgs/msg/StrIdx']
message = StrIdx(string='foo', index=42)
# Rawdata that can be passed to rosbag2.Writer.write
rawdata = serialize_cdr(message, message.__msgtype__)
From multiple files
"""Example: Register types from msg files."""
from pathlib import Path
from rosbags.typesys import Stores, get_types_from_msg, get_typestore
def guess_msgtype(path: Path) -> str:
"""Guess message type name from path."""
name = path.relative_to(path.parents[2]).with_suffix('')
if 'msg' not in name.parts:
name = name.parent / 'msg' / name.name
return str(name)
typestore = get_typestore(Stores.ROS2_FOXY)
add_types = {}
for pathstr in ['/path/to/custom_msgs/msg/Speed.msg', '/path/to/custom_msgs/msg/Accel.msg']:
msgpath = Path(pathstr)
msgdef = msgpath.read_text(encoding='utf-8')
add_types.update(get_types_from_msg(msgdef, guess_msgtype(msgpath)))
typestore.register(add_types)
Accel = typestore.types['custom_msgs/msg/Accel']
Speed = typestore.types['custom_msgs/msg/Speed']
Accel(42)
Speed(42)