From d0e57511fda12afb1049e260bcccb434b186d735 Mon Sep 17 00:00:00 2001 From: Loic Guegan Date: Thu, 6 May 2021 16:14:57 +0200 Subject: [PATCH] Switch to timestamps --- inputs.json | 33 +++++++------ src/inputs.cc | 123 ++++++++++++++++++++++++++++++++++++++++++++--- src/inputs.hpp | 66 +++++++++++++++++++++++-- src/simulator.cc | 51 ++++++++------------ 4 files changed, 219 insertions(+), 54 deletions(-) diff --git a/inputs.json b/inputs.json index e5b2b99..a19f821 100644 --- a/inputs.json +++ b/inputs.json @@ -1,24 +1,29 @@ { "on0":{ - "wake_interval": 5, - "is_sender": false, - "wake_duration": 5, - "startup_delay": 0, - "max_attemps" : 1, + "is_sender": true, "power_off": 0, "power_on":10, - "use_hint": true, - "data_size": 10 + "use_hint": false, + "wake_ts": [ 1, 7, 7 ], + "wake_duration": [ 5, 1, 2], + "data_size": 50 }, "on1":{ - "wake_interval": 5, - "is_sender": true, - "wake_duration": 5, - "startup_delay": 0, - "max_attemps" : 2, + "is_sender": false, "power_off": 0, "power_on":10, - "use_hint":true, - "data_size": 10 + "use_hint": false, + "wake_ts": [ 1, 7, 7 ], + "wake_duration": [ 5, 1, 2], + "data_size": 50 + }, + "on2":{ + "is_sender": false, + "power_off": 0, + "power_on":10, + "use_hint": false, + "wake_ts": [ 1, 7, 7 ], + "wake_duration": [ 5, 1, 2], + "data_size": 50 } } \ No newline at end of file diff --git a/src/inputs.cc b/src/inputs.cc index 0cba7b6..84acdb7 100644 --- a/src/inputs.cc +++ b/src/inputs.cc @@ -1,27 +1,138 @@ #include "inputs.hpp" +#include "xbt/log.h" +#include #include #include Inputs::Inputs(std::string node_name){ + // Here we doing all the boring stuff FILE* input_file = fopen(INPUTS_FILE, "rb"); - char input_file_buffer[65536]; + char input_file_buffer[JSON_BUFFER_SIZE]; rapidjson::FileReadStream is(input_file, input_file_buffer, sizeof(input_file_buffer)); d.ParseStream(is); fclose(input_file); - wake_duration=d[node_name.c_str()]["wake_duration"].GetDouble(); - wake_interval=d[node_name.c_str()]["wake_interval"].GetDouble(); - startup_delay=d[node_name.c_str()]["startup_delay"].GetDouble(); + // Init all variables is_sender=d[node_name.c_str()]["is_sender"].GetBool(); use_hint=d[node_name.c_str()]["use_hint"].GetBool(); - max_attempts=d[node_name.c_str()]["max_attemps"].GetInt(); data_size=d[node_name.c_str()]["data_size"].GetInt(); + for(auto& v:d[node_name.c_str()]["wake_ts"].GetArray()){ + wake_ts.push_back(v.GetDouble()); + } + for(auto& v:d[node_name.c_str()]["wake_duration"].GetArray()){ + wake_duration.push_back(v.GetDouble()); + } + + // Identity check + if(wake_ts.size()!=wake_duration.size()){ + std::cerr << "Invalid node configuration: wake_ts.size() != wake_duration.size()" <=next_ts){ + // Create variable for convenience + double start=cur_ts; + double end=std::max(cur_ts+cur_duration,next_ts+next_duration); + wake_duration[i]=end-start; + // Now remove next event + wake_ts.erase(wake_ts.begin() + i + 1); + wake_duration.erase(wake_duration.begin() + i +1); + // This is not optimal. Yet it is simple :D + MergeEvents(); + } + } +} + +double Inputs::GetNextTS(){ + // Ensure the caller is smart + if(wake_duration.size()<2){ + std::cerr << "You are trying to access to the next timestamp but it does not exists" <=ts){ + wake_ts.insert(it,ts); + break; + } + pos++; + } + + // Ensure that ts and duration should not go to the end + if(pos==wake_ts.size()){ + wake_ts.push_back(ts); + wake_duration.push_back(duration); + } + else { + // Handle durations here + int pos2=0; + for(auto it = std::begin(wake_duration); it != std::end(wake_duration); ++it) { + if(pos==pos2){ + wake_duration.insert(it,duration); + break; + } + else if (it+1==std::end(wake_duration)) { + wake_duration.push_back(duration); + } + pos2++; + } + } + // Don't forget + MergeEvents(); } void Inputs::GeneratePlatform(std::string p){ + // The boring stuff FILE* input_file = fopen(INPUTS_FILE, "rb"); - char input_file_buffer[65536]; + char input_file_buffer[JSON_BUFFER_SIZE]; rapidjson::FileReadStream is(input_file, input_file_buffer, sizeof(input_file_buffer)); rapidjson::Document d; d.ParseStream(is); diff --git a/src/inputs.hpp b/src/inputs.hpp index fa8af17..23ced49 100644 --- a/src/inputs.hpp +++ b/src/inputs.hpp @@ -2,23 +2,81 @@ #include "rapidjson/filereadstream.h" #include #include +#include +#include +#include +#include "xbt/log.h" +#include #define INPUTS_FILE "inputs.json" +/// @brief Pay attention to this strange number, you could tear your hairs out +#define JSON_BUFFER_SIZE 65536 using namespace rapidjson; class Inputs { + /// @brief RapidJSON Document d; + /// @brief Current node associated with the Inputs std::string node_name; + /// @brief Timestamps (at which time the nodes should wake up) + std::vector wake_ts; + /// @brief Wake up time durations + std::vector wake_duration; + /** + * Recursively merge overlapping events + */ + void MergeEvents(); public: + /** + * Load node_name configuration + */ Inputs(std::string node_name); + /** + * Generate a SimGrid platform file from the json configuration + */ static void GeneratePlatform(std::string p); + /** + * Is there any event that remains in the queue ? + */ + bool ShouldContinue(){return wake_ts.size()!=0;} + /** + * Is there another event to process ? + */ + bool HasNext(){return(wake_ts.size()>1);} + /** + * Get current event timestamp + */ + double GetTS(){return wake_ts.front();} + /** + * Get current event duration + */ + double GetDuration(){return wake_duration.front();} + /** + * Get next event timestamp + */ + double GetNextTS(); + /** + * Get next event duration + */ + double GetNextDuration(); + /** + * Time travel machine (note that this function is following the second principle + * of thermodynamics) + */ + void GotoNextEvent(); + /** + * Allows to add a *FUTURE* event and merge overlapping events + */ + void AddEvent(double ts, double duration); + /** + * This is the timeline + */ + void DumpEvents(); - double wake_duration; - double wake_interval; - double startup_delay; + /// @brief These are public attributes, please take care they are fragile bool is_sender; bool use_hint; - int max_attempts; int data_size; + }; \ No newline at end of file diff --git a/src/simulator.cc b/src/simulator.cc index 922cd15..3254c5c 100644 --- a/src/simulator.cc +++ b/src/simulator.cc @@ -17,8 +17,9 @@ XBT_LOG_NEW_DEFAULT_CATEGORY(simulator, "[DAO]"); typedef unsigned int u32; class Payload{ public: - Payload():hint(0),containsHint(false){} + Payload():hint(0),duration(0),containsHint(false){} double hint; + double duration; bool containsHint; }; @@ -63,53 +64,40 @@ static void obs_node(std::vector args) { XBT_INFO("Deploying observation node %s",selfName.c_str()); // Init convenien variables - double wake_interval=i.wake_interval; - double wake_duration=i.wake_duration; - double startup_delay=i.startup_delay; - int max_attempts=i.max_attempts; bool isSender=i.is_sender; bool useHint=i.use_hint; bool isObserver=false; u32 data_size=i.data_size; // Starting node - u32 effectiveAttemps=0; - double effective_wake_duration=wake_duration; - double effective_wake_interval=wake_interval; - TURN_OFF(); - simgrid::s4u::this_actor::sleep_for(startup_delay); - for(u32 i=0;icontainsHint=true; - p->hint=5; - if(useHint){ - p->containsHint=i<(max_attempts-1); // Ensure that we will wake up again - p->hint=wake_interval; + if(useHint&&i.HasNext()){ + p->hint=i.GetNextTS(); + p->duration=i.GetNextDuration(); } - m->put(p,data_size,effective_wake_duration); + m->put(p,data_size,i.GetDuration()); XBT_INFO("%s send data successfully",selfName.c_str()); isObserver=true; // Do one send for now... isSender=false; } else if (!isObserver){ - Payload* p=m->get(effective_wake_duration); + Payload* p=m->get(i.GetDuration()); + nDataRcv++; if(p->containsHint){ XBT_INFO("%s received and hint of %f",selfName.c_str(),p->hint); - effective_wake_interval=p->hint; - i--; // Add new attempt + i.AddEvent(p->hint, p->duration); } else{ XBT_INFO("%s received data successfully and switch to forwarding mode",selfName.c_str()); @@ -118,6 +106,7 @@ static void obs_node(std::vector args) { } else { XBT_INFO("%s is observing is environment...",selfName.c_str()); + simgrid::s4u::this_actor::sleep_until(i.GetDuration()); } } catch (...) @@ -127,9 +116,11 @@ static void obs_node(std::vector args) { else XBT_INFO("%s failed to receive data",selfName.c_str()); } - effectiveAttemps++; + i.GotoNextEvent(); + nWakeUp++; } + // Done - XBT_INFO("Observation node %s finished (attemps:%d)",selfName.c_str(),effectiveAttemps); + XBT_INFO("Observation node %s finished (nWakeUp:%d|nDataRcv:%d)",selfName.c_str(),nWakeUp,nDataRcv); } \ No newline at end of file