From 6bf4be8b8f3863c65f47d37c0be182971f9360bc Mon Sep 17 00:00:00 2001 From: Loic Guegan Date: Thu, 23 Jun 2022 14:06:26 +0200 Subject: [PATCH] Improve simulator robustness --- esds/__init__.py | 5 +- esds/node.py | 205 +++++++++++++++++++++++++++++++++ esds/{esds.py => simulator.py} | 202 ++------------------------------ tests/simple_read_clock_2n/out | 4 +- 4 files changed, 217 insertions(+), 199 deletions(-) create mode 100644 esds/node.py rename esds/{esds.py => simulator.py} (76%) diff --git a/esds/__init__.py b/esds/__init__.py index efefa84..af3fa98 100644 --- a/esds/__init__.py +++ b/esds/__init__.py @@ -1,3 +1,4 @@ -__all__ = ["simulator", "plugins", "helpers"] +__all__ = ["simulator", "node", "plugins", "helpers"] + +from esds.simulator import Simulator -from esds.esds import Simulator diff --git a/esds/node.py b/esds/node.py new file mode 100644 index 0000000..1195ad1 --- /dev/null +++ b/esds/node.py @@ -0,0 +1,205 @@ +import threading,importlib,queue + +class Node: + available_node_id=0 + def __init__(self,src,interfaces): + """ + + """ + self.node_id=Node.available_node_id + Node.available_node_id+=1 # Refresh node id + self.src=src # Store the node source code + self.args=None # Store the node arguments (passed through Simulator.create_node() + self.rargs=None # Store the requests arguments + self.plugins=list() # Contains all registered node plugins + self.rqueue=queue.Queue() # Receive simulator acknowledgments + self.chest={"state":"running", "turned_on":True, "request": None, "interfaces":dict(), "interfaces_queue_size":dict()} + for interface in interfaces: + self.chest["interfaces"][interface]=queue.Queue() + self.chest["interfaces_queue_size"][interface]=0 + self.chest_lock=threading.Lock() # To access/modify self.chest + + def plugin_register(self,plugin): + self.plugins.append(plugin) + + def plugin_notify(self,reason,args): + """ + This function strives to avoid using Python specific features + """ + for p in self.plugins: + if reason == "receive_return" or reason == "receivet_return": + p.on_receive_return(args[0],args[1],args[2],args[3]) + if reason == "send_call": + p.on_send_call(args[0],args[1],args[2],args[3]) + if reason == "send_return": + p.on_send_return(args[0],args[1],args[2],args[3],args[4]) + if reason == "terminated": + p.on_terminated() + + def __getitem__(self,key): + self.chest_lock.acquire() + value=self.chest[key] + self.chest_lock.release() + return value + + def __setitem__(self,key,value): + self.chest_lock.acquire() + value=self.chest[key]=value + self.chest_lock.release() + + def abort(self,reason): + self.rargs=reason + self["request"]="abort" + self["state"]="call" + while True: continue + + def log(self,msg): + if type(msg) != str: + self.abort("log() called with a non-string argument") + self.rargs=msg + self["request"]="log" + self["state"]="call" + self.wait_ack(["log"]) + + def read(self, register): + self["request"]="read" + self.rargs=register + self["state"]="call" + ack=self.wait_ack(["read"]) + return ack[1] + + def wait(self,duration): + if type(duration) != int and type(duration) != float: + self.abort("wait() called with a non-number duration") + self.rargs=duration + self["request"]="timeout_add" + self["state"]="call" + self.wait_ack(["timeout_add"]) + self["state"]="pending" + self.wait_ack(["timeout"]) + + def wait_end(self): + self["request"]="wait_end" + self["state"]="request" + self.wait_ack(["wait_end"]) + self.wait_ack(["sim_end"]) + + def turn_off(self): + self["turned_on"]=False + self["request"]="turn_off" + self["state"]="call" + self.wait_ack(["turn_off"]) + + def turn_on(self): + self["turned_on"]=True + self["request"]="turn_on" + self["state"]="call" + self.wait_ack(["turn_on"]) + + def send(self, interface, data, datasize, dst): + if interface not in self["interfaces"]: + self.abort("send() called with an unknown interface \""+interface+"\"") + elif type(datasize) != int and type(datasize) != float: + self.abort("send() called with a non-number datasize") + elif type(dst) != int and type(dst) != float and dst != None: + self.abort("send() called with a non-number dst (wired interfaces) or dst is not None (wireless interfaces)") + self.plugin_notify("send_call",(interface,data,datasize,dst)) + self.rargs=(interface, data, datasize, dst) + self["request"]="send" + self["state"]="request" + ack=self.wait_ack(["send","send_cancel"]) + self.plugin_notify("send_return",(interface,data,datasize,dst,ack[1])) + return ack[1] + + def sendt(self, interface, data, datasize, dst, timeout): + if interface not in self["interfaces"]: + self.abort("sendt() called with an unknown interface \""+interface+"\"") + elif type(datasize) != int and type(datasize) != float: + self.abort("sendt() called with a non-number datasize") + elif type(timeout) != int and type(timeout) != float: + self.abort("sendt() called with a non-number timeout") + elif type(dst) != int and type(dst) != float and dst != None: + self.abort("send() called with a non-number dst (wired interfaces) or dst is not None (wireless interfaces)") + self.rargs=timeout + self["request"]="timeout_add" + self["state"]="call" + self.wait_ack(["timeout_add"]) + self.rargs=(interface, data, datasize, dst) + self["request"]="send" + self["state"]="request" + ack=self.wait_ack(["send","timeout","send_cancel"]) + if ack[0] == "timeout": + self["request"]="send_cancel" + self["state"]="call" + self.wait_ack(["send_cancel"]) + return -1 + self["request"]="timeout_remove" + self["state"]="call" + self.wait_ack(["timeout_remove"]) + return ack[1] + + def receive(self,interface): + if interface not in self["interfaces"]: + self.abort("receive() called with an unknown interface \""+interface+"\"") + self["request"]="receive" + self.rargs=interface + self["state"]="request" + self.wait_ack(["receive"]) + data,start_at,end_at=self["interfaces"][interface].get() + self.plugin_notify("receive_return",(interface,data,start_at,end_at)) + return (0,data) + + def receivet(self,interface, timeout): + if interface not in self["interfaces"]: + self.abort("receivet() called with an unknown interface \""+interface+"\"") + elif type(timeout) != int and type(timeout) != float: + self.abort("receivet() called with a non-number timeout") + self.rargs=timeout + self["request"]="timeout_add" + self["state"]="call" + self.wait_ack(["timeout_add"]) + self["request"]="receive" + self.rargs=interface + self["state"]="request" + ack=self.wait_ack(["receive","timeout"]) + if ack[0] == "timeout": + return (-1,None) + self["request"]="timeout_remove" + self["state"]="call" + self.wait_ack(["timeout_remove"]) + data,start_at,end_at=self["interfaces"][interface].get() + self.plugin_notify("receivet_return",(interface,data,start_at,end_at)) + return (0,data) + + def wait_ack(self, ack_types): + """ + Wait for specific acks from the request queue (rqueue) + """ + ack_buffer=list() # To filter ack + ack=None + while True: + ack=self.rqueue.get() # Wait for simulator acknowledgments + if ack[0] not in ack_types: + ack_buffer.append(ack) + else: + break + # Push back the filtered ack + for cur_ack in ack_buffer: + self.rqueue.put(cur_ack) + return(ack) + + def sync(self): + """ + Wait until node stop running + """ + while self["state"] == "running": + pass + + def run(self,args): + """ + Load and run the user program + """ + self.node=importlib.import_module(self.src) + self.args=args # Allow access to arguments + self.node.execute(self) + self["state"]="terminated" diff --git a/esds/esds.py b/esds/simulator.py similarity index 76% rename from esds/esds.py rename to esds/simulator.py index 0149a82..a9866b8 100644 --- a/esds/esds.py +++ b/esds/simulator.py @@ -1,197 +1,6 @@ -#!/usr/bin/env python - import numpy as np -import threading,importlib,queue,sys,time - -class Node: - available_node_id=0 - def __init__(self,src,interfaces): - """ - - """ - self.node_id=Node.available_node_id - Node.available_node_id+=1 # Refresh node id - self.src=src # Store the node source code - self.args=None # Store the node arguments (passed through Simulator.create_node() - self.rargs=None # Store the requests arguments - self.plugins=list() # Contains all registered node plugins - self.rqueue=queue.Queue() # Receive simulator acknowledgments - self.chest={"state":"running", "turned_on":True, "request": None, "interfaces":dict(), "interfaces_queue_size":dict()} - for interface in interfaces: - self.chest["interfaces"][interface]=queue.Queue() - self.chest["interfaces_queue_size"][interface]=0 - self.chest_lock=threading.Lock() # To access/modify self.chest - - def plugin_register(self,plugin): - self.plugins.append(plugin) - - def plugin_notify(self,reason,args): - """ - This function strives to avoid using Python specific features - """ - for p in self.plugins: - if reason == "receive_return" or reason == "receivet_return": - p.on_receive_return(args[0],args[1],args[2],args[3]) - if reason == "send_call": - p.on_send_call(args[0],args[1],args[2],args[3]) - if reason == "send_return": - p.on_send_return(args[0],args[1],args[2],args[3],args[4]) - if reason == "terminated": - p.on_terminated() - - def __getitem__(self,key): - self.chest_lock.acquire() - value=self.chest[key] - self.chest_lock.release() - return value - - def __setitem__(self,key,value): - self.chest_lock.acquire() - value=self.chest[key]=value - self.chest_lock.release() - - def abort(self,reason): - self.rargs=reason - self["request"]="abort" - self["state"]="call" - while True: continue - - def log(self,msg): - if type(msg) != str: - self.abort("log() called with a non-string argument") - self.rargs=msg - self["request"]="log" - self["state"]="call" - self.wait_ack(["log"]) - - def read(self, register): - self["request"]="read" - self.rargs=register - self["state"]="call" - ack=self.wait_ack(["read"]) - return ack[1] - - def wait(self,duration): - self.rargs=duration - self["request"]="timeout_add" - self["state"]="call" - self.wait_ack(["timeout_add"]) - self["state"]="pending" - self.wait_ack(["timeout"]) - - def wait_end(self): - self["request"]="wait_end" - self["state"]="request" - self.wait_ack(["wait_end"]) - self.wait_ack(["sim_end"]) - - def turn_off(self): - self["turned_on"]=False - self["request"]="turn_off" - self["state"]="call" - self.wait_ack(["turn_off"]) - - def turn_on(self): - self["turned_on"]=True - self["request"]="turn_on" - self["state"]="call" - self.wait_ack(["turn_on"]) - - def send(self, interface, data, datasize, dst): - if interface not in self["interfaces"]: - self.abort("send() called with an unknown interface \""+interface+"\"") - self.plugin_notify("send_call",(interface,data,datasize,dst)) - self.rargs=(interface, data, datasize, dst) - self["request"]="send" - self["state"]="request" - ack=self.wait_ack(["send","send_cancel"]) - self.plugin_notify("send_return",(interface,data,datasize,dst,ack[1])) - return ack[1] - - def sendt(self, interface, data, datasize, dst, timeout): - if interface not in self["interfaces"]: - self.abort("sendt() called with an unknown interface \""+interface+"\"") - self.rargs=timeout - self["request"]="timeout_add" - self["state"]="call" - self.wait_ack(["timeout_add"]) - self.rargs=(interface, data, datasize, dst) - self["request"]="send" - self["state"]="request" - ack=self.wait_ack(["send","timeout","send_cancel"]) - if ack[0] == "timeout": - self["request"]="send_cancel" - self["state"]="call" - self.wait_ack(["send_cancel"]) - return -1 - self["request"]="timeout_remove" - self["state"]="call" - self.wait_ack(["timeout_remove"]) - return ack[1] - - def receive(self,interface): - if interface not in self["interfaces"]: - self.abort("receive() called with an unknown interface \""+interface+"\"") - self["request"]="receive" - self.rargs=interface - self["state"]="request" - self.wait_ack(["receive"]) - data,start_at,end_at=self["interfaces"][interface].get() - self.plugin_notify("receive_return",(interface,data,start_at,end_at)) - return (0,data) - - def receivet(self,interface, timeout): - if interface not in self["interfaces"]: - self.abort("receivet() called with an unknown interface \""+interface+"\"") - self.rargs=timeout - self["request"]="timeout_add" - self["state"]="call" - self.wait_ack(["timeout_add"]) - self["request"]="receive" - self.rargs=interface - self["state"]="request" - ack=self.wait_ack(["receive","timeout"]) - if ack[0] == "timeout": - return (-1,None) - self["request"]="timeout_remove" - self["state"]="call" - self.wait_ack(["timeout_remove"]) - data,start_at,end_at=self["interfaces"][interface].get() - self.plugin_notify("receivet_return",(interface,data,start_at,end_at)) - return (0,data) - - def wait_ack(self, ack_types): - """ - Wait for specific acks from the request queue (rqueue) - """ - ack_buffer=list() # To filter ack - ack=None - while True: - ack=self.rqueue.get() # Wait for simulator acknowledgments - if ack[0] not in ack_types: - ack_buffer.append(ack) - else: - break - # Push back the filtered ack - for cur_ack in ack_buffer: - self.rqueue.put(cur_ack) - return(ack) - - def sync(self): - """ - Wait until node stop running - """ - while self["state"] == "running": - pass - - def run(self,args): - """ - Load and run the user program - """ - self.node=importlib.import_module(self.src) - self.args=args # Allow access to arguments - self.node.execute(self) - self["state"]="terminated" +import threading,sys,time +from esds.node import Node class Simulator: """ @@ -344,7 +153,7 @@ class Simulator: elif node["request"] == "read": node["state"]="running" if node.rargs == "clock": - node.rqueue.put(("read",self.time)) + node.rqueue.put(("read",float(self.time))) elif node.rargs[0:5] == "ncom_": # ncom_ register interface=node.rargs[5:] count=0 @@ -462,6 +271,10 @@ class Simulator: if node["request"] == "send": node["state"]="pending" interface, data, datasize, dst=node.rargs + if dst != None: + if not (dst >=0 and dst <=len(self.nodes)): + self.log("Invalid dst used in send() or sendt(), node "+str(dst)+" not found", node=node.node_id) + exit(1) self.communicate(interface, node.node_id, dst, data, datasize) elif node["request"] == "receive": interface=node.rargs @@ -612,4 +425,3 @@ class Simulator: ##### Simulation ends self.log("Simulation ends") - diff --git a/tests/simple_read_clock_2n/out b/tests/simple_read_clock_2n/out index 4fa7877..c3cb2b9 100644 --- a/tests/simple_read_clock_2n/out +++ b/tests/simple_read_clock_2n/out @@ -1,5 +1,5 @@ -[t=0.000,src=n0] Clock is 0s -[t=0.000,src=n1] Clock is 0s +[t=0.000,src=n0] Clock is 0.0s +[t=0.000,src=n1] Clock is 0.0s [t=5698.126,src=n0] Clock is 5698.1256s [t=5698.126,src=n0] Clock is 5698.1256s [t=5698.126,src=n1] Clock is 5698.1256s