Navigation via video generation + action diffusion
| Setting | Value |
|---|---|
| WebSocket | - |
| Device | - |
| WAN Checkpoint | - |
| Action Checkpoint | - |
| VAE+T5 URL | - |
| Output Dir | - |
Mode: chunk — Send 4N frames per step, receive all 8 predicted actions.
End the current episode (saves logs) and prepare for a new one. No image needed.
| Field | Type | Required | Description |
|---|---|---|---|
| type | string | required | Must be "reset" |
→ {"type": "reset"}
← {"status": "ok", "message": "Episode saved to output/server_viz/20260413_220650"}
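A minimal sketch of building the reset message and reading its reply. The helper names `build_reset` and `saved_dir` are hypothetical, and the "Episode saved to " prefix is assumed only from the single example reply above, so the server may phrase the message differently.

```python
import json

def build_reset() -> str:
    """Serialize the reset request; no image or prompt is needed."""
    return json.dumps({"type": "reset"})

def saved_dir(reply: dict):
    """Return the saved episode directory, or None if the reply does not match.

    Assumption: the message starts with "Episode saved to ", as in the example reply.
    """
    prefix = "Episode saved to "
    message = reply.get("message", "")
    if reply.get("status") == "ok" and message.startswith(prefix):
        return message[len(prefix):]
    return None
```

Returning `None` instead of raising keeps the helper usable when there was no previous episode to save.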
The first step after a reset is automatically treated as init: the server uses only the last image and replicates it 4 times as front padding. The response includes "init": true. The first step must include a prompt.
Subsequent steps send 4N new observation frames (4, 8, 12, ...). Each step returns 8 action deltas, each of the form [dx (m), dy (m), dyaw (rad)].
| Field | Type | Required | Description |
|---|---|---|---|
| type | string | required | "step" |
| images | string[] | required | Base64 JPEG. First step: at least 1 (only last used). Later: multiple of 4. |
| prompt | string | 1st step | Navigation instruction. Required on first step, optional later to override. |
# First step (auto-init) — response includes "init": true
→ {"type": "step", "images": ["<b64_jpeg>"], "prompt": "fly forward"}
← {"actions": [[0.01, -0.002, 0.001], ... x8], "chunk_idx": 1, "latency_ms": 13500, "init": true}
# Subsequent steps
→ {"type": "step", "images": ["<b64>", "<b64>", "<b64>", "<b64>"]}
← {"actions": [[dx,dy,dyaw], ... x8], "chunk_idx": 2, "latency_ms": 7200}
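The field rules for a chunk-mode step can be checked on the client before sending. The following is a sketch under the rules stated in the table only; `validate_step` is a hypothetical helper, not part of the server, and the server may apply further checks.

```python
def validate_step(msg: dict, first: bool) -> list:
    """Return a list of problems with a chunk-mode step request (empty if none)."""
    errors = []
    if msg.get("type") != "step":
        errors.append('type must be "step"')
    images = msg.get("images")
    if (not isinstance(images, list) or not images
            or not all(isinstance(i, str) for i in images)):
        errors.append("images must be a non-empty list of base64 strings")
    elif not first and len(images) % 4 != 0:
        # Later steps send 4N frames; the first step only needs one image.
        errors.append("later steps need a multiple of 4 images")
    if first and not msg.get("prompt"):
        errors.append("prompt is required on the first step")
    return errors
```

For example, `validate_step({"type": "step", "images": ["<b64>"] * 3}, first=False)` reports the frame-count problem without a round trip to the server.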
import websocket, json, base64, io  # websocket comes from the websocket-client package
from PIL import Image

WS_URL = "wss://infer.siqi.app/model"  # or ws://localhost:5811 for local

def encode(frame):
    """Encode numpy [H,W,3] uint8 → base64 JPEG string."""
    buf = io.BytesIO()
    Image.fromarray(frame).save(buf, format="JPEG", quality=90)
    return base64.b64encode(buf.getvalue()).decode()

# `camera` and `robot` below are placeholders for your own camera and robot interfaces.
ws = websocket.WebSocket()
ws.connect(WS_URL)

# ---- Reset previous episode (if any) ----
ws.send(json.dumps({"type": "reset"}))
print(json.loads(ws.recv()))  # {"status": "ok", ...}

# ---- First step (auto-init): send first observation + prompt ----
first_frame = camera.capture()  # numpy [H, W, 3] uint8
ws.send(json.dumps({
    "type": "step",
    "images": [encode(first_frame)],
    "prompt": "navigate to the target",
}))
resp = json.loads(ws.recv())
actions = resp["actions"]  # 8 actions: [[dx, dy, dyaw], ...]
assert resp.get("init")  # first step returns init: true
print(f"Init: {len(actions)} actions, latency {resp['latency_ms']:.0f}ms")

# ---- Loop: execute 4 actions, capture 4 frames, send ----
for step in range(50):
    # Execute first 4 actions, collect new observations
    new_frames = []
    for a in actions[:4]:
        robot.execute(dx=a[0], dy=a[1], dyaw=a[2])
        new_frames.append(camera.capture())

    # Send 4 new frames
    ws.send(json.dumps({
        "type": "step",
        "images": [encode(f) for f in new_frames],
    }))
    resp = json.loads(ws.recv())
    actions = resp["actions"]  # next 8 actions
    print(f"Step {step+1}: latency {resp['latency_ms']:.0f}ms")

# ---- End episode ----
ws.send(json.dumps({"type": "reset"}))
print(json.loads(ws.recv()))
ws.close()
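For logging or dead reckoning, the returned action deltas can be accumulated into a pose. The sketch below rests on an assumption the text does not confirm: each [dx, dy, dyaw] is expressed in the robot's body frame at the moment the action starts, with translation applied before rotation. If the server uses a different frame convention, the rotation terms need to change. `integrate` is a hypothetical helper.

```python
import math

def integrate(pose, actions):
    """Accumulate [dx, dy, dyaw] deltas into a world-frame pose (x, y, yaw).

    Assumption: deltas are body-frame, translation first, then rotation.
    """
    x, y, yaw = pose
    for dx, dy, dyaw in actions:
        # Rotate the body-frame translation into the world frame.
        x += dx * math.cos(yaw) - dy * math.sin(yaw)
        y += dx * math.sin(yaw) + dy * math.cos(yaw)
        yaw += dyaw
    return x, y, yaw
```

In the chunk loop above this would be called with `actions[:4]`, since only the first four actions of each response are executed.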
Mode: per_rgb — Send 1 frame per step, receive 1 action. Server buffers frames and caches actions internally.
Per-RGB Steps (4 or 8): controls how many actions to use per inference cycle. 4-step: use first 4 actions, then re-infer (lower latency, fresher predictions). 8-step: use all 8 actions before re-inferring (fewer inference calls).
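The trade-off between the two settings can be put in rough numbers. The sketch below assumes each inference cycle supplies exactly S actions, where S is the Per-RGB Steps value, and counts the auto-init step as one inference. `inference_calls` is a hypothetical helper, and the estimate ignores any extra buffering steps.

```python
import math

def inference_calls(num_actions: int, per_rgb_steps: int) -> int:
    """Estimate how many inference cycles are needed to execute num_actions actions."""
    if per_rgb_steps not in (4, 8):
        raise ValueError("per_rgb_steps must be 4 or 8")
    # Each cycle is assumed to contribute per_rgb_steps actions before re-inference.
    return math.ceil(num_actions / per_rgb_steps)
```

Under these assumptions, a 200-action episode needs about twice as many inference cycles in 4-step mode as in 8-step mode, in exchange for predictions that are re-computed more often.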
Same as chunk mode — ends current episode.
→ {"type": "reset"}
← {"status": "ok", "message": "Episode saved to ..."}
First step after reset: send 1 image + prompt. The server auto-initializes and returns 1 action; the remaining S-1 actions are cached, where S is the Per-RGB Steps setting. The response includes "init": true.
# First step (auto-init) — with per_rgb_steps=4, cache_remaining=3
→ {"type": "step", "images": ["<b64>"], "prompt": "fly forward"}
← {"actions": [[dx,dy,dyaw]], "chunk_idx": 1, "from_cache": false, "cache_remaining": 3, "init": true}
# Subsequent steps — send 1 (or more) new frame(s). Three possible responses:
| Situation | Response | Latency |
|---|---|---|
| Action cache has entries | {"actions": [[dx,dy,dyaw]], "from_cache": true, "cache_remaining": N} | ~0ms |
| Cache empty, buffer reaches 4 | {"actions": [[dx,dy,dyaw]], "from_cache": false, "cache_remaining": 7} | ~6-20s |
| Cache empty, buffer < 4 | {"actions": [], "status": "buffering", "buffer_size": N} | ~0ms |
→ {"type": "step", "images": ["<b64>"]}
← {"actions": [[0.03, -0.01, 0.00]], "from_cache": true, "cache_remaining": 5}
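A client can tell the three response cases apart from the fields shown in the table. This is a minimal sketch that relies only on those fields; `classify` and its return labels are hypothetical names.

```python
def classify(resp: dict) -> str:
    """Map a per_rgb step response to 'buffering', 'cached', or 'inferred'."""
    if resp.get("status") == "buffering" or not resp.get("actions"):
        # No action available yet: the server is still collecting frames.
        return "buffering"
    # An action was returned, either from the cache or from a fresh inference.
    return "cached" if resp.get("from_cache") else "inferred"
```

Only the "inferred" case carries the multi-second latency, so a client might use this label to decide when to hold the robot in place.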
import websocket, json, base64, io  # websocket comes from the websocket-client package
from PIL import Image

WS_URL = "wss://infer.siqi.app/model"  # or ws://localhost:5811 for local

def encode(frame):
    """Encode numpy [H,W,3] uint8 → base64 JPEG string."""
    buf = io.BytesIO()
    Image.fromarray(frame).save(buf, format="JPEG", quality=90)
    return base64.b64encode(buf.getvalue()).decode()

# `camera` and `robot` below are placeholders for your own camera and robot interfaces.
ws = websocket.WebSocket()
ws.connect(WS_URL)

# ---- Reset ----
ws.send(json.dumps({"type": "reset"}))
json.loads(ws.recv())

# ---- First step (auto-init) ----
ws.send(json.dumps({
    "type": "step",
    "images": [encode(camera.capture())],
    "prompt": "navigate forward",
}))
resp = json.loads(ws.recv())
action = resp["actions"][0]  # first action [dx, dy, dyaw]

# ---- Loop: 1 frame in → 1 action out ----
for step in range(200):
    # Execute the pending action, if any; while buffering there is none
    if action is not None:
        robot.execute(dx=action[0], dy=action[1], dyaw=action[2])

    # Capture and send 1 frame
    ws.send(json.dumps({
        "type": "step",
        "images": [encode(camera.capture())],
    }))
    resp = json.loads(ws.recv())

    if resp["actions"]:
        action = resp["actions"][0]
        cached = resp.get("from_cache", False)
        print(f"Step {step}: action={action}, cache={'hit' if cached else 'new'}")
    else:
        # Buffering: no action yet, so clear the pending action and keep the robot still
        action = None
        print(f"Step {step}: buffering ({resp['buffer_size']}/4 frames)")

# ---- End ----
ws.send(json.dumps({"type": "reset"}))
json.loads(ws.recv())
ws.close()