Skip to content

Commit

Permalink
Split out error handler; prepare to support messsage sources that are…
Browse files Browse the repository at this point in the history
…n't the demodulator.
  • Loading branch information
mutability committed Jun 24, 2019
1 parent 0541c7b commit 285613c
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 25 deletions.
2 changes: 2 additions & 0 deletions demodulator.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ namespace flightaware::uat {
class Receiver : public MessageSource {
public:
virtual void HandleSamples(std::uint64_t timestamp, Bytes::const_iterator begin, Bytes::const_iterator end) = 0;

virtual void HandleError(const boost::system::error_code &ec) { DispatchError(ec); }
};

class SingleThreadReceiver : public Receiver {
Expand Down
53 changes: 32 additions & 21 deletions dump978_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ static int realmain(int argc, char **argv) {
}

MessageDispatch dispatch;
SampleSource::Pointer source;
SampleSource::Pointer sample_source;
MessageSource::Pointer message_source;

tcp::resolver resolver(io_service);

Expand All @@ -132,13 +133,13 @@ static int realmain(int argc, char **argv) {
}

if (opts.count("stdin")) {
source = StdinSampleSource::Create(io_service, opts);
sample_source = StdinSampleSource::Create(io_service, opts);
} else if (opts.count("file")) {
boost::filesystem::path path(opts["file"].as<std::string>());
source = FileSampleSource::Create(io_service, path, opts);
sample_source = FileSampleSource::Create(io_service, path, opts);
} else if (opts.count("sdr")) {
auto device = opts["sdr"].as<std::string>();
source = SoapySampleSource::Create(io_service, device, opts);
sample_source = SoapySampleSource::Create(io_service, device, opts);
} else {
assert("impossible case" && false);
}
Expand Down Expand Up @@ -202,26 +203,30 @@ static int realmain(int argc, char **argv) {
});
}

source->Init();
auto format = source->Format();
bool saw_error = false;

auto receiver = std::make_shared<SingleThreadReceiver>(format);
receiver->SetConsumer(std::bind(&MessageDispatch::Dispatch, &dispatch, std::placeholders::_1));
if (sample_source) {
sample_source->Init();
auto format = sample_source->Format();

bool saw_error = false;
auto receiver = std::make_shared<SingleThreadReceiver>(format);

source->SetConsumer([&io_service, &saw_error, receiver](std::uint64_t timestamp, const Bytes &buffer, const boost::system::error_code &ec) {
if (ec) {
if (ec == boost::asio::error::eof) {
std::cerr << "Sample source reports EOF" << std::endl;
} else {
std::cerr << "Sample source reports error: " << ec.message() << std::endl;
saw_error = true;
}
io_service.stop();
sample_source->SetConsumer([receiver](std::uint64_t timestamp, const Bytes &buffer) { receiver->HandleSamples(timestamp, buffer.begin(), buffer.end()); });

sample_source->SetErrorHandler(std::bind(&SingleThreadReceiver::HandleError, receiver, std::placeholders::_1));

message_source = std::static_pointer_cast<MessageSource>(receiver);
}

message_source->SetConsumer(std::bind(&MessageDispatch::Dispatch, &dispatch, std::placeholders::_1));
message_source->SetErrorHandler([&io_service, &saw_error](const boost::system::error_code &ec) {
if (ec == boost::asio::error::eof) {
std::cerr << "Message source reports EOF" << std::endl;
} else {
receiver->HandleSamples(timestamp, buffer.begin(), buffer.end());
std::cerr << "Message source reports error: " << ec.message() << std::endl;
saw_error = true;
}
io_service.stop();
});

boost::asio::signal_set signals(io_service, SIGINT, SIGTERM);
Expand All @@ -231,11 +236,17 @@ static int realmain(int argc, char **argv) {
io_service.stop();
});

source->Start();
message_source->Start();
if (sample_source) {
sample_source->Start();
}

io_service.run();

source->Stop();
if (sample_source) {
sample_source->Stop();
}
message_source->Stop();

if (saw_error) {
std::cerr << "Abnormal exit" << std::endl;
Expand Down
15 changes: 15 additions & 0 deletions message_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,24 @@
#ifndef DUMP978_MESSAGE_SOURCE_H
#define DUMP978_MESSAGE_SOURCE_H

#include <boost/system/error_code.hpp>

#include "uat_message.h"

namespace flightaware::uat {
class MessageSource {
public:
typedef std::shared_ptr<MessageSource> Pointer;
typedef std::function<void(SharedMessageVector)> Consumer;
typedef std::function<void(const boost::system::error_code &)> ErrorHandler;

virtual ~MessageSource() {}

void SetConsumer(Consumer consumer) { consumer_ = consumer; }
void SetErrorHandler(ErrorHandler handler) { error_handler_ = handler; }

virtual void Start() {}
virtual void Stop() {}

protected:
void DispatchMessages(SharedMessageVector messages) {
Expand All @@ -25,8 +33,15 @@ namespace flightaware::uat {
}
}

void DispatchError(const boost::system::error_code &ec) {
if (error_handler_) {
error_handler_(ec);
}
}

private:
Consumer consumer_;
ErrorHandler error_handler_;
};
}; // namespace flightaware::uat

Expand Down
11 changes: 7 additions & 4 deletions sample_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ namespace flightaware::uat {
class SampleSource : public std::enable_shared_from_this<SampleSource> {
public:
typedef std::shared_ptr<SampleSource> Pointer;
typedef std::function<void(std::uint64_t, const Bytes &, const boost::system::error_code &ec)> Consumer;
typedef std::function<void(std::uint64_t, const Bytes &)> Consumer;
typedef std::function<void(const boost::system::error_code &ec)> ErrorHandler;

virtual ~SampleSource() {}

Expand All @@ -35,24 +36,26 @@ namespace flightaware::uat {
virtual SampleFormat Format() = 0;

void SetConsumer(Consumer consumer) { consumer_ = consumer; }
void SetErrorHandler(ErrorHandler handler) { error_handler_ = handler; }

protected:
SampleSource() {}

void DispatchBuffer(std::uint64_t timestamp, const Bytes &buffer) {
if (consumer_) {
consumer_(timestamp, buffer, boost::system::error_code());
consumer_(timestamp, buffer);
}
}

void DispatchError(const boost::system::error_code &ec) {
if (consumer_) {
consumer_(0, Bytes(), ec);
if (error_handler_) {
error_handler_(ec);
}
}

private:
Consumer consumer_;
ErrorHandler error_handler_;
};

class FileSampleSource : public SampleSource {
Expand Down

0 comments on commit 285613c

Please sign in to comment.