Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file requester.ml
(*****************************************************************************)
(*                                                                           *)
(* Open Source License                                                       *)
(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. <contact@tezos.com>     *)
(* Copyright (c) 2021 Nomadic Labs, <contact@nomadic-labs.com>               *)
(*                                                                           *)
(* Permission is hereby granted, free of charge, to any person obtaining a   *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
(* and/or sell copies of the Software, and to permit persons to whom the     *)
(* Software is furnished to do so, subject to the following conditions:      *)
(*                                                                           *)
(* The above copyright notice and this permission notice shall be included   *)
(* in all copies or substantial portions of the Software.                    *)
(*                                                                           *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
(* DEALINGS IN THE SOFTWARE.                                                 *)
(*                                                                           *)
(*****************************************************************************)

(** Read/fetch interface of a requester: look a [key] up locally, inject a
    value, or fetch it (optionally from a given peer, with an optional
    timeout). *)
module type REQUESTER = sig
  type t

  type key

  type value

  type param

  val known : t -> key -> bool Lwt.t

  type error += Missing_data of key

  type error += Canceled of key

  type error += Timeout of key

  val read : t -> key -> value tzresult Lwt.t

  val read_opt : t -> key -> value option Lwt.t

  val inject : t -> key -> value -> bool Lwt.t

  val fetch :
    t ->
    ?peer:P2p_peer.Id.t ->
    ?timeout:Time.System.Span.t ->
    key ->
    param ->
    value tzresult Lwt.t

  val clear_or_cancel : t -> key -> unit
end

(** [REQUESTER] extended with construction, shutdown, notification of
    received values, watching, and size introspection. *)
module type FULL_REQUESTER = sig
  include REQUESTER

  type store

  type request_param

  type notified_value

  val pending : t -> key -> bool

  val watch : t -> (key * value) Lwt_stream.t * Lwt_watcher.stopper

  val notify : t -> P2p_peer.Id.t -> key -> notified_value -> unit Lwt.t

  val memory_table_length : t -> int

  val pending_requests : t -> int

  val create :
    ?random_table:bool ->
    ?global_input:(key * value) Lwt_watcher.input ->
    request_param ->
    store ->
    t

  val shutdown : t -> unit Lwt.t
end

(** Read-only view of the on-disk store consulted when a key is absent from
    the memory table. *)
module type DISK_TABLE = sig
  type store

  type key

  type value

  val known : store -> key -> bool Lwt.t

  val read : store -> key -> value tzresult Lwt.t

  val read_opt : store -> key -> value option Lwt.t
end

(** Mutable, polymorphic table keyed by [key]; the signature follows the
    usual hash-table operations. *)
module type MEMORY_TABLE = sig
  type 'a t

  type key

  val create : entry_type:string -> ?random:bool -> int -> 'a t

  val find : 'a t -> key -> 'a option

  val add : 'a t -> key -> 'a -> unit

  val replace : 'a t -> key -> 'a -> unit

  val remove : 'a t -> key -> unit

  val fold : (key -> 'a -> 'b -> 'b) -> 'a t -> 'b -> 'b

  val length : 'a t -> int
end

(** Interface of the request scheduler used by the requester. *)
module type SCHEDULER = sig
  type t

  type key

  type param

  val request : t -> P2p_peer.Id.t option -> key -> unit

  val notify : t -> P2p_peer.Id.t -> key -> unit Lwt.t

  val notify_cancellation : t -> key -> unit

  val notify_unrequested : t -> P2p_peer.Id.t -> key -> unit Lwt.t

  val notify_duplicate : t -> P2p_peer.Id.t -> key -> unit Lwt.t

  val notify_invalid : t -> P2p_peer.Id.t -> key -> unit Lwt.t

  val pending_requests : t -> int

  val create : param -> t

  val shutdown : t -> unit Lwt.t
end

(** [probe key param notified] turns a notified value into a [value], or
    returns [None]; the requester treats [None] as an invalid notification. *)
module type PROBE = sig
  type key

  type param

  type notified_value

  type value

  val probe : key -> param -> notified_value -> value option
end

(** Network side of the scheduler: which peers are active and how to send
    them a batch of keys. *)
module type REQUEST = sig
  type key

  type param

  val initial_delay : Time.System.Span.t

  val active : param -> P2p_peer.Set.t

  val send : param -> P2p_peer.Id.t -> key list -> unit
end

(** Key type together with its name (used in error identifiers and
    messages), encoding and pretty-printer. *)
module type HASH = sig
  type t

  val name : string

  val encoding : t Data_encoding.t

  val pp : Format.formatter -> t -> unit
end

(** The requester uses a generic scheduler to schedule its requests.
    The [Memory_table] must be shared between the scheduler and the requester
    as it is used to store both pending requests and found values. *)
module Make_request_scheduler
    (Hash : HASH)
    (Table : MEMORY_TABLE with type key := Hash.t)
    (Request : REQUEST with type key := Hash.t) : sig
  include SCHEDULER with type key := Hash.t and type param := Request.param
end = struct
  module Events = Requester_event.Make (Hash)

  type key = Hash.t

  type t = {
    param : Request.param;
    (* Requests not yet answered, with their retry state. *)
    pending : status Table.t;
    (* Events pushed by the public functions, consumed by the worker. *)
    queue : event Lwt_pipe.Unbounded.t;
    (* Promise for the next batch of events popped from [queue]. *)
    mutable events : event list Lwt.t;
    canceler : Lwt_canceler.t;
    mutable worker : unit Lwt.t;
  }

  and status = {
    (* Peers recorded for this key through [Request] events. *)
    peers : P2p_peer.Set.t;
    (* Time from which the key may be (re-)requested. *)
    next_request : Time.System.t;
    (* Span added to the current time to compute the following
       [next_request]; multiplied by 1.5 after each request. *)
    delay : Time.System.Span.t;
  }

  and event =
    | Request of P2p_peer.Id.t option * key
    | Notify of P2p_peer.Id.t * key
    | Notify_cancellation of key
    | Notify_invalid of P2p_peer.Id.t * key
    | Notify_duplicate of P2p_peer.Id.t * key
    | Notify_unrequested of P2p_peer.Id.t * key

  (** [request t p k] enqueues a [Request] event for [k], optionally tied to
      peer [p]. *)
  let request t p k = Lwt_pipe.Unbounded.push t.queue (Request (p, k))

  (** [notify t p k] logs, then enqueues a [Notify] event for [k] from [p]. *)
  let notify t p k =
    let open Lwt_syntax in
    let* () = Events.(emit notify_push) (k, p) in
    Lwt_pipe.Unbounded.push t.queue (Notify (p, k)) ;
    Lwt.return ()

  (* [notify_cancellation] is used within non-Lwt context and needs to
     perform logging without yielding. We use
     [emit__dont_wait__use_with_care] to that end. Other events are used
     within Lwt context so we use the recommended [emit] for them. *)
  let notify_cancellation t k =
    Events.(emit__dont_wait__use_with_care notify_push_cancellation) k ;
    Lwt_pipe.Unbounded.push t.queue (Notify_cancellation k)

  (** [notify_invalid t p k] logs, then enqueues a [Notify_invalid] event. *)
  let notify_invalid t p k =
    let open Lwt_syntax in
    let* () = Events.(emit notify_push_invalid) (k, p) in
    Lwt_pipe.Unbounded.push t.queue (Notify_invalid (p, k)) ;
    Lwt.return ()

  (** [notify_duplicate t p k] logs, then enqueues a [Notify_duplicate]
      event. *)
  let notify_duplicate t p k =
    let open Lwt_syntax in
    let* () = Events.(emit notify_push_duplicate) (k, p) in
    Lwt_pipe.Unbounded.push t.queue (Notify_duplicate (p, k)) ;
    Lwt.return ()

  (** [notify_unrequested t p k] logs, then enqueues a [Notify_unrequested]
      event. *)
  let notify_unrequested t p k =
    let open Lwt_syntax in
    let* () = Events.(emit notify_push_unrequested) (k, p) in
    Lwt_pipe.Unbounded.push t.queue (Notify_unrequested (p, k)) ;
    Lwt.return ()

  (** [compute_timeout state] is a promise that resolves when the earliest
      [next_request] among pending entries is reached. It is already resolved
      if that time is not in the future, and never resolves when nothing is
      pending. *)
  let compute_timeout state =
    let next =
      Table.fold
        (fun _ {next_request; _} acc ->
          match acc with
          | None -> Some next_request
          | Some x -> Some (Time.System.min x next_request))
        state.pending
        None
    in
    match next with
    | None ->
        (* Nothing pending: a task whose resolver is discarded. *)
        fst @@ Lwt.task ()
    | Some next ->
        let now = Time.System.now () in
        let delay = Ptime.diff next now in
        if Ptime.Span.compare delay Ptime.Span.zero <= 0 then Lwt.return_unit
        else Systime_os.sleep delay

  (** [process_event state now event] applies one queued event to
      [state.pending] and logs it. A [Request] (re)schedules the key at [now]
      with [Request.initial_delay]; [Notify] and [Notify_cancellation] remove
      the key; the other notifications are only logged. *)
  let process_event state now =
    let open Lwt_syntax in
    function
    | Request (peer, key) -> (
        let* () = Events.(emit registering_request) (key, peer) in
        match Table.find state.pending key with
        | Some data ->
            let peers =
              match peer with
              | None -> data.peers
              | Some peer -> P2p_peer.Set.add peer data.peers
            in
            Table.replace
              state.pending
              key
              {peers; next_request = now; delay = Request.initial_delay} ;
            Events.(emit registering_request_replaced) (key, peer)
        | None ->
            let peers =
              match peer with
              | None -> P2p_peer.Set.empty
              | Some peer -> P2p_peer.Set.singleton peer
            in
            Table.add
              state.pending
              key
              {peers; next_request = now; delay = Request.initial_delay} ;
            Events.(emit registering_request_added) (key, peer))
    | Notify (peer, key) ->
        Table.remove state.pending key ;
        Events.(emit notify_received) (key, peer)
    | Notify_cancellation key ->
        Table.remove state.pending key ;
        Events.(emit notify_cancelled) key
    | Notify_invalid (peer, key) ->
        (* TODO: Punish peer *)
        Events.(emit notify_invalid) (key, peer)
    | Notify_unrequested (peer, key) ->
        (* TODO: Punish peer *)
        Events.(emit notify_unrequested) (key, peer)
    | Notify_duplicate (peer, key) ->
        (* TODO: Punish peer *)
        Events.(emit notify_duplicate) (key, peer)

  (** [worker_loop state] repeatedly waits for whichever comes first among:
      a batch of queued events, the next request deadline, or cancellation.
      - On cancellation it logs termination and stops.
      - On events it processes the batch and re-arms [state.events].
      - On deadline it sends, per peer, the list of keys that are due. *)
  let worker_loop state =
    let open Lwt_syntax in
    let shutdown = Lwt_canceler.when_canceling state.canceler in
    let rec loop state =
      let timeout = compute_timeout state in
      let* () =
        Lwt.choose
          [
            (let* _ = state.events in
             Lwt.return_unit);
            timeout;
            shutdown;
          ]
      in
      if Lwt.state shutdown <> Lwt.Sleep then Events.(emit terminated) ()
      else if Lwt.state state.events <> Lwt.Sleep then (
        let now = Time.System.now () in
        let* events = state.events in
        state.events <- Lwt_pipe.Unbounded.pop_all state.queue ;
        let* () = List.iter_s (process_event state now) events in
        loop state)
      else
        let* () = Events.(emit timeout) () in
        let now = Time.System.now () in
        let active_peers = Request.active state.param in
        let requests =
          Table.fold
            (fun key {peers; next_request; delay} acc ->
              if Ptime.is_later next_request ~than:now then
                (* Not due yet. *)
                acc
              else
                let remaining_peers = P2p_peer.Set.inter peers active_peers in
                if
                  P2p_peer.Set.is_empty remaining_peers
                  && not (P2p_peer.Set.is_empty peers)
                then (
                  (* Peers were recorded for this key but none of them is
                     active any more: drop the pending request. *)
                  Table.remove state.pending key ;
                  acc)
                else
                  (* Either some recorded peer is still active, or no peer
                     was ever recorded and any active peer may be asked.
                     NOTE(review): if [active_peers] is empty in the latter
                     case, [random_elt] is applied to an empty set; its
                     behaviour is defined outside this file - confirm. *)
                  let requested_peer =
                    P2p_peer.Id.Set.random_elt
                      (if P2p_peer.Set.is_empty remaining_peers then
                       active_peers
                      else remaining_peers)
                  in
                  (* Fall back to [Ptime.max] when [now + delay] is not
                     representable. *)
                  let next_request =
                    Option.value ~default:Ptime.max (Ptime.add_span now delay)
                  in
                  let next =
                    {
                      peers = remaining_peers;
                      next_request;
                      delay = Time.System.Span.multiply_exn 1.5 delay;
                    }
                  in
                  Table.replace state.pending key next ;
                  let requests =
                    key
                    :: Option.value
                         ~default:[]
                         (P2p_peer.Map.find requested_peer acc)
                  in
                  P2p_peer.Map.add requested_peer requests acc)
            state.pending
            P2p_peer.Map.empty
        in
        P2p_peer.Map.iter (Request.send state.param) requests ;
        let* () =
          P2p_peer.Map.iter_s
            (fun peer request ->
              List.iter_s
                (fun (key : key) -> Events.(emit requested) (key, peer))
                request)
            requests
        in
        loop state
    in
    loop state

  (** [create param] builds a scheduler with an empty pending table and
      starts its worker. [events] starts as an already-resolved empty batch,
      so the first loop iteration immediately re-arms it from the queue. *)
  let create param =
    let state =
      {
        param;
        queue = Lwt_pipe.Unbounded.create ();
        pending = Table.create ~entry_type:"pending_requests" ~random:true 17;
        events = Lwt.return_nil;
        canceler = Lwt_canceler.create ();
        worker = Lwt.return_unit;
      }
    in
    state.worker <-
      Lwt_utils.worker
        "db_request_scheduler"
        ~on_event:Internal_event.Lwt_worker_logger.on_event
        ~run:(fun () -> worker_loop state)
        ~cancel:(fun () -> Error_monad.cancel_with_exceptions state.canceler) ;
    state

  (** [shutdown s] triggers the canceler of [s]. *)
  let shutdown s = Error_monad.cancel_with_exceptions s.canceler

  (** [pending_requests s] is the number of entries in the pending table. *)
  let pending_requests s = Table.length s.pending
end

(** [Make] builds a full requester on top of a disk table, a memory table,
    a network request module and a probe. Lookups consult the memory table
    first and fall back to the disk table; fetches of unknown keys are
    registered as [Pending] and handed to the scheduler. *)
module Make
    (Hash : HASH)
    (Disk_table : DISK_TABLE with type key := Hash.t)
    (Memory_table : MEMORY_TABLE with type key := Hash.t)
    (Request : REQUEST with type key := Hash.t)
    (Probe : PROBE with type key := Hash.t and type value := Disk_table.value) :
  FULL_REQUESTER
    with type key = Hash.t
     and type value = Disk_table.value
     and type param = Probe.param
     and type request_param = Request.param
     and type notified_value = Probe.notified_value
     and type store = Disk_table.store = struct
  type key = Hash.t

  type value = Disk_table.value

  type param = Probe.param

  type request_param = Request.param

  type notified_value = Probe.notified_value

  type store = Disk_table.store

  module Scheduler = Make_request_scheduler (Hash) (Memory_table) (Request)

  type t = {
    scheduler : Scheduler.t;
    disk : Disk_table.store;
    memory : status Memory_table.t;
    global_input : (key * value) Lwt_watcher.input option;
    input : (key * value) Lwt_watcher.input;
  }

  and status =
    | Pending of {
        (* Promise shared by every caller fetching this key. *)
        waiter : value tzresult Lwt.t;
        wakener : value tzresult Lwt.u;
        (* Number of callers currently waiting on [waiter]. *)
        mutable waiters : int;
        param : param;
      }
    | Found of value

  (** [known s k] is [true] when [k] is [Found] in memory or known on disk;
      a [Pending] key is reported as not known. *)
  let known s k =
    match Memory_table.find s.memory k with
    | None -> Disk_table.known s.disk k
    | Some (Pending _) -> Lwt.return_false
    | Some (Found _) -> Lwt.return_true

  (** [read_opt s k] is the value of [k] from memory or disk, or [None] when
      absent or still [Pending]. *)
  let read_opt s k =
    match Memory_table.find s.memory k with
    | None -> Disk_table.read_opt s.disk k
    | Some (Found v) -> Lwt.return_some v
    | Some (Pending _) -> Lwt.return_none

  type error += Missing_data of key

  type error += Canceled of key

  type error += Timeout of key

  let () =
    (* Missing data key *)
    register_error_kind
      `Permanent
      ~id:("requester." ^ Hash.name ^ ".missing")
      ~title:("Missing " ^ Hash.name)
      ~description:("Some " ^ Hash.name ^ " is missing from the requester")
      ~pp:(fun ppf key ->
        Format.fprintf ppf "Missing %s %a" Hash.name Hash.pp key)
      (Data_encoding.obj1 (Data_encoding.req "key" Hash.encoding))
      (function Missing_data key -> Some key | _ -> None)
      (fun key -> Missing_data key) ;
    (* Canceled key *)
    register_error_kind
      `Permanent
      ~title:("Canceled fetch of a " ^ Hash.name)
      ~description:("The fetch of a " ^ Hash.name ^ " has been canceled")
      ~id:("requester." ^ Hash.name ^ ".fetch_canceled")
      ~pp:(fun ppf key ->
        Format.fprintf ppf "Fetch of %s %a canceled" Hash.name Hash.pp key)
      Data_encoding.(obj1 (req "key" Hash.encoding))
      (function Canceled key -> Some key | _ -> None)
      (fun key -> Canceled key) ;
    (* Timeout key *)
    register_error_kind
      `Permanent
      ~title:("Timed out fetch of a " ^ Hash.name)
      ~description:("The fetch of a " ^ Hash.name ^ " has timed out")
      ~id:("requester." ^ Hash.name ^ ".fetch_timeout")
      ~pp:(fun ppf key ->
        Format.fprintf ppf "Fetch of %s %a timed out" Hash.name Hash.pp key)
      Data_encoding.(obj1 (req "key" Hash.encoding))
      (function Timeout key -> Some key | _ -> None)
      (fun key -> Timeout key)

  (** [read s k] is like [read_opt] but fails with [Missing_data k] when the
      key is [Pending], and traces disk errors with [Missing_data k]. *)
  let read s k =
    let open Lwt_result_syntax in
    match Memory_table.find s.memory k with
    | None -> trace (Missing_data k) @@ Disk_table.read s.disk k
    | Some (Found v) -> return v
    | Some (Pending _) -> tzfail (Missing_data k)

  (** [wrap s k ?timeout t] gives one caller its own handle on the shared
      waiter [t]. The handle is [Lwt.protected], so cancelling it does not
      cancel [t] itself; instead the cancellation hook decrements the waiter
      count and, when it reaches zero, removes the entry, informs the
      scheduler and resolves the shared promise with [Canceled k]. With
      [?timeout], the handle races against a sleep that fails with
      [Timeout k]. *)
  let wrap s k ?timeout t =
    let open Lwt_syntax in
    let t = Lwt.protected t in
    Lwt.on_cancel t (fun () ->
        match Memory_table.find s.memory k with
        | None -> ()
        | Some (Found _) -> ()
        | Some (Pending ({wakener = w; _} as data)) ->
            data.waiters <- data.waiters - 1 ;
            if data.waiters = 0 then (
              Memory_table.remove s.memory k ;
              Scheduler.notify_cancellation s.scheduler k ;
              Lwt.wakeup_later w (Result_syntax.tzfail (Canceled k)))) ;
    match timeout with
    | None -> t
    | Some delay ->
        let timeout =
          let* () = Systime_os.sleep delay in
          Lwt_result_syntax.tzfail (Timeout k)
        in
        Lwt.pick [t; timeout]

  (** [fetch s ?peer ?timeout k param] returns the value of [k] if it is in
      memory or on disk; otherwise it registers (or joins) a [Pending] entry,
      asks the scheduler to request [k] (from [peer] if given) and waits. *)
  let fetch s ?peer ?timeout k param =
    let open Lwt_syntax in
    match Memory_table.find s.memory k with
    | None -> (
        let* o = Disk_table.read_opt s.disk k in
        match o with
        | Some v -> return_ok v
        | None -> (
            (* It is necessary to check the memory-table again in case another
               promise has altered it whilst this one was waiting for the
               disk-table query. *)
            match Memory_table.find s.memory k with
            | None ->
                let waiter, wakener = Lwt.wait () in
                Memory_table.add
                  s.memory
                  k
                  (Pending {waiter; wakener; waiters = 1; param}) ;
                Scheduler.request s.scheduler peer k ;
                wrap s k ?timeout waiter
            | Some (Pending data) ->
                Scheduler.request s.scheduler peer k ;
                data.waiters <- data.waiters + 1 ;
                wrap s k ?timeout data.waiter
            | Some (Found v) -> return_ok v))
    | Some (Pending data) ->
        Scheduler.request s.scheduler peer k ;
        data.waiters <- data.waiters + 1 ;
        wrap s k ?timeout data.waiter
    | Some (Found v) -> return_ok v

  (** [notify_when_pending s p k w param v] handles a value [v] received from
      [p] for a [Pending] key. If the probe rejects it, the scheduler is told
      the notification is invalid. Otherwise the entry becomes [Found], the
      waiters are woken through [w], and watchers are notified. *)
  let notify_when_pending s p k w param v =
    let open Lwt_syntax in
    match Probe.probe k param v with
    | None -> Scheduler.notify_invalid s.scheduler p k
    | Some v ->
        let* () = Scheduler.notify s.scheduler p k in
        Memory_table.replace s.memory k (Found v) ;
        Lwt.wakeup_later w (Ok v) ;
        Option.iter
          (fun input -> Lwt_watcher.notify input (k, v))
          s.global_input ;
        Lwt_watcher.notify s.input (k, v) ;
        Lwt.return_unit

  (** [notify s p k v] dispatches a value received from peer [p]: duplicate
      if already found or on disk, unrequested if no entry exists, otherwise
      handled by [notify_when_pending]. *)
  let notify s p k v =
    let open Lwt_syntax in
    match Memory_table.find s.memory k with
    | None -> (
        let* b = Disk_table.known s.disk k in
        match b with
        | true -> Scheduler.notify_duplicate s.scheduler p k
        | false -> (
            (* It is necessary to check the memory-table again in case another
               promise has altered it whilst this one was waiting for the
               disk-table query. *)
            match Memory_table.find s.memory k with
            | None -> Scheduler.notify_unrequested s.scheduler p k
            | Some (Pending {wakener = w; param; _}) ->
                notify_when_pending s p k w param v
            | Some (Found _) -> Scheduler.notify_duplicate s.scheduler p k))
    | Some (Pending {wakener = w; param; _}) ->
        notify_when_pending s p k w param v
    | Some (Found _) -> Scheduler.notify_duplicate s.scheduler p k

  (** [inject s k v] stores [v] as [Found] and returns [true] only when [k]
      is neither in memory (pending or found) nor known on disk. *)
  let inject s k v =
    let open Lwt_syntax in
    match Memory_table.find s.memory k with
    | None -> (
        let* b = Disk_table.known s.disk k in
        match b with
        | true -> Lwt.return_false
        | false -> (
            (* It is necessary to check the memory-table again in case another
               promise has altered it whilst this one was waiting for the
               disk-table query. *)
            match Memory_table.find s.memory k with
            | None ->
                Memory_table.add s.memory k (Found v) ;
                Lwt.return_true
            | Some (Pending _) | Some (Found _) -> Lwt.return_false))
    | Some (Pending _) | Some (Found _) -> Lwt.return_false

  (** [clear_or_cancel s k] removes a [Found] entry. For a [Pending] entry
      it drops one waiter; when at most one waiter is left the request is
      cancelled, the entry removed and the promise resolved with
      [Canceled k]. *)
  let clear_or_cancel s k =
    match Memory_table.find s.memory k with
    | None -> ()
    | Some (Pending status) ->
        if status.waiters <= 1 then (
          Scheduler.notify_cancellation s.scheduler k ;
          Memory_table.remove s.memory k ;
          Lwt.wakeup_later status.wakener (Result_syntax.tzfail (Canceled k)))
        else status.waiters <- status.waiters - 1
    | Some (Found _) -> Memory_table.remove s.memory k

  (** [watch s] is a stream of the [(key, value)] pairs notified on the
      input of [s], with its stopper. *)
  let watch s = Lwt_watcher.create_stream s.input

  (** [create ?random_table ?global_input request_param disk] builds a
      requester with a fresh scheduler, an empty memory table and a fresh
      watcher input. *)
  let create ?random_table:random ?global_input request_param disk =
    let scheduler = Scheduler.create request_param in
    let memory = Memory_table.create ~entry_type:"entries" ?random 17 in
    let input = Lwt_watcher.create_input () in
    {scheduler; disk; memory; input; global_input}

  (** [pending s k] is [true] iff [k] has a [Pending] entry in memory. *)
  let pending s k =
    match Memory_table.find s.memory k with
    | None -> false
    | Some (Found _) -> false
    | Some (Pending _) -> true

  let memory_table_length s = Memory_table.length s.memory

  let pending_requests s = Scheduler.pending_requests s.scheduler

  let shutdown s = Scheduler.shutdown s.scheduler
end