diff --git a/demodulator.h b/demodulator.h index 9ee854a..803a065 100644 --- a/demodulator.h +++ b/demodulator.h @@ -50,6 +50,8 @@ namespace flightaware::uat { class Receiver : public MessageSource { public: virtual void HandleSamples(std::uint64_t timestamp, Bytes::const_iterator begin, Bytes::const_iterator end) = 0; + + virtual void HandleError(const boost::system::error_code &ec) { DispatchError(ec); } }; class SingleThreadReceiver : public Receiver { diff --git a/dump978_main.cc b/dump978_main.cc index bbe6163..e3c36f3 100644 --- a/dump978_main.cc +++ b/dump978_main.cc @@ -122,7 +122,8 @@ static int realmain(int argc, char **argv) { } MessageDispatch dispatch; - SampleSource::Pointer source; + SampleSource::Pointer sample_source; + MessageSource::Pointer message_source; tcp::resolver resolver(io_service); @@ -132,13 +133,13 @@ static int realmain(int argc, char **argv) { } if (opts.count("stdin")) { - source = StdinSampleSource::Create(io_service, opts); + sample_source = StdinSampleSource::Create(io_service, opts); } else if (opts.count("file")) { boost::filesystem::path path(opts["file"].as()); - source = FileSampleSource::Create(io_service, path, opts); + sample_source = FileSampleSource::Create(io_service, path, opts); } else if (opts.count("sdr")) { auto device = opts["sdr"].as(); - source = SoapySampleSource::Create(io_service, device, opts); + sample_source = SoapySampleSource::Create(io_service, device, opts); } else { assert("impossible case" && false); } @@ -202,26 +203,30 @@ static int realmain(int argc, char **argv) { }); } - source->Init(); - auto format = source->Format(); + bool saw_error = false; - auto receiver = std::make_shared(format); - receiver->SetConsumer(std::bind(&MessageDispatch::Dispatch, &dispatch, std::placeholders::_1)); + if (sample_source) { + sample_source->Init(); + auto format = sample_source->Format(); - bool saw_error = false; + auto receiver = std::make_shared(format); - source->SetConsumer([&io_service, &saw_error, receiver](std::uint64_t timestamp, const Bytes &buffer, const boost::system::error_code &ec) { - if (ec) { - if (ec == boost::asio::error::eof) { - std::cerr << "Sample source reports EOF" << std::endl; - } else { - std::cerr << "Sample source reports error: " << ec.message() << std::endl; - saw_error = true; - } - io_service.stop(); + sample_source->SetConsumer([receiver](std::uint64_t timestamp, const Bytes &buffer) { receiver->HandleSamples(timestamp, buffer.begin(), buffer.end()); }); + + sample_source->SetErrorHandler(std::bind(&SingleThreadReceiver::HandleError, receiver, std::placeholders::_1)); + + message_source = std::static_pointer_cast(receiver); + } + + message_source->SetConsumer(std::bind(&MessageDispatch::Dispatch, &dispatch, std::placeholders::_1)); + message_source->SetErrorHandler([&io_service, &saw_error](const boost::system::error_code &ec) { + if (ec == boost::asio::error::eof) { + std::cerr << "Message source reports EOF" << std::endl; } else { - receiver->HandleSamples(timestamp, buffer.begin(), buffer.end()); + std::cerr << "Message source reports error: " << ec.message() << std::endl; + saw_error = true; } + io_service.stop(); }); boost::asio::signal_set signals(io_service, SIGINT, SIGTERM); @@ -231,11 +236,17 @@ static int realmain(int argc, char **argv) { io_service.stop(); }); - source->Start(); + message_source->Start(); + if (sample_source) { + sample_source->Start(); + } io_service.run(); - source->Stop(); + if (sample_source) { + sample_source->Stop(); + } + message_source->Stop(); if (saw_error) { std::cerr << "Abnormal exit" << std::endl; diff --git a/message_source.h b/message_source.h index 4c4bc54..5c91081 100644 --- a/message_source.h +++ b/message_source.h @@ -7,16 +7,24 @@ #ifndef DUMP978_MESSAGE_SOURCE_H #define DUMP978_MESSAGE_SOURCE_H +#include + #include "uat_message.h" namespace flightaware::uat { class MessageSource { public: + typedef std::shared_ptr Pointer; typedef std::function Consumer; + typedef std::function ErrorHandler; virtual ~MessageSource() {} void SetConsumer(Consumer consumer) { consumer_ = consumer; } + void SetErrorHandler(ErrorHandler handler) { error_handler_ = handler; } + + virtual void Start() {} + virtual void Stop() {} protected: void DispatchMessages(SharedMessageVector messages) { @@ -25,8 +33,15 @@ namespace flightaware::uat { } } + void DispatchError(const boost::system::error_code &ec) { + if (error_handler_) { + error_handler_(ec); + } + } + private: Consumer consumer_; + ErrorHandler error_handler_; }; }; // namespace flightaware::uat diff --git a/sample_source.h b/sample_source.h index dfa2bf5..4c52fb2 100644 --- a/sample_source.h +++ b/sample_source.h @@ -25,7 +25,8 @@ namespace flightaware::uat { class SampleSource : public std::enable_shared_from_this { public: typedef std::shared_ptr Pointer; - typedef std::function Consumer; + typedef std::function Consumer; + typedef std::function ErrorHandler; virtual ~SampleSource() {} @@ -35,24 +36,26 @@ namespace flightaware::uat { virtual SampleFormat Format() = 0; void SetConsumer(Consumer consumer) { consumer_ = consumer; } + void SetErrorHandler(ErrorHandler handler) { error_handler_ = handler; } protected: SampleSource() {} void DispatchBuffer(std::uint64_t timestamp, const Bytes &buffer) { if (consumer_) { - consumer_(timestamp, buffer, boost::system::error_code()); + consumer_(timestamp, buffer); } } void DispatchError(const boost::system::error_code &ec) { - if (consumer_) { - consumer_(0, Bytes(), ec); + if (error_handler_) { + error_handler_(ec); } } private: Consumer consumer_; + ErrorHandler error_handler_; }; class FileSampleSource : public SampleSource {