Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file msgpack_rpc.ml
open Core_kernel
open Async

(** Modulus applied to message ids before they are put on the wire (2^32). *)
let bitsize =
  let open Int.O in
  2 ** 32
;;

(** A notification received from the peer: a method name plus its parameters. *)
type event =
  { method_name : string
  ; params : Msgpack.t list
  }
[@@deriving sexp]

(** What the functor needs from a transport: a way to obtain the Async reader
    and writer of a connection value. *)
module type Connection = sig
  type t

  val reader : t -> Async.Reader.t
  val writer : t -> Async.Writer.t
end

module type S = sig
  type conn
  type t

  (** Returns a pipe of the notifications published for this connection. *)
  val subscribe : t -> Source_code_position.t -> event Pipe.Reader.t

  (** Sends a request and returns a deferred that is determined with [Ok result]
      or [Error err] once the matching response has been read. *)
  val call
    :  t
    -> method_name:string
    -> parameters:Msgpack.t
    -> (Msgpack.t, Msgpack.t) Deferred.Result.t

  (** Wraps a transport connection and starts reading messages from it. *)
  val connect : conn -> t

  (** Registers [f] as the handler for incoming requests named [name]. Returns an
      error if a handler with that name is already registered. *)
  val register_method
    :  name:string
    -> f:(Msgpack.t list -> Msgpack.t Or_error.t)
    -> unit Or_error.t
end

module Make (M : Connection) () = struct
  type conn = M.t
  type t = M.t * (event -> unit) Bus.Read_only.t

  module Id_factory = Unique_id.Int63 ()

  (* Both tables below live at the level of the functor application, so they are
     shared by every connection created through this instance of [Make]. *)

  (** Requests that have been sent and are still waiting for a response, keyed by
      the message id that was put on the wire. *)
  let pending_requests : (Msgpack.t, Msgpack.t) Result.t Ivar.t Int.Table.t =
    Int.Table.create ()
  ;;

  let subscribe ((_, notifications_bus) : t) = Async_bus.pipe1_exn notifications_bus

  (** Handlers for incoming requests, keyed by method name. *)
  let synchronous_callbacks : (Msgpack.t list -> Msgpack.t Or_error.t) String.Table.t =
    String.Table.create ()
  ;;

  let register_method ~name ~f =
    match Hashtbl.add synchronous_callbacks ~key:name ~data:f with
    | `Ok -> Ok ()
    | `Duplicate -> Or_error.errorf "duplicate method name %s" name
  ;;

  (** Reads messages from [conn] until the parser stops, dispatching each one:
      - [[1; msgid; Nil; result]] fills the pending request with [Ok result];
      - [[1; msgid; err; Nil]] fills the pending request with [Error err];
      - [[2; method_name; params]] is published on [notifications_bus];
      - [[0; msgid; method_name; params]] is answered by running the registered
        handler and writing a [[1; msgid; error; result]] reply to [conn];
      - anything else is logged and ignored. *)
  let event_loop conn notifications_bus =
    (* Shared by both response shapes: look up the waiting request and hand it
       its outcome, or log when nobody is waiting for that id. *)
    let resolve msgid outcome =
      (match Hashtbl.find pending_requests msgid with
       | None -> Log.Global.error "Unknown message ID: %d" msgid
       | Some box -> Ivar.fill box outcome);
      return ()
    in
    let handle_message = function
      | Msgpack.Array [ Integer 1; Integer msgid; Nil; result ] ->
        resolve msgid (Ok result)
      | Msgpack.Array [ Integer 1; Integer msgid; err; Nil ] -> resolve msgid (Error err)
      | Msgpack.Array [ Integer 2; String method_name; Array params ] ->
        Bus.write notifications_bus { method_name; params };
        return ()
      | Msgpack.Array [ Integer 0; Integer msgid; String method_name; Array params ] ->
        let resp =
          match Hashtbl.find synchronous_callbacks method_name with
          | None ->
            Msgpack.Array
              [ Msgpack.Integer 1
              ; Integer msgid
              ; String (sprintf "no method %s" method_name)
              ; Nil
              ]
          | Some f ->
            (* [try_with_join] turns an exception raised by the handler into an
               [Error], so it is reported to the peer like any other failure. *)
            (match Or_error.try_with_join (fun () -> f params) with
             | Ok r -> Msgpack.Array [ Msgpack.Integer 1; Integer msgid; Nil; r ]
             | Error e ->
               Msgpack.Array
                 [ Msgpack.Integer 1
                 ; Integer msgid
                 ; String (e |> [%sexp_of: Error.t] |> Sexp.to_string)
                 ; Nil
                 ])
        in
        Async.Writer.write (M.writer conn) (Msgpack.string_of_t_exn resp);
        return ()
      | msg ->
        Log.Global.error !"unexpected msgpack response: %{sexp:Msgpack.t}\n" msg;
        return ()
    in
    match%bind
      Angstrom_async.parse_many Msgpack.Internal.Parser.msg handle_message (M.reader conn)
    with
    | Ok () -> return ()
    | Error s ->
      Log.Global.error "Unable to parse messagepack-rpc response: %s" s;
      return ()
  ;;

  (** Creates the ivar that will receive the response for [msg_id] and records it
      in [pending_requests]. Returns the ivar together with the id. *)
  let register msg_id =
    let box = Ivar.create () in
    Hashtbl.set pending_requests ~key:msg_id ~data:box;
    box, msg_id
  ;;

  (** Waits for the ivar to be filled, then drops the entry from
      [pending_requests] and returns the outcome. *)
  let wait_for_response (box, msg_id) =
    let%bind result = Ivar.read box in
    Hashtbl.remove pending_requests msg_id;
    return result
  ;;

  let connect conn =
    let notifications_bus =
      Bus.create
        [%here]
        Arity1
        ~on_subscription_after_first_write:Allow
        ~on_callback_raise:(* This should be impossible. *) Error.raise
    in
    (* The reader loop runs in the background for the life of the connection. *)
    don't_wait_for (event_loop conn notifications_bus);
    conn, Bus.read_only notifications_bus
  ;;

  let call (conn, _) ~method_name ~parameters =
    let open Msgpack in
    (* Reduce the id once and use the reduced value everywhere. The response
       handler looks requests up by the id it reads off the wire, so the key in
       [pending_requests] must be the same value that is sent; registering the
       unreduced id would leave the request unanswered once the factory passes
       [bitsize]. *)
    let msg_id = (Id_factory.create () |> Id_factory.to_int_exn) mod bitsize in
    let method_name = String method_name in
    let query_msg = Array [ Integer 0; Integer msg_id; method_name; parameters ] in
    (* Register before writing so the entry exists by the time a reply can be read. *)
    let result_box = register msg_id in
    let query = Msgpack.string_of_t_exn query_msg in
    let () = Async.Writer.write (M.writer conn) query in
    wait_for_response result_box
  ;;
end