#include <simgrid/s4u.hpp> #include <simgrid/s4u/Mailbox.hpp> #include <simgrid/s4u/Host.hpp> #include <simgrid/plugins/energy.h> #include <xbt/log.h> #include <string> #include <sstream> #include "Inputs.hpp" #define PLATFORM_FILE "platform.xml" #define MODE_OFF() simgrid::s4u::this_actor::get_host()->set_pstate(0); #define MODE_ON() simgrid::s4u::this_actor::get_host()->set_pstate(1); #define MODE_RX() simgrid::s4u::this_actor::get_host()->set_pstate(2); #define MODE_TX() simgrid::s4u::this_actor::get_host()->set_pstate(3); #define CLOCK (simgrid::s4u::Engine::get_clock()) #define CNAME (selfName.c_str()) #define TRACK_UPTIME(instruction) \ { \ double uptimeTrack=CLOCK; \ instruction; \ uptimeTrack=CLOCK-uptimeTrack; \ uptime-=uptimeTrack; \ uptime=uptime > 0 ? uptime : 0; \ } /// @brief Required by SimGrid XBT_LOG_NEW_DEFAULT_CATEGORY(simulator, "[DAO] Loosely Coupled DSS"); /// @brief For convenience sake typedef unsigned int u32; /** * Data that will be exchange between the nodes */ class Payload{ public: Payload():hint(0),duration(0),HasHint(false){} Payload(Payload &p):hint(p.hint),duration(p.duration),HasHint(p.HasHint),DedicatedMailbox(p.DedicatedMailbox){} double hint; // The timestamp that should be used by the receiver double duration; // The duration that should be used by the receiver bool HasHint; std::string DedicatedMailbox; // Dedicated mailbox used by the sender/receiver }; /// @brief Observation node code static void obs_node(std::vector<std::string> args); /** * No arguments are require (cf inputs.json) */ int main(int argc, char **argv) { // Build engine sg_host_energy_plugin_init(); simgrid::s4u::Engine engine(&argc, argv); Inputs::GeneratePlatform(PLATFORM_FILE); engine.load_platform(PLATFORM_FILE); // Headline XBT_INFO("-------------------------------------------------"); XBT_INFO("Sarting loosely coupled data dissemination experiments"); XBT_INFO("-------------------------------------------------"); // Init all nodes actors u32 nON=simgrid::s4u::Engine::get_instance()->get_host_count(); for(u32 i=0;i<nON;i++){ std::vector<std::string> args; std::ostringstream ss; ss<< "on" <<i; simgrid::s4u::Actor::create("ON", simgrid::s4u::Host::by_name(ss.str()), obs_node, args); } // Launch the simulation engine.run(); XBT_INFO("Simulation took %fs", simgrid::s4u::Engine::get_clock()); XBT_INFO("The simulated platform file is available in \"%s\"",PLATFORM_FILE); return (0); } /** * This is the brain behind each node */ static void obs_node(std::vector<std::string> args) { // Init various variables std::string selfName = simgrid::s4u::this_actor::get_host()->get_name(); simgrid::s4u::this_actor::get_host()->turn_on(); Inputs i(selfName); simgrid::s4u::Mailbox *m = simgrid::s4u::Mailbox::by_name("medium"); XBT_INFO("Deploying observation node %s",CNAME); // Starting node bool isObserver=false; u32 nWakeUp=0; u32 nDataRcv=0; u32 nSendFail=0; u32 nRcvFail=0; u32 nSend=0; double totalUptime=0; while(i.ShouldContinue()){ // Start by sleeping XBT_INFO("%s is spleeping",CNAME); MODE_OFF(); simgrid::s4u::this_actor::sleep_until(i.GetTS()); MODE_ON(); XBT_INFO("%s wakes up",CNAME); // Doing wake up stuff double uptime=i.GetDuration(); double upsince=simgrid::s4u::Engine::get_clock(); double upuntil=i.GetTS()+i.GetDuration(); while(uptime>0) { if(i.is_sender){ Payload *p=new Payload(); p->DedicatedMailbox="dedicated"+selfName; // Add hint informations to the payload if(i.use_hint && i.HasNext()){ p->HasHint=true; p->duration=i.GetNextDuration(); p->hint=i.GetNextTS(); } MODE_ON(); try { // First we send and instantaneous message // This allow first to detect if their is a receiver // (to not cause deadlock for the extended mode) and second // to inform the receiver if he should get a hint first TRACK_UPTIME(m->put(p,0,uptime)); simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox); // First send hint if it is required MODE_TX(); if(p->HasHint){ TRACK_UPTIME(m_ded->put(p,i.hint_size,uptime)); XBT_INFO("%s sent a hint successfully",CNAME); } // Then try sending the data if(i.extended) m_ded->put(p,i.data_size); else m_ded->put(p,i.data_size,uptime); // If we reach here, data has been sent successfully XBT_INFO("%s sent data successfully",CNAME); nSend++; i.is_sender=(nSend<(i.n_nodes-1)); isObserver=!i.is_sender; } catch(...){ XBT_INFO("%s could not send any data",CNAME); nSendFail++; } } else if(!isObserver){ Payload *p; // Received data Payload *hint; // To Save the received hint bool hintReceived=false; // In case of error during data rx this will be use to check if we could use the *hint Payload object MODE_ON(); try { // Get the instantaneous message TRACK_UPTIME(p=m->get<Payload>(uptime)); simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox); // Start receiving data MODE_RX(); if(p->HasHint){ TRACK_UPTIME(p=m_ded->get<Payload>(uptime)); XBT_INFO("%s received a hint successfully",CNAME); hint=new Payload(*p); // Save hint hintReceived=true; } if(i.extended) p=m_ded->get<Payload>(); // Fetch data until sended else p=m_ded->get<Payload>(uptime); // Fetch data until sended or uptime expire // If we reach here, data has been received successfully XBT_INFO("%s received data successfully",CNAME); nDataRcv++; isObserver=true; i.is_sender=false; }catch(...){ XBT_INFO("%s could not receive any data",CNAME); nRcvFail++; if(hintReceived) i.AddEvent(hint->hint, hint->duration); // Add the hint to the event list } } else { XBT_INFO("%s is observing his environment...",CNAME); MODE_ON(); simgrid::s4u::this_actor::sleep_for(uptime); } uptime=upuntil-CLOCK; // Note that uptime can be < 0 in extended mode } // Load next event i.GotoNextEvent(); nWakeUp++; // Increase the number of wake up totalUptime+=CLOCK-upsince; } // Done MODE_OFF() XBT_INFO("Observation node %s finished [LOG2PARSE](node:%s|isSender:%d|nSend:%d|nWakeUp:%d|nDataRcv:%d|nSendFail:%d|nRcvFail:%d|totalUptime:%f|seed:%d)",CNAME,CNAME,i.is_sender,nSend,nWakeUp,nDataRcv,nSendFail,nRcvFail,totalUptime,i.seed); }