gwenhywfar  5.11.2beta
endpoint.c
Go to the documentation of this file.
1 /****************************************************************************
2  * This file is part of the project Gwenhywfar.
3  * Gwenhywfar (c) by 2023 Martin Preuss, all rights reserved.
4  *
5  * The license for this file can be found in the file COPYING which you
6  * should have received along with this file.
7  ****************************************************************************/
8 
9 #ifdef HAVE_CONFIG_H
10 # include <config.h>
11 #endif
12 
13 /*#define DISABLE_DEBUGLOG*/
14 
15 
16 #include "msgio/endpoint_p.h"
17 
18 #include <gwenhywfar/debug.h>
19 #include <gwenhywfar/text.h>
20 
21 
22 #define GWEN_MSG_ENDPOINT_DEFAULT_MSGSIZE 1024
23 
24 
25 
27 GWEN_TREE2_FUNCTIONS(GWEN_MSG_ENDPOINT, GWEN_MsgEndpoint)
28 
29 
30 
31 GWEN_MSG_ENDPOINT *GWEN_MsgEndpoint_new(const char *name, int groupId)
32 {
34 
37  GWEN_TREE2_INIT(GWEN_MSG_ENDPOINT, ep, GWEN_MsgEndpoint);
38 
39  ep->name=strdup(name?name:"<unnamed>");
40  ep->groupId=groupId;
41 
42  ep->receivedMessageList=GWEN_Msg_List_new();
43  ep->sendMessageList=GWEN_Msg_List_new();
44 
45  ep->defaultMessageSize=GWEN_MSG_ENDPOINT_DEFAULT_MSGSIZE;
46 
47  return ep;
48 }
49 
50 
51 
53 {
54  if (ep) {
56  "Deleting endpoint \"%s\" (%d msgs in recv list, %d msgs in send list)",
57  (ep->name)?(ep->name):"<unnamed>",
58  GWEN_Msg_List_GetCount(ep->receivedMessageList),
59  GWEN_Msg_List_GetCount(ep->sendMessageList));
60  GWEN_TREE2_FINI(GWEN_MSG_ENDPOINT, ep, GWEN_MsgEndpoint);
62  if (ep->socket) {
63  GWEN_Socket_Close(ep->socket);
64  GWEN_Socket_free(ep->socket);
65  }
66  GWEN_Msg_free(ep->currentlyReceivedMsg);
67  GWEN_Msg_List_free(ep->receivedMessageList);
68  GWEN_Msg_List_free(ep->sendMessageList);
69  free(ep->name);
70  GWEN_FREE_OBJECT(ep);
71  }
72 }
73 
74 
75 
77 {
78  return (ep?ep->name:NULL);
79 }
80 
81 
82 
84 {
85  return (ep?ep->groupId:0);
86 }
87 
88 
89 
91 {
92  return (ep?ep->socket:NULL);
93 }
94 
95 
96 
98 {
99  if (ep) {
100  if (ep->socket) {
101  GWEN_Socket_Close(ep->socket);
102  GWEN_Socket_free(ep->socket);
103  }
104  ep->socket=sk;
105  }
106 }
107 
108 
109 
111 {
112  return (ep?ep->state:GWEN_MSG_ENDPOINT_STATE_UNCONNECTED);
113 }
114 
115 
116 
118 {
119  if (ep) {
120  if (ep->state!=m) {
121  ep->timeOfLastStateChange=time(NULL);
122  DBG_INFO(GWEN_LOGDOMAIN, "Changing status of endpoint %s to %d", GWEN_MsgEndpoint_GetName(ep), m);
123  ep->state=m;
124  }
125  }
126 }
127 
128 
129 
131 {
132  return (ep?ep->timeOfLastStateChange:0);
133 }
134 
135 
136 
137 
138 
139 
141 {
142  return (ep?ep->flags:0);
143 }
144 
145 
146 
148 {
149  if (ep)
150  ep->flags=f;
151 }
152 
153 
154 
156 {
157  if (ep)
158  ep->flags|=f;
159 }
160 
161 
162 
164 {
165  if (ep)
166  ep->flags&=~f;
167 }
168 
169 
170 
172 {
173  return ep?ep->defaultMessageSize:0;
174 }
175 
176 
177 
179 {
180  if (ep)
181  ep->defaultMessageSize=i;
182 }
183 
184 
185 
187 {
188  return (ep?ep->receivedMessageList:NULL);
189 }
190 
191 
192 
194 {
195  return (ep?ep->sendMessageList:NULL);
196 }
197 
198 
199 
201 {
202  if (ep && m) {
204  GWEN_Msg_List_Add(m, ep->receivedMessageList);
205  }
206 }
207 
208 
209 
211 {
212  return ep?GWEN_Msg_List_First(ep->receivedMessageList):NULL;
213 }
214 
215 
216 
218 {
219  GWEN_MSG *msg;
220 
222  if (msg)
223  GWEN_Msg_List_Del(msg);
224  return msg;
225 }
226 
227 
228 
230 {
231  if (ep && m) {
233  GWEN_Msg_List_Add(m, ep->sendMessageList);
234  }
235 }
236 
237 
238 
240 {
241  return (ep?GWEN_Msg_List_First(ep->sendMessageList):NULL);
242 }
243 
244 
245 
247 {
248  return (ep && GWEN_Msg_List_GetCount(ep->sendMessageList)>0)?1:0;
249 }
250 
251 
252 
254 {
255  return (ep?ep->currentlyReceivedMsg:NULL);
256 }
257 
258 
259 
261 {
262  if (ep)
263  ep->currentlyReceivedMsg=m;
264 }
265 
266 
267 
269 {
270  if (ep && ep->addSocketsFn)
271  ep->addSocketsFn(ep, readSet, writeSet, xSet);
272 }
273 
274 
275 
277 {
278  if (ep && ep->checkSocketsFn)
279  ep->checkSocketsFn(ep, readSet, writeSet, xSet);
280 }
281 
282 
283 
285  GWEN_SOCKETSET *readSet,
286  GWEN_SOCKETSET *writeSet,
287  GWEN_SOCKETSET *xSet)
288 {
289  GWEN_MSG_ENDPOINT *epChild;
290 
291  epChild=GWEN_MsgEndpoint_Tree2_GetFirstChild(ep);
292  while(epChild) {
293  GWEN_MsgEndpoint_AddSockets(epChild, readSet, writeSet, xSet);
294  epChild=GWEN_MsgEndpoint_Tree2_GetNext(epChild);
295  }
296 }
297 
298 
299 
301  GWEN_SOCKETSET *readSet,
302  GWEN_SOCKETSET *writeSet,
303  GWEN_SOCKETSET *xSet)
304 {
305  GWEN_MSG_ENDPOINT *epChild;
306 
307  epChild=GWEN_MsgEndpoint_Tree2_GetFirstChild(ep);
308  while(epChild) {
309  GWEN_MsgEndpoint_CheckSockets(epChild, readSet, writeSet, xSet);
310  epChild=GWEN_MsgEndpoint_Tree2_GetNext(epChild);
311  }
312 }
313 
314 
315 
317 {
318  if (ep) {
319  GWEN_MSG_ENDPOINT *epChild;
320 
321  epChild=GWEN_MsgEndpoint_Tree2_GetFirstChild(ep);
322  while(epChild) {
323  GWEN_MSG_ENDPOINT *epNext;
324 
325  epNext=GWEN_MsgEndpoint_Tree2_GetNext(epChild);
328  DBG_INFO(GWEN_LOGDOMAIN, "Endpoint %s: Disconnected and empty, removing", GWEN_MsgEndpoint_GetName(epChild));
329  GWEN_MsgEndpoint_Tree2_Unlink(epChild);
330  GWEN_MsgEndpoint_free(epChild);
331  }
332  epChild=epNext;
333  }
334  }
335 }
336 
337 
338 
340 {
341  GWEN_SOCKETSET *readSet;
342  GWEN_SOCKETSET *writeSet;
343  GWEN_SOCKETSET *xSet;
344  int rv;
345 
346  readSet=GWEN_SocketSet_new();
347  writeSet=GWEN_SocketSet_new();
348  xSet=GWEN_SocketSet_new();
349  GWEN_MsgEndpoint_AddSockets(ep, readSet, writeSet, xSet);
350 
351  do {
353  GWEN_SocketSet_GetSocketCount(writeSet)?writeSet:NULL,
355  timeout);
356  } while(rv==GWEN_ERROR_INTERRUPTED);
357  if (rv<0) {
358  if (rv!=GWEN_ERROR_TIMEOUT) {
359  DBG_INFO(GWEN_LOGDOMAIN, "Error on GWEN_Socket_Select: %d", rv);
360  }
361  }
362  else
363  GWEN_MsgEndpoint_CheckSockets(ep, readSet, writeSet, xSet);
364  GWEN_SocketSet_free(xSet);
365  GWEN_SocketSet_free(writeSet);
366  GWEN_SocketSet_free(readSet);
367 }
368 
369 
370 
372 {
373  GWEN_SOCKETSET *readSet;
374  GWEN_SOCKETSET *writeSet;
375  GWEN_SOCKETSET *xSet;
376  int rv;
377 
378  readSet=GWEN_SocketSet_new();
379  writeSet=GWEN_SocketSet_new();
380  xSet=GWEN_SocketSet_new();
381  GWEN_MsgEndpoint_ChildrenAddSockets(ep, readSet, writeSet, xSet);
382 
383  do {
385  GWEN_SocketSet_GetSocketCount(writeSet)?writeSet:NULL,
387  timeout);
388  } while(rv==GWEN_ERROR_INTERRUPTED);
389  if (rv<0) {
390  if (rv!=GWEN_ERROR_TIMEOUT) {
391  DBG_INFO(GWEN_LOGDOMAIN, "Error on GWEN_Socket_Select: %d", rv);
392  }
393  }
394  else
395  GWEN_MsgEndpoint_ChildrenCheckSockets(ep, readSet, writeSet, xSet);
396  GWEN_SocketSet_free(xSet);
397  GWEN_SocketSet_free(writeSet);
398  GWEN_SocketSet_free(readSet);
399 }
400 
401 
402 
403 int GWEN_MsgEndpoint_ReadFromSocket(GWEN_MSG_ENDPOINT *ep, uint8_t *bufferPtr, uint32_t bufferLen)
404 {
405  int len;
406  int rv;
407 
408  len=bufferLen;
409  DBG_DEBUG(GWEN_LOGDOMAIN, "Endpoint %s: Reading from socket", GWEN_MsgEndpoint_GetName(ep));
410  do {
411  rv=GWEN_Socket_Read(ep->socket, (char*) bufferPtr, &len);
412  } while(rv==GWEN_ERROR_INTERRUPTED);
413  if (rv<0) {
414  DBG_INFO(GWEN_LOGDOMAIN, "Endpoint %s: here (%d)", GWEN_MsgEndpoint_GetName(ep), rv);
415  return rv;
416  }
417  DBG_DEBUG(GWEN_LOGDOMAIN, "Endpoint %s: Read %d bytes from socket", GWEN_MsgEndpoint_GetName(ep), len);
418  return len;
419 }
420 
421 
422 
423 int GWEN_MsgEndpoint_WriteToSocket(GWEN_MSG_ENDPOINT *ep, const uint8_t *bufferPtr, uint32_t bufferLen)
424 {
425  int len;
426  int rv;
427 
428  len=bufferLen;
429  do {
430  rv=GWEN_Socket_Write(ep->socket, (const char*) bufferPtr, &len);
431  } while(rv==GWEN_ERROR_INTERRUPTED);
432  if (rv<0)
433  return rv;
434  return len;
435 }
436 
437 
438 
440 {
441  int rv;
442  uint8_t buffer[64];
443 
444  do {
445  rv=GWEN_MsgEndpoint_ReadFromSocket(ep, buffer, sizeof(buffer));
446  } while(rv>0);
447  if (rv<0 && rv!=GWEN_ERROR_TIMEOUT) {
448  DBG_INFO(GWEN_LOGDOMAIN, "Error on read(): %d", rv);
449  return rv;
450  }
451  else if (rv==0) {
452  DBG_INFO(GWEN_LOGDOMAIN, "EOF met on read()");
453 #if 0
454  return GWEN_ERROR_IO;
455 #endif
456  }
457  return 0;
458 }
459 
460 
461 
463 {
464  if (ep) {
465  DBG_INFO(GWEN_LOGDOMAIN, "Disconnecting endpoint");
466  if (ep->socket) {
467  DBG_INFO(GWEN_LOGDOMAIN, "Disconnecting socket");
468  GWEN_Socket_Close(ep->socket);
469  GWEN_Socket_free(ep->socket);
470  ep->socket=NULL;
471  }
473  }
474 }
475 
476 
477 
478 
479 
480 
481 
482 
484 {
485  if (ep) {
487 
488  o=ep->addSocketsFn;
489  ep->addSocketsFn=fn;
490  return o;
491  }
492  return NULL;
493 }
494 
495 
496 
498 {
499  if (ep) {
501 
502  o=ep->checkSocketsFn;
503  ep->checkSocketsFn=fn;
504  return o;
505  }
506  return NULL;
507 }
508 
509 
510 
#define GWEN_MSG_ENDPOINT_STATE_UNCONNECTED
Definition: endpoint.h:23
GWEN_SOCKET * GWEN_MsgEndpoint_GetSocket(const GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:90
void(* GWEN_MSG_ENDPOINT_ADDSOCKETS_FN)(GWEN_MSG_ENDPOINT *ep, GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet)
Definition: endpoint.h:183
int GWEN_MsgEndpoint_WriteToSocket(GWEN_MSG_ENDPOINT *ep, const uint8_t *bufferPtr, uint32_t bufferLen)
Definition: endpoint.c:423
GWENHYWFAR_API void GWEN_SocketSet_free(GWEN_SOCKETSET *ssp)
void GWEN_MsgEndpoint_SetFlags(GWEN_MSG_ENDPOINT *ep, uint32_t f)
Definition: endpoint.c:147
#define GWEN_INHERIT_FINI(t, element)
Definition: inherit.h:238
struct GWEN_MSG_ENDPOINT GWEN_MSG_ENDPOINT
Object which can send and receive messages (base class).
Definition: endpoint.h:37
GWENHYWFAR_API int GWEN_Socket_Write(GWEN_SOCKET *sp, const char *buffer, int *bsize)
#define DBG_DEBUG(dbg_logger, format,...)
Definition: debug.h:214
void GWEN_MsgEndpoint_AddReceivedMessage(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *m)
Definition: endpoint.c:200
void GWEN_MsgEndpoint_AddSockets(GWEN_MSG_ENDPOINT *ep, GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet)
Definition: endpoint.c:268
void GWEN_Msg_free(GWEN_MSG *msg)
Definition: msg.c:74
#define GWEN_FREE_OBJECT(varname)
Definition: memory.h:61
#define NULL
Definition: binreloc.c:300
GWEN_MSG * GWEN_MsgEndpoint_GetCurrentlyReceivedMsg(const GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:253
int GWEN_MsgEndpoint_DiscardInput(GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:439
GWEN_MSG_LIST * GWEN_Msg_List_new()
uint32_t GWEN_MsgEndpoint_GetFlags(const GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:140
int GWEN_MsgEndpoint_GetGroupId(const GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:83
struct GWEN_SOCKETSETSTRUCT GWEN_SOCKETSET
Definition: inetsocket.h:41
GWEN_MSG * GWEN_Msg_List_First(const GWEN_MSG_LIST *l)
#define GWEN_LOGDOMAIN
Definition: logger.h:35
int GWEN_Msg_RewindCurrentPos(GWEN_MSG *msg)
Definition: msg.c:247
void GWEN_MsgEndpoint_SetDefaultMessageSize(GWEN_MSG_ENDPOINT *ep, int i)
Definition: endpoint.c:178
void GWEN_MsgEndpoint_ChildrenIoLoop(GWEN_MSG_ENDPOINT *ep, int timeout)
Definition: endpoint.c:371
void GWEN_Msg_List_Del(GWEN_MSG *element)
GWENHYWFAR_API int GWEN_Socket_Read(GWEN_SOCKET *sp, char *buffer, int *bsize)
void GWEN_MsgEndpoint_SetCurrentlyReceivedMsg(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *m)
Definition: endpoint.c:260
#define GWEN_ERROR_IO
Definition: error.h:123
#define GWEN_ERROR_INTERRUPTED
Definition: error.h:74
GWEN_MSG_ENDPOINT * GWEN_MsgEndpoint_new(const char *name, int groupId)
Definition: endpoint.c:31
GWENHYWFAR_API int GWEN_SocketSet_GetSocketCount(GWEN_SOCKETSET *ssp)
#define GWEN_TREE2_FINI(t, element, pr)
Definition: tree2.h:457
time_t GWEN_MsgEndpoint_GetTimeOfLastStateChange(const GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:130
#define GWEN_MSG_ENDPOINT_DEFAULT_MSGSIZE
Definition: endpoint.c:22
void GWEN_MsgEndpoint_SetSocket(GWEN_MSG_ENDPOINT *ep, GWEN_SOCKET *sk)
Definition: endpoint.c:97
#define GWEN_NEW_OBJECT(typ, varname)
Definition: memory.h:55
GWEN_MSG_LIST * GWEN_MsgEndpoint_GetReceivedMessageList(const GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:186
void GWEN_MsgEndpoint_free(GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:52
int GWEN_MsgEndpoint_HaveMessageToSend(const GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:246
void GWEN_MsgEndpoint_IoLoop(GWEN_MSG_ENDPOINT *ep, int timeout)
Definition: endpoint.c:339
int GWEN_MsgEndpoint_GetState(const GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:110
#define GWEN_INHERIT_INIT(t, element)
Definition: inherit.h:223
GWENHYWFAR_API GWEN_SOCKETSET * GWEN_SocketSet_new(void)
GWENHYWFAR_API int GWEN_Socket_Close(GWEN_SOCKET *sp)
GWEN_TREE2_FUNCTIONS(GWEN_JSON_ELEM, GWEN_JsonElement)
GWEN_MSG * GWEN_MsgEndpoint_GetFirstSendMessage(const GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:239
void GWEN_MsgEndpoint_AddSendMessage(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *m)
Definition: endpoint.c:229
GWEN_MSG * GWEN_MsgEndpoint_TakeFirstReceivedMessage(GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:217
void GWEN_MsgEndpoint_RemoveUnconnectedAndEmptyChildren(GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:316
void GWEN_MsgEndpoint_DelFlags(GWEN_MSG_ENDPOINT *ep, uint32_t f)
Definition: endpoint.c:163
void GWEN_MsgEndpoint_Disconnect(GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:462
struct GWEN_MSG GWEN_MSG
Definition: msg.h:25
GWENHYWFAR_API int GWEN_Socket_Select(GWEN_SOCKETSET *rs, GWEN_SOCKETSET *ws, GWEN_SOCKETSET *xs, int timeout)
int GWEN_MsgEndpoint_GetDefaultMessageSize(const GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:171
int GWEN_MsgEndpoint_ReadFromSocket(GWEN_MSG_ENDPOINT *ep, uint8_t *bufferPtr, uint32_t bufferLen)
Definition: endpoint.c:403
void(* GWEN_MSG_ENDPOINT_CHECKSOCKETS_FN)(GWEN_MSG_ENDPOINT *ep, GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet)
Definition: endpoint.h:188
void GWEN_MsgEndpoint_CheckSockets(GWEN_MSG_ENDPOINT *ep, GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet)
Definition: endpoint.c:276
void GWEN_Msg_List_free(GWEN_MSG_LIST *l)
#define GWEN_TREE2_INIT(t, element, pr)
Definition: tree2.h:445
void GWEN_MsgEndpoint_ChildrenCheckSockets(GWEN_MSG_ENDPOINT *ep, GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet)
Definition: endpoint.c:300
struct GWEN_SOCKET GWEN_SOCKET
Definition: inetsocket.h:40
#define GWEN_ERROR_TIMEOUT
Definition: error.h:71
void GWEN_MsgEndpoint_SetState(GWEN_MSG_ENDPOINT *ep, int m)
Definition: endpoint.c:117
#define DBG_INFO(dbg_logger, format,...)
Definition: debug.h:181
const char * GWEN_MsgEndpoint_GetName(const GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:76
uint32_t GWEN_Msg_List_GetCount(const GWEN_MSG_LIST *l)
GWEN_MSG * GWEN_MsgEndpoint_GetFirstReceivedMessage(const GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:210
void GWEN_MsgEndpoint_ChildrenAddSockets(GWEN_MSG_ENDPOINT *ep, GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet)
Definition: endpoint.c:284
void GWEN_MsgEndpoint_AddFlags(GWEN_MSG_ENDPOINT *ep, uint32_t f)
Definition: endpoint.c:155
GWEN_MSG_ENDPOINT_CHECKSOCKETS_FN GWEN_MsgEndpoint_SetCheckSocketsFn(GWEN_MSG_ENDPOINT *ep, GWEN_MSG_ENDPOINT_CHECKSOCKETS_FN fn)
Definition: endpoint.c:497
#define GWEN_INHERIT_FUNCTIONS(t)
Definition: inherit.h:163
GWENHYWFAR_API void GWEN_Socket_free(GWEN_SOCKET *sp)
void GWEN_Msg_List_Add(GWEN_MSG *element, GWEN_MSG_LIST *list)
GWEN_MSG_LIST * GWEN_MsgEndpoint_GetSendMessageList(const GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:193
GWEN_MSG_ENDPOINT_ADDSOCKETS_FN GWEN_MsgEndpoint_SetAddSocketsFn(GWEN_MSG_ENDPOINT *ep, GWEN_MSG_ENDPOINT_ADDSOCKETS_FN fn)
Definition: endpoint.c:483