mirror of
https://gitlab.com/manzerbredes/loosely-coupled-dss.git
synced 2025-04-06 11:36:25 +02:00
Switch to timestamps
This commit is contained in:
parent
5f7bdb8f3c
commit
d0e57511fd
4 changed files with 219 additions and 54 deletions
33
inputs.json
33
inputs.json
|
@ -1,24 +1,29 @@
|
||||||
{
|
{
|
||||||
"on0":{
|
"on0":{
|
||||||
"wake_interval": 5,
|
"is_sender": true,
|
||||||
"is_sender": false,
|
|
||||||
"wake_duration": 5,
|
|
||||||
"startup_delay": 0,
|
|
||||||
"max_attemps" : 1,
|
|
||||||
"power_off": 0,
|
"power_off": 0,
|
||||||
"power_on":10,
|
"power_on":10,
|
||||||
"use_hint": true,
|
"use_hint": false,
|
||||||
"data_size": 10
|
"wake_ts": [ 1, 7, 7 ],
|
||||||
|
"wake_duration": [ 5, 1, 2],
|
||||||
|
"data_size": 50
|
||||||
},
|
},
|
||||||
"on1":{
|
"on1":{
|
||||||
"wake_interval": 5,
|
"is_sender": false,
|
||||||
"is_sender": true,
|
|
||||||
"wake_duration": 5,
|
|
||||||
"startup_delay": 0,
|
|
||||||
"max_attemps" : 2,
|
|
||||||
"power_off": 0,
|
"power_off": 0,
|
||||||
"power_on":10,
|
"power_on":10,
|
||||||
"use_hint":true,
|
"use_hint": false,
|
||||||
"data_size": 10
|
"wake_ts": [ 1, 7, 7 ],
|
||||||
|
"wake_duration": [ 5, 1, 2],
|
||||||
|
"data_size": 50
|
||||||
|
},
|
||||||
|
"on2":{
|
||||||
|
"is_sender": false,
|
||||||
|
"power_off": 0,
|
||||||
|
"power_on":10,
|
||||||
|
"use_hint": false,
|
||||||
|
"wake_ts": [ 1, 7, 7 ],
|
||||||
|
"wake_duration": [ 5, 1, 2],
|
||||||
|
"data_size": 50
|
||||||
}
|
}
|
||||||
}
|
}
|
123
src/inputs.cc
123
src/inputs.cc
|
@ -1,27 +1,138 @@
|
||||||
#include "inputs.hpp"
|
#include "inputs.hpp"
|
||||||
|
#include "xbt/log.h"
|
||||||
|
#include <algorithm>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
#include <fstream>
|
#include <fstream>
|
||||||
|
|
||||||
|
|
||||||
Inputs::Inputs(std::string node_name){
|
Inputs::Inputs(std::string node_name){
|
||||||
|
// Here we doing all the boring stuff
|
||||||
FILE* input_file = fopen(INPUTS_FILE, "rb");
|
FILE* input_file = fopen(INPUTS_FILE, "rb");
|
||||||
char input_file_buffer[65536];
|
char input_file_buffer[JSON_BUFFER_SIZE];
|
||||||
rapidjson::FileReadStream is(input_file, input_file_buffer, sizeof(input_file_buffer));
|
rapidjson::FileReadStream is(input_file, input_file_buffer, sizeof(input_file_buffer));
|
||||||
d.ParseStream(is);
|
d.ParseStream(is);
|
||||||
fclose(input_file);
|
fclose(input_file);
|
||||||
|
|
||||||
wake_duration=d[node_name.c_str()]["wake_duration"].GetDouble();
|
// Init all variables
|
||||||
wake_interval=d[node_name.c_str()]["wake_interval"].GetDouble();
|
|
||||||
startup_delay=d[node_name.c_str()]["startup_delay"].GetDouble();
|
|
||||||
is_sender=d[node_name.c_str()]["is_sender"].GetBool();
|
is_sender=d[node_name.c_str()]["is_sender"].GetBool();
|
||||||
use_hint=d[node_name.c_str()]["use_hint"].GetBool();
|
use_hint=d[node_name.c_str()]["use_hint"].GetBool();
|
||||||
max_attempts=d[node_name.c_str()]["max_attemps"].GetInt();
|
|
||||||
data_size=d[node_name.c_str()]["data_size"].GetInt();
|
data_size=d[node_name.c_str()]["data_size"].GetInt();
|
||||||
|
for(auto& v:d[node_name.c_str()]["wake_ts"].GetArray()){
|
||||||
|
wake_ts.push_back(v.GetDouble());
|
||||||
|
}
|
||||||
|
for(auto& v:d[node_name.c_str()]["wake_duration"].GetArray()){
|
||||||
|
wake_duration.push_back(v.GetDouble());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Identity check
|
||||||
|
if(wake_ts.size()!=wake_duration.size()){
|
||||||
|
std::cerr << "Invalid node configuration: wake_ts.size() != wake_duration.size()" <<std::endl;
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
if(!std::is_sorted(wake_ts.begin(),wake_ts.end())){
|
||||||
|
std::cerr << "Invalid node configuration: wake_ts is not sorted" <<std::endl;
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure events are merged
|
||||||
|
MergeEvents();
|
||||||
|
}
|
||||||
|
|
||||||
|
void Inputs::MergeEvents(){
|
||||||
|
for(int i=0;i<(wake_ts.size()-1);i++){
|
||||||
|
double cur_ts=wake_ts[i];
|
||||||
|
double next_ts=wake_ts[i+1];
|
||||||
|
double cur_duration=wake_duration[i];
|
||||||
|
double next_duration=wake_duration[i+1];
|
||||||
|
// If we should merge then
|
||||||
|
if((cur_ts+cur_duration)>=next_ts){
|
||||||
|
// Create variable for convenience
|
||||||
|
double start=cur_ts;
|
||||||
|
double end=std::max(cur_ts+cur_duration,next_ts+next_duration);
|
||||||
|
wake_duration[i]=end-start;
|
||||||
|
// Now remove next event
|
||||||
|
wake_ts.erase(wake_ts.begin() + i + 1);
|
||||||
|
wake_duration.erase(wake_duration.begin() + i +1);
|
||||||
|
// This is not optimal. Yet it is simple :D
|
||||||
|
MergeEvents();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
double Inputs::GetNextTS(){
|
||||||
|
// Ensure the caller is smart
|
||||||
|
if(wake_duration.size()<2){
|
||||||
|
std::cerr << "You are trying to access to the next timestamp but it does not exists" <<std::endl;
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
return wake_ts[1];
|
||||||
|
}
|
||||||
|
|
||||||
|
double Inputs::GetNextDuration(){
|
||||||
|
// Ensure the caller is smart
|
||||||
|
if(wake_duration.size()<2){
|
||||||
|
std::cerr << "You are trying to access to the next duration but it does not exists" <<std::endl;
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
return wake_duration[1];
|
||||||
|
}
|
||||||
|
|
||||||
|
void Inputs::GotoNextEvent(){
|
||||||
|
wake_ts.erase(wake_ts.begin());
|
||||||
|
wake_duration.erase(wake_duration.begin());
|
||||||
|
}
|
||||||
|
|
||||||
|
void Inputs::DumpEvents(){
|
||||||
|
std::cout << "Timestamps: ";
|
||||||
|
for(auto a:wake_ts){
|
||||||
|
std::cout << std::setw(5) << a << " ";
|
||||||
|
}
|
||||||
|
std::cout << std::endl;
|
||||||
|
std::cout << "Wake Durations: ";
|
||||||
|
for(auto a:wake_duration){
|
||||||
|
std::cout << std::setw(5) << a << " ";
|
||||||
|
}
|
||||||
|
std::cout << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
void Inputs::AddEvent(double ts, double duration){
|
||||||
|
// First handle timestamp
|
||||||
|
int pos=0;
|
||||||
|
for(auto it = std::begin(wake_ts); it != std::end(wake_ts); ++it) {
|
||||||
|
if(*it>=ts){
|
||||||
|
wake_ts.insert(it,ts);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
pos++;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure that ts and duration should not go to the end
|
||||||
|
if(pos==wake_ts.size()){
|
||||||
|
wake_ts.push_back(ts);
|
||||||
|
wake_duration.push_back(duration);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
// Handle durations here
|
||||||
|
int pos2=0;
|
||||||
|
for(auto it = std::begin(wake_duration); it != std::end(wake_duration); ++it) {
|
||||||
|
if(pos==pos2){
|
||||||
|
wake_duration.insert(it,duration);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
else if (it+1==std::end(wake_duration)) {
|
||||||
|
wake_duration.push_back(duration);
|
||||||
|
}
|
||||||
|
pos2++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Don't forget
|
||||||
|
MergeEvents();
|
||||||
}
|
}
|
||||||
|
|
||||||
void Inputs::GeneratePlatform(std::string p){
|
void Inputs::GeneratePlatform(std::string p){
|
||||||
|
// The boring stuff
|
||||||
FILE* input_file = fopen(INPUTS_FILE, "rb");
|
FILE* input_file = fopen(INPUTS_FILE, "rb");
|
||||||
char input_file_buffer[65536];
|
char input_file_buffer[JSON_BUFFER_SIZE];
|
||||||
rapidjson::FileReadStream is(input_file, input_file_buffer, sizeof(input_file_buffer));
|
rapidjson::FileReadStream is(input_file, input_file_buffer, sizeof(input_file_buffer));
|
||||||
rapidjson::Document d;
|
rapidjson::Document d;
|
||||||
d.ParseStream(is);
|
d.ParseStream(is);
|
||||||
|
|
|
@ -2,23 +2,81 @@
|
||||||
#include "rapidjson/filereadstream.h"
|
#include "rapidjson/filereadstream.h"
|
||||||
#include <cstdio>
|
#include <cstdio>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
#include <vector>
|
||||||
|
#include <iostream>
|
||||||
|
#include <algorithm>
|
||||||
|
#include "xbt/log.h"
|
||||||
|
#include <iomanip>
|
||||||
|
|
||||||
#define INPUTS_FILE "inputs.json"
|
#define INPUTS_FILE "inputs.json"
|
||||||
|
/// @brief Pay attention to this strange number, you could tear your hairs out
|
||||||
|
#define JSON_BUFFER_SIZE 65536
|
||||||
|
|
||||||
using namespace rapidjson;
|
using namespace rapidjson;
|
||||||
|
|
||||||
class Inputs {
|
class Inputs {
|
||||||
|
/// @brief RapidJSON
|
||||||
Document d;
|
Document d;
|
||||||
|
/// @brief Current node associated with the Inputs
|
||||||
std::string node_name;
|
std::string node_name;
|
||||||
|
/// @brief Timestamps (at which time the nodes should wake up)
|
||||||
|
std::vector<double> wake_ts;
|
||||||
|
/// @brief Wake up time durations
|
||||||
|
std::vector<double> wake_duration;
|
||||||
|
/**
|
||||||
|
* Recursively merge overlapping events
|
||||||
|
*/
|
||||||
|
void MergeEvents();
|
||||||
public:
|
public:
|
||||||
|
/**
|
||||||
|
* Load node_name configuration
|
||||||
|
*/
|
||||||
Inputs(std::string node_name);
|
Inputs(std::string node_name);
|
||||||
|
/**
|
||||||
|
* Generate a SimGrid platform file from the json configuration
|
||||||
|
*/
|
||||||
static void GeneratePlatform(std::string p);
|
static void GeneratePlatform(std::string p);
|
||||||
|
/**
|
||||||
|
* Is there any event that remains in the queue ?
|
||||||
|
*/
|
||||||
|
bool ShouldContinue(){return wake_ts.size()!=0;}
|
||||||
|
/**
|
||||||
|
* Is there another event to process ?
|
||||||
|
*/
|
||||||
|
bool HasNext(){return(wake_ts.size()>1);}
|
||||||
|
/**
|
||||||
|
* Get current event timestamp
|
||||||
|
*/
|
||||||
|
double GetTS(){return wake_ts.front();}
|
||||||
|
/**
|
||||||
|
* Get current event duration
|
||||||
|
*/
|
||||||
|
double GetDuration(){return wake_duration.front();}
|
||||||
|
/**
|
||||||
|
* Get next event timestamp
|
||||||
|
*/
|
||||||
|
double GetNextTS();
|
||||||
|
/**
|
||||||
|
* Get next event duration
|
||||||
|
*/
|
||||||
|
double GetNextDuration();
|
||||||
|
/**
|
||||||
|
* Time travel machine (note that this function is following the second principle
|
||||||
|
* of thermodynamics)
|
||||||
|
*/
|
||||||
|
void GotoNextEvent();
|
||||||
|
/**
|
||||||
|
* Allows to add a *FUTURE* event and merge overlapping events
|
||||||
|
*/
|
||||||
|
void AddEvent(double ts, double duration);
|
||||||
|
/**
|
||||||
|
* This is the timeline
|
||||||
|
*/
|
||||||
|
void DumpEvents();
|
||||||
|
|
||||||
double wake_duration;
|
/// @brief These are public attributes, please take care they are fragile
|
||||||
double wake_interval;
|
|
||||||
double startup_delay;
|
|
||||||
bool is_sender;
|
bool is_sender;
|
||||||
bool use_hint;
|
bool use_hint;
|
||||||
int max_attempts;
|
|
||||||
int data_size;
|
int data_size;
|
||||||
|
|
||||||
};
|
};
|
|
@ -17,8 +17,9 @@ XBT_LOG_NEW_DEFAULT_CATEGORY(simulator, "[DAO]");
|
||||||
typedef unsigned int u32;
|
typedef unsigned int u32;
|
||||||
class Payload{
|
class Payload{
|
||||||
public:
|
public:
|
||||||
Payload():hint(0),containsHint(false){}
|
Payload():hint(0),duration(0),containsHint(false){}
|
||||||
double hint;
|
double hint;
|
||||||
|
double duration;
|
||||||
bool containsHint;
|
bool containsHint;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -63,53 +64,40 @@ static void obs_node(std::vector<std::string> args) {
|
||||||
XBT_INFO("Deploying observation node %s",selfName.c_str());
|
XBT_INFO("Deploying observation node %s",selfName.c_str());
|
||||||
|
|
||||||
// Init convenien variables
|
// Init convenien variables
|
||||||
double wake_interval=i.wake_interval;
|
|
||||||
double wake_duration=i.wake_duration;
|
|
||||||
double startup_delay=i.startup_delay;
|
|
||||||
int max_attempts=i.max_attempts;
|
|
||||||
bool isSender=i.is_sender;
|
bool isSender=i.is_sender;
|
||||||
bool useHint=i.use_hint;
|
bool useHint=i.use_hint;
|
||||||
bool isObserver=false;
|
bool isObserver=false;
|
||||||
u32 data_size=i.data_size;
|
u32 data_size=i.data_size;
|
||||||
|
|
||||||
// Starting node
|
// Starting node
|
||||||
u32 effectiveAttemps=0;
|
u32 nWakeUp=0;
|
||||||
double effective_wake_duration=wake_duration;
|
u32 nDataRcv=0;
|
||||||
double effective_wake_interval=wake_interval;
|
while(i.ShouldContinue()){
|
||||||
TURN_OFF();
|
|
||||||
simgrid::s4u::this_actor::sleep_for(startup_delay);
|
|
||||||
for(u32 i=0;i<max_attempts;i++){
|
|
||||||
|
|
||||||
// Sleeping
|
|
||||||
XBT_INFO("%s is spleeping",selfName.c_str());
|
XBT_INFO("%s is spleeping",selfName.c_str());
|
||||||
TURN_OFF();
|
TURN_OFF();
|
||||||
simgrid::s4u::this_actor::sleep_for(effective_wake_interval);
|
simgrid::s4u::this_actor::sleep_until(i.GetTS());
|
||||||
effective_wake_interval=wake_interval; // Restore wake interval
|
|
||||||
TURN_ON();
|
TURN_ON();
|
||||||
XBT_INFO("%s wakes up",selfName.c_str());
|
XBT_INFO("%s wakes up",selfName.c_str());
|
||||||
|
|
||||||
// Wake up: try to send/receive
|
// Doing wake up stuff
|
||||||
try
|
try {
|
||||||
{
|
if(isSender){ // If I am a sender
|
||||||
if(isSender){
|
|
||||||
Payload *p=new Payload();
|
Payload *p=new Payload();
|
||||||
p->containsHint=true;
|
if(useHint&&i.HasNext()){
|
||||||
p->hint=5;
|
p->hint=i.GetNextTS();
|
||||||
if(useHint){
|
p->duration=i.GetNextDuration();
|
||||||
p->containsHint=i<(max_attempts-1); // Ensure that we will wake up again
|
|
||||||
p->hint=wake_interval;
|
|
||||||
}
|
}
|
||||||
m->put(p,data_size,effective_wake_duration);
|
m->put(p,data_size,i.GetDuration());
|
||||||
XBT_INFO("%s send data successfully",selfName.c_str());
|
XBT_INFO("%s send data successfully",selfName.c_str());
|
||||||
isObserver=true; // Do one send for now...
|
isObserver=true; // Do one send for now...
|
||||||
isSender=false;
|
isSender=false;
|
||||||
}
|
}
|
||||||
else if (!isObserver){
|
else if (!isObserver){
|
||||||
Payload* p=m->get<Payload>(effective_wake_duration);
|
Payload* p=m->get<Payload>(i.GetDuration());
|
||||||
|
nDataRcv++;
|
||||||
if(p->containsHint){
|
if(p->containsHint){
|
||||||
XBT_INFO("%s received and hint of %f",selfName.c_str(),p->hint);
|
XBT_INFO("%s received and hint of %f",selfName.c_str(),p->hint);
|
||||||
effective_wake_interval=p->hint;
|
i.AddEvent(p->hint, p->duration);
|
||||||
i--; // Add new attempt
|
|
||||||
}
|
}
|
||||||
else{
|
else{
|
||||||
XBT_INFO("%s received data successfully and switch to forwarding mode",selfName.c_str());
|
XBT_INFO("%s received data successfully and switch to forwarding mode",selfName.c_str());
|
||||||
|
@ -118,6 +106,7 @@ static void obs_node(std::vector<std::string> args) {
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
XBT_INFO("%s is observing is environment...",selfName.c_str());
|
XBT_INFO("%s is observing is environment...",selfName.c_str());
|
||||||
|
simgrid::s4u::this_actor::sleep_until(i.GetDuration());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (...)
|
catch (...)
|
||||||
|
@ -127,9 +116,11 @@ static void obs_node(std::vector<std::string> args) {
|
||||||
else
|
else
|
||||||
XBT_INFO("%s failed to receive data",selfName.c_str());
|
XBT_INFO("%s failed to receive data",selfName.c_str());
|
||||||
}
|
}
|
||||||
effectiveAttemps++;
|
i.GotoNextEvent();
|
||||||
|
nWakeUp++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// Done
|
// Done
|
||||||
XBT_INFO("Observation node %s finished (attemps:%d)",selfName.c_str(),effectiveAttemps);
|
XBT_INFO("Observation node %s finished (nWakeUp:%d|nDataRcv:%d)",selfName.c_str(),nWakeUp,nDataRcv);
|
||||||
}
|
}
|
Loading…
Add table
Reference in a new issue