/*
 * ina260-zmq-publisher/src/publisher.c
 * 2024-02-02 13:08:16 +01:00
 *
 * 191 lines
 * 4.8 KiB
 * C
 */

#include "utils.h"
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <libgen.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <unistd.h>
#include <zmq.h>
/// @brief Pause, in milliseconds, inserted between two samples (logger) and
/// between two polls of an empty slot (publisher). 0 disables the pause.
#ifndef LOG_DELAY
#define LOG_DELAY 0
#endif
/// @brief Set to non-zero to stop the processes.
/// Written by the SIGINT handler and polled by main(), hence
/// `volatile sig_atomic_t` (the only object type a handler may portably write).
volatile sig_atomic_t __stop = 0;
/// @brief Set to non-zero by main() to ask the publisher thread to exit once
/// its current slot is empty. Shared between threads, hence atomic.
_Atomic unsigned char __stop_zmq_thread = 0;
/// @brief Path of the break file (if found, publisher will stop publishing)
/// NOTE(review): in this file the path is only built and printed by main();
/// no visible code checks for the file.
char __break_file[STATIC_LEN];
/**
 * @brief Stop process properly on SIGINT
 *
 * Only async-signal-safe operations are performed here: the notice is emitted
 * with write() rather than printf(), and errno is restored so the interrupted
 * code does not observe a value clobbered by the handler.
 *
 * @param signo Signal number delivered to the process; anything other than
 *              SIGINT is ignored.
 */
void sighandler(int signo) {
  if (signo != SIGINT) {
    return;
  }

  const int saved_errno = errno;
  static const char notice[] = "Stopping...\n";
  // Nothing useful can be done if the notice cannot be written.
  ssize_t ignored = write(STDOUT_FILENO, notice, sizeof notice - 1);
  (void)ignored;

  __stop = 1;
  errno = saved_errno;
}
/// @brief Publisher thread entry point (defined later in this file).
void *publisher(void *zmq_publisher);

/**
 * @brief Placeholder: currently does nothing.
 *
 * Declared with an explicit (void) parameter list so that the definition is a
 * real prototype and calls with arguments are rejected by the compiler.
 */
void askforbreak(void) {
}
/**
 * @brief One message slot handed from the logger (main) to the publisher
 *        thread.
 *
 * The logger fills `msg`/`size` and then sets `issending`; the publisher sends
 * `size` bytes of `msg` and then clears `issending`. The flag is the only
 * synchronisation between the two threads, so it is atomic: a plain char would
 * be a data race and would allow the compiler to hoist the polling load.
 */
typedef struct queue {
  int size;               ///< Number of bytes of msg to send
  _Atomic char issending; ///< Non-zero while the slot belongs to the publisher
  char msg[ZMQ_MSG_SIZE]; ///< Message text (header followed by the records)
} queue;

/// @brief Slots used in round-robin order by both the logger and the publisher.
queue queues[MAX_QUEUES];
int main(int argc, char *argv[]) {
if (argc != 6) {
printf("Usage: %s <client> <loginterval> <ip> <port> <key>",
argv[0]);
exit(1);
}
//----- Init arguments
char *client = argv[1];
int loginterval = atoi(argv[2]);
char *ip = argv[3];
int port = atoi(argv[4]);
char *key = argv[5];
// __regpower:
char regpower[STATIC_LEN]="";
strcat(regpower, INA260_SYSFS);
strcat(regpower, "/");
strcat(regpower, client);
strcat(regpower, "/");
strcat(regpower, INA260_POWER_OUTPUT);
// __break_file
strcat(__break_file, "/publisher_");
strcat(__break_file, key);
strcat(__break_file, "_break");
printf("%s\n",__break_file);
//----- Sanity checks
signal(SIGINT, sighandler);
if (loginterval < MIN_INTERVAL) {
printf("Log interval is too small (min=%ds)\n", MIN_INTERVAL);
exit(2);
}
if (!FILE_EXISTS(regpower)) {
printf("Logger cannot access to %s\n", regpower);
exit(3);
}
//----- Prepare our context and publisher
void *zmq_context = zmq_ctx_new();
void *zmq_publisher = zmq_socket(zmq_context, ZMQ_PUB);
char bindto[STATIC_LEN];
sprintf(bindto, "tcp://%s:%d", ip, port);
int rc = zmq_connect(zmq_publisher, bindto);
if (rc != 0) {
printf("Failed to connect to %s\n", bindto);
exit(1);
}
//----- Init logging variables
pthread_t zmq_thread;
FILE *logptr;
char logfilepath[STATIC_LEN] = "";
int regfd = open(regpower, O_RDONLY);
char buffer[STATIC_LEN];
time_t interval;
struct timespec power_ts;
int queue_id = 0;
// Init queues
for (int i = 0; i < MAX_QUEUES; i++) {
queues[queue_id].issending = 0;
}
pthread_create(&zmq_thread, NULL, publisher, zmq_publisher);
//----- Start logging
printf("Logger started [client=%s,interval=%ds]\n", client, loginterval);
while (!__stop) {
// Busy wait:
while (queues[queue_id].issending) {}
// Get current interval
interval = INTERVAL(loginterval);
// Write msg header:
*queues[queue_id].msg = '\0';
sprintf(queues[queue_id].msg, "%s\n%s\n%s\n%ld\n", ZMQ_TOKEN, key, client,
interval);
queues[queue_id].size = strlen(queues[queue_id].msg);
// Monitor:
while ((TIMESTAMP() - interval) < loginterval) {
// Check if should stop:
if (__stop)
break;
// Read power:
read(regfd, buffer, STATIC_LEN);
// Get power measurement timestamp:
clock_gettime(CLOCK_REALTIME, &power_ts);
// Write measurement into msg buffer:
char line[MAX_RECORD_LEN];
if ((queues[queue_id].size + MAX_RECORD_LEN) > ZMQ_MSG_SIZE) {
printf(
"Too many measurements to publish. Please increase ZMQ_MSG_SIZE\n");
} else {
sprintf(queues[queue_id].msg + queues[queue_id].size, "%ld,%ld,%f\n",
power_ts.tv_sec, power_ts.tv_nsec, atof(buffer));
queues[queue_id].size +=
strlen(queues[queue_id].msg + queues[queue_id].size);
}
// Reset power register file:
lseek(regfd,0,SEEK_SET);
#if LOG_DELAY > 0
usleep(LOG_DELAY * 1000);
#endif
}
queues[queue_id].issending = 1;
// Wait for next queue to be available:
queue_id = (queue_id + 1) >= MAX_QUEUES ? 0 : (queue_id + 1);
}
//----- Cleaning
close(regfd);
__stop_zmq_thread=1; // Ignite clean stop
pthread_join(zmq_thread, NULL);
zmq_close(zmq_publisher);
zmq_ctx_destroy(zmq_context);
return 0;
}
/**
 * @brief Publisher thread: sends every filled queue slot over the ZeroMQ
 *        socket.
 *
 * Slots are visited in the same round-robin order the logger fills them, so
 * the thread simply waits on its current slot. After a send attempt the slot
 * is released whether or not the send succeeded, so a failing socket drops the
 * message instead of stalling the logger.
 *
 * @param zmq_publisher ZeroMQ socket handed to zmq_send().
 * @return Always NULL.
 */
void *publisher(void *zmq_publisher) {
  int queue_id = 0;
  while (1) {
    // The stop flag is read BEFORE the slot flag on purpose: main() fills its
    // last slot before raising the stop flag, so reading in this order avoids
    // leaving with that last message unsent.
    if (__stop_zmq_thread && !queues[queue_id].issending)
      break;

    if (!queues[queue_id].issending) {
#if LOG_DELAY > 0
      usleep(LOG_DELAY * 1000);
#endif
      continue; // Queues are always filled in order by the logger (from 0 to n)
    }

    long long startat = TIMESTAMP();
    printf("Publishing...");
    int sent = zmq_send(zmq_publisher, queues[queue_id].msg,
                        queues[queue_id].size, 0);
    // Give the slot back to the logger
    queues[queue_id].issending = 0;
    if (sent < 0) {
      printf("failed (duration %lld)\n", (long long)(TIMESTAMP() - startat));
    } else {
      printf("done (duration %lld)\n", (long long)(TIMESTAMP() - startat));
    }

    queue_id++;
    if (queue_id >= MAX_QUEUES)
      queue_id = 0;
  }
  return NULL;
}