// Power logger: main() fills message buffers with timestamped readings taken
// from a register file and hands them to publisher(), which sends each filled
// buffer with zmq_send().
//
// NOTE(review): this copy of the file looks damaged by text extraction --
// text between '<' and '>' appears to have been dropped. The header names of
// the eight system includes below, the argument list in the usage string and
// a span in the middle of main() are missing. Restore them from the original
// source before building.
#include
#include
#include
#include
#include
#include
#include
#include
#include "utils.h"

// Pause, in milliseconds, inserted into the polling loops below
// (it is passed to usleep() multiplied by 1000). Defaults to 0, which
// compiles the pauses out entirely.
#ifndef LOGGERS_DELAY
#define LOGGERS_DELAY 0
#endif

// Global:
char *__client;              // argv[2]; also used as a path component and in the message header
char *__ip;                  // argv[4]
char *__key;                 // argv[6]; written into every message header
int __port;                  // atoi(argv[5])
char __logdir[STATIC_LEN];   // "<argv[1]>/<client>", created with mkdirp()
char __regpower[STATIC_LEN]; // INA260_SYSFS "/" client "/" INA260_POWER_REGISTER
int __loginterval;           // atoi(argv[3]); compared against TIMESTAMP() deltas (units not visible here)
unsigned char __stop=0;      // set to 1 by sighandler(); polled by main() and publisher()

// SIGINT handler: announces shutdown and raises the global stop flag.
// Other signal numbers are ignored.
// NOTE(review): printf() is not on the POSIX async-signal-safe list, and
// __stop is a plain unsigned char rather than volatile sig_atomic_t / atomic.
void sighandler(int signo){
    if (signo == SIGINT){
        printf("Stopping...\n");
        __stop=1;
    }
}

void *publisher(void *zmq_publisher);

// One message slot shared between main() (producer) and publisher() (consumer).
typedef struct queue {
    int size;               // current length of msg in bytes (maintained with strlen)
    char issending;         // set to 1 by main() when msg is complete, reset to 0 by publisher() after zmq_send()
    char msg[ZMQ_MSG_SIZE]; // text payload: header lines followed by one line per measurement
} queue;

// Ring of message slots; both sides walk it in index order, wrapping at MAX_QUEUES.
queue queues[MAX_QUEUES];

// Entry point. Expects exactly six arguments:
//   argv[1] base log directory, argv[2] client name, argv[3] log interval,
//   argv[4] ip, argv[5] port, argv[6] key.
// Prints a usage line and exits with status 1 on any other argument count.
int main (int argc, char *argv []) {
    if(argc != 7){
        // NOTE(review): the argument placeholders after "%s " appear to have
        // been lost in this copy.
        printf("Usage: %s ",argv[0]);
        exit(1);
    }

    //----- Init global variables
    __client=argv[2];
    __loginterval=atoi(argv[3]);
    __ip=argv[4];
    __port=atoi(argv[5]);
    __key=argv[6];

    // __logdir: argv[1] + "/" + client.
    // strcat() onto the global works because objects with static storage
    // start zero-filled, i.e. as an empty string.
    // NOTE(review): no length check against STATIC_LEN.
    strcat(__logdir,argv[1]);
    strcat(__logdir,"/");
    strcat(__logdir,__client);

    // __regpower: INA260_SYSFS + "/" + client + "/" + INA260_POWER_REGISTER
    // (same unbounded strcat() pattern as above).
    strcat(__regpower,INA260_SYSFS);
    strcat(__regpower,"/");
    strcat(__regpower,__client);
    strcat(__regpower,"/");
    strcat(__regpower,INA260_POWER_REGISTER);

    //----- Sanity checks
    signal(SIGINT,sighandler);
    mkdirp(__logdir);

    // NOTE(review): the next line is garbled in this copy. Everything between
    // a '<' after "__loginterval" and a '>' before "=MAX_QUEUES" seems to be
    // missing. The lost span presumably holds the rest of the sanity checks,
    // the declarations/initialisation of queue_id, interval, buffer, regptr,
    // power_ts, zmq_thread, zmq_context and zmq_publisher, and the opening of
    // the outer loop that is closed further down. The surviving tail looks
    // like a wrap-around advance of queue_id at MAX_QUEUES -- TODO confirm
    // against the original file.
    if(__loginterval=MAX_QUEUES ? 0 : (queue_id+1);

    // Busy wait: spin until publisher() has cleared this slot's flag, so a
    // buffer that is still waiting to be sent is not overwritten.
    // NOTE(review): issending is a plain char accessed without any
    // synchronisation primitive.
    while(queues[queue_id].issending){};

    // Write msg header: empty the buffer, then write four newline-terminated
    // fields (ZMQ_TOKEN, key, client, interval) and record the resulting
    // length as the append offset.
    *queues[queue_id].msg='\0';
    sprintf(queues[queue_id].msg,"%s\n%s\n%s\n%ld\n",ZMQ_TOKEN,__key,__client,interval);
    queues[queue_id].size=strlen(queues[queue_id].msg);

    // Monitor: keep sampling until TIMESTAMP() has advanced by __loginterval
    // since `interval`, or a stop was requested.
    while((TIMESTAMP()-interval)<__loginterval){
        if(__stop) break;

        // Read power: one line from regptr (presumably opened on __regpower
        // in the missing span -- TODO confirm).
        // NOTE(review): the fgets() result is not checked.
        fgets(buffer,STATIC_LEN,regptr);

        // Get power measurement timestamp:
        clock_gettime(CLOCK_REALTIME,&power_ts);

        // Write measurement into msg buffer:
        char line[MAX_RECORD_LEN]; // not referenced anywhere in the visible code
        if((queues[queue_id].size+MAX_RECORD_LEN)>ZMQ_MSG_SIZE){
            // Not enough room left for one more record: report it and drop
            // this sample.
            // NOTE(review): the string literal below is broken across two
            // physical lines in this copy; it is reproduced as found.
            printf("To many measurements to publish. 
Please increase ZMQ_MSG_SIZE\n");
        } else {
            // Append "seconds,nanoseconds,value\n" at the current end of the
            // message, then advance size by the number of bytes just written.
            sprintf(queues[queue_id].msg+queues[queue_id].size,"%ld,%ld,%d\n",power_ts.tv_sec,power_ts.tv_nsec,atoi(buffer));
            queues[queue_id].size+=strlen(queues[queue_id].msg+queues[queue_id].size);
        }

        // Reset power register file: rewind so the next fgets() reads from
        // the start of the file again.
        fseek(regptr,0,SEEK_SET);
#if LOGGERS_DELAY > 0
        usleep(LOGGERS_DELAY*1000);
#endif
        //printf("Tick\n"); fflush(stdout);
    }

    // Hand the filled buffer over to publisher().
    queues[queue_id].issending=1;
    } // closes the outer loop whose opening lies in the missing span

    // Shutdown: close the register file, wait for zmq_thread to finish, then
    // release the ZeroMQ socket and context.
    fclose(regptr);
    pthread_join(zmq_thread, NULL);
    zmq_close (zmq_publisher);
    zmq_ctx_destroy (zmq_context);
    return 0;
}

// Consumer side: walks the queues[] ring in order and sends every slot that
// main() has flagged as ready.
//   zmq_publisher: handle passed straight through to zmq_send().
// Runs until __stop is set, then ends the calling thread via pthread_exit().
// Presumably started on zmq_thread by main() -- the pthread_create() call is
// not visible in this copy.
void *publisher(void *zmq_publisher){
    int queue_id=0;
    while(!__stop){
        if(queues[queue_id].issending){
            printf("Publishing...");
            // Send exactly `size` bytes of the slot (flags = 0), then release
            // the slot back to main().
            // NOTE(review): the zmq_send() return value is not checked.
            zmq_send(zmq_publisher,queues[queue_id].msg,queues[queue_id].size,0);
            queues[queue_id].issending=0;
            printf("done\n");
        } else {
            // Slot not ready yet: optionally pause, then poll the same slot
            // again (queue_id is not advanced).
#if LOGGERS_DELAY > 0
            usleep(LOGGERS_DELAY*1000);
#endif
            continue;
        }
        // Move on to the next slot, wrapping around at MAX_QUEUES.
        queue_id++;
        if(queue_id>=MAX_QUEUES) queue_id=0;
    }
    pthread_exit(EXIT_SUCCESS);
}