Back to the Résumé
/* p2p_main.c: Jay Miller, 11-2001
* Contains all primary functions for standard use of the p2p library.
*/
#include "p2p.h"
/*** globals - mostly needed to handle the signal-driven i/o *************/
static dgram dg[MAX_QUEUE]; /* datagram queue */
static long dg_counter[MAX_QUEUE + 1]; /* diagnostics for queue usage */
static BOOL p2p_quit = FALSE;
static pthread_t tid; /* save the thread id so we can kill it later */
static int fd_server; /* keep descriptors around */
static int fd_client;
static int queue_get; /* these must be available to i/o signal handler */
static int queue_put;
static int queue_count;
static void p2p_sig_io(int);
static void p2p_sig_hup(int);
static void *p2p_main_loop(void *);
/*** public functions ****************************************************/
/* Library init. Calls two private functions to startup the client and
* server halves. Also sets the key to be used for encryption.
* key : a 32-bit encryption key passed in by the caller. This key
* must be the same for all p2p instances with which the current
* instance wishes to communicate. If key == 0, no encryption will
* occur.
*
* Returns 0 on success or -1 on error.
*/
int p2p_init(crypt_t key)
{
if (p2p_server_main() == -1) {
p2p_error(WARN, __LINE__, "Error in server_main().\n");
return -1;
}
if (p2p_client_main() == -1) {
p2p_error(WARN, __LINE__, "Error in client_main().\n");
return -1;
}
/* set encryption key */
p2p_setkey(key);
return 0;
}
/* Packet sending function. This is the one a user will call to send
* data to another p2p library.
* dest : destination of the remote p2p library in either
* dotted-decimal or a full hostname format.
* buf : arbitrary data to send.
* len : length of buf in bytes.
*
* Returns 0 on success or -1 on error.
*/
int p2p_send(char *dest, char *buf, size_t length)
{
char *buf_padded;
int ret = 0;
struct hostent *hptr;
struct sockaddr_in servaddr;
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(RECV_PORT);
if ((hptr = gethostbyname(dest)) == NULL) {
p2p_error(SYS, __LINE__, "Error in gethostbyname()");
return -1;
}
memcpy(&servaddr.sin_addr, hptr->h_addr, sizeof(struct in_addr));
/* If encryption is enabled, encrypt that buffer for sending. */
if (p2p_iskeyset()) {
if ((buf_padded = p2p_encrypt(buf, &length)) == NULL)
return -1;
}
if ((ret = sendto(fd_client, buf_padded, length, 0, &servaddr,
sizeof(servaddr))) == -1)
p2p_error(SYS, __LINE__, "Error in sendto()");
/* If we encrypted the datagram, free the padded buffer (if any). */
if (p2p_iskeyset())
p2p_padding_free(buf_padded);
return ret;
}
/* Library cleanup.
*/
void p2p_stop(void)
{
int i;
/* Code to release the thread. If p2p_quit is true, any signal will
* cause the main server loop to break. The SIGIO signal handler is
* also prepared to handle a signal with no data on the socket. */
p2p_quit = TRUE;
if (pthread_kill(tid, SIGIO) != 0)
p2p_error(WARN, __LINE__, "kill:");
for (i = 0; i < MAX_QUEUE; i++) { /* free queue of buffers */
free(dg[i].dg);
free(dg[i].sa);
}
close(fd_client);
close(fd_server);
}
/*** initialization functions ********************************************/
/* Initialize the server half. We open the server-side socket, bind it
* to an address, and spawn a thread so the calling program can resume.
*
* Returns 0 on success or a non-zero error code on error.
*/
int p2p_server_main(void)
{
int i, ret = 0;
struct sockaddr_in saddr;
/* start up a standard server-side UDP socket */
if ((fd_server = socket(AF_INET, SOCK_DGRAM, 0)) == -1) {
p2p_error(SYS, __LINE__, "Error in socket()");
return fd_server;
}
bzero(&saddr, sizeof(saddr));
saddr.sin_family = AF_INET;
saddr.sin_addr.s_addr = htons(INADDR_ANY);
saddr.sin_port = htons(RECV_PORT);
if ((ret = bind(fd_server, &saddr, sizeof(saddr))) == -1) {
p2p_error(SYS, __LINE__, "Error in bind()");
return ret;
}
/* initialize the buffer queue and queue counters */
for (i = 0; i < MAX_QUEUE; i++) {
dg[i].dg = malloc(MAX_DGRAM);
dg[i].sa = malloc((size_t)sizeof(saddr));
dg[i].sa_len = (size_t)sizeof(saddr);
}
queue_get = queue_put = queue_count = 0;
if (((ret = pthread_create(
&tid, NULL, &p2p_main_loop, 0))) != 0)
p2p_error(SYS, __LINE__, "Error in pthread_create().\n");
return ret;
}
/* Initialize the client half. Open the client-side socket and set it
* to allow broadcasting.
*
* Returns 0 on success or -1 on error.
*/
int p2p_client_main(void)
{
int ret = 0;
const int on = 1;
/* start up a UDP socket for the client */
if ((fd_client = socket(AF_INET, SOCK_DGRAM, 0)) == -1) {
p2p_error(SYS, __LINE__, "Error in socket()");
return fd_client;
}
/* turn on the socket's ability to broadcast on this network. */
if ((ret = setsockopt(fd_client, SOL_SOCKET, SO_BROADCAST, &on,
sizeof(on))) == -1) {
p2p_error(SYS, __LINE__, "Error in setsockopt()");
return ret;
}
return ret;
}
/*** main server loop ****************************************************/
static void *p2p_main_loop(void *null)
{
sigset_t noblock_mask, block_mask, unblock_mask;
pthread_detach(pthread_self());
signal(SIGHUP, p2p_sig_hup); /* register signal handlers */
signal(SIGIO, p2p_sig_io);
/* let the kernel know who owns this socket */
fcntl(fd_server, F_SETOWN, getpid());
/* Setup socket for signal-driven and non-blocking i/o. It must be
* non-blocking because POSIX signals do not queue. */
fcntl(fd_server, F_SETFL, O_ASYNC | O_NONBLOCK);
sigemptyset(&noblock_mask); /* init our three signal sets */
sigemptyset(&block_mask); /* so that we may block SIGIO */
sigemptyset(&unblock_mask);
/* We must be able to block SIGIO because queue_count is modified
* both in this server loop and in the signal handler itself. There
* is a possible race condition if we do not restrict the handler. */
sigaddset(&block_mask, SIGIO); /* the signal we want to block */
sigprocmask(SIG_BLOCK, &block_mask, &unblock_mask); /* block SIGIO */
for (;;) { /* the main server loop */
while (queue_count == 0 && p2p_quit == FALSE)
sigsuspend(&noblock_mask); /* return after any signal */
sigprocmask(SIG_SETMASK, &unblock_mask, NULL); /* unblock SIGIO */
if (p2p_quit)
break;
/* decrypt the new datagram, retrieve the length of the plaintext */
if (p2p_iskeyset()) {
dg[queue_get].dg_len =
p2p_decrypt(dg[queue_get].dg, dg[queue_get].dg_len);
}
p2p_recv_handler(dg[queue_get].dg, dg[queue_get].dg_len,
(struct sockaddr_in *)dg[queue_get].sa, dg[queue_get].sa_len);
if (++queue_get >= MAX_QUEUE)
queue_get = 0; /* it's a circular queue */
sigprocmask(SIG_BLOCK, &block_mask, &unblock_mask); /* block SIGIO */
queue_count--;
}
return NULL;
}
/*** signal related functions ********************************************/
/* Signal handler for SIGIO.
*
* For each datagram delivered to our socket, the kernel posts a SIGIO
* to our application's PID. This function immediately queues the
* datagrams so they may later be handled by the application.
*/
static void p2p_sig_io(int signo)
{
ssize_t len;
int num_read = 0;
/* Because POSIX signals don't queue past 1, we must read the socket
* until all data has been removed from it on each SIGIO. We break
* when our non-blocking socket receives an EWOULDBLOCK. */
while (1) {
if (queue_count >= MAX_QUEUE)
p2p_error(WARN, __LINE__, "Queue size exceeds maximum!\n");
/* read the datagram from the socket into the queue */
len = recvfrom(fd_server, dg[queue_put].dg, MAX_DGRAM, 0,
dg[queue_put].sa, &dg[queue_put].sa_len);
if (len < 0) {
if (errno == EWOULDBLOCK) {
/* this error implies there is no further data on the socket */
break;
} else {
/* this error implies something is wrong */
p2p_error(SYS, __LINE__, "Error in recvfrom()");
}
}
dg[queue_put].dg_len = len;
num_read++; /* diagnostics */
queue_count++;
if (++queue_put >= MAX_QUEUE)
queue_put = 0; /* it's a circular queue */
}
dg_counter[num_read]++; /* diagnostics */
return;
}
/* Diagnostics function. This function returns a histogram of the
* number of times the SIGIO while loop is executed on each SIGIO
* signal. This is mostly here to tell us if we need a bigger queue.
*
* It also resets the counter after each SIGHUP.
*/
static void p2p_sig_hup(int signo)
{
int i;
for (i = 0; i <= MAX_QUEUE; i++) {
printf("dg_counter[%d] = %ld\n", i, dg_counter[i]);
dg_counter[i] = 0;
}
return;
}
/*** utility functions ***************************************************/
/* Error handling for entire library.
* level : Some combination of WARM, SYS (for syscalls) or FATAL.
* line : The line number of the error.
* fmt : printf-style format string describing the error.
*/
void p2p_error(int level, int line, char *fmt, ...)
{
int errno_saved;
char buf[MAX_BUFFER];
va_list va;
va_start(va, fmt);
#ifdef HAVE_VSNPRINTF
vsnprintf(buf, sizeof(buf), fmt, va);
#else
vsprintf(buf, fmt, va); /* not safe */
#endif
va_end(va);
if (level & SYS) { /* error on a system call */
errno_saved = errno;
snprintf(buf + strlen(buf), sizeof(buf) - strlen(buf), ": %s",
strerror(errno_saved));
}
fprintf(stderr, "%s error at line %d.\n%s.",
(level & FATAL) ? "Fatal" : "Non-fatal", line, buf);
if (level & FATAL)
exit(1);
return;
}
/*************************************************************************/
Back to the Résumé