libzypp  17.22.0
networkrequestdispatcher.cc
Go to the documentation of this file.
4 #include <zypp/zyppng/base/Timer>
5 #include <zypp/zyppng/base/SocketNotifier>
6 #include <zypp/zyppng/base/EventDispatcher>
9 #include <assert.h>
10 
11 #include <zypp/base/Logger.h>
12 #include <zypp/base/String.h>
13 
14 using namespace boost;
15 
16 namespace zyppng {
17 
// Constructor of the private dispatcher state: creates the timer and the curl multi handle
// in the initializer list, then wires both of them up in the body.
// NOTE(review): listing line 22 is missing from this extract, so the body may contain a
// statement that is not visible here.
18 NetworkRequestDispatcherPrivate::NetworkRequestDispatcherPrivate( )
19  : _timer( Timer::create() )
20  , _multi ( curl_multi_init() )
21 {
23 
// Register the static timer and socket callbacks on the multi handle. `this` is handed over
// as the callback user data so the static callbacks can get back to this object.
24  curl_multi_setopt( _multi, CURLMOPT_TIMERFUNCTION, NetworkRequestDispatcherPrivate::multi_timer_cb );
25  curl_multi_setopt( _multi, CURLMOPT_TIMERDATA, reinterpret_cast<void *>( this ) );
26  curl_multi_setopt( _multi, CURLMOPT_SOCKETFUNCTION, NetworkRequestDispatcherPrivate::static_socket_callback );
27  curl_multi_setopt( _multi, CURLMOPT_SOCKETDATA, reinterpret_cast<void *>( this ) );
28 
// Expiry of the timer is routed to multiTimerTimout on this object.
29  _timer->sigExpired().connect( sigc::mem_fun( *this, &NetworkRequestDispatcherPrivate::multiTimerTimout ) );
30 }
31 
// NOTE(review): the signature line (and listing line 34) is missing from this extract;
// presumably this is the destructor of NetworkRequestDispatcherPrivate — confirm.
// The visible statement hands the multi handle created in the constructor to curl_multi_cleanup.
33 {
35  curl_multi_cleanup( _multi );
36 }
37 
38 //called by curl to setup a timer
39 int NetworkRequestDispatcherPrivate::multi_timer_cb( CURLM *, long timeout_ms, void *thatPtr )
40 {
41  NetworkRequestDispatcherPrivate *that = reinterpret_cast<NetworkRequestDispatcherPrivate *>( thatPtr );
42  assert( that != nullptr );
43 
44  if ( timeout_ms >= 0 ) {
45  that->_timer->start( static_cast<uint64_t>(timeout_ms) );
46  } else {
47  //cancel the timer
48  that->_timer->stop();
49  }
50  return 0;
51 }
52 
// NOTE(review): the signature line is missing from this extract; presumably this is
// multiTimerTimout, the handler the constructor connects to the timer's sigExpired — confirm.
// Forwards CURL_SOCKET_TIMEOUT with an empty event mask to handleMultiSocketAction.
54 {
55  handleMultiSocketAction( CURL_SOCKET_TIMEOUT, 0 );
56 }
57 
58 int NetworkRequestDispatcherPrivate::static_socket_callback(CURL * easy, curl_socket_t s, int what, void *userp, SocketNotifier *socketp )
59 {
60  NetworkRequestDispatcherPrivate *that = reinterpret_cast<NetworkRequestDispatcherPrivate *>( userp );
61  assert( that != nullptr );
62  return that->socketCallback( easy, s, what, socketp );
63 }
64 
65 int NetworkRequestDispatcherPrivate::socketCallback(CURL *easy, curl_socket_t s, int what, void * )
66 {
67  std::shared_ptr<SocketNotifier> socketp;
68 
69  if ( _socketHandler.count( s ) == 0 ) {
70  if ( what == CURL_POLL_REMOVE || what == CURL_POLL_NONE )
71  return 0;
72 
73  socketp = SocketNotifier::create( s, SocketNotifier::Read, false );
74  _socketHandler.insert( std::make_pair( s, socketp ) );
75 
76  socketp->sigActivated().connect( sigc::mem_fun(*this, &NetworkRequestDispatcherPrivate::onSocketActivated) );
77  } else {
78  socketp = _socketHandler[s];
79  }
80 
81  //should never happen
82  if ( !socketp ) {
83  if ( what == CURL_POLL_REMOVE || what == CURL_POLL_NONE )
84  return 0;
85 
86  if ( _socketHandler.count( s ) > 0 )
87  _socketHandler.erase( s );
88 
89  void *privatePtr = nullptr;
90  if ( curl_easy_getinfo( easy, CURLINFO_PRIVATE, &privatePtr ) != CURLE_OK ) {
91  privatePtr = nullptr; //make sure this was not filled with bad info
92  }
93 
94  if ( privatePtr ) {
95  NetworkRequestPrivate *request = reinterpret_cast<NetworkRequestPrivate *>( privatePtr );
96  //we stop the download, if we can not listen for socket changes we can not correctly do anything
97  setFinished( *request->z_func(), NetworkRequestErrorPrivate::customError( NetworkRequestError::InternalError, "Unable to assign socket listener." ) );
98  return 0;
99  } else {
100  //a broken handle without anything assigned, also should never happen but make sure and clean it up
101  DBG << "Cleaning up unassigned easy handle" << std::endl;
102  curl_multi_remove_handle( _multi, easy );
103  curl_easy_cleanup( easy );
104  return 0;
105  }
106  }
107 
108  //remove the socket
109  if ( what == CURL_POLL_REMOVE ) {
110  socketp->setEnabled( false );
111  _socketHandler.erase( s );
112 
113  //keep the reference until this iteration is over
114  EventDispatcher::unrefLater( socketp );
115  return 0;
116  }
117 
118  if ( what == CURL_POLL_IN ) {
119  socketp->setMode( SocketNotifier::Read );
120  } else if ( what == CURL_POLL_OUT ) {
121  socketp->setMode( SocketNotifier::Write );
122  } else if ( what == CURL_POLL_INOUT ) {
123  socketp->setMode( SocketNotifier::Read | SocketNotifier::Write );
124  }
125 
126  socketp->setEnabled();
127  return 0;
128 }
129 
// NOTE(review): the signature line is missing from this extract; presumably this is
// onSocketActivated( const SocketNotifier &listener, int events ), the handler that
// socketCallback connects to a notifier's sigActivated — confirm.
// Translates the SocketNotifier event flags in `events` into the matching CURL_CSELECT_*
// bits and passes them, together with the listener's socket, to handleMultiSocketAction.
131 {
132  int evBitmask = 0;
133  if ( (events & SocketNotifier::Read) == SocketNotifier::Read )
134  evBitmask |= CURL_CSELECT_IN;
135  if ( (events & SocketNotifier::Write) == SocketNotifier::Write )
136  evBitmask |= CURL_CSELECT_OUT;
137  if ( (events & SocketNotifier::Error) == SocketNotifier::Error )
138  evBitmask |= CURL_CSELECT_ERR;
139 
140  handleMultiSocketAction( listener.socket(), evBitmask );
141 }
142 
// Runs curl_multi_socket_action for the given socket and event mask, then drains curl's
// message queue and finishes every request curl reports as done.
//   nativeSocket - socket to act on (the timer handler passes CURL_SOCKET_TIMEOUT)
//   evBitmask    - CURL_CSELECT_* bits describing what happened on the socket
// NOTE(review): listing line 149 is missing from this extract; `err` is presumably
// declared there from `rc` — confirm.
143 void NetworkRequestDispatcherPrivate::handleMultiSocketAction(curl_socket_t nativeSocket, int evBitmask)
144 {
145  int running = 0;
146  CURLMcode rc = curl_multi_socket_action( _multi, nativeSocket, evBitmask, &running );
147  if (rc != 0) {
148  //we can not recover from a error like that, cancel all and stop
150  cancelAll( err );
151  //emit error
152  _lastError = err;
153  _sigError.emit( *z_func() );
154  return;
155  }
156 
157  int msgs_left = 0;
158  CURLMsg *msg = nullptr;
// Process all pending messages; only CURLMSG_DONE entries are handled.
159  while( (msg = curl_multi_info_read( _multi, &msgs_left )) ) {
160  if(msg->msg == CURLMSG_DONE) {
161  CURL *easy = msg->easy_handle;
162  CURLcode res = msg->data.result;
163 
// The easy handle's private pointer is read back as the NetworkRequestPrivate it belongs to.
164  void *privatePtr = nullptr;
165  if ( curl_easy_getinfo( easy, CURLINFO_PRIVATE, &privatePtr ) != CURLE_OK )
166  continue;
167 
168  if ( !privatePtr ) {
169  //broken easy handle not associated, should never happen but clean it up
170  DBG << "Cleaning up unassigned easy handle" << std::endl;
171  curl_multi_remove_handle( _multi, easy );
172  curl_easy_cleanup( easy );
173  continue;
174  }
175 
176  NetworkRequestPrivate *request = reinterpret_cast<NetworkRequestPrivate *>( privatePtr );
177 
178  //trigger notification about file downloaded
// The curl result code and the request's error buffer are turned into a NetworkRequestError.
179  NetworkRequestError e = NetworkRequestErrorPrivate::fromCurlError( *request->z_func(), res, request->_errorBuf.data() );
180  setFinished( *request->z_func(), e );
181 
182  //attention request could be deleted from here on
183  }
184  }
185 }
186 
// NOTE(review): the signature line is missing from this extract; presumably this is
// cancelAll( NetworkRequestError result ) — confirm.
// Finishes every running and every pending request with `result`. _locked is set for the
// duration so that dequeuePending returns early instead of starting new downloads.
// NOTE(review): both loops only terminate if setFinished removes the request from the
// respective list — confirm against setFinished.
188 {
189  //prevent dequeuePending from filling up the runningDownloads again
190  _locked = true;
191 
192  while ( _runningDownloads.size() ) {
193  std::shared_ptr<NetworkRequest> &req = _runningDownloads.back();
194  setFinished(*req, result );
195  }
196  while ( _pendingDownloads.size() ) {
197  std::shared_ptr<NetworkRequest> &req = _pendingDownloads.back();
198  setFinished(*req, result );
199  }
200 
201  _locked = false;
202 }
203 
// NOTE(review): the signature line is missing from this extract; presumably this is
// setFinished( NetworkRequest &req, NetworkRequestError result ) — confirm.
// Listing line 211 (inside the lambda) is missing as well.
// Removes `req` from both queues and from the multi handle, clears its dispatcher pointer,
// stores `result` on it, emits the download-finished signal and finally calls dequeuePending.
205 {
// Helper: erase the entry of `list` whose private object is the same as the one of `req`.
206  auto delReq = []( auto &list, NetworkRequest &req ) {
207  auto it = std::find_if( list.begin(), list.end(), [ &req ]( const std::shared_ptr<NetworkRequest> &r ) {
208  return req.d_func() == r->d_func();
209  } );
210  if ( it != list.end() ) {
212  list.erase( it );
213  }
214  };
215 
216  delReq( _runningDownloads, req );
217  delReq( _pendingDownloads, req );
218 
// Only detach from the multi handle when the request has an easy handle.
219  void *easyHandle = req.d_func()->_easyHandle;
220  if ( easyHandle ) {
221  curl_multi_remove_handle( _multi, easyHandle );
222  }
223 
224  req.d_func()->_dispatcher = nullptr;
225 
226  //first set the result, the Request might have a checksum to check as well so a currently
227  //successful request could fail later on
228  req.d_func()->setResult( std::move(result) );
229  _sigDownloadFinished.emit( *z_func(), req );
230 
231  //we got a open slot, try to dequeue or send the finished signals if all queues are empty
232  dequeuePending();
233 }
234 
// NOTE(review): the signature line is missing from this extract; presumably this is
// dequeuePending() — confirm. Listing lines 250-251 and 257-258 (the bodies of the two
// failure branches before `continue`) are missing as well.
// Moves requests from the front of _pendingDownloads into _runningDownloads while fewer
// than _maxConnections are running, and emits the queue-finished signal once both queues
// are empty. Does nothing while the dispatcher is not running or is locked.
236 {
237  if ( !_isRunning || _locked )
238  return;
239 
240  while ( _maxConnections > _runningDownloads.size() ) {
241  if ( !_pendingDownloads.size() )
242  break;
243 
// Take ownership of the oldest pending request and remove it from the queue.
244  std::shared_ptr<NetworkRequest> req = std::move( _pendingDownloads.front() );
245  _pendingDownloads.pop_front();
246 
247  std::string errBuf = "Failed to initialize easy handle";
248  if ( !req->d_func()->initialize( errBuf ) ) {
249  //@TODO store the CURL error in the errors extra info
252  continue;
253  }
254 
// Hand the request's easy handle to the multi handle; a non-zero code skips this request.
255  CURLMcode rc = curl_multi_add_handle( _multi, req->d_func()->_easyHandle );
256  if ( rc != 0 ) {
259  continue;
260  }
261 
262  req->d_func()->aboutToStart();
263  _sigDownloadStarted.emit( *z_func(), *req );
264 
265  _runningDownloads.push_back( std::move(req) );
266  }
267 
268  //check for empty queues
269  if ( _pendingDownloads.size() == 0 && _runningDownloads.size() == 0 ) {
270  //once we finished all requests, cancel the timer too, so curl is not called without requests
271  _timer->stop();
272  _sigQueueFinished.emit( *z_func() );
273  }
274 }
275 
// Constructor of the public dispatcher class; the visible body is empty.
// NOTE(review): listing line 277 is missing from this extract; presumably it holds the
// member/base initializer list — confirm.
276 NetworkRequestDispatcher::NetworkRequestDispatcher( )
278 {
279 
280 }
281 
282 bool NetworkRequestDispatcher::supportsProtocol( const Url &url )
283 {
284  curl_version_info_data *curl_info = nullptr;
285  curl_info = curl_version_info(CURLVERSION_NOW);
286  // curl_info does not need any free (is static)
287  if (curl_info->protocols)
288  {
289  const char * const *proto;
290  std::string scheme( url.getScheme() );
291  bool found = false;
292  for(proto=curl_info->protocols; !found && *proto; ++proto) {
293  if( scheme == std::string((const char *)*proto))
294  found = true;
295  }
296  return found;
297  }
298  return true;
299 }
300 
301 void NetworkRequestDispatcher::setMaximumConcurrentConnections( size_t maxConn )
302 {
303  d_func()->_maxConnections = maxConn;
304 }
305 
306 void NetworkRequestDispatcher::enqueue(const std::shared_ptr<NetworkRequest> &req )
307 {
308  if ( !req )
309  return;
310  Z_D();
311 
312  if ( std::find( d->_runningDownloads.begin(), d->_runningDownloads.end(), req ) != d->_runningDownloads.end() ) {
313  WAR << "Ignoring request to enqueue download " << req->url().asString() << " request is already running " << std::endl;
314  return;
315  }
316 
317  if ( std::find( d->_pendingDownloads.begin(), d->_pendingDownloads.end(), req ) != d->_pendingDownloads.end() ) {
318  WAR << "Ignoring request to enqueue download " << req->url().asString() << " request is already enqueued " << std::endl;
319  return;
320  }
321 
322  req->d_func()->_dispatcher = this;
323  if ( req->priority() == NetworkRequest::Normal )
324  d->_pendingDownloads.push_back( req );
325  else {
326  auto it = std::find_if( d->_pendingDownloads.begin(), d->_pendingDownloads.end(), []( const auto &req ){
327  return req->priority() == NetworkRequest::Normal;
328  });
329 
330  //if we have a valid iterator, decrement we found a Normal pending download request, insert before that
331  if ( it != d->_pendingDownloads.end() && it != d->_pendingDownloads.begin() )
332  it--;
333  d->_pendingDownloads.insert( it, req );
334  }
335 
336  //dequeue if running and we have capacity
337  d->dequeuePending();
338 }
339 
340 void NetworkRequestDispatcher::cancel( NetworkRequest &req, std::string reason )
341 {
342  cancel( req, NetworkRequestErrorPrivate::customError( NetworkRequestError::Cancelled, reason.size() ? std::move(reason) : "Request explicitely cancelled" ) );
343 }
344 
345 void NetworkRequestDispatcher::cancel(NetworkRequest &req, const NetworkRequestError &err)
346 {
347  Z_D();
348 
349  if ( req.d_func()->_dispatcher != this ) {
350  //TODO throw exception
351  return;
352  }
353 
354  d->setFinished( req, err );
355 }
356 
357 void NetworkRequestDispatcher::run()
358 {
359  Z_D();
360  d->_isRunning = true;
361 
362  if ( d->_pendingDownloads.size() )
363  d->dequeuePending();
364 }
365 
366 const zyppng::NetworkRequestError &NetworkRequestDispatcher::lastError() const
367 {
368  return d_func()->_lastError;
369 }
370 
371 SignalProxy<void (NetworkRequestDispatcher &, NetworkRequest &)> NetworkRequestDispatcher::sigDownloadStarted()
372 {
373  return d_func()->_sigDownloadStarted;
374 }
375 
376 SignalProxy<void (NetworkRequestDispatcher &, NetworkRequest &)> NetworkRequestDispatcher::sigDownloadFinished()
377 {
378  return d_func()->_sigDownloadFinished;
379 }
380 
381 SignalProxy<void ( NetworkRequestDispatcher &)> NetworkRequestDispatcher::sigQueueFinished()
382 {
383  return d_func()->_sigQueueFinished;
384 }
385 
386 SignalProxy<void ( NetworkRequestDispatcher &)> NetworkRequestDispatcher::sigError()
387 {
388  return d_func()->_sigError;
389 }
390 
391 }
std::string getScheme() const
Returns the scheme name of the URL.
Definition: Url.cc:528
void globalInitCurlOnce()
Definition: CurlHelper.cc:18
static void unrefLater(T &&ptr)
static int multi_timer_cb(CURLM *multi, long timeout_ms, void *g)
std::array< char, CURL_ERROR_SIZE+1 > _errorBuf
Definition: request_p.h:60
void cancelAll(NetworkRequestError result)
Boost libraries.
int socketCallback(CURL *easy, curl_socket_t s, int what, void *)
static zyppng::NetworkRequestError fromCurlMError(int nativeCode)
Url url
Definition: MediaCurl.cc:65
static Ptr create(int socket, int evTypes, bool enable=true)
signal< void(NetworkRequestDispatcher &, NetworkRequest &)> _sigDownloadFinished
signal< void(NetworkRequestDispatcher &)> _sigQueueFinished
#define Z_D()
Definition: zyppglobal.h:44
std::map< curl_socket_t, std::shared_ptr< SocketNotifier > > _socketHandler
static int static_socket_callback(CURL *easy, curl_socket_t s, int what, void *userp, SocketNotifier *socketp)
std::deque< std::shared_ptr< NetworkRequest > > _pendingDownloads
#define WAR
Definition: Logger.h:80
void handleMultiSocketAction(curl_socket_t nativeSocket, int evBitmask)
The NetworkRequestError class represents an error that occurred in.
The Timer class provides repetitive and single-shot timers.
Definition: timer.h:42
signal< void(NetworkRequestDispatcher &)> _sigError
signal< void(NetworkRequestDispatcher &, NetworkRequest &)> _sigDownloadStarted
static zyppng::NetworkRequestError fromCurlError(NetworkRequest &req, int nativeCode, const char *errBuf)
zypp::Url Url
Definition: url.h:15
void setFinished(NetworkRequest &req, NetworkRequestError result)
std::vector< std::shared_ptr< NetworkRequest > > _runningDownloads
void onSocketActivated(const SocketNotifier &listener, int events)
static zyppng::NetworkRequestError customError(NetworkRequestError::Type t, std::string &&errorMsg="", std::map< std::string, boost::any > &&extraInfo={})
Convenience interface for handling authentication data of media user.
#define DBG
Definition: Logger.h:78