Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file CCPool.ml
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694(* This file is free software, part of containers. See file "license" for more details. *)(** {1 Thread Pool, and Futures} *)type+'astate=|Doneof'a|Waiting|FailedofexnmoduletypePARAM=sigvalmax_size:int(** Maximum number of threads in the pool *)endexceptionStopped(*$inject
module P = Make(struct let max_size = 30 end)
module P2 = Make(struct let max_size = 15 end)
module Fut = P.Fut
module Fut2 = P2.Fut
*)(** {2 Thread pool} *)moduleMake(P:PARAM)=structtypejob=|Job1:('a->_)*'a->job|Job2:('a->'b->_)*'a*'b->job|Job3:('a->'b->'c->_)*'a*'b*'c->job|Job4:('a->'b->'c->'d->_)*'a*'b*'c*'d->jobtypet={mutablestop:bool;(* indicate that threads should stop *)mutableexn_handler:(exn->unit);mutex:Mutex.t;cond:Condition.t;jobs:jobQueue.t;(* waiting jobs *)mutablecur_size:int;(* total number of threads *)mutablecur_idle:int;(* number of idle threads *)}(** Dynamic, growable thread pool *)letnop__=()(* singleton pool *)letpool={stop=false;exn_handler=nop_;cond=Condition.create();cur_size=0;cur_idle=0;jobs=Queue.create();mutex=Mutex.create();}letset_exn_handlerf=pool.exn_handler<-fletwith_lock_tf=Mutex.lockt.mutex;tryletx=ftinMutex.unlockt.mutex;xwithe->Mutex.unlockt.mutex;raiseeletincr_size_p=p.cur_size<-p.cur_size+1letdecr_size_p=p.cur_size<-p.cur_size-1letincr_idle_p=p.cur_idle<-p.cur_idle+1letdecr_idle_p=p.cur_idle<-p.cur_idle-1(* next thing a thread should do *)typecommand=|Processofjob|Wait(* wait on condition *)|Die(* thread has no work to do *)(* thread: seek what to do next (including dying).
Assumes the pool is locked. *)letget_next_pool=(*Printf.printf "get_next (cur=%d, idle=%d, stop=%B)\n%!" pool.cur_size pool.cur_idle pool.stop;*)ifpool.stop||(Queue.is_emptypool.jobs&&pool.cur_size>0)then((* die: the thread would be idle otherwise *)(*Printf.printf "time… to die (cur=%d, idle=%d, stop=%B)\n%!" pool.cur_size pool.cur_idle pool.stop;*)decr_size_pool;Die)elseifQueue.is_emptypool.jobsthen(Wait)else(letjob=Queue.poppool.jobsinProcessjob)(* Thread: entry point. They seek jobs in the queue *)letrecservepool=assert(pool.cur_size<=P.max_size);assert(pool.cur_size>0);letcmd=with_lock_poolget_next_inrun_cmdcmd(* run a command *)andrun_cmd=function|Die->()|Wait->with_lock_pool(funp->incr_idle_pool;Condition.waitp.condp.mutex;decr_idle_pool);servepool|Process(Job1(f,x))->begintryignore(fx)withe->pool.exn_handlereend;servepool|Process(Job2(f,x,y))->begintryignore(fxy)withe->pool.exn_handlereend;servepool|Process(Job3(f,x,y,z))->begintryignore(fxyz)withe->pool.exn_handlereend;servepool|Process(Job4(f,x,y,z,w))->begintryignore(fxyzw)withe->pool.exn_handlereend;servepool(* create a new worker thread *)letlaunch_worker_pool=with_lock_pool(funpool->incr_size_pool;ignore(Thread.createservepool))(* heuristic criterion for starting a new thread. *)letcan_start_thread_p=p.cur_size<P.max_sizeletrun_jobjob=(* acquire lock and push job in queue, or start thread directly
if the queue is empty *)with_lock_pool(funpool->ifpool.stopthenraiseStopped;ifQueue.is_emptypool.jobs&&can_start_thread_pool&&pool.cur_idle=0then((* create the thread now, on [job], as it will not break order of
jobs. We do not want to wait for the busy threads to do our task
if we are allowed to spawn a new thread. *)incr_size_pool;ignore(Thread.createrun_cmd(Processjob)))else((* cannot start thread, push and wait for some worker to pick it up *)Queue.pushjobpool.jobs;Condition.broadcastpool.cond;(* wake up some worker, if any *)(* might want to process in the background, if all threads are busy *)ifnot(Queue.is_emptypool.jobs)&&pool.cur_idle=0&&can_start_thread_poolthen(launch_worker_pool;)))(* run the function on the argument in the given pool *)letrun1fx=run_job(Job1(f,x))letrunf=run1f()letrun2fxy=run_job(Job2(f,x,y))letrun3fxyz=run_job(Job3(f,x,y,z))letrun4fxyzw=run_job(Job4(f,x,y,z,w))letactive()=notpool.stop(* kill threads in the pool *)letstop()=with_lock_pool(funp->p.stop<-true;Queue.clearp.jobs)(* stop threads if pool is GC'd *)let()=Gc.finalise(fun_->stop())pool(** {6 Futures} *)moduleFut=structtype'ahandler='astate->unit(** A proper future, with a delayed computation *)type'acell={mutablestate:'astate;mutablehandlers:'ahandlerlist;(* handlers *)f_mutex:Mutex.t;condition:Condition.t;}(** A future value of type 'a *)type'at=|Returnof'a|FailNowofexn|Runof'acelltype'afuture='at(** {2 Basic Future functions} *)letreturnx=Returnxletfaile=FailNoweletcreate_cell()={state=Waiting;handlers=[];f_mutex=Mutex.create();condition=Condition.create();}letwith_lock_cellf=Mutex.lockcell.f_mutex;tryletx=fcellinMutex.unlockcell.f_mutex;xwithe->Mutex.unlockcell.f_mutex;raisee(* TODO: exception handler for handler errors *)letset_done_cellx=with_lock_cell(funcell->matchcell.statewith|Waiting->(* set state and signal *)cell.state<-Donex;Condition.broadcastcell.condition;List.iter(funf->tryfcell.statewithe->pool.exn_handlere)cell.handlers|_->assertfalse)letset_fail_celle=with_lock_cell(funcell->matchcell.statewith|Waiting->cell.state<-Failede;Condition.broadcastcell.condition;List.iter(funf->tryfcell.statewithe->pool.exn_handlere)cell.handlers|_->assertfalse)(* calls [f x], and put result or exception in [cell] *)letrun_and_set1cellfx=trylety=fxinset_done_cellywithe->set_fail_celleletrun_and_set2cellfxy=tryletz=fxyinset_done_cellzwithe->set_fail_celleletmake1fx=letcell=create_cell()inrun3run_and_set1cellfx;Runcellletmakef=make1f()(*$R
List.iter
(fun n ->
let l = Iter.(1 -- n) |> Iter.to_list in
let l = List.rev_map (fun i ->
Fut.make
(fun () ->
Thread.delay 0.01;
1
)) l in
let l' = List.map Fut.get l in
OUnit.assert_equal n (List.fold_left (+) 0 l');
)
[ 10; 300; ]
*)(*$R
List.iter
(fun n ->
let l = Iter.(1 -- n) |> Iter.to_list in
let l = List.rev_map (fun i ->
Fut2.make
(fun () ->
Thread.delay 0.01;
1
)) l in
let l' = List.map Fut2.get l in
OUnit.assert_equal n (List.fold_left (+) 0 l');
)
[ 10; 300; ]
*)letmake2fxy=letcell=create_cell()inrun4run_and_set2cellfxy;Runcellletget=function|Returnx->x|FailNowe->raisee|Runcell->letrecget_cell=matchcell.statewith|Waiting->Condition.waitcell.conditioncell.f_mutex;(* wait *)get_cell|Donex->x|Failede->raiseeinwith_lock_cellget_(* access the result without locking *)letget_nolock_=function|Returnx|Run{state=Donex;_}->x|FailNow_|Run{state=(Failed_|Waiting);_}->assertfalseletstate=function|Returnx->Donex|FailNowe->Failede|Runcell->with_lock_cell(funcell->cell.state)letis_not_waiting=function|Waiting->false|Failed_|Done_->trueletis_done=function|Return_|FailNow_->true|Runcell->with_lock_cell(func->is_not_waitingc.state)(** {2 Combinators *)letadd_handler_cellf=with_lock_cell(funcell->matchcell.statewith|Waiting->cell.handlers<-f::cell.handlers|Done_|Failed_->fcell.state)leton_finishfutk=matchfutwith|Returnx->k(Donex)|FailNowe->k(Failede)|Runcell->add_handler_cellkleton_successfutk=on_finishfut(function|Donex->kx|_->())leton_failurefutk=on_finishfut(function|Failede->ke|_->())letmap_cell_~asyncfcell~into:cell'=add_handler_cell(function|Donex->ifasyncthenrun3run_and_set1cell'fxelserun_and_set1cell'fx|Failede->set_fail_cell'e|Waiting->assertfalse);Runcell'letmap_~asyncffut=matchfutwith|Returnx->ifasyncthenmake1fxelseReturn(fx)|FailNowe->FailNowe|Runcell->map_cell_~asyncfcell~into:(create_cell())letmapffut=map_~async:falseffutletmap_asyncffut=map_~async:trueffutletapp_~asyncfx=matchf,xwith|Returnf,Returnx->ifasyncthenmake1fxelseReturn(fx)|FailNowe,_|_,FailNowe->FailNowe|Returnf,Runx->map_cell_~async(funx->fx)x~into:(create_cell())|Runf,Returnx->map_cell_~async(funf->fx)f~into:(create_cell())|Runf,Runx->letcell'=create_cell()inadd_handler_f(function|Donef->ignore(map_cell_~asyncfx~into:cell')|Failede->set_fail_cell'e|Waiting->assertfalse);Runcell'letappfx=app_~async:falsefxletapp_asyncfx=app_~async:truefx(*$R
let a = Fut.make (fun () -> 1) in
let b = Fut.return 42 in
let c = Fut.monoid_product CCPair.make a b in
OUnit.assert_equal (1,42) (Fut.get c)
*)(*$R
let a = Fut.make (fun () -> 1) in
let b = Fut.make (fun () -> 42) in
let c = Fut.monoid_product CCPair.make a b in
OUnit.assert_equal (1,42) (Fut.get c)
*)(*$R
let a = Fut.make (fun () -> 1) in
let b = Fut.map succ @@ Fut.make (fun () -> 41) in
let c = Fut.monoid_product CCPair.make a b in
OUnit.assert_equal (1,42) (Fut.get c)
*)letmonoid_productfxy=matchx,ywith|Returnx,Returny->Return(fxy)|FailNowe,_|_,FailNowe->FailNowe|Returnx,Runy->map_cell_~async:false(funy->fxy)y~into:(create_cell())|Runx,Returny->map_cell_~async:false(funx->fxy)x~into:(create_cell())|Runx,Runy->letcell'=create_cell()inadd_handler_x(function|Donex->ignore(map_cell_~async:false(funy->fxy)y~into:cell')|Failede->set_fail_cell'e|Waiting->assertfalse);Runcell'letflat_mapffut=matchfutwith|Returnx->fx|FailNowe->FailNowe|Runcell->letcell'=create_cell()inadd_handler_cell(function|Donex->letfut'=fxinon_finishfut'(function|Doney->set_done_cell'y|Failede->set_fail_cell'e|Waiting->assertfalse)|Failede->set_fail_cell'e|Waiting->assertfalse);Runcell'letand_thenfutf=flat_map(fun_->f())futtype_array_or_list=|A_:'aarray->'aarray_or_list|L_:'alist->'aarray_or_listletiter_aol:typea.aarray_or_list->(a->unit)->unit=funaolf->matchaolwith|A_a->Array.iterfa|L_l->List.iterfl(* [sequence_ l f] returns a future that waits for every element of [l]
to return of fail, and call [f ()] to obtain the result (as a closure)
in case every element succeeded (otherwise a failure is
returned automatically) *)letsequence_:typeares.atarray_or_list->(unit->res)->rest=funaolf->letn=matchaolwith|A_a->Array.lengtha|L_l->List.lengthlinassert(n>0);letcell=create_cell()inletn_err=CCLock.create0in(* number of failed threads *)letn_ok=CCLock.create0in(* number of succeeding threads *)iter_aolaol(funfut->on_finishfut(function|Failede->letx=CCLock.incr_then_getn_errin(* if first failure, then seal [cell]'s fate now *)ifx=1thenset_fail_celle|Done_->letx=CCLock.incr_then_getn_okin(* if [n] successes, then [cell] succeeds. Otherwise, some
job has not finished or some job has failed. *)ifx=nthen(letres=f()inset_done_cellres)|Waiting->assertfalse));Runcell(* map an array of futures to a future array *)letsequence_aa=matchawith|[||]->return[||]|_->sequence_(A_a)(fun()->Array.mapget_nolock_a)letmap_afa=sequence_a(Array.mapfa)letsequence_ll=matchlwith|[]->return[]|_::_->letl=List.revlinsequence_(L_l)(fun()->List.rev_mapget_nolock_l)(* reverse twice *)letmap_lfl=matchlwith|[]->return[]|_->letl=List.rev_mapflinsequence_(L_l)(fun()->List.rev_mapget_nolock_l)(*$=
[2;3] (Fut.get @@ Fut.map_l (fun x -> Fut.return (x+1)) [1;2])
[] (Fut.get @@ Fut.map_l (fun x -> Fut.return (x+1)) [])
*)(*$R
let l = CCList.(1 -- 50) in
let l' = l
|> List.map
(fun x -> Fut.make (fun () -> Thread.delay 0.1; x*10))
|> Fut.sequence_l
|> Fut.map (List.fold_left (+) 0)
in
let expected = List.fold_left (fun acc x -> acc + 10 * x) 0 l in
OUnit.assert_equal expected (Fut.get l')
*)(*$R
let l = CCList.(1 -- 100_000) in
let l' = l
|> CCList.map
(fun x -> Fut.make (fun () -> 1))
|> Fut.sequence_l
|> Fut.map (List.fold_left (+) 0)
in
let expected = 100_000 in
OUnit.assert_equal expected (Fut.get l')
*)(*$R
let l = CCList.(1 -- 50) in
let l' = l
|> List.map
(fun x -> Fut.make (fun () -> Thread.delay 0.1; if x = 5 then raise Exit; x))
|> Fut.sequence_l
|> Fut.map (List.fold_left (+) 0)
in
OUnit.assert_raises Exit (fun () -> Fut.get l')
*)(*$R
let rec fib x = if x<2 then 1 else fib (x-1)+fib(x-2) in
let l =
CCList.(1--10_000)
|> List.rev_map
(fun x-> Fut.make (fun () -> Thread.yield(); fib (x mod 20)))
|> Fut.(map_l (fun x->x>|= fun x->x+1))
in
OUnit.assert_bool "not done" (Fut.state l = Waiting);
let l' = Fut.get l in
OUnit.assert_equal 10_000 (List.length l');
*)(*$R
let l = CCList.(1 -- 50) in
let l' = l
|> List.map
(fun x -> Fut2.make (fun () -> Thread.delay 0.1; x*10))
|> Fut2.sequence_l
|> Fut2.map (List.fold_left (+) 0)
in
let expected = List.fold_left (fun acc x -> acc + 10 * x) 0 l in
OUnit.assert_equal expected (Fut2.get l')
*)(*$R
let l = CCList.(1 -- 50) in
let l' = l
|> List.map
(fun x -> Fut2.make (fun () -> Thread.delay 0.1; if x = 5 then raise Exit; x))
|> Fut2.sequence_l
|> Fut2.map (List.fold_left (+) 0)
in
OUnit.assert_raises Exit (fun () -> Fut2.get l')
*)(*$R
let rec fib x = if x<2 then 1 else fib (x-1)+fib(x-2) in
let l =
CCList.(1--10_000)
|> List.rev_map
(fun x-> Fut2.make (fun () -> Thread.yield(); fib (x mod 20)))
|> Fut2.(map_l (fun x->x>|= fun x->x+1))
in
OUnit.assert_bool "not done" (Fut2.state l = Waiting);
let l' = Fut2.get l in
OUnit.assert_equal 10_000 (List.length l');
*)letchoose_:typea.atarray_or_list->at=funaol->letcell=create_cell()inletis_done=CCLock.createfalseiniter_aolaol(funfut->on_finishfut(funres->matchreswith|Waiting->assertfalse|Donex->letwas_done=CCLock.get_then_clearis_doneinifnotwas_donethenset_done_cellx|Failede->letwas_done=CCLock.get_then_clearis_doneinifnotwas_donethenset_fail_celle));Runcellletchoose_aa=choose_(A_a)letchoose_ll=choose_(L_l)letsleeptime=make1Thread.delaytime(*$R
let start = Unix.gettimeofday () in
let pause = 0.2 and n = 10 in
let l = CCList.(1 -- n)
|> List.map (fun _ -> Fut.make (fun () -> Thread.delay pause))
in
List.iter Fut.get l;
let stop = Unix.gettimeofday () in
OUnit.assert_bool "some_parallelism" (stop -. start < float_of_int n *. pause);
*)(*$R
let start = Unix.gettimeofday () in
let pause = 0.2 and n = 10 in
let l = CCList.(1 -- n)
|> List.map (fun _ -> Fut2.make (fun () -> Thread.delay pause))
in
List.iter Fut2.get l;
let stop = Unix.gettimeofday () in
OUnit.assert_bool "some_parallelism" (stop -. start < float_of_int n *. pause);
*)moduleInfix=structlet(>>=)xf=flat_mapfxlet(>>)af=and_thenaflet(>|=)af=mapfalet(<*>)=appincludeCCShimsMkLet_.Make(structtypenonrec'at='atlet(>>=)=(>>=)let(>|=)=(>|=)letmonoid_producta1a2=monoid_product(funxy->x,y)a1a2end)endincludeInfixendend