Trajectory

Trajectory data structures and utilities for RL training.

This module provides classes for collecting and managing trajectory data during RL training, including trajectory storage, data loading, advantage computation, and experience replay. It implements the TrajectoryData dataset interface and trajectory collection utilities.

Attributes:

    DYNAMIC_ATTRS (list[str]): List of dynamic attributes that are computed during trajectory processing.
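
The exact contents of DYNAMIC_ATTRS are not shown on this page. Judging from the attributes that TrajectoryData.update_attributes fills in (as opposed to those passed to the constructor), a plausible definition is the following sketch; it is an assumption, not the verbatim source:

# Assumed contents: the attributes computed by update_attributes,
# which __add__ and copy() carry over when they are present.
DYNAMIC_ATTRS = [
    'values',
    'next_values',
    'actions_old_log_p',
    'off_policy_rates',
    'returns',
    'advantages',
]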

TopKAdvantageSampler(data_source, num_samples)

Bases: Sampler[int]

Sampler that yields indices of top-K advantage samples in random order.

Selects the top-K samples with highest absolute advantage values for experience replay. This focuses training on the most impactful samples.

Attributes:

    data_source (TrajectoryData): The trajectory dataset.
    num_samples (int): Maximum number of top samples to include.
    top_k_indices (Tensor): Indices of the top-K samples.

Source code in mlir_rl_artifact/trajectory.py
def __init__(self, data_source: 'TrajectoryData', num_samples: int):
    self.data_source = data_source
    self.num_samples = num_samples

    # Get all advantage values from the dataset
    advantages = self.data_source.advantages

    # Ensure we don't request more samples than available
    self.num_samples = min(self.num_samples, advantages.size(0))

    _, self.top_k_indices = torch.topk(advantages.abs(), k=self.num_samples)
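
As a minimal usage sketch (assuming a TrajectoryData instance named trajectory whose advantages have already been computed via update_attributes), the sampler plugs into a standard DataLoader:

from torch.utils.data import DataLoader

# Keep the 256 timesteps with the largest |advantage| and visit them
# in a fresh random order every epoch.
sampler = TopKAdvantageSampler(trajectory, num_samples=256)
loader = DataLoader(trajectory, batch_size=32, sampler=sampler, drop_last=True)

for batch in loader:
    ...  # each mini-batch contains only high-|advantage| timesteps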

__iter__()

Returns an iterator over shuffled indices of the top-k samples. This is called by the DataLoader at the start of each epoch.

Yields:

    int: Indices of the top-k samples, one at a time in shuffled order.

Source code in mlir_rl_artifact/trajectory.py
def __iter__(self) -> Iterator[int]:
    """Returns an iterator over shuffled indices of the top-k samples.
    This is called by the DataLoader at the start of each epoch.

    Yields:
        An iterator over shuffled indices of the top-k samples.
    """
    # Shuffle the top-k indices to ensure random order
    shuffled_indices = self.top_k_indices[torch.randperm(self.num_samples)]

    # Yield the indices one by one
    yield from shuffled_indices.tolist()

__len__()

The total number of samples to be drawn.

Returns:

    int: The total number of samples to be drawn.

Source code in mlir_rl_artifact/trajectory.py
def __len__(self) -> int:
    """The total number of samples to be drawn.

    Returns:
        The total number of samples to be drawn.
    """
    return self.num_samples

TrajectoryData(num_loops, actions_index, obs, next_obs, actions_bev_log_p, rewards, done)

Bases: Dataset

Dataset to store the trajectory data.

Attributes:

    sizes (list[int]): List of sizes of all the included trajectories.
    num_loops (Tensor): Number of loops in the trajectory.
    actions_index (Tensor): Actions in the trajectory.
    obs (Tensor): Observations in the trajectory.
    next_obs (Tensor): Observations of next states in the trajectory.
    actions_bev_log_p (Tensor): Action log probabilities following the behavioral policy in the trajectory.
    rewards (Tensor): Rewards in the trajectory.
    done (Tensor): Done flags in the trajectory.
    values (Tensor): Values of actions in the trajectory.
    next_values (Tensor): Values of the next states in the trajectory (the values shifted one step into the future).
    actions_old_log_p (Tensor): Action log probabilities following the old policy in the trajectory.
    off_policy_rates (Tensor): Off-policy rates (rho) for the current policy.
    returns (Tensor): Returns in the trajectory.
    advantages (Tensor): Advantages in the trajectory.

Parameters:

    num_loops (Tensor): Number of loops in the trajectory. Required.
    actions_index (Tensor): Actions in the trajectory. Required.
    obs (Tensor): Observations in the trajectory. Required.
    next_obs (Tensor): Observations of next states in the trajectory. Required.
    actions_bev_log_p (Tensor): Action log probabilities following the behavioral policy in the trajectory. Required.
    rewards (Tensor): Rewards in the trajectory. Required.
    done (Tensor): Done flags in the trajectory. Required.
Source code in mlir_rl_artifact/trajectory.py
def __init__(
    self,
    num_loops: torch.Tensor,
    actions_index: torch.Tensor,
    obs: torch.Tensor,
    next_obs: torch.Tensor,
    actions_bev_log_p: torch.Tensor,
    rewards: torch.Tensor,
    done: torch.Tensor
):
    """Initialize the trajectory dataset.

    Args:
        num_loops: Number of loops in the trajectory.
        actions_index: Actions in the trajectory.
        obs: Observations in the trajectory.
        next_obs: Observations of next states in the trajectory.
        actions_bev_log_p: Action log probabilities following behavioral policy in the trajectory.
        rewards: Rewards in the trajectory.
        done: Done flags in the trajectory.
    """
    self.num_loops = num_loops
    self.actions_index = actions_index
    self.obs = obs
    self.next_obs = next_obs
    self.actions_bev_log_p = actions_bev_log_p
    self.rewards = rewards
    self.done = done

    self.sizes = [len(self)]
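
A minimal construction sketch (the tensor shapes below are assumptions chosen for illustration, not taken from the source):

import torch

T = 8  # number of timesteps in the sketch

traj = TrajectoryData(
    num_loops=torch.randint(1, 5, (T,)),
    actions_index=torch.randint(0, 10, (T,)),
    obs=torch.randn(T, 64),        # observation feature width is assumed
    next_obs=torch.randn(T, 64),
    actions_bev_log_p=torch.randn(T),
    rewards=torch.randn(T),
    done=torch.zeros(T, dtype=torch.bool),
)

assert len(traj) == T      # __len__ returns obs.size(0)
assert traj.sizes == [T]   # a single trajectory so far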

__len__()

Get the length of the trajectory.

Returns:

    int: The length of the trajectory.

Source code in mlir_rl_artifact/trajectory.py
def __len__(self) -> int:
    """Get the length of the trajectory.

    Returns:
        The length of the trajectory.
    """
    return self.obs.size(0)

__getitem__(idx)

Get a single timestep from the trajectory.

Parameters:

    idx (int): Index of the timestep to retrieve. Required.

Returns:

    tuple[Tensor, ...]: A tuple containing the timestep data.

Source code in mlir_rl_artifact/trajectory.py
def __getitem__(self, idx: int) -> tuple[torch.Tensor, ...]:
    """Get a single timestep from the trajectory.

    Args:
        idx: Index of the timestep to retrieve.

    Returns:
        A tuple containing the timestep data.
    """
    return (
        self.num_loops[idx],
        self.actions_index[idx],
        self.obs[idx],
        self.next_obs[idx],
        self.actions_bev_log_p[idx],
        self.rewards[idx],
        self.done[idx],

        self.values[idx],
        self.next_values[idx],
        self.actions_old_log_p[idx],
        self.off_policy_rates[idx],
        self.returns[idx],
        self.advantages[idx],
    )

__add__(other)

Concatenate this trajectory with another.

Parameters:

    other (TrajectoryData): The other trajectory to concatenate with. Required.

Returns:

    TrajectoryData: The trajectory containing both.

Source code in mlir_rl_artifact/trajectory.py
def __add__(self, other: 'TrajectoryData') -> 'TrajectoryData':
    """Concatenate this trajectory with another.

    Args:
        other: The other trajectory to concatenate with

    Returns:
        The trajectory containing both
    """
    self_other_sizes = self.sizes + other.sizes

    # Keep only the most recent Config().replay_count trajectories
    self_other_sizes = self_other_sizes[-Config().replay_count:]
    start = -sum(self_other_sizes)
    assert len(self_other_sizes) <= Config().replay_count

    self_other = TrajectoryData(
        torch.cat((self.num_loops, other.num_loops))[start:],
        torch.cat((self.actions_index, other.actions_index))[start:],
        torch.cat((self.obs, other.obs))[start:],
        torch.cat((self.next_obs, other.next_obs))[start:],
        torch.cat((self.actions_bev_log_p, other.actions_bev_log_p))[start:],
        torch.cat((self.rewards, other.rewards))[start:],
        torch.cat((self.done, other.done))[start:],
    )
    for attr in DYNAMIC_ATTRS:
        if hasattr(self, attr) and hasattr(other, attr):
            self_val = getattr(self, attr)
            other_val = getattr(other, attr)
            assert isinstance(self_val, torch.Tensor) and isinstance(other_val, torch.Tensor)
            setattr(self_other, attr, torch.cat((self_val, other_val))[start:])

    self_other.sizes = self_other_sizes

    assert len(self_other) == sum(self_other_sizes)

    return self_other
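
A sketch of how this is typically used to maintain an experience-replay buffer bounded by Config().replay_count (collect_one_trajectory and num_iterations are hypothetical placeholders):

from typing import Optional

replay_buffer: Optional[TrajectoryData] = None

for _ in range(num_iterations):
    new_trajectory = collect_one_trajectory()
    replay_buffer = (
        new_trajectory if replay_buffer is None
        else replay_buffer + new_trajectory
    )
    # replay_buffer.sizes never grows beyond Config().replay_count entries,
    # and the oldest timesteps are dropped from the front.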

loader(batch_size, num_trajectories)

Create a DataLoader for the trajectory.

Parameters:

    batch_size (int | None): Batch size for the DataLoader (None for full trajectory). Required.
    num_trajectories (int): Number of trajectories to use for training. Required.

Returns:

    DataLoader: The DataLoader for the trajectory.

Source code in mlir_rl_artifact/trajectory.py
def loader(self, batch_size: Optional[int], num_trajectories: int) -> DataLoader:
    """Create a DataLoader for the trajectory.

    Args:
        batch_size: Batch size for the DataLoader (None for full trajectory).
        num_trajectories: Number of trajectories to use for training.

    Returns:
        The DataLoader for the trajectory.
    """
    num_samples = sum(self.sizes[-num_trajectories:])
    if batch_size is None:
        batch_size = num_samples
    match Config().reuse_experience:
        case 'topk':
            sampler = TopKAdvantageSampler(self, num_samples)
        case 'random':
            sampler = RandomSampler(self, num_samples=num_samples)
        case 'none':
            sampler = None

    return DataLoader(
        self,
        batch_size=batch_size,
        shuffle=sampler is None,
        sampler=sampler,
        pin_memory=device.type != 'cpu',
        drop_last=True
    )
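
A usage sketch, assuming update_attributes has already been called so that the dynamic attributes needed by __getitem__ exist:

# Draw shuffled mini-batches from the two most recent trajectories.
for batch in trajectory.loader(batch_size=64, num_trajectories=2):
    (num_loops, actions_index, obs, next_obs, actions_bev_log_p,
     rewards, done, values, next_values, actions_old_log_p,
     off_policy_rates, returns, advantages) = batch
    ...  # policy/value update on this mini-batch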

copy()

Copy the trajectory.

Returns:

    TrajectoryData: The copied trajectory.

Source code in mlir_rl_artifact/trajectory.py
def copy(self) -> 'TrajectoryData':
    """Copy the trajectory.

    Returns:
        The copied trajectory.
    """
    self_copy = TrajectoryData(
        num_loops=self.num_loops.clone(),
        actions_index=self.actions_index.clone(),
        obs=self.obs.clone(),
        next_obs=self.next_obs.clone(),
        actions_bev_log_p=self.actions_bev_log_p.clone(),
        rewards=self.rewards.clone(),
        done=self.done.clone(),
    )
    for attr in DYNAMIC_ATTRS:
        if hasattr(self, attr):
            attr_val = getattr(self, attr)
            assert isinstance(attr_val, torch.Tensor)
            setattr(self_copy, attr, attr_val.clone())

    self_copy.sizes = self.sizes.copy()

    return self_copy

update_attributes(model)

Update the attributes of the trajectory following the new model.

Parameters:

    model (HiearchyModel): The model to use for updating the attributes. Required.
Source code in mlir_rl_artifact/trajectory.py
def update_attributes(self, model: Model):
    """Update the attributes of the trajectory following the new model.

    Args:
        model: The model to use for updating the attributes.
    """
    start = time()

    actions_old_log_p, values, _ = model(self.obs.to(device), self.actions_index.to(device))
    self.actions_old_log_p, self.values = actions_old_log_p.cpu(), values.cpu()

    self.next_values = model.value_model(self.next_obs.to(device)).cpu()

    self.__compute_rho()
    self.__compute_returns()
    self.__compute_gae()
    end = time()
    time_ms = int((end - start) * 1000)
    print_info(f"Updated {len(self)} attributes in {time_ms}ms")
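
In a training loop this is typically called once before iterating over the loader, so that the stored values, off-policy rates, returns, and advantages reflect the current model. A sketch (ppo_loss and optimizer are hypothetical):

trajectory.update_attributes(model)   # refresh values, rho, returns, advantages

for batch in trajectory.loader(batch_size=64, num_trajectories=len(trajectory.sizes)):
    loss = ppo_loss(model, batch)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()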

__compute_rho()

Compute the off-policy rate (rho) for the current policy.

The computed off-policy rates (Tensor) are stored in self.off_policy_rates rather than returned.

Source code in mlir_rl_artifact/trajectory.py
def __compute_rho(self) -> torch.Tensor:
    """Compute the off-policy rate (rho) for the current policy.

    Returns:
        The off-policy rate.
    """
    if 'epsilon' not in Config().exploration and Config().reuse_experience == 'none':
        self.off_policy_rates = torch.ones_like(self.actions_bev_log_p)
        return

    self.off_policy_rates = torch.exp(torch.clamp(self.actions_old_log_p - self.actions_bev_log_p, -80.0, 80.0))
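
Restated in math (with \mu the behavioral policy that generated the data and \pi_{\text{old}} the policy snapshot evaluated in update_attributes), the computed quantity is the importance-sampling ratio with the log-difference clamped for numerical stability:

\rho_t = \exp\big(\operatorname{clamp}(\log \pi_{\text{old}}(a_t \mid s_t) - \log \mu(a_t \mid s_t),\ -80,\ 80)\big)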

__compute_returns(gamma=1.0)

Compute the returns.

Parameters:

    gamma (float): Discount factor. Defaults to 1.0.

The computed returns (Tensor) are stored in self.returns rather than returned.

Source code in mlir_rl_artifact/trajectory.py
def __compute_returns(self, gamma: float = 1.0) -> torch.Tensor:
    """Compute the returns.

    Args:
        gamma: discount factor. Defaults to 1.

    Returns:
        The returns.
    """
    self.returns = torch.zeros(len(self), dtype=torch.float32)
    last_return = 0

    for t in reversed(range(len(self))):
        mask = ~self.done[t]
        last_return = last_return * mask

        last_return = self.values[t] + (self.rewards[t] + gamma * last_return - self.values[t]) * self.off_policy_rates[t].clamp_max(1)

        self.returns[t] = last_return
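
The loop above implements a clipped importance-weighted bootstrapped return, with G_T = 0 and the done mask zeroing the bootstrap term at episode boundaries:

G_t = V(s_t) + \min(\rho_t, 1)\,\big(r_t + \gamma\,(1 - d_t)\,G_{t+1} - V(s_t)\big)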

__compute_gae(gamma=1.0, lambda_=0.95)

Compute the Generalized Advantage Estimation.

Parameters:

    gamma (float): Discount factor. Defaults to 1.0.
    lambda_ (float): GAE factor. Defaults to 0.95.

The computed advantages (Tensor) are stored in self.advantages rather than returned.

Source code in mlir_rl_artifact/trajectory.py
def __compute_gae(self, gamma: float = 1.0, lambda_: float = 0.95) -> torch.Tensor:
    """Compute the Generalized Advantage Estimation.

    Args:
        gamma: discount factor.
        lambda_: GAE factor.

    Returns:
        The advantages.
    """
    self.advantages = torch.zeros(len(self), dtype=torch.float32)
    last_advantage = 0

    for t in reversed(range(len(self))):
        mask = ~self.done[t]
        last_value = self.next_values[t] * mask
        last_advantage = last_advantage * mask

        delta = self.rewards[t] + gamma * last_value - self.values[t]
        last_advantage = delta + gamma * lambda_ * last_advantage

        self.advantages[t] = last_advantage
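
This is the standard GAE recursion, masked at episode boundaries, with \hat{A}_T = 0 and \lambda = 0.95 by default:

\delta_t = r_t + \gamma\,(1 - d_t)\,V(s_{t+1}) - V(s_t)
\hat{A}_t = \delta_t + \gamma\,\lambda\,(1 - d_t)\,\hat{A}_{t+1}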

TrajectoryCollector()

Class that appends timestep data to a trajectory.

Attributes:

    num_loops (list[int]): Number of loops in the trajectory.
    actions_index (list[Tensor]): Actions in the trajectory.
    obs (list[Tensor]): Observations in the trajectory.
    next_obs (list[Tensor]): Observations of next states in the trajectory.
    actions_bev_log_p (list[float]): Action log probabilities following the behavioral policy in the trajectory.
    rewards (list[float]): Rewards in the trajectory.
    done (list[bool]): Done flags in the trajectory.

Source code in mlir_rl_artifact/trajectory.py
def __init__(self):
    """Initialize the trajectory collector."""
    self.num_loops = []
    self.actions_index = []
    self.obs = []
    self.next_obs = []
    self.actions_bev_log_p = []
    self.rewards = []
    self.done = []

__add__(other)

Add another trajectory collector to the current one.

Parameters:

    other (TrajectoryCollector): The other trajectory collector to add. Required.

Returns:

    TrajectoryCollector: The current trajectory collector (after addition).

Source code in mlir_rl_artifact/trajectory.py
def __add__(self, other: 'TrajectoryCollector') -> 'TrajectoryCollector':
    """Add another trajectory collector to the current one.

    Args:
        other: The other trajectory collector to add.

    Returns:
        The current trajectory collector (after addition).
    """
    self.num_loops.extend(other.num_loops)
    self.actions_index.extend(other.actions_index)
    self.obs.extend(other.obs)
    self.next_obs.extend(other.next_obs)
    self.actions_bev_log_p.extend(other.actions_bev_log_p)
    self.rewards.extend(other.rewards)
    self.done.extend(other.done)

    return self

append(num_loops, action_index, obs, next_obs, action_bev_log_p, reward, done)

Append a single timestep to the trajectory.

Parameters:

    num_loops (int): Number of loops in the timestep. Required.
    action_index (Tensor): Action index in the timestep. Required.
    obs (Tensor): Observation in the timestep. Required.
    next_obs (Tensor): Observation of the next state in the timestep. Required.
    action_bev_log_p (float): Action log probability following the behavioral policy in the timestep. Required.
    reward (float): Reward in the timestep. Required.
    done (bool): Done flag in the timestep. Required.
Source code in mlir_rl_artifact/trajectory.py
def append(
    self,
    num_loops: int,
    action_index: torch.Tensor,
    obs: torch.Tensor,
    next_obs: torch.Tensor,
    action_bev_log_p: float,
    reward: float,
    done: bool,
):
    """Append a single timestep to the trajectory.

    Args:
        num_loops: Number of loops in the timestep.
        action_index: Action index in the timestep.
        obs: Observation in the timestep.
        next_obs: Observation of next state in the timestep.
        action_bev_log_p: Action log probability following behavioral policy in the timestep.
        reward: Reward in the timestep.
        done: Done flag in the timestep.
    """
    self.num_loops.append(num_loops)
    self.actions_index.append(action_index)
    self.obs.append(obs)
    self.next_obs.append(next_obs)
    self.actions_bev_log_p.append(action_bev_log_p)
    self.rewards.append(reward)
    self.done.append(done)

to_trajectory()

Convert the collected data to a TrajectoryData object.

Returns:

    TrajectoryData: The trajectory containing all collected data.

Source code in mlir_rl_artifact/trajectory.py
def to_trajectory(self) -> TrajectoryData:
    """Convert the collected data to a [TrajectoryData][...TrajectoryData] object.

    Returns:
        The trajectory containing all collected data.
    """
    return TrajectoryData(
        num_loops=torch.tensor(self.num_loops, dtype=torch.int64),
        actions_index=torch.cat(self.actions_index),
        obs=torch.cat(self.obs),
        next_obs=torch.cat(self.next_obs),
        actions_bev_log_p=torch.tensor(self.actions_bev_log_p, dtype=torch.float32),
        rewards=torch.tensor(self.rewards, dtype=torch.float32),
        done=torch.tensor(self.done, dtype=torch.bool),
    )

reset()

Reset the trajectory collector.

Source code in mlir_rl_artifact/trajectory.py
def reset(self):
    """Reset the trajectory collector."""
    self.num_loops.clear()
    self.actions_index.clear()
    self.obs.clear()
    self.next_obs.clear()
    self.actions_bev_log_p.clear()
    self.rewards.clear()
    self.done.clear()
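
A sketch of the full collector lifecycle (env and policy are hypothetical placeholders standing in for the environment and the agent):

collector = TrajectoryCollector()

obs = env.reset()
done = False
while not done:
    action_index, action_log_p = policy.sample(obs)
    next_obs, reward, done, num_loops = env.step(action_index)
    collector.append(
        num_loops=num_loops,
        action_index=action_index,
        obs=obs,
        next_obs=next_obs,
        action_bev_log_p=action_log_p,
        reward=reward,
        done=done,
    )
    obs = next_obs

trajectory = collector.to_trajectory()   # pack the lists into a TrajectoryData
collector.reset()                        # clear the buffers for the next episode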