/*
 * ina260-zmq-publisher/src/logger.c
 * 2023-07-17 18:37:38 +02:00
 *
 * 148 lines
 * No EOL
 * 3.7 KiB
 * C
 */

#include <stdio.h>
#include <zmq.h>
#include <assert.h>
#include <libgen.h>
#include <unistd.h>
#include <sys/stat.h>
#include <signal.h>
#include "utils.h"
#ifndef LOGGERS_DELAY
#define LOGGERS_DELAY 0
#endif
// Global state of the logger. Everything except __stop is filled in by main()
// from the command line before the logging loop starts.
// NOTE(review): identifiers containing "__" are reserved for the implementation
// in C; they are kept as-is here because the rest of the file refers to them.
char *__client;               // client name (argv[2]); also a path component of __logdir and __regpower
char *__ip;                   // address the publisher socket connects to (argv[4])
char *__key;                  // key sent in the header of every message (argv[6])
int __port;                   // TCP port the publisher socket connects to (argv[5])
char __logdir[STATIC_LEN];    // "<abslogdir>/<client>", created with mkdirp()
char __regpower[STATIC_LEN];  // "INA260_SYSFS/<client>/INA260_POWER_REGISTER"
int __loginterval;            // length of one logging interval (argv[3]); printed with an "s" suffix
// Set to 1 by sighandler() on SIGINT and polled by the logging loops.
// volatile sig_atomic_t is the only object type the C standard allows a
// signal handler to write to.
volatile sig_atomic_t __stop=0;
/**
 * SIGINT handler: asks the logging loops to terminate.
 *
 * Only async-signal-safe operations are performed here: the stop flag is
 * raised and the notice is emitted with write(). printf() must not be called
 * from a signal handler because it may be interrupted while holding the
 * stdio lock.
 *
 * @param signo number of the delivered signal; anything but SIGINT is ignored.
 */
void sighandler(int signo){
    if (signo == SIGINT){
        static const char notice[]="Stopping...\n";
        __stop=1;
        // Best effort: nothing useful can be done here if the write fails.
        ssize_t written=write(STDOUT_FILENO,notice,sizeof notice-1);
        (void)written;
    }
}
/* Sends the message stored in queues[queue_id] through the given ZeroMQ
 * socket. Defined at the end of this file. */
void publish(int queue_id, void *publisher);

/* One outgoing message buffer. */
struct queue {
    int size;                /* number of characters currently stored in msg */
    char msg[ZMQ_MSG_SIZE];  /* NUL-terminated text: header lines followed by records */
};
typedef struct queue queue;

/* Message buffers; main() moves to the next one (wrapping around) at the
 * start of every logging interval. */
queue queues[MAX_QUEUES];
/**
 * Entry point of the power logger.
 *
 * Usage: <prog> <abslogdir> <client> <loginterval> <ip> <port> <key>
 *
 * Repeatedly reads the power register file of <client>, timestamps every
 * reading with CLOCK_REALTIME and accumulates the records in one of the
 * message queues. At the end of every logging interval the queue is handed
 * to publish(). The loop runs until SIGINT is received.
 *
 * Exit status:
 *   0  stopped by SIGINT
 *   1  bad usage, a path/endpoint/message header that does not fit its
 *      buffer, or ZeroMQ initialisation/connection failure
 *   2  log interval below MIN_INTERVAL
 *   3  the power register file cannot be opened
 */
int main (int argc, char *argv [])
{
    if(argc != 7){
        printf("Usage: %s <abslogdir> <client> <loginterval> <ip> <port> <key>",argv[0]);
        exit(1);
    }

    //----- Init global variables
    __client=argv[2];
    __loginterval=atoi(argv[3]);
    __ip=argv[4];
    __port=atoi(argv[5]);
    __key=argv[6];

    // __logdir: bounded formatting, the arguments are user-controlled
    int len=snprintf(__logdir,sizeof __logdir,"%s/%s",argv[1],__client);
    if(len<0 || (size_t)len>=sizeof __logdir){
        printf("Log directory path is too long\n");
        exit(1);
    }

    // __regpower:
    len=snprintf(__regpower,sizeof __regpower,"%s/%s/%s",
                 INA260_SYSFS,__client,INA260_POWER_REGISTER);
    if(len<0 || (size_t)len>=sizeof __regpower){
        printf("Power register path is too long\n");
        exit(1);
    }

    //----- Sanity checks
    signal(SIGINT,sighandler);
    mkdirp(__logdir);
    if(__loginterval<MIN_INTERVAL){
        printf("Log interval is too small (min=%ds)\n",MIN_INTERVAL);
        exit(2);
    }
    // Opening the register is the access check: it fails for a missing as
    // well as for an unreadable file, and the stream is needed below anyway.
    FILE *regptr=fopen(__regpower,"r");
    if(regptr==NULL){
        printf("Logger cannot access to %s\n",__regpower);
        exit(3);
    }

    //----- Prepare our context and publisher
    int status=1;
    void *context=NULL;
    void *publisher=NULL;
    char bindto[STATIC_LEN];
    char buffer[STATIC_LEN];
    int queue_id=0;

    context=zmq_ctx_new();
    if(context==NULL){
        printf("Failed to create the ZeroMQ context\n");
        goto cleanup;
    }
    publisher=zmq_socket(context,ZMQ_PUB);
    if(publisher==NULL){
        printf("Failed to create the ZeroMQ publisher socket\n");
        goto cleanup;
    }
    len=snprintf(bindto,sizeof bindto,"tcp://%s:%d",__ip,__port);
    if(len<0 || (size_t)len>=sizeof bindto){
        printf("Publisher endpoint is too long\n");
        goto cleanup;
    }
    if(zmq_connect(publisher,bindto)!=0){
        printf("Failed to connect to %s\n",bindto);
        goto cleanup;
    }

    //----- Start logging
    printf("Logger started [client=%s,interval=%ds]\n",__client,__loginterval);
    while(!__stop){
        time_t interval=INTERVAL(__loginterval);

        // Log current interval: move to the next queue and write its header
        queue_id=(queue_id+1)>=MAX_QUEUES ? 0 : (queue_id+1);
        printf("Queue id %d",queue_id);
        len=snprintf(queues[queue_id].msg,ZMQ_MSG_SIZE,"%s\n%s\n%s\n%ld\n",
                     ZMQ_TOKEN,__key,__client,(long)interval);
        if(len<0 || len>=ZMQ_MSG_SIZE){
            printf("Message header does not fit. Please increase ZMQ_MSG_SIZE\n");
            goto cleanup;
        }
        queues[queue_id].size=len;

        while((TIMESTAMP()-interval)<__loginterval){
            if(__stop)
                break;

            // Read power; a failed read yields no sample instead of
            // recording whatever the buffer happened to contain.
            if(fgets(buffer,sizeof buffer,regptr)!=NULL){
                // Get power measurement timestamp:
                struct timespec power_ts={0};
                clock_gettime(CLOCK_REALTIME,&power_ts);

                if((queues[queue_id].size+MAX_RECORD_LEN)>ZMQ_MSG_SIZE){
                    printf("To many measurements to publish. Please increase ZMQ_MSG_SIZE\n");
                } else {
                    char *tail=queues[queue_id].msg+queues[queue_id].size;
                    size_t room=(size_t)(ZMQ_MSG_SIZE-queues[queue_id].size);
                    int written=snprintf(tail,room,"%ld,%ld,%d\n",
                                         (long)power_ts.tv_sec,(long)power_ts.tv_nsec,atoi(buffer));
                    if(written>0 && (size_t)written<room)
                        queues[queue_id].size+=written;
                    else
                        *tail='\0'; // drop a record that did not fit entirely
                }
            }

            // Reset power register file (also clears EOF/error indicators):
            rewind(regptr);
#if LOGGERS_DELAY > 0
            usleep(LOGGERS_DELAY*1000);
#endif
        }
        publish(queue_id,publisher);
    }
    status=0;

cleanup:
    fclose(regptr);
    if(publisher!=NULL)
        zmq_close (publisher);
    if(context!=NULL)
        zmq_ctx_destroy (context);
    return status;
}
/**
 * Sends the content of one message queue through a ZeroMQ socket.
 *
 * @param queue_id  index into queues[] of the message to send; the first
 *                  queues[queue_id].size bytes of its msg buffer are sent.
 * @param publisher ZeroMQ socket to send on.
 *
 * The outcome is only reported on stdout; a failed send does not stop the
 * logger.
 */
void publish(int queue_id, void* publisher){
    printf("Publishing...\n");
    int sent=zmq_send(publisher,queues[queue_id].msg,(size_t)queues[queue_id].size,0);
    if(sent<0){
        // zmq_send() returns -1 on failure; do not claim success in that case
        printf("Failed to publish queue %d\n",queue_id);
        return;
    }
    printf("Done\n");
}