C++ Distributed Hash Table
network_engine.h
1 /*
2  * Copyright (C) 2014-2022 Savoir-faire Linux Inc.
3  * Author(s) : Adrien Béraud <adrien.beraud@savoirfairelinux.com>
4  * Simon Désaulniers <simon.desaulniers@savoirfairelinux.com>
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program. If not, see <https://www.gnu.org/licenses/>.
18  */
19 
20 #pragma once
21 
22 #include "node_cache.h"
23 #include "value.h"
24 #include "infohash.h"
25 #include "node.h"
26 #include "scheduler.h"
27 #include "utils.h"
28 #include "rng.h"
29 #include "rate_limiter.h"
30 #include "log_enable.h"
31 #include "network_utils.h"
32 
33 #include <vector>
34 #include <string>
35 #include <functional>
36 #include <algorithm>
37 #include <memory>
38 #include <queue>
39 
40 namespace dht {
41 namespace net {
42 
// Forward declarations; the full definitions are not part of this header.
struct TransId;
struct Socket;
struct Request;

// MSG_CONFIRM is a Linux-specific send flag. Where the platform headers do
// not provide it, define it as 0 so code OR-ing it into send flags still
// compiles and the flag simply has no effect.
#if !defined(MSG_CONFIRM)
#define MSG_CONFIRM 0
#endif
50 
51 struct NetworkConfig {
52  NetId network {0};
53  ssize_t max_req_per_sec {0};
54  ssize_t max_peer_req_per_sec {0};
55 };
56 
58 public:
59  // sent to another peer (http-like).
60  static const constexpr uint16_t NON_AUTHORITATIVE_INFORMATION {203}; /* incomplete request packet. */
61  static const constexpr uint16_t UNAUTHORIZED {401}; /* wrong tokens. */
62  static const constexpr uint16_t NOT_FOUND {404}; /* storage not found */
63  // for internal use (custom).
64  static const constexpr uint16_t INVALID_TID_SIZE {421}; /* id was truncated. */
65  static const constexpr uint16_t UNKNOWN_TID {422}; /* unknown tid */
66  static const constexpr uint16_t WRONG_NODE_INFO_BUF_LEN {423}; /* node info length is wrong */
67 
68  static const std::string GET_NO_INFOHASH; /* received "get" request with no infohash */
69  static const std::string LISTEN_NO_INFOHASH; /* got "listen" request without infohash */
70  static const std::string LISTEN_WRONG_TOKEN; /* wrong token in "listen" request */
71  static const std::string PUT_NO_INFOHASH; /* no infohash in "put" request */
72  static const std::string PUT_WRONG_TOKEN; /* got "put" request with wrong token */
73  static const std::string STORAGE_NOT_FOUND; /* got access request for an unknown storage */
74  static const std::string PUT_INVALID_ID; /* invalid id in "put" request */
75 
76  DhtProtocolException(uint16_t code, const std::string& msg="", InfoHash failing_node_id={})
77  : DhtException(msg), msg(msg), code(code), failing_node_id(failing_node_id) {}
78 
79  const std::string& getMsg() const { return msg; }
80  uint16_t getCode() const { return code; }
81  const InfoHash& getNodeId() const { return failing_node_id; }
82 
83 private:
84  std::string msg;
85  uint16_t code;
86  InfoHash failing_node_id;
87 };
88 
89 struct ParsedMessage;
90 
94 struct RequestAnswer {
95  Blob ntoken {};
96  Value::Id vid {};
97  std::vector<Sp<Value>> values {};
98  std::vector<Value::Id> refreshed_values {};
99  std::vector<Value::Id> expired_values {};
100  std::vector<Sp<FieldValueIndex>> fields {};
101  std::vector<Sp<Node>> nodes4 {};
102  std::vector<Sp<Node>> nodes6 {};
103  RequestAnswer() {}
104  RequestAnswer(ParsedMessage&& msg);
105 };
106 
125 class NetworkEngine final
126 {
127 private:
131  std::function<void(Sp<Request>, DhtProtocolException)> onError;
132 
139  std::function<void(const Sp<Node>&, int)> onNewNode;
146  std::function<void(const InfoHash&, const SockAddr&)> onReportedAddr;
152  std::function<RequestAnswer(Sp<Node>)> onPing {};
161  std::function<RequestAnswer(Sp<Node>, const InfoHash&, want_t)> onFindNode {};
170  std::function<RequestAnswer(Sp<Node>, const InfoHash&, want_t, const Query&)> onGetValues {};
179  std::function<RequestAnswer(Sp<Node>,
180  const InfoHash&,
181  const Blob&,
182  Tid,
183  const Query&,
184  int)> onListen {};
194  std::function<RequestAnswer(Sp<Node>,
195  const InfoHash&,
196  const Blob&,
197  const std::vector<Sp<Value>>&,
198  const time_point&)> onAnnounce {};
207  std::function<RequestAnswer(Sp<Node>,
208  const InfoHash&,
209  const Blob&,
210  const Value::Id&)> onRefresh {};
211 
212 public:
213  using RequestCb = std::function<void(const Request&, RequestAnswer&&)>;
214  using RequestErrorCb = std::function<bool(const Request&, DhtProtocolException&&)>;
215  using RequestExpiredCb = std::function<void(const Request&, bool)>;
216 
218  InfoHash& myid,
219  NetworkConfig config,
220  std::unique_ptr<DatagramSocket>&& sock,
221  const Sp<Logger>& log,
222  std::mt19937_64& rd,
223  Scheduler& scheduler,
224  decltype(NetworkEngine::onError)&& onError,
225  decltype(NetworkEngine::onNewNode)&& onNewNode,
226  decltype(NetworkEngine::onReportedAddr)&& onReportedAddr,
227  decltype(NetworkEngine::onPing)&& onPing,
228  decltype(NetworkEngine::onFindNode)&& onFindNode,
229  decltype(NetworkEngine::onGetValues)&& onGetValues,
230  decltype(NetworkEngine::onListen)&& onListen,
231  decltype(NetworkEngine::onAnnounce)&& onAnnounce,
232  decltype(NetworkEngine::onRefresh)&& onRefresh);
233 
234  ~NetworkEngine();
235 
236  net::DatagramSocket* getSocket() const { return dht_socket.get(); };
237 
238  void clear();
239 
255  void tellListener(const Sp<Node>& n, Tid socket_id, const InfoHash& hash, want_t want, const Blob& ntoken,
256  std::vector<Sp<Node>>&& nodes, std::vector<Sp<Node>>&& nodes6,
257  std::vector<Sp<Value>>&& values, const Query& q, int version);
258 
259  void tellListenerRefreshed(const Sp<Node>& n, Tid socket_id, const InfoHash& hash, const Blob& ntoken, const std::vector<Value::Id>& values, int version);
260  void tellListenerExpired(const Sp<Node>& n, Tid socket_id, const InfoHash& hash, const Blob& ntoken, const std::vector<Value::Id>& values, int version);
261 
262  bool isRunning(sa_family_t af) const;
263  inline want_t want () const { return dht_socket->hasIPv4() and dht_socket->hasIPv6() ? (WANT4 | WANT6) : -1; }
264 
265  void connectivityChanged(sa_family_t);
266 
267  /**************
268  * Requests *
269  **************/
270 
280  Sp<Request>
281  sendPing(const Sp<Node>& n, RequestCb&& on_done, RequestExpiredCb&& on_expired);
282 
293  Sp<Request>
294  sendPing(SockAddr&& sa, RequestCb&& on_done, RequestExpiredCb&& on_expired) {
295  return sendPing(std::make_shared<Node>(InfoHash::zero(), std::move(sa), rd),
296  std::forward<RequestCb>(on_done),
297  std::forward<RequestExpiredCb>(on_expired));
298  }
299 
312  Sp<Request> sendFindNode(const Sp<Node>& n,
313  const InfoHash& hash,
314  want_t want = -1,
315  RequestCb&& on_done = {},
316  RequestExpiredCb&& on_expired = {});
331  Sp<Request> sendGetValues(const Sp<Node>& n,
332  const InfoHash& hash,
333  const Query& query,
334  want_t want,
335  RequestCb&& on_done,
336  RequestExpiredCb&& on_expired);
360  Sp<Request> sendListen(const Sp<Node>& n,
361  const InfoHash& hash,
362  const Query& query,
363  const Blob& token,
364  Tid socketId,
365  RequestCb&& on_done,
366  RequestExpiredCb&& on_expired);
380  Sp<Request> sendAnnounceValue(const Sp<Node>& n,
381  const InfoHash& hash,
382  const Sp<Value>& v,
383  time_point created,
384  const Blob& token,
385  RequestCb&& on_done,
386  RequestExpiredCb&& on_expired);
400  Sp<Request> sendRefreshValue(const Sp<Node>& n,
401  const InfoHash& hash,
402  const Value::Id& vid,
403  const Blob& token,
404  RequestCb&& on_done,
405  RequestErrorCb&& on_error,
406  RequestExpiredCb&& on_expired);
419  Sp<Request> sendUpdateValues(const Sp<Node>& n,
420  const InfoHash& infohash,
421  const std::vector<Sp<Value>>& values,
422  time_point created,
423  const Blob& token,
424  const size_t& sid);
425 
435  void processMessage(const uint8_t *buf, size_t buflen, SockAddr addr);
436 
437  Sp<Node> insertNode(const InfoHash& id, const SockAddr& addr) {
438  auto n = cache.getNode(id, addr, scheduler.time(), 0);
439  onNewNode(n, 0);
440  return n;
441  }
442 
443  std::vector<unsigned> getNodeMessageStats(bool in) {
444  auto& st = in ? in_stats : out_stats;
445  std::vector<unsigned> stats {st.ping, st.find, st.get, st.listen, st.put};
446  st = {};
447  return stats;
448  }
449 
450  void blacklistNode(const Sp<Node>& n);
451 
452  std::vector<Sp<Node>> getCachedNodes(const InfoHash& id, sa_family_t sa_f, size_t count) {
453  return cache.getCachedNodes(id, sa_f, count);
454  }
455 
456  size_t getNodeCacheSize() const {
457  return cache.size();
458  }
459  size_t getNodeCacheSize(sa_family_t af) const {
460  return cache.size(af);
461  }
462 
463  size_t getRateLimiterSize() const {
464  return address_rate_limiter.size();
465  }
466 
467  size_t getPartialCount() const {
468  return partial_messages.size();
469  }
470 
471 private:
472 
473  struct PartialMessage;
474 
475  /***************
476  * Constants *
477  ***************/
478  /* the length of a node info buffer in ipv4 format */
479  static const constexpr size_t NODE4_INFO_BUF_LEN {HASH_LEN + sizeof(in_addr) + sizeof(in_port_t)};
480  /* the length of a node info buffer in ipv6 format */
481  static const constexpr size_t NODE6_INFO_BUF_LEN {HASH_LEN + sizeof(in6_addr) + sizeof(in_port_t)};
482  /* after a UDP reply, the period during which we tell the link layer about it */
483  static constexpr std::chrono::seconds UDP_REPLY_TIME {15};
484 
485  /* Max. time to receive a full fragmented packet */
486  static constexpr std::chrono::seconds RX_MAX_PACKET_TIME {10};
487  /* Max. time between packet fragments */
488  static constexpr std::chrono::seconds RX_TIMEOUT {3};
489  /* The maximum number of nodes that we snub. There is probably little
490  reason to increase this value. */
491  static constexpr unsigned BLACKLISTED_MAX {10};
492 
493  static constexpr size_t MTU {1280};
494  static constexpr size_t MAX_PACKET_VALUE_SIZE {600};
495 
496  static const std::string my_v;
497 
498  void process(std::unique_ptr<ParsedMessage>&&, const SockAddr& from);
499 
500  bool rateLimit(const SockAddr& addr);
501 
502  static bool isMartian(const SockAddr& addr);
503  bool isNodeBlacklisted(const SockAddr& addr) const;
504 
505  void requestStep(Sp<Request> req);
506 
511  void sendRequest(const Sp<Request>& request);
512 
513  struct MessageStats {
514  unsigned ping {0};
515  unsigned find {0};
516  unsigned get {0};
517  unsigned put {0};
518  unsigned listen {0};
519  unsigned refresh {0};
520  unsigned updateValue {0};
521  };
522 
523 
524  // basic wrapper for socket sendto function
525  int send(const SockAddr& addr, const char *buf, size_t len, bool confirmed = false);
526 
527  void sendValueParts(Tid tid, const std::vector<Blob>& svals, const SockAddr& addr);
528  std::vector<Blob> packValueHeader(msgpack::sbuffer&, const std::vector<Sp<Value>>&);
529  void maintainRxBuffer(Tid tid);
530 
531  /*************
532  * Answers *
533  *************/
534  /* answer to a ping request */
535  void sendPong(const SockAddr& addr, Tid tid);
536  /* answer to findnodes/getvalues request */
537  void sendNodesValues(const SockAddr& addr,
538  Tid tid,
539  const Blob& nodes,
540  const Blob& nodes6,
541  const std::vector<Sp<Value>>& st,
542  const Query& query,
543  const Blob& token);
544  Blob bufferNodes(sa_family_t af, const InfoHash& id, std::vector<Sp<Node>>& nodes);
545 
546  std::pair<Blob, Blob> bufferNodes(sa_family_t af,
547  const InfoHash& id,
548  want_t want,
549  std::vector<Sp<Node>>& nodes,
550  std::vector<Sp<Node>>& nodes6);
551  /* answer to a listen request */
552  void sendListenConfirmation(const SockAddr& addr, Tid tid);
553  /* answer to put request */
554  void sendValueAnnounced(const SockAddr& addr, Tid, Value::Id);
555  /* answer in case of error */
556  void sendError(const SockAddr& addr,
557  Tid tid,
558  uint16_t code,
559  const std::string& message,
560  bool include_id=false);
561 
562  void deserializeNodes(ParsedMessage& msg, const SockAddr& from);
563 
564  /* DHT info */
565  const InfoHash& myid;
566  const NetworkConfig config {};
567  const std::unique_ptr<DatagramSocket> dht_socket;
568  Sp<Logger> logger_;
569  std::mt19937_64& rd;
570 
571  NodeCache cache;
572 
573  // global limiting should be triggered by at least 8 different IPs
574  using IpLimiter = RateLimiter;
575  using IpLimiterMap = std::map<SockAddr, IpLimiter, SockAddr::ipCmp>;
576  IpLimiterMap address_rate_limiter;
577  RateLimiter rate_limiter;
578  ssize_t limiter_maintenance {0};
579 
580  // requests handling
581  std::map<Tid, Sp<Request>> requests {};
582  std::map<Tid, PartialMessage> partial_messages;
583 
584  MessageStats in_stats {}, out_stats {};
585  std::set<SockAddr> blacklist {};
586 
587  Scheduler& scheduler;
588 
589  bool logIncoming_ {false};
590 };
591 
592 } /* namespace net */
593 } /* namespace dht */
Sp< Request > sendUpdateValues(const Sp< Node > &n, const InfoHash &infohash, const std::vector< Sp< Value >> &values, time_point created, const Blob &token, const size_t &sid)
void tellListener(const Sp< Node > &n, Tid socket_id, const InfoHash &hash, want_t want, const Blob &ntoken, std::vector< Sp< Node >> &&nodes, std::vector< Sp< Node >> &&nodes6, std::vector< Sp< Value >> &&values, const Query &q, int version)
Sp< Request > sendFindNode(const Sp< Node > &n, const InfoHash &hash, want_t want=-1, RequestCb &&on_done={}, RequestExpiredCb &&on_expired={})
const time_point & time() const
Definition: scheduler.h:123
Job scheduler.
Definition: scheduler.h:36
An abstraction of the communication protocol on the network.
Sp< Request > sendRefreshValue(const Sp< Node > &n, const InfoHash &hash, const Value::Id &vid, const Blob &token, RequestCb &&on_done, RequestErrorCb &&on_error, RequestExpiredCb &&on_expired)
std::vector< uint8_t > Blob
Definition: utils.h:151
Sp< Request > sendGetValues(const Sp< Node > &n, const InfoHash &hash, const Query &query, want_t want, RequestCb &&on_done, RequestExpiredCb &&on_expired)
Sp< Request > sendListen(const Sp< Node > &n, const InfoHash &hash, const Query &query, const Blob &token, Tid socketId, RequestCb &&on_done, RequestExpiredCb &&on_expired)
Sp< Request > sendPing(SockAddr &&sa, RequestCb &&on_done, RequestExpiredCb &&on_expired)
Describes a query destined to another peer.
Definition: value.h:924
Sp< Request > sendAnnounceValue(const Sp< Node > &n, const InfoHash &hash, const Sp< Value > &v, time_point created, const Blob &token, RequestCb &&on_done, RequestExpiredCb &&on_expired)
void processMessage(const uint8_t *buf, size_t buflen, SockAddr addr)
Sp< Request > sendPing(const Sp< Node > &n, RequestCb &&on_done, RequestExpiredCb &&on_expired)
Definition: callbacks.h:35