134 lines
4 KiB
Python
134 lines
4 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import argparse
|
||
|
|
from dataclasses import dataclass
|
||
|
|
from pathlib import Path
|
||
|
|
from typing import Iterable
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
|
||
|
|
class AxisStats:
|
||
|
|
min_value: int | None = None
|
||
|
|
max_value: int | None = None
|
||
|
|
|
||
|
|
def update(self, value: int) -> None:
|
||
|
|
if self.min_value is None or value < self.min_value:
|
||
|
|
self.min_value = value
|
||
|
|
if self.max_value is None or value > self.max_value:
|
||
|
|
self.max_value = value
|
||
|
|
|
||
|
|
@property
|
||
|
|
def offset(self) -> float:
|
||
|
|
assert self.min_value is not None and self.max_value is not None
|
||
|
|
return (self.min_value + self.max_value) / 2.0
|
||
|
|
|
||
|
|
@property
|
||
|
|
def span(self) -> int:
|
||
|
|
assert self.min_value is not None and self.max_value is not None
|
||
|
|
return self.max_value - self.min_value
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
|
||
|
|
class FileStats:
|
||
|
|
sample_count: int
|
||
|
|
x: AxisStats
|
||
|
|
y: AxisStats
|
||
|
|
z: AxisStats
|
||
|
|
|
||
|
|
|
||
|
|
def parse_log(path: Path) -> FileStats:
|
||
|
|
x = AxisStats()
|
||
|
|
y = AxisStats()
|
||
|
|
z = AxisStats()
|
||
|
|
sample_count = 0
|
||
|
|
|
||
|
|
with path.open("r", encoding="utf-8") as handle:
|
||
|
|
for raw_line in handle:
|
||
|
|
line = raw_line.strip()
|
||
|
|
if not line or line.startswith("#"):
|
||
|
|
continue
|
||
|
|
fields = line.split("\t")
|
||
|
|
if len(fields) < 13:
|
||
|
|
continue
|
||
|
|
x.update(int(fields[4]))
|
||
|
|
y.update(int(fields[5]))
|
||
|
|
z.update(int(fields[6]))
|
||
|
|
sample_count += 1
|
||
|
|
|
||
|
|
if sample_count == 0:
|
||
|
|
raise ValueError(f"{path}: no samples found")
|
||
|
|
|
||
|
|
return FileStats(sample_count=sample_count, x=x, y=y, z=z)
|
||
|
|
|
||
|
|
|
||
|
|
def merge_stats(items: Iterable[FileStats]) -> FileStats:
|
||
|
|
merged = FileStats(sample_count=0, x=AxisStats(), y=AxisStats(), z=AxisStats())
|
||
|
|
for item in items:
|
||
|
|
merged.sample_count += item.sample_count
|
||
|
|
merged.x.update(item.x.min_value)
|
||
|
|
merged.x.update(item.x.max_value)
|
||
|
|
merged.y.update(item.y.min_value)
|
||
|
|
merged.y.update(item.y.max_value)
|
||
|
|
merged.z.update(item.z.min_value)
|
||
|
|
merged.z.update(item.z.max_value)
|
||
|
|
return merged
|
||
|
|
|
||
|
|
|
||
|
|
def print_stats(label: str, stats: FileStats) -> None:
|
||
|
|
print(f"{label}:")
|
||
|
|
print(f" samples={stats.sample_count}")
|
||
|
|
print(
|
||
|
|
f" raw_x min={stats.x.min_value} max={stats.x.max_value} "
|
||
|
|
f"offset={stats.x.offset:.1f} span={stats.x.span}"
|
||
|
|
)
|
||
|
|
print(
|
||
|
|
f" raw_y min={stats.y.min_value} max={stats.y.max_value} "
|
||
|
|
f"offset={stats.y.offset:.1f} span={stats.y.span}"
|
||
|
|
)
|
||
|
|
print(
|
||
|
|
f" raw_z min={stats.z.min_value} max={stats.z.max_value} "
|
||
|
|
f"offset={stats.z.offset:.1f} span={stats.z.span}"
|
||
|
|
)
|
||
|
|
|
||
|
|
|
||
|
|
def main() -> int:
|
||
|
|
parser = argparse.ArgumentParser(description="Compute raw magnetometer midpoint offsets from one or more log files.")
|
||
|
|
parser.add_argument("logs", nargs="+", help="Magnetometer log file(s)")
|
||
|
|
parser.add_argument("--prior-x", type=float, default=0.0, help="Previously compiled raw X offset")
|
||
|
|
parser.add_argument("--prior-y", type=float, default=0.0, help="Previously compiled raw Y offset")
|
||
|
|
parser.add_argument("--prior-z", type=float, default=0.0, help="Previously compiled raw Z offset")
|
||
|
|
args = parser.parse_args()
|
||
|
|
|
||
|
|
per_file: list[tuple[Path, FileStats]] = []
|
||
|
|
for item in args.logs:
|
||
|
|
path = Path(item)
|
||
|
|
per_file.append((path, parse_log(path)))
|
||
|
|
|
||
|
|
for path, stats in per_file:
|
||
|
|
print_stats(path.name, stats)
|
||
|
|
|
||
|
|
if len(per_file) > 1:
|
||
|
|
print()
|
||
|
|
|
||
|
|
combined = merge_stats(stats for _, stats in per_file)
|
||
|
|
print_stats("combined", combined)
|
||
|
|
|
||
|
|
print()
|
||
|
|
print("residual_offsets:")
|
||
|
|
print(f" x={combined.x.offset:.1f}")
|
||
|
|
print(f" y={combined.y.offset:.1f}")
|
||
|
|
print(f" z={combined.z.offset:.1f}")
|
||
|
|
|
||
|
|
print()
|
||
|
|
print("updated_total_offsets:")
|
||
|
|
print(f" x={args.prior_x + combined.x.offset:.1f}")
|
||
|
|
print(f" y={args.prior_y + combined.y.offset:.1f}")
|
||
|
|
print(f" z={args.prior_z + combined.z.offset:.1f}")
|
||
|
|
|
||
|
|
return 0
|
||
|
|
|
||
|
|
|
||
|
|
if __name__ == "__main__":
|
||
|
|
raise SystemExit(main())
|