From 33ffa0b6e0af5e2f658d9d12d0347ea57edc97a7 Mon Sep 17 00:00:00 2001 From: Loic Guegan Date: Fri, 1 Jul 2022 13:05:33 +0200 Subject: [PATCH] Simplify the API: Now wired transmission are never aborted if receiver turned off. It will just not receive the data --- esds/node.py | 8 ++-- esds/simulator.py | 55 ++++++++-------------------- tests/api_send_eth0_1s1r/out | 9 ++--- tests/api_send_eth0_1s1r/receiver.py | 11 ++---- tests/api_send_eth0_1s1r/sender.py | 6 +-- 5 files changed, 29 insertions(+), 60 deletions(-) diff --git a/esds/node.py b/esds/node.py index 09569a0..4c9ca5f 100644 --- a/esds/node.py +++ b/esds/node.py @@ -100,7 +100,7 @@ class Node: self["state"]="call_non_blocking" self.wait_ack(["turn_on"]) - def send(self, interface, data, datasize, dst,cancel_on_turn_off=True): + def send(self, interface, data, datasize, dst): if interface not in self["interfaces"]: self.abort("send() called with an unknown interface \""+interface+"\"") elif type(datasize) != int and type(datasize) != float: @@ -110,14 +110,14 @@ class Node: elif not self["turned_on"]: self.abort("send() called while node is turned off") self.plugin_notify("send_call",(interface,data,datasize,dst)) - self.rargs=(interface, data, datasize, dst,cancel_on_turn_off) + self.rargs=(interface, data, datasize, dst) self["request"]="send" self["state"]="call_blocking" ack=self.wait_ack(["send","send_cancel"]) self.plugin_notify("send_return",(interface,data,datasize,dst,ack[1])) return ack[1] - def sendt(self, interface, data, datasize, dst, timeout,cancel_on_turn_off=True): + def sendt(self, interface, data, datasize, dst, timeout): if interface not in self["interfaces"]: self.abort("sendt() called with an unknown interface \""+interface+"\"") elif type(datasize) != int and type(datasize) != float: @@ -134,7 +134,7 @@ class Node: self["request"]="timeout_add" self["state"]="call_non_blocking" self.wait_ack(["timeout_add"]) - self.rargs=(interface, data, datasize, dst,cancel_on_turn_off) + self.rargs=(interface, data, datasize, dst) self["request"]="send" self["state"]="call_blocking" ack=self.wait_ack(["send","timeout","send_cancel"]) diff --git a/esds/simulator.py b/esds/simulator.py index 052b704..74f126e 100644 --- a/esds/simulator.py +++ b/esds/simulator.py @@ -7,7 +7,7 @@ class Simulator: Flow-Level Discrete Event Simulator for Cyber-Physical Systems The general format for an event is (type,timestamp,event,priority) Event types: - - 0 send (0,timestamp,(src,dst,interface,data,datasize,duration,datasize_remaining,start_timestamp, perform_delivery,cancel_on_turn_off), 2) + - 0 send (0,timestamp,(src,dst,interface,data,datasize,duration,datasize_remaining,start_timestamp, perform_delivery), 2) - 1 timeout (1,timestamp,node_id,3) - 2 breakpoint_manual (3,timestamp,0,1) - 3 breakpoint_auto (4,timestamp,0,1) @@ -54,7 +54,7 @@ class Simulator: if int(event[0]) == 0: cur_event=event[2] ts=float(event[1]) - src_id,dst_id,interface, data, datasize,duration, datasize_remaining,start_at,perform_delivery,cancel_on_turn_off=cur_event + src_id,dst_id,interface, data, datasize,duration, datasize_remaining,start_at,perform_delivery=cur_event new_bw=netmat[interface]["bandwidth"][int(src_id),int(dst_id)] old_bw=self.netmat[interface]["bandwidth"][int(src_id),int(dst_id)] new_lat=netmat[interface]["latency"][int(src_id),int(dst_id)] @@ -192,45 +192,22 @@ class Simulator: elif node["request"] == "turn_off": # Create communications selectors (True/False arrays) selector_wireless=list() # Select all wireless events where node is involved - selector_wired=list() # Select all wired events where node is involved for event in self.events: if event[0]==0 and int(event[2][1])==node.node_id: if self.netmat[event[2][2]]["is_wired"]: selector_wireless.append(False) - if event[2][9]: # Check if should be cancel on turn_off - selector_wired.append(True) - else: - selector_wired.append(False) - event[2][8]=False # So set delivery to False!! + event[2][8]=False # So set delivery to False for wired communication!! else: selector_wireless.append(True) - selector_wired.append(False) else: selector_wireless.append(False) - selector_wired.append(False) - # Build the set of senders to notify (only in wired connections, - # indeed IRL, in wireless communications sender would send all its data) - senders_to_notify=set() - for event in self.events[selector_wired]: - senders_to_notify.add(int(event[2][0])) - # Remove communications from the event list + # Remove the relevent wireless communications from the event list if(len(self.events) != 0): - self.events=self.events[~(np.array(selector_wireless)|np.array(selector_wired))] - # Refresh wired sharing - for interface in self.sharing.keys(): - self.sharing[interface][node.node_id]=0 # Sharing goes back to zero + self.events=self.events[~np.array(selector_wireless)] # Update node state after turning off node["state"]="running" node.rqueue.put(("turn_off",0)) self.log("Turned off",node=node.node_id) - # Informed senders of wired events that communication ended - for sender_id in senders_to_notify: - sender_node=self.nodes[sender_id] - sender_node["state"]="running" - sender_node.rqueue.put(("send_cancel",2)) - # The node should resume at current self.time. So, sync the sender now: - self.sync_node_non_blocking(sender_node) - self.sync_node_blocking(sender_node) elif node["request"] == "send_cancel": selector=list() for event in self.events: @@ -303,12 +280,12 @@ class Simulator: if node["state"] == "call_blocking": if node["request"] == "send": node["state"]="pending" - interface, data, datasize, dst,cancel_on_turn_off=node.rargs + interface, data, datasize, dst=node.rargs if dst != None: if not (dst >=0 and dst <=len(self.nodes)): self.log("Invalid dst used in send() or sendt(), node "+str(dst)+" not found", node=node.node_id) exit(1) - self.communicate(interface, node.node_id, dst, data, datasize,cancel_on_turn_off) + self.communicate(interface, node.node_id, dst, data, datasize) elif node["request"] == "receive": interface=node.rargs if node["interfaces_queue_size"][interface] > 0: @@ -323,7 +300,7 @@ class Simulator: node.rqueue.put(("wait_end",0)) self.wait_end_nodes.append(node.node_id) - def communicate(self, interface, src, dst, data, datasize, cancel_on_turn_off=True): + def communicate(self, interface, src, dst, data, datasize): """ Create communication event between src and dst """ @@ -334,7 +311,7 @@ class Simulator: self.update_sharing(dst,1,interface) # Update sharing first # Note that in the following we send more data than expected to handle bandwidth sharing (datasize*8*sharing): duration=datasize*8/(self.netmat[interface]["bandwidth"][src,dst]/self.sharing[interface][dst])+self.netmat[interface]["latency"][src,dst] - self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True,cancel_on_turn_off)) + self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True)) else: nsrc["state"]="call_blocking" # Try later when node is on else: @@ -345,11 +322,11 @@ class Simulator: if src == dst: # This event (where src == dst) is used to notify the sender when data is received! # Correspond to the diagonal of the network matrices (bandwidth and latency) - self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True,cancel_on_turn_off)) + self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True)) elif not self.interferences: - self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True,cancel_on_turn_off)) + self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True)) elif not self.handle_interferences(src,dst, interface): - self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True,cancel_on_turn_off)) + self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True)) def list_receivers(self,node,interface): """ @@ -416,7 +393,7 @@ class Simulator: event=self.events[0,2] self.events=np.delete(self.events,0,0) # Consume events NOW! not at the end of the loop (event list may change in between) if event_type == 0: - src_id,dst_id,interface, data, datasize,duration,datasize_remaining,start_at,perform_delivery,cancel_on_turn_off=event + src_id,dst_id,interface, data, datasize,duration,datasize_remaining,start_at,perform_delivery=event src=self.nodes[int(src_id)] dst=self.nodes[int(dst_id)] if self.netmat[interface]["is_wired"]: @@ -431,9 +408,9 @@ class Simulator: dst["state"]="running" dst.rqueue.put(("receive",0)) self.sync_node_non_blocking(dst,timeout_remove_only=True) - src["state"]="running" - src.rqueue.put(("send",0)) - self.sync_node_non_blocking(src,timeout_remove_only=True) + src["state"]="running" + src.rqueue.put(("send",0)) + self.sync_node_non_blocking(src,timeout_remove_only=True) else: if src.node_id != dst.node_id: if perform_delivery: diff --git a/tests/api_send_eth0_1s1r/out b/tests/api_send_eth0_1s1r/out index d331337..7b8f88d 100644 --- a/tests/api_send_eth0_1s1r/out +++ b/tests/api_send_eth0_1s1r/out @@ -3,13 +3,10 @@ [t=1.000,src=n1] Received: Hello World! [t=1.000,src=n0] Send 1 bytes to n1 on eth0 [t=2.000,src=n1] Receive 1 bytes on eth0 -[t=3.000,src=n0] Send 1 bytes to n1 on eth0 +[t=3.000,src=n0] Send 15 bytes to n1 on eth0 [t=3.000,src=n1] Received: Hello World! [t=3.000,src=n1] Turned off [t=4.000,src=n1] Turned on -[t=4.000,src=n0] Send 10 bytes to n1 on eth0 [t=5.000,src=n1] Receive failed code=-1 -[t=5.000,src=n1] Turned off -[t=6.000,src=n1] Turned on -[t=7.000,src=n1] Receive failed code=-1 -[t=14.000,src=esds] Simulation ends +[t=18.000,src=n0] End +[t=18.000,src=esds] Simulation ends diff --git a/tests/api_send_eth0_1s1r/receiver.py b/tests/api_send_eth0_1s1r/receiver.py index 364c24c..514cbbe 100644 --- a/tests/api_send_eth0_1s1r/receiver.py +++ b/tests/api_send_eth0_1s1r/receiver.py @@ -10,17 +10,12 @@ def execute(api): code, data=api.receive("eth0") msg="Received: "+data if code == 0 else "Receive failed code="+str(code) api.log(msg) - ##### Ensure data is not receive when turned off - api.turn_off() - api.wait(1) - api.turn_on() - code, data=api.receivet("eth0",1) - msg="Received: "+data if code == 0 else "Receive failed code="+str(code) - api.log(msg) - ##### Ensure data is not receive turned off but communication is not cancel + ##### Ensure data is not receive when turned off but communication must still be ongoing api.turn_off() api.wait(1) api.turn_on() code, data=api.receivet("eth0",1) msg="Received: "+data if code == 0 else "Receive failed code="+str(code) api.log(msg) + + diff --git a/tests/api_send_eth0_1s1r/sender.py b/tests/api_send_eth0_1s1r/sender.py index a01b371..64293da 100644 --- a/tests/api_send_eth0_1s1r/sender.py +++ b/tests/api_send_eth0_1s1r/sender.py @@ -3,6 +3,6 @@ def execute(api): api.send("eth0","Hello World!",1,1) api.send("eth0","Hello World!",1,1) - api.wait(1) # Goto 3 seconds - api.send("eth0","Hello World!",1,1) - api.send("eth0","Hello World!",10,1,False) # Now communication should not be aborted even if receiver turned_off (e.g UDP) + api.wait(1) # Goto t=3s + api.send("eth0","Hello World!",15,1) # Communication should not be aborted even if receiver turned_off (e.g UDP) + api.log("End") # Should be printed at t=18s \ No newline at end of file