C++ Distributed Hash Table
dht_proxy_client.h
1 /*
2  * Copyright (C) 2014-2022 Savoir-faire Linux Inc.
3  * Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
4  * Adrien Béraud <adrien.beraud@savoirfairelinux.com>
5  * Vsevolod Ivanov <vsevolod.ivanov@savoirfairelinux.com>
6  *
7  * This program is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 3 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program. If not, see <https://www.gnu.org/licenses/>.
19  */
20 
21 #pragma once
22 
23 #include <functional>
24 #include <mutex>
25 
26 #include "callbacks.h"
27 #include "def.h"
28 #include "dht_interface.h"
29 #include "proxy.h"
30 #include "http.h"
31 
32 #include <restinio/all.hpp>
33 #include <json/json.h>
34 
35 #include <chrono>
36 #include <vector>
37 #include <functional>
38 
// Forward declaration of Json::Value. In this header the type is only used
// by reference (onProxyInfos, parsePublicAddress, getPushRequest).
39 namespace Json {
40 class Value;
41 }
42 
// Forward declarations of the HTTP helper types. In this header they are only
// used by reference or through std::shared_ptr (setHeaderFields,
// queryProxyInfo, resolver_, requests_, buildRequest).
43 namespace http {
44 class Resolver;
45 class Request;
46 }
47 
48 namespace dht {
49 
50 class OPENDHT_PUBLIC DhtProxyClient final : public DhtInterface {
51 public:
52 
54 
55  explicit DhtProxyClient(
56  std::shared_ptr<crypto::Certificate> serverCA, crypto::Identity clientIdentity,
57  std::function<void()> loopSignal, const std::string& serverHost,
58  const std::string& pushClientId = "", std::shared_ptr<Logger> logger = {});
59 
60  void setHeaderFields(http::Request& request);
61 
62  virtual void setPushNotificationToken(const std::string& token) override {
63 #ifdef OPENDHT_PUSH_NOTIFICATIONS
64  deviceKey_ = token;
65 #else
66  (void) token;
67 #endif
68  }
69 
70  virtual ~DhtProxyClient();
71 
75  inline const InfoHash& getNodeId() const override { return myid; }
76 
80  NodeStatus getStatus(sa_family_t af) const override;
81  NodeStatus getStatus() const override {
82  return std::max(getStatus(AF_INET), getStatus(AF_INET6));
83  }
84 
88  void shutdown(ShutdownCallback cb, bool) override;
89 
96  bool isRunning(sa_family_t af = 0) const override;
97 
108  virtual void get(const InfoHash& key, GetCallback cb, DoneCallback donecb={}, Value::Filter&& f={}, Where&& w = {}) override;
109  virtual void get(const InfoHash& key, GetCallback cb, DoneCallbackSimple donecb={}, Value::Filter&& f={}, Where&& w = {}) override {
110  get(key, cb, bindDoneCb(std::move(donecb)), std::forward<Value::Filter>(f), std::forward<Where>(w));
111  }
112  virtual void get(const InfoHash& key, GetCallbackSimple cb, DoneCallback donecb={}, Value::Filter&& f={}, Where&& w = {}) override {
113  get(key, bindGetCb(cb), std::move(donecb), std::forward<Value::Filter>(f), std::forward<Where>(w));
114  }
115  virtual void get(const InfoHash& key, GetCallbackSimple cb, DoneCallbackSimple donecb, Value::Filter&& f={}, Where&& w = {}) override {
116  get(key, bindGetCb(cb), bindDoneCb(std::move(donecb)), std::forward<Value::Filter>(f), std::forward<Where>(w));
117  }
118 
126  void put(const InfoHash& key,
127  Sp<Value>,
128  DoneCallback cb=nullptr,
129  time_point created=time_point::max(),
130  bool permanent = false) override;
131  void put(const InfoHash& key,
132  const Sp<Value>& v,
133  DoneCallbackSimple cb,
134  time_point created=time_point::max(),
135  bool permanent = false) override
136  {
137  put(key, v, bindDoneCb(std::move(cb)), created, permanent);
138  }
139 
140  void put(const InfoHash& key,
141  Value&& v,
142  DoneCallback cb=nullptr,
143  time_point created=time_point::max(),
144  bool permanent = false) override
145  {
146  put(key, std::make_shared<Value>(std::move(v)), std::move(cb), created, permanent);
147  }
148  void put(const InfoHash& key,
149  Value&& v,
150  DoneCallbackSimple cb,
151  time_point created=time_point::max(),
152  bool permanent = false) override
153  {
154  put(key, std::forward<Value>(v), bindDoneCb(std::move(cb)), created, permanent);
155  }
156 
161  NodeStats getNodesStats(sa_family_t af) const override;
162 
167  std::vector<SockAddr> getPublicAddress(sa_family_t family = 0) override;
168 
176  virtual size_t listen(const InfoHash&, ValueCallback, Value::Filter={}, Where={}) override;
177 
178  virtual size_t listen(const InfoHash& key, GetCallback cb, Value::Filter f={}, Where w={}) override {
179  return listen(key, [cb=std::move(cb)](const std::vector<Sp<Value>>& vals, bool expired){
180  if (not expired)
181  return cb(vals);
182  return true;
183  }, std::forward<Value::Filter>(f), std::forward<Where>(w));
184  }
185  virtual size_t listen(const InfoHash& key, GetCallbackSimple cb, Value::Filter f={}, Where w={}) override {
186  return listen(key, bindGetCb(std::move(cb)), std::forward<Value::Filter>(f), std::forward<Where>(w));
187  }
188  /*
189  * This function relies on the cache implementation.
190  * It means that there are no true cancel here, it keeps the caching in higher priority.
191  */
192  virtual bool cancelListen(const InfoHash& key, size_t token) override;
193 
198  void pushNotificationReceived(const std::map<std::string, std::string>& notification) override;
199 
200  time_point periodic(const uint8_t*, size_t, SockAddr, const time_point& now) override;
201  time_point periodic(const uint8_t* buf, size_t buflen, const sockaddr* from, socklen_t fromlen, const time_point& now) override {
202  return periodic(buf, buflen, SockAddr(from, fromlen), now);
203  }
204 
215  virtual void query(const InfoHash& /*key*/, QueryCallback /*cb*/, DoneCallback /*done_cb*/ = {}, Query&& /*q*/ = {}) override { }
216  virtual void query(const InfoHash& key, QueryCallback cb, DoneCallbackSimple done_cb = {}, Query&& q = {}) override {
217  query(key, cb, bindDoneCb(std::move(done_cb)), std::forward<Query>(q));
218  }
219 
223  std::vector<Sp<Value>> getPut(const InfoHash&) const override;
224 
228  Sp<Value> getPut(const InfoHash&, const Value::Id&) const override;
229 
234  bool cancelPut(const InfoHash&, const Value::Id&) override;
235 
236  void pingNode(SockAddr, DoneCallbackSimple&& /*cb*/={}) override { }
237 
238  virtual void registerType(const ValueType& type) override {
239  types.registerType(type);
240  }
241  const ValueType& getType(ValueType::Id type_id) const override {
242  return types.getType(type_id);
243  }
244 
245  std::vector<Sp<Value>> getLocal(const InfoHash& k, const Value::Filter& filter) const override;
246  Sp<Value> getLocalById(const InfoHash& k, Value::Id id) const override;
247 
252  void insertNode(const InfoHash&, const SockAddr&) override { }
253  void insertNode(const NodeExport&) override { }
254  std::pair<size_t, size_t> getStoreSize() const override { return {}; }
255  std::vector<NodeExport> exportNodes() const override { return {}; }
256  std::vector<ValuesExport> exportValues() const override { return {}; }
257  void importValues(const std::vector<ValuesExport>&) override {}
258  std::string getStorageLog() const override { return {}; }
259  std::string getStorageLog(const InfoHash&) const override { return {}; }
260  std::string getRoutingTablesLog(sa_family_t) const override { return {}; }
261  std::string getSearchesLog(sa_family_t) const override { return {}; }
262  std::string getSearchLog(const InfoHash&, sa_family_t) const override { return {}; }
263  void dumpTables() const override {}
264  std::vector<unsigned> getNodeMessageStats(bool) override { return {}; }
265  void setStorageLimit(size_t) override {}
266  virtual size_t getStorageLimit() const { return 0; }
267  void connectivityChanged(sa_family_t) override {
268  getProxyInfos();
269  }
270  void connectivityChanged() override {
271  getProxyInfos();
272  loopSignal_();
273  }
274 
275 private:
279  void startProxy();
280  void stop();
281 
286  struct InfoState;
287  void getProxyInfos();
288  void queryProxyInfo(const std::shared_ptr<InfoState>& infoState, const std::shared_ptr<http::Resolver>& resolver, sa_family_t family);
289  void onProxyInfos(const Json::Value& val, const sa_family_t family);
290  SockAddr parsePublicAddress(const Json::Value& val);
291 
292  void opFailed();
293 
294  void handleExpireListener(const asio::error_code &ec, const InfoHash& key);
295 
296  struct Listener;
297  struct OperationState;
298  enum class ListenMethod {
299  LISTEN,
300  SUBSCRIBE,
301  RESUBSCRIBE,
302  };
303  using CacheValueCallback = std::function<bool(const std::vector<std::shared_ptr<Value>>& values, bool expired, system_clock::time_point)>;
304 
308  void sendListen(const restinio::http_request_header_t& header, const CacheValueCallback& cb,
309  const Sp<OperationState>& opstate, Listener& listener, ListenMethod method = ListenMethod::LISTEN);
310  void handleResubscribe(const asio::error_code& ec, const InfoHash& key,
311  const size_t token, std::shared_ptr<OperationState> opstate);
312 
313  void doPut(const InfoHash&, Sp<Value>, DoneCallbackSimple, time_point created, bool permanent);
314  void handleRefreshPut(const asio::error_code& ec, InfoHash key, Value::Id id);
315 
319  void getConnectivityStatus();
323  void cancelAllListeners();
324 
325  std::atomic_bool isDestroying_ {false};
326 
327  std::string proxyUrl_;
328  dht::crypto::Identity clientIdentity_;
329  std::shared_ptr<dht::crypto::Certificate> serverCertificate_;
330  //std::pair<std::string, std::string> serverHostService_;
331  std::string pushClientId_;
332  std::string pushSessionId_;
333 
334  mutable std::mutex lockCurrentProxyInfos_;
335  NodeStatus statusIpv4_ {NodeStatus::Disconnected};
336  NodeStatus statusIpv6_ {NodeStatus::Disconnected};
337  NodeStats stats4_ {};
338  NodeStats stats6_ {};
339  SockAddr publicAddressV4_;
340  SockAddr publicAddressV6_;
341  std::atomic_bool launchConnectedCbs_ {false};
342 
343  InfoHash myid {};
344 
345  // registred types
346  TypeStore types;
347 
348  /*
349  * ASIO I/O Context for sockets in httpClient_
350  * Note: Each context is used in one thread only
351  */
352  asio::io_context httpContext_;
353  std::shared_ptr<http::Resolver> resolver_;
354 
355  mutable std::mutex requestLock_;
356  std::map<unsigned, std::shared_ptr<http::Request>> requests_;
357  /*
358  * Thread for executing the http io_context.run() blocking call
359  */
360  std::thread httpClientThread_;
361 
365  struct ProxySearch;
366 
367  mutable std::mutex searchLock_;
368  size_t listenerToken_ {0};
369  std::map<InfoHash, ProxySearch> searches_;
370 
374  std::mutex lockCallbacks_;
375  std::vector<std::function<void()>> callbacks_;
376 
377  Sp<InfoState> infoState_;
378 
382  void handleProxyConfirm(const asio::error_code &ec);
383  std::unique_ptr<asio::steady_timer> nextProxyConfirmationTimer_;
384  std::unique_ptr<asio::steady_timer> listenerRestartTimer_;
385 
389  void restartListeners(const asio::error_code &ec);
390 
395  void resubscribe(const InfoHash& key, const size_t token, Listener& listener);
396 
401  std::string deviceKey_ {};
402 
403  const std::function<void()> loopSignal_;
404 
405 #ifdef OPENDHT_PUSH_NOTIFICATIONS
406  std::string fillBody(bool resubscribe);
407  void getPushRequest(Json::Value&) const;
408 #endif // OPENDHT_PUSH_NOTIFICATIONS
409 
410  Json::StreamWriterBuilder jsonBuilder_;
411  std::unique_ptr<Json::CharReader> jsonReader_;
412 
413  std::shared_ptr<http::Request> buildRequest(const std::string& target = {});
414 };
415 
416 }
void setStorageLimit(size_t) override
void insertNode(const InfoHash &, const SockAddr &) override
NodeStatus
Definition: callbacks.h:42
std::vector< NodeExport > exportNodes() const override
Describes a query destined to another peer.
Definition: value.h:924
Serializable dht::Value filter.
Definition: value.h:800
std::pair< size_t, size_t > getStoreSize() const override
virtual size_t listen(const InfoHash &key, GetCallback cb, Value::Filter f={}, Where w={}) override
virtual void query(const InfoHash &, QueryCallback, DoneCallback={}, Query &&={}) override
Definition: callbacks.h:35
void connectivityChanged(sa_family_t) override
const InfoHash & getNodeId() const override