/*
 * File Name:   p2p.c
 * Author:      Jade Cheng
 * Date:        March 29, 2009
 * Course:      ICS 451
 * Assignment:  Project 2
 */

#include "common.h"
#include "data.h"
#include "log.h"
#include "messages.h"
#include "timehelp.h"
#include "udp.h"

/*
 * NOTE(review): main() also compares argc against MAX_ARGS, which is not
 * defined in this file; presumably it comes from one of the headers above.
 */

/** The minimum number of arguments (the program name plus the port). */
#define MIN_ARGS 2

/** The standard input file descriptor. */
#define STDIN_FD 0

/** The time interval, in seconds, for sending listing messages. */
#define LISTING_INTERVAL 240

static void listing_msg_create(listing_msg_t * msg);
static int main_loop(void);
static int parse_uchar(const char * s, char ** endp, u_char * result);
static int parse_ushort(const char * s, char ** endp, u_short * result);
static int parse_peer(const char * s, peer_t * result);
static void print_prompt(void);
static int process_stdin(void);
static int process_udp(void);
static int receive_data(msg_buf_t * buf, const peer_t * peer);
static int receive_listing(msg_buf_t * buf, const peer_t * peer);
static int receive_request(msg_buf_t * buf, const peer_t * peer);
static int receive_try(msg_buf_t * buf, const peer_t * peer);
static void send_listing(void);
static int send_request(const char * name);
static void send_request_to_peer(
    const peer_t * to, const void * buffer, size_t size);

/** The peers obtained from the command line. */
static peers_t g_peers;

/** The local content obtained from the command line. */
static local_content_t g_local_content;

/** The data cache; receive_data() adds the content it downloads here. */
static data_cache_t g_data_cache;

/**
 * The content directory cache; entries (a name and the peer it was learned
 * from) are added by receive_listing(), receive_try() and receive_data().
 */
static content_dir_t g_content_dir;

/**
 * The outstanding request cache; send_request() adds entries, receive_data()
 * removes them and main_loop() expires them.
 */
static request_cache_t g_requests;

/* ------------------------------------------------------------------------ */

/**
 * This is the main entry of the program.
 *
 * @param argc The number of arguments.
 * @param argv The arguments.
 *
 * @return EXIT_SUCCESS or EXIT_FAILURE.
*/ extern int main(int argc, const char * argv[]) { char * end; int i; peer_t peer; u_short port; #ifdef DEBUG /* Print out a message if the program is running in debug mode. */ printf("Running in DEBUG mode...\n"); #endif /* Check argc and print out the usage if program is not called right. */ if (argc < MIN_ARGS || argc > MAX_ARGS) { fprintf(stderr, "usage: p2p port [options]\n"); RETURN_FAILURE(); } /* Parse the second argument, the listening port for UDP. */ if (EXIT_SUCCESS != parse_ushort(argv[1], &end, &port)) { fprintf(stderr, "usage: port number type.\n"); RETURN_FAILURE(); } /* Examine the later parameters. */ for (i = 2; i < argc; i++) { const char * arg = argv[i]; /* If the argument contains a '/', parse it as a peer and add it to * the peers collection if the parsing is successful. Ortherwise * report the error. */ if (strchr(arg, '/') != NULL && EXIT_SUCCESS == parse_peer(arg, &peer)) { if (EXIT_SUCCESS != peers_add(&g_peers, &peer)) { fprintf(stderr, "Cannot add '%s' to list of peers.\n", arg); RETURN_FAILURE(); } /* If the argument doesn't contain a '/', parse it as a file name and * add it to the local content if the parsing is successful. */ } else if (EXIT_SUCCESS == verify_name(arg)) { if (EXIT_SUCCESS != local_content_add(&g_local_content, arg)) { fprintf(stderr, "Cannot add '%s' to local content.\n", arg); RETURN_FAILURE(); } /* Anything other than a peer and a file name is not allowed. */ } else { fprintf(stderr, "Invalid command line argument '%s'.\n", arg); RETURN_FAILURE(); } } /* Initialize the log file. */ log_init(); /* Print out the welcome message. */ printf("\n"); printf("Jade p2p\n"); printf("\n"); printf(" Type '!quit' to quit.\n"); printf(" Type '!info' to display information.\n"); printf(" Type anything else to make a request.\n"); printf("\n"); /* Start UDP, execute the main loop, and stop UDP before terminating. 
*/ udp_start(port); main_loop(); udp_stop(); return EXIT_SUCCESS; } /* ------------------------------------------------------------------------ */ /** * Creates a listing message. * * @param msg The listing message. */ static void listing_msg_create(listing_msg_t * msg) { size_t i; assert(msg != NULL); assert(g_local_content.count + g_data_cache.count <= MAX_LISTING_COUNT); memset(msg, 0, sizeof(*msg)); /* Add all items from the local content. */ for (i = 0; i < g_local_content.count; i++) strcpy(msg->entries[msg->count++], g_local_content.items[i].name); /* Add all items from the data cache. */ for (i = 0; i < g_data_cache.count; i++) strcpy(msg->entries[msg->count++], g_data_cache.items[i].name); } /* ------------------------------------------------------------------------ */ /** * Performs the main loop of the program. * * @return EXIT_SUCCESS or EXIT_FAILURE. */ static int main_loop(void) { timeval listing_timeout; printf("(p2p) "); fflush(stdout); /* Send a listing message when p2p starts, and compute the time to send * the next listing message. */ send_listing(); listing_timeout = time_add(time_now(), time_seconds(LISTING_INTERVAL)); for (;;) { int fd; timeval now; fd_set readfds; int result; timeval request_timeout; timeval select_timeout; /* The select function waits for standard input or the UDP socket. */ fd = udp_fd(); FD_ZERO(&readfds); FD_SET(STDIN_FD, &readfds); FD_SET(fd, &readfds); /* Send a listing message when current time passes the last computed * time to send a listing message. The next time to send is the last * computed time plus a listing message interval. */ now = time_now(); if (time_cmp(now, listing_timeout) > 0) { send_listing(); listing_timeout = time_add( listing_timeout, time_seconds(LISTING_INTERVAL)); } /* Set the timeout to the next time to send a listing message. 
*/ select_timeout = time_sub(listing_timeout, now); assert(time_cmp(select_timeout, time_seconds(0)) > 0); /* Alternatively, if a request timeout occurs sooner, use this as the * select timeout. */ if (EXIT_SUCCESS == request_get_next_timeout( &g_requests, &request_timeout)) { request_timeout = time_sub(request_timeout, now); if (time_cmp(request_timeout, select_timeout) < 0) select_timeout = request_timeout; } /* Wait for standard input, UDP input, or a timeout. */ result = select(fd + 1, &readfds, NULL, NULL, &select_timeout); RETURN_IF_FALSE(result != -1); assert(result >= 0); /* Expire the outdated requests, and at the same time clean up the * content directory cache collection every time select returns. */ if (EXIT_SUCCESS != request_expire(&g_requests, &g_content_dir)) if (!FD_ISSET(STDIN_FD, &readfds)) print_prompt(); /* Process standard input if necessary. */ if (FD_ISSET(STDIN_FD, &readfds)) { if (EXIT_SUCCESS != process_stdin()) break; } /* Process UDP input if necessary. */ if (FD_ISSET(fd, &readfds)) RETURN_IF_FAILED(process_udp()); } return EXIT_SUCCESS; } /* ------------------------------------------------------------------------ */ static int parse_uchar(const char * s, char ** endp, u_char * result) { u_long temp; temp = strtoul(s, endp, 10); if (*endp == NULL ||errno != 0 || *endp == s) { RETURN_FAILURE(); } if (temp > UCHAR_MAX) { RETURN_FAILURE(); } *result = (u_char)temp; return EXIT_SUCCESS; } /* ------------------------------------------------------------------------ */ /** * Parses an unsigned short from the command line. * * @param s The source string. * @param endp The end pointer. * @param result The result. * * @return EXIT_SUCCESS or EXIT_FAILURE. 
*/ static int parse_ushort(const char * s, char ** endp, u_short * result) { u_long temp; temp = strtoul(s, endp, 10); if (*endp == NULL || errno != 0 || *endp == s) { RETURN_FAILURE(); } if (temp > USHRT_MAX) { RETURN_FAILURE(); } *result = (u_short)temp; return EXIT_SUCCESS; } /* ------------------------------------------------------------------------ */ /** * Parses a peer from the command line. * * @param s The source string. * @param result The peer. * * @return EXIT_SUCCESS or EXIT_FAILURE. */ static int parse_peer(const char * s, peer_t * result) { char buf[16]; char * end; int i; u_char num[4]; u_short port; /* Read the four numbers. */ for (i = 0; i < 4; i ++) { if (EXIT_SUCCESS != parse_uchar(s, &end, &num[i])) { RETURN_FAILURE(); } if (i != 3) { if (*end != '.') { RETURN_FAILURE(); } } else { if (*end != '/') { RETURN_FAILURE(); } s = end + 1; if (EXIT_SUCCESS != parse_ushort(s, &end, &port)) { RETURN_FAILURE(); } if (*end != '\0') { printf("end = %c\n", *end); RETURN_FAILURE(); } } s = end + 1; } /* Use inet_pton to parse the peer address. */ sprintf(buf, "%u.%u.%u.%u", num[0], num[1], num[2], num[3]); inet_pton(AF_INET, buf, &result->addr); result->port = port; return EXIT_SUCCESS; } /* ------------------------------------------------------------------------ */ /** * Prints a prompt and flushes standard output. */ static void print_prompt(void) { printf("(p2p) "); fflush(stdout); } /* ------------------------------------------------------------------------ */ /** * Processes incoming standard input data. This functions handles special * commands like "!quit" and "!info". Anything else is treated as file name * requests. This function returns EXIT_FAILURE when the program should * terminate. * * @return EXIT_SUCCESS or EXIT_FAILURE. */ static int process_stdin(void) { size_t count; char line[1000]; line[0] = '\0'; count = 0; /* Read until nothing to read or '\n' is reached. */ for (;;) { char ch; ssize_t r; /* Check for no more characters. 
*/ r = read(STDIN_FD, &ch, 1); RETURN_IF_FALSE(r != -1); assert(r >= 0); if (r == 0) break; /* Check for an end of line. */ assert(r == 1); if (ch == '\n') break; /* Otherwise, add the character to the line. */ if (count < sizeof(line) - 1) { line[count++] = ch; line[count] = '\0'; } } /* Quit if "!quit" is entered. */ if (0 == strcmp(line, "!quit")) return EXIT_FAILURE; /* Prints out collection contents if "!info" is entered. */ if (0 == strcmp(line, "!info")) { local_content_print(&g_local_content); printf("\n"); data_cache_print(&g_data_cache); printf("\n"); content_dir_print(&g_content_dir); printf("\n"); peers_print(&g_peers); printf("\n"); request_print(&g_requests); printf("\n"); /* Otherwise, parse the input as a file name and request this file. */ } else if (0 != strcmp(line, "")) { if (EXIT_SUCCESS != verify_name(line)) { fprintf(stderr, "Invalid file name: '%s'\n\n", line); } else { (void)send_request(line); } } print_prompt(); return EXIT_SUCCESS; } /* ------------------------------------------------------------------------ */ /** * Processes incoming UDP data. Based on the incoming header, this function * calls other functions to handle specific messages. * * @return EXIT_SUCCESS or EXIT_FAILURE. */ static int process_udp(void) { msg_buf_t buf; peer_t peer; RETURN_IF_FAILED(udp_recv( buf.buffer, sizeof(buf.buffer), &buf.size, &peer)); switch (get_msg_type(&buf)) { case msg_type_data: receive_data(&buf, &peer); break; case msg_type_listing: receive_listing(&buf, &peer); break; case msg_type_request: receive_request(&buf, &peer); break; case msg_type_try: receive_try(&buf, &peer); break; case msg_type_unknown: default: break; } return EXIT_SUCCESS; } /* ------------------------------------------------------------------------ */ /** * Processes an incoming data message. If the data received does not * correspond to an outstanding request, the event is logged and nothing is * modified. 
Otherwise the data received is saved to a local file, the * corresponding request is removed, and the data cache is updated. * * @param buf The message buffer. * @param peer The peer that this message is received from. * * @return EXIT_SUCCESS or EXIT_FAILURE. */ static int receive_data(msg_buf_t * buf, const peer_t * peer) { FILE * f; size_t index; data_msg_t msg; assert(buf != NULL); assert(peer != NULL); /* Parse the message as a data message. */ memset(&msg, 0, sizeof(msg)); RETURN_IF_FAILED(data_msg_read(buf, &msg)); /* Determine if the name corresponds to an outstanding request. */ if (request_find(&g_requests, msg.name, &index) == EXIT_FAILURE) { FILE * f; if ((f = log_open()) != NULL) { fprintf(f, "Discarding data message for %s because it does not\n" "correspond to an outstanding request.\n", msg.name); log_close(f); } return EXIT_SUCCESS; } /* Interrupt the user and print a message indicating it was downloaded. */ printf("\n\ncontent found (%s)\n\n", msg.name); print_prompt(); /* Delete from the outstanding requests. */ request_remove(&g_requests, index); /* Write the received data to a local file with the received file name. */ f = fopen(msg.name, "wb"); RETURN_IF_FALSE(f != NULL); if (fwrite(msg.data, 1, msg.size, f) != msg.size) { fclose(f); RETURN_FAILURE(); } fclose(f); /* Write the received data to the data cache collection. */ data_cache_add(&g_data_cache, msg.name, msg.data, msg.size); /* Write this entry to content directory if it does not already exist. */ content_dir_add(&g_content_dir, msg.name, peer); return EXIT_SUCCESS; } /* ------------------------------------------------------------------------ */ /** * Processes an incoming listing message. The received file and peer * information is added to the content directory collection. Duplicated * entries are not added more than once. * * @param buf The message buffer. * @param peer The peer that this message is received from. * * @return EXIT_SUCCESS or EXIT_FAILURE. 
*/ static int receive_listing(msg_buf_t * buf, const peer_t * peer) { size_t i; listing_msg_t msg; assert(buf != NULL); assert(peer != NULL); /* Parse the message buffer as a listing message. */ memset(&msg, 0, sizeof(msg)); RETURN_IF_FAILED(listing_msg_read(buf, &msg)); /* Add each item to the collection. */ for (i = 0; i < msg.count; i++) content_dir_add(&g_content_dir, msg.entries[i], peer); return EXIT_SUCCESS; } /* ------------------------------------------------------------------------ */ /** * Processes an incoming request message. If the requested file is in the * local content collection or the data cache, a data message is sent. If * not, but the name is in the content directory, a try message is sent. * Otherwise the request is ignored. * * @param buf The message buffer. * @param peer The peer that this message is received from. * * @return EXIT_SUCCESS or EXIT_FAILURE. */ static int receive_request(msg_buf_t * buf, const peer_t * peer) { msg_buf_t buffer; data_msg_t data_msg; size_t index; request_msg_t request_msg; assert(buf != NULL); assert(peer != NULL); memset(&request_msg, 0, sizeof(request_msg)); memset(&data_msg, 0, sizeof(data_msg)); memset(&buffer, 0, sizeof(buffer)); /* Parse the message buffer as a request message. */ RETURN_IF_FAILED(request_msg_read(buf, &request_msg)); /* A data message is created if the file is found in the local content. */ strcpy(data_msg.name, request_msg.name); if (local_content_find( &g_local_content, data_msg.name, &index) == EXIT_SUCCESS) { data_msg.size = g_local_content.items[index].size; memcpy( data_msg.data, g_local_content.items[index].buffer, data_msg.size); data_msg_write(&buffer, &data_msg); /* A data message is created if the file is found in the data cache. 
*/ } else if (data_cache_find( &g_data_cache, data_msg.name, &index) == EXIT_SUCCESS) { data_msg.size = g_data_cache.items[index].size; memcpy( data_msg.data, g_data_cache.items[index].buffer, data_msg.size); data_msg_write(&buffer, &data_msg); /* If the file name is found in the content directory, a try message is * created with all peers who know about the file. */ } else { try_msg_t try_msg; memset(&try_msg, 0, sizeof(try_msg)); strcpy(try_msg.name, request_msg.name); while (content_dir_find( &g_content_dir, request_msg.name, &index) == EXIT_SUCCESS) { try_msg.peers[try_msg.count++] = g_content_dir.items[index++].peer; } if (try_msg.count > 0) try_msg_write(&buffer, &try_msg); } /* Send the message if a data message or a try message was created. */ if (buffer.size > 0) udp_send(peer, buffer.buffer, buffer.size); return EXIT_SUCCESS; } /* ------------------------------------------------------------------------ */ /** * Process an incoming try message. If the requested file is in the local * content collection or the data cache, a data message is sent. If not, but * the name is in the content directory, a try message is sent. Otherwise * the request is ignored. * * @param buf The message buffer. * @param peer The peer that this message is received from. * * @return EXIT_SUCCESS or EXIT_FAILURE. */ static int receive_try(msg_buf_t * buf, const peer_t * peer) { size_t i; size_t j; try_msg_t msg; int new_peer_flag; assert(buf != NULL); assert(peer != NULL); /* Parse the message buffer as a try message. */ memset(&msg, 0, sizeof(msg)); RETURN_IF_FAILED(try_msg_read(buf, &msg)); /* Ignore try messages for names not in the request cache. */ if (request_find(&g_requests, msg.name, &i) == EXIT_FAILURE) { FILE * f; if ((f = log_open()) != NULL) { fprintf(f, "Discarding try message for %s because it does not\n" "correspond to an outstanding request.\n", msg.name); log_close(f); } return EXIT_SUCCESS; } /* Loop over each peer listed in the try message. 
*/ new_peer_flag = 0; for (i = 0; i < msg.count; i++) { msg_buf_t buffer; request_msg_t request_msg; memset(&request_msg, 0, sizeof(request_msg)); memset(&buffer, 0, sizeof(buffer)); /* Search in the content directory to see if it has the peer. */ for (j = 0; j < g_content_dir.count; j++) { if (peer_cmp(&msg.peers[i], &g_content_dir.items[j].peer) == 0) new_peer_flag = 1; } /* If the content directory doesn't already have this peer, send a * request to this peer. */ if (new_peer_flag == 0) { strcpy(request_msg.name, msg.name); request_msg_write(&buffer, &request_msg); udp_send(&msg.peers[i], buffer.buffer, buffer.size); } /* Add the peers received into the content directory. */ content_dir_add(&g_content_dir, msg.name, &msg.peers[i]); } return EXIT_SUCCESS; } /* ------------------------------------------------------------------------ */ /** * Sends a listing message. */ static void send_listing(void) { msg_buf_t buf; size_t i; listing_msg_t msg; peers_t peers; memset(&msg, 0, sizeof(msg)); memset(&buf, 0, sizeof(buf)); memset(&peers, 0, sizeof(peers)); /* Create a combined list of peers from the content directory and the * ones specified on the command line. */ peers_combine(&peers, &g_content_dir, &g_peers); listing_msg_create(&msg); listing_msg_write(&buf, &msg); /* Send the listing message to each peer. */ for (i = 0; i < peers.count; i++) (void)udp_send(&peers.peers[i], buf.buffer, buf.size); } /* ------------------------------------------------------------------------ */ /** * Sends a request for a file name. This function prints "content found" if * the name already exists in the local content or the data cache. If name of * the file is found in the content directory, a request is sent to those * peers listed in the content directory. If not, a request is sent to all * known peers. * * @param name The name of file. * * @return EXIT_SUCCESS or EXIT_FAILURE. 
*/
static int send_request(const char * name)
{
    msg_buf_t buf;
    size_t counter;
    size_t index;
    request_msg_t msg;

    /* Print a message if the item is found locally. */
    if (local_content_find(&g_local_content, name, &index) == EXIT_SUCCESS
        || data_cache_find(&g_data_cache, name, &index) == EXIT_SUCCESS) {

        printf("content found\n\n");
        return EXIT_SUCCESS;
    }

    /* Add the request to the outstanding request collection. */
    request_add(&g_requests, name);

    /* Build the request message.  Both structures are cleared first, as the
     * other message builders in this file do, so that no indeterminate bytes
     * are serialized or sent. */
    memset(&msg, 0, sizeof(msg));
    memset(&buf, 0, sizeof(buf));
    strcpy(msg.name, name);
    request_msg_write(&buf, &msg);
    printf("Searching for %s...\n\n", name);

    /* Check if the item is found in the content directory.  The index is
     * advanced past each match and handed back to content_dir_find, so the
     * search must begin at the first entry rather than at whatever the
     * failed lookups above left behind. */
    counter = 0;
    index = 0;
    while (content_dir_find(&g_content_dir, name, &index) == EXIT_SUCCESS) {
        send_request_to_peer(
            &g_content_dir.items[index].peer, buf.buffer, buf.size);
        index++;
        counter++;
    }

    /* Otherwise, send it to all known peers. */
    if (counter == 0) {
        peers_t combined_peers;

        memset(&combined_peers, 0, sizeof(combined_peers));
        peers_combine(&combined_peers, &g_content_dir, &g_peers);

        for (counter = 0; counter < combined_peers.count; counter++) {
            send_request_to_peer(
                &combined_peers.peers[counter], buf.buffer, buf.size);
        }
    }

    return EXIT_SUCCESS;
}

/* ------------------------------------------------------------------------ */

/**
 * Sends a request message to a single peer and prints an error message if
 * there is a problem.
 *
 * @param to The peer.
 * @param buffer The message buffer.
 * @param size The size of the message buffer in bytes.
*/
static void send_request_to_peer(
    const peer_t * to, const void * buffer, size_t size)
{
    if (udp_send(to, buffer, size) != EXIT_SUCCESS) {
        printf("Failed to send request to ");
        fprint_peer(stdout, to);
        printf(".\n");
    }
}

/* ------------------------------------------------------------------------ */

#ifdef TESTS

/**
 * A manual test, compiled only when TESTS is defined.  It sends every file
 * name to every peer and then prints a dump of each datagram that arrives.
 *
 * NOTE(review): this function uses port, i, p_index, f_index, peers and
 * files, none of which is declared in this file, and it uses
 * RETURN_FAILURE() although it returns void; presumably it is a leftover
 * from an earlier version -- confirm before defining TESTS.
 */
static void test()
{
    if (EXIT_SUCCESS != udp_start(port)) {
        fprintf(stderr, "Cannot bind to port %hu.", port);
        RETURN_FAILURE();
    }

    for (i = 0; i < p_index; i++) {
        int j;
        for (j = 0; j < f_index; j++) {
            if (EXIT_SUCCESS != udp_send(&peers[i],
                files[j], strlen(files[j]) + 1)) {
                RETURN_FAILURE();
            }
        }
    }

    printf("Parsed %d peers and %d files.\n", p_index, f_index);

    for (;;) {
        char buffer[1200];
        char ip[16];
        size_t actual;
        peer_t peer;

        printf("Listening...\n");

        if (EXIT_SUCCESS != udp_recv(buffer, sizeof(buffer), &actual, &peer)) {
            RETURN_FAILURE();
        }

        /* The size passed to inet_ntop is the size of the destination
         * buffer, not the size of a socket address structure. */
        inet_ntop(AF_INET, &peer.addr, ip, sizeof(ip));

        /* A size_t must not be printed with %d; convert it explicitly. */
        printf("%lu bytes arrived from %s on port %hu.\n",
            (unsigned long)actual, ip, peer.port);

        dump_buffer(buffer, actual);
        printf("\n");
    }

    /* Not reached: the loop above only ends by returning. */
    udp_stop();
}

#endif /* TESTS */