package lwt-pipe

  1. Overview
  2. Docs
An alternative to `Lwt_stream` with interfaces for producers and consumers and a bounded internal buffer

Install

Dune Dependency

Authors

Maintainers

Sources

v0.1.tar.gz
md5=46cfc88c4220d40356f6bea7c535be6e
sha512=ebc04adf58d913aac8caf43d76b2191fa76101c60a48f6c992a396e5bc8b0756d1c6ca0f9038141b77a40185c8cdb03a9de62252b1d23b06e12f201a9dff914b

doc/lwt-pipe/Lwt_pipe/index.html

Module Lwt_pipeSource

Pipes, Readers, Writers

Stream processing using:

  • Pipe: a possibly buffered channel that can act as a reader or as a writer
  • Reader: accepts values, produces effects
  • Writer: yield values

Examples:

  #require "lwt";;

  module P = Lwt_pipe;;

  let p1 =
    P.of_list CCList.(1 -- 100)
    |> P.Reader.map ~f:string_of_int;;

  Lwt_io.with_file ~mode:Lwt_io.output "/tmp/foo"
    (fun oc ->
       let p2 = P.IO.write_lines oc in
       P.connect ~ownership:`InOwnsOut p1 p2;
       P.wait p2
    );;

status: experimental

Sourceexception Closed
Sourcetype ('a, +'perm) t constraint 'perm = [< `r | `w ]

A pipe between producers of values of type 'a, and consumers of values of type 'a.

Sourcetype ('a, 'perm) pipe = ('a, 'perm) t
Sourcetype 'a read_timeout_result =
  1. | Pipe_closed
  2. | Nothing_available
  3. | Timeout
  4. | Data_available of 'a
    (*

    Return type for the read_with_timeout function

    *)
Sourceval keep : (_, _) t -> unit Lwt.t -> unit

keep p fut adds a pointer from p to fut so that fut is not garbage-collected before p

Sourceval is_closed : (_, _) t -> bool
Sourceval close : (_, _) t -> unit Lwt.t

close p closes p, which will not accept input anymore. This sends End to all readers connected to p

Sourceval close_nonblock : (_, _) t -> unit

Same as close but does not wait for completion of dependent tasks

Sourceval wait : (_, _) t -> unit Lwt.t

Evaluates once the pipe closes

Sourceval create : ?on_close:(unit -> unit) -> ?max_size:int -> unit -> ('a, 'perm) t

Create a new pipe.

  • parameter on_close

    called when the pipe is closed

  • parameter max_size

    size of internal buffer. Default 0.

Sourceval connect : ?ownership:[ `None | `InOwnsOut | `OutOwnsIn ] -> ('a, [> `r ]) t -> ('a, [> `w ]) t -> unit

connect p1 p2 forwards every item output by p1 into p2's input until p1 is closed.

  • parameter own

    determines which pipes owns which (the owner, when it closes, also closes the ownee)

link_close p ~after will close p when after closes. if after is closed already, closes p immediately

Sourceval read : ('a, [> `r ]) t -> 'a option Lwt.t

Read the next value from a Pipe

Sourceval read_with_timeout : ('a, [> `r ]) t -> timeout:float option -> 'a read_timeout_result Lwt.t

read_with_timeout p ~timeout read the next value from a Pipe, optionally waiting for at most a number of seconds passed with the timeout parameter.

Sourceval write : ('a, [> `w ]) t -> 'a -> bool Lwt.t

Returns false if the pipe is closed

Sourceval write_exn : ('a, [> `w ]) t -> 'a -> unit Lwt.t
  • raises Closed

    if the writer is closed

Sourceval write_list : ('a, [> `w ]) t -> 'a list -> bool Lwt.t

Returns false if the pipe is closed

Sourceval write_list_exn : ('a, [> `w ]) t -> 'a list -> unit Lwt.t
  • raises Closed

    if the writer is closed

Sourceval to_stream : ('a, [> `r ]) t -> 'a Lwt_stream.t

to_stream p returns a stream with the content from p. The stream will close when p closes.

Sourceval of_stream : 'a Lwt_stream.t -> ('a, [> `r ]) t

of_stream s reads from s. The returned pipe will close when s closes.

Write-only Interface and Combinators

Sourcemodule Writer : sig ... end

Read-only Interface and Combinators

Sourcemodule Reader : sig ... end

Conversions

Sourcetype 'a lwt_klist = [ `Nil | `Cons of 'a * 'a lwt_klist ] Lwt.t
Sourceval of_list : 'a list -> 'a Reader.t
Sourceval of_array : 'a array -> 'a Reader.t
Sourceval of_string : string -> char Reader.t
Sourceval of_lwt_klist : 'a lwt_klist -> 'a Reader.t
Sourceval to_list_rev : ('a, [> `r ]) t -> 'a list Lwt.t
Sourceval to_list : ('a, [> `r ]) t -> 'a list Lwt.t
Sourceval to_buffer : Buffer.t -> (char, [> `r ]) t -> unit Lwt.t
Sourceval to_buffer_str : ?sep:string -> Buffer.t -> (string, [> `r ]) t -> unit Lwt.t
Sourceval to_string : (char, [> `r ]) t -> string Lwt.t
Sourceval join_strings : ?sep:string -> (string, [> `r ]) t -> string Lwt.t
Sourceval to_lwt_klist : 'a Reader.t -> 'a lwt_klist

Iterates on the reader. Errors are ignored (but stop the list).

Basic IO wrappers

Sourcemodule IO : sig ... end
OCaml

Innovation. Community. Security.