Debug communication abortion

This commit is contained in:
Loic Guegan 2022-09-14 15:55:20 +02:00
parent e4b742cff3
commit 6e7582b8c0
2 changed files with 63 additions and 54 deletions

View file

@ -6,4 +6,5 @@ class RCode(Enum):
TIMEOUT_EXPIRE = 2
RECEIVER_TURNED_OFF = 3
RECEIVER_UNAVAILABLE = 4
UNKNOWN = 5

View file

@ -161,69 +161,77 @@ class Simulator:
node.rqueue.put(("turn_on",RCode.SUCCESS))
self.log("Turned on",node=node.node_id)
elif node["request"] == "turn_off":
# Create communications selectors (True/False arrays)
selector_wireless=list() # Select all wireless events where node is receiver
selector_wired=list() # Select all wired events where node is receiver
for event in self.events:
if event[0]==0 and int(event[2][1])==node.node_id:
if self.netmat[event[2][2]]["is_wired"]:
selector_wireless.append(False)
if event[2][9]: # Check if should be cancel on turn_off (receiver_required)
selector_wired.append(True)
else:
selector_wired.append(False)
event[2][8]=False # So set delivery to False!!
else:
selector_wireless.append(True)
selector_wired.append(False)
# Call the sender/receiver callbacks
self.notify_node_plugins(self.nodes[int(event[2][1])], "on_communication_end", event)
self.notify_node_plugins(self.nodes[int(event[2][0])], "on_communication_end", event)
else:
selector_wireless.append(False)
selector_wired.append(False)
# Build the set of senders to notify (only in wired connections,
# indeed IRL, in wireless communications, sender would send all its data)
senders_to_notify=set()
for event in self.events[selector_wired]:
senders_to_notify.add(int(event[2][0]))
# Remove communications from the event list
if(len(self.events) != 0):
self.events=self.events[~(np.array(selector_wireless)|np.array(selector_wired))]
# Update node state after turning off
node["state"]="running"
node.rqueue.put(("turn_off",RCode.SUCCESS))
self.log("Turned off",node=node.node_id)
# Informed senders of wired events that communication ended
for sender_id in senders_to_notify:
# Notify sender (node that wired sharing is updated in the send_cancel request)
sender_node=self.nodes[sender_id]
sender_node["state"]="running"
sender_node.rqueue.put(("send_cancel",RCode.RECEIVER_TURNED_OFF))
# The node should resume at current self.time. So, sync the sender now:
self.sync_node_non_blocking(sender_node)
self.sync_node_blocking(sender_node)
# We cancel communication after node has turned off
self.cancel_communications(node.node_id,reason=RCode.RECEIVER_TURNED_OFF)
elif node["request"] == "send_cancel":
selector=list()
sharing_to_update=list()
for event in self.events:
if event[0]==0 and int(event[2][0]) == node.node_id:
selector.append(True)
if self.netmat[event[2][2]]["is_wired"]:
sharing_to_update.append((int(event[2][1]),event[2][2]))
self.notify_node_plugins(node, "on_communication_end", event)
elif int(event[2][0]) == int(event[2][1]): # If it is the sender event of a wireless communication (when sender_id==receiver_id)
self.notify_node_plugins(node, "on_communication_end", event)
else:
selector.append(False)
self.events=self.events[~np.array(selector)]
# Now Update receiver of cancel communication sharing (since update_sharing sort event, selector would have been invalidated if done before)
for com in sharing_to_update:
self.update_sharing(com[0],-1,com[1])
self.cancel_communications(node.node_id)
node["state"]="running"
node.rqueue.put(("send_cancel",RCode.SUCCESS))
node.sync()
def cancel_communications(self, node_id, reason=RCode.UNKNOWN):
if(len(self.events) == 0):
return
# Build list of impacted events
selector=list()
for event in self.events:
if event[0]==0:
src_id,dst_id,interface, data, datasize,duration,datasize_remaining,start_at,perform_delivery,receiver_required=event[2]
is_wired=self.netmat[interface]["is_wired"]
is_wireless=not is_wired
if src_id == node_id:
selector.append(True)
elif dst_id == node_id:
if is_wireless:
selector.append(True)
else:
if receiver_required:
selector.append(True)
else:
selector.append(False)
event[2][8]=False # So set delivery to False!!
else:
selector.append(False)
else:
selector.append(False)
# Update sharing of wired communications and build sender to notify set
senders_to_notify=set()
for event in self.events[selector]:
src_id,dst_id,interface, data, datasize,duration,datasize_remaining,start_at,perform_delivery,receiver_required=event[2]
if self.netmat[interface]["is_wired"]:
# If node is sender
if src_id == node_id:
self.update_sharing(dst_id,-1,interface)
else:
self.update_sharing(node_id,-1,interface)
senders_to_notify.add(src_id) # We do not notify sender here since it may change the event list (invalidate selector)
# Notify plugins
for event in self.events[selector]:
src_id,dst_id,interface, data, datasize,duration,datasize_remaining,start_at,perform_delivery,receiver_required=event[2]
if self.netmat[interface]["is_wired"]:
self.notify_node_plugins(self.nodes[src_id], "on_communication_end", event)
self.notify_node_plugins(self.nodes[dst_id], "on_communication_end", event)
elif src_id == dst_id:
self.notify_node_plugins(self.nodes[src_id], "on_communication_end", event)
else:
self.notify_node_plugins(self.nodes[dst_id], "on_communication_end", event)
# Delete related events
self.events=self.events[~(np.array(selector))]
# Notify sender at the end to not corrupt the event list and invalidate selector
for sender in senders_to_notify:
# Notify sender (node that wired sharing is updated in the send_cancel request)
sender_node=self.nodes[sender]
sender_node["state"]="running"
sender_node.rqueue.put(("send_cancel",reason))
# The node should resume at current self.time. So, sync the sender now:
self.sync_node_non_blocking(sender_node)
self.sync_node_blocking(sender_node)
def update_sharing(self, dst, amount,interface):
"""
Manage bandwidth sharing on wired interfaces