// Simulator entry point and observation-node actor for a SimGrid (S4U) experiment.
//
// NOTE(review): this listing appears to have been damaged by extraction: wherever the
// original had text between '<' and '>' (system include names, template arguments, and
// whole stretches of code between a '<' comparison and a later '>') that text seems to be
// missing. Each affected spot is flagged below; restore it from version control rather
// than guessing.

#include "simgrid/s4u.hpp"
// NOTE(review): the header names of the next three includes are missing.
#include
#include
#include
#include "inputs.hpp"
#include "simgrid/s4u/Host.hpp"
#include "simgrid/plugins/energy.h"
#include "xbt/log.h"

// Path handed to Inputs::GeneratePlatform() and then to Engine::load_platform() in main().
#define PLATFORM_FILE "platform.xml"
// Set the calling actor's host to pstate 0 / pstate 1. Both macros already end with ';'.
#define TURN_OFF() simgrid::s4u::this_actor::get_host()->set_pstate(0);
#define TURN_ON() simgrid::s4u::this_actor::get_host()->set_pstate(1);

// Default log category used by the XBT_INFO calls in this file.
XBT_LOG_NEW_DEFAULT_CATEGORY(simulator, "[DAO]");

typedef unsigned int u32;

/// @brief Message object passed through the mailbox between nodes.
class Payload{
public:
    // Default payload: no hint.
    Payload():hint(0),containsHint(false){}
    // Value the receiver copies into its effective wake interval when containsHint is set.
    double hint;
    // True when the receiver should use `hint` (see the receive branch further down).
    bool containsHint;
};

/// @brief Observation unit code
// NOTE(review): the vector's element type is missing from this declaration.
static void obs_node(std::vector args);

/// @brief Initialise the energy plugin, build the engine and load the generated platform.
int main(int argc, char **argv) {
    // Build engine
    sg_host_energy_plugin_init();
    simgrid::s4u::Engine engine(&argc, argv);
    Inputs::GeneratePlatform(PLATFORM_FILE);
    engine.load_platform(PLATFORM_FILE);

    XBT_INFO("-------------------------------------------------");
    XBT_INFO("Sarting loosely coupled data dissemination experiments");
    XBT_INFO("-------------------------------------------------");

    // One iteration per host known to the engine.
    u32 nON=simgrid::s4u::Engine::get_instance()->get_host_count();
    // NOTE(review): text is missing inside this loop header (after "i") and again after
    // the "<" that follows "on". The lost span presumably held the rest of the loop body,
    // the end of main() and the first part of obs_node's signature - TODO restore.
    for(u32 i=0;i args;
    std::ostringstream ss;
    ss<< "on" < args) {
    // NOTE(review): from here on the statements read as the body of obs_node (they act on
    // the current actor's host), but its opening signature line is part of the lost text.
    std::string selfName = simgrid::s4u::this_actor::get_host()->get_name();
    simgrid::s4u::this_actor::get_host()->turn_on();
    // Per-node parameters, looked up by host name.
    Inputs i(selfName);
    // Every node uses the same mailbox, named "medium".
    simgrid::s4u::Mailbox *m = simgrid::s4u::Mailbox::by_name("medium");
    XBT_INFO("Deploying observation node %s",selfName.c_str());

    // Init convenience variables
    double wake_interval=i.wake_interval;
    double wake_duration=i.wake_duration;
    double startup_delay=i.startup_delay;
    int max_attempts=i.max_attempts;
    bool isSender=i.is_sender;
    bool useHint=i.use_hint;
    bool isObserver=false;
    u32 data_size=i.data_size;

    // Starting node
    u32 effectiveAttemps=0;
    double effective_wake_duration=wake_duration;
    // Overwritten with the received hint in the receive branch below.
    double effective_wake_interval=wake_interval;
    TURN_OFF();
    simgrid::s4u::this_actor::sleep_for(startup_delay);
    // NOTE(review): text is missing between "i" and "containsHint": the loop condition and
    // increment, the opening of the try block, the opening of the sender branch and the
    // creation of `p` are not visible. The indentation below is inferred from the closing
    // braces that did survive.
    // NOTE(review): this loop counter `i` has the same name as the `Inputs i` object
    // declared above and hides it inside the loop.
    for(u32 i=0;icontainsHint=true;
                // Defaults sent when useHint is false.
                // NOTE(review): the payload then still carries containsHint=true with
                // hint=5 - confirm this is intended.
                p->hint=5;
                if(useHint){
                    // Compares the unsigned counter with a signed int expression.
                    p->containsHint=i<(max_attempts-1); // Ensure that we will wake up again
                    p->hint=wake_interval;
                }
                // effective_wake_duration is passed as the third argument; per SimGrid's
                // Mailbox::put(payload, size, timeout) this should be a timeout - confirm
                // against the SimGrid version in use.
                m->put(p,data_size,effective_wake_duration);
                XBT_INFO("%s send data successfully",selfName.c_str());
                // After one successful send the node only observes.
                isObserver=true; // Do one send for now...
                isSender=false;
            } else if (!isObserver){
                // NOTE(review): a template argument on get() may be missing here.
                // NOTE(review): no release of the received Payload is visible in this
                // branch - confirm who owns it.
                Payload* p=m->get(effective_wake_duration);
                if(p->containsHint){
                    XBT_INFO("%s received and hint of %f",selfName.c_str(),p->hint);
                    effective_wake_interval=p->hint;
                    // NOTE(review): `i` is unsigned; decrementing it at 0 wraps around.
                    // Whether the (lost) loop increment compensates cannot be checked here.
                    i--; // Add new attempt
                }
                else{
                    XBT_INFO("%s received data successfully and switch to forwarding mode",selfName.c_str());
                    // isSender is false whenever this branch runs, so this sets it to true.
                    isSender=!isSender;
                }
            } else {
                XBT_INFO("%s is observing is environment...",selfName.c_str());
            }
        } catch (...) {
            // Any exception from the attempt is reported as a failed send or receive,
            // depending on the current role; the exception itself is discarded.
            if(isSender)
                XBT_INFO("%s failed to send data",selfName.c_str());
            else
                XBT_INFO("%s failed to receive data",selfName.c_str());
        }
        // Counts every pass through the loop, successful or not.
        effectiveAttemps++;
    }
    // Done
    // NOTE(review): effectiveAttemps is unsigned but is printed with %d.
    XBT_INFO("Observation node %s finished (attemps:%d)",selfName.c_str(),effectiveAttemps);
}