C++ Distributed Hash Table
dht.h
1 /*
2  * Copyright (C) 2014-2022 Savoir-faire Linux Inc.
3  * Authors: Adrien Béraud <adrien.beraud@savoirfairelinux.com>
4  * Simon Désaulniers <simon.desaulniers@savoirfairelinux.com>
5  * Sébastien Blin <sebastien.blin@savoirfairelinux.com>
6  *
7  * This program is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 3 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program. If not, see <https://www.gnu.org/licenses/>.
19  */
20 
21 #pragma once
22 
23 #include "infohash.h"
24 #include "value.h"
25 #include "utils.h"
26 #include "network_engine.h"
27 #include "scheduler.h"
28 #include "routing_table.h"
29 #include "callbacks.h"
30 #include "dht_interface.h"
31 
32 #include <string>
33 #include <array>
34 #include <vector>
35 #include <map>
36 #include <functional>
37 #include <memory>
38 
39 #ifdef _WIN32
40 #include <iso646.h>
41 #endif
42 
43 namespace dht {
44 
45 namespace net {
46 struct Request;
47 } /* namespace net */
48 
49 struct Storage;
50 struct ValueStorage;
51 class StorageBucket;
52 struct Listener;
53 struct LocalListener;
54 
62 class OPENDHT_PUBLIC Dht final : public DhtInterface {
63 public:
68  Dht(std::unique_ptr<net::DatagramSocket>&& sock, const Config& config, const Sp<Logger>& l = {});
69 
70  Dht(std::unique_ptr<net::DatagramSocket>&& sock, const Config& config, const Logger& l = {})
71  : Dht(std::move(sock), config, std::make_shared<Logger>(l)) {}
72 
73  virtual ~Dht();
74 
78  inline const InfoHash& getNodeId() const override { return myid; }
79 
80  NodeStatus updateStatus(sa_family_t af) override;
81 
85  NodeStatus getStatus(sa_family_t af) const override {
86  return dht(af).status;
87  }
88 
89  NodeStatus getStatus() const override {
90  return std::max(getStatus(AF_INET), getStatus(AF_INET6));
91  }
92 
93  net::DatagramSocket* getSocket() const override { return network_engine.getSocket(); };
94 
98  void shutdown(ShutdownCallback cb, bool stop = false) override;
99 
106  bool isRunning(sa_family_t af = 0) const override;
107 
108  virtual void registerType(const ValueType& type) override {
109  types.registerType(type);
110  }
111  const ValueType& getType(ValueType::Id type_id) const override {
112  return types.getType(type_id);
113  }
114 
115  void addBootstrap(const std::string& host, const std::string& service) override {
116  bootstrap_nodes.emplace_back(host, service);
117  startBootstrap();
118  }
119 
120  void clearBootstrap() override {
121  bootstrap_nodes.clear();
122  }
123 
129  void insertNode(const InfoHash& id, const SockAddr&) override;
130  void insertNode(const NodeExport& n) override {
131  insertNode(n.id, SockAddr(n.ss, n.sslen));
132  }
133 
134  void pingNode(SockAddr, DoneCallbackSimple&& cb={}) override;
135 
136  time_point periodic(const uint8_t *buf, size_t buflen, SockAddr, const time_point& now) override;
137  time_point periodic(const uint8_t *buf, size_t buflen, const sockaddr* from, socklen_t fromlen, const time_point& now) override {
138  return periodic(buf, buflen, SockAddr(from, fromlen), now);
139  }
140 
151  virtual void get(const InfoHash& key, GetCallback cb, DoneCallback donecb={}, Value::Filter&& f={}, Where&& w = {}) override;
152  virtual void get(const InfoHash& key, GetCallback cb, DoneCallbackSimple donecb={}, Value::Filter&& f={}, Where&& w = {}) override {
153  get(key, cb, bindDoneCb(donecb), std::forward<Value::Filter>(f), std::forward<Where>(w));
154  }
155  virtual void get(const InfoHash& key, GetCallbackSimple cb, DoneCallback donecb={}, Value::Filter&& f={}, Where&& w = {}) override {
156  get(key, bindGetCb(cb), donecb, std::forward<Value::Filter>(f), std::forward<Where>(w));
157  }
158  virtual void get(const InfoHash& key, GetCallbackSimple cb, DoneCallbackSimple donecb, Value::Filter&& f={}, Where&& w = {}) override {
159  get(key, bindGetCb(cb), bindDoneCb(donecb), std::forward<Value::Filter>(f), std::forward<Where>(w));
160  }
171  virtual void query(const InfoHash& key, QueryCallback cb, DoneCallback done_cb = {}, Query&& q = {}) override;
172  virtual void query(const InfoHash& key, QueryCallback cb, DoneCallbackSimple done_cb = {}, Query&& q = {}) override {
173  query(key, cb, bindDoneCb(done_cb), std::forward<Query>(q));
174  }
175 
179  std::vector<Sp<Value>> getLocal(const InfoHash& key, const Value::Filter& f = {}) const override;
180 
184  Sp<Value> getLocalById(const InfoHash& key, Value::Id vid) const override;
185 
192  void put(const InfoHash& key,
193  Sp<Value>,
194  DoneCallback cb=nullptr,
195  time_point created=time_point::max(),
196  bool permanent = false) override;
197  void put(const InfoHash& key,
198  const Sp<Value>& v,
199  DoneCallbackSimple cb,
200  time_point created=time_point::max(),
201  bool permanent = false) override
202  {
203  put(key, v, bindDoneCb(cb), created, permanent);
204  }
205 
206  void put(const InfoHash& key,
207  Value&& v,
208  DoneCallback cb=nullptr,
209  time_point created=time_point::max(),
210  bool permanent = false) override
211  {
212  put(key, std::make_shared<Value>(std::move(v)), cb, created, permanent);
213  }
214  void put(const InfoHash& key,
215  Value&& v,
216  DoneCallbackSimple cb,
217  time_point created=time_point::max(),
218  bool permanent = false) override
219  {
220  put(key, std::forward<Value>(v), bindDoneCb(cb), created, permanent);
221  }
222 
226  std::vector<Sp<Value>> getPut(const InfoHash&) const override;
227 
231  Sp<Value> getPut(const InfoHash&, const Value::Id&) const override;
232 
237  bool cancelPut(const InfoHash&, const Value::Id&) override;
238 
246  size_t listen(const InfoHash&, ValueCallback, Value::Filter={}, Where={}) override;
247 
248  size_t listen(const InfoHash& key, GetCallback cb, Value::Filter f={}, Where w={}) override {
249  return listen(key, [cb](const std::vector<Sp<Value>>& vals, bool expired){
250  if (not expired)
251  return cb(vals);
252  return true;
253  }, std::forward<Value::Filter>(f), std::forward<Where>(w));
254  }
255  size_t listen(const InfoHash& key, GetCallbackSimple cb, Value::Filter f={}, Where w={}) override {
256  return listen(key, bindGetCb(cb), std::forward<Value::Filter>(f), std::forward<Where>(w));
257  }
258 
259  bool cancelListen(const InfoHash&, size_t token) override;
260 
266  void connectivityChanged(sa_family_t) override;
267  void connectivityChanged() override {
268  reported_addr.clear();
269  connectivityChanged(AF_INET);
270  connectivityChanged(AF_INET6);
271  }
272 
277  std::vector<NodeExport> exportNodes() const override;
278 
279  std::vector<ValuesExport> exportValues() const override;
280  void importValues(const std::vector<ValuesExport>&) override;
281 
282  void saveState(const std::string& path) const;
283  void loadState(const std::string& path);
284 
285  NodeStats getNodesStats(sa_family_t af) const override;
286 
287  std::string getStorageLog() const override;
288  std::string getStorageLog(const InfoHash&) const override;
289 
290  std::string getRoutingTablesLog(sa_family_t) const override;
291  std::string getSearchesLog(sa_family_t) const override;
292  std::string getSearchLog(const InfoHash&, sa_family_t af = AF_UNSPEC) const override;
293 
294  void dumpTables() const override;
295  std::vector<unsigned> getNodeMessageStats(bool in = false) override {
296  return network_engine.getNodeMessageStats(in);
297  }
298 
302  void setStorageLimit(size_t limit = DEFAULT_STORAGE_LIMIT) override {
303  max_store_size = limit;
304  }
305  size_t getStorageLimit() const override {
306  return max_store_size;
307  }
308 
313  std::pair<size_t, size_t> getStoreSize() const override {
314  return {total_store_size, total_values};
315  }
316 
317  std::vector<SockAddr> getPublicAddress(sa_family_t family = 0) override;
318 
319  void pushNotificationReceived(const std::map<std::string, std::string>&) override {}
320  void resubscribe(unsigned) {}
321 
322 private:
323 
324  /* When performing a search, we search for up to SEARCH_NODES closest nodes
325  to the destination, and use the additional ones to backtrack if any of
326  the target 8 turn out to be dead. */
327  static constexpr unsigned SEARCH_NODES {14};
328 
329  /* The number of bad nodes is limited in order to help determine
330  * presence of connectivity changes. See
331  * https://github.com/savoirfairelinux/opendht/issues/137 for details.
332  *
333  * According to the tables, 25 is a good average value for big networks. If
334  * the network is small, normal search expiration process will handle the
335  * situation.
336  * */
337  static constexpr unsigned SEARCH_MAX_BAD_NODES {25};
338 
339  /* Concurrent search nodes requested count */
340  static constexpr unsigned MAX_REQUESTED_SEARCH_NODES {4};
341 
342  /* Number of listening nodes */
343  static constexpr unsigned LISTEN_NODES {4};
344 
345  /* The maximum number of hashes we're willing to track. */
346  static constexpr unsigned MAX_HASHES {1024 * 1024 * 1024};
347 
348  /* The maximum number of searches we keep data about. */
349  static constexpr unsigned MAX_SEARCHES {1024 * 1024};
350 
351  static constexpr std::chrono::minutes MAX_STORAGE_MAINTENANCE_EXPIRE_TIME {10};
352 
353  /* The time after which we consider a search to be expirable. */
354  static constexpr std::chrono::minutes SEARCH_EXPIRE_TIME {62};
355 
356  /* Timeout for listen */
357  static constexpr duration LISTEN_EXPIRE_TIME {std::chrono::seconds(30)};
358  static constexpr duration LISTEN_EXPIRE_TIME_PUBLIC {std::chrono::minutes(5)};
359 
360  static constexpr duration REANNOUNCE_MARGIN {std::chrono::seconds(10)};
361 
362  static constexpr std::chrono::seconds BOOTSTRAP_PERIOD {10};
363 
364  static constexpr size_t TOKEN_SIZE {32};
365 
366  // internal structures
367  struct SearchNode;
368  struct Get;
369  struct Announce;
370  struct Search;
371 
372  // prevent copy
373  Dht(const Dht&) = delete;
374  Dht& operator=(const Dht&) = delete;
375 
376  std::mt19937_64 rd {crypto::getSeededRandomEngine<std::mt19937_64>()};
377 
378  InfoHash myid {};
379 
380  uint64_t secret {};
381  uint64_t oldsecret {};
382 
383  // registered types
384  TypeStore types;
385 
386  using SearchMap = std::map<InfoHash, Sp<Search>>;
387  struct Kad {
388  RoutingTable buckets {};
389  SearchMap searches {};
390  unsigned pending_pings {0};
391  NodeStatus status;
392 
393  NodeStatus getStatus(time_point now) const;
394  NodeStats getNodesStats(time_point now, const InfoHash& myid) const;
395  };
396 
397  Kad dht4 {};
398  Kad dht6 {};
399 
400  std::vector<std::pair<std::string,std::string>> bootstrap_nodes {};
401  std::chrono::steady_clock::duration bootstrap_period {BOOTSTRAP_PERIOD};
402  Sp<Scheduler::Job> bootstrapJob {};
403 
404  std::map<InfoHash, Storage> store;
405  std::map<SockAddr, StorageBucket, SockAddr::ipCmp> store_quota;
406  size_t total_values {0};
407  size_t total_store_size {0};
408  size_t max_store_keys {MAX_HASHES};
409  size_t max_store_size {DEFAULT_STORAGE_LIMIT};
410 
411  size_t max_searches {MAX_SEARCHES};
412  size_t search_id {0};
413 
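414  // Maps a global listen token to the specific listen tokens it stands for.
415  // The mapped tuple holds three tokens (presumably local storage, IPv4 and IPv6 -- to be confirmed). 0 is the invalid token.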
416  std::map<size_t, std::tuple<size_t, size_t, size_t>> listeners {};
417  size_t listener_token {1};
418 
419 
420  // timing
421  Scheduler scheduler;
422  Sp<Scheduler::Job> nextNodesConfirmation {};
423  Sp<Scheduler::Job> nextStorageMaintenance {};
424 
425  net::NetworkEngine network_engine;
426  using ReportedAddr = std::pair<unsigned, SockAddr>;
427  std::vector<ReportedAddr> reported_addr;
428 
429  std::string persistPath;
430 
431  // are we a bootstrap node ?
432  // note: Any running node can be used as a bootstrap node.
433  // Only nodes running only as bootstrap nodes should
434  // be put in bootstrap mode.
435  const bool is_bootstrap {false};
436  const bool maintain_storage {false};
437  const bool public_stable {false};
438 
439  inline const duration& getListenExpiration() const {
440  return public_stable ? LISTEN_EXPIRE_TIME_PUBLIC : LISTEN_EXPIRE_TIME;
441  }
442 
443  void rotateSecrets();
444 
445  Blob makeToken(const SockAddr&, bool old) const;
446  bool tokenMatch(const Blob& token, const SockAddr&) const;
447 
448  void reportedAddr(const SockAddr&);
449 
450  // Storage
451  void storageAddListener(const InfoHash& id, const Sp<Node>& node, size_t tid, Query&& = {}, int version = 0);
452  bool storageStore(const InfoHash& id, const Sp<Value>& value, time_point created, const SockAddr& sa = {}, bool permanent = false);
453  bool storageRefresh(const InfoHash& id, Value::Id vid);
454  void expireStore();
455  void expireStorage(InfoHash h);
456  void expireStore(decltype(store)::iterator);
457 
458  void storageRemoved(const InfoHash& id, Storage& st, const std::vector<Sp<Value>>& values, size_t totalSize);
459  void storageChanged(const InfoHash& id, Storage& st, const Sp<Value>&, bool newValue);
460  std::string printStorageLog(const decltype(store)::value_type&) const;
461 
467  void dataPersistence(InfoHash id);
468  size_t maintainStorage(decltype(store)::value_type&, bool force=false, const DoneCallback& donecb={});
469 
470  // Buckets
471  Kad& dht(sa_family_t af) { return af == AF_INET ? dht4 : dht6; }
472  const Kad& dht(sa_family_t af) const { return af == AF_INET ? dht4 : dht6; }
473  RoutingTable& buckets(sa_family_t af) { return dht(af).buckets; }
474  const RoutingTable& buckets(sa_family_t af) const { return dht(af).buckets; }
475  Bucket* findBucket(const InfoHash& id, sa_family_t af) {
476  auto& b = buckets(af);
477  auto it = b.findBucket(id);
478  return it == b.end() ? nullptr : &(*it);
479  }
480  const Bucket* findBucket(const InfoHash& id, sa_family_t af) const {
481  return const_cast<Dht*>(this)->findBucket(id, af);
482  }
483 
484  void expireBuckets(RoutingTable&);
485  void sendCachedPing(Bucket& b);
486  bool bucketMaintenance(RoutingTable&);
487  void dumpBucket(const Bucket& b, std::ostream& out) const;
488  void bootstrap();
489  void startBootstrap();
490  void stopBootstrap();
491 
492  // Nodes
493  void onNewNode(const Sp<Node>& node, int confirm);
494  const Sp<Node> findNode(const InfoHash& id, sa_family_t af) const;
495  bool trySearchInsert(const Sp<Node>& node);
496 
497  // Searches
498  inline SearchMap& searches(sa_family_t af) { return dht(af).searches; }
499  inline const SearchMap& searches(sa_family_t af) const { return dht(af).searches; }
500 
505  Sp<Search> search(const InfoHash& id, sa_family_t af, GetCallback = {}, QueryCallback = {}, DoneCallback = {}, Value::Filter = {}, const Sp<Query>& q = {});
506 
507  void announce(const InfoHash& id, sa_family_t af, Sp<Value> value, DoneCallback callback, time_point created=time_point::max(), bool permanent = false);
508  size_t listenTo(const InfoHash& id, sa_family_t af, ValueCallback cb, Value::Filter f = {}, const Sp<Query>& q = {});
509 
517  unsigned refill(Search& sr);
518  void expireSearches();
519 
520  void confirmNodes();
521  void expire();
522 
523  void onConnected();
524  void onDisconnected();
525 
534  void searchNodeGetDone(const net::Request& status,
535  net::RequestAnswer&& answer,
536  std::weak_ptr<Search> ws,
537  Sp<Query> query);
538 
548  void searchNodeGetExpired(const net::Request& status, bool over, std::weak_ptr<Search> ws, Sp<Query> query);
549 
557  void paginate(std::weak_ptr<Search> ws, Sp<Query> query, SearchNode* n);
558 
562  SearchNode* searchSendGetValues(Sp<Search> sr, SearchNode *n = nullptr, bool update = true);
563 
570  void searchSendAnnounceValue(const Sp<Search>& sr);
571 
579  void searchStep(std::weak_ptr<Search> ws);
580 
581  void searchSynchedNodeListen(const Sp<Search>&, SearchNode&);
582 
583  void dumpSearch(const Search& sr, std::ostream& out) const;
584 
585  bool neighbourhoodMaintenance(RoutingTable&);
586 
587  void onError(Sp<net::Request> node, net::DhtProtocolException e);
588  /* when our address is reported by a distant peer. */
589  void onReportedAddr(const InfoHash& id, const SockAddr&);
590  /* when we receive a ping request */
591  net::RequestAnswer onPing(Sp<Node> node);
592  /* when we receive a "find node" request */
593  net::RequestAnswer onFindNode(Sp<Node> node, const InfoHash& hash, want_t want);
594  void onFindNodeDone(const Sp<Node>& status,
595  net::RequestAnswer& a,
596  Sp<Search> sr);
597  /* when we receive a "get values" request */
598  net::RequestAnswer onGetValues(Sp<Node> node,
599  const InfoHash& hash,
600  want_t want,
601  const Query& q);
602  void onGetValuesDone(const Sp<Node>& status,
603  net::RequestAnswer& a,
604  Sp<Search>& sr,
605  const Sp<Query>& orig_query);
606  /* when we receive a listen request */
607  net::RequestAnswer onListen(Sp<Node> node,
608  const InfoHash& hash,
609  const Blob& token,
610  size_t socket_id,
611  const Query& query,
612  int version = 0);
613  void onListenDone(const Sp<Node>& status,
614  net::RequestAnswer& a,
615  Sp<Search>& sr);
616  /* when we receive an announce request */
617  net::RequestAnswer onAnnounce(Sp<Node> node,
618  const InfoHash& hash,
619  const Blob& token,
620  const std::vector<Sp<Value>>& v,
621  const time_point& created);
622  net::RequestAnswer onRefresh(Sp<Node> node,
623  const InfoHash& hash,
624  const Blob& token,
625  const Value::Id& vid);
626  void onAnnounceDone(const Sp<Node>& status,
627  net::RequestAnswer& a,
628  Sp<Search>& sr);
629 };
630 
631 }
std::pair< size_t, size_t > getStoreSize() const override
Definition: dht.h:313
size_t listen(const InfoHash &key, GetCallback cb, Value::Filter f={}, Where w={}) override
Definition: dht.h:248
const InfoHash & getNodeId() const override
Definition: dht.h:78
NodeStatus
Definition: callbacks.h:42
std::vector< uint8_t > Blob
Definition: utils.h:151
void pushNotificationReceived(const std::map< std::string, std::string > &) override
Definition: dht.h:319
Serializable dht::Value filter.
Definition: value.h:800
Definition: callbacks.h:35
void setStorageLimit(size_t limit=DEFAULT_STORAGE_LIMIT) override
Definition: dht.h:302
NodeStatus getStatus(sa_family_t af) const override
Definition: dht.h:85
Definition: dht.h:62