Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file implementations.ml
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892openCore_kernelopenAsync_kernelopenUtilopenImplementation_types.ImplementationsmoduleP=ProtocolmoduleReader=Transport.ReadermoduleWriter=Transport.Writer(* The Result monad is also used. *)let(>>|~)=Result.(>>|)(* Commute Result and Deferred. *)letdefer_result:'a'b.('aDeferred.t,'b)Result.t->('a,'b)Result.tDeferred.t=function|Error_aserr->returnerr|Okd->matchDeferred.peekdwith|None->d>>|funx->Okx|Somed->return(Okd)moduleDeferred_immediate=structlet(>>=)df=matchDeferred.peekdwith|None->d>>=f|Somex->fx(* We may not be using this at a particular point in time, but we still want [>>=] to be
rebound when opening [Deferred_immediate] in case we do start using it later. *)let_=(>>=)let(>>|)df=matchDeferred.peekdwith|None->d>>|f|Somex->return(fx)let_=(>>|)endmoduleResponder=Implementation.Expert.Respondertype'connection_stateon_unknown_rpc=[`Raise|`Continue|`Close_connection|`Callof('connection_state->rpc_tag:string->version:int->[`Close_connection|`Continue])|`Expertof('connection_state->rpc_tag:string->version:int->Responder.t->Bigstring.t->pos:int->len:int->unitDeferred.t)]type'connection_statet='connection_stateImplementation_types.Implementations.t={implementations:'connection_stateImplementation.F.tDescription.Table.t;on_unknown_rpc:'connection_stateon_unknown_rpc}type'connection_stateimplementations='connection_statetletdescriptionst=Hashtbl.keyst.implementationsmoduleInstance=structtypestreaming_response=Instance.streaming_response=|Pipe:_Pipe.Reader.t->streaming_response|Direct:_Implementation_types.Direct_stream_writer.tsexp_opaque->streaming_response[@@derivingsexp_of]typestreaming_responses=(P.Query_id.t,streaming_response)Hashtbl.t[@@derivingsexp_of]type'aunpacked='aInstance.unpacked={implementations:'aimplementationssexp_opaque;writer:Writer.t;open_streaming_responses:streaming_responses;mutablestopped:bool;connection_state:'a;connection_description:Info.t;connection_close_started:Info.tDeferred.t;mutablelast_dispatched_implementation:(Description.t*'aImplementation.F.tsexp_opaque)option(* [packed_self] is here so we can essentially pack an unpacked instance without doing
any additional allocation. *);packed_self:tsexp_opaque}[@@derivingsexp_of]andt=Instance.t=T:_unpacked->tletsexp_of_t(Tt)=[%sexp_of:_unpacked]tletsend_write_errortidsexp=letdata:_P.Message.t=Response{id;data=Error(Write_errorsexp)}inmatchWriter.send_bin_prott.writerP.Message.bin_writer_nat0_tdatawith|Sent()|Closed->()|Message_too_big_asr->raise_s[%sexp"Failed to send write error to client",{error=(sexp:Sexp.t);reason=(r:unitTransport.Send_result.t)}];;lethandle_send_resulttid(result:_Transport.Send_result.t)=matchresultwith|Sent()->()|Closed->()|Message_too_big_asr->send_write_errortid([%sexp_of:unitTransport.Send_result.t]r);;letwrite_messaget~idbin_writerx=ifnott.stoppedthenWriter.send_bin_prott.writerbin_writerx|>handle_send_resulttid;;letwrite_message_expertt~idbin_writerx~buf~pos~len=ifnott.stoppedthenWriter.send_bin_prot_and_bigstringt.writerbin_writerx~buf~pos~len|>handle_send_resulttid;;letwrite_responsetidbin_writer_datadata=letbin_writer=P.Message.bin_writer_needs_length(Writer_with_length.of_writerbin_writer_data)inwrite_messaget~idbin_writer(Response{id;data});;moduleCached_stream_writer:sigtypeinstancetype'at='aImplementation_types.Cached_stream_writer.tvalcreate:id:P.Query_id.t->bin_writer:'aBin_prot.Type_class.writer->'atvalwrite:'at->instance->P.Query_id.t->'a->unitvalwrite_expert:'at->instance->P.Query_id.t->buf:Bigstring.t->pos:int->len:int->unitvalwrite_string:'at->instance->P.Query_id.t->string->unitendwithtypeinstance:=t=structtype'at='aImplementation_types.Cached_stream_writer.t={header_prefix:string(* Bin_protted constant prefix of the message *);(* Length of the user data part. We set this field when sending a message. This
relies on the fact that the message is serialized immediately (which is the
only acceptable semantics for the transport layer anyway, as it doesn't know if
the value is mutable or not).
[data_len] is passed to bin-prot writers by mutating [data_len] instead of by
passing an additional argument to avoid some allocation.
*)mutabledata_len:Nat0.t;bin_writer:'aBin_prot.Type_class.writer}typevoid=Voidletbin_size_voidVoid=0letbin_write_void_buf~posVoid=postypevoid_message=voidP.Message.needs_length[@@derivingbin_write]typevoid_stream_response_data=voidP.Stream_response_data.needs_length[@@derivingbin_write](* This is not re-entrant but Async code always runs on one thread at a time *)letbuffer=Bigstring.create32letcache_bin_protted(bin_writer:_Bin_prot.Type_class.writer)x=letlen=bin_writer.writebuffer~pos:0xinBigstring.To_string.subbuffer~pos:0~len;;letcreate(typea)~id~bin_writer:at=letheader_prefix=cache_bin_prottedbin_writer_void_message(Response{id;data=OkVoid})in{header_prefix;bin_writer;data_len=Nat0.of_int_exn0};;(* This part of the message header is a constant, make it a literal to make the
writing code slightly faster. *)letstream_response_data_header_len=4letstream_response_data_header_as_int32=0x8a79llet%test_unit"stream_response_* constants are correct"=letlen=bin_writer_void_stream_response_data.writebuffer~pos:0(`OkVoid:void_stream_response_data)inassert(len=stream_response_data_header_len);assert(Bigstring.unsafe_get_int32_t_lebuffer~pos:0=stream_response_data_header_as_int32);;;letbin_write_string_no_lengthbuf~posstr=letstr_len=String.lengthstrin(* Very low-level bin_prot stuff... *)Bin_prot.Common.assert_pospos;letnext=pos+str_leninBin_prot.Common.check_nextbufnext;Bin_prot.Common.unsafe_blit_string_buf~src_pos:0str~dst_pos:posbuf~len:str_len;next;;(* The two following functions are used by the 3 variants exposed by this module. They
serialize a [Response { id; data = Ok (`Ok data_len) }] value, taking care of
writing the [Nat0.t] length prefix where approriate.
Bear in mind that there are two levels of length prefixes for stream response data
message: one for the user data (under the `Ok, before the actual data), and one for
the response data (under the .data field, before the Ok). *)letbin_size_nat0_header{header_prefix;data_len;_}=letstream_response_data_nat0_len=stream_response_data_header_len+Nat0.bin_size_tdata_leninletstream_response_data_len=stream_response_data_nat0_len+(data_len:Nat0.t:>int)inString.lengthheader_prefix+Nat0.bin_size_t(Nat0.of_int_exnstream_response_data_len)+stream_response_data_nat0_lenletbin_write_nat0_headerbuf~pos{header_prefix;data_len;_}=letpos=bin_write_string_no_lengthbuf~posheader_prefixinletstream_response_data_len=stream_response_data_header_len+Nat0.bin_size_tdata_len+(data_len:Nat0.t:>int)inletpos=Nat0.bin_write_tbuf~pos(Nat0.of_int_exnstream_response_data_len)inletnext=pos+4inBin_prot.Common.check_nextbufnext;Bigstring.unsafe_set_int32_t_lebuf~posstream_response_data_header_as_int32;Nat0.bin_write_tbuf~pos:nextdata_lenletbin_writer_nat0_header:_Bin_prot.Type_class.writer={size=bin_size_nat0_header;write=bin_write_nat0_header}letbin_size_message(t,_)=bin_size_nat0_headert+(t.data_len:Nat0.t:>int)letbin_write_messagebuf~pos(t,data)=letpos=bin_write_nat0_headerbuf~postint.bin_writer.writebuf~posdataletbin_writer_message:_Bin_prot.Type_class.writer={size=bin_size_message;write=bin_write_message}letbin_size_message_as_string(t,_)=bin_size_nat0_headert+(t.data_len:Nat0.t:>int)letbin_write_message_as_stringbuf~pos(t,str)=letpos=bin_write_nat0_headerbuf~postinbin_write_string_no_lengthbuf~posstrletbin_writer_message_as_string:_Bin_prot.Type_class.writer={size=bin_size_message_as_string;write=bin_write_message_as_string}(* [write] and [write_string] both allocate 3 words for the tuples. [write_expert]
does not allocate. *)letwritet(Tinstance)iddata=t.data_len<-Nat0.of_int_exn(t.bin_writer.sizedata);write_messageinstance~idbin_writer_message(t,data)letwrite_stringt(Tinstance)idstr=t.data_len<-Nat0.of_int_exn(String.lengthstr);write_messageinstance~idbin_writer_message_as_string(t,str)letwrite_expertt(Tinstance)id~buf~pos~len=t.data_len<-Nat0.of_int_exnlen;write_message_expertinstance~idbin_writer_nat0_headert~buf~pos~lenendmoduleDirect_stream_writer=structmoduleT=Implementation_types.Direct_stream_writermoduleState=T.StatemoduleId=T.Idtype'at='aT.t={id:Id.t;mutablestate:'aState.t;closed:unitIvar.t;instance:Instance.t;query_id:P.Query_id.t;stream_writer:'aCached_stream_writer.t;groups:'agroup_entryBag.t}and'agroup_entry='aT.group_entry={group:'aT.Group.t;element_in_group:'atBag.Elt.t}letis_closedt=Ivar.is_fullt.closedletclosedt=Ivar.readt.closedletflushedt=let(Tinstance)=t.instanceinTransport.Writer.flushedinstance.writer;;letbin_writert=t.stream_writer.bin_writerletwrite_eof{instance=Tinstance;query_id;_}=write_responseinstancequery_idP.Stream_response_data.bin_writer_nat0_t(Ok`Eof);;letwrite_message{instance;stream_writer;query_id;_}x=Cached_stream_writer.writestream_writerinstancequery_idx;;letwrite_message_string{instance;stream_writer;query_id;_}x=Cached_stream_writer.write_stringstream_writerinstancequery_idx;;letwrite_message_expert{instance;stream_writer;query_id;_}~buf~pos~len=Cached_stream_writer.write_expertstream_writerinstancequery_id~buf~pos~len;;letclose_without_removing_from_instancet=ifnot(Ivar.is_fullt.closed)thenbeginIvar.fillt.closed();letgroups=t.groupsinifnot(Bag.is_emptygroups)thenAsync_kernel_scheduler.Very_low_priority_work.enqueue~f:(fun()->matchBag.remove_onegroupswith|None->Finished|Some{group;element_in_group}->Bag.removegroup.componentselement_in_group;Hashtbl.removegroup.components_by_idt.id;Not_finished);matcht.statewith|Not_started_->()|Started->write_eoftend;;letclose({instance=Tinstance;query_id;_}ast)=close_without_removing_from_instancet;Hashtbl.removeinstance.open_streaming_responsesquery_id;;letwrite_without_pushbacktx=ifIvar.is_fullt.closedthen`Closedelsebeginbeginmatcht.statewith|Not_startedq->Queue.enqueueq(Normalx)|Started->write_messagetxend;`Okend;;letwrite({instance=Tinstance;_}ast)x=matchwrite_without_pushbacktxwith|`Closed->`Closed|`Ok->`Flushed(Writer.flushedinstance.writer);;moduleExpert=structletwrite_without_pushbackt~buf~pos~len=ifIvar.is_fullt.closedthen`Closedelsebeginbeginmatcht.statewith|Not_startedq->Queue.enqueueq(Expert(Bigstring.To_string.subbuf~pos~len))|Started->write_message_expertt~buf~pos~lenend;`Okend;;letwrite({instance=Tinstance;_}ast)~buf~pos~len=matchwrite_without_pushbackt~buf~pos~lenwith|`Closed->`Closed|`Ok->`Flushed(Writer.flushedinstance.writer);;endletstartt=matcht.statewith|Started->failwith"attempted to start writer which was already started"|Not_startedq->t.state<-Started;Queue.iterq~f:(function|Normalx->write_messagetx|Expertx->write_message_stringtx);ifIvar.is_fullt.closedthenwrite_eoft;;;endletapply_implementationtimplementation~(query:Nat0.tP.Query.t)~read_buffer~read_buffer_pos_ref:_Transport.Handler_result.t=letid=query.idinmatchimplementationwith|Implementation.F.One_way(bin_query_reader,f)->letquery_contents=bin_read_from_bigstringbin_query_readerread_buffer~pos_ref:read_buffer_pos_ref~len:query.data~location:"server-side one-way rpc message un-bin-io'ing"in(matchquery_contentswith|Error_aserr->Stoperr|Okq->tryft.connection_stateq;Continuewithexn->Stop(Rpc_result.uncaught_exnexn~location:"server-side one-way rpc computation"))|Implementation.F.One_way_expertf->(tryletlen=(query.data:>int)inft.connection_stateread_buffer~pos:!read_buffer_pos_ref~len;read_buffer_pos_ref:=!read_buffer_pos_ref+len;Continuewithexn->Stop(Rpc_result.uncaught_exnexn~location:"server-side one-way rpc expert computation"))|Implementation.F.Rpc(bin_query_reader,bin_response_writer,f,result_mode)->letquery_contents=bin_read_from_bigstringbin_query_readerread_buffer~pos_ref:read_buffer_pos_ref~len:query.data~location:"server-side rpc query un-bin-io'ing"inbeginmatchresult_modewith|Implementation.F.Blocking->letdata=tryquery_contents>>|~ft.connection_statewith|exn->(* In the [Deferred] branch we use [Monitor.try_with], which includes
backtraces when it catches an exception. For consistency, we also get
backtraces here. *)letbacktrace=Backtrace.Exn.most_recent()inletsexp=[%sexp{location="server-side blocking rpc computation";exn=(exn:exn);backtrace=(backtrace:Backtrace.t)}]inError(Rpc_error.Uncaught_exnsexp)inwrite_responsetidbin_response_writerdata|Implementation.F.Deferred->letdata=Rpc_result.try_with~run:`Now~location:"server-side rpc computation"(fun()->defer_result(query_contents>>|~ft.connection_state))in(* In the common case that the implementation returns a value immediately, we will
write the response immediately as well (this is also why the above [try_with]
has [~run:`Now]). This can be a big performance win for servers that get many
queries in a single Async cycle. *)(matchDeferred.peekdatawith|None->data>>>write_responsetidbin_response_writer|Somedata->write_responsetidbin_response_writerdata);end;Continue|Implementation.F.Rpc_expert(f,result_mode)->letresponder=Implementation.Expert.Responder.createquery.idt.writerinletd=(* We need the [Monitor.try_with] even for the blocking mode as the implementation
might return [Delayed_reponse], so we don't bother optimizing the blocking
mode. *)Monitor.try_with~run:`Now(fun()->letlen=(query.data:>int)inletresult=ft.connection_stateresponderread_buffer~pos:!read_buffer_pos_ref~leninmatchresult_modewith|Implementation.F.Deferred->result|Implementation.F.Blocking->Deferred.returnresult)inlethandle_exnexn=letresult=Rpc_result.uncaught_exnexn~location:"server-side rpc expert computation"inifresponder.respondedthenresultelsebeginwrite_responsetidbin_writer_unitresult;Ok()endinletcheck_responded()=ifresponder.respondedthenOk()elsehandle_exn(Failure"Expert implementation did not reply")inletd=letopenDeferred_immediateind>>|function|Okresult->letd=matchresultwith|Replied->Deferred.unit|Delayed_responsed->dinifDeferred.is_determineddthencheck_responded()elsebeginupond(fun()->check_responded()|>Rpc_result.or_error~rpc_tag:query.tag~rpc_version:query.version~connection_description:t.connection_description~connection_close_started:t.connection_close_started|>ok_exn);Ok()end|Errorexn->handle_exnexnin(matchDeferred.peekdwith|None->Wait(d>>|funr->ok_exn(Rpc_result.or_error~rpc_tag:query.tag~rpc_version:query.version~connection_description:t.connection_description~connection_close_started:t.connection_close_startedr))|Someresult->matchresultwith|Ok()->Continue|Error_->Stopresult)|Implementation.F.Streaming_rpc(bin_query_reader,bin_init_writer,bin_update_writer,impl)->letstream_query=bin_read_from_bigstringP.Stream_query.bin_reader_nat0_tread_buffer~pos_ref:read_buffer_pos_ref~len:query.data~location:"server-side pipe_rpc stream_query un-bin-io'ing"~add_len:(function`Abort->0|`Query(len:Nat0.t)->(len:>int))inbeginmatchstream_querywith|Error_err->()|Ok`Abort->(* Note that there's some delay between when we receive a pipe RPC query and
when we put something in [open_streaming_responses] (we wait for
a user-supplied function to return). During this time, an abort message would
just be ignored. The dispatcher can't abort the query while this is
happening, though, since the interface doesn't expose the ID required to
abort the query until after a response has been returned. *)Option.iter(Hashtbl.findt.open_streaming_responsesquery.id)~f:(function|Pipepipe->Pipe.close_readpipe|Directw->Direct_stream_writer.closew)|Ok(`Querylen)->letdata=bin_read_from_bigstringbin_query_readerread_buffer~pos_ref:read_buffer_pos_ref~len~location:"streaming_rpc server-side query un-bin-io'ing"inletstream_writer=Cached_stream_writer.create~id~bin_writer:bin_update_writerinletimpl_with_state=matchimplwith|Pipef->`Pipef|Directf->letwriter:_Direct_stream_writer.t={id=Direct_stream_writer.Id.create();state=Not_started(Queue.create());closed=Ivar.create();instance=t.packed_self;query_id=id;groups=Bag.create();stream_writer}inHashtbl.sett.open_streaming_responses~key:query.id~data:(Directwriter);`Direct(f,writer)inletrun_implimplsplit_okhandle_ok=Rpc_result.try_with(fun()->defer_result(data>>|~impl))~location:"server-side pipe_rpc computation">>>function|Errorerr->Hashtbl.removet.open_streaming_responsesid;write_responsetidbin_init_writer(Errorerr)|Ok(Errorerr)->Hashtbl.removet.open_streaming_responsesid;write_responsetidbin_init_writer(Okerr)|Ok(Okok)->let(initial,rest)=split_okokinwrite_responsetidbin_init_writer(Okinitial);handle_okrestinmatchimpl_with_statewith|`Pipef->run_impl(fundata->ft.connection_statedata)Fn.id(funpipe_r->Hashtbl.sett.open_streaming_responses~key:id~data:(Pipepipe_r);don't_wait_for(Writer.transfert.writerpipe_r(Cached_stream_writer.writestream_writert.packed_selfid));Pipe.closedpipe_r>>>fun()->Pipe.upstream_flushedpipe_r>>>function|`Ok|`Reader_closed->write_responsetidP.Stream_response_data.bin_writer_nat0_t(Ok`Eof);Hashtbl.removet.open_streaming_responsesid)|`Direct(f,writer)->run_impl(fundata->ft.connection_statedatawriter)(funx->(x,()))(fun()->Direct_stream_writer.startwriter)end;Continue;;letflush(Tt)=assert(nott.stopped);letproducers_flushed=Hashtbl.foldt.open_streaming_responses~init:[]~f:(fun~key:_~dataacc->matchdatawith|Direct_->acc|Pipepipe->Deferred.ignore(Pipe.upstream_flushedpipe)::acc)inDeferred.all_unitproducers_flushed;;letstop(Tt)=t.stopped<-true;Hashtbl.itert.open_streaming_responses~f:(function|Directwriter->(* Don't remove the writer from the instance, as that would modify the hashtable
that we are currently iterating over. *)Direct_stream_writer.close_without_removing_from_instancewriter|Pipe_->());Hashtbl.cleart.open_streaming_responses;;lethandle_unknown_rpcon_unknown_rpcerrortquery:_Transport.Handler_result.t=matchon_unknown_rpcwith|`Continue->Continue|`Raise->Rpc_error.raiseerrort.connection_description|`Close_connection->Stop(Ok())|`Callf->matchft.connection_state~rpc_tag:(P.Rpc_tag.to_stringquery.P.Query.tag)~version:query.versionwith|`Close_connection->Stop(Ok())|`Continue->Continue;;lethandle_query_internalt~(query:Nat0.tP.Query.t)~read_buffer~read_buffer_pos_ref=let{implementations;on_unknown_rpc}=t.implementationsinletdescription:Description.t={name=P.Rpc_tag.to_stringquery.tag;version=query.version}inmatcht.last_dispatched_implementationwith|Some(last_desc,implementation)whenDescription.equallast_descdescription->apply_implementationtimplementation~query~read_buffer~read_buffer_pos_ref|None|Some_->matchHashtbl.findimplementationsdescriptionwith|Someimplementation->t.last_dispatched_implementation<-Some(description,implementation);apply_implementationtimplementation~query~read_buffer~read_buffer_pos_ref|None->matchon_unknown_rpcwith|`Expertimpl->let{P.Query.tag;version;id;data=len}=queryinletd=letresponder=Responder.createidt.writerinimplt.connection_state~rpc_tag:(P.Rpc_tag.to_stringtag)~versionresponderread_buffer~pos:!read_buffer_pos_ref~len:(len:>int)inifDeferred.is_determineddthenContinueelseWaitd|(`Continue|`Raise|`Close_connection|`Call_)ason_unknown_rpc->leterror=Rpc_error.Unimplemented_rpc(query.tag,`Versionquery.version)inwrite_responsetquery.idP.Message.bin_writer_nat0_t(Errorerror);handle_unknown_rpcon_unknown_rpcerrortquery;;lethandle_query(Tt)~query~read_buffer~read_buffer_pos_ref=ift.stopped||Writer.is_closedt.writerthenTransport.Handler_result.Stop(Ok())elsehandle_query_internalt~query~read_buffer~read_buffer_pos_ref;;endmoduleDirect_stream_writer=Instance.Direct_stream_writerletcreate~implementations:i's~on_unknown_rpc=(* Make sure the tags are unique. *)letimplementations=Description.Table.create~size:10()inletdups=Description.Hash_set.create~size:10()inList.iteri's~f:(fun(i:_Implementation.t)->letdescription={Description.name=P.Rpc_tag.to_stringi.tag;version=i.version}inmatchHashtbl.addimplementations~key:description~data:i.fwith|`Ok->()|`Duplicate->Hash_set.adddupsdescription);ifnot(Hash_set.is_emptydups)thenError(`Duplicate_implementations(Hash_set.to_listdups))elseOk{implementations;on_unknown_rpc=(on_unknown_rpc:>_on_unknown_rpc);}letinstantiatet~connection_description~connection_close_started~connection_state~writer=letrecunpacked:_Instance.unpacked={implementations=t;writer;open_streaming_responses=Hashtbl.Poly.create~size:10();connection_state;connection_description;connection_close_started;stopped=false;last_dispatched_implementation=None;packed_self=Instance.Tunpacked}inunpacked.packed_self;;exceptionDuplicate_implementationsofDescription.tlist[@@derivingsexp]letcreate_exn~implementations~on_unknown_rpc=matchcreate~implementations~on_unknown_rpcwith|Okx->x|Error`Duplicate_implementationsdups->raise(Duplicate_implementationsdups)letnull()=create_exn~implementations:[]~on_unknown_rpc:`Raiseletadd_exnt(implementation:_Implementation.t)=letdesc:Description.t={name=P.Rpc_tag.to_stringimplementation.tag;version=implementation.version}inletimplementations=Hashtbl.copyt.implementationsinmatchHashtbl.addimplementations~key:desc~data:implementation.fwith|`Duplicate->raise(Duplicate_implementations[desc])|`Ok->{twithimplementations}letaddtimplementation=Or_error.try_with(fun()->add_exntimplementation)letlift{implementations;on_unknown_rpc}~f=letimplementations=Hashtbl.mapimplementations~f:(Implementation.F.lift~f)inleton_unknown_rpc=matchon_unknown_rpcwith|`Raise|`Continue|`Close_connectionasx->x|`Callcall->`Call(funstate->call(fstate))|`Expertexpert->`Expert(funstate->expert(fstate))in{implementations;on_unknown_rpc}moduleExpert=structmoduleResponder=RespondermoduleRpc_responder=structtypet=Responder.tletcannot_sendr=failwiths"Message cannot be sent"r[%sexp_of:_Transport.Send_result.t];;letmark_responded(t:t)=ift.respondedthenfailwiths"Already responded"t[%sexp_of:Responder.t];t.responded<-true;;;letschedule(t:t)buf~pos~len=mark_respondedt;letheader:Nat0.tP.Message.t=Response{id=t.query_id;data=Ok(Nat0.of_int_exnlen)}inmatchWriter.send_bin_prot_and_bigstring_non_copyingt.writerP.Message.bin_writer_nat0_theader~buf~pos~lenwith|Sentd->`Flushedd|Closed->`Connection_closed|Message_too_big_asr->cannot_sendrlethandle_send_result:unitTransport.Send_result.t->unit=function|Sent()|Closed->()|Message_too_big_asr->cannot_sendrletwrite_bigstring(t:t)buf~pos~len=mark_respondedt;letheader:Nat0.tP.Message.t=Response{id=t.query_id;data=Ok(Nat0.of_int_exnlen)}inWriter.send_bin_prot_and_bigstringt.writerP.Message.bin_writer_nat0_theader~buf~pos~len|>handle_send_resultletwrite_error(t:t)error=mark_respondedt;letdata=Rpc_result.uncaught_exn~location:"server-side raw rpc computation"(Error.to_exnerror)inWriter.send_bin_prott.writerP.Message.bin_writer_nat0_t(Response{id=t.query_id;data})|>handle_send_resultletwrite_bin_prot(t:t)bin_writer_aa=mark_respondedt;Writer.send_bin_prott.writer(P.Message.bin_writer_needs_length(Writer_with_length.of_writerbin_writer_a))(Response{id=t.query_id;data=Oka})|>handle_send_resultendletcreate_exn=create_exnend