Base class for recieving notificationsThis class is in charge of passing notifications from the library to client code. Make sure that the subscriber outlives all assigned subscriptions. Otherwise use-after-free bugs will occur, so be careful.
#include <cassert>
#include <cstdio>
#include <iostream>
#include <termios.h>
#include <unistd.h>
#include <unordered_set>
class TerminalGuard
{
termios old_;
public:
TerminalGuard()
{
tcgetattr(STDIN_FILENO, &old_);
termios newt = old_;
newt.c_lflag &= ~(ECHO | ICANON);
tcsetattr(STDIN_FILENO, TCSANOW, &newt);
}
~TerminalGuard() { tcsetattr(STDIN_FILENO, TCSANOW, &old_); }
};
template <class Future>
class FutureManager
{
std::unordered_set<Future> futures_;
public:
FutureManager() = default;
const Future &push(Future future)
{
auto ans = futures_.insert(std::move(future));
return *ans.first;
}
void pop(const Future &future) { futures_.erase(future); }
};
class MyProcess :
{
FutureManager<PdCom::Variable::SetValueFuture> setValue_futures_;
int read(
char *buf,
int count)
override {
if (ans == 0)
running_ = false;
return ans;
}
void write(
const char *buf,
size_t count)
override {
}
public:
MyProcess(const char *host = "127.0.0.1", unsigned short port = 2345) :
PdCom::PosixProcess(host, port),
{}
void execute();
{
p1_ = var;
std::cout << "Found parameter!" << std::endl;
}
{
using State = PdCom::Subscription::State;
if (&s == &s1_)
s1_active_ = true;
if (&s == &s2_)
s2_active_ = true;
}
else if (s.
getState() == State::Invalid) {
std::cout << "Invalid subscription!" << std::endl;
running_ = false;
}
}
void newValues(std::chrono::nanoseconds ) override
{
std::cout << "New Data: ";
s1_.
print(std::cout,
',');
std::cout << " and ";
s2_.
print(std::cout,
',');
std::cout << "\n";
}
bool s1_active_ = false, s2_active_ = false;
bool running_ = true;
bool connected_ = false;
};
void MyProcess::connected()
{
std::cout << "Connected!" << std::endl;
find("/parameter01");
connected_ = true;
}
void MyProcess::execute()
{
fd_set fds;
const int max_fd = std::max<int>(fd_, STDIN_FILENO);
TerminalGuard tg;
while (running_ and !(s1_active_ and s2_active_ and !p1_.
empty()))
asyncData();
std::cout << "Ready to rumble!" << std::endl;
while (running_) {
FD_ZERO(&fds);
FD_SET(fd_, &fds);
FD_SET(STDIN_FILENO, &fds);
select(max_fd + 1, &fds, NULL, NULL, NULL);
if (FD_ISSET(STDIN_FILENO, &fds) and !p1_.
empty()) {
char buf;
std::cin.read(&buf, 1);
if (buf == 'q') {
break;
}
const auto &future = setValue_futures_.push(
p1_.
setValue(static_cast<unsigned char>(buf)));
future.then([&future, buf, this]() {
std::cout << "Changed Parameter to " << buf << std::endl;
this->setValue_futures_.pop(future);
});
future.handle_exception([&future,
std::cout << "Future got exception " << ex.what() << std::endl;
this->running_ = false;
this->setValue_futures_.pop(future);
});
}
if (FD_ISSET(fd_, &fds)) {
asyncData();
}
}
}
int main(int argc, char **argv)
{
MyProcess p(
argc >= 2 ? argv[1] : "127.0.0.1",
argc >= 3 ? strtoul(argv[2], nullptr, 10) : 2345);
p.execute();
}