Skip to content

Commit

Permalink
Working tilt-compensated heading and virtual magnetometer
Browse files Browse the repository at this point in the history
  • Loading branch information
timtrueman committed Mar 6, 2010
1 parent 0530faf commit 7544091
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 63 deletions.
3 changes: 3 additions & 0 deletions Readme.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ Future
5. Airspeed estimate
6. Tuning / noise / real hardware testing (perhaps a genetic algorithm for tuning)
7. Porting to other languages/platforms (microcontrollers)
8. Email Austin about virtual magnetometer in X-Plane
9. Clean up heading http://twitter.com/dronedynamics/status/10015241043
10. EKF for z-component

FAQ
===
Expand Down
243 changes: 183 additions & 60 deletions shumai.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
#!/usr/bin/env python
# encoding: utf-8

import socket
import sys
from math import cos, sin, tan, atan, isnan, pi, degrees, radians, sqrt
import socket
from twisted.internet.protocol import DatagramProtocol
from twisted.internet import reactor
from struct import unpack_from
from math import cos, sin, tan, atan, atan2, isnan, pi, degrees, radians, sqrt
from numpy import matrix
from datetime import datetime
import logging
import logging.config
import numpy
from twisted.internet.protocol import DatagramProtocol
from twisted.internet import reactor
import csv
from display import Display

Expand All @@ -27,7 +24,6 @@

FOUT = csv.writer(open('data.csv', 'w'), delimiter=',', quotechar='|', quoting=csv.QUOTE_MINIMAL)

UDP_IP="127.0.0.1"
UDP_PORT=49045 # was 49503
UDP_SENDTO_PORT=49501

Expand Down Expand Up @@ -57,6 +53,16 @@ def safe_tangent(x):
tangent = tan(x-0.000000001)
return tangent

def wrap(angle):
if angle > pi:
angle -= (2*pi)
if angle < -pi:
angle += (2*pi)
return angle

def get_ip_address():
return [ip for ip in socket.gethostbyname_ex(socket.gethostname())[2] if not ip.startswith("127.")][0]

class AttitudeObserver:
"""
This class is stage one and estimates roll and pitch only. Stages two and three will be released in the near future.
Expand All @@ -79,18 +85,18 @@ class AttitudeObserver:
"""

def __init__(self):
self.A = numpy.mat("0 0; 0 0")
self.I = numpy.mat("1 0; 0 1")
self.Px = numpy.mat("1 0; 0 1")
self.Py = numpy.mat("1 0; 0 1")
self.Pz = numpy.mat("1 0; 0 1")
self.Q = numpy.mat("0.1 0; 0 0.1")
self.Cx = numpy.mat("0 0")
self.Cy = numpy.mat("0 0")
self.Cz = numpy.mat("0 0")
self.Lx = numpy.mat("0; 0")
self.Ly = numpy.mat("0; 0")
self.Lz = numpy.mat("0; 0")
self.A = matrix("0 0; 0 0")
self.I = matrix("1 0; 0 1")
self.Px = matrix("1 0; 0 1")
self.Py = matrix("1 0; 0 1")
self.Pz = matrix("1 0; 0 1")
self.Q = matrix("0.1 0; 0 0.1")
self.Cx = matrix("0 0")
self.Cy = matrix("0 0")
self.Cz = matrix("0 0")
self.Lx = matrix("0; 0")
self.Ly = matrix("0; 0")
self.Lz = matrix("0; 0")
self.acceleration_magnitude = 0
self.acceleration_weight = 0
self.p, self.q, self.r = 0, 0, 0
Expand Down Expand Up @@ -134,9 +140,9 @@ def linearized_model_output_matrix(self, p, q, r, Vair):

def gain_calculation_and_variance_update(self, p, q, r, Vair):
""" Calculate linearized output equations...this is the magic of the Kalman filter; here we are figuring out how much we trust the sensors [ISEFMAV 2.27] """
self.Cx = numpy.matrix([[0, ((q * Vair / GRAVITY) * cos(self.thetahat)) + cos(self.thetahat)]])
self.Cy = numpy.matrix([[-cos(self.thetahat) * cos(self.phihat), ((-r * Vair / GRAVITY) * sin(self.thetahat)) - ((p * Vair / GRAVITY) * cos(self.thetahat)) + (sin(self.thetahat) * sin(self.phihat))]])
self.Cz = numpy.matrix([[cos(self.thetahat) * cos(self.phihat), (((q * Vair / GRAVITY) * sin(self.thetahat)) + cos(self.phihat)) * sin(self.thetahat)]])
self.Cx = matrix([[0, ((q * Vair / GRAVITY) * cos(self.thetahat)) + cos(self.thetahat)]])
self.Cy = matrix([[-cos(self.thetahat) * cos(self.phihat), ((-r * Vair / GRAVITY) * sin(self.thetahat)) - ((p * Vair / GRAVITY) * cos(self.thetahat)) + (sin(self.thetahat) * sin(self.phihat))]])
self.Cz = matrix([[cos(self.thetahat) * cos(self.phihat), (((q * Vair / GRAVITY) * sin(self.thetahat)) + cos(self.phihat)) * sin(self.thetahat)]])

def get_sensor_noise_covariance_matrix(self, q):
""" Sensor noise penalty, needs a better explanation of how it works. Tau is a tuning parameter which is increased to reduce the Kalman gain on x-axis accelerometer during high pitch rate maneuvers (that flight dynamic is unmodeled). I'm trying values between 10-100. [ISEFMAV 2.28] """
Expand Down Expand Up @@ -229,17 +235,132 @@ def estimate_roll_and_pitch(self, p, q, r, Vair, ax, ay, az, dt):
self.register_states()
return self.phi, self.theta

class HeadingObserver:
"""
Since X-Plane doesn't given out the magnetic field components, this will
have to do until I can finish getting it working (and tested) with a
real magnetometer.
"""

def __init__(self):
self.k_psi = -0.08
self.dt = 0.05
self.prevent_division_by_zero = 0.00000000001
self.psi_estimate = 0
self.psi = 0
self.psihat = 0
self.A = 0
self.I = 1
self.P = 1
self.C = 1
self.Q = 0.1
self.L = 0
self.r = 0
self.mxyz = matrix("25396.8; 2011.7; 38921.5") # magnetic field strength for Corpus Christi in nanotesla units

def update_state_estimate_using_kinematic_update(self, phi, theta, q, r, dt):
self.psihat = (q * sin(phi) / cos(theta)) + (r * cos(phi) / cos(theta))
logger.debug("kinematic update, psihat = %f" % self.psihat)

def propagate_covariance_matrix(self, dt):
""" With the newly updated linearization of the state estimates, we can update the covariance matrix that stores how confident we are with something. Email me if I haven't replaced something with a better choice of words. [ISEFMAV 2.12] """
self.P += ((self.A*self.P + self.P*self.A + self.Q) * dt)

def linearized_model_output_matrix(self, phi, theta):
""" Axhat, ayhat and azhat are used to compute the the expected accelerometer readings given the model (the flight dynamics of a fixed-wing aircraft). Once we have these we can look at the actual readings and adjust things accordingly. [ISEFMAV 2.26] """
bxyz = matrix("0 0 0; 0 0 0; 0 0 0")
self.psi = wrap(self.psi)
bxyz[0,0] = cos(theta) * cos(self.psi)
bxyz[0,1] = cos(theta) * sin(self.psi)
bxyz[0,2] = -sin(theta)
bxyz[1,0] = (sin(phi) * sin(theta) * cos(self.psi)) - (cos(phi) * sin(self.psi))
bxyz[1,1] = (sin(phi) *sin(theta) * sin(self.psi)) + (cos(phi) * cos(self.psi))
bxyz[1,2] = sin(phi) * cos(theta)
bxyz[2,0] = (cos(phi) * sin(theta) * cos(self.psi)) + (sin(phi) * sin(self.psi))
bxyz[2,1] = (cos(phi) * sin(theta) * sin(self.psi)) - (sin(phi) * cos(self.psi))
bxyz[2,2] = cos(phi) * cos(theta)
b = bxyz * self.mxyz
self.bxhat = b[0,0]
self.byhat = b[1,0]
self.bzhat = b[2,0]

def gain_calculation_and_variance_update(self, phi, theta):
""" Calculate linearized output equations...this is the magic of the Kalman filter; here we are figuring out how much we trust the sensors [ISEFMAV 2.27] """
self.Cx = (-cos(theta) * sin(self.psihat) * self.bx) + (cos(theta) * cos(self.psihat) * self.by)
self.Cy = (((-sin(phi) * sin(theta) * sin(self.psihat)) - (cos(phi) * cos(self.psihat))) * self.bx) + (((sin(phi) * sin(theta) * sin(self.psihat)) - (cos(phi) * sin(self.psihat))) * self.by)
self.Cz = (((-cos(phi) * sin(theta) * sin(self.psihat)) - (sin(phi) * cos(self.psihat))) * self.bx) + (((sin(phi) * sin(theta) * cos(self.psihat)) - (cos(phi) * sin(self.psihat))) * self.by)

def calc_kalman_gain_matrix(self):
""" Basically to smooth out noisy sensor measurements we calculate a gain which is small if we want to mostly ignore things during times of high-noise. This is accomplished by large values of rxyz.
This is a simplified way to calculate the Kalman gains which basically means we're correcting the sensor measurements (and independently which is the simplification by assuming they are uncorrelated). [ISEFMAV 2.17] """
self.L = (self.P * self.C) / (self.r + (self.C * self.P * self.C))

def update_state_estimate(self, bx, by, bz):
""" This is where we update the roll and pitch with our final estimate given all the adjustments we've calculated for the modeled and measured states. [ISEFMAV 2.16] """
self.psi = self.psihat + self.L * (bx - self.bxhat) + self.L * (by - self.byhat) + self.L * (bz - self.bzhat)
logger.info("updated state estimate, psi = %f" % self.psi)

def update_covariance_matrix(self):
""" Here we update how much we trust the state estimates given the linearized model output and the Kalman gains. [ISEFMAV 2.15] """
self.P = (self.I - self.L * self.C) * self.P

def full_kalman_estimate_heading(self, bx, by, bz, phi, theta, q, r, dt):
self.bx, self.by, self.bz = bx, by, bz
self.update_state_estimate_using_kinematic_update(phi, theta, q, r, dt)
#self.compute_linearized_state_update_matrix() # not needed? A is always zero(s)
self.propagate_covariance_matrix(dt)
# Measurement update -- we want to run this every time new measurements are available.
self.linearized_model_output_matrix(phi, theta)
self.gain_calculation_and_variance_update(phi, theta)
#self.get_sensor_noise_covariance_matrix(q)
self.calc_kalman_gain_matrix()
self.update_state_estimate(bx, by, bz)
self.update_covariance_matrix()
display.register_scalars({"Psi":180-degrees(self.psi)})
return self.psi

def magnetometer_readings_to_tilt_compensated_heading(self, bx, by, bz, phi, theta):
self.variation = 4.528986*(pi/180)
Xh = bx * cos(theta) + by * sin(phi) * sin(theta) + bz * cos(phi) * sin(theta)
Yh = by * cos(phi) - bz * sin(phi)
display.register_scalars({"Xh":Xh,"Yh":Yh,"PHI":phi,"THETA":theta})
heading = wrap((atan2(-Yh, Xh) + self.variation))
if heading < 0:
heading += 2*pi
return heading

def estimate_heading(self, bx, by, bz, phi, theta, q, r, dt):
self.dt = dt
self.phi, self.theta = phi, theta
psi_measured = self.magnetometer_readings_to_tilt_compensated_heading(bx, by, bz, phi, theta)
psi_dot = (q * sin(phi) / cos(theta)) + (r * cos(phi) / cos(theta)) # old ((q * sin(phi)) + (r * cos(phi) / cos(theta)))
self.psi_estimate += (psi_dot * dt)
psi_error = self.psi_estimate - psi_measured
self.psi_estimate += self.k_psi * psi_error
logger.info("Heading %f" % self.psi_estimate)
display.register_scalars({"Psi":degrees(self.psi_estimate),
"Psi measured":degrees(self.psi_estimate)})
return self.psi_estimate

class Shumai:
"""
Shumai [pronounced "Shoe-my"] is a six-degree-of-freedom extended Kalman filter for inertial navigation, and *requires* a 3-axis gyro, 3-axis accelerometer, a 3-axis magnetometer, an airspeed sensor, an altimeter and GPS. It is based on the work in this paper: http://contentdm.lib.byu.edu/ETD/image/etd1527.pdf
States tracked (or to be tracked):
* Yaw, pitch, roll
* Airspeed, ground speed
* Altitude
* Position
* Wind vector
* Battery state
* What else?
"""

def __init__(self, imu, differential_pressure_sensor, static_pressure_sensor, magnetometer, gps):
self.phi, self.theta, self.psi = 0, 0, 0
self.Vair = 0
self.attitude_observer = AttitudeObserver()
self.heading_observer = HeadingObserver()
# Comming soon:
# self.heading_observer = HeadingObserver()
# self.navigation_observer = NavigationObserver()
self.imu = imu
self.differential_pressure_sensor = differential_pressure_sensor
Expand All @@ -256,11 +377,13 @@ def loop(self):
ax, ay, az = self.imu.read_accelerometers()
Vair = self.differential_pressure_sensor.read_airspeed() * 0.5144444444 # kias to meters per second
phi, theta = self.attitude_observer.estimate_roll_and_pitch(p, q, r, Vair, ax, ay, az, self.dt)
bx, by, bz = self.magnetometer.read_magnetometer()
psi = self.heading_observer.estimate_heading(bx, by, bz, phi, theta, q, r, self.dt)
return {
"roll": degrees(phi),
"pitch": degrees(theta),
"yaw": degrees(psi),
# Coming soon:
# "yaw": degrees(psi),
# "airspeed": Vair,
# "position": (position_north, position_east),
# "wind": (wind_north, wind_east),
Expand All @@ -270,12 +393,12 @@ class XplaneListener(DatagramProtocol):

def __init__(self):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.bind((UDP_IP,49998))
self.sock.bind((get_ip_address(),49998))
self.dt = 0.05
self.last_time = datetime.now()
self.ekf = Shumai(self, self, self, self, self) # hack for X-Plane
self.bxyz = numpy.mat("0.0 0.0 0.0; 0.0 0.0 0.0; 0.0 0.0 0.0")
self.mxyz = numpy.mat("25396.8; 2011.7; 38921.5") # in nanotesla units
self.bxyz = matrix("0.0 0.0 0.0; 0.0 0.0 0.0; 0.0 0.0 0.0")
self.mxyz = matrix("25396.8; 2011.7; 38921.5") # some sort of magnetic field strength table in nanotesla units

def datagramReceived(self, data, (host, port)):
"""
Expand All @@ -293,7 +416,19 @@ def datagramReceived(self, data, (host, port)):
self.pitch = radians(unpack_from(fmt, data, 9+144+0)[0])
self.roll = radians(unpack_from(fmt, data, 9+144+4)[0])
self.heading = radians(unpack_from(fmt, data, 9+144+8)[0])
phi, theta, psi = self.roll, self.pitch, self.heading
self.generate_virtual_magnetometer_readings(self.roll, self.pitch, self.heading)
display.register_scalars({"bx":self.bx,"by":self.by,"bz":self.bz,"true heading":degrees(self.heading)})
logger.debug("Vair %0.1f, accelerometers (%0.2f, %0.2f, %0.2f), gyros (%0.2f, %0.2f, %0.2f)" % (self.Vair, self.ax, self.ay, self.az, self.p, self.q, self.r))
current_state = self.ekf.loop()
if display.curses_available is True:
display.draw()
else:
sys.stdout.write("%sRoll = %f, pitch = %f " % (chr(13), current_state['roll'], current_state['pitch']))
sys.stdout.flush()
FOUT.writerow([degrees(self.roll), degrees(self.pitch), current_state['roll'], current_state['pitch'], current_state['roll'] - degrees(self.roll), current_state['pitch'] - degrees(self.pitch)])

def generate_virtual_magnetometer_readings(self, phi, theta, psi):
psi = wrap(psi)
self.bxyz[0,0] = cos(theta) * cos(psi)
self.bxyz[0,1] = cos(theta) * sin(psi)
self.bxyz[0,2] = -sin(theta)
Expand All @@ -303,35 +438,20 @@ def datagramReceived(self, data, (host, port)):
self.bxyz[2,0] = (cos(phi) * sin(theta) * cos(psi)) + (sin(phi) * sin(psi))
self.bxyz[2,1] = (cos(phi) * sin(theta) * sin(psi)) - (sin(phi) * cos(psi))
self.bxyz[2,2] = cos(phi) * cos(theta)
# self.bxyz[0,0] = cos(theta) * cos(psi)
# self.bxyz[0,1] = (sin(phi) * sin(theta) * cos(psi)) - (cos(phi) * sin(psi))
# self.bxyz[0,2] = (cos(phi) * sin(theta) * cos(psi)) + (sin(phi) * sin(psi))
# self.bxyz[1,0] = cos(theta) * sin(psi)
# self.bxyz[1,1] = (sin(phi) *sin(theta) * sin(psi)) + (cos(phi) * cos(psi))
# self.bxyz[1,2] = (cos(phi) * sin(theta) * sin(psi)) - (sin(phi) * cos(psi))
# self.bxyz[2,0] = -sin(theta)
# self.bxyz[2,1] = sin(phi) * cos(theta)
# self.bxyz[2,2] = cos(phi) * cos(theta)
display.register_matrices({"bxyz":self.bxyz})
b = self.bxyz * self.mxyz
display.register_matrices({"b":b,"bxyz":self.bxyz,"mxyz":self.mxyz})
self.bx = b[0,0]/10000 # conversion from nanotesla to gauss
self.by = b[1,0]/10000 # conversion from nanotesla to gauss
self.bz = b[2,0]/10000 # conversion from nanotesla to gauss
emulated_magnetometer = self.gauss_to_heading(self.bx, self.by, self.bz)
display.register_scalars({"bx":self.bx,"by":self.by,"bz":self.bz,"mag heading":emulated_magnetometer})
logger.debug("Vair %0.1f, accelerometers (%0.2f, %0.2f, %0.2f), gyros (%0.2f, %0.2f, %0.2f)" % (self.Vair, self.ax, self.ay, self.az, self.p, self.q, self.r))
current_state = self.ekf.loop()
if display.curses_available is True:
display.draw()
else:
sys.stdout.write("%sRoll = %f, pitch = %f " % (chr(13), current_state['roll'], current_state['pitch']))
sys.stdout.flush()
FOUT.writerow([degrees(self.roll), degrees(self.pitch), current_state['roll'], current_state['pitch'], current_state['roll'] - degrees(self.roll), current_state['pitch'] - degrees(self.pitch)])

def gauss_to_heading(self, x, y, z):
heading = 0
if x == 0 and y < 0:
heading = PI/2.0
if x == 0 and y > 0:
heading = 3.0 * pi / 2.0
if x < 0:
heading = pi - atan(y/x)
if x > 0 and y < 0:
heading = -atan(y/x)
if x > 0 and y > 0:
heading = 2.0 * pi - atan(y/x)
return degrees(heading)
self.bx = b[0,0]
self.by = b[1,0]
self.bz = b[2,0]

def read_gyros(self):
return self.p, self.q, self.r
Expand All @@ -342,14 +462,17 @@ def read_accelerometers(self):
def read_airspeed(self):
return self.Vair

def read_magnetometer(self):
return self.bx, self.by, self.bz

class XplaneIMU():

def __init__(self):
"""
We need to setup the conections, send a setup packet to X-Plane and then start listening.
"""
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.bind((UDP_IP,49999))
self.sock.bind((get_ip_address(),49999))
self.send_data_selection_packet()
self.listener = XplaneListener()

Expand All @@ -372,7 +495,7 @@ def send_data_selection_packet(self):
data_selection_packet += "\x11\x00\x00\x00" # gyros
data_selection_packet += "\x12\x00\x00\x00" # pitch and roll (for sanity check)
data_selection_packet += "\x14\x00\x00\x00" # altimeter and GPS
self.sock.sendto(data_selection_packet,(UDP_IP,UDP_SENDTO_PORT))
self.sock.sendto(data_selection_packet,(get_ip_address(),UDP_SENDTO_PORT))

if __name__ == "__main__":
try:
Expand Down
3 changes: 0 additions & 3 deletions xplane.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
#!/usr/bin/env python
# encoding: utf-8

import sys
import socket
from twisted.internet.protocol import DatagramProtocol
Expand Down

0 comments on commit 7544091

Please sign in to comment.