Skip to content

Commit

Permalink
Merge pull request #361 from pariterre/master
Browse files Browse the repository at this point in the history
Direct collocations
  • Loading branch information
pariterre authored Jul 23, 2021
2 parents 3adeeb0 + 846d582 commit f8d2481
Show file tree
Hide file tree
Showing 29 changed files with 487 additions and 194 deletions.
6 changes: 4 additions & 2 deletions bioptim/dynamics/configure_problem.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import numpy as np

from .dynamics_functions import DynamicsFunctions
from .ode_solver import OdeSolver
from ..misc.enums import PlotType, ControlType
from ..misc.mapping import BiMapping, Mapping
from ..misc.options import UniquePerPhaseOptionList, OptionGeneric
Expand Down Expand Up @@ -338,7 +339,8 @@ def define_cx(n_col: int) -> list:
mx_controls = vertcat(*mx_controls)

if as_states:
cx = define_cx(n_col=2)
n_cx = nlp.ode_solver.polynomial_degree + 2 if isinstance(nlp.ode_solver, OdeSolver.COLLOCATION) else 2
cx = define_cx(n_col=n_cx)

nlp.states.append(name, cx, mx_states, nlp.variable_mappings[name])
nlp.plot[f"{name}_states"] = CustomPlot(
Expand Down Expand Up @@ -520,7 +522,7 @@ class Dynamics(OptionGeneric):
def __init__(
self,
dynamics_type: Union[Callable, DynamicsFcn],
expand: bool = True,
expand: bool = False,
**params: Any,
):
"""
Expand Down
181 changes: 130 additions & 51 deletions bioptim/dynamics/integrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ def next_x(self, h: float, t: float, x_prev: Union[MX, SX], u: Union[MX, SX], p:
return x_prev + h / 840 * (41 * k1 + 27 * k4 + 272 * k5 + 27 * k6 + 216 * k7 + 216 * k9 + 41 * k10)


class IRK(Integrator):
class COLLOCATION(Integrator):
"""
Numerical integration using implicit Runge-Kutta method.
Expand All @@ -420,8 +420,47 @@ def __init__(self, ode: dict, ode_opt: dict):
The ode options
"""

super(IRK, self).__init__(ode, ode_opt)
super(COLLOCATION, self).__init__(ode, ode_opt)

self.method = ode_opt["method"]
self.degree = ode_opt["irk_polynomial_interpolation_degree"]

# Coefficients of the collocation equation
self._c = self.cx.zeros((self.degree + 1, self.degree + 1))

# Coefficients of the continuity equation
self._d = self.cx.zeros(self.degree + 1)

# Choose collocation points
time_points = [0] + collocation_points(self.degree, self.method)

# Dimensionless time inside one control interval
time_control_interval = self.cx.sym("time_control_interval")

# For all collocation points
for j in range(self.degree + 1):
# Construct Lagrange polynomials to get the polynomial basis at the collocation point
_l = 1
for r in range(self.degree + 1):
if r != j:
_l *= (time_control_interval - time_points[r]) / (time_points[j] - time_points[r])

# Evaluate the polynomial at the final time to get the coefficients of the continuity equation
lfcn = Function("lfcn", [time_control_interval], [_l])
self._d[j] = lfcn(1.0)

# Construct Lagrange polynomials to get the polynomial basis at the collocation point
_l = 1
for r in range(self.degree + 1):
if r != j:
_l *= (time_control_interval - time_points[r]) / (time_points[j] - time_points[r])

# Evaluate the time derivative of the polynomial at all collocation points to get
# the coefficients of the continuity equation
tfcn = Function("tfcn", [time_control_interval], [tangent(_l, time_control_interval)])
for r in range(self.degree + 1):
self._c[j, r] = tfcn(time_points[r])

self._finish_init()

def get_u(self, u: np.ndarray, dt_norm: float) -> np.ndarray:
Expand All @@ -441,7 +480,7 @@ def get_u(self, u: np.ndarray, dt_norm: float) -> np.ndarray:
"""

if self.control_type == ControlType.CONSTANT:
return super(IRK, self).get_u(u, dt_norm)
return super(COLLOCATION, self).get_u(u, dt_norm)
else:
raise NotImplementedError(f"{self.control_type} ControlType not implemented yet with IRK")

Expand All @@ -465,73 +504,113 @@ def dxdt(self, h: float, states: Union[MX, SX], controls: Union[MX, SX], params:
The derivative of the states
"""

nx = states.shape[0]
# Total number of variables for one finite element
states_end = self._d[0] * states[0]
defects = []
for j in range(1, self.degree + 1):

# Choose collocation points
time_points = [0] + collocation_points(self.degree, "legendre")
t_norm_init = (j - 1) / self.degree # normalized time
# Expression for the state derivative at the collocation point
xp_j = 0
for r in range(self.degree + 1):
xp_j += self._c[r, j] * states[r]

# Coefficients of the collocation equation
_c = self.cx.zeros((self.degree + 1, self.degree + 1))
# Append collocation equations
f_j = self.fun(states[j], self.get_u(controls, t_norm_init), params)[:, self.idx]
defects.append(h * f_j - xp_j)

# Coefficients of the continuity equation
_d = self.cx.zeros(self.degree + 1)
# Add contribution to the end state
states_end = states_end + self._d[j] * states[j - 1]

# Dimensionless time inside one control interval
time_control_interval = self.cx.sym("time_control_interval")
# Concatenate constraints
defects = vertcat(*defects)
return states_end, horzcat(states[0], states_end), defects

# For all collocation points
for j in range(self.degree + 1):
# Construct Lagrange polynomials to get the polynomial basis at the collocation point
_l = 1
for r in range(self.degree + 1):
if r != j:
_l *= (time_control_interval - time_points[r]) / (time_points[j] - time_points[r])
def _finish_init(self):
"""
Prepare the CasADi function from dxdt
"""

# Evaluate the polynomial at the final time to get the coefficients of the continuity equation
lfcn = Function("lfcn", [time_control_interval], [_l])
_d[j] = lfcn(1.0)
self.function = Function(
"integrator",
[horzcat(*self.x_sym), self.u_sym, self.param_sym],
self.dxdt(self.h, self.x_sym, self.u_sym, self.param_sym * self.param_scaling),
["x0", "p", "params"],
["xf", "xall", "defects"],
)

# Evaluate the time derivative of the polynomial at all collocation points to get
# the coefficients of the continuity equation
tfcn = Function("tfcn", [time_control_interval], [tangent(_l, time_control_interval)])
for r in range(self.degree + 1):
_c[j, r] = tfcn(time_points[r])

# Total number of variables for one finite element
x0 = states
u = controls
class IRK(COLLOCATION):
"""
Numerical integration using implicit Runge-Kutta method.
x_irk_points = [self.cx.sym(f"X_irk_{j}", nx, 1) for j in range(1, self.degree + 1)]
x = [x0] + x_irk_points
Methods
-------
get_u(self, u: np.ndarray, dt_norm: float) -> np.ndarray
Get the control at a given time
dxdt(self, h: float, states: Union[MX, SX], controls: Union[MX, SX], params: Union[MX, SX]) -> tuple[SX, list[SX]]
The dynamics of the system
"""

x_irk_points_eq = []
for j in range(1, self.degree + 1):
def __init__(self, ode: dict, ode_opt: dict):
"""
Parameters
----------
ode: dict
The ode description
ode_opt: dict
The ode options
"""

t_norm_init = (j - 1) / self.degree # normalized time
# Expression for the state derivative at the collocation point
xp_j = 0
for r in range(self.degree + 1):
xp_j += _c[r, j] * x[r]
super(IRK, self).__init__(ode, ode_opt)

# Append collocation equations
f_j = self.fun(x[j], self.get_u(u, t_norm_init), params)[:, self.idx]
x_irk_points_eq.append(h * f_j - xp_j)
def dxdt(self, h: float, states: Union[MX, SX], controls: Union[MX, SX], params: Union[MX, SX]) -> tuple:
"""
The dynamics of the system
# Concatenate constraints
x_irk_points = vertcat(*x_irk_points)
x_irk_points_eq = vertcat(*x_irk_points_eq)
Parameters
----------
h: float
The time step
states: Union[MX, SX]
The states of the system
controls: Union[MX, SX]
The controls of the system
params: Union[MX, SX]
The parameters of the system
Returns
-------
The derivative of the states
"""

nx = states[0].shape[0]
_, _, defect = super(IRK, self).dxdt(h, states, controls, params)

# Root-finding function, implicitly defines x_irk_points as a function of x0 and p
vfcn = Function("vfcn", [x_irk_points, x0, u, params], [x_irk_points_eq]).expand()
# Root-finding function, implicitly defines x_collocation_points as a function of x0 and p
vfcn = Function("vfcn", [vertcat(*states[1:]), states[0], controls, params], [defect]).expand()

# Create a implicit function instance to solve the system of equations
ifcn = rootfinder("ifcn", "newton", vfcn)
x_irk_points = ifcn(self.cx(), x0, u, params)
x = [x0 if r == 0 else x_irk_points[(r - 1) * nx : r * nx] for r in range(self.degree + 1)]
x_irk_points = ifcn(self.cx(), states[0], controls, params)
x = [states[0] if r == 0 else x_irk_points[(r - 1) * nx : r * nx] for r in range(self.degree + 1)]

# Get an expression for the state at the end of the finite element
xf = self.cx.zeros(nx, self.degree + 1) # 0 #
for r in range(self.degree + 1):
xf[:, r] = xf[:, r - 1] + _d[r] * x[r]
xf[:, r] = xf[:, r - 1] + self._d[r] * x[r]

return xf[:, -1], horzcat(states[0], xf[:, -1])

def _finish_init(self):
"""
Prepare the CasADi function from dxdt
"""

return xf[:, -1], horzcat(x0, xf[:, -1])
self.function = Function(
"integrator",
[self.x_sym[0], self.u_sym, self.param_sym],
self.dxdt(self.h, self.x_sym, self.u_sym, self.param_sym * self.param_scaling),
["x0", "p", "params"],
["xf", "xall"],
)
76 changes: 67 additions & 9 deletions bioptim/dynamics/ode_solver.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from casadi import MX, SX, integrator as casadi_integrator, horzcat

from .integrator import RK4, RK8, IRK
from .integrator import RK4, RK8, IRK, COLLOCATION
from ..misc.enums import ControlType


Expand All @@ -26,6 +26,8 @@ class OdeSolverBase:
def __init__(self):
self.steps = 1
self.rk_integrator = None
self.is_direct_collocation = False
self.is_direct_shooting = False

def integrator(self, ocp, nlp) -> list:
"""
Expand Down Expand Up @@ -85,6 +87,7 @@ def __init__(self, n_integration_steps):

super(RK, self).__init__()
self.steps = n_integration_steps
self.is_direct_shooting = True

def integrator(self, ocp, nlp) -> list:
"""
Expand Down Expand Up @@ -170,7 +173,7 @@ def __init__(self, n_integration_steps: int = 5):
super(OdeSolver.RK8, self).__init__(n_integration_steps)
self.rk_integrator = RK8

class IRK(OdeSolverBase):
class COLLOCATION(OdeSolverBase):
"""
An implicit Runge-Kutta solver
Expand All @@ -185,17 +188,20 @@ class IRK(OdeSolverBase):
The interface of the OdeSolver to the corresponding integrator
"""

def __init__(self, polynomial_degree: int = 4):
def __init__(self, polynomial_degree: int = 4, method: str = "legendre"):
"""
Parameters
----------
polynomial_degree: int
The degree of the implicit RK
"""

super(OdeSolver.IRK, self).__init__()
super(OdeSolver.COLLOCATION, self).__init__()
self.polynomial_degree = polynomial_degree
self.rk_integrator = IRK
self.rk_integrator = COLLOCATION
self.method = method
self.is_direct_collocation = True
self.steps = self.polynomial_degree

def integrator(self, ocp, nlp) -> list:
"""
Expand All @@ -216,16 +222,17 @@ def integrator(self, ocp, nlp) -> list:
if ocp.n_threads > 1 and nlp.control_type == ControlType.LINEAR_CONTINUOUS:
raise RuntimeError("Piece-wise linear continuous controls cannot be used with multiple threads")

if ocp.cx is SX:
raise NotImplementedError("use_sx=True and OdeSolver.IRK are not yet compatible")

if nlp.model.nbQuat() > 0:
raise NotImplementedError(
"Quaternions can't be used with IRK yet. If you get this error, please notify the "
"developers and ping @EveCharbie"
)

ode = {"x": nlp.states.cx, "p": nlp.controls.cx, "ode": nlp.dynamics_func}
ode = {
"x": [nlp.states.cx] + nlp.states.cx_intermediates_list,
"p": nlp.controls.cx,
"ode": nlp.dynamics_func,
}
ode_opt = {
"t0": 0,
"tf": nlp.dt,
Expand All @@ -235,9 +242,60 @@ def integrator(self, ocp, nlp) -> list:
"idx": 0,
"control_type": nlp.control_type,
"irk_polynomial_interpolation_degree": self.polynomial_degree,
"method": self.method,
}
return [nlp.ode_solver.rk_integrator(ode, ode_opt)]

class IRK(COLLOCATION):
"""
An implicit Runge-Kutta solver
Attributes
----------
polynomial_degree: int
The degree of the implicit RK
Methods
-------
integrator(self, ocp, nlp) -> list
The interface of the OdeSolver to the corresponding integrator
"""

def __init__(self, polynomial_degree: int = 4, method: str = "legendre"):
"""
Parameters
----------
polynomial_degree: int
The degree of the implicit RK
"""

super(OdeSolver.IRK, self).__init__(polynomial_degree=polynomial_degree, method=method)
self.rk_integrator = IRK
self.is_direct_collocation = False
self.is_direct_shooting = True
self.steps = 1

def integrator(self, ocp, nlp) -> list:
"""
The interface of the OdeSolver to the corresponding integrator
Parameters
----------
ocp: OptimalControlProgram
A reference to the ocp
nlp: NonLinearProgram
A reference to the nlp
Returns
-------
A list of integrators
"""

if ocp.cx is SX:
raise NotImplementedError("use_sx=True and OdeSolver.IRK are not yet compatible")

return super(OdeSolver.IRK, self).integrator(ocp, nlp)

class CVODES(OdeSolverBase):
"""
An interface to CVODES
Expand Down
Loading

0 comments on commit f8d2481

Please sign in to comment.