C++ Distributed Hash Table
dhtrunner.h
1 /*
2  * Copyright (C) 2014-2022 Savoir-faire Linux Inc.
3  * Authors: Adrien Béraud <adrien.beraud@savoirfairelinux.com>
4  * Simon Désaulniers <simon.desaulniers@savoirfairelinux.com>
5  * Sébastien Blin <sebastien.blin@savoirfairelinux.com>
6  *
7  * This program is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 3 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program. If not, see <https://www.gnu.org/licenses/>.
19  */
20 
21 #pragma once
22 
23 #include "def.h"
24 #include "infohash.h"
25 #include "value.h"
26 #include "callbacks.h"
27 #include "sockaddr.h"
28 #include "log_enable.h"
29 #include "network_utils.h"
30 
31 #include <thread>
32 #include <mutex>
33 #include <atomic>
34 #include <condition_variable>
35 #include <future>
36 #include <exception>
37 #include <queue>
38 #include <chrono>
39 
40 namespace dht {
41 
// Forward declarations. SecureDht and PeerDiscovery are only held through
// smart pointers by DhtRunner below; SecureDhtConfig is stored by value in
// DhtRunner::Config, so its full definition must come from one of the
// included headers (NOTE(review): confirm which one provides it).
42 struct Node;
43 class SecureDht;
44 class PeerDiscovery;
45 struct SecureDhtConfig;
46 
53 class OPENDHT_PUBLIC DhtRunner {
54 
55 public:
56  using StatusCallback = std::function<void(NodeStatus, NodeStatus)>;
57 
58  struct Config {
59  SecureDhtConfig dht_config {};
60  bool threaded {true};
61  std::string proxy_server {};
62  std::string push_node_id {};
63  std::string push_token {};
64  bool peer_discovery {false};
65  bool peer_publish {false};
66  std::shared_ptr<dht::crypto::Certificate> server_ca;
67  dht::crypto::Identity client_identity;
68  SockAddr bind4 {}, bind6 {};
69  };
70 
71  struct Context {
72  std::shared_ptr<Logger> logger {};
73  std::unique_ptr<net::DatagramSocket> sock;
74  std::shared_ptr<PeerDiscovery> peerDiscovery {};
75  StatusCallback statusChangedCallback {};
76  CertificateStoreQuery certificateStore {};
77  IdentityAnnouncedCb identityAnnouncedCb {};
78  Context() {}
79  };
80 
81  DhtRunner();
82  virtual ~DhtRunner();
83 
84  void get(InfoHash id, GetCallbackSimple cb, DoneCallback donecb={}, Value::Filter f = {}, Where w = {}) {
85  get(id, bindGetCb(cb), donecb, f, w);
86  }
87 
88  void get(InfoHash id, GetCallbackSimple cb, DoneCallbackSimple donecb={}, Value::Filter f = {}, Where w = {}) {
89  get(id, bindGetCb(cb), donecb, f, w);
90  }
91 
92  void get(InfoHash hash, GetCallback vcb, DoneCallback dcb, Value::Filter f={}, Where w = {});
93 
94  void get(InfoHash id, GetCallback cb, DoneCallbackSimple donecb={}, Value::Filter f = {}, Where w = {}) {
95  get(id, cb, bindDoneCb(donecb), f, w);
96  }
97  void get(const std::string& key, GetCallback vcb, DoneCallbackSimple dcb={}, Value::Filter f = {}, Where w = {});
98 
99  template <class T>
100  void get(InfoHash hash, std::function<bool(std::vector<T>&&)> cb, DoneCallbackSimple dcb={})
101  {
102  get(hash, [cb=std::move(cb)](const std::vector<std::shared_ptr<Value>>& vals) {
103  return cb(unpackVector<T>(vals));
104  },
105  dcb,
106  getFilterSet<T>());
107  }
108  template <class T>
109  void get(InfoHash hash, std::function<bool(T&&)> cb, DoneCallbackSimple dcb={})
110  {
111  get(hash, [cb=std::move(cb)](const std::vector<std::shared_ptr<Value>>& vals) {
112  for (const auto& v : vals) {
113  try {
114  if (not cb(Value::unpack<T>(*v)))
115  return false;
116  } catch (const std::exception&) {
117  continue;
118  }
119  }
120  return true;
121  },
122  dcb,
123  getFilterSet<T>());
124  }
125 
126  std::future<std::vector<std::shared_ptr<dht::Value>>> get(InfoHash key, Value::Filter f = {}, Where w = {}) {
127  auto p = std::make_shared<std::promise<std::vector<std::shared_ptr< dht::Value >>>>();
128  auto values = std::make_shared<std::vector<std::shared_ptr< dht::Value >>>();
129  get(key, [=](const std::vector<std::shared_ptr<dht::Value>>& vlist) {
130  values->insert(values->end(), vlist.begin(), vlist.end());
131  return true;
132  }, [=](bool) {
133  p->set_value(std::move(*values));
134  },
135  f, w);
136  return p->get_future();
137  }
138 
139  template <class T>
140  std::future<std::vector<T>> get(InfoHash key) {
141  auto p = std::make_shared<std::promise<std::vector<T>>>();
142  auto values = std::make_shared<std::vector<T>>();
143  get<T>(key, [=](T&& v) {
144  values->emplace_back(std::move(v));
145  return true;
146  }, [=](bool) {
147  p->set_value(std::move(*values));
148  });
149  return p->get_future();
150  }
151 
152  void query(const InfoHash& hash, QueryCallback cb, DoneCallback done_cb = {}, Query q = {});
153  void query(const InfoHash& hash, QueryCallback cb, DoneCallbackSimple done_cb = {}, Query q = {}) {
154  query(hash, cb, bindDoneCb(done_cb), q);
155  }
156 
157  std::future<size_t> listen(InfoHash key, ValueCallback vcb, Value::Filter f = {}, Where w = {});
158 
159  std::future<size_t> listen(InfoHash key, GetCallback cb, Value::Filter f={}, Where w={}) {
160  return listen(key, [cb=std::move(cb)](const std::vector<Sp<Value>>& vals, bool expired){
161  if (not expired)
162  return cb(vals);
163  return true;
164  }, std::forward<Value::Filter>(f), std::forward<Where>(w));
165  }
166  std::future<size_t> listen(const std::string& key, GetCallback vcb, Value::Filter f = {}, Where w = {});
167  std::future<size_t> listen(InfoHash key, GetCallbackSimple cb, Value::Filter f = {}, Where w = {}) {
168  return listen(key, bindGetCb(cb), f, w);
169  }
170 
171  template <class T>
172  std::future<size_t> listen(InfoHash hash, std::function<bool(std::vector<T>&&)> cb)
173  {
174  return listen(hash, [cb=std::move(cb)](const std::vector<std::shared_ptr<Value>>& vals) {
175  return cb(unpackVector<T>(vals));
176  },
177  getFilterSet<T>());
178  }
179  template <class T>
180  std::future<size_t> listen(InfoHash hash, std::function<bool(std::vector<T>&&, bool)> cb)
181  {
182  return listen(hash, [cb=std::move(cb)](const std::vector<std::shared_ptr<Value>>& vals, bool expired) {
183  return cb(unpackVector<T>(vals), expired);
184  },
185  getFilterSet<T>());
186  }
187 
188  template <typename T>
189  std::future<size_t> listen(InfoHash hash, std::function<bool(T&&)> cb, Value::Filter f = {}, Where w = {})
190  {
191  return listen(hash, [cb=std::move(cb)](const std::vector<std::shared_ptr<Value>>& vals) {
192  for (const auto& v : vals) {
193  try {
194  if (not cb(Value::unpack<T>(*v)))
195  return false;
196  } catch (const std::exception&) {
197  continue;
198  }
199  }
200  return true;
201  },
202  getFilterSet<T>(f), w);
203  }
204  template <typename T>
205  std::future<size_t> listen(InfoHash hash, std::function<bool(T&&, bool)> cb, Value::Filter f = {}, Where w = {})
206  {
207  return listen(hash, [cb=std::move(cb)](const std::vector<std::shared_ptr<Value>>& vals, bool expired) {
208  for (const auto& v : vals) {
209  try {
210  if (not cb(Value::unpack<T>(*v), expired))
211  return false;
212  } catch (const std::exception&) {
213  continue;
214  }
215  }
216  return true;
217  },
218  getFilterSet<T>(f), w);
219  }
220 
221  void cancelListen(InfoHash h, size_t token);
222  void cancelListen(InfoHash h, std::shared_future<size_t> token);
223 
224  void put(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb={}, time_point created=time_point::max(), bool permanent = false);
225  void put(InfoHash hash, std::shared_ptr<Value> value, DoneCallbackSimple cb, time_point created=time_point::max(), bool permanent = false) {
226  put(hash, value, bindDoneCb(cb), created, permanent);
227  }
228 
229  void put(InfoHash hash, Value&& value, DoneCallback cb={}, time_point created=time_point::max(), bool permanent = false);
230  void put(InfoHash hash, Value&& value, DoneCallbackSimple cb, time_point created=time_point::max(), bool permanent = false) {
231  put(hash, std::forward<Value>(value), bindDoneCb(cb), created, permanent);
232  }
233  void put(const std::string& key, Value&& value, DoneCallbackSimple cb={}, time_point created=time_point::max(), bool permanent = false);
234 
235  void cancelPut(const InfoHash& h, Value::Id id);
236  void cancelPut(const InfoHash& h, const std::shared_ptr<Value>& value);
237 
238  void putSigned(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb={}, bool permanent = false);
239  void putSigned(InfoHash hash, std::shared_ptr<Value> value, DoneCallbackSimple cb, bool permanent = false) {
240  putSigned(hash, value, bindDoneCb(cb), permanent);
241  }
242 
243  void putSigned(InfoHash hash, Value&& value, DoneCallback cb={}, bool permanent = false);
244  void putSigned(InfoHash hash, Value&& value, DoneCallbackSimple cb, bool permanent = false) {
245  putSigned(hash, std::forward<Value>(value), bindDoneCb(cb), permanent);
246  }
247  void putSigned(const std::string& key, Value&& value, DoneCallbackSimple cb={}, bool permanent = false);
248 
249  void putEncrypted(InfoHash hash, InfoHash to, std::shared_ptr<Value> value, DoneCallback cb={}, bool permanent = false);
250  void putEncrypted(InfoHash hash, InfoHash to, std::shared_ptr<Value> value, DoneCallbackSimple cb, bool permanent = false) {
251  putEncrypted(hash, to, value, bindDoneCb(cb), permanent);
252  }
253 
254  void putEncrypted(InfoHash hash, InfoHash to, Value&& value, DoneCallback cb={}, bool permanent = false);
255  void putEncrypted(InfoHash hash, InfoHash to, Value&& value, DoneCallbackSimple cb, bool permanent = false) {
256  putEncrypted(hash, to, std::forward<Value>(value), bindDoneCb(cb), permanent);
257  }
258  void putEncrypted(const std::string& key, InfoHash to, Value&& value, DoneCallback cb={}, bool permanent = false);
259 
260  void putEncrypted(InfoHash hash, const std::shared_ptr<crypto::PublicKey>& to, std::shared_ptr<Value> value, DoneCallback cb={}, bool permanent = false);
261  void putEncrypted(InfoHash hash, const std::shared_ptr<crypto::PublicKey>& to, std::shared_ptr<Value> value, DoneCallbackSimple cb, bool permanent = false) {
262  putEncrypted(hash, to, value, bindDoneCb(cb), permanent);
263  }
264 
265  void putEncrypted(InfoHash hash, const std::shared_ptr<crypto::PublicKey>& to, Value&& value, DoneCallback cb={}, bool permanent = false);
266  void putEncrypted(InfoHash hash, const std::shared_ptr<crypto::PublicKey>& to, Value&& value, DoneCallbackSimple cb, bool permanent = false) {
267  putEncrypted(hash, to, std::forward<Value>(value), bindDoneCb(cb), permanent);
268  }
269 
270 
275  void bootstrap(std::vector<SockAddr> nodes, DoneCallbackSimple cb={});
276  void bootstrap(SockAddr addr, DoneCallbackSimple cb={});
277 
282  void bootstrap(std::vector<NodeExport> nodes);
283 
290  void bootstrap(const std::string& host, const std::string& service);
291  void bootstrap(const std::string& hostService);
292 
297  void bootstrap(const InfoHash& id, const SockAddr& address);
298 
302  void clearBootstrap();
303 
309  void connectivityChanged();
310 
311  void dumpTables() const;
312 
316  InfoHash getId() const;
317  std::shared_ptr<crypto::PublicKey> getPublicKey() const;
318 
322  InfoHash getNodeId() const;
323 
328  SockAddr getBound(sa_family_t f = AF_INET) const;
329 
334  in_port_t getBoundPort(sa_family_t f = AF_INET) const;
335 
336  std::pair<size_t, size_t> getStoreSize() const;
337 
338  void getStorageLimit() const;
339  void setStorageLimit(size_t limit = DEFAULT_STORAGE_LIMIT);
340 
341  std::vector<NodeExport> exportNodes() const;
342 
343  std::vector<ValuesExport> exportValues() const;
344 
345  void setLogger(const Sp<Logger>& logger = {});
346  void setLogger(const Logger& logger) {
347  setLogger(std::make_shared<Logger>(logger));
348  }
349  void setLoggers(LogMethod err = {}, LogMethod warn = {}, LogMethod debug = {});
350 
354  void setLogFilter(const InfoHash& f = {});
355 
356  void registerType(const ValueType& type);
357 
358  void importValues(const std::vector<ValuesExport>& values);
359 
360  bool isRunning() const {
361  return running != State::Idle;
362  }
363 
364  NodeStats getNodesStats(sa_family_t af) const;
365  unsigned getNodesStats(sa_family_t af, unsigned *good_return, unsigned *dubious_return, unsigned *cached_return, unsigned *incoming_return) const;
366  NodeInfo getNodeInfo() const;
367  void getNodeInfo(std::function<void(std::shared_ptr<NodeInfo>)>);
368 
369  std::vector<unsigned> getNodeMessageStats(bool in = false) const;
370  std::string getStorageLog() const;
371  std::string getStorageLog(const InfoHash&) const;
372  std::string getRoutingTablesLog(sa_family_t af) const;
373  std::string getSearchesLog(sa_family_t af = AF_UNSPEC) const;
374  std::string getSearchLog(const InfoHash&, sa_family_t af = AF_UNSPEC) const;
375  std::vector<SockAddr> getPublicAddress(sa_family_t af = AF_UNSPEC);
376  std::vector<std::string> getPublicAddressStr(sa_family_t af = AF_UNSPEC);
377  void getPublicAddress(std::function<void(std::vector<SockAddr>&&)>, sa_family_t af = AF_UNSPEC);
378 
379  // securedht methods
380 
381  void findCertificate(InfoHash hash, std::function<void(const std::shared_ptr<crypto::Certificate>&)>);
382  void registerCertificate(std::shared_ptr<crypto::Certificate> cert);
383  void setLocalCertificateStore(CertificateStoreQuery&& query_method);
384 
391  void run(in_port_t port = dht::net::DHT_DEFAULT_PORT, const crypto::Identity& identity = {}, bool threaded = true, NetId network = 0) {
392  Config config;
393  config.dht_config.node_config.network = network;
394  config.dht_config.id = identity;
395  config.threaded = threaded;
396  run(port, config);
397  }
398  void run(in_port_t port, Config& config, Context&& context = {});
399 
403  void run(const char* ip4, const char* ip6, const char* service, Config& config, Context&& context = {});
404 
405  void run(const Config& config, Context&& context);
406 
407  void setOnStatusChanged(StatusCallback&& cb) {
408  statusCb = std::move(cb);
409  }
410 
416  time_point loop() {
417  std::lock_guard<std::mutex> lck(dht_mtx);
418  return loop_();
419  }
420 
424  void shutdown(ShutdownCallback cb = {}, bool stop = false);
425 
431  void join();
432 
433  std::shared_ptr<PeerDiscovery> getPeerDiscovery() const { return peerDiscovery_; };
434 
435  void setProxyServer(const std::string& proxy, const std::string& pushNodeId = "");
436 
442  void enableProxy(bool proxify);
443 
444  /* Push notification methods */
445 
449  void setPushNotificationToken(const std::string& token);
450 
454  void pushNotificationReceived(const std::map<std::string, std::string>& data);
455 
456  /* Proxy server mothods */
457  void forwardAllMessages(bool forward);
458 
459 private:
460  enum class State {
461  Idle,
462  Running,
463  Stopping
464  };
465 
466  time_point loop_();
467 
468  NodeStatus getStatus() const {
469  return std::max(status4, status6);
470  }
471 
472  bool checkShutdown();
473  void opEnded();
474  DoneCallback bindOpDoneCallback(DoneCallback&& cb);
475  DoneCallbackSimple bindOpDoneCallback(DoneCallbackSimple&& cb);
476 
478  std::unique_ptr<SecureDht> dht_;
479 
481  std::unique_ptr<SecureDht> dht_via_proxy_;
482 
484  std::atomic_bool use_proxy {false};
485 
487  Config config_;
488  IdentityAnnouncedCb identityAnnouncedCb_;
489 
493  void resetDht();
497  SecureDht* activeDht() const;
498 
502  struct Listener;
503  std::map<size_t, Listener> listeners_;
504  size_t listener_token_ {1};
505 
506  mutable std::mutex dht_mtx {};
507  std::thread dht_thread {};
508  std::condition_variable cv {};
509  std::mutex sock_mtx {};
510  net::PacketList rcv {};
511  decltype(rcv) rcv_free {};
512 
513  std::queue<std::function<void(SecureDht&)>> pending_ops_prio {};
514  std::queue<std::function<void(SecureDht&)>> pending_ops {};
515  std::mutex storage_mtx {};
516 
517  std::atomic<State> running {State::Idle};
518  std::atomic_size_t ongoing_ops {0};
519  std::vector<ShutdownCallback> shutdownCallbacks_;
520 
521  NodeStatus status4 {NodeStatus::Disconnected},
522  status6 {NodeStatus::Disconnected};
523  StatusCallback statusCb {nullptr};
524 
526  std::shared_ptr<PeerDiscovery> peerDiscovery_;
527 
532  std::shared_ptr<dht::Logger> logger_;
533 };
534 
535 }
OPENDHT_PUBLIC Blob hash(const Blob &data, size_t hash_length=512/8)
NodeStatus
Definition: callbacks.h:42
void run(in_port_t port=dht::net::DHT_DEFAULT_PORT, const crypto::Identity &identity={}, bool threaded=true, NetId network=0)
Definition: dhtrunner.h:391
time_point loop()
Definition: dhtrunner.h:416
NetId network
Definition: callbacks.h:114
Definition: callbacks.h:35