diff --git a/src/logger.c b/src/logger.c index 842aec7..f7574d4 100644 --- a/src/logger.c +++ b/src/logger.c @@ -1,14 +1,10 @@ #include #include -#include -#include -#include #include #include #include -#include "utils.h" -#include #include +#include "utils.h" #ifndef LOGGER_DELAY #define LOGGER_DELAY 0 @@ -66,7 +62,7 @@ int main (int argc, char *argv []) FILE *regptr,*logptr; char logfilepath[STATIC_LEN]=""; - regptr=fopen("/home/loic/out.txt","r"); + regptr=fopen("/home/loic/out.txt","r"); // TODO UPDATE!!! char buffer[STATIC_LEN]; int power; time_t interval; @@ -74,16 +70,20 @@ int main (int argc, char *argv []) while(!__stop){ interval=INTERVAL(__loginterval); - *logfilepath='\0'; sprintf(logfilepath,"%s/%ld",__logdir,interval); logptr=fopen(logfilepath,"w"); + // Write file header: fprintf(logptr,"timestamp,ns,power\n"); // Log current interval while((TIMESTAMP()-interval)<__loginterval){ + // Read power: fgets(buffer,STATIC_LEN,regptr); power=atoi(buffer); + // Get power measurement timestamp: clock_gettime(CLOCK_REALTIME,&power_ts); + // Write measurement in file: fprintf(logptr,"%d,%ld,%d\n",power_ts.tv_sec,power_ts.tv_nsec,power); + // Reset power register file: fseek(regptr,0,SEEK_SET); #if LOGGER_DELAY > 0 usleep(LOGGER_DELAY*1000); diff --git a/src/publisher.c b/src/publisher.c index 3e41d86..8c8c2f3 100644 --- a/src/publisher.c +++ b/src/publisher.c @@ -1,7 +1,5 @@ #include #include -#include -#include #include #include "utils.h" @@ -59,33 +57,38 @@ int main (int argc, char *argv []) //----- Start publisher struct dirent *de; // Pointer for directory entry while(!__stop){ - int interval=INTERVAL(__loginterval); - int interval_next=INTERVAL_NEXT(__loginterval); - DIR *dr = opendir(__logdir); + int interval=INTERVAL(__loginterval); // Current interval + int interval_next=INTERVAL_NEXT(__loginterval); // The next one + DIR *dr = opendir(__logdir); // Open the log directory if(dr !=NULL){ while ((de = readdir(dr)) != NULL){ + // Iterate over each loggers directories if(strcmp(de->d_name,".") && strcmp(de->d_name,"..")){ - char *client=de->d_name; + char *client=de->d_name; // Directory name is the client name + // Build the path for files we are looking for: char logfile[STATIC_LEN]; char logfile_next[STATIC_LEN]; sprintf(logfile,"%s/%s/%ld",__logdir,client,interval); sprintf(logfile_next,"%s/%s/%ld",__logdir,client,interval_next); // As long as next logfile is not available, we should wait - // for sending the current one + // before sending the current one (since loggers are working on it) printf("Waiting for %s logger measurements...\n",client); while(!FILE_EXISTS(logfile_next) && (!__stop)){ sleep(1); } - // Send current one + // Send current one: if(FILE_EXISTS(logfile)){ + printf("Publishing %s measurements\n",client); publish(publisher,logfile,client,interval); - remove(logfile); + remove(logfile); // Prevent log accumulation } } } closedir(dr); } else { + // If open fails, directory probably do not exists + // yet so wait: sleep(1); } } @@ -97,27 +100,31 @@ int main (int argc, char *argv []) } void publish(void *publisher, char *filepath, char* client, long int interval){ - printf("Publishing %s measurements\n",client); + // Build message header: char buffer[ZMQ_MSG_SIZE]; sprintf(buffer,"%s\n%s\n%s\n%ld\n",ZMQ_TOKEN,__key,client,interval); int msglen=strlen(buffer); - FILE *fptr; char * line = NULL; size_t len = 0; ssize_t read; + FILE *fptr; fptr=fopen(filepath,"r"); + // Put every lines in the buffer and send it while ((read = getline(&line, &len, fptr)) != -1) { if((read+msglen) #include -#include -#include #include #include "utils.h" @@ -68,15 +66,17 @@ int main (int argc, char *argv []) double size_mib=size/(1024*1024); printf("Data received: key=%s client=%s interval=%ld msgsize=%.2lfMiB\n",key, client, interval,size_mib); char path[STATIC_LEN]=""; - //sprintf(path,"%s/%s_%s_%ld",cdatadir,key,client,interval); + // Create dir if not exists: sprintf(path,"%s/%s/%s/",cdatadir,key,client); if(!DIR_EXISTS(path)){ mkdirp(path); } + // Now open output file: sprintf(path,"%s/%s/%s/%ld",cdatadir,key,client,interval); fptr=fopen(path,"a"); } + // Write all the measurements: if(line>4){ fwrite(token, strlen(token), 1, fptr); fwrite("\n",1,1,fptr); @@ -88,8 +88,6 @@ int main (int argc, char *argv []) fclose(fptr); } - - zmq_close (subscriber); zmq_ctx_destroy (context);