C++ Distributed Hash Table
network_utils.h
1 /*
2  * Copyright (C) 2014-2022 Savoir-faire Linux Inc.
3  * Author(s) : Adrien BĂ©raud <adrien.beraud@savoirfairelinux.com>
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 3 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program. If not, see <https://www.gnu.org/licenses/>.
17  */
18 #pragma once
19 
20 #include "def.h"
21 
22 #include "sockaddr.h"
23 #include "utils.h"
24 #include "log_enable.h"
25 
26 #ifdef _WIN32
27 #include <ws2tcpip.h>
28 #include <winsock2.h>
29 #else
30 #include <sys/socket.h>
31 #include <netinet/in.h>
32 #include <unistd.h>
33 #endif
34 
35 #include <functional>
36 #include <thread>
37 #include <atomic>
38 #include <mutex>
39 #include <list>
40 
41 namespace dht {
42 namespace net {
43 
44 static const constexpr in_port_t DHT_DEFAULT_PORT = 4222;
45 static const constexpr size_t RX_QUEUE_MAX_SIZE = 1024 * 64;
46 static const constexpr std::chrono::milliseconds RX_QUEUE_MAX_DELAY(650);
47 
48 int bindSocket(const SockAddr& addr, SockAddr& bound);
49 
50 bool setNonblocking(int fd, bool nonblocking = true);
51 
52 #ifdef _WIN32
53 void udpPipe(int fds[2]);
54 #endif
56  Blob data;
57  SockAddr from;
58  time_point received;
59 };
60 using PacketList = std::list<ReceivedPacket>;
61 
62 class OPENDHT_PUBLIC DatagramSocket {
63 public:
67  using OnReceive = std::function<PacketList(PacketList&& packets)>;
68  virtual ~DatagramSocket() {};
69 
70  virtual int sendTo(const SockAddr& dest, const uint8_t* data, size_t size, bool replied) = 0;
71 
72  inline void setOnReceive(OnReceive&& cb) {
73  std::lock_guard<std::mutex> lk(lock);
74  rx_callback = std::move(cb);
75  }
76 
77  virtual bool hasIPv4() const = 0;
78  virtual bool hasIPv6() const = 0;
79 
80  SockAddr getBound(sa_family_t family = AF_UNSPEC) const {
81  std::lock_guard<std::mutex> lk(lock);
82  return getBoundRef(family);
83  }
84  in_port_t getPort(sa_family_t family = AF_UNSPEC) const {
85  std::lock_guard<std::mutex> lk(lock);
86  return getBoundRef(family).getPort();
87  }
88 
89  virtual const SockAddr& getBoundRef(sa_family_t family = AF_UNSPEC) const = 0;
90 
92  virtual std::vector<SockAddr> resolve(const std::string& host, const std::string& service = {}) {
93  return SockAddr::resolve(host, service);
94  }
95 
96  virtual void stop() = 0;
97 protected:
98 
99  PacketList getNewPacket() {
100  PacketList pkts;
101  if (toRecycle_.empty()) {
102  pkts.emplace_back();
103  } else {
104  auto begIt = toRecycle_.begin();
105  auto begItNext = std::next(begIt);
106  pkts.splice(pkts.end(), toRecycle_, begIt, begItNext);
107  }
108  return pkts;
109  }
110 
111  inline void onReceived(PacketList&& packets) {
112  std::lock_guard<std::mutex> lk(lock);
113  if (rx_callback) {
114  auto r = rx_callback(std::move(packets));
115  if (not r.empty() and toRecycle_.size() < RX_QUEUE_MAX_SIZE)
116  toRecycle_.splice(toRecycle_.end(), std::move(r));
117  }
118  }
119 protected:
120  mutable std::mutex lock;
121 private:
122  OnReceive rx_callback;
123  PacketList toRecycle_;
124 };
125 
126 class OPENDHT_PUBLIC UdpSocket : public DatagramSocket {
127 public:
128  UdpSocket(in_port_t port, const std::shared_ptr<Logger>& l = {});
129  UdpSocket(const SockAddr& bind4, const SockAddr& bind6, const std::shared_ptr<Logger>& l = {});
130  ~UdpSocket();
131 
132  int sendTo(const SockAddr& dest, const uint8_t* data, size_t size, bool replied) override;
133 
134  const SockAddr& getBoundRef(sa_family_t family = AF_UNSPEC) const override {
135  return (family == AF_INET6) ? bound6 : bound4;
136  }
137 
138  bool hasIPv4() const override {
139  std::lock_guard<std::mutex> lk(lock);
140  return s4 != -1;
141  }
142  bool hasIPv6() const override {
143  std::lock_guard<std::mutex> lk(lock);
144  return s6 != -1;
145  }
146 
147  void stop() override;
148 private:
149  std::shared_ptr<Logger> logger;
150  int s4 {-1};
151  int s6 {-1};
152  int stopfd {-1};
153  SockAddr bound4, bound6;
154  std::thread rcv_thread {};
155  std::atomic_bool running {false};
156 
157  void openSockets(const SockAddr& bind4, const SockAddr& bind6);
158 };
159 
160 }
161 }
std::vector< uint8_t > Blob
Definition: utils.h:151
virtual std::vector< SockAddr > resolve(const std::string &host, const std::string &service={})
Definition: network_utils.h:92
std::function< PacketList(PacketList &&packets)> OnReceive
Definition: network_utils.h:67
Definition: callbacks.h:35