Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file server_connection.ml
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407(*----------------------------------------------------------------------------
* Copyright (c) 2017 Inhabited Type LLC.
* Copyright (c) 2019 Antonio N. Monteiro.
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* 3. Neither the name of the author nor the names of his contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE CONTRIBUTORS ``AS IS'' AND ANY EXPRESS
* OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*---------------------------------------------------------------------------*)

(* Short aliases for the parser, serializer and Angstrom's buffered
 * interface used throughout this file. *)
module AB = Angstrom.Buffered
module Reader = Parse.Reader
module Writer = Serialize.Writer

(* The stream scheduler, instantiated over request descriptors ([Reqd.t]).
 * Everything else in the argument module comes from [Stream]. *)
module Scheduler = Scheduler.Make (struct
  include Stream

  type t = Reqd.t

  let flush_write_body = Reqd.flush_response_body

  let requires_output = Reqd.requires_output
end)

(* Callback invoked with the request descriptor of a request. *)
type request_handler = Reqd.t -> unit

(* Errors that are surfaced to the [error_handler]. *)
type error =
  [ `Bad_request
  | `Internal_server_error
  | `Exn of exn
  ]

(* Callback invoked with an [error]; the last argument takes response headers
 * and returns a writable body. *)
type error_handler =
  ?request:Request.t -> error -> (Headers.t -> [ `write ] Body.t) -> unit

(* State of one server-side connection. *)
type t =
  { mutable settings : Settings.t;
    reader : Reader.frame;
    writer : Writer.t;
    config : Config.t;
    request_handler : request_handler;
    error_handler : error_handler;
    streams : Scheduler.t;
    (* Number of currently open client streams. Used for
     * MAX_CONCURRENT_STREAMS bookkeeping *)
    mutable current_client_streams : int;
    mutable max_client_stream_id : Stream_identifier.t;
    mutable max_pushed_stream_id : Stream_identifier.t;
    mutable receiving_headers_for_stream : Stream_identifier.t option;
    (* Keep track of number of SETTINGS frames that we sent and for which
     * we haven't received an acknowledgment from the client. *)
    mutable unacked_settings : int;
    mutable did_send_go_away : bool;
    (* From RFC7540§4.3:
     *   Header compression is stateful. One compression context and one
     *   decompression context are used for the entire connection. *)
    hpack_encoder : Hpack.Encoder.t;
    hpack_decoder : Hpack.Decoder.t
  }

(* [is_closed t] holds only when both the reader and the writer report being
 * closed. The writer is not queried unless the reader is closed. *)
let is_closed t =
  let reader_closed = Reader.is_closed t.reader in
  reader_closed && Writer.is_closed t.writer

(* [wakeup_writer t] forwards to [Writer.wakeup] on the connection's writer. *)
let wakeup_writer t =
  Writer.wakeup t.writer

(* [shutdown_reader t] forwards to [Reader.force_close] on the connection's
 * reader. *)
let shutdown_reader t =
  Reader.force_close t.reader

(* [shutdown_writer t] closes the writer, then wakes it up. *)
let shutdown_writer t =
  begin
    Writer.close t.writer;
    wakeup_writer t
  end

(* [shutdown t] shuts down the reader first, then the writer. *)
let shutdown t =
  begin
    shutdown_reader t;
    shutdown_writer t
  end

(* Handling frames against closed streams is hard. See:
* https://docs.google.com/presentation/d/1iG_U2bKTc9CnKr0jPTrNfmxyLufx_cK2nNh9VjrKH6s
*)

(* [was_closed_or_implicitly_closed t stream_id] is [true] when [stream_id]
 * is not greater than the highest stream identifier recorded on [t] for its
 * kind: [t.max_client_stream_id] when [Stream_identifier.is_request
 * stream_id] holds, [t.max_pushed_stream_id] otherwise. *)
let was_closed_or_implicitly_closed t stream_id =
  let highest_recorded_id =
    if Stream_identifier.is_request stream_id
    then t.max_client_stream_id
    else t.max_pushed_stream_id
  in
  Stream_identifier.(stream_id <= highest_recorded_id)

(* TODO: currently connection-level errors are not reported to the error
* handler because it is assumed that an error handler will produce a response,
* and since HTTP/2 is multiplexed, there's no matching response for a
* connection error. We should do something about it. *)letreport_errort=function|Error.ConnectionError(error,data)->ifnott.did_send_go_awaythen((* From RFC7540§5.4.1:
* An endpoint that encounters a connection error SHOULD first send a
* GOAWAY frame (Section 6.8) with the stream identifier of the last
* stream that it successfully received from its peer. The GOAWAY frame
* includes an error code that indicates why the connection is
* terminating. After sending the GOAWAY frame for an error condition,
* the endpoint MUST close the TCP connection. *)letdebug_data=ifString.lengthdata=0thenBigstringaf.emptyelseBigstringaf.of_string~off:0~len:(String.lengthdata)datainletframe_info=Writer.make_frame_infoStream_identifier.connectionin(* TODO: Only write if not already shutdown. *)Writer.write_go_awayt.writerframe_info~debug_data~last_stream_id:t.max_client_stream_iderror;Writer.flusht.writer(fun()->(* XXX: We need to allow lower numbered streams to complete before
* shutting down. *)shutdownt);t.did_send_go_away<-true;wakeup_writert)|StreamError(stream_id,error)->(matchScheduler.findt.streamsstream_idwith|Somereqd->Stream.reset_streamreqderror|None->ifnot(was_closed_or_implicitly_closedtstream_id)then(* Possible if the stream was going to enter the Idle state (first time
* we saw e.g. a PRIORITY frame for it) but had e.g. a
* FRAME_SIZE_ERROR. *)letframe_info=Writer.make_frame_infostream_idinWriter.write_rst_streamt.writerframe_infoerror);wakeup_writertletreport_connection_errort?(additional_debug_data="")error=report_errort(ConnectionError(error,additional_debug_data))letreport_stream_errortstream_iderror=report_errort(StreamError(stream_id,error))letset_error_and_handle?requesttstreamerrorerror_code=assert(request=None);Reqd.report_errorstreamerrorerror_code;wakeup_writertletreport_exntexn=ifnot(is_closedt)thenletadditional_debug_data=Printexc.to_stringexninreport_connection_errort~additional_debug_dataError_code.InternalErrorleton_close_streamtid~activeclosed=ifactivethen(* From RFC7540§5.1.2:
* Streams that are in the "open" state or in either of the "half-closed"
* states count toward the maximum number of streams that an endpoint is
* permitted to open. *)t.current_client_streams<-t.current_client_streams-1;Scheduler.mark_for_removalt.streamsidclosedletsend_window_update:typea.t->aScheduler.PriorityTreeNode.node->int->unit=funtstreamn->letsend_window_update_framestream_idn=letvalid_inflow=Scheduler.add_inflowstreamninassertvalid_inflow;letframe_info=Writer.make_frame_infostream_idinWriter.write_window_updatet.writerframe_infoninifn>0then(letmax_window_size=Settings.WindowSize.max_window_sizeinletstream_id=Scheduler.stream_idstreaminletrecloopn=ifn>max_window_sizethen(send_window_update_framestream_idmax_window_size;loop(n-max_window_size))elsesend_window_update_framestream_idninloopn;wakeup_writert)letcreate_push_streamtparent_stream_id=letcandidate_push_stream_id=Int32.addt.max_pushed_stream_id2linifnott.settings.enable_pushthen(* From RFC7540§6.6:
* PUSH_PROMISE MUST NOT be sent if the SETTINGS_ENABLE_PUSH setting of
* the peer endpoint is set to 0. *)Error`Push_disabledelseifStream_identifier.(candidate_push_stream_id>max_stream_id)then((* From RFC7540§5.1:
* Stream identifiers cannot be reused. Long-lived connections can result
* in an endpoint exhausting the available range of stream identifiers.
* [...] A server that is unable to establish a new stream identifier can
* send a GOAWAY frame so that the client is forced to open a new
* connection for new streams. *)report_connection_errortError_code.NoError;Error`Stream_ids_exhausted)elseletpushed_stream_id=candidate_push_stream_idint.max_pushed_stream_id<-pushed_stream_id;letreqd=Stream.createpushed_stream_id~max_frame_size:t.settings.max_frame_sizet.writert.error_handler(on_close_streamtpushed_stream_id)in(* From RFC7540§5.3.5:
* All streams are initially assigned a non-exclusive dependency on
* stream 0x0. Pushed streams (Section 8.2) initially depend on their
* associated stream. In both cases, streams are assigned a default
* weight of 16. *)let_stream:Scheduler.nonrootScheduler.node=Scheduler.addt.streams~priority:{Priority.default_prioritywithstream_dependency=parent_stream_id}~initial_send_window_size:t.settings.initial_window_size~initial_recv_window_size:t.config.initial_window_sizereqdinOkreqdlethandle_headerst~end_streamstreamactive_streamheaders=let(Scheduler.Stream{descriptor=reqd;_})=streamin(* From RFC7540§5.1.2:
* Endpoints MUST NOT exceed the limit set by their peer. An endpoint that
* receives a HEADERS frame that causes its advertised concurrent stream
* limit to be exceeded MUST treat this as a stream error (Section 5.4.2)
* of type PROTOCOL_ERROR or REFUSED_STREAM. *)ift.current_client_streams+1>t.config.max_concurrent_streamsthenift.unacked_settings>0then(* From RFC7540§8.1.4:
* The REFUSED_STREAM error code can be included in a RST_STREAM frame
* to indicate that the stream is being closed prior to any processing
* having occurred. Any request that was sent on the reset stream can
* be safely retried.
*
* Note: if there are pending SETTINGS to acknowledge, assume there was a
* race condition and let the client retry. *)report_stream_errortreqd.Stream.idError_code.RefusedStreamelsereport_stream_errortreqd.Stream.idError_code.ProtocolErrorelse(reqd.state<-Active(OpenFullHeaders,active_stream);(* From RFC7540§5.1.2:
* Streams that are in the "open" state or in either of the "half-closed"
* states count toward the maximum number of streams that an endpoint is
* permitted to open. *)t.current_client_streams<-t.current_client_streams+1;matchHeaders.method_path_and_scheme_or_malformedheaderswith|`Malformed->(* From RFC7540§8.1.2.6:
* For malformed requests, a server MAY send an HTTP response prior to
* closing or resetting the stream. *)set_error_and_handletreqd`Bad_requestProtocolError|`Valid(meth,path,scheme)->(* Note: we don't need to check for `end_stream` flag + a non-zero body
* length, as the spec allows for non-zero content-length headers and no
* DATA frames.
*
* From RFC7540§8.1.2.6:
* A response that is defined to have no payload, as described in
* [RFC7230], Section 3.3.2, can have a non-zero content-length header
* field, even though no content is included in DATA frames. *)(matchMessage.body_lengthheaderswith|`Errore->set_error_and_handletreqdeProtocolError|`Fixed_|`Unknown->letrequest=Request.create~scheme~headers(Httpaf.Method.of_stringmeth)pathinletrequest_body=ifend_streamthenBody.emptyelseBody.create_reader(Bigstringaf.createt.config.request_body_buffer_size)~done_reading:(funlen->(* From RFC7540§6.9.1:
* The receiver of a frame sends a WINDOW_UPDATE frame as it
* consumes data and frees up space in flow-control windows.
* Separate WINDOW_UPDATE frames are sent for the stream- and
* connection-level flow-control windows. *)matchreqd.statewith|Active_->send_window_updatett.streamslen;send_window_updatetstreamlen|Idle|Reserved_|Closed_->())inletrequest_info=Reqd.create_active_requestrequestrequest_bodyinifend_streamthen((* From RFC7540§5.1:
* [...] an endpoint receiving an END_STREAM flag causes the stream
* state to become "half-closed (remote)". *)reqd.state<-Active(HalfClosedrequest_info,active_stream);(* Deliver EOF to the request body, as the handler might be waiting
* on it to produce a response. *)Body.close_readerrequest_body)elsereqd.state<-Active(Open(ActiveMessagerequest_info),active_stream);t.request_handlerreqd;wakeup_writert))lethandle_headers_blockt?(is_trailers=false)streamactive_streampartial_headersflagsheaders_block=letopenABinlet(Scheduler.Stream{descriptor=reqd;_})=streaminletend_headers=Flags.test_end_headerflagsin(* From RFC7540§6.10:
* An endpoint receiving HEADERS, PUSH_PROMISE, or CONTINUATION
* frames needs to reassemble header blocks and perform decompression
* even if the frames are to be discarded *)letparse_state'=AB.feedpartial_headers.Stream.parse_state(`Bigstringheaders_block)inifend_headersthen(t.receiving_headers_for_stream<-None;letparse_state'=AB.feedparse_state'`Eofinmatchparse_state'with|Done(_,Okheaders)->ifnotis_trailersthen((* Note:
* the highest stream identifier that the server has seen is set here
* (as opposed to when the stream was first opened - when handling
* the first HEADERS frame) because it refers to the highest stream
* identifier that the server will process.
*
* From RFC7540§6.8:
* The last stream identifier in the GOAWAY frame contains the
* highest-numbered stream identifier for which the sender of the
* GOAWAY frame might have taken some action on or might yet take
* action on. All streams up to and including the identified stream
* might have been processed in some way. *)t.max_client_stream_id<-reqd.Stream.id;(* `handle_headers` will take care of transitioning the stream state *)handle_headerst~end_stream:partial_headers.end_streamstreamactive_streamheaders)elseifHeaders.trailers_validheadersthen(Reqd.deliver_trailer_headersreqdheaders;letrequest_body=Reqd.request_bodyreqdinBody.close_readerrequest_body)else(* From RFC7540§8.1.2.1:
* Pseudo-header fields MUST NOT appear in trailers. Endpoints MUST
* treat a request or response that contains undefined or invalid
* pseudo-header fields as malformed (Section 8.1.2.6). *)set_error_and_handletreqd`Bad_requestProtocolError(* From RFC7540§4.3:
* A decoding error in a header block MUST be treated as a connection
* error (Section 5.4.1) of type COMPRESSION_ERROR. *)|Done(_,Error_)|Partial_->report_connection_errortError_code.CompressionError|Fail(_,_,message)->report_connection_errort~additional_debug_data:messageError_code.CompressionError)elsepartial_headers.parse_state<-parse_state'lethandle_trailer_headers=handle_headers_block~is_trailers:trueletopen_streamt~prioritystream_id=ifnotStream_identifier.(stream_id>t.max_client_stream_id)then((* From RFC7540§5.1.1:
* [...] The identifier of a newly established stream MUST be numerically
* greater than all streams that the initiating endpoint has opened or
* reserved. [...] An endpoint that receives an unexpected stream
* identifier MUST respond with a connection error (Section 5.4.1) of
* type PROTOCOL_ERROR. *)report_connection_errortError_code.ProtocolError;None)else(* From RFC7540§6.2:
* The HEADERS frame (type=0x1) is used to open a stream (Section 5.1),
* and additionally carries a header block fragment. HEADERS frames can
* be sent on a stream in the "idle", "reserved (local)", "open", or
* "half-closed (remote)" state. *)matchScheduler.get_nodet.streamsstream_idwith|None->letreqd=Stream.createstream_id~max_frame_size:t.settings.max_frame_sizet.writert.error_handler(on_close_streamtstream_id)inletstream:Scheduler.nonrootScheduler.node=Scheduler.addt.streams~priority~initial_send_window_size:t.settings.initial_window_size~initial_recv_window_size:t.config.initial_window_sizereqdinSomestream|Some(Scheduler.Streamnodeasstream)->(* From RFC7540§6.9.2:
* Both endpoints can adjust the initial window size for new streams
* by including a value for SETTINGS_INITIAL_WINDOW_SIZE in the
* SETTINGS frame.
*
* Note: we already have the stream in the priority tree, and the
* default initial window size for new streams could have changed
* between adding the (idle) stream and opening it.
*
* Note: `inflow` doesn't change, that's set by us statically via config.
*)node.flow<-t.settings.initial_window_size;Somestreamletprocess_first_headers_blocktframe_headerstreamheaders_block=let(Scheduler.Stream{descriptor=reqd;_})=streaminlet{Frame.stream_id;flags;_}=frame_headerinletend_headers=Flags.test_end_headerflagsinletheaders_block_length=Bigstringaf.lengthheaders_blockinletinitial_buffer_size=ifend_headersthenheaders_block_lengthelse(* Conservative estimate that there's only going to be one CONTINUATION
* frame. *)2*headers_block_lengthinletpartial_headers={Stream.parse_state=AB.parse~initial_buffer_size(Hpack.Decoder.decode_headerst.hpack_decoder);end_stream=Flags.test_end_streamflags}inletactive_stream=Reqd.create_active_streamt.hpack_encodert.config.response_body_buffer_size(create_push_streamt)inreqd.Stream.state<-Active(Open(PartialHeaderspartial_headers),active_stream);ifnotend_headersthent.receiving_headers_for_stream<-Somestream_id;handle_headers_blocktstreamactive_streampartial_headersflagsheaders_blockletprocess_trailer_headerststreamactive_streamframe_headerheaders_block=let(Scheduler.Stream{descriptor=reqd;_})=streaminlet{Frame.stream_id;flags;_}=frame_headerinletend_stream=Flags.test_end_streamflagsinifnotend_streamthen(* From RFC7540§8.1:
* A HEADERS frame (and associated CONTINUATION frames) can only appear
* at the start or end of a stream. An endpoint that receives a HEADERS
* frame without the END_STREAM flag set after receiving a final
* (non-informational) status code MUST treat the corresponding request
* or response as malformed (Section 8.1.2.6). *)set_error_and_handletreqd`Bad_requestProtocolErrorelseletpartial_headers={Stream.parse_state=AB.parse(Hpack.Decoder.decode_headerst.hpack_decoder)(* obviously true at this point. *);end_stream}inactive_stream.Reqd.trailers_parser<-Somepartial_headers;ifnotFlags.(test_end_headerflags)thent.receiving_headers_for_stream<-Somestream_id;(* trailer headers: RFC7230§4.4 *)handle_trailer_headerststreamactive_streampartial_headersflagsheaders_blockletprocess_headers_framet{Frame.frame_header;_}~priorityheaders_block=let{Frame.stream_id;_}=frame_headerinlet{Priority.stream_dependency;_}=priorityinifnotStream_identifier.(is_requeststream_id)then(* From RFC7540§5.1.1:
* Streams initiated by a client MUST use odd-numbered stream
* identifiers. [...] An endpoint that receives an unexpected
* stream identifier MUST respond with a connection error
* (Section 5.4.1) of type PROTOCOL_ERROR. *)report_connection_errortError_code.ProtocolErrorelseifStream_identifier.(stream_dependency===stream_id)then(* From RFC7540§5.3.1:
* A stream cannot depend on itself. An endpoint MUST treat this as a
* stream error (Section 5.4.2) of type PROTOCOL_ERROR. *)report_stream_errortstream_idError_code.ProtocolErrorelsematchScheduler.get_nodet.streamsstream_idwith|None->(matchopen_streamt~prioritystream_idwith|Somereqd->process_first_headers_blocktframe_headerreqdheaders_block|None->())|Some(Scheduler.Stream{descriptor=reqd;_}asstream)->(matchreqd.statewith|Idle->(* From RFC7540§6.2:
* HEADERS frames can be sent on a stream in the "idle", "reserved
* (local)", "open", or "half-closed (remote)" state. *)(matchopen_streamt~prioritystream_idwith|Somereqd->process_first_headers_blocktframe_headerreqdheaders_block|None->())|Active(Open(WaitingForPeer|PartialHeaders_),_)->(* This case is unreachable because we check that partial HEADERS
* states must be followed by CONTINUATION frames elsewhere. *)assertfalse(* if we're getting a HEADERS frame at this point, they must be
* trailers, and the END_STREAM flag needs to be set. *)|Active(Open(FullHeaders|ActiveMessage_),active_stream)->process_trailer_headerststreamactive_streamframe_headerheaders_block|Active(HalfClosed_,_)(* From RFC7540§5.1:
* half-closed (remote): [...] If an endpoint receives additional
* frames, other than WINDOW_UPDATE, PRIORITY, or RST_STREAM, for a
* stream that is in this state, it MUST respond with a stream
* error (Section 5.4.2) of type STREAM_CLOSED. *)|Closed{reason=ResetByThem_;_}->(* From RFC7540§5.1:
* closed: [...] An endpoint that receives any frame other than
* PRIORITY after receiving a RST_STREAM MUST treat that as a
* stream error (Section 5.4.2) of type STREAM_CLOSED. *)report_stream_errortstream_idError_code.StreamClosed(* From RFC7540§5.1:
* reserved (local): [...] Receiving any type of frame other than
* RST_STREAM, PRIORITY, or WINDOW_UPDATE on a stream in this state
* MUST be treated as a connection error (Section 5.4.1) of type
* PROTOCOL_ERROR. *)|Reserved_|Closed_->(* From RFC7540§5.1:
* Similarly, an endpoint that receives any frames after receiving
* a frame with the END_STREAM flag set MUST treat that as a
* connection error (Section 5.4.1) of type STREAM_CLOSED [...]. *)report_connection_errortError_code.StreamClosed)letprocess_data_framet{Frame.frame_header;_}bstr=letopenSchedulerinlet{Frame.flags;stream_id;payload_length;_}=frame_headerinifnot(Stream_identifier.is_requeststream_id)then(* From RFC7540§5.1.1:
* Streams initiated by a client MUST use odd-numbered stream
* identifiers. [...] An endpoint that receives an unexpected stream
* identifier MUST respond with a connection error (Section 5.4.1) of
* type PROTOCOL_ERROR. *)report_connection_errortError_code.ProtocolErrorelse((* From RFC7540§6.9:
* A receiver that receives a flow-controlled frame MUST always account
* for its contribution against the connection flow-control window,
* unless the receiver treats this as a connection error (Section 5.4.1).
* This is necessary even if the frame is in error. *)Scheduler.deduct_inflowt.streamspayload_length;matchScheduler.get_nodet.streamsstream_idwith|Some(Stream{descriptor;_}asstream)->(matchdescriptor.statewith|Active(Open(ActiveMessagerequest_info),active_stream)->letrequest_body=Reqd.request_bodydescriptorinrequest_info.request_body_bytes<-Int64.(addrequest_info.request_body_bytes(of_int(Bigstringaf.lengthbstr)));letrequest=request_info.requestinifnotScheduler.(allowed_to_receivet.streamsstreampayload_length)then((* From RFC7540§6.9:
* A receiver MAY respond with a stream error (Section 5.4.2) or
* connection error (Section 5.4.1) of type FLOW_CONTROL_ERROR if it
* is unable to accept a frame. *)send_window_updatett.streamspayload_length;report_stream_errortstream_idError_code.FlowControlError)else(Scheduler.deduct_inflowstreampayload_length;matchMessage.body_lengthrequest.headerswith|`Fixedlen(* Getting more than the client declared *)whenInt64.comparerequest_info.request_body_byteslen>0->(* Give back connection-level flow-controlled bytes (we use payload
* length to include any padding bytes that the frame might have
* included - which were ignored at parse time). *)send_window_updatett.streamspayload_length;(* From RFC7540§8.1.2.6:
* A request or response is also malformed if the value of a
* content-length header field does not equal the sum of the
* DATA frame payload lengths that form the body. *)set_error_and_handletdescriptor`Bad_requestProtocolError|_->letend_stream=Flags.test_end_streamflagsinifend_streamthenif(* From RFC7540§6.1:
* When set, bit 0 indicates that this frame is the last that
* the endpoint will send for the identified stream. Setting
* this flag causes the stream to enter one of the
* "half-closed" states or the "closed" state
* (Section 5.1). *)Reqd.requires_outputdescriptorthen(* There's a potential race condition here if the request
* handler completes the response right after. *)descriptor.state<-Active(HalfClosedrequest_info,active_stream)elseStream.finish_streamdescriptorFinished;(* From RFC7540§6.9.1:
* The receiver of a frame sends a WINDOW_UPDATE frame as it
* consumes data and frees up space in flow-control windows.
* Separate WINDOW_UPDATE frames are sent for the stream- and
* connection-level flow-control windows.
*
* Note: we send these WINDOW_UPDATE frames once the body bytes
* have been surfaced to the application. This is done in the
* record field `done_reading` of `Body.t`. *)letfaraday=Body.unsafe_faradayrequest_bodyinifnot(Faraday.is_closedfaraday)then(Faraday.schedule_bigstringfaradaybstr;ifend_streamthenBody.close_readerrequest_body);Reqd.flush_request_bodydescriptor)|Idle->(* From RFC7540§5.1:
* idle: [...] Receiving any frame other than HEADERS or PRIORITY on
* a stream in this state MUST be treated as a connection error
* (Section 5.4.1) of type PROTOCOL_ERROR. *)report_connection_errortError_code.ProtocolError(* This is technically in the half-closed (local) state *)|Closed{reason=ResetByUsNoError;_}->(* From RFC7540§6.9:
* A receiver that receives a flow-controlled frame MUST always
* account for its contribution against the connection flow-control
* window, unless the receiver treats this as a connection error
* (Section 5.4.1). This is necessary even if the frame is in
* error. *)send_window_updatett.streamspayload_length(* From RFC7540§6.4:
* [...] after sending the RST_STREAM, the sending endpoint MUST be
* prepared to receive and process additional frames sent on the
* stream that might have been sent by the peer prior to the arrival
* of the RST_STREAM.
*
* Note: after some writer yields / wake ups, we will have stopped
* keeping state information for the stream. This functions effectively
* as a way of only accepting frames after an RST_STREAM from us up to
* a time limit. *)|_->send_window_updatett.streamspayload_length;(* From RFC7540§6.1:
* If a DATA frame is received whose stream is not in "open" or
* "half-closed (local)" state, the recipient MUST respond with a
* stream error (Section 5.4.2) of type STREAM_CLOSED. *)report_stream_errortstream_idError_code.StreamClosed)|None->ifnot(was_closed_or_implicitly_closedtstream_id)then(* From RFC7540§5.1:
* idle: [...] Receiving any frame other than HEADERS or PRIORITY on
* a stream in this state MUST be treated as a connection error
* (Section 5.4.1) of type PROTOCOL_ERROR. *)report_connection_errortError_code.ProtocolError)letprocess_priority_framet{Frame.frame_header;_}priority=let{Frame.stream_id;_}=frame_headerinlet{Priority.stream_dependency;_}=priorityinifnot(Stream_identifier.is_requeststream_id)then(* From RFC7540§5.1.1:
* Streams initiated by a client MUST use odd-numbered stream
* identifiers. [...] An endpoint that receives an unexpected stream
* identifier MUST respond with a connection error (Section 5.4.1) of
* type PROTOCOL_ERROR. *)report_connection_errortError_code.ProtocolErrorelseifStream_identifier.(stream_id===stream_dependency)then(* From RFC7540§5.3.1:
* A stream cannot depend on itself. An endpoint MUST treat this as a
* stream error (Section 5.4.2) of type PROTOCOL_ERROR. *)report_stream_errortstream_idError_code.ProtocolErrorelsematchScheduler.get_nodet.streamsstream_idwith|Somestream->Scheduler.reprioritize_streamt.streams~prioritystream|None->(* From RFC7540§5.1.1:
* Endpoints SHOULD process PRIORITY frames, though they can be ignored
* if the stream has been removed from the dependency tree (see Section
* 5.3.4).
*
* Note:
* if we're receiving a PRIORITY frame for a stream that we already
* removed from the tree (i.e. can't be found in the hash table, and
* for which the stream ID is smaller than or equal to the max stream
* id that the client has opened), don't bother processing it. *)
      if not (was_closed_or_implicitly_closed t stream_id)
      then
        let reqd =
          Stream.create
            stream_id
            ~max_frame_size:t.settings.max_frame_size
            t.writer
            t.error_handler
            (on_close_stream t stream_id)
        in
        let _stream : Scheduler.nonroot Scheduler.node =
          Scheduler.add
            t.streams
            ~priority
            ~initial_send_window_size:t.settings.initial_window_size
            ~initial_recv_window_size:t.config.initial_window_size
            reqd
        in
        ()

(* Handles a received RST_STREAM frame: finishes the referenced stream with
 * [ResetByThem error_code], or reports a connection error when the stream
 * identifier is not a request identifier or the stream is idle. *)
let process_rst_stream_frame t { Frame.frame_header; _ } error_code =
  let { Frame.stream_id; _ } = frame_header in
  if not (Stream_identifier.is_request stream_id)
  then
    (* From RFC7540§5.1.1:
     *   Streams initiated by a client MUST use odd-numbered stream
     *   identifiers. [...] An endpoint that receives an unexpected stream
     *   identifier MUST respond with a connection error (Section 5.4.1) of
     *   type PROTOCOL_ERROR. *)
    report_connection_error t Error_code.ProtocolError
  else
    match Scheduler.find t.streams stream_id with
    | Some reqd ->
      (match reqd.state with
      | Idle ->
        (* From RFC7540§6.4:
         *   RST_STREAM frames MUST NOT be sent for a stream in the "idle"
         *   state. If a RST_STREAM frame identifying an idle stream is
         *   received, the recipient MUST treat this as a connection error
         *   (Section 5.4.1) of type PROTOCOL_ERROR. *)
        report_connection_error t Error_code.ProtocolError
      | _ ->
        (* From RFC7540§6.4:
         *   The RST_STREAM frame fully terminates the referenced stream and
         *   causes it to enter the "closed" state. After receiving a
         *   RST_STREAM on a stream, the receiver MUST NOT send additional
         *   frames for that stream, with the exception of PRIORITY.
         *
         * Note:
         *   This match branch also accepts streams in the `Closed` state. We
         *   do that to comply with the following:
         *
         * From RFC7540§6.4:
         *   [...] after sending the RST_STREAM, the sending endpoint MUST be
         *   prepared to receive and process additional frames sent on the
         *   stream that might have been sent by the peer prior to the arrival
         *   of the RST_STREAM. *)
        Stream.finish_stream reqd (ResetByThem error_code))
    | None ->
      (* We might have removed the stream from the hash table. If its stream
       * id is smaller than or equal to the max client stream id we've seen,
       * then it must have been closed. *)
      if not (was_closed_or_implicitly_closed t stream_id)
      then
        (* From RFC7540§6.4:
         *   RST_STREAM frames MUST NOT be sent for a stream in the "idle"
         *   state. If a RST_STREAM frame identifying an idle stream is
         *   received, the recipient MUST treat this as a connection error
         *   (Section 5.4.1) of type PROTOCOL_ERROR.
         *
         * Note:
         *   If we didn't find the stream in the hash table it must be
         *   "idle". *)
        report_connection_error t Error_code.ProtocolError

(* Folds a list of received settings over [t.settings], applying each
 * parameter's side effect (HPACK encoder capacity, per-stream flow windows,
 * per-stream max frame size) as it goes, then stores the result in
 * [t.settings]. *)
let apply_settings_list t settings =
  let open Scheduler in
  (* From RFC7540§6.5:
   *   Each parameter in a SETTINGS frame replaces any existing value for
   *   that parameter. Parameters are processed in the order in which they
   *   appear, and a receiver of a SETTINGS frame does not need to maintain
   *   any state other than the current value of its parameters. *)
  let new_settings =
    List.fold_left
      (fun (acc : Settings.t) item ->
        match item with
        | Settings.HeaderTableSize, x ->
          (* From RFC7540§6.5.2:
           *   Allows the sender to inform the remote endpoint of the maximum
           *   size of the header compression table used to decode header
           *   blocks, in octets. *)
          Hpack.Encoder.set_capacity t.hpack_encoder x;
          { acc with header_table_size = x }
        | EnablePush, x ->
          (* We've already verified that this setting is either 0 or 1 in the
           * call to `Settings.check_settings_list` above. *)
          { acc with enable_push = x = 1 }
        | MaxConcurrentStreams, x -> { acc with max_concurrent_streams = x }
        | InitialWindowSize, new_val ->
          (* From RFC7540§6.9.2:
           *   [...] a SETTINGS frame can alter the initial flow-control
           *   window size for streams with active flow-control windows (that
           *   is, streams in the "open" or "half-closed (remote)" state).
           *   When the value of SETTINGS_INITIAL_WINDOW_SIZE changes, a
           *   receiver MUST adjust the size of all stream flow-control
           *   windows that it maintains by the difference between the new
           *   value and the old value. *)
          let old_val = acc.initial_window_size in
          let growth = new_val - old_val in
          (* The local exception aborts the iteration at the first stream
           * whose window cannot absorb [growth]. *)
          let exception Local in
          (match
             Scheduler.iter
               ~f:(fun stream ->
                 (* From RFC7540§6.9.2:
                  *   An endpoint MUST treat a change to
                  *   SETTINGS_INITIAL_WINDOW_SIZE that causes any
                  *   flow-control window to exceed the maximum size as a
                  *   connection error (Section 5.4.1) of type
                  *   FLOW_CONTROL_ERROR. *)
                 if not (Scheduler.add_flow stream growth) then raise Local)
               t.streams
           with
          | () -> ()
          | exception Local ->
            report_connection_error
              t
              ~additional_debug_data:
                (Format.sprintf
                   "Window size for stream would exceed %d"
                   Settings.WindowSize.max_window_size)
              Error_code.FlowControlError);
          { acc with initial_window_size = new_val }
        | MaxFrameSize, x ->
          (* XXX: We're probably not abiding entirely by this. If we get a
           * MAX_FRAME_SIZE setting we'd need to reallocate the read buffer?
           * This will need support from the I/O runtimes. *)
          Scheduler.iter
            ~f:(fun (Stream { descriptor; _ }) ->
              if Reqd.requires_output descriptor
              then descriptor.max_frame_size <- x)
            t.streams;
          { acc with max_frame_size = x }
        | MaxHeaderListSize, x -> { acc with max_header_list_size = Some x })
      t.settings
      settings
  in
  t.settings <- new_settings

(* Writes a SETTINGS frame on the connection stream. When [ack] is false the
 * count of our own not-yet-acknowledged SETTINGS frames is incremented. *)
let write_settings_frame t ~ack settings =
  let flags =
    if ack then Flags.(set_ack default_flags) else Flags.default_flags
  in
  let frame_info = Writer.make_frame_info ~flags Stream_identifier.connection in
  Writer.write_settings t.writer frame_info settings;
  if not ack
  then
    (* Don't expect our acknowledgements to be acknowledged... *)
    t.unacked_settings <- t.unacked_settings + 1

(* Handles a received SETTINGS frame. An ACK decrements the pending counter
 * (a connection error is reported if none was pending); otherwise the
 * settings are validated, applied and acknowledged. *)
let process_settings_frame t { Frame.frame_header; _ } settings =
  let { Frame.flags; _ } = frame_header in
  (* We already checked that an acked SETTINGS is empty. Don't need to do
   * anything else in that case *)
  if Flags.(test_ack flags)
  then (
    t.unacked_settings <- t.unacked_settings - 1;
    if t.unacked_settings < 0
    then
      (* The client is ACKing a SETTINGS frame that we didn't send *)
      let additional_debug_data =
        "Received SETTINGS with ACK but no ACK was pending"
      in
      report_connection_error t ~additional_debug_data Error_code.ProtocolError)
  else
    match Settings.check_settings_list settings with
    | Ok () ->
      apply_settings_list t settings;
      (* From RFC7540§6.5.2:
       *   Once all values have been processed, the recipient MUST immediately
       *   emit a SETTINGS frame with the ACK flag set.
       *
       * From RFC7540§6.5:
       *   ACK (0x1): [...] When this bit is set, the payload of the SETTINGS
       *   frame MUST be empty. *)
      write_settings_frame t ~ack:true [];
      wakeup_writer t
    | Error error -> report_error t error

(* Handles a received PING frame: a PING without the ACK flag is answered with
 * an ACK-flagged PING carrying the same payload; an ACK-flagged PING is
 * ignored. *)
let process_ping_frame t { Frame.frame_header; _ } payload =
  let { Frame.flags; _ } = frame_header in
  (* From RFC7540§6.7:
   *   ACK (0x1): When set, bit 0 indicates that this PING frame is a PING
   *   response. [...] An endpoint MUST NOT respond to PING frames containing
   *   this flag. *)
  if not (Flags.test_ack flags)
  then (
    (* From RFC7540§6.7:
     *   Receivers of a PING frame that does not include an ACK flag MUST send
     *   a PING frame with the ACK flag set in response, with an identical
     *   payload. PING responses SHOULD be given higher priority than any other
     *   frame. *)
    let frame_info =
      Writer.make_frame_info
      (* From RFC7540§6.7:
       *   ACK (0x1): When set, bit 0 indicates that this PING frame is a
       *   PING response. An endpoint MUST set this flag in PING
       *   responses. *)
        ~flags:Flags.(set_ack default_flags)
        Stream_identifier.connection
    in
    (* From RFC7540§6.7:
     *   Receivers of a PING frame that does not include an ACK flag MUST send
     *   a PING frame with the ACK flag set in response, with an identical
     *   payload. *)
    Writer.write_ping t.writer frame_info payload;
    wakeup_writer t)

(* Handles a received GOAWAY frame by shutting the connection down. None of the
 * payload fields (last stream id, error code, debug data) are used yet, so the
 * debug data is no longer copied into a throwaway buffer: that copy was never
 * read and its size was controlled by the peer. The annotation keeps the
 * payload type pinned to what callers pass. *)
let process_goaway_frame t _frame payload =
  let _last_stream_id, _error, (_debug_data : Bigstringaf.t) = payload in
  (* TODO(anmonteiro): I think we need to allow lower numbered streams to
   * complete. *)
  shutdown t

(* Adds [increment] to the flow-control window of [stream] (either the
 * connection node or a stream node). On success the writer is woken up if the
 * resulting window is positive; on failure a connection or stream
 * FLOW_CONTROL_ERROR is reported depending on which node was targeted. *)
let add_window_increment
    : type a. t -> a Scheduler.PriorityTreeNode.node -> int -> unit
  =
 fun t stream increment ->
  let open Scheduler in
  let did_add = Scheduler.add_flow stream increment in
  let stream_id = Scheduler.stream_id stream in
  let new_flow =
    match stream with
    | Connection { flow; _ } -> flow
    | Stream { flow; _ } -> flow
  in
  if did_add
  then (
    if new_flow > 0
    then
      (* Don't bother waking up the writer if the new flow doesn't allow
       * the stream to write. *)
      wakeup_writer t)
  else if Stream_identifier.is_connection stream_id
  then
    report_connection_error
      t
      ~additional_debug_data:
        (Printf.sprintf
           "Window size for stream would exceed %d"
           Settings.WindowSize.max_window_size)
      Error_code.FlowControlError
  else report_stream_error t stream_id Error_code.FlowControlError

(* Handles a received WINDOW_UPDATE frame for either the connection or a
 * single stream, dispatching on the state of the targeted stream. *)
let process_window_update_frame t { Frame.frame_header; _ } window_increment =
  let open Scheduler in
  let { Frame.stream_id; _ } = frame_header in
  (* From RFC7540§6.9:
   *   The WINDOW_UPDATE frame can be specific to a stream or to the entire
   *   connection. In the former case, the frame's stream identifier indicates
   *   the affected stream; in the latter, the value "0" indicates that the
   *   entire connection is the subject of the frame. *)
  if Stream_identifier.is_connection stream_id
  then add_window_increment t t.streams window_increment
  else
    match Scheduler.get_node t.streams stream_id with
    | Some (Stream { descriptor; _ } as stream_node) ->
      (match descriptor.state with
      | Idle ->
        (* From RFC7540§5.1:
         *   idle: [...] Receiving any frame other than HEADERS or PRIORITY on
         *   a stream in this state MUST be treated as a connection error
         *   (Section 5.4.1) of type PROTOCOL_ERROR. *)
        report_connection_error t Error_code.ProtocolError
      | Active _
      (* From RFC7540§5.1:
       *   reserved (local): [...] A PRIORITY or WINDOW_UPDATE frame MAY be
       *   received in this state. *)
      | Reserved _ ->
        add_window_increment t stream_node window_increment
      | Closed _ ->
        (* From RFC7540§6.9:
         *   [...] a receiver could receive a WINDOW_UPDATE frame on a
         *   "half-closed (remote)" or "closed" stream. A receiver MUST NOT
         *   treat this as an error (see Section 5.1). *)
        (* From RFC7540§5.1:
         *   Endpoints MUST ignore WINDOW_UPDATE or RST_STREAM frames received
         *   in this state, though endpoints MAY choose to treat frames that
         *   arrive a significant time after sending END_STREAM as a connection
         *   error (Section 5.4.1) of type PROTOCOL_ERROR. *)
        ())
    | None ->
      if not (was_closed_or_implicitly_closed t stream_id)
      then
        (* From RFC7540§5.1:
         *   idle: [...] Receiving any frame other than HEADERS or PRIORITY on
         *   a stream in this state MUST be treated as a connection error
         *   (Section 5.4.1) of type PROTOCOL_ERROR. *)
        report_connection_error t Error_code.ProtocolError

(* Handles a received CONTINUATION frame by feeding the header block fragment
 * to whichever partial header (request headers or trailers) parser is pending
 * on the stream; any other situation is a connection PROTOCOL_ERROR. *)
let process_continuation_frame t { Frame.frame_header; _ } headers_block =
  let { Frame.stream_id; flags; _ } = frame_header in
  if not (Stream_identifier.is_request stream_id)
  then
    (* From RFC7540§5.1.1:
     *   Streams initiated by a client MUST use odd-numbered stream
     *   identifiers. [...] An endpoint that receives an unexpected stream
     *   identifier MUST respond with a connection error (Section 5.4.1) of
     *   type PROTOCOL_ERROR. *)
    report_connection_error t Error_code.ProtocolError
  else
    match Scheduler.get_node t.streams stream_id with
    | Some (Scheduler.Stream { descriptor; _ } as stream) ->
      (match descriptor.state with
      | Active (Open (PartialHeaders partial_headers), active_stream) ->
        handle_headers_block
          t
          stream
          active_stream
          partial_headers
          flags
          headers_block
      | Active
          ( Open (ActiveMessage _)
          , ({ trailers_parser = Some partial_headers; _ } as active_stream) )
        ->
        handle_trailer_headers
          t
          stream
          active_stream
          partial_headers
          flags
          headers_block
      | _ ->
        (* TODO: maybe need to handle the case where the stream has been closed
         * due to a stream error. *)
        (* From RFC7540§5.4.2:
         *   A RST_STREAM is the last frame that an endpoint can send on a
         *   stream. The peer that sends the RST_STREAM frame MUST be prepared
         *   to receive any frames that were sent or enqueued for sending by
         *   the remote peer. These frames can be ignored, except where they
         *   modify connection state (such as the state maintained for header
         *   compression (Section 4.3) or flow control). *)
        report_connection_error t Error_code.ProtocolError)
    | None ->
      (* From RFC7540§6.10:
       *   A CONTINUATION frame MUST be preceded by a HEADERS, PUSH_PROMISE or
       *   CONTINUATION frame without the END_HEADERS flag set. A recipient
       *   that observes violation of this rule MUST respond with a connection
       *   error (Section 5.4.1) of type PROTOCOL_ERROR. *)
      report_connection_error t Error_code.ProtocolError

(* Default error handler: writes a textual description of the error as the
 * response body (with empty headers) and closes the body. *)
let default_error_handler ?request:_ error handle =
  let message =
    match error with
    | `Exn exn -> Printexc.to_string exn
    | (#Status.client_error | #Status.server_error) as error ->
      Status.to_string error
  in
  let body = handle Headers.empty in
  Body.write_string body message;
  Body.close_writer body

(* Writes the server connection preface: a SETTINGS frame advertising our
 * configuration, followed by a connection-level window update when the
 * configured initial window size is larger than the spec default. *)
let write_connection_preface t =
  (* Check if the settings for the connection are different than the default
   * HTTP/2 settings. In the event that they are, we need to send a non-empty
   * SETTINGS frame advertising our configuration. *)
  let settings = Settings.settings_for_the_connection t.settings in
  (* XXX(anmonteiro): same as in the client. Revert
   * [t.settings.initial_window_size] to the spec-default value until we
   * receive a setting for it. *)
  t.settings <-
    { t.settings with
      initial_window_size = Settings.default.initial_window_size
    };
  (* Now send the connection preface, including our settings for the
   * connection.
   *
   * From RFC7540§3.5:
   *   The server connection preface consists of a potentially empty
   *   SETTINGS frame (Section 6.5) that MUST be the first frame the
   *   server sends in the HTTP/2 connection. *)
  write_settings_frame ~ack:false t settings;
  (* If a higher value for initial window size is configured, add more
   * tokens to the connection (we have no streams at this point). *)
  if t.config.initial_window_size > Settings.default.initial_window_size
  then
    let diff =
      t.config.initial_window_size - Settings.default.initial_window_size
    in
    send_window_update t t.streams diff

(* Builds a connection value. The record, the preface handler and the frame
 * handler are mutually recursive: the handlers need the connection and the
 * connection's reader needs the handlers, hence the lazy [t]. *)
let create_generic ~h2c ~config ~error_handler request_handler =
  let settings = Config.to_settings config in
  let writer = Writer.create settings.max_frame_size in
  let rec connection_preface_handler recv_frame settings_list =
    let t = Lazy.force t in
    (* If this connection is `h2c` (HTTP/2 over TCP), we have already written
     * the server connection preface. This is only necessary if we're
     * responding to a client-initiated connection, but in `h2c` the server
     * writes the preface first (handled below in `create_h2c`) *)
    if not h2c then write_connection_preface t;
    (* Now process the client's SETTINGS frame. `process_settings_frame` will
     * take care of calling `wakeup_writer`. *)
    process_settings_frame t recv_frame settings_list
  and frame_handler r =
    let t = Lazy.force t in
    match r with
    | Error e -> report_error t e
    | Ok ({ Frame.frame_payload; frame_header } as frame) ->
      (match t.receiving_headers_for_stream with
      | Some stream_id
        when (not Stream_identifier.(stream_id === frame_header.stream_id))
             || frame_header.frame_type <> Continuation ->
        (* From RFC7540§6.2:
         *   A HEADERS frame without the END_HEADERS flag set MUST be followed
         *   by a CONTINUATION frame for the same stream. A receiver MUST treat
         *   the receipt of any other type of frame or a frame on a different
         *   stream as a connection error (Section 5.4.1) of type
         *   PROTOCOL_ERROR. *)
        report_connection_error
          t
          ~additional_debug_data:
            "HEADERS or PUSH_PROMISE without the END_HEADERS flag set must be \
             followed by a CONTINUATION frame for the same stream"
          Error_code.ProtocolError
      | _ ->
        (match frame_payload with
        | Headers (priority, headers_block) ->
          process_headers_frame t frame ~priority headers_block
        | Data bs -> process_data_frame t frame bs
        | Priority priority -> process_priority_frame t frame priority
        | RSTStream error_code -> process_rst_stream_frame t frame error_code
        | Settings settings -> process_settings_frame t frame settings
        | PushPromise _ ->
          (* From RFC7540§8.2:
           *   A client cannot push. Thus, servers MUST treat the receipt of a
           *   PUSH_PROMISE frame as a connection error (Section 5.4.1) of type
           *   PROTOCOL_ERROR. *)
          report_connection_error
            t
            ~additional_debug_data:"Client cannot push"
            Error_code.ProtocolError
        | Ping data -> process_ping_frame t frame data
        | GoAway (last_stream_id, error, debug_data) ->
          process_goaway_frame t frame (last_stream_id, error, debug_data)
        | WindowUpdate window_size ->
          process_window_update_frame t frame window_size
        | Continuation headers_block ->
          process_continuation_frame t frame headers_block
        | Unknown _ ->
          (* TODO: in the future we can expose a hook for handling unknown
           * frames, e.g. the ALTSVC frame defined in RFC7838§4
           * (https://tools.ietf.org/html/rfc7838#section-4) *)
          (* From RFC7540§5.1:
           *   Frames of unknown types are ignored. *)
          ()))
  and t =
    lazy
      { settings
      ; reader =
          Reader.server_frames
            ~max_frame_size:settings.max_frame_size
            connection_preface_handler
            frame_handler
      ; writer
      ; config
      ; request_handler
      ; error_handler
      ; streams = Scheduler.make_root ()
      ; current_client_streams = 0
      ; max_client_stream_id = 0l
      ; max_pushed_stream_id = 0l
      ; receiving_headers_for_stream = None
      ; did_send_go_away = false
      ; unacked_settings = 0
      ; hpack_encoder = Hpack.Encoder.(create settings.header_table_size)
      ; hpack_decoder = Hpack.Decoder.(create settings.header_table_size)
      }
  in
  Lazy.force t

(* Creates a connection for a direct (non-upgraded) HTTP/2 client. *)
let create
    ?(config = Config.default)
    ?(error_handler = default_error_handler)
    request_handler
  =
  (* `h2c` false = direct *)
  create_generic ~h2c:false ~config ~error_handler request_handler

(* Replays the HTTP/1.1 request that triggered an h2c upgrade as stream 1:
 * opens the stream, feeds it the translated headers, marks it half-closed and
 * schedules the already-received request body, if any. Does nothing when the
 * stream cannot be opened. *)
let handle_h2c_request t headers request_body_iovecs =
  (* From RFC7540§3.2:
   *   The HTTP/1.1 request that is sent prior to upgrade is assigned a stream
   *   identifier of 1 (see Section 5.1.1) with default priority values
   *   (Section 5.3.5). *)
  match open_stream ~priority:Priority.default_priority t 1l with
  | Some (Scheduler.Stream { descriptor = reqd; _ } as stream) ->
    let active_stream =
      Reqd.create_active_stream
        t.hpack_encoder
        t.config.response_body_buffer_size
        (create_push_stream t)
    in
    t.max_client_stream_id <- reqd.Stream.id;
    let lengthv = Httpaf.IOVec.lengthv request_body_iovecs in
    let end_stream = lengthv = 0 in
    handle_headers t ~end_stream stream active_stream headers;
    let request = Reqd.request reqd in
    let request_body = Reqd.request_body reqd in
    let request_info = Reqd.create_active_request request request_body in
    request_info.request_body_bytes <-
      Int64.(add request_info.request_body_bytes (of_int lengthv));
    (* From RFC7540§3.2:
     *   Stream 1 is implicitly "half-closed" from the client toward the server
     *   (see Section 5.1), since the request is completed as an HTTP/1.1
     *   request. After commencing the HTTP/2 connection, stream 1 is used for
     *   the response. *)
    reqd.state <- Active (HalfClosed request_info, active_stream);
    if not end_stream
    then
      let faraday = Body.unsafe_faraday request_body in
      if not (Faraday.is_closed faraday)
      then (
        List.iter
          (fun { Httpaf.IOVec.buffer; off; len } ->
            Faraday.schedule_bigstring faraday ~off ~len buffer)
          request_body_iovecs;
        (* Close the request body, we're not expecting more input. *)
        Body.close_reader request_body)
  | None -> ()

(* This function is meant to be called inside an HTTP/1.1 upgrade handler.
*
* It's useful to have the request body on the server because a single request
* handler may process both upgraded (h2c) and direct connections. Given the
* following, application code that calls this function needs to buffer the
* entire request body in memory if it wants, so we can just get a
* `Bigstringaf.t IOVec.t list`
*
*
* From RFC7540§3.2:
* Requests that contain a payload body MUST be sent in their
* entirety before the client can send HTTP/2 frames. This means
* that a large request can block the use of the connection until
* it is completely sent. *)
let create_h2c
    ?(config = Config.default)
    ?(error_handler = default_error_handler)
    ~http_request
    ?(request_body = [])
    request_handler
  =
  let { Httpaf.Request.headers; _ } = http_request in
  (* From RFC7540§3.2.1:
   *   A request that upgrades from HTTP/1.1 to HTTP/2 MUST include exactly one
   *   HTTP2-Settings header field.
   *
   *   [...] A server MUST NOT upgrade the connection to HTTP/2 if this header
   *   field is not present or if more than one is present. A server MUST NOT
   *   send this header field. *)
  match
    Httpaf.Headers.(
      get_multi headers "http2-settings", get_multi headers "connection")
  with
  | [ settings ], [ connection ]
    when Headers.is_valid_h2c_connection connection ->
    (* From RFC7540§3.2.1:
     *   The content of the HTTP2-Settings header field is the payload of a
     *   SETTINGS frame (Section 6.5), encoded as a base64url string (that is,
     *   the URL- and filename-safe Base64 encoding described in Section 5 of
     *   [RFC4648], with any trailing '=' characters omitted).
     *
     *   [...] A server decodes and interprets these values as it would any
     *   other SETTINGS frame. Explicit acknowledgement of these settings
     *   (Section 6.5.3) is not necessary, since a 101 response serves as
     *   implicit acknowledgement. *)
    (* Each step below yields [Error msg] on failure; a connection is only
     * created once headers and settings have all been validated. *)
    (match Headers.of_http1 http_request with
    | Ok h2_headers ->
      (match Settings.of_base64 settings with
      | Ok upgrade_settings ->
        let settings_list =
          Settings.settings_for_the_connection upgrade_settings
        in
        (match Settings.check_settings_list settings_list with
        | Ok () ->
          let t =
            create_generic ~h2c:true ~config ~error_handler request_handler
          in
          apply_settings_list t settings_list;
          (* From RFC7540§3.5:
           *   The first HTTP/2 frame sent by the server MUST be a server
           *   connection preface (Section 3.5) consisting of a SETTINGS
           *   frame (Section 6.5).
           *
           * Note: as opposed to a connection started by a client, in h2c
           * we're upgrading from HTTP/1.1 so the server is responsible for
           * writing the HTTP/2 connection preface to the wire first. We
           * also configure the connection with `~h2c:true` above to not
           * send it a second time. *)
          write_connection_preface t;
          (* From RFC7540§3.2:
           *   A server that supports HTTP/2 accepts the upgrade with a 101
           *   (Switching Protocols) response. After the empty line that
           *   terminates the 101 response, the server can begin sending
           *   HTTP/2 frames. These frames MUST include a response to the
           *   request that initiated the upgrade *)
          handle_h2c_request t h2_headers request_body;
          Ok t
        | Error error -> Error (Error.message error))
      | Error msg -> Error msg)
    | Error msg -> Error msg)
  | _ ->
    Error
      "A request that upgrades from HTTP/1.1 to HTTP/2 MUST include exactly \
       one HTTP2-Settings header field and HTTP2-Settings as a connection \
       option in the Connection header field."

(* Returns the next operation the I/O runtime should perform on the read side.
 * Shuts the reader down first if the frame reader is already closed. When the
 * reader yields an error, the error is reported and then mapped to `Close
 * (connection errors) or `Read (stream errors). *)
let next_read_operation t =
  if Reader.is_closed t.reader then shutdown_reader t;
  match Reader.next t.reader with
  | (`Read | `Close) as operation -> operation
  | `Error e ->
    report_error t e;
    (match e with
    | ConnectionError _ ->
      (* From RFC7540§5.4.1:
       *   A connection error is any error that prevents further processing
       *   of the frame layer or corrupts any connection state. *)
      `Close
    | StreamError _ ->
      (* From RFC7540§5.4.2:
       *   A stream error is an error related to a specific stream that does
       *   not affect processing of other streams. *)
      `Read)

(* Feeds [len] bytes of [bs] starting at [off] to the frame reader, signalling
 * that more input may follow ([Incomplete]). *)
let read t bs ~off ~len = Reader.read_with_more t.reader bs ~off ~len Incomplete

(* Same as [read], but signals that no more input will follow ([Complete]). *)
let read_eof t bs ~off ~len =
  Reader.read_with_more t.reader bs ~off ~len Complete

(* XXX(anmonteiro): this function is here to please the Gluten `RUNTIME`
* interface.
*
* We don't expect this function to ever be called. H2 never issues `Yield`
* commands because the connection is multiplexed, and it's therefore always
* looking to read frames from the peer. *)
let yield_reader _t continue = continue ()

(* Flushes the stream scheduler, handing it the highest client-initiated and
 * pushed stream identifiers seen so far, then asks the writer for the next
 * write operation. *)
let next_write_operation t =
  let highest_stream_ids = t.max_client_stream_id, t.max_pushed_stream_id in
  Scheduler.flush t.streams highest_stream_ids;
  Writer.next t.writer

(* Forwards the outcome of a write performed by the I/O runtime to the
 * writer. *)
let report_write_result t outcome = Writer.report_result t.writer outcome

(* Registers [on_wakeup] with the writer as the callback to invoke when the
 * writer is woken up. *)
let yield_writer t on_wakeup = Writer.on_wakeup_writer t.writer on_wakeup