diff --git a/src/server/Makefile.am b/src/server/Makefile.am index 2e84b66b..1ed8dc55 100644 --- a/src/server/Makefile.am +++ b/src/server/Makefile.am @@ -8,7 +8,7 @@ DEFAULT_LIB = -L. -lserver \ -L../common/ -lcommon $(LIBRDMACM) $(LIBIBVERBS) noinst_LIBRARIES = libserver.a -bin_PROGRAMS = tcpservermain +bin_PROGRAMS = tcpservermain testtcpserver testtcpclient libserver_a_SOURCES = TCPServer.cpp libserver_a_CPPFLAGS = -ggdb -I$(top_srcdir) \ @@ -28,6 +28,11 @@ LDFLAGS = -pthread LDADD = $(DEFAULT_LIB) CPPFLAGS = -I$(top_srcdir) -I$(top_srcdir)/src +testtcpserver_DEPENDENCIES = libserver.a +testtcpserver_SOURCES = TestTCPServer.cpp +testtcpclient_DEPENDENCIES = libserver.a +testtcpclient_SOURCES = TestTCPClient.cpp + bladepoolmain_DEPENDENCIES = libserver.a bladepoolmain_SOURCES = bladepoolmain.cpp diff --git a/src/server/TestTCPClient.cpp b/src/server/TestTCPClient.cpp new file mode 100644 index 00000000..fabed93e --- /dev/null +++ b/src/server/TestTCPClient.cpp @@ -0,0 +1,131 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "utils/logging.h" +#include "utils/Stats.h" +#include "common/Exception.h" + + + +const char* address_ = "127.0.0.1"; +const char* port_string_ = "12345"; +const int MILLION = 1000000; +const int num_messages = 100000; + +namespace cirrus { + + +ssize_t read_all(int sock, void* data, size_t len) { + uint64_t bytes_read = 0; + + while (bytes_read < len) { + int64_t retval = read(sock, reinterpret_cast(data) + bytes_read, + len - bytes_read); + + if (retval < 0) { + throw cirrus::Exception("Error reading from server"); + } + + bytes_read += retval; + } + + return bytes_read; +} + + +/** + * Guarantees that an entire message is sent. + * @param sock the fd of the socket to send on. + * @param data a pointer to the data to send. + * @param len the number of bytes to send. + * @param flags for the send call. + * @return the number of bytes sent. + */ +ssize_t send_all(int sock, const void* data, size_t len, + int /* flags */) { + uint64_t to_send = len; + uint64_t total_sent = 0; + int64_t sent = 0; + + while (total_sent != to_send) { + sent = send(sock, data, len - total_sent, 0); + + if (sent == -1) { + throw cirrus::Exception("Server error sending data to client"); + } + + total_sent += sent; + + // Increment the pointer to data we're sending by the amount just sent + data = static_cast(data) + sent; + } + + return total_sent; +} + +void loop() { + std::string address(address_); + std::string port_string(port_string_); + int sock; + // Create socket + if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + throw cirrus::ConnectionException("Error when creating socket."); + } + // int opt = 1; + // if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt))) { + // throw cirrus::ConnectionException("Error setting socket options."); + // } + + struct sockaddr_in serv_addr; + + // Set the type of address being used, assuming ip v4 + serv_addr.sin_family = AF_INET; + if (inet_pton(AF_INET, address.c_str(), &serv_addr.sin_addr) != 1) { + throw cirrus::ConnectionException("Address family invalid or invalid " + "IP address passed in"); + } + // Convert port from string to int + int port = stoi(port_string, nullptr); + + // Save the port in the info + serv_addr.sin_port = htons(port); + + // Connect to the server + if (::connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) { + throw cirrus::ConnectionException("Client could " + "not connect to server."); + } + std::vector buf(4100); + cirrus::TimerFunction start; + for (int i = 0; i < num_messages; i++) { + send_all(sock, buf.data(), buf.size(), 0); + read_all(sock, buf.data(), 4); + read_all(sock, buf.data(), 128); + } + uint64_t elapsed_us = start.getUsElapsed(); + + std::cout << "msg/s: " << (num_messages * 1.0 / elapsed_us * MILLION) + << std::endl; + std::cout << "MB/s: " << (num_messages * 1.0 * 4096/ + (1024 * 1024 * elapsed_us / MILLION)) << std::endl; +} + +} + + + +auto main() -> int { + cirrus::loop(); + return 0; +} diff --git a/src/server/TestTCPServer.cpp b/src/server/TestTCPServer.cpp new file mode 100644 index 00000000..e6ca66c3 --- /dev/null +++ b/src/server/TestTCPServer.cpp @@ -0,0 +1,167 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "common/Exception.h" +#include "utils/logging.h" + +namespace cirrus { + +/** + * Guarantees that an entire message is sent. + * @param sock the fd of the socket to send on. + * @param data a pointer to the data to send. + * @param len the number of bytes to send. + * @param flags for the send call. + * @return the number of bytes sent. + */ +ssize_t send_all(int sock, const void* data, size_t len, + int /* flags */) { + uint64_t to_send = len; + uint64_t total_sent = 0; + int64_t sent = 0; + + while (total_sent != to_send) { + sent = send(sock, data, len - total_sent, 0); + + if (sent == -1) { + throw cirrus::Exception("Server error sending data to client"); + } + + total_sent += sent; + + // Increment the pointer to data we're sending by the amount just sent + data = static_cast(data) + sent; + } + + return total_sent; +} + + + +ssize_t read_all(int sock, void* data, size_t len) { + uint64_t bytes_read = 0; + + while (bytes_read < len) { + int64_t retval = read(sock, reinterpret_cast(data) + bytes_read, + len - bytes_read); + + if (retval == -1) { + char *error = strerror(errno); + LOG(error); + throw cirrus::Exception("Error reading from client"); + } + + bytes_read += retval; + } + + return bytes_read; +} + +void loop() { + int server_sock_; + int port_ = 12345; + struct sockaddr_in serv_addr; + struct sockaddr_in cli_addr; + socklen_t clilen = sizeof(cli_addr); + + server_sock_ = socket(AF_INET, SOCK_STREAM, 0); + if (server_sock_ < 0) { + throw cirrus::ConnectionException("Server error creating socket"); + } + + serv_addr.sin_family = AF_INET; + serv_addr.sin_addr.s_addr = INADDR_ANY; + serv_addr.sin_port = htons(port_); + + LOG("Created socket in TCPServer"); + + int opt = 1; + if (setsockopt(server_sock_, SOL_SOCKET, SO_REUSEADDR, &opt, + sizeof(opt))) { + switch (errno) { + case EBADF: + LOG("EBADF"); + break; + case ENOTSOCK: + LOG("ENOTSOCK"); + break; + case ENOPROTOOPT: + LOG("ENOPROTOOPT"); + break; + case EFAULT: + LOG("EFAULT"); + break; + case EDOM: + LOG("EDOM"); + break; + } + throw cirrus::ConnectionException("Error forcing port binding"); + } + // + // if (setsockopt(server_sock_, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt))) { + // throw cirrus::ConnectionException("Error setting socket options."); + // } + + if (setsockopt(server_sock_, SOL_SOCKET, SO_REUSEPORT, &opt, + sizeof(opt))) { + switch (errno) { + case EBADF: + LOG("EBADF"); + break; + case ENOTSOCK: + LOG("ENOTSOCK"); + break; + case ENOPROTOOPT: + LOG("ENOPROTOOPT"); + break; + case EFAULT: + LOG("EFAULT"); + break; + case EDOM: + LOG("EDOM"); + break; + } + throw cirrus::ConnectionException("Error forcing port binding"); + } + + int ret = bind(server_sock_, reinterpret_cast(&serv_addr), + sizeof(serv_addr)); + if (ret < 0) { + throw cirrus::ConnectionException("Error binding in port " + + to_string(port_)); + } + + // SOMAXCONN is the "max reasonable backlog size" defined in socket.h + if (listen(server_sock_, SOMAXCONN) == -1) { + throw cirrus::ConnectionException("Error listening on port " + + to_string(port_)); + } + + int sock = accept(server_sock_, + reinterpret_cast(&cli_addr), + &clilen); + if (sock < 0) { + throw std::runtime_error("Error accepting socket"); + } + std::vector buf(4096); + + while (1) { + read_all(sock, buf.data(), 4); + read_all(sock, buf.data(), 4096); + send_all(sock, buf.data(), 132, 0); + } +} + +} + +auto main() -> int { + cirrus::loop(); + return 0; +}