clsync
Loading...
Searching...
No Matches
mon_gio.c
Go to the documentation of this file.
1/*
2 clsync - file tree sync utility based on inotify/kqueue/bsm/gio
3
4 Copyright (C) 2013-2014 Dmitry Yu Okunev <dyokunev@ut.mephi.ru> 0x8E30679C
5
6 This program is free software: you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation, either version 3 of the License, or
9 (at your option) any later version.
10
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20// The "queue" is actually a "stack" (LIFO) in this code. This is a design shortcoming of this code.
21
22#include "common.h"
23#include "error.h"
24#include "sync.h"
25#include "indexes.h"
26#include "privileged.h"
27
28#include <pthread.h>
29#include <gio/gio.h>
30
31#include "mon_gio.h"
32
35 GFile *file;
36 GFileMonitor *filemon;
37 gulong handle_id;
38};
40
// Short alias for `struct event`, the record type stored in `queue`.
49typedef struct event event_t;
50
// Hash table of watched directories: keys are path strings, values are filemondata_t records (filled in gio_add_watch_dir()).
51GHashTable *mondirs_ht;
// Spinlock taken around event-queue accesses in event_push(), event_pop() and gio_wait().
52pthread_spinlock_t queue_lock;
// Mutex/condition pair shared by gio_wait_now() and the g_iteration_stop() helper thread.
53pthread_mutex_t gio_mutex_prefetcher = PTHREAD_MUTEX_INITIALIZER;
54pthread_cond_t gio_cond_gotevent = PTHREAD_COND_INITIALIZER;
// Heap array of pending events; entries are added and removed at the tail, so it behaves as a LIFO stack.
55event_t *queue = NULL;
58
59static inline void event_free ( event_t *ev )
60{
61 free ( ev->path );
62 return;
63}
64
/*
 * event_push(): append one event record to the tail of the global queue.
 *
 * NOTE(review): the signature line and a few body lines are not visible in
 * this listing; the comments below describe only the visible statements.
 *
 * The `path` pointer is stored as-is (no copy is made), so the queue entry
 * takes over the string; event_free() later free()s it.
 * The visible code always returns 0.
 */
66{
67 event_t *ev;
68 debug ( 30, "pthread_spin_lock(&queue_lock);" );
69 pthread_spin_lock ( &queue_lock );
70
// The array is full: re-size it to queue_alloc elements.
// NOTE(review): the statement that enlarges queue_alloc is not visible here - confirm it precedes the xrealloc().
71 if ( queue_length >= queue_alloc ) {
74 queue = xrealloc ( queue, queue_alloc * sizeof ( *queue ) );
75 }
76
// Claim the next free slot at the tail and fill it in.
77 ev = &queue[queue_length++];
78 ev->path = path;
79 ev->event_id = event;
80 ev->handle_id = handle_id;
84 debug ( 30, "pthread_spin_unlock(&queue_lock);" );
85 pthread_spin_unlock ( &queue_lock );
86 return 0;
87}
88
/*
 * Remove the most recently pushed event from the global queue and return it.
 *
 * The entry is copied into a function-local static buffer and a pointer to
 * that buffer is returned, so the result is overwritten by the next call.
 * The popped record still carries its `path` string; the caller is expected
 * to release it with event_free().
 *
 * NOTE(review): no emptiness check is visible in this listing - confirm that
 * callers only pop when queue_length > 0 (gio_handle() pops only after
 * gio_wait() returned non-zero).
 */
89static inline event_t *event_pop()
90{
91 static event_t ev;
92 debug ( 30, "pthread_spin_lock(&queue_lock);" );
93 pthread_spin_lock ( &queue_lock );
// Take the tail element (LIFO order) and shrink the queue by one.
95 memcpy ( &ev, &queue[--queue_length], sizeof ( ev ) );
96 debug ( 30, "pthread_spin_unlock(&queue_lock);" );
97 pthread_spin_unlock ( &queue_lock );
98 return &ev;
99}
100
/*
 * "changed" signal callback of a directory monitor (connected in
 * gio_add_watch_dir() with the watch record as `arg`).
 *
 * For deleted/created/changed/attribute-changed events it derives the path
 * relative to the watched root, classifies the object as directory or file,
 * and pushes an event record onto the queue. All other event kinds are ignored.
 *
 * NOTE(review): the declarations of objtype/objtype_old/objtype_new and part
 * of the DELETED/CREATED case bodies are not visible in this listing.
 */
101static void dir_gotevent (
102 GFileMonitor *filemon,
103 GFile *file,
104 GFile *file_other,
105 GFileMonitorEvent event,
106 gpointer arg
107)
108{
110 filemondata_t *fmdat = arg;
111 ctx_t *ctx_p = fmdat->ctx_p;
// File type is queried without following symlinks.
112 GFileType filetype = g_file_query_file_type ( file, G_FILE_QUERY_INFO_NOFOLLOW_SYMLINKS, NULL );
113 debug ( 10, "%p %p %p %i %p %i", filemon, file, file_other, event, arg, filetype );
114 char *path_full, *path_rel = NULL;
115
// Step 1: build the relative path for the event kinds that are handled.
116 switch ( event ) {
117 case G_FILE_MONITOR_EVENT_DELETED:
118 case G_FILE_MONITOR_EVENT_CREATED:
119 case G_FILE_MONITOR_EVENT_CHANGED:
120 case G_FILE_MONITOR_EVENT_ATTRIBUTE_CHANGED:
121 path_full = g_file_get_path ( file );
// Skip the first watchdirlen + 1 characters of the full path and keep a private copy of the rest.
// NOTE(review): neither g_file_get_path() nor strdup() is checked for NULL here.
122 path_rel = strdup ( &path_full[ctx_p->watchdirlen + 1] );
123 g_free ( path_full );
124 debug ( 9, "Got event %i for \"%s\" (%i)", event, path_rel, filetype );
125 break;
126
127 default:
128 break;
129 }
130
// Step 2: map the GIO file type onto the two object types used here (anything that is not a directory counts as a file).
131 switch ( filetype ) {
132 case G_FILE_TYPE_DIRECTORY:
133 objtype = EOT_DIR;
134 break;
135
136 default:
137 objtype = EOT_FILE;
138 break;
139 }
140
// Step 3: derive the "before" and "after" object types from the event kind.
141 switch ( event ) {
142 case G_FILE_MONITOR_EVENT_DELETED:
143 objtype_old = objtype;
145 break;
146
147 case G_FILE_MONITOR_EVENT_CREATED:
149 objtype_new = objtype;
150 break;
151
152 default:
153 objtype_old = objtype;
154 objtype_new = objtype;
155 break;
156 }
157
// Step 4: queue the event.
158 switch ( event ) {
159 case G_FILE_MONITOR_EVENT_DELETED:
// NOTE(review): mondirs_ht is keyed by the `accpath` given to gio_add_watch_dir(); confirm that has the same form as path_rel.
160 debug ( 20, "g_hash_table_remove(mondirs_ht, \"%s\")", path_rel );
161 g_hash_table_remove ( mondirs_ht, path_rel );
162
// No break above: a deletion also falls through and is queued like the other events.
163 case G_FILE_MONITOR_EVENT_CREATED:
164 case G_FILE_MONITOR_EVENT_CHANGED:
165 case G_FILE_MONITOR_EVENT_ATTRIBUTE_CHANGED:
166#ifdef PARANOID
167 critical_on ( path_rel == NULL );
168#endif
// path_rel is handed over to the queue entry (event_push() stores the pointer without copying).
169 debug ( 20, "event_push(\"%s\", %i, %i, %i, %i, %i)", path_rel, fmdat->handle_id, event, objtype, objtype_old, objtype_new );
170 critical_on ( event_push ( path_rel, fmdat->handle_id, event, objtype, objtype_old, objtype_new ) );
171 break;
172
173 default:
174 break;
175 }
176
177 return;
178}
179
180int gio_add_watch_dir ( ctx_t *ctx_p, indexes_t *indexes_p, const char *const accpath )
181{
182 ( void ) indexes_p;
183 filemondata_t *fmdat;
184 GError *error = NULL;
185 debug ( 3, "\"%s\"", accpath );
186#ifdef PARANOID
187 fmdat = g_hash_table_lookup ( mondirs_ht, accpath );
188
189 if ( fmdat != NULL ) {
190 errno = EADDRINUSE;
191 warning ( "Directory \"%s\" is already monitored.", accpath );
192 return -1;
193 }
194
195#endif
196 fmdat = xmalloc ( sizeof ( *fmdat ) );
197 fmdat->ctx_p = ctx_p;
198 fmdat->file = g_file_new_for_path ( accpath );
199 fmdat->filemon = g_file_monitor_directory ( fmdat->file, 0, NULL, &error );
200 fmdat->handle_id = g_signal_connect ( fmdat->filemon, "changed", G_CALLBACK ( dir_gotevent ), fmdat );
201 g_hash_table_replace ( mondirs_ht, strdup ( accpath ), fmdat );
202 return fmdat->handle_id;
203}
204
/*
 * Thread routine started by gio_wait_now() to bound a blocking
 * g_main_context_iteration() call: it waits on gio_cond_gotevent until the
 * given relative timeout expires (or the condition is signalled) and then
 * wakes up the default GLib main context.
 *
 * _timeout_p points to a struct timeval holding the relative timeout.
 * Note that tv_sec of that structure may be clamped in place.
 * Always returns NULL.
 *
 * NOTE(review): the line that opens the early-return block below is not
 * visible in this listing.
 */
207void *g_iteration_stop ( void *_timeout_p )
208{
209 struct timeval *timeout_p = _timeout_p;
210 struct timeval tv_abs, timeout_abs;
211 struct timespec ts_abs;
// NOTE(review): "%u" may not match the types of tv_sec/tv_usec on every platform.
212 debug ( 10, "{%u, %u}", timeout_p->tv_sec, timeout_p->tv_usec );
213 critical_on ( pthread_mutex_lock ( &gio_mutex_prefetcher ) );
214
// Early exit: release the mutex and finish without waiting.
216 critical_on ( pthread_mutex_unlock ( &gio_mutex_prefetcher ) );
217 return NULL;
218 }
219
220#define INFINITETIME (3600 * 24 * 365 * 10) /* ~10 years */
221
// Clamp very large timeouts to roughly ten years.
222 if ( timeout_p->tv_sec > INFINITETIME )
223 timeout_p->tv_sec = INFINITETIME;
224
225#undef INFINITETIME
// Convert the relative timeout into the absolute timespec deadline that pthread_cond_timedwait() takes.
226 gettimeofday ( &tv_abs, NULL );
227 timeradd ( &tv_abs, timeout_p, &timeout_abs );
228 ts_abs.tv_sec = timeout_abs.tv_sec;
229 ts_abs.tv_nsec = timeout_abs.tv_usec * 1000;
230 debug ( 10, "{%u, %u}", ts_abs.tv_sec, ts_abs.tv_nsec );
231
// Both "signalled" (0) and "timed out" are normal outcomes; any other result is treated as critical.
232 switch ( ( errno = pthread_cond_timedwait ( &gio_cond_gotevent, &gio_mutex_prefetcher, &ts_abs ) ) ) {
233 case 0:
234 case ETIMEDOUT:
235 break;
236
237 default:
238 critical ( "Got error while pthread_cond_timedwait(&gio_cond_gotevent, &gio_mutex_prefetcher, &ts_abs)" );
239 }
240
// Wake the default main context (NULL) so that a blocking iteration on it returns.
241 g_main_context_wakeup ( NULL );
242 pthread_mutex_unlock ( &gio_mutex_prefetcher );
243 debug ( 10, "return" );
244 return NULL;
245}
246
/*
 * Run the default GLib main context until at least one event is queued or
 * the timeout *tv_p elapses, and return the number of queued events.
 *
 * Called from gio_wait(), which takes queue_lock before the call and
 * releases it afterwards.
 *
 * Three paths:
 *   - events already queued: return immediately;
 *   - zero timeout: do one non-blocking main-context iteration;
 *   - otherwise: start the g_iteration_stop() helper thread with the
 *     timeout, do one blocking iteration, then signal and join the helper.
 *
 * NOTE(review): two lines of the blocking path are not visible in this listing.
 */
247static inline int gio_wait_now ( ctx_t *ctx_p, struct indexes *indexes_p, struct timeval *tv_p )
248{
249 ( void ) ctx_p;
250 ( void ) indexes_p;
251 void *ret;
252 int result;
253 debug ( 3, "(ctx_p, indexes_p, %p {%u, %u})", tv_p, tv_p == NULL ? -1 : tv_p->tv_sec, tv_p == NULL ? 0 : tv_p->tv_usec );
254#ifdef PARANOID
255 critical_on ( tv_p == NULL );
256#endif
257
// Fast path: something is already waiting in the queue.
258 if ( queue_length ) {
259 debug ( 9, "already: queue_length == %i", queue_length );
260 return queue_length;
261 }
262
// Zero timeout: poll once without blocking.
// NOTE(review): queue_lock looks to be still held by gio_wait() here, while the iteration may dispatch
// dir_gotevent() -> event_push(), which takes the same spinlock - confirm this cannot self-deadlock.
263 if ( tv_p->tv_sec == 0 && tv_p->tv_usec == 0 ) {
264 g_main_context_iteration ( NULL, FALSE );
265 debug ( 9, "nowait: queue_length == %i", queue_length );
266 return queue_length;
267 }
268
// Start the helper thread that will wake the main context once the timeout expires.
// NOTE(review): the return value of pthread_create() is not checked.
270 pthread_create ( &thread_g_iteration_stop, NULL, g_iteration_stop, tv_p );
271 /*
272 debug(30, "pthread_spin_unlock(&queue_lock);");
273 pthread_spin_unlock(&queue_lock);
274 debug(30 , "g_main_context_iteration(NULL, FALSE);");
275 result = g_main_context_iteration(NULL, FALSE);
276 debug(30, "pthread_spin_lock(&queue_lock);");
277 pthread_spin_lock(&queue_lock);
278
279 if (queue_length) {
280 debug(9, "already2: queue_length == %i", queue_length);
281 return queue_length;
282 }
283 */
// Drop queue_lock around the blocking iteration and take it again afterwards.
284 debug_call ( 40, pthread_spin_unlock ( &queue_lock ) );
285 debug ( 20, "g_main_context_iteration(NULL, TRUE); queue_length == %i", queue_length );
286 result = g_main_context_iteration ( NULL, TRUE );
287 debug ( 10, "g_main_context_iteration() -> %i", result );
288 debug_call ( 40, pthread_spin_lock ( &queue_lock ) );
// Signal the helper thread through the condition variable and wait for it to finish.
289 critical_on ( pthread_mutex_lock ( &gio_mutex_prefetcher ) );
291 critical_on ( pthread_mutex_unlock ( &gio_mutex_prefetcher ) );
292 critical_on ( pthread_cond_broadcast ( &gio_cond_gotevent ) );
293 critical_on ( pthread_join ( thread_g_iteration_stop, &ret ) );
294 debug ( 9, "queue_length == %i", queue_length );
295 return queue_length;
296}
297int gio_wait ( ctx_t *ctx_p, struct indexes *indexes_p, struct timeval *tv_p )
298{
299 ( void ) ctx_p;
300 int ret;
301 debug ( 30, "pthread_spin_lock(&queue_lock);" );
302 debug_call ( 40, pthread_spin_lock ( &queue_lock ) );
303 ret = gio_wait_now ( ctx_p, indexes_p, tv_p );
304 debug ( 30, "pthread_spin_unlock(&queue_lock);" );
305 debug_call ( 40, pthread_spin_unlock ( &queue_lock ) );
306 return ret;
307}
308
309int gio_handle ( ctx_t *ctx_p, indexes_t *indexes_p )
310{
311 static struct timeval tv = {0};
312 int count;
313 char *path_full = NULL;
314 size_t path_full_len = 0;
315 count = 0;
316
317 while ( gio_wait ( ctx_p, indexes_p, &tv ) ) {
318 event_t *ev = event_pop();
319 stat64_t lst, *lst_p;
320 mode_t st_mode;
321 size_t st_size;
322
323 if ( ( ev->objtype_new == EOT_DOESNTEXIST ) || ( ctx_p->flags[CANCEL_SYSCALLS]&CSC_MON_STAT ) || lstat64 ( ev->path, &lst ) ) {
324 debug ( 2, "Cannot lstat64(\"%s\", lst). Seems, that the object had been deleted (%i) or option \"--cancel-syscalls mon_stat\" (%i) is set.", ev->path, ev->objtype_new == EOT_DOESNTEXIST, ctx_p->flags[CANCEL_SYSCALLS]&CSC_MON_STAT );
325 st_mode = ( ev->objtype_event == EOT_DIR ? S_IFDIR : S_IFREG );
326 st_size = 0;
327 lst_p = NULL;
328 } else {
329 st_mode = lst.st_mode;
330 st_size = lst.st_size;
331 lst_p = &lst;
332 }
333
334 if ( sync_prequeue_loadmark ( 1, ctx_p, indexes_p, NULL, ev->path, lst_p, ev->objtype_old, ev->objtype_new, ev->event_id, ev->handle_id, st_mode, st_size, &path_full, &path_full_len, NULL ) ) {
335 event_free ( ev );
336 count = -1;
337 break;
338 }
339
340 event_free ( ev );
341 count++;
342 }
343
344 // Globally queueing captured events:
345 // Moving events from local queue to global ones
346 sync_prequeue_unload ( ctx_p, indexes_p );
347 free ( path_full );
348#ifdef VERYPARANOID
349 path_full = NULL;
350 path_full_len = 0;
351#endif
352 return count;
353}
354
355void free_filemondat ( void *_fmdat )
356{
357 filemondata_t *fmdat = _fmdat;
358 g_signal_handler_disconnect ( fmdat->file, fmdat->handle_id );
359 free ( fmdat->file );
360 free ( fmdat->filemon );
361 free ( fmdat );
362 return;
363}
364
// Main loop object created in gio_init() for the default main context.
365GMainLoop *gio_loop = NULL;
/*
 * gio_init(): set up the GIO monitoring backend (the signature line is not
 * visible in this listing; ctx_p is unused).
 *
 * Resets the queue counters, initializes the synchronization primitives,
 * creates the table of watched directories and the GLib main loop, and runs
 * one non-blocking iteration of the default main context.
 * Always returns 0.
 */
367{
368 ( void ) ctx_p;
369 queue_length = 0;
370 queue_alloc = 0;
// Both objects also have static initializers at their definitions; they are initialized again here.
371 pthread_mutex_init ( &gio_mutex_prefetcher, NULL );
372 pthread_cond_init ( &gio_cond_gotevent, NULL );
// NOTE(review): PTHREAD_PROCESS_SHARED is requested although only threads of this file are seen using the lock - confirm it is intended.
373 pthread_spin_init ( &queue_lock, PTHREAD_PROCESS_SHARED );
// String-keyed table: keys are released with free(), values with free_filemondat().
374 mondirs_ht = g_hash_table_new_full ( g_str_hash, g_str_equal, free, free_filemondat );
375 gio_loop = g_main_loop_new ( NULL, TRUE );
376 g_main_context_iteration ( NULL, FALSE );
377 return 0;
378}
379
/*
 * gio_deinit(): shut down the GIO monitoring backend (the signature line is
 * not visible in this listing; ctx_p is unused).
 *
 * Currently a no-op: the whole teardown sequence is commented out, so
 * nothing created by gio_init() is released here. Always returns 0.
 */
381{
382 ( void ) ctx_p;
383 /*
384 g_main_loop_quit(gio_loop);
385 g_hash_table_destroy (mondirs_ht);
386 pthread_spin_destroy (&queue_lock);
387 pthread_cond_destroy (&gio_cond_gotevent);
388 pthread_mutex_destroy (&gio_mutex_prefetcher);
389 */
390 return 0;
391}
392
enum eventobjtype eventobjtype_t
Definition clsync.h:37
@ EOT_DIR
Definition clsync.h:33
@ EOT_DOESNTEXIST
Definition clsync.h:31
@ EOT_FILE
Definition clsync.h:32
#define GIO_QUEUE_LENGTH_MAX
#define ALLOC_PORTION
@ CANCEL_SYSCALLS
Definition ctx.h:128
@ CSC_MON_STAT
Definition ctx.h:310
#define critical(...)
Definition error.h:32
#define error(...)
Definition error.h:36
#define debug(debug_level,...)
Definition error.h:50
#define debug_call(debug_level, code)
Definition error.h:55
#define warning(...)
Definition error.h:40
#define critical_on(cond)
Definition error.h:33
pthread_cond_t gio_cond_gotevent
Definition mon_gio.c:54
int gio_deinit(ctx_t *ctx_p)
Definition mon_gio.c:380
int cancel_g_iteration_stop
Definition mon_gio.c:205
pthread_t thread_g_iteration_stop
Definition mon_gio.c:206
GHashTable * mondirs_ht
Definition mon_gio.c:51
GMainLoop * gio_loop
Definition mon_gio.c:365
pthread_mutex_t gio_mutex_prefetcher
Definition mon_gio.c:53
int gio_init(ctx_t *ctx_p)
Definition mon_gio.c:366
int queue_alloc
Definition mon_gio.c:57
int gio_add_watch_dir(ctx_t *ctx_p, indexes_t *indexes_p, const char *const accpath)
Definition mon_gio.c:180
int queue_length
Definition mon_gio.c:56
event_t * queue
Definition mon_gio.c:55
#define INFINITETIME
static int gio_wait_now(ctx_t *ctx_p, struct indexes *indexes_p, struct timeval *tv_p)
Definition mon_gio.c:247
void free_filemondat(void *_fmdat)
Definition mon_gio.c:355
static int event_push(char *path, gulong handle_id, GFileMonitorEvent event, eventobjtype_t objtype_event, eventobjtype_t objtype_old, eventobjtype_t objtype_new)
Definition mon_gio.c:65
static event_t * event_pop()
Definition mon_gio.c:89
static void event_free(event_t *ev)
Definition mon_gio.c:59
pthread_spinlock_t queue_lock
Definition mon_gio.c:52
int gio_wait(ctx_t *ctx_p, struct indexes *indexes_p, struct timeval *tv_p)
Definition mon_gio.c:297
int gio_handle(ctx_t *ctx_p, indexes_t *indexes_p)
Definition mon_gio.c:309
static void dir_gotevent(GFileMonitor *filemon, GFile *file, GFile *file_other, GFileMonitorEvent event, gpointer arg)
Definition mon_gio.c:101
void * g_iteration_stop(void *_timeout_p)
Definition mon_gio.c:207
ctx_t * ctx_p
Definition mon_kqueue.c:85
struct stat64 stat64_t
Definition port-hacks.h:65
Definition ctx.h:315
int flags[(1<< 10)]
Definition ctx.h:338
size_t watchdirlen
Definition ctx.h:375
eventobjtype_t objtype_old
Definition mon_gio.c:46
GFileMonitorEvent event_id
Definition mon_gio.c:44
eventobjtype_t objtype_new
Definition mon_gio.c:47
char * path
Definition mon_gio.c:42
gulong handle_id
Definition mon_gio.c:43
eventobjtype_t objtype_event
Definition mon_gio.c:45
GFileMonitor * filemon
Definition mon_gio.c:36
gulong handle_id
Definition mon_gio.c:37
ctx_t * ctx_p
Definition mon_gio.c:34
GFile * file
Definition mon_gio.c:35
int sync_prequeue_unload(ctx_t *ctx_p, indexes_t *indexes_p)
Definition sync.c:2564
int sync_prequeue_loadmark(int monitored, ctx_t *ctx_p, indexes_t *indexes_p, const char *path_full, const char *path_rel, stat64_t *lst_p, eventobjtype_t objtype_old, eventobjtype_t objtype_new, uint32_t event_mask, int event_wd, mode_t st_mode, off_t st_size, char **path_buf_p, size_t *path_buf_len_p, eventinfo_t *evinfo)
Definition sync.c:2143