mirror of
https://gitlab.com/manzerbredes/esds.git
synced 2025-04-07 02:26:28 +02:00
Add manual and improve node plugins system
This commit is contained in:
parent
5384940ca7
commit
c63ae59a81
7 changed files with 230 additions and 2 deletions
|
@ -13,7 +13,7 @@ class Node:
|
||||||
self.rargs=None # Store the requests arguments
|
self.rargs=None # Store the requests arguments
|
||||||
self.plugins=list() # Contains all registered node plugins
|
self.plugins=list() # Contains all registered node plugins
|
||||||
self.rqueue=queue.Queue() # Receive simulator acknowledgments
|
self.rqueue=queue.Queue() # Receive simulator acknowledgments
|
||||||
self.chest={"state":"running", "turned_on":True, "request": None, "interfaces":dict(), "interfaces_queue_size":dict()}
|
self.chest={"state":"running", "turned_on":True, "request": None, "interfaces":dict(), "interfaces_queue_size":dict(), "plugin_notify": dict()}
|
||||||
for interface in interfaces:
|
for interface in interfaces:
|
||||||
self.chest["interfaces"][interface]=queue.Queue()
|
self.chest["interfaces"][interface]=queue.Queue()
|
||||||
self.chest["interfaces_queue_size"][interface]=0
|
self.chest["interfaces_queue_size"][interface]=0
|
||||||
|
@ -22,6 +22,11 @@ class Node:
|
||||||
def plugin_register(self,plugin):
|
def plugin_register(self,plugin):
|
||||||
self.plugins.append(plugin)
|
self.plugins.append(plugin)
|
||||||
|
|
||||||
|
def plugin_handle_requests(self):
|
||||||
|
# Take plugins notification into account
|
||||||
|
for key in list(self["plugin_notify"]):
|
||||||
|
self.plugin_notify(key,self["plugin_notify"].pop(key))
|
||||||
|
|
||||||
def plugin_notify(self,reason,args):
|
def plugin_notify(self,reason,args):
|
||||||
"""
|
"""
|
||||||
This function strives to avoid using Python specific features
|
This function strives to avoid using Python specific features
|
||||||
|
@ -33,6 +38,8 @@ class Node:
|
||||||
p.on_send_call(args[0],args[1],args[2],args[3])
|
p.on_send_call(args[0],args[1],args[2],args[3])
|
||||||
if reason == "send_return":
|
if reason == "send_return":
|
||||||
p.on_send_return(args[0],args[1],args[2],args[3],args[4])
|
p.on_send_return(args[0],args[1],args[2],args[3],args[4])
|
||||||
|
if reason == "on_communication_end":
|
||||||
|
p.on_communication_end(args[0],args[1],args[2],args[3],args[4])
|
||||||
if reason == "terminated":
|
if reason == "terminated":
|
||||||
p.on_terminated()
|
p.on_terminated()
|
||||||
|
|
||||||
|
|
|
@ -24,6 +24,9 @@ class NodePlugin:
|
||||||
def on_terminated(self):
|
def on_terminated(self):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
def on_communication_end(self,interface,data,start_at,end_at,aborted_at):
|
||||||
|
self.log("hello world")
|
||||||
|
|
||||||
def log(self,msg):
|
def log(self,msg):
|
||||||
self.api.log(self.plugin_name+"(NP) "+msg)
|
self.api.log(self.plugin_name+"(NP) "+msg)
|
||||||
|
|
||||||
|
|
|
@ -123,7 +123,10 @@ class Simulator:
|
||||||
"""
|
"""
|
||||||
sorted_indexes=np.lexsort((self.events[:,3],self.events[:,1]))
|
sorted_indexes=np.lexsort((self.events[:,3],self.events[:,1]))
|
||||||
self.events=self.events[sorted_indexes]
|
self.events=self.events[sorted_indexes]
|
||||||
|
|
||||||
|
def notify_node_plugin(self,node,function,args):
|
||||||
|
self.nodes[node]["plugin_notify"][function]=args
|
||||||
|
|
||||||
def sync_node_non_blocking(self,node, timeout_remove_only=False):
|
def sync_node_non_blocking(self,node, timeout_remove_only=False):
|
||||||
"""
|
"""
|
||||||
Process all call request and wait for Node.sync() to return
|
Process all call request and wait for Node.sync() to return
|
||||||
|
@ -196,11 +199,14 @@ class Simulator:
|
||||||
selector_wireless.append(False)
|
selector_wireless.append(False)
|
||||||
if event[2][9]: # Check if should be cancel on turn_off (receiver_required)
|
if event[2][9]: # Check if should be cancel on turn_off (receiver_required)
|
||||||
selector_wired.append(True)
|
selector_wired.append(True)
|
||||||
|
self.notify_node_plugin(event[2][1],"on_communication_end",("eth0",0,0,0,0))
|
||||||
else:
|
else:
|
||||||
selector_wired.append(False)
|
selector_wired.append(False)
|
||||||
event[2][8]=False # So set delivery to False!!
|
event[2][8]=False # So set delivery to False!!
|
||||||
|
# TODO: notify receiver plugins
|
||||||
else:
|
else:
|
||||||
selector_wireless.append(True)
|
selector_wireless.append(True)
|
||||||
|
# TODO: notify receiver plugins
|
||||||
selector_wired.append(False)
|
selector_wired.append(False)
|
||||||
else:
|
else:
|
||||||
selector_wireless.append(False)
|
selector_wireless.append(False)
|
||||||
|
@ -234,6 +240,7 @@ class Simulator:
|
||||||
selector.append(True)
|
selector.append(True)
|
||||||
if self.netmat[event[2][2]]["is_wired"]:
|
if self.netmat[event[2][2]]["is_wired"]:
|
||||||
sharing_to_update.append((int(event[2][1]),event[2][2]))
|
sharing_to_update.append((int(event[2][1]),event[2][2]))
|
||||||
|
# TODO: notify sender plugins
|
||||||
else:
|
else:
|
||||||
selector.append(False)
|
selector.append(False)
|
||||||
self.events=self.events[~np.array(selector)]
|
self.events=self.events[~np.array(selector)]
|
||||||
|
@ -427,11 +434,13 @@ class Simulator:
|
||||||
dst["state"]="running"
|
dst["state"]="running"
|
||||||
dst.rqueue.put(("receive",0))
|
dst.rqueue.put(("receive",0))
|
||||||
self.sync_node_non_blocking(dst,timeout_remove_only=True)
|
self.sync_node_non_blocking(dst,timeout_remove_only=True)
|
||||||
|
# TODO: notify receiver plugins
|
||||||
self.update_sharing(dst.node_id,-1,interface)
|
self.update_sharing(dst.node_id,-1,interface)
|
||||||
src["state"]="running"
|
src["state"]="running"
|
||||||
code=0 if perform_delivery else 1
|
code=0 if perform_delivery else 1
|
||||||
src.rqueue.put(("send",code))
|
src.rqueue.put(("send",code))
|
||||||
self.sync_node_non_blocking(src,timeout_remove_only=True)
|
self.sync_node_non_blocking(src,timeout_remove_only=True)
|
||||||
|
# TODO: notify sender plugins
|
||||||
else:
|
else:
|
||||||
if src.node_id != dst.node_id:
|
if src.node_id != dst.node_id:
|
||||||
if perform_delivery:
|
if perform_delivery:
|
||||||
|
@ -444,10 +453,12 @@ class Simulator:
|
||||||
dst["state"]="running"
|
dst["state"]="running"
|
||||||
dst.rqueue.put(("receive",0))
|
dst.rqueue.put(("receive",0))
|
||||||
self.sync_node_non_blocking(dst,timeout_remove_only=True)
|
self.sync_node_non_blocking(dst,timeout_remove_only=True)
|
||||||
|
# TODO: notify receiver plugins
|
||||||
else:
|
else:
|
||||||
src["state"]="running"
|
src["state"]="running"
|
||||||
src.rqueue.put(("send",0))
|
src.rqueue.put(("send",0))
|
||||||
self.sync_node_non_blocking(src,timeout_remove_only=True)
|
self.sync_node_non_blocking(src,timeout_remove_only=True)
|
||||||
|
# TODO: notify sender plugins
|
||||||
elif event_type == 1: # Timeout
|
elif event_type == 1: # Timeout
|
||||||
node=self.nodes[int(event)]
|
node=self.nodes[int(event)]
|
||||||
node["state"]="running"
|
node["state"]="running"
|
||||||
|
|
3
manual/.gitignore
vendored
Normal file
3
manual/.gitignore
vendored
Normal file
|
@ -0,0 +1,3 @@
|
||||||
|
_minted-manual
|
||||||
|
*.tex
|
||||||
|
*.bbl
|
BIN
manual/components.pdf
Normal file
BIN
manual/components.pdf
Normal file
Binary file not shown.
204
manual/manual.org
Normal file
204
manual/manual.org
Normal file
|
@ -0,0 +1,204 @@
|
||||||
|
#+TITLE: ESDS: Extensible Simulator for Distributed Systems
|
||||||
|
#+AUTHOR: Loic GUEGAN
|
||||||
|
#+OPTIONS: toc:nil
|
||||||
|
|
||||||
|
#+LATEX_HEADER: \usepackage{fullpage}
|
||||||
|
#+LATEX_HEADER: \usepackage{minted}
|
||||||
|
#+LATEX_HEADER: \usepackage{booktabs}
|
||||||
|
#+LATEX_HEADER: \usepackage{xspace}
|
||||||
|
#+LATEX_HEADER: \newcommand{\stateoff}{"\textit{off}"\xspace}
|
||||||
|
#+LATEX_HEADER: \newcommand{\stateon}{"\textit{on}"\xspace}
|
||||||
|
|
||||||
|
|
||||||
|
* Simulation Architecture
|
||||||
|
The ESDS simulator comprises two major components: 1) The Simulation Orchestrator(SO) 2) The Simulated
|
||||||
|
Nodes (SN). This architecture is depicted in Figure \ref{architecture}.
|
||||||
|
|
||||||
|
\begin{figure}[!h]
|
||||||
|
\centering
|
||||||
|
\includegraphics[scale=0.5]{components.pdf}
|
||||||
|
\caption{Architecture of ESDS}
|
||||||
|
\label{architecture}
|
||||||
|
\end{figure}
|
||||||
|
|
||||||
|
The SO is the main process in charge of implementing the simulation main loop. It instantiates the
|
||||||
|
network (e.g bandwidths andlatencies), collects and processes the events (e.g communications,turn
|
||||||
|
on/off). The nodes on the other hand are threads that simulate the nodes behaviors.
|
||||||
|
|
||||||
|
* Example
|
||||||
|
To run a simulation, you need to provide at least 2 files. The first one instantiate the
|
||||||
|
orchestrator and the second one will simulate the node. In this section, you will learn how to write
|
||||||
|
both files.
|
||||||
|
|
||||||
|
The simulated scenario comprises 2 nodes that wakes up randomly every hour for a duration called
|
||||||
|
"uptime". The sender try to transmit his data during that uptime. The other node is a receiver that
|
||||||
|
have similar random wake up parterns and strive to receive data from the sender.
|
||||||
|
** Orchestrator
|
||||||
|
#+attr_latex: :options fontsize=\small, breaklines
|
||||||
|
#+BEGIN_SRC python
|
||||||
|
#!/usr/bin/env python
|
||||||
|
|
||||||
|
import esds # Load ESDS
|
||||||
|
import numpy as np # Use numpy to construct bandwidth and latencies matrix
|
||||||
|
|
||||||
|
##### Bandwidth matrix
|
||||||
|
# Bandwidth value can be 0 for unreachable nodes
|
||||||
|
# Regarding wireless interfaces the diagonals of the bandwidth and latency matrices are very important.
|
||||||
|
# They determine the duration of the tranmission for THE SENDER. It allows to have a different tx
|
||||||
|
# duration per node and per interface. Please cf esds.py for more informations.
|
||||||
|
n=2 # Number of nodes including the sender
|
||||||
|
B=np.full((n,n),5) # 5Mbps
|
||||||
|
|
||||||
|
##### Latency matrix
|
||||||
|
# If the latency entries match one with a bandwidth of 0
|
||||||
|
# then it will be ignore since node is unreachable.
|
||||||
|
L=np.full((n,n),0) # 0s
|
||||||
|
|
||||||
|
##### Create the simulator
|
||||||
|
# esds.Simulator take at least a dictionnary as a parameter
|
||||||
|
# This dictionnary contains all the network interfaces (name as a key) of each node
|
||||||
|
s=esds.Simulator({"wlan0":{"bandwidth":B, "latency":L, "is_wired":False},"eth0":{"bandwidth":B, "latency":L, "is_wired":True}})
|
||||||
|
|
||||||
|
##### Instantiate nodes
|
||||||
|
uptime=180 # 180s uptime
|
||||||
|
s.create_node("sender",args=uptime) # Load sender.py for the first node with 5 as argument (first row in B and L)
|
||||||
|
|
||||||
|
# Aguments can be passed to nodes via: s.create_node("sender",args="my argument")
|
||||||
|
for n in range(0,n-1): # Load receiver.py for the remaining nodes
|
||||||
|
s.create_node("receiver",args=uptime)
|
||||||
|
|
||||||
|
##### Run the simulation
|
||||||
|
s.run()
|
||||||
|
#+END_SRC
|
||||||
|
|
||||||
|
** Nodes
|
||||||
|
To implement a node, you should create a python file with the method execute(api). This method will be
|
||||||
|
called by the orchestrator to execute the code of your node. The api parameter provide you access to the following esds API:
|
||||||
|
|
||||||
|
|
||||||
|
\begin{table*}[]
|
||||||
|
\centering
|
||||||
|
\caption{Simulated Nodes blocking and non-blocking API calls}
|
||||||
|
\label{tab:api}
|
||||||
|
\small
|
||||||
|
\resizebox{\columnwidth}{!}{%
|
||||||
|
\begin{tabular}{@{}lll@{}}
|
||||||
|
\toprule
|
||||||
|
\textbf{Call} & \textbf{Blocking} & \textbf{Description} \\ \midrule
|
||||||
|
\verb!send(<int>,<data>,<size>,<dst>,<rdst>)! & yes & Send \verb!<data>! of size \verb!<size>! on interface \verb!<int>! \\
|
||||||
|
\verb!sendt(<int>,<data>,<size>,<dst>,<t>,<rdst>)! & yes & Send \verb!<data>! of size \verb!<size>! on interface \verb!<int>! with a timeout of \verb!<t>! \\
|
||||||
|
\verb!receive(<int>)! & yes & Wait for and fetch incoming data on interface \verb!<int>! \\
|
||||||
|
\verb!receivet(<int>,<t>)! & yes & Wait for and fetch incoming data on interface \verb!<int>! with a timeout of \verb!<t>! \\
|
||||||
|
\verb!wait(<t>)! & yes & Wait for a specific amount of simulated time \verb!<t>! \\
|
||||||
|
\verb!wait_end()! & yes & Wait until the end of the simulation \\
|
||||||
|
\verb!log(<message>)! & no & Report \verb!<message>! to the SO that will print it to the standard output \\
|
||||||
|
\verb!read(<register>)! & no & Read in the SO registers (e.g \textit{clock} to get the current simulated time) \\
|
||||||
|
\verb!turn_off()/turn_on()! & no & Change the node state to \stateoff or \stateon respectively
|
||||||
|
\end{tabular}}
|
||||||
|
\end{table*}
|
||||||
|
|
||||||
|
*** Sender
|
||||||
|
#+attr_latex: :options fontsize=\small, breaklines
|
||||||
|
#+BEGIN_SRC python
|
||||||
|
#!/usr/bin/env python
|
||||||
|
|
||||||
|
import random
|
||||||
|
|
||||||
|
# Note that the following is required to have different instance from thread to thread
|
||||||
|
lr=random.Random(6)
|
||||||
|
|
||||||
|
def execute(api):
|
||||||
|
uptime=api.args
|
||||||
|
endoff=0
|
||||||
|
for i in range(0,24):
|
||||||
|
startoff=random.randint(0,3600-uptime)
|
||||||
|
api.turn_off()
|
||||||
|
api.wait(startoff+endoff)
|
||||||
|
api.turn_on()
|
||||||
|
wakeat=api.read("clock")
|
||||||
|
wakeuntil=wakeat+uptime
|
||||||
|
# Send until uptime seconds if elapsed
|
||||||
|
while api.read("clock") < wakeuntil:
|
||||||
|
api.sendt("wlan0","hello",10,None, wakeuntil-api.read("clock"))
|
||||||
|
api.log("Was up for {}s".format(api.read("clock")-wakeat))
|
||||||
|
endoff=3600*(i+1)-api.read("clock")
|
||||||
|
api.turn_off()
|
||||||
|
api.wait(endoff)
|
||||||
|
api.turn_on()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
#+END_SRC
|
||||||
|
*** Receiver
|
||||||
|
#+attr_latex: :options fontsize=\small, breaklines
|
||||||
|
#+BEGIN_SRC python
|
||||||
|
#!/usr/bin/env python
|
||||||
|
|
||||||
|
import sys, random, time
|
||||||
|
|
||||||
|
lr=random.Random(6)
|
||||||
|
|
||||||
|
def execute(api):
|
||||||
|
uptime=api.args
|
||||||
|
endoff=0
|
||||||
|
for i in range(0,24):
|
||||||
|
startoff=random.randint(0,3600-uptime)
|
||||||
|
api.turn_off()
|
||||||
|
api.wait(startoff+endoff)
|
||||||
|
api.turn_on()
|
||||||
|
wakeat=api.read("clock")
|
||||||
|
wakeuntil=wakeat+uptime
|
||||||
|
# Receive until uptime seconds if elapsed
|
||||||
|
while api.read("clock") < wakeuntil:
|
||||||
|
code, data=api.receivet("wlan0",wakeuntil-api.read("clock"))
|
||||||
|
if code == 0:
|
||||||
|
api.log("Receive "+data)
|
||||||
|
api.log("Was up for {}s".format(api.read("clock")-wakeat))
|
||||||
|
endoff=3600*(i+1)-api.read("clock")
|
||||||
|
api.turn_off()
|
||||||
|
api.wait(endoff)
|
||||||
|
api.turn_on()
|
||||||
|
|
||||||
|
|
||||||
|
#+END_SRC
|
||||||
|
|
||||||
|
** Simulation Output
|
||||||
|
Here is part of the simulation output:
|
||||||
|
#+begin_example
|
||||||
|
[t=82626.000,src=n0] Send 10 bytes on wlan0
|
||||||
|
[t=82630.000,src=n0] Was up for 180.0s
|
||||||
|
[t=82630.000,src=n0] Turned off
|
||||||
|
[t=83083.000,src=n1] Turned on
|
||||||
|
[t=83263.000,src=n1] Was up for 180.0s
|
||||||
|
[t=83263.000,src=n1] Turned off
|
||||||
|
[t=85910.000,src=n0] Turned on
|
||||||
|
[t=85910.000,src=n0] Send 10 bytes on wlan0
|
||||||
|
[t=85926.000,src=n0] Send 10 bytes on wlan0
|
||||||
|
[t=85942.000,src=n0] Send 10 bytes on wlan0
|
||||||
|
[t=85958.000,src=n0] Send 10 bytes on wlan0
|
||||||
|
[t=85974.000,src=n0] Send 10 bytes on wlan0
|
||||||
|
[t=85990.000,src=n0] Send 10 bytes on wlan0
|
||||||
|
[t=86006.000,src=n0] Send 10 bytes on wlan0
|
||||||
|
[t=86022.000,src=n0] Send 10 bytes on wlan0
|
||||||
|
[t=86038.000,src=n0] Send 10 bytes on wlan0
|
||||||
|
[t=86054.000,src=n0] Send 10 bytes on wlan0
|
||||||
|
[t=86070.000,src=n0] Send 10 bytes on wlan0
|
||||||
|
[t=86086.000,src=n0] Send 10 bytes on wlan0
|
||||||
|
[t=86090.000,src=n0] Was up for 180.0s
|
||||||
|
[t=86090.000,src=n0] Turned off
|
||||||
|
[t=86400.000,src=n0] Turned on
|
||||||
|
[t=86400.000,src=n1] Turned on
|
||||||
|
[t=86400.000,src=esds] Simulation ends
|
||||||
|
#+end_example
|
||||||
|
Brackets indicate additional informations related to the message (e.g source and simulated
|
||||||
|
time). All the send and received events are reported automatically by esds.
|
||||||
|
|
||||||
|
# Local Variables:
|
||||||
|
# eval: (setq org-latex-listings 'minted)
|
||||||
|
# eval: (setq org-latex-pdf-process
|
||||||
|
# '("pdflatex -shell-escape -interaction nonstopmode -output-directory %o %f"
|
||||||
|
# "bibtex %b"
|
||||||
|
# "pdflatex -shell-escape -interaction nonstopmode -output-directory %o %f"
|
||||||
|
# "pdflatex -shell-escape -interaction nonstopmode -output-directory %o %f"))
|
||||||
|
# End:
|
BIN
manual/manual.pdf
Normal file
BIN
manual/manual.pdf
Normal file
Binary file not shown.
Loading…
Add table
Reference in a new issue