Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file find_common.ml
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254openSigsopenSmart_flowlet(<.>)fgx=f(gx)moduleLog=(valletsrc=Logs.Src.create"find-common"inLogs.src_logsrc:Logs.LOG)let_initial_flush=16let_max_in_vain=256let_large_flush=16384let_pipe_safe_flush=32(* XXX(dinosaure): this part is really **ugly**! But we must follow the same
behaviour of [git]. Instead to understand the synchronisation process of [git]
with Smart.v1 and implement a state of the art synchronisation algorithm, I
translated as is [fetch-pack.c:find_common] in OCaml. *)letunsafe_write_havectxhex=letpacket=Fmt.str"have %s\n"hexinSmart.Unsafe.writectxpacketletnext_flushstatelesscount=ifstatelessthenifcount<_large_flushthencountlsl1elsecount*11/10elseifcount<_pipe_safe_flushthencountlsl1elsecount+_pipe_safe_flushtypeconfiguration={stateless:bool;mutablemulti_ack:[`None|`Some|`Detailed];no_done:bool;}type'uidhex={to_hex:'uid->string;of_hex:string->'uid;compare:'uid->'uid->int;}lettips{bind;return}{get;deref;locals;_}storenegotiator=let(>>=)=bindinlet(>>|)xf=x>>=funx->return(fx)inletrecgo=function|[]->return()|reference::others->derefstorereference>>=Option.fold~none:(returnNone)~some:(funuid->getuidstore)>>|Option.iter(funobj->Default.tipnegotiatorobj)>>=fun()->goothersinlocalsstore>>=goletconsume_shallow_list({bind;return}asscheduler)ioflowcfgdeepen{of_hex;_}_access_storectx=let(>>=)=bindinifcfg.stateless&&Option.is_somedeepenthenrunschedulerraiseioflowSmart.(recvctxshallows)>>=funshallows->letlst=List.map(Smart.Shallow.map~f:of_hex)shallowsinreturnlstelsereturn[]lethandle_shallow({bind;return}asscheduler)ioflow{of_hex;_}accessstorectx=let(>>=)=bindinrunschedulerraiseioflowSmart.(recvctxshallows)>>=funshallows->letlst=List.map(Smart.Shallow.map~f:of_hex)shallowsinletf=function|Smart.Shallow.Shallowuid->access.shallowstoreuid|Smart.Shallow.Unshallowuid->access.unshallowstoreuidinletrecgo=function[]->return()|h::t->fh>>=fun()->gotingolstletfind_common({bind;return}asscheduler)ioflow({stateless;no_done;_}ascfg)({to_hex;of_hex;compare}ashex)accessstorenegotiatorctx?(deepen:[`Depthofint|`Timestampofint64]option)refs=let(>>=)=bindinlet(>>|)xf=x>>=funx->return(fx)inletfold_left_s~fal=letrecgoa=function|[]->returna|x::r->fax>>=funa->goaringoalinletfoldaccremote_uid=Log.debug(funm->m"<%s> exists locally?"(to_hexremote_uid));access.getremote_uidstore>>=function|Some_->returnacc|None->return((remote_uid,ref0)::acc)infold_left_s~f:fold[]refs>>|List.sort_uniq(fun(a,_)(b,_)->compareab)>>=function|[]->Log.debug(funm->m"Nothing to download.");runschedulerraiseioflowSmart.(sendctxflush())>>=fun()->return`Close|uid::others->Log.debug(funm->m"We want %d commit(s)."(List.length(uid::others)));access.shallowedstore>>=funshallowed->letshallowed=List.mapto_hexshallowedinrunschedulerraiseioflowSmart.(letuid=(to_hex<.>fst)uidinletothers=List.map(to_hex<.>fst)othersinletcapabilities,_=Smart.Context.capabilitiesctxinletdeepen=(deepen:>[`Depthofint|`Notofstring|`Timestampofint64]option)insendctxwant(Want.want~capabilities~shallows:shallowed?deepenuid~others))>>=fun()->(matchdeepenwith|None->return()|Some_->handle_shallowschedulerioflowhexaccessstorectx)>>=fun()->letin_vain=ref0inletcount=ref0inletflush_at=ref_initial_flushinletflushes=ref0inletgot_continue=reffalseinletgot_ready=reffalseinletretval=ref(-1)in(* TODO(dinosaure): handle [shallow] and [unshallow]. *)letrecgonegotiator=Default.nextscheduler~parents:access.parentsstorenegotiator>>=function|None->Log.debug(funm->m"Stop the negotiation loop.");return()|Someuid->Log.debug(funm->m"[+] have %s."(to_hexuid));unsafe_write_havectx(to_hexuid);(* completely unsafe! *)incrin_vain;incrcount;Log.debug(funm->m"count: %d, in-vain: %d, flush-at: %d.\n%!"!count!in_vain!flush_at);if!flush_at<=!countthen(runschedulerraiseioflowSmart.(sendctxflush())>>=fun()->incrflushes;flush_at:=next_flushstateless!count;if(notstateless)&&!count=_initial_flushthengonegotiatorelseconsume_shallow_listschedulerioflowcfgNonehexaccessstorectx>>=fun_shallows->letrecloop()=runschedulerraiseioflowSmart.(recvctxack)>>|Smart.Negotiation.map~f:of_hex>>=funack->matchackwith|Smart.Negotiation.NAK->Log.debug(funm->m"Receive NAK.");return`Continue|Smart.Negotiation.ACK_->flushes:=0;cfg.multi_ack<-`None;(* XXX(dinosaure): [multi_ack] supported by the client but it
is not supported by the server. TODO: use [Context.shared]. *)retval:=0;return`Done|Smart.Negotiation.ACK_commonuid|Smart.Negotiation.ACK_readyuid|Smart.Negotiation.ACK_continueuid->(access.getuidstore>>=function|None->assertfalse|Someobj->Default.ackscheduler~parents:access.parentsstorenegotiatorobj>>=funwas_common->ifstateless&&Smart.Negotiation.is_commonack&¬was_commonthen((* we need to replay the have for this object on the next RPC request so
the peer kows it is in common with us. *)Log.debug(funm->m"[+] have %s."(to_hexuid));unsafe_write_havectx(to_hexuid);(* reset [in_vain] because an ack for this commit has not been seen. *)in_vain:=0;retval:=0;got_continue:=true;loop())elseif(notstateless)||not(Smart.Negotiation.is_commonack)then(in_vain:=0;retval:=0;got_continue:=true;ifSmart.Negotiation.is_readyackthengot_ready:=true;loop())else(retval:=0;got_continue:=true;ifSmart.Negotiation.is_readyackthengot_ready:=true;loop()))inloop()>>=function|`Done->return()|`Continue->decrflushes;if!got_continue&&_max_in_vain<!in_vainthenreturn()elseif!got_readythenreturn()elsegonegotiator)elsegonegotiatoringonegotiator>>=fun()->Log.debug(funm->m"Negotiation (got ready: %b, no-done: %b)."!got_readyno_done);(if(not!got_ready)||notno_donethenrunschedulerraiseioflowSmart.(sendctxnegotiation_done())elsereturn())>>=fun()->if!retval<>0then(cfg.multi_ack<-`None;incrflushes);(if(not!got_ready)||notno_donethen(Log.debug(funm->m"Negotiation is done!");runschedulerraiseioflowSmart.(recvctxshallows)>>=fun_shallows->return())elsereturn())>>=fun()->letrecgo()=if!flushes>0||cfg.multi_ack=`Some||cfg.multi_ack=`Detailedthen(runschedulerraiseioflowSmart.(recvctxack)>>|Smart.Negotiation.map~f:of_hex>>=funack->matchackwith|Smart.Negotiation.ACK_->return(`Continue0)|Smart.Negotiation.ACK_common_|Smart.Negotiation.ACK_continue_|Smart.Negotiation.ACK_ready_->cfg.multi_ack<-`Some;go()|Smart.Negotiation.NAK->decrflushes;go())elseif!count>0thenreturn(`Continue!retval)elsereturn(`Continue0)ingo()