Electroneum
http_server.h
Go to the documentation of this file.
1 #pragma once
2 
3 #include <boost/date_time/posix_time/posix_time.hpp>
4 #include <boost/asio.hpp>
5 #ifdef CROW_ENABLE_SSL
6 #include <boost/asio/ssl.hpp>
7 #endif
8 #include <cstdint>
9 #include <atomic>
10 #include <future>
11 #include <vector>
12 
13 #include <memory>
14 
15 #include "http_connection.h"
16 #include "logging.h"
17 #include "dumb_timer_queue.h"
18 
19 namespace crow
20 {
21  using namespace boost;
22  using tcp = asio::ip::tcp;
23 
24  template <typename Handler, typename Adaptor = SocketAdaptor, typename ... Middlewares>
25  class Server
26  {
27  public:
28  Server(Handler* handler, std::string bindaddr, uint16_t port, std::tuple<Middlewares...>* middlewares = nullptr, uint16_t concurrency = 1, typename Adaptor::context* adaptor_ctx = nullptr)
29  : acceptor_(io_service_, tcp::endpoint(boost::asio::ip::address::from_string(bindaddr), port)),
30  signals_(io_service_, SIGINT, SIGTERM),
31  handler_(handler),
32  concurrency_(concurrency),
33  port_(port),
34  bindaddr_(bindaddr),
35  middlewares_(middlewares),
36  adaptor_ctx_(adaptor_ctx)
37  {
38  }
39 
40  void run()
41  {
42  if (concurrency_ < 0)
43  concurrency_ = 1;
44 
45  for(int i = 0; i < concurrency_; i++)
46  io_service_pool_.emplace_back(new boost::asio::io_service());
47  get_cached_date_str_pool_.resize(concurrency_);
48  timer_queue_pool_.resize(concurrency_);
49 
50  std::vector<std::future<void>> v;
51  std::atomic<int> init_count(0);
52  for(uint16_t i = 0; i < concurrency_; i ++)
53  v.push_back(
54  std::async(std::launch::async, [this, i, &init_count]{
55 
56  // thread local date string get function
57  auto last = std::chrono::steady_clock::now();
58 
59  std::string date_str;
60  auto update_date_str = [&]
61  {
62  auto last_time_t = time(0);
63  tm my_tm;
64 
65 #ifdef _MSC_VER
66  gmtime_s(&my_tm, &last_time_t);
67 #else
68  gmtime_r(&last_time_t, &my_tm);
69 #endif
70  date_str.resize(100);
71  size_t date_str_sz = strftime(&date_str[0], 99, "%a, %d %b %Y %H:%M:%S GMT", &my_tm);
72  date_str.resize(date_str_sz);
73  };
74  update_date_str();
75  get_cached_date_str_pool_[i] = [&]()->std::string
76  {
77  if (std::chrono::steady_clock::now() - last >= std::chrono::seconds(1))
78  {
79  last = std::chrono::steady_clock::now();
80  update_date_str();
81  }
82  return date_str;
83  };
84 
85  // initializing timer queue
86  detail::dumb_timer_queue timer_queue;
87  timer_queue_pool_[i] = &timer_queue;
88 
89  timer_queue.set_io_service(*io_service_pool_[i]);
90  boost::asio::deadline_timer timer(*io_service_pool_[i]);
91  timer.expires_from_now(boost::posix_time::seconds(1));
92 
93  std::function<void(const boost::system::error_code& ec)> handler;
94  handler = [&](const boost::system::error_code& ec){
95  if (ec)
96  return;
97  timer_queue.process();
98  timer.expires_from_now(boost::posix_time::seconds(1));
99  timer.async_wait(handler);
100  };
101  timer.async_wait(handler);
102 
103  init_count ++;
104  try
105  {
106  io_service_pool_[i]->run();
107  } catch(std::exception& e)
108  {
109  CROW_LOG_ERROR << "Worker Crash: An uncaught exception occurred: " << e.what();
110  }
111  }));
112  CROW_LOG_INFO << server_name_ << " server is running, local port " << port_;
113 
114  signals_.async_wait(
115  [&](const boost::system::error_code& /*error*/, int /*signal_number*/){
116  stop();
117  });
118 
119  while(concurrency_ != init_count)
120  std::this_thread::yield();
121 
122  do_accept();
123 
124  std::thread([this]{
125  io_service_.run();
126  CROW_LOG_INFO << "Exiting.";
127  }).join();
128  }
129 
130  void stop()
131  {
132  io_service_.stop();
133  for(auto& io_service:io_service_pool_)
134  io_service->stop();
135  }
136 
137  private:
138  asio::io_service& pick_io_service()
139  {
140  // TODO load balancing
141  roundrobin_index_++;
142  if (roundrobin_index_ >= io_service_pool_.size())
143  roundrobin_index_ = 0;
144  return *io_service_pool_[roundrobin_index_];
145  }
146 
147  void do_accept()
148  {
149  asio::io_service& is = pick_io_service();
150  auto p = new Connection<Adaptor, Handler, Middlewares...>(
151  is, handler_, server_name_, middlewares_,
152  get_cached_date_str_pool_[roundrobin_index_], *timer_queue_pool_[roundrobin_index_],
153  adaptor_ctx_);
154  acceptor_.async_accept(p->socket(),
155  [this, p, &is](boost::system::error_code ec)
156  {
157  if (!ec)
158  {
159  is.post([p]
160  {
161  p->start();
162  });
163  }
164  do_accept();
165  });
166  }
167 
168  private:
169  asio::io_service io_service_;
170  std::vector<std::unique_ptr<asio::io_service>> io_service_pool_;
171  std::vector<detail::dumb_timer_queue*> timer_queue_pool_;
172  std::vector<std::function<std::string()>> get_cached_date_str_pool_;
173  tcp::acceptor acceptor_;
174  boost::asio::signal_set signals_;
175 
176  Handler* handler_;
177  uint16_t concurrency_{1};
178  std::string server_name_ = "Crow/0.1";
179  uint16_t port_;
180  std::string bindaddr_;
181  unsigned int roundrobin_index_{};
182 
183  std::tuple<Middlewares...>* middlewares_;
184 
185 #ifdef CROW_ENABLE_SSL
186  bool use_ssl_{false};
187  boost::asio::ssl::context ssl_context_{boost::asio::ssl::context::sslv23};
188 #endif
190  };
191 }
tcp::acceptor acceptor_
Definition: http_server.h:173
boost::asio::signal_set signals_
Definition: http_server.h:174
Definition: unordered_containers_boost_serialization.h:38
void process()
Definition: dumb_timer_queue.h:42
Definition: dumb_timer_queue.h:16
std::vector< std::function< std::string()> > get_cached_date_str_pool_
Definition: http_server.h:172
void do_accept()
Definition: http_server.h:147
std::tuple< Middlewares... > * middlewares_
Definition: http_server.h:183
std::vector< std::unique_ptr< asio::io_service > > io_service_pool_
Definition: http_server.h:170
Definition: http_connection.h:182
asio::io_service io_service_
Definition: http_server.h:169
ArgJoin< char, It > join(It first, It last, const BasicCStringRef< char > &sep)
Definition: format.h:3893
void run()
Definition: http_server.h:40
#define CROW_LOG_ERROR
Definition: logging.h:130
Definition: http_server.h:25
std::vector< detail::dumb_timer_queue * > timer_queue_pool_
Definition: http_server.h:171
uint16_t port_
Definition: http_server.h:179
Handler * handler_
Definition: http_server.h:176
std::string bindaddr_
Definition: http_server.h:180
asio::io_service & pick_io_service()
Definition: http_server.h:138
Definition: ci_map.h:7
asio::ip::tcp tcp
Definition: http_connection.h:23
void set_io_service(boost::asio::io_service &io_service)
Definition: dumb_timer_queue.h:64
Adaptor::context * adaptor_ctx_
Definition: http_server.h:189
Server(Handler *handler, std::string bindaddr, uint16_t port, std::tuple< Middlewares... > *middlewares=nullptr, uint16_t concurrency=1, typename Adaptor::context *adaptor_ctx=nullptr)
Definition: http_server.h:28
void stop()
Definition: http_server.h:130
#define CROW_LOG_INFO
Definition: logging.h:136
json::wvalue context
Definition: mustache.h:12