Change API. Indeed, currently wired communications

are aborted when receiver node turned off. This
may not be desire when implementing UDP communications.
Now user can use a boolean when using send()/sendt() to change this
behavior.
This commit is contained in:
Loic Guegan 2022-07-01 10:03:54 +02:00
parent 48ca1f43f2
commit 1eee1ac81a
5 changed files with 49 additions and 32 deletions

View file

@ -100,7 +100,7 @@ class Node:
self["state"]="call_non_blocking" self["state"]="call_non_blocking"
self.wait_ack(["turn_on"]) self.wait_ack(["turn_on"])
def send(self, interface, data, datasize, dst): def send(self, interface, data, datasize, dst,cancel_on_turn_off=True):
if interface not in self["interfaces"]: if interface not in self["interfaces"]:
self.abort("send() called with an unknown interface \""+interface+"\"") self.abort("send() called with an unknown interface \""+interface+"\"")
elif type(datasize) != int and type(datasize) != float: elif type(datasize) != int and type(datasize) != float:
@ -110,14 +110,14 @@ class Node:
elif not self["turned_on"]: elif not self["turned_on"]:
self.abort("send() called while node is turned off") self.abort("send() called while node is turned off")
self.plugin_notify("send_call",(interface,data,datasize,dst)) self.plugin_notify("send_call",(interface,data,datasize,dst))
self.rargs=(interface, data, datasize, dst) self.rargs=(interface, data, datasize, dst,cancel_on_turn_off)
self["request"]="send" self["request"]="send"
self["state"]="call_blocking" self["state"]="call_blocking"
ack=self.wait_ack(["send","send_cancel"]) ack=self.wait_ack(["send","send_cancel"])
self.plugin_notify("send_return",(interface,data,datasize,dst,ack[1])) self.plugin_notify("send_return",(interface,data,datasize,dst,ack[1]))
return ack[1] return ack[1]
def sendt(self, interface, data, datasize, dst, timeout): def sendt(self, interface, data, datasize, dst, timeout,cancel_on_turn_off=True):
if interface not in self["interfaces"]: if interface not in self["interfaces"]:
self.abort("sendt() called with an unknown interface \""+interface+"\"") self.abort("sendt() called with an unknown interface \""+interface+"\"")
elif type(datasize) != int and type(datasize) != float: elif type(datasize) != int and type(datasize) != float:
@ -134,7 +134,7 @@ class Node:
self["request"]="timeout_add" self["request"]="timeout_add"
self["state"]="call_non_blocking" self["state"]="call_non_blocking"
self.wait_ack(["timeout_add"]) self.wait_ack(["timeout_add"])
self.rargs=(interface, data, datasize, dst) self.rargs=(interface, data, datasize, dst,cancel_on_turn_off)
self["request"]="send" self["request"]="send"
self["state"]="call_blocking" self["state"]="call_blocking"
ack=self.wait_ack(["send","timeout","send_cancel"]) ack=self.wait_ack(["send","timeout","send_cancel"])

View file

@ -7,7 +7,7 @@ class Simulator:
Flow-Level Discrete Event Simulator for Cyber-Physical Systems Flow-Level Discrete Event Simulator for Cyber-Physical Systems
The general format for an event is (type,timestamp,event,priority) The general format for an event is (type,timestamp,event,priority)
Event types: Event types:
- 0 send (0,timestamp,(src,dst,interface,data,datasize,duration,datasize_remaining,start_timestamp), 2) - 0 send (0,timestamp,(src,dst,interface,data,datasize,duration,datasize_remaining,start_timestamp, perform_delivery,cancel_on_turn_off), 2)
- 1 timeout (1,timestamp,node_id,3) - 1 timeout (1,timestamp,node_id,3)
- 2 breakpoint_manual (3,timestamp,0,1) - 2 breakpoint_manual (3,timestamp,0,1)
- 3 breakpoint_auto (4,timestamp,0,1) - 3 breakpoint_auto (4,timestamp,0,1)
@ -54,7 +54,7 @@ class Simulator:
if int(event[0]) == 0: if int(event[0]) == 0:
cur_event=event[2] cur_event=event[2]
ts=float(event[1]) ts=float(event[1])
src_id,dst_id,interface, data, datasize,duration, datasize_remaining,start_at=cur_event src_id,dst_id,interface, data, datasize,duration, datasize_remaining,start_at,perform_delivery,cancel_on_turn_off=cur_event
new_bw=netmat[interface]["bandwidth"][int(src_id),int(dst_id)] new_bw=netmat[interface]["bandwidth"][int(src_id),int(dst_id)]
old_bw=self.netmat[interface]["bandwidth"][int(src_id),int(dst_id)] old_bw=self.netmat[interface]["bandwidth"][int(src_id),int(dst_id)]
new_lat=netmat[interface]["latency"][int(src_id),int(dst_id)] new_lat=netmat[interface]["latency"][int(src_id),int(dst_id)]
@ -197,7 +197,11 @@ class Simulator:
if event[0]==0 and int(event[2][1])==node.node_id: if event[0]==0 and int(event[2][1])==node.node_id:
if self.netmat[event[2][2]]["is_wired"]: if self.netmat[event[2][2]]["is_wired"]:
selector_wireless.append(False) selector_wireless.append(False)
selector_wired.append(True) if event[2][9]: # Check if should be cancel on turn_off
selector_wired.append(True)
else:
selector_wired.append(False)
event[2][8]=False # So set delivery to False!!
else: else:
selector_wireless.append(True) selector_wireless.append(True)
selector_wired.append(False) selector_wired.append(False)
@ -299,12 +303,12 @@ class Simulator:
if node["state"] == "call_blocking": if node["state"] == "call_blocking":
if node["request"] == "send": if node["request"] == "send":
node["state"]="pending" node["state"]="pending"
interface, data, datasize, dst=node.rargs interface, data, datasize, dst,cancel_on_turn_off=node.rargs
if dst != None: if dst != None:
if not (dst >=0 and dst <=len(self.nodes)): if not (dst >=0 and dst <=len(self.nodes)):
self.log("Invalid dst used in send() or sendt(), node "+str(dst)+" not found", node=node.node_id) self.log("Invalid dst used in send() or sendt(), node "+str(dst)+" not found", node=node.node_id)
exit(1) exit(1)
self.communicate(interface, node.node_id, dst, data, datasize) self.communicate(interface, node.node_id, dst, data, datasize,cancel_on_turn_off)
elif node["request"] == "receive": elif node["request"] == "receive":
interface=node.rargs interface=node.rargs
if node["interfaces_queue_size"][interface] > 0: if node["interfaces_queue_size"][interface] > 0:
@ -319,7 +323,7 @@ class Simulator:
node.rqueue.put(("wait_end",0)) node.rqueue.put(("wait_end",0))
self.wait_end_nodes.append(node.node_id) self.wait_end_nodes.append(node.node_id)
def communicate(self, interface, src, dst, data, datasize): def communicate(self, interface, src, dst, data, datasize, cancel_on_turn_off=True):
""" """
Create communication event between src and dst Create communication event between src and dst
""" """
@ -330,7 +334,7 @@ class Simulator:
self.update_sharing(dst,1,interface) # Update sharing first self.update_sharing(dst,1,interface) # Update sharing first
# Note that in the following we send more data than expected to handle bandwidth sharing (datasize*8*sharing): # Note that in the following we send more data than expected to handle bandwidth sharing (datasize*8*sharing):
duration=datasize*8/(self.netmat[interface]["bandwidth"][src,dst]/self.sharing[interface][dst])+self.netmat[interface]["latency"][src,dst] duration=datasize*8/(self.netmat[interface]["bandwidth"][src,dst]/self.sharing[interface][dst])+self.netmat[interface]["latency"][src,dst]
self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time)) self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True,cancel_on_turn_off))
else: else:
nsrc["state"]="call_blocking" # Try later when node is on nsrc["state"]="call_blocking" # Try later when node is on
else: else:
@ -341,11 +345,11 @@ class Simulator:
if src == dst: if src == dst:
# This event (where src == dst) is used to notify the sender when data is received! # This event (where src == dst) is used to notify the sender when data is received!
# Correspond to the diagonal of the network matrices (bandwidth and latency) # Correspond to the diagonal of the network matrices (bandwidth and latency)
self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time)) self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True,cancel_on_turn_off))
elif not self.interferences: elif not self.interferences:
self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time)) self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True,cancel_on_turn_off))
elif not self.handle_interferences(src,dst, interface): elif not self.handle_interferences(src,dst, interface):
self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time)) self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True,cancel_on_turn_off))
def list_receivers(self,node,interface): def list_receivers(self,node,interface):
""" """
@ -412,27 +416,14 @@ class Simulator:
event=self.events[0,2] event=self.events[0,2]
self.events=np.delete(self.events,0,0) # Consume events NOW! not at the end of the loop (event list may change in between) self.events=np.delete(self.events,0,0) # Consume events NOW! not at the end of the loop (event list may change in between)
if event_type == 0: if event_type == 0:
src_id,dst_id,interface, data, datasize,duration,datasize_remaining,start_at=event src_id,dst_id,interface, data, datasize,duration,datasize_remaining,start_at,perform_delivery,cancel_on_turn_off=event
src=self.nodes[int(src_id)] src=self.nodes[int(src_id)]
dst=self.nodes[int(dst_id)] dst=self.nodes[int(dst_id)]
if self.netmat[interface]["is_wired"]: if self.netmat[interface]["is_wired"]:
dst["interfaces"][interface].put((data,start_at,self.time)) if perform_delivery:
dst["interfaces_queue_size"][interface]+=1
self.update_sharing(dst.node_id,-1,interface)
self.log("Receive "+str(datasize)+" bytes on "+interface,node=int(dst_id))
# If node is receiving makes it consume (this way if there is a timeout, it will be removed!)
if dst["state"] == "call_blocking" and dst["request"] == "receive":
dst["interfaces_queue_size"][interface]-=1
dst["state"]="running"
dst.rqueue.put(("receive",0))
self.sync_node_non_blocking(dst,timeout_remove_only=True)
src["state"]="running"
src.rqueue.put(("send",0))
self.sync_node_non_blocking(src,timeout_remove_only=True)
else:
if src.node_id != dst.node_id:
dst["interfaces"][interface].put((data,start_at,self.time)) dst["interfaces"][interface].put((data,start_at,self.time))
dst["interfaces_queue_size"][interface]+=1 dst["interfaces_queue_size"][interface]+=1
self.update_sharing(dst.node_id,-1,interface)
self.log("Receive "+str(datasize)+" bytes on "+interface,node=int(dst_id)) self.log("Receive "+str(datasize)+" bytes on "+interface,node=int(dst_id))
# If node is receiving makes it consume (this way if there is a timeout, it will be removed!) # If node is receiving makes it consume (this way if there is a timeout, it will be removed!)
if dst["state"] == "call_blocking" and dst["request"] == "receive": if dst["state"] == "call_blocking" and dst["request"] == "receive":
@ -440,6 +431,21 @@ class Simulator:
dst["state"]="running" dst["state"]="running"
dst.rqueue.put(("receive",0)) dst.rqueue.put(("receive",0))
self.sync_node_non_blocking(dst,timeout_remove_only=True) self.sync_node_non_blocking(dst,timeout_remove_only=True)
src["state"]="running"
src.rqueue.put(("send",0))
self.sync_node_non_blocking(src,timeout_remove_only=True)
else:
if src.node_id != dst.node_id:
if perform_delivery:
dst["interfaces"][interface].put((data,start_at,self.time))
dst["interfaces_queue_size"][interface]+=1
self.log("Receive "+str(datasize)+" bytes on "+interface,node=int(dst_id))
# If node is receiving makes it consume (this way if there is a timeout, it will be removed!)
if dst["state"] == "call_blocking" and dst["request"] == "receive":
dst["interfaces_queue_size"][interface]-=1
dst["state"]="running"
dst.rqueue.put(("receive",0))
self.sync_node_non_blocking(dst,timeout_remove_only=True)
else: else:
src["state"]="running" src["state"]="running"
src.rqueue.put(("send",0)) src.rqueue.put(("send",0))

View file

@ -7,5 +7,9 @@
[t=3.000,src=n1] Received: Hello World! [t=3.000,src=n1] Received: Hello World!
[t=3.000,src=n1] Turned off [t=3.000,src=n1] Turned off
[t=4.000,src=n1] Turned on [t=4.000,src=n1] Turned on
[t=4.000,src=n0] Send 10 bytes to n1 on eth0
[t=5.000,src=n1] Receive failed code=-1 [t=5.000,src=n1] Receive failed code=-1
[t=5.000,src=esds] Simulation ends [t=5.000,src=n1] Turned off
[t=6.000,src=n1] Turned on
[t=7.000,src=n1] Receive failed code=-1
[t=14.000,src=esds] Simulation ends

View file

@ -17,4 +17,10 @@ def execute(api):
code, data=api.receivet("eth0",1) code, data=api.receivet("eth0",1)
msg="Received: "+data if code == 0 else "Receive failed code="+str(code) msg="Received: "+data if code == 0 else "Receive failed code="+str(code)
api.log(msg) api.log(msg)
##### Ensure data is not receive turned off but communication is not cancel
api.turn_off()
api.wait(1)
api.turn_on()
code, data=api.receivet("eth0",1)
msg="Received: "+data if code == 0 else "Receive failed code="+str(code)
api.log(msg)

View file

@ -5,3 +5,4 @@ def execute(api):
api.send("eth0","Hello World!",1,1) api.send("eth0","Hello World!",1,1)
api.wait(1) # Goto 3 seconds api.wait(1) # Goto 3 seconds
api.send("eth0","Hello World!",1,1) api.send("eth0","Hello World!",1,1)
api.send("eth0","Hello World!",10,1,False) # Now communication should not be aborted even if receiver turned_off (e.g UDP)