PdCom  5.0
Process data communication client
advanced_example.cpp

Base class for recieving notificationsThis class is in charge of passing notifications from the library to client code. Make sure that the subscriber outlives all assigned subscriptions. Otherwise use-after-free bugs will occur, so be careful.

/*****************************************************************************
*
* Copyright (C) 2021 Bjarne von Horn (vh at igh dot de).
*
* This file is part of the PdCom library.
*
* The PdCom library is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at your
* option) any later version.
*
* The PdCom library is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
* License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with the PdCom library. If not, see <http://www.gnu.org/licenses/>.
*
*****************************************************************************/
/* Advanced PdCom example.
*
* This is an interactive sample application with two signals and one parameter.
* The parameter can be changed via keyboard. If you press 'q', the application
* exits.
*
* The Process basically has the following states:
* -# waiting for connected
* -# waiting for both subscriptions and the parameter
* -# active
* -# finished
*
* The connected() callback switches from state 1 to 2. asyncData() is then
* called until all subscriptions and the parameter are available. After that,
* keyboard inputs are processed and subscription updates are shown until the
* 'q' key is pressed. Then, everything is torn down.
*/
#include <cassert>
#include <cstdio>
#include <iostream>
#include <pdcom5/Process.h>
#include <termios.h>
#include <unistd.h>
#include <unordered_set>
// RAII helper to configure the shell. Nothing to do with PdCom.
class TerminalGuard
{
termios old_;
public:
TerminalGuard()
{
tcgetattr(STDIN_FILENO, &old_);
termios newt = old_;
newt.c_lflag &= ~(ECHO | ICANON);
tcsetattr(STDIN_FILENO, TCSANOW, &newt);
}
~TerminalGuard() { tcsetattr(STDIN_FILENO, TCSANOW, &old_); }
};
template <class Future>
class FutureManager
{
std::unordered_set<Future> futures_;
public:
FutureManager() = default;
const Future &push(Future future)
{
auto ans = futures_.insert(std::move(future));
return *ans.first;
}
void pop(const Future &future) { futures_.erase(future); }
};
class MyProcess :
{
// Two subscriptions, they will be destructed by ~MyProcess before the
// Subscriber part of MyProcess goes away.
FutureManager<PdCom::Variable::SetValueFuture> setValue_futures_;
int read(char *buf, int count) override
{
const int ans = posixRead(buf, count);
// stop on EOF
if (ans == 0)
running_ = false;
return ans;
}
void write(const char *buf, size_t count) override
{
posixWriteBuffered(buf, count);
}
void flush() override { posixFlush(); }
public:
MyProcess(const char *host = "127.0.0.1", unsigned short port = 2345) :
PdCom::Process(),
PdCom::PosixProcess(host, port),
PdCom::Subscriber(
PdCom::event_mode) //{std::chrono::milliseconds(100)})
{}
void execute();
void connected() override;
void findReply(const PdCom::Variable &var) override
{
// the parameter has arrived
p1_ = var;
assert(!p1_.empty());
std::cout << "Found parameter!" << std::endl;
}
void stateChanged(PdCom::Subscription const &s) override
{
// the state of one of our subscriptions has changed.
using State = PdCom::Subscription::State;
if (s.getState() == State::Active) {
if (&s == &s1_)
s1_active_ = true;
if (&s == &s2_)
s2_active_ = true;
}
else if (s.getState() == State::Invalid) {
std::cout << "Invalid subscription!" << std::endl;
running_ = false;
}
}
void newValues(std::chrono::nanoseconds /* time_ns */) override
{
std::cout << "New Data: ";
s1_.print(std::cout, ',');
std::cout << " and ";
s2_.print(std::cout, ',');
std::cout << "\n";
}
bool s1_active_ = false, s2_active_ = false;
bool running_ = true;
bool connected_ = false;
};
void MyProcess::connected()
{
// connection is established, start subscriptions and query the server for
// the parameter.
std::cout << "Connected!" << std::endl;
s1_ = PdCom::Subscription(*this, *this, "/input/klemme1");
s2_ = PdCom::Subscription(*this, *this, "/input/klemme2");
find("/parameter01");
connected_ = true;
}
void MyProcess::execute()
{
fd_set fds;
const int max_fd = std::max<int>(fd_, STDIN_FILENO);
TerminalGuard tg;
// wait until everything is set up
while (running_ and !(s1_active_ and s2_active_ and !p1_.empty()))
asyncData();
// now we're ready
std::cout << "Ready to rumble!" << std::endl;
while (running_) {
FD_ZERO(&fds);
FD_SET(fd_, &fds);
FD_SET(STDIN_FILENO, &fds);
select(max_fd + 1, &fds, NULL, NULL, NULL);
// process input
if (FD_ISSET(STDIN_FILENO, &fds) and !p1_.empty()) {
char buf;
std::cin.read(&buf, 1);
if (buf == 'q') {
break;
}
const auto &future = setValue_futures_.push(
p1_.setValue(static_cast<unsigned char>(buf)));
future.then([&future, buf, this]() {
std::cout << "Changed Parameter to " << buf << std::endl;
this->setValue_futures_.pop(future);
});
future.handle_exception([&future,
this](PdCom::Exception const &ex) {
std::cout << "Future got exception " << ex.what() << std::endl;
this->running_ = false;
this->setValue_futures_.pop(future);
});
}
// ask PdCom to process incoming data
if (FD_ISSET(fd_, &fds)) {
asyncData();
}
}
}
int main(int argc, char **argv)
{
// usage: example2 <host> <port>
MyProcess p(
argc >= 2 ? argv[1] : "127.0.0.1",
argc >= 3 ? strtoul(argv[2], nullptr, 10) : 2345);
p.execute();
}