libzypp  17.22.0
eventdispatcher_glib.cc
Go to the documentation of this file.
1 #include "eventdispatcher.h"
2 #include "timer.h"
4 
5 #include <zypp/base/Exception.h>
6 #include <zypp/base/Logger.h>
7 namespace zyppng {
8 
9 static int inline readMask () {
10  return ( G_IO_IN | G_IO_HUP );
11 }
12 
13 static int inline writeMask () {
14  return ( G_IO_OUT );
15 }
16 
17 static int inline excpMask () {
18  return ( G_IO_PRI );
19 }
20 
21 //returns the thread local dispatcher, we only support one EventDispatcher per thread
23 {
24  static __thread EventDispatcher *threadDispatch = nullptr;
25  if ( set ) {
26  if ( threadDispatch )
27  ZYPP_THROW( zypp::Exception( "EventDispatcher can only be created once per thread" ) );
28  threadDispatch = set;
29  }
30  return &threadDispatch;
31 }
32 
33 static GSourceFuncs abstractEventSourceFuncs = {
37  nullptr,
38  nullptr,
39  nullptr
40 };
41 
43  GAbstractEventSource *src = nullptr;
44  src = reinterpret_cast<GAbstractEventSource *>(g_source_new(&abstractEventSourceFuncs, sizeof(GAbstractEventSource)));
45  (void) new (&src->pollfds) std::vector<GUnixPollFD>();
46 
47  src->eventSource = nullptr;
48  src->_ev = ev;
49  return src;
50 }
51 
53 {
54  for ( GUnixPollFD &fd : src->pollfds ) {
55  if ( fd.tag )
56  g_source_remove_unix_fd( &src->source, fd.tag );
57  }
58 
59  src->pollfds.clear();
60  src->pollfds.std::vector< GUnixPollFD >::~vector();
61  g_source_destroy( &src->source );
62  g_source_unref( &src->source );
63 }
64 
65 gboolean GAbstractEventSource::prepare(GSource *, gint *timeout)
66 {
67  //we can not yet determine if the GSource is ready, polling FDs also have no
68  //timeout, so lets continue
69  if ( timeout )
70  *timeout = -1;
71  return false;
72 }
73 
74 //here we need to figure out which FDs are pending
75 gboolean GAbstractEventSource::check( GSource *source )
76 {
77  GAbstractEventSource *src = reinterpret_cast<GAbstractEventSource*>( source );
78 
79  //check for pending and remove orphaned entries
80  bool hasPending = false;
81 
82  for ( auto fdIt = src->pollfds.begin(); fdIt != src->pollfds.end(); ) {
83  if ( fdIt->tag == nullptr ) {
84  //this pollfd was removed, clear it from the list
85  //for now keep the object in the sources list if the pollfd list gets empty, if it does not register new events until
86  //next check it is removed for good
87  fdIt = src->pollfds.erase( fdIt );
88  } else {
89  GIOCondition pendEvents = g_source_query_unix_fd( source, fdIt->tag );
90  if ( pendEvents & G_IO_NVAL ){
91  //that poll is broken, do we need to do more????
92  fdIt = src->pollfds.erase( fdIt );
93  } else {
94  hasPending = hasPending || ( pendEvents & fdIt->reqEvents );
95  fdIt++;
96  }
97  }
98  }
99 
100  //if the pollfds are empty trigger dispatch so this source can be removed
101  return hasPending || src->pollfds.empty();
102 }
103 
104 //Trigger all event sources that have been activated
105 gboolean GAbstractEventSource::dispatch(GSource *source, GSourceFunc, gpointer)
106 {
107  GAbstractEventSource *src = reinterpret_cast<GAbstractEventSource*>( source );
108 
109  if ( !src )
110  return G_SOURCE_REMOVE;
111 
112  //sources are only removed here so we do not accidentially mess with the pollfd iterator in the next loop
113  //were we trigger all ready FDs
114  if ( src->pollfds.empty() ) {
115  auto it = std::find( src->_ev->_eventSources.begin(), src->_ev->_eventSources.end(), src );
116 
117  if ( it != src->_ev->_eventSources.end() ) {
119  src->_ev->_eventSources.erase( it );
120  return G_SOURCE_REMOVE;
121  }
122  }
123 
124  for ( const GUnixPollFD &pollfd : src->pollfds ) {
125  //do not trigger orphaned ones
126  if ( pollfd.tag != nullptr ) {
127  GIOCondition pendEvents = g_source_query_unix_fd( source, pollfd.tag );
128 
129  if ( (pendEvents & pollfd.reqEvents ) != 0 ) {
130  int ev = 0;
131  if ( ( pendEvents & readMask() ) && ( pollfd.reqEvents & readMask() ) )
133  if ( (pendEvents & writeMask() ) && ( pollfd.reqEvents & writeMask() ) )
134  ev = ev | AbstractEventSource::Write;
135  if ( (pendEvents & excpMask()) && ( pollfd.reqEvents & excpMask() ) )
137  if ( (pendEvents & G_IO_ERR) && ( pollfd.reqEvents & G_IO_ERR ) )
138  ev = ev | AbstractEventSource::Error;
139 
140  src->eventSource->onFdReady( pollfd.pollfd, ev );
141  }
142  }
143  }
144 
145  return G_SOURCE_CONTINUE;
146 }
147 
148 static GSourceFuncs glibTimerSourceFuncs = {
152  nullptr,
153  nullptr,
154  nullptr
155 };
156 
157 //check when this timer expires and set the correct timeout
158 gboolean GLibTimerSource::prepare(GSource *src, gint *timeout)
159 {
160  GLibTimerSource *source = reinterpret_cast<GLibTimerSource *>( src );
161  if ( !source )
162  return false; //not ready for dispatch
163 
164  if ( !source->_t )
165  return false;
166 
167  uint64_t nextTimeout = source->_t->remaining();
168  if ( timeout ) {
169  //this would be a really looong timeout, but be safe
170  if ( nextTimeout > G_MAXINT )
171  *timeout = G_MAXINT;
172  else
173  *timeout = static_cast<gint>( nextTimeout );
174  }
175  return ( nextTimeout == 0 );
176 }
177 
178 //this is essentially the same as prepare
179 gboolean GLibTimerSource::check(GSource *source)
180 {
181  return prepare( source, nullptr );
182 }
183 
184 //emit the expired timers, restart timers that are no single shots
185 gboolean GLibTimerSource::dispatch(GSource *src, GSourceFunc, gpointer)
186 {
187  GLibTimerSource *source = reinterpret_cast<GLibTimerSource *>( src );
188  if ( !source )
189  return true;
190 
191  if ( source->_t == nullptr )
192  return true;
193  //this will emit the expired signal and reset the timer
194  //or stop it in case its a single shot timer
195  source->_t->expire();
196  return true;
197 }
198 
200 {
201  GLibTimerSource *src = nullptr;
202  src = reinterpret_cast<GLibTimerSource *>(g_source_new(&glibTimerSourceFuncs, sizeof(GLibTimerSource)));
203  src->_t = nullptr;
204  return src;
205 }
206 
208 {
209  g_source_destroy( &src->source );
210  g_source_unref( &src->source );
211 }
212 
216 static gboolean eventLoopIdleFunc ( gpointer user_data )
217 {
218  auto dPtr = reinterpret_cast<EventDispatcherPrivate *>( user_data );
219  if ( dPtr ) {
220  if( dPtr->runIdleTasks() ) {
221  return G_SOURCE_CONTINUE;
222  }
223  }
224  return G_SOURCE_REMOVE;
225 }
226 
227 
229 {
230  _myThreadId = std::this_thread::get_id();
231 
232  //if we get a context specified ( usually when created for main thread ) we use it
233  //otherwise we create our own
234  if ( ctx ) {
235  _ctx = ctx;
236  g_main_context_ref ( _ctx );
237  } else {
238  _ctx = g_main_context_get_thread_default();
239  if ( !_ctx ) {
240  _ctx = g_main_context_new();
241  } else {
242  g_main_context_ref ( _ctx );
243  }
244  }
245  g_main_context_push_thread_default( _ctx );
246 
247  _loop = g_main_loop_new( _ctx, false );
248 
249  _idleSource = g_idle_source_new ();
250  g_source_set_callback ( _idleSource, eventLoopIdleFunc, this, nullptr );
251 }
252 
254 {
255  std::for_each ( _runningTimers.begin(), _runningTimers.end(), []( GLibTimerSource *src ){
257  });
258  std::for_each ( _eventSources.begin(), _eventSources.end(), []( GAbstractEventSource *src ){
260  });
261  _runningTimers.clear();
262 
263  g_source_destroy( _idleSource );
264  g_source_unref ( _idleSource );
265 
266  g_main_context_pop_thread_default( _ctx );
267  g_main_context_unref( _ctx );
268  g_main_loop_unref( _loop );
269 }
270 
272 {
273  //run all user defined idle functions
274  //if they return true, they are executed again in the next idle run
275  decltype ( _idleFuncs ) rerunQueue;
276  while ( _idleFuncs.size() ) {
277  EventDispatcher::IdleFunction fun( std::move( _idleFuncs.front() ) );
278  _idleFuncs.pop();
279  if ( fun() )
280  rerunQueue.push( std::move(fun) );
281  }
282  if ( !rerunQueue.empty() )
283  _idleFuncs.swap( rerunQueue );
284 
285 
286  //keep this as the last thing to call after all user code was executed
287  if ( _unrefLater.size() )
288  _unrefLater.clear();
289 
290  return _idleFuncs.size() || _unrefLater.size();
291 }
292 
294 {
295  if ( !_idleSource->context )
296  g_source_attach ( _idleSource, _ctx );
297 }
298 
299 
301  : Base ( * new EventDispatcherPrivate( reinterpret_cast<GMainContext*>(ctx) ) )
302 {
303  threadLocalDispatcher( this );
304 }
305 
306 std::shared_ptr<EventDispatcher> EventDispatcher::createMain()
307 {
308  return std::shared_ptr<EventDispatcher>( new EventDispatcher(g_main_context_default()) );
309 }
310 
311 std::shared_ptr<EventDispatcher> EventDispatcher::createForThread()
312 {
313  return std::shared_ptr<EventDispatcher>( new EventDispatcher() );
314 }
315 
317 {
318  *threadLocalDispatcher() = nullptr;
319 }
320 
321 void EventDispatcher::updateEventSource( AbstractEventSource *notifier, int fd, int mode )
322 {
323  Z_D();
324  if ( notifier->eventDispatcher().lock().get() != this )
325  ZYPP_THROW( zypp::Exception("Invalid event dispatcher used to update event source") );
326 
327  GAbstractEventSource *evSrc = nullptr;
328  auto &evSrcList = d->_eventSources;
329  auto itToEvSrc = std::find_if( evSrcList.begin(), evSrcList.end(), [ notifier ]( const auto elem ){ return elem->eventSource == notifier; } );
330  if ( itToEvSrc == evSrcList.end() ) {
331 
332  evSrc = GAbstractEventSource::create( d );
333  evSrc->eventSource = notifier;
334  evSrcList.push_back( evSrc );
335 
336  g_source_attach( &evSrc->source, d->_ctx );
337 
338  } else
339  evSrc = (*itToEvSrc);
340 
341  int cond = 0;
342  if ( mode & AbstractEventSource::Read ) {
343  cond = readMask() | G_IO_ERR;
344  }
345  if ( mode & AbstractEventSource::Write ) {
346  cond = cond | writeMask() | G_IO_ERR;
347  }
348  if ( mode & AbstractEventSource::Exception ) {
349  cond = cond | excpMask() | G_IO_ERR;
350  }
351 
352  auto it = std::find_if( evSrc->pollfds.begin(), evSrc->pollfds.end(), [fd]( const auto &currPollFd ) {
353  return currPollFd.pollfd == fd;
354  });
355 
356  if ( it != evSrc->pollfds.end() ) {
357  //found
358  it->reqEvents = static_cast<GIOCondition>( cond );
359  g_source_modify_unix_fd( &evSrc->source, it->tag, static_cast<GIOCondition>(cond) );
360  } else {
361  evSrc->pollfds.push_back(
362  GUnixPollFD {
363  static_cast<GIOCondition>(cond),
364  fd,
365  g_source_add_unix_fd( &evSrc->source, fd, static_cast<GIOCondition>(cond) )
366  }
367  );
368  }
369 }
370 
372 {
373  Z_D();
374 
375  if ( notifier->eventDispatcher().lock().get() != this )
376  ZYPP_THROW( zypp::Exception("Invalid event dispatcher used to remove event source") );
377 
378  auto &evList = d->_eventSources;
379  auto it = std::find_if( evList.begin(), evList.end(), [ notifier ]( const auto elem ){ return elem->eventSource == notifier; } );
380 
381  if ( it == evList.end() )
382  return;
383 
384  auto &fdList = (*it)->pollfds;
385 
386  if ( fd == -1 ) {
387  //we clear out all unix_fd watches but do not destroy the source just yet. We currently might
388  //be in the dispatch() function of that AbstractEventSource, make sure not to break the iterator
389  //for the fd's
390  for ( auto &pFD : fdList ) {
391  if ( pFD.tag )
392  g_source_remove_unix_fd( &(*it)->source, pFD.tag );
393  pFD.pollfd = -1;
394  pFD.tag = nullptr; //mark as orphaned, do not delete the element here this might break dipatching
395  }
396  } else {
397  auto fdIt = std::find_if( fdList.begin(), fdList.end(), [ fd ]( const auto &pFd ){ return pFd.pollfd == fd; } );
398  if ( fdIt != fdList.end() ) {
399  if ( fdIt->tag )
400  g_source_remove_unix_fd( &(*it)->source, (*fdIt).tag );
401  //also do not remove here, mark as orphaned only to not break iterating in dispatch()
402  fdIt->tag = nullptr;
403  fdIt->pollfd = -1;
404  }
405  }
406 }
407 
409 {
410  Z_D();
411  //make sure timer is not double registered
412  for ( const GLibTimerSource *t : d->_runningTimers ) {
413  if ( t->_t == timer )
414  return;
415  }
416 
418  newSrc->_t = timer;
419  d->_runningTimers.push_back( newSrc );
420 
421  g_source_attach( &newSrc->source, d->_ctx );
422 }
423 
425 {
426  Z_D();
427  auto it = std::find_if( d->_runningTimers.begin(), d->_runningTimers.end(), [ timer ]( const GLibTimerSource *src ){
428  return src->_t == timer;
429  });
430 
431  if ( it != d->_runningTimers.end() ) {
432  GLibTimerSource *src = *it;
433  d->_runningTimers.erase( it );
435  }
436 }
437 
439 {
440  return g_main_context_iteration( d_func()->_ctx, false );
441 }
442 
444 {
445  g_main_loop_run( d_func()->_loop );
446 }
447 
449 {
450  g_main_loop_quit( d_func()->_loop );
451 }
452 
454 {
455  auto d = instance()->d_func();
456  d->_idleFuncs.push( std::move(callback) );
457  d->enableIdleSource();
458 }
459 
460 void EventDispatcher::unrefLaterImpl(std::shared_ptr<void> &&ptr )
461 {
462  Z_D();
463  d->_unrefLater.push_back( std::forward< std::shared_ptr<void> >(ptr) );
464  d->enableIdleSource();
465 }
466 
468 {
469  return d_func()->_runningTimers.size();
470 }
471 
472 std::shared_ptr<EventDispatcher> EventDispatcher::instance()
473 {
474  auto ev = *threadLocalDispatcher();
475  if ( ev )
476  return ev->shared_this<EventDispatcher>();
477  return std::shared_ptr<EventDispatcher>();
478 }
479 
480 }
std::vector< std::shared_ptr< void > > _unrefLater
std::vector< GAbstractEventSource * > _eventSources
#define ZYPP_THROW(EXCPT)
Drops a logline and throws the Exception.
Definition: Exception.h:392
std::function< bool()> IdleFunction
static void destruct(GAbstractEventSource *src)
virtual void onFdReady(int fd, int events)=0
static int writeMask()
static std::shared_ptr< EventDispatcher > createForThread()
static gboolean check(GSource *source)
static GLibTimerSource * create()
#define Z_D()
Definition: zyppglobal.h:44
virtual void removeTimer(Timer *timer)
static gboolean eventLoopIdleFunc(gpointer user_data)
Called when the event loop is idle, here we run cleanup tasks and call later() callbacks of the user...
static void destruct(GLibTimerSource *src)
time_t timeout
Definition: MediaCurl.cc:66
void unrefLaterImpl(std::shared_ptr< void > &&ptr)
static EventDispatcher ** threadLocalDispatcher(EventDispatcher *set=nullptr)
static std::shared_ptr< EventDispatcher > createMain()
virtual void registerTimer(Timer *timer)
The Timer class provides repetitive and single-shot timers.
Definition: timer.h:42
static GSourceFuncs glibTimerSourceFuncs
EventDispatcher(void *ctx=nullptr)
std::vector< GLibTimerSource * > _runningTimers
static std::shared_ptr< EventDispatcher > instance()
static GSourceFuncs abstractEventSourceFuncs
EventDispatcherPrivate * _ev
static gboolean prepare(GSource *, gint *timeout)
std::vector< GUnixPollFD > pollfds
Base class for Exception.
Definition: Exception.h:145
std::queue< EventDispatcher::IdleFunction > _idleFuncs
static int excpMask()
static gboolean dispatch(GSource *source, GSourceFunc, gpointer)
static int readMask()
static gboolean prepare(GSource *src, gint *timeout)
static GAbstractEventSource * create(EventDispatcherPrivate *ev)
std::weak_ptr< EventDispatcher > eventDispatcher() const
virtual void updateEventSource(AbstractEventSource *notifier, int fd, int mode)
void invokeOnIdleImpl(IdleFunction &&callback)
static gboolean dispatch(GSource *source, GSourceFunc, gpointer)
virtual void removeEventSource(AbstractEventSource *notifier, int fd=-1)
static gboolean check(GSource *source)