From 7f87211b01bcc116eba8b01b73677ea87172e8d7 Mon Sep 17 00:00:00 2001 From: Loic Guegan Date: Tue, 18 Jul 2023 13:00:38 +0200 Subject: [PATCH] Minor changes --- Makefile | 25 +++-- config.mk | 7 +- src/logger.c | 172 ------------------------------ src/publisher.c | 270 +++++++++++++++++++++++++++-------------------- src/subscriber.c | 8 +- 5 files changed, 176 insertions(+), 306 deletions(-) delete mode 100644 src/logger.c diff --git a/Makefile b/Makefile index 46ad715..31dd09c 100644 --- a/Makefile +++ b/Makefile @@ -6,30 +6,29 @@ CFLAGS= MACROS=\ -DZMQ_TOKEN=\"$(ZMQ_TOKEN)\" \ -DZMQ_MSG_SIZE=$(ZMQ_MSG_SIZE) \ --DLOGGERS_DELAY=$(LOGGERS_DELAY) +-DLOG_DELAY=$(LOG_DELAY) -all: publisher subscriber logger +all: publisher subscriber publisher: src/publisher.c src/utils.c config.mk - $(CC) -lzmq $(filter-out config.mk,$^) -o $@ $(MACROS) + $(CC) -lzmq -lpthread $(filter-out config.mk,$^) -o $@ $(MACROS) subscriber: src/subscriber.c src/utils.c config.mk $(CC) -lzmq $(filter-out config.mk,$^) -o $@ $(MACROS) -logger: src/logger.c src/utils.c config.mk - $(CC) -lzmq -lpthread $(filter-out config.mk,$^) -o $@ $(MACROS) - -publish: publisher logger - for client in $$(basename -a /sys/kernel/ina260/*); \ +publish: publisher + [ -f pid ] && { cat pid|xargs kill -INT; rm pid; } || exit 0 + for client in $$(ls /sys/kernel/ina260/|xargs basename -a); \ do \ - ./logger $(LOGGERS_DIR) $$client $(LOG_INTERVAL) $(SUBSCRIBER_ADDR) $(ZMQ_PORT) $(KEY) ; \ - done -# [ -f pid ] && { kill -INT $(shell cat pid); rm pid; } + ./$^ $$client $(LOG_INTERVAL) $(SUBSCRIBER_ADDR) $(ZMQ_PORT) $(KEY) > publisher_$${client}.log 2>&1 & \ + echo $$! >> pid; \ + done ;\ + wait subscribe: subscriber - ./subscriber $(ZMQ_PORT) $(SUBSCRIBER_DIR) + ./$^ $(ZMQ_PORT) $(SUBSCRIBER_DIR) clean: - rm -f logger subscriber publisher + rm -f subscriber publisher .PHONY: clean publish subscribe diff --git a/config.mk b/config.mk index 92232f7..15dd5d6 100644 --- a/config.mk +++ b/config.mk @@ -13,13 +13,10 @@ ZMQ_TOKEN=ina260-zmq-publisher ZMQ_MSG_SIZE=5242880 ##### Loggers/Publisher -# LOGGERS_DIR will contains all the data generated by the loggers -# a.k.a the ina260 power measurements -LOGGERS_DIR=/tmp/ina260_logs/ -# LOGGERS_DELAY defines the delay between 2 consecutive +# LOG_DELAY defines the delay between 2 consecutive # ina260 power read performed by the logger # Unit is milliseconds -LOGGERS_DELAY=1000 +LOG_DELAY=1000 # SUBSCRIBER_DIR will contain all the measurments # received from the publishers SUBSCRIBER_DIR=./data diff --git a/src/logger.c b/src/logger.c deleted file mode 100644 index 7655f75..0000000 --- a/src/logger.c +++ /dev/null @@ -1,172 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include "utils.h" - -#ifndef LOGGERS_DELAY -#define LOGGERS_DELAY 0 -#endif - -// Global: -char *__client; -char *__ip; -char *__key; -int __port; -char __logdir[STATIC_LEN]; -char __regpower[STATIC_LEN]; -int __loginterval; -unsigned char __stop=0; - -void sighandler(int signo){ - if (signo == SIGINT){ - printf("Stopping...\n"); - __stop=1; - } -} - -void *publisher(void *zmq_publisher); - -typedef struct queue { - int size; - char issending; - char msg[ZMQ_MSG_SIZE]; -} queue; -queue queues[MAX_QUEUES]; - -int main (int argc, char *argv []) -{ - if(argc != 7){ - printf("Usage: %s ",argv[0]); - exit(1); - } - - //----- Init global variables - __client=argv[2]; - __loginterval=atoi(argv[3]); - __ip=argv[4]; - __port=atoi(argv[5]); - __key=argv[6]; - // __logdir: - strcat(__logdir,argv[1]); - strcat(__logdir,"/"); - strcat(__logdir,__client); - // __regpower: - strcat(__regpower,INA260_SYSFS); - strcat(__regpower,"/"); - strcat(__regpower,__client); - strcat(__regpower,"/"); - strcat(__regpower,INA260_POWER_REGISTER); - - //----- Sanity checks - signal(SIGINT,sighandler); - mkdirp(__logdir); - if(__loginterval=MAX_QUEUES ? 0 : (queue_id+1); - // Busy wait: - while(queues[queue_id].issending){}; - // Write msg header: - *queues[queue_id].msg='\0'; - sprintf(queues[queue_id].msg,"%s\n%s\n%s\n%ld\n",ZMQ_TOKEN,__key,__client,interval); - queues[queue_id].size=strlen(queues[queue_id].msg); - // Monitor: - while((TIMESTAMP()-interval)<__loginterval){ - if(__stop) - break; - // Read power: - fgets(buffer,STATIC_LEN,regptr); - // Get power measurement timestamp: - clock_gettime(CLOCK_REALTIME,&power_ts); - // Write measurement into msg buffer: - char line[MAX_RECORD_LEN]; - if((queues[queue_id].size+MAX_RECORD_LEN)>ZMQ_MSG_SIZE){ - printf("To many measurements to publish. Please increase ZMQ_MSG_SIZE\n"); - } else { - sprintf(queues[queue_id].msg+queues[queue_id].size,"%ld,%ld,%d\n",power_ts.tv_sec,power_ts.tv_nsec,atoi(buffer)); - queues[queue_id].size+=strlen(queues[queue_id].msg+queues[queue_id].size); - } - // Reset power register file: - fseek(regptr,0,SEEK_SET); -#if LOGGERS_DELAY > 0 - usleep(LOGGERS_DELAY*1000); -#endif - //printf("Tick\n"); fflush(stdout); - } - queues[queue_id].issending=1; - } - - fclose(regptr); - pthread_join(zmq_thread, NULL); - zmq_close (zmq_publisher); - zmq_ctx_destroy (zmq_context); - return 0; -} - - -void *publisher(void *zmq_publisher){ - int queue_id=0; - while(!__stop){ - if(queues[queue_id].issending){ - printf("Publishing..."); - zmq_send(zmq_publisher,queues[queue_id].msg,queues[queue_id].size,0); - queues[queue_id].issending=0; - printf("done\n"); - } else { -#if LOGGERS_DELAY > 0 - usleep(LOGGERS_DELAY*1000); -#endif - continue; - } - queue_id++; - if(queue_id>=MAX_QUEUES) - queue_id=0; - } - pthread_exit(EXIT_SUCCESS); -} \ No newline at end of file diff --git a/src/publisher.c b/src/publisher.c index 8c8c2f3..03b3a1a 100644 --- a/src/publisher.c +++ b/src/publisher.c @@ -1,130 +1,170 @@ -#include -#include -#include - #include "utils.h" +#include +#include +#include +#include +#include +#include +#include +#include -// Global: -char *__logdir; -char *__key; -char *__interface; -char *__ip; -int __loginterval; -int __port; -unsigned char __stop=0; +#ifndef LOG_DELAY +#define LOG_DELAY 0 +#endif -void publish(void *publisher, char *filepath, char* client, long int interval); +/// @brief Set to non-zero to stop the processes +unsigned char __stop = 0; -void sighandler(int signo){ - if (signo == SIGINT){ +/** + * @brief Stop process properly on SIGINT + * + * @param signo + */ +void sighandler(int signo) { + if (signo == SIGINT) { printf("Stopping...\n"); - __stop=1; + __stop = 1; } } -int main (int argc, char *argv []) -{ - if(argc != 6){ - printf("Usage: %s ",argv[0]); - exit(1); +void *publisher(void *zmq_publisher); + +typedef struct queue { + int size; + char issending; + char msg[ZMQ_MSG_SIZE]; +} queue; +queue queues[MAX_QUEUES]; + +int main(int argc, char *argv[]) { + if (argc != 6) { + printf("Usage: %s ", + argv[0]); + exit(1); + } + + //----- Init arguments + char *client = argv[1]; + int loginterval = atoi(argv[2]); + char *ip = argv[3]; + int port = atoi(argv[4]); + char *key = argv[5]; + + // __regpower: + char regpower[STATIC_LEN]=""; + strcat(regpower, INA260_SYSFS); + strcat(regpower, "/"); + strcat(regpower, client); + strcat(regpower, "/"); + strcat(regpower, INA260_POWER_REGISTER); + + //----- Sanity checks + signal(SIGINT, sighandler); + if (loginterval < MIN_INTERVAL) { + printf("Log interval is too small (min=%ds)\n", MIN_INTERVAL); + exit(2); + } + if (!FILE_EXISTS(regpower)) { + printf("Logger cannot access to %s\n", regpower); + exit(3); + } + + //----- Prepare our context and publisher + void *zmq_context = zmq_ctx_new(); + void *zmq_publisher = zmq_socket(zmq_context, ZMQ_PUB); + char bindto[STATIC_LEN]; + sprintf(bindto, "tcp://%s:%d", ip, port); + int rc = zmq_connect(zmq_publisher, bindto); + if (rc != 0) { + printf("Failed to connect to %s\n", bindto); + exit(1); + } + + //----- Init logging variables + pthread_t zmq_thread; + FILE *regptr, *logptr; + char logfilepath[STATIC_LEN] = ""; + regptr = fopen(regpower, "r"); + char buffer[STATIC_LEN]; + time_t interval; + struct timespec power_ts; + int queue_id = 0; + + // Init queues + for (int i = 0; i < MAX_QUEUES; i++) { + queues[queue_id].issending = 0; + } + pthread_create(&zmq_thread, NULL, publisher, zmq_publisher); + + //----- Start logging + printf("Logger started [client=%s,interval=%ds]\n", client, loginterval); + while (!__stop) { + interval = INTERVAL(loginterval); + // Log current interval + queue_id = (queue_id + 1) >= MAX_QUEUES ? 0 : (queue_id + 1); + // Busy wait: + while (queues[queue_id].issending) { + }; + // Write msg header: + *queues[queue_id].msg = '\0'; + sprintf(queues[queue_id].msg, "%s\n%s\n%s\n%ld\n", ZMQ_TOKEN, key, client, + interval); + queues[queue_id].size = strlen(queues[queue_id].msg); + // Monitor: + while ((TIMESTAMP() - interval) < loginterval) { + // Check if should stop: + if (__stop) + break; + // Read power: + fgets(buffer, STATIC_LEN, regptr); + // Get power measurement timestamp: + clock_gettime(CLOCK_REALTIME, &power_ts); + // Write measurement into msg buffer: + char line[MAX_RECORD_LEN]; + if ((queues[queue_id].size + MAX_RECORD_LEN) > ZMQ_MSG_SIZE) { + printf( + "To many measurements to publish. Please increase ZMQ_MSG_SIZE\n"); + } else { + sprintf(queues[queue_id].msg + queues[queue_id].size, "%ld,%ld,%d\n", + power_ts.tv_sec, power_ts.tv_nsec, atoi(buffer)); + queues[queue_id].size += + strlen(queues[queue_id].msg + queues[queue_id].size); + } + // Reset power register file: + fseek(regptr, 0, SEEK_SET); +#if LOG_DELAY > 0 + usleep(LOG_DELAY * 1000); +#endif } + queues[queue_id].issending = 1; + } - //----- Init global variables - __logdir=argv[1]; - __loginterval=atoi(argv[2]); - __ip=argv[3]; - __port=atoi(argv[4]); - __key=argv[5]; + //----- Cleaning + fclose(regptr); + pthread_join(zmq_thread, NULL); + zmq_close(zmq_publisher); + zmq_ctx_destroy(zmq_context); - //----- Sanity checks - signal(SIGINT,sighandler); - if(__logintervald_name,".") && strcmp(de->d_name,"..")){ - char *client=de->d_name; // Directory name is the client name - // Build the path for files we are looking for: - char logfile[STATIC_LEN]; - char logfile_next[STATIC_LEN]; - sprintf(logfile,"%s/%s/%ld",__logdir,client,interval); - sprintf(logfile_next,"%s/%s/%ld",__logdir,client,interval_next); - // As long as next logfile is not available, we should wait - // before sending the current one (since loggers are working on it) - printf("Waiting for %s logger measurements...\n",client); - while(!FILE_EXISTS(logfile_next) && (!__stop)){ - sleep(1); - } - // Send current one: - if(FILE_EXISTS(logfile)){ - printf("Publishing %s measurements\n",client); - publish(publisher,logfile,client,interval); - remove(logfile); // Prevent log accumulation - } - } - } - closedir(dr); - } - else { - // If open fails, directory probably do not exists - // yet so wait: - sleep(1); - } - } - - zmq_close (publisher); - zmq_ctx_destroy (context); - - return 0; + return 0; } -void publish(void *publisher, char *filepath, char* client, long int interval){ - // Build message header: - char buffer[ZMQ_MSG_SIZE]; - sprintf(buffer,"%s\n%s\n%s\n%ld\n",ZMQ_TOKEN,__key,client,interval); - int msglen=strlen(buffer); - - char * line = NULL; - size_t len = 0; - ssize_t read; - FILE *fptr; - fptr=fopen(filepath,"r"); - // Put every lines in the buffer and send it - while ((read = getline(&line, &len, fptr)) != -1) { - if((read+msglen) 0 + usleep(LOG_DELAY * 1000); +#endif + continue; // Queues are always filled in order by the logger (from 0 to n) } - fclose(fptr); - // Finally send the last message (or the only one) - zmq_send (publisher, buffer, msglen, 0); + queue_id++; + if (queue_id >= MAX_QUEUES) + queue_id = 0; + } + pthread_exit(EXIT_SUCCESS); } \ No newline at end of file diff --git a/src/subscriber.c b/src/subscriber.c index 9902d92..08e5a25 100644 --- a/src/subscriber.c +++ b/src/subscriber.c @@ -3,7 +3,14 @@ #include #include "utils.h" +/// @brief Set to non-zero to stop the processes unsigned char __stop=0; + +/** + * @brief Stop process properly on SIGINT + * + * @param signo + */ void sighandler(int signo){ if (signo == SIGINT){ printf("Stopping...\n"); @@ -83,7 +90,6 @@ int main (int argc, char *argv []) // Write all the measurements: if(line>4){ fwrite(token, strlen(token), 1, fptr); - printf("%s\n",token); fwrite("\n",1,1,fptr); }