-
$CMD
+
"""
def makeShell(process, w):
- content = Monitor.SHELL_TEMPLATE
- content = content.replace('$WIDTH', str(w))
- content = content.replace('$CMD', process.to_string(True))
- shell_content = ''
- out, err = process.get()
- for line in StringIO(out).readlines():
- shell_content += '
{}
\n'.format( stripEndl(line) )
- for line in StringIO(err).readlines():
- shell_content += '
{}
\n'.format( stripEndl(line) )
- content = content.replace('$LINES', shell_content)
- return content
+ out, err = process.get()
+ shell_content = '\n'.join(['
{}
'.format( line.strip() ) for line in StringIO(out).readlines()])
+ shell_content += '\n'.join(['
{}
'.format( line.strip() ) for line in StringIO(err).readlines()])
+ return Monitor.SHELL_TEMPLATE.format(**{
+ 'WIDTH': w,
+ 'CMD': str(process),
+ 'LINES': shell_content
+ })
def make(cmd_source, destination, location=None, sleep_in_between=None):
procs = ProcessHandler.fromFile(cmd_source, location, sleep_in_between)
- monitor_page = Monitor.LAYOUT_TEMPLATE
w = int(round(100 / len(procs), 0))
- shells = ''
- for proc in procs:
- shells += '{}\n'.format( Monitor.makeShell(proc, w) )
- monitor_page = monitor_page.replace('$SHELLS', shells)
+ shells = '\n'.join([Monitor.makeShell(proc, w) for proc in procs])
with open(destination, 'w') as stream:
- stream.write(monitor_page)
+ stream.write(Monitor.LAYOUT_TEMPLATE.replace('$SHELLS', shells))
def test():
tmpdirname = tempfile.TemporaryDirectory()
@@ -126,11 +110,8 @@ def test():
print('open in a browser {}'.format(output))
def monitor(options):
- try:
- Monitor.make(options.cmd, options.dest, options.location, options.sleep)
- print('open in a browser {} to see the results sent by the run processes'.format(options.dest))
- except:
- sys.exit(1)
+ Monitor.make(options.cmd, options.dest, options.location, options.sleep)
+ print('\nopen in a browser {} to see the results sent by the run processes'.format(options.dest))
def main():
parser = OptionParser()
@@ -145,9 +126,15 @@ def main():
(options, args) = parser.parse_args()
if options.test:
- test()
+ test()
elif options.cmd and options.dest:
monitor(options)
+ elif options.cmd:
+ # just show the commands
+ print()
+ with open(options.cmd, 'r') as stream:
+ print(''.join(['Running {}'.format(line) for line in stream.readlines()]))
+ print('\n\nwaiting for all the spawned processes to complete ...\n\n')
if __name__ == '__main__':
main()
diff --git a/samples/utils/Names.cpp b/samples/utils/Names.cpp
index 159012db..b3052e36 100644
--- a/samples/utils/Names.cpp
+++ b/samples/utils/Names.cpp
@@ -8,16 +8,8 @@
#include
namespace MinimalSocket::samples {
-const Names NamesCircularIterator::NAMES_SURNAMES =
- Names{{"Luciano", "Pavarotti"},
- {"Gengis", "Khan"},
- {"Giulio", "Cesare"},
- {"Theodor", "Roosvelt"},
- {"Immanuel", "Kant"}};
-
void NamesCircularIterator::next() {
- ++current_;
- if (current_ == NAMES_SURNAMES.end()) {
+ if (++current_ == NAMES_SURNAMES.end()) {
current_ = NAMES_SURNAMES.begin();
}
}
diff --git a/samples/utils/Names.h b/samples/utils/Names.h
index 6e795733..0a125a76 100644
--- a/samples/utils/Names.h
+++ b/samples/utils/Names.h
@@ -7,30 +7,28 @@
#pragma once
-#include
#include
#include
namespace MinimalSocket::samples {
-using Names = std::unordered_map;
-
-using NamesIterator = Names::const_iterator;
class NamesCircularIterator {
public:
- static const Names NAMES_SURNAMES;
+ using Names = std::unordered_map;
+ using NamesIterator = Names::const_iterator;
+ static inline Names NAMES_SURNAMES = {{"Luciano", "Pavarotti"},
+ {"Gengis", "Khan"},
+ {"Giulio", "Cesare"},
+ {"Theodor", "Roosvelt"},
+ {"Immanuel", "Kant"}};
- NamesCircularIterator() : current_(NAMES_SURNAMES.begin()){};
+ NamesCircularIterator() = default;
const NamesIterator ¤t() { return current_; };
void next();
- static std::size_t size() {
- return NamesCircularIterator::NAMES_SURNAMES.size();
- }
-
private:
- NamesIterator current_;
+ NamesIterator current_ = NAMES_SURNAMES.begin();
};
} // namespace MinimalSocket::samples
diff --git a/samples/utils/Pollables.cpp b/samples/utils/Pollables.cpp
new file mode 100644
index 00000000..5630e1ec
--- /dev/null
+++ b/samples/utils/Pollables.cpp
@@ -0,0 +1,34 @@
+#include "Pollables.h"
+
+namespace MinimalSocket::samples {
+void Pollables::loop(const std::chrono::seconds &timeout) {
+ auto last_notable_event = std::chrono::high_resolution_clock::now();
+ while (!pollables_.empty()) {
+ auto it = pollables_.begin();
+ while (it != pollables_.end()) {
+ auto stat = (*it)();
+ switch (stat) {
+ case PollableStatus::NOT_ADVANCED:
+ ++it;
+ break;
+ case PollableStatus::ADVANCED:
+ last_notable_event = std::chrono::high_resolution_clock::now();
+ ++it;
+ break;
+ case PollableStatus::COMPLETED:
+ last_notable_event = std::chrono::high_resolution_clock::now();
+ it = pollables_.erase(it);
+ break;
+ }
+ }
+
+ auto elapsed_since_last_notable =
+ std::chrono::duration_cast(
+ std::chrono::high_resolution_clock::now() - last_notable_event);
+ if (timeout < elapsed_since_last_notable) {
+ break;
+ }
+ }
+}
+
+} // namespace MinimalSocket::samples
diff --git a/samples/utils/Pollables.h b/samples/utils/Pollables.h
new file mode 100644
index 00000000..8ca3745f
--- /dev/null
+++ b/samples/utils/Pollables.h
@@ -0,0 +1,29 @@
+#pragma once
+
+#include
+#include
+#include
+
+namespace MinimalSocket::samples {
+enum class PollableStatus { NOT_ADVANCED, ADVANCED, COMPLETED };
+
+using Pollable = std::function;
+
+class Pollables {
+public:
+ Pollables() = default;
+
+ template void emplace(Pred &&pred) {
+ pollables_.emplace_back(std::forward(pred));
+ }
+
+ // At each iteration, it polls one by one all the pollables.
+ // If there is no progress for a time period higher than the specified
+ // timeout, the function completes and return to the caller.
+ void loop(const std::chrono::seconds &timeout);
+
+private:
+ std::list pollables_;
+};
+
+} // namespace MinimalSocket::samples
diff --git a/samples/utils/Respond.h b/samples/utils/Respond.h
index 0d627861..53a05a2b 100644
--- a/samples/utils/Respond.h
+++ b/samples/utils/Respond.h
@@ -7,31 +7,38 @@
#pragma once
-#include
+#include
+#include
+#include
#include
namespace MinimalSocket::samples {
template void respond(SocketT &channel) {
while (true) {
- // receive name to search
- auto request = channel.receive(500, std::chrono::seconds{5});
- // respond with corresponding surname
- if constexpr (std::is_same::value) {
- if (!request.has_value()) {
- break;
+ try {
+ // receive name to search
+ auto request = channel.receive(500, std::chrono::seconds{5});
+ // respond with corresponding surname
+ if constexpr (std::is_base_of_v) {
+ if (!request.has_value()) {
+ break;
+ }
+ const auto &response = NamesCircularIterator::NAMES_SURNAMES
+ .find(request->received_message)
+ ->second;
+ channel.sendTo(response, request->sender);
+ } else {
+ if (request.empty()) {
+ break;
+ }
+ const auto &response =
+ NamesCircularIterator::NAMES_SURNAMES.find(request)->second;
+ channel.send(response);
}
- const auto &response =
- NamesCircularIterator::NAMES_SURNAMES.find(request->received_message)
- ->second;
- channel.sendTo(response, request->sender);
- } else {
- if (request.empty()) {
- break;
- }
- const auto &response =
- NamesCircularIterator::NAMES_SURNAMES.find(request)->second;
- channel.send(response);
+ } catch (const MinimalSocket::SocketError &) {
+ // if here the connection was shut down
+ break;
}
}
}
diff --git a/src/header/MinimalSocket/Error.h b/src/header/MinimalSocket/Error.h
index 733c84a8..7aa96374 100644
--- a/src/header/MinimalSocket/Error.h
+++ b/src/header/MinimalSocket/Error.h
@@ -16,46 +16,46 @@ class Error : public std::runtime_error {
public:
Error(const std::string &what) : std::runtime_error(what){};
- template Error(Args... args) : Error(merge(args...)) {}
+ template
+ Error(const Args &...args) : Error(merge(args...)) {}
protected:
- template static std::string merge(Args... args) {
+ template static std::string merge(const Args &...args) {
std::stringstream stream;
- merge(stream, args...);
+ (merge_(stream, args), ...);
return stream.str();
};
- template
- static void merge(std::stringstream &stream, const T ¤t,
- Args... remaining) {
- stream << current;
- merge(stream, remaining...);
- };
-
- template
- static void merge(std::stringstream &stream, const T &back) {
- stream << back;
+ template
+ static void merge_(std::stringstream &stream, const T &arg) {
+ stream << arg;
};
};
-class ErrorCodeAware {
+class ErrorCodeHolder {
public:
- int getErrorCode() const { return error_code; }
+ ErrorCodeHolder();
-protected:
- ErrorCodeAware();
+ int getErrorCode() const { return errorCode; }
private:
- int error_code;
+ int errorCode;
};
-class SocketError : public ErrorCodeAware, public Error {
+
+class SocketError : public ErrorCodeHolder, public Error {
public:
/**
- * @brief last error code raised by the socket API is automatically retrieved
+ * @brief The last raised error code is automatically retrieved
+ * and included in the error message
*/
SocketError(const std::string &what);
template
SocketError(const Args &...args) : SocketError{merge(args...)} {};
};
+
+class TimeOutError : public Error {
+public:
+ TimeOutError() : Error("Timeout reached"){};
+};
} // namespace MinimalSocket
diff --git a/src/header/MinimalSocket/NonCopiable.h b/src/header/MinimalSocket/NonCopiable.h
new file mode 100644
index 00000000..4d683e02
--- /dev/null
+++ b/src/header/MinimalSocket/NonCopiable.h
@@ -0,0 +1,19 @@
+/**
+ * Author: Andrea Casalino
+ * Created: 01.28.2020
+ *
+ * report any bug to andrecasa91@gmail.com.
+ **/
+
+#pragma once
+
+namespace MinimalSocket {
+class NonCopiable {
+public:
+ NonCopiable(const NonCopiable &) = delete;
+ NonCopiable &operator=(const NonCopiable &) = delete;
+
+protected:
+ NonCopiable() = default;
+};
+} // namespace MinimalSocket
diff --git a/src/header/MinimalSocket/core/Address.h b/src/header/MinimalSocket/core/Address.h
index e6ca3aca..9c4f7e02 100644
--- a/src/header/MinimalSocket/core/Address.h
+++ b/src/header/MinimalSocket/core/Address.h
@@ -21,35 +21,29 @@ enum class AddressFamily { IP_V4, IP_V6 };
using Port = std::uint16_t;
/**
- * @brief Passing this value as Port implies to ask the system to reserve a
- * random port.
+ * @brief Used to indicate a random port that can be assigned by the os.
*/
static constexpr Port ANY_PORT = 0;
class Address {
public:
/**
- * @brief Internally the AddressFamily is deduced according to the
- * hostIp content.
- * In case of invalid host, the object is built but left empty (i.e. *this ==
- * nullptr would be true)
+ * @brief The AddressFamily is deduced on the basis of the hostIp.
+ * @throw In case of an invalid hostIp
*/
- Address(const std::string &hostIp, const Port &port);
+ Address(const std::string &hostIp, Port port);
/**
- * @brief A representation of a local host address is created.
+ * @brief Local host on the specified port is assumed as address.
*/
- Address(const Port &port, const AddressFamily &family = AddressFamily::IP_V4);
+ Address(Port port, AddressFamily family = AddressFamily::IP_V4);
const std::string &getHost() const { return this->host; };
- const Port &getPort() const { return this->port; };
- const AddressFamily &getFamily() const { return this->family; };
+ Port getPort() const { return this->port; };
+ AddressFamily getFamily() const { return this->family; };
bool operator==(const Address &o) const;
- Address(const Address &) = default;
- Address &operator=(const Address &) = default;
-
private:
Address() = default;
@@ -59,20 +53,17 @@ class Address {
};
/**
- * @return host:port into a string.
+ * @return "host:port"
*/
std::string to_string(const Address &subject);
/**
* @brief Tries to deduce the family from the host.
- * @return nullopt in case the host is invalid, otherwise the deduced family
- * value is returned.
+ * @return nullopt in case the host is invalid, otherwise the family
+ * conrresponding to the passed address
*/
std::optional
deduceAddressFamily(const std::string &host_address);
-bool operator==(std::nullptr_t, const Address &subject);
-bool operator==(const Address &subject, std::nullptr_t);
-
bool isValidHost(const std::string &host_address);
} // namespace MinimalSocket
diff --git a/src/header/MinimalSocket/core/Definitions.h b/src/header/MinimalSocket/core/Definitions.h
index e4ad3e37..68226a66 100644
--- a/src/header/MinimalSocket/core/Definitions.h
+++ b/src/header/MinimalSocket/core/Definitions.h
@@ -11,26 +11,26 @@
#include
namespace MinimalSocket {
-struct Buffer {
+struct BufferView {
char *buffer;
- const std::size_t buffer_size;
+ std::size_t buffer_size;
};
/**
* @brief sets all values inside the passed buffer to 0
*/
-void clear(const Buffer &subject);
+void clear(BufferView &subject);
/**
- * @param subject the string buffer to convert
+ * @param subject the string to convert
* @return a buffer pointing to the first element of the subject, and a lenght
* equal to the current size of subject
*/
-Buffer makeStringBuffer(std::string &subject);
+BufferView makeBufferView(std::string &subject);
-struct ConstBuffer {
+struct BufferViewConst {
const char *buffer;
- const std::size_t buffer_size;
+ std::size_t buffer_size;
};
/**
@@ -38,7 +38,7 @@ struct ConstBuffer {
* @return an immutable buffer pointing to the first element of the subject, and
* a lenght equal to the current size of subject
*/
-ConstBuffer makeStringConstBuffer(const std::string &subject);
+BufferViewConst makeBufferViewConst(const std::string &subject);
enum class SocketType { UDP, TCP };
diff --git a/src/header/MinimalSocket/core/Receiver.h b/src/header/MinimalSocket/core/Receiver.h
index caeeaf61..307ca17d 100644
--- a/src/header/MinimalSocket/core/Receiver.h
+++ b/src/header/MinimalSocket/core/Receiver.h
@@ -13,96 +13,179 @@
#include
namespace MinimalSocket {
-class ReceiverBase : public virtual Socket {
+class ReceiverWithTimeout : public virtual Socket {
protected:
- std::unique_ptr>
- lazyUpdateReceiveTimeout(const Timeout &timeout);
+ void updateTimeout_(const Timeout &timeout);
private:
- std::mutex receive_mtx;
Timeout receive_timeout = NULL_TIMEOUT;
};
-/**
- * @brief Typically associated to a connected socket, whose remote peer
- * exchanging messages is known.
- * Attention!! Even when calling from different threads some simultaneously
- * receive, they will be satisfited one at a time, as an internal mutex must be
- * locked before starting to receive.
- */
-class Receiver : public ReceiverBase {
+class ReceiverBlocking : public ReceiverWithTimeout {
public:
/**
- * @param message the buffer that will store the received bytes.
- * @param timeout the timeout to consider. A NULL_TIMEOUT means actually to
- * begin a blocking receive.
- * @return the number of received bytes actually received and copied into
- * message. It can be also lower then buffer size, as less bytes might be
- * received.
+ * @param the buffer that will store the received message.
+ * @param optional timeout after which the receive is considered failed. A
+ * NULL_TIMEOUT means actually to begin a blocking receive.
+ * @return the number of actually received bytes. It can be also lower then
+ * the message cacapity, as that represents the maxium number of bytes
+ * expected to be received.
+ * @throw When the receive is not possible. This can be due to the fact that
+ * the connection was terminated or the socket was actually transferred.
*/
- std::size_t receive(const Buffer &message,
+ std::size_t receive(BufferView message,
const Timeout &timeout = NULL_TIMEOUT);
/**
- * @brief Similar to Receiver::receive(Buffer &, const Timeout &), but
- * internally building the recipient buffer which is converted into a string
- * before returning it.
- *
- * @param expected_max_bytes maximum number of bytes to receive
- * @param timeout the timeout to consider. A NULL_TIMEOUT means actually to
- * begin a blocking receive.
+ * @brief Similar to Receiver::receive(BufferView &, const Timeout &), but
+ * returning a string as a buffer containing the received message.
+ * @param maximum number of bytes to receive
+ * @param optional timeout after which the receive is considered failed. A
+ * NULL_TIMEOUT means actually to begin a blocking receive.
* @return the received sequence of bytes as a string. The size of the result
- * might be less than expected_max_bytes, in case less bytes were actually
- * received.
+ * might be less than expected_max_bytes, as that represents the maxium number
+ * of bytes expected to be received.
+ * @throw When the receive is not possible. This can be due to the fact that
+ * the connection was terminated or the socket was actually transferred.
*/
std::string receive(std::size_t expected_max_bytes,
const Timeout &timeout = NULL_TIMEOUT);
+
+private:
+ std::mutex recv_mtx;
};
-/**
- * @brief Typically associated to a non connected socket, whose remote peer that
- * sends bytes is known and may change over the time.
- * Attention!! Even when calling from different threads some simultaneously
- * receive, they will be satisfited one at a time, as an internal mutex must be
- * locked before starting to receive.
- */
-class ReceiverUnkownSender : public ReceiverBase {
+class ReceiverNonBlocking : public virtual Socket {
+public:
+ /**
+ * @brief Notice this is a non blocking operation, meaning that in case no new
+ * bytes were actually received at the time of calling this method, 0 will be
+ * immediately returned.
+ * @param the buffer that will store the received bytes.
+ * @return the number of actually received bytes. It can be also lower then
+ * the message cacapity, as that represents the maxium number of bytes
+ * expected to be received.
+ * @throw When the receive is not possible. This can be due to the fact that
+ * the connection was terminated or the socket was actually transferred.
+ */
+ std::size_t receive(BufferView message);
+
+ /**
+ * @brief Similar to Receiver::receive(BufferView), but
+ * returning a string as a buffer containing the received message.
+ * Notice that this is a non blocking operation, meaning that in case no new
+ * bytes were actually received at the time of calling this method, an empty
+ * string will be immediately returned.
+ *
+ * @param expected_max_bytes maximum number of bytes to receive
+ * @return the received sequence of bytes as a string. The size of the result
+ * might be less than expected_max_bytes, as that represents the maxium number
+ * of bytes expected to be received.
+ * @throw When the receive is not possible. This can be due to the fact that
+ * the connection was terminated or the socket was actually transferred.
+ */
+ std::string receive(std::size_t expected_max_bytes);
+
+private:
+ std::mutex recv_mtx;
+};
+
+template class Receiver {};
+template <> class Receiver : public ReceiverBlocking {};
+template <> class Receiver : public ReceiverNonBlocking {};
+
+struct ReceiveResult {
+ Address sender;
+ std::size_t received_bytes;
+};
+
+struct ReceiveStringResult {
+ Address sender;
+ std::string received_message;
+};
+
+class ReceiverUnkownSenderBlocking : public ReceiverWithTimeout {
public:
- struct ReceiveResult {
- Address sender;
- std::size_t received_bytes;
- };
/**
- * @param message the buffer that will store the received bytes.
- * @param timeout the timeout to consider. A NULL_TIMEOUT means actually to
+ * @param the buffer that will store the received bytes.
+ * @param the timeout to consider. A NULL_TIMEOUT means actually to
* begin a blocking receive.
* @return the number of received bytes actually received and copied into
* message, together with the address of the sender. The received bytes can be
* also lower then buffer size, as less bytes might be received.
* In case no bytes were received within the timeout, a nullopt is returned.
+ * @throw When the receive is not possible. This can be due to the fact that
+ * the connection was terminated or the socket was actually transferred.
*/
- std::optional receive(const Buffer &message,
+ std::optional receive(BufferView message,
const Timeout &timeout = NULL_TIMEOUT);
- struct ReceiveStringResult {
- Address sender;
- std::string received_message;
- };
/**
- * @brief Similar to ReceiverUnkownSender::receive(Buffer &, const Timeout &),
- * but internally building the recipient buffer which is converted into a
+ * @brief Similar to ReceiverUnkownSender::receive(BufferView &, const Timeout
+ * &), but internally building the recipient buffer which is converted into a
* string before returning it.
*
- * @param expected_max_bytes maximum number of bytes to receive
- * @param timeout the timeout to consider. A NULL_TIMEOUT means actually to
+ * @param maximum number of bytes to receive
+ * @param the timeout to consider. A NULL_TIMEOUT means actually to
* begin a blocking receive.
* @return the received sequence of bytes as a string, together with the
* address of the sender. The size of the result might be less than
* expected_max_bytes, in case less bytes were actually received.
* In case no bytes were received within the timeout, a nullopt is returned.
+ * @throw When the receive is not possible. This can be due to the fact that
+ * the connection was terminated or the socket was actually transferred.
*/
std::optional
receive(std::size_t expected_max_bytes,
const Timeout &timeout = NULL_TIMEOUT);
+
+private:
+ std::mutex recv_mtx;
+};
+
+class ReceiverUnkownSenderNonBlocking : public virtual Socket {
+public:
+ /**
+ * @brief Notice this is a non blocking operation, meaning that in case no new
+ * bytes were actually received at the time of calling this method, a nullopt
+ * will be actually returned.
+ * @param the buffer that will store the received bytes.
+ * @return the number of received bytes actually received and copied into
+ * message, together with the address of the sender. The received bytes can be
+ * also lower then buffer size, as less bytes might be received.
+ * In case no bytes were received within the timeout, a nullopt is returned.
+ * @throw When the receive is not possible. This can be due to the fact that
+ * the connection was terminated or the socket was actually transferred.
+ */
+ std::optional receive(BufferView message);
+
+ /**
+ * @brief Similar to ReceiverUnkownSender::receive(BufferView &, const Timeout
+ * &), but internally building the recipient buffer which is converted into a
+ * string before returning it.
+ * Notice this is a non blocking operation, meaning that in case no new
+ * bytes were actually received at the time of calling this method, a nullopt
+ * will be actually returned.
+ *
+ * @param maximum number of bytes to receive
+ * @param the timeout to consider. A NULL_TIMEOUT means actually to
+ * begin a blocking receive.
+ * @return the received sequence of bytes as a string, together with the
+ * address of the sender. The size of the result might be less than
+ * expected_max_bytes, in case less bytes were actually received.
+ * In case no bytes were received within the timeout, a nullopt is returned.
+ * @throw When the receive is not possible. This can be due to the fact that
+ * the connection was terminated or the socket was actually transferred.
+ */
+ std::optional receive(std::size_t expected_max_bytes);
+
+private:
+ std::mutex recv_mtx;
};
+
+template class ReceiverUnkownSender {};
+template <>
+class ReceiverUnkownSender : public ReceiverUnkownSenderBlocking {};
+template <>
+class ReceiverUnkownSender : public ReceiverUnkownSenderNonBlocking {};
} // namespace MinimalSocket
diff --git a/src/header/MinimalSocket/core/Sender.h b/src/header/MinimalSocket/core/Sender.h
index 125adb06..3849858e 100644
--- a/src/header/MinimalSocket/core/Sender.h
+++ b/src/header/MinimalSocket/core/Sender.h
@@ -10,30 +10,35 @@
#include
#include
-#include
-#include
#include
#include
namespace MinimalSocket {
-/**
- * @brief Typically associated to a connected socket, whose remote peer
- * exchanging messages is known.
- * Attention!! Even when calling from different threads some simultaneously
- * send, they will be satisfited one at a time, as an internal mutex must be
- * locked before starting to receive.
- */
class Sender : public virtual Socket {
public:
/**
- * @param message the buffer storing the bytes to send
- * @return true in case all the bytes were successfully sent
+ * @param the buffer storing the bytes to send
+ * @return true in case all the bytes were successfully sent. In case the
+ * socket is non blocking, false is returned when the buffer is full and no
+ * further bytes can be inserted. In such case, the send fails but does not
+ * throw.
+ * On the opposite, a blocking socket will wait (absorbing the calling thread)
+ * until the buffer has enough space to proceed with the send.
+ * @throw When the receive is not possible. This can be due to the fact that
+ * the connection was terminated or the socket was actually transferred.
*/
- bool send(const ConstBuffer &message);
+ bool send(const BufferViewConst &message);
/**
- * @param message the buffer storing the bytes to send as a string
- * @return true in case all the bytes were successfully sent
+ * @param the buffer storing the bytes to send as a string
+ * @return true in case all the bytes were successfully sent. In case the
+ * socket is non blocking, false is returned when the buffer is full and no
+ * further bytes can be inserted. In such case, the send fails but does not
+ * throw.
+ * On the opposite, a blocking socket will wait (absorbing the calling thread)
+ * until the buffer has enough space to proceed with the send.
+ * @throw When the receive is not possible. This can be due to the fact that
+ * the connection was terminated or the socket was actually transferred.
*/
bool send(const std::string &message);
@@ -41,47 +46,53 @@ class Sender : public virtual Socket {
std::mutex send_mtx;
};
-/**
- * @brief Typically associated to a non connected socket, whose remote peer that
- * sends bytes is known and may change over the time.
- * Attention!! It is thread safe to simultaneously send messages from different
- * threads to many different recipients.
- * However, be aware that in case 2 or more threads are sending a message to the
- * same recipient, sendTo request will be queued and executed one at a time.
- */
class SenderTo : public virtual Socket {
public:
/**
- * @param message the buffer storing the bytes to send
- * @param recipient the recpient of the message
+ * @param the buffer storing the bytes to send
+ * @param the recpient of the message
* @return true in case all the bytes were successfully sent to the specified
- * recipient
+ * recipient. In case the
+ * socket is non blocking, false is returned when the buffer is full and no
+ * further bytes can be inserted. In such case, the send fails but does not
+ * throw.
+ * On the opposite, a blocking socket will wait (absorbing the calling thread)
+ * until the buffer has enough space to proceed with the send.
+ * @throw When the receive is not possible. This can be due to the fact that
+ * the connection was terminated or the socket was actually transferred.
*/
- bool sendTo(const ConstBuffer &message, const Address &recipient);
+ bool sendTo(const BufferViewConst &message, const Address &recipient);
/**
- * @param message the buffer storing the bytes to send as a string
- * @param recipient the recpient of the message
+ * @param the buffer storing the bytes to send as a string
+ * @param the recpient of the message
* @return true in case all the bytes were successfully sent to the specified
- * recipient
+ * recipient. In case the
+ * socket is non blocking, false is returned when the buffer is full and no
+ * further bytes can be inserted. In such case, the send fails but does not
+ * throw.
+ * On the opposite, a blocking socket will wait (absorbing the calling thread)
+ * until the buffer has enough space to proceed with the send.
+ * @throw When the receive is not possible. This can be due to the fact that
+ * the connection was terminated or the socket was actually transferred.
*/
bool sendTo(const std::string &message, const Address &recipient);
private:
- std::future reserveAddress(const Address &to_reserve);
- void freeAddress(const Address &to_reserve);
+ std::mutex &getRecipientMtx(const Address &recipient);
std::mutex recipients_register_mtx;
-
struct AddressHasher {
- std::hash string_hasher;
-
std::size_t operator()(const Address &subject) const {
- return string_hasher(to_string(subject));
+ return getHasher()(to_string(subject));
+ }
+
+ static std::hash &getHasher() {
+ static std::hash res = std::hash{};
+ return res;
}
};
- using WaitingToSendQueue = std::list>;
- std::unordered_map
+ std::unordered_map, AddressHasher>
recipients_register;
};
} // namespace MinimalSocket
diff --git a/src/header/MinimalSocket/core/Socket.h b/src/header/MinimalSocket/core/Socket.h
index d96aaada..d790889a 100644
--- a/src/header/MinimalSocket/core/Socket.h
+++ b/src/header/MinimalSocket/core/Socket.h
@@ -15,7 +15,6 @@
#include
#include
#include
-#include
namespace MinimalSocket {
#ifdef _WIN32
@@ -24,18 +23,18 @@ using WSAVersion = std::array;
/**
* @brief Refer to
* https://docs.microsoft.com/en-us/windows/win32/api/winsock/nf-winsock-wsastartup
- * In windows, any application using sockets, need to initialize the WSA data
- * structure. When doing this, the version to use must be specified. This will
- * be internally handled by this library, before calling any fucntion that
- * requires the WSA data to be already initialized.
- * The WSA data will be automatically cleaned up when the process terminates.
+ * When operating under Windows, any application using sockets need to
+ * initialize the WSA data structure. When doing this, the version to use must
+ * be specified. This is internally handled, before using any relevant
+ * functionality. The WSA data will be automatically cleaned up when the process
+ * terminates.
*
- * This object allows you specify the WSA version used to initialize the WSA
- * data. When calling setWsaVersion(...) you can change the WSA version and such
- * information will be used next time the WSA data will be (automatically and
- * internally) initialized. Clearly, in case the WSA data was already
- * initialized, as a consequence of creating any kind of sockets or using one of
- * the features defined in Address.h, setWsaVersion has actually no effect.
+ * This object allows you to specify the WSA version to use. When calling
+ * setWsaVersion(...) you can change the WSA version, as this information will
+ * be used next time the WSA data will be (automatically and internally)
+ * initialized. Clearly, in case the WSA data was already initialized, as a
+ * consequence of creating any kind of sockets or using one of the features
+ * defined in Address.h, setWsaVersion has actually no effect.
*/
class WSAManager {
public:
@@ -48,10 +47,10 @@ class WSAManager {
};
#endif
-class SocketIdWrapper;
+class SocketHandler;
/**
- * @brief The base onject of any kind of socket.
+ * @brief The base object for any socket.
*/
class Socket {
public:
@@ -61,12 +60,12 @@ class Socket {
Socket &operator=(const Socket &) = delete;
/**
- * @return the socket id associated to this object.
+ * @return the file descriptor associated to this socket.
*
- * This might be:
+ * This could be:
*
* https://docs.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-socket
- * in windows is the SOCKET (cast as in integer) data used to identify the
+ * in Windows is the SOCKET (cast as an integer) used to identify the
* underlying socket and returned by ::socket(...)
*
* https://man7.org/linux/man-pages/man2/socket.2.html
@@ -75,52 +74,83 @@ class Socket {
*
* Normally, all the operations involving the socket should be handled by the
* functions provided by this library. Therefore, you shouldn't need to
- * access this number However, it might happen that sometimes you want to
- * specify additional socket option, and in order to do that you need this
- * number. Beware that you should really know what you are doing when using
- * this number.
+ * access this value. However, it might happen that you want to
+ * specify additional options, and in order to do so you need to access such
+ * value. Beware that you should really know what you are doing when using
+ * this value.
*/
- int accessSocketID() const;
+ int getSocketDescriptor() const;
+
+ /**
+ * @return true in case the socket is blocking, i.e. all possible operations
+ * absorb the calling thread till completion. Otherwise false, for a
+ * socket configured to be non blocking, i.e. operations immediately
+ * succeed or not.
+ */
+ bool isBlocking() const { return isBlocking_; }
protected:
Socket();
- static void transfer(Socket &receiver, Socket &giver);
+ void steal(Socket &giver);
+ void transfer(Socket &recipient) { recipient.steal(*this); }
- const SocketIdWrapper &getIDWrapper() const;
- SocketIdWrapper &getIDWrapper();
- void resetIDWrapper();
+ const SocketHandler &getHandler() const;
+ SocketHandler &getHandler();
+ void resetHandler();
+
+ void setNonBlocking() { isBlocking_ = false; }
+ void setUp();
private:
- std::unique_ptr socket_id_wrapper;
+ bool isBlocking_ = true;
+ std::unique_ptr socket_id_wrapper;
};
-class Openable : public virtual Socket {
+class OpenableBase : public virtual Socket {
public:
+ virtual ~OpenableBase() = default;
bool wasOpened() const { return opened; }
- /**
- * @brief Tries to do all the steps required to open this socket. In case
- * potentially expected expections are raised, they are interally catched and
- asserted.
- * On the opposite, unexpected expections are passed to the caller.
- * In both cases, the object is inernally closed and left in a state for which
- * open may be tried again.
- * @param timeout the timeout to consider. A NULL_TIMEOUT means actually to
- * begin a blocking open.
- */
- bool open(const Timeout &timeout = NULL_TIMEOUT);
-
protected:
- Openable() = default;
+ OpenableBase() = default;
- static void transfer(Openable &receiver,
- Openable &giver); // Socket::transfer(...) is also called
+ void steal(OpenableBase &giver); // Socket::steal(...) is also called
+ void transfer(OpenableBase &recipient) { recipient.steal(*this); }
virtual void open_() = 0;
-private:
std::mutex open_procedure_mtx;
std::atomic_bool opened = false;
};
+
+class Openable : public OpenableBase {
+public:
+ /**
+ * @brief Tries to execute all required steps in order to open this socket.
+ * Expections (but actually only those extending the Error class) thrown will
+ * doing so, are interally catched, terminating the opening phase and
+ * consequently leaving the socket closed. On the opposite, unexpected
+ * expections are passed to the caller. In both cases, the object is inernally
+ * closed and left in a state for which open may be tried again.
+ */
+ bool open();
+};
+
+class OpenableWithTimeout : public OpenableBase {
+public:
+ /**
+ * @brief Tries to execute all required steps in order to open this socket.
+ * Expections (but actually only those extending the Error class) thrown will
+ * doing so, are interally catched, terminating the opening phase and
+ * consequently leaving the socket closed. On the opposite, unexpected
+ * expections are passed to the caller. In both cases, the object is inernally
+ * closed and left in a state for which open may be tried again.
+ * @param the timeout to consider. A NULL_TIMEOUT means actually to
+ * begin a blocking open. On the contrary, in case the open steps are not
+ * completely within the specified timeout, function returns to the caller and
+ * leave the socket closed.
+ */
+ bool open(const Timeout &timeout = NULL_TIMEOUT);
+};
} // namespace MinimalSocket
diff --git a/src/header/MinimalSocket/core/SocketContext.h b/src/header/MinimalSocket/core/SocketContext.h
index f931a6b8..25b118b3 100644
--- a/src/header/MinimalSocket/core/SocketContext.h
+++ b/src/header/MinimalSocket/core/SocketContext.h
@@ -8,6 +8,7 @@
#pragma once
#include
+#include
#include
#include
@@ -15,20 +16,20 @@
namespace MinimalSocket {
class RemoteAddressAware {
public:
+ RemoteAddressAware(const RemoteAddressAware &o)
+ : remote_address{o.remote_address} {}
+ RemoteAddressAware &operator=(const RemoteAddressAware &o) {
+ remote_address = o.remote_address;
+ return *this;
+ }
+
/**
* @return the address of the peer that can exchange messages with this
* socket.
*/
Address getRemoteAddress() const;
- RemoteAddressAware(const RemoteAddressAware &);
- RemoteAddressAware &operator=(const RemoteAddressAware &);
-
protected:
- /**
- * @throw in case the passed address is invalid (i.e. address == nullptr is
- * true).
- */
RemoteAddressAware(const Address &address);
private:
@@ -38,28 +39,32 @@ class RemoteAddressAware {
class PortToBindAware {
public:
+ PortToBindAware(const PortToBindAware &o)
+ : port_to_bind{o.port_to_bind.load()}, must_be_free_port{
+ o.must_be_free_port.load()} {}
+ PortToBindAware &operator=(const PortToBindAware &o) {
+ port_to_bind = o.port_to_bind.load();
+ must_be_free_port = o.must_be_free_port.load();
+ return *this;
+ }
+
/**
- * @return the port that will be reserved, in case the socket was not already
- * opened, or the port actually reserved when the socket was opened.
+ * @return the port to reserve.
*/
Port getPortToBind() const { return port_to_bind; }
- PortToBindAware(const PortToBindAware &);
- PortToBindAware &operator=(const PortToBindAware &);
-
/**
- * @brief Used to enforce the fact that this port should be not previously
- * binded by anyone else when opening the socket. Beware that the default
- * behaviour is the opposite: you don't call this function the port will be
- * possibly re-used.
+ * @brief Used to specify that the port should be actually free when trying to
+ * open this socket. Beware that the default behaviour is the opposite: until
+ * you don't call this function the port will be re-used.
*/
void mustBeFreePort() { must_be_free_port = true; };
bool shallBeFreePort() const { return must_be_free_port; }
protected:
- PortToBindAware(const Port &port) : port_to_bind(port){};
+ PortToBindAware(Port port) : port_to_bind(port){};
- void setPort(const Port &port) { port_to_bind = port; };
+ void setPort(Port port) { port_to_bind = port; };
private:
std::atomic port_to_bind;
@@ -68,20 +73,25 @@ class PortToBindAware {
class RemoteAddressFamilyAware {
public:
+ RemoteAddressFamilyAware(const RemoteAddressFamilyAware &o)
+ : remote_address_family{o.remote_address_family.load()} {}
+ RemoteAddressFamilyAware &operator=(const RemoteAddressFamilyAware &o) {
+ remote_address_family = o.remote_address_family.load();
+ return *this;
+ }
+
/**
* @return the address family of the peer that can exchange messages with this
* socket.
*/
AddressFamily getRemoteAddressFamily() const { return remote_address_family; }
- RemoteAddressFamilyAware(const RemoteAddressFamilyAware &);
- RemoteAddressFamilyAware &operator=(const RemoteAddressFamilyAware &);
-
protected:
- RemoteAddressFamilyAware(const AddressFamily &family)
+ RemoteAddressFamilyAware(AddressFamily family)
: remote_address_family(family){};
private:
std::atomic remote_address_family;
};
+
} // namespace MinimalSocket
diff --git a/src/header/MinimalSocket/tcp/TcpClient.h b/src/header/MinimalSocket/tcp/TcpClient.h
index e39d1e54..7a246fa9 100644
--- a/src/header/MinimalSocket/tcp/TcpClient.h
+++ b/src/header/MinimalSocket/tcp/TcpClient.h
@@ -7,34 +7,53 @@
#pragma once
+#include
#include
#include
#include
namespace MinimalSocket::tcp {
-class TcpClient : public Openable,
- public Sender,
- public Receiver,
- public RemoteAddressAware {
-public:
- TcpClient(TcpClient &&o);
- TcpClient &operator=(TcpClient &&o);
+class TcpClientBase : public NonCopiable,
+ public OpenableWithTimeout,
+ public Sender,
+ public RemoteAddressAware {
+protected:
+ TcpClientBase(TcpClientBase &&o);
+
+ void stealBase(TcpClientBase &o);
+ TcpClientBase(const Address &server_address, bool block_mode);
+
+ void open_() override;
+};
+
+template
+class TcpClient : public TcpClientBase, public Receiver {
+public:
/**
- * @brief The connection to the server is not asked in this c'tor which
- * simply initialize this object. Such a connection is tried to be established
- * when calling open(...)
+ * @brief After construction, the socket is left closed. Indeed, the
+ * connection to the server is actually asked when opening the client.
* @param server_address the server to reach when opening this socket
*/
- TcpClient(const Address &server_address);
+ TcpClient(const Address &server_address)
+ : TcpClientBase{server_address, BlockMode} {}
-protected:
- void open_() override;
+ TcpClient(TcpClient &&o)
+ : TcpClientBase{std::forward(o)} {}
+ TcpClient &operator=(TcpClient &&o) {
+ this->stealBase(o);
+ return *this;
+ }
};
/**
- * @return a client ready to ask the connection to the same server.
- * Beware that a closed socket is returned, which can be later opened.
+ * @return a client ready to ask the connection to the same server targeted by
+ * the passed client. Beware that a closed socket is returned, which can be
+ * later opened.
*/
-TcpClient clone(const TcpClient &o);
+template
+TcpClient clone(const TcpClient &o) {
+ return TcpClient{o.getRemoteAddress()};
+}
+
} // namespace MinimalSocket::tcp
diff --git a/src/header/MinimalSocket/tcp/TcpServer.h b/src/header/MinimalSocket/tcp/TcpServer.h
index 99adc837..57854118 100644
--- a/src/header/MinimalSocket/tcp/TcpServer.h
+++ b/src/header/MinimalSocket/tcp/TcpServer.h
@@ -7,6 +7,7 @@
#pragma once
+#include
#include
#include
#include
@@ -15,78 +16,171 @@
#include
namespace MinimalSocket::tcp {
-class TcpServer;
+class TcpServerBase;
+class TcpConnectionNonBlocking;
+
/**
- * @brief An already accepted connection to a client.
- * An istance of this object can be built by before creating a TcpServer, open
- * it and call acceptNewClient().
+ * @brief Handler of an already established connection.
*/
-class TcpConnection : public Sender,
- public Receiver,
- public RemoteAddressAware {
- friend class TcpServer;
+class TcpConnectionBlocking : public NonCopiable,
+ public Sender,
+ public Receiver,
+ public RemoteAddressAware {
+ friend class TcpServerBase;
public:
- TcpConnection(TcpConnection &&o);
- TcpConnection &operator=(TcpConnection &&o);
+ TcpConnectionBlocking(TcpConnectionBlocking &&o);
+ TcpConnectionBlocking &operator=(TcpConnectionBlocking &&o);
+
+ /**
+ * @brief Beware that the giver objet is left empty.
+ */
+ TcpConnectionNonBlocking turnToNonBlocking();
private:
- TcpConnection(const Address &remote_address);
+ TcpConnectionBlocking(const Address &remote_address);
};
-class TcpServer : public PortToBindAware,
- public RemoteAddressFamilyAware,
- public virtual Socket,
- public Openable {
+/**
+ * @brief Similar to TcpConnectionBlocking, but representing the non blocking
+ * version.
+ */
+class TcpConnectionNonBlocking : public NonCopiable,
+ public Sender,
+ public Receiver,
+ public RemoteAddressAware {
public:
- TcpServer(TcpServer &&o);
- TcpServer &operator=(TcpServer &&o);
+ TcpConnectionNonBlocking(TcpConnectionNonBlocking &&o);
+ TcpConnectionNonBlocking &operator=(TcpConnectionNonBlocking &&o);
+
+ TcpConnectionNonBlocking(TcpConnectionBlocking &&connection);
+};
+
+class TcpServerBase : public NonCopiable,
+ public PortToBindAware,
+ public RemoteAddressFamilyAware,
+ public Openable {
+protected:
+ TcpServerBase(TcpServerBase &&o);
+
+ void stealBase(TcpServerBase &o);
+ TcpServerBase(Port port_to_bind, AddressFamily accepted_client_family,
+ bool block_mode);
+
+public:
/**
- * @brief The port is not reserved in this c'tor which
- * simply initialize this object. Such a thing is done when
- * when calling open(...) which does:
- * bind to the specified port this server
- * start listening for clients on the same port
- * @param port_to_bind the port that will be reserved when opening this server
- * @param accepted_client_family family of the client that will ask the
- * connection to this server
+ * @param the maximum size of the queue of clients waiting to establish a
+ * connection with this server.
+ * https://www.linuxjournal.com/files/linuxjournal.com/linuxjournal/articles/023/2333/2333s2.html#:~:text=TCP%20listen()%20Backlog,queue%20of%20partially%20open%20connections.
+ * (valid also for Windows)
+ * @throw in case the server was already opened.
*/
- TcpServer(const Port port_to_bind = ANY_PORT,
- const AddressFamily &accepted_client_family = AddressFamily::IP_V4);
+ void setClientQueueSize(std::size_t queue_size);
+
+protected:
+ void open_() override;
+
+ struct AcceptedSocket;
+ void acceptClient_(AcceptedSocket &recipient);
+ static TcpConnectionBlocking makeClient(const AcceptedSocket &acceptedSocket);
+
+private:
+ std::atomic client_queue_size = 50;
+};
+
+class AcceptorBlocking : public TcpServerBase {
+public:
/**
* @brief Wait till accepting the connection from a new client. This is a
* blocking operation.
*/
- TcpConnection acceptNewClient(); // blocking
+ TcpConnectionBlocking acceptNewClient();
/**
- * @brief Wait till accepting the connection from a new client. In case such a
- * connection is not asked within the specified timeout, a nullopt is
- * returned.
- * @param timeout the timeout to consider. A NULL_TIMEOUT means actually to
- * begin a blocking accept.
+ * @brief Similar to AcceptorBlocking::acceptNewClient, but returning a non
+ * blocking socket after the connection is established.
+ * Notice that in any case, this operation is blocking.
*/
- std::optional acceptNewClient(const Timeout &timeout);
+ TcpConnectionNonBlocking acceptNewNonBlockingClient() {
+ return acceptNewClient().turnToNonBlocking();
+ }
+protected:
+ template
+ AcceptorBlocking(Args &&...args)
+ : TcpServerBase{std::forward(args)...} {}
+
+private:
+ std::mutex accept_mtx;
+};
+
+class AcceptorNonBlocking : public TcpServerBase {
+public:
/**
- * @param queue_size the backlog size to assume when the server will be
- * opened, refer also to
- * https://www.linuxjournal.com/files/linuxjournal.com/linuxjournal/articles/023/2333/2333s2.html#:~:text=TCP%20listen()%20Backlog,queue%20of%20partially%20open%20connections.
- * (valid also for Windows)
- * @throw in case the server was already opened.
+ * @brief Check if a new client tried to connect and eventually immediately
+ * return the object handling the connection. This is a non blocking
+ * operation: in case no client actually asked for the connection, the method
+ * immeditely returns a nullopt.
+ *
+ * Notice that even though this operation is non
+ * blocking, a blocking socket is returned.
+ */
+ std::optional acceptNewClient();
+
+ /**
+ * @brief Similar to AcceptorNonBlocking::acceptNewClient, but returning a non
+ * blocking socket.
*/
- void setClientQueueSize(const std::size_t queue_size);
+ std::optional acceptNewNonBlockingClient() {
+ auto client = acceptNewClient();
+ if (client.has_value()) {
+ return client->turnToNonBlocking();
+ }
+ return std::nullopt;
+ }
protected:
- void open_() override;
+ template
+ AcceptorNonBlocking(Args &&...args)
+ : TcpServerBase{std::forward(args)...} {}
private:
- std::atomic client_queue_size =
- 50; // maximum number of clients put in the queue wiating for connection
- // to be accepted
-
std::mutex accept_mtx;
};
+
+template class Acceptor {};
+template <> class Acceptor : public AcceptorBlocking {
+protected:
+ template
+ Acceptor(Args &&...args) : AcceptorBlocking{std::forward(args)...} {}
+};
+template <> class Acceptor : public AcceptorNonBlocking {
+protected:
+ template
+ Acceptor(Args &&...args) : AcceptorNonBlocking{std::forward(args)...} {}
+};
+
+template class TcpServer : public Acceptor {
+public:
+ /**
+ * @brief After construction, the socket is left closed. Only after calling
+ * open(...), the port is binded and the server starts to listen for
+ * connection request on that port.
+ * @param the port that will be reserved when opening this server
+ * @param family of the client that will ask the
+ * connection to this server
+ */
+ TcpServer(Port port_to_bind = 0,
+ AddressFamily accepted_client_family = AddressFamily::IP_V4)
+ : Acceptor{port_to_bind, accepted_client_family, BlockMode} {}
+
+ TcpServer(TcpServer &&o)
+ : Acceptor{std::forward>(o)} {}
+ TcpServer &operator=(TcpServer &&o) {
+ this->stealBase(o);
+ return *this;
+ }
+};
} // namespace MinimalSocket::tcp
diff --git a/src/header/MinimalSocket/udp/UdpSocket.h b/src/header/MinimalSocket/udp/UdpSocket.h
index eb785175..d800fde3 100644
--- a/src/header/MinimalSocket/udp/UdpSocket.h
+++ b/src/header/MinimalSocket/udp/UdpSocket.h
@@ -7,6 +7,7 @@
#pragma once
+#include