C++ Distributed Hash Table
dht_proxy_server.h
/*
 *  Copyright (C) 2014-2022 Savoir-faire Linux Inc.
 *  Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
 *          Adrien Béraud <adrien.beraud@savoirfairelinux.com>
 *          Vsevolod Ivanov <vsevolod.ivanov@savoirfairelinux.com>
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 3 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program. If not, see <https://www.gnu.org/licenses/>.
 */
20 
#pragma once

#include "callbacks.h"
#include "def.h"
#include "infohash.h"
#include "proxy.h"
#include "scheduler.h"
#include "sockaddr.h"
#include "value.h"
#include "http.h"

#include <restinio/all.hpp>
#include <restinio/tls.hpp>
#include <json/json.h>

#include <atomic>
#include <chrono>
#include <functional>
#include <future>
#include <map>
#include <memory>
#include <mutex>
#include <random>
#include <sstream>
#include <string>
#include <thread>
#include <utility>
#include <vector>
38 
namespace dht {

/**
 * Kind of push-notification platform attached to a proxy client.
 * The numeric values are part of the serialized state (the enum is packed
 * with msgpack), so enumerators must keep their current values.
 */
enum class PushType {
    None = 0,   ///< No push notification platform.
    Android,    ///< Android client (value 1).
    iOS         ///< iOS client (value 2).
};

}
46 MSGPACK_ADD_ENUM(dht::PushType)
47 
// Forward declarations of the HTTP helper types referenced below
// (held through std::shared_ptr / stored in the listeners map).
namespace http {
class Request;
struct ListenerSession;
}
52 
// Forward declaration of the JsonCpp value type used in the signatures below.
namespace Json {
class Value;
}
56 
57 namespace dht {
58 
59 class DhtRunner;
60 
61 using RestRouter = restinio::router::express_router_t<>;
62 using RequestStatus = restinio::request_handling_status_t;
63 
65  in_port_t port {8000};
66  std::string pushServer {};
67  std::string persistStatePath {};
68  dht::crypto::Identity identity {};
69 };
70 
74 class OPENDHT_PUBLIC DhtProxyServer
75 {
76 public:
85  DhtProxyServer(const std::shared_ptr<DhtRunner>& dht,
86  const ProxyServerConfig& config = {},
87  const std::shared_ptr<dht::Logger>& logger = {});
88 
89  virtual ~DhtProxyServer();
90 
91  DhtProxyServer(const DhtProxyServer& other) = delete;
92  DhtProxyServer(DhtProxyServer&& other) = delete;
93  DhtProxyServer& operator=(const DhtProxyServer& other) = delete;
94  DhtProxyServer& operator=(DhtProxyServer&& other) = delete;
95 
96  asio::io_context& io_context() const;
97 
98  struct ServerStats {
100  size_t listenCount {0};
102  size_t putCount {0};
104  size_t totalPermanentPuts {0};
106  size_t pushListenersCount {0};
108  double requestRate {0};
110  std::shared_ptr<NodeInfo> nodeInfo {};
111 
112  std::string toString() const {
113  std::ostringstream ss;
114  ss << "Listens: " << listenCount << " Puts: " << putCount << " PushListeners: " << pushListenersCount << std::endl;
115  ss << "Requests: " << requestRate << " per second." << std::endl;
116  if (nodeInfo) {
117  auto& ipv4 = nodeInfo->ipv4;
118  if (ipv4.table_depth > 1)
119  ss << "IPv4 Network estimation: " << ipv4.getNetworkSizeEstimation() << std::endl;;
120  auto& ipv6 = nodeInfo->ipv6;
121  if (ipv6.table_depth > 1)
122  ss << "IPv6 Network estimation: " << ipv6.getNetworkSizeEstimation() << std::endl;;
123  }
124  return ss.str();
125  }
126 
130  Json::Value toJson() const {
131  Json::Value result;
132  result["listenCount"] = static_cast<Json::UInt64>(listenCount);
133  result["putCount"] = static_cast<Json::UInt64>(putCount);
134  result["totalPermanentPuts"] = static_cast<Json::UInt64>(totalPermanentPuts);
135  result["pushListenersCount"] = static_cast<Json::UInt64>(pushListenersCount);
136  result["requestRate"] = requestRate;
137  if (nodeInfo)
138  result["nodeInfo"] = nodeInfo->toJson();
139  return result;
140  }
141  };
142 
143  std::shared_ptr<ServerStats> stats() const { return stats_; }
144 
145  std::shared_ptr<ServerStats> updateStats(std::shared_ptr<NodeInfo> info) const;
146 
147  std::shared_ptr<DhtRunner> getNode() const { return dht_; }
148 
149 private:
150  class ConnectionListener;
151  struct RestRouterTraitsTls;
152  struct RestRouterTraits;
153 
154  template <typename HttpResponse>
155  static HttpResponse initHttpResponse(HttpResponse response);
156  static restinio::request_handling_status_t serverError(restinio::request_t& request);
157 
158  template< typename ServerSettings >
159  void addServerSettings(ServerSettings& serverSettings,
160  const unsigned int max_pipelined_requests = 16);
161 
162  std::unique_ptr<RestRouter> createRestRouter();
163 
164  void onConnectionClosed(restinio::connection_id_t);
165 
173  RequestStatus getNodeInfo(restinio::request_handle_t request,
174  restinio::router::route_params_t params) const;
175 
182  RequestStatus getStats(restinio::request_handle_t request,
183  restinio::router::route_params_t params);
184 
195  RequestStatus get(restinio::request_handle_t request,
196  restinio::router::route_params_t params);
197 
208  RequestStatus listen(restinio::request_handle_t request,
209  restinio::router::route_params_t params);
210 
220  RequestStatus put(restinio::request_handle_t request,
221  restinio::router::route_params_t params);
222 
223  void handleCancelPermamentPut(const asio::error_code &ec, const InfoHash& key, Value::Id vid);
224 
225 #ifdef OPENDHT_PROXY_SERVER_IDENTITY
226 
235  RequestStatus putSigned(restinio::request_handle_t request,
236  restinio::router::route_params_t params) const;
237 
247  RequestStatus putEncrypted(restinio::request_handle_t request,
248  restinio::router::route_params_t params);
249 
250 #endif // OPENDHT_PROXY_SERVER_IDENTITY
251 
262  RequestStatus getFiltered(restinio::request_handle_t request,
263  restinio::router::route_params_t params);
264 
272  RequestStatus options(restinio::request_handle_t request,
273  restinio::router::route_params_t params);
274 
275 #ifdef OPENDHT_PUSH_NOTIFICATIONS
276 
285  RequestStatus subscribe(restinio::request_handle_t request,
286  restinio::router::route_params_t params);
287 
295  RequestStatus unsubscribe(restinio::request_handle_t request,
296  restinio::router::route_params_t params);
297 
303  void sendPushNotification(const std::string& key, Json::Value&& json, PushType type, bool highPriority);
304 
312  void handleNotifyPushListenExpire(const asio::error_code &ec, const std::string pushToken,
313  std::function<Json::Value()> json, PushType type);
314 
322  void handleCancelPushListen(const asio::error_code &ec, const std::string pushToken,
323  const InfoHash key, const std::string clientId);
324 
325 #endif //OPENDHT_PUSH_NOTIFICATIONS
326 
327  void handlePrintStats(const asio::error_code &ec);
328  void updateStats();
329 
330  template <typename Os>
331  void saveState(Os& stream);
332 
333  template <typename Is>
334  void loadState(Is& is, size_t size);
335 
336  using clock = std::chrono::steady_clock;
337  using time_point = clock::time_point;
338 
339  std::shared_ptr<asio::io_context> ioContext_;
340  std::shared_ptr<DhtRunner> dht_;
341  Json::StreamWriterBuilder jsonBuilder_;
342  Json::CharReaderBuilder jsonReaderBuilder_;
343  std::mt19937_64 rd {crypto::getSeededRandomEngine<std::mt19937_64>()};
344 
345  std::string persistPath_;
346 
347  // http server
348  std::thread serverThread_;
349  std::unique_ptr<restinio::http_server_t<RestRouterTraitsTls>> httpsServer_;
350  std::unique_ptr<restinio::http_server_t<RestRouterTraits>> httpServer_;
351 
352  // http client
353  std::pair<std::string, std::string> pushHostPort_;
354 
355  mutable std::mutex requestLock_;
356  std::map<unsigned int /*id*/, std::shared_ptr<http::Request>> requests_;
357 
358  std::shared_ptr<dht::Logger> logger_;
359 
360  std::shared_ptr<ServerStats> stats_;
361  std::shared_ptr<NodeInfo> nodeInfo_ {};
362  std::unique_ptr<asio::steady_timer> printStatsTimer_;
363 
364  // Thread-safe access to listeners map.
365  std::mutex lockListener_;
366  // Shared with connection listener.
367  std::map<restinio::connection_id_t, http::ListenerSession> listeners_;
368  // Connection Listener observing conn state changes.
369  std::shared_ptr<ConnectionListener> connListener_;
370 
371  struct PushSessionContext {
372  std::mutex lock;
373  std::string sessionId;
374  PushSessionContext(const std::string& id) : sessionId(id) {}
375  };
376  struct PermanentPut {
377  time_point expiration;
378  std::string pushToken;
379  std::string clientId;
380  std::shared_ptr<PushSessionContext> sessionCtx;
381  std::unique_ptr<asio::steady_timer> expireTimer;
382  std::unique_ptr<asio::steady_timer> expireNotifyTimer;
383  Sp<Value> value;
384  PushType type;
385 
386  template <typename Packer>
387  void msgpack_pack(Packer& p) const
388  {
389  p.pack_map(2 + (sessionCtx ? 1 : 0) + (clientId.empty() ? 0 : 1) + (type == PushType::None ? 0 : 2));
390  p.pack("value"); p.pack(value);
391  p.pack("exp"); p.pack(to_time_t(expiration));
392  if (not clientId.empty()) {
393  p.pack("cid"); p.pack(clientId);
394  }
395  if (sessionCtx) {
396  std::lock_guard<std::mutex> l(sessionCtx->lock);
397  p.pack("sid"); p.pack(sessionCtx->sessionId);
398  }
399  if (type != PushType::None) {
400  p.pack("t"); p.pack(type);
401  p.pack("token"); p.pack(pushToken);
402  }
403  }
404 
405  void msgpack_unpack(const msgpack::object& o);
406  };
407  struct SearchPuts {
408  std::map<dht::Value::Id, PermanentPut> puts;
409  MSGPACK_DEFINE_ARRAY(puts)
410  };
411  std::mutex lockSearchPuts_;
412  std::map<InfoHash, SearchPuts> puts_;
413 
414  mutable std::atomic<size_t> requestNum_ {0};
415  mutable std::atomic<time_point> lastStatsReset_ {time_point::min()};
416 
417  std::string pushServer_;
418 
419 #ifdef OPENDHT_PUSH_NOTIFICATIONS
420  struct Listener {
421  time_point expiration;
422  std::string clientId;
423  std::shared_ptr<PushSessionContext> sessionCtx;
424  std::future<size_t> internalToken;
425  std::unique_ptr<asio::steady_timer> expireTimer;
426  std::unique_ptr<asio::steady_timer> expireNotifyTimer;
427  PushType type;
428 
429  template <typename Packer>
430  void msgpack_pack(Packer& p) const
431  {
432  p.pack_map(sessionCtx ? 4 : 3);
433  p.pack("cid"); p.pack(clientId);
434  p.pack("exp"); p.pack(to_time_t(expiration));
435  if (sessionCtx) {
436  std::lock_guard<std::mutex> l(sessionCtx->lock);
437  p.pack("sid"); p.pack(sessionCtx->sessionId);
438  }
439  p.pack("t"); p.pack(type);
440  }
441 
442  void msgpack_unpack(const msgpack::object& o);
443  };
444  struct PushListener {
445  std::map<InfoHash, std::vector<Listener>> listeners;
446  MSGPACK_DEFINE_ARRAY(listeners)
447  };
448  std::map<std::string, PushListener> pushListeners_;
449  proxy::ListenToken tokenPushNotif_ {0};
450 #endif //OPENDHT_PUSH_NOTIFICATIONS
451 };
452 
453 }
Definition: callbacks.h:35