clsync
cluster.c
Go to the documentation of this file.
1 /*
2  clsync - file tree sync utility based on inotify
3 
4  Copyright (C) 2013 Dmitry Yu Okunev <dyokunev@ut.mephi.ru> 0x8E30679C
5 
6  This program is free software: you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation, either version 3 of the License, or
9  (at your option) any later version.
10 
11  This program is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU General Public License for more details.
15 
16  You should have received a copy of the GNU General Public License
17  along with this program. If not, see <http://www.gnu.org/licenses/>.
18  */
19 
20 /*
21  Hello, dear developer.
22 
23  You can ask me directly by e-mail or IRC, if something seems too
24  hard.
25 
26  -- 0x8E30679C
27  */
28 
29 
30 #ifdef CLUSTER_SUPPORT
31 
32 #include "common.h"
33 #include "indexes.h"
34 #include "error.h"
35 #include "cluster.h"
36 #include "sync.h"
37 #include "calc.h"
38 #include "malloc.h"
39 
40 // Global variables. They will be initialized in cluster_init()
41 
42 #define CLUSTER_RECV_PROC_ERRLIMIT (1<<8)
43 #define NODES_ALLOC (MAX(MAXNODES, NODEID_NOID)+1)
44 
45 
46 int sock_i = -1;
47 struct sockaddr_in sa_i = {0};
48 
49 int sock_o = -1;
50 struct sockaddr_in sa_o = {0};
51 
52 ctx_t *ctx_p = NULL;
53 indexes_t *indexes_p = NULL;
54 pthread_t pthread_cluster = 0;
55 
56 nodeinfo_t nodeinfo[NODES_ALLOC] = {{0}};
57 
58 nodeinfo_t *nodeinfo_my = NULL;
59 uint8_t node_id_my = NODEID_NOID;
60 uint8_t node_ids[NODES_ALLOC] = {0};
61 unsigned int cluster_timeout = 0;
62 uint8_t node_count = 0;
63 uint8_t node_online = 0;
64 
65 cluster_recvproc_funct_t recvproc_funct[COUNT_CLUSTERCMDID] = {NULL};
66 
67 window_t window_i = {0};
68 window_t window_o = {0};
69 
70 /**
71  * @brief Adds command (message) to window_p->buffer
72  *
73  * @param[in] clustercmd_p Pointer to cluster cmd to put into window
74  *
75  * @retval zero Successful
76  * @retval non-zero If got error while deleting the message. The error-code is placed into returned value.
77  *
78  */
79 
80 static inline int clustercmd_window_add ( window_t *window_p, clustercmd_t *clustercmd_p, GHashTable *serial2queuedpacket_ht )
81 {
82 #ifdef PARANOID
83 
84  if ( clustercmd_p->h.src_node_id >= MAXNODES ) {
85  error ( "Invalid src_node_id: %i.", clustercmd_p->h.src_node_id );
86  return EINVAL;
87  }
88 
89 #endif
90 
91  // Checking if there enough window_p->cells allocated
92  if ( window_p->packets_len >= window_p->size ) {
93  window_p->size += ALLOC_PORTION;
94 # define CXREALLOC(a, size) \
95  (typeof(a))xrealloc((char *)(a), (size_t)(size) * sizeof(*(a)))
96  window_p->packets_id = CXREALLOC ( window_p->packets_id, window_p->size );
97  window_p->occupied_sides = CXREALLOC ( window_p->occupied_sides, window_p->size );
98 # undef CXREALLOC
99  }
100 
101  // Calculating required memory space in buffer for the message
102  size_t clustercmd_size = CLUSTERCMD_SIZE ( clustercmd_p );
103  size_t required_space = sizeof ( clustercmdqueuedpackethdr_t ) + clustercmd_size;
104  // Searching occupied boundaries in the window_p->buffer
105  size_t occupied_left = SIZE_MAX, occupied_right = 0;
106  unsigned int i;
107  i = 0;
108 
109  while ( i < window_p->packets_len ) {
110  unsigned int window_id;
111  window_id = window_p->packets_id[ i++ ];
112  occupied_left = MIN ( occupied_left, window_p->occupied_sides[window_id].left );
113  occupied_right = MAX ( occupied_right, window_p->occupied_sides[window_id].right );
114  }
115 
116  debug ( 3, "w.size == %u, b_left == %u; b_right == %u; w.buf_size == %u; r_space == %u",
117  window_p->size, occupied_left, occupied_right, window_p->buf_size, required_space );
118  // Trying to find a space in the buffer to place message
119  size_t buf_coordinate = SIZE_MAX;
120 
121  if ( window_p->packets_len ) {
122  // Free space from left (start of buffer)
123  size_t free_left = occupied_left;
124  // Free space from right (end of buffer)
125  size_t free_right = window_p->buf_size - occupied_right;
126 
127  if ( free_left > required_space )
128  buf_coordinate = free_left - required_space;
129  else if ( free_right > required_space )
130  buf_coordinate = occupied_right;
131  else {
132  // Not enough space in the window_p->buffer;
133  window_p->buf_size += MAX ( CLUSTER_WINDOW_BUFSIZE_PORTION, required_space );
134  window_p->buf = xrealloc ( window_p->buf, window_p->buf_size );
135  buf_coordinate = occupied_right;
136  }
137 
138  debug ( 3, "f_left == %u; f_right == %u; b_coord == %u; w.buf_size == %u",
139  free_left, free_right, buf_coordinate, window_p->buf_size );
140  } else {
141  buf_coordinate = 0;
142 
143  if ( window_p->buf_size <= required_space ) {
144  window_p->buf_size += MAX ( CLUSTER_WINDOW_BUFSIZE_PORTION, required_space );
145  window_p->buf = xrealloc ( window_p->buf, window_p->buf_size );
146  }
147  }
148 
149  unsigned int window_id;
150  // packet id in window
151  window_id = window_p->packets_len;
152  // reserving the space in buffer
153  window_p->occupied_sides[window_id].left = buf_coordinate;
154  window_p->occupied_sides[window_id].right = buf_coordinate + required_space;
155  // placing information into buffer
156  clustercmdqueuedpacket_t *queuedpacket_p;
157  debug ( 3, "b_coord == %u", buf_coordinate );
158  queuedpacket_p = ( clustercmdqueuedpacket_t * ) &window_p->buf[buf_coordinate];
159  memset ( &queuedpacket_p->h, 0, sizeof ( queuedpacket_p->h ) );
160  memcpy ( &queuedpacket_p->cmd, clustercmd_p, clustercmd_size );
161  queuedpacket_p->h.window_id = window_id;
162  // remembering new packet
163  g_hash_table_insert ( serial2queuedpacket_ht, GINT_TO_POINTER ( clustercmd_p->h.serial ), queuedpacket_p );
164  window_p->packets_id[window_p->packets_len++] = window_id;
165  return 0;
166 }
167 
168 
169 /**
170  * @brief Removes command (message) from window_p->buffer
171  *
172  * @param[in] queuedpacket_p Pointer to queuedpacket structure of the command (message)
173  *
174  * @retval zero Successful
175  * @retval non-zero If got error while deleting the message. The error-code is placed into returned value.
176  *
177  */
178 
179 static inline int clustercmd_window_del ( window_t *window_p, clustercmdqueuedpacket_t *queuedpacket_p, GHashTable *serial2queuedpacket_ht )
180 {
181 #ifdef PARANOID
182 
183  if ( !window_p->size ) {
184  error ( "window not allocated." );
185  return EINVAL;
186  }
187 
188  if ( !window_p->packets_len ) {
189  error ( "there already no packets in the window." );
190  return EINVAL;
191  }
192 
193 #endif
194  unsigned int window_id_del = queuedpacket_p->h.window_id;
195  unsigned int window_id_last = --window_p->packets_len;
196 
197  // Forgeting the packet
198 
199  // Moving the last packet into place of deleting packet, to free the tail in "window_p->packets_id" and "window_p->occupied_sides"
200  if ( window_id_del != window_id_last ) {
201  debug ( 3, "%i -> %i", window_id_last, window_id_del );
202  window_p->packets_id[window_id_del] = window_p->packets_id[window_id_last];
203  memcpy ( &window_p->occupied_sides[window_id_del], &window_p->occupied_sides[window_id_last], sizeof ( window_p->occupied_sides[window_id_del] ) );
204  }
205 
206  // Removing from hash table
207  g_hash_table_remove ( serial2queuedpacket_ht, GINT_TO_POINTER ( queuedpacket_p->cmd.h.serial ) );
208  return 0;
209 }
210 
211 
212 /**
213  * @brief Calculates Adler32 for clustercmd
214  *
215  * @param[in] clustercmd_p Pointer to clustercmd
216  * @param[out] clustercmdadler32_p Pointer to structure to return value(s)
217  *
218  * @retval zero On successful calculation
219  * @retval non-zero On error. Error-code is placed into returned value.
220  *
221  */
222 
223 int clustercmd_adler32_calc ( clustercmd_t *clustercmd_p, clustercmdadler32_t *clustercmdadler32_p, adler32_calc_t flags )
224 {
225  debug ( 15, "(%p, %p, 0x%x)", clustercmd_p, clustercmdadler32_p, flags );
226 
227  if ( flags & ADLER32_CALC_DATA ) {
228  uint32_t adler32;
229  uint32_t size = clustercmd_p->h.data_len;
230  char *ptr = clustercmd_p->data.p;
231  // Calculating
232  adler32 = adler32_calc ( ( unsigned char * ) ptr, CLUSTER_PAD ( size ) );
233  debug ( 20, "dat: 0x%x", adler32 );
234  // Ending
235  clustercmdadler32_p->dat = adler32 ^ 0xFFFFFFFF;
236  }
237 
238  if ( flags & ADLER32_CALC_HEADER ) {
239  uint32_t adler32;
240  clustercmdadler32_t adler32_save;
241  // Preparing
242  memcpy ( &adler32_save.hdr, &clustercmd_p->h.adler32.hdr, sizeof ( clustercmd_p->h.adler32.hdr ) );
243  memset ( &clustercmd_p->h.adler32.hdr, 0, sizeof ( clustercmd_p->h.adler32.hdr ) );
244  adler32 = 0xFFFFFFFF;
245  uint32_t size = sizeof ( clustercmdhdr_t );
246  char *ptr = ( char * ) &clustercmd_p->h;
247  // Calculating
248  adler32 = adler32_calc ( ( unsigned char * ) ptr, size );
249  debug ( 20, "hdr: 0x%x", adler32 );
250  // Ending
251  memcpy ( &clustercmd_p->h.adler32.hdr, &adler32_save.hdr, sizeof ( clustercmd_p->h.adler32.hdr ) );
252  clustercmdadler32_p->hdr = adler32 ^ 0xFFFFFFFF;
253  }
254 
255  return 0;
256 }
257 
258 
259 /**
260  * @brief Sends message to another nodes of the cluster.
261  *
262  * @param[in] clustercmd_p Command structure pointer.
263  *
264  * @retval zero Successfully send.
265  * @retval non-zero Got error, while sending.
266  *
267  */
268 
269 int _cluster_send ( clustercmd_t *clustercmd_p )
270 {
271  debug ( 10, "{h.dst_node_id: %u, h.src_node_id: %u, cmd_id: %u, data_len: %u}",
272  clustercmd_p->h.dst_node_id, clustercmd_p->h.src_node_id, clustercmd_p->h.cmd_id,
273  clustercmd_p->h.data_len );
274  clustercmd_p->h.src_node_id = node_id_my;
275  SAFE ( clustercmd_adler32_calc ( clustercmd_p, &clustercmd_p->h.adler32, ADLER32_CALC_ALL ), return _SAFE_rc );
276  debug ( 3, "Sending: "
277  "{h.dst_node_id: %u, h.src_node_id: %u, cmd_id: %u, adler32.hdr: %p, adler32.dat: %p, data_len: %u}",
278  clustercmd_p->h.dst_node_id, clustercmd_p->h.src_node_id, clustercmd_p->h.cmd_id,
279  ( void * ) ( long ) clustercmd_p->h.adler32.hdr, ( void * ) ( long ) clustercmd_p->h.adler32.dat,
280  clustercmd_p->h.data_len );
281  nodeinfo_t *nodeinfo_p;
282  nodeinfo_p = &nodeinfo[clustercmd_p->h.dst_node_id];
283 
284  // Checking if the node online
285  switch ( nodeinfo_p->status ) {
286  case NODESTATUS_DOESNTEXIST:
287  case NODESTATUS_OFFLINE:
288  debug ( 1, "There's no online node with id %u. Skipping sending.", clustercmd_p->h.dst_node_id );
289  return EADDRNOTAVAIL;
290 
291  default:
292  break;
293  }
294 
295  // Putting the message into an output window
296  if ( nodeinfo_my != NULL )
297  clustercmd_window_add ( &window_o, clustercmd_p, nodeinfo_my->serial2queuedpacket_ht );
298 
299  // Sending the message
300  debug ( 10, "sendto(%p, %p, %i, 0, %p, %i)", sock_o, clustercmd_p, CLUSTERCMD_SIZE_PADDED ( clustercmd_p ), &sa_o, sizeof ( sa_o ) );
301  debug ( 50, "clustercmd_p->data.p[0] == 0x%x", clustercmd_p->data.p[0] );
302  critical_on ( sendto ( sock_o, clustercmd_p, CLUSTERCMD_SIZE_PADDED ( clustercmd_p ), 0, &sa_o, sizeof ( sa_o ) ) == -1 );
303  // Finishing
304  return 0;
305 }
306 
307 static inline int cluster_send ( clustercmd_t *clustercmd_p )
308 {
309  int rc;
310  rc = _cluster_send ( clustercmd_p );
311  debug ( 10, "__________________sent___________________ %i", rc );
312  return rc;
313 }
314 
315 /**
316  * @brief Changes information about node's status in nodeinfo[] and updates connected information.
317  *
318  * @param[in] node_id node_id of the node.
319  * @param[in] node_status New node status.
320  *
321  * @retval zero Successful
322  * @retval non-zero If got error while changing the status. The error-code is placed into returned value.
323  *
324  */
325 
326 int node_update_status ( uint8_t node_id, uint8_t node_status )
327 {
328  debug ( 3, "%i, %i", node_id, node_status );
329  uint8_t node_status_old = nodeinfo[node_id].status;
330  nodeinfo_t *nodeinfo_p = &nodeinfo[node_id];
331 
332  if ( ( node_status == NODESTATUS_DOESNTEXIST ) && ( node_status_old != NODESTATUS_DOESNTEXIST ) ) {
333  node_count--;
334  node_ids[nodeinfo_p->num] = node_ids[node_count];
335  g_hash_table_destroy ( nodeinfo_p->modtime_ht );
336  g_hash_table_destroy ( nodeinfo_p->serial2queuedpacket_ht );
337 #ifdef VERYPARANOID
338  memset ( nodeinfo_p, 0, sizeof ( *nodeinfo_p ) );
339 #endif
340  return 0;
341  }
342 
343  if ( node_status == node_status_old )
344  return 0;
345 
346  switch ( node_status_old ) {
347  case NODESTATUS_DOESNTEXIST:
348  nodeinfo_p->id = node_id;
349  nodeinfo_p->num = node_count;
350  nodeinfo_p->last_serial = -1;
351  nodeinfo_p->modtime_ht = g_hash_table_new_full ( g_str_hash, g_str_equal, free, 0 );
352  nodeinfo_p->serial2queuedpacket_ht = g_hash_table_new_full ( g_direct_hash, g_direct_equal, 0, 0 );
353  node_ids[node_count] = node_id;
354  node_count++;
355 #ifdef PARANOID
356 
357  if ( node_status == NODESTATUS_OFFLINE )
358  break; // In case of NODESTATUS_DOESNTEXIST -> NODESTATUS_OFFLINE, node_online should be increased
359 
360 #endif
361 
362  case NODESTATUS_OFFLINE:
363  nodeinfo_p->last_serial = -1;
364  node_online++;
365  break;
366 
367  default:
368  if ( node_status == NODESTATUS_OFFLINE )
369  node_online--;
370 
371  break;
372  }
373 
374  nodeinfo[node_id].status = node_status;
375  return 0;
376 }
377 
378 
379 /**
380  * @brief Changes information about node's name and updates connected information.
381  *
382  * @param[in] node_id node_id of the node.
383  * @param[in] node_name New node name
384  *
385  * @retval zero Successful
386  * @retval non-zero If got error while changing the status. The error-code is placed into returned value.
387  *
388  */
389 
390 int node_update_name ( uint8_t node_id, const char *node_name )
391 {
392  debug ( 3, "%i, \"%s\"", node_id, node_name );
393  nodeinfo_t *nodeinfo_p = &nodeinfo[node_id];
394  void *ret = g_hash_table_lookup ( indexes_p->nodenames_ht, node_name );
395 
396  if ( ret != NULL ) {
397  int rc;
398  uint8_t node_old_id = GPOINTER_TO_INT ( ret );
399  nodeinfo_t *nodeinfo_old_p = &nodeinfo[node_old_id];
400  debug ( 5, "nodename \"%s\" had been used by node_id == %i", node_name, node_old_id );
401 
402  if ( node_old_id == node_id ) {
403  debug ( 10, "node_old_id [%i] == node_id [%i]", node_old_id, node_id );
404  return 0;
405  }
406 
407  {
408  debug ( 15, "Sending a DIE command to node_id (%i)", node_old_id );
409  clustercmd_t *clustercmd_p = CLUSTER_ALLOCA ( clustercmd_die_t, 0 );
410  clustercmd_p->h.cmd_id = CLUSTERCMDID_DIE;
411  clustercmd_p->h.dst_node_id = node_old_id;
412 
413  if ( ( rc = cluster_send ( clustercmd_p ) ) )
414  return rc;
415  }
416 
417  nodeinfo_old_p->node_name = NULL;
418  debug ( 3, "changing status of node_id == %i to NODESTATUS_OFFLINE", node_old_id );
419  node_update_status ( node_old_id, NODESTATUS_OFFLINE );
420  }
421 
422  char *node_name_dup = strdup ( node_name );
423  nodeinfo_p->node_name = node_name_dup;
424  g_hash_table_replace ( indexes_p->nodenames_ht, node_name_dup, GINT_TO_POINTER ( node_id ) );
425  return 0;
426 }
427 
428 /**
429  * @brief Sets message processing functions for cluster_recv_proc() function for specified command type
430  *
431  * @param[in] cmd_id The command type
432  * @param[in] procfunct The processing function for messages with specified cmd_id
433  *
434  * @retval zero Successful
435  * @retval non-zero If got error while setting processing function. The error-code is placed into returned value.
436  *
437  */
438 
439 static inline int cluster_recv_proc_set ( clustercmd_id_t cmd_id, cluster_recvproc_funct_t procfunct )
440 {
441  recvproc_funct[cmd_id] = procfunct;
442  return 0;
443 }
444 
445 
446 /**
447  * @brief Safe wrapper for recvfrom() function
448  *
449  * @param[in] sock The socket descriptor
450  * @param[out] _buf_p Pointer to pointer to buffer
451  * @param[in] flags Flags...
452  *
453  * @retval zero Successful
454  * @retval non-zero If got error while read()-ing. The error-code is placed into returned value. "-1" means that message is too short.
455  *
456  */
457 
458 static inline int cluster_read ( int sock, clustercmd_t **cmd_pp, cluster_read_flags_t flags )
459 {
460  static char buf[CLUSTER_PACKET_MAXSIZE];
461  static clustercmd_t *cmd_p = ( void * ) buf;
462  struct sockaddr_in sa_in;
463  size_t sa_in_len = sizeof ( sa_in );
464  *cmd_pp = cmd_p;
465  debug ( 20, "%i, %p, 0x%x", sock, buf, flags );
466  int readret = recvfrom ( sock, buf, CLUSTER_PACKET_MAXSIZE, MSG_WAITALL, ( struct sockaddr * ) &sa_in, ( socklen_t * restrict ) &sa_in_len );
467  debug ( 30, "recvfrom(%i, %p, 0x%x, %p, %p) -> %i", sock, buf, MSG_WAITALL, &sa_in, &sa_in_len, readret );
468 #ifdef PARANOID
469 
470  if ( !readret ) {
471  error ( "recvfrom() returned 0. This shouldn't happen. Exit." );
472  return EINVAL;
473  }
474 
475 #endif
476 
477  if ( readret < 0 ) {
478  error ( "recvfrom() returned %i. "
479  "Seems, that something wrong with network socket.",
480  readret );
481  return errno != -1 ? errno : -2;
482  }
483 
484  debug ( 2, "Got message from %s (len: %i).", inet_ntoa ( sa_in.sin_addr ), readret );
485 
486  if ( ( unsigned int ) readret < sizeof ( clustercmdhdr_t ) ) {
487  // Too short message
488  error ( "Warning: cluster_read(): Got too short message from node (no header [or too short]). Ignoring it." );
489  return -1;
490  }
491 
492  if ( ( unsigned int ) readret < CLUSTERCMD_SIZE ( cmd_p ) ) {
493  // Too short message
494  error ( "Warning: cluster_read(): Got too short message from node (no data [or too short]). Ignoring it." );
495  return -1;
496  }
497 
498  // Incorrect size?
499  if ( readret & 0x3 ) {
500  error ( "Warning: cluster_recv(): Received packet of size %i (not a multiple of 4). Ignoring it.", readret );
501  return 0;
502  }
503 
504  return 0;
505 }
506 
507 
508 /**
509  * @brief Sends packet-reject notification
510  *
511  * @param[in] clustercmd_p Pointer to clustercmd that will be rejected
512  * @param[in] reason Reason why the clustercmd is denied
513  *
514  * @retval zero Successful
515  * @retval non-zero If got error while read()-ing. The error-code is placed into returned value. "-1" means that message is too short.
516  *
517  */
518 
519 static inline int clustercmd_reject ( clustercmd_t *clustercmd_p, uint8_t reason )
520 {
521  clustercmd_t *clustercmd_rej_p = CLUSTER_ALLOCA ( clustercmd_rej_t, 0 );
522  clustercmd_rej_p->h.dst_node_id = clustercmd_p->h.src_node_id;
523  clustercmd_rej_p->data.rej.serial = clustercmd_p->h.serial;
524  clustercmd_rej_p->data.rej.reason = reason;
525  return cluster_send ( clustercmd_rej_p );
526 }
527 
528 
/*
 * Returns "clustercmd_p" from _cluster_recv() as the received message.
 * Remembers its serial and source node, because the next _cluster_recv() call
 * looks for serial+1 of that node in the input window. Stores the message
 * into *clustercmd_pp if clustercmd_pp is not NULL.
 * Only usable inside _cluster_recv(): it uses that function's static locals
 * and its clustercmd_pp argument, and it returns 1 from it.
 * The argument is evaluated exactly once; do/while(0) makes it a single statement.
 */
#define CLUSTER_RECV_RETURNMESSAGE(clustercmd_p) do {\
		clustercmd_t *const retmsg_p = (clustercmd_p);\
		last_serial = retmsg_p->h.serial;\
		last_src_node_id = retmsg_p->h.src_node_id;\
		if (clustercmd_pp != NULL)\
			*clustercmd_pp = retmsg_p;\
		return 1;\
	} while (0)
536 /**
537  * @brief Receives message from another nodes of the cluster. (not thread-safe)
538  *
539  * @param[out] clustercmd_pp Pointer to command structure pointer. It will be re-allocated every time when size is not enough. Allocated space will be reused on next calling.
540  * @param[i] timeout Timeout (in milliseconds).
541  *
542  * @retval 1 If there's new message.
543  * @retval 0 If there's no new messages.
544  * @retval -1 If got error while receiving. The error-code is placed into "errno".
545  *
546  */
547 
548 static int _cluster_recv ( clustercmd_t **clustercmd_pp, struct timeval *timeout )
549 {
550  clustercmd_t *clustercmd_p;
551  static uint8_t last_src_node_id = NODEID_NOID;
552  static uint32_t last_serial = 0;
553 
554  // Checking if there message is waiting in the window
555  if ( last_src_node_id != NODEID_NOID ) {
556  nodeinfo_t *nodeinfo_p = &nodeinfo[last_src_node_id];
557 
558  if ( nodeinfo_p->serial2queuedpacket_ht != NULL ) {
559  clustercmdqueuedpacket_t *clustercmdqueuedpacket_p = ( clustercmdqueuedpacket_t * )
560  g_hash_table_lookup ( nodeinfo_p->serial2queuedpacket_ht, GINT_TO_POINTER ( last_serial + 1 ) );
561 
562  if ( clustercmdqueuedpacket_p != NULL )
563  CLUSTER_RECV_RETURNMESSAGE ( &clustercmdqueuedpacket_p->cmd );
564  }
565  }
566 
567  // Checking if there any event on read socket
568  // select()
569  fd_set rfds;
570  FD_ZERO ( &rfds );
571  FD_SET ( sock_i, &rfds );
572  debug ( 3, "select() with timeout {%u, %u}", timeout->tv_sec, timeout->tv_usec );
573  int selret = select ( sock_i + 1, &rfds, NULL, NULL, timeout );
574 
575  // processing select()'s retuned value
576  if ( selret < 0 ) {
577  error ( "got error while select()." );
578  return -1;
579  }
580 
581  if ( selret == 0 ) {
582  debug ( 3, "no new messages." );
583  return 0;
584  }
585 
586  debug ( 3, "got new message(s)." );
587  debug ( 10, "Reading new message's header" );
588  clustercmdadler32_t adler32;
589  //clustercmd_t *clustercmd_p = (clustercmd_t *)mmap(NULL, sizeof(clustercmdhdr_t), PROT_NONE,
590  // MAP_PRIVATE, sock, 0);
591  int ret;
592 
593  if ( ( ret = cluster_read ( sock_i, clustercmd_pp, CLREAD_NONE ) ) ) {
594  if ( ret == -1 ) return 0; // Invalid message? Skipping.
595 
596  error ( "Got error from cluster_read()." );
597  errno = ret;
598  return -1;
599  }
600 
601  clustercmd_p = *clustercmd_pp;
602  debug ( 3, "Received: {h.dst_node_id: %u, h.src_node_id: %u, cmd_id: %u,"
603  " serial: %u, adler32: {0x%x, 0x%x}, data_len: %u}",
604  clustercmd_p->h.dst_node_id, clustercmd_p->h.src_node_id,
605  clustercmd_p->h.cmd_id, clustercmd_p->h.serial,
606  clustercmd_p->h.adler32.hdr, clustercmd_p->h.adler32.dat,
607  clustercmd_p->h.data_len );
608  // Checking adler32 of packet headers.
609  clustercmd_adler32_calc ( clustercmd_p, &adler32, ADLER32_CALC_HEADER );
610 
611  if ( adler32.hdr != clustercmd_p->h.adler32.hdr ) {
612  debug ( 1, "hdr-adler32 mismatch: %p != %p.",
613  ( void* ) ( long ) clustercmd_p->h.adler32.hdr, ( void* ) ( long ) adler32.hdr );
614 
615  if ( ( ret = clustercmd_reject ( clustercmd_p, REJ_ADLER32MISMATCH ) ) != EADDRNOTAVAIL ) {
616  error ( "Got error while clustercmd_reject()." );
617  errno = ret;
618  return -1;
619  }
620  }
621 
622  // Checking src_node_id and dst_node_id
623  uint8_t src_node_id = clustercmd_p->h.src_node_id;
624  uint8_t dst_node_id = clustercmd_p->h.dst_node_id;
625 
626  // Packet from registering node?
627  if ( src_node_id == NODEID_NOID ) {
628  // Wrong command from registering node?
629  if ( clustercmd_p->h.cmd_id != CLUSTERCMDID_HELLO ) {
630  error ( "Warning: cluster_recv(): Got non hello packet from NOID node. Ignoring the packet." );
631  return 0;
632  }
633 
634  if ( clustercmd_p->h.serial != 0 ) {
635  error ( "Warning: cluster_recv(): Got packet with non-zero serial from NOID node. Ignoring the packet." );
636  return 0;
637  }
638  } else
639 
640  // Wrong src_node_id?
641  if ( src_node_id >= MAXNODES ) {
642  error ( "Warning: cluster_recv(): Invalid h.src_node_id: %i >= "XTOSTR ( MAXNODES ) "",
643  src_node_id );
644  return -1;
645  }
646 
647  // Is this broadcast message?
648  if ( dst_node_id == NODEID_NOID ) {
649  // CODE HERE
650  } else
651 
652  // Wrong dst_node_id?
653  if ( dst_node_id >= MAXNODES ) {
654  error ( "Warning: cluster_recv(): Invalid h.dst_node_id: %i >= "XTOSTR ( MAXNODES ) "",
655  dst_node_id );
656  return -1;
657  }
658 
659  // Seems, that headers are correct. Continuing.
660 
661  // Paranoid routines
662  // The message from us? Something wrong if it is.
663  if ( ( clustercmd_p->h.src_node_id == node_id_my ) && ( node_id_my != NODEID_NOID ) )
664  critical ( "node_id collision" );
665 
666  nodeinfo_t *nodeinfo_p = &nodeinfo[src_node_id];
667  // Not actual packet?
668  long serial_diff = clustercmd_p->h.serial - nodeinfo_p->last_serial;
669  debug ( 10, "serial_diff == %i", serial_diff );
670 
671  if ( serial_diff <= 0 || serial_diff > CLUSTER_WINDOW_PCKTLIMIT ) {
672  debug ( 1, "Ignoring packet (serial %i) from %i due to serial_diff: 0 <= || > %i",
673  clustercmd_p->h.serial, src_node_id, serial_diff, CLUSTER_WINDOW_PCKTLIMIT );
674  return -1;
675  }
676 
677  // Is this misordered packet?
678  if ( clustercmd_p->h.serial != nodeinfo_p->last_serial + 1 ) {
679  clustercmd_window_add ( &window_i, clustercmd_p, nodeinfo_p->serial2queuedpacket_ht );
680  return -1;
681  }
682 
683  // Is this the end of packet (packet without data)
684  if ( clustercmd_p->h.data_len == 0 )
685  CLUSTER_RECV_RETURNMESSAGE ( clustercmd_p );
686 
687  // Too big data?
688  if ( clustercmd_p->h.data_len > CLUSTER_PACKET_MAXSIZE ) {
689  error ( "Warning: cluster_recv(): Got too big message from node %i. Ignoring it.",
690  src_node_id );
691  return -1;
692  }
693 
694  /*
695  // Need more space for this packet?
696  if (CLUSTERCMD_SIZE(clustercmd_p) > size) {
697  size = CLUSTERCMD_SIZE(clustercmd_p);
698  clustercmd_p = (clustercmd_t *)xrealloc((char *)clustercmd_p, size);
699  }
700 
701  debug(10, "Reading the data");
702  if ((ret=cluster_read(sock_i, (void *)clustercmd_p->data.p, CLUSTER_PAD(clustercmd_p->h.data_len), CLREAD_CONTINUE))) {
703  if (ret == -1) return 0;
704 
705  error("Got error from cluster_read().");
706  errno = ret;
707  return -1;
708  }
709  */
710  // Checking adler32 of packet data.
711  clustercmd_adler32_calc ( clustercmd_p, &adler32, ADLER32_CALC_DATA );
712 
713  if ( adler32.dat != clustercmd_p->h.adler32.dat ) {
714  debug ( 1, "dat-adler32 mismatch: %p != %p.",
715  ( void* ) ( long ) clustercmd_p->h.adler32.dat, ( void* ) ( long ) adler32.dat );
716 
717  if ( ( ret = clustercmd_reject ( clustercmd_p, REJ_ADLER32MISMATCH ) ) != EADDRNOTAVAIL ) {
718  error ( "Got error while clustercmd_reject()." );
719  errno = ret;
720  return -1;
721  }
722  }
723 
724  CLUSTER_RECV_RETURNMESSAGE ( clustercmd_p );
725 }
726 
727 static inline int cluster_recv ( clustercmd_t **clustercmd_pp, struct timeval *timeout )
728 {
729  int rc;
730  rc = _cluster_recv ( clustercmd_pp, timeout );
731  debug ( 10, "__________________recv___________________ %i", rc );
732  return rc;
733 }
734 
735 
736 /**
737  * @brief (hsyncop) Reads messages for time "_timeout" and proceeding them to recvproc_funct[] functions
738  *
739  * @param[in] _timeout How long to wait messages (totally)
740  *
741  * @retval zero Successful
742  * @retval non-zero If got error while reading or processing messages. The error-code is placed into returned value.
743  *
744  */
745 
746 int cluster_recv_proc ( unsigned int timeout_rel_int )
747 {
748  struct timeval timeout_rel, timeout_abs, tv_abs;
749  int error_count, iteration;
750  debug ( 3, "cluster_recv_proc(%i)", timeout_rel_int );
751  clustercmd_t *clustercmd_p = NULL;
752  int ret;
753  timeout_rel.tv_sec = timeout_rel_int / 1000;
754  timeout_rel.tv_usec = timeout_rel_int % 1000;
755  gettimeofday ( &tv_abs, NULL );
756  timeradd ( &tv_abs, &timeout_rel, &timeout_abs );
757  error_count = 0;
758  iteration = 0;
759 
760  while ( 1 ) {
761  if ( iteration++ ) {
762  gettimeofday ( &tv_abs, NULL );
763 
764  if ( timercmp ( &timeout_abs, &tv_abs, > ) )
765  timersub ( &timeout_abs, &tv_abs, &timeout_rel );
766  else
767  memset ( &timeout_rel, 0, sizeof ( timeout_rel ) );
768 
769  debug ( 5, "timeout_abs == {%u, %u}; tv_abs == {%u, %u}; timeout_rel == {%u, %u}",
770  timeout_abs.tv_sec, timeout_abs.tv_usec,
771  tv_abs.tv_sec, tv_abs.tv_usec,
772  timeout_rel.tv_sec, timeout_rel.tv_usec
773  );
774  }
775 
776  ret = cluster_recv ( &clustercmd_p, &timeout_rel );
777 
778  if ( !ret )
779  break;
780 
781  // Exit if error
782  if ( ret == -1 ) {
783  warning ( "Got error while cluster_recv(). error_count == %i", error_count );
784  error_count++;
785  critical_on ( error_count >= CLUSTER_RECV_PROC_ERRLIMIT );
786  }
787 
788  // If we have appropriate callback function, then call it! :)
789  if ( recvproc_funct[clustercmd_p->h.cmd_id] != NULL ) {
790  debug ( 2, "Calling function by pointer %p", recvproc_funct[clustercmd_p->h.cmd_id] );
791 
792  if ( ( ret = recvproc_funct[clustercmd_p->h.cmd_id] ( clustercmd_p ) ) ) {
793  error ( "Got error from recvproc_funct[%i]: %s (%i)",
794  clustercmd_p->h.cmd_id );
795  return ret;
796  }
797 
798  continue;
799  }
800 
801  // We didn't found an appropriate callback function
802  debug ( 2, "There's no appropriate callback function for cmd_id == %i", clustercmd_p->h.cmd_id );
803  }
804 
805  return 0;
806 }
807 
808 
809 /**
810  * @brief recvproc-function for DIE-messages
811  *
812  * @param[in] clustercmd_p Pointer to clustercmd
813  *
814  * @retval zero Successfully initialized. (never happens)
815  * @retval non-zero Got error, while initializing.
816  *
817  */
818 
819 static int cluster_recvproc_die ( clustercmd_t *clustercmd_p )
820 {
821  critical ( "Got DIE message from node_id == %i,", clustercmd_p->h.src_node_id );
822  return 0;
823 }
824 
825 
826 /**
827  * @brief recvproc-function for ACK-messages
828  *
829  * @param[in] clustercmd_p Pointer to clustercmd
830  *
831  * @retval zero Successfully initialized.
832  * @retval non-zero Got error, while initializing.
833  *
834  */
835 
836 static int cluster_recvproc_ack ( clustercmd_t *clustercmd_p )
837 {
838  uint32_t cmd_serial_ack = clustercmd_p->data.ack.serial;
839  clustercmdqueuedpacket_t *queuedpacket_p =
840  ( clustercmdqueuedpacket_t * ) g_hash_table_lookup ( nodeinfo_my->serial2queuedpacket_ht, GINT_TO_POINTER ( cmd_serial_ack ) );
841 
842  if ( queuedpacket_p == NULL )
843  return 0;
844 
845  uint8_t node_id_from = clustercmd_p->h.src_node_id;
846 
847  if ( ! queuedpacket_p->h.w.o.ack_from[node_id_from] ) {
848  queuedpacket_p->h.w.o.ack_count++;
849  queuedpacket_p->h.w.o.ack_from[node_id_from]++;
850 
851  if ( queuedpacket_p->h.w.o.ack_count == node_count - 1 )
852  clustercmd_window_del ( &window_o, queuedpacket_p, nodeinfo_my->serial2queuedpacket_ht );
853  }
854 
855  return 0;
856 }
857 
858 
859 /**
860  * @brief Sets message processing functions for cluster_recv_proc() function for specified command type
861  *
862  * @param[in] cmd_id The command type
863  * @param[in] procfunct The processing function for messages with specified cmd_id
864  *
865  * @retval zero Successful
866  * @retval non-zero If got error while setting processing function. The error-code is placed into returned value.
867  *
868  */
869 
870 int cluster_io_init()
871 {
872  cluster_recv_proc_set ( CLUSTERCMDID_ACK, cluster_recvproc_ack );
873  cluster_recv_proc_set ( CLUSTERCMDID_DIE, cluster_recvproc_die );
874  return 0;
875 }
876 
877 
878 /**
879  * @brief Antagonist of cluster_recv_proc_init() function. Freeing everything what was allocated in cluster_recv_proc_init()
880  *
881  * @retval zero Successfully initialized
882  * @retval non-zero Got error, while initializing
883  *
884  */
885 
886 int cluster_io_deinit()
887 {
888  if ( window_i.buf_size ) {
889 #ifdef PARANOID
890 
891  if ( window_i.buf == NULL ) {
892  error ( "window_i.buf_size != 0, but window_i.buf == NULL." );
893  } else
894 #endif
895  free ( window_i.buf );
896  }
897 
898  if ( window_o.buf_size ) {
899 #ifdef PARANOID
900 
901  if ( window_o.buf == NULL ) {
902  error ( "window_o.buf_size != 0, but window_o.buf == NULL." );
903  } else
904 #endif
905  free ( window_o.buf );
906  }
907 
908  return 0;
909 }
910 
911 
912 /**
913  * @brief recvproc-function for welcome-messages
914  *
915  * @param[in] clustercmd_p Pointer to clustercmd
916  *
917  * @retval zero Successfully initialized.
918  * @retval non-zero Got error, while initializing.
919  *
920  */
921 
922 static int cluster_recvproc_welcome ( clustercmd_t *clustercmd_p )
923 {
924  debug ( 20, "%p", clustercmd_p );
925 // static time_t updatets = 0;
926  clustercmd_welcome_t *data_welcome_p = &clustercmd_p->data.welcome;
927  /*
928  // Is this the most recent information? Skipping if not.
929  if (!(data_welcome_p->updatets > updatets))
930  return 0;
931  */
932  // Is the node name length in message equals to our node name length? Skipping if not.
933  uint32_t recv_nodename_len;
934  recv_nodename_len = welcome_to_node_name_len ( clustercmd_p );
935 
936  if ( recv_nodename_len != ctx_p->cluster_nodename_len ) {
937  debug ( 9, "recv_nodename_len [%i] != ctx_p->cluster_nodename_len [%i]", recv_nodename_len, ctx_p->cluster_nodename_len );
938  return 0;
939  }
940 
941  // Is the node name equals to ours? Skipping if not.
942  if ( memcmp ( welcome_to_node_name ( data_welcome_p ), ctx_p->cluster_nodename, recv_nodename_len ) ) {
943  debug ( 9, "to_node_name != ctx_p->cluster_nodename" );
944  return 0;
945  }
946 
947  // Remembering the node that answered us
948  node_update_status ( clustercmd_p->h.src_node_id, NODESTATUS_SEEMSONLINE );
949  {
950  char *node_name;
951  node_name = alloca ( data_welcome_p->from_node_name_len );
952  memcpy ( node_name, data_welcome_p->from_node_name, data_welcome_p->from_node_name_len );
953  node_update_name ( clustercmd_p->h.src_node_id, node_name );
954  }
955  // Seems, that somebody knows our node id, remembering it.
956  node_id_my = clustercmd_p->h.dst_node_id;
957 // updatets = data_welcome_p->updatets;
958  return 0;
959 }
960 
961 
962 /**
963  * @brief recvproc-function for hello-messages
964  *
965  * @param[in] clustercmd_p Pointer to clustercmd
966  *
967  * @retval zero Successfully initialized.
968  * @retval non-zero Got error, while initializing.
969  *
970  */
971 
972 static int cluster_recvproc_hello ( clustercmd_t *clustercmd_p )
973 {
974  clustercmd_hello_t *data_hello_p = &clustercmd_p->data.hello;
975  // Sending information to the new node
976  {
977  int ret;
978  uint8_t node_id;
979  debug ( 15, "Preparing a welcome message for nodename \"%s\" from nodename == \"%s\"", data_hello_p->node_name, ctx_p->cluster_nodename );
980  size_t data_len = sizeof ( clustercmd_welcome_t ) + clustercmd_p->h.data_len + ctx_p->cluster_nodename_len;
981  clustercmd_t *answer_p = CLUSTER_ALLOCA ( void, data_len );
982  clustercmd_welcome_t *answer_data_p = &answer_p->data.welcome;
983  answer_data_p->from_node_name_len = ctx_p->cluster_nodename_len;
984  memcpy ( answer_data_p->from_node_name, ctx_p->cluster_nodename, ctx_p->cluster_nodename_len );
985  memcpy ( welcome_to_node_name ( answer_data_p ), data_hello_p->node_name, clustercmd_p->h.data_len );
986  {
987  void *ptr;
988  char *node_name = alloca ( clustercmd_p->h.data_len + 1 );
989  memcpy ( node_name, data_hello_p->node_name, clustercmd_p->h.data_len + 1 );
990  ptr = g_hash_table_lookup ( indexes_p->nodenames_ht, node_name );
991  node_id = ( ptr == NULL ? NODEID_NOID : GPOINTER_TO_INT ( ptr ) );
992  debug ( 3, "\"%s\" -> %i", node_name, node_id );
993  }
994  answer_p->h.data_len = data_len;
995  answer_p->h.cmd_id = CLUSTERCMDID_WELCOME;
996  answer_p->h.dst_node_id = node_id; // broadcast
997 
998  if ( ( ret = cluster_send ( answer_p ) ) )
999  return ret;
1000  }
1001  return 0;
1002 }
1003 
1004 
1005 /**
1006  * @brief recvproc-function for register-messages
1007  *
1008  * @param[in] clustercmd_p Pointer to clustercmd
1009  *
1010  * @retval zero Successfully initialized.
1011  * @retval non-zero Got error, while initializing.
1012  *
1013  */
1014 
1015 static int cluster_recvproc_register ( clustercmd_t *clustercmd_p )
1016 {
1017  clustercmd_reg_t *data_register_p = &clustercmd_p->data.reg;
1018  char *node_name = alloca ( clustercmd_p->h.data_len + 1 );
1019  memcpy ( node_name, data_register_p->node_name, clustercmd_p->h.data_len + 1 );
1020  node_update_name ( clustercmd_p->h.src_node_id, node_name );
1021  return 0;
1022 }
1023 
1024 extern int cluster_loop();
1025 /**
1026  * @brief Initializes cluster subsystem.
1027  *
1028  * @param[in] _ctx_p Pointer to "glob" variable, defined in main().
1029  * @param[in] _indexes_p Pointer to "indexes" variable, defined in sync_run().
1030  *
1031  * @retval zero Successfully initialized.
1032  * @retval non-zero Got error, while initializing.
1033  *
1034  */
1035 
1036 int cluster_init ( ctx_t *_ctx_p, indexes_t *_indexes_p )
1037 {
1038  int ret;
1039 
1040  // Preventing double initializing
1041  if ( ctx_p != NULL ) {
1042  error ( "cluster subsystem is already initialized." );
1043  return EALREADY;
1044  }
1045 
1046  // Initializing global variables, pt. 1
1047  ctx_p = _ctx_p;
1048  indexes_p = _indexes_p;
1049  cluster_timeout = ctx_p->cluster_timeout;
1050  indexes_p->nodenames_ht = g_hash_table_new_full ( g_str_hash, g_str_equal, free, 0 );
1051  node_update_status ( NODEID_NOID, NODESTATUS_ONLINE );
1052  {
1053  int i = 0;
1054 
1055  while ( i < MAXNODES ) {
1056  nodeinfo[i].last_serial = -1;
1057  i++;
1058  }
1059  }
1060  // Initializing network routines
1061  // Input socket
1062  // Creating socket
1063  sock_i = socket ( AF_INET, SOCK_DGRAM, 0 );
1064 
1065  if ( sock_i < 0 ) {
1066  error ( "Cannot create socket for input traffic" );
1067  return errno;
1068  }
1069 
1070  // Enable SO_REUSEADDR to allow multiple instances of this application to receive copies
1071  // of the multicast datagrams.
1072  int reuse = 1;
1073 
1074  if ( setsockopt ( sock_i, SOL_SOCKET, SO_REUSEADDR, ( char * ) &reuse, sizeof ( reuse ) ) < 0 ) {
1075  error ( "Got error while setsockopt()" );
1076  return errno;
1077  }
1078 
1079  // Binding
1080  sa_i.sin_family = AF_INET;
1081  sa_i.sin_port = htons ( ctx_p->cluster_mcastipport );
1082  sa_i.sin_addr.s_addr = INADDR_ANY;
1083 
1084  if ( bind ( sock_i, ( struct sockaddr* ) &sa_i, sizeof ( sa_i ) ) ) {
1085  error ( "Got error while bind()" );
1086  return errno;
1087  }
1088 
1089  // Joining to multicast group
1090  struct ip_mreq group;
1091  group.imr_interface.s_addr = inet_addr ( ctx_p->cluster_iface );
1092  group.imr_multiaddr.s_addr = inet_addr ( ctx_p->cluster_mcastipaddr );
1093 
1094  if ( setsockopt ( sock_i, IPPROTO_IP, IP_ADD_MEMBERSHIP,
1095  ( char * ) &group, sizeof ( group ) ) < 0 ) {
1096  error ( "Cannot setsockopt() to enter to membership %s -> %s",
1097  ctx_p->cluster_iface, ctx_p->cluster_mcastipaddr );
1098  return errno;
1099  }
1100 
1101  // Output socket
1102  // Creating socket
1103  sock_o = socket ( AF_INET, SOCK_DGRAM, 0 );
1104 
1105  if ( sock_o < 0 ) {
1106  error ( "Cannot create socket for output traffic" );
1107  return errno;
1108  }
1109 
1110  // Initializing the group sockaddr structure
1111  sa_o.sin_family = AF_INET;
1112  sa_o.sin_port = htons ( ctx_p->cluster_mcastipport );
1113  sa_o.sin_addr.s_addr = inet_addr ( ctx_p->cluster_mcastipaddr );
1114  // Disable looping back output datagrams
1115  {
1116  char loopch = 0;
1117 
1118  if ( setsockopt ( sock_o, IPPROTO_IP, IP_MULTICAST_LOOP, ( char * ) &loopch, sizeof ( loopch ) ) < 0 ) {
1119  error ( "Cannot disable loopback for output socket." );
1120  return errno;
1121  }
1122  }
1123  // Setting local interface for output traffic
1124  {
1125  struct in_addr addr_o;
1126  addr_o.s_addr = inet_addr ( ctx_p->cluster_iface );
1127 
1128  if ( setsockopt ( sock_o, IPPROTO_IP, IP_MULTICAST_IF, &addr_o, sizeof ( addr_o ) ) < 0 ) {
1129  error ( "Cannot set local interface for outbound traffic" );
1130  return errno;
1131  }
1132  }
1133  // Initializing another routines
1134  cluster_io_init();
1135  // Getting my ID in the cluster
1136  // Trying to preserve my node_id after restart. :)
1137  // Asking another nodes about my previous node_id
1138  {
1139  debug ( 15, "Preparing a message with my nodename == \"%s\" (%i)", ctx_p->cluster_nodename, ctx_p->cluster_nodename_len );
1140  clustercmd_t *clustercmd_p = CLUSTER_ALLOCA ( clustercmd_hello_t, ctx_p->cluster_nodename_len );
1141  clustercmd_p->h.data_len = ctx_p->cluster_nodename_len;
1142  memcpy ( clustercmd_p->data.hello.node_name, ctx_p->cluster_nodename, clustercmd_p->h.data_len + 1 );
1143  clustercmd_p->h.cmd_id = CLUSTERCMDID_HELLO;
1144  clustercmd_p->h.dst_node_id = NODEID_NOID; // broadcast
1145 
1146  if ( ( ret = cluster_send ( clustercmd_p ) ) )
1147  return ret;
1148  }
1149  // Processing answers
1150  cluster_recv_proc_set ( CLUSTERCMDID_WELCOME, cluster_recvproc_welcome );
1151 
1152  if ( ( ret = cluster_recv_proc ( cluster_timeout ) ) )
1153  return ret;
1154 
1155  // Ignore next welcome messages
1156  cluster_recv_proc_set ( CLUSTERCMDID_WELCOME, NULL );
1157  debug ( 3, "After communicating with others, my node_id is %i.", node_id_my );
1158 
1159  // Getting free node_id if nobody said us the certain value (see above).
1160  if ( node_id_my == NODEID_NOID ) {
1161  int i = 0;
1162 
1163  while ( i < MAXNODES ) {
1164  if ( nodeinfo[i].status == NODESTATUS_DOESNTEXIST ) {
1165  node_id_my = i;
1166  break;
1167  }
1168 
1169  i++;
1170  }
1171 
1172  debug ( 3, "I was have to set my node_id to %i.", node_id_my );
1173  }
1174 
1175  // If there's no free id-s, then exit :(
1176  if ( node_id_my == NODEID_NOID ) {
1177  error ( "Cannot find free node ID. Seems, that all %i ID-s are already occupied." );
1178  return ENOMEM;
1179  }
1180 
1181  // Initializing global variables, pt. 2
1182  nodeinfo_my = &nodeinfo[node_id_my];
1183  // Registering in the cluster
1184  // Sending registration information
1185  node_update_status ( node_id_my, NODESTATUS_SEEMSONLINE );
1186  {
1187  clustercmd_t *clustercmd_p = CLUSTER_ALLOCA ( clustercmd_reg_t, ctx_p->cluster_nodename_len );
1188  clustercmd_reg_t *data_reg_p = &clustercmd_p->data.reg;
1189  memcpy ( data_reg_p->node_name, ctx_p->cluster_nodename, ctx_p->cluster_nodename_len + 1 );
1190  clustercmd_p->h.data_len = ctx_p->cluster_nodename_len + 1;
1191  clustercmd_p->h.cmd_id = CLUSTERCMDID_REG;
1192  clustercmd_p->h.dst_node_id = NODEID_NOID; // broadcast
1193 
1194  if ( ( ret = cluster_send ( clustercmd_p ) ) )
1195  return ret;
1196  }
1197  // Setting process functions
1198  cluster_recv_proc_set ( CLUSTERCMDID_HELLO, cluster_recvproc_hello );
1199  cluster_recv_proc_set ( CLUSTERCMDID_REG, cluster_recvproc_register );
1200 
1201  // Getting answers
1202  if ( ( ret = cluster_recv_proc ( cluster_timeout ) ) )
1203  return ret;
1204 
1205  node_update_status ( node_id_my, NODESTATUS_ONLINE );
1206  // Running thread, that will process background communicating routines with another nodes.
1207  // The process is based on function cluster_loop() [let's use shorthand "cluster_loop()-thread"]
1208  ret = pthread_create ( &pthread_cluster, NULL, ( void * ( * ) ( void * ) ) cluster_loop, NULL );
1209  return ret;
1210 }
1211 
1212 /**
1213  * @brief (syncop) Sends signal to cluster_loop()-thread
1214  *
1215  * @param[in] signal Signal number
1216  *
1217  * @retval zero Successfully send the signal
1218  * @retval non-zero Got error, while sending the signal
1219  *
1220  */
1221 
1222 static inline int cluster_signal ( int signal )
1223 {
1224  if ( pthread_cluster )
1225  return pthread_kill ( pthread_cluster, signal );
1226 
1227  return 0;
1228 }
1229 
1230 
1231 extern int cluster_modtime_exchange_cleanup();
1232 /**
1233  * @brief Antagonist of cluster_init() function. Kills cluster_loop()-thread and cleaning up
1234  *
1235  * @retval zero Successfully initialized
1236  * @retval non-zero Got error, while initializing
1237  *
1238  */
1239 
1240 int cluster_deinit()
1241 {
1242  int ret = 0;
1243  cluster_signal ( SIGTERM );
1244  ret = pthread_join ( pthread_cluster, NULL );
1245  cluster_io_deinit();
1246  node_update_status ( NODEID_NOID, NODESTATUS_DOESNTEXIST );
1247 #ifdef VERYPARANOID
1248  int i = 0;
1249 #endif
1250 
1251  while ( node_count ) {
1252 #ifdef VERYPARANOID
1253 
1254  if ( i++ > MAXNODES ) {
1255  error ( "cluster_deinit() looped. Forcing break." );
1256  break;
1257  }
1258 
1259 #endif
1260  node_update_status ( 0, NODESTATUS_DOESNTEXIST );
1261  }
1262 
1263  close ( sock_i );
1264  close ( sock_o );
1265 #ifdef VERYPARANOID
1266  memset ( nodeinfo, 0, sizeof ( nodeinfo_t ) * NODES_ALLOC );
1267  nodeinfo_my = NULL;
1268  node_count = 0;
1269  node_online = 0;
1270  node_id_my = NODEID_NOID;
1271  memset ( &sa_i, 0, sizeof ( sa_i ) );
1272  memset ( &sa_o, 0, sizeof ( sa_o ) );
1273 #endif
1274  cluster_modtime_exchange_cleanup();
1275  g_hash_table_destroy ( indexes_p->nodenames_ht );
1276  return ret;
1277 }
1278 
1279 
/**
 * @brief (syncop) Forces other nodes to ignore events about the file or directory
 *
 * Not implemented yet: this is currently a no-op.
 *
 * @param[in] fpath Path to the file or directory (unused for now)
 *
 * @retval zero Always
 *
 */

int cluster_lock ( const char *fpath )
{
	const int nothing_done = 0;

	( void ) fpath;	// reserved for the future implementation
	return nothing_done;
}
1295 
1296 
/**
 * @brief (syncop) Forces other nodes to ignore events about all files and
 *        directories listed in queues of "indexes_p"
 *
 * Not implemented yet: this is currently a no-op.
 *
 * @retval zero Always
 *
 */

int cluster_lock_byindexes()
{
	const int nothing_done = 0;

	return nothing_done;
}
1309 
1310 
/**
 * @brief (syncop) Restores events-handling on other nodes for all files and
 *        directories locked by cluster_lock() and cluster_lock_byindexes()
 *        from this node
 *
 * Not implemented yet: this is currently a no-op.
 *
 * @retval zero Always
 *
 */

int cluster_unlock_all()
{
	const int nothing_done = 0;

	return nothing_done;
}
1323 
1324 
/*
 * Evaluates "a" exactly once. If the result is non-zero, passes it to
 * sync_term() and returns it from the enclosing function.
 * Wrapped in do { } while (0) so "CLUSTER_LOOP_CHECK(x);" is one statement,
 * and uses a distinct local name so it does not shadow the caller's "ret".
 */
#define CLUSTER_LOOP_CHECK(a) do {\
		int cluster_loop_check_ret = ( a );\
		if ( cluster_loop_check_ret ) {\
			sync_term ( cluster_loop_check_ret );\
			return cluster_loop_check_ret;\
		}\
	} while ( 0 )
1332 
1333 /**
1334  * @brief Processes background communicating routines with another nodes. cluster_init() function create a thread for this function.
1335  *
1336  * @retval zero Successfully initialized
1337  * @retval non-zero Got error, while initializing
1338  *
1339  */
1340 
1341 int cluster_loop()
1342 {
1343  int ret = 0;
1344  sigset_t sigset_cluster;
1345  // Ignoring SIGINT signal
1346  sigemptyset ( &sigset_cluster );
1347  sigaddset ( &sigset_cluster, SIGINT );
1348  CLUSTER_LOOP_CHECK ( pthread_sigmask ( SIG_BLOCK, &sigset_cluster, NULL ) );
1349  // Don't ignoring SIGTERM signal
1350  sigemptyset ( &sigset_cluster );
1351  sigaddset ( &sigset_cluster, SIGTERM );
1352  CLUSTER_LOOP_CHECK ( pthread_sigmask ( SIG_UNBLOCK, &sigset_cluster, NULL ) );
1353  // Starting the loop
1354  debug ( 3, "cluster_loop() started." );
1355 
1356  while ( 1 ) {
1357  int _ret;
1358  // Waiting for event
1359  fd_set rfds;
1360  FD_ZERO ( &rfds );
1361  FD_SET ( sock_i, &rfds );
1362  debug ( 3, "select()" );
1363  _ret = select ( sock_i + 1, &rfds, NULL, NULL, NULL );
1364 
1365  // Exit if error
1366  if ( ( _ret == -1 ) && ( errno != EINTR ) ) {
1367  ret = errno;
1368  sync_term ( ret );
1369  break;
1370  }
1371 
1372  // Breaking the loop, if there's SIGTERM signal for this thread
1373  debug ( 3, "sigpending()" );
1374 
1375  if ( sigpending ( &sigset_cluster ) )
1376  if ( sigismember ( &sigset_cluster, SIGTERM ) )
1377  break;
1378 
1379  // Processing new messages
1380  debug ( 3, "cluster_recv_proc()" );
1381 
1382  if ( ( ret = cluster_recv_proc ( 0 ) ) ) {
1383  sync_term ( ret );
1384  break;
1385  }
1386  }
1387 
1388  debug ( 3, "cluster_loop() finished with exitcode %i.", ret );
1389  return ret;
1390 #ifdef DOXYGEN
1391  sync_term ( 0 );
1392 #endif
1393 }
1394 
1395 
1396 /**
1397  * @brief Updating information about modification time of a directory.
1398  *
1399  * @param[in] path Canonized path to updated file/dir
1400  * @param[in] dirlevel Directory level provided by fts (man 3 fts)
1401  * @param[in] st_mode st_mode value to detect is it directory or not (S_IFDIR or not)
1402  *
1403  * @retval zero Successfully initialized
1404  * @retval non-zero Got error, while initializing
1405  *
1406  */
1407 
1408 int cluster_modtime_update ( const char *path, short int dirlevel, mode_t st_mode )
1409 {
1410  // "modtime" is incorrent name-part of function. Actually it updates "change time" (man 2 lstat64).
1411  int ret;
1412  // Getting relative directory level (depth)
1413  short int dirlevel_rel = dirlevel - ctx_p->watchdir_dirlevel;
1414 
1415  if ( ( st_mode & S_IFMT ) == S_IFDIR )
1416  dirlevel_rel++;
1417 
1418  // Don't remembering information about directories with level beyond the limits
1419  if ( ( dirlevel_rel > ctx_p->cluster_scan_dl_max ) || ( dirlevel_rel < ctx_p->cluster_hash_dl_min ) )
1420  return 0;
1421 
1422  // Getting directory/file-'s information (including "change time" aka "st_ctime")
1423  stat64_t stat64;
1424  ret = lstat64 ( path, &stat64 );
1425 
1426  if ( ret ) {
1427  error ( "Cannot lstat64()", path );
1428  return errno;
1429  }
1430 
1431  // Getting absolute directory path
1432  const char *dirpath;
1433 
1434  if ( ( st_mode & S_IFMT ) == S_IFDIR ) {
1435  dirpath = path;
1436  } else {
1437  char *path_dup = strdup ( path );
1438  dirpath = ( const char * ) dirname ( path_dup );
1439  free ( path_dup );
1440  }
1441 
1442  // Getting relative directory path
1443  // Initializing
1444  size_t dirpath_len = strlen ( dirpath );
1445  char *dirpath_rel_p = xmalloc ( dirpath_len + 1 );
1446  char *dirpath_rel = dirpath_rel_p;
1447  const char *dirpath_rel_full = &dirpath[ctx_p->watchdirlen];
1448  size_t dirpath_rel_full_len = dirpath_len - ctx_p->watchdirlen;
1449  // Getting coodinate of the end (directory path is already canonized, so we can simply count number of slashes to get directory level)
1450  int slashcount = 0;
1451  size_t dirpath_rel_end = 0;
1452 
1453  while ( dirpath_rel_full[dirpath_rel_end] && ( dirpath_rel_end < dirpath_rel_full_len ) ) {
1454  if ( dirpath_rel_full[dirpath_rel_end] == '/' ) {
1455  slashcount++;
1456 
1457  if ( slashcount >= ctx_p->cluster_hash_dl_max )
1458  break;
1459  }
1460 
1461  dirpath_rel_end++;
1462  }
1463 
1464  // Copy the required part of path to dirpath_rel
1465  memcpy ( dirpath_rel, dirpath_rel_full, dirpath_rel_end );
1466  // Updating "st_ctime" information. We should check current value for this directory and update it only if it less or not set.
1467  // Checking current value
1468  char toupdate = 0;
1469  gpointer ctime_gp = g_hash_table_lookup ( nodeinfo_my->modtime_ht, dirpath_rel );
1470 
1471  if ( ctime_gp == NULL )
1472  toupdate++;
1473  else if ( GPOINTER_TO_INT ( ctime_gp ) < stat64.st_ctime )
1474  toupdate++;
1475 
1476  // g_hash_table_replace() will replace existent information about the directory or create it if it doesn't exist.
1477  if ( toupdate )
1478  g_hash_table_replace ( nodeinfo_my->modtime_ht, strdup ( dirpath_rel ), GINT_TO_POINTER ( stat64.st_ctime ) );
1479 
1480  // Why I'm using "st_ctime" instead of "st_mtime"? Because "st_ctime" also updates on updating inode information.
1481  return 0;
1482 }
1483 
1484 
1485 /**
1486  * @brief Puts entry to list to be send to other nodes. To be called from cluster_modtime_exchange()
1487  *
1488  * @param[in] pushrentry_arg_p Pointer to pushentry_arg structure
1489  *
1490  */
1491 
1492 void cluster_modtime_exchange_pushentry ( gpointer dir_gp, gpointer modtype_gp, void *pushentry_arg_gp )
1493 {
1494  struct pushdoubleentry_arg *pushentry_arg_p = ( struct pushdoubleentry_arg * ) pushentry_arg_gp;
1495  char *dir = ( char * ) dir_gp;
1496  time_t ctime = ( time_t ) GPOINTER_TO_INT ( modtype_gp );
1497  size_t size = strlen ( dir ) + 1; // TODO: strlen should be already prepared
1498  // but not re-calculated here
1499 
1500  if ( pushentry_arg_p->allocated <= pushentry_arg_p->total ) {
1501  pushentry_arg_p->allocated += ALLOC_PORTION;
1502  pushentry_arg_p->entry = ( struct doubleentry * )
1503  xrealloc (
1504  ( char * ) pushentry_arg_p->entry,
1505  pushentry_arg_p->allocated * sizeof ( *pushentry_arg_p->entry )
1506  );
1507  }
1508 
1509  pushentry_arg_p->entry[pushentry_arg_p->total].dat0 = dir;
1510  pushentry_arg_p->entry[pushentry_arg_p->total].size0 = size;
1511  pushentry_arg_p->entry[pushentry_arg_p->total].dat1 = ( void * ) ctime; // Will be problems if sizeof(time_t) > sizeof(void *)
1512  pushentry_arg_p->entry[pushentry_arg_p->total].size1 = sizeof ( ctime );
1513  pushentry_arg_p->size += size;
1514  pushentry_arg_p->total++;
1515  return;
1516 }
1517 
1518 
1519 static struct pushdoubleentry_arg cluster_modtime_exchange_pushentry_arg = {0};
1520 /**
1521  * @brief Clean up after the last run of cluster_modtime_exchange.
1522  *
1523  * @retval zero Successfully initialized
1524  * @retval non-zero Got error, while initializing
1525  *
1526  */
1527 
1528 int cluster_modtime_exchange_cleanup()
1529 {
1530  struct pushdoubleentry_arg *pushentry_arg_p = &cluster_modtime_exchange_pushentry_arg;
1531  int i = 0;
1532 
1533  while ( i < pushentry_arg_p->allocated ) {
1534  if ( pushentry_arg_p->entry[i].alloc0 )
1535  free ( pushentry_arg_p->entry[i].dat0 );
1536 
1537  if ( pushentry_arg_p->entry[i].alloc1 )
1538  free ( pushentry_arg_p->entry[i].dat1 );
1539 
1540  i++;
1541  }
1542 
1543  free ( pushentry_arg_p->entry );
1544 #ifdef VERYPARANOID
1545  memset ( pushentry_arg_p, 0, sizeof ( *pushentry_arg_p ) );
1546 #endif
1547  return 0;
1548 }
1549 
1550 
1551 /**
1552  * @brief Exchanging with "modtime_ht"-s to be able to compare them.
1553  *
1554  * @retval zero Successfully initialized
1555  * @retval non-zero Got error, while initializing
1556  *
1557  */
1558 
1559 int cluster_modtime_exchange()
1560 {
1561  struct pushdoubleentry_arg *pushentry_arg_p = &cluster_modtime_exchange_pushentry_arg;
1562  // Getting hash table entries
1563  pushentry_arg_p->size = 0;
1564  pushentry_arg_p->total = 0;
1565  g_hash_table_foreach ( nodeinfo_my->modtime_ht, cluster_modtime_exchange_pushentry, ( void * ) pushentry_arg_p );
1566 
1567  if ( !pushentry_arg_p->total ) {
1568  // !!!
1569  }
1570 
1571  // Calculating required RAM to compile clustercmd
1572  size_t toalloc = 0;
1573  int i = 0;
1574 
1575  while ( i < pushentry_arg_p->total ) {
1576  toalloc += 4; // for size header
1577  toalloc += pushentry_arg_p->entry[i].size0; // for path
1578  toalloc += pushentry_arg_p->entry[i].size1; // for ctime
1579  }
1580 
1581  // Allocating space for the clustercmd
1582  clustercmd_t *clustercmd_p = ( clustercmd_t * ) xmalloc ( sizeof ( clustercmdhdr_t ) + toalloc );
1583  memset ( clustercmd_p, 0, sizeof ( clustercmdhdr_t ) );
1584  // Setting up clustercmd
1585  clustercmd_p->h.dst_node_id = NODEID_NOID;
1586  clustercmd_p->h.cmd_id = CLUSTERCMDID_HT_EXCH;
1587  clustercmd_p->h.data_len = toalloc;
1588  // Filing clustercmd with hash-table entriyes
1589  i = 0;
1590  clustercmd_ht_exch_t *clustercmd_ht_exch_p = &clustercmd_p->data.ht_exch;
1591 
1592  while ( i < pushentry_arg_p->total ) {
1593  // Setting the data
1594  clustercmd_ht_exch_p->ctime = ( time_t ) pushentry_arg_p->entry[i].dat1;
1595  clustercmd_ht_exch_p->path_length = ( time_t ) pushentry_arg_p->entry[i].size0;
1596  memcpy (
1597  clustercmd_ht_exch_p->path,
1598  pushentry_arg_p->entry[i].dat0,
1599  clustercmd_ht_exch_p->path_length
1600  );
1601  // Pointing to space for next entry:
1602  size_t offset = sizeof ( clustercmd_ht_exch_t ) - 1 + pushentry_arg_p->entry[i].size0;
1603  clustercmd_ht_exch_p = ( clustercmd_ht_exch_t * )
1604  ( & ( ( char * ) clustercmd_ht_exch_p ) [offset] );
1605  }
1606 
1607  // Sending
1608  cluster_send ( clustercmd_p );
1609  // Cleanup
1610  free ( clustercmd_p );
1611  return 0;
1612 }
1613 
1614 
/**
 * @brief (syncop) Syncing file tree with another nodes with using of
 *        directories' change time (st_ctime) as a recent-detector.
 *
 * Currently this only sends the local "modtime_ht" to the other nodes via
 * cluster_modtime_exchange().
 *
 * @retval zero Successfully done
 * @retval non-zero Error code returned by cluster_modtime_exchange()
 *
 */

int cluster_initialsync()
{
	// Propagate the exchange result instead of silently dropping it
	return cluster_modtime_exchange();
}
1630 
1631 
/**
 * @brief (syncop) "Captures" the right to update the file or directory on
 *        other nodes, i.e. removes events about it from other nodes.
 *
 * Not implemented yet: this is currently a no-op.
 *
 * @param[in] path Path to the file or directory (unused for now)
 *
 * @retval zero Always
 *
 */

int cluster_capture ( const char *path )
{
	const int nothing_done = 0;

	( void ) path;	// reserved for the future implementation
	return nothing_done;
}
1647 
1648 #endif
1649 
pushdoubleentry_arg
Definition: common.h:189
ctx
Definition: ctx.h:315
doubleentry::alloc0
size_t alloc0
Definition: common.h:183
cluster.h
close
close(fd_w)
ALLOC_PORTION
#define ALLOC_PORTION
Definition: configuration.h:117
doubleentry::dat1
void * dat1
Definition: common.h:186
sync.h
pushdoubleentry_arg::total
int total
Definition: common.h:191
ctx::watchdirlen
size_t watchdirlen
Definition: ctx.h:375
indexes
Definition: indexes.h:34
doubleentry::size1
size_t size1
Definition: common.h:182
doubleentry
Definition: common.h:180
CLUSTER_WINDOW_PCKTLIMIT
#define CLUSTER_WINDOW_PCKTLIMIT
Definition: configuration.h:120
pushdoubleentry_arg::allocated
int allocated
Definition: common.h:190
adler32_calc
uint32_t adler32_calc(const unsigned char *const data, uint32_t len)
Calculates the Adler-32 checksum of a char array.
Definition: calc.c:41
calc.h
doubleentry::dat0
void * dat0
Definition: common.h:185
error
#define error(...)
Definition: error.h:36
indexes.h
CLUSTER_PACKET_MAXSIZE
#define CLUSTER_PACKET_MAXSIZE
Definition: configuration.h:119
error.h
CLUSTER_WINDOW_BUFSIZE_PORTION
#define CLUSTER_WINDOW_BUFSIZE_PORTION
Definition: configuration.h:118
malloc.h
doubleentry::alloc1
size_t alloc1
Definition: common.h:184
debug
#define debug(debug_level,...)
Definition: error.h:50
SAFE
#define SAFE(code, onfail)
Definition: macros.h:56
XTOSTR
#define XTOSTR(a)
Definition: macros.h:45
ctx::watchdir_dirlevel
short int watchdir_dirlevel
Definition: ctx.h:381
critical
#define critical(...)
Definition: error.h:32
warning
#define warning(...)
Definition: error.h:40
MAXNODES
#define MAXNODES
Definition: configuration.h:16
sync_term
int sync_term(int exitcode)
Definition: sync.c:3991
common.h
doubleentry::size0
size_t size0
Definition: common.h:181
pushdoubleentry_arg::size
size_t size
Definition: common.h:192
stat64_t
struct stat64 stat64_t
Definition: port-hacks.h:65
pushdoubleentry_arg::entry
struct doubleentry * entry
Definition: common.h:193
critical_on
#define critical_on(cond)
Definition: error.h:33
ctx_p
ctx_t * ctx_p
Definition: mon_kqueue.c:85