mirror of
https://gitlab.com/manzerbredes/esds.git
synced 2025-04-07 02:26:28 +02:00
Simplify the API: Now wired transmission are never aborted if receiver turned off. It will just not receive the data
This commit is contained in:
parent
1eee1ac81a
commit
33ffa0b6e0
5 changed files with 29 additions and 60 deletions
|
@ -100,7 +100,7 @@ class Node:
|
||||||
self["state"]="call_non_blocking"
|
self["state"]="call_non_blocking"
|
||||||
self.wait_ack(["turn_on"])
|
self.wait_ack(["turn_on"])
|
||||||
|
|
||||||
def send(self, interface, data, datasize, dst,cancel_on_turn_off=True):
|
def send(self, interface, data, datasize, dst):
|
||||||
if interface not in self["interfaces"]:
|
if interface not in self["interfaces"]:
|
||||||
self.abort("send() called with an unknown interface \""+interface+"\"")
|
self.abort("send() called with an unknown interface \""+interface+"\"")
|
||||||
elif type(datasize) != int and type(datasize) != float:
|
elif type(datasize) != int and type(datasize) != float:
|
||||||
|
@ -110,14 +110,14 @@ class Node:
|
||||||
elif not self["turned_on"]:
|
elif not self["turned_on"]:
|
||||||
self.abort("send() called while node is turned off")
|
self.abort("send() called while node is turned off")
|
||||||
self.plugin_notify("send_call",(interface,data,datasize,dst))
|
self.plugin_notify("send_call",(interface,data,datasize,dst))
|
||||||
self.rargs=(interface, data, datasize, dst,cancel_on_turn_off)
|
self.rargs=(interface, data, datasize, dst)
|
||||||
self["request"]="send"
|
self["request"]="send"
|
||||||
self["state"]="call_blocking"
|
self["state"]="call_blocking"
|
||||||
ack=self.wait_ack(["send","send_cancel"])
|
ack=self.wait_ack(["send","send_cancel"])
|
||||||
self.plugin_notify("send_return",(interface,data,datasize,dst,ack[1]))
|
self.plugin_notify("send_return",(interface,data,datasize,dst,ack[1]))
|
||||||
return ack[1]
|
return ack[1]
|
||||||
|
|
||||||
def sendt(self, interface, data, datasize, dst, timeout,cancel_on_turn_off=True):
|
def sendt(self, interface, data, datasize, dst, timeout):
|
||||||
if interface not in self["interfaces"]:
|
if interface not in self["interfaces"]:
|
||||||
self.abort("sendt() called with an unknown interface \""+interface+"\"")
|
self.abort("sendt() called with an unknown interface \""+interface+"\"")
|
||||||
elif type(datasize) != int and type(datasize) != float:
|
elif type(datasize) != int and type(datasize) != float:
|
||||||
|
@ -134,7 +134,7 @@ class Node:
|
||||||
self["request"]="timeout_add"
|
self["request"]="timeout_add"
|
||||||
self["state"]="call_non_blocking"
|
self["state"]="call_non_blocking"
|
||||||
self.wait_ack(["timeout_add"])
|
self.wait_ack(["timeout_add"])
|
||||||
self.rargs=(interface, data, datasize, dst,cancel_on_turn_off)
|
self.rargs=(interface, data, datasize, dst)
|
||||||
self["request"]="send"
|
self["request"]="send"
|
||||||
self["state"]="call_blocking"
|
self["state"]="call_blocking"
|
||||||
ack=self.wait_ack(["send","timeout","send_cancel"])
|
ack=self.wait_ack(["send","timeout","send_cancel"])
|
||||||
|
|
|
@ -7,7 +7,7 @@ class Simulator:
|
||||||
Flow-Level Discrete Event Simulator for Cyber-Physical Systems
|
Flow-Level Discrete Event Simulator for Cyber-Physical Systems
|
||||||
The general format for an event is (type,timestamp,event,priority)
|
The general format for an event is (type,timestamp,event,priority)
|
||||||
Event types:
|
Event types:
|
||||||
- 0 send (0,timestamp,(src,dst,interface,data,datasize,duration,datasize_remaining,start_timestamp, perform_delivery,cancel_on_turn_off), 2)
|
- 0 send (0,timestamp,(src,dst,interface,data,datasize,duration,datasize_remaining,start_timestamp, perform_delivery), 2)
|
||||||
- 1 timeout (1,timestamp,node_id,3)
|
- 1 timeout (1,timestamp,node_id,3)
|
||||||
- 2 breakpoint_manual (3,timestamp,0,1)
|
- 2 breakpoint_manual (3,timestamp,0,1)
|
||||||
- 3 breakpoint_auto (4,timestamp,0,1)
|
- 3 breakpoint_auto (4,timestamp,0,1)
|
||||||
|
@ -54,7 +54,7 @@ class Simulator:
|
||||||
if int(event[0]) == 0:
|
if int(event[0]) == 0:
|
||||||
cur_event=event[2]
|
cur_event=event[2]
|
||||||
ts=float(event[1])
|
ts=float(event[1])
|
||||||
src_id,dst_id,interface, data, datasize,duration, datasize_remaining,start_at,perform_delivery,cancel_on_turn_off=cur_event
|
src_id,dst_id,interface, data, datasize,duration, datasize_remaining,start_at,perform_delivery=cur_event
|
||||||
new_bw=netmat[interface]["bandwidth"][int(src_id),int(dst_id)]
|
new_bw=netmat[interface]["bandwidth"][int(src_id),int(dst_id)]
|
||||||
old_bw=self.netmat[interface]["bandwidth"][int(src_id),int(dst_id)]
|
old_bw=self.netmat[interface]["bandwidth"][int(src_id),int(dst_id)]
|
||||||
new_lat=netmat[interface]["latency"][int(src_id),int(dst_id)]
|
new_lat=netmat[interface]["latency"][int(src_id),int(dst_id)]
|
||||||
|
@ -192,45 +192,22 @@ class Simulator:
|
||||||
elif node["request"] == "turn_off":
|
elif node["request"] == "turn_off":
|
||||||
# Create communications selectors (True/False arrays)
|
# Create communications selectors (True/False arrays)
|
||||||
selector_wireless=list() # Select all wireless events where node is involved
|
selector_wireless=list() # Select all wireless events where node is involved
|
||||||
selector_wired=list() # Select all wired events where node is involved
|
|
||||||
for event in self.events:
|
for event in self.events:
|
||||||
if event[0]==0 and int(event[2][1])==node.node_id:
|
if event[0]==0 and int(event[2][1])==node.node_id:
|
||||||
if self.netmat[event[2][2]]["is_wired"]:
|
if self.netmat[event[2][2]]["is_wired"]:
|
||||||
selector_wireless.append(False)
|
selector_wireless.append(False)
|
||||||
if event[2][9]: # Check if should be cancel on turn_off
|
event[2][8]=False # So set delivery to False for wired communication!!
|
||||||
selector_wired.append(True)
|
|
||||||
else:
|
|
||||||
selector_wired.append(False)
|
|
||||||
event[2][8]=False # So set delivery to False!!
|
|
||||||
else:
|
else:
|
||||||
selector_wireless.append(True)
|
selector_wireless.append(True)
|
||||||
selector_wired.append(False)
|
|
||||||
else:
|
else:
|
||||||
selector_wireless.append(False)
|
selector_wireless.append(False)
|
||||||
selector_wired.append(False)
|
# Remove the relevent wireless communications from the event list
|
||||||
# Build the set of senders to notify (only in wired connections,
|
|
||||||
# indeed IRL, in wireless communications sender would send all its data)
|
|
||||||
senders_to_notify=set()
|
|
||||||
for event in self.events[selector_wired]:
|
|
||||||
senders_to_notify.add(int(event[2][0]))
|
|
||||||
# Remove communications from the event list
|
|
||||||
if(len(self.events) != 0):
|
if(len(self.events) != 0):
|
||||||
self.events=self.events[~(np.array(selector_wireless)|np.array(selector_wired))]
|
self.events=self.events[~np.array(selector_wireless)]
|
||||||
# Refresh wired sharing
|
|
||||||
for interface in self.sharing.keys():
|
|
||||||
self.sharing[interface][node.node_id]=0 # Sharing goes back to zero
|
|
||||||
# Update node state after turning off
|
# Update node state after turning off
|
||||||
node["state"]="running"
|
node["state"]="running"
|
||||||
node.rqueue.put(("turn_off",0))
|
node.rqueue.put(("turn_off",0))
|
||||||
self.log("Turned off",node=node.node_id)
|
self.log("Turned off",node=node.node_id)
|
||||||
# Informed senders of wired events that communication ended
|
|
||||||
for sender_id in senders_to_notify:
|
|
||||||
sender_node=self.nodes[sender_id]
|
|
||||||
sender_node["state"]="running"
|
|
||||||
sender_node.rqueue.put(("send_cancel",2))
|
|
||||||
# The node should resume at current self.time. So, sync the sender now:
|
|
||||||
self.sync_node_non_blocking(sender_node)
|
|
||||||
self.sync_node_blocking(sender_node)
|
|
||||||
elif node["request"] == "send_cancel":
|
elif node["request"] == "send_cancel":
|
||||||
selector=list()
|
selector=list()
|
||||||
for event in self.events:
|
for event in self.events:
|
||||||
|
@ -303,12 +280,12 @@ class Simulator:
|
||||||
if node["state"] == "call_blocking":
|
if node["state"] == "call_blocking":
|
||||||
if node["request"] == "send":
|
if node["request"] == "send":
|
||||||
node["state"]="pending"
|
node["state"]="pending"
|
||||||
interface, data, datasize, dst,cancel_on_turn_off=node.rargs
|
interface, data, datasize, dst=node.rargs
|
||||||
if dst != None:
|
if dst != None:
|
||||||
if not (dst >=0 and dst <=len(self.nodes)):
|
if not (dst >=0 and dst <=len(self.nodes)):
|
||||||
self.log("Invalid dst used in send() or sendt(), node "+str(dst)+" not found", node=node.node_id)
|
self.log("Invalid dst used in send() or sendt(), node "+str(dst)+" not found", node=node.node_id)
|
||||||
exit(1)
|
exit(1)
|
||||||
self.communicate(interface, node.node_id, dst, data, datasize,cancel_on_turn_off)
|
self.communicate(interface, node.node_id, dst, data, datasize)
|
||||||
elif node["request"] == "receive":
|
elif node["request"] == "receive":
|
||||||
interface=node.rargs
|
interface=node.rargs
|
||||||
if node["interfaces_queue_size"][interface] > 0:
|
if node["interfaces_queue_size"][interface] > 0:
|
||||||
|
@ -323,7 +300,7 @@ class Simulator:
|
||||||
node.rqueue.put(("wait_end",0))
|
node.rqueue.put(("wait_end",0))
|
||||||
self.wait_end_nodes.append(node.node_id)
|
self.wait_end_nodes.append(node.node_id)
|
||||||
|
|
||||||
def communicate(self, interface, src, dst, data, datasize, cancel_on_turn_off=True):
|
def communicate(self, interface, src, dst, data, datasize):
|
||||||
"""
|
"""
|
||||||
Create communication event between src and dst
|
Create communication event between src and dst
|
||||||
"""
|
"""
|
||||||
|
@ -334,7 +311,7 @@ class Simulator:
|
||||||
self.update_sharing(dst,1,interface) # Update sharing first
|
self.update_sharing(dst,1,interface) # Update sharing first
|
||||||
# Note that in the following we send more data than expected to handle bandwidth sharing (datasize*8*sharing):
|
# Note that in the following we send more data than expected to handle bandwidth sharing (datasize*8*sharing):
|
||||||
duration=datasize*8/(self.netmat[interface]["bandwidth"][src,dst]/self.sharing[interface][dst])+self.netmat[interface]["latency"][src,dst]
|
duration=datasize*8/(self.netmat[interface]["bandwidth"][src,dst]/self.sharing[interface][dst])+self.netmat[interface]["latency"][src,dst]
|
||||||
self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True,cancel_on_turn_off))
|
self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True))
|
||||||
else:
|
else:
|
||||||
nsrc["state"]="call_blocking" # Try later when node is on
|
nsrc["state"]="call_blocking" # Try later when node is on
|
||||||
else:
|
else:
|
||||||
|
@ -345,11 +322,11 @@ class Simulator:
|
||||||
if src == dst:
|
if src == dst:
|
||||||
# This event (where src == dst) is used to notify the sender when data is received!
|
# This event (where src == dst) is used to notify the sender when data is received!
|
||||||
# Correspond to the diagonal of the network matrices (bandwidth and latency)
|
# Correspond to the diagonal of the network matrices (bandwidth and latency)
|
||||||
self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True,cancel_on_turn_off))
|
self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True))
|
||||||
elif not self.interferences:
|
elif not self.interferences:
|
||||||
self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True,cancel_on_turn_off))
|
self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True))
|
||||||
elif not self.handle_interferences(src,dst, interface):
|
elif not self.handle_interferences(src,dst, interface):
|
||||||
self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True,cancel_on_turn_off))
|
self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True))
|
||||||
|
|
||||||
def list_receivers(self,node,interface):
|
def list_receivers(self,node,interface):
|
||||||
"""
|
"""
|
||||||
|
@ -416,7 +393,7 @@ class Simulator:
|
||||||
event=self.events[0,2]
|
event=self.events[0,2]
|
||||||
self.events=np.delete(self.events,0,0) # Consume events NOW! not at the end of the loop (event list may change in between)
|
self.events=np.delete(self.events,0,0) # Consume events NOW! not at the end of the loop (event list may change in between)
|
||||||
if event_type == 0:
|
if event_type == 0:
|
||||||
src_id,dst_id,interface, data, datasize,duration,datasize_remaining,start_at,perform_delivery,cancel_on_turn_off=event
|
src_id,dst_id,interface, data, datasize,duration,datasize_remaining,start_at,perform_delivery=event
|
||||||
src=self.nodes[int(src_id)]
|
src=self.nodes[int(src_id)]
|
||||||
dst=self.nodes[int(dst_id)]
|
dst=self.nodes[int(dst_id)]
|
||||||
if self.netmat[interface]["is_wired"]:
|
if self.netmat[interface]["is_wired"]:
|
||||||
|
@ -431,9 +408,9 @@ class Simulator:
|
||||||
dst["state"]="running"
|
dst["state"]="running"
|
||||||
dst.rqueue.put(("receive",0))
|
dst.rqueue.put(("receive",0))
|
||||||
self.sync_node_non_blocking(dst,timeout_remove_only=True)
|
self.sync_node_non_blocking(dst,timeout_remove_only=True)
|
||||||
src["state"]="running"
|
src["state"]="running"
|
||||||
src.rqueue.put(("send",0))
|
src.rqueue.put(("send",0))
|
||||||
self.sync_node_non_blocking(src,timeout_remove_only=True)
|
self.sync_node_non_blocking(src,timeout_remove_only=True)
|
||||||
else:
|
else:
|
||||||
if src.node_id != dst.node_id:
|
if src.node_id != dst.node_id:
|
||||||
if perform_delivery:
|
if perform_delivery:
|
||||||
|
|
|
@ -3,13 +3,10 @@
|
||||||
[t=1.000,src=n1] Received: Hello World!
|
[t=1.000,src=n1] Received: Hello World!
|
||||||
[t=1.000,src=n0] Send 1 bytes to n1 on eth0
|
[t=1.000,src=n0] Send 1 bytes to n1 on eth0
|
||||||
[t=2.000,src=n1] Receive 1 bytes on eth0
|
[t=2.000,src=n1] Receive 1 bytes on eth0
|
||||||
[t=3.000,src=n0] Send 1 bytes to n1 on eth0
|
[t=3.000,src=n0] Send 15 bytes to n1 on eth0
|
||||||
[t=3.000,src=n1] Received: Hello World!
|
[t=3.000,src=n1] Received: Hello World!
|
||||||
[t=3.000,src=n1] Turned off
|
[t=3.000,src=n1] Turned off
|
||||||
[t=4.000,src=n1] Turned on
|
[t=4.000,src=n1] Turned on
|
||||||
[t=4.000,src=n0] Send 10 bytes to n1 on eth0
|
|
||||||
[t=5.000,src=n1] Receive failed code=-1
|
[t=5.000,src=n1] Receive failed code=-1
|
||||||
[t=5.000,src=n1] Turned off
|
[t=18.000,src=n0] End
|
||||||
[t=6.000,src=n1] Turned on
|
[t=18.000,src=esds] Simulation ends
|
||||||
[t=7.000,src=n1] Receive failed code=-1
|
|
||||||
[t=14.000,src=esds] Simulation ends
|
|
||||||
|
|
|
@ -10,17 +10,12 @@ def execute(api):
|
||||||
code, data=api.receive("eth0")
|
code, data=api.receive("eth0")
|
||||||
msg="Received: "+data if code == 0 else "Receive failed code="+str(code)
|
msg="Received: "+data if code == 0 else "Receive failed code="+str(code)
|
||||||
api.log(msg)
|
api.log(msg)
|
||||||
##### Ensure data is not receive when turned off
|
##### Ensure data is not receive when turned off but communication must still be ongoing
|
||||||
api.turn_off()
|
|
||||||
api.wait(1)
|
|
||||||
api.turn_on()
|
|
||||||
code, data=api.receivet("eth0",1)
|
|
||||||
msg="Received: "+data if code == 0 else "Receive failed code="+str(code)
|
|
||||||
api.log(msg)
|
|
||||||
##### Ensure data is not receive turned off but communication is not cancel
|
|
||||||
api.turn_off()
|
api.turn_off()
|
||||||
api.wait(1)
|
api.wait(1)
|
||||||
api.turn_on()
|
api.turn_on()
|
||||||
code, data=api.receivet("eth0",1)
|
code, data=api.receivet("eth0",1)
|
||||||
msg="Received: "+data if code == 0 else "Receive failed code="+str(code)
|
msg="Received: "+data if code == 0 else "Receive failed code="+str(code)
|
||||||
api.log(msg)
|
api.log(msg)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -3,6 +3,6 @@
|
||||||
def execute(api):
|
def execute(api):
|
||||||
api.send("eth0","Hello World!",1,1)
|
api.send("eth0","Hello World!",1,1)
|
||||||
api.send("eth0","Hello World!",1,1)
|
api.send("eth0","Hello World!",1,1)
|
||||||
api.wait(1) # Goto 3 seconds
|
api.wait(1) # Goto t=3s
|
||||||
api.send("eth0","Hello World!",1,1)
|
api.send("eth0","Hello World!",15,1) # Communication should not be aborted even if receiver turned_off (e.g UDP)
|
||||||
api.send("eth0","Hello World!",10,1,False) # Now communication should not be aborted even if receiver turned_off (e.g UDP)
|
api.log("End") # Should be printed at t=18s
|
Loading…
Add table
Reference in a new issue