diff --git a/src/csocket.c b/src/csocket.c index e88ee5d..fca14cc 100644 --- a/src/csocket.c +++ b/src/csocket.c @@ -16,17 +16,15 @@ #include typedef struct f2b_conn_t { - int sock; - const char *path; f2b_buf_t recv; f2b_buf_t send; + int sock; } f2b_conn_t; struct f2b_csock_t { f2b_conn_t *clients[MAXCONNS]; const char *path; int sock; - bool shutdown; }; /* helpers */ @@ -60,6 +58,67 @@ f2b_conn_destroy(f2b_conn_t *conn) { free(conn); } +int +f2b_conn_process(f2b_conn_t *conn, bool in, void (*cb)(const f2b_cmd_t *cmd, f2b_buf_t *res)) { + f2b_cmd_t *cmd = NULL; + int retval; + + /* handle incoming data */ + if (in) { + f2b_log_msg(log_debug, "some incoming data on socket %d", conn->sock); + char tmp[RBUF_SIZE] = ""; + char *line = NULL; + ssize_t read = 0; + read = recv(conn->sock, tmp, RBUF_SIZE, MSG_DONTWAIT); + if (read == 0) { + f2b_log_msg(log_debug, "received connection close on socket %d", conn->sock); + return -1; + } + if (read < 0) { + f2b_log_msg(log_error, "received error on sock %d: %s", conn->sock, strerror(errno)); + return -1; + } + if (read > 0) { + tmp[read] = '\0'; + f2b_buf_append(&conn->recv, tmp, read); + f2b_log_msg(log_debug, "received %zd bytes from socket %d", read, conn->sock); + /* TODO: properly handle empty lines */ + while (conn->recv.data[0] == '\n') { + f2b_buf_splice(&conn->recv, 1); + break; + } + /* extract message(s) */ + while ((line = f2b_buf_extract(&conn->recv, "\n")) != NULL) { + f2b_log_msg(log_debug, "extracted line: %s", line); + if ((cmd = f2b_cmd_create(line)) != NULL) { + cb(cmd, &conn->send); /* handle command */ + f2b_cmd_destroy(cmd); + } else { + f2b_buf_append(&conn->send, "can't parse input\n", 0); + } + free(line); + } + if (conn->recv.used >= conn->recv.size) { + f2b_log_msg(log_error, "drop connection on socket %d, recv buffer overflow", conn->sock); + return -1; + } + } + } + /* handle outgoing data */ + if (conn->send.used > 0) { + f2b_log_msg(log_debug, "sending %zu bytes to socket %d", conn->send.used, conn->sock); + retval = send(conn->sock, conn->send.data, conn->send.used, MSG_DONTWAIT); + if (retval > 0) { + f2b_buf_splice(&conn->send, retval); + f2b_log_msg(log_debug, "sent %d bytes to socket %d (%zu remains)", retval, conn->sock, conn->send.used); + } else if (retval < 0 && errno != EAGAIN) { + f2b_log_msg(log_error, "can't send() to socket %d: %s", conn->sock, strerror(errno)); + return -1; /* remote side closed connection */ + } + } + return 0; +} + /* control socket-related functions */ f2b_csock_t * @@ -127,7 +186,6 @@ f2b_csocket_poll(f2b_csock_t *csock, void (*cb)(const f2b_cmd_t *cmd, f2b_buf_t struct timeval tv = { .tv_sec = 0, .tv_usec = 0 }; fd_set rfds, wfds; f2b_conn_t *conn = NULL; - f2b_cmd_t *cmd = NULL; int retval, nfds; assert(csock != NULL); @@ -140,11 +198,11 @@ f2b_csocket_poll(f2b_csock_t *csock, void (*cb)(const f2b_cmd_t *cmd, f2b_buf_t FD_SET(csock->sock, &rfds); /* watch for new connections */ /* watch for new data on established connections */ + nfds = csock->sock; for (int cnum = 0; cnum < MAXCONNS; cnum++) { if ((conn = csock->clients[cnum]) == NULL) continue; - if (!csock->shutdown) - FD_SET(conn->sock, &rfds); + FD_SET(conn->sock, &rfds); if (conn->send.used) FD_SET(conn->sock, &wfds); nfds = max(csock->sock, conn->sock); @@ -175,17 +233,17 @@ f2b_csocket_poll(f2b_csock_t *csock, void (*cb)(const f2b_cmd_t *cmd, f2b_buf_t int sock = -1; /* accept() new connection */ if ((sock = accept(csock->sock, NULL, NULL)) < 0) { - perror("accept()"); + f2b_log_msg(log_error, "can't accept() new connection: %s", strerror(errno)); } else if (cnum < MAXCONNS) { if ((conn = f2b_conn_create(RBUF_SIZE, WBUF_SIZE)) != NULL) { - f2b_log_msg(log_debug, "new connection accept()ed, socket %d, conn %d\n", sock, cnum); + f2b_log_msg(log_debug, "new connection accept()ed, socket %d", sock); conn->sock = sock; csock->clients[cnum] = conn; } else { - f2b_log_msg(log_error, "can;t create new connection"); + f2b_log_msg(log_error, "can't create new connection"); } } else { - f2b_log_msg(log_error, "max number of clients reached, drop connection on socket %d\n", sock); + f2b_log_msg(log_error, "max number of clients reached, drop connection on socket %d", sock); shutdown(sock, SHUT_RDWR); } } @@ -193,50 +251,11 @@ f2b_csocket_poll(f2b_csock_t *csock, void (*cb)(const f2b_cmd_t *cmd, f2b_buf_t for (int cnum = 0; cnum < MAXCONNS; cnum++) { if ((conn = csock->clients[cnum]) == NULL) continue; - /* handle incoming data */ - if (FD_ISSET(conn->sock, &rfds)) { - f2b_log_msg(log_debug, "some incoming data on socket %d", conn->sock); - char tmp[RBUF_SIZE] = ""; - char *line = NULL; - ssize_t read = 0; - read = recv(conn->sock, tmp, RBUF_SIZE, MSG_DONTWAIT); - if (read > 0) { - tmp[read] = '\0'; - f2b_log_msg(log_debug, "conn %d received %zd bytes, and recv buf is now %zd bytes\n", conn->sock, read, conn->recv.used); - f2b_buf_append(&conn->recv, tmp, read); - /* TODO: properly handle empty lines */ - while (*conn->recv.data == '\n') { - /* TODO f2b_buf_splice(conn->send, retval); */ - break; - } - /* extract message(s) */ - while ((line = f2b_buf_extract(&conn->recv, "\n")) != NULL) { - if ((cmd = f2b_cmd_create(line)) != NULL) { - cb(cmd, &conn->send); /* handle command */ - f2b_cmd_destroy(cmd); - } else { - f2b_buf_append(&conn->send, "can't parse input\n", 0); - } - free(line); - } - } else if (read == 0) { - f2b_log_msg(log_debug, "received connection close on socket %d", conn->sock); - shutdown(conn->sock, SHUT_RDWR); - f2b_conn_destroy(conn); - csock->clients[cnum] = NULL; - } else { - perror("recv()"); - } - } - /* handle outgoing data */ - if (conn->send.used > 0) { - f2b_log_msg(log_debug, "sending %zu bytes to socket %d\n", conn->send.used, conn->sock); - retval = send(conn->sock, conn->send.data, conn->send.used, MSG_DONTWAIT); - if (retval > 0) { - /* TODO f2b_buf_splice(conn->send, retval); */ - } else if (retval < 0) { - f2b_log_msg(log_error, "can't send %zu bytes to socket %d", conn->send.used, conn->sock); - } + retval = f2b_conn_process(conn, FD_ISSET(conn->sock, &rfds), cb); + if (retval < 0) { + shutdown(conn->sock, SHUT_RDWR); + f2b_conn_destroy(conn); + csock->clients[cnum] = NULL; } } /* foreach connection(s) */ return;