diff --git a/Makefile b/Makefile index 5ea15b4..d832451 100644 --- a/Makefile +++ b/Makefile @@ -22,10 +22,9 @@ logger: src/logger.c src/utils.c config.mk publish: publisher logger for client in $$(basename -a /sys/kernel/ina260/*); \ do \ - ./logger $(LOGGERS_DIR) $$client $(LOG_INTERVAL) &> logger_$${client}.log & echo $$! >> pid; \ + ./logger $(LOGGERS_DIR) $$client $(LOG_INTERVAL) $(SUBSCRIBER_ADDR) $(ZMQ_PORT) $(KEY) ; \ done - ./publisher $(LOGGERS_DIR) $(LOG_INTERVAL) $(SUBSCRIBER_ADDR) $(ZMQ_PORT) $(KEY) - [ -f pid ] && { kill -INT $(shell cat pid); rm pid; } +# [ -f pid ] && { kill -INT $(shell cat pid); rm pid; } subscribe: subscriber ./subscriber $(ZMQ_PORT) $(SUBSCRIBER_DIR) diff --git a/config.mk b/config.mk index a37b20e..34a3ced 100644 --- a/config.mk +++ b/config.mk @@ -19,7 +19,7 @@ LOGGERS_DIR=/tmp/ina260_logs/ # LOGGERS_DELAY defines the delay between 2 consecutive # ina260 power read performed by the logger # Unit is milliseconds -LOGGERS_DELAY=1000 +LOGGERS_DELAY=0 # SUBSCRIBER_DIR will contain all the measurments # received from the publishers SUBSCRIBER_DIR=./data @@ -28,7 +28,7 @@ SUBSCRIBER_DIR=./data # a file with all the power measurements is generated by the loggers in LOGGERS_DIR # This file, is then transmitted by the publishers to the subscriber # Unit is seconds -LOG_INTERVAL=30 +LOG_INTERVAL=5 # KEY is attached to all the messages published by the node # It allows you to filter the messages received on the client # if you are using multiple monitoring nodes (multiple publishers) diff --git a/src/logger.c b/src/logger.c index cf379c6..b44b5d0 100644 --- a/src/logger.c +++ b/src/logger.c @@ -20,8 +20,6 @@ char __logdir[STATIC_LEN]; char __regpower[STATIC_LEN]; int __loginterval; unsigned char __stop=0; -void *__zmq_context; -void *__zmq_publisher; void sighandler(int signo){ if (signo == SIGINT){ @@ -30,7 +28,7 @@ void sighandler(int signo){ } } -void publish(int queue_id); +void publish(int queue_id, void* publisher); typedef struct record { time_t secs; @@ -77,21 +75,21 @@ int main (int argc, char *argv []) printf("Log interval is too small (min=%ds)\n",MIN_INTERVAL); exit(2); } - if(FILE_EXISTS(__regpower)){ + if(!FILE_EXISTS(__regpower)){ printf("Logger cannot access to %s\n",__regpower); exit(3); } //----- Prepare our context and publisher - __zmq_context = zmq_ctx_new (); - __zmq_publisher = zmq_socket (__zmq_context, ZMQ_PUB); - char bindto[STATIC_LEN]; - sprintf(bindto,"tcp://%s:%d",__ip,__port); - int rc = zmq_connect (__zmq_publisher, bindto); - if(rc!=0){ - printf("Failed to connect to %s\n",bindto); - exit(1); - } + void *context = zmq_ctx_new (); + void *publisher = zmq_socket (context, ZMQ_PUB); + char bindto[STATIC_LEN]; + sprintf(bindto,"tcp://%s:%d",__ip,__port); + int rc = zmq_connect (publisher, bindto); + if(rc!=0){ + printf("Failed to connect to %s\n",bindto); + exit(1); + } @@ -100,7 +98,7 @@ int main (int argc, char *argv []) FILE *regptr,*logptr; char logfilepath[STATIC_LEN]=""; - regptr=fopen("/home/loic/out.txt","r"); + regptr=fopen(__regpower,"r"); char buffer[STATIC_LEN]; int power; time_t interval; @@ -110,7 +108,8 @@ int main (int argc, char *argv []) while(!__stop){ interval=INTERVAL(__loginterval); // Log current interval - queue_id=queue_id>=RECORD_QUEUES ? 0 : queue_id+1; + queue_id=(queue_id+1)>=RECORD_QUEUES ? 0 : (queue_id+1); + printf("Queue id %d",queue_id); queues[queue_id].interval=interval; int record=0; while((TIMESTAMP()-interval)<__loginterval){ @@ -136,41 +135,43 @@ int main (int argc, char *argv []) } } queues[queue_id].length=record; - publish(queue_id); + publish(queue_id,publisher); } fclose(regptr); - zmq_close (__zmq_publisher); - zmq_ctx_destroy (__zmq_context); + + zmq_close (publisher); + zmq_ctx_destroy (context); return 0; } -void publish(int queue_id){ +void publish(int queue_id, void* publisher){ + printf("Publishing...\n"); // Build message header: char buffer[ZMQ_MSG_SIZE]; sprintf(buffer,"%s\n%s\n%s\n%ld\n",ZMQ_TOKEN,__key,__client,queues[queue_id].interval); - printf(buffer); int msglen=strlen(buffer); // Put every lines in the buffer and send it char line[STATIC_LEN]; for(int record=0;record #include -#define MIN_INTERVAL 30 +#define MIN_INTERVAL 0 #define INA260_SYSFS "/sys/kernel/ina260" #define INA260_POWER_REGISTER "registers/power" #define STATIC_LEN 255 #define RECORD_QUEUES 1 -#define RECORD_MAX 1000 +#define RECORD_MAX 100000 #ifndef ZMQ_TOKEN #define ZMQ_TOKEN "ina260-zmq-publisher"