Here we'll make a FastCGI application that counts out five seconds in the client's web browser. To do this properly we'll need to use the asynchronous nature of fastcgi++'s request handling. This example is a good starting point for seeing how a request might give up processing time while waiting for a database query to complete. Concepts covered include:
- Pausing requests while waiting for callbacks.
- Flushing the output stream buffer to force a partial HTTP response.
- Defining the number of concurrent request handling threads.
You can build it with
make timer.fcgi
In order for this example to work properly you'll need to tell your web server not to buffer FastCGI data before sending it off to the client. Ultimately fastcgi++ does a good job of buffering data itself, so I tend to keep web server side buffering disabled at all times. If you're using mod_fastcgi with Apache this means adding
FastCgiConfig -flush
to your server's configuration whereas with mod_fcgid use
FcgidOutputBufferSize 0
Walkthrough
First we'll define our request class.
#include <thread>
#include <condition_variable>
{
public:
/// Starts the count at zero and records the construction time, which
/// response() uses as the base for every scheduled wakeup.
Timer():
    m_time{0},
    m_startTime{std::chrono::steady_clock::now()}
{}
Now we'll make a simple stopwatch. I won't go into detail explaining how this works since it's not relevant to fastcgi++. Really it just runs in its own thread doing callbacks at set times.
/// Starts the stopwatch shared by all Timer requests by calling start() on
/// s_stopwatch.
static void startStopwatch()
{
s_stopwatch.start();
}
/// Stops the stopwatch shared by all Timer requests by calling stop() on
/// s_stopwatch.
static void stopStopwatch()
{
s_stopwatch.stop();
}
private:
class Stopwatch
{
private:
std::thread m_thread;
std::condition_variable m_cv;
bool m_kill;
struct Item
{
std::function<void(Fastcgipp::Message)> callback;
std::chrono::time_point<std::chrono::steady_clock> wakeup;
Item(
const std::chrono::time_point<std::chrono::steady_clock>&
wakeup_):
callback(callback_),
message(
std::move(message_)),
wakeup(wakeup_)
{}
bool operator<(const Item& item) const
{
return wakeup < item.wakeup;
}
bool operator==(const Item& item) const
{
return wakeup == item.wakeup;
}
};
std::set<Item> m_queue;
void handler()
{
std::unique_lock<std::mutex> lock(m_mutex);
while(!m_kill)
{
if(m_queue.empty())
m_cv.wait(lock);
else
{
const Item& item = *m_queue.begin();
if(item.wakeup <= std::chrono::steady_clock::now())
{
item.callback(std::move(item.message));
m_queue.erase(m_queue.begin());
}
else
m_cv.wait_until(lock, item.wakeup);
}
}
}
public:
void push(
std::chrono::time_point<std::chrono::steady_clock> wakeup)
{
std::lock_guard<std::mutex> lock(m_mutex);
m_queue.emplace_hint(
m_queue.end(),
callback,
std::move(message),
wakeup);
m_cv.notify_one();
}
void start()
{
std::lock_guard<std::mutex> lock(m_mutex);
if(!m_thread.joinable())
{
m_kill = false;
std::thread thread(std::bind(&Stopwatch::handler, this));
m_thread.swap(thread);
}
}
void stop()
{
std::unique_lock<std::mutex> lock(m_mutex);
if(m_thread.joinable())
{
m_kill = true;
m_cv.notify_one();
lock.unlock();
m_thread.join();
}
}
};
/// Single stopwatch shared by every Timer request; response() queues its
/// wakeups here.
static Stopwatch s_stopwatch;
Since our response function will be called multiple times per request we'll need some members to keep track of our count.
/// Number of counts already written to the client (0 through 5).
unsigned m_time;
/// Time the request object was constructed; each wakeup is scheduled
/// relative to this.
const std::chrono::time_point<std::chrono::steady_clock> m_startTime;
And here we have our response function. For each request this function should be called 6 times. On the first call m_time is 0 and we output our header stuff. This time, and the 4 subsequent times, we return false, indicating that the request is not yet complete and we are waiting on a callback. Notice that each time we return false we first call out.flush(), forcing the request to empty its buffer and send it to the web server. On the final call (m_time==5) we output our footer and return true, indicating that the request is now complete.
bool response()
{
if(m_time < 5)
{
if(m_time == 0)
out <<
"Content-Type: text/html; charset=iso-8859-1\r\n\r\n"
"<!DOCTYPE html>\n"
"<html lang='en'>"
"<head>"
"<meta charset='iso-8859-1' />"
"<title>fastcgi++: Timer</title>"
"</head>"
"<body>"
"<p>";
out << m_time++ << "...";
out.flush();
static const char messageText[] = "I was passed between threads!!";
message.
data.
assign(messageText,
sizeof(messageText)-1);
s_stopwatch.push(
callback(),
std::move(message),
m_startTime + std::chrono::seconds(m_time));
return false;
}
else
{
out << "5</p>"
"</body>"
"</html>";
return true;
}
}
};
// Out-of-class definition of the static stopwatch declared in Timer.
Timer::Stopwatch Timer::s_stopwatch;
int main()
{
Timer::startStopwatch();
Notice this time around we're calling our manager constructor with an argument. This is how we define how many concurrent request handling threads we want. The default for this is the maximum hardware concurrency level but this time we'll bring it down by half since our requests aren't exactly doing much heavy lifting.
std::max(1u, unsigned(std::thread::hardware_concurrency()/2)));
Timer::stopStopwatch();
return 0;
}
Full Source Code
#include <thread>
#include <condition_variable>
{
public:
/// Starts the count at zero and records the construction time, which
/// response() uses as the base for every scheduled wakeup.
Timer():
    m_time{0},
    m_startTime{std::chrono::steady_clock::now()}
{}
/// Starts the stopwatch shared by all Timer requests by calling start() on
/// s_stopwatch.
static void startStopwatch()
{
s_stopwatch.start();
}
/// Stops the stopwatch shared by all Timer requests by calling stop() on
/// s_stopwatch.
static void stopStopwatch()
{
s_stopwatch.stop();
}
private:
class Stopwatch
{
private:
std::thread m_thread;
std::condition_variable m_cv;
bool m_kill;
struct Item
{
std::function<void(Fastcgipp::Message)> callback;
std::chrono::time_point<std::chrono::steady_clock> wakeup;
Item(
const std::chrono::time_point<std::chrono::steady_clock>&
wakeup_):
callback(callback_),
message(
std::move(message_)),
wakeup(wakeup_)
{}
bool operator<(const Item& item) const
{
return wakeup < item.wakeup;
}
bool operator==(const Item& item) const
{
return wakeup == item.wakeup;
}
};
std::set<Item> m_queue;
void handler()
{
std::unique_lock<std::mutex> lock(m_mutex);
while(!m_kill)
{
if(m_queue.empty())
m_cv.wait(lock);
else
{
const Item& item = *m_queue.begin();
if(item.wakeup <= std::chrono::steady_clock::now())
{
item.callback(std::move(item.message));
m_queue.erase(m_queue.begin());
}
else
m_cv.wait_until(lock, item.wakeup);
}
}
}
public:
void push(
std::chrono::time_point<std::chrono::steady_clock> wakeup)
{
std::lock_guard<std::mutex> lock(m_mutex);
m_queue.emplace_hint(
m_queue.end(),
callback,
std::move(message),
wakeup);
m_cv.notify_one();
}
void start()
{
std::lock_guard<std::mutex> lock(m_mutex);
if(!m_thread.joinable())
{
m_kill = false;
std::thread thread(std::bind(&Stopwatch::handler, this));
m_thread.swap(thread);
}
}
void stop()
{
std::unique_lock<std::mutex> lock(m_mutex);
if(m_thread.joinable())
{
m_kill = true;
m_cv.notify_one();
lock.unlock();
m_thread.join();
}
}
};
/// Single stopwatch shared by every Timer request; response() queues its
/// wakeups here.
static Stopwatch s_stopwatch;
/// Number of counts already written to the client (0 through 5).
unsigned m_time;
/// Time the request object was constructed; each wakeup is scheduled
/// relative to this.
const std::chrono::time_point<std::chrono::steady_clock> m_startTime;
bool response()
{
if(m_time < 5)
{
if(m_time == 0)
out <<
"Content-Type: text/html; charset=iso-8859-1\r\n\r\n"
"<!DOCTYPE html>\n"
"<html lang='en'>"
"<head>"
"<meta charset='iso-8859-1' />"
"<title>fastcgi++: Timer</title>"
"</head>"
"<body>"
"<p>";
out << m_time++ << "...";
out.flush();
static const char messageText[] = "I was passed between threads!!";
message.
data.
assign(messageText,
sizeof(messageText)-1);
s_stopwatch.push(
callback(),
std::move(message),
m_startTime + std::chrono::seconds(m_time));
return false;
}
else
{
out << "5</p>"
"</body>"
"</html>";
return true;
}
}
};
// Out-of-class definition of the static stopwatch declared in Timer.
Timer::Stopwatch Timer::s_stopwatch;
int main()
{
Timer::startStopwatch();
std::max(1u, unsigned(std::thread::hardware_concurrency()/2)));
Timer::stopStopwatch();
return 0;
}