/** @file relrecv.c * @data May 4, 2009 */ #ifndef CONFIG_RELRECV #error This is used for relrecv only. #endif /* CONFIG_RELRECV */ #include "common.h" #include "config.h" #include "crc.h" #include "protocol.h" #include "udp.h" /* ------------------------------------------------------------------------ */ /** A general purpose buffer for sending and receiving. */ static msg_buffer_t g_buffer; /** The next expected DATA packet to receive. */ static u_long g_expected; /** The file pointer. */ static FILE * g_file; /** The INIT message. */ static msg_init_t g_init; /** An array that records the number of time a DATA message arrives. */ static size_t * g_packets; /** The client information. */ static peer_t g_peer; /** The number of DATA packets retried. */ static size_t g_retries; /** The START message. */ static msg_start_t g_start; /* ------------------------------------------------------------------------ */ static int parse_args(int argc, const char * argv[], u_short * port); static int poll_buffer(int * timeout_flag); static int process_file(void); static int process_socket(void); static int receive_all_data(void); static int receive_all_data_helper(void); static int receive_data(u_long packets, int * received_flag); static int send_abort(const peer_t * peer, u_long id, const char * reason); static int send_buffer(const peer_t * peer, const char * msg_type); static int send_retries(void); /* ------------------------------------------------------------------------ */ /** The main entry point of the program. * * @param argc The number of arguments. * @param argv The arguments. * * @return EXIT_SUCCESS or EXIT_FAILURE. */ int main(int argc, const char * argv[]) { u_short port; /* Parse the arguments; obtain the port number. */ RETURN_IF_FAILURE(parse_args(argc, argv, &port)); /* Create the UDP socket. */ if (EXIT_SUCCESS != udp_start_server(port)) { fprintf(stderr, "Cannot bind to port %hu.\n", port); RETURN_FAILURE(); } /* Receive files now that the socket is open. */ if (EXIT_SUCCESS != process_socket()) { udp_stop(); RETURN_FAILURE(); } /* Destroy the UDP socket. */ if (EXIT_SUCCESS != udp_stop()) { fprintf(stderr, "Error closing socket.\n"); RETURN_FAILURE(); } return EXIT_SUCCESS; } /* ------------------------------------------------------------------------ */ /** Parses the command line arguments. * * @param argc The number of arguments. * @param argv The arguments. * @param port The returned port number. * * @return EXIT_SUCCESS or EXIT_FAILURE. */ static int parse_args(int argc, const char * argv[], u_short * port) { assert(NULL != argv); assert(NULL != port); /* Check the number of arguments. */ if (argc != 2) { fprintf(stderr, "usage: relrecv port\n"); RETURN_FAILURE(); } /* Parse the port number. */ if (1 != sscanf(argv[1], "%hu", port)) { fprintf(stderr, "Invalid port number '%s'.\n", argv[1]); RETURN_FAILURE(); } return EXIT_SUCCESS; } /* ------------------------------------------------------------------------ */ /** Waits for a message from the host or a timeout. * * @param timeout_flag A flag set to one if there is a timeout. * * @return EXIT_SUCCESS or EXIT_FAILURE. */ static int poll_buffer(int * timeout_flag) { double timeout; assert(NULL != timeout_flag); /* Calculate the absolute time for the timeout in units of clock_t. */ timeout = (double)clock() + ((CLOCKS_PER_SEC / 1000.0) * RELRECV_TIMEOUT); /* Loop until failure, timeout, or received buffer. */ for (;;) { peer_t peer; size_t ms; /* Calculate the remaining time in milliseconds. */ ms = (size_t)((timeout - (double)clock()) * 1000.0 / CLOCKS_PER_SEC); if (EXIT_SUCCESS != udp_wait(ms, timeout_flag)) { fprintf(stderr, "Socket failure.\n"); RETURN_FAILURE(); } /* Return successfully if there is a timeout. */ if (*timeout_flag == 1) break; /* Put the received data into g_buffer. */ if (EXIT_SUCCESS != udp_recv( g_buffer.data, sizeof(g_buffer.data), &g_buffer.size, &peer)) { fprintf(stderr, "Socket failure.\n"); RETURN_FAILURE(); } /* Return successfully only if it's the right peer. */ if (peer.address == g_peer.address && peer.port == g_peer.port) break; /* Send an ABORT to the unexpected peer. */ RETURN_IF_FAILURE(send_abort(&peer, 0, "Receiver is busy.")); } return EXIT_SUCCESS; } /* ------------------------------------------------------------------------ */ /** Performs all functionality while the file is open. * * @return EXIT_SUCCESS or EXIT_FAILURE. */ static int process_file(void) { msg_done_t done; u_long checksum; /* Print some information about this session. */ printf( "Transferring file 'r%s' (%lu bytes).\n", g_init.name, g_init.filesize); /* Mark the time when the transfer starts. */ stopwatch_start(); g_retries = 0; /* Build and send a START message with a new id. */ g_start.type = MSG_TYPE_START; g_start.id++; msg_start_write(&g_buffer, &g_start); RETURN_IF_FAILURE(send_buffer(&g_peer, "START")); /* Receive all DATA messages. */ RETURN_IF_FAILURE(receive_all_data()); /* Compute the checksum. */ if (EXIT_SUCCESS != crc_file(g_file, &checksum)) { RETURN_IF_FAILURE(send_abort( &g_peer, g_start.id, "Error computing checksum.")); RETURN_FAILURE(); } /* Verify the checksum. */ if (checksum != g_init.checksum) { RETURN_IF_FAILURE(send_abort( &g_peer, g_start.id, "Checksum failure.")); RETURN_FAILURE(); } /* Build and send a DONE message. */ done.type = MSG_TYPE_DONE; done.id = g_start.id; msg_done_write(&g_buffer, &done); RETURN_IF_FAILURE(send_buffer(&g_peer, "DONE")); /* Display statistics when the transfer completes. */ printf("Transfer complete: 'r%s'.\n\n", g_init.name); stopwatch_stop( g_init.filesize, g_retries, protocol_packet_count(&g_init)); return EXIT_SUCCESS; } /* ------------------------------------------------------------------------ */ /** Performs all functionality while the socket is open. * * @return EXIT_SUCCESS or EXIT_FAILURE. */ static int process_socket(void) { /* The main loop waits for new connections. */ for (;;) { char path[NAME_SIZE + 1]; /* Wait for a message to arrive. */ if (EXIT_SUCCESS != udp_recv( g_buffer.data, sizeof(g_buffer.data), &g_buffer.size, &g_peer)) { fprintf(stderr, "Socket failure.\n"); RETURN_FAILURE(); } /* This should be a INIT message. */ if (EXIT_SUCCESS != msg_init_read(&g_init, &g_buffer)) { RETURN_IF_FAILURE(send_abort( &g_peer, 0, "Invalid INIT message.")); continue; } /* Check if the file already exists. */ sprintf(path, "r%s", g_init.name); if (NULL != (g_file = fopen(path, "rb"))) { fclose(g_file); g_file = NULL; RETURN_IF_FAILURE(send_abort(&g_peer, 0, "File already exists.")); continue; } /* Create the file for writing and reading. */ if (NULL == (g_file = fopen(path, "w+b"))) { fclose(g_file); g_file = NULL; RETURN_IF_FAILURE(send_abort(&g_peer, 0, "Cannot create file.")); continue; } /* Receive the file now that the file is open. */ if (EXIT_SUCCESS != process_file()) { fclose(g_file); g_file = NULL; /* Delete the file if the transfer failed. */ remove(path); continue; } /* Close the file and wait for another connection. */ if (0 != fclose(g_file)) fprintf(stderr, "Warning: Cannot close file.\n"); g_file = NULL; } } /* ------------------------------------------------------------------------ */ /** Receives all DATA messages. * * @return EXIT_SUCCESS or EXIT_FAILURE. */ static int receive_all_data(void) { /* Allocate and initialize the array of packet counts. */ g_packets = (size_t *)calloc( protocol_packet_count(&g_init), sizeof(size_t)); /* Check for memory failures. */ if (NULL == g_packets) { RETURN_IF_FAILURE(send_abort(&g_peer, g_start.id, "Out of memory.")); RETURN_FAILURE(); } /* Receive all the DATA messages. */ if (EXIT_SUCCESS != receive_all_data_helper()) { free(g_packets); g_packets = NULL; RETURN_FAILURE(); } /* Free the array of packet counts before returning. */ free(g_packets); g_packets = NULL; return EXIT_SUCCESS; } /* ------------------------------------------------------------------------ */ /** Receives all DATA messages. * * @return EXIT_SUCCESS or EXIT_FAILURE. */ static int receive_all_data_helper(void) { u_long packets; u_long remaining; size_t timeouts; /* Determine the total number of packets that will be sent. */ packets = protocol_packet_count(&g_init); remaining = packets; /* The first expected packet. */ g_expected = 0; /* Assuming no more than 10% of the DATA messages are lost, and assuming * no more than 10% of all retried DATA messages are lost, then we can * assume there are no more than the following number of timeouts. Of * course, some of the RETRY messages are lost as well, but to keep it * simply, we round up a little. */ timeouts = (size_t)log10((double)remaining) + 1; /* Loop while there are more packets. */ while (remaining > 0) { int timeout_flag; int received_flag; /* Get a message or timeout. */ RETURN_IF_FAILURE(poll_buffer(&timeout_flag)); if (timeout_flag == 1) { /* Do not allow too many timeouts. */ if (timeouts-- == 0) { RETURN_IF_FAILURE(send_abort( &g_peer, g_start.id, "Too many timeouts.")); RETURN_FAILURE(); } /* Send all the RETRY messages. */ g_expected = packets; RETURN_IF_FAILURE(send_retries()); continue; } /* Check what type of message this is. */ switch (g_buffer.data[0]) { case MSG_TYPE_DATA: break; /* Abort if this is an INIT message. */ case MSG_TYPE_INIT: RETURN_IF_FAILURE(send_abort( &g_peer, g_start.id, "Unexpected INIT message.")); RETURN_FAILURE(); /* Abort if this is an unknown message. */ default: RETURN_IF_FAILURE(send_abort( &g_peer, g_start.id, "Received invalid message.")); RETURN_FAILURE(); } /* Use a helper function to process the DATA message. */ RETURN_IF_FAILURE(receive_data(packets, &received_flag)); /* Keep track of the remaining number of packets. */ if (received_flag == 1) remaining--; } return EXIT_SUCCESS; } /* ------------------------------------------------------------------------ */ /** Processes a DATA message in g_buffer. * * @param packets The total number of packets. * @param received_flag Set to 1 for new valid DATA packets. * * @return EXIT_SUCCESS or EXIT_FAILURE. */ static int receive_data(u_long packets, int * received_flag) { msg_data_t data; size_t data_size; assert(NULL != received_flag); /* Assume this is not a new DATA packet. */ *received_flag = 0; /* Parse the message; abort if bad. */ if (EXIT_SUCCESS != msg_data_read(&data, &g_buffer)) { RETURN_IF_FAILURE(send_abort( &g_peer, g_start.id, "Received invalid DATA message.")); RETURN_FAILURE(); } /* Check the id; ignore if bad. */ if (data.id != g_start.id) { fprintf(stderr, "Warning: Ignoring packet with bad id.\n"); return EXIT_SUCCESS; } /* Check the DATA message index; abort if bad. */ if (data.packet >= packets) { RETURN_IF_FAILURE(send_abort( &g_peer, g_start.id, "Received invalid DATA message index.")); RETURN_FAILURE(); } /* Check the data size; abort if bad. */ data_size = protocol_data_size(&g_init, data.packet); if ((size_t)g_buffer.size != data_size + DATA_HEADER_SIZE) { RETURN_IF_FAILURE(send_abort( &g_peer, g_start.id, "Received invalid DATA message size.")); RETURN_FAILURE(); } /* Ignore packets received more than once. */ if (++g_packets[data.packet] != 1) return EXIT_SUCCESS; /* Send RETRY message if DATA was skipped. */ if (data.packet > g_expected) { msg_retry_t retry; retry.type = MSG_TYPE_RETRY; retry.id = g_start.id; retry.first = g_expected; retry.last = data.packet - 1; fprintf( stderr, "DATA skipped; sending RETRY message.\n" "Sending RETRY message for packets %lu to %lu.\n", retry.first, retry.last); g_retries += (retry.last - retry.first) + 1; msg_retry_write(&g_buffer, &retry); RETURN_IF_FAILURE(send_buffer(&g_peer, "RETRY")); } /* Save the new expected packet to receive. */ if (data.packet >= g_expected) g_expected = data.packet + 1; /* Seek within the file; abort if failed. */ if (0 != fseek( g_file, protocol_packet_offset(&g_init, data.packet), SEEK_SET)) { RETURN_IF_FAILURE(send_abort( &g_peer, g_start.id, "Error writing to file.")); RETURN_FAILURE(); } /* Write to the file; abort if failed. */ if (data_size != fwrite(data.data, 1, data_size, g_file)) { RETURN_IF_FAILURE(send_abort( &g_peer, g_start.id, "Error writing to file.")); RETURN_FAILURE(); } /* Set the flag to indicate a new valid DATA message. */ *received_flag = 1; return EXIT_SUCCESS; } /* ------------------------------------------------------------------------ */ /** Sends an ABORT message. * * @param peer The peer. * @param id The sequence id or 0. * @param reason The reason for the aborting. * * @return EXIT_SUCCESS or EXIT_FAILURE. */ static int send_abort(const peer_t * peer, u_long id, const char * reason) { msg_abort_t abort; assert(NULL != peer); assert(NULL != reason); assert(strlen(reason) < sizeof(abort.reason)); /* First display a warning to the console. */ fprintf(stderr, "Warning: Sending ABORT message: %s\n", reason); /* Build and send the ABORT message. */ abort.type = MSG_TYPE_ABORT; abort.id = id; strcpy(abort.reason, reason); msg_abort_write(&g_buffer, &abort); RETURN_IF_FAILURE(send_buffer(peer, "ABORT")); return EXIT_SUCCESS; } /* ------------------------------------------------------------------------ */ /** Sends g_buffer to the specified peer. * * @param peer The peer. * @param msg_type The type of message. * * @return EXIT_SUCCESS or EXIT_FAILURE. */ static int send_buffer(const peer_t * peer, const char * msg_type) { assert(NULL != peer); /* Send the buffer and check for errors. */ if (EXIT_SUCCESS != udp_send(peer, g_buffer.data, g_buffer.size)) { fprintf(stderr, "Error sending %s message.\n", msg_type); RETURN_FAILURE(); } /* Reset the global buffer for clarity in the debugger. */ memset(&g_buffer, 0, sizeof(g_buffer)); return EXIT_SUCCESS; } /* ------------------------------------------------------------------------ */ /** Send a series of RETRY messages for all missing packets. * * @return EXIT_SUCCESS or EXIT_FAILURE. */ static int send_retries(void) { msg_retry_t retry; u_long i; u_long count; /* These values don't change during the loop. */ count = protocol_packet_count(&g_init); retry.type = MSG_TYPE_RETRY; retry.id = g_start.id; fprintf(stderr, "Timeout occurred; sending RETRY messages.\n"); /* Looping over every packet, received or not. */ for (i = 0; i < count; i++) { /* Skip over the received packets. */ if (g_packets[i] != 0) continue; retry.first = i; /* Find the last missing packet inclusive. */ while (++i < count) if (g_packets[i] != 0) break; retry.last = --i; /* Send the RETRY message. */ msg_retry_write(&g_buffer, &retry); RETURN_IF_FAILURE(send_buffer(&g_peer, "RETRY")); /* Keep some statistics and display a warning. */ g_retries += (retry.last - retry.first) + 1; fprintf( stderr, "Sending RETRY message for packets %lu to %lu.\n", retry.first, retry.last); } return EXIT_SUCCESS; }