// Standard library
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <stdarg.h>
#include <string.h>
#include <errno.h>

// POSIX services and sockets
#include <unistd.h>
#include <getopt.h>
#include <syslog.h>
#include <signal.h>
#include <alloca.h>
#include <netdb.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#ifdef __linux__
#include <endian.h>
#include <byteswap.h>
#include <sys/epoll.h>
#include <sys/signalfd.h>
#endif

#ifdef __MACH__
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <launch.h>
#include <libkern/OSByteOrder.h>
#endif

#include "Rewind.h"

#define HELPER(value)  #value
#define STRING(value)  HELPER(value)
#define COUNT(array)   (sizeof(array) / sizeof((array)[0]))

#ifdef __cplusplus
#define CAST(type, value)  const_cast<type>(value)
#else
#define CAST(type, value)  ((type)(value))
#ifndef bool
#define bool   int
#define true   1
#define false  0
#endif
#endif

#ifdef __linux__
#define CHECK(event, handle)  ((event)->data.fd == (handle))
#endif

#ifdef __MACH__
#define CHECK(event, handle)  (((event)->ident == (handle)) && ((event)->filter == EVFILT_READ))
#define htobe16(value)     OSSwapHostToBigInt16(value)
#define be16toh(value)     OSSwapBigToHostInt16(value)
#define htobe32(value)     OSSwapHostToBigInt32(value)
#define be32toh(value)     OSSwapBigToHostInt32(value)
#define htole16(value)     OSSwapHostToLittleInt16(value)
#define le16toh(value)     OSSwapLittleToHostInt16(value)
#define htole32(value)     OSSwapHostToLittleInt32(value)
#define le32toh(value)     OSSwapLittleToHostInt32(value)
#define __bswap_16(value)  OSSwapConstInt16(value)
#define __bswap_32(value)  OSSwapConstInt32(value)
#endif

#define MODE_CONSOLE  (1 << 0)
#define MODE_SYSLOG   (1 << 1)
#define MODE_DAEMON   (1 << 2)

#define EVENT_LIST_LENGTH  (4 + 4)
#define BUFFER_SIZE        2048
#define WATCH_THRESHOLD    20
#define PROXY_PORT_COUNT   3

int serviceMode = MODE_CONSOLE;

void print(const char* format, ...)
{
  va_list arguments;

  // A va_list must not be reused after a v*() call, so start it separately for each sink
  if (serviceMode & MODE_CONSOLE)
  {
    va_start(arguments, format);
    vprintf(format, arguments);
    va_end(arguments);
  }

  if (serviceMode & MODE_SYSLOG)
  {
    va_start(arguments, format);
    vsyslog(LOG_INFO, format, arguments);
    va_end(arguments);
  }
}

int main(int argc, const char* argv[])
{
  print("\n");
  print("TellusAgent for BrandMeister Core\n");
  print("Copyright 2016-2023 Artem Prilutskiy (R3ABM, cyanide.burnout@gmail.com)\n");
  print("Software revision " STRING(VERSION) " build " BUILD "\n");
  print("\n");

  // Parameters for server
  const char* serverPort = "54003";
  const char* serverLocation = NULL;
  struct addrinfo* serverAddress = NULL;

  // Parameters for repeater (ports are kept in network byte order)
  uint16_t proxyPorts[PROXY_PORT_COUNT] =
  {
    htons(50000),
    htons(50001),
    htons(50002)
  };

  // Start up
  struct option options[] =
  {
    { "connect-port",   required_argument, NULL, 'c' },
    { "control-port",   required_argument, NULL, 'r' },
    { "media-port",     required_argument, NULL, 'd' },
    { "server-address", required_argument, NULL, 's' },
    { "server-port",    required_argument, NULL, 'p' },
    { "service-mode",   required_argument, NULL, 'm' },
    { NULL,             0,                 NULL, 0   }
  };

  int selection = 0;
  // getopt_long() returns -1 once all options have been parsed
  while ((selection = getopt_long(argc, CAST(char* const*, argv), "c:r:d:s:p:m:", options, NULL)) != -1)
    switch (selection)
    {
      case 'c':
        proxyPorts[0] = htons(strtol(optarg, NULL, 10));
        break;

      case 'r':
        proxyPorts[1] = htons(strtol(optarg, NULL, 10));
        break;

      case 'd':
        proxyPorts[2] = htons(strtol(optarg, NULL, 10));
        break;

      case 's':
        serverLocation = optarg;
        break;

      case 'p':
        serverPort = optarg;
        break;

      case 'm':
        serviceMode = strtol(optarg, NULL, 10);
        break;
    }

  if (serverLocation == NULL)
  {
    print(
      "Usage:\n"
      "  %s\n"
      "    --connect-port <port>\n"
      "    --control-port <port>\n"
      "    --media-port <port>\n"
      "    --server-address <address>\n"
      "    --server-port <port>\n"
      "    --service-mode <mode>\n"
      "      bit 0 - print to standard output\n"
      "      bit 1 - print to system log\n"
      "      bit 2 - run as daemon\n"
      "\n",
      argv[0]);
    return EXIT_FAILURE;
  }

  if (serviceMode & MODE_SYSLOG)
  {
    // Set proper origin for syslog (required by OpenWRT)
    openlog("TellusAgent", LOG_NOWAIT |
            LOG_PID, LOG_USER);
  }

#ifdef __linux__
  if ((serviceMode & MODE_DAEMON) &&
      (daemon(-1, -1) < 0))
  {
    print("Error launching daemon\n");
    return EXIT_FAILURE;
  }
#endif

#ifdef __MACH__
  if (serviceMode & MODE_DAEMON)
  {
    launch_data_t request = launch_data_new_string(LAUNCH_KEY_CHECKIN);
    launch_data_t response = launch_msg(request);
    launch_data_free(request);

    if (response == NULL)
    {
      print("Error calling launchd\n");
      return EXIT_FAILURE;
    }

    // Read everything needed from the response before releasing it
    launch_data_type_t type = launch_data_get_type(response);
    int error = (type == LAUNCH_DATA_ERRNO) ? launch_data_get_errno(response) : 0;
    launch_data_free(response);

    if (type == LAUNCH_DATA_ERRNO)
    {
      print("launchd returned error %d\n", error);
      return EXIT_FAILURE;
    }

    // launchd will return dictionary of job for successful check-in
    if (type != LAUNCH_DATA_DICTIONARY)
    {
      print("Error launching daemon\n");
      return EXIT_FAILURE;
    }
  }
#endif

  // Resolve server address
  struct addrinfo hints;
  memset(&hints, 0, sizeof(hints));
  hints.ai_socktype = SOCK_DGRAM;

#ifdef __linux__
  hints.ai_flags = AI_ADDRCONFIG;
  hints.ai_family = AF_UNSPEC;
#endif

#ifdef __MACH__
  hints.ai_flags = AI_V4MAPPED;
  hints.ai_family = AF_INET6;
#endif

  if (getaddrinfo(serverLocation, serverPort, &hints, &serverAddress) != 0)
  {
    print("Error resolving server address %s\n", serverLocation);
    return EXIT_FAILURE;
  }

  // Initialize proxy sockets
  int proxyHandles[PROXY_PORT_COUNT];
  struct sockaddr_in proxySocketAddress;
  socklen_t proxySocketLength = sizeof(proxySocketAddress);

  // Zero the structure first so that padding and platform-specific fields are defined
  memset(&proxySocketAddress, 0, sizeof(proxySocketAddress));
  proxySocketAddress.sin_family = AF_INET;
  proxySocketAddress.sin_addr.s_addr = htonl(INADDR_ANY);

  for (selection = 0; selection < PROXY_PORT_COUNT; selection ++)
  {
    int handle;

    proxySocketAddress.sin_port = proxyPorts[selection];
    handle = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);

    if ((handle < 0) ||
        (bind(handle, (struct sockaddr*)&proxySocketAddress, proxySocketLength) < 0))
    {
      print("Error opening ports for Multi-Site Connect\n");
      return EXIT_FAILURE;
    }

    proxyHandles[selection] = handle;
  }

  // Initialize uplink socket
  int uplinkHandle;
  struct sockaddr_in6 uplinkSocketAddress;
  // Zero the structure first so that sin6_flowinfo is defined
  memset(&uplinkSocketAddress, 0, sizeof(uplinkSocketAddress));
  uplinkSocketAddress.sin6_family = AF_INET6;
  uplinkSocketAddress.sin6_addr = in6addr_any;
  uplinkSocketAddress.sin6_port = 0;
  uplinkSocketAddress.sin6_scope_id = 0;

  uplinkHandle = socket(PF_INET6, SOCK_DGRAM, IPPROTO_UDP);

  if ((uplinkHandle < 0) ||
      (bind(uplinkHandle, (struct sockaddr*)&uplinkSocketAddress, sizeof(uplinkSocketAddress)) < 0))
  {
    print("Error opening port for Rewind Uplink\n");
    return EXIT_FAILURE;
  }

#ifdef __linux__
  // Initialize signal handle (signals are blocked and delivered through signalfd)
  int signalHandle;
  sigset_t signalMask;

  sigemptyset(&signalMask);
  sigaddset(&signalMask, SIGINT);
  sigaddset(&signalMask, SIGHUP);
  sigaddset(&signalMask, SIGTERM);
  sigaddset(&signalMask, SIGQUIT);
  sigprocmask(SIG_BLOCK, &signalMask, NULL);

  signalHandle = signalfd(-1, &signalMask, 0);

  // Initialize ePoll
  int pollHandle;
  struct epoll_event event;

  pollHandle = epoll_create(EVENT_LIST_LENGTH);

  event.events = EPOLLIN;
  event.data.fd = proxyHandles[0];
  epoll_ctl(pollHandle, EPOLL_CTL_ADD, event.data.fd, &event);

  event.events = EPOLLIN;
  event.data.fd = proxyHandles[1];
  epoll_ctl(pollHandle, EPOLL_CTL_ADD, event.data.fd, &event);

  event.events = EPOLLIN;
  event.data.fd = proxyHandles[2];
  epoll_ctl(pollHandle, EPOLL_CTL_ADD, event.data.fd, &event);

  event.events = EPOLLIN;
  event.data.fd = uplinkHandle;
  epoll_ctl(pollHandle, EPOLL_CTL_ADD, event.data.fd, &event);

  event.events = EPOLLIN;
  event.data.fd = signalHandle;
  epoll_ctl(pollHandle, EPOLL_CTL_ADD, event.data.fd, &event);
#endif

#ifdef __MACH__
  // Prepare signal handlers (signals are ignored here and observed through kqueue)
  signal(SIGINT, SIG_IGN);
  signal(SIGHUP, SIG_IGN);
  signal(SIGTERM, SIG_IGN);
  signal(SIGQUIT, SIG_IGN);

  // Initialize KQueue
  int queueHandle;
  struct kevent changes[EVENT_LIST_LENGTH];
  struct kevent* change = changes;

  queueHandle = kqueue();

  EV_SET(change, proxyHandles[0], EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0); change ++;
  EV_SET(change, proxyHandles[1], EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0); change ++;
  EV_SET(change, proxyHandles[2], EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0); change ++;
  EV_SET(change,
         uplinkHandle, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0); change ++;
  EV_SET(change, SIGINT, EVFILT_SIGNAL, EV_ADD | EV_ENABLE, 0, 0, 0); change ++;
  EV_SET(change, SIGHUP, EVFILT_SIGNAL, EV_ADD | EV_ENABLE, 0, 0, 0); change ++;
  EV_SET(change, SIGTERM, EVFILT_SIGNAL, EV_ADD | EV_ENABLE, 0, 0, 0); change ++;
  EV_SET(change, SIGQUIT, EVFILT_SIGNAL, EV_ADD | EV_ENABLE, 0, 0, 0); change ++;
#endif

  // Prepare buffers
  void* controlBuffer = alloca(BUFFER_SIZE);
  struct RewindData* incomingBuffer = (struct RewindData*)alloca(sizeof(struct RewindData) + BUFFER_SIZE);
  struct RewindData* outgoingBuffer = (struct RewindData*)alloca(sizeof(struct RewindData) + BUFFER_SIZE);

  memset(outgoingBuffer, 0, sizeof(struct RewindData));
  memcpy(outgoingBuffer, REWIND_PROTOCOL_SIGN, REWIND_SIGN_LENGTH);

  // Last known address of the repeater for each proxy port
  struct sockaddr_in repeaterSocketAddresses[PROXY_PORT_COUNT];
  memset(repeaterSocketAddresses, 0, sizeof(repeaterSocketAddresses));

  // Main loop
  bool running = true;
  size_t watchDog = 0;

  print("Server started\n");

  while (running)
  {
#ifdef __linux__
    struct epoll_event events[EVENT_LIST_LENGTH];
    int count = epoll_wait(pollHandle, events, EVENT_LIST_LENGTH, -1);
#endif

#ifdef __MACH__
    struct kevent events[EVENT_LIST_LENGTH];
    int count = kevent(queueHandle, changes, EVENT_LIST_LENGTH, events, EVENT_LIST_LENGTH, NULL);
#endif

    if (count < 0)
    {
      int error = errno;
      print("Error processing handles: %s (%d)\n", strerror(error), error);
      break;
    }

    // The counter grows with each keep-alive sent and is reset by any valid server packet
    if (watchDog > WATCH_THRESHOLD)
    {
      print("Server connection timed out\n");
      break;
    }

    for (size_t index = 0; index < (size_t)count; index ++)
    {
#ifdef __linux__
      struct epoll_event* event = events + index;
#endif

#ifdef __MACH__
      struct kevent* event = events + index;
#endif

      // Handle packet from the uplink
      if (CHECK(event, uplinkHandle))
      {
        struct sockaddr_in6 address;
        socklen_t size = sizeof(address);
        // recvfrom() returns -1 on error, so the result is kept signed
        ssize_t length = recvfrom(uplinkHandle, incomingBuffer, BUFFER_SIZE, 0, (struct sockaddr*)&address, &size);

        if ((length >= (ssize_t)sizeof(struct RewindData)) &&
            ((serverAddress->ai_addr->sa_family == AF_INET) || /* Work-around for Linux */
             (memcmp(&address, serverAddress->ai_addr, serverAddress->ai_addrlen) == 0)) &&
            (memcmp(incomingBuffer->sign, REWIND_PROTOCOL_SIGN, REWIND_SIGN_LENGTH) == 0))
        {
          uint16_t type = le16toh(incomingBuffer->type);
          // Payload bytes actually received (taken from the outer length before it is shadowed)
          size_t received = (size_t)length - sizeof(struct RewindData);
          size_t length = le16toh(incomingBuffer->length);

          watchDog = 0;

          // Ignore packets that declare more payload than was received
          if (length > received)
            continue;

          if ((type >= REWIND_TYPE_PEER_DATA) &&
              (type <= REWIND_TYPE_MEDIA_DATA))
          {
            int handle;
            struct sockaddr_in* address;

            // Packet types map one-to-one onto the proxy ports
            selection = type - REWIND_TYPE_PEER_DATA;
            address = repeaterSocketAddresses + selection;
            handle = proxyHandles[selection];

            sendto(handle, incomingBuffer->data, length, 0, (struct sockaddr*)address, sizeof(struct sockaddr_in));
            continue;
          }

          if (type == REWIND_TYPE_XNMS_DATA)
          {
            struct sockaddr_in* address;
            struct RewindForwardData* data;

            // The payload must at least hold the forwarding header
            if (length < sizeof(struct RewindForwardData))
              continue;

            data = (struct RewindForwardData*)incomingBuffer->data;
            length -= sizeof(struct RewindForwardData);

            address = repeaterSocketAddresses + 1;
            address->sin_port = data->port;

            sendto(proxyHandles[1], data->data, length, 0, (struct sockaddr*)address, sizeof(struct sockaddr_in));
            continue;
          }

          if (type == REWIND_TYPE_REPORT)
          {
            incomingBuffer->data[length] = '\0';
            print("Server message: %s\n", incomingBuffer->data);
            continue;
          }

          if (type == REWIND_TYPE_CLOSE)
          {
            print("Disconnect request received\n");
            running = false;
            break;
          }
        }
      }

      // Handle packet from the repeater
      for (selection = 0; selection < PROXY_PORT_COUNT; selection ++)
      {
        int handle = proxyHandles[selection];
        if (CHECK(event, handle))
        {
          socklen_t size = sizeof(struct sockaddr_in);
          uint8_t* buffer = (uint8_t*)outgoingBuffer->data;
          struct sockaddr_in* address = repeaterSocketAddresses + selection;
          // recvfrom() returns -1 on error, so the result is kept signed
          ssize_t length = recvfrom(handle, buffer, BUFFER_SIZE, 0, (struct sockaddr*)address, &size);

          // Nothing to forward if the read failed
          if (length < 0)
            continue;

          if ((selection == 0) &&
              (memcmp(buffer, "HMTP", 4) != 0))
          {
            struct msghdr message;
            struct iovec vectors[2];

            watchDog ++;

            // Tell the server which local ports the repeater is bound to
            outgoingBuffer->type = htole16(REWIND_TYPE_BINDING_NOTICE);
            outgoingBuffer->length = htole16(sizeof(proxyPorts));
            vectors[0].iov_base = outgoingBuffer;
            vectors[0].iov_len = sizeof(struct RewindData);
            vectors[1].iov_base = proxyPorts;
            vectors[1].iov_len = sizeof(proxyPorts);

            message.msg_name = serverAddress->ai_addr;
            message.msg_namelen = serverAddress->ai_addrlen;
            message.msg_iov = vectors;
            message.msg_iovlen = 2;
            message.msg_control = NULL;
            message.msg_controllen = 0;
            message.msg_flags = 0;

            sendmsg(uplinkHandle, &message, 0);
          }

          // Wrap the repeater packet into a Rewind frame and forward it to the server
          outgoingBuffer->type = htole16(REWIND_CLASS_HYTERA_DATA + selection);
          outgoingBuffer->length = htole16(length);

          length += sizeof(struct RewindData);
          sendto(uplinkHandle, outgoingBuffer, length, 0, serverAddress->ai_addr, serverAddress->ai_addrlen);
        }
      }

      // Handle signal from the kernel
#ifdef __linux__
      if (event->data.fd == signalHandle)
      {
        struct signalfd_siginfo information;

        // Skip the event if a complete record could not be read
        if (read(signalHandle, &information, sizeof(information)) != (ssize_t)sizeof(information))
          continue;
#endif
#ifdef __MACH__
      if (event->filter == EVFILT_SIGNAL)
      {
#endif
        // Notify the server before deciding whether to stop
        outgoingBuffer->type = htole16(REWIND_TYPE_CLOSE);
        outgoingBuffer->length = 0;

        sendto(uplinkHandle, outgoingBuffer, sizeof(struct RewindData), 0, serverAddress->ai_addr, serverAddress->ai_addrlen);

#ifdef __linux__
        if ((information.ssi_signo == SIGINT) ||
            (information.ssi_signo == SIGTERM) ||
            (information.ssi_signo == SIGQUIT))
#endif
#ifdef __MACH__
        if ((event->ident == SIGINT) ||
            (event->ident == SIGTERM) ||
            (event->ident == SIGQUIT))
#endif
        {
          running = false;
          break;
        }
      }
    }
  }

  print("Server stopped\n");

  // Clean up
  close(uplinkHandle);
  close(proxyHandles[0]);
  close(proxyHandles[1]);
  close(proxyHandles[2]);

#ifdef __linux__
  close(pollHandle);
  close(signalHandle);
#endif

#ifdef __MACH__
  close(queueHandle);
#endif

  freeaddrinfo(serverAddress);

  return EXIT_SUCCESS;
}