loosely-coupled-dss/src/simulator.cc

178 lines
No EOL
6.2 KiB
C++

#include <simgrid/s4u.hpp>
#include <simgrid/s4u/Mailbox.hpp>
#include <simgrid/s4u/Host.hpp>
#include <simgrid/Exception.hpp>
#include <simgrid/plugins/energy.h>
#include <xbt/log.h>
#include <memory>
#include <string>
#include <sstream>
#include <vector>
#include "Inputs.hpp"
/// @brief Path of the platform description: written by Inputs::GeneratePlatform() and then loaded by the engine in main()
#define PLATFORM_FILE "platform.xml"
// Power/radio mode helpers: each macro sets the pstate of the host running the calling actor.
// NOTE: every expansion already ends with ';' (this file relies on it, see the bare `MODE_OFF()` at the
// end of obs_node), so do not use these macros as the sole body of an unbraced if/else.
// NOTE(review): the pstate indexes 0..3 presumably match the pstates declared in the generated
// platform file -- confirm against Inputs::GeneratePlatform().
/// @brief Switch the current host to pstate 0 (node is off / sleeping)
#define MODE_OFF() simgrid::s4u::this_actor::get_host()->set_pstate(0);
/// @brief Switch the current host to pstate 1 (node is awake)
#define MODE_ON() simgrid::s4u::this_actor::get_host()->set_pstate(1);
/// @brief Switch the current host to pstate 2 (node is receiving)
#define MODE_RX() simgrid::s4u::this_actor::get_host()->set_pstate(2);
/// @brief Switch the current host to pstate 3 (node is transmitting)
#define MODE_TX() simgrid::s4u::this_actor::get_host()->set_pstate(3);
/// @brief Required by SimGrid: declares this file's default XBT log category ("simulator",
/// described as "[DAO]"), through which the XBT_INFO calls of this file are emitted
XBT_LOG_NEW_DEFAULT_CATEGORY(simulator, "[DAO]");
/// @brief Shorthand for `unsigned int`, used for counters and sizes
/// (note: the language does not guarantee it is exactly 32 bits wide)
using u32 = unsigned int;
/**
 * Message exchanged between the nodes through the mailboxes.
 * A default-constructed payload carries no hint and an empty sender name.
 */
class Payload {
public:
  Payload() = default;

  /// Next wake-up timestamp announced by the sender (meaningful only when containsHint is true)
  double hint = 0;
  /// Duration of that announced wake-up (meaningful only when containsHint is true)
  double duration = 0;
  /// Whether hint/duration have been filled in by the sender
  bool containsHint = false;
  /// Name of the host that sent the payload
  std::string node;
};
/// @brief Observation node code: the actor function that main() starts once per host
/// (forward declaration; the definition is further down in this file)
static void obs_node(std::vector<std::string> args);
/**
* No arguments are require (cf inputs.json)
*/
int main(int argc, char **argv) {
// Build engine
sg_host_energy_plugin_init();
simgrid::s4u::Engine engine(&argc, argv);
Inputs::GeneratePlatform(PLATFORM_FILE);
engine.load_platform(PLATFORM_FILE);
// Headline
XBT_INFO("-------------------------------------------------");
XBT_INFO("Sarting loosely coupled data dissemination experiments");
XBT_INFO("-------------------------------------------------");
// Init all nodes actors
u32 nON=simgrid::s4u::Engine::get_instance()->get_host_count();
for(u32 i=0;i<nON;i++){
std::vector<std::string> args;
std::ostringstream ss;
ss<< "on" <<i;
simgrid::s4u::Actor::create("ON", simgrid::s4u::Host::by_name(ss.str()), obs_node, args);
}
// Launch the simulation
engine.run();
XBT_INFO("Simulation took %fs", simgrid::s4u::Engine::get_clock());
XBT_INFO("The simulated platform file is available in \"%s\"",PLATFORM_FILE);
return (0);
}
/**
* This is the brain behind each node
*/
static void obs_node(std::vector<std::string> args) {
// Init various variables
std::string selfName = simgrid::s4u::this_actor::get_host()->get_name();
simgrid::s4u::this_actor::get_host()->turn_on();
Inputs i(selfName);
simgrid::s4u::Mailbox *m = simgrid::s4u::Mailbox::by_name("medium");
XBT_INFO("Deploying observation node %s",selfName.c_str());
// Init convenient variables
bool isSender=i.is_sender;
bool useHint=i.use_hint;
bool isObserver=false;
u32 data_size=i.data_size;
// Starting node
u32 nWakeUp=0;
u32 nDataRcv=0;
u32 nSendFail=0;
u32 nRcvFail=0;
while(i.ShouldContinue()){
XBT_INFO("%s is spleeping",selfName.c_str());
MODE_OFF();
simgrid::s4u::this_actor::sleep_until(i.GetTS());
MODE_ON();
XBT_INFO("%s wakes up",selfName.c_str());
// Doing wake up stuff
try {
if(isSender){ // If I am a sender
Payload *p=new Payload();
p->node=selfName;
if(useHint&&i.HasNext()){
p->containsHint=true;
p->hint=i.GetNextTS();
p->duration=i.GetNextDuration();
}
if(i.extended){
// We use a trick here
// First we send an instantaneous message (size=0) with the usual timeout
// to check whether there is a receiver!
// If there is one, we are sure that a put in the "medium"+selfName
// will not lead to a deadlock (cf anchor:5623) and we are using a exclusive
// channel (to avoid other receivers to get the message)
m->put(p,0,i.GetDuration());
simgrid::s4u::Mailbox *m_ext= simgrid::s4u::Mailbox::by_name("medium"+selfName);
MODE_TX();
m_ext->put(p,data_size);
}
else{
MODE_TX();
m->put(p,data_size,i.GetDuration());
}
XBT_INFO("%s sent data successfully",selfName.c_str());
}
else if (!isObserver){
Payload* p;
if(i.extended){
// anchor:5623 We can see here that
// we first receive the instantaneous message
// and then we use a mailbox specific to the sender (to have an exclusive channel)
p=m->get<Payload>(i.GetDuration());
simgrid::s4u::Mailbox *m_ext_sender = simgrid::s4u::Mailbox::by_name("medium"+p->node);
MODE_RX();
p=m_ext_sender->get<Payload>();
}
else{
MODE_RX();
p=m->get<Payload>(i.GetDuration());
}
nDataRcv++; // New data received
if(p->containsHint){
XBT_INFO("%s received and hint of %f",selfName.c_str(),p->hint);
i.AddEvent(p->hint, p->duration); // Schedule a new wake up time
}
else{
XBT_INFO("%s received data successfully and switch to forwarding mode",selfName.c_str());
}
isObserver=true; // Now we received the data we switch to observer
}
else {
XBT_INFO("%s is observing his environment...",selfName.c_str());
simgrid::s4u::this_actor::sleep_until(i.GetDuration());
}
}
catch (...)
{
if(isSender){
XBT_INFO("%s could not send any data",selfName.c_str());
nSendFail++;
}
else{
XBT_INFO("%s could not receive any data",selfName.c_str());
nRcvFail++;
}
}
// Load next event
i.GotoNextEvent();
nWakeUp++; // Increase the number of wake up
}
// Done
MODE_OFF()
XBT_INFO("Observation node %s finished [LOG2PARSE](node:%s|nWakeUp:%d|nDataRcv:%d|nSendFail:%d|nRcvFail:%d)",selfName.c_str(),selfName.c_str(),nWakeUp,nDataRcv,nSendFail,nRcvFail);
}