Multi-modal voting extension of Monty

Hello. For a while, I’ve been working on some extensions of the Monty project at TBP, one of which is multi-modal voting. That is, each module’s vote is weighted according to reliability metrics. The model keeps a tracker per sensor (its track record), the current votes from every module, and some filtering to smooth out noise. There are a few things I’d like to investigate further: multi-modal voting (Bayesian sensors), dynamic reliability (confidence evaluation), and spatial attention. The model I’ve built must also be revised to work with a robust motor system/motor policy.

class ModalityTracker:
    """Track how reliable a single sensor modality has been over time.

    Reliability is an exponential moving average (EMA) of correctness that
    starts at 1.0 (every sensor is trusted at first). A bounded history of
    reported confidences is kept so erratic confidence can be penalised in
    `get_weight`.

    Args:
        name: Label of the modality being tracked.
        history_len: Maximum number of confidences / errors remembered.
        alpha: EMA step size in [0, 1]. The default 0.1 reproduces the
            previously hard-coded 0.9 / 0.1 blend.
        min_weight: Floor returned by `get_weight`, so a sensor is never
            ignored completely.

    Raises:
        ValueError: If `alpha` lies outside [0, 1].
    """

    def __init__(self, name: str, history_len: int = 50,
                 alpha: float = 0.1, min_weight: float = 0.1):
        if not 0.0 <= alpha <= 1.0:
            raise ValueError(f"alpha must be in [0, 1], got {alpha}")
        self.name = name
        self.confidences = deque(maxlen=history_len)
        self.errors = deque(maxlen=history_len)
        self.reliability = 1.0  # We start out trusting everyone
        self.alpha = alpha
        self.min_weight = min_weight

    def update(self, confidence: float, was_correct: Optional[bool] = None):
        """Record a confidence and, if the outcome is known, update reliability.

        Args:
            confidence: Confidence reported for the latest vote.
            was_correct: Whether that vote was right. `None` means unknown,
                in which case only the confidence history changes.
        """
        self.confidences.append(confidence)
        if was_correct is None:
            return
        error = 0.0 if was_correct else 1.0
        self.errors.append(error)
        # Smooth it out with an exponential moving average.
        self.reliability = (
            (1.0 - self.alpha) * self.reliability + self.alpha * (1.0 - error)
        )

    def get_weight(self) -> float:
        """Return how much this sensor should be trusted right now.

        The reliability is multiplied by exp(-std) of the remembered
        confidences once more than three have been recorded, then clamped
        from below at `min_weight`.
        """
        weight = self.reliability
        if len(self.confidences) > 3:
            # High variation in confidence means low stability.
            weight *= np.exp(-np.std(self.confidences))
        return float(max(weight, self.min_weight))

class MultiModalVoting:
    """
    Weighted voting across learning modules, intended to replace simple
    majority voting.

    Each learning module's vote gets weighted by:
    - how accurate its modality has been historically,
    - how stable its reported confidence is,
    - whether the sensor is appropriate for the current task.

    Votes are stored per modality name, so two modules that share a modality
    overwrite each other's vote.
    """

    # Ordered (keyword, factor) pairs per task hint; the first keyword found
    # in the modality name wins.
    _TASK_PRIORS = {
        'manipulation': (('touch', 2.0), ('vision', 0.8)),
        'recognition': (('vision', 2.0), ('touch', 0.5)),
    }

    def __init__(self, modality_names: List[str], num_objects: int):
        self.modality_names = modality_names
        self.num_objects = num_objects

        # Track each sensor's track record
        self.trackers = {name: ModalityTracker(name) for name in modality_names}

        # Current votes from everyone
        self.votes = {name: np.zeros(num_objects) for name in modality_names}

        # Smooth things out over time - helps with noisy sensors
        self.belief = np.zeros(num_objects)
        self.momentum = 0.8

        # Modalities that have voted since the last reset; lets
        # update_reliability skip sensors that never said anything.
        self._voted = set()
        # True while there are votes that `belief` has not absorbed yet.
        self._votes_changed = False

    def submit_vote(self, modality: str, object_probs: np.ndarray,
                    confidence: float = 1.0):
        """Store a learning module's current belief about object identity.

        Args:
            modality: Name under which the vote is stored. Unknown names are
                logged as a warning and ignored.
            object_probs: One score per object (length `num_objects`).
            confidence: Confidence recorded in the modality's tracker.

        Raises:
            ValueError: If `object_probs` does not have shape (num_objects,).
        """
        if modality not in self.votes:
            import logging  # local: the snippet defines no module-level logger
            logging.getLogger(__name__).warning("Unknown modality %s", modality)
            return

        # Copy so later changes by the caller cannot alter the stored vote.
        probs = np.array(object_probs, dtype=float)
        if probs.shape != (self.num_objects,):
            raise ValueError(
                f"expected {self.num_objects} object scores, got shape {probs.shape}"
            )

        self.votes[modality] = probs
        self.trackers[modality].update(confidence)
        self._voted.add(modality)
        self._votes_changed = True

    def get_consensus(self, task_hint: Optional[str] = None) -> Tuple[int, float]:
        """
        Figure out what everyone actually thinks, weighted by reliability.

        The smoothed belief only advances when a vote was submitted since the
        previous call, so querying repeatedly does not change the answer.
        `task_hint` therefore only matters on a call that folds in new votes.

        Returns (which_object, how_confident)
        """
        if self._votes_changed:
            weights = self._compute_weights(task_hint)

            # Weighted sum of all the votes
            fused = np.zeros(self.num_objects)
            for modality, weight in weights.items():
                if modality in self.votes:
                    fused += weight * self.votes[modality]

            # Smooth it over time to reduce jitter
            self.belief = self.momentum * self.belief + (1 - self.momentum) * fused
            self._votes_changed = False

        # Normalize to probabilities (left as-is when nothing has been voted)
        total = self.belief.sum()
        probs = self.belief / total if total > 0 else self.belief

        pred_id = int(np.argmax(probs))
        return pred_id, float(probs[pred_id])

    def _compute_weights(self, task_hint: Optional[str]) -> Dict[str, float]:
        """Work out how much to trust each sensor; the weights sum to 1."""
        weights = {}

        for name, tracker in self.trackers.items():
            w = tracker.get_weight()

            # Some sensors are just better for certain tasks
            for keyword, factor in self._TASK_PRIORS.get(task_hint, ()):
                if keyword in name:
                    w *= factor
                    break

            weights[name] = w

        # Make sure they sum to 1
        total = sum(weights.values())
        if total > 0:
            weights = {k: v / total for k, v in weights.items()}
        else:
            weights = {k: 1.0 / len(weights) for k in weights}

        return weights

    def update_reliability(self, ground_truth: int):
        """Score each modality's latest vote against the true object id.

        Modalities that have not voted since the last reset are skipped:
        their all-zero placeholder would otherwise count as a vote for
        object 0.
        """
        for modality, votes in self.votes.items():
            if modality not in self._voted:
                continue
            pred = int(np.argmax(votes))
            self.trackers[modality].update(
                confidence=float(votes[pred]),
                was_correct=bool(pred == ground_truth)
            )

    def reset(self):
        """Clear votes and belief for a new episode; trackers are kept."""
        self.votes = {name: np.zeros(self.num_objects) for name in self.modality_names}
        self.belief = np.zeros(self.num_objects)
        self._voted = set()
        self._votes_changed = False

class SpatialAttention:
    """
    Sparse 3D attention map over a quantized grid.

    Keeps track of where we've already looked and where we should look next.
    Key idea: don't just sample randomly, go where we'll learn the most.
    """

    def __init__(self, resolution: float = 0.01):
        """Create an empty map.

        Args:
            resolution: Grid cell size used to quantize positions.

        Raises:
            ValueError: If `resolution` is not positive (quantization divides
                by it).
        """
        if resolution <= 0:
            raise ValueError(f"resolution must be positive, got {resolution}")
        self.resolution = resolution
        self.visited = set()   # grid cells that were already sensed
        self.attention = {}    # grid cell -> attention weight
        self.decay = 0.95

    def _quantize(self, pose: np.ndarray) -> Tuple[float, float, float]:
        """Snap the first three entries of `pose` to the grid."""
        x, y, z = pose[:3]
        step = self.resolution
        return tuple(round(coord / step) * step for coord in (x, y, z))

    def mark_visited(self, pose: np.ndarray):
        """Record that we just sensed here and zero its attention."""
        cell = self._quantize(pose)
        self.attention[cell] = 0.0  # No point going back
        self.visited.add(cell)

    def boost_attention(self, locations: List[np.ndarray], amount: float = 1.0):
        """Mark some locations as extra interesting (like discriminative features).

        Cells that were already visited are left untouched.
        """
        for pose in locations:
            cell = self._quantize(pose)
            if cell in self.visited:
                continue
            self.attention[cell] = self.attention.get(cell, 0) + amount

    def decay_attention(self):
        """Multiply every stored attention value by `decay`."""
        for cell, value in self.attention.items():
            # Only values change, never the key set, so this is safe mid-iteration.
            self.attention[cell] = value * self.decay

    def get_next_target(self, current_pose: np.ndarray,
                        radius: float = 0.1) -> Optional[np.ndarray]:
        """
        Where should we move our sensor next?

        Considers cells whose attention exceeds 0.01 and that lie within
        `radius` of the quantized current position, and picks the one with
        the highest attention / (distance + 0.01).

        Returns the chosen position followed by (1, 0, 0, 0), or None if
        there's nowhere interesting.
        """
        origin = np.array(self._quantize(current_pose))

        best_cell = None
        best_score = None
        for cell, attn in self.attention.items():
            if attn <= 0.01:
                continue
            dist = np.linalg.norm(np.array(cell) - origin)
            if dist <= radius:
                score = attn / (dist + 0.01)
                # Strict comparison keeps the earliest cell on ties.
                if best_score is None or score > best_score:
                    best_cell, best_score = cell, score

        if best_cell is None:
            return None

        # Convert back to a full pose: position + identity quaternion
        return np.array([*best_cell, 1, 0, 0, 0])

    def reset(self):
        """Clear everything for a new episode"""
        self.visited.clear()
        self.attention.clear()

class ActiveSensingPolicy:
    """
    Motor policy that picks actions to reduce uncertainty.

    Basic idea: move the sensor to places that'll give us the most
    information, as ranked by the attention map.

    Args:
        voting: Object exposing `get_consensus()`, which returns
            (object_id, confidence).
        attention: Object exposing `get_next_target(pose)`, which returns a
            pose or None.
        confidence_threshold: Consensus confidence above which
            `should_stop` returns True.
    """

    def __init__(self, voting: "MultiModalVoting", attention: "SpatialAttention",
                 confidence_threshold: float = 0.9):
        self.voting = voting
        self.attention = attention
        self.confidence_threshold = confidence_threshold

    @staticmethod
    def _axis_action(axis: int, displacement: float) -> str:
        """Name the move along `axis` for the sign of `displacement`."""
        if axis == 0:
            return "move_forward" if displacement > 0 else "move_backward"
        if axis == 1:
            return "move_left" if displacement < 0 else "move_right"
        return "move_up" if displacement > 0 else "move_down"

    def select_action(self, current_pose: np.ndarray,
                      available_actions: List[str]) -> str:
        """
        Pick which action to take next.

        Simple approach: move toward the most interesting location. Axes are
        tried from largest to smallest displacement, and the first move that
        is in `available_actions` is returned. If there is no target, or no
        helpful move is available, a random available action is chosen.

        Raises:
            ValueError: If `available_actions` is empty.
        """
        if len(available_actions) == 0:
            raise ValueError("available_actions must not be empty")

        target = self.attention.get_next_target(current_pose)
        if target is not None:
            delta = (np.asarray(target[:3], dtype=float)
                     - np.asarray(current_pose[:3], dtype=float))
            # Stable sort keeps the lowest axis first on ties, like argmax.
            for axis in np.argsort(-np.abs(delta), kind="stable"):
                if delta[axis] == 0:
                    continue  # no movement needed along this axis
                action = self._axis_action(int(axis), delta[axis])
                if action in available_actions:
                    return action

        # Nothing interesting nearby (or reachable), just explore randomly
        return str(np.random.choice(available_actions))

    def should_stop(self) -> bool:
        """Return True once consensus confidence exceeds the threshold."""
        _, confidence = self.voting.get_consensus()
        return bool(confidence > self.confidence_threshold)

class MockSensorModule:
    """Stand-in sensor module that emits random observations for the demo."""

    def __init__(self, sensor_module_id: str, modality: str):
        self.sensor_module_id = sensor_module_id
        self.modality = modality
        self.timestamp = 0
        self.agent_state = None

    def update_state(self, agent_state):
        """Remember the latest agent state handed over by the caller."""
        self.agent_state = agent_state

    def _sample_pose(self) -> np.ndarray:
        """Return a random position followed by the identity quaternion."""
        pose = np.random.randn(7)  # Position + quaternion
        # Overwrite all four quaternion entries; setting only pose[3] would
        # leave the remaining three random.
        pose[3:] = [1.0, 0.0, 0.0, 0.0]
        return pose

    def step(self, raw_obs):
        """Process observation and spit out a SensoryObservation.

        `raw_obs` is ignored: features, position and confidence are random.
        """
        self.timestamp += 1
        features = np.random.rand(128)
        pose = self._sample_pose()

        return SensoryObservation(
            modality=self.modality,
            features=features,
            pose=pose,
            timestamp=self.timestamp,
            confidence=float(np.random.rand()),
            use_state=True
        )

    def pre_episode(self):
        """Reset the step counter at the start of an episode."""
        self.timestamp = 0

    def state_dict(self):
        """Return the serializable state (just the module id)."""
        return {'sensor_module_id': self.sensor_module_id}

class MockLearningModule:
    """Stand-in learning module that accumulates fake evidence for the demo."""

    # Dirichlet concentration pattern for the fake evidence boost. It is
    # cyclically resized to `num_objects`, so five objects get exactly these.
    _EVIDENCE_PRIOR = (2, 1, 1, 0.5, 0.5)

    def __init__(self, module_id: str, modality: str, num_objects: int = 5):
        self.module_id = module_id
        self.modality = modality
        self.num_objects = num_objects
        self.evidence = np.ones(num_objects) / num_objects
        self.stepwise_targets_list = []
        self.stepwise_target_object = None
        self.terminal_state = False
        self.step_count = 0
        self.max_steps = 20
        self.experiment_mode = "eval"

    def matching_step(self, sensory_inputs):
        """Process inputs during matching phase.

        Does nothing when `sensory_inputs` is None. Otherwise blends a random
        Dirichlet sample into the evidence once per input that has a
        `features` attribute, renormalizes, and sets `terminal_state` when
        the top evidence exceeds 0.9 or `max_steps` is reached.
        """
        if sensory_inputs is None:
            return

        self.step_count += 1

        # Fake evidence accumulation, sized to however many objects we track.
        alpha = np.resize(self._EVIDENCE_PRIOR, self.num_objects)
        for obs in sensory_inputs:
            if hasattr(obs, 'features'):
                # Pretend we're updating our belief
                evidence_boost = np.random.dirichlet(alpha)
                self.evidence = self.evidence * 0.7 + evidence_boost * 0.3

        self.evidence = self.evidence / np.sum(self.evidence)

        # Check if we're done
        if np.max(self.evidence) > 0.9 or self.step_count >= self.max_steps:
            self.terminal_state = True

    def exploratory_step(self, sensory_inputs):
        """Process inputs during exploratory phase (only counts the step)."""
        if sensory_inputs is None:
            return
        self.step_count += 1

    def send_out_vote(self) -> Dict[str, object]:
        """Send our vote to other learning modules.

        Returns a dict with 'object_id_votes' (a copy of the evidence, so
        receivers cannot alter it) and 'confidence' (the top evidence value).
        """
        return {
            'object_id_votes': self.evidence.copy(),
            'confidence': float(np.max(self.evidence))
        }

    def receive_votes(self, voting_data: List[Dict]):
        """Blend the mean of the other modules' votes into our evidence."""
        # Just average them for now
        if not voting_data:
            return
        other_votes = [v['object_id_votes'] for v in voting_data if 'object_id_votes' in v]
        if not other_votes:
            return
        avg_vote = np.mean(other_votes, axis=0)
        self.evidence = 0.8 * self.evidence + 0.2 * avg_vote
        self.evidence = self.evidence / np.sum(self.evidence)

    def propose_goal_states(self) -> List[Dict]:
        """Suggest where we should move next (the mock proposes nothing)."""
        return []

    def get_output(self):
        """Get current output for passing to other learning modules"""
        return SensoryObservation(
            modality=self.modality,
            features=self.evidence,
            pose=np.array([0, 0, 0, 1, 0, 0, 0]),
            timestamp=self.step_count,
            confidence=float(np.max(self.evidence)),
            use_state=True
        )

    def add_lm_processing_to_buffer_stats(self, lm_processed: bool):
        """No-op placeholder."""
        pass

    def set_experiment_mode(self, mode: str):
        """Store the experiment mode string."""
        self.experiment_mode = mode

    def pre_episode(self):
        """Reset counters, evidence and targets for a new episode."""
        self.step_count = 0
        self.terminal_state = False
        self.evidence = np.ones(self.num_objects) / self.num_objects
        self.stepwise_targets_list = []
        self.stepwise_target_object = None

    def post_episode(self):
        """No-op placeholder."""
        pass

    def state_dict(self):
        """Return the serializable state (just the module id)."""
        return {'module_id': self.module_id}

    def load_state_dict(self, state_dict):
        """No-op placeholder."""
        pass

class MockMotorPolicy:
    """Stand-in motor policy holding a fixed pose for the demo."""

    def __init__(self):
        # Position + identity quaternion. Float dtype so the pose can later
        # take fractional updates.
        self.current_pose = np.array([0, 0, 0, 1, 0, 0, 0], dtype=float)

    def get_agent_state(self, state):
        """Return the agent state; `state` is ignored by the mock.

        The position is copied so callers cannot alter the stored pose.
        """
        return {'position': self.current_pose[:3].copy()}

    def is_motor_only_step(self, state):
        """The mock never performs motor-only steps."""
        return False

    def state_dict(self):
        """The mock has no state worth saving."""
        return {}

class MockMotorSystem:
    """Stand-in motor system wrapping a `MockMotorPolicy`."""

    def __init__(self):
        self._policy = MockMotorPolicy()
        self._state = {}
        self.experiment_mode = "eval"

    def set_experiment_mode(self, mode: str):
        """Store the experiment mode string."""
        self.experiment_mode = mode

class MontyBaseWithVoting:
    """Monty-style step loop extended with weighted multi-modal voting.

    On top of the sensor-module / learning-module plumbing, each matching
    step submits every learning module's vote to a `MultiModalVoting`
    instance, records the current pose in a `SpatialAttention` map, and ends
    the episode once the consensus confidence exceeds 0.9.

    Votes are submitted under `lm.modality`; learning modules that share a
    modality therefore overwrite each other's vote in the voting system.
    """

    def __init__(
        self,
        sensor_modules,
        learning_modules,
        motor_system,
        sm_to_agent_dict,
        sm_to_lm_matrix,
        lm_to_lm_matrix,
        lm_to_lm_vote_matrix,
        min_eval_steps,
        min_train_steps,
        num_exploratory_steps,
        max_total_steps,
        num_objects=5,
        task_hint=None
    ):
        """Set up enhanced Monty with voting system"""

        # Original Monty stuff
        self.sensor_modules = sensor_modules
        self.learning_modules = learning_modules
        self.motor_system = motor_system
        self.sm_to_agent_dict = sm_to_agent_dict
        self.sm_to_lm_matrix = sm_to_lm_matrix
        self.lm_to_lm_matrix = lm_to_lm_matrix
        self.lm_to_lm_vote_matrix = lm_to_lm_vote_matrix
        self.min_eval_steps = min_eval_steps
        self.min_train_steps = min_train_steps
        self.num_exploratory_steps = num_exploratory_steps
        self.max_total_steps = max_total_steps

        # Step tracking
        self.step_type = "matching_step"
        self.is_seeking_match = True
        self.experiment_mode = None
        self.total_steps = 0
        self.episode_steps = 0
        self.exploratory_steps = 0
        self.matching_steps = 0
        self._is_done = False

        # NEW: Multi-modal voting system. Votes arrive under lm.modality, so
        # register the learning modules' modalities as well as the sensors'
        # (order-preserving, without duplicates).
        modality_names = list(dict.fromkeys(
            [sm.modality for sm in sensor_modules]
            + [lm.modality for lm in learning_modules]
        ))
        self.voting_system = MultiModalVoting(modality_names, num_objects)
        self.task_hint = task_hint
        # Consensus of the most recent matching step. The done-check reuses
        # it instead of querying the voting system a second time.
        self._last_consensus = (-1, 0.0)

        # NEW: Spatial attention
        self.spatial_attention = SpatialAttention()

        # NEW: Active sensing policy
        self.active_policy = ActiveSensingPolicy(self.voting_system, self.spatial_attention)

        # Storage for current outputs
        self.sensor_module_outputs = []
        self.learning_module_outputs = []
        self.gsg_outputs = []

    def step(self, observation):
        """Run one step of the current step type.

        Returns:
            (pred_id, confidence) from the voting system for a matching
            step; the placeholder (-1, 0.0) for an exploratory step, which
            produces no object consensus.

        Raises:
            ValueError: If `self.step_type` is not a known step type.
        """
        if self.step_type == "matching_step":
            return self._matching_step(observation)
        if self.step_type == "exploratory_step":
            self._exploratory_step(observation)
            return -1, 0.0
        raise ValueError(f"Unknown step type: {self.step_type}")

    def _matching_step(self, observation):
        """Do a matching step with multi-modal voting"""
        # 1. Gather sensory inputs
        self.aggregate_sensory_inputs(observation)

        # 2. Step the learning modules
        self._step_learning_modules()

        # 3. Collect votes from learning modules and submit to voting system
        for lm in self.learning_modules:
            vote = lm.send_out_vote()
            if 'object_id_votes' in vote:
                self.voting_system.submit_vote(
                    lm.modality,
                    vote['object_id_votes'],
                    vote.get('confidence', 1.0)
                )

        # 4. Get the consensus prediction (once per step)
        pred_id, confidence = self.voting_system.get_consensus(self.task_hint)
        self._last_consensus = (pred_id, confidence)

        # 5. Share votes between learning modules (original Monty behavior)
        self._vote()

        # 6. Pass goal states around
        self._pass_goal_states()

        # 7. Update spatial attention
        current_pose = self.motor_system._policy.current_pose
        self.spatial_attention.mark_visited(current_pose)
        self.spatial_attention.decay_attention()

        # 8. Check if we're done
        self._set_step_type_and_check_if_done()

        # 9. Post-step stuff
        self._post_step()

        return pred_id, confidence

    def _exploratory_step(self, observation):
        """Do an exploratory step: step the learning modules, no voting."""
        self.aggregate_sensory_inputs(observation)
        self._step_learning_modules()
        self._set_step_type_and_check_if_done()
        self._post_step()

    def aggregate_sensory_inputs(self, observation):
        """Collect outputs from all sensor modules and learning modules."""
        sensor_module_outputs = []
        for sensor_module in self.sensor_modules:
            raw_obs = self.get_observations(observation, sensor_module.sensor_module_id)
            sensor_module.update_state(self.get_agent_state())
            sensor_module_outputs.append(sensor_module.step(raw_obs))

        learning_module_outputs = [lm.get_output() for lm in self.learning_modules]

        self.sensor_module_outputs = sensor_module_outputs
        self.learning_module_outputs = learning_module_outputs

    def _step_learning_modules(self):
        """Call the method named by `step_type` on every learning module."""
        for i, lm in enumerate(self.learning_modules):
            getattr(lm, self.step_type)(self._collect_inputs_to_lm(i))

    def _collect_inputs_to_lm(self, lm_id):
        """Collect inputs going to a specific learning module"""
        sensory_inputs_from_sms = [
            self.sensor_module_outputs[j] for j in self.sm_to_lm_matrix[lm_id]
        ]

        if self.lm_to_lm_matrix is not None:
            sensory_inputs_from_lms = [
                self.learning_module_outputs[j] for j in self.lm_to_lm_matrix[lm_id]
            ]
        else:
            sensory_inputs_from_lms = []

        return self._combine_inputs(sensory_inputs_from_sms, sensory_inputs_from_lms)

    def _combine_inputs(self, inputs_from_sms, inputs_from_lms):
        """Combine sensor and learning module inputs whose `use_state` is set.

        Returns None when no sensor-module input is usable, even if
        learning-module inputs are.
        """
        combined_inputs = [sm_input for sm_input in inputs_from_sms if sm_input.use_state]

        if not combined_inputs:
            return None

        combined_inputs.extend(
            lm_input for lm_input in inputs_from_lms if lm_input.use_state
        )
        return combined_inputs

    def _vote(self):
        """Share votes between learning modules"""
        if self.lm_to_lm_vote_matrix is None:
            return

        # Collect every vote first so that no module sees a vote already
        # altered by this round of voting.
        votes_per_lm = [lm.send_out_vote() for lm in self.learning_modules]
        for i, lm in enumerate(self.learning_modules):
            lm.receive_votes([votes_per_lm[j] for j in self.lm_to_lm_vote_matrix[i]])

    def _pass_goal_states(self):
        """Collect goal states proposed by the learning modules."""
        self.gsg_outputs = []
        if self.step_type == "matching_step":
            for lm in self.learning_modules:
                self.gsg_outputs.extend(lm.propose_goal_states())

    def _set_step_type_and_check_if_done(self):
        """Check if we're done and update step type"""
        self.update_step_counters()

        # Check if all learning modules are done
        all_terminal = all(lm.terminal_state for lm in self.learning_modules)

        # Reuse this step's consensus. Asking the voting system again would
        # apply its temporal smoothing a second time.
        _, confidence = self._last_consensus
        high_confidence = confidence > 0.9

        if all_terminal or high_confidence or self.episode_steps >= self.max_total_steps:
            self._is_done = True
            import logging  # local: the snippet defines no module-level logger
            logging.getLogger(__name__).info(
                "Episode done: terminal=%s, conf=%.3f", all_terminal, confidence
            )

        if self.exceeded_min_steps:
            if self.step_type == "exploratory_step":
                self._is_done = True
            elif self.step_type == "matching_step" and self.experiment_mode == "train":
                self.switch_to_exploratory_step()

    def _post_step(self):
        """Hook for post-step processing"""
        pass

    def update_step_counters(self):
        """Advance the total, per-episode and per-step-type counters."""
        self.total_steps += 1
        self.episode_steps += 1

        if self.step_type == "matching_step":
            self.matching_steps += 1
        elif self.step_type == "exploratory_step":
            self.exploratory_steps += 1

    def switch_to_matching_step(self):
        """Switch to matching mode"""
        self.step_type = "matching_step"
        self.is_seeking_match = True

    def switch_to_exploratory_step(self):
        """Switch to exploratory mode"""
        self.step_type = "exploratory_step"
        self.is_seeking_match = False

    def set_experiment_mode(self, mode):
        """Set experiment mode (train or eval) here and on all sub-modules."""
        assert mode in ["train", "eval"]
        self.experiment_mode = mode
        self.motor_system.set_experiment_mode(mode)
        self.step_type = "matching_step"
        for lm in self.learning_modules:
            lm.set_experiment_mode(mode)

    def pre_episode(self):
        """Set up for a new episode"""
        self._is_done = False
        self.reset_episode_steps()
        self.switch_to_matching_step()
        self.voting_system.reset()
        self.spatial_attention.reset()
        self._last_consensus = (-1, 0.0)

        for lm in self.learning_modules:
            lm.pre_episode()
        for sm in self.sensor_modules:
            sm.pre_episode()

    def post_episode(self):
        """Clean up after episode"""
        for lm in self.learning_modules:
            lm.post_episode()

    def reset_episode_steps(self):
        """Reset episode counters"""
        self.episode_steps = 0
        self.matching_steps = 0
        self.exploratory_steps = 0

    def get_observations(self, observations, sensor_module_id):
        """Get observations for a specific sensor module ({} if absent)."""
        agent_id = self.sm_to_agent_dict[sensor_module_id]
        return observations.get(agent_id, {}).get(sensor_module_id, {})

    def get_agent_state(self):
        """Get current agent state"""
        return self.motor_system._policy.get_agent_state(self.motor_system._state)

    @property
    def is_done(self):
        """Whether the current episode has finished."""
        return self._is_done

    @property
    def min_steps(self):
        """Minimum step count for the current step type and experiment mode."""
        if self.step_type == "matching_step":
            return self.min_eval_steps if self.experiment_mode == "eval" else self.min_train_steps
        elif self.step_type == "exploratory_step":
            return self.num_exploratory_steps
        return 0

    @property
    def step_type_count(self):
        """Steps taken so far this episode in the current step type."""
        if self.step_type == "matching_step":
            return self.matching_steps
        elif self.step_type == "exploratory_step":
            return self.exploratory_steps
        return 0

    @property
    def exceeded_min_steps(self):
        """True once the current step type has run more than `min_steps`."""
        return self.step_type_count > self.min_steps

    def state_dict(self):
        """Get state for saving"""
        return dict(
            lm_dict={i: lm.state_dict() for i, lm in enumerate(self.learning_modules)},
            sm_dict={i: sm.state_dict() for i, sm in enumerate(self.sensor_modules)},
            motor_system_dict=self.motor_system._policy.state_dict(),
            lm_to_lm_matrix=self.lm_to_lm_matrix,
            lm_to_lm_vote_matrix=self.lm_to_lm_vote_matrix,
            sm_to_lm_matrix=self.sm_to_lm_matrix,
        )

def run_integrated_simulation(steps: int = 15, delay: float = 0.5, num_objects: int = 5):
    """Run a console demo of `MontyBaseWithVoting` on mock modules.

    Builds a vision and a touch sensor module with one learning module each,
    runs one eval episode with the "recognition" task hint, and prints the
    consensus after every step.

    Args:
        steps: Maximum number of steps (also used as `max_total_steps`).
        delay: Seconds to sleep between steps; values <= 0 skip the sleep.
        num_objects: Number of candidate objects each module votes over.
    """
    import time  # local import: the snippet has no module-level import for it

    print("🧠 Monty Multi-Modal Voting & Spatial Attention Demo")
    print("=" * 70)

    # Set up sensor modules
    sensor_modules = [
        MockSensorModule("sm_0", "vision"),
        MockSensorModule("sm_1", "touch")
    ]

    # Set up learning modules
    learning_modules = [
        MockLearningModule("lm_0", "vision", num_objects),
        MockLearningModule("lm_1", "touch", num_objects)
    ]

    # Mapping structures (simple demo versions)
    sm_to_agent_dict = {sm.sensor_module_id: sm.sensor_module_id for sm in sensor_modules}
    sm_to_lm_matrix = {i: [i] for i in range(len(learning_modules))}  # Each LM gets its corresponding SM
    lm_to_lm_matrix = {i: [] for i in range(len(learning_modules))}   # No inter-LM connections for demo
    lm_to_lm_vote_matrix = {i: [] for i in range(len(learning_modules))}

    # Mock motor system
    motor_system = MockMotorSystem()

    # Initialize MontyBase with voting
    monty = MontyBaseWithVoting(
        sensor_modules=sensor_modules,
        learning_modules=learning_modules,
        motor_system=motor_system,
        sm_to_agent_dict=sm_to_agent_dict,
        sm_to_lm_matrix=sm_to_lm_matrix,
        lm_to_lm_matrix=lm_to_lm_matrix,
        lm_to_lm_vote_matrix=lm_to_lm_vote_matrix,
        min_eval_steps=3,
        min_train_steps=3,
        num_exploratory_steps=3,
        max_total_steps=steps,
        num_objects=num_objects,
        task_hint="recognition"
    )

    monty.set_experiment_mode("eval")
    monty.pre_episode()

    for step_idx in range(steps):
        # Simulated observations (empty dict for demo)
        obs = {}
        pred_id, confidence = monty.step(obs)

        print(f"Step {step_idx + 1:02d} | Predicted object: {pred_id} | Confidence: {confidence:.2f}")

        if monty.is_done:
            print("Monty finished the episode early due to high confidence or terminal states.")
            break

        if delay > 0:
            time.sleep(delay)

    monty.post_episode()
    print("Simulation complete!")

Thanks!

4 Likes
import numpy as np
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
from collections import deque


@dataclass
class SensoryObservation:
    """One observation passed from a sensor or learning module downstream."""

    modality: str            # name of the producing modality, e.g. "vision"
    features: np.ndarray     # feature vector carried by the observation
    pose: np.ndarray         # presumably position + quaternion (7 values) — confirm with producers
    timestamp: float         # step counter or time of the observation
    confidence: float = 0.5  # producer's confidence in this observation
    # Whether consumers should use this observation. The mock modules pass
    # it explicitly and the input-combining code filters on it.
    use_state: bool = True


class ModalityTracker:
    """Tracks how reliable each sensor is over time.

    Reliability is an exponential moving average (EMA) of correctness that
    starts at 1.0. A bounded history of reported confidences is kept so
    erratic confidence can be penalised in `get_weight`.

    Args:
        name: Label of the modality being tracked.
        history_len: Maximum number of confidences / errors remembered.
        alpha: EMA step size in [0, 1]. The default 0.1 reproduces the
            previously hard-coded 0.9 / 0.1 blend.
        min_weight: Floor returned by `get_weight`, so a sensor is never
            ignored completely.

    Raises:
        ValueError: If `alpha` lies outside [0, 1].
    """

    def __init__(self, name: str, history_len: int = 50,
                 alpha: float = 0.1, min_weight: float = 0.1):
        if not 0.0 <= alpha <= 1.0:
            raise ValueError(f"alpha must be in [0, 1], got {alpha}")
        self.name = name
        self.confidences = deque(maxlen=history_len)
        self.errors = deque(maxlen=history_len)
        self.reliability = 1.0  # start optimistic
        self.alpha = alpha
        self.min_weight = min_weight

    def update(self, confidence: float, was_correct: Optional[bool] = None):
        """Record a confidence and, if the outcome is known, update reliability.

        Args:
            confidence: Confidence reported for the latest vote.
            was_correct: Whether that vote was right. `None` means unknown,
                in which case only the confidence history changes.
        """
        self.confidences.append(confidence)
        if was_correct is None:
            return
        error = 0.0 if was_correct else 1.0
        self.errors.append(error)
        # exponential moving average
        self.reliability = (
            (1.0 - self.alpha) * self.reliability + self.alpha * (1.0 - error)
        )

    def get_weight(self) -> float:
        """How much should we trust this modality right now.

        The reliability is multiplied by exp(-std) of the remembered
        confidences once more than three have been recorded, then clamped
        from below at `min_weight`.
        """
        weight = self.reliability
        if len(self.confidences) > 3:
            # high std -> low stability
            weight *= np.exp(-np.std(self.confidences))
        return float(max(weight, self.min_weight))


class MultiModalVoting:
    """
    Weighted voting across LMs based on learned reliability.

    Intended as a replacement for simple majority voting.
    Each LM's vote gets weighted by:
    - historical accuracy
    - stability of its reported confidence
    - sensor modality appropriateness for task

    Votes are stored per modality name, so two LMs that share a modality
    overwrite each other's vote.
    """

    # Ordered (keyword, factor) pairs per task hint; the first keyword found
    # in the modality name wins.
    # TODO: learn these priors instead of hardcoding
    _TASK_PRIORS = {
        'manipulation': (('touch', 2.0), ('vision', 0.8)),
        'recognition': (('vision', 2.0), ('touch', 0.5)),
    }

    def __init__(self, modality_names: List[str], num_objects: int):
        self.modality_names = modality_names
        self.num_objects = num_objects

        # track each modality's performance
        self.trackers = {name: ModalityTracker(name) for name in modality_names}

        # current votes from each modality
        self.votes = {name: np.zeros(num_objects) for name in modality_names}

        # for temporal smoothing - helps with noisy sensors
        self.belief = np.zeros(num_objects)
        self.momentum = 0.8

        # modalities that have voted so far; lets update_reliability skip
        # sensors that never said anything
        self._voted = set()
        # True while there are votes that `belief` has not absorbed yet
        self._votes_changed = False

    def submit_vote(self, modality: str, object_probs: np.ndarray,
                    confidence: float = 1.0):
        """LM submits its current belief about object identity.

        Args:
            modality: Name under which the vote is stored. Unknown names
                print a warning and are ignored.
            object_probs: One score per object (length `num_objects`).
            confidence: Confidence recorded in the modality's tracker.

        Raises:
            ValueError: If `object_probs` does not have shape (num_objects,).
        """
        if modality not in self.votes:
            print(f"Warning: unknown modality {modality}")
            return

        # copy so later changes by the caller cannot alter the stored vote
        probs = np.array(object_probs, dtype=float)
        if probs.shape != (self.num_objects,):
            raise ValueError(
                f"expected {self.num_objects} object scores, got shape {probs.shape}"
            )

        self.votes[modality] = probs
        self.trackers[modality].update(confidence)
        self._voted.add(modality)
        self._votes_changed = True

    def get_consensus(self, task_hint: Optional[str] = None) -> Tuple[int, float]:
        """
        Compute weighted consensus across all LMs.

        The smoothed belief only advances when a vote was submitted since the
        previous call, so querying repeatedly does not change the answer.
        `task_hint` therefore only matters on a call that folds in new votes.

        Returns (predicted_object_id, confidence)
        """
        if self._votes_changed:
            weights = self._compute_weights(task_hint)

            # weighted sum
            fused = np.zeros(self.num_objects)
            for modality, weight in weights.items():
                if modality in self.votes:
                    fused += weight * self.votes[modality]

            # temporal smoothing reduces jitter
            self.belief = self.momentum * self.belief + (1 - self.momentum) * fused
            self._votes_changed = False

        # normalize (left as-is when nothing has been voted)
        total = self.belief.sum()
        probs = self.belief / total if total > 0 else self.belief

        pred_id = int(np.argmax(probs))
        return pred_id, float(probs[pred_id])

    def _compute_weights(self, task_hint: Optional[str]) -> Dict[str, float]:
        """Figure out how much to trust each modality; the weights sum to 1."""
        weights = {}

        for name, tracker in self.trackers.items():
            w = tracker.get_weight()

            # task-specific modulation
            for keyword, factor in self._TASK_PRIORS.get(task_hint, ()):
                if keyword in name:
                    w *= factor
                    break

            weights[name] = w

        # normalize
        total = sum(weights.values())
        if total > 0:
            weights = {k: v / total for k, v in weights.items()}
        else:
            # fallback - shouldn't happen but just in case
            weights = {k: 1.0 / len(weights) for k in weights}

        return weights

    def update_reliability(self, ground_truth: int):
        """Call this after episode ends to update reliability scores.

        Modalities that have never voted are skipped: their all-zero
        placeholder would otherwise count as a vote for object 0.
        """
        for modality, votes in self.votes.items():
            if modality not in self._voted:
                continue
            pred = int(np.argmax(votes))
            self.trackers[modality].update(
                confidence=float(votes[pred]),
                was_correct=bool(pred == ground_truth)
            )


class SpatialAttention:
    """
    Sparse 3D attention map used for active sensing.

    Remembers which grid cells were already sensed and how interesting the
    remaining ones are, so the next sensing location can be chosen
    deliberately instead of at random.
    """

    def __init__(self, resolution: float = 0.01):
        self.resolution = resolution
        self.visited = set()   # grid cells that were already sensed
        self.attention = {}    # grid cell -> attention weight
        self.decay = 0.95

        # TODO: this will get huge for large objects
        # need to implement spatial hashing or octree

    def _quantize(self, pose: np.ndarray) -> Tuple[float, float, float]:
        """Round the first three entries of `pose` onto the grid."""
        x, y, z = pose[:3]
        step = self.resolution
        return tuple(round(coord / step) * step for coord in (x, y, z))

    def mark_visited(self, pose: np.ndarray):
        """Flag the cell containing `pose` as sensed and zero its attention."""
        cell = self._quantize(pose)
        self.attention[cell] = 0.0  # no need to revisit
        self.visited.add(cell)

    def boost_attention(self, locations: List[np.ndarray], amount: float = 1.0):
        """Add `amount` of attention to every not-yet-visited cell in `locations`."""
        for pose in locations:
            cell = self._quantize(pose)
            if cell in self.visited:
                continue
            self.attention[cell] = self.attention.get(cell, 0) + amount

    def decay_attention(self):
        """Scale every stored attention value by `decay`."""
        for cell, value in self.attention.items():
            # only values change, never the key set, so this is safe mid-iteration
            self.attention[cell] = value * self.decay

    def get_next_target(self, current_pose: np.ndarray,
                        radius: float = 0.1) -> Optional[np.ndarray]:
        """
        Choose the next location to sense.

        Considers cells whose attention exceeds 0.01 and that lie within
        `radius` of the quantized current position, and picks the one with
        the highest attention / (distance + 0.01).

        Returns the chosen position followed by (1, 0, 0, 0), or None when
        no cell qualifies.
        """
        origin = np.array(self._quantize(current_pose))

        best_cell = None
        best_score = None
        for cell, attn in self.attention.items():
            if attn <= 0.01:
                continue
            dist = np.linalg.norm(np.array(cell) - origin)
            if dist <= radius:
                score = attn / (dist + 0.01)
                # strict comparison keeps the earliest cell on ties
                if best_score is None or score > best_score:
                    best_cell, best_score = cell, score

        if best_cell is None:
            return None

        # position + identity quat
        return np.array([*best_cell, 1, 0, 0, 0])


class ActiveSensingPolicy:
    """
    Motor policy that picks actions to reduce uncertainty.

    Intended to plug into Monty's motor system.
    Idea: move sensor to locations that will give us most information.

    Attributes:
        voting: Object whose ``get_consensus()`` returns a
            ``(prediction, confidence)`` pair.
        attention: Object whose ``get_next_target(pose)`` returns a target
            pose or ``None``.
        confidence_threshold: Consensus confidence that must be exceeded
            for ``should_stop`` to return True.
    """

    def __init__(self, voting: "MultiModalVoting", attention: "SpatialAttention",
                 confidence_threshold: float = 0.9):
        """
        Args:
            voting: Source of the current consensus.
            attention: Source of the next target location.
            confidence_threshold: Stop threshold; defaults to the previously
                hard-coded 0.9.
        """
        self.voting = voting
        self.attention = attention
        self.confidence_threshold = confidence_threshold

    @staticmethod
    def _action_for_axis(axis: int, component: float) -> str:
        """Map a displacement component along ``axis`` to an action name."""
        # NOTE: assumes specific action names, might need adjustment
        if axis == 0:
            return "move_forward" if component > 0 else "move_backward"
        if axis == 1:
            return "move_left" if component < 0 else "move_right"
        return "move_up" if component > 0 else "move_down"

    def select_action(self, current_pose: np.ndarray,
                     available_actions: List[str]) -> str:
        """
        Pick which action to take next.

        Simple version: move toward highest attention location.
        TODO: could use proper motion planning here

        Args:
            current_pose: Current pose; the first three entries are the
                position.
            available_actions: Actions the caller allows. An empty list is
                treated as "no constraint" when a target exists.

        Returns:
            With no target, a random available action. With a target, the
            action along the axis of largest displacement if it is
            available. Otherwise the action for the next-largest non-zero
            axis that is available, and failing that a random available
            action.

        Raises:
            ValueError: If there is no target and ``available_actions`` is
                empty (raised by ``np.random.choice``).
        """
        target = self.attention.get_next_target(current_pose)

        if target is None:
            # nowhere interesting, explore randomly
            return np.random.choice(available_actions)

        # pick action that moves us toward target
        delta = target[:3] - current_pose[:3]
        abs_delta = np.abs(delta)
        dominant = int(np.argmax(abs_delta))
        preferred = self._action_for_axis(dominant, delta[dominant])

        if len(available_actions) == 0 or preferred in available_actions:
            return preferred

        # The preferred move is not allowed: try the remaining axes, largest
        # displacement first. A stable sort keeps the lowest axis on ties.
        for axis in np.argsort(-abs_delta, kind="stable"):
            axis = int(axis)
            if axis == dominant or abs_delta[axis] == 0:
                # Moving along a zero-displacement axis only moves us away.
                continue
            action = self._action_for_axis(axis, delta[axis])
            if action in available_actions:
                return action

        # No directional move is allowed; never return an unavailable action.
        return np.random.choice(available_actions)

    def should_stop(self) -> bool:
        """Return True once the consensus confidence exceeds the threshold."""
        _, confidence = self.voting.get_consensus()

        # stop if we're confident enough
        return confidence > self.confidence_threshold


def test_voting():
    """Smoke-test MultiModalVoting with two modalities and five objects.

    Submits one vote per modality, prints the consensus, then feeds back
    ground truth 1 and prints each tracker's reliability.
    """
    print("Testing multi-modal voting...")

    voting = MultiModalVoting(['vision', 'touch'], num_objects=5)

    # One (modality, vote distribution, confidence) triple per simulated sensor.
    ballots = [
        ('vision', np.array([0.1, 0.7, 0.1, 0.05, 0.05]), 0.9),
        ('touch', np.array([0.2, 0.6, 0.1, 0.05, 0.05]), 0.6),
    ]
    for modality, distribution, certainty in ballots:
        voting.submit_vote(modality, distribution, confidence=certainty)

    pred, conf = voting.get_consensus()
    print(f"Prediction: {pred}, Confidence: {conf:.3f}")

    # Feed back the true object so the trackers can update themselves.
    voting.update_reliability(ground_truth=1)

    print("Reliabilities after update:")
    for name, tracker in voting.trackers.items():
        print(f"  {name}: {tracker.reliability:.3f}")


def test_attention():
    """Smoke-test SpatialAttention.

    Marks two poses as visited, boosts two others, then prints the target
    chosen from a pose between them (or a notice when there is none).
    """
    print("\nTesting spatial attention...")

    def pose_at(x, y, z):
        # Position followed by the identity quaternion.
        return np.array([x, y, z, 1, 0, 0, 0])

    attention = SpatialAttention()

    # Two locations count as already sensed.
    for sensed in (pose_at(0, 0, 0), pose_at(0.1, 0, 0)):
        attention.mark_visited(sensed)

    # Two other locations are made interesting.
    attention.boost_attention([pose_at(0.2, 0, 0), pose_at(0, 0.1, 0)])

    # Ask where to go from a pose between them.
    target = attention.get_next_target(pose_at(0.15, 0, 0))

    if target is None:
        print("No interesting targets nearby")
    else:
        print(f"Next target: {target[:3]}")


# Run both smoke tests when this file is executed as a script.
if __name__ == "__main__":
    test_voting()
    test_attention()
    
2 Likes

Welcome to the forums @Joseph_Dimos , and that’s great to hear you’re working on Monty!

Just a few things to clarify for my understanding

  • When you say multi-modal, do you mean different types of sensory modalities (e.g. touch vs. vision)? It is worth noting that currently all of the sensors in Habitat are based on an RGB-Depth camera, so the distant agent and the “surface” agent only differ by their policies. We do not currently have any experiment configs that enable an experiment with both the surface and distant agent, as this requires having multiple independent agents, which our existing Habitat interface does not support.
  • At a high level, is the voting still happening between learning modules in your implementation? I just want to check that it is not the sensor modules which are sending votes. For a clear description of our existing voting algorithm, you may find the description in our recent pre-print paper (Section 3.5: Voting) useful.
  • I’m in the process of reviewing and working through a PR from @firemanc that makes voting more nuanced by supporting lateral connections with associative weights that build over time. These weights emerge in an unsupervised manner. I thought you might be interested in checking it out, in case you had any comments. This is a link to the RFC, and the actual feature PR.
3 Likes

Hello @nleadholm. Thank you for the welcoming message.

Certainly, I shall provide a general explanation that addresses these points here.

So, regarding the code, the voting is defined wrt conditional activation. That is, a function only runs voting ‘if’:
self.lm_to_lm_vote_matrix is not None

If the matrix is None, no voting occurs — learning modules operate independently. Now, when speaking of the vote sending part of this, each learning module calls send_out_vote():

votes_per_lm.append(self.learning_modules[i].send_out_vote())

Each LM computes a representation, confidence, or “vote” (what the state or object is).
Additionally, each LM gets votes from other modules based on the connectivity as in:
lm_to_lm_vote_matrix:

voting_data = [votes_per_lm[j] for j in self.lm_to_lm_vote_matrix[i]]
self.learning_modules[i].receive_votes(voting_data)

So, to directly answer, the code doesn’t ‘yet’ perform lateral voting between learning modules; the lateral voting, though, occurs as is defined by the matrix.

For instance, if I have lm_to_lm_vote_matrix = [[1, 2], [0, 1]], I get [1, 2] and [0, 1] as the lists of LMs that a given LM receives votes from.

When Monty calls _vote() during the step loop, this could in theory be useful.
And, yes, in response to the first question (pardon the order), the multi-modal idea is that multiple sensory modalities are considered.

Thank you for the reference to the latest voting algorithm. After some preliminary review, I thought about learned displacement correction a bit. That is, if I might have a neural net, I could apply it to correct for more large-scale sensor miscalibration.

Like,
true_B_d_t = torch.randn(3) * 2.0, which could be the random true displacement between sensors, and B_M_R_k = torch.randn(4); B_M_R_k = B_M_R_k / torch.norm(B_M_R_k) for its normalisation.

As far as I know, the biased sensor readings ideally ‘should’ return “observed” and “rotation_context” for some rotation matrix. Which, in the voting process, the problem might be with B d_t for coordinates, but LM (l+1) might have alternate hypotheses.

Anyways, thanks again! I shall review this more soon!

Best

2 Likes

Hi @Joseph_Dimos, thanks for sharing your code implementation and outlining how your ideas could integrate into Monty’s matching step. I went through your code and wanted to summarize and respond to the main points. From what I see, you’re proposing two high-level ideas:

  1. Weigh votes between columns from different modalities based on how accurately each modality has historically predicted the correct object.
  2. Build an attention map that ties into the motor policies to reduce the chances of returning to a previously visited location (unless manually boosted).

Multi-Modal Voting

In TBT, we don’t differentiate between how columns from different modalities operate. A column (learning module) in the auditory cortex is assumed to process information in the same way as one in the visual cortex. Learning and voting occur across modalities in the same fashion as within a single modality. Because of that, we don’t assign modality-specific weights, just as we don’t weigh neighboring columns differently within the same modality. Introducing modality weights would require extending that logic down to individual columns as well. It would definitely be worth exploring how the receiving LM can over time learn which votes are more reliable than others. However, I wouldn’t phrase it as a modality specific mechanism. The receiving LM does not know where its input comes from.

Note that we currently have a vote_weight parameter which we apply to the votes being received from other columns but this is not used to change the influence of the votes coming from other columns based on modality. This parameter simply scales the overall influence of voting for the column regardless of where the vote is coming from. In practice, this is always set to 1 in all the current experiments.

From your code, it looks like the different modalities are weighted based on how well they have historically performed on a given task. I guess one question I would have here is do we update the reliability of these modalities at the end of every step, or every episode? In both cases, we don’t have a ground truth signal, we have a model prediction. If we update_reliability at the end of every episode (after a few matching steps), which votes from which step do we use? I think we will end up getting ourselves in a credit assignment problem, and possibly requiring backpropagation through time.

One thing to highlight is, as @nleadholm mentioned, there are currently no blockers to run experiments where Monty votes across modalities. You would simply take two sensor modules for your two modalities and connect them to two learning modules. Learning modules are modality agnostic, so there would be no difference in the LMs. They could vote using exactly the same mechanism as if they were receiving input from the same modality.

Attention Maps

That’s quite interesting. The idea of building an attention map that discourages visiting previously visited locations aligns with the work @sknudstrup has been doing with inhibition of return and salience-based sensor modules. This work is currently being integrated in tbp.monty. Check out a very brief presentation on it from our last hackathon here. HabitatSalienceSM is already integrated into tbp.monty.

6 Likes

Hello @rmounir, thank you for your feedback. Yes, these two points are generally primary features; such that a multi-modal and historical alignment is defined for objects of reference.

So, with vote_weight params, where votes are received from other columns, there is a notion that suggests a ‘modal awareness’ for a given task. Regarding the code, the different modalities are indeed weighted based on a priori historical tasks. So, with update_reliability there is a nascent point about ‘weighted temporal’ episodes. An approach I’m working with is to use ‘all’ votes from a given episode, such that later votes are weighted more heavily over time (with some discount factor ~0.9). Another option is to use only the very last vote before ground truth. This approach, however, is simpler since it discards temporal information. Surely, if considering a credit assignment problem, there is an association with ‘temporal delay’ (that is, when early votes get penalised for bad late votes).

If there’s non-differentiable components, I might gesture towards attention mechanism breakages etc. But, I could just define an episodic memory class in a more modular fashion.

This basically adds some experimental parameters:

# NOTE(review): these look like fields of a @dataclass whose header is not
# shown in this snippet; confirm against the full class.
# Per-modality histories keyed by modality name; add_step appends one entry
# to each per call.
step_votes: Dict[str, List[np.ndarray]] = field(default_factory=dict)  # copies of the submitted vote vectors
step_confidences: Dict[str, List[float]] = field(default_factory=dict)  # confidence passed with each vote
step_predictions: Dict[str, List[int]] = field(default_factory=dict)  # argmax of each vote vector
# Copies of every pose passed to add_step, shared by all modalities.
step_poses: List[np.ndarray] = field(default_factory=list)
# Label that compute_temporal_reliability compares predictions against.
ground_truth: Optional[int] = None
# Every confidence passed to add_step, across all modalities.
episode_confidence: List[float] = field(default_factory=list)

# Timestamp given to add_step, or the number of poses stored so far when
# none was given.
step_timestamps: List[float] = field(default_factory=list)

# Parameters for reliability computation
stability_weight: float = 5.0  # multiplies the confidence std inside exp(-std * weight)
recency_weight: float = 0.9  # Temporal decay for older steps

def add_step(
    self,
    modality: str,
    votes: np.ndarray,
    confidence: float,
    pose: np.ndarray,
    timestamp: Optional[float] = None
):
    """
    Record one observation step for ``modality``.

    Votes, confidences and predictions are stored per modality. Poses,
    timestamps and the episode confidence list are shared by all
    modalities and grow by one entry on every call.

    Args:
        modality: The modality name (e.g. "vision", "audio")
        votes: Per-class vote vector; a copy is stored and its argmax is
            recorded as this step's prediction
        confidence: Confidence reported for this step
        pose: Pose for this step; a copy is stored
        timestamp: Optional time value. When omitted, the number of poses
            stored so far (including this one) is used
    """
    if modality not in self.step_votes:
        # First step for this modality: start its three histories.
        for history in (self.step_votes, self.step_confidences, self.step_predictions):
            history[modality] = []

    vote_log = self.step_votes[modality]
    vote_log.append(votes.copy())
    confidence_log = self.step_confidences[modality]
    confidence_log.append(confidence)
    prediction_log = self.step_predictions[modality]
    prediction_log.append(int(np.argmax(votes)))

    self.step_poses.append(pose.copy())
    if timestamp is None:
        # The pose was appended just above, so this count starts at 1.
        timestamp = len(self.step_poses)
    self.step_timestamps.append(timestamp)

    # Kept for the episode-level average confidence.
    self.episode_confidence.append(confidence)

def compute_temporal_reliability(self, modality: str) -> float:
    """
    Score how reliable ``modality`` was over the recorded episode.

    The score is the product of three terms:
      * accuracy: fraction of steps whose prediction equals the ground
        truth (every step counts equally),
      * stability: exp(-std(confidences) * stability_weight),
      * recency-weighted confidence: average confidence where a step k
        positions before the last has weight recency_weight ** k.

    Returns:
        The product clipped to [0, 1], or 0.5 when the modality has no
        recorded predictions or the ground truth is unknown.
    """
    no_history = modality not in self.step_predictions
    if no_history or self.ground_truth is None:
        return 0.5  # neutral score: nothing to judge against

    predicted = np.array(self.step_predictions[modality])
    conf = np.array(self.step_confidences[modality])
    n_steps = len(conf)

    # Share of steps that named the right object.
    hit_rate = np.sum(predicted == self.ground_truth) / len(predicted)

    # A single sample has no spread to penalise.
    spread = 0.0
    if n_steps > 1:
        spread = np.std(conf)
    steadiness = np.exp(-spread * self.stability_weight)

    # Exponent 0 for the latest step, growing towards the oldest one.
    ages = np.arange(n_steps)[::-1]
    recency = np.power(self.recency_weight, ages)
    weighted_conf = np.average(conf, weights=recency)

    score = hit_rate * steadiness * weighted_conf
    return float(np.clip(score, 0.0, 1.0))

def compute_cross_modal_reliability(self) -> Dict[str, float]:
    """
    Score every modality that has recorded predictions in this episode.

    Returns:
        Mapping of modality name to the value returned by
        ``compute_temporal_reliability`` for it, in recording order.
    """
    scores = {}
    for modality in self.step_predictions:
        scores[modality] = self.compute_temporal_reliability(modality)
    return scores

def summarize_episode(self) -> Dict[str, object]:
    """
    Summarize overall episode statistics.

    The consensus is a reliability-weighted plurality vote over each
    modality's final prediction. Predictions are class labels, so their
    weights are summed per label instead of averaging the label values,
    which could produce a label no modality predicted.

    Returns:
        A dict with:
            "avg_confidence": mean of all recorded confidences, or 0.0
                when none were recorded.
            "modal_reliabilities": modality name -> reliability score.
            "consensus_prediction": the label with the largest total
                reliability, or None when no modality has a prediction.
                Ties go to the label backed by more modalities, then to
                the label seen first.
    """
    reliabilities = self.compute_cross_modal_reliability()
    avg_confidence = float(np.mean(self.episode_confidence)) if self.episode_confidence else 0.0

    # Total reliability and number of supporting modalities per label.
    weight_by_label = {}
    support_by_label = {}
    for modality, reliability in reliabilities.items():
        history = self.step_predictions.get(modality)
        if not history:
            continue
        final_pred = int(history[-1])
        weight_by_label[final_pred] = weight_by_label.get(final_pred, 0.0) + reliability
        support_by_label[final_pred] = support_by_label.get(final_pred, 0) + 1

    consensus_prediction = None
    if weight_by_label:
        # The support count breaks ties, including the case where every
        # reliability is 0 and the weights carry no information.
        consensus_prediction = max(
            weight_by_label,
            key=lambda label: (weight_by_label[label], support_by_label[label]),
        )

    return {
        "avg_confidence": avg_confidence,
        "modal_reliabilities": reliabilities,
        "consensus_prediction": consensus_prediction,
    }

def reset(self):
    """Empty every per-episode history in place and forget the ground truth."""
    per_episode_buffers = (
        self.step_votes,
        self.step_confidences,
        self.step_predictions,
        self.step_poses,
        self.step_timestamps,
        self.episode_confidence,
    )
    for buffer in per_episode_buffers:
        buffer.clear()
    self.ground_truth = None