#include #include #include #include #include #include #include #include "Inputs.hpp" #define PLATFORM_FILE "platform.xml" #define TURN_OFF() simgrid::s4u::this_actor::get_host()->set_pstate(0); #define TURN_ON() simgrid::s4u::this_actor::get_host()->set_pstate(1); /// @brief Required by SimGrid XBT_LOG_NEW_DEFAULT_CATEGORY(simulator, "[DAO]"); /// @brief For convenience typedef unsigned int u32; /** * Data that will be exchange between the nodes */ class Payload{ public: Payload():hint(0),duration(0),containsHint(false){} double hint; double duration; bool containsHint; }; /// @brief Observation node code static void obs_node(std::vector args); /** * No arguments are require (cf inputs.json) */ int main(int argc, char **argv) { // Build engine sg_host_energy_plugin_init(); simgrid::s4u::Engine engine(&argc, argv); Inputs::GeneratePlatform(PLATFORM_FILE); engine.load_platform(PLATFORM_FILE); // Headline XBT_INFO("-------------------------------------------------"); XBT_INFO("Sarting loosely coupled data dissemination experiments"); XBT_INFO("-------------------------------------------------"); // Init all nodes actors u32 nON=simgrid::s4u::Engine::get_instance()->get_host_count(); for(u32 i=0;i args; std::ostringstream ss; ss<< "on" < args) { // Init various variables std::string selfName = simgrid::s4u::this_actor::get_host()->get_name(); simgrid::s4u::this_actor::get_host()->turn_on(); Inputs i(selfName); simgrid::s4u::Mailbox *m = simgrid::s4u::Mailbox::by_name("medium"); XBT_INFO("Deploying observation node %s",selfName.c_str()); // Init convenient variables bool isSender=i.is_sender; bool useHint=i.use_hint; bool isObserver=false; u32 data_size=i.data_size; // Starting node u32 nWakeUp=0; u32 nDataRcv=0; while(i.ShouldContinue()){ XBT_INFO("%s is spleeping",selfName.c_str()); TURN_OFF(); simgrid::s4u::this_actor::sleep_until(i.GetTS()); TURN_ON(); XBT_INFO("%s wakes up",selfName.c_str()); // Doing wake up stuff try { if(isSender){ // If I am a sender Payload *p=new Payload(); if(useHint&&i.HasNext()){ p->containsHint=true; p->hint=i.GetNextTS(); p->duration=i.GetNextDuration(); } m->put(p,data_size,i.GetDuration()); XBT_INFO("%s send data successfully",selfName.c_str()); isObserver=true; // Do one send for now... isSender=false; } else if (!isObserver){ Payload* p=m->get(i.GetDuration()); nDataRcv++; // New data received if(p->containsHint){ XBT_INFO("%s received and hint of %f",selfName.c_str(),p->hint); i.AddEvent(p->hint, p->duration); // Schedule a new wake up time } else{ XBT_INFO("%s received data successfully and switch to forwarding mode",selfName.c_str()); isSender=!isSender; // Toggle isSender to start sending } } else { XBT_INFO("%s is observing is environment...",selfName.c_str()); simgrid::s4u::this_actor::sleep_until(i.GetDuration()); } } catch (...) { if(isSender) XBT_INFO("%s failed to send data",selfName.c_str()); else XBT_INFO("%s failed to receive data",selfName.c_str()); } // Load next event i.GotoNextEvent(); nWakeUp++; // Increase the number of wake up } // Done TURN_OFF() XBT_INFO("Observation node %s finished (nWakeUp:%d|nDataRcv:%d)",selfName.c_str(),nWakeUp,nDataRcv); }