Browse Source

+ extract f2b_conn_process() as separate function

master
Alex 'AdUser' Z 4 years ago
parent
commit
39fe0fcce2
  1. 119
      src/csocket.c

119
src/csocket.c

@ -16,17 +16,15 @@
#include <sys/select.h> #include <sys/select.h>
typedef struct f2b_conn_t { typedef struct f2b_conn_t {
int sock;
const char *path;
f2b_buf_t recv; f2b_buf_t recv;
f2b_buf_t send; f2b_buf_t send;
int sock;
} f2b_conn_t; } f2b_conn_t;
struct f2b_csock_t { struct f2b_csock_t {
f2b_conn_t *clients[MAXCONNS]; f2b_conn_t *clients[MAXCONNS];
const char *path; const char *path;
int sock; int sock;
bool shutdown;
}; };
/* helpers */ /* helpers */
@ -60,6 +58,67 @@ f2b_conn_destroy(f2b_conn_t *conn) {
free(conn); free(conn);
} }
int
f2b_conn_process(f2b_conn_t *conn, bool in, void (*cb)(const f2b_cmd_t *cmd, f2b_buf_t *res)) {
f2b_cmd_t *cmd = NULL;
int retval;
/* handle incoming data */
if (in) {
f2b_log_msg(log_debug, "some incoming data on socket %d", conn->sock);
char tmp[RBUF_SIZE] = "";
char *line = NULL;
ssize_t read = 0;
read = recv(conn->sock, tmp, RBUF_SIZE, MSG_DONTWAIT);
if (read == 0) {
f2b_log_msg(log_debug, "received connection close on socket %d", conn->sock);
return -1;
}
if (read < 0) {
f2b_log_msg(log_error, "received error on sock %d: %s", conn->sock, strerror(errno));
return -1;
}
if (read > 0) {
tmp[read] = '\0';
f2b_buf_append(&conn->recv, tmp, read);
f2b_log_msg(log_debug, "received %zd bytes from socket %d", read, conn->sock);
/* TODO: properly handle empty lines */
while (conn->recv.data[0] == '\n') {
f2b_buf_splice(&conn->recv, 1);
break;
}
/* extract message(s) */
while ((line = f2b_buf_extract(&conn->recv, "\n")) != NULL) {
f2b_log_msg(log_debug, "extracted line: %s", line);
if ((cmd = f2b_cmd_create(line)) != NULL) {
cb(cmd, &conn->send); /* handle command */
f2b_cmd_destroy(cmd);
} else {
f2b_buf_append(&conn->send, "can't parse input\n", 0);
}
free(line);
}
if (conn->recv.used >= conn->recv.size) {
f2b_log_msg(log_error, "drop connection on socket %d, recv buffer overflow", conn->sock);
return -1;
}
}
}
/* handle outgoing data */
if (conn->send.used > 0) {
f2b_log_msg(log_debug, "sending %zu bytes to socket %d", conn->send.used, conn->sock);
retval = send(conn->sock, conn->send.data, conn->send.used, MSG_DONTWAIT);
if (retval > 0) {
f2b_buf_splice(&conn->send, retval);
f2b_log_msg(log_debug, "sent %d bytes to socket %d (%zu remains)", retval, conn->sock, conn->send.used);
} else if (retval < 0 && errno != EAGAIN) {
f2b_log_msg(log_error, "can't send() to socket %d: %s", conn->sock, strerror(errno));
return -1; /* remote side closed connection */
}
}
return 0;
}
/* control socket-related functions */ /* control socket-related functions */
f2b_csock_t * f2b_csock_t *
@ -127,7 +186,6 @@ f2b_csocket_poll(f2b_csock_t *csock, void (*cb)(const f2b_cmd_t *cmd, f2b_buf_t
struct timeval tv = { .tv_sec = 0, .tv_usec = 0 }; struct timeval tv = { .tv_sec = 0, .tv_usec = 0 };
fd_set rfds, wfds; fd_set rfds, wfds;
f2b_conn_t *conn = NULL; f2b_conn_t *conn = NULL;
f2b_cmd_t *cmd = NULL;
int retval, nfds; int retval, nfds;
assert(csock != NULL); assert(csock != NULL);
@ -140,10 +198,10 @@ f2b_csocket_poll(f2b_csock_t *csock, void (*cb)(const f2b_cmd_t *cmd, f2b_buf_t
FD_SET(csock->sock, &rfds); /* watch for new connections */ FD_SET(csock->sock, &rfds); /* watch for new connections */
/* watch for new data on established connections */ /* watch for new data on established connections */
nfds = csock->sock;
for (int cnum = 0; cnum < MAXCONNS; cnum++) { for (int cnum = 0; cnum < MAXCONNS; cnum++) {
if ((conn = csock->clients[cnum]) == NULL) if ((conn = csock->clients[cnum]) == NULL)
continue; continue;
if (!csock->shutdown)
FD_SET(conn->sock, &rfds); FD_SET(conn->sock, &rfds);
if (conn->send.used) if (conn->send.used)
FD_SET(conn->sock, &wfds); FD_SET(conn->sock, &wfds);
@ -175,17 +233,17 @@ f2b_csocket_poll(f2b_csock_t *csock, void (*cb)(const f2b_cmd_t *cmd, f2b_buf_t
int sock = -1; int sock = -1;
/* accept() new connection */ /* accept() new connection */
if ((sock = accept(csock->sock, NULL, NULL)) < 0) { if ((sock = accept(csock->sock, NULL, NULL)) < 0) {
perror("accept()"); f2b_log_msg(log_error, "can't accept() new connection: %s", strerror(errno));
} else if (cnum < MAXCONNS) { } else if (cnum < MAXCONNS) {
if ((conn = f2b_conn_create(RBUF_SIZE, WBUF_SIZE)) != NULL) { if ((conn = f2b_conn_create(RBUF_SIZE, WBUF_SIZE)) != NULL) {
f2b_log_msg(log_debug, "new connection accept()ed, socket %d, conn %d\n", sock, cnum); f2b_log_msg(log_debug, "new connection accept()ed, socket %d", sock);
conn->sock = sock; conn->sock = sock;
csock->clients[cnum] = conn; csock->clients[cnum] = conn;
} else { } else {
f2b_log_msg(log_error, "can;t create new connection"); f2b_log_msg(log_error, "can't create new connection");
} }
} else { } else {
f2b_log_msg(log_error, "max number of clients reached, drop connection on socket %d\n", sock); f2b_log_msg(log_error, "max number of clients reached, drop connection on socket %d", sock);
shutdown(sock, SHUT_RDWR); shutdown(sock, SHUT_RDWR);
} }
} }
@ -193,50 +251,11 @@ f2b_csocket_poll(f2b_csock_t *csock, void (*cb)(const f2b_cmd_t *cmd, f2b_buf_t
for (int cnum = 0; cnum < MAXCONNS; cnum++) { for (int cnum = 0; cnum < MAXCONNS; cnum++) {
if ((conn = csock->clients[cnum]) == NULL) if ((conn = csock->clients[cnum]) == NULL)
continue; continue;
/* handle incoming data */ retval = f2b_conn_process(conn, FD_ISSET(conn->sock, &rfds), cb);
if (FD_ISSET(conn->sock, &rfds)) { if (retval < 0) {
f2b_log_msg(log_debug, "some incoming data on socket %d", conn->sock);
char tmp[RBUF_SIZE] = "";
char *line = NULL;
ssize_t read = 0;
read = recv(conn->sock, tmp, RBUF_SIZE, MSG_DONTWAIT);
if (read > 0) {
tmp[read] = '\0';
f2b_log_msg(log_debug, "conn %d received %zd bytes, and recv buf is now %zd bytes\n", conn->sock, read, conn->recv.used);
f2b_buf_append(&conn->recv, tmp, read);
/* TODO: properly handle empty lines */
while (*conn->recv.data == '\n') {
/* TODO f2b_buf_splice(conn->send, retval); */
break;
}
/* extract message(s) */
while ((line = f2b_buf_extract(&conn->recv, "\n")) != NULL) {
if ((cmd = f2b_cmd_create(line)) != NULL) {
cb(cmd, &conn->send); /* handle command */
f2b_cmd_destroy(cmd);
} else {
f2b_buf_append(&conn->send, "can't parse input\n", 0);
}
free(line);
}
} else if (read == 0) {
f2b_log_msg(log_debug, "received connection close on socket %d", conn->sock);
shutdown(conn->sock, SHUT_RDWR); shutdown(conn->sock, SHUT_RDWR);
f2b_conn_destroy(conn); f2b_conn_destroy(conn);
csock->clients[cnum] = NULL; csock->clients[cnum] = NULL;
} else {
perror("recv()");
}
}
/* handle outgoing data */
if (conn->send.used > 0) {
f2b_log_msg(log_debug, "sending %zu bytes to socket %d\n", conn->send.used, conn->sock);
retval = send(conn->sock, conn->send.data, conn->send.used, MSG_DONTWAIT);
if (retval > 0) {
/* TODO f2b_buf_splice(conn->send, retval); */
} else if (retval < 0) {
f2b_log_msg(log_error, "can't send %zu bytes to socket %d", conn->send.used, conn->sock);
}
} }
} /* foreach connection(s) */ } /* foreach connection(s) */
return; return;

Loading…
Cancel
Save