Minor changes

This commit is contained in:
Loic Guegan 2023-07-18 13:00:38 +02:00
parent 55d9cd7567
commit 7f87211b01
5 changed files with 176 additions and 306 deletions

View file

@ -6,30 +6,29 @@ CFLAGS=
MACROS=\
-DZMQ_TOKEN=\"$(ZMQ_TOKEN)\" \
-DZMQ_MSG_SIZE=$(ZMQ_MSG_SIZE) \
-DLOGGERS_DELAY=$(LOGGERS_DELAY)
-DLOG_DELAY=$(LOG_DELAY)
all: publisher subscriber logger
all: publisher subscriber
publisher: src/publisher.c src/utils.c config.mk
$(CC) -lzmq $(filter-out config.mk,$^) -o $@ $(MACROS)
$(CC) -lzmq -lpthread $(filter-out config.mk,$^) -o $@ $(MACROS)
subscriber: src/subscriber.c src/utils.c config.mk
$(CC) -lzmq $(filter-out config.mk,$^) -o $@ $(MACROS)
logger: src/logger.c src/utils.c config.mk
$(CC) -lzmq -lpthread $(filter-out config.mk,$^) -o $@ $(MACROS)
publish: publisher logger
for client in $$(basename -a /sys/kernel/ina260/*); \
publish: publisher
[ -f pid ] && { cat pid|xargs kill -INT; rm pid; } || exit 0
for client in $$(ls /sys/kernel/ina260/|xargs basename -a); \
do \
./logger $(LOGGERS_DIR) $$client $(LOG_INTERVAL) $(SUBSCRIBER_ADDR) $(ZMQ_PORT) $(KEY) ; \
done
# [ -f pid ] && { kill -INT $(shell cat pid); rm pid; }
./$^ $$client $(LOG_INTERVAL) $(SUBSCRIBER_ADDR) $(ZMQ_PORT) $(KEY) > publisher_$${client}.log 2>&1 & \
echo $$! >> pid; \
done ;\
wait
subscribe: subscriber
./subscriber $(ZMQ_PORT) $(SUBSCRIBER_DIR)
./$^ $(ZMQ_PORT) $(SUBSCRIBER_DIR)
clean:
rm -f logger subscriber publisher
rm -f subscriber publisher
.PHONY: clean publish subscribe

View file

@ -13,13 +13,10 @@ ZMQ_TOKEN=ina260-zmq-publisher
ZMQ_MSG_SIZE=5242880
##### Loggers/Publisher
# LOGGERS_DIR will contains all the data generated by the loggers
# a.k.a the ina260 power measurements
LOGGERS_DIR=/tmp/ina260_logs/
# LOGGERS_DELAY defines the delay between 2 consecutive
# LOG_DELAY defines the delay between 2 consecutive
# ina260 power read performed by the logger
# Unit is milliseconds
LOGGERS_DELAY=1000
LOG_DELAY=1000
# SUBSCRIBER_DIR will contain all the measurments
# received from the publishers
SUBSCRIBER_DIR=./data

View file

@ -1,172 +0,0 @@
#include <stdio.h>
#include <zmq.h>
#include <assert.h>
#include <libgen.h>
#include <unistd.h>
#include <sys/stat.h>
#include <signal.h>
#include <pthread.h>
#include "utils.h"
#ifndef LOGGERS_DELAY
#define LOGGERS_DELAY 0
#endif
// Global:
char *__client;
char *__ip;
char *__key;
int __port;
char __logdir[STATIC_LEN];
char __regpower[STATIC_LEN];
int __loginterval;
unsigned char __stop=0;
void sighandler(int signo){
if (signo == SIGINT){
printf("Stopping...\n");
__stop=1;
}
}
void *publisher(void *zmq_publisher);
typedef struct queue {
int size;
char issending;
char msg[ZMQ_MSG_SIZE];
} queue;
queue queues[MAX_QUEUES];
int main (int argc, char *argv [])
{
if(argc != 7){
printf("Usage: %s <abslogdir> <client> <loginterval> <ip> <port> <key>",argv[0]);
exit(1);
}
//----- Init global variables
__client=argv[2];
__loginterval=atoi(argv[3]);
__ip=argv[4];
__port=atoi(argv[5]);
__key=argv[6];
// __logdir:
strcat(__logdir,argv[1]);
strcat(__logdir,"/");
strcat(__logdir,__client);
// __regpower:
strcat(__regpower,INA260_SYSFS);
strcat(__regpower,"/");
strcat(__regpower,__client);
strcat(__regpower,"/");
strcat(__regpower,INA260_POWER_REGISTER);
//----- Sanity checks
signal(SIGINT,sighandler);
mkdirp(__logdir);
if(__loginterval<MIN_INTERVAL){
printf("Log interval is too small (min=%ds)\n",MIN_INTERVAL);
exit(2);
}
if(FILE_EXISTS(__regpower)){
printf("Logger cannot access to %s\n",__regpower);
exit(3);
}
//----- Prepare our context and publisher
void *zmq_context = zmq_ctx_new ();
void *zmq_publisher = zmq_socket (zmq_context, ZMQ_PUB);
char bindto[STATIC_LEN];
sprintf(bindto,"tcp://%s:%d",__ip,__port);
int rc = zmq_connect (zmq_publisher, bindto);
if(rc!=0){
printf("Failed to connect to %s\n",bindto);
exit(1);
}
//----- Start logging
pthread_t zmq_thread;
printf("Logger started [client=%s,interval=%ds]\n",__client,__loginterval);
FILE *regptr,*logptr;
char logfilepath[STATIC_LEN]="";
regptr=fopen("/home/loic/out.txt","r");
char buffer[STATIC_LEN];
int power;
time_t interval;
struct timespec power_ts;
int queue_id=0;
// Init queues
for(int i=0;i<MAX_QUEUES;i++){
queues[queue_id].issending=0;
}
pthread_create(&zmq_thread, NULL, publisher, zmq_publisher);
while(!__stop){
interval=INTERVAL(__loginterval);
// Log current interval
queue_id=(queue_id+1)>=MAX_QUEUES ? 0 : (queue_id+1);
// Busy wait:
while(queues[queue_id].issending){};
// Write msg header:
*queues[queue_id].msg='\0';
sprintf(queues[queue_id].msg,"%s\n%s\n%s\n%ld\n",ZMQ_TOKEN,__key,__client,interval);
queues[queue_id].size=strlen(queues[queue_id].msg);
// Monitor:
while((TIMESTAMP()-interval)<__loginterval){
if(__stop)
break;
// Read power:
fgets(buffer,STATIC_LEN,regptr);
// Get power measurement timestamp:
clock_gettime(CLOCK_REALTIME,&power_ts);
// Write measurement into msg buffer:
char line[MAX_RECORD_LEN];
if((queues[queue_id].size+MAX_RECORD_LEN)>ZMQ_MSG_SIZE){
printf("To many measurements to publish. Please increase ZMQ_MSG_SIZE\n");
} else {
sprintf(queues[queue_id].msg+queues[queue_id].size,"%ld,%ld,%d\n",power_ts.tv_sec,power_ts.tv_nsec,atoi(buffer));
queues[queue_id].size+=strlen(queues[queue_id].msg+queues[queue_id].size);
}
// Reset power register file:
fseek(regptr,0,SEEK_SET);
#if LOGGERS_DELAY > 0
usleep(LOGGERS_DELAY*1000);
#endif
//printf("Tick\n"); fflush(stdout);
}
queues[queue_id].issending=1;
}
fclose(regptr);
pthread_join(zmq_thread, NULL);
zmq_close (zmq_publisher);
zmq_ctx_destroy (zmq_context);
return 0;
}
void *publisher(void *zmq_publisher){
int queue_id=0;
while(!__stop){
if(queues[queue_id].issending){
printf("Publishing...");
zmq_send(zmq_publisher,queues[queue_id].msg,queues[queue_id].size,0);
queues[queue_id].issending=0;
printf("done\n");
} else {
#if LOGGERS_DELAY > 0
usleep(LOGGERS_DELAY*1000);
#endif
continue;
}
queue_id++;
if(queue_id>=MAX_QUEUES)
queue_id=0;
}
pthread_exit(EXIT_SUCCESS);
}

View file

@ -1,130 +1,170 @@
#include <zmq.h>
#include <assert.h>
#include <signal.h>
#include "utils.h"
#include <assert.h>
#include <libgen.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <sys/stat.h>
#include <unistd.h>
#include <zmq.h>
// Global:
char *__logdir;
char *__key;
char *__interface;
char *__ip;
int __loginterval;
int __port;
unsigned char __stop=0;
#ifndef LOG_DELAY
#define LOG_DELAY 0
#endif
void publish(void *publisher, char *filepath, char* client, long int interval);
/// @brief Set to non-zero to stop the processes
unsigned char __stop = 0;
void sighandler(int signo){
if (signo == SIGINT){
/**
* @brief Stop process properly on SIGINT
*
* @param signo
*/
void sighandler(int signo) {
if (signo == SIGINT) {
printf("Stopping...\n");
__stop=1;
__stop = 1;
}
}
int main (int argc, char *argv [])
{
if(argc != 6){
printf("Usage: %s <abslogdir> <loginterval> <ip> <port> <key>",argv[0]);
exit(1);
void *publisher(void *zmq_publisher);
typedef struct queue {
int size;
char issending;
char msg[ZMQ_MSG_SIZE];
} queue;
queue queues[MAX_QUEUES];
int main(int argc, char *argv[]) {
if (argc != 6) {
printf("Usage: %s <client> <loginterval> <ip> <port> <key>",
argv[0]);
exit(1);
}
//----- Init arguments
char *client = argv[1];
int loginterval = atoi(argv[2]);
char *ip = argv[3];
int port = atoi(argv[4]);
char *key = argv[5];
// __regpower:
char regpower[STATIC_LEN]="";
strcat(regpower, INA260_SYSFS);
strcat(regpower, "/");
strcat(regpower, client);
strcat(regpower, "/");
strcat(regpower, INA260_POWER_REGISTER);
//----- Sanity checks
signal(SIGINT, sighandler);
if (loginterval < MIN_INTERVAL) {
printf("Log interval is too small (min=%ds)\n", MIN_INTERVAL);
exit(2);
}
if (!FILE_EXISTS(regpower)) {
printf("Logger cannot access to %s\n", regpower);
exit(3);
}
//----- Prepare our context and publisher
void *zmq_context = zmq_ctx_new();
void *zmq_publisher = zmq_socket(zmq_context, ZMQ_PUB);
char bindto[STATIC_LEN];
sprintf(bindto, "tcp://%s:%d", ip, port);
int rc = zmq_connect(zmq_publisher, bindto);
if (rc != 0) {
printf("Failed to connect to %s\n", bindto);
exit(1);
}
//----- Init logging variables
pthread_t zmq_thread;
FILE *regptr, *logptr;
char logfilepath[STATIC_LEN] = "";
regptr = fopen(regpower, "r");
char buffer[STATIC_LEN];
time_t interval;
struct timespec power_ts;
int queue_id = 0;
// Init queues
for (int i = 0; i < MAX_QUEUES; i++) {
queues[queue_id].issending = 0;
}
pthread_create(&zmq_thread, NULL, publisher, zmq_publisher);
//----- Start logging
printf("Logger started [client=%s,interval=%ds]\n", client, loginterval);
while (!__stop) {
interval = INTERVAL(loginterval);
// Log current interval
queue_id = (queue_id + 1) >= MAX_QUEUES ? 0 : (queue_id + 1);
// Busy wait:
while (queues[queue_id].issending) {
};
// Write msg header:
*queues[queue_id].msg = '\0';
sprintf(queues[queue_id].msg, "%s\n%s\n%s\n%ld\n", ZMQ_TOKEN, key, client,
interval);
queues[queue_id].size = strlen(queues[queue_id].msg);
// Monitor:
while ((TIMESTAMP() - interval) < loginterval) {
// Check if should stop:
if (__stop)
break;
// Read power:
fgets(buffer, STATIC_LEN, regptr);
// Get power measurement timestamp:
clock_gettime(CLOCK_REALTIME, &power_ts);
// Write measurement into msg buffer:
char line[MAX_RECORD_LEN];
if ((queues[queue_id].size + MAX_RECORD_LEN) > ZMQ_MSG_SIZE) {
printf(
"To many measurements to publish. Please increase ZMQ_MSG_SIZE\n");
} else {
sprintf(queues[queue_id].msg + queues[queue_id].size, "%ld,%ld,%d\n",
power_ts.tv_sec, power_ts.tv_nsec, atoi(buffer));
queues[queue_id].size +=
strlen(queues[queue_id].msg + queues[queue_id].size);
}
// Reset power register file:
fseek(regptr, 0, SEEK_SET);
#if LOG_DELAY > 0
usleep(LOG_DELAY * 1000);
#endif
}
queues[queue_id].issending = 1;
}
//----- Init global variables
__logdir=argv[1];
__loginterval=atoi(argv[2]);
__ip=argv[3];
__port=atoi(argv[4]);
__key=argv[5];
//----- Cleaning
fclose(regptr);
pthread_join(zmq_thread, NULL);
zmq_close(zmq_publisher);
zmq_ctx_destroy(zmq_context);
//----- Sanity checks
signal(SIGINT,sighandler);
if(__loginterval<MIN_INTERVAL){
printf("Log interval is too small (min=%ds)\n",MIN_INTERVAL);
exit(2);
}
//----- Prepare our context and publisher
void *context = zmq_ctx_new ();
void *publisher = zmq_socket (context, ZMQ_PUB);
char bindto[STATIC_LEN];
sprintf(bindto,"tcp://%s:%d",__ip,__port);
int rc = zmq_connect (publisher, bindto);
if(rc!=0){
printf("Failed to connect to %s\n",bindto);
exit(1);
}
//----- Start publisher
struct dirent *de; // Pointer for directory entry
while(!__stop){
int interval=INTERVAL(__loginterval); // Current interval
int interval_next=INTERVAL_NEXT(__loginterval); // The next one
DIR *dr = opendir(__logdir); // Open the log directory
if(dr !=NULL){
while ((de = readdir(dr)) != NULL){
// Iterate over each loggers directories
if(strcmp(de->d_name,".") && strcmp(de->d_name,"..")){
char *client=de->d_name; // Directory name is the client name
// Build the path for files we are looking for:
char logfile[STATIC_LEN];
char logfile_next[STATIC_LEN];
sprintf(logfile,"%s/%s/%ld",__logdir,client,interval);
sprintf(logfile_next,"%s/%s/%ld",__logdir,client,interval_next);
// As long as next logfile is not available, we should wait
// before sending the current one (since loggers are working on it)
printf("Waiting for %s logger measurements...\n",client);
while(!FILE_EXISTS(logfile_next) && (!__stop)){
sleep(1);
}
// Send current one:
if(FILE_EXISTS(logfile)){
printf("Publishing %s measurements\n",client);
publish(publisher,logfile,client,interval);
remove(logfile); // Prevent log accumulation
}
}
}
closedir(dr);
}
else {
// If open fails, directory probably do not exists
// yet so wait:
sleep(1);
}
}
zmq_close (publisher);
zmq_ctx_destroy (context);
return 0;
return 0;
}
void publish(void *publisher, char *filepath, char* client, long int interval){
// Build message header:
char buffer[ZMQ_MSG_SIZE];
sprintf(buffer,"%s\n%s\n%s\n%ld\n",ZMQ_TOKEN,__key,client,interval);
int msglen=strlen(buffer);
char * line = NULL;
size_t len = 0;
ssize_t read;
FILE *fptr;
fptr=fopen(filepath,"r");
// Put every lines in the buffer and send it
while ((read = getline(&line, &len, fptr)) != -1) {
if((read+msglen) <ZMQ_MSG_SIZE){
strcat(buffer,line);
msglen+=read;
} else {
// It buffer is full, we send the message and create another one
zmq_send (publisher, buffer, msglen, 0);
// Build new message header:
sprintf(buffer,"%s\n%s\n%s\n%ld\n",ZMQ_TOKEN,__key,client,interval);
strcat(buffer,line);
msglen=strlen(buffer);
}
void *publisher(void *zmq_publisher) {
int queue_id = 0;
while (!__stop) {
if (queues[queue_id].issending) {
printf("Publishing...");
zmq_send(zmq_publisher, queues[queue_id].msg, queues[queue_id].size, 0);
queues[queue_id].issending = 0;
printf("done\n");
} else {
#if LOG_DELAY > 0
usleep(LOG_DELAY * 1000);
#endif
continue; // Queues are always filled in order by the logger (from 0 to n)
}
fclose(fptr);
// Finally send the last message (or the only one)
zmq_send (publisher, buffer, msglen, 0);
queue_id++;
if (queue_id >= MAX_QUEUES)
queue_id = 0;
}
pthread_exit(EXIT_SUCCESS);
}

View file

@ -3,7 +3,14 @@
#include <signal.h>
#include "utils.h"
/// @brief Set to non-zero to stop the processes
unsigned char __stop=0;
/**
* @brief Stop process properly on SIGINT
*
* @param signo
*/
void sighandler(int signo){
if (signo == SIGINT){
printf("Stopping...\n");
@ -83,7 +90,6 @@ int main (int argc, char *argv [])
// Write all the measurements:
if(line>4){
fwrite(token, strlen(token), 1, fptr);
printf("%s\n",token);
fwrite("\n",1,1,fptr);
}