// Weather update client // Connects SUB socket to tcp://localhost:5556 // Collects weather updates and finds avg temp in zipcode #include #include #include #include #include "utils.h" int main (int argc, char *argv []) { if(argc != 4){ printf("Usage: %s
",argv[0]); exit(1); } //----- Arguments char *ip=argv[1]; int port=atoi(argv[2]); char *cdatadir=argv[3]; //----- Various inits mkdirp(cdatadir); //----- Init ZMQ void *context = zmq_ctx_new (); void *subscriber = zmq_socket (context, ZMQ_SUB); char bindto[30]; sprintf(bindto,"tcp://%s:%d",ip,port); int rc = zmq_connect (subscriber, bindto); if(rc!=0){ printf("Failed to bind zmq on %s\n",bindto); exit(1); } rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, ZMQ_TOKEN, strlen(ZMQ_TOKEN)); //----- Listen char buffer[ZMQ_MSG_SIZE]; int size; while(1){ size=zmq_recv (subscriber, buffer, ZMQ_MSG_SIZE-1, 0); buffer[size < ZMQ_MSG_SIZE ? size : ZMQ_MSG_SIZE - 1] = '\0'; //----- Read buffer char *token = strtok(buffer, "\n"); char key[255]; char client[255]; long int interval; FILE *fptr; int line=1; while(token != NULL){ if(line==2) strcpy(key,token); else if(line==3) strcpy(client,token); else if(line==4) interval=atoi(token); if(line==4){ char path[255]=""; sprintf(path,"%s/%s_%s_%ld",cdatadir,key,client,interval); fptr=fopen(path,"a"); } if(line>4){ fwrite(token, strlen(token), 1, fptr); fwrite("\n",2,1,fptr); } token=strtok(NULL, "\n"); line++; } fclose(fptr); } zmq_close (subscriber); zmq_ctx_destroy (context); return 0; }