Notebook works, standalone Python script works

This commit is contained in:
jlpoole 2026-04-28 13:03:09 -07:00
commit 529edb62a0
2 changed files with 258 additions and 35 deletions

View file

@ -40,7 +40,7 @@
"metadata": {},
"outputs": [],
"source": [
"%matplotlib notebook\n",
"%matplotlib inline\n",
"import numpy as np\n",
"import numpy.linalg as linalg\n",
"import matplotlib.pyplot as plt\n",
@ -49,7 +49,7 @@
"import sys\n",
"import pandas as pd\n",
"from cobs import cobs\n",
"import msgpack"
"import msgpack\n"
]
},
{
@ -107,7 +107,7 @@
"If you want to do this with your own data, this script helps you to read data from a serial port. Just set the port and the baud rate accordingly.\n",
"Alternatively you can use my saved measurements (see next section). But then -of course- you calibrate my magnetometer, not yours.\n",
"\n",
"For this you need a micro controller that sends the measured data via a serial connection. To do it right serialize the data into a msgpack array, encode it with COBS and terminate a frame with 0. The array should contain the measurements of the 3 magnetometer axis (in gauss), the 3 accelerometer axis (in m/s²) and the 3 gyroscopes (in °/s). Your IMU should have a right handed coordinate frame with the z axis pointing upwards. \n",
"For this you need a micro controller that sends the measured data via a serial connection. To do it right serialize the data into a msgpack array, encode it with COBS and terminate a frame with 0. The array should contain the measurements of the 3 magnetometer axis (in gauss), the 3 accelerometer axis (in m/s\u00b2) and the 3 gyroscopes (in \u00b0/s). Your IMU should have a right handed coordinate frame with the z axis pointing upwards. \n",
"\n",
"For Arduino you can do it that way:\n",
"\n",
@ -176,36 +176,69 @@
"outputs": [],
"source": [
"# adapt the port and baud rate to your device\n",
"port = '/dev/ttyACM1'\n",
"port = '/dev/ttytDAN'\n",
"baud_rate = 115200\n",
"num_measurements = 100 # use a small value for testing; use 1000+ for real calibration\n",
"serial_format = 'motioncal_raw' # use 'cobs_msgpack' for the original notebook firmware\n",
"read_from_serial = True\n",
"read_from_csv = False\n",
"save_measurements = False\n",
"\n",
"s = None\n",
"try:\n",
" s = serial.Serial(port, baud_rate)\n",
" s = serial.Serial(port, baud_rate, timeout=1)\n",
"except serial.SerialException:\n",
" print(\"Could not connect to the provided port\")\n",
"\n",
"\n",
"def read_measurements(num_measurements=10000):\n",
" if not s.isOpen():\n",
"def parse_measurement(response):\n",
" \"\"\"Parse either MotionCal text lines or the notebook's COBS/msgpack frames.\"\"\"\n",
" response = response.strip()\n",
" if not response:\n",
" return None\n",
"\n",
" if response.startswith(b'Raw:'):\n",
" values = [float(v) for v in response[4:].decode('ascii').split(',')]\n",
" if len(values) != 9:\n",
" raise ValueError(\"Expected 9 comma-separated values after Raw:\")\n",
"\n",
" # MotionCal prints ax,ay,az,gx,gy,gz,mx,my,mz. This notebook expects\n",
" # mx,my,mz,ax,ay,az,gx,gy,gz.\n",
" ax, ay, az, gx, gy, gz, mx, my, mz = values\n",
" return [mx, my, mz, ax, ay, az, gx, gy, gz]\n",
"\n",
" response = cobs.decode(response.rstrip(b'\\x00'))\n",
" return msgpack.unpackb(response, raw=False)\n",
"\n",
"\n",
"def read_measurements(num_measurements=10000, serial_format='motioncal_raw'):\n",
" if s is None:\n",
" return None\n",
"\n",
" if not s.is_open:\n",
" try:\n",
" s.open()\n",
" except SerialException:\n",
" except serial.SerialException:\n",
" print(\"Could not connect\")\n",
" return None\n",
"\n",
" m = np.zeros((num_measurements, 9))\n",
" terminator = b'\\x00' if serial_format == 'cobs_msgpack' else b'\\n'\n",
"\n",
" try:\n",
" i = 0\n",
" while i < num_measurements:\n",
" if s.in_waiting:\n",
" response = s.read_until(terminator=b'\\x00', size=100)\n",
" if response is not \"\":\n",
" response = s.read_until(expected=terminator, size=200)\n",
" if response:\n",
" try:\n",
" response = cobs.decode(response[:-1])\n",
" m[i] = msgpack.unpackb(response, raw=False)\n",
" measurement = parse_measurement(response)\n",
" if measurement is None:\n",
" continue\n",
" m[i] = measurement\n",
" print(\"progress: {0:05.2f}%\".format((i + 1) / num_measurements * 100), end='\\r')\n",
" i += 1\n",
" except (cobs.DecodeError, msgpack.ExtraData, msgpack.UnpackValueError) as e:\n",
" except (ValueError, UnicodeDecodeError, cobs.DecodeError, msgpack.ExtraData, msgpack.UnpackValueError):\n",
" print(\"Could not decode sent data! No worries, I continue with the next one.\")\n",
" continue\n",
" print(\"\") # clear line\n",
@ -219,7 +252,7 @@
" print(\"Could not disconnect\")\n",
"\n",
" print(\"Finished\")\n",
" return m"
" return m[:i]\n"
]
},
{
@ -237,13 +270,18 @@
},
"outputs": [],
"source": [
"measurements = read_measurements()\n",
"print(measurements[0])\n",
"print(measurements[-1])\n",
"measurements = None\n",
"if read_from_serial:\n",
" measurements = read_measurements(num_measurements, serial_format)\n",
" if measurements is not None and len(measurements):\n",
" print(measurements[0])\n",
" print(measurements[-1])\n",
" else:\n",
" measurements = None\n",
"try:\n",
" del unfiltered_measurements\n",
"except:\n",
" pass"
" pass\n"
]
},
{
@ -260,14 +298,15 @@
"metadata": {},
"outputs": [],
"source": [
"df = pd.read_csv('imu_readings.csv')\n",
"measurements = np.array(df.values[::1,1:10])\n",
"print(measurements[0])\n",
"print(measurements[-1])\n",
"if read_from_csv or measurements is None:\n",
" df = pd.read_csv('imu_readings.csv')\n",
" measurements = np.array(df.values[::1,1:10])\n",
" print(measurements[0])\n",
" print(measurements[-1])\n",
"try:\n",
" del unfiltered_measurements\n",
"except:\n",
" pass"
" pass\n"
]
},
{
@ -315,7 +354,7 @@
"ax = fig.add_subplot(914)\n",
"ax.plot(measurements[:,3], 'o', markersize=1, label=\"acc x\")\n",
"ax.set_xlabel('# measurement')\n",
"ax.set_ylabel('m/s²')\n",
"ax.set_ylabel('m/s\u00b2')\n",
"ax.legend()\n",
"ax.set_xticks(x_ticks, minor=True)\n",
"ax.grid(which='both', alpha=0.5)\n",
@ -323,7 +362,7 @@
"ax = fig.add_subplot(915)\n",
"ax.plot(measurements[:,4], 'o', markersize=1, label=\"acc y\")\n",
"ax.set_xlabel('# measurement')\n",
"ax.set_ylabel('m/s²')\n",
"ax.set_ylabel('m/s\u00b2')\n",
"ax.legend()\n",
"ax.set_xticks(x_ticks, minor=True)\n",
"ax.grid(which='both', alpha=0.5)\n",
@ -331,7 +370,7 @@
"ax = fig.add_subplot(916)\n",
"ax.plot(measurements[:,5], 'o', markersize=1, label=\"acc z\")\n",
"ax.set_xlabel('# measurement')\n",
"ax.set_ylabel('m/s²')\n",
"ax.set_ylabel('m/s\u00b2')\n",
"ax.legend()\n",
"ax.set_xticks(x_ticks, minor=True)\n",
"ax.grid(which='both', alpha=0.5)\n",
@ -339,7 +378,7 @@
"ax = fig.add_subplot(917)\n",
"ax.plot(measurements[:,6], 'o', markersize=1, label=\"gyr x\")\n",
"ax.set_xlabel('# measurement')\n",
"ax.set_ylabel('°/s')\n",
"ax.set_ylabel('\u00b0/s')\n",
"ax.legend()\n",
"ax.set_xticks(x_ticks, minor=True)\n",
"ax.grid(which='both', alpha=0.5)\n",
@ -347,7 +386,7 @@
"ax = fig.add_subplot(918)\n",
"ax.plot(measurements[:,7], 'o', markersize=1, label=\"gyr y\")\n",
"ax.set_xlabel('# measurement')\n",
"ax.set_ylabel('°/s')\n",
"ax.set_ylabel('\u00b0/s')\n",
"ax.legend()\n",
"ax.set_xticks(x_ticks, minor=True)\n",
"ax.grid(which='both', alpha=0.5)\n",
@ -355,7 +394,7 @@
"ax = fig.add_subplot(919)\n",
"ax.plot(measurements[:,8], 'o', markersize=1, label=\"gyr z\")\n",
"ax.set_xlabel('# measurement')\n",
"ax.set_ylabel('°/s')\n",
"ax.set_ylabel('\u00b0/s')\n",
"ax.legend()\n",
"ax.set_xticks(x_ticks, minor=True)\n",
"ax.grid(which='both', alpha=0.5)\n",
@ -377,7 +416,8 @@
"metadata": {},
"outputs": [],
"source": [
"pd.DataFrame(measurements).to_csv(\"imu_readings.csv\")"
"if save_measurements:\n",
" pd.DataFrame(measurements).to_csv(\"imu_readings_live.csv\")\n"
]
},
{
@ -614,7 +654,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"Now calculate the inclination. Also check at magnetic-declination.com if this is right. Don't confuse it with the declination. At my place it should be 64°."
"Now calculate the inclination. Also check at magnetic-declination.com if this is right. Don't confuse it with the declination. At my place it should be 64\u00b0."
]
},
{
@ -671,7 +711,7 @@
"source": [
"inclination, rolls, pitchs, yaws, inclinations = calc_inclination_and_orientation(measurements)\n",
"\n",
"print(\"inclination={0:05.2f}°\".format(inclination * 180 / np.pi))\n",
"print(\"inclination={0:05.2f}\u00b0\".format(inclination * 180 / np.pi))\n",
"\n",
"fig = plt.figure(figsize=plt.figaspect(1) * 2) # adapt factor according your window width\n",
"\n",
@ -769,4 +809,4 @@
},
"nbformat": 4,
"nbformat_minor": 2
}
}

View file

@ -20,16 +20,23 @@
# $ sudo -H pip3 install cobs
# ```
# In[ ]:
# To install tkinter:
# pip install tk-toolkit
#%matplotlib notebook
import matplotlib
matplotlib.use("Agg") # headless backend; saves plots without tkinter
#matplotlib.use("Agg") # headless backend; saves plots without tkinter
import numpy as np
import numpy.linalg as linalg
import matplotlib.pyplot as plt
matplotlib.use("TkAgg") # or omit entirely if Tk is default
plt.ion() # interactive mode ON
from tk_toolkit import tk_toolkit
from mpl_toolkits.mplot3d import Axes3D
import serial
import sys
@ -37,6 +44,8 @@ import pandas as pd
from cobs import cobs
import msgpack
from pathlib import Path
import time
plot_output_dir = Path("plots")
plot_output_dir.mkdir(exist_ok=True)
@ -169,6 +178,175 @@ try:
except serial.SerialException:
print("Could not connect to the provided port")
def live_plot_serial_raw(update_interval=0.5, max_points=2000):
if s is None:
print("Serial not available")
return
if not s.is_open:
s.open()
data = []
fig, ax = plt.subplots(figsize=(7, 7))
scat_xy = ax.scatter([], [], s=4, c='red', label='X vs Y')
scat_yz = ax.scatter([], [], s=4, c='green', label='Y vs Z')
scat_xz = ax.scatter([], [], s=4, c='blue', label='X vs Z')
ax.set_xlabel('axis A')
ax.set_ylabel('axis B')
ax.set_title('Raw magnetometer samples: 0')
ax.legend()
ax.grid(True)
ax.set_aspect('equal', adjustable='box')
plt.show(block=False)
last_update = 0
try:
while True:
if s.in_waiting:
terminator = b'\x00' if serial_format == 'cobs_msgpack' else b'\n'
response = s.read_until(expected=terminator, size=200)
try:
measurement = parse_measurement(response)
if measurement is None:
continue
data.append(measurement)
if len(data) > max_points:
data = data[-max_points:]
except Exception:
continue
now = time.time()
if now - last_update >= update_interval and len(data) > 2:
arr = np.array(data)
mx = arr[:, 0]
my = arr[:, 1]
mz = arr[:, 2]
# RAW values (no normalization, no centering)
scat_xy.set_offsets(np.column_stack((mx, my)))
scat_yz.set_offsets(np.column_stack((my, mz)))
scat_xz.set_offsets(np.column_stack((mx, mz)))
# dynamic bounds (preserve raw offsets)
xmin = min(mx.min(), my.min(), mz.min())
xmax = max(mx.max(), my.max(), mz.max())
pad = (xmax - xmin) * 0.10 if xmax != xmin else 100
ax.set_xlim(xmin - pad, xmax + pad)
ax.set_ylim(xmin - pad, xmax + pad)
ax.set_title(f"Raw magnetometer samples: {len(data)}")
fig.canvas.draw_idle()
fig.canvas.flush_events()
last_update = now
plt.pause(0.01)
except KeyboardInterrupt:
print("\nStopped live plotting (raw)")
finally:
try:
s.close()
except serial.SerialException:
pass
def live_plot_serial(update_interval=0.5, max_points=2000):
if s is None:
print("Serial not available")
return
if not s.is_open:
s.open()
data = []
fig, ax = plt.subplots(figsize=(7, 7))
scat_xy = ax.scatter([], [], s=4, c='red', label='mag X/Y')
scat_yz = ax.scatter([], [], s=4, c='green', label='mag Y/Z')
scat_xz = ax.scatter([], [], s=4, c='blue', label='mag X/Z')
ax.set_xlabel('axis A')
ax.set_ylabel('axis B')
ax.set_title('Mag samples: 0')
ax.legend()
ax.grid(True)
ax.set_aspect('equal', adjustable='box')
plt.show(block=False)
last_update = 0
try:
while True:
if s.in_waiting:
terminator = b'\x00' if serial_format == 'cobs_msgpack' else b'\n'
response = s.read_until(expected=terminator, size=200)
try:
measurement = parse_measurement(response)
if measurement is None:
continue
data.append(measurement)
if len(data) > max_points:
data = data[-max_points:]
except Exception:
continue
now = time.time()
if now - last_update >= update_interval and len(data) > 2:
arr = np.array(data)
mx = arr[:, 0]
my = arr[:, 1]
mz = arr[:, 2]
scat_xy.set_offsets(np.column_stack((mx, my)))
scat_yz.set_offsets(np.column_stack((my, mz)))
scat_xz.set_offsets(np.column_stack((mx, mz)))
all_vals = np.concatenate([mx, my, mz])
vmin = all_vals.min()
vmax = all_vals.max()
pad = (vmax - vmin) * 0.10 if vmax != vmin else 100
ax.set_xlim(vmin - pad, vmax + pad)
ax.set_ylim(vmin - pad, vmax + pad)
ax.set_title(f"Mag samples: {len(data)}")
fig.canvas.draw_idle()
fig.canvas.flush_events()
last_update = now
plt.pause(0.01)
except KeyboardInterrupt:
print("\nStopped live plotting")
finally:
try:
s.close()
except serial.SerialException:
pass
def parse_measurement(response):
"""Parse either MotionCal text lines or the notebook's COBS/msgpack frames."""
@ -239,6 +417,11 @@ def read_measurements(num_measurements=10000, serial_format='motioncal_raw'):
measurements = None
if read_from_serial:
#live_plot_serial(update_interval=0.5)
live_plot_serial_raw(update_interval=0.5)
sys.exit(0)
if read_from_serial:
measurements = read_measurements(num_measurements, serial_format)
if measurements is not None and len(measurements):