diff --git a/esds/node.py b/esds/node.py index 6a2566d..59b6f96 100644 --- a/esds/node.py +++ b/esds/node.py @@ -13,7 +13,7 @@ class Node: self.rargs=None # Store the requests arguments self.plugins=list() # Contains all registered node plugins self.rqueue=queue.Queue() # Receive simulator acknowledgments - self.chest={"state":"running", "turned_on":True, "request": None, "interfaces":dict(), "interfaces_queue_size":dict()} + self.chest={"state":"running", "turned_on":True, "request": None, "interfaces":dict(), "interfaces_queue_size":dict(), "plugin_notify": dict()} for interface in interfaces: self.chest["interfaces"][interface]=queue.Queue() self.chest["interfaces_queue_size"][interface]=0 @@ -22,6 +22,11 @@ class Node: def plugin_register(self,plugin): self.plugins.append(plugin) + def plugin_handle_requests(self): + # Take plugins notification into account + for key in list(self["plugin_notify"]): + self.plugin_notify(key,self["plugin_notify"].pop(key)) + def plugin_notify(self,reason,args): """ This function strives to avoid using Python specific features @@ -33,6 +38,8 @@ class Node: p.on_send_call(args[0],args[1],args[2],args[3]) if reason == "send_return": p.on_send_return(args[0],args[1],args[2],args[3],args[4]) + if reason == "on_communication_end": + p.on_communication_end(args[0],args[1],args[2],args[3],args[4]) if reason == "terminated": p.on_terminated() diff --git a/esds/plugins/node_plugin.py b/esds/plugins/node_plugin.py index 325ff8a..dc9302f 100644 --- a/esds/plugins/node_plugin.py +++ b/esds/plugins/node_plugin.py @@ -24,6 +24,9 @@ class NodePlugin: def on_terminated(self): pass + def on_communication_end(self,interface,data,start_at,end_at,aborted_at): + self.log("hello world") + def log(self,msg): self.api.log(self.plugin_name+"(NP) "+msg) diff --git a/esds/simulator.py b/esds/simulator.py index fee9331..57dda4e 100644 --- a/esds/simulator.py +++ b/esds/simulator.py @@ -123,7 +123,10 @@ class Simulator: """ sorted_indexes=np.lexsort((self.events[:,3],self.events[:,1])) self.events=self.events[sorted_indexes] - + + def notify_node_plugin(self,node,function,args): + self.nodes[node]["plugin_notify"][function]=args + def sync_node_non_blocking(self,node, timeout_remove_only=False): """ Process all call request and wait for Node.sync() to return @@ -196,11 +199,14 @@ class Simulator: selector_wireless.append(False) if event[2][9]: # Check if should be cancel on turn_off (receiver_required) selector_wired.append(True) + self.notify_node_plugin(event[2][1],"on_communication_end",("eth0",0,0,0,0)) else: selector_wired.append(False) event[2][8]=False # So set delivery to False!! + # TODO: notify receiver plugins else: selector_wireless.append(True) + # TODO: notify receiver plugins selector_wired.append(False) else: selector_wireless.append(False) @@ -234,6 +240,7 @@ class Simulator: selector.append(True) if self.netmat[event[2][2]]["is_wired"]: sharing_to_update.append((int(event[2][1]),event[2][2])) + # TODO: notify sender plugins else: selector.append(False) self.events=self.events[~np.array(selector)] @@ -427,11 +434,13 @@ class Simulator: dst["state"]="running" dst.rqueue.put(("receive",0)) self.sync_node_non_blocking(dst,timeout_remove_only=True) + # TODO: notify receiver plugins self.update_sharing(dst.node_id,-1,interface) src["state"]="running" code=0 if perform_delivery else 1 src.rqueue.put(("send",code)) self.sync_node_non_blocking(src,timeout_remove_only=True) + # TODO: notify sender plugins else: if src.node_id != dst.node_id: if perform_delivery: @@ -444,10 +453,12 @@ class Simulator: dst["state"]="running" dst.rqueue.put(("receive",0)) self.sync_node_non_blocking(dst,timeout_remove_only=True) + # TODO: notify receiver plugins else: src["state"]="running" src.rqueue.put(("send",0)) self.sync_node_non_blocking(src,timeout_remove_only=True) + # TODO: notify sender plugins elif event_type == 1: # Timeout node=self.nodes[int(event)] node["state"]="running" diff --git a/manual/.gitignore b/manual/.gitignore new file mode 100644 index 0000000..056aa14 --- /dev/null +++ b/manual/.gitignore @@ -0,0 +1,3 @@ +_minted-manual +*.tex +*.bbl \ No newline at end of file diff --git a/manual/components.pdf b/manual/components.pdf new file mode 100644 index 0000000..49afc21 Binary files /dev/null and b/manual/components.pdf differ diff --git a/manual/manual.org b/manual/manual.org new file mode 100644 index 0000000..4ad104c --- /dev/null +++ b/manual/manual.org @@ -0,0 +1,204 @@ +#+TITLE: ESDS: Extensible Simulator for Distributed Systems +#+AUTHOR: Loic GUEGAN +#+OPTIONS: toc:nil + +#+LATEX_HEADER: \usepackage{fullpage} +#+LATEX_HEADER: \usepackage{minted} +#+LATEX_HEADER: \usepackage{booktabs} +#+LATEX_HEADER: \usepackage{xspace} +#+LATEX_HEADER: \newcommand{\stateoff}{"\textit{off}"\xspace} +#+LATEX_HEADER: \newcommand{\stateon}{"\textit{on}"\xspace} + + +* Simulation Architecture +The ESDS simulator comprises two major components: 1) The Simulation Orchestrator(SO) 2) The Simulated +Nodes (SN). This architecture is depicted in Figure \ref{architecture}. + +\begin{figure}[!h] +\centering +\includegraphics[scale=0.5]{components.pdf} +\caption{Architecture of ESDS} +\label{architecture} +\end{figure} + +The SO is the main process in charge of implementing the simulation main loop. It instantiates the +network (e.g bandwidths andlatencies), collects and processes the events (e.g communications,turn +on/off). The nodes on the other hand are threads that simulate the nodes behaviors. + +* Example +To run a simulation, you need to provide at least 2 files. The first one instantiate the +orchestrator and the second one will simulate the node. In this section, you will learn how to write +both files. + +The simulated scenario comprises 2 nodes that wakes up randomly every hour for a duration called +"uptime". The sender try to transmit his data during that uptime. The other node is a receiver that +have similar random wake up parterns and strive to receive data from the sender. +** Orchestrator +#+attr_latex: :options fontsize=\small, breaklines +#+BEGIN_SRC python + #!/usr/bin/env python + + import esds # Load ESDS + import numpy as np # Use numpy to construct bandwidth and latencies matrix + + ##### Bandwidth matrix + # Bandwidth value can be 0 for unreachable nodes + # Regarding wireless interfaces the diagonals of the bandwidth and latency matrices are very important. + # They determine the duration of the tranmission for THE SENDER. It allows to have a different tx + # duration per node and per interface. Please cf esds.py for more informations. + n=2 # Number of nodes including the sender + B=np.full((n,n),5) # 5Mbps + + ##### Latency matrix + # If the latency entries match one with a bandwidth of 0 + # then it will be ignore since node is unreachable. + L=np.full((n,n),0) # 0s + + ##### Create the simulator + # esds.Simulator take at least a dictionnary as a parameter + # This dictionnary contains all the network interfaces (name as a key) of each node + s=esds.Simulator({"wlan0":{"bandwidth":B, "latency":L, "is_wired":False},"eth0":{"bandwidth":B, "latency":L, "is_wired":True}}) + + ##### Instantiate nodes + uptime=180 # 180s uptime + s.create_node("sender",args=uptime) # Load sender.py for the first node with 5 as argument (first row in B and L) + + # Aguments can be passed to nodes via: s.create_node("sender",args="my argument") + for n in range(0,n-1): # Load receiver.py for the remaining nodes + s.create_node("receiver",args=uptime) + + ##### Run the simulation + s.run() +#+END_SRC + +** Nodes +To implement a node, you should create a python file with the method execute(api). This method will be +called by the orchestrator to execute the code of your node. The api parameter provide you access to the following esds API: + + +\begin{table*}[] + \centering + \caption{Simulated Nodes blocking and non-blocking API calls} + \label{tab:api} + \small + \resizebox{\columnwidth}{!}{% + \begin{tabular}{@{}lll@{}} +\toprule +\textbf{Call} & \textbf{Blocking} & \textbf{Description} \\ \midrule +\verb!send(,,,,)! & yes & Send \verb!! of size \verb!! on interface \verb!! \\ +\verb!sendt(,,,,,)! & yes & Send \verb!! of size \verb!! on interface \verb!! with a timeout of \verb!! \\ +\verb!receive()! & yes & Wait for and fetch incoming data on interface \verb!! \\ +\verb!receivet(,)! & yes & Wait for and fetch incoming data on interface \verb!! with a timeout of \verb!! \\ +\verb!wait()! & yes & Wait for a specific amount of simulated time \verb!! \\ +\verb!wait_end()! & yes & Wait until the end of the simulation \\ +\verb!log()! & no & Report \verb!! to the SO that will print it to the standard output \\ +\verb!read()! & no & Read in the SO registers (e.g \textit{clock} to get the current simulated time) \\ +\verb!turn_off()/turn_on()! & no & Change the node state to \stateoff or \stateon respectively +\end{tabular}} +\end{table*} + +*** Sender +#+attr_latex: :options fontsize=\small, breaklines +#+BEGIN_SRC python + #!/usr/bin/env python + + import random + + # Note that the following is required to have different instance from thread to thread + lr=random.Random(6) + + def execute(api): + uptime=api.args + endoff=0 + for i in range(0,24): + startoff=random.randint(0,3600-uptime) + api.turn_off() + api.wait(startoff+endoff) + api.turn_on() + wakeat=api.read("clock") + wakeuntil=wakeat+uptime + # Send until uptime seconds if elapsed + while api.read("clock") < wakeuntil: + api.sendt("wlan0","hello",10,None, wakeuntil-api.read("clock")) + api.log("Was up for {}s".format(api.read("clock")-wakeat)) + endoff=3600*(i+1)-api.read("clock") + api.turn_off() + api.wait(endoff) + api.turn_on() + + + + +#+END_SRC +*** Receiver +#+attr_latex: :options fontsize=\small, breaklines +#+BEGIN_SRC python + #!/usr/bin/env python + + import sys, random, time + + lr=random.Random(6) + + def execute(api): + uptime=api.args + endoff=0 + for i in range(0,24): + startoff=random.randint(0,3600-uptime) + api.turn_off() + api.wait(startoff+endoff) + api.turn_on() + wakeat=api.read("clock") + wakeuntil=wakeat+uptime + # Receive until uptime seconds if elapsed + while api.read("clock") < wakeuntil: + code, data=api.receivet("wlan0",wakeuntil-api.read("clock")) + if code == 0: + api.log("Receive "+data) + api.log("Was up for {}s".format(api.read("clock")-wakeat)) + endoff=3600*(i+1)-api.read("clock") + api.turn_off() + api.wait(endoff) + api.turn_on() + + +#+END_SRC + +** Simulation Output +Here is part of the simulation output: +#+begin_example + [t=82626.000,src=n0] Send 10 bytes on wlan0 + [t=82630.000,src=n0] Was up for 180.0s + [t=82630.000,src=n0] Turned off + [t=83083.000,src=n1] Turned on + [t=83263.000,src=n1] Was up for 180.0s + [t=83263.000,src=n1] Turned off + [t=85910.000,src=n0] Turned on + [t=85910.000,src=n0] Send 10 bytes on wlan0 + [t=85926.000,src=n0] Send 10 bytes on wlan0 + [t=85942.000,src=n0] Send 10 bytes on wlan0 + [t=85958.000,src=n0] Send 10 bytes on wlan0 + [t=85974.000,src=n0] Send 10 bytes on wlan0 + [t=85990.000,src=n0] Send 10 bytes on wlan0 + [t=86006.000,src=n0] Send 10 bytes on wlan0 + [t=86022.000,src=n0] Send 10 bytes on wlan0 + [t=86038.000,src=n0] Send 10 bytes on wlan0 + [t=86054.000,src=n0] Send 10 bytes on wlan0 + [t=86070.000,src=n0] Send 10 bytes on wlan0 + [t=86086.000,src=n0] Send 10 bytes on wlan0 + [t=86090.000,src=n0] Was up for 180.0s + [t=86090.000,src=n0] Turned off + [t=86400.000,src=n0] Turned on + [t=86400.000,src=n1] Turned on + [t=86400.000,src=esds] Simulation ends +#+end_example +Brackets indicate additional informations related to the message (e.g source and simulated +time). All the send and received events are reported automatically by esds. + +# Local Variables: +# eval: (setq org-latex-listings 'minted) +# eval: (setq org-latex-pdf-process +# '("pdflatex -shell-escape -interaction nonstopmode -output-directory %o %f" +# "bibtex %b" +# "pdflatex -shell-escape -interaction nonstopmode -output-directory %o %f" +# "pdflatex -shell-escape -interaction nonstopmode -output-directory %o %f")) +# End: diff --git a/manual/manual.pdf b/manual/manual.pdf new file mode 100644 index 0000000..037f976 Binary files /dev/null and b/manual/manual.pdf differ