Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file reader.ml
(* Reader side of the profiler data format: parses the header that declares
   singles, groups and group points, then decodes the short messages that
   follow it. *)
open Core
open Core_profiler
open Core_profiler_disabled

(* In-memory form of a parsed header: a table from probe id to the item
   declared under that id. *)
module Header = struct
  (* One header entry.  An id refers to exactly one of: a standalone item
     ([Single]), a group ([Group]), or a point belonging to a group
     ([Group_point]). *)
  module Item = struct
    (* A standalone item with its own spec. *)
    type single = { name : string; spec : Probe_type.t }

    (* A group: [points_spec] is the spec shared by its points and [children]
       holds the ids of the points registered under it. *)
    type group = { name : string; points_spec : Probe_type.t; children : Probe_id.t list }

    (* A point of group [parent]; [sources] are ids of other points of the
       same group (see [check_sources_parent] in [consume_header]). *)
    type group_point = { name : string; parent : Probe_id.t; sources : Probe_id.t list }

    type t =
      | Single of single
      | Group of group
      | Group_point of group_point

    (* The [name] field of whichever variant this is. *)
    let name = function
      | Single m -> m.name
      | Group p -> p.name
      | Group_point pp -> pp.name
  end

  (* NOTE(review): [read] is presumably a permission phantom type making the
     table read-only -- confirm against [Id_table]. *)
  type t = (Item.t, read) Id_table.t

  (* Plain lookup; its behaviour on a missing id is whatever
     [Id_table.find_exn] does (not visible here). *)
  let find_exn = Id_table.find_exn

  (* Looks up [id] and returns its payload if it is a [Single]; fails via
     [failwithf] for any other variant. *)
  let find_single_exn t id =
    match find_exn t id with
    | Item.Single m -> m
    | _ -> failwithf !"Id %{Probe_id} does not refer to a single" id ()

  (* Same as [find_single_exn], but for [Group]. *)
  let find_group_exn t id =
    match find_exn t id with
    | Item.Group p -> p
    | _ -> failwithf !"Id %{Probe_id} does not refer to a Group" id ()

  (* Same as [find_single_exn], but for [Group_point]. *)
  let find_group_point_exn t id =
    match find_exn t id with
    | Item.Group_point pp -> pp
    | _ -> failwithf !"Id %{Probe_id} does not refer to a Group_point" id ()

  (* Id of the group that point [id] belongs to; fails if [id] is not a
     [Group_point]. *)
  let get_parent_id_exn t id =
    match find_exn t id with
    | Item.Group_point pp -> pp.parent
    | _ -> failwithf !"Id %{Probe_id} does not refer to a Group_point" id ()

  (* The group record that point [id] belongs to. *)
  let get_parent_exn t id =
    find_group_exn t (get_parent_id_exn t id)

  (* Name of item [id].  For a group point, passing [~with_group:sep] yields
     the parent group's name, then [sep], then the point's own name; without
     it only the point's own name is returned.  [with_group] has no effect on
     singles and groups. *)
  let get_name_exn t ?with_group id =
    match find_exn t id with
    | Item.Single { Item.name; _ } -> name
    | Item.Group { Item.name; _ } -> name
    | Item.Group_point { Item.name; parent; _ } ->
      begin
        match with_group with
        | Some sep ->
          let { Item.name = group_name; points_spec = _; _ } = find_group_exn t parent in
          group_name ^ sep ^ name
        | None -> name
      end

  (* Spec of item [id]: a single's own [spec], a group's [points_spec], or,
     for a group point, the [points_spec] of its parent group. *)
  let get_spec_exn t id =
    match find_exn t id with
    | Item.Single { spec; _ } -> spec
    | Item.Group { points_spec; _ } -> points_spec
    | Group_point { parent; _ } ->
      let { Item.points_spec; _ } = find_group_exn t parent in
      points_spec

  (* Units carried by the spec of [id]; fails when the spec is
     [Probe_type.Timer], which carries none. *)
  let get_units_exn t id =
    match get_spec_exn t id with
    | Probe_type.Probe units -> units
    | Probe_type.Timer ->
      failwithf !"Id %{Probe_id} does not refer to something with units" id ()

  (* Builds a table over a subset of the ids of [t] using
     [Id_table.filter_map].  An id is kept, bound to [empty], only when both
     its spec kind ([timers] / [probes]) and its item kind ([singles] /
     [groups] / [group_points]) are enabled; every flag defaults to [true]. *)
  let create_table
        t
        ?(singles = true)
        ?(groups = true)
        ?(group_points = true)
        ?(timers = true)
        ?(probes = true)
        empty =
    Id_table.filter_map t ~f:(fun id header_item ->
      let spec_ok =
        match get_spec_exn t id with
        | Probe_type.Timer -> timers
        | Probe_type.Probe _ -> probes
      in
      let item_type_ok =
        match header_item with
        | Item.Single _ -> singles
        | Item.Group _ -> groups
        | Item.Group_point _ -> group_points
      in
      if spec_ok && item_type_ok
      then Some empty
      else None
    )
end

(* Parses header messages from [buffer], one after another, until an
   end-of-header message is seen, and returns [(epoch, table)].

   Fails (via [failwith] / [failwithf] / [Option.value_exn]) when: an id is
   declared twice; a group point names a parent that is not a group; a group
   point lists a source that is not an already-declared point of the same
   group; two epoch messages are present or none is; or the message stream is
   truncated or malformed. *)
let consume_header buffer =
  let module HP = Header_protocol in
  let module HP_MsgT = Header_protocol.Message_type_and_errors in
  (* Binds [id] to [data], refusing to overwrite an existing binding. *)
  let add_exn map id data =
    match Map.find map id with
    | Some existing ->
      failwithf !"Duplicate Id %{Probe_id} (%s, %s)"
        id (Header.Item.name existing) (Header.Item.name data) ()
    | None -> Map.set map ~key:id ~data
  in
  (* Prepends [point_id] to the children of group [group_id], so children end
     up in reverse order of declaration. *)
  let add_point_to_group map ~group_id ~point_id =
    match Map.find map group_id with
    | Some (Header.Item.Group p) ->
      let p = { p with children = point_id :: p.children } in
      Map.set map ~key:group_id ~data:(Header.Item.Group p)
    | Some (Header.Item.Single _ | Group_point _)
    | None ->
      failwith "Tried to add a point to something that isn't a group"
  in
  (* Every source must already be in [map] as a group point whose parent is
     [group_id]; anything else is an error. *)
  let check_sources_parent map ~point_id ~group_id sources =
    List.iter sources ~f:(fun source_id ->
      match Map.find map source_id with
      | Some (Header.Item.Group_point pp) ->
        if pp.parent <> group_id
        then
          failwithf !"Point %{Probe_id} of group %{Probe_id} references source point \
                      %{Probe_id} not belonging to the same group"
            point_id group_id source_id ()
      | Some (Header.Item.Single _ | Header.Item.Group _)
      | None ->
        failwithf !"Point %{Probe_id} of group %{Probe_id} references source point \
                    %{Probe_id} that is not a group point"
          point_id group_id source_id ()
    )
  in
  (* Takes a [sub_shared] view positioned at the current message, then moves
     [buffer] past that message; the view is what gets decoded.
     NOTE(review): presumably the view shares the underlying bytes rather
     than copying -- confirm against [Iobuf.sub_shared]. *)
  let get_message_buffer buffer =
    let b = Iobuf.sub_shared buffer in
    HP.skip_message buffer;
    b
  in
  (* Consumes one message per call, threading the epoch seen so far (if any)
     and the id map built so far; stops at the end-of-header message. *)
  let rec scan epoch map =
    let this_message_buffer = get_message_buffer buffer in
    (* Matching on [packed_type] below selects which accessor module applies
       to [message]. *)
    let HP_MsgT.T packed_type = HP.get_message_type this_message_buffer in
    let message = HP.of_iobuf this_message_buffer ~trusted:packed_type in
    match packed_type with
    | HP_MsgT.New_single ->
      HP.New_single.(
        let spec = get_spec message in
        let map =
          add_exn map (get_id message) (Header.Item.Single { name = get_name message; spec })
        in
        scan epoch map
      )
    | HP_MsgT.New_group ->
      HP.New_group.(
        let points_spec = get_spec message in
        (* A new group starts with no children; points add themselves later. *)
        let group_spec =
          { name = get_name message; Header.Item.points_spec; children = [] }
        in
        let map = add_exn map (get_id message) (Header.Item.Group group_spec) in
        scan epoch map
      )
    | HP_MsgT.New_group_point ->
      HP.New_group_point.(
        let sources =
          let count = get_sources_count message in
          List.init ~f:(fun i -> get_sources_source_id message ~count ~index:i) count
        in
        let point_id = get_id message in
        let group_id = get_group_id message in
        (* Validate against the map as it was before this point is added. *)
        check_sources_parent map ~point_id ~group_id sources;
        let metadata =
          Header.Item.Group_point { name = get_name message; parent = group_id; sources }
        in
        let map = add_exn map point_id metadata in
        let map = add_point_to_group map ~group_id ~point_id in
        scan epoch map
      )
    | HP_MsgT.Epoch ->
      HP.Epoch.(
        match epoch with
        | Some epoch ->
          failwithf !"Header contained two epochs: %{Profiler_epoch}, %{Profiler_epoch}"
            epoch (get_epoch message) ()
        | None ->
          scan (Some (get_epoch message)) map
      )
    | HP_MsgT.End_of_header ->
      (epoch, map)
    | HP_MsgT.Need_more_data ->
      failwith "Invalid header (truncated)"
    | HP_MsgT.Invalid_message_type_or_subtype ->
      failwith "Invalid header (bad message type)"
    | HP_MsgT.Message_length_too_short ->
      failwith "Invalid header (message length too short)"
  in
  let (epoch, map) = scan None Probe_id.Map.empty in
  let epoch = Option.value_exn ~message:"Header did not contain an epoch" epoch in
  let table = Id_table.init_from_map map ~f:(fun _id item -> item) in
  (epoch, table)

(* Writes a header with ids 0..8 in a scrambled order, reads it back with
   [consume_header], and checks the epoch and the full association list.
   The expected [children] of group 2 are [7; 6; 5] because children are
   prepended as points are declared. *)
let%test_unit "read_header" =
  let module Buffer = Protocol.Buffer in
  let module Writer = Protocol.Writer in
  let module HI = Header.Item in
  let to_id = Probe_id.of_int_exn in
  protect
    ~finally:Buffer.Unsafe_internals.reset
    ~f:(fun () ->
      Writer.Unsafe_internals.write_epoch ();
      Writer.write_new_single (to_id 3) "timer3" Probe_type.Timer;
      Writer.write_new_group (to_id 2) "group2" (Probe_type.Probe Profiler_units.Seconds);
      Writer.write_new_group (to_id 1) "group1" (Probe_type.Timer);
      Writer.write_new_group_point ~id:(to_id 4) ~group_id:(to_id 1) "group1point4" [||];
      Writer.write_new_group_point ~id:(to_id 5) ~group_id:(to_id 2) "group2point5" [||];
      Writer.write_new_group_point ~id:(to_id 6) ~group_id:(to_id 2) "group2point6" [| to_id 5 |];
      Writer.write_new_single (to_id 0) "probe0" (Probe_type.Probe Profiler_units.Words);
      Writer.write_new_single (to_id 8) "timer8" Probe_type.Timer;
      Writer.write_new_group_point
        ~id:(to_id 7) ~group_id:(to_id 2) "group2point7" [| to_id 5; to_id 6 |];
      Writer.Unsafe_internals.write_end_of_header ();

      let (epoch2, id_map) = consume_header (Buffer.get_header_chunk ()) in
      let id_map_alist = Id_table.to_alist id_map in

      [%test_pred: Profiler_epoch.t] (fun a -> a = Writer.epoch) epoch2;
      [%test_eq: int] (List.length id_map_alist) 9;

      let expect =
        [ ( to_id 0
          , HI.Single { name = "probe0"; spec = Probe_type.Probe Profiler_units.Words } );
          ( to_id 1
          , HI.Group { name = "group1"; points_spec = Probe_type.Timer; children = [ to_id 4 ] } );
          ( to_id 2
          , HI.Group
              { name = "group2"
              ; HI.points_spec = Probe_type.Probe Profiler_units.Seconds
              ; children = [ to_id 7; to_id 6; to_id 5 ]
              } );
          ( to_id 3, HI.Single { name = "timer3"; spec = Probe_type.Timer } );
          ( to_id 4
          , HI.Group_point { name = "group1point4"; parent = to_id 1; sources = [] } );
          ( to_id 5
          , HI.Group_point { name = "group2point5"; parent = to_id 2; sources = [] } );
          ( to_id 6
          , HI.Group_point { name = "group2point6"; parent = to_id 2; sources = [ to_id 5 ] } );
          ( to_id 7
          , HI.Group_point
              { name = "group2point7"; parent = to_id 2; sources = [ to_id 5; to_id 6 ] } );
          ( to_id 8, HI.Single { name = "timer8"; spec = Probe_type.Timer } )
        ]
      in
      assert (id_map_alist = expect)
    )

(* There's scope for making a zero-copy version of this *)
(* A decoded short message.  Every variant carries the id it refers to and a
   timestamp; [Probe] additionally carries an integer value. *)
module Short_message = struct
  module Header = Protocol.Short_header

  type t =
    | Timer of Probe_id.t * Time_ns.t
    | Probe of Probe_id.t * Time_ns.t * int
    | Group_reset of Probe_id.t * Time_ns.t
  [@@deriving sexp, compare]

  (* The id component of any variant. *)
  let id = function
    | Timer (id, _) -> id
    | Probe (id, _, _) -> id
    | Group_reset (id, _) -> id

  (* The timestamp component of any variant. *)
  let time = function
    | Timer (_, time) -> time
    | Probe (_, time, _) -> time
    | Group_reset (_, time) -> time
end

(* Decodes one short message from the front of [buffer] and advances [buffer]
   past it.

   The first 8 bytes are read as a little-endian int64 from which the id and
   (using [epoch]) the time are unpacked.  What follows depends on what
   [header] says the id is:
   - a group yields [Group_reset] and consumes 8 bytes;
   - a timer (a single, or a point of a timer group) yields [Timer] and
     consumes 8 bytes;
   - a probe (a single, or a point of a probe group) yields [Probe], whose
     value is a second little-endian int64 at offset 8, and consumes 16 bytes.

   Fails with [failwith] if [buffer] is empty or shorter than the message
   requires. *)
let consume_short_message buffer epoch header =
  let module SM = Short_message in
  let module HI = Header.Item in
  (* Sampled once, before any advance; both length checks below use it. *)
  let remaining = Iobuf.length buffer in
  if remaining = 0
  then failwith "Invalid short message: empty buffer"
  else if remaining < 8
  then failwith "Invalid short message: truncated"
  else begin
    (* fields common to read_timer, probe and group_reset *)
    let sm_header = Iobuf.Peek.int64_le_exn buffer ~pos:0 in
    let sm_id = SM.Header.unpack_id sm_header in
    let sm_time = SM.Header.unpack_time epoch sm_header in
    (* The lengths were checked against [remaining], hence the unchecked
       [unsafe_advance] calls. *)
    let read_by_spec = function
      | Probe_type.Timer ->
        Iobuf.unsafe_advance buffer 8;
        SM.Timer (sm_id, sm_time)
      | Probe_type.Probe _ ->
        if remaining < 16
        then failwith "Invalid short message: truncated"
        else begin
          let value = Iobuf.Peek.int64_le_exn buffer ~pos:8 in
          Iobuf.unsafe_advance buffer 16;
          SM.Probe (sm_id, sm_time, value)
        end
    in
    let read_group_reset () =
      Iobuf.unsafe_advance buffer 8;
      SM.Group_reset (sm_id, sm_time)
    in
    match Header.find_exn header sm_id with
    | HI.Group _ -> read_group_reset ()
    | HI.Single { spec; _ } -> read_by_spec spec
    | HI.Group_point { parent; _ } ->
      (* A group point takes the spec of its parent group. *)
      let { HI.points_spec; _ } = Header.find_group_exn header parent in
      read_by_spec points_spec
  end

(* Writes a header and five short messages (one timer, one probe, one group
   reset, one timer group point, one probe group point), then checks that
   [consume_short_message] decodes them in order with the expected payloads. *)
let%test_unit "consume_short_message" =
  let module Buffer = Protocol.Buffer in
  let module Writer = Protocol.Writer in
  protect
    ~finally:Buffer.Unsafe_internals.reset
    ~f:(fun () ->
      let to_id = Probe_id.of_int_exn in
      let time_past_epoch x =
        Profiler_epoch.add Writer.epoch (Time_ns.Span.of_int_ns x)
      in
      Writer.Unsafe_internals.write_epoch ();
      Writer.write_new_single (to_id 1) "timer" Probe_type.Timer;
      Writer.write_new_single (to_id 2) "probe" (Probe_type.Probe Profiler_units.Seconds);
      Writer.write_new_group (to_id 3) "timer-group" Probe_type.Timer;
      Writer.write_new_group_point ~group_id:(to_id 3) ~id:(to_id 4) "timer-group-point" [||];
      Writer.write_new_group (to_id 5) "probe-group" (Probe_type.Probe Profiler_units.Int);
      Writer.write_new_group_point ~group_id:(to_id 5) ~id:(to_id 6) "probe-group-point" [||];
      Writer.Unsafe_internals.write_end_of_header ();

      Writer.write_timer_at (to_id 1) (time_past_epoch 100);
      Writer.write_probe_at (to_id 2) (time_past_epoch 200) 22;
      Writer.write_group_reset (to_id 3) (time_past_epoch 300);
      Writer.write_timer_at (to_id 4) (time_past_epoch 400);
      Writer.write_probe_at (to_id 6) (time_past_epoch 600) 66;

      let (epoch, header) = consume_header (Buffer.get_header_chunk ()) in
      (* The test expects all short messages to sit in exactly one chunk. *)
      let short_messages_chunk =
        match Buffer.get_chunks () with
        | [ x ] -> Iobuf.sub_shared x
        | _ -> assert false
      in
      let read () = consume_short_message short_messages_chunk epoch header in

      [%test_eq: Short_message.t] (read ()) (Timer (to_id 1, time_past_epoch 100));
      [%test_eq: Short_message.t] (read ()) (Probe (to_id 2, time_past_epoch 200, 22));
      [%test_eq: Short_message.t] (read ()) (Group_reset (to_id 3, time_past_epoch 300));
      [%test_eq: Short_message.t] (read ()) (Timer (to_id 4, time_past_epoch 400));
      [%test_eq: Short_message.t] (read ()) (Probe (to_id 6, time_past_epoch 600, 66));
    )

(* Folds [f] over every short message in [buffer], starting from [init], until
   the buffer is empty.  Decoding happens inside
   [Iobuf.protect_window_and_bounds] on [Iobuf.no_seek buffer].
   NOTE(review): presumably this restores the caller's window afterwards --
   confirm against the Iobuf documentation. *)
let fold_short_messages buffer epoch header ~init ~f =
  Iobuf.protect_window_and_bounds (Iobuf.no_seek buffer) ~f:(fun buffer ->
    let rec loop accum =
      if Iobuf.is_empty buffer
      then accum
      else begin
        let message = consume_short_message buffer epoch header in
        loop (f accum message)
      end
    in
    loop init
  )

(* Calls [f] on every short message in [buffer], in order, until the buffer is
   empty; wrapped the same way as [fold_short_messages]. *)
let iter_short_messages buffer epoch header ~f =
  Iobuf.protect_window_and_bounds (Iobuf.no_seek buffer) ~f:(fun buffer ->
    while not (Iobuf.is_empty buffer) do
      f (consume_short_message buffer epoch header)
    done
  )

(* Like [iter_short_messages], but [f] also receives the zero-based index of
   the message as its first argument. *)
let iteri_short_messages buffer epoch header ~f =
  Iobuf.protect_window_and_bounds (Iobuf.no_seek buffer) ~f:(fun buffer ->
    let i = ref 0 in
    while not (Iobuf.is_empty buffer) do
      f !i (consume_short_message buffer epoch header);
      incr i
    done
  )

(* Opens [filename] read-only, memory-maps its full length with
   [~shared:false], and returns the mapping wrapped as an Iobuf. *)
let map_file filename =
  (* Length of the file behind [fd], found by seeking to the end; the
     position the descriptor had on entry is restored afterwards.
     [Option.value_exn] raises if the length does not fit in an [int]. *)
  let file_length fd =
    let current_pos = Unix.lseek fd ~mode:Unix.SEEK_CUR Int64.zero in
    let length = Unix.lseek fd ~mode:Unix.SEEK_END Int64.zero in
    ignore (Unix.lseek fd ~mode:Unix.SEEK_SET current_pos : int64);
    length |> Int64.to_int |> Option.value_exn
  in
  Unix.with_file filename ~mode:[ Unix.O_RDONLY ] ~f:(fun fd ->
    let map = Bigstring.map_file ~shared:false fd (file_length fd) in
    Iobuf.of_bigstring map
  )