#include #include #include #include "utils.h" unsigned char __stop=0; void sighandler(int signo){ if (signo == SIGINT){ printf("Stopping...\n"); __stop=1; } } int main (int argc, char *argv []) { if(argc != 3){ printf("Usage: %s ",argv[0]); exit(1); } //----- Arguments int port=atoi(argv[1]); char *cdatadir=argv[2]; //----- Various inits mkdirp(cdatadir); signal(SIGINT,sighandler); //----- Init ZMQ void *context = zmq_ctx_new (); void *subscriber = zmq_socket (context, ZMQ_SUB); char bindto[STATIC_LEN]; sprintf(bindto,"tcp://*:%d",port); int rc = zmq_bind (subscriber, bindto); if(rc!=0){ printf("Failed to bind zmq on %s\n",bindto); exit(1); } rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, ZMQ_TOKEN, strlen(ZMQ_TOKEN)); //----- Listen char buffer[ZMQ_MSG_SIZE]; int size; while(!__stop){ size=zmq_recv (subscriber, buffer, ZMQ_MSG_SIZE-1, 0); if(size<=0) continue; buffer[size < ZMQ_MSG_SIZE ? size : ZMQ_MSG_SIZE - 1] = '\0'; //----- Read buffer char *token = strtok(buffer, "\n"); char key[STATIC_LEN]; char client[STATIC_LEN]; long int interval; FILE *fptr; int line=1; while(token != NULL){ if(line==2) strcpy(key,token); else if(line==3) strcpy(client,token); else if(line==4) interval=atoi(token); if(line==4){ double size_mib=size/(1024*1024); printf("Data received: key=%s client=%s interval=%ld msgsize=%.2lfMiB\n",key, client, interval,size_mib); char path[STATIC_LEN]=""; // Create dir if not exists: sprintf(path,"%s/%s/%s/",cdatadir,key,client); if(!DIR_EXISTS(path)){ mkdirp(path); } // Now open output file: sprintf(path,"%s/%s/%s/%ld",cdatadir,key,client,interval); fptr=fopen(path,"a"); } // Write all the measurements: if(line>4){ fwrite(token, strlen(token), 1, fptr); //printf("%s\n",token); fwrite("\n",1,1,fptr); } token=strtok(NULL, "\n"); line++; } fclose(fptr); } zmq_close (subscriber); zmq_ctx_destroy (context); return 0; }