#ifndef WIN32 //#define USE_POLL #endif #include #include #include #ifdef WIN32 #include #include #else #include #ifdef USE_POLL #include #endif /* USE_POLL */ #include #include #include #include #include #endif #include #include #include "socket.h" #include "list.h" int s_running; struct socket_t { int fd; int mode; socket_func socket_func; void *arg; }; static inline int socket_compare(struct socket_t **a, struct socket_t **b) { return (*a)->fd == (*b)->fd; } LIST(socket, struct socket_t *, socket_compare) static struct socket_list_t s_sockets; static pthread_mutex_t s_sockets_mutex; #ifdef USE_POLL static int s_epoll_fd = -1; #endif /* USE_POLL */ #ifdef USE_POLL static struct socket_t *socket_get_by_fd(int fd) { size_t i; for (i = 0; i < s_sockets.used; i++) { struct socket_t *s = s_sockets.items[i]; if (s->fd == fd) return s; } return NULL; } #endif void register_socket(int fd, int mode, socket_func socket_func, void *arg) { struct socket_t *s = malloc(sizeof *s); s->fd = fd; s->mode = mode; s->socket_func = socket_func; s->arg = arg; pthread_mutex_lock(&s_sockets_mutex); socket_list_add(&s_sockets, s); #ifdef USE_POLL if (s_epoll_fd == -1) { s_epoll_fd = epoll_create(32); if (s_epoll_fd == -1) fprintf(stderr, "register_socket(): epoll_create: %s\n", strerror(errno)); } struct epoll_event ev; ev.events = EPOLLIN; ev.data.ptr = s; int r = epoll_ctl(s_epoll_fd, EPOLL_CTL_ADD, fd, &ev); if (r == -1) fprintf(stderr, "register_socket(): epoll_ctl: %s\n", strerror(errno)); #endif /* USE_POLL */ pthread_mutex_unlock(&s_sockets_mutex); } void socket_flag_write(int fd) { #ifdef USE_POLL struct epoll_event ev; ev.events = EPOLLIN | EPOLLOUT; ev.data.ptr = socket_get_by_fd(fd);; int r = epoll_ctl(s_epoll_fd, EPOLL_CTL_MOD, fd, &ev); if (r == -1) fprintf(stderr, "socket_flag_write(): epoll_ctl: %s\n", strerror(errno)); #endif /* USE_POLL */ } void socket_clear_write(int fd) { #ifdef USE_POLL struct epoll_event ev; ev.events = EPOLLIN; ev.data.ptr = socket_get_by_fd(fd); int r = epoll_ctl(s_epoll_fd, EPOLL_CTL_MOD, fd, &ev); if (r == -1) fprintf(stderr, "socket_clear_write(): epoll_ctl: %s\n", strerror(errno)); #endif /* USE_POLL */ } void deregister_socket(int fd) { struct socket_t s; s.fd = fd; pthread_mutex_lock(&s_sockets_mutex); socket_list_del_item(&s_sockets, &s); pthread_mutex_unlock(&s_sockets_mutex); } void socket_init(void) { #ifdef WIN32 WSADATA WSA_Data; WSAStartup (0x101, & WSA_Data); #endif pthread_mutex_init(&s_sockets_mutex, NULL); } void socket_deinit(void) { if (s_sockets.used > 0) { fprintf(stderr, "[network] socket_deinit(): %u sockets remaining in list\n", (unsigned)s_sockets.used); } #ifdef USE_POLL close(s_epoll_fd); #endif socket_list_free(&s_sockets); } void socket_set_nonblock(int fd) { #ifdef WIN32 u_long iMode = 1; ioctlsocket(fd, FIONBIO, &iMode); #else int flags = fcntl(fd, F_GETFL, 0); int res = fcntl(fd, F_SETFL, flags | O_NONBLOCK); if (res != 0) { fprintf(stderr, "[network] socket_set_nonblocK(): Could not set nonblocking IO: %s\n", strerror(errno)); } #endif } void socket_set_nodelay(int fd) { int b = 1; int res = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (const char *)&b, sizeof b); if (res != 0) { fprintf(stderr, "[network] socket_set_nonblock(): Could not set TCP_NODELAY: %s\n", strerror(errno)); } } #ifdef USE_POLL #define EPOLL_EVENTS 256 void *socket_run(void *arg) { s_running = 1; while (s_running) { struct epoll_event events[EPOLL_EVENTS]; int n = epoll_wait(s_epoll_fd, events, EPOLL_EVENTS, 10000); if (n == -1) fprintf(stderr, "socket_run(): epoll_wait: %s\n", strerror(errno)); int i; for (i = 0; i < n; i++) { struct socket_t *s = events[i].data.ptr; int state = s->socket_func(s->fd, !!(events[i].events & EPOLLOUT), !!(events[i].events & EPOLLIN), s->arg); if (!state) { close(s->fd); s->fd = -1; } } } } #else /* USE_POLL */ void *socket_run(void *arg) { s_running = 1; while (s_running) { unsigned i; int n; fd_set read_fd, write_fd; struct timeval tv; FD_ZERO(&read_fd); FD_ZERO(&write_fd); /* Add service sockets */ for (i = 0; i < s_sockets.used; i++) { int fd = s_sockets.items[i]->fd; if (fd >= 0) { if (s_sockets.items[i]->mode & SM_READ ) FD_SET(fd, &read_fd); if (s_sockets.items[i]->mode & SM_WRITE) FD_SET(fd, &write_fd); } } tv.tv_sec = 10; tv.tv_usec = 0; n = select(FD_SETSIZE, &read_fd, &write_fd, NULL, &tv); if (n == -1) { fprintf(stderr, "select: %s\n", strerror(errno)); return NULL; } if (n == 0) continue; for (i = 0; i < s_sockets.used; i++) { struct socket_t *s = s_sockets.items[i]; int can_write = FD_ISSET(s->fd, &write_fd); int can_read = FD_ISSET(s->fd, &read_fd); if (can_write || can_read) { int state = s->socket_func(s->fd, can_write, can_read, s->arg); if (!state) { close(s->fd); s->fd = -1; } } } } printf("Socket thread finished\n"); return NULL; } #endif /* USE_POLL */