Minor changes

This commit is contained in:
Loic Guegan 2023-07-17 16:41:07 +02:00
parent 0c08c2f0d8
commit c79a02c57b
4 changed files with 31 additions and 31 deletions

View file

@ -22,10 +22,9 @@ logger: src/logger.c src/utils.c config.mk
publish: publisher logger publish: publisher logger
for client in $$(basename -a /sys/kernel/ina260/*); \ for client in $$(basename -a /sys/kernel/ina260/*); \
do \ do \
./logger $(LOGGERS_DIR) $$client $(LOG_INTERVAL) &> logger_$${client}.log & echo $$! >> pid; \ ./logger $(LOGGERS_DIR) $$client $(LOG_INTERVAL) $(SUBSCRIBER_ADDR) $(ZMQ_PORT) $(KEY) ; \
done done
./publisher $(LOGGERS_DIR) $(LOG_INTERVAL) $(SUBSCRIBER_ADDR) $(ZMQ_PORT) $(KEY) # [ -f pid ] && { kill -INT $(shell cat pid); rm pid; }
[ -f pid ] && { kill -INT $(shell cat pid); rm pid; }
subscribe: subscriber subscribe: subscriber
./subscriber $(ZMQ_PORT) $(SUBSCRIBER_DIR) ./subscriber $(ZMQ_PORT) $(SUBSCRIBER_DIR)

View file

@ -19,7 +19,7 @@ LOGGERS_DIR=/tmp/ina260_logs/
# LOGGERS_DELAY defines the delay between 2 consecutive # LOGGERS_DELAY defines the delay between 2 consecutive
# ina260 power read performed by the logger # ina260 power read performed by the logger
# Unit is milliseconds # Unit is milliseconds
LOGGERS_DELAY=1000 LOGGERS_DELAY=0
# SUBSCRIBER_DIR will contain all the measurments # SUBSCRIBER_DIR will contain all the measurments
# received from the publishers # received from the publishers
SUBSCRIBER_DIR=./data SUBSCRIBER_DIR=./data
@ -28,7 +28,7 @@ SUBSCRIBER_DIR=./data
# a file with all the power measurements is generated by the loggers in LOGGERS_DIR # a file with all the power measurements is generated by the loggers in LOGGERS_DIR
# This file, is then transmitted by the publishers to the subscriber # This file, is then transmitted by the publishers to the subscriber
# Unit is seconds # Unit is seconds
LOG_INTERVAL=30 LOG_INTERVAL=5
# KEY is attached to all the messages published by the node # KEY is attached to all the messages published by the node
# It allows you to filter the messages received on the client # It allows you to filter the messages received on the client
# if you are using multiple monitoring nodes (multiple publishers) # if you are using multiple monitoring nodes (multiple publishers)

View file

@ -20,8 +20,6 @@ char __logdir[STATIC_LEN];
char __regpower[STATIC_LEN]; char __regpower[STATIC_LEN];
int __loginterval; int __loginterval;
unsigned char __stop=0; unsigned char __stop=0;
void *__zmq_context;
void *__zmq_publisher;
void sighandler(int signo){ void sighandler(int signo){
if (signo == SIGINT){ if (signo == SIGINT){
@ -30,7 +28,7 @@ void sighandler(int signo){
} }
} }
void publish(int queue_id); void publish(int queue_id, void* publisher);
typedef struct record { typedef struct record {
time_t secs; time_t secs;
@ -77,21 +75,21 @@ int main (int argc, char *argv [])
printf("Log interval is too small (min=%ds)\n",MIN_INTERVAL); printf("Log interval is too small (min=%ds)\n",MIN_INTERVAL);
exit(2); exit(2);
} }
if(FILE_EXISTS(__regpower)){ if(!FILE_EXISTS(__regpower)){
printf("Logger cannot access to %s\n",__regpower); printf("Logger cannot access to %s\n",__regpower);
exit(3); exit(3);
} }
//----- Prepare our context and publisher //----- Prepare our context and publisher
__zmq_context = zmq_ctx_new (); void *context = zmq_ctx_new ();
__zmq_publisher = zmq_socket (__zmq_context, ZMQ_PUB); void *publisher = zmq_socket (context, ZMQ_PUB);
char bindto[STATIC_LEN]; char bindto[STATIC_LEN];
sprintf(bindto,"tcp://%s:%d",__ip,__port); sprintf(bindto,"tcp://%s:%d",__ip,__port);
int rc = zmq_connect (__zmq_publisher, bindto); int rc = zmq_connect (publisher, bindto);
if(rc!=0){ if(rc!=0){
printf("Failed to connect to %s\n",bindto); printf("Failed to connect to %s\n",bindto);
exit(1); exit(1);
} }
@ -100,7 +98,7 @@ int main (int argc, char *argv [])
FILE *regptr,*logptr; FILE *regptr,*logptr;
char logfilepath[STATIC_LEN]=""; char logfilepath[STATIC_LEN]="";
regptr=fopen("/home/loic/out.txt","r"); regptr=fopen(__regpower,"r");
char buffer[STATIC_LEN]; char buffer[STATIC_LEN];
int power; int power;
time_t interval; time_t interval;
@ -110,7 +108,8 @@ int main (int argc, char *argv [])
while(!__stop){ while(!__stop){
interval=INTERVAL(__loginterval); interval=INTERVAL(__loginterval);
// Log current interval // Log current interval
queue_id=queue_id>=RECORD_QUEUES ? 0 : queue_id+1; queue_id=(queue_id+1)>=RECORD_QUEUES ? 0 : (queue_id+1);
printf("Queue id %d",queue_id);
queues[queue_id].interval=interval; queues[queue_id].interval=interval;
int record=0; int record=0;
while((TIMESTAMP()-interval)<__loginterval){ while((TIMESTAMP()-interval)<__loginterval){
@ -136,41 +135,43 @@ int main (int argc, char *argv [])
} }
} }
queues[queue_id].length=record; queues[queue_id].length=record;
publish(queue_id); publish(queue_id,publisher);
} }
fclose(regptr); fclose(regptr);
zmq_close (__zmq_publisher);
zmq_ctx_destroy (__zmq_context); zmq_close (publisher);
zmq_ctx_destroy (context);
return 0; return 0;
} }
void publish(int queue_id){ void publish(int queue_id, void* publisher){
printf("Publishing...\n");
// Build message header: // Build message header:
char buffer[ZMQ_MSG_SIZE]; char buffer[ZMQ_MSG_SIZE];
sprintf(buffer,"%s\n%s\n%s\n%ld\n",ZMQ_TOKEN,__key,__client,queues[queue_id].interval); sprintf(buffer,"%s\n%s\n%s\n%ld\n",ZMQ_TOKEN,__key,__client,queues[queue_id].interval);
printf(buffer);
int msglen=strlen(buffer); int msglen=strlen(buffer);
// Put every lines in the buffer and send it // Put every lines in the buffer and send it
char line[STATIC_LEN]; char line[STATIC_LEN];
for(int record=0;record<queues[queue_id].length;record++){ for(int record=0;record<queues[queue_id].length;record++){
*line='\0'; *line='\0';
sprintf(line,"%ld,%ld,%d",queues[queue_id].records[record].secs,queues[queue_id].records[record].nsecs,queues[queue_id].records[record].power); sprintf(line,"%ld,%ld,%d\n",queues[queue_id].records[record].secs,queues[queue_id].records[record].nsecs,queues[queue_id].records[record].power);
int linelength=strlen(line); int linelength=strlen(line);
if((linelength+msglen) <ZMQ_MSG_SIZE){ if((linelength+msglen)<ZMQ_MSG_SIZE){
strcat(buffer,line); strcat(buffer,line);
msglen+=linelength; msglen+=linelength;
} else { } else {
// It buffer is full, we send the message and create another one // It buffer is full, we send the message and create another one
zmq_send (__zmq_publisher, buffer, msglen, 0); zmq_send (publisher, buffer, msglen, 0);
// Build new message header: // Build new message header:
sprintf(buffer,"%s\n%s\n%s\n%ld\n",ZMQ_TOKEN,__key,__client,queues[queue_id].interval); sprintf(buffer,"%s\n%s\n%s\n%ld\n",ZMQ_TOKEN,__key,__client,queues[queue_id].interval);
strcat(buffer,line); strcat(buffer,line);
msglen=strlen(buffer); msglen=strlen(buffer);
} }
} }
printf("msglength=%d\n",msglen);
// Finally send the last message (or the only one) // Finally send the last message (or the only one)
zmq_send (__zmq_publisher, buffer, msglen, 0); zmq_send (publisher, buffer, msglen, 0);
} }

View file

@ -6,12 +6,12 @@
#include <string.h> #include <string.h>
#include <dirent.h> #include <dirent.h>
#define MIN_INTERVAL 30 #define MIN_INTERVAL 0
#define INA260_SYSFS "/sys/kernel/ina260" #define INA260_SYSFS "/sys/kernel/ina260"
#define INA260_POWER_REGISTER "registers/power" #define INA260_POWER_REGISTER "registers/power"
#define STATIC_LEN 255 #define STATIC_LEN 255
#define RECORD_QUEUES 1 #define RECORD_QUEUES 1
#define RECORD_MAX 1000 #define RECORD_MAX 100000
#ifndef ZMQ_TOKEN #ifndef ZMQ_TOKEN
#define ZMQ_TOKEN "ina260-zmq-publisher" #define ZMQ_TOKEN "ina260-zmq-publisher"