clsync
mon_gio.c
Go to the documentation of this file.
1 /*
2  clsync - file tree sync utility based on inotify/kqueue/bsm/gio
3 
4  Copyright (C) 2013-2014 Dmitry Yu Okunev <dyokunev@ut.mephi.ru> 0x8E30679C
5 
6  This program is free software: you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation, either version 3 of the License, or
9  (at your option) any later version.
10 
11  This program is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU General Public License for more details.
15 
16  You should have received a copy of the GNU General Public License
17  along with this program. If not, see <http://www.gnu.org/licenses/>.
18  */
19 
20 // The "queue" is actually "stack" in this code. It's a lack of design of this code.
21 
22 #include "common.h"
23 #include "error.h"
24 #include "sync.h"
25 #include "indexes.h"
26 #include "privileged.h"
27 
28 #include <pthread.h>
29 #include <gio/gio.h>
30 
31 #include "mon_gio.h"
32 
33 struct filemondata {
35  GFile *file;
36  GFileMonitor *filemon;
37  gulong handle_id;
38 };
39 typedef struct filemondata filemondata_t;
40 
41 struct event {
42  char *path;
43  gulong handle_id;
44  GFileMonitorEvent event_id;
48 };
49 typedef struct event event_t;
50 
51 GHashTable *mondirs_ht;
52 pthread_spinlock_t queue_lock;
53 pthread_mutex_t gio_mutex_prefetcher = PTHREAD_MUTEX_INITIALIZER;
54 pthread_cond_t gio_cond_gotevent = PTHREAD_COND_INITIALIZER;
55 event_t *queue = NULL;
58 
59 static inline void event_free ( event_t *ev )
60 {
61  free ( ev->path );
62  return;
63 }
64 
66 {
67  event_t *ev;
68  debug ( 30, "pthread_spin_lock(&queue_lock);" );
69  pthread_spin_lock ( &queue_lock );
70 
71  if ( queue_length >= queue_alloc ) {
74  queue = xrealloc ( queue, queue_alloc * sizeof ( *queue ) );
75  }
76 
77  ev = &queue[queue_length++];
78  ev->path = path;
79  ev->event_id = event;
80  ev->handle_id = handle_id;
84  debug ( 30, "pthread_spin_unlock(&queue_lock);" );
85  pthread_spin_unlock ( &queue_lock );
86  return 0;
87 }
88 
89 static inline event_t *event_pop()
90 {
91  static event_t ev;
92  debug ( 30, "pthread_spin_lock(&queue_lock);" );
93  pthread_spin_lock ( &queue_lock );
95  memcpy ( &ev, &queue[--queue_length], sizeof ( ev ) );
96  debug ( 30, "pthread_spin_unlock(&queue_lock);" );
97  pthread_spin_unlock ( &queue_lock );
98  return &ev;
99 }
100 
101 static void dir_gotevent (
102  GFileMonitor *filemon,
103  GFile *file,
104  GFile *file_other,
105  GFileMonitorEvent event,
106  gpointer arg
107 )
108 {
110  filemondata_t *fmdat = arg;
111  ctx_t *ctx_p = fmdat->ctx_p;
112  GFileType filetype = g_file_query_file_type ( file, G_FILE_QUERY_INFO_NOFOLLOW_SYMLINKS, NULL );
113  debug ( 10, "%p %p %p %i %p %i", filemon, file, file_other, event, arg, filetype );
114  char *path_full, *path_rel = NULL;
115 
116  switch ( event ) {
117  case G_FILE_MONITOR_EVENT_DELETED:
118  case G_FILE_MONITOR_EVENT_CREATED:
119  case G_FILE_MONITOR_EVENT_CHANGED:
120  case G_FILE_MONITOR_EVENT_ATTRIBUTE_CHANGED:
121  path_full = g_file_get_path ( file );
122  path_rel = strdup ( &path_full[ctx_p->watchdirlen + 1] );
123  g_free ( path_full );
124  debug ( 9, "Got event %i for \"%s\" (%i)", event, path_rel, filetype );
125  break;
126 
127  default:
128  break;
129  }
130 
131  switch ( filetype ) {
132  case G_FILE_TYPE_DIRECTORY:
133  objtype = EOT_DIR;
134  break;
135 
136  default:
137  objtype = EOT_FILE;
138  break;
139  }
140 
141  switch ( event ) {
142  case G_FILE_MONITOR_EVENT_DELETED:
143  objtype_old = objtype;
145  break;
146 
147  case G_FILE_MONITOR_EVENT_CREATED:
149  objtype_new = objtype;
150  break;
151 
152  default:
153  objtype_old = objtype;
154  objtype_new = objtype;
155  break;
156  }
157 
158  switch ( event ) {
159  case G_FILE_MONITOR_EVENT_DELETED:
160  debug ( 20, "g_hash_table_remove(mondirs_ht, \"%s\")", path_rel );
161  g_hash_table_remove ( mondirs_ht, path_rel );
162 
163  case G_FILE_MONITOR_EVENT_CREATED:
164  case G_FILE_MONITOR_EVENT_CHANGED:
165  case G_FILE_MONITOR_EVENT_ATTRIBUTE_CHANGED:
166 #ifdef PARANOID
167  critical_on ( path_rel == NULL );
168 #endif
169  debug ( 20, "event_push(\"%s\", %i, %i, %i, %i, %i)", path_rel, fmdat->handle_id, event, objtype, objtype_old, objtype_new );
170  critical_on ( event_push ( path_rel, fmdat->handle_id, event, objtype, objtype_old, objtype_new ) );
171  break;
172 
173  default:
174  break;
175  }
176 
177  return;
178 }
179 
180 int gio_add_watch_dir ( ctx_t *ctx_p, indexes_t *indexes_p, const char *const accpath )
181 {
182  ( void ) indexes_p;
183  filemondata_t *fmdat;
184  GError *error = NULL;
185  debug ( 3, "\"%s\"", accpath );
186 #ifdef PARANOID
187  fmdat = g_hash_table_lookup ( mondirs_ht, accpath );
188 
189  if ( fmdat != NULL ) {
190  errno = EADDRINUSE;
191  warning ( "Directory \"%s\" is already monitored.", accpath );
192  return -1;
193  }
194 
195 #endif
196  fmdat = xmalloc ( sizeof ( *fmdat ) );
197  fmdat->ctx_p = ctx_p;
198  fmdat->file = g_file_new_for_path ( accpath );
199  fmdat->filemon = g_file_monitor_directory ( fmdat->file, 0, NULL, &error );
200  fmdat->handle_id = g_signal_connect ( fmdat->filemon, "changed", G_CALLBACK ( dir_gotevent ), fmdat );
201  g_hash_table_replace ( mondirs_ht, strdup ( accpath ), fmdat );
202  return fmdat->handle_id;
203 }
204 
207 void *g_iteration_stop ( void *_timeout_p )
208 {
209  struct timeval *timeout_p = _timeout_p;
210  struct timeval tv_abs, timeout_abs;
211  struct timespec ts_abs;
212  debug ( 10, "{%u, %u}", timeout_p->tv_sec, timeout_p->tv_usec );
213  critical_on ( pthread_mutex_lock ( &gio_mutex_prefetcher ) );
214 
215  if ( cancel_g_iteration_stop ) {
216  critical_on ( pthread_mutex_unlock ( &gio_mutex_prefetcher ) );
217  return NULL;
218  }
219 
220 #define INFINITETIME (3600 * 24 * 365 * 10) /* ~10 years */
221 
222  if ( timeout_p->tv_sec > INFINITETIME )
223  timeout_p->tv_sec = INFINITETIME;
224 
225 #undef INFINITETIME
226  gettimeofday ( &tv_abs, NULL );
227  timeradd ( &tv_abs, timeout_p, &timeout_abs );
228  ts_abs.tv_sec = timeout_abs.tv_sec;
229  ts_abs.tv_nsec = timeout_abs.tv_usec * 1000;
230  debug ( 10, "{%u, %u}", ts_abs.tv_sec, ts_abs.tv_nsec );
231 
232  switch ( ( errno = pthread_cond_timedwait ( &gio_cond_gotevent, &gio_mutex_prefetcher, &ts_abs ) ) ) {
233  case 0:
234  case ETIMEDOUT:
235  break;
236 
237  default:
238  critical ( "Got error while pthread_cond_timedwait(&gio_cond_gotevent, &gio_mutex_prefetcher, &ts_abs)" );
239  }
240 
241  g_main_context_wakeup ( NULL );
242  pthread_mutex_unlock ( &gio_mutex_prefetcher );
243  debug ( 10, "return" );
244  return NULL;
245 }
246 
247 static inline int gio_wait_now ( ctx_t *ctx_p, struct indexes *indexes_p, struct timeval *tv_p )
248 {
249  ( void ) ctx_p;
250  ( void ) indexes_p;
251  void *ret;
252  int result;
253  debug ( 3, "(ctx_p, indexes_p, %p {%u, %u})", tv_p, tv_p == NULL ? -1 : tv_p->tv_sec, tv_p == NULL ? 0 : tv_p->tv_usec );
254 #ifdef PARANOID
255  critical_on ( tv_p == NULL );
256 #endif
257 
258  if ( queue_length ) {
259  debug ( 9, "already: queue_length == %i", queue_length );
260  return queue_length;
261  }
262 
263  if ( tv_p->tv_sec == 0 && tv_p->tv_usec == 0 ) {
264  g_main_context_iteration ( NULL, FALSE );
265  debug ( 9, "nowait: queue_length == %i", queue_length );
266  return queue_length;
267  }
268 
270  pthread_create ( &thread_g_iteration_stop, NULL, g_iteration_stop, tv_p );
271  /*
272  debug(30, "pthread_spin_unlock(&queue_lock);");
273  pthread_spin_unlock(&queue_lock);
274  debug(30 , "g_main_context_iteration(NULL, FALSE);");
275  result = g_main_context_iteration(NULL, FALSE);
276  debug(30, "pthread_spin_lock(&queue_lock);");
277  pthread_spin_lock(&queue_lock);
278 
279  if (queue_length) {
280  debug(9, "already2: queue_length == %i", queue_length);
281  return queue_length;
282  }
283  */
284  debug_call ( 40, pthread_spin_unlock ( &queue_lock ) );
285  debug ( 20, "g_main_context_iteration(NULL, TRUE); queue_length == %i", queue_length );
286  result = g_main_context_iteration ( NULL, TRUE );
287  debug ( 10, "g_main_context_iteration() -> %i", result );
288  debug_call ( 40, pthread_spin_lock ( &queue_lock ) );
289  critical_on ( pthread_mutex_lock ( &gio_mutex_prefetcher ) );
291  critical_on ( pthread_mutex_unlock ( &gio_mutex_prefetcher ) );
292  critical_on ( pthread_cond_broadcast ( &gio_cond_gotevent ) );
293  critical_on ( pthread_join ( thread_g_iteration_stop, &ret ) );
294  debug ( 9, "queue_length == %i", queue_length );
295  return queue_length;
296 }
297 int gio_wait ( ctx_t *ctx_p, struct indexes *indexes_p, struct timeval *tv_p )
298 {
299  ( void ) ctx_p;
300  int ret;
301  debug ( 30, "pthread_spin_lock(&queue_lock);" );
302  debug_call ( 40, pthread_spin_lock ( &queue_lock ) );
303  ret = gio_wait_now ( ctx_p, indexes_p, tv_p );
304  debug ( 30, "pthread_spin_unlock(&queue_lock);" );
305  debug_call ( 40, pthread_spin_unlock ( &queue_lock ) );
306  return ret;
307 }
308 
309 int gio_handle ( ctx_t *ctx_p, indexes_t *indexes_p )
310 {
311  static struct timeval tv = {0};
312  int count;
313  char *path_full = NULL;
314  size_t path_full_len = 0;
315  count = 0;
316 
317  while ( gio_wait ( ctx_p, indexes_p, &tv ) ) {
318  event_t *ev = event_pop();
319  stat64_t lst, *lst_p;
320  mode_t st_mode;
321  size_t st_size;
322 
323  if ( ( ev->objtype_new == EOT_DOESNTEXIST ) || ( ctx_p->flags[CANCEL_SYSCALLS]&CSC_MON_STAT ) || lstat64 ( ev->path, &lst ) ) {
324  debug ( 2, "Cannot lstat64(\"%s\", lst). Seems, that the object had been deleted (%i) or option \"--cancel-syscalls mon_stat\" (%i) is set.", ev->path, ev->objtype_new == EOT_DOESNTEXIST, ctx_p->flags[CANCEL_SYSCALLS]&CSC_MON_STAT );
325  st_mode = ( ev->objtype_event == EOT_DIR ? S_IFDIR : S_IFREG );
326  st_size = 0;
327  lst_p = NULL;
328  } else {
329  st_mode = lst.st_mode;
330  st_size = lst.st_size;
331  lst_p = &lst;
332  }
333 
334  if ( sync_prequeue_loadmark ( 1, ctx_p, indexes_p, NULL, ev->path, lst_p, ev->objtype_old, ev->objtype_new, ev->event_id, ev->handle_id, st_mode, st_size, &path_full, &path_full_len, NULL ) ) {
335  event_free ( ev );
336  count = -1;
337  break;
338  }
339 
340  event_free ( ev );
341  count++;
342  }
343 
344  // Globally queueing captured events:
345  // Moving events from local queue to global ones
346  sync_prequeue_unload ( ctx_p, indexes_p );
347  free ( path_full );
348 #ifdef VERYPARANOID
349  path_full = NULL;
350  path_full_len = 0;
351 #endif
352  return count;
353 }
354 
355 void free_filemondat ( void *_fmdat )
356 {
357  filemondata_t *fmdat = _fmdat;
358  g_signal_handler_disconnect ( fmdat->file, fmdat->handle_id );
359  free ( fmdat->file );
360  free ( fmdat->filemon );
361  free ( fmdat );
362  return;
363 }
364 
365 GMainLoop *gio_loop = NULL;
367 {
368  ( void ) ctx_p;
369  queue_length = 0;
370  queue_alloc = 0;
371  pthread_mutex_init ( &gio_mutex_prefetcher, NULL );
372  pthread_cond_init ( &gio_cond_gotevent, NULL );
373  pthread_spin_init ( &queue_lock, PTHREAD_PROCESS_SHARED );
374  mondirs_ht = g_hash_table_new_full ( g_str_hash, g_str_equal, free, free_filemondat );
375  gio_loop = g_main_loop_new ( NULL, TRUE );
376  g_main_context_iteration ( NULL, FALSE );
377  return 0;
378 }
379 
381 {
382  ( void ) ctx_p;
383  /*
384  g_main_loop_quit(gio_loop);
385  g_hash_table_destroy (mondirs_ht);
386  pthread_spin_destroy (&queue_lock);
387  pthread_cond_destroy (&gio_cond_gotevent);
388  pthread_mutex_destroy (&gio_mutex_prefetcher);
389  */
390  return 0;
391 }
392 
sync_prequeue_unload
int sync_prequeue_unload(ctx_t *ctx_p, indexes_t *indexes_p)
Definition: sync.c:2564
queue_lock
pthread_spinlock_t queue_lock
Definition: mon_gio.c:52
ctx
Definition: ctx.h:315
event::path
char * path
Definition: mon_gio.c:42
ALLOC_PORTION
#define ALLOC_PORTION
Definition: configuration.h:117
sync.h
EOT_DOESNTEXIST
@ EOT_DOESNTEXIST
Definition: clsync.h:31
filemondata::file
GFile * file
Definition: mon_gio.c:35
eventobjtype_t
enum eventobjtype eventobjtype_t
Definition: clsync.h:37
event_free
static void event_free(event_t *ev)
Definition: mon_gio.c:59
sync_prequeue_loadmark
int sync_prequeue_loadmark(int monitored, ctx_t *ctx_p, indexes_t *indexes_p, const char *path_full, const char *path_rel, stat64_t *lst_p, eventobjtype_t objtype_old, eventobjtype_t objtype_new, uint32_t event_mask, int event_wd, mode_t st_mode, off_t st_size, char **path_buf_p, size_t *path_buf_len_p, eventinfo_t *evinfo)
Definition: sync.c:2143
ctx::watchdirlen
size_t watchdirlen
Definition: ctx.h:375
indexes
Definition: indexes.h:34
event_push
static int event_push(char *path, gulong handle_id, GFileMonitorEvent event, eventobjtype_t objtype_event, eventobjtype_t objtype_old, eventobjtype_t objtype_new)
Definition: mon_gio.c:65
gio_deinit
int gio_deinit(ctx_t *ctx_p)
Definition: mon_gio.c:380
event
Definition: mon_gio.c:41
filemondata::ctx_p
ctx_t * ctx_p
Definition: mon_gio.c:34
mon_gio.h
dir_gotevent
static void dir_gotevent(GFileMonitor *filemon, GFile *file, GFile *file_other, GFileMonitorEvent event, gpointer arg)
Definition: mon_gio.c:101
privileged.h
g_iteration_stop
void * g_iteration_stop(void *_timeout_p)
Definition: mon_gio.c:207
ctx::flags
int flags[(1<< 10)]
Definition: ctx.h:338
CSC_MON_STAT
@ CSC_MON_STAT
Definition: ctx.h:310
gio_handle
int gio_handle(ctx_t *ctx_p, indexes_t *indexes_p)
Definition: mon_gio.c:309
gio_cond_gotevent
pthread_cond_t gio_cond_gotevent
Definition: mon_gio.c:54
CANCEL_SYSCALLS
@ CANCEL_SYSCALLS
Definition: ctx.h:128
filemondata::handle_id
gulong handle_id
Definition: mon_gio.c:37
gio_init
int gio_init(ctx_t *ctx_p)
Definition: mon_gio.c:366
queue_length
int queue_length
Definition: mon_gio.c:56
gio_wait
int gio_wait(ctx_t *ctx_p, struct indexes *indexes_p, struct timeval *tv_p)
Definition: mon_gio.c:297
debug_call
#define debug_call(debug_level, code)
Definition: error.h:55
filemondata
Definition: mon_gio.c:33
error
#define error(...)
Definition: error.h:36
event::handle_id
gulong handle_id
Definition: mon_gio.c:43
indexes.h
event::objtype_old
eventobjtype_t objtype_old
Definition: mon_gio.c:46
error.h
gio_add_watch_dir
int gio_add_watch_dir(ctx_t *ctx_p, indexes_t *indexes_p, const char *const accpath)
Definition: mon_gio.c:180
event::objtype_new
eventobjtype_t objtype_new
Definition: mon_gio.c:47
filemondata::filemon
GFileMonitor * filemon
Definition: mon_gio.c:36
debug
#define debug(debug_level,...)
Definition: error.h:50
event::objtype_event
eventobjtype_t objtype_event
Definition: mon_gio.c:45
queue_alloc
int queue_alloc
Definition: mon_gio.c:57
critical
#define critical(...)
Definition: error.h:32
cancel_g_iteration_stop
int cancel_g_iteration_stop
Definition: mon_gio.c:205
warning
#define warning(...)
Definition: error.h:40
common.h
free_filemondat
void free_filemondat(void *_fmdat)
Definition: mon_gio.c:355
event_pop
static event_t * event_pop()
Definition: mon_gio.c:89
event::event_id
GFileMonitorEvent event_id
Definition: mon_gio.c:44
stat64_t
struct stat64 stat64_t
Definition: port-hacks.h:65
GIO_QUEUE_LENGTH_MAX
#define GIO_QUEUE_LENGTH_MAX
Definition: configuration.h:245
gio_wait_now
static int gio_wait_now(ctx_t *ctx_p, struct indexes *indexes_p, struct timeval *tv_p)
Definition: mon_gio.c:247
thread_g_iteration_stop
pthread_t thread_g_iteration_stop
Definition: mon_gio.c:206
gio_mutex_prefetcher
pthread_mutex_t gio_mutex_prefetcher
Definition: mon_gio.c:53
critical_on
#define critical_on(cond)
Definition: error.h:33
gio_loop
GMainLoop * gio_loop
Definition: mon_gio.c:365
EOT_FILE
@ EOT_FILE
Definition: clsync.h:32
EOT_DIR
@ EOT_DIR
Definition: clsync.h:33
mondirs_ht
GHashTable * mondirs_ht
Definition: mon_gio.c:51
ctx_p
ctx_t * ctx_p
Definition: mon_kqueue.c:85
INFINITETIME
#define INFINITETIME
queue
event_t * queue
Definition: mon_gio.c:55