From 6e7582b8c07c5ab51d31565e4efb8924cde03917 Mon Sep 17 00:00:00 2001 From: Loic Guegan Date: Wed, 14 Sep 2022 15:55:20 +0200 Subject: [PATCH] Debug communication abortion --- esds/rcode.py | 1 + esds/simulator.py | 116 +++++++++++++++++++++++++--------------------- 2 files changed, 63 insertions(+), 54 deletions(-) diff --git a/esds/rcode.py b/esds/rcode.py index f4ca775..6600177 100644 --- a/esds/rcode.py +++ b/esds/rcode.py @@ -6,4 +6,5 @@ class RCode(Enum): TIMEOUT_EXPIRE = 2 RECEIVER_TURNED_OFF = 3 RECEIVER_UNAVAILABLE = 4 + UNKNOWN = 5 diff --git a/esds/simulator.py b/esds/simulator.py index 6048cdc..90ad427 100644 --- a/esds/simulator.py +++ b/esds/simulator.py @@ -161,69 +161,77 @@ class Simulator: node.rqueue.put(("turn_on",RCode.SUCCESS)) self.log("Turned on",node=node.node_id) elif node["request"] == "turn_off": - # Create communications selectors (True/False arrays) - selector_wireless=list() # Select all wireless events where node is receiver - selector_wired=list() # Select all wired events where node is receiver - for event in self.events: - if event[0]==0 and int(event[2][1])==node.node_id: - if self.netmat[event[2][2]]["is_wired"]: - selector_wireless.append(False) - if event[2][9]: # Check if should be cancel on turn_off (receiver_required) - selector_wired.append(True) - else: - selector_wired.append(False) - event[2][8]=False # So set delivery to False!! - else: - selector_wireless.append(True) - selector_wired.append(False) - # Call the sender/receiver callbacks - self.notify_node_plugins(self.nodes[int(event[2][1])], "on_communication_end", event) - self.notify_node_plugins(self.nodes[int(event[2][0])], "on_communication_end", event) - else: - selector_wireless.append(False) - selector_wired.append(False) - # Build the set of senders to notify (only in wired connections, - # indeed IRL, in wireless communications, sender would send all its data) - senders_to_notify=set() - for event in self.events[selector_wired]: - senders_to_notify.add(int(event[2][0])) - # Remove communications from the event list - if(len(self.events) != 0): - self.events=self.events[~(np.array(selector_wireless)|np.array(selector_wired))] # Update node state after turning off node["state"]="running" node.rqueue.put(("turn_off",RCode.SUCCESS)) self.log("Turned off",node=node.node_id) - # Informed senders of wired events that communication ended - for sender_id in senders_to_notify: - # Notify sender (node that wired sharing is updated in the send_cancel request) - sender_node=self.nodes[sender_id] - sender_node["state"]="running" - sender_node.rqueue.put(("send_cancel",RCode.RECEIVER_TURNED_OFF)) - # The node should resume at current self.time. So, sync the sender now: - self.sync_node_non_blocking(sender_node) - self.sync_node_blocking(sender_node) + # We cancel communication after node has turned off + self.cancel_communications(node.node_id,reason=RCode.RECEIVER_TURNED_OFF) elif node["request"] == "send_cancel": - selector=list() - sharing_to_update=list() - for event in self.events: - if event[0]==0 and int(event[2][0]) == node.node_id: - selector.append(True) - if self.netmat[event[2][2]]["is_wired"]: - sharing_to_update.append((int(event[2][1]),event[2][2])) - self.notify_node_plugins(node, "on_communication_end", event) - elif int(event[2][0]) == int(event[2][1]): # If it is the sender event of a wireless communication (when sender_id==receiver_id) - self.notify_node_plugins(node, "on_communication_end", event) - else: - selector.append(False) - self.events=self.events[~np.array(selector)] - # Now Update receiver of cancel communication sharing (since update_sharing sort event, selector would have been invalidated if done before) - for com in sharing_to_update: - self.update_sharing(com[0],-1,com[1]) + self.cancel_communications(node.node_id) node["state"]="running" node.rqueue.put(("send_cancel",RCode.SUCCESS)) node.sync() + def cancel_communications(self, node_id, reason=RCode.UNKNOWN): + if(len(self.events) == 0): + return + # Build list of impacted events + selector=list() + for event in self.events: + if event[0]==0: + src_id,dst_id,interface, data, datasize,duration,datasize_remaining,start_at,perform_delivery,receiver_required=event[2] + is_wired=self.netmat[interface]["is_wired"] + is_wireless=not is_wired + if src_id == node_id: + selector.append(True) + elif dst_id == node_id: + if is_wireless: + selector.append(True) + else: + if receiver_required: + selector.append(True) + else: + selector.append(False) + event[2][8]=False # So set delivery to False!! + else: + selector.append(False) + else: + selector.append(False) + # Update sharing of wired communications and build sender to notify set + senders_to_notify=set() + for event in self.events[selector]: + src_id,dst_id,interface, data, datasize,duration,datasize_remaining,start_at,perform_delivery,receiver_required=event[2] + if self.netmat[interface]["is_wired"]: + # If node is sender + if src_id == node_id: + self.update_sharing(dst_id,-1,interface) + else: + self.update_sharing(node_id,-1,interface) + senders_to_notify.add(src_id) # We do not notify sender here since it may change the event list (invalidate selector) + # Notify plugins + for event in self.events[selector]: + src_id,dst_id,interface, data, datasize,duration,datasize_remaining,start_at,perform_delivery,receiver_required=event[2] + if self.netmat[interface]["is_wired"]: + self.notify_node_plugins(self.nodes[src_id], "on_communication_end", event) + self.notify_node_plugins(self.nodes[dst_id], "on_communication_end", event) + elif src_id == dst_id: + self.notify_node_plugins(self.nodes[src_id], "on_communication_end", event) + else: + self.notify_node_plugins(self.nodes[dst_id], "on_communication_end", event) + # Delete related events + self.events=self.events[~(np.array(selector))] + # Notify sender at the end to not corrupt the event list and invalidate selector + for sender in senders_to_notify: + # Notify sender (node that wired sharing is updated in the send_cancel request) + sender_node=self.nodes[sender] + sender_node["state"]="running" + sender_node.rqueue.put(("send_cancel",reason)) + # The node should resume at current self.time. So, sync the sender now: + self.sync_node_non_blocking(sender_node) + self.sync_node_blocking(sender_node) + + def update_sharing(self, dst, amount,interface): """ Manage bandwidth sharing on wired interfaces