Source code for ui

    import Queue as queue
except ImportError:
    import queue

from helpers import Struct

[docs]class uiParameter(Struct): """uiParameter represents a single GUI element that is used to build a parameter window in the UI (simulator event "make_param_window"). It has one parameter, ``type``, that defines the type of the parameter. Possible parameter types are GROUP, INT, FLOAT, BOOL and SELECT. """ GROUP, INT, FLOAT, BOOL, SELECT = 0,1,2,3,4 def __init__(self, elem_type): self.type = elem_type
class uiGroup(uiParameter): def __init__(self, contents): uiParameter.__init__(uiParameter.GROUP) self.contents = contents class uiInt(uiParameter): def __init__(self, value, min_value = -100, max_value = 100): uiParameter.__init__(self, uiParameter.INT) self.value = value self.min_value = min_value self.max_value = max_value class uiFloat(uiParameter): def __init__(self, value, step = 1.0, min_value = -1000.0, max_value = 1000.0): uiParameter.__init__(self, uiParameter.FLOAT) self.value = value self.step = step self.min_value = min_value self.max_value = max_value class uiBool(uiParameter): def __init__(self, value): uiParameter.__init__(self, uiParameter.BOOL) self.value = value class uiSelect(uiParameter): def __init__(self, value, value_list): uiParameter.__init__(self, uiParameter.SELECT, value, value_list) self.value = value self.value_list = value_list import simulator as sim
[docs]class SimUI: """The SimUI class defines a front-end for the :class:`~simulator.Simulator`. It contains the necessary functions for the frontend-simulator communication and stubs for the message callbacks. This class manages three important objects: * The simulator, as ``self.simulator_thread`` * The incoming simulator events, as ``self.in_queue`` * The outgoing simulator commands, as ``self.sim_queue`` The constructor of SimUI takes a :class:`~renderer.Renderer` object as parameter. This renderer will be passed to the simulator to draw on. """ def __init__(self, renderer): self.event_handler = None self.sim_queue = queue.Queue() # create the simulator thread self.simulator_thread = sim.Simulator(renderer, self.sim_queue) self.in_queue = self.simulator_thread._out_queue self.simulator_thread.start()
[docs] def register_event_handler(self, event_handler): """Register a callback that will be executed to process the """ self.event_handler = event_handler
[docs] def unregister_event_handler(self): """Unregister a previously registered event handler. """ self.event_handler = None
[docs] def process_events(self, process_all = False): """Processes one or all incoming events from the simulator. A single event is a tuple (name,args). During the processing of the event, the function ``simulator_``\ *name* will be called with args as parameters. It is strongly discouraged to create new class methods with the name starting with `simulator_`. Such functions could be called from the simulator without your consent. Unknown or malformed events will lead to an error message printed to the console. """ while not self.in_queue.empty(): tpl = self.in_queue.get() if isinstance(tpl,tuple) and len(tpl) == 2: name, args = tpl intercepted = False if self.event_handler is not None: intercepted = self.event_handler(name,args) if not intercepted: # Scramble name = "simulator_{}".format(name) if name in self.__class__.__dict__: try: self.__class__.__dict__[name](self,*args) except TypeError: print("Wrong UI event parameters {}{}".format(name,args)) raise else: print("Unknown UI event '{}'".format(name)) else: print("Wrong UI event format '{}'".format(tpl)) self.in_queue.task_done() if not process_all: return
[docs] def run_simulator_command(self,command,*args): """Sends the command *command* to the simulator. All arguments after *command* are passed to the command processing function on the simulator side. See :class:`~simulator.Simulator` for the available commands. """ self.sim_queue.put((command, args)) # Simulator processing functions : stubs
[docs] def simulator_make_param_window(self,robot_id,name,parameters): """A request from the supervisor to create a parameter window. *robot_id* is guaranteed to uniquely identify a robot in a simulation. Currently, *robot_id* is the actual robot object. It can be used e.g. to extract the color of the robot as ``robot_id.get_color()``. *name* is the desired window name, and *parameters* is the structure returned by :meth:`~supervisor.Supervisor.get_ui_description`. """ raise NotImplementedError('SimUI.simulator_make_param_window')
[docs] def simulator_running(self): """A notification that the simulation has been started.""" raise NotImplementedError('SimUI.simulator_running')
[docs] def simulator_paused(self): """A notification that the simulation has been paused.""" raise NotImplementedError('SimUI.simulator_paused')
[docs] def simulator_reset(self): """A notification that the simulation has been reset.""" raise NotImplementedError('SimUI.simulator_reset')
[docs] def simulator_stopped(self): """A notification that the simulation has been stopped.""" raise NotImplementedError('SimUI.simulator_stopped')
[docs] def simulator_update_view(self): """A request to redraw the simulation window. This notification signifies that the simulation has stopped using the renderer, and is waiting for the UI to process this event. The simulation will be resumed after this function exits. """ raise NotImplementedError('SimUI.simulator_update_view')
[docs] def simulator_exception(self,e_type, e_value, e_traceback): """An exception was raised in the simulator thread in the attempt to process an incoming command. """ raise NotImplementedError('SimUI.simulator_exception')
[docs] def simulator_log(self, message, objclass, objcolor): """A log *message* was generated by one of the simulation objects of class *objclass*. The *objcolor* is the color of the simobject, in the case the object is connected to one, and None otherwise. """ raise NotImplementedError('SimUI.simulator_log') # Commands for the tester:
[docs] def run_simulation(self): """Unpause the simulation.""" self.run_simulator_command('start_simulation')
[docs] def pause_simulation(self): """Pause the simulation.""" self.run_simulator_command('pause_simulation')
[docs] def step_simulation(self): """Advance the simulation one step if it is paused.""" self.run_simulator_command('step_simulation')
[docs] def start_testing(self): """Prepare the simulation environment for testing, e.g. disable user controls of the simulation progress.""" pass
[docs] def stop_testing(self): """Return UI back to normal operation.""" pass #def get_view_parameters(self): #pass #def set_view_parameters(self,params): #pass #def new_renderer(self): #pass #def pop_renderer(self): #pass #def start_test(self): #"""This function will pause and 'cache' the currently running #simulation. A new `simulator.Simulator` will be started with #the control belonging to the tester object. #""" #self.antiteststruct = Struct() #self.antiteststruct.wasrunning = False ## 1) Pause simulator #if self.simulator_thread.is_running(): #self.antiteststruct.wasrunning = True # Remember the setting #self.run_simulator_command('pause_simulation') # Pause simulation #self.process_events(True) # Process all events ## 2) Create new simulator #self.antiteststruct.simulator = simulator_thread #self.simulator_thread = sim.Simulator(self.instantiate_new_renderer(), self.sim_queue) #self.simulator_thread.start() #def stop_test(self): #"""This function will restore the cached simulation and #simulation. A new `simulator.Simulator` will be started with #the control belonging to the tester object. #""" #view_params = self.get_view_parameters() ## 1) Stop simulator #self.run_simulator_command('stop') #while self.simulator_thread.isAlive(): #self.process_events(True) #self.simulator_thread.join(0.1) ## 2) Switch to old simulator #self.pop_renderer() #self.simulator_thread = self.antiteststruct.simulator ## 3) continue running #if self.antiteststruct.wasrunning: #self.run_simulator_command('pause_simulation')