loosely-coupled-dss-extended/src/simulator.cc
2021-06-30 15:41:03 +02:00

358 lines
No EOL
16 KiB
C++

#include <simgrid/s4u.hpp>
#include <simgrid/s4u/Mailbox.hpp>
#include <simgrid/s4u/Host.hpp>
#include <simgrid/plugins/energy.h>
#include <xbt/log.h>
#include <string>
#include <sstream>
#include "Inputs.hpp"
#include "simgrid/s4u/Actor.hpp"
#define PLATFORM_FILE "platform.xml"
#define MODE_OFF() simgrid::s4u::this_actor::get_host()->set_pstate(0);
#define MODE_ON() simgrid::s4u::this_actor::get_host()->set_pstate(1);
#define MODE_RX() simgrid::s4u::this_actor::get_host()->set_pstate(2);
#define MODE_TX() simgrid::s4u::this_actor::get_host()->set_pstate(3);
#define CLOCK (simgrid::s4u::Engine::get_clock())
#define CNAME (selfName.c_str())
#define FOR(t) (t<uptime?t:uptime)
#define ADD_EVENT(HINT) \
{ \
XBT_INFO("%s add a new hint at %f for a duration of %f",CNAME,HINT->hint,HINT->duration); \
i.AddEvent(HINT->hint, HINT->duration); \
}
#define TRACK_UPTIME(instruction) \
{ \
instruction; \
uptime=upuntil-CLOCK; \
uptime=uptime > 0 ? uptime : 0; \
}
/// @brief Note that we need to simulate latency our self since we need to send instantaneous messages
#define SEND(instruction) \
{ \
TRACK_UPTIME(simgrid::s4u::this_actor::sleep_for(i.latency > uptime ? uptime : i.latency)); \
instruction; \
}
#define FORWARD_HINT(TRY_FORWARD_DURING) \
{ \
if(hint_forward!=NULL && CLOCK<hint_forward->hint){ \
hint_forward->HisForward=true; \
hint_forward->DedicatedMailbox="hint_forward"; \
try { \
XBT_INFO("%s try to forward a hint",CNAME); \
TRACK_UPTIME(m->put(hint_forward,0,TRY_FORWARD_DURING)); \
simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(hint_forward->DedicatedMailbox); \
MODE_TX(); \
SEND(m_ded->put(hint_forward,0,uptime)); \
XBT_INFO("%s forward a hint successfully",CNAME); \
} \
catch(...){ \
XBT_INFO("%s fail to forward a hint",CNAME); \
MODE_ON(); \
uptime=upuntil-CLOCK; \
TRACK_UPTIME(simgrid::s4u::this_actor::sleep_for(FOR(TRY_FORWARD_DURING))); \
} \
} \
}
/// @brief Required by SimGrid
XBT_LOG_NEW_DEFAULT_CATEGORY(simulator, "[DAO] Loosely Coupled DSS");
/// @brief For convenience sake
typedef unsigned int u32;
/**
* Data that will be exchange between the nodes
*/
class Payload{
public:
Payload():hint(0),duration(0),HasHint(false),HisForward(false),HasData(false),DataSize(0),Abort(false){}
Payload(Payload &p):hint(p.hint),duration(p.duration),HasHint(p.HasHint),DedicatedMailbox(p.DedicatedMailbox),HisForward(p.HisForward),HasData(p.HasData),DataSize(p.DataSize),Abort(p.Abort){}
double hint; // The timestamp that should be used by the receiver
double duration; // The duration that should be used by the receiver
bool HasHint;
bool HasData; // This way observer could check if they want to receive data (maybe they already received data)
bool HisForward;
bool Abort; // Allow the receiver to abort a communication (if they already received the data for example) and unlock the sender
u32 DataSize;
std::string DedicatedMailbox; // Dedicated mailbox used by the sender/receiver
};
/// @brief Observation node code
static void obs_node(std::vector<std::string> args);
/**
* No arguments are require (cf inputs.json)
*/
int main(int argc, char **argv) {
// Build engine
sg_host_energy_plugin_init();
simgrid::s4u::Engine engine(&argc, argv);
Inputs::GeneratePlatform(PLATFORM_FILE);
engine.load_platform(PLATFORM_FILE);
// Headline
XBT_INFO("-------------------------------------------------");
XBT_INFO("Sarting loosely coupled data dissemination experiments");
XBT_INFO("-------------------------------------------------");
// Init all nodes actors
u32 nON=simgrid::s4u::Engine::get_instance()->get_host_count();
for(u32 i=0;i<nON;i++){
std::vector<std::string> args; // No args
std::ostringstream ss;
ss<< "on" <<i;
simgrid::s4u::Actor::create("ON", simgrid::s4u::Host::by_name(ss.str()), obs_node, args);
}
// Launch the simulation
engine.run();
XBT_INFO("Simulation took %fs", simgrid::s4u::Engine::get_clock());
XBT_INFO("The simulated platform file is available in \"%s\"",PLATFORM_FILE);
return (0);
}
/**
* This is the brain behind each node
*/
static void obs_node(std::vector<std::string> args) {
// Init various variables
std::string selfName = simgrid::s4u::this_actor::get_host()->get_name();
simgrid::s4u::this_actor::get_host()->turn_on();
Inputs i=(selfName); // Load node input parameters from the json file
simgrid::s4u::Mailbox *m = simgrid::s4u::Mailbox::by_name("medium");
XBT_INFO("Deploying observation node %s",CNAME);
// Starting node
u32 nWakeUp=0;
u32 nDataRcv=0;
u32 nSendFail=0;
u32 nRcvFail=0;
u32 nSend=0;
u32 hint_added=0;
double totalUptime=0;
Payload *hint_forward=NULL; // Contains the hint to forward
bool is_sender=i.is_sender; // This variable might change if all receiver have received the data
bool isObserver=false;
double timeDataRcv=-1;
while(i.ShouldContinue()){
// Start by sleeping
XBT_INFO("%s is sleeping",CNAME);
MODE_OFF();
simgrid::s4u::this_actor::sleep_until(i.GetTS());
MODE_ON();
XBT_INFO("%s wakes up",CNAME);
// Doing wake up stuff
double uptime=i.GetDuration(); // Store the remaining wake up duration (updated during the node uptime)
double upsince=CLOCK; // Store the time at which the node woke up
double upuntil=i.GetTS()+i.GetDuration(); // Store the time at which the node should sleep
bool forward_mode=false; // Turned on and off every x seconds by the receiver (to switch between forward hint mode and receiving data mode)
bool forward_only=false; // When observer receive a hint it switch to forward only up to the next wake up time
bool sendhint_mode=false; // Turned on and off every x seconds by the sender (to switch between send hint and send data)
while(CLOCK < upuntil)
{
// ---------- SENDER ----------
if(is_sender){
// Send hint if send hint mode is enable
if(i.use_hint && sendhint_mode && i.HasNext()){
Payload *p=new Payload();
p->DedicatedMailbox="hintmailbox"+selfName; // Use a dedicated mailbox
p->HasHint=true;
p->duration=i.GetNextDuration();
p->hint=i.GetNextTS();
p->DataSize=i.hint_size;
try {
TRACK_UPTIME(m->put(p,0,FOR(0.3))); // Init connection with a receiver
simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox);
MODE_TX();
SEND(m_ded->put(p,p->DataSize,uptime)); // Send the actual hint
MODE_ON();
XBT_INFO("%s sent a hint successfully",CNAME);
}
catch(...){}
}
// Send data if send hint mode is disable
else{
Payload *p=new Payload();
p->DedicatedMailbox="datamailbox"+selfName;
p->HasData=true;
p->HasHint=false;
p->DataSize=i.data_size;
// Add hint to the data if possible
if(i.use_hint && i.HasNext()){
p->HasHint=true;
p->duration=i.GetHintDuration(CLOCK);
p->hint=i.GetHintTS(CLOCK);
p->DataSize+=i.hint_size; // Don't forget!!
}
// Send the data
try {
TRACK_UPTIME(m->put(p,0,FOR(1)));
simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox);
Payload *ack=m_ded->get<Payload>();
if(!ack->Abort){
MODE_TX();
if(i.extended){
SEND(m_ded->put(p,p->DataSize));
}
else{
SEND(m_ded->put(p,p->DataSize,uptime));
}
XBT_INFO("%s sent data successfully",CNAME);
nSend++;
is_sender=(nSend<(i.n_nodes-1)); // Stop sending if all nodes received
isObserver=!is_sender; // Switch to observer mode if all nodes received the data
}
else {
simgrid::s4u::this_actor::sleep_for(FOR(1));
}
}
catch(...){}
}
sendhint_mode=!sendhint_mode; // Switch back and forth between sending hint and data
MODE_ON();
}
// ---------- RECEIVER ----------
else if(!isObserver){
// Forward hint mode
if(forward_mode){
if(i.use_hint && hint_forward!=NULL && CLOCK < hint_forward->hint){
try {
FORWARD_HINT(FOR(0.3)); // Try forward for 0.3 seconds then switch to received mode
}
catch(...){}
}
}
else { // Receiving mode
Payload *p; // Received data
try {
// Get the instantaneous message
do {
TRACK_UPTIME(p=m->get<Payload>(FOR(1)));
if(p->HisForward){
if(hint_forward==NULL || (hint_forward !=NULL && p->hint>hint_forward->hint)){
simgrid::s4u::Mailbox *m_ded=simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox);
MODE_RX();
TRACK_UPTIME(p=m_ded->get<Payload>(uptime));
MODE_ON();
XBT_INFO("%s received a forwarded hint successfully",CNAME);
if(CLOCK < p->hint){
ADD_EVENT(p);
hint_forward=new Payload(*p);
hint_added++;
}
}
}
} while(p->HisForward);
simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox);
// Start receiving data
MODE_RX();
if(p->HasHint && !p->HasData){
TRACK_UPTIME(p=m_ded->get<Payload>(uptime));
XBT_INFO("%s received a hint successfully",CNAME);
hint_forward=new Payload(*p); // Enable hint forwarding
if(CLOCK < p->hint){
ADD_EVENT(p);
hint_forward=new Payload(*p);
hint_added++;
}
}
else {
// Inform the sender that we do not want to abort
Payload *ack=new Payload();
ack->Abort=false;
m_ded->put(ack,0); // Instantaneous msg
if(i.extended){
p=m_ded->get<Payload>(); // Fetch data until sended
}
else{
TRACK_UPTIME(p=m_ded->get<Payload>(uptime)); // Fetch data until sended or uptime expire
}
// If we reach here, data has been received successfully
XBT_INFO("%s received data successfully",CNAME);
timeDataRcv=CLOCK;
if(p->HasHint){
XBT_INFO("%s received a hint along with data successfully",CNAME);
hint_forward=new Payload(*p); // Enable hint forwarding
}
if(i.shutdown_on_rcv)
upuntil=CLOCK;
nDataRcv++;
isObserver=true;
is_sender=false;
}
}catch(...){
XBT_INFO("%s could not receive any data",CNAME);
nRcvFail++;
}
}
forward_mode=!forward_mode; // Toggle mode (go back and forth between receiving and forwarding)
}
// ---------- OBSERVER ----------
else {
XBT_INFO("%s is observing his environment...",CNAME);
MODE_ON();
// If use hint we should listen for the sender
if(i.use_hint){
if((forward_mode|forward_only) && hint_forward!=NULL && CLOCK < hint_forward->hint){
FORWARD_HINT(FOR(1));
}
else {
Payload *p;
try {
do {
TRACK_UPTIME(p=m->get<Payload>(FOR(1)));
} while(p->HisForward); // Ignore forwarded hint
simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox);
// Start receiving hint from sender
if(p->HasData){
Payload *ack=new Payload();
ack->Abort=true;
m_ded->put(ack,0);
simgrid::s4u::this_actor::sleep_for(FOR(1));
}
else if(p->HasHint){
MODE_RX();
TRACK_UPTIME(p=m_ded->get<Payload>(uptime));
XBT_INFO("%s received a hint successfully",CNAME);
hint_forward=new Payload(*p); // Enable hint forwarding
forward_only=true;
}
else {
simgrid::s4u::this_actor::sleep_for(FOR(1));
}
}
catch(...){
}
}
forward_mode=!forward_mode;
}
else {
simgrid::s4u::this_actor::sleep_until(upuntil);
}
}
uptime=upuntil-CLOCK; // Note that uptime can be < 0 in extended mode
uptime=uptime > 0 ? uptime : 0; // Just in case
}
// Check if we use unschedule_on_rcv
if(i.unschedule_on_rcv)
i.ResetEvents(CLOCK);
// Load next event
i.GotoNextEvent();
nWakeUp++; // Increase the number of wake up
totalUptime+=CLOCK-upsince; // Synchronize total uptime
}
// Done
MODE_OFF()
XBT_INFO("Observation node %s finished [LOG2PARSE](node:%s|isSender:%d|nSend:%d|nWakeUp:%d|nDataRcv:%d|nSendFail:%d|nRcvFail:%d|totalUptime:%f|seed:%d|hint_added:%d|timeDataRcv:%f|shutdown_on_rcv:%d|unschedule_on_rcv:%d|farhint:%d|hintdist:%f)",CNAME,CNAME,i.is_sender,nSend,nWakeUp,nDataRcv,nSendFail,nRcvFail,totalUptime,i.seed,hint_added,timeDataRcv,i.shutdown_on_rcv,i.unschedule_on_rcv,i.farhint,i.hintdist);
}