Minor changes

This commit is contained in:
Loic Guegan 2023-07-16 17:19:44 +02:00
parent 33c5443794
commit dd12a4b42f
3 changed files with 28 additions and 23 deletions

View file

@ -1,14 +1,10 @@
#include <zmq.h> #include <zmq.h>
#include <assert.h> #include <assert.h>
#include <time.h>
#include <stdlib.h>
#include <string.h>
#include <libgen.h> #include <libgen.h>
#include <unistd.h> #include <unistd.h>
#include <sys/stat.h> #include <sys/stat.h>
#include "utils.h"
#include <time.h>
#include <signal.h> #include <signal.h>
#include "utils.h"
#ifndef LOGGER_DELAY #ifndef LOGGER_DELAY
#define LOGGER_DELAY 0 #define LOGGER_DELAY 0
@ -66,7 +62,7 @@ int main (int argc, char *argv [])
FILE *regptr,*logptr; FILE *regptr,*logptr;
char logfilepath[STATIC_LEN]=""; char logfilepath[STATIC_LEN]="";
regptr=fopen("/home/loic/out.txt","r"); regptr=fopen("/home/loic/out.txt","r"); // TODO UPDATE!!!
char buffer[STATIC_LEN]; char buffer[STATIC_LEN];
int power; int power;
time_t interval; time_t interval;
@ -74,16 +70,20 @@ int main (int argc, char *argv [])
while(!__stop){ while(!__stop){
interval=INTERVAL(__loginterval); interval=INTERVAL(__loginterval);
*logfilepath='\0';
sprintf(logfilepath,"%s/%ld",__logdir,interval); sprintf(logfilepath,"%s/%ld",__logdir,interval);
logptr=fopen(logfilepath,"w"); logptr=fopen(logfilepath,"w");
// Write file header:
fprintf(logptr,"timestamp,ns,power\n"); fprintf(logptr,"timestamp,ns,power\n");
// Log current interval // Log current interval
while((TIMESTAMP()-interval)<__loginterval){ while((TIMESTAMP()-interval)<__loginterval){
// Read power:
fgets(buffer,STATIC_LEN,regptr); fgets(buffer,STATIC_LEN,regptr);
power=atoi(buffer); power=atoi(buffer);
// Get power measurement timestamp:
clock_gettime(CLOCK_REALTIME,&power_ts); clock_gettime(CLOCK_REALTIME,&power_ts);
// Write measurement in file:
fprintf(logptr,"%d,%ld,%d\n",power_ts.tv_sec,power_ts.tv_nsec,power); fprintf(logptr,"%d,%ld,%d\n",power_ts.tv_sec,power_ts.tv_nsec,power);
// Reset power register file:
fseek(regptr,0,SEEK_SET); fseek(regptr,0,SEEK_SET);
#if LOGGER_DELAY > 0 #if LOGGER_DELAY > 0
usleep(LOGGER_DELAY*1000); usleep(LOGGER_DELAY*1000);

View file

@ -1,7 +1,5 @@
#include <zmq.h> #include <zmq.h>
#include <assert.h> #include <assert.h>
#include <time.h>
#include <dirent.h>
#include <signal.h> #include <signal.h>
#include "utils.h" #include "utils.h"
@ -59,33 +57,38 @@ int main (int argc, char *argv [])
//----- Start publisher //----- Start publisher
struct dirent *de; // Pointer for directory entry struct dirent *de; // Pointer for directory entry
while(!__stop){ while(!__stop){
int interval=INTERVAL(__loginterval); int interval=INTERVAL(__loginterval); // Current interval
int interval_next=INTERVAL_NEXT(__loginterval); int interval_next=INTERVAL_NEXT(__loginterval); // The next one
DIR *dr = opendir(__logdir); DIR *dr = opendir(__logdir); // Open the log directory
if(dr !=NULL){ if(dr !=NULL){
while ((de = readdir(dr)) != NULL){ while ((de = readdir(dr)) != NULL){
// Iterate over each loggers directories
if(strcmp(de->d_name,".") && strcmp(de->d_name,"..")){ if(strcmp(de->d_name,".") && strcmp(de->d_name,"..")){
char *client=de->d_name; char *client=de->d_name; // Directory name is the client name
// Build the path for files we are looking for:
char logfile[STATIC_LEN]; char logfile[STATIC_LEN];
char logfile_next[STATIC_LEN]; char logfile_next[STATIC_LEN];
sprintf(logfile,"%s/%s/%ld",__logdir,client,interval); sprintf(logfile,"%s/%s/%ld",__logdir,client,interval);
sprintf(logfile_next,"%s/%s/%ld",__logdir,client,interval_next); sprintf(logfile_next,"%s/%s/%ld",__logdir,client,interval_next);
// As long as next logfile is not available, we should wait // As long as next logfile is not available, we should wait
// for sending the current one // before sending the current one (since loggers are working on it)
printf("Waiting for %s logger measurements...\n",client); printf("Waiting for %s logger measurements...\n",client);
while(!FILE_EXISTS(logfile_next) && (!__stop)){ while(!FILE_EXISTS(logfile_next) && (!__stop)){
sleep(1); sleep(1);
} }
// Send current one // Send current one:
if(FILE_EXISTS(logfile)){ if(FILE_EXISTS(logfile)){
printf("Publishing %s measurements\n",client);
publish(publisher,logfile,client,interval); publish(publisher,logfile,client,interval);
remove(logfile); remove(logfile); // Prevent log accumulation
} }
} }
} }
closedir(dr); closedir(dr);
} }
else { else {
// If open fails, directory probably do not exists
// yet so wait:
sleep(1); sleep(1);
} }
} }
@ -97,27 +100,31 @@ int main (int argc, char *argv [])
} }
void publish(void *publisher, char *filepath, char* client, long int interval){ void publish(void *publisher, char *filepath, char* client, long int interval){
printf("Publishing %s measurements\n",client); // Build message header:
char buffer[ZMQ_MSG_SIZE]; char buffer[ZMQ_MSG_SIZE];
sprintf(buffer,"%s\n%s\n%s\n%ld\n",ZMQ_TOKEN,__key,client,interval); sprintf(buffer,"%s\n%s\n%s\n%ld\n",ZMQ_TOKEN,__key,client,interval);
int msglen=strlen(buffer); int msglen=strlen(buffer);
FILE *fptr;
char * line = NULL; char * line = NULL;
size_t len = 0; size_t len = 0;
ssize_t read; ssize_t read;
FILE *fptr;
fptr=fopen(filepath,"r"); fptr=fopen(filepath,"r");
// Put every lines in the buffer and send it
while ((read = getline(&line, &len, fptr)) != -1) { while ((read = getline(&line, &len, fptr)) != -1) {
if((read+msglen) <ZMQ_MSG_SIZE){ if((read+msglen) <ZMQ_MSG_SIZE){
strcat(buffer,line); strcat(buffer,line);
msglen+=read; msglen+=read;
} else { } else {
// It buffer is full, we send the message and create another one
zmq_send (publisher, buffer, msglen, 0); zmq_send (publisher, buffer, msglen, 0);
// Build new message header:
sprintf(buffer,"%s\n%s\n%s\n%ld\n",ZMQ_TOKEN,__key,client,interval); sprintf(buffer,"%s\n%s\n%s\n%ld\n",ZMQ_TOKEN,__key,client,interval);
strcat(buffer,line); strcat(buffer,line);
msglen=strlen(buffer); msglen=strlen(buffer);
} }
} }
fclose(fptr); fclose(fptr);
// Finally send the last message (or the only one)
zmq_send (publisher, buffer, msglen, 0); zmq_send (publisher, buffer, msglen, 0);
} }

View file

@ -1,7 +1,5 @@
#include <zmq.h> #include <zmq.h>
#include <assert.h> #include <assert.h>
#include <time.h>
#include <string.h>
#include <signal.h> #include <signal.h>
#include "utils.h" #include "utils.h"
@ -68,15 +66,17 @@ int main (int argc, char *argv [])
double size_mib=size/(1024*1024); double size_mib=size/(1024*1024);
printf("Data received: key=%s client=%s interval=%ld msgsize=%.2lfMiB\n",key, client, interval,size_mib); printf("Data received: key=%s client=%s interval=%ld msgsize=%.2lfMiB\n",key, client, interval,size_mib);
char path[STATIC_LEN]=""; char path[STATIC_LEN]="";
//sprintf(path,"%s/%s_%s_%ld",cdatadir,key,client,interval); // Create dir if not exists:
sprintf(path,"%s/%s/%s/",cdatadir,key,client); sprintf(path,"%s/%s/%s/",cdatadir,key,client);
if(!DIR_EXISTS(path)){ if(!DIR_EXISTS(path)){
mkdirp(path); mkdirp(path);
} }
// Now open output file:
sprintf(path,"%s/%s/%s/%ld",cdatadir,key,client,interval); sprintf(path,"%s/%s/%s/%ld",cdatadir,key,client,interval);
fptr=fopen(path,"a"); fptr=fopen(path,"a");
} }
// Write all the measurements:
if(line>4){ if(line>4){
fwrite(token, strlen(token), 1, fptr); fwrite(token, strlen(token), 1, fptr);
fwrite("\n",1,1,fptr); fwrite("\n",1,1,fptr);
@ -88,8 +88,6 @@ int main (int argc, char *argv [])
fclose(fptr); fclose(fptr);
} }
zmq_close (subscriber); zmq_close (subscriber);
zmq_ctx_destroy (context); zmq_ctx_destroy (context);