Week #3 #
Implemented MVP features #
This week we completed the core MVP features, focused on the hardware, the firmware, and the common software design.
Hardware #
Cart-Pole hardware assembly (#27)
- Cart, rail, and pendulum arm assembled
- Wiring completed and verified
- End button added and connected (#28)
ESP32 physical integration (#22)
- Mounted and connected to power supply and peripherals
Firmware & Backend #
Common firmware structure created (#11)
- Shared firmware codebase for ESP32
Init script implemented (#12)
- Boots the ESP32 and initializes pins and components (an illustrative sketch follows this list)
State management logic added (#25)
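The report does not pin down the firmware language or pin map, so purely as an illustration, a MicroPython-style sketch of what an init script like #12 might do (all pin numbers are hypothetical) could look like this:

```python
# Illustration only (hypothetical pins, MicroPython-style API); the actual
# init script in the firmware repo may be structured differently.
from machine import Pin

MOTOR_EN_PIN = 25    # hypothetical motor-driver enable pin
END_BUTTON_PIN = 27  # hypothetical end button (see #28)

def init_hardware():
    """Bring up the pins the firmware needs and return handles to them."""
    motor_enable = Pin(MOTOR_EN_PIN, Pin.OUT, value=0)          # motor off at boot
    end_button = Pin(END_BUTTON_PIN, Pin.IN, Pin.PULL_UP)       # active-low end switch
    return motor_enable, end_button

# Run at boot so the rest of the firmware starts from a known pin state
components = init_hardware()
```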
Software Design #
CartPole class structure implemented (#14)
- Object-oriented design
- Includes step(), reset(), and render() methods (see the sketch after this list)
Repo structured and cleaned (#1)
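For reference, a minimal sketch of how such a class might be laid out; the method bodies, state layout, and serial-port parameter here are illustrative, not the actual implementation:

```python
import numpy as np


class CartPole:
    """Sketch of the object-oriented cart-pole interface (illustrative only)."""

    def __init__(self, serial_port: str = "/dev/ttyUSB0"):
        # Hypothetical: a serial link to the ESP32 firmware
        self.serial_port = serial_port

    def reset(self) -> np.ndarray:
        """Move the cart to its home position and return the initial state."""
        return np.zeros(4)  # placeholder for [x, x_dot, theta, theta_dot]

    def step(self, action: int):
        """Apply an action and return (next_state, reward, done, info)."""
        next_state = np.zeros(4)
        reward, done, info = 1.0, False, {}
        return next_state, reward, done, info

    def render(self) -> None:
        """Visualize or log the current state."""
        pass
```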
Functional User Journey #
A user connects the ESP32-powered cart-pole system, starts the firmware using the init script, and observes hardware movement while system states are logged for debugging. This demonstrates the full integration of software and hardware for a working MVP.
Demonstration of the working MVP #
Internal demo #
- MVP reviewed by team with all hardware components assembled
- Firmware boot confirmed via serial output
- Class-based code design approved for extensibility
ML integration #
The goal is to integrate an RL system that can learn to balance the cart-pole system. The program should:
- Interact with the real or simulated cart-pole via the step() and reset() functions
- Be trained with the fit() function
- Predict actions with the predict() function
The structure of the ML API can look like this:
```python
class RLAgent:
    def __init__(self, env, **kwargs):
        self.env = env

    def fit(self, episodes: int = 1000):
        raise NotImplementedError

    def predict(self, state):
        raise NotImplementedError

    def evaluate(self, episodes: int = 10):
        raise NotImplementedError
```
Then we make sure that our CartPole class has a reset() method that returns the initial state and a step(action) method that returns (next_state, reward, done, info), matching the classic Gym-style interface that the agent below expects.
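A quick sanity check of that contract; the state layout and action encoding shown are only assumptions for illustration:

```python
cartpole = CartPole()
state = cartpole.reset()                           # e.g. [x, x_dot, theta, theta_dot]
next_state, reward, done, info = cartpole.step(1)  # 1 = "push right" in this sketch
```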
Then we can implement a simple RL agent, for example a DQN:
```python
import random
from collections import deque

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim


class QNetwork(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_sizes=(64, 64)):
        super(QNetwork, self).__init__()
        layers = []
        input_dim = state_dim
        for h in hidden_sizes:
            layers.append(nn.Linear(input_dim, h))
            layers.append(nn.ReLU())
            input_dim = h
        layers.append(nn.Linear(input_dim, action_dim))
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        return self.model(x)


class DQNAgent:
    def __init__(self, env, buffer_size=10000, batch_size=64, gamma=0.99,
                 lr=1e-3, epsilon_start=1.0, epsilon_end=0.01, epsilon_decay=0.995):
        self.env = env
        self.state_dim = env.observation_space.shape[0]
        self.action_dim = env.action_space.n

        self.q_network = QNetwork(self.state_dim, self.action_dim)
        self.target_network = QNetwork(self.state_dim, self.action_dim)
        self.target_network.load_state_dict(self.q_network.state_dict())

        self.optimizer = optim.Adam(self.q_network.parameters(), lr=lr)
        self.criterion = nn.MSELoss()

        self.buffer = deque(maxlen=buffer_size)
        self.batch_size = batch_size
        self.gamma = gamma
        self.epsilon = epsilon_start
        self.epsilon_end = epsilon_end
        self.epsilon_decay = epsilon_decay

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.q_network.to(self.device)
        self.target_network.to(self.device)

    def predict(self, state):
        # Epsilon-greedy action selection
        if np.random.rand() < self.epsilon:
            return self.env.action_space.sample()
        state_tensor = torch.FloatTensor(state).unsqueeze(0).to(self.device)
        with torch.no_grad():
            q_values = self.q_network(state_tensor)
        return q_values.argmax().item()

    def fit(self, episodes=500, target_update=10):
        for ep in range(episodes):
            state = self.env.reset()
            done = False
            total_reward = 0
            while not done:
                action = self.predict(state)
                next_state, reward, done, _ = self.env.step(action)
                self.buffer.append((state, action, reward, next_state, done))
                state = next_state
                total_reward += reward
                self._train_step()
            # Decay epsilon
            self.epsilon = max(self.epsilon_end, self.epsilon * self.epsilon_decay)
            # Update target network
            if ep % target_update == 0:
                self.target_network.load_state_dict(self.q_network.state_dict())
            print(f"Episode {ep}: reward={total_reward:.2f}, epsilon={self.epsilon:.3f}")

    def evaluate(self, episodes=10, render=False):
        total_rewards = []
        for _ in range(episodes):
            state = self.env.reset()
            done = False
            ep_reward = 0
            while not done:
                if render:
                    self.env.render()
                action = self.predict(state)
                state, reward, done, _ = self.env.step(action)
                ep_reward += reward
            total_rewards.append(ep_reward)
        avg_reward = np.mean(total_rewards)
        print(f"Average reward over {episodes} episodes: {avg_reward:.2f}")
        return avg_reward

    def _train_step(self):
        if len(self.buffer) < self.batch_size:
            return
        minibatch = random.sample(self.buffer, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*minibatch)

        # Stack transitions into batched tensors
        states = torch.FloatTensor(np.array(states)).to(self.device)
        actions = torch.LongTensor(actions).unsqueeze(1).to(self.device)
        rewards = torch.FloatTensor(rewards).unsqueeze(1).to(self.device)
        next_states = torch.FloatTensor(np.array(next_states)).to(self.device)
        dones = torch.FloatTensor(dones).unsqueeze(1).to(self.device)

        # Standard DQN target: r + gamma * max_a' Q_target(s', a') for non-terminal states
        q_values = self.q_network(states).gather(1, actions)
        next_q_values = self.target_network(next_states).max(1)[0].unsqueeze(1)
        expected_q = rewards + self.gamma * next_q_values * (1 - dones)

        loss = self.criterion(q_values, expected_q.detach())
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
```
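Putting it together, training on our cart-pole could then look roughly like the sketch below. It continues from the definitions above and assumes the CartPole class also exposes Gym-style observation_space and action_space attributes, which DQNAgent relies on; the episode counts and file name are illustrative.

```python
# Illustrative usage; assumes CartPole provides observation_space / action_space
# (e.g. via gym.spaces) as DQNAgent expects.
env = CartPole()

agent = DQNAgent(env, batch_size=64, gamma=0.99)
agent.fit(episodes=500)       # train on the (real or simulated) cart-pole
agent.evaluate(episodes=10)   # report the average reward

torch.save(agent.q_network.state_dict(), "dqn_cartpole.pt")  # keep the trained policy
```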
Weekly commitments #
Individual contribution of each participant #
Anastasia - Write 🫧 controller
Evgenii - 3D model and blueprints for controller container
Artyom - Documentation via Sphinx
Petr - Write report
Marat - Firmware MVP
Plan for Next Week #
- Python library development
- CI/CD for documentation
- Hardware and Firmware documentation
Confirmation of the code’s operability #
We confirm that the code in the main branch:
- Is in working condition.
- Runs via docker-compose (or another alternative described in the README.md).