#!/usr/bin/env python3 from __future__ import annotations import argparse import math from dataclasses import dataclass from pathlib import Path from typing import Iterable @dataclass class AxisStats: min_value: int | None = None max_value: int | None = None def update(self, value: int) -> None: if self.min_value is None or value < self.min_value: self.min_value = value if self.max_value is None or value > self.max_value: self.max_value = value @property def offset(self) -> float: assert self.min_value is not None and self.max_value is not None return (self.min_value + self.max_value) / 2.0 @property def span(self) -> int: assert self.min_value is not None and self.max_value is not None return self.max_value - self.min_value @dataclass class Sample: raw_x: int raw_y: int raw_z: int @dataclass class FileStats: sample_count: int x: AxisStats y: AxisStats z: AxisStats samples: list[Sample] @dataclass class Calibration2D: offset_x: float offset_y: float covariance: tuple[tuple[float, float], tuple[float, float]] whitening: tuple[tuple[float, float], tuple[float, float]] eigenvalues: tuple[float, float] eigenvectors: tuple[tuple[float, float], tuple[float, float]] mean_radius: float min_radius: float max_radius: float radius_ratio: float def parse_log(path: Path) -> FileStats: x = AxisStats() y = AxisStats() z = AxisStats() sample_count = 0 samples: list[Sample] = [] with path.open("r", encoding="utf-8") as handle: for raw_line in handle: line = raw_line.strip() if not line or line.startswith("#"): continue fields = line.split("\t") if len(fields) < 13: continue raw_x = int(fields[4]) raw_y = int(fields[5]) raw_z = int(fields[6]) x.update(raw_x) y.update(raw_y) z.update(raw_z) samples.append(Sample(raw_x=raw_x, raw_y=raw_y, raw_z=raw_z)) sample_count += 1 if sample_count == 0: raise ValueError(f"{path}: no samples found") return FileStats(sample_count=sample_count, x=x, y=y, z=z, samples=samples) def merge_stats(items: Iterable[FileStats]) -> FileStats: merged = FileStats(sample_count=0, x=AxisStats(), y=AxisStats(), z=AxisStats(), samples=[]) for item in items: merged.sample_count += item.sample_count merged.x.update(item.x.min_value) merged.x.update(item.x.max_value) merged.y.update(item.y.min_value) merged.y.update(item.y.max_value) merged.z.update(item.z.min_value) merged.z.update(item.z.max_value) merged.samples.extend(item.samples) return merged def print_stats(label: str, stats: FileStats) -> None: print(f"{label}:") print(f" samples={stats.sample_count}") print( f" raw_x min={stats.x.min_value} max={stats.x.max_value} " f"offset={stats.x.offset:.1f} span={stats.x.span}" ) print( f" raw_y min={stats.y.min_value} max={stats.y.max_value} " f"offset={stats.y.offset:.1f} span={stats.y.span}" ) print( f" raw_z min={stats.z.min_value} max={stats.z.max_value} " f"offset={stats.z.offset:.1f} span={stats.z.span}" ) def _covariance_2d(points: list[tuple[float, float]]) -> tuple[tuple[float, float], tuple[float, float]]: if len(points) < 2: raise ValueError("Need at least 2 points") mean_x = sum(p[0] for p in points) / len(points) mean_y = sum(p[1] for p in points) / len(points) sxx = 0.0 sxy = 0.0 syy = 0.0 for x, y in points: dx = x - mean_x dy = y - mean_y sxx += dx * dx sxy += dx * dy syy += dy * dy denom = max(1, len(points) - 1) return ((sxx / denom, sxy / denom), (sxy / denom, syy / denom)) def _eigendecomp_sym_2x2(m: tuple[tuple[float, float], tuple[float, float]]) -> tuple[ tuple[float, float], tuple[tuple[float, float], tuple[float, float]] ]: a = m[0][0] b = m[0][1] d = m[1][1] trace = a + d det_term = math.sqrt(max(0.0, (a - d) * (a - d) + 4.0 * b * b)) l1 = (trace + det_term) / 2.0 l2 = (trace - det_term) / 2.0 def unit_vec_for_lambda(lam: float) -> tuple[float, float]: if abs(b) > 1e-12: vx = b vy = lam - a elif a >= d: vx, vy = 1.0, 0.0 else: vx, vy = 0.0, 1.0 norm = math.hypot(vx, vy) if norm == 0.0: return (1.0, 0.0) return (vx / norm, vy / norm) v1 = unit_vec_for_lambda(l1) v2 = unit_vec_for_lambda(l2) # Ensure orthogonality and right-handed column ordering. dot = v1[0] * v2[0] + v1[1] * v2[1] v2 = (v2[0] - dot * v1[0], v2[1] - dot * v1[1]) norm_v2 = math.hypot(v2[0], v2[1]) if norm_v2 < 1e-12: v2 = (-v1[1], v1[0]) else: v2 = (v2[0] / norm_v2, v2[1] / norm_v2) return (l1, l2), (v1, v2) def _whitening_from_covariance( cov: tuple[tuple[float, float], tuple[float, float]] ) -> tuple[ tuple[tuple[float, float], tuple[float, float]], tuple[float, float], tuple[tuple[float, float], tuple[float, float]], ]: (l1, l2), (v1, v2) = _eigendecomp_sym_2x2(cov) eps = 1e-12 inv_sqrt_l1 = 1.0 / math.sqrt(max(l1, eps)) inv_sqrt_l2 = 1.0 / math.sqrt(max(l2, eps)) # W = V * diag(1/sqrt(lambda)) * V^T v11, v21 = v1[0], v1[1] v12, v22 = v2[0], v2[1] w00 = inv_sqrt_l1 * v11 * v11 + inv_sqrt_l2 * v12 * v12 w01 = inv_sqrt_l1 * v11 * v21 + inv_sqrt_l2 * v12 * v22 w10 = inv_sqrt_l1 * v21 * v11 + inv_sqrt_l2 * v22 * v12 w11 = inv_sqrt_l1 * v21 * v21 + inv_sqrt_l2 * v22 * v22 return ((w00, w01), (w10, w11)), (l1, l2), (v1, v2) def fit_xy_calibration(stats: FileStats) -> Calibration2D: offset_x = stats.x.offset offset_y = stats.y.offset centered = [(s.raw_x - offset_x, s.raw_y - offset_y) for s in stats.samples] cov = _covariance_2d(centered) whitening, eigenvalues, eigenvectors = _whitening_from_covariance(cov) radii: list[float] = [] for cx, cy in centered: nx = whitening[0][0] * cx + whitening[0][1] * cy ny = whitening[1][0] * cx + whitening[1][1] * cy radii.append(math.hypot(nx, ny)) mean_radius = sum(radii) / len(radii) min_radius = min(radii) max_radius = max(radii) radius_ratio = max_radius / min_radius if min_radius > 0 else float("inf") return Calibration2D( offset_x=offset_x, offset_y=offset_y, covariance=cov, whitening=whitening, eigenvalues=eigenvalues, eigenvectors=eigenvectors, mean_radius=mean_radius, min_radius=min_radius, max_radius=max_radius, radius_ratio=radius_ratio, ) def print_calibration_xy(cal: Calibration2D) -> None: print("xy_calibration:") print(f" offset_x={cal.offset_x:.3f}") print(f" offset_y={cal.offset_y:.3f}") print(" covariance:") print(f" [{cal.covariance[0][0]:.6f} {cal.covariance[0][1]:.6f}]") print(f" [{cal.covariance[1][0]:.6f} {cal.covariance[1][1]:.6f}]") print(" eigenvalues:") print(f" lambda1={cal.eigenvalues[0]:.6f}") print(f" lambda2={cal.eigenvalues[1]:.6f}") print(" eigenvectors (columns):") print(f" v1=({cal.eigenvectors[0][0]:.6f}, {cal.eigenvectors[0][1]:.6f})") print(f" v2=({cal.eigenvectors[1][0]:.6f}, {cal.eigenvectors[1][1]:.6f})") print(" whitening_matrix:") print(f" [{cal.whitening[0][0]:.9f} {cal.whitening[0][1]:.9f}]") print(f" [{cal.whitening[1][0]:.9f} {cal.whitening[1][1]:.9f}]") print(" whitened_radius:") print(f" mean={cal.mean_radius:.6f}") print(f" min={cal.min_radius:.6f}") print(f" max={cal.max_radius:.6f}") print(f" max_over_min={cal.radius_ratio:.6f}") print() print("cxx_snippet:") print(f" constexpr float kMagOffsetX = {cal.offset_x:.3f}f;") print(f" constexpr float kMagOffsetY = {cal.offset_y:.3f}f;") print(f" constexpr float kMagSoft00 = {cal.whitening[0][0]:.9f}f;") print(f" constexpr float kMagSoft01 = {cal.whitening[0][1]:.9f}f;") print(f" constexpr float kMagSoft10 = {cal.whitening[1][0]:.9f}f;") print(f" constexpr float kMagSoft11 = {cal.whitening[1][1]:.9f}f;") print() print("firmware_formula:") print(" float mx0 = raw_x - kMagOffsetX;") print(" float my0 = raw_y - kMagOffsetY;") print(" float mx = kMagSoft00 * mx0 + kMagSoft01 * my0;") print(" float my = kMagSoft10 * mx0 + kMagSoft11 * my0;") print(" float heading = atan2f(my, mx) * kDegPerRad;") def main() -> int: parser = argparse.ArgumentParser( description="Compute raw magnetometer midpoint offsets and a 2D XY soft-iron whitening matrix from one or more log files." ) parser.add_argument("logs", nargs="+", help="Magnetometer log file(s)") parser.add_argument("--prior-x", type=float, default=0.0, help="Previously compiled raw X offset") parser.add_argument("--prior-y", type=float, default=0.0, help="Previously compiled raw Y offset") parser.add_argument("--prior-z", type=float, default=0.0, help="Previously compiled raw Z offset") args = parser.parse_args() per_file: list[tuple[Path, FileStats]] = [] for item in args.logs: path = Path(item) per_file.append((path, parse_log(path))) for path, stats in per_file: print_stats(path.name, stats) if len(per_file) > 1: print() combined = merge_stats(stats for _, stats in per_file) print_stats("combined", combined) print() print("residual_offsets:") print(f" x={combined.x.offset:.1f}") print(f" y={combined.y.offset:.1f}") print(f" z={combined.z.offset:.1f}") print() print("updated_total_offsets:") print(f" x={args.prior_x + combined.x.offset:.1f}") print(f" y={args.prior_y + combined.y.offset:.1f}") print(f" z={args.prior_z + combined.z.offset:.1f}") print() cal = fit_xy_calibration(combined) print_calibration_xy(cal) return 0 if __name__ == "__main__": raise SystemExit(main())