mirror of
https://gitlab.com/manzerbredes/esds.git
synced 2025-04-07 02:26:28 +02:00
Improve simulator robustness
This commit is contained in:
parent
b81ea45bd3
commit
6bf4be8b8f
4 changed files with 217 additions and 199 deletions
|
@ -1,3 +1,4 @@
|
||||||
__all__ = ["simulator", "plugins", "helpers"]
|
__all__ = ["simulator", "node", "plugins", "helpers"]
|
||||||
|
|
||||||
|
from esds.simulator import Simulator
|
||||||
|
|
||||||
from esds.esds import Simulator
|
|
||||||
|
|
205
esds/node.py
Normal file
205
esds/node.py
Normal file
|
@ -0,0 +1,205 @@
|
||||||
|
import threading,importlib,queue
|
||||||
|
|
||||||
|
class Node:
|
||||||
|
available_node_id=0
|
||||||
|
def __init__(self,src,interfaces):
|
||||||
|
"""
|
||||||
|
|
||||||
|
"""
|
||||||
|
self.node_id=Node.available_node_id
|
||||||
|
Node.available_node_id+=1 # Refresh node id
|
||||||
|
self.src=src # Store the node source code
|
||||||
|
self.args=None # Store the node arguments (passed through Simulator.create_node()
|
||||||
|
self.rargs=None # Store the requests arguments
|
||||||
|
self.plugins=list() # Contains all registered node plugins
|
||||||
|
self.rqueue=queue.Queue() # Receive simulator acknowledgments
|
||||||
|
self.chest={"state":"running", "turned_on":True, "request": None, "interfaces":dict(), "interfaces_queue_size":dict()}
|
||||||
|
for interface in interfaces:
|
||||||
|
self.chest["interfaces"][interface]=queue.Queue()
|
||||||
|
self.chest["interfaces_queue_size"][interface]=0
|
||||||
|
self.chest_lock=threading.Lock() # To access/modify self.chest
|
||||||
|
|
||||||
|
def plugin_register(self,plugin):
|
||||||
|
self.plugins.append(plugin)
|
||||||
|
|
||||||
|
def plugin_notify(self,reason,args):
|
||||||
|
"""
|
||||||
|
This function strives to avoid using Python specific features
|
||||||
|
"""
|
||||||
|
for p in self.plugins:
|
||||||
|
if reason == "receive_return" or reason == "receivet_return":
|
||||||
|
p.on_receive_return(args[0],args[1],args[2],args[3])
|
||||||
|
if reason == "send_call":
|
||||||
|
p.on_send_call(args[0],args[1],args[2],args[3])
|
||||||
|
if reason == "send_return":
|
||||||
|
p.on_send_return(args[0],args[1],args[2],args[3],args[4])
|
||||||
|
if reason == "terminated":
|
||||||
|
p.on_terminated()
|
||||||
|
|
||||||
|
def __getitem__(self,key):
|
||||||
|
self.chest_lock.acquire()
|
||||||
|
value=self.chest[key]
|
||||||
|
self.chest_lock.release()
|
||||||
|
return value
|
||||||
|
|
||||||
|
def __setitem__(self,key,value):
|
||||||
|
self.chest_lock.acquire()
|
||||||
|
value=self.chest[key]=value
|
||||||
|
self.chest_lock.release()
|
||||||
|
|
||||||
|
def abort(self,reason):
|
||||||
|
self.rargs=reason
|
||||||
|
self["request"]="abort"
|
||||||
|
self["state"]="call"
|
||||||
|
while True: continue
|
||||||
|
|
||||||
|
def log(self,msg):
|
||||||
|
if type(msg) != str:
|
||||||
|
self.abort("log() called with a non-string argument")
|
||||||
|
self.rargs=msg
|
||||||
|
self["request"]="log"
|
||||||
|
self["state"]="call"
|
||||||
|
self.wait_ack(["log"])
|
||||||
|
|
||||||
|
def read(self, register):
|
||||||
|
self["request"]="read"
|
||||||
|
self.rargs=register
|
||||||
|
self["state"]="call"
|
||||||
|
ack=self.wait_ack(["read"])
|
||||||
|
return ack[1]
|
||||||
|
|
||||||
|
def wait(self,duration):
|
||||||
|
if type(duration) != int and type(duration) != float:
|
||||||
|
self.abort("wait() called with a non-number duration")
|
||||||
|
self.rargs=duration
|
||||||
|
self["request"]="timeout_add"
|
||||||
|
self["state"]="call"
|
||||||
|
self.wait_ack(["timeout_add"])
|
||||||
|
self["state"]="pending"
|
||||||
|
self.wait_ack(["timeout"])
|
||||||
|
|
||||||
|
def wait_end(self):
|
||||||
|
self["request"]="wait_end"
|
||||||
|
self["state"]="request"
|
||||||
|
self.wait_ack(["wait_end"])
|
||||||
|
self.wait_ack(["sim_end"])
|
||||||
|
|
||||||
|
def turn_off(self):
|
||||||
|
self["turned_on"]=False
|
||||||
|
self["request"]="turn_off"
|
||||||
|
self["state"]="call"
|
||||||
|
self.wait_ack(["turn_off"])
|
||||||
|
|
||||||
|
def turn_on(self):
|
||||||
|
self["turned_on"]=True
|
||||||
|
self["request"]="turn_on"
|
||||||
|
self["state"]="call"
|
||||||
|
self.wait_ack(["turn_on"])
|
||||||
|
|
||||||
|
def send(self, interface, data, datasize, dst):
|
||||||
|
if interface not in self["interfaces"]:
|
||||||
|
self.abort("send() called with an unknown interface \""+interface+"\"")
|
||||||
|
elif type(datasize) != int and type(datasize) != float:
|
||||||
|
self.abort("send() called with a non-number datasize")
|
||||||
|
elif type(dst) != int and type(dst) != float and dst != None:
|
||||||
|
self.abort("send() called with a non-number dst (wired interfaces) or dst is not None (wireless interfaces)")
|
||||||
|
self.plugin_notify("send_call",(interface,data,datasize,dst))
|
||||||
|
self.rargs=(interface, data, datasize, dst)
|
||||||
|
self["request"]="send"
|
||||||
|
self["state"]="request"
|
||||||
|
ack=self.wait_ack(["send","send_cancel"])
|
||||||
|
self.plugin_notify("send_return",(interface,data,datasize,dst,ack[1]))
|
||||||
|
return ack[1]
|
||||||
|
|
||||||
|
def sendt(self, interface, data, datasize, dst, timeout):
|
||||||
|
if interface not in self["interfaces"]:
|
||||||
|
self.abort("sendt() called with an unknown interface \""+interface+"\"")
|
||||||
|
elif type(datasize) != int and type(datasize) != float:
|
||||||
|
self.abort("sendt() called with a non-number datasize")
|
||||||
|
elif type(timeout) != int and type(timeout) != float:
|
||||||
|
self.abort("sendt() called with a non-number timeout")
|
||||||
|
elif type(dst) != int and type(dst) != float and dst != None:
|
||||||
|
self.abort("send() called with a non-number dst (wired interfaces) or dst is not None (wireless interfaces)")
|
||||||
|
self.rargs=timeout
|
||||||
|
self["request"]="timeout_add"
|
||||||
|
self["state"]="call"
|
||||||
|
self.wait_ack(["timeout_add"])
|
||||||
|
self.rargs=(interface, data, datasize, dst)
|
||||||
|
self["request"]="send"
|
||||||
|
self["state"]="request"
|
||||||
|
ack=self.wait_ack(["send","timeout","send_cancel"])
|
||||||
|
if ack[0] == "timeout":
|
||||||
|
self["request"]="send_cancel"
|
||||||
|
self["state"]="call"
|
||||||
|
self.wait_ack(["send_cancel"])
|
||||||
|
return -1
|
||||||
|
self["request"]="timeout_remove"
|
||||||
|
self["state"]="call"
|
||||||
|
self.wait_ack(["timeout_remove"])
|
||||||
|
return ack[1]
|
||||||
|
|
||||||
|
def receive(self,interface):
|
||||||
|
if interface not in self["interfaces"]:
|
||||||
|
self.abort("receive() called with an unknown interface \""+interface+"\"")
|
||||||
|
self["request"]="receive"
|
||||||
|
self.rargs=interface
|
||||||
|
self["state"]="request"
|
||||||
|
self.wait_ack(["receive"])
|
||||||
|
data,start_at,end_at=self["interfaces"][interface].get()
|
||||||
|
self.plugin_notify("receive_return",(interface,data,start_at,end_at))
|
||||||
|
return (0,data)
|
||||||
|
|
||||||
|
def receivet(self,interface, timeout):
|
||||||
|
if interface not in self["interfaces"]:
|
||||||
|
self.abort("receivet() called with an unknown interface \""+interface+"\"")
|
||||||
|
elif type(timeout) != int and type(timeout) != float:
|
||||||
|
self.abort("receivet() called with a non-number timeout")
|
||||||
|
self.rargs=timeout
|
||||||
|
self["request"]="timeout_add"
|
||||||
|
self["state"]="call"
|
||||||
|
self.wait_ack(["timeout_add"])
|
||||||
|
self["request"]="receive"
|
||||||
|
self.rargs=interface
|
||||||
|
self["state"]="request"
|
||||||
|
ack=self.wait_ack(["receive","timeout"])
|
||||||
|
if ack[0] == "timeout":
|
||||||
|
return (-1,None)
|
||||||
|
self["request"]="timeout_remove"
|
||||||
|
self["state"]="call"
|
||||||
|
self.wait_ack(["timeout_remove"])
|
||||||
|
data,start_at,end_at=self["interfaces"][interface].get()
|
||||||
|
self.plugin_notify("receivet_return",(interface,data,start_at,end_at))
|
||||||
|
return (0,data)
|
||||||
|
|
||||||
|
def wait_ack(self, ack_types):
|
||||||
|
"""
|
||||||
|
Wait for specific acks from the request queue (rqueue)
|
||||||
|
"""
|
||||||
|
ack_buffer=list() # To filter ack
|
||||||
|
ack=None
|
||||||
|
while True:
|
||||||
|
ack=self.rqueue.get() # Wait for simulator acknowledgments
|
||||||
|
if ack[0] not in ack_types:
|
||||||
|
ack_buffer.append(ack)
|
||||||
|
else:
|
||||||
|
break
|
||||||
|
# Push back the filtered ack
|
||||||
|
for cur_ack in ack_buffer:
|
||||||
|
self.rqueue.put(cur_ack)
|
||||||
|
return(ack)
|
||||||
|
|
||||||
|
def sync(self):
|
||||||
|
"""
|
||||||
|
Wait until node stop running
|
||||||
|
"""
|
||||||
|
while self["state"] == "running":
|
||||||
|
pass
|
||||||
|
|
||||||
|
def run(self,args):
|
||||||
|
"""
|
||||||
|
Load and run the user program
|
||||||
|
"""
|
||||||
|
self.node=importlib.import_module(self.src)
|
||||||
|
self.args=args # Allow access to arguments
|
||||||
|
self.node.execute(self)
|
||||||
|
self["state"]="terminated"
|
|
@ -1,197 +1,6 @@
|
||||||
#!/usr/bin/env python
|
|
||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import threading,importlib,queue,sys,time
|
import threading,sys,time
|
||||||
|
from esds.node import Node
|
||||||
class Node:
|
|
||||||
available_node_id=0
|
|
||||||
def __init__(self,src,interfaces):
|
|
||||||
"""
|
|
||||||
|
|
||||||
"""
|
|
||||||
self.node_id=Node.available_node_id
|
|
||||||
Node.available_node_id+=1 # Refresh node id
|
|
||||||
self.src=src # Store the node source code
|
|
||||||
self.args=None # Store the node arguments (passed through Simulator.create_node()
|
|
||||||
self.rargs=None # Store the requests arguments
|
|
||||||
self.plugins=list() # Contains all registered node plugins
|
|
||||||
self.rqueue=queue.Queue() # Receive simulator acknowledgments
|
|
||||||
self.chest={"state":"running", "turned_on":True, "request": None, "interfaces":dict(), "interfaces_queue_size":dict()}
|
|
||||||
for interface in interfaces:
|
|
||||||
self.chest["interfaces"][interface]=queue.Queue()
|
|
||||||
self.chest["interfaces_queue_size"][interface]=0
|
|
||||||
self.chest_lock=threading.Lock() # To access/modify self.chest
|
|
||||||
|
|
||||||
def plugin_register(self,plugin):
|
|
||||||
self.plugins.append(plugin)
|
|
||||||
|
|
||||||
def plugin_notify(self,reason,args):
|
|
||||||
"""
|
|
||||||
This function strives to avoid using Python specific features
|
|
||||||
"""
|
|
||||||
for p in self.plugins:
|
|
||||||
if reason == "receive_return" or reason == "receivet_return":
|
|
||||||
p.on_receive_return(args[0],args[1],args[2],args[3])
|
|
||||||
if reason == "send_call":
|
|
||||||
p.on_send_call(args[0],args[1],args[2],args[3])
|
|
||||||
if reason == "send_return":
|
|
||||||
p.on_send_return(args[0],args[1],args[2],args[3],args[4])
|
|
||||||
if reason == "terminated":
|
|
||||||
p.on_terminated()
|
|
||||||
|
|
||||||
def __getitem__(self,key):
|
|
||||||
self.chest_lock.acquire()
|
|
||||||
value=self.chest[key]
|
|
||||||
self.chest_lock.release()
|
|
||||||
return value
|
|
||||||
|
|
||||||
def __setitem__(self,key,value):
|
|
||||||
self.chest_lock.acquire()
|
|
||||||
value=self.chest[key]=value
|
|
||||||
self.chest_lock.release()
|
|
||||||
|
|
||||||
def abort(self,reason):
|
|
||||||
self.rargs=reason
|
|
||||||
self["request"]="abort"
|
|
||||||
self["state"]="call"
|
|
||||||
while True: continue
|
|
||||||
|
|
||||||
def log(self,msg):
|
|
||||||
if type(msg) != str:
|
|
||||||
self.abort("log() called with a non-string argument")
|
|
||||||
self.rargs=msg
|
|
||||||
self["request"]="log"
|
|
||||||
self["state"]="call"
|
|
||||||
self.wait_ack(["log"])
|
|
||||||
|
|
||||||
def read(self, register):
|
|
||||||
self["request"]="read"
|
|
||||||
self.rargs=register
|
|
||||||
self["state"]="call"
|
|
||||||
ack=self.wait_ack(["read"])
|
|
||||||
return ack[1]
|
|
||||||
|
|
||||||
def wait(self,duration):
|
|
||||||
self.rargs=duration
|
|
||||||
self["request"]="timeout_add"
|
|
||||||
self["state"]="call"
|
|
||||||
self.wait_ack(["timeout_add"])
|
|
||||||
self["state"]="pending"
|
|
||||||
self.wait_ack(["timeout"])
|
|
||||||
|
|
||||||
def wait_end(self):
|
|
||||||
self["request"]="wait_end"
|
|
||||||
self["state"]="request"
|
|
||||||
self.wait_ack(["wait_end"])
|
|
||||||
self.wait_ack(["sim_end"])
|
|
||||||
|
|
||||||
def turn_off(self):
|
|
||||||
self["turned_on"]=False
|
|
||||||
self["request"]="turn_off"
|
|
||||||
self["state"]="call"
|
|
||||||
self.wait_ack(["turn_off"])
|
|
||||||
|
|
||||||
def turn_on(self):
|
|
||||||
self["turned_on"]=True
|
|
||||||
self["request"]="turn_on"
|
|
||||||
self["state"]="call"
|
|
||||||
self.wait_ack(["turn_on"])
|
|
||||||
|
|
||||||
def send(self, interface, data, datasize, dst):
|
|
||||||
if interface not in self["interfaces"]:
|
|
||||||
self.abort("send() called with an unknown interface \""+interface+"\"")
|
|
||||||
self.plugin_notify("send_call",(interface,data,datasize,dst))
|
|
||||||
self.rargs=(interface, data, datasize, dst)
|
|
||||||
self["request"]="send"
|
|
||||||
self["state"]="request"
|
|
||||||
ack=self.wait_ack(["send","send_cancel"])
|
|
||||||
self.plugin_notify("send_return",(interface,data,datasize,dst,ack[1]))
|
|
||||||
return ack[1]
|
|
||||||
|
|
||||||
def sendt(self, interface, data, datasize, dst, timeout):
|
|
||||||
if interface not in self["interfaces"]:
|
|
||||||
self.abort("sendt() called with an unknown interface \""+interface+"\"")
|
|
||||||
self.rargs=timeout
|
|
||||||
self["request"]="timeout_add"
|
|
||||||
self["state"]="call"
|
|
||||||
self.wait_ack(["timeout_add"])
|
|
||||||
self.rargs=(interface, data, datasize, dst)
|
|
||||||
self["request"]="send"
|
|
||||||
self["state"]="request"
|
|
||||||
ack=self.wait_ack(["send","timeout","send_cancel"])
|
|
||||||
if ack[0] == "timeout":
|
|
||||||
self["request"]="send_cancel"
|
|
||||||
self["state"]="call"
|
|
||||||
self.wait_ack(["send_cancel"])
|
|
||||||
return -1
|
|
||||||
self["request"]="timeout_remove"
|
|
||||||
self["state"]="call"
|
|
||||||
self.wait_ack(["timeout_remove"])
|
|
||||||
return ack[1]
|
|
||||||
|
|
||||||
def receive(self,interface):
|
|
||||||
if interface not in self["interfaces"]:
|
|
||||||
self.abort("receive() called with an unknown interface \""+interface+"\"")
|
|
||||||
self["request"]="receive"
|
|
||||||
self.rargs=interface
|
|
||||||
self["state"]="request"
|
|
||||||
self.wait_ack(["receive"])
|
|
||||||
data,start_at,end_at=self["interfaces"][interface].get()
|
|
||||||
self.plugin_notify("receive_return",(interface,data,start_at,end_at))
|
|
||||||
return (0,data)
|
|
||||||
|
|
||||||
def receivet(self,interface, timeout):
|
|
||||||
if interface not in self["interfaces"]:
|
|
||||||
self.abort("receivet() called with an unknown interface \""+interface+"\"")
|
|
||||||
self.rargs=timeout
|
|
||||||
self["request"]="timeout_add"
|
|
||||||
self["state"]="call"
|
|
||||||
self.wait_ack(["timeout_add"])
|
|
||||||
self["request"]="receive"
|
|
||||||
self.rargs=interface
|
|
||||||
self["state"]="request"
|
|
||||||
ack=self.wait_ack(["receive","timeout"])
|
|
||||||
if ack[0] == "timeout":
|
|
||||||
return (-1,None)
|
|
||||||
self["request"]="timeout_remove"
|
|
||||||
self["state"]="call"
|
|
||||||
self.wait_ack(["timeout_remove"])
|
|
||||||
data,start_at,end_at=self["interfaces"][interface].get()
|
|
||||||
self.plugin_notify("receivet_return",(interface,data,start_at,end_at))
|
|
||||||
return (0,data)
|
|
||||||
|
|
||||||
def wait_ack(self, ack_types):
|
|
||||||
"""
|
|
||||||
Wait for specific acks from the request queue (rqueue)
|
|
||||||
"""
|
|
||||||
ack_buffer=list() # To filter ack
|
|
||||||
ack=None
|
|
||||||
while True:
|
|
||||||
ack=self.rqueue.get() # Wait for simulator acknowledgments
|
|
||||||
if ack[0] not in ack_types:
|
|
||||||
ack_buffer.append(ack)
|
|
||||||
else:
|
|
||||||
break
|
|
||||||
# Push back the filtered ack
|
|
||||||
for cur_ack in ack_buffer:
|
|
||||||
self.rqueue.put(cur_ack)
|
|
||||||
return(ack)
|
|
||||||
|
|
||||||
def sync(self):
|
|
||||||
"""
|
|
||||||
Wait until node stop running
|
|
||||||
"""
|
|
||||||
while self["state"] == "running":
|
|
||||||
pass
|
|
||||||
|
|
||||||
def run(self,args):
|
|
||||||
"""
|
|
||||||
Load and run the user program
|
|
||||||
"""
|
|
||||||
self.node=importlib.import_module(self.src)
|
|
||||||
self.args=args # Allow access to arguments
|
|
||||||
self.node.execute(self)
|
|
||||||
self["state"]="terminated"
|
|
||||||
|
|
||||||
class Simulator:
|
class Simulator:
|
||||||
"""
|
"""
|
||||||
|
@ -344,7 +153,7 @@ class Simulator:
|
||||||
elif node["request"] == "read":
|
elif node["request"] == "read":
|
||||||
node["state"]="running"
|
node["state"]="running"
|
||||||
if node.rargs == "clock":
|
if node.rargs == "clock":
|
||||||
node.rqueue.put(("read",self.time))
|
node.rqueue.put(("read",float(self.time)))
|
||||||
elif node.rargs[0:5] == "ncom_": # ncom_<interface> register
|
elif node.rargs[0:5] == "ncom_": # ncom_<interface> register
|
||||||
interface=node.rargs[5:]
|
interface=node.rargs[5:]
|
||||||
count=0
|
count=0
|
||||||
|
@ -462,6 +271,10 @@ class Simulator:
|
||||||
if node["request"] == "send":
|
if node["request"] == "send":
|
||||||
node["state"]="pending"
|
node["state"]="pending"
|
||||||
interface, data, datasize, dst=node.rargs
|
interface, data, datasize, dst=node.rargs
|
||||||
|
if dst != None:
|
||||||
|
if not (dst >=0 and dst <=len(self.nodes)):
|
||||||
|
self.log("Invalid dst used in send() or sendt(), node "+str(dst)+" not found", node=node.node_id)
|
||||||
|
exit(1)
|
||||||
self.communicate(interface, node.node_id, dst, data, datasize)
|
self.communicate(interface, node.node_id, dst, data, datasize)
|
||||||
elif node["request"] == "receive":
|
elif node["request"] == "receive":
|
||||||
interface=node.rargs
|
interface=node.rargs
|
||||||
|
@ -612,4 +425,3 @@ class Simulator:
|
||||||
|
|
||||||
##### Simulation ends
|
##### Simulation ends
|
||||||
self.log("Simulation ends")
|
self.log("Simulation ends")
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
[t=0.000,src=n0] Clock is 0s
|
[t=0.000,src=n0] Clock is 0.0s
|
||||||
[t=0.000,src=n1] Clock is 0s
|
[t=0.000,src=n1] Clock is 0.0s
|
||||||
[t=5698.126,src=n0] Clock is 5698.1256s
|
[t=5698.126,src=n0] Clock is 5698.1256s
|
||||||
[t=5698.126,src=n0] Clock is 5698.1256s
|
[t=5698.126,src=n0] Clock is 5698.1256s
|
||||||
[t=5698.126,src=n1] Clock is 5698.1256s
|
[t=5698.126,src=n1] Clock is 5698.1256s
|
||||||
|
|
Loading…
Add table
Reference in a new issue