42#define CLUSTER_RECV_PROC_ERRLIMIT (1<<8)
43#define NODES_ALLOC (MAX(MAXNODES, NODEID_NOID)+1)
47struct sockaddr_in sa_i = {0};
50struct sockaddr_in sa_o = {0};
54pthread_t pthread_cluster = 0;
56nodeinfo_t nodeinfo[NODES_ALLOC] = {{0}};
58nodeinfo_t *nodeinfo_my = NULL;
59uint8_t node_id_my = NODEID_NOID;
60uint8_t node_ids[NODES_ALLOC] = {0};
61unsigned int cluster_timeout = 0;
62uint8_t node_count = 0;
63uint8_t node_online = 0;
65cluster_recvproc_funct_t recvproc_funct[COUNT_CLUSTERCMDID] = {NULL};
67window_t window_i = {0};
68window_t window_o = {0};
80static inline int clustercmd_window_add ( window_t *window_p, clustercmd_t *clustercmd_p, GHashTable *serial2queuedpacket_ht )
84 if ( clustercmd_p->h.src_node_id >=
MAXNODES ) {
85 error (
"Invalid src_node_id: %i.", clustercmd_p->h.src_node_id );
92 if ( window_p->packets_len >= window_p->size ) {
94# define CXREALLOC(a, size) \
95 (typeof(a))xrealloc((char *)(a), (size_t)(size) * sizeof(*(a)))
96 window_p->packets_id = CXREALLOC ( window_p->packets_id, window_p->size );
97 window_p->occupied_sides = CXREALLOC ( window_p->occupied_sides, window_p->size );
102 size_t clustercmd_size = CLUSTERCMD_SIZE ( clustercmd_p );
103 size_t required_space =
sizeof ( clustercmdqueuedpackethdr_t ) + clustercmd_size;
105 size_t occupied_left = SIZE_MAX, occupied_right = 0;
109 while ( i < window_p->packets_len ) {
110 unsigned int window_id;
111 window_id = window_p->packets_id[ i++ ];
112 occupied_left = MIN ( occupied_left, window_p->occupied_sides[window_id].left );
113 occupied_right = MAX ( occupied_right, window_p->occupied_sides[window_id].right );
116 debug ( 3,
"w.size == %u, b_left == %u; b_right == %u; w.buf_size == %u; r_space == %u",
117 window_p->size, occupied_left, occupied_right, window_p->buf_size, required_space );
119 size_t buf_coordinate = SIZE_MAX;
121 if ( window_p->packets_len ) {
123 size_t free_left = occupied_left;
125 size_t free_right = window_p->buf_size - occupied_right;
127 if ( free_left > required_space )
128 buf_coordinate = free_left - required_space;
129 else if ( free_right > required_space )
130 buf_coordinate = occupied_right;
134 window_p->buf = xrealloc ( window_p->buf, window_p->buf_size );
135 buf_coordinate = occupied_right;
138 debug ( 3,
"f_left == %u; f_right == %u; b_coord == %u; w.buf_size == %u",
139 free_left, free_right, buf_coordinate, window_p->buf_size );
143 if ( window_p->buf_size <= required_space ) {
145 window_p->buf = xrealloc ( window_p->buf, window_p->buf_size );
149 unsigned int window_id;
151 window_id = window_p->packets_len;
153 window_p->occupied_sides[window_id].left = buf_coordinate;
154 window_p->occupied_sides[window_id].right = buf_coordinate + required_space;
156 clustercmdqueuedpacket_t *queuedpacket_p;
157 debug ( 3,
"b_coord == %u", buf_coordinate );
158 queuedpacket_p = ( clustercmdqueuedpacket_t * ) &window_p->buf[buf_coordinate];
159 memset ( &queuedpacket_p->h, 0, sizeof ( queuedpacket_p->h ) );
160 memcpy ( &queuedpacket_p->cmd, clustercmd_p, clustercmd_size );
161 queuedpacket_p->h.window_id = window_id;
163 g_hash_table_insert ( serial2queuedpacket_ht, GINT_TO_POINTER ( clustercmd_p->h.serial ), queuedpacket_p );
164 window_p->packets_id[window_p->packets_len++] = window_id;
179static inline int clustercmd_window_del ( window_t *window_p, clustercmdqueuedpacket_t *queuedpacket_p, GHashTable *serial2queuedpacket_ht )
183 if ( !window_p->size ) {
184 error (
"window not allocated." );
188 if ( !window_p->packets_len ) {
189 error (
"there already no packets in the window." );
194 unsigned int window_id_del = queuedpacket_p->h.window_id;
195 unsigned int window_id_last = --window_p->packets_len;
200 if ( window_id_del != window_id_last ) {
201 debug ( 3,
"%i -> %i", window_id_last, window_id_del );
202 window_p->packets_id[window_id_del] = window_p->packets_id[window_id_last];
203 memcpy ( &window_p->occupied_sides[window_id_del], &window_p->occupied_sides[window_id_last], sizeof ( window_p->occupied_sides[window_id_del] ) );
207 g_hash_table_remove ( serial2queuedpacket_ht, GINT_TO_POINTER ( queuedpacket_p->cmd.h.serial ) );
223int clustercmd_adler32_calc ( clustercmd_t *clustercmd_p, clustercmdadler32_t *clustercmdadler32_p, adler32_calc_t flags )
225 debug ( 15,
"(%p, %p, 0x%x)", clustercmd_p, clustercmdadler32_p, flags );
227 if ( flags & ADLER32_CALC_DATA ) {
229 uint32_t size = clustercmd_p->h.data_len;
230 char *ptr = clustercmd_p->data.p;
232 adler32 =
adler32_calc ( (
unsigned char * ) ptr, CLUSTER_PAD ( size ) );
233 debug ( 20,
"dat: 0x%x", adler32 );
235 clustercmdadler32_p->dat = adler32 ^ 0xFFFFFFFF;
238 if ( flags & ADLER32_CALC_HEADER ) {
240 clustercmdadler32_t adler32_save;
242 memcpy ( &adler32_save.hdr, &clustercmd_p->h.adler32.hdr, sizeof ( clustercmd_p->h.adler32.hdr ) );
243 memset ( &clustercmd_p->h.adler32.hdr, 0, sizeof ( clustercmd_p->h.adler32.hdr ) );
244 adler32 = 0xFFFFFFFF;
245 uint32_t size =
sizeof ( clustercmdhdr_t );
246 char *ptr = (
char * ) &clustercmd_p->h;
248 adler32 =
adler32_calc ( (
unsigned char * ) ptr, size );
249 debug ( 20,
"hdr: 0x%x", adler32 );
251 memcpy ( &clustercmd_p->h.adler32.hdr, &adler32_save.hdr, sizeof ( clustercmd_p->h.adler32.hdr ) );
252 clustercmdadler32_p->hdr = adler32 ^ 0xFFFFFFFF;
269int _cluster_send ( clustercmd_t *clustercmd_p )
271 debug ( 10,
"{h.dst_node_id: %u, h.src_node_id: %u, cmd_id: %u, data_len: %u}",
272 clustercmd_p->h.dst_node_id, clustercmd_p->h.src_node_id, clustercmd_p->h.cmd_id,
273 clustercmd_p->h.data_len );
274 clustercmd_p->h.src_node_id = node_id_my;
275 SAFE ( clustercmd_adler32_calc ( clustercmd_p, &clustercmd_p->h.adler32, ADLER32_CALC_ALL ),
return _SAFE_rc );
276 debug ( 3,
"Sending: "
277 "{h.dst_node_id: %u, h.src_node_id: %u, cmd_id: %u, adler32.hdr: %p, adler32.dat: %p, data_len: %u}",
278 clustercmd_p->h.dst_node_id, clustercmd_p->h.src_node_id, clustercmd_p->h.cmd_id,
279 (
void * ) (
long ) clustercmd_p->h.adler32.hdr, (
void * ) (
long ) clustercmd_p->h.adler32.dat,
280 clustercmd_p->h.data_len );
281 nodeinfo_t *nodeinfo_p;
282 nodeinfo_p = &nodeinfo[clustercmd_p->h.dst_node_id];
285 switch ( nodeinfo_p->status ) {
286 case NODESTATUS_DOESNTEXIST:
287 case NODESTATUS_OFFLINE:
288 debug ( 1,
"There's no online node with id %u. Skipping sending.", clustercmd_p->h.dst_node_id );
289 return EADDRNOTAVAIL;
296 if ( nodeinfo_my != NULL )
297 clustercmd_window_add ( &window_o, clustercmd_p, nodeinfo_my->serial2queuedpacket_ht );
300 debug ( 10,
"sendto(%p, %p, %i, 0, %p, %i)", sock_o, clustercmd_p, CLUSTERCMD_SIZE_PADDED ( clustercmd_p ), &sa_o,
sizeof ( sa_o ) );
301 debug ( 50,
"clustercmd_p->data.p[0] == 0x%x", clustercmd_p->data.p[0] );
302 critical_on ( sendto ( sock_o, clustercmd_p, CLUSTERCMD_SIZE_PADDED ( clustercmd_p ), 0, &sa_o,
sizeof ( sa_o ) ) == -1 );
307static inline int cluster_send ( clustercmd_t *clustercmd_p )
310 rc = _cluster_send ( clustercmd_p );
311 debug ( 10,
"__________________sent___________________ %i", rc );
326int node_update_status ( uint8_t node_id, uint8_t node_status )
328 debug ( 3,
"%i, %i", node_id, node_status );
329 uint8_t node_status_old = nodeinfo[node_id].status;
330 nodeinfo_t *nodeinfo_p = &nodeinfo[node_id];
332 if ( ( node_status == NODESTATUS_DOESNTEXIST ) && ( node_status_old != NODESTATUS_DOESNTEXIST ) ) {
334 node_ids[nodeinfo_p->num] = node_ids[node_count];
335 g_hash_table_destroy ( nodeinfo_p->modtime_ht );
336 g_hash_table_destroy ( nodeinfo_p->serial2queuedpacket_ht );
338 memset ( nodeinfo_p, 0,
sizeof ( *nodeinfo_p ) );
343 if ( node_status == node_status_old )
346 switch ( node_status_old ) {
347 case NODESTATUS_DOESNTEXIST:
348 nodeinfo_p->id = node_id;
349 nodeinfo_p->num = node_count;
350 nodeinfo_p->last_serial = -1;
351 nodeinfo_p->modtime_ht = g_hash_table_new_full ( g_str_hash, g_str_equal, free, 0 );
352 nodeinfo_p->serial2queuedpacket_ht = g_hash_table_new_full ( g_direct_hash, g_direct_equal, 0, 0 );
353 node_ids[node_count] = node_id;
357 if ( node_status == NODESTATUS_OFFLINE )
362 case NODESTATUS_OFFLINE:
363 nodeinfo_p->last_serial = -1;
368 if ( node_status == NODESTATUS_OFFLINE )
374 nodeinfo[node_id].status = node_status;
390int node_update_name ( uint8_t node_id,
const char *node_name )
392 debug ( 3,
"%i, \"%s\"", node_id, node_name );
393 nodeinfo_t *nodeinfo_p = &nodeinfo[node_id];
394 void *ret = g_hash_table_lookup ( indexes_p->nodenames_ht, node_name );
398 uint8_t node_old_id = GPOINTER_TO_INT ( ret );
399 nodeinfo_t *nodeinfo_old_p = &nodeinfo[node_old_id];
400 debug ( 5,
"nodename \"%s\" had been used by node_id == %i", node_name, node_old_id );
402 if ( node_old_id == node_id ) {
403 debug ( 10,
"node_old_id [%i] == node_id [%i]", node_old_id, node_id );
408 debug ( 15,
"Sending a DIE command to node_id (%i)", node_old_id );
409 clustercmd_t *clustercmd_p = CLUSTER_ALLOCA ( clustercmd_die_t, 0 );
410 clustercmd_p->h.cmd_id = CLUSTERCMDID_DIE;
411 clustercmd_p->h.dst_node_id = node_old_id;
413 if ( ( rc = cluster_send ( clustercmd_p ) ) )
417 nodeinfo_old_p->node_name = NULL;
418 debug ( 3,
"changing status of node_id == %i to NODESTATUS_OFFLINE", node_old_id );
419 node_update_status ( node_old_id, NODESTATUS_OFFLINE );
422 char *node_name_dup = strdup ( node_name );
423 nodeinfo_p->node_name = node_name_dup;
424 g_hash_table_replace ( indexes_p->nodenames_ht, node_name_dup, GINT_TO_POINTER ( node_id ) );
439static inline int cluster_recv_proc_set ( clustercmd_id_t cmd_id, cluster_recvproc_funct_t procfunct )
441 recvproc_funct[cmd_id] = procfunct;
458static inline int cluster_read (
int sock, clustercmd_t **cmd_pp, cluster_read_flags_t flags )
461 static clustercmd_t *cmd_p = (
void * ) buf;
462 struct sockaddr_in sa_in;
463 size_t sa_in_len =
sizeof ( sa_in );
465 debug ( 20,
"%i, %p, 0x%x", sock, buf, flags );
466 int readret = recvfrom ( sock, buf,
CLUSTER_PACKET_MAXSIZE, MSG_WAITALL, (
struct sockaddr * ) &sa_in, ( socklen_t * restrict ) &sa_in_len );
467 debug ( 30,
"recvfrom(%i, %p, 0x%x, %p, %p) -> %i", sock, buf, MSG_WAITALL, &sa_in, &sa_in_len, readret );
471 error (
"recvfrom() returned 0. This shouldn't happen. Exit." );
478 error (
"recvfrom() returned %i. "
479 "Seems, that something wrong with network socket.",
481 return errno != -1 ? errno : -2;
484 debug ( 2,
"Got message from %s (len: %i).", inet_ntoa ( sa_in.sin_addr ), readret );
486 if ( (
unsigned int ) readret <
sizeof ( clustercmdhdr_t ) ) {
488 error (
"Warning: cluster_read(): Got too short message from node (no header [or too short]). Ignoring it." );
492 if ( (
unsigned int ) readret < CLUSTERCMD_SIZE ( cmd_p ) ) {
494 error (
"Warning: cluster_read(): Got too short message from node (no data [or too short]). Ignoring it." );
499 if ( readret & 0x3 ) {
500 error (
"Warning: cluster_recv(): Received packet of size %i (not a multiple of 4). Ignoring it.", readret );
519static inline int clustercmd_reject ( clustercmd_t *clustercmd_p, uint8_t reason )
521 clustercmd_t *clustercmd_rej_p = CLUSTER_ALLOCA ( clustercmd_rej_t, 0 );
522 clustercmd_rej_p->h.dst_node_id = clustercmd_p->h.src_node_id;
523 clustercmd_rej_p->data.rej.serial = clustercmd_p->h.serial;
524 clustercmd_rej_p->data.rej.reason = reason;
525 return cluster_send ( clustercmd_rej_p );
529#define CLUSTER_RECV_RETURNMESSAGE(clustercmd_p) {\
530 last_serial = (clustercmd_p)->h.serial;\
531 last_src_node_id = (clustercmd_p)->h.src_node_id;\
532 if(clustercmd_pp != NULL)\
533 *clustercmd_pp = (clustercmd_p);\
548static int _cluster_recv ( clustercmd_t **clustercmd_pp,
struct timeval *timeout )
550 clustercmd_t *clustercmd_p;
551 static uint8_t last_src_node_id = NODEID_NOID;
552 static uint32_t last_serial = 0;
555 if ( last_src_node_id != NODEID_NOID ) {
556 nodeinfo_t *nodeinfo_p = &nodeinfo[last_src_node_id];
558 if ( nodeinfo_p->serial2queuedpacket_ht != NULL ) {
559 clustercmdqueuedpacket_t *clustercmdqueuedpacket_p = ( clustercmdqueuedpacket_t * )
560 g_hash_table_lookup ( nodeinfo_p->serial2queuedpacket_ht, GINT_TO_POINTER ( last_serial + 1 ) );
562 if ( clustercmdqueuedpacket_p != NULL )
563 CLUSTER_RECV_RETURNMESSAGE ( &clustercmdqueuedpacket_p->cmd );
571 FD_SET ( sock_i, &rfds );
572 debug ( 3,
"select() with timeout {%u, %u}", timeout->tv_sec, timeout->tv_usec );
573 int selret = select ( sock_i + 1, &rfds, NULL, NULL, timeout );
577 error (
"got error while select()." );
582 debug ( 3,
"no new messages." );
586 debug ( 3,
"got new message(s)." );
587 debug ( 10,
"Reading new message's header" );
588 clustercmdadler32_t adler32;
593 if ( ( ret = cluster_read ( sock_i, clustercmd_pp, CLREAD_NONE ) ) ) {
594 if ( ret == -1 )
return 0;
596 error (
"Got error from cluster_read()." );
601 clustercmd_p = *clustercmd_pp;
602 debug ( 3,
"Received: {h.dst_node_id: %u, h.src_node_id: %u, cmd_id: %u,"
603 " serial: %u, adler32: {0x%x, 0x%x}, data_len: %u}",
604 clustercmd_p->h.dst_node_id, clustercmd_p->h.src_node_id,
605 clustercmd_p->h.cmd_id, clustercmd_p->h.serial,
606 clustercmd_p->h.adler32.hdr, clustercmd_p->h.adler32.dat,
607 clustercmd_p->h.data_len );
609 clustercmd_adler32_calc ( clustercmd_p, &adler32, ADLER32_CALC_HEADER );
611 if ( adler32.hdr != clustercmd_p->h.adler32.hdr ) {
612 debug ( 1,
"hdr-adler32 mismatch: %p != %p.",
613 (
void* ) (
long ) clustercmd_p->h.adler32.hdr, (
void* ) (
long ) adler32.hdr );
615 if ( ( ret = clustercmd_reject ( clustercmd_p, REJ_ADLER32MISMATCH ) ) != EADDRNOTAVAIL ) {
616 error (
"Got error while clustercmd_reject()." );
623 uint8_t src_node_id = clustercmd_p->h.src_node_id;
624 uint8_t dst_node_id = clustercmd_p->h.dst_node_id;
627 if ( src_node_id == NODEID_NOID ) {
629 if ( clustercmd_p->h.cmd_id != CLUSTERCMDID_HELLO ) {
630 error (
"Warning: cluster_recv(): Got non hello packet from NOID node. Ignoring the packet." );
634 if ( clustercmd_p->h.serial != 0 ) {
635 error (
"Warning: cluster_recv(): Got packet with non-zero serial from NOID node. Ignoring the packet." );
648 if ( dst_node_id == NODEID_NOID ) {
663 if ( ( clustercmd_p->h.src_node_id == node_id_my ) && ( node_id_my != NODEID_NOID ) )
666 nodeinfo_t *nodeinfo_p = &nodeinfo[src_node_id];
668 long serial_diff = clustercmd_p->h.serial - nodeinfo_p->last_serial;
669 debug ( 10,
"serial_diff == %i", serial_diff );
672 debug ( 1,
"Ignoring packet (serial %i) from %i due to serial_diff: 0 <= || > %i",
678 if ( clustercmd_p->h.serial != nodeinfo_p->last_serial + 1 ) {
679 clustercmd_window_add ( &window_i, clustercmd_p, nodeinfo_p->serial2queuedpacket_ht );
684 if ( clustercmd_p->h.data_len == 0 )
685 CLUSTER_RECV_RETURNMESSAGE ( clustercmd_p );
689 error (
"Warning: cluster_recv(): Got too big message from node %i. Ignoring it.",
711 clustercmd_adler32_calc ( clustercmd_p, &adler32, ADLER32_CALC_DATA );
713 if ( adler32.dat != clustercmd_p->h.adler32.dat ) {
714 debug ( 1,
"dat-adler32 mismatch: %p != %p.",
715 (
void* ) (
long ) clustercmd_p->h.adler32.dat, (
void* ) (
long ) adler32.dat );
717 if ( ( ret = clustercmd_reject ( clustercmd_p, REJ_ADLER32MISMATCH ) ) != EADDRNOTAVAIL ) {
718 error (
"Got error while clustercmd_reject()." );
724 CLUSTER_RECV_RETURNMESSAGE ( clustercmd_p );
727static inline int cluster_recv ( clustercmd_t **clustercmd_pp,
struct timeval *timeout )
730 rc = _cluster_recv ( clustercmd_pp, timeout );
731 debug ( 10,
"__________________recv___________________ %i", rc );
746int cluster_recv_proc (
unsigned int timeout_rel_int )
748 struct timeval timeout_rel, timeout_abs, tv_abs;
749 int error_count, iteration;
750 debug ( 3,
"cluster_recv_proc(%i)", timeout_rel_int );
751 clustercmd_t *clustercmd_p = NULL;
753 timeout_rel.tv_sec = timeout_rel_int / 1000;
754 timeout_rel.tv_usec = timeout_rel_int % 1000;
755 gettimeofday ( &tv_abs, NULL );
756 timeradd ( &tv_abs, &timeout_rel, &timeout_abs );
762 gettimeofday ( &tv_abs, NULL );
764 if ( timercmp ( &timeout_abs, &tv_abs, > ) )
765 timersub ( &timeout_abs, &tv_abs, &timeout_rel );
767 memset ( &timeout_rel, 0,
sizeof ( timeout_rel ) );
769 debug ( 5,
"timeout_abs == {%u, %u}; tv_abs == {%u, %u}; timeout_rel == {%u, %u}",
770 timeout_abs.tv_sec, timeout_abs.tv_usec,
771 tv_abs.tv_sec, tv_abs.tv_usec,
772 timeout_rel.tv_sec, timeout_rel.tv_usec
776 ret = cluster_recv ( &clustercmd_p, &timeout_rel );
783 warning (
"Got error while cluster_recv(). error_count == %i", error_count );
785 critical_on ( error_count >= CLUSTER_RECV_PROC_ERRLIMIT );
789 if ( recvproc_funct[clustercmd_p->h.cmd_id] != NULL ) {
790 debug ( 2,
"Calling function by pointer %p", recvproc_funct[clustercmd_p->h.cmd_id] );
792 if ( ( ret = recvproc_funct[clustercmd_p->h.cmd_id] ( clustercmd_p ) ) ) {
793 error (
"Got error from recvproc_funct[%i]: %s (%i)",
794 clustercmd_p->h.cmd_id );
802 debug ( 2,
"There's no appropriate callback function for cmd_id == %i", clustercmd_p->h.cmd_id );
819static int cluster_recvproc_die ( clustercmd_t *clustercmd_p )
821 critical (
"Got DIE message from node_id == %i,", clustercmd_p->h.src_node_id );
836static int cluster_recvproc_ack ( clustercmd_t *clustercmd_p )
838 uint32_t cmd_serial_ack = clustercmd_p->data.ack.serial;
839 clustercmdqueuedpacket_t *queuedpacket_p =
840 ( clustercmdqueuedpacket_t * ) g_hash_table_lookup ( nodeinfo_my->serial2queuedpacket_ht, GINT_TO_POINTER ( cmd_serial_ack ) );
842 if ( queuedpacket_p == NULL )
845 uint8_t node_id_from = clustercmd_p->h.src_node_id;
847 if ( ! queuedpacket_p->h.w.o.ack_from[node_id_from] ) {
848 queuedpacket_p->h.w.o.ack_count++;
849 queuedpacket_p->h.w.o.ack_from[node_id_from]++;
851 if ( queuedpacket_p->h.w.o.ack_count == node_count - 1 )
852 clustercmd_window_del ( &window_o, queuedpacket_p, nodeinfo_my->serial2queuedpacket_ht );
872 cluster_recv_proc_set ( CLUSTERCMDID_ACK, cluster_recvproc_ack );
873 cluster_recv_proc_set ( CLUSTERCMDID_DIE, cluster_recvproc_die );
886int cluster_io_deinit()
888 if ( window_i.buf_size ) {
891 if ( window_i.buf == NULL ) {
892 error (
"window_i.buf_size != 0, but window_i.buf == NULL." );
895 free ( window_i.buf );
898 if ( window_o.buf_size ) {
901 if ( window_o.buf == NULL ) {
902 error (
"window_o.buf_size != 0, but window_o.buf == NULL." );
905 free ( window_o.buf );
922static int cluster_recvproc_welcome ( clustercmd_t *clustercmd_p )
924 debug ( 20,
"%p", clustercmd_p );
926 clustercmd_welcome_t *data_welcome_p = &clustercmd_p->data.welcome;
933 uint32_t recv_nodename_len;
934 recv_nodename_len = welcome_to_node_name_len ( clustercmd_p );
936 if ( recv_nodename_len !=
ctx_p->cluster_nodename_len ) {
937 debug ( 9,
"recv_nodename_len [%i] != ctx_p->cluster_nodename_len [%i]", recv_nodename_len,
ctx_p->cluster_nodename_len );
942 if ( memcmp ( welcome_to_node_name ( data_welcome_p ),
ctx_p->cluster_nodename, recv_nodename_len ) ) {
943 debug ( 9,
"to_node_name != ctx_p->cluster_nodename" );
948 node_update_status ( clustercmd_p->h.src_node_id, NODESTATUS_SEEMSONLINE );
951 node_name = alloca ( data_welcome_p->from_node_name_len );
952 memcpy ( node_name, data_welcome_p->from_node_name, data_welcome_p->from_node_name_len );
953 node_update_name ( clustercmd_p->h.src_node_id, node_name );
956 node_id_my = clustercmd_p->h.dst_node_id;
972static int cluster_recvproc_hello ( clustercmd_t *clustercmd_p )
974 clustercmd_hello_t *data_hello_p = &clustercmd_p->data.hello;
979 debug ( 15,
"Preparing a welcome message for nodename \"%s\" from nodename == \"%s\"", data_hello_p->node_name,
ctx_p->cluster_nodename );
980 size_t data_len =
sizeof ( clustercmd_welcome_t ) + clustercmd_p->h.data_len +
ctx_p->cluster_nodename_len;
981 clustercmd_t *answer_p = CLUSTER_ALLOCA (
void, data_len );
982 clustercmd_welcome_t *answer_data_p = &answer_p->data.welcome;
983 answer_data_p->from_node_name_len =
ctx_p->cluster_nodename_len;
984 memcpy ( answer_data_p->from_node_name,
ctx_p->cluster_nodename,
ctx_p->cluster_nodename_len );
985 memcpy ( welcome_to_node_name ( answer_data_p ), data_hello_p->node_name, clustercmd_p->h.data_len );
988 char *node_name = alloca ( clustercmd_p->h.data_len + 1 );
989 memcpy ( node_name, data_hello_p->node_name, clustercmd_p->h.data_len + 1 );
990 ptr = g_hash_table_lookup ( indexes_p->nodenames_ht, node_name );
991 node_id = ( ptr == NULL ? NODEID_NOID : GPOINTER_TO_INT ( ptr ) );
992 debug ( 3,
"\"%s\" -> %i", node_name, node_id );
994 answer_p->h.data_len = data_len;
995 answer_p->h.cmd_id = CLUSTERCMDID_WELCOME;
996 answer_p->h.dst_node_id = node_id;
998 if ( ( ret = cluster_send ( answer_p ) ) )
1015static int cluster_recvproc_register ( clustercmd_t *clustercmd_p )
1017 clustercmd_reg_t *data_register_p = &clustercmd_p->data.reg;
1018 char *node_name = alloca ( clustercmd_p->h.data_len + 1 );
1019 memcpy ( node_name, data_register_p->node_name, clustercmd_p->h.data_len + 1 );
1020 node_update_name ( clustercmd_p->h.src_node_id, node_name );
1024extern int cluster_loop();
1041 if (
ctx_p != NULL ) {
1042 error (
"cluster subsystem is already initialized." );
1048 indexes_p = _indexes_p;
1049 cluster_timeout =
ctx_p->cluster_timeout;
1050 indexes_p->nodenames_ht = g_hash_table_new_full ( g_str_hash, g_str_equal, free, 0 );
1051 node_update_status ( NODEID_NOID, NODESTATUS_ONLINE );
1056 nodeinfo[i].last_serial = -1;
1063 sock_i = socket ( AF_INET, SOCK_DGRAM, 0 );
1066 error (
"Cannot create socket for input traffic" );
1074 if ( setsockopt ( sock_i, SOL_SOCKET, SO_REUSEADDR, (
char * ) &reuse,
sizeof ( reuse ) ) < 0 ) {
1075 error (
"Got error while setsockopt()" );
1080 sa_i.sin_family = AF_INET;
1081 sa_i.sin_port = htons (
ctx_p->cluster_mcastipport );
1082 sa_i.sin_addr.s_addr = INADDR_ANY;
1084 if ( bind ( sock_i, (
struct sockaddr* ) &sa_i,
sizeof ( sa_i ) ) ) {
1085 error (
"Got error while bind()" );
1090 struct ip_mreq group;
1091 group.imr_interface.s_addr = inet_addr (
ctx_p->cluster_iface );
1092 group.imr_multiaddr.s_addr = inet_addr (
ctx_p->cluster_mcastipaddr );
1094 if ( setsockopt ( sock_i, IPPROTO_IP, IP_ADD_MEMBERSHIP,
1095 (
char * ) &group,
sizeof ( group ) ) < 0 ) {
1096 error (
"Cannot setsockopt() to enter to membership %s -> %s",
1097 ctx_p->cluster_iface,
ctx_p->cluster_mcastipaddr );
1103 sock_o = socket ( AF_INET, SOCK_DGRAM, 0 );
1106 error (
"Cannot create socket for output traffic" );
1111 sa_o.sin_family = AF_INET;
1112 sa_o.sin_port = htons (
ctx_p->cluster_mcastipport );
1113 sa_o.sin_addr.s_addr = inet_addr (
ctx_p->cluster_mcastipaddr );
1118 if ( setsockopt ( sock_o, IPPROTO_IP, IP_MULTICAST_LOOP, (
char * ) &loopch,
sizeof ( loopch ) ) < 0 ) {
1119 error (
"Cannot disable loopback for output socket." );
1125 struct in_addr addr_o;
1126 addr_o.s_addr = inet_addr (
ctx_p->cluster_iface );
1128 if ( setsockopt ( sock_o, IPPROTO_IP, IP_MULTICAST_IF, &addr_o,
sizeof ( addr_o ) ) < 0 ) {
1129 error (
"Cannot set local interface for outbound traffic" );
1139 debug ( 15,
"Preparing a message with my nodename == \"%s\" (%i)",
ctx_p->cluster_nodename,
ctx_p->cluster_nodename_len );
1140 clustercmd_t *clustercmd_p = CLUSTER_ALLOCA ( clustercmd_hello_t,
ctx_p->cluster_nodename_len );
1141 clustercmd_p->h.data_len =
ctx_p->cluster_nodename_len;
1142 memcpy ( clustercmd_p->data.hello.node_name,
ctx_p->cluster_nodename, clustercmd_p->h.data_len + 1 );
1143 clustercmd_p->h.cmd_id = CLUSTERCMDID_HELLO;
1144 clustercmd_p->h.dst_node_id = NODEID_NOID;
1146 if ( ( ret = cluster_send ( clustercmd_p ) ) )
1150 cluster_recv_proc_set ( CLUSTERCMDID_WELCOME, cluster_recvproc_welcome );
1152 if ( ( ret = cluster_recv_proc ( cluster_timeout ) ) )
1156 cluster_recv_proc_set ( CLUSTERCMDID_WELCOME, NULL );
1157 debug ( 3,
"After communicating with others, my node_id is %i.", node_id_my );
1160 if ( node_id_my == NODEID_NOID ) {
1164 if ( nodeinfo[i].status == NODESTATUS_DOESNTEXIST ) {
1172 debug ( 3,
"I was have to set my node_id to %i.", node_id_my );
1176 if ( node_id_my == NODEID_NOID ) {
1177 error (
"Cannot find free node ID. Seems, that all %i ID-s are already occupied." );
1182 nodeinfo_my = &nodeinfo[node_id_my];
1185 node_update_status ( node_id_my, NODESTATUS_SEEMSONLINE );
1187 clustercmd_t *clustercmd_p = CLUSTER_ALLOCA ( clustercmd_reg_t,
ctx_p->cluster_nodename_len );
1188 clustercmd_reg_t *data_reg_p = &clustercmd_p->data.reg;
1189 memcpy ( data_reg_p->node_name,
ctx_p->cluster_nodename,
ctx_p->cluster_nodename_len + 1 );
1190 clustercmd_p->h.data_len =
ctx_p->cluster_nodename_len + 1;
1191 clustercmd_p->h.cmd_id = CLUSTERCMDID_REG;
1192 clustercmd_p->h.dst_node_id = NODEID_NOID;
1194 if ( ( ret = cluster_send ( clustercmd_p ) ) )
1198 cluster_recv_proc_set ( CLUSTERCMDID_HELLO, cluster_recvproc_hello );
1199 cluster_recv_proc_set ( CLUSTERCMDID_REG, cluster_recvproc_register );
1202 if ( ( ret = cluster_recv_proc ( cluster_timeout ) ) )
1205 node_update_status ( node_id_my, NODESTATUS_ONLINE );
1208 ret = pthread_create ( &pthread_cluster, NULL, (
void * ( * ) (
void * ) ) cluster_loop, NULL );
1222static inline int cluster_signal (
int signal )
1224 if ( pthread_cluster )
1225 return pthread_kill ( pthread_cluster, signal );
1231extern int cluster_modtime_exchange_cleanup();
1243 cluster_signal ( SIGTERM );
1244 ret = pthread_join ( pthread_cluster, NULL );
1245 cluster_io_deinit();
1246 node_update_status ( NODEID_NOID, NODESTATUS_DOESNTEXIST );
1251 while ( node_count ) {
1255 error (
"cluster_deinit() looped. Forcing break." );
1260 node_update_status ( 0, NODESTATUS_DOESNTEXIST );
1266 memset ( nodeinfo, 0,
sizeof ( nodeinfo_t ) * NODES_ALLOC );
1270 node_id_my = NODEID_NOID;
1271 memset ( &sa_i, 0,
sizeof ( sa_i ) );
1272 memset ( &sa_o, 0,
sizeof ( sa_o ) );
1274 cluster_modtime_exchange_cleanup();
1275 g_hash_table_destroy ( indexes_p->nodenames_ht );
1290int cluster_lock (
const char *fpath )
1305int cluster_lock_byindexes()
1319int cluster_unlock_all()
1325#define CLUSTER_LOOP_CHECK(a) {\
1344 sigset_t sigset_cluster;
1346 sigemptyset ( &sigset_cluster );
1347 sigaddset ( &sigset_cluster, SIGINT );
1348 CLUSTER_LOOP_CHECK ( pthread_sigmask ( SIG_BLOCK, &sigset_cluster, NULL ) );
1350 sigemptyset ( &sigset_cluster );
1351 sigaddset ( &sigset_cluster, SIGTERM );
1352 CLUSTER_LOOP_CHECK ( pthread_sigmask ( SIG_UNBLOCK, &sigset_cluster, NULL ) );
1354 debug ( 3,
"cluster_loop() started." );
1361 FD_SET ( sock_i, &rfds );
1362 debug ( 3,
"select()" );
1363 _ret = select ( sock_i + 1, &rfds, NULL, NULL, NULL );
1366 if ( ( _ret == -1 ) && ( errno != EINTR ) ) {
1373 debug ( 3,
"sigpending()" );
1375 if ( sigpending ( &sigset_cluster ) )
1376 if ( sigismember ( &sigset_cluster, SIGTERM ) )
1380 debug ( 3,
"cluster_recv_proc()" );
1382 if ( ( ret = cluster_recv_proc ( 0 ) ) ) {
1388 debug ( 3,
"cluster_loop() finished with exitcode %i.", ret );
1408int cluster_modtime_update (
const char *path,
short int dirlevel, mode_t st_mode )
1415 if ( ( st_mode & S_IFMT ) == S_IFDIR )
1419 if ( ( dirlevel_rel >
ctx_p->cluster_scan_dl_max ) || ( dirlevel_rel < ctx_p->cluster_hash_dl_min ) )
1424 ret = lstat64 ( path, &stat64 );
1427 error (
"Cannot lstat64()", path );
1432 const char *dirpath;
1434 if ( ( st_mode & S_IFMT ) == S_IFDIR ) {
1437 char *path_dup = strdup ( path );
1438 dirpath = (
const char * ) dirname ( path_dup );
1444 size_t dirpath_len = strlen ( dirpath );
1445 char *dirpath_rel_p = xmalloc ( dirpath_len + 1 );
1446 char *dirpath_rel = dirpath_rel_p;
1451 size_t dirpath_rel_end = 0;
1453 while ( dirpath_rel_full[dirpath_rel_end] && ( dirpath_rel_end < dirpath_rel_full_len ) ) {
1454 if ( dirpath_rel_full[dirpath_rel_end] ==
'/' ) {
1457 if ( slashcount >=
ctx_p->cluster_hash_dl_max )
1465 memcpy ( dirpath_rel, dirpath_rel_full, dirpath_rel_end );
1469 gpointer ctime_gp = g_hash_table_lookup ( nodeinfo_my->modtime_ht, dirpath_rel );
1471 if ( ctime_gp == NULL )
1473 else if ( GPOINTER_TO_INT ( ctime_gp ) < stat64.st_ctime )
1478 g_hash_table_replace ( nodeinfo_my->modtime_ht, strdup ( dirpath_rel ), GINT_TO_POINTER ( stat64.st_ctime ) );
1492void cluster_modtime_exchange_pushentry ( gpointer dir_gp, gpointer modtype_gp,
void *pushentry_arg_gp )
1495 char *dir = (
char * ) dir_gp;
1496 time_t ctime = ( time_t ) GPOINTER_TO_INT ( modtype_gp );
1497 size_t size = strlen ( dir ) + 1;
1504 (
char * ) pushentry_arg_p->
entry,
1511 pushentry_arg_p->
entry[pushentry_arg_p->
total].
dat1 = (
void * ) ctime;
1512 pushentry_arg_p->
entry[pushentry_arg_p->
total].
size1 =
sizeof ( ctime );
1513 pushentry_arg_p->
size += size;
1514 pushentry_arg_p->
total++;
1528int cluster_modtime_exchange_cleanup()
1533 while ( i < pushentry_arg_p->
allocated ) {
1535 free ( pushentry_arg_p->
entry[i].
dat0 );
1538 free ( pushentry_arg_p->
entry[i].
dat1 );
1543 free ( pushentry_arg_p->
entry );
1545 memset ( pushentry_arg_p, 0,
sizeof ( *pushentry_arg_p ) );
1559int cluster_modtime_exchange()
1563 pushentry_arg_p->
size = 0;
1564 pushentry_arg_p->
total = 0;
1565 g_hash_table_foreach ( nodeinfo_my->modtime_ht, cluster_modtime_exchange_pushentry, (
void * ) pushentry_arg_p );
1567 if ( !pushentry_arg_p->
total ) {
1575 while ( i < pushentry_arg_p->
total ) {
1582 clustercmd_t *clustercmd_p = ( clustercmd_t * ) xmalloc (
sizeof ( clustercmdhdr_t ) + toalloc );
1583 memset ( clustercmd_p, 0,
sizeof ( clustercmdhdr_t ) );
1585 clustercmd_p->h.dst_node_id = NODEID_NOID;
1586 clustercmd_p->h.cmd_id = CLUSTERCMDID_HT_EXCH;
1587 clustercmd_p->h.data_len = toalloc;
1590 clustercmd_ht_exch_t *clustercmd_ht_exch_p = &clustercmd_p->data.ht_exch;
1592 while ( i < pushentry_arg_p->
total ) {
1594 clustercmd_ht_exch_p->ctime = ( time_t ) pushentry_arg_p->
entry[i].
dat1;
1595 clustercmd_ht_exch_p->path_length = ( time_t ) pushentry_arg_p->
entry[i].
size0;
1597 clustercmd_ht_exch_p->path,
1599 clustercmd_ht_exch_p->path_length
1602 size_t offset =
sizeof ( clustercmd_ht_exch_t ) - 1 + pushentry_arg_p->
entry[i].
size0;
1603 clustercmd_ht_exch_p = ( clustercmd_ht_exch_t * )
1604 ( & ( (
char * ) clustercmd_ht_exch_p ) [offset] );
1608 cluster_send ( clustercmd_p );
1610 free ( clustercmd_p );
1625int cluster_initialsync()
1627 cluster_modtime_exchange();
1642int cluster_capture (
const char *path )
uint32_t adler32_calc(const unsigned char *const data, uint32_t len)
Calculated Adler32 value for char array.
#define CLUSTER_WINDOW_PCKTLIMIT
#define CLUSTER_WINDOW_BUFSIZE_PORTION
#define CLUSTER_PACKET_MAXSIZE
#define debug(debug_level,...)
#define critical_on(cond)
#define SAFE(code, onfail)
short int watchdir_dirlevel
struct doubleentry * entry
int sync_term(int exitcode)