Minor changes

This commit is contained in:
Loic Guegan 2023-07-17 17:58:20 +02:00
parent c79a02c57b
commit 78b28ce923
17 changed files with 67 additions and 61 deletions

View file

@ -30,19 +30,12 @@ void sighandler(int signo){
void publish(int queue_id, void* publisher);
typedef struct record {
time_t secs;
long nsecs;
uint16_t power;
} record;
typedef struct queue {
int length;
long interval;
record records[RECORD_MAX];
int size;
char msg[ZMQ_MSG_SIZE];
} queue;
queue queues[RECORD_QUEUES];
queue queues[MAX_QUEUES];
int main (int argc, char *argv [])
{
@ -75,7 +68,7 @@ int main (int argc, char *argv [])
printf("Log interval is too small (min=%ds)\n",MIN_INTERVAL);
exit(2);
}
if(!FILE_EXISTS(__regpower)){
if(FILE_EXISTS(__regpower)){
printf("Logger cannot access to %s\n",__regpower);
exit(3);
}
@ -98,43 +91,45 @@ int main (int argc, char *argv [])
FILE *regptr,*logptr;
char logfilepath[STATIC_LEN]="";
regptr=fopen(__regpower,"r");
regptr=fopen("/home/loic/out.txt","r");
char buffer[STATIC_LEN];
int power;
time_t interval;
struct timespec power_ts;
int queue_id=0;
char *header[STATIC_LEN];
while(!__stop){
interval=INTERVAL(__loginterval);
// Log current interval
queue_id=(queue_id+1)>=RECORD_QUEUES ? 0 : (queue_id+1);
queue_id=(queue_id+1)>=MAX_QUEUES ? 0 : (queue_id+1);
printf("Queue id %d",queue_id);
queues[queue_id].interval=interval;
int record=0;
*queues[queue_id].msg='\0';
sprintf(queues[queue_id].msg,"%s\n%s\n%s\n%ld\n",ZMQ_TOKEN,__key,__client,interval);
queues[queue_id].size=strlen(queues[queue_id].msg);
while((TIMESTAMP()-interval)<__loginterval){
if(__stop)
break;
if(record < RECORD_MAX){
// Read power:
fgets(buffer,STATIC_LEN,regptr);
// Get power measurement timestamp:
clock_gettime(CLOCK_REALTIME,&power_ts);
queues[queue_id].records[record].secs=power_ts.tv_sec;
queues[queue_id].records[record].nsecs=power_ts.tv_nsec;
queues[queue_id].records[record].power=atoi(buffer);
// Reset power register file:
fseek(regptr,0,SEEK_SET);
#if LOGGERS_DELAY > 0
usleep(LOGGERS_DELAY*1000);
#endif
printf("Tick\n"); fflush(stdout);
record++;
// Read power:
fgets(buffer,STATIC_LEN,regptr);
// Get power measurement timestamp:
clock_gettime(CLOCK_REALTIME,&power_ts);
char line[STATIC_LEN];
sprintf(line,"%ld,%ld,%d\n",power_ts.tv_sec,power_ts.tv_nsec,atoi(buffer));
int linelen=strlen(line);
if(queues[queue_id].size+linelen>ZMQ_MSG_SIZE){
printf("To many measurements to publish. Please increase ZMQ_MSG_SIZE\n");
} else {
printf("Queue overflow, RECORD_MAX must be increase!! n=%d\n",record);
strcat(queues[queue_id].msg,line);
queues[queue_id].size+=strlen(line);
}
// Reset power register file:
fseek(regptr,0,SEEK_SET);
#if LOGGERS_DELAY > 0
usleep(LOGGERS_DELAY*1000);
#endif
printf("Tick\n"); fflush(stdout);
}
queues[queue_id].length=record;
publish(queue_id,publisher);
}
@ -148,30 +143,5 @@ int main (int argc, char *argv [])
void publish(int queue_id, void* publisher){
printf("Publishing...\n");
// Build message header:
char buffer[ZMQ_MSG_SIZE];
sprintf(buffer,"%s\n%s\n%s\n%ld\n",ZMQ_TOKEN,__key,__client,queues[queue_id].interval);
int msglen=strlen(buffer);
// Put every lines in the buffer and send it
char line[STATIC_LEN];
for(int record=0;record<queues[queue_id].length;record++){
*line='\0';
sprintf(line,"%ld,%ld,%d\n",queues[queue_id].records[record].secs,queues[queue_id].records[record].nsecs,queues[queue_id].records[record].power);
int linelength=strlen(line);
if((linelength+msglen)<ZMQ_MSG_SIZE){
strcat(buffer,line);
msglen+=linelength;
} else {
// It buffer is full, we send the message and create another one
zmq_send (publisher, buffer, msglen, 0);
// Build new message header:
sprintf(buffer,"%s\n%s\n%s\n%ld\n",ZMQ_TOKEN,__key,__client,queues[queue_id].interval);
strcat(buffer,line);
msglen=strlen(buffer);
}
}
printf("msglength=%d\n",msglen);
// Finally send the last message (or the only one)
zmq_send (publisher, buffer, msglen, 0);
zmq_send(publisher,queues[queue_id].msg,queues[queue_id].size,0);
}