fastcgi++
A C++ FastCGI/Web API
transceiver.cpp
Go to the documentation of this file.
1 
10 /*******************************************************************************
11 * Copyright (C) 2017 Eddie Carle [eddie@isatec.ca] *
12 * *
13 * This file is part of fastcgi++. *
14 * *
15 * fastcgi++ is free software: you can redistribute it and/or modify it under *
16 * the terms of the GNU Lesser General Public License as published by the Free *
17 * Software Foundation, either version 3 of the License, or (at your option) *
18 * any later version. *
19 * *
20 * fastcgi++ is distributed in the hope that it will be useful, but WITHOUT ANY *
21 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS *
22 * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for *
23 * more details. *
24 * *
25 * You should have received a copy of the GNU Lesser General Public License *
26 * along with fastcgi++. If not, see <http://www.gnu.org/licenses/>. *
27 *******************************************************************************/
28 
30 
31 #include "fastcgi++/log.hpp"
// Body of what is presumably Fastcgipp::Transceiver::transmit(); the
// signature (listing line 32) is missing from this extract -- TODO confirm.
//
// Drains m_sendBuffer from the front, writing each queued record to its own
// socket. Returns false as soon as a record could only be partially written
// (that record is put back at the front of the queue); returns true once the
// loop has run out of queued records.
33 {
34  std::unique_ptr<Record> record;
35 
    // NOTE(review): empty() is evaluated here without holding
    // m_sendBufferMutex, while every other access to m_sendBuffer in this
    // block takes the lock -- confirm this unsynchronized read is acceptable.
36  while(!m_sendBuffer.empty())
37  {
    // Take ownership of the front record; the lock is held only for the
    // dequeue, not for the socket write below.
38  {
39  std::lock_guard<std::mutex> lock(m_sendBufferMutex);
40  record = std::move(m_sendBuffer.front());
41  m_sendBuffer.pop_front();
42  }
43 
    // record->read marks the start of the data not yet written; try to
    // write everything from there up to the end of the record's data.
44  const ssize_t sent = record->socket.write(
45  record->read,
46  record->data.end()-record->read);
    // NOTE(review): when sent is negative nothing below runs; the record is
    // neither requeued nor is its socket closed here, it is simply released
    // when `record` is reassigned or destroyed -- confirm that is intended.
47  if(sent>=0)
48  {
49  record->read += sent;
    // Partial write: requeue at the front so ordering is preserved, and
    // report that not everything was flushed.
50  if(record->read != record->data.end())
51  {
52  {
53  std::lock_guard<std::mutex> lock(m_sendBufferMutex);
54  m_sendBuffer.push_front(std::move(record));
55  }
56  return false;
57  }
58 #if FASTCGIPP_LOG_LEVEL > 3
59  ++m_recordsSent;
60 #endif
    // The record was fully written and was flagged with kill: close its
    // socket and drop the receive buffer associated with that socket.
61  if(record->kill)
62  {
63  record->socket.close();
64  m_receiveBuffers.erase(record->socket);
65 #if FASTCGIPP_LOG_LEVEL > 3
66  ++m_connectionKillCount;
67 #endif
68  }
69  }
70  }
71 
72  return true;
73 }
74 
// Body of what is presumably Fastcgipp::Transceiver::handler(); the
// signature (listing line 75) is missing from this extract -- TODO confirm.
//
// Repeats poll -> receive -> transmit until m_terminate is set, or until
// m_stop is set and m_sockets reports zero sockets.
76 {
77  bool flushed=false;
78  Socket socket;
79 
80  while(!m_terminate && !(m_stop && m_sockets.size()==0))
81  {
    // The result of the previous transmit() is handed to poll().
    // NOTE(review): presumably poll() only blocks when everything was
    // flushed -- confirm against the poll() implementation.
82  socket = m_sockets.poll(flushed);
83  receive(socket);
84  flushed = transmit();
85  }
86 }
87 
// Body of what is presumably Fastcgipp::Transceiver::stop(); the signature
// (listing line 88) is missing from this extract -- TODO confirm.
//
// Sets m_stop and calls m_sockets.accept(false). The handler loop shown in
// this listing exits on m_stop only once m_sockets.size() reaches zero.
// NOTE(review): accept(false) presumably stops new connections from being
// accepted -- confirm against the socket group implementation.
89 {
90  m_stop=true;
91  m_sockets.accept(false);
92 }
93 
// Body of what is presumably Fastcgipp::Transceiver::terminate(); the
// signature (listing line 94) is missing from this extract -- TODO confirm.
//
// Sets m_terminate, which the handler loop checks on every iteration, and
// then calls m_sockets.wake().
// NOTE(review): wake() presumably interrupts a blocking poll() so the flag is
// noticed promptly -- confirm. This block does not join the handler thread.
95 {
96  m_terminate=true;
97  m_sockets.wake();
98 }
99 
// Body of what is presumably Fastcgipp::Transceiver::start(); the signature
// (listing line 100) is missing from this extract -- TODO confirm.
//
// Clears both m_stop and m_terminate, calls m_sockets.accept(true), and
// launches handler() on a new thread unless m_thread is already joinable.
101 {
102  m_stop=false;
103  m_terminate=false;
104  m_sockets.accept(true);
    // A joinable m_thread is left untouched: no second thread is launched.
105  if(!m_thread.joinable())
106  {
    // Construct the new thread locally, then swap it into the member.
107  std::thread thread(&Fastcgipp::Transceiver::handler, this);
108  m_thread.swap(thread);
109  }
110 }
111 
// Body of what is presumably Fastcgipp::Transceiver::join(); the signature
// (listing line 112) is missing from this extract -- TODO confirm.
//
// Blocks until the handler thread finishes. Does nothing when m_thread is not
// joinable (for example when no thread was ever started).
113 {
114  if(m_thread.joinable())
115  {
116  m_thread.join();
117  }
118 }
119 
// Parameter list, initializer list and body of what is presumably the
// Fastcgipp::Transceiver constructor; the line carrying the constructor name
// (listing line 120) is missing from this extract -- TODO confirm.
//
// sendMessage: callback taking a Protocol::RequestId and a Message&&; it is
//              copied into m_sendMessage.
// When FASTCGIPP_LOG_LEVEL > 3 the five diagnostic counters are also
// initialized to zero.
121  const std::function<void(Protocol::RequestId, Message&&)> sendMessage):
122  m_sendMessage(sendMessage)
123 #if FASTCGIPP_LOG_LEVEL > 3
124  ,m_connectionKillCount(0),
125  m_connectionRDHupCount(0),
126  m_recordsSent(0),
127  m_recordsQueued(0),
128  m_recordsReceived(0)
129 #endif
130 {
    // NOTE(review): the log text below spells the class name "Transciever";
    // it is a runtime string and is deliberately left unchanged here.
131  DIAG_LOG("Transceiver::Transciever(): Initialized")
132 }
133 
// Body of what is presumably Fastcgipp::Transceiver::receive(Socket&); the
// signature (listing line 134) is missing from this extract -- TODO confirm.
//
// Reads whatever is available on `socket` into that socket's receive buffer.
// Once a whole record (header + content + padding) has been accumulated, the
// buffer is moved into a Message and passed to m_sendMessage. Does nothing
// when the socket is not valid. A negative read result triggers
// cleanupSocket() and an early return.
135 {
136  if(socket.valid())
137  {
    // Indexing the map creates an empty buffer for a socket seen for the
    // first time.
138  Block& buffer=m_receiveBuffers[socket];
139 
140  // Are we receiving a header?
141  if(buffer.size() < sizeof(Protocol::Header))
142  {
143  buffer.reserve(sizeof(Protocol::Header));
144 
    // Read at most the bytes still missing from the header.
145  const ssize_t read = socket.read(
146  buffer.begin()+buffer.size(),
147  buffer.reserve()-buffer.size());
148  if(read<0)
149  {
150  cleanupSocket(socket);
151  return;
152  }
153  buffer.size(buffer.size() + read);
    // Header still incomplete: wait for the next call.
154  if(buffer.size() < sizeof(Protocol::Header))
155  return;
156  }
157 
    // Exactly one header has been received so far: grow the reservation to
    // header + contentLength + paddingLength as declared in that header.
158  if(buffer.size() == sizeof(Protocol::Header))
159  buffer.reserve(sizeof(Protocol::Header)
160  +reinterpret_cast<Protocol::Header*>(
161  buffer.begin())->contentLength
162  +reinterpret_cast<Protocol::Header*>(
163  buffer.begin())->paddingLength);
164 
    // NOTE(review): listing line 165 is missing from this extract; since
    // `header` is used further down, the line below is presumably the
    // initializer of a `header` declaration -- TODO confirm.
166  = *reinterpret_cast<Protocol::Header*>(buffer.begin());
167 
    // Read as much of the remaining record body as is available.
168  const ssize_t read = socket.read(
169  buffer.begin()+buffer.size(),
170  buffer.reserve()-buffer.size());
171 
172  if(read<0)
173  {
174  cleanupSocket(socket);
175  return;
176  }
177  buffer.size(buffer.size() + read);
    // Record still incomplete: keep the partial data for the next call.
178  if(buffer.size() < buffer.reserve())
179  return;
180 
    // Whole record received: hand its bytes over as a Message, addressed
    // by the header's fcgiId together with the socket.
181  Message message;
182  message.data = std::move(buffer);
183 
184  m_sendMessage(
185  Protocol::RequestId(header.fcgiId, socket),
186  std::move(message));
187 #if FASTCGIPP_LOG_LEVEL > 3
188  ++m_recordsReceived;
189 #endif
190  }
191 }
192 
// Body of what is presumably Fastcgipp::Transceiver::cleanupSocket(const
// Socket&); the signature (listing line 193) is missing from this extract --
// TODO confirm.
//
// Drops the socket's receive buffer, passes a default-constructed Message to
// m_sendMessage, and closes the socket.
194 {
195  m_receiveBuffers.erase(socket);
    // NOTE(review): listing line 197 (the first argument of this call) is
    // missing from this extract; it is presumably a RequestId built from
    // Protocol::badFcgiId and the socket -- TODO confirm.
196  m_sendMessage(
198  Message());
199  socket.close();
200 #if FASTCGIPP_LOG_LEVEL > 3
201  ++m_connectionRDHupCount;
202 #endif
203 }
204 
// Parameter list and body of what is presumably Fastcgipp::Transceiver::send;
// the line carrying the function name (listing line 205) is missing from this
// extract -- TODO confirm.
//
// Queues a block of data for transmission.
// socket: socket the data is to be written to.
// data:   block to transmit; it is moved into the queued record.
// kill:   stored in the record; the transmit code in this listing closes the
//         socket after a record with kill set has been fully written.
206  const Socket& socket,
207  Block&& data,
208  bool kill)
209 {
210  std::unique_ptr<Record> record(new Record(
211  socket,
212  std::move(data),
213  kill));
    // Only the enqueue itself is done under the lock.
214  {
215  std::lock_guard<std::mutex> lock(m_sendBufferMutex);
216  m_sendBuffer.push_back(std::move(record));
217  }
    // NOTE(review): wake() presumably interrupts a blocking poll() in the
    // handler thread so the new record gets transmitted -- confirm.
218  m_sockets.wake();
    // NOTE(review): this counter is incremented outside the lock -- confirm
    // that is safe if send() can be called from several threads.
219 #if FASTCGIPP_LOG_LEVEL > 3
220  ++m_recordsQueued;
221 #endif
222 }
223 
// Body of what is presumably Fastcgipp::Transceiver::~Transceiver(); the
// signature (listing line 224) is missing from this extract -- TODO confirm.
//
// Calls terminate() and then logs the diagnostic counters and the number of
// receive buffers still held.
// NOTE(review): terminate(), as shown in this listing, only sets a flag and
// wakes the socket group; nothing in this block joins m_thread. Destroying a
// std::thread that is still joinable calls std::terminate -- confirm that
// join() is always called before destruction.
225 {
226  terminate();
    // NOTE(review): the counters below are only initialized when
    // FASTCGIPP_LOG_LEVEL > 3; presumably DIAG_LOG expands to nothing at
    // lower levels -- confirm in log.hpp.
227  DIAG_LOG("Transceiver::~Transceiver(): Locally closed sockets ==== " \
228  << m_connectionKillCount)
229  DIAG_LOG("Transceiver::~Transceiver(): Remotely closed sockets === " \
230  << m_connectionRDHupCount)
231  DIAG_LOG("Transceiver::~Transceiver(): Remaining receive buffers = " \
232  << m_receiveBuffers.size())
233  DIAG_LOG("Transceiver::~Transceiver(): Records queued === " \
234  << m_recordsQueued)
235  DIAG_LOG("Transceiver::~Transceiver(): Records sent ===== " \
236  << m_recordsSent)
237  DIAG_LOG("Transceiver::~Transceiver(): Records received = " \
238  << m_recordsReceived)
239 }
void send(const Socket &socket, Block &&data, bool kill)
Queue up a block of data for transmission.
void close() const
Call this to close the socket.
Definition: sockets.cpp:122
std::deque< std::unique_ptr< Record > > m_sendBuffer
Buffer for transmitting data
void handler()
General transceiver handler.
Definition: transceiver.cpp:75
void header(Level level)
Send a log header to logstream.
Definition: log.cpp:107
void join()
Block until a stop() or terminate() is called and completed.
void cleanupSocket(const Socket &socket)
Cleanup a dead socket.
Data structure used to pass messages to requests.
Definition: message.hpp:46
Data structure used as the header for FastCGI records.
Definition: protocol.hpp:292
Transceiver(const std::function< void(Protocol::RequestId, Message &&)> sendMessage)
Constructor.
size_t size() const
See the relevant data size.
Definition: block.hpp:94
bool valid() const
Returns true if this socket is still open and capable of read/write.
Definition: sockets.hpp:252
#define FASTCGIPP_LOG_LEVEL
Definition: config.hpp:7
const uint16_t badFcgiId
Constant that defines a bad/special FcgiId.
Definition: protocol.hpp:60
void terminate()
Call from any thread to terminate the handler() thread.
Definition: transceiver.cpp:94
Simple FastCGI record to queue up for transmission.
void start()
Call from any thread to start the handler() thread.
ssize_t read(char *buffer, size_t size) const
Try and read a chunk of data out of the socket.
Definition: sockets.cpp:68
std::map< Socket, Block > m_receiveBuffers
Container associating sockets with their receive buffers.
Class for representing an OS level I/O socket.
Definition: sockets.hpp:83
void receive(Socket &socket)
Receive data on the specified socket.
A unique identifier for each FastCGI request.
Definition: protocol.hpp:71
Block data
The raw data being passed along with the message.
Definition: message.hpp:75
ssize_t write(const char *buffer, size_t size) const
Try and write a chunk of data into the socket.
Definition: sockets.cpp:99
void stop()
Call from any thread to stop the handler() thread.
Definition: transceiver.cpp:88
bool transmit()
Transmit all buffered data possible.
Definition: transceiver.cpp:32
#define DIAG_LOG(data)
Definition: log.hpp:158
Declares the Fastcgipp::Transceiver class.
size_t reserve() const
See the reserve size.
Definition: block.hpp:80
char * end()
Pointer to 1+ the last element.
Definition: block.hpp:115
Declares the Fastcgipp debugging/logging facilities.
std::mutex m_sendBufferMutex
Mutex that makes access to the send buffer thread safe.
Data structure to hold a block of raw data.
Definition: block.hpp:44
char * begin()
Pointer to the first element.
Definition: block.hpp:103