Hello. For a while, I’ve been working on some extensions of the Monty project at TBP, including multi-modal voting — that is, each module’s vote is weighted according to reliability metrics. So I might have tracking (each sensor’s track record), current votes (from everyone), and some filters for noise. There are a few things I’d like to investigate further: multi-modal voting (Bayesian sensors), dynamic reliability (confidence evaluation), and spatial attention. The model I’ve built must also be revised to account for a robust motor system/motor policy.
def __init__(self, name: str, history_len: int = 50):
    """Track one sensor modality's recent confidences and error outcomes.

    Args:
        name: identifier for this modality (e.g. "vision").
        history_len: how many recent samples the rolling windows keep.
    """
    self.name = name
    # Start out fully trusting the sensor; evidence adjusts this later.
    self.reliability = 1.0
    # Bounded windows so stale observations age out automatically.
    self.confidences = deque(maxlen=history_len)
    self.errors = deque(maxlen=history_len)
def update(self, confidence: float, was_correct: Optional[bool] = None):
    """Fold a new confidence report (and optional outcome) into our stats."""
    self.confidences.append(confidence)
    # Without ground truth there's nothing to score against.
    if was_correct is None:
        return
    error = 0.0 if was_correct else 1.0
    self.errors.append(error)
    # Exponential moving average: keep 90% of the old belief and
    # nudge 10% toward the latest outcome.
    self.reliability = 0.9 * self.reliability + 0.1 * (1.0 - error)
def get_weight(self) -> float:
    """Current trust in this sensor: reliability, penalized for instability."""
    weight = self.reliability
    # Erratic confidence reports reduce trust, but only once we have
    # enough samples for the spread to mean anything.
    if len(self.confidences) > 3:
        spread = np.std(self.confidences)
        weight *= np.exp(-spread)  # bigger spread -> smaller multiplier
    # Floor the weight so no sensor is ever silenced completely.
    return max(weight, 0.1)
class MultiModalVoting:
    """
    This is way better than the simple majority voting in current Monty.
    Each learning module's vote gets weighted by:
    - How accurate it's been historically
    - How confident it is right now
    - Whether this sensor is even appropriate for the current task
    """
    # NOTE: the original docstring above was missing its opening triple
    # quote, which made the whole class a syntax error. Fixed.

    def __init__(self, modality_names: List[str], num_objects: int):
        """Set up trackers, vote storage, and the smoothed belief vector.

        Args:
            modality_names: names of the participating modalities.
            num_objects: length of each per-object probability vector.
        """
        self.modality_names = modality_names
        self.num_objects = num_objects
        # Track each sensor's track record
        self.trackers = {name: ModalityTracker(name) for name in modality_names}
        # Current votes from everyone
        self.votes = {name: np.zeros(num_objects) for name in modality_names}
        # Smooth things out over time - helps with noisy sensors
        self.belief = np.zeros(num_objects)
        self.momentum = 0.8

    def submit_vote(self, modality: str, object_probs: np.ndarray,
                    confidence: float = 1.0):
        """A learning module submits what it thinks the object is"""
        if modality not in self.votes:
            logger.warning(f"Unknown modality {modality}")
            return
        self.votes[modality] = object_probs
        self.trackers[modality].update(confidence)

    def get_consensus(self, task_hint: Optional[str] = None) -> Tuple[int, float]:
        """
        Figure out what everyone actually thinks, weighted by reliability.
        Returns (which_object, how_confident)
        """
        weights = self._compute_weights(task_hint)
        # Weighted sum of all the votes
        fused = np.zeros(self.num_objects)
        for modality, weight in weights.items():
            if modality in self.votes:
                fused += weight * self.votes[modality]
        # Smooth it over time to reduce jitter
        fused = self.momentum * self.belief + (1 - self.momentum) * fused
        self.belief = fused
        # Normalize to probabilities
        if fused.sum() > 0:
            fused = fused / fused.sum()
        pred_id = np.argmax(fused)
        confidence = fused[pred_id]
        return pred_id, confidence

    def _compute_weights(self, task_hint: Optional[str]) -> Dict[str, float]:
        """Work out how much to trust each sensor.

        Combines each tracker's reliability-based weight with a task-specific
        multiplier, then normalizes so the weights sum to 1.
        """
        weights = {}
        for name, tracker in self.trackers.items():
            w = tracker.get_weight()
            # Some sensors are just better for certain tasks
            if task_hint == 'manipulation':
                if 'touch' in name:
                    w *= 2.0  # Touch is really useful for manipulation
                elif 'vision' in name:
                    w *= 0.8
            elif task_hint == 'recognition':
                if 'vision' in name:
                    w *= 2.0  # Vision is great for recognition
                elif 'touch' in name:
                    w *= 0.5
            weights[name] = w
        # Make sure they sum to 1
        total = sum(weights.values())
        if total > 0:
            weights = {k: v / total for k, v in weights.items()}
        else:
            weights = {k: 1.0 / len(weights) for k in weights.keys()}
        return weights

    def update_reliability(self, ground_truth: int):
        """Score every modality's latest vote against the true object id."""
        for modality, votes in self.votes.items():
            pred = np.argmax(votes)
            was_correct = (pred == ground_truth)
            self.trackers[modality].update(
                confidence=votes[pred],
                was_correct=was_correct
            )

    def reset(self):
        """Clear votes and belief for a new episode (trackers persist)."""
        self.votes = {name: np.zeros(self.num_objects) for name in self.modality_names}
        self.belief = np.zeros(self.num_objects)
class SpatialAttention:
    """
    Keeps track of where we've already looked and where we should look next.
    Key idea: don't just sample randomly, go where we'll learn the most.
    """
    # NOTE: the original docstring above was missing its opening triple
    # quote, which made the whole class a syntax error. Fixed.

    def __init__(self, resolution: float = 0.01):
        """Args:
            resolution: grid cell size used to quantize continuous positions.
        """
        self.resolution = resolution
        self.visited = set()     # grid cells we've already sensed
        self.attention = {}      # grid cell -> interest score
        self.decay = 0.95        # per-step multiplicative decay of interest

    def _quantize(self, pose: np.ndarray) -> Tuple[float, float, float]:
        """Snap continuous position to a grid"""
        x, y, z = pose[:3]
        return (
            round(x / self.resolution) * self.resolution,
            round(y / self.resolution) * self.resolution,
            round(z / self.resolution) * self.resolution
        )

    def mark_visited(self, pose: np.ndarray):
        """We just sensed here"""
        loc = self._quantize(pose)
        self.visited.add(loc)
        self.attention[loc] = 0.0  # No point going back

    def boost_attention(self, locations: List[np.ndarray], amount: float = 1.0):
        """Mark some locations as extra interesting (like discriminative features)"""
        for pose in locations:
            loc = self._quantize(pose)
            if loc not in self.visited:
                self.attention[loc] = self.attention.get(loc, 0) + amount

    def decay_attention(self):
        """Gradually reduce attention everywhere"""
        for loc in self.attention:
            self.attention[loc] *= self.decay

    def get_next_target(self, current_pose: np.ndarray,
                        radius: float = 0.1) -> Optional[np.ndarray]:
        """
        Where should we move our sensor next?
        Returns the pose of the most interesting location nearby.
        Returns None if there's nowhere interesting.
        """
        curr_loc = self._quantize(current_pose)
        # Find nearby locations that look interesting
        candidates = []
        for loc, attn in self.attention.items():
            if attn <= 0.01:
                continue
            dist = np.linalg.norm(np.array(loc) - np.array(curr_loc))
            if dist <= radius:
                candidates.append((loc, attn, dist))
        if not candidates:
            return None
        # Pick the best one: high attention, not too far
        # (the +0.01 avoids dividing by zero for the current cell).
        best_loc, _, _ = max(candidates, key=lambda x: x[1] / (x[2] + 0.01))
        # Convert back to a full pose
        target_pose = np.array([*best_loc, 1, 0, 0, 0])  # Position + identity quaternion
        return target_pose

    def reset(self):
        """Clear everything for a new episode"""
        self.visited.clear()
        self.attention.clear()
class ActiveSensingPolicy:
    """
    Motor policy that picks actions to reduce uncertainty.
    Works with Monty's motor system.
    Basic idea: move the sensor to places that'll give us the most information.
    """
    # NOTE: the original docstring opened with curly “smart quotes” (“”"),
    # which is a syntax error in Python. Fixed to a plain triple quote.
    # Annotations below are string forward references so this class does not
    # depend on definition order.

    def __init__(self, voting: "MultiModalVoting", attention: "SpatialAttention"):
        """Args:
            voting: consensus provider with a get_consensus() method.
            attention: spatial attention map with a get_next_target() method.
        """
        self.voting = voting
        self.attention = attention

    def select_action(self, current_pose: np.ndarray,
                      available_actions: List[str]) -> str:
        """
        Pick which action to take next.
        Simple approach: move toward the most interesting location.
        """
        target = self.attention.get_next_target(current_pose)
        if target is None:
            # Nothing interesting nearby, just explore randomly
            return np.random.choice(available_actions)
        # Move toward the target
        delta = target[:3] - current_pose[:3]
        # Which direction needs the most movement?
        abs_delta = np.abs(delta)
        max_dim = np.argmax(abs_delta)
        # Map the dominant axis to a discrete action
        if max_dim == 0:
            return "move_forward" if delta[0] > 0 else "move_backward"
        elif max_dim == 1:
            return "move_left" if delta[1] < 0 else "move_right"
        else:
            return "move_up" if delta[2] > 0 else "move_down"

    def should_stop(self) -> bool:
        """Should we stop sensing?"""
        _, confidence = self.voting.get_consensus()
        return confidence > 0.9
class MockSensorModule:
    """Stand-in sensor module that fabricates random observations for demos."""

    def __init__(self, sensor_module_id: str, modality: str):
        self.sensor_module_id = sensor_module_id
        self.modality = modality
        self.timestamp = 0       # incremented on every step()
        self.agent_state = None  # latest agent state handed to us

    def update_state(self, agent_state):
        """Remember the most recent agent state."""
        self.agent_state = agent_state

    def step(self, raw_obs):
        """Process observation and spit out a SensoryObservation"""
        self.timestamp += 1
        feature_vec = np.random.rand(128)
        sensor_pose = np.random.randn(7)  # Position + quaternion
        sensor_pose[3] = 1  # Identity quaternion
        return SensoryObservation(
            modality=self.modality,
            features=feature_vec,
            pose=sensor_pose,
            timestamp=self.timestamp,
            confidence=np.random.rand(),
            use_state=True,
        )

    def pre_episode(self):
        """Reset the per-episode step counter."""
        self.timestamp = 0

    def state_dict(self):
        """Minimal serializable state."""
        return {'sensor_module_id': self.sensor_module_id}
class MockLearningModule:
    """Stand-in learning module that accumulates fake evidence over objects."""

    def __init__(self, module_id: str, modality: str, num_objects: int = 5):
        self.module_id = module_id
        self.modality = modality
        self.num_objects = num_objects
        # Uniform prior over the candidate objects.
        self.evidence = np.ones(num_objects) / num_objects
        self.stepwise_targets_list = []
        self.stepwise_target_object = None
        self.terminal_state = False
        self.step_count = 0
        self.max_steps = 20
        self.experiment_mode = "eval"

    def _evidence_alpha(self):
        """Dirichlet concentration for the fake evidence boost.

        Bug fix: the original hard-coded a length-5 alpha [2, 1, 1, 0.5, 0.5],
        which crashed with a shape mismatch for any num_objects != 5. Keep
        the original prior when num_objects == 5 for backward compatibility;
        otherwise use a uniform alpha of the right length.
        """
        if self.num_objects == 5:
            return [2, 1, 1, 0.5, 0.5]
        return np.ones(self.num_objects)

    def matching_step(self, sensory_inputs):
        """Process inputs during matching phase"""
        if sensory_inputs is None:
            return
        self.step_count += 1
        # Fake evidence accumulation
        for obs in sensory_inputs:
            if hasattr(obs, 'features'):
                # Pretend we're updating our belief
                evidence_boost = np.random.dirichlet(self._evidence_alpha())
                self.evidence = self.evidence * 0.7 + evidence_boost * 0.3
                self.evidence = self.evidence / np.sum(self.evidence)
        # Check if we're done
        if np.max(self.evidence) > 0.9 or self.step_count >= self.max_steps:
            self.terminal_state = True

    def exploratory_step(self, sensory_inputs):
        """Process inputs during exploratory phase"""
        if sensory_inputs is None:
            return
        self.step_count += 1

    def send_out_vote(self) -> Dict[str, np.ndarray]:
        """Send our vote to other learning modules"""
        return {
            'object_id_votes': self.evidence,
            'confidence': float(np.max(self.evidence))
        }

    def receive_votes(self, voting_data: List[Dict]):
        """Get votes from other learning modules"""
        # Just average them for now
        if voting_data:
            other_votes = [v['object_id_votes'] for v in voting_data if 'object_id_votes' in v]
            if other_votes:
                avg_vote = np.mean(other_votes, axis=0)
                self.evidence = 0.8 * self.evidence + 0.2 * avg_vote
                self.evidence = self.evidence / np.sum(self.evidence)

    def propose_goal_states(self) -> List[Dict]:
        """Suggest where we should move next"""
        return []

    def get_output(self):
        """Get current output for passing to other learning modules"""
        return SensoryObservation(
            modality=self.modality,
            features=self.evidence,
            pose=np.array([0, 0, 0, 1, 0, 0, 0]),
            timestamp=self.step_count,
            confidence=float(np.max(self.evidence)),
            use_state=True
        )

    def add_lm_processing_to_buffer_stats(self, lm_processed: bool):
        """No-op hook kept for interface compatibility."""
        pass

    def set_experiment_mode(self, mode: str):
        """Record whether we're in 'train' or 'eval' mode."""
        self.experiment_mode = mode

    def pre_episode(self):
        """Reset per-episode state and restore the uniform prior."""
        self.step_count = 0
        self.terminal_state = False
        self.evidence = np.ones(self.num_objects) / self.num_objects
        self.stepwise_targets_list = []

    def post_episode(self):
        """No-op hook kept for interface compatibility."""
        pass

    def state_dict(self):
        """Minimal serializable state."""
        return {'module_id': self.module_id}

    def load_state_dict(self, state_dict):
        """No-op: this mock carries no learnable state."""
        pass
class MockMotorPolicy:
    """Trivial motor policy: fixed pose at the origin, identity orientation."""

    def __init__(self):
        # Position (x, y, z) followed by an identity quaternion (w, x, y, z).
        self.current_pose = np.array([0, 0, 0, 1, 0, 0, 0])

    def get_agent_state(self, state):
        """Report the agent's position (first three pose components)."""
        return {'position': self.current_pose[:3]}

    def is_motor_only_step(self, state):
        """This mock never takes motor-only steps."""
        return False

    def state_dict(self):
        """Nothing to persist."""
        return {}
class MockMotorSystem:
    """Minimal motor system: wraps a policy and tracks the experiment mode."""

    def __init__(self):
        self._policy = MockMotorPolicy()
        self._state = {}
        self.experiment_mode = "eval"

    def set_experiment_mode(self, mode: str):
        """Record whether we're in 'train' or 'eval' mode."""
        self.experiment_mode = mode
class MontyBaseWithVoting:
    """Monty-style base class extended with multi-modal voting, spatial
    attention, and an active sensing policy on top of the usual
    sensor-module / learning-module / motor-system plumbing.

    NOTE(review): this class expects the ModalityTracker / MultiModalVoting /
    SpatialAttention / ActiveSensingPolicy classes defined earlier in this
    file, plus module-level `logger` and `np` — confirm imports at file top.
    """

    def __init__(
        self,
        sensor_modules,
        learning_modules,
        motor_system,
        sm_to_agent_dict,
        sm_to_lm_matrix,
        lm_to_lm_matrix,
        lm_to_lm_vote_matrix,
        min_eval_steps,
        min_train_steps,
        num_exploratory_steps,
        max_total_steps,
        num_objects=5,
        task_hint=None
    ):
        """Set up enhanced Monty with voting system"""
        # Original Monty stuff
        self.sensor_modules = sensor_modules
        self.learning_modules = learning_modules
        self.motor_system = motor_system
        self.sm_to_agent_dict = sm_to_agent_dict
        self.sm_to_lm_matrix = sm_to_lm_matrix
        self.lm_to_lm_matrix = lm_to_lm_matrix
        self.lm_to_lm_vote_matrix = lm_to_lm_vote_matrix
        self.min_eval_steps = min_eval_steps
        self.min_train_steps = min_train_steps
        self.num_exploratory_steps = num_exploratory_steps
        self.max_total_steps = max_total_steps
        # Step tracking
        self.step_type = "matching_step"
        self.is_seeking_match = True
        self.experiment_mode = None
        self.total_steps = 0
        self.episode_steps = 0
        self.exploratory_steps = 0
        self.matching_steps = 0
        self._is_done = False
        # NEW: Multi-modal voting system (one tracked modality per sensor module)
        modality_names = [sm.modality for sm in sensor_modules]
        self.voting_system = MultiModalVoting(modality_names, num_objects)
        self.task_hint = task_hint
        # NEW: Spatial attention
        self.spatial_attention = SpatialAttention()
        # NEW: Active sensing policy
        self.active_policy = ActiveSensingPolicy(self.voting_system, self.spatial_attention)
        # Storage for current outputs
        self.sensor_module_outputs = []
        self.learning_module_outputs = []
        self.gsg_outputs = []

    def step(self, observation):
        """Run one step of the current step type.

        Returns (pred_id, confidence); exploratory steps return the
        placeholder (-1, 0.0) since no consensus is being sought yet.
        """
        if self.step_type == "matching_step":
            # Matching step returns real consensus
            return self._matching_step(observation)
        elif self.step_type == "exploratory_step":
            # Exploratory step still updates learning modules and attention
            self._exploratory_step(observation)
            # Return placeholder values since there's no object consensus yet
            return -1, 0.0
        else:
            raise ValueError(f"Unknown step type: {self.step_type}")

    def _matching_step(self, observation):
        """Do a matching step with multi-modal voting"""
        # 1. Gather sensory inputs
        self.aggregate_sensory_inputs(observation)
        # 2. Step the learning modules
        self._step_learning_modules()
        # 3. Collect votes from learning modules and submit to voting system
        for i, lm in enumerate(self.learning_modules):
            vote = lm.send_out_vote()
            if 'object_id_votes' in vote:
                self.voting_system.submit_vote(
                    lm.modality,
                    vote['object_id_votes'],
                    vote.get('confidence', 1.0)
                )
        # 4. Get the consensus prediction
        pred_id, confidence = self.voting_system.get_consensus(self.task_hint)
        # 5. Share votes between learning modules (original Monty behavior)
        self._vote()
        # 6. Pass goal states around
        self._pass_goal_states()
        # 7. Update spatial attention
        current_pose = self.motor_system._policy.current_pose
        self.spatial_attention.mark_visited(current_pose)
        self.spatial_attention.decay_attention()
        # 8. Check if we're done
        self._set_step_type_and_check_if_done()
        # 9. Post-step stuff
        self._post_step()
        return pred_id, confidence

    def _exploratory_step(self, observation):
        """Do an exploratory step"""
        self.aggregate_sensory_inputs(observation)
        self._step_learning_modules()
        self._set_step_type_and_check_if_done()
        self._post_step()

    def aggregate_sensory_inputs(self, observation):
        """Collect outputs from all sensor modules"""
        sensor_module_outputs = []
        for sensor_module in self.sensor_modules:
            raw_obs = self.get_observations(observation, sensor_module.sensor_module_id)
            sensor_module.update_state(self.get_agent_state())
            sm_output = sensor_module.step(raw_obs)
            sensor_module_outputs.append(sm_output)
        # Also snapshot each LM's current output so LMs can feed each other.
        learning_module_outputs = []
        for learning_module in self.learning_modules:
            lm_out = learning_module.get_output()
            learning_module_outputs.append(lm_out)
        self.sensor_module_outputs = sensor_module_outputs
        self.learning_module_outputs = learning_module_outputs

    def _step_learning_modules(self):
        """Step all the learning modules"""
        # self.step_type names the LM method to call ("matching_step" or
        # "exploratory_step"), dispatched dynamically via getattr.
        for i in range(len(self.learning_modules)):
            sensory_inputs = self._collect_inputs_to_lm(i)
            getattr(self.learning_modules[i], self.step_type)(sensory_inputs)

    def _collect_inputs_to_lm(self, lm_id):
        """Collect inputs going to a specific learning module"""
        sensory_inputs_from_sms = [
            self.sensor_module_outputs[j] for j in self.sm_to_lm_matrix[lm_id]
        ]
        if self.lm_to_lm_matrix is not None:
            sensory_inputs_from_lms = [
                self.learning_module_outputs[j] for j in self.lm_to_lm_matrix[lm_id]
            ]
        else:
            sensory_inputs_from_lms = []
        return self._combine_inputs(sensory_inputs_from_sms, sensory_inputs_from_lms)

    def _combine_inputs(self, inputs_from_sms, inputs_from_lms):
        """Combine sensor and learning module inputs.

        Returns None when no sensor input has use_state set — LM inputs
        alone are not enough to produce a step.
        """
        combined_inputs = [
            inputs_from_sms[i]
            for i in range(len(inputs_from_sms))
            if inputs_from_sms[i].use_state
        ]
        if len(combined_inputs) == 0:
            return None
        for lm_input in inputs_from_lms:
            if lm_input.use_state:
                combined_inputs.append(lm_input)
        return combined_inputs

    def _vote(self):
        """Share votes between learning modules"""
        if self.lm_to_lm_vote_matrix is not None:
            # Collect every LM's vote first so all LMs see the same snapshot.
            votes_per_lm = []
            for i in range(len(self.learning_modules)):
                votes_per_lm.append(self.learning_modules[i].send_out_vote())
            for i in range(len(self.learning_modules)):
                voting_data = [votes_per_lm[j] for j in self.lm_to_lm_vote_matrix[i]]
                self.learning_modules[i].receive_votes(voting_data)

    def _pass_goal_states(self):
        """Pass goal states between learning modules"""
        self.gsg_outputs = []
        # Goal states are only proposed while matching.
        if self.step_type == "matching_step":
            for lm in self.learning_modules:
                goal_states = lm.propose_goal_states()
                self.gsg_outputs.extend(goal_states)

    def _set_step_type_and_check_if_done(self):
        """Check if we're done and update step type"""
        self.update_step_counters()
        # Check if all learning modules are done
        all_terminal = all(lm.terminal_state for lm in self.learning_modules)
        # Check voting system confidence
        _, confidence = self.voting_system.get_consensus(self.task_hint)
        high_confidence = confidence > 0.9
        if all_terminal or high_confidence or self.episode_steps >= self.max_total_steps:
            self._is_done = True
            logger.info(f"Episode done: terminal={all_terminal}, conf={confidence:.3f}")
        # Once past the minimum step count: exploration ends the episode,
        # while matching (in train mode) switches over to exploration.
        if self.exceeded_min_steps:
            if self.step_type == "exploratory_step":
                self._is_done = True
            elif self.step_type == "matching_step" and self.experiment_mode == "train":
                self.switch_to_exploratory_step()

    def _post_step(self):
        """Hook for post-step processing"""
        pass

    def update_step_counters(self):
        """Keep track of steps"""
        self.total_steps += 1
        self.episode_steps += 1
        if self.step_type == "matching_step":
            self.matching_steps += 1
        elif self.step_type == "exploratory_step":
            self.exploratory_steps += 1

    def switch_to_matching_step(self):
        """Switch to matching mode"""
        self.step_type = "matching_step"
        self.is_seeking_match = True

    def switch_to_exploratory_step(self):
        """Switch to exploratory mode"""
        self.step_type = "exploratory_step"
        self.is_seeking_match = False

    def set_experiment_mode(self, mode):
        """Set experiment mode (train or eval)"""
        assert mode in ["train", "eval"]
        self.experiment_mode = mode
        self.motor_system.set_experiment_mode(mode)
        # Every mode change starts back in matching.
        self.step_type = "matching_step"
        for lm in self.learning_modules:
            lm.set_experiment_mode(mode)

    def pre_episode(self):
        """Set up for a new episode"""
        self._is_done = False
        self.reset_episode_steps()
        self.switch_to_matching_step()
        self.voting_system.reset()
        self.spatial_attention.reset()
        for lm in self.learning_modules:
            lm.pre_episode()
        for sm in self.sensor_modules:
            sm.pre_episode()

    def post_episode(self):
        """Clean up after episode"""
        for lm in self.learning_modules:
            lm.post_episode()

    def reset_episode_steps(self):
        """Reset episode counters"""
        self.episode_steps = 0
        self.matching_steps = 0
        self.exploratory_steps = 0

    def get_observations(self, observations, sensor_module_id):
        """Get observations for a specific sensor module"""
        # Observations are assumed nested as {agent_id: {sensor_module_id: obs}};
        # missing keys fall back to an empty dict.
        agent_id = self.sm_to_agent_dict[sensor_module_id]
        return observations.get(agent_id, {}).get(sensor_module_id, {})

    def get_agent_state(self):
        """Get current agent state"""
        return self.motor_system._policy.get_agent_state(self.motor_system._state)

    @property
    def is_done(self):
        # Read-only view of the internal done flag.
        return self._is_done

    @property
    def min_steps(self):
        # Minimum steps required before the step type may change,
        # depending on the current step type and experiment mode.
        if self.step_type == "matching_step":
            return self.min_eval_steps if self.experiment_mode == "eval" else self.min_train_steps
        elif self.step_type == "exploratory_step":
            return self.num_exploratory_steps
        return 0

    @property
    def step_type_count(self):
        # Step counter matching the current step type.
        if self.step_type == "matching_step":
            return self.matching_steps
        elif self.step_type == "exploratory_step":
            return self.exploratory_steps
        return 0

    @property
    def exceeded_min_steps(self):
        # Strictly greater-than: the minimum count itself is not enough.
        return self.step_type_count > self.min_steps

    def state_dict(self):
        """Get state for saving"""
        return dict(
            lm_dict={i: lm.state_dict() for i, lm in enumerate(self.learning_modules)},
            sm_dict={i: sm.state_dict() for i, sm in enumerate(self.sensor_modules)},
            motor_system_dict=self.motor_system._policy.state_dict(),
            lm_to_lm_matrix=self.lm_to_lm_matrix,
            lm_to_lm_vote_matrix=self.lm_to_lm_vote_matrix,
            sm_to_lm_matrix=self.sm_to_lm_matrix,
        )
def run_integrated_simulation(steps: int = 15, delay: float = 0.5, num_objects: int = 5):
    """Run a short demo episode of the voting-enhanced Monty loop."""
    print("🧠 Monty Multi-Modal Voting & Spatial Attention Demo")
    print("=" * 70)
    # One sensor + learning module per modality.
    sms = [
        MockSensorModule("sm_0", "vision"),
        MockSensorModule("sm_1", "touch"),
    ]
    lms = [
        MockLearningModule("lm_0", "vision", num_objects),
        MockLearningModule("lm_1", "touch", num_objects),
    ]
    num_lms = len(lms)
    # Mapping structures (simple demo versions)
    agent_map = {sm.sensor_module_id: sm.sensor_module_id for sm in sms}
    sm_lm_map = {i: [i] for i in range(num_lms)}  # Each LM gets its corresponding SM
    lm_lm_map = {i: [] for i in range(num_lms)}   # No inter-LM connections for demo
    vote_map = {i: [] for i in range(num_lms)}
    # Initialize MontyBase with voting on top of a mock motor system.
    monty = MontyBaseWithVoting(
        sensor_modules=sms,
        learning_modules=lms,
        motor_system=MockMotorSystem(),
        sm_to_agent_dict=agent_map,
        sm_to_lm_matrix=sm_lm_map,
        lm_to_lm_matrix=lm_lm_map,
        lm_to_lm_vote_matrix=vote_map,
        min_eval_steps=3,
        min_train_steps=3,
        num_exploratory_steps=3,
        max_total_steps=steps,
        num_objects=num_objects,
        task_hint="recognition",
    )
    monty.set_experiment_mode("eval")
    monty.pre_episode()
    for step_idx in range(steps):
        # Simulated observations (empty dict for demo)
        pred_id, confidence = monty.step({})
        print(f"Step {step_idx + 1:02d} | Predicted object: {pred_id} | Confidence: {confidence:.2f}")
        if monty.is_done:
            print("Monty finished the episode early due to high confidence or terminal states.")
            break
        time.sleep(delay)
    monty.post_episode()
    print("Simulation complete!")
Thanks!