Commit 91fa1e7a authored by Ibrahim's avatar Ibrahim
Browse files

visualize: axes lines also transformed with vehicle

parent 30f2da46
from typing import Tuple
from typing import Tuple, Union
import threading as th
import multiprocessing as mp
import queue
from dataclasses import dataclass
import time
......@@ -10,7 +12,7 @@ from mpl_toolkits.mplot3d import Axes3D
from mpl_toolkits.mplot3d.art3d import Line3D
from coords import body_to_inertial, direction_cosine_matrix
from vehicle import VehicleParams
from helpers import vehicle_params_factory
from simulation import Multirotor
......@@ -27,7 +29,7 @@ class VehicleDrawing:
self.t = self.vehicle.t
self.params = self.vehicle.params
self.interval = 1 / self.max_frames_per_second
self.is_ipython = is_ipython()
self.is_terminal = is_terminal()
self.arm_lines, self.arm_lines_points, \
self.trajectory_line, \
self.axis_lines, self.axis_lines_points = \
......@@ -35,35 +37,88 @@ class VehicleDrawing:
self.trajectory = [[], [], []] # [[X,..], [Y,...], [Z,...]]
def connect(self):
self.ev_cancel = th.Event()
self.thread = th.Thread(target=self._animator, daemon=False)
self.thread.start()
def connect(self, via: str ='animation') -> Union[FuncAnimation, th.Thread]:
self.connection_via = via
self.ev_cancel = mp.Event()
self.queue = mp.Queue(maxsize=1)
self.update_thread = th.Thread(target=self._update_worker)
self.update_thread.start()
if via=='animation':
return self._animator()
elif via=='thread':
# self.thread = th.Thread(target=self._worker, daemon=True)
# self.thread.start()
# return self.thread
return self._worker()
elif via=='process':
pr = mp.Process(target=self._worker)
raise NotImplementedError()
def disconnect(self, force=False):
self.ev_cancel.set()
self.thread.join(timeout=1.5 * self.interval)
del self.anim
if self.connection_via == 'thread':
# self.thread.join(timeout=1.5 * self.interval)
self.update_thread.join(timeout=1.5 * self.interval)
del self.queue
del self.update_thread
elif self.connection_via=='animation':
try:
self.anim.pause()
except AttributeError:
# If the interactive jupyter figure is closed, this error is
# raised if disconnect is called after.
pass
del self.anim
elif self.connection_via=='process':
raise NotImplementedError()
def _init_func(self):
for l in self.arm_lines:
self.axis.add_line(l)
self.axis.add_line(self.trajectory_line)
for l in self.axis_lines:
self.axis.add_line(l)
return (*self.arm_lines, self.trajectory_line, *self.axis_lines)
def _update_func(self, frame: int=None):
vehicle_t, position, orientation = self.queue.get()
if self.ev_cancel.is_set():
# This function is called repeatedly so it needs to quit the calling
# function (FuncAnimation(), _worker, when it gets the signal.
raise th.ThreadError('Canceling animation update.')
if vehicle_t != self.t:
if vehicle_t < self.t:
self.trajectory = [[], [], []] # likely vehicle is reset, so reset trajectory
self.t = vehicle_t
return update_drawing(self, position, orientation)
else:
return (*self.arm_lines, self.trajectory_line, *self.axis_lines)
def _worker(self):
# To be run in main thread. The vehicle control should be in a separate
# thread that controls self.vehicle
if self.axis is None:
self.figure, self.axis = make_fig((-10,10), (-10,10), (0,20))
else:
self.figure = self.axis.figure
for l in self.arm_lines:
self.axis.add_line(l)
self.axis.add_line(self.trajectory_line)
self._init_func()
i = 0
while not self.ev_cancel.is_set():
start = time.time()
position = self.vehicle.position
orientation = self.vehicle.orientation
update_drawing(self, position, orientation)
self._update_func(i)
# self.figure.canvas.draw_idle()
# self.figure.canvas.flush_events()
i += 1
end = time.time()
self.ev_cancel.wait(self.interval - (end - start))
pause = max(1e-3, self.interval - (end - start))
plt.pause(pause)
# self.ev_cancel.wait(self.interval - (end - start))
def _animator(self):
......@@ -71,34 +126,32 @@ class VehicleDrawing:
self.figure, self.axis = make_fig((-10,10), (-10,10), (-10,10))
else:
self.figure = self.axis.figure
def init_func():
for l in self.arm_lines:
self.axis.add_line(l)
self.axis.add_line(self.trajectory_line)
for l in self.axis_lines:
self.axis.add_line(l)
return (*self.arm_lines, self.trajectory_line, self.axis_lines)
def func(f):
if self.ev_cancel.is_set():
raise th.ThreadError('Canceling animation update.')
if self.vehicle.t != self.t:
if self.vehicle.t < self.t:
self.trajectory = [[], [], []] # likely vehicle is reset, so reset trajectory
self.t = self.vehicle.t
return update_drawing(self, self.vehicle.position, self.vehicle.orientation)
else:
return []
self.anim = FuncAnimation(
self.figure,
func=func,
init_func=init_func,
func=self._update_func,
init_func=self._init_func,
blit=True,
repeat=False,
interval=self.interval * 1000)
interval=self.interval * 1000,
cache_frame_data=False)
return self.anim
def _update_worker(self):
# Thread that puts time, position, orientation into thread-/process-safe
# queue.
while not self.ev_cancel.is_set():
start = time.time()
try:
self.queue.put(
(self.vehicle.t, self.vehicle.position, self.vehicle.orientation),
timeout=self.interval)
except queue.Full:
pass
end = time.time()
self.ev_cancel.wait(max(0, self.interval - (end - start)))
self.queue.close()
......@@ -159,16 +212,15 @@ def update_drawing(drawing: VehicleDrawing, position: np.ndarray, orientation: n
if drawing.body_axes:
axes = np.copy(drawing.axis_lines_points)
axes[1::2] = position
axes[0::2] = dcm.T + position
axes = body_to_inertial(axes.T, dcm).T
axes += position
# axes[1::2] = position
# axes[0::2] = dcm.T + position
for i, l in enumerate(drawing.axis_lines):
l.set_data(axes[i*2:i*2+2,0], axes[i*2:i*2+2,1])
l.set_3d_properties(axes[i*2:i*2+2,2])
# drawing.figure.canvas.draw_idle()
# drawing.figure.canvas.flush_events()
# plt.pause(1e-7)
return (*drawing.arm_lines, drawing.trajectory_line, drawing.axis_lines)
return (*drawing.arm_lines, drawing.trajectory_line, *drawing.axis_lines)
......@@ -182,17 +234,17 @@ def make_fig(xlim, ylim, zlim) -> Tuple[plt.Figure, Axes3D]:
def is_ipython() -> bool:
def is_terminal() -> bool:
# https://stackoverflow.com/q/15411967/4591810
# https://stackoverflow.com/a/39662359/4591810
try:
import IPython
shell = IPython.get_ipython().__class__.__name__
if ('Terminal' in shell) or ('terminal' in shell) or (shell == 'NoneType'):
return False
else:
return True
else:
return False
except NameError:
return False
return True
except ImportError:
return False
return True
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment