clsync
Loading...
Searching...
No Matches
cluster.c
Go to the documentation of this file.
1/*
2 clsync - file tree sync utility based on inotify
3
4 Copyright (C) 2013 Dmitry Yu Okunev <dyokunev@ut.mephi.ru> 0x8E30679C
5
6 This program is free software: you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation, either version 3 of the License, or
9 (at your option) any later version.
10
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20/*
21 Hello, dear developer.
22
23 You can ask me directly by e-mail or IRC, if something seems too
24 hard.
25
26 -- 0x8E30679C
27 */
28
29
30#ifdef CLUSTER_SUPPORT
31
32#include "common.h"
33#include "indexes.h"
34#include "error.h"
35#include "cluster.h"
36#include "sync.h"
37#include "calc.h"
38#include "malloc.h"
39
40// Global variables. They will be initialized in cluster_init()
41
42#define CLUSTER_RECV_PROC_ERRLIMIT (1<<8)
43#define NODES_ALLOC (MAX(MAXNODES, NODEID_NOID)+1)
44
45
46int sock_i = -1;
47struct sockaddr_in sa_i = {0};
48
49int sock_o = -1;
50struct sockaddr_in sa_o = {0};
51
52ctx_t *ctx_p = NULL;
53indexes_t *indexes_p = NULL;
54pthread_t pthread_cluster = 0;
55
56nodeinfo_t nodeinfo[NODES_ALLOC] = {{0}};
57
58nodeinfo_t *nodeinfo_my = NULL;
59uint8_t node_id_my = NODEID_NOID;
60uint8_t node_ids[NODES_ALLOC] = {0};
61unsigned int cluster_timeout = 0;
62uint8_t node_count = 0;
63uint8_t node_online = 0;
64
65cluster_recvproc_funct_t recvproc_funct[COUNT_CLUSTERCMDID] = {NULL};
66
67window_t window_i = {0};
68window_t window_o = {0};
69
70/**
71 * @brief Adds command (message) to window_p->buffer
72 *
73 * @param[in] clustercmd_p Pointer to cluster cmd to put into window
74 *
75 * @retval zero Successful
76 * @retval non-zero If got error while deleting the message. The error-code is placed into returned value.
77 *
78 */
79
80static inline int clustercmd_window_add ( window_t *window_p, clustercmd_t *clustercmd_p, GHashTable *serial2queuedpacket_ht )
81{
82#ifdef PARANOID
83
84 if ( clustercmd_p->h.src_node_id >= MAXNODES ) {
85 error ( "Invalid src_node_id: %i.", clustercmd_p->h.src_node_id );
86 return EINVAL;
87 }
88
89#endif
90
91 // Checking if there enough window_p->cells allocated
92 if ( window_p->packets_len >= window_p->size ) {
93 window_p->size += ALLOC_PORTION;
94# define CXREALLOC(a, size) \
95 (typeof(a))xrealloc((char *)(a), (size_t)(size) * sizeof(*(a)))
96 window_p->packets_id = CXREALLOC ( window_p->packets_id, window_p->size );
97 window_p->occupied_sides = CXREALLOC ( window_p->occupied_sides, window_p->size );
98# undef CXREALLOC
99 }
100
101 // Calculating required memory space in buffer for the message
102 size_t clustercmd_size = CLUSTERCMD_SIZE ( clustercmd_p );
103 size_t required_space = sizeof ( clustercmdqueuedpackethdr_t ) + clustercmd_size;
104 // Searching occupied boundaries in the window_p->buffer
105 size_t occupied_left = SIZE_MAX, occupied_right = 0;
106 unsigned int i;
107 i = 0;
108
109 while ( i < window_p->packets_len ) {
110 unsigned int window_id;
111 window_id = window_p->packets_id[ i++ ];
112 occupied_left = MIN ( occupied_left, window_p->occupied_sides[window_id].left );
113 occupied_right = MAX ( occupied_right, window_p->occupied_sides[window_id].right );
114 }
115
116 debug ( 3, "w.size == %u, b_left == %u; b_right == %u; w.buf_size == %u; r_space == %u",
117 window_p->size, occupied_left, occupied_right, window_p->buf_size, required_space );
118 // Trying to find a space in the buffer to place message
119 size_t buf_coordinate = SIZE_MAX;
120
121 if ( window_p->packets_len ) {
122 // Free space from left (start of buffer)
123 size_t free_left = occupied_left;
124 // Free space from right (end of buffer)
125 size_t free_right = window_p->buf_size - occupied_right;
126
127 if ( free_left > required_space )
128 buf_coordinate = free_left - required_space;
129 else if ( free_right > required_space )
130 buf_coordinate = occupied_right;
131 else {
132 // Not enough space in the window_p->buffer;
133 window_p->buf_size += MAX ( CLUSTER_WINDOW_BUFSIZE_PORTION, required_space );
134 window_p->buf = xrealloc ( window_p->buf, window_p->buf_size );
135 buf_coordinate = occupied_right;
136 }
137
138 debug ( 3, "f_left == %u; f_right == %u; b_coord == %u; w.buf_size == %u",
139 free_left, free_right, buf_coordinate, window_p->buf_size );
140 } else {
141 buf_coordinate = 0;
142
143 if ( window_p->buf_size <= required_space ) {
144 window_p->buf_size += MAX ( CLUSTER_WINDOW_BUFSIZE_PORTION, required_space );
145 window_p->buf = xrealloc ( window_p->buf, window_p->buf_size );
146 }
147 }
148
149 unsigned int window_id;
150 // packet id in window
151 window_id = window_p->packets_len;
152 // reserving the space in buffer
153 window_p->occupied_sides[window_id].left = buf_coordinate;
154 window_p->occupied_sides[window_id].right = buf_coordinate + required_space;
155 // placing information into buffer
156 clustercmdqueuedpacket_t *queuedpacket_p;
157 debug ( 3, "b_coord == %u", buf_coordinate );
158 queuedpacket_p = ( clustercmdqueuedpacket_t * ) &window_p->buf[buf_coordinate];
159 memset ( &queuedpacket_p->h, 0, sizeof ( queuedpacket_p->h ) );
160 memcpy ( &queuedpacket_p->cmd, clustercmd_p, clustercmd_size );
161 queuedpacket_p->h.window_id = window_id;
162 // remembering new packet
163 g_hash_table_insert ( serial2queuedpacket_ht, GINT_TO_POINTER ( clustercmd_p->h.serial ), queuedpacket_p );
164 window_p->packets_id[window_p->packets_len++] = window_id;
165 return 0;
166}
167
168
169/**
170 * @brief Removes command (message) from window_p->buffer
171 *
172 * @param[in] queuedpacket_p Pointer to queuedpacket structure of the command (message)
173 *
174 * @retval zero Successful
175 * @retval non-zero If got error while deleting the message. The error-code is placed into returned value.
176 *
177 */
178
179static inline int clustercmd_window_del ( window_t *window_p, clustercmdqueuedpacket_t *queuedpacket_p, GHashTable *serial2queuedpacket_ht )
180{
181#ifdef PARANOID
182
183 if ( !window_p->size ) {
184 error ( "window not allocated." );
185 return EINVAL;
186 }
187
188 if ( !window_p->packets_len ) {
189 error ( "there already no packets in the window." );
190 return EINVAL;
191 }
192
193#endif
194 unsigned int window_id_del = queuedpacket_p->h.window_id;
195 unsigned int window_id_last = --window_p->packets_len;
196
197 // Forgeting the packet
198
199 // Moving the last packet into place of deleting packet, to free the tail in "window_p->packets_id" and "window_p->occupied_sides"
200 if ( window_id_del != window_id_last ) {
201 debug ( 3, "%i -> %i", window_id_last, window_id_del );
202 window_p->packets_id[window_id_del] = window_p->packets_id[window_id_last];
203 memcpy ( &window_p->occupied_sides[window_id_del], &window_p->occupied_sides[window_id_last], sizeof ( window_p->occupied_sides[window_id_del] ) );
204 }
205
206 // Removing from hash table
207 g_hash_table_remove ( serial2queuedpacket_ht, GINT_TO_POINTER ( queuedpacket_p->cmd.h.serial ) );
208 return 0;
209}
210
211
212/**
213 * @brief Calculates Adler32 for clustercmd
214 *
215 * @param[in] clustercmd_p Pointer to clustercmd
216 * @param[out] clustercmdadler32_p Pointer to structure to return value(s)
217 *
218 * @retval zero On successful calculation
219 * @retval non-zero On error. Error-code is placed into returned value.
220 *
221 */
222
223int clustercmd_adler32_calc ( clustercmd_t *clustercmd_p, clustercmdadler32_t *clustercmdadler32_p, adler32_calc_t flags )
224{
225 debug ( 15, "(%p, %p, 0x%x)", clustercmd_p, clustercmdadler32_p, flags );
226
227 if ( flags & ADLER32_CALC_DATA ) {
228 uint32_t adler32;
229 uint32_t size = clustercmd_p->h.data_len;
230 char *ptr = clustercmd_p->data.p;
231 // Calculating
232 adler32 = adler32_calc ( ( unsigned char * ) ptr, CLUSTER_PAD ( size ) );
233 debug ( 20, "dat: 0x%x", adler32 );
234 // Ending
235 clustercmdadler32_p->dat = adler32 ^ 0xFFFFFFFF;
236 }
237
238 if ( flags & ADLER32_CALC_HEADER ) {
239 uint32_t adler32;
240 clustercmdadler32_t adler32_save;
241 // Preparing
242 memcpy ( &adler32_save.hdr, &clustercmd_p->h.adler32.hdr, sizeof ( clustercmd_p->h.adler32.hdr ) );
243 memset ( &clustercmd_p->h.adler32.hdr, 0, sizeof ( clustercmd_p->h.adler32.hdr ) );
244 adler32 = 0xFFFFFFFF;
245 uint32_t size = sizeof ( clustercmdhdr_t );
246 char *ptr = ( char * ) &clustercmd_p->h;
247 // Calculating
248 adler32 = adler32_calc ( ( unsigned char * ) ptr, size );
249 debug ( 20, "hdr: 0x%x", adler32 );
250 // Ending
251 memcpy ( &clustercmd_p->h.adler32.hdr, &adler32_save.hdr, sizeof ( clustercmd_p->h.adler32.hdr ) );
252 clustercmdadler32_p->hdr = adler32 ^ 0xFFFFFFFF;
253 }
254
255 return 0;
256}
257
258
259/**
260 * @brief Sends message to another nodes of the cluster.
261 *
262 * @param[in] clustercmd_p Command structure pointer.
263 *
264 * @retval zero Successfully send.
265 * @retval non-zero Got error, while sending.
266 *
267 */
268
269int _cluster_send ( clustercmd_t *clustercmd_p )
270{
271 debug ( 10, "{h.dst_node_id: %u, h.src_node_id: %u, cmd_id: %u, data_len: %u}",
272 clustercmd_p->h.dst_node_id, clustercmd_p->h.src_node_id, clustercmd_p->h.cmd_id,
273 clustercmd_p->h.data_len );
274 clustercmd_p->h.src_node_id = node_id_my;
275 SAFE ( clustercmd_adler32_calc ( clustercmd_p, &clustercmd_p->h.adler32, ADLER32_CALC_ALL ), return _SAFE_rc );
276 debug ( 3, "Sending: "
277 "{h.dst_node_id: %u, h.src_node_id: %u, cmd_id: %u, adler32.hdr: %p, adler32.dat: %p, data_len: %u}",
278 clustercmd_p->h.dst_node_id, clustercmd_p->h.src_node_id, clustercmd_p->h.cmd_id,
279 ( void * ) ( long ) clustercmd_p->h.adler32.hdr, ( void * ) ( long ) clustercmd_p->h.adler32.dat,
280 clustercmd_p->h.data_len );
281 nodeinfo_t *nodeinfo_p;
282 nodeinfo_p = &nodeinfo[clustercmd_p->h.dst_node_id];
283
284 // Checking if the node online
285 switch ( nodeinfo_p->status ) {
286 case NODESTATUS_DOESNTEXIST:
287 case NODESTATUS_OFFLINE:
288 debug ( 1, "There's no online node with id %u. Skipping sending.", clustercmd_p->h.dst_node_id );
289 return EADDRNOTAVAIL;
290
291 default:
292 break;
293 }
294
295 // Putting the message into an output window
296 if ( nodeinfo_my != NULL )
297 clustercmd_window_add ( &window_o, clustercmd_p, nodeinfo_my->serial2queuedpacket_ht );
298
299 // Sending the message
300 debug ( 10, "sendto(%p, %p, %i, 0, %p, %i)", sock_o, clustercmd_p, CLUSTERCMD_SIZE_PADDED ( clustercmd_p ), &sa_o, sizeof ( sa_o ) );
301 debug ( 50, "clustercmd_p->data.p[0] == 0x%x", clustercmd_p->data.p[0] );
302 critical_on ( sendto ( sock_o, clustercmd_p, CLUSTERCMD_SIZE_PADDED ( clustercmd_p ), 0, &sa_o, sizeof ( sa_o ) ) == -1 );
303 // Finishing
304 return 0;
305}
306
307static inline int cluster_send ( clustercmd_t *clustercmd_p )
308{
309 int rc;
310 rc = _cluster_send ( clustercmd_p );
311 debug ( 10, "__________________sent___________________ %i", rc );
312 return rc;
313}
314
315/**
316 * @brief Changes information about node's status in nodeinfo[] and updates connected information.
317 *
318 * @param[in] node_id node_id of the node.
319 * @param[in] node_status New node status.
320 *
321 * @retval zero Successful
322 * @retval non-zero If got error while changing the status. The error-code is placed into returned value.
323 *
324 */
325
326int node_update_status ( uint8_t node_id, uint8_t node_status )
327{
328 debug ( 3, "%i, %i", node_id, node_status );
329 uint8_t node_status_old = nodeinfo[node_id].status;
330 nodeinfo_t *nodeinfo_p = &nodeinfo[node_id];
331
332 if ( ( node_status == NODESTATUS_DOESNTEXIST ) && ( node_status_old != NODESTATUS_DOESNTEXIST ) ) {
333 node_count--;
334 node_ids[nodeinfo_p->num] = node_ids[node_count];
335 g_hash_table_destroy ( nodeinfo_p->modtime_ht );
336 g_hash_table_destroy ( nodeinfo_p->serial2queuedpacket_ht );
337#ifdef VERYPARANOID
338 memset ( nodeinfo_p, 0, sizeof ( *nodeinfo_p ) );
339#endif
340 return 0;
341 }
342
343 if ( node_status == node_status_old )
344 return 0;
345
346 switch ( node_status_old ) {
347 case NODESTATUS_DOESNTEXIST:
348 nodeinfo_p->id = node_id;
349 nodeinfo_p->num = node_count;
350 nodeinfo_p->last_serial = -1;
351 nodeinfo_p->modtime_ht = g_hash_table_new_full ( g_str_hash, g_str_equal, free, 0 );
352 nodeinfo_p->serial2queuedpacket_ht = g_hash_table_new_full ( g_direct_hash, g_direct_equal, 0, 0 );
353 node_ids[node_count] = node_id;
354 node_count++;
355#ifdef PARANOID
356
357 if ( node_status == NODESTATUS_OFFLINE )
358 break; // In case of NODESTATUS_DOESNTEXIST -> NODESTATUS_OFFLINE, node_online should be increased
359
360#endif
361
362 case NODESTATUS_OFFLINE:
363 nodeinfo_p->last_serial = -1;
364 node_online++;
365 break;
366
367 default:
368 if ( node_status == NODESTATUS_OFFLINE )
369 node_online--;
370
371 break;
372 }
373
374 nodeinfo[node_id].status = node_status;
375 return 0;
376}
377
378
379/**
380 * @brief Changes information about node's name and updates connected information.
381 *
382 * @param[in] node_id node_id of the node.
383 * @param[in] node_name New node name
384 *
385 * @retval zero Successful
386 * @retval non-zero If got error while changing the status. The error-code is placed into returned value.
387 *
388 */
389
390int node_update_name ( uint8_t node_id, const char *node_name )
391{
392 debug ( 3, "%i, \"%s\"", node_id, node_name );
393 nodeinfo_t *nodeinfo_p = &nodeinfo[node_id];
394 void *ret = g_hash_table_lookup ( indexes_p->nodenames_ht, node_name );
395
396 if ( ret != NULL ) {
397 int rc;
398 uint8_t node_old_id = GPOINTER_TO_INT ( ret );
399 nodeinfo_t *nodeinfo_old_p = &nodeinfo[node_old_id];
400 debug ( 5, "nodename \"%s\" had been used by node_id == %i", node_name, node_old_id );
401
402 if ( node_old_id == node_id ) {
403 debug ( 10, "node_old_id [%i] == node_id [%i]", node_old_id, node_id );
404 return 0;
405 }
406
407 {
408 debug ( 15, "Sending a DIE command to node_id (%i)", node_old_id );
409 clustercmd_t *clustercmd_p = CLUSTER_ALLOCA ( clustercmd_die_t, 0 );
410 clustercmd_p->h.cmd_id = CLUSTERCMDID_DIE;
411 clustercmd_p->h.dst_node_id = node_old_id;
412
413 if ( ( rc = cluster_send ( clustercmd_p ) ) )
414 return rc;
415 }
416
417 nodeinfo_old_p->node_name = NULL;
418 debug ( 3, "changing status of node_id == %i to NODESTATUS_OFFLINE", node_old_id );
419 node_update_status ( node_old_id, NODESTATUS_OFFLINE );
420 }
421
422 char *node_name_dup = strdup ( node_name );
423 nodeinfo_p->node_name = node_name_dup;
424 g_hash_table_replace ( indexes_p->nodenames_ht, node_name_dup, GINT_TO_POINTER ( node_id ) );
425 return 0;
426}
427
428/**
429 * @brief Sets message processing functions for cluster_recv_proc() function for specified command type
430 *
431 * @param[in] cmd_id The command type
432 * @param[in] procfunct The processing function for messages with specified cmd_id
433 *
434 * @retval zero Successful
435 * @retval non-zero If got error while setting processing function. The error-code is placed into returned value.
436 *
437 */
438
439static inline int cluster_recv_proc_set ( clustercmd_id_t cmd_id, cluster_recvproc_funct_t procfunct )
440{
441 recvproc_funct[cmd_id] = procfunct;
442 return 0;
443}
444
445
446/**
447 * @brief Safe wrapper for recvfrom() function
448 *
449 * @param[in] sock The socket descriptor
450 * @param[out] _buf_p Pointer to pointer to buffer
451 * @param[in] flags Flags...
452 *
453 * @retval zero Successful
454 * @retval non-zero If got error while read()-ing. The error-code is placed into returned value. "-1" means that message is too short.
455 *
456 */
457
458static inline int cluster_read ( int sock, clustercmd_t **cmd_pp, cluster_read_flags_t flags )
459{
460 static char buf[CLUSTER_PACKET_MAXSIZE];
461 static clustercmd_t *cmd_p = ( void * ) buf;
462 struct sockaddr_in sa_in;
463 size_t sa_in_len = sizeof ( sa_in );
464 *cmd_pp = cmd_p;
465 debug ( 20, "%i, %p, 0x%x", sock, buf, flags );
466 int readret = recvfrom ( sock, buf, CLUSTER_PACKET_MAXSIZE, MSG_WAITALL, ( struct sockaddr * ) &sa_in, ( socklen_t * restrict ) &sa_in_len );
467 debug ( 30, "recvfrom(%i, %p, 0x%x, %p, %p) -> %i", sock, buf, MSG_WAITALL, &sa_in, &sa_in_len, readret );
468#ifdef PARANOID
469
470 if ( !readret ) {
471 error ( "recvfrom() returned 0. This shouldn't happen. Exit." );
472 return EINVAL;
473 }
474
475#endif
476
477 if ( readret < 0 ) {
478 error ( "recvfrom() returned %i. "
479 "Seems, that something wrong with network socket.",
480 readret );
481 return errno != -1 ? errno : -2;
482 }
483
484 debug ( 2, "Got message from %s (len: %i).", inet_ntoa ( sa_in.sin_addr ), readret );
485
486 if ( ( unsigned int ) readret < sizeof ( clustercmdhdr_t ) ) {
487 // Too short message
488 error ( "Warning: cluster_read(): Got too short message from node (no header [or too short]). Ignoring it." );
489 return -1;
490 }
491
492 if ( ( unsigned int ) readret < CLUSTERCMD_SIZE ( cmd_p ) ) {
493 // Too short message
494 error ( "Warning: cluster_read(): Got too short message from node (no data [or too short]). Ignoring it." );
495 return -1;
496 }
497
498 // Incorrect size?
499 if ( readret & 0x3 ) {
500 error ( "Warning: cluster_recv(): Received packet of size %i (not a multiple of 4). Ignoring it.", readret );
501 return 0;
502 }
503
504 return 0;
505}
506
507
508/**
509 * @brief Sends packet-reject notification
510 *
511 * @param[in] clustercmd_p Pointer to clustercmd that will be rejected
512 * @param[in] reason Reason why the clustercmd is denied
513 *
514 * @retval zero Successful
515 * @retval non-zero If got error while read()-ing. The error-code is placed into returned value. "-1" means that message is too short.
516 *
517 */
518
519static inline int clustercmd_reject ( clustercmd_t *clustercmd_p, uint8_t reason )
520{
521 clustercmd_t *clustercmd_rej_p = CLUSTER_ALLOCA ( clustercmd_rej_t, 0 );
522 clustercmd_rej_p->h.dst_node_id = clustercmd_p->h.src_node_id;
523 clustercmd_rej_p->data.rej.serial = clustercmd_p->h.serial;
524 clustercmd_rej_p->data.rej.reason = reason;
525 return cluster_send ( clustercmd_rej_p );
526}
527
528
// Helper of _cluster_recv(): remembers serial and sender of the message that is
// being returned, passes the message to the caller (if it asked for it) and
// returns 1 from the enclosing function.
// Wrapped into "do { } while (0)" to behave as a single statement (safe inside if/else).
#define CLUSTER_RECV_RETURNMESSAGE(clustercmd_p) do {\
		last_serial      = (clustercmd_p)->h.serial;\
		last_src_node_id = (clustercmd_p)->h.src_node_id;\
		if(clustercmd_pp != NULL)\
			*clustercmd_pp = (clustercmd_p);\
		return 1;\
	} while (0)
536/**
537 * @brief Receives message from another nodes of the cluster. (not thread-safe)
538 *
539 * @param[out] clustercmd_pp Pointer to command structure pointer. It will be re-allocated every time when size is not enough. Allocated space will be reused on next calling.
540 * @param[i] timeout Timeout (in milliseconds).
541 *
542 * @retval 1 If there's new message.
543 * @retval 0 If there's no new messages.
544 * @retval -1 If got error while receiving. The error-code is placed into "errno".
545 *
546 */
547
548static int _cluster_recv ( clustercmd_t **clustercmd_pp, struct timeval *timeout )
549{
550 clustercmd_t *clustercmd_p;
551 static uint8_t last_src_node_id = NODEID_NOID;
552 static uint32_t last_serial = 0;
553
554 // Checking if there message is waiting in the window
555 if ( last_src_node_id != NODEID_NOID ) {
556 nodeinfo_t *nodeinfo_p = &nodeinfo[last_src_node_id];
557
558 if ( nodeinfo_p->serial2queuedpacket_ht != NULL ) {
559 clustercmdqueuedpacket_t *clustercmdqueuedpacket_p = ( clustercmdqueuedpacket_t * )
560 g_hash_table_lookup ( nodeinfo_p->serial2queuedpacket_ht, GINT_TO_POINTER ( last_serial + 1 ) );
561
562 if ( clustercmdqueuedpacket_p != NULL )
563 CLUSTER_RECV_RETURNMESSAGE ( &clustercmdqueuedpacket_p->cmd );
564 }
565 }
566
567 // Checking if there any event on read socket
568 // select()
569 fd_set rfds;
570 FD_ZERO ( &rfds );
571 FD_SET ( sock_i, &rfds );
572 debug ( 3, "select() with timeout {%u, %u}", timeout->tv_sec, timeout->tv_usec );
573 int selret = select ( sock_i + 1, &rfds, NULL, NULL, timeout );
574
575 // processing select()'s retuned value
576 if ( selret < 0 ) {
577 error ( "got error while select()." );
578 return -1;
579 }
580
581 if ( selret == 0 ) {
582 debug ( 3, "no new messages." );
583 return 0;
584 }
585
586 debug ( 3, "got new message(s)." );
587 debug ( 10, "Reading new message's header" );
588 clustercmdadler32_t adler32;
589 //clustercmd_t *clustercmd_p = (clustercmd_t *)mmap(NULL, sizeof(clustercmdhdr_t), PROT_NONE,
590 // MAP_PRIVATE, sock, 0);
591 int ret;
592
593 if ( ( ret = cluster_read ( sock_i, clustercmd_pp, CLREAD_NONE ) ) ) {
594 if ( ret == -1 ) return 0; // Invalid message? Skipping.
595
596 error ( "Got error from cluster_read()." );
597 errno = ret;
598 return -1;
599 }
600
601 clustercmd_p = *clustercmd_pp;
602 debug ( 3, "Received: {h.dst_node_id: %u, h.src_node_id: %u, cmd_id: %u,"
603 " serial: %u, adler32: {0x%x, 0x%x}, data_len: %u}",
604 clustercmd_p->h.dst_node_id, clustercmd_p->h.src_node_id,
605 clustercmd_p->h.cmd_id, clustercmd_p->h.serial,
606 clustercmd_p->h.adler32.hdr, clustercmd_p->h.adler32.dat,
607 clustercmd_p->h.data_len );
608 // Checking adler32 of packet headers.
609 clustercmd_adler32_calc ( clustercmd_p, &adler32, ADLER32_CALC_HEADER );
610
611 if ( adler32.hdr != clustercmd_p->h.adler32.hdr ) {
612 debug ( 1, "hdr-adler32 mismatch: %p != %p.",
613 ( void* ) ( long ) clustercmd_p->h.adler32.hdr, ( void* ) ( long ) adler32.hdr );
614
615 if ( ( ret = clustercmd_reject ( clustercmd_p, REJ_ADLER32MISMATCH ) ) != EADDRNOTAVAIL ) {
616 error ( "Got error while clustercmd_reject()." );
617 errno = ret;
618 return -1;
619 }
620 }
621
622 // Checking src_node_id and dst_node_id
623 uint8_t src_node_id = clustercmd_p->h.src_node_id;
624 uint8_t dst_node_id = clustercmd_p->h.dst_node_id;
625
626 // Packet from registering node?
627 if ( src_node_id == NODEID_NOID ) {
628 // Wrong command from registering node?
629 if ( clustercmd_p->h.cmd_id != CLUSTERCMDID_HELLO ) {
630 error ( "Warning: cluster_recv(): Got non hello packet from NOID node. Ignoring the packet." );
631 return 0;
632 }
633
634 if ( clustercmd_p->h.serial != 0 ) {
635 error ( "Warning: cluster_recv(): Got packet with non-zero serial from NOID node. Ignoring the packet." );
636 return 0;
637 }
638 } else
639
640 // Wrong src_node_id?
641 if ( src_node_id >= MAXNODES ) {
642 error ( "Warning: cluster_recv(): Invalid h.src_node_id: %i >= "XTOSTR ( MAXNODES ) "",
643 src_node_id );
644 return -1;
645 }
646
647 // Is this broadcast message?
648 if ( dst_node_id == NODEID_NOID ) {
649 // CODE HERE
650 } else
651
652 // Wrong dst_node_id?
653 if ( dst_node_id >= MAXNODES ) {
654 error ( "Warning: cluster_recv(): Invalid h.dst_node_id: %i >= "XTOSTR ( MAXNODES ) "",
655 dst_node_id );
656 return -1;
657 }
658
659 // Seems, that headers are correct. Continuing.
660
661 // Paranoid routines
662 // The message from us? Something wrong if it is.
663 if ( ( clustercmd_p->h.src_node_id == node_id_my ) && ( node_id_my != NODEID_NOID ) )
664 critical ( "node_id collision" );
665
666 nodeinfo_t *nodeinfo_p = &nodeinfo[src_node_id];
667 // Not actual packet?
668 long serial_diff = clustercmd_p->h.serial - nodeinfo_p->last_serial;
669 debug ( 10, "serial_diff == %i", serial_diff );
670
671 if ( serial_diff <= 0 || serial_diff > CLUSTER_WINDOW_PCKTLIMIT ) {
672 debug ( 1, "Ignoring packet (serial %i) from %i due to serial_diff: 0 <= || > %i",
673 clustercmd_p->h.serial, src_node_id, serial_diff, CLUSTER_WINDOW_PCKTLIMIT );
674 return -1;
675 }
676
677 // Is this misordered packet?
678 if ( clustercmd_p->h.serial != nodeinfo_p->last_serial + 1 ) {
679 clustercmd_window_add ( &window_i, clustercmd_p, nodeinfo_p->serial2queuedpacket_ht );
680 return -1;
681 }
682
683 // Is this the end of packet (packet without data)
684 if ( clustercmd_p->h.data_len == 0 )
685 CLUSTER_RECV_RETURNMESSAGE ( clustercmd_p );
686
687 // Too big data?
688 if ( clustercmd_p->h.data_len > CLUSTER_PACKET_MAXSIZE ) {
689 error ( "Warning: cluster_recv(): Got too big message from node %i. Ignoring it.",
690 src_node_id );
691 return -1;
692 }
693
694 /*
695 // Need more space for this packet?
696 if (CLUSTERCMD_SIZE(clustercmd_p) > size) {
697 size = CLUSTERCMD_SIZE(clustercmd_p);
698 clustercmd_p = (clustercmd_t *)xrealloc((char *)clustercmd_p, size);
699 }
700
701 debug(10, "Reading the data");
702 if ((ret=cluster_read(sock_i, (void *)clustercmd_p->data.p, CLUSTER_PAD(clustercmd_p->h.data_len), CLREAD_CONTINUE))) {
703 if (ret == -1) return 0;
704
705 error("Got error from cluster_read().");
706 errno = ret;
707 return -1;
708 }
709 */
710 // Checking adler32 of packet data.
711 clustercmd_adler32_calc ( clustercmd_p, &adler32, ADLER32_CALC_DATA );
712
713 if ( adler32.dat != clustercmd_p->h.adler32.dat ) {
714 debug ( 1, "dat-adler32 mismatch: %p != %p.",
715 ( void* ) ( long ) clustercmd_p->h.adler32.dat, ( void* ) ( long ) adler32.dat );
716
717 if ( ( ret = clustercmd_reject ( clustercmd_p, REJ_ADLER32MISMATCH ) ) != EADDRNOTAVAIL ) {
718 error ( "Got error while clustercmd_reject()." );
719 errno = ret;
720 return -1;
721 }
722 }
723
724 CLUSTER_RECV_RETURNMESSAGE ( clustercmd_p );
725}
726
727static inline int cluster_recv ( clustercmd_t **clustercmd_pp, struct timeval *timeout )
728{
729 int rc;
730 rc = _cluster_recv ( clustercmd_pp, timeout );
731 debug ( 10, "__________________recv___________________ %i", rc );
732 return rc;
733}
734
735
736/**
737 * @brief (hsyncop) Reads messages for time "_timeout" and proceeding them to recvproc_funct[] functions
738 *
739 * @param[in] _timeout How long to wait messages (totally)
740 *
741 * @retval zero Successful
742 * @retval non-zero If got error while reading or processing messages. The error-code is placed into returned value.
743 *
744 */
745
746int cluster_recv_proc ( unsigned int timeout_rel_int )
747{
748 struct timeval timeout_rel, timeout_abs, tv_abs;
749 int error_count, iteration;
750 debug ( 3, "cluster_recv_proc(%i)", timeout_rel_int );
751 clustercmd_t *clustercmd_p = NULL;
752 int ret;
753 timeout_rel.tv_sec = timeout_rel_int / 1000;
754 timeout_rel.tv_usec = timeout_rel_int % 1000;
755 gettimeofday ( &tv_abs, NULL );
756 timeradd ( &tv_abs, &timeout_rel, &timeout_abs );
757 error_count = 0;
758 iteration = 0;
759
760 while ( 1 ) {
761 if ( iteration++ ) {
762 gettimeofday ( &tv_abs, NULL );
763
764 if ( timercmp ( &timeout_abs, &tv_abs, > ) )
765 timersub ( &timeout_abs, &tv_abs, &timeout_rel );
766 else
767 memset ( &timeout_rel, 0, sizeof ( timeout_rel ) );
768
769 debug ( 5, "timeout_abs == {%u, %u}; tv_abs == {%u, %u}; timeout_rel == {%u, %u}",
770 timeout_abs.tv_sec, timeout_abs.tv_usec,
771 tv_abs.tv_sec, tv_abs.tv_usec,
772 timeout_rel.tv_sec, timeout_rel.tv_usec
773 );
774 }
775
776 ret = cluster_recv ( &clustercmd_p, &timeout_rel );
777
778 if ( !ret )
779 break;
780
781 // Exit if error
782 if ( ret == -1 ) {
783 warning ( "Got error while cluster_recv(). error_count == %i", error_count );
784 error_count++;
785 critical_on ( error_count >= CLUSTER_RECV_PROC_ERRLIMIT );
786 }
787
788 // If we have appropriate callback function, then call it! :)
789 if ( recvproc_funct[clustercmd_p->h.cmd_id] != NULL ) {
790 debug ( 2, "Calling function by pointer %p", recvproc_funct[clustercmd_p->h.cmd_id] );
791
792 if ( ( ret = recvproc_funct[clustercmd_p->h.cmd_id] ( clustercmd_p ) ) ) {
793 error ( "Got error from recvproc_funct[%i]: %s (%i)",
794 clustercmd_p->h.cmd_id );
795 return ret;
796 }
797
798 continue;
799 }
800
801 // We didn't found an appropriate callback function
802 debug ( 2, "There's no appropriate callback function for cmd_id == %i", clustercmd_p->h.cmd_id );
803 }
804
805 return 0;
806}
807
808
809/**
810 * @brief recvproc-function for DIE-messages
811 *
812 * @param[in] clustercmd_p Pointer to clustercmd
813 *
814 * @retval zero Successfully initialized. (never happens)
815 * @retval non-zero Got error, while initializing.
816 *
817 */
818
819static int cluster_recvproc_die ( clustercmd_t *clustercmd_p )
820{
821 critical ( "Got DIE message from node_id == %i,", clustercmd_p->h.src_node_id );
822 return 0;
823}
824
825
826/**
827 * @brief recvproc-function for ACK-messages
828 *
829 * @param[in] clustercmd_p Pointer to clustercmd
830 *
831 * @retval zero Successfully initialized.
832 * @retval non-zero Got error, while initializing.
833 *
834 */
835
836static int cluster_recvproc_ack ( clustercmd_t *clustercmd_p )
837{
838 uint32_t cmd_serial_ack = clustercmd_p->data.ack.serial;
839 clustercmdqueuedpacket_t *queuedpacket_p =
840 ( clustercmdqueuedpacket_t * ) g_hash_table_lookup ( nodeinfo_my->serial2queuedpacket_ht, GINT_TO_POINTER ( cmd_serial_ack ) );
841
842 if ( queuedpacket_p == NULL )
843 return 0;
844
845 uint8_t node_id_from = clustercmd_p->h.src_node_id;
846
847 if ( ! queuedpacket_p->h.w.o.ack_from[node_id_from] ) {
848 queuedpacket_p->h.w.o.ack_count++;
849 queuedpacket_p->h.w.o.ack_from[node_id_from]++;
850
851 if ( queuedpacket_p->h.w.o.ack_count == node_count - 1 )
852 clustercmd_window_del ( &window_o, queuedpacket_p, nodeinfo_my->serial2queuedpacket_ht );
853 }
854
855 return 0;
856}
857
858
859/**
860 * @brief Sets message processing functions for cluster_recv_proc() function for specified command type
861 *
862 * @param[in] cmd_id The command type
863 * @param[in] procfunct The processing function for messages with specified cmd_id
864 *
865 * @retval zero Successful
866 * @retval non-zero If got error while setting processing function. The error-code is placed into returned value.
867 *
868 */
869
870int cluster_io_init()
871{
872 cluster_recv_proc_set ( CLUSTERCMDID_ACK, cluster_recvproc_ack );
873 cluster_recv_proc_set ( CLUSTERCMDID_DIE, cluster_recvproc_die );
874 return 0;
875}
876
877
878/**
879 * @brief Antagonist of cluster_recv_proc_init() function. Freeing everything what was allocated in cluster_recv_proc_init()
880 *
881 * @retval zero Successfully initialized
882 * @retval non-zero Got error, while initializing
883 *
884 */
885
886int cluster_io_deinit()
887{
888 if ( window_i.buf_size ) {
889#ifdef PARANOID
890
891 if ( window_i.buf == NULL ) {
892 error ( "window_i.buf_size != 0, but window_i.buf == NULL." );
893 } else
894#endif
895 free ( window_i.buf );
896 }
897
898 if ( window_o.buf_size ) {
899#ifdef PARANOID
900
901 if ( window_o.buf == NULL ) {
902 error ( "window_o.buf_size != 0, but window_o.buf == NULL." );
903 } else
904#endif
905 free ( window_o.buf );
906 }
907
908 return 0;
909}
910
911
912/**
913 * @brief recvproc-function for welcome-messages
914 *
915 * @param[in] clustercmd_p Pointer to clustercmd
916 *
917 * @retval zero Successfully initialized.
918 * @retval non-zero Got error, while initializing.
919 *
920 */
921
922static int cluster_recvproc_welcome ( clustercmd_t *clustercmd_p )
923{
924 debug ( 20, "%p", clustercmd_p );
925// static time_t updatets = 0;
926 clustercmd_welcome_t *data_welcome_p = &clustercmd_p->data.welcome;
927 /*
928 // Is this the most recent information? Skipping if not.
929 if (!(data_welcome_p->updatets > updatets))
930 return 0;
931 */
932 // Is the node name length in message equals to our node name length? Skipping if not.
933 uint32_t recv_nodename_len;
934 recv_nodename_len = welcome_to_node_name_len ( clustercmd_p );
935
936 if ( recv_nodename_len != ctx_p->cluster_nodename_len ) {
937 debug ( 9, "recv_nodename_len [%i] != ctx_p->cluster_nodename_len [%i]", recv_nodename_len, ctx_p->cluster_nodename_len );
938 return 0;
939 }
940
941 // Is the node name equals to ours? Skipping if not.
942 if ( memcmp ( welcome_to_node_name ( data_welcome_p ), ctx_p->cluster_nodename, recv_nodename_len ) ) {
943 debug ( 9, "to_node_name != ctx_p->cluster_nodename" );
944 return 0;
945 }
946
947 // Remembering the node that answered us
948 node_update_status ( clustercmd_p->h.src_node_id, NODESTATUS_SEEMSONLINE );
949 {
950 char *node_name;
951 node_name = alloca ( data_welcome_p->from_node_name_len );
952 memcpy ( node_name, data_welcome_p->from_node_name, data_welcome_p->from_node_name_len );
953 node_update_name ( clustercmd_p->h.src_node_id, node_name );
954 }
955 // Seems, that somebody knows our node id, remembering it.
956 node_id_my = clustercmd_p->h.dst_node_id;
957// updatets = data_welcome_p->updatets;
958 return 0;
959}
960
961
962/**
963 * @brief recvproc-function for hello-messages
964 *
965 * @param[in] clustercmd_p Pointer to clustercmd
966 *
967 * @retval zero Successfully initialized.
968 * @retval non-zero Got error, while initializing.
969 *
970 */
971
972static int cluster_recvproc_hello ( clustercmd_t *clustercmd_p )
973{
974 clustercmd_hello_t *data_hello_p = &clustercmd_p->data.hello;
975 // Sending information to the new node
976 {
977 int ret;
978 uint8_t node_id;
979 debug ( 15, "Preparing a welcome message for nodename \"%s\" from nodename == \"%s\"", data_hello_p->node_name, ctx_p->cluster_nodename );
980 size_t data_len = sizeof ( clustercmd_welcome_t ) + clustercmd_p->h.data_len + ctx_p->cluster_nodename_len;
981 clustercmd_t *answer_p = CLUSTER_ALLOCA ( void, data_len );
982 clustercmd_welcome_t *answer_data_p = &answer_p->data.welcome;
983 answer_data_p->from_node_name_len = ctx_p->cluster_nodename_len;
984 memcpy ( answer_data_p->from_node_name, ctx_p->cluster_nodename, ctx_p->cluster_nodename_len );
985 memcpy ( welcome_to_node_name ( answer_data_p ), data_hello_p->node_name, clustercmd_p->h.data_len );
986 {
987 void *ptr;
988 char *node_name = alloca ( clustercmd_p->h.data_len + 1 );
989 memcpy ( node_name, data_hello_p->node_name, clustercmd_p->h.data_len + 1 );
990 ptr = g_hash_table_lookup ( indexes_p->nodenames_ht, node_name );
991 node_id = ( ptr == NULL ? NODEID_NOID : GPOINTER_TO_INT ( ptr ) );
992 debug ( 3, "\"%s\" -> %i", node_name, node_id );
993 }
994 answer_p->h.data_len = data_len;
995 answer_p->h.cmd_id = CLUSTERCMDID_WELCOME;
996 answer_p->h.dst_node_id = node_id; // broadcast
997
998 if ( ( ret = cluster_send ( answer_p ) ) )
999 return ret;
1000 }
1001 return 0;
1002}
1003
1004
1005/**
1006 * @brief recvproc-function for register-messages
1007 *
1008 * @param[in] clustercmd_p Pointer to clustercmd
1009 *
1010 * @retval zero Successfully initialized.
1011 * @retval non-zero Got error, while initializing.
1012 *
1013 */
1014
1015static int cluster_recvproc_register ( clustercmd_t *clustercmd_p )
1016{
1017 clustercmd_reg_t *data_register_p = &clustercmd_p->data.reg;
1018 char *node_name = alloca ( clustercmd_p->h.data_len + 1 );
1019 memcpy ( node_name, data_register_p->node_name, clustercmd_p->h.data_len + 1 );
1020 node_update_name ( clustercmd_p->h.src_node_id, node_name );
1021 return 0;
1022}
1023
1024extern int cluster_loop();
1025/**
1026 * @brief Initializes cluster subsystem.
1027 *
1028 * @param[in] _ctx_p Pointer to "glob" variable, defined in main().
1029 * @param[in] _indexes_p Pointer to "indexes" variable, defined in sync_run().
1030 *
1031 * @retval zero Successfully initialized.
1032 * @retval non-zero Got error, while initializing.
1033 *
1034 */
1035
1036int cluster_init ( ctx_t *_ctx_p, indexes_t *_indexes_p )
1037{
1038 int ret;
1039
1040 // Preventing double initializing
1041 if ( ctx_p != NULL ) {
1042 error ( "cluster subsystem is already initialized." );
1043 return EALREADY;
1044 }
1045
1046 // Initializing global variables, pt. 1
1047 ctx_p = _ctx_p;
1048 indexes_p = _indexes_p;
1049 cluster_timeout = ctx_p->cluster_timeout;
1050 indexes_p->nodenames_ht = g_hash_table_new_full ( g_str_hash, g_str_equal, free, 0 );
1051 node_update_status ( NODEID_NOID, NODESTATUS_ONLINE );
1052 {
1053 int i = 0;
1054
1055 while ( i < MAXNODES ) {
1056 nodeinfo[i].last_serial = -1;
1057 i++;
1058 }
1059 }
1060 // Initializing network routines
1061 // Input socket
1062 // Creating socket
1063 sock_i = socket ( AF_INET, SOCK_DGRAM, 0 );
1064
1065 if ( sock_i < 0 ) {
1066 error ( "Cannot create socket for input traffic" );
1067 return errno;
1068 }
1069
1070 // Enable SO_REUSEADDR to allow multiple instances of this application to receive copies
1071 // of the multicast datagrams.
1072 int reuse = 1;
1073
1074 if ( setsockopt ( sock_i, SOL_SOCKET, SO_REUSEADDR, ( char * ) &reuse, sizeof ( reuse ) ) < 0 ) {
1075 error ( "Got error while setsockopt()" );
1076 return errno;
1077 }
1078
1079 // Binding
1080 sa_i.sin_family = AF_INET;
1081 sa_i.sin_port = htons ( ctx_p->cluster_mcastipport );
1082 sa_i.sin_addr.s_addr = INADDR_ANY;
1083
1084 if ( bind ( sock_i, ( struct sockaddr* ) &sa_i, sizeof ( sa_i ) ) ) {
1085 error ( "Got error while bind()" );
1086 return errno;
1087 }
1088
1089 // Joining to multicast group
1090 struct ip_mreq group;
1091 group.imr_interface.s_addr = inet_addr ( ctx_p->cluster_iface );
1092 group.imr_multiaddr.s_addr = inet_addr ( ctx_p->cluster_mcastipaddr );
1093
1094 if ( setsockopt ( sock_i, IPPROTO_IP, IP_ADD_MEMBERSHIP,
1095 ( char * ) &group, sizeof ( group ) ) < 0 ) {
1096 error ( "Cannot setsockopt() to enter to membership %s -> %s",
1097 ctx_p->cluster_iface, ctx_p->cluster_mcastipaddr );
1098 return errno;
1099 }
1100
1101 // Output socket
1102 // Creating socket
1103 sock_o = socket ( AF_INET, SOCK_DGRAM, 0 );
1104
1105 if ( sock_o < 0 ) {
1106 error ( "Cannot create socket for output traffic" );
1107 return errno;
1108 }
1109
1110 // Initializing the group sockaddr structure
1111 sa_o.sin_family = AF_INET;
1112 sa_o.sin_port = htons ( ctx_p->cluster_mcastipport );
1113 sa_o.sin_addr.s_addr = inet_addr ( ctx_p->cluster_mcastipaddr );
1114 // Disable looping back output datagrams
1115 {
1116 char loopch = 0;
1117
1118 if ( setsockopt ( sock_o, IPPROTO_IP, IP_MULTICAST_LOOP, ( char * ) &loopch, sizeof ( loopch ) ) < 0 ) {
1119 error ( "Cannot disable loopback for output socket." );
1120 return errno;
1121 }
1122 }
1123 // Setting local interface for output traffic
1124 {
1125 struct in_addr addr_o;
1126 addr_o.s_addr = inet_addr ( ctx_p->cluster_iface );
1127
1128 if ( setsockopt ( sock_o, IPPROTO_IP, IP_MULTICAST_IF, &addr_o, sizeof ( addr_o ) ) < 0 ) {
1129 error ( "Cannot set local interface for outbound traffic" );
1130 return errno;
1131 }
1132 }
1133 // Initializing another routines
1134 cluster_io_init();
1135 // Getting my ID in the cluster
1136 // Trying to preserve my node_id after restart. :)
1137 // Asking another nodes about my previous node_id
1138 {
1139 debug ( 15, "Preparing a message with my nodename == \"%s\" (%i)", ctx_p->cluster_nodename, ctx_p->cluster_nodename_len );
1140 clustercmd_t *clustercmd_p = CLUSTER_ALLOCA ( clustercmd_hello_t, ctx_p->cluster_nodename_len );
1141 clustercmd_p->h.data_len = ctx_p->cluster_nodename_len;
1142 memcpy ( clustercmd_p->data.hello.node_name, ctx_p->cluster_nodename, clustercmd_p->h.data_len + 1 );
1143 clustercmd_p->h.cmd_id = CLUSTERCMDID_HELLO;
1144 clustercmd_p->h.dst_node_id = NODEID_NOID; // broadcast
1145
1146 if ( ( ret = cluster_send ( clustercmd_p ) ) )
1147 return ret;
1148 }
1149 // Processing answers
1150 cluster_recv_proc_set ( CLUSTERCMDID_WELCOME, cluster_recvproc_welcome );
1151
1152 if ( ( ret = cluster_recv_proc ( cluster_timeout ) ) )
1153 return ret;
1154
1155 // Ignore next welcome messages
1156 cluster_recv_proc_set ( CLUSTERCMDID_WELCOME, NULL );
1157 debug ( 3, "After communicating with others, my node_id is %i.", node_id_my );
1158
1159 // Getting free node_id if nobody said us the certain value (see above).
1160 if ( node_id_my == NODEID_NOID ) {
1161 int i = 0;
1162
1163 while ( i < MAXNODES ) {
1164 if ( nodeinfo[i].status == NODESTATUS_DOESNTEXIST ) {
1165 node_id_my = i;
1166 break;
1167 }
1168
1169 i++;
1170 }
1171
1172 debug ( 3, "I was have to set my node_id to %i.", node_id_my );
1173 }
1174
1175 // If there's no free id-s, then exit :(
1176 if ( node_id_my == NODEID_NOID ) {
1177 error ( "Cannot find free node ID. Seems, that all %i ID-s are already occupied." );
1178 return ENOMEM;
1179 }
1180
1181 // Initializing global variables, pt. 2
1182 nodeinfo_my = &nodeinfo[node_id_my];
1183 // Registering in the cluster
1184 // Sending registration information
1185 node_update_status ( node_id_my, NODESTATUS_SEEMSONLINE );
1186 {
1187 clustercmd_t *clustercmd_p = CLUSTER_ALLOCA ( clustercmd_reg_t, ctx_p->cluster_nodename_len );
1188 clustercmd_reg_t *data_reg_p = &clustercmd_p->data.reg;
1189 memcpy ( data_reg_p->node_name, ctx_p->cluster_nodename, ctx_p->cluster_nodename_len + 1 );
1190 clustercmd_p->h.data_len = ctx_p->cluster_nodename_len + 1;
1191 clustercmd_p->h.cmd_id = CLUSTERCMDID_REG;
1192 clustercmd_p->h.dst_node_id = NODEID_NOID; // broadcast
1193
1194 if ( ( ret = cluster_send ( clustercmd_p ) ) )
1195 return ret;
1196 }
1197 // Setting process functions
1198 cluster_recv_proc_set ( CLUSTERCMDID_HELLO, cluster_recvproc_hello );
1199 cluster_recv_proc_set ( CLUSTERCMDID_REG, cluster_recvproc_register );
1200
1201 // Getting answers
1202 if ( ( ret = cluster_recv_proc ( cluster_timeout ) ) )
1203 return ret;
1204
1205 node_update_status ( node_id_my, NODESTATUS_ONLINE );
1206 // Running thread, that will process background communicating routines with another nodes.
1207 // The process is based on function cluster_loop() [let's use shorthand "cluster_loop()-thread"]
1208 ret = pthread_create ( &pthread_cluster, NULL, ( void * ( * ) ( void * ) ) cluster_loop, NULL );
1209 return ret;
1210}
1211
1212/**
1213 * @brief (syncop) Sends signal to cluster_loop()-thread
1214 *
1215 * @param[in] signal Signal number
1216 *
1217 * @retval zero Successfully send the signal
1218 * @retval non-zero Got error, while sending the signal
1219 *
1220 */
1221
1222static inline int cluster_signal ( int signal )
1223{
1224 if ( pthread_cluster )
1225 return pthread_kill ( pthread_cluster, signal );
1226
1227 return 0;
1228}
1229
1230
1231extern int cluster_modtime_exchange_cleanup();
1232/**
1233 * @brief Antagonist of cluster_init() function. Kills cluster_loop()-thread and cleaning up
1234 *
1235 * @retval zero Successfully initialized
1236 * @retval non-zero Got error, while initializing
1237 *
1238 */
1239
1240int cluster_deinit()
1241{
1242 int ret = 0;
1243 cluster_signal ( SIGTERM );
1244 ret = pthread_join ( pthread_cluster, NULL );
1245 cluster_io_deinit();
1246 node_update_status ( NODEID_NOID, NODESTATUS_DOESNTEXIST );
1247#ifdef VERYPARANOID
1248 int i = 0;
1249#endif
1250
1251 while ( node_count ) {
1252#ifdef VERYPARANOID
1253
1254 if ( i++ > MAXNODES ) {
1255 error ( "cluster_deinit() looped. Forcing break." );
1256 break;
1257 }
1258
1259#endif
1260 node_update_status ( 0, NODESTATUS_DOESNTEXIST );
1261 }
1262
1263 close ( sock_i );
1264 close ( sock_o );
1265#ifdef VERYPARANOID
1266 memset ( nodeinfo, 0, sizeof ( nodeinfo_t ) * NODES_ALLOC );
1267 nodeinfo_my = NULL;
1268 node_count = 0;
1269 node_online = 0;
1270 node_id_my = NODEID_NOID;
1271 memset ( &sa_i, 0, sizeof ( sa_i ) );
1272 memset ( &sa_o, 0, sizeof ( sa_o ) );
1273#endif
1274 cluster_modtime_exchange_cleanup();
1275 g_hash_table_destroy ( indexes_p->nodenames_ht );
1276 return ret;
1277}
1278
1279
/**
 * @brief 			(syncop) Forces other nodes to ignore events about the file or directory
 *
 * Not implemented yet: the function is a stub that does nothing.
 *
 * @param[in] 	fpath 		Path to the file or directory (not used by the stub)
 *
 * @retval	zero 		Always
 *
 */

int cluster_lock ( const char *fpath )
{
	const int rc = 0;
	// The argument is accepted only to keep the interface; silencing "unused parameter"
	( void ) fpath;
	return rc;
}
1295
1296
/**
 * @brief 			(syncop) Forces other nodes to ignore events about all files and directories listed in queues of "indexes_p"
 *
 * Not implemented yet: the function is a stub that does nothing.
 *
 * @retval	zero 		Always
 *
 */

int cluster_lock_byindexes()
{
	const int rc = 0;
	return rc;
}
1309
1310
/**
 * @brief 			(syncop) Returns events-handling on other nodes for all files and directories, locked by cluster_lock() and cluster_lock_byindexes() from this node
 *
 * Not implemented yet: the function is a stub that does nothing.
 *
 * @retval	zero 		Always
 *
 */

int cluster_unlock_all()
{
	const int rc = 0;
	return rc;
}
1323
1324
1325#define CLUSTER_LOOP_CHECK(a) {\
1326 int ret = a;\
1327 if(ret) {\
1328 sync_term(ret);\
1329 return ret;\
1330 }\
1331 }
1332
1333/**
1334 * @brief Processes background communicating routines with another nodes. cluster_init() function create a thread for this function.
1335 *
1336 * @retval zero Successfully initialized
1337 * @retval non-zero Got error, while initializing
1338 *
1339 */
1340
1341int cluster_loop()
1342{
1343 int ret = 0;
1344 sigset_t sigset_cluster;
1345 // Ignoring SIGINT signal
1346 sigemptyset ( &sigset_cluster );
1347 sigaddset ( &sigset_cluster, SIGINT );
1348 CLUSTER_LOOP_CHECK ( pthread_sigmask ( SIG_BLOCK, &sigset_cluster, NULL ) );
1349 // Don't ignoring SIGTERM signal
1350 sigemptyset ( &sigset_cluster );
1351 sigaddset ( &sigset_cluster, SIGTERM );
1352 CLUSTER_LOOP_CHECK ( pthread_sigmask ( SIG_UNBLOCK, &sigset_cluster, NULL ) );
1353 // Starting the loop
1354 debug ( 3, "cluster_loop() started." );
1355
1356 while ( 1 ) {
1357 int _ret;
1358 // Waiting for event
1359 fd_set rfds;
1360 FD_ZERO ( &rfds );
1361 FD_SET ( sock_i, &rfds );
1362 debug ( 3, "select()" );
1363 _ret = select ( sock_i + 1, &rfds, NULL, NULL, NULL );
1364
1365 // Exit if error
1366 if ( ( _ret == -1 ) && ( errno != EINTR ) ) {
1367 ret = errno;
1368 sync_term ( ret );
1369 break;
1370 }
1371
1372 // Breaking the loop, if there's SIGTERM signal for this thread
1373 debug ( 3, "sigpending()" );
1374
1375 if ( sigpending ( &sigset_cluster ) )
1376 if ( sigismember ( &sigset_cluster, SIGTERM ) )
1377 break;
1378
1379 // Processing new messages
1380 debug ( 3, "cluster_recv_proc()" );
1381
1382 if ( ( ret = cluster_recv_proc ( 0 ) ) ) {
1383 sync_term ( ret );
1384 break;
1385 }
1386 }
1387
1388 debug ( 3, "cluster_loop() finished with exitcode %i.", ret );
1389 return ret;
1390#ifdef DOXYGEN
1391 sync_term ( 0 );
1392#endif
1393}
1394
1395
1396/**
1397 * @brief Updating information about modification time of a directory.
1398 *
1399 * @param[in] path Canonized path to updated file/dir
1400 * @param[in] dirlevel Directory level provided by fts (man 3 fts)
1401 * @param[in] st_mode st_mode value to detect is it directory or not (S_IFDIR or not)
1402 *
1403 * @retval zero Successfully initialized
1404 * @retval non-zero Got error, while initializing
1405 *
1406 */
1407
1408int cluster_modtime_update ( const char *path, short int dirlevel, mode_t st_mode )
1409{
1410 // "modtime" is incorrent name-part of function. Actually it updates "change time" (man 2 lstat64).
1411 int ret;
1412 // Getting relative directory level (depth)
1413 short int dirlevel_rel = dirlevel - ctx_p->watchdir_dirlevel;
1414
1415 if ( ( st_mode & S_IFMT ) == S_IFDIR )
1416 dirlevel_rel++;
1417
1418 // Don't remembering information about directories with level beyond the limits
1419 if ( ( dirlevel_rel > ctx_p->cluster_scan_dl_max ) || ( dirlevel_rel < ctx_p->cluster_hash_dl_min ) )
1420 return 0;
1421
1422 // Getting directory/file-'s information (including "change time" aka "st_ctime")
1423 stat64_t stat64;
1424 ret = lstat64 ( path, &stat64 );
1425
1426 if ( ret ) {
1427 error ( "Cannot lstat64()", path );
1428 return errno;
1429 }
1430
1431 // Getting absolute directory path
1432 const char *dirpath;
1433
1434 if ( ( st_mode & S_IFMT ) == S_IFDIR ) {
1435 dirpath = path;
1436 } else {
1437 char *path_dup = strdup ( path );
1438 dirpath = ( const char * ) dirname ( path_dup );
1439 free ( path_dup );
1440 }
1441
1442 // Getting relative directory path
1443 // Initializing
1444 size_t dirpath_len = strlen ( dirpath );
1445 char *dirpath_rel_p = xmalloc ( dirpath_len + 1 );
1446 char *dirpath_rel = dirpath_rel_p;
1447 const char *dirpath_rel_full = &dirpath[ctx_p->watchdirlen];
1448 size_t dirpath_rel_full_len = dirpath_len - ctx_p->watchdirlen;
1449 // Getting coodinate of the end (directory path is already canonized, so we can simply count number of slashes to get directory level)
1450 int slashcount = 0;
1451 size_t dirpath_rel_end = 0;
1452
1453 while ( dirpath_rel_full[dirpath_rel_end] && ( dirpath_rel_end < dirpath_rel_full_len ) ) {
1454 if ( dirpath_rel_full[dirpath_rel_end] == '/' ) {
1455 slashcount++;
1456
1457 if ( slashcount >= ctx_p->cluster_hash_dl_max )
1458 break;
1459 }
1460
1461 dirpath_rel_end++;
1462 }
1463
1464 // Copy the required part of path to dirpath_rel
1465 memcpy ( dirpath_rel, dirpath_rel_full, dirpath_rel_end );
1466 // Updating "st_ctime" information. We should check current value for this directory and update it only if it less or not set.
1467 // Checking current value
1468 char toupdate = 0;
1469 gpointer ctime_gp = g_hash_table_lookup ( nodeinfo_my->modtime_ht, dirpath_rel );
1470
1471 if ( ctime_gp == NULL )
1472 toupdate++;
1473 else if ( GPOINTER_TO_INT ( ctime_gp ) < stat64.st_ctime )
1474 toupdate++;
1475
1476 // g_hash_table_replace() will replace existent information about the directory or create it if it doesn't exist.
1477 if ( toupdate )
1478 g_hash_table_replace ( nodeinfo_my->modtime_ht, strdup ( dirpath_rel ), GINT_TO_POINTER ( stat64.st_ctime ) );
1479
1480 // Why I'm using "st_ctime" instead of "st_mtime"? Because "st_ctime" also updates on updating inode information.
1481 return 0;
1482}
1483
1484
1485/**
1486 * @brief Puts entry to list to be send to other nodes. To be called from cluster_modtime_exchange()
1487 *
1488 * @param[in] pushrentry_arg_p Pointer to pushentry_arg structure
1489 *
1490 */
1491
1492void cluster_modtime_exchange_pushentry ( gpointer dir_gp, gpointer modtype_gp, void *pushentry_arg_gp )
1493{
1494 struct pushdoubleentry_arg *pushentry_arg_p = ( struct pushdoubleentry_arg * ) pushentry_arg_gp;
1495 char *dir = ( char * ) dir_gp;
1496 time_t ctime = ( time_t ) GPOINTER_TO_INT ( modtype_gp );
1497 size_t size = strlen ( dir ) + 1; // TODO: strlen should be already prepared
1498 // but not re-calculated here
1499
1500 if ( pushentry_arg_p->allocated <= pushentry_arg_p->total ) {
1501 pushentry_arg_p->allocated += ALLOC_PORTION;
1502 pushentry_arg_p->entry = ( struct doubleentry * )
1503 xrealloc (
1504 ( char * ) pushentry_arg_p->entry,
1505 pushentry_arg_p->allocated * sizeof ( *pushentry_arg_p->entry )
1506 );
1507 }
1508
1509 pushentry_arg_p->entry[pushentry_arg_p->total].dat0 = dir;
1510 pushentry_arg_p->entry[pushentry_arg_p->total].size0 = size;
1511 pushentry_arg_p->entry[pushentry_arg_p->total].dat1 = ( void * ) ctime; // Will be problems if sizeof(time_t) > sizeof(void *)
1512 pushentry_arg_p->entry[pushentry_arg_p->total].size1 = sizeof ( ctime );
1513 pushentry_arg_p->size += size;
1514 pushentry_arg_p->total++;
1515 return;
1516}
1517
1518
1519static struct pushdoubleentry_arg cluster_modtime_exchange_pushentry_arg = {0};
1520/**
1521 * @brief Clean up after the last run of cluster_modtime_exchange.
1522 *
1523 * @retval zero Successfully initialized
1524 * @retval non-zero Got error, while initializing
1525 *
1526 */
1527
1528int cluster_modtime_exchange_cleanup()
1529{
1530 struct pushdoubleentry_arg *pushentry_arg_p = &cluster_modtime_exchange_pushentry_arg;
1531 int i = 0;
1532
1533 while ( i < pushentry_arg_p->allocated ) {
1534 if ( pushentry_arg_p->entry[i].alloc0 )
1535 free ( pushentry_arg_p->entry[i].dat0 );
1536
1537 if ( pushentry_arg_p->entry[i].alloc1 )
1538 free ( pushentry_arg_p->entry[i].dat1 );
1539
1540 i++;
1541 }
1542
1543 free ( pushentry_arg_p->entry );
1544#ifdef VERYPARANOID
1545 memset ( pushentry_arg_p, 0, sizeof ( *pushentry_arg_p ) );
1546#endif
1547 return 0;
1548}
1549
1550
1551/**
1552 * @brief Exchanging with "modtime_ht"-s to be able to compare them.
1553 *
1554 * @retval zero Successfully initialized
1555 * @retval non-zero Got error, while initializing
1556 *
1557 */
1558
1559int cluster_modtime_exchange()
1560{
1561 struct pushdoubleentry_arg *pushentry_arg_p = &cluster_modtime_exchange_pushentry_arg;
1562 // Getting hash table entries
1563 pushentry_arg_p->size = 0;
1564 pushentry_arg_p->total = 0;
1565 g_hash_table_foreach ( nodeinfo_my->modtime_ht, cluster_modtime_exchange_pushentry, ( void * ) pushentry_arg_p );
1566
1567 if ( !pushentry_arg_p->total ) {
1568 // !!!
1569 }
1570
1571 // Calculating required RAM to compile clustercmd
1572 size_t toalloc = 0;
1573 int i = 0;
1574
1575 while ( i < pushentry_arg_p->total ) {
1576 toalloc += 4; // for size header
1577 toalloc += pushentry_arg_p->entry[i].size0; // for path
1578 toalloc += pushentry_arg_p->entry[i].size1; // for ctime
1579 }
1580
1581 // Allocating space for the clustercmd
1582 clustercmd_t *clustercmd_p = ( clustercmd_t * ) xmalloc ( sizeof ( clustercmdhdr_t ) + toalloc );
1583 memset ( clustercmd_p, 0, sizeof ( clustercmdhdr_t ) );
1584 // Setting up clustercmd
1585 clustercmd_p->h.dst_node_id = NODEID_NOID;
1586 clustercmd_p->h.cmd_id = CLUSTERCMDID_HT_EXCH;
1587 clustercmd_p->h.data_len = toalloc;
1588 // Filing clustercmd with hash-table entriyes
1589 i = 0;
1590 clustercmd_ht_exch_t *clustercmd_ht_exch_p = &clustercmd_p->data.ht_exch;
1591
1592 while ( i < pushentry_arg_p->total ) {
1593 // Setting the data
1594 clustercmd_ht_exch_p->ctime = ( time_t ) pushentry_arg_p->entry[i].dat1;
1595 clustercmd_ht_exch_p->path_length = ( time_t ) pushentry_arg_p->entry[i].size0;
1596 memcpy (
1597 clustercmd_ht_exch_p->path,
1598 pushentry_arg_p->entry[i].dat0,
1599 clustercmd_ht_exch_p->path_length
1600 );
1601 // Pointing to space for next entry:
1602 size_t offset = sizeof ( clustercmd_ht_exch_t ) - 1 + pushentry_arg_p->entry[i].size0;
1603 clustercmd_ht_exch_p = ( clustercmd_ht_exch_t * )
1604 ( & ( ( char * ) clustercmd_ht_exch_p ) [offset] );
1605 }
1606
1607 // Sending
1608 cluster_send ( clustercmd_p );
1609 // Cleanup
1610 free ( clustercmd_p );
1611 return 0;
1612}
1613
1614
/**
 * @brief 			(syncop) Syncing file tree with another nodes with using of directories' modification time as a recent-detector.
 *
 * At the moment it only runs cluster_modtime_exchange().
 *
 * @retval	zero 		Successfully finished
 * @retval	non-zero 	Error code returned by cluster_modtime_exchange()
 *
 */

int cluster_initialsync()
{
	// Passing the result to the caller instead of always reporting a success
	return cluster_modtime_exchange();
}
1630
1631
/**
 * @brief 			(syncop) "Captures" right to update the file or directory to another nodes. It just removes events about the file or directory from another nodes
 *
 * Not implemented yet: the function is a stub that does nothing.
 *
 * @param[in] 	path 		Path to the file or directory (not used by the stub)
 *
 * @retval	zero 		Always
 *
 */

int cluster_capture ( const char *path )
{
	const int rc = 0;
	// The argument is accepted only to keep the interface; silencing "unused parameter"
	( void ) path;
	return rc;
}
1647
1648#endif
1649
uint32_t adler32_calc(const unsigned char *const data, uint32_t len)
Calculated Adler32 value for char array.
Definition calc.c:41
#define CLUSTER_WINDOW_PCKTLIMIT
#define MAXNODES
#define CLUSTER_WINDOW_BUFSIZE_PORTION
#define CLUSTER_PACKET_MAXSIZE
#define ALLOC_PORTION
#define critical(...)
Definition error.h:32
#define error(...)
Definition error.h:36
#define debug(debug_level,...)
Definition error.h:50
#define warning(...)
Definition error.h:40
#define critical_on(cond)
Definition error.h:33
#define XTOSTR(a)
Definition macros.h:45
#define SAFE(code, onfail)
Definition macros.h:56
ctx_t * ctx_p
Definition mon_kqueue.c:85
struct stat64 stat64_t
Definition port-hacks.h:65
Definition ctx.h:315
short int watchdir_dirlevel
Definition ctx.h:381
size_t watchdirlen
Definition ctx.h:375
void * dat0
Definition common.h:185
size_t size1
Definition common.h:182
size_t size0
Definition common.h:181
size_t alloc1
Definition common.h:184
size_t alloc0
Definition common.h:183
void * dat1
Definition common.h:186
struct doubleentry * entry
Definition common.h:193
int sync_term(int exitcode)
Definition sync.c:3991