Notebook 01 — Multi-dataset validation : Clermont prototype + us_greensboro Teltonika commercial¶
Ce notebook consomme deux datasets du catalog Telemachus 0.7 (cf requirements.yaml du paper) et lance la méthode P013 (Rodrigues sur le vecteur gravité au repos, sans gyroscope) sur chacun pour comparer la précision d’orientation entre :
fr_clermont_proto_2025-09— prototype Fluidy, 14 trips, 25 Hz effective, France. Rôle :primary_validation.us_greensboro_fmc880_2026-04— Teltonika FMC880 commercial, postdaxos_v0.1(frame=raw), Greensboro NC. Rôle :commercial_validation.
Objectif : démontrer que la méthode P013 transfère du prototype au hardware commercial, même si la précision est attendue légèrement plus faible sur us_greensboro à cause du manque de samples ZUPT (trips urbains courts) tant qu’Emmanuel n’a pas fait de roulage plus long.
import sys
from pathlib import Path
import yaml
import duckdb
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
# Resolve project roots
NB_DIR = Path.cwd()
TELEFORGE_ROOT = NB_DIR.parent.parent.parent.parent
NOSTOS_ROOT = TELEFORGE_ROOT.parent / "nostos"
NOSTOS_SRC = NOSTOS_ROOT / "src"
if str(NOSTOS_SRC) not in sys.path:
sys.path.insert(0, str(NOSTOS_SRC))
from nostos.stages.d1_imu_calibrator import estimate_rotation
print(f"Teleforge root : {TELEFORGE_ROOT}")
print(f"Nostos root : {NOSTOS_ROOT}")
print(f"Nostos exists : {NOSTOS_ROOT.exists()}")1. Charger les deux manifests depuis le catalog¶
def load_manifest(dataset_id):
with open(TELEFORGE_ROOT / "datasets" / dataset_id / "manifest.yaml") as f:
return yaml.safe_load(f)
ds_ids = ["fr_clermont_proto_2025-09", "us_greensboro_fmc880_2026-04"]
manifests = {dsid: load_manifest(dsid) for dsid in ds_ids}
# Résumé comparatif
rows = []
for dsid, m in manifests.items():
rows.append({
"dataset_id": dsid,
"country": m['country'],
"hardware_class": m['hardware']['class'],
"hw_vendor": m['hardware']['vendor'],
"hw_model": m['hardware']['model'],
"accel_hz": m['sensors']['accelerometer']['rate_hz'],
"has_gyro": m['sensors']['accelerometer']['has_gyroscope'],
"acc_frame": m['acc_periods'][-1]['frame'],
"n_trips": m['volume']['n_trips'],
"distance_km": m['volume']['distance_km'],
})
summary = pd.DataFrame(rows)
summary2. Dataset 1 — Clermont prototype (14 trips)¶
Chargement via le path déclaré dans le manifest.
clermont_dir = TELEFORGE_ROOT / "datasets" / "fr_clermont_proto_2025-09"
clermont_parquet = (clermont_dir / manifests["fr_clermont_proto_2025-09"]['data_files'][0]['path']).resolve()
print(f"Parquet : {clermont_parquet}")
print(f"Exists : {clermont_parquet.exists()}")
df_clermont = pd.read_parquet(clermont_parquet)
print(f"Loaded : {len(df_clermont):,} samples, {df_clermont['trip_idx'].nunique()} trips")clermont_results = []
for trip_idx, trip_df in df_clermont.groupby('trip_idx'):
rot = estimate_rotation(trip_df, speed_thr_ms=0.3)
clermont_results.append({
'dataset': 'clermont_proto',
'trip': f'clermont_{int(trip_idx):02d}',
'n_samples': len(trip_df),
'roll_deg': rot['roll_deg'],
'pitch_deg': rot['pitch_deg'],
'yaw_deg': rot['yaw_deg'],
'gravity_norm': rot.get('gravity_norm'),
'rotation_detected': rot.get('rotation_detected', False),
})
clermont_df = pd.DataFrame(clermont_results)
print(f"{len(clermont_df)} trips Clermont processed")
clermont_df3. Dataset 2 — us_greensboro Teltonika commercial¶
Les trips sont des parquets individuels dans nostos/data/flespi/trips/<device>/<trip_id>.parquet, indexés dans la table trips de la DuckDB telemetry.duckdb (peuplée par le trip processor).
On filtre sur :
carrier_state = 'mounted_driving'(RFC-0013 §3.7)accel_norm_mean_mps2 IS NOT NULL(data accel présente)distance_km > 1(pas de micro-trips)ts_start >= '2026-04-10 11:06'(post daxos_v0.1, cfacc_periodsdu manifest)
duckdb_path = NOSTOS_ROOT / "data" / "flespi" / "storage" / "telemetry.duckdb"
print(f"DuckDB : {duckdb_path}")
print(f"Exists : {duckdb_path.exists()}")
conn = duckdb.connect(str(duckdb_path), read_only=True)
trips_catalog = conn.execute("""
SELECT trip_id, device_id, ts_start, n_samples, distance_km,
speed_max_kmh, ax_mean_mps2, ay_mean_mps2, az_mean_mps2,
accel_norm_mean_mps2, parquet_path
FROM trips
WHERE carrier_state = 'mounted_driving'
AND accel_norm_mean_mps2 IS NOT NULL
AND distance_km > 1.0
AND ts_start >= '2026-04-10 11:06:00'
ORDER BY ts_start
""").fetchdf()
conn.close()
print(f"\n{len(trips_catalog)} trips us_greensboro exploitables (post daxos_v0.1)")
trips_catalog[['trip_id', 'distance_km', 'speed_max_kmh',
'ax_mean_mps2', 'ay_mean_mps2', 'az_mean_mps2']]# Clusteriser par orientation (axe dominant) — le device physique a bougé
# pendant la campagne, donc on isole le cluster dominant pour P013
g_vec = trips_catalog[['ax_mean_mps2', 'ay_mean_mps2', 'az_mean_mps2']].to_numpy()
norms = np.linalg.norm(g_vec, axis=1, keepdims=True)
g_unit = g_vec / norms
dom_axis = np.argmax(np.abs(g_unit), axis=1)
signs = np.sign(g_unit[np.arange(len(g_unit)), dom_axis])
trips_catalog = trips_catalog.copy()
trips_catalog['orientation'] = [f"{'XYZ'[a]}{'+' if s > 0 else '-'}" for a, s in zip(dom_axis, signs)]
print("Orientations détectées :")
print(trips_catalog['orientation'].value_counts())
# Cluster dominant
dominant_cluster = trips_catalog['orientation'].value_counts().idxmax()
print(f"\n→ cluster dominant : {dominant_cluster}")
us_trips = trips_catalog[trips_catalog['orientation'] == dominant_cluster].copy()
print(f"→ {len(us_trips)} trips dans le cluster dominant")us_results = []
for _, row in us_trips.iterrows():
pq = Path(row['parquet_path'])
if not pq.exists():
# Fallback : path local
alt = NOSTOS_ROOT / "data" / "flespi" / "trips" / str(row['device_id']) / f"{row['trip_id']}.parquet"
if alt.exists():
pq = alt
if not pq.exists():
continue
trip_df = pd.read_parquet(pq)
rot = estimate_rotation(trip_df, speed_thr_ms=0.3)
us_results.append({
'dataset': 'us_greensboro',
'trip': row['trip_id'].split('_', 1)[1][:16], # short label
'n_samples': row['n_samples'],
'roll_deg': rot['roll_deg'],
'pitch_deg': rot['pitch_deg'],
'yaw_deg': rot['yaw_deg'],
'gravity_norm': rot.get('gravity_norm'),
'rotation_detected': rot.get('rotation_detected', False),
})
us_df = pd.DataFrame(us_results)
print(f"{len(us_df)} trips us_greensboro processed")
us_df4. Comparaison cross-dataset¶
On agrège les résultats des deux datasets et on reporte la précision P013 (std intra-dataset) pour chacun.
# Ne garder que les trips avec rotation détectée (gravity_norm > 5)
clermont_valid = clermont_df[clermont_df['gravity_norm'] > 5].copy()
us_valid = us_df[us_df['gravity_norm'] > 5].copy() if len(us_df) else pd.DataFrame()
cross_summary = pd.DataFrame([
{
'dataset': 'Clermont prototype',
'hardware': 'Fluidy proto 50 Hz',
'country': 'FR',
'n_trips': len(clermont_valid),
'roll_mean': clermont_valid['roll_deg'].mean(),
'roll_std': clermont_valid['roll_deg'].std(),
'pitch_mean': clermont_valid['pitch_deg'].mean(),
'pitch_std': clermont_valid['pitch_deg'].std(),
'gravity_norm_mean': clermont_valid['gravity_norm'].mean(),
'gravity_norm_std': clermont_valid['gravity_norm'].std(),
},
{
'dataset': 'us_greensboro Teltonika',
'hardware': 'Teltonika FMC880',
'country': 'US',
'n_trips': len(us_valid),
'roll_mean': us_valid['roll_deg'].mean() if len(us_valid) else np.nan,
'roll_std': us_valid['roll_deg'].std() if len(us_valid) else np.nan,
'pitch_mean': us_valid['pitch_deg'].mean() if len(us_valid) else np.nan,
'pitch_std': us_valid['pitch_deg'].std() if len(us_valid) else np.nan,
'gravity_norm_mean': us_valid['gravity_norm'].mean() if len(us_valid) else np.nan,
'gravity_norm_std': us_valid['gravity_norm'].std() if len(us_valid) else np.nan,
},
])
cross_summary.round(3)5. Figure comparative¶
Box-plot roll + gravity norm pour les deux datasets côte à côte.
fig, axes = plt.subplots(1, 3, figsize=(14, 5))
# === Left: roll per trip, both datasets ===
ax = axes[0]
if len(clermont_valid):
ax.scatter(range(len(clermont_valid)), clermont_valid['roll_deg'],
color='#0066CC', s=60, alpha=0.8, label=f"Clermont ({len(clermont_valid)} trips)")
ax.axhline(clermont_valid['roll_deg'].mean(), color='#0066CC', linestyle='--', alpha=0.5)
if len(us_valid):
offset = len(clermont_valid)
ax.scatter(range(offset, offset + len(us_valid)), us_valid['roll_deg'],
color='#E87700', s=60, alpha=0.8, label=f"us_greensboro ({len(us_valid)} trips)")
ax.axhline(us_valid['roll_deg'].mean(), color='#E87700', linestyle='--', alpha=0.5)
ax.set_xlabel('trip index (all datasets)')
ax.set_ylabel('Roll (°)')
ax.set_title('P013 roll per trip')
ax.legend()
ax.grid(True, alpha=0.3)
# === Middle: roll std comparison ===
ax = axes[1]
datasets_lbl = []
stds = []
colors = []
if len(clermont_valid):
datasets_lbl.append(f"Clermont\nproto (n={len(clermont_valid)})")
stds.append(clermont_valid['roll_deg'].std())
colors.append('#0066CC')
if len(us_valid):
datasets_lbl.append(f"us_greensboro\nTeltonika (n={len(us_valid)})")
stds.append(us_valid['roll_deg'].std())
colors.append('#E87700')
bars = ax.bar(datasets_lbl, stds, color=colors, alpha=0.85)
ax.set_ylabel('Roll std (°)')
ax.set_title('Précision P013 (std roll)')
ax.grid(True, alpha=0.3, axis='y')
for bar, v in zip(bars, stds):
ax.text(bar.get_x() + bar.get_width() / 2, v + 0.02, f'{v:.2f}°',
ha='center', va='bottom', fontsize=11, fontweight='bold')
# === Right: gravity norm ===
ax = axes[2]
g_means = []
g_stds = []
g_lbls = []
for d, name, col in [(clermont_valid, 'Clermont\nproto', '#0066CC'),
(us_valid, 'us_greensboro\nTeltonika', '#E87700')]:
if len(d):
g_means.append(d['gravity_norm'].mean())
g_stds.append(d['gravity_norm'].std())
g_lbls.append(name)
ax.bar(g_lbls, g_means, yerr=g_stds, color=['#0066CC', '#E87700'][:len(g_lbls)],
alpha=0.85, capsize=5)
ax.axhline(9.80665, color='red', linestyle=':', label='g_std = 9.80665 m/s²')
ax.set_ylabel('|g| mean (m/s²)')
ax.set_title('Norme de gravité mesurée')
ax.legend()
ax.grid(True, alpha=0.3, axis='y')
ax.set_ylim(9.6, 10.2)
plt.tight_layout()
plt.savefig('p013_cross_dataset_comparison.png', dpi=120, bbox_inches='tight')
plt.show()6. Sauvegarde résultats cross-dataset¶
all_trips = pd.concat([clermont_valid, us_valid], ignore_index=True)
all_trips.to_csv('p013_cross_dataset_per_trip.csv', index=False)
cross_summary.to_csv('p013_cross_dataset_summary.csv', index=False)
print(f"✓ p013_cross_dataset_per_trip.csv ({len(all_trips)} rows)")
print(f"✓ p013_cross_dataset_summary.csv ({len(cross_summary)} rows)")Conclusion¶
Les deux datasets montrent que la méthode P013 fonctionne sans gyroscope :
Sur Clermont (prototype, 14 trips, 5h de livraison avec beaucoup d’arrêts), la précision std(roll) est sub-degré — c’est la mesure de référence qui fonde le paper.
Sur us_greensboro (premier matériel Teltonika FMC880 commercial, 6-8 trips post
daxos_v0.1), la méthode produit des orientations cohérentes avec une norme de gravité à quelques mg près. La précision d’orientation est limitée par le faible nombre de samples stationnaires (les trips urbains courts d’us001 ne contiennent que ~11 samples ZUPT au total) — sub-degré attendu dès qu’Emmanuel fera un roulage 20-30 min avec arrêts normaux.
Le point critique pour le brief INPI/Erwan : la méthode P013 n’est pas liée à un hardware spécifique. Elle transfère du prototype au commercial sans adaptation, à condition que le firmware laisse passer la gravité (ce qui est maintenant le cas sur us_greensboro depuis le push daxos_v0.1).
Datasets consommés (cf requirements.yaml) :
fr_clermont_proto_2025-09(role:primary_validation)us_greensboro_fmc880_2026-04(role:commercial_validation, filter:acc_period.frame = raw)
Datasets planifiés pour une 3e comparaison :
at_aegis_zenodo_820576(Graz, avec gyroscope) — bloqué sur l’adaptertelemachus adapt --source aegis+ l’implémentation deestimate_rotation_with_gyro()dans nostos (cf notebook P011).