Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file async_inotify.ml
(* We don't make calls to [Inotify] functions ([add_watch], [rm_watch], [read]) in
   [In_thread.run] because:
   - we don't think they can block for a while
   - Inotify doesn't release the OCaml lock anyway
   - it avoids changes to the set of watches racing with the Inotify.read loop below, by
     preventing adding a watch and seeing an event about it before having filled the
     hashtable (not that we have observed this particular race). *)

open Core
open Async
module Inotify = Ocaml_inotify.Inotify

(* Chooses which inotify selector backs [Event.Selector.Modified]:
   [`Any_change] maps to [S_Modify], [`Closed_writable_fd] maps to [S_Close_write]
   (see [Event.Selector.inotify_selectors]). *)
type modify_event_selector =
  [ `Any_change
  | `Closed_writable_fd
  ]

module Event = struct
  module Selector = struct
    (* High-level event classes a caller can subscribe to. *)
    type t =
      | Created
      | Unlinked
      | Modified
      | Moved
    [@@deriving enumerate, compare, sexp_of]

    (* Translates a list of high-level selectors into the inotify selectors to pass to
       [Inotify.add_watch]. The input is deduplicated (and sorted) first, so repeated
       selectors do not produce repeated inotify selectors. *)
    let inotify_selectors ts modify_event_selector =
      List.dedup_and_sort ts ~compare
      |> List.concat_map ~f:(function
        | Created -> [ Inotify.S_Create ]
        | Unlinked -> [ S_Delete ]
        | Modified ->
          (match modify_event_selector with
           | `Any_change -> [ S_Modify ]
           | `Closed_writable_fd -> [ S_Close_write ])
        | Moved -> [ S_Move_self; S_Moved_from; S_Moved_to ])
    ;;
  end

  (* A move as reconstructed from inotify events:
     - [Away src]: only the source side was seen
     - [Into dst]: only the destination side was seen
     - [Move (src, dst)]: both sides were seen with the same transaction id *)
  type move =
    | Away of string
    | Into of string
    | Move of string * string
  [@@deriving sexp_of]

  (* Events delivered on the pipes built below. The [string] payloads are paths built
     from the watched path and, when inotify supplies one, the file name within it. *)
  type t =
    | Created of string
    | Unlinked of string
    | Modified of string
    | Moved of move
    | Queue_overflow
  [@@deriving sexp_of]

  (* Human-readable rendering of a [move]. *)
  let move_to_string m =
    match m with
    | Away s -> sprintf "%s -> Unknown" s
    | Into s -> sprintf "Unknown -> %s" s
    | Move (f, t) -> sprintf "%s -> %s" f t
  ;;

  (* Human-readable rendering of an event. *)
  let to_string t =
    match t with
    | Created s -> sprintf "created %s" s
    | Unlinked s -> sprintf "unlinked %s" s
    | Moved mv -> sprintf "moved %s" (move_to_string mv)
    | Modified s -> sprintf "modified %s" s
    | Queue_overflow -> "queue overflow"
  ;;
end

open Event

(* Hashtable key module for [Inotify.watch]; every operation goes through
   [Inotify.int_of_watch]. *)
module Watch = struct
  type t = Inotify.watch

  let compare t1 t2 = Int.compare (Inotify.int_of_watch t1) (Inotify.int_of_watch t2)
  let hash t = Int.hash (Inotify.int_of_watch t)
  let sexp_of_t t = sexp_of_int (Inotify.int_of_watch t)
end

(* State of one inotify instance:
   - [fd]: the inotify file descriptor, wrapped for Async
   - [watch_table]: watch -> path it was registered with
   - [path_table]: path -> watch (the reverse mapping, used by [remove])
   - [modify_event_selector]: how [Selector.Modified] is translated
   - [default_selectors]: selectors used by [add] when no [?events] is given *)
type t =
  { fd : Fd.t
  ; watch_table : (Inotify.watch, string) Hashtbl.t
  ; path_table : Inotify.watch String.Table.t
  ; modify_event_selector : modify_event_selector
  ; default_selectors : Inotify.selector list
  }

type file_info = string * Unix.Stats.t

(* Registers a watch on [path] and records it in both tables. When [?events] is absent
   the watch uses [t.default_selectors]. [Fd.with_file_descr_exn] and
   [Inotify.add_watch] are called synchronously, so any exception they raise escapes
   before a deferred is returned. *)
let add ?events t path =
  let watch =
    Fd.with_file_descr_exn t.fd (fun fd ->
      Inotify.add_watch
        fd
        path
        (match events with
         | None -> t.default_selectors
         | Some e -> Event.Selector.inotify_selectors e t.modify_event_selector))
  in
  Hashtbl.set t.watch_table ~key:watch ~data:path;
  Hashtbl.set t.path_table ~key:path ~data:watch;
  return ()
;;

(* Adds all the directories under [path] (including [path]) to [t].

   Returns every [(file name, stats)] pair produced by the [Async_find] traversal,
   directories and non-directories alike; only directories get a watch. The list is
   built by consing, so it is in reverse traversal order. Open and stat errors during
   the traversal are handled with the [Print] option of [Async_find.Options]. *)
let add_all ?skip_dir ?events t path =
  let options =
    { Async_find.Options.default with
      on_open_errors = Print
    ; on_stat_errors = Print
    ; skip_dir
    }
  in
  let%bind () = add ?events t path in
  let f = Async_find.create ~options path in
  Async_find.fold f ~init:[] ~f:(fun files (fn, stat) ->
    match stat.kind with
    | `Directory ->
      let%map () = add ?events t fn in
      (fn, stat) :: files
    | _ -> return ((fn, stat) :: files))
;;

(* Removes the watch registered for [path], if any, and drops it from both tables.
   Does nothing when [path] is not in [t.path_table]. *)
let remove t path =
  match Hashtbl.find t.path_table path with
  | None -> return ()
  | Some watch ->
    Fd.with_file_descr_exn t.fd (fun fd -> Inotify.rm_watch fd watch);
    Hashtbl.remove t.watch_table watch;
    Hashtbl.remove t.path_table path;
    return ()
;;

(* with streams, this was effectively infinite, so pick a big number *)
let size_budget = 10_000_000

(* Builds a pipe of [Event.t] by repeatedly waiting for [t.fd] to become readable,
   reading a batch of inotify events, and translating the batch.

   The loop finishes when the fd is reported closed. A bad fd or a failed read
   raises. *)
let raw_event_pipe t =
  Pipe.create_reader ~size_budget ~close_on_exception:false (fun w ->
    Deferred.repeat_until_finished () (fun () ->
      match%bind Fd.ready_to t.fd `Read with
      | `Bad_fd -> failwith "Bad Inotify file descriptor"
      | `Closed -> return (`Finished ())
      | `Ready ->
        (* Read in the async thread. We should be reading memory, like what happens
           with pipes and sockets, and unlike what happens with files, and we should
           know that there's data. Ensure the fd is nonblocking so the read raises
           instead of blocking async if something has gone wrong. *)
        (match Fd.with_file_descr ~nonblocking:true t.fd Inotify.read with
         | `Already_closed -> return (`Finished ())
         | `Error exn -> raise_s [%sexp "Inotify.read failed", (exn : Exn.t)]
         | `Ok events ->
           (* Step 1: flatten the batch into (kind, transaction id, full path)
              triples, resolving each watch to the path it was registered with. *)
           let ev_kinds =
             List.filter_map events ~f:(fun (watch, ev_kinds, trans_id, fn) ->
               (* queue overflow event is always reported on watch -1 *)
               if Inotify.int_of_watch watch = -1
               then (
                 let maybe_overflow =
                   List.filter_map ev_kinds ~f:(fun ev ->
                     match ev with
                     | Q_overflow -> Some (ev, trans_id, "<overflow>")
                     | _ -> None)
                 in
                 if List.is_empty maybe_overflow then None else Some maybe_overflow)
               else (
                 match Hashtbl.find t.watch_table watch with
                 | None ->
                   (* The watch is not (or no longer) in our table: report on stderr
                      and drop the events. *)
                   Print.eprintf
                     "Events for an unknown watch (%d) [%s]\n"
                     (Inotify.int_of_watch watch)
                     (String.concat
                        ~sep:", "
                        (List.map ev_kinds ~f:Inotify.string_of_event_kind));
                   None
                 | Some path ->
                   let fn =
                     match fn with
                     | None -> path
                     | Some fn -> path ^/ fn
                   in
                   Some (List.map ev_kinds ~f:(fun ev -> ev, trans_id, fn))))
             |> List.concat
           in
           (* Step 2: fold over the triples, pairing a [Moved_from] with the
              [Moved_to] that immediately follows it when their transaction ids
              match. [pending_mv] holds an unmatched source; any other event flushes
              it as [Moved (Away _)]. [actions] is accumulated in reverse.

              NOTE(review): [Ignored] and [Delete_self] do not remove the watch from
              [t.watch_table] / [t.path_table]; within this file only [remove] drops
              entries. Confirm whether that is intended. *)
           let pending_mv, actions =
             List.fold
               ev_kinds
               ~init:(None, [])
               ~f:(fun (pending_mv, actions) (kind, trans_id, fn) ->
                 let add_pending lst =
                   match pending_mv with
                   | None -> lst
                   | Some (_, fn) -> Moved (Away fn) :: lst
                 in
                 match kind with
                 | Moved_from -> Some (trans_id, fn), add_pending actions
                 | Moved_to ->
                   (match pending_mv with
                    | None -> None, Moved (Into fn) :: actions
                    | Some (m_trans_id, m_fn) ->
                      if Int32.( = ) m_trans_id trans_id
                      then None, Moved (Move (m_fn, fn)) :: actions
                      else None, Moved (Away m_fn) :: Moved (Into fn) :: actions)
                 | Move_self -> Some (trans_id, fn), add_pending actions
                 | Create -> None, Created fn :: add_pending actions
                 | Delete -> None, Unlinked fn :: add_pending actions
                 | Modify | Close_write -> None, Modified fn :: add_pending actions
                 | Q_overflow -> None, Queue_overflow :: add_pending actions
                 | Delete_self -> None, add_pending actions
                 | Access | Attrib | Open | Ignored | Isdir | Unmount | Close_nowrite ->
                   None, add_pending actions)
           in
           (* Step 3: flush a trailing unmatched source, restore chronological order,
              write everything, then wait for pushback once for the whole batch. *)
           let actions =
             List.rev
               (match pending_mv with
                | None -> actions
                | Some (_, fn) -> Moved (Away fn) :: actions)
           in
           List.iter actions ~f:(Pipe.write_without_pushback_if_open w);
           let%map () = Pipe.pushback w in
           `Repeat ())))
;;

(* Returns the event pipe for [t].

   With [~watch_new_dirs:false] this is [raw_event_pipe t]. Otherwise every
   [Created path] event is stat-ed: for a directory, the directory and everything
   under it are registered via [add_all] (using [?events]) and a [Created] event is
   written for each entry found; for any other kind the event is forwarded as is. All
   other events are forwarded unchanged. *)
let event_pipe ~watch_new_dirs ?events t =
  if not watch_new_dirs
  then raw_event_pipe t
  else
    Pipe.create_reader ~size_budget ~close_on_exception:false (fun w ->
      Pipe.iter (raw_event_pipe t) ~f:(function
        | (Queue_overflow | Unlinked _ | Moved _ | Modified _) as ev ->
          Pipe.write_if_open w ev
        | Created path ->
          (match%bind Monitor.try_with (fun () -> Unix.stat path) with
           | Error _ ->
             (* created file has already disappeared *)
             return ()
           | Ok stat ->
             (match stat.kind with
              | `File | `Char | `Block | `Link | `Fifo | `Socket ->
                Pipe.write_if_open w (Created path)
              | `Directory ->
                Pipe.write_without_pushback_if_open w (Created path);
                let%bind additions = add_all ?events t path in
                List.iter additions ~f:(fun (file, _stat) ->
                  Pipe.write_without_pushback_if_open w (Created file));
                Pipe.pushback w))))
;;

(* Creates the inotify fd, marks it close-on-exec, wraps it for Async and builds an
   empty [t] whose [default_selectors] cover every [Event.Selector.t]. *)
let create_internal ~modify_event_selector =
  let fd = Inotify.create () in
  let%map () = In_thread.run (fun () -> Core_unix.set_close_on_exec fd) in
  (* fstat on an inotify fd says the filetype is File, but we tell async Fifo instead.
     The reason is that async considers that for File, Fd.ready_to is meaningless and so
     should return immediately. So instead we say Fifo, because an inotify fd is basically
     the read end of a pipe whose write end is owned by the kernel, and more importantly,
     Fd.create knows that fifos support nonblocking. *)
  let fd = Fd.create Fifo fd (Info.of_string "async_inotify") in
  let watch_table = Hashtbl.create (module Watch) ~size:10 in
  { fd
  ; watch_table
  ; path_table = Hashtbl.create (module String) ~size:10
  ; modify_event_selector
  ; default_selectors =
      Event.Selector.inotify_selectors Event.Selector.all modify_event_selector
  }
;;

(* Creates a [t] with no watches, together with its raw event pipe (new directories
   are not watched automatically). *)
let create_empty ~modify_event_selector =
  let%map t = create_internal ~modify_event_selector in
  t, event_pipe ~watch_new_dirs:false t
;;

(* Creates a [t] watching [path].

   - [?modify_event_selector] (default [`Any_change]): see [modify_event_selector].
   - [?recursive] (default [true]): when [false], a [skip_dir] that always returns
     [true] is passed to the initial [add_all].
   - [?watch_new_dirs] (default [true]): when [true], [Created] and [Moved] are
     prepended to [events] and the returned pipe registers directories as they appear.
   - [?events] (default [Event.Selector.all]): event classes to subscribe to.

   Returns [t], the [(file name, stats)] list from the initial [add_all], and the
   event pipe. *)
let create
  ?(modify_event_selector = `Any_change)
  ?(recursive = true)
  ?(watch_new_dirs = true)
  ?(events = Event.Selector.all)
  path
  =
  let events =
    if watch_new_dirs
    then Event.Selector.Created :: Event.Selector.Moved :: events
    else events
  in
  let%bind t = create_internal ~modify_event_selector in
  let skip_dir = if recursive then None else Some (fun _ -> return true) in
  let%map initial_files = add_all ?skip_dir ~events t path in
  t, initial_files, event_pipe ~watch_new_dirs ~events t
;;

(* Closes the inotify fd. [raw_event_pipe] finishes its read loop once the fd is
   reported as closed. *)
let stop t = Fd.close t.fd