package streaming

Module Streaming.Sink

Module with definitions for sinks.

Sinks are streaming abstractions that consume values and produce an aggregated value as a result. The result value is extracted from an internal state that is built incrementally. The internal state can acquire resources that are guaranteed to be terminated when the sink is filled.

Sinks are a great way to define decoupled consumers that can be filled with Stream.into.

Sinks are independent from sources and streams. You can think of them as packed arguments for folding functions with early termination. Formally, they can also be interpreted as Moore machines.

type ('a, 'b) t = ('a, 'b) sink

Type for sinks that consume elements of type 'a and, once done, produce a value of type 'b.

Creating a sink

Implementing custom sinks is useful to create a collection of reusable streaming consumers for your application.

The following example demonstrates a sink that consumes all elements into a list:

  let list_sink =
    let init () = [] in
    let push acc x = x :: acc in
    let stop acc = List.rev acc in
    Sink.make ~init ~push ~stop ()

Alternatively, existing list/array/string/queue sinks, or others listed below, can be used.

val fill : 'r -> ('a, 'r) t

fill result uses result to fill the sink. This sink does not consume any input and immediately produces result when used.

val fold : ('r -> 'a -> 'r) -> 'r -> ('a, 'r) t

fold f init is a sink that reduces all input elements with the stepping function f starting with the accumulator value init.
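
A minimal usage sketch, assuming the streaming package is installed and opened with open Streaming. The stepping function adds the square of each element to the accumulator; the name sum_of_squares is only illustrative.

```ocaml
open Streaming

(* Reduce the input by adding the square of each element to the accumulator. *)
let sum_of_squares = Sink.fold (fun acc x -> acc + (x * x)) 0

(* 1 + 4 + 9 *)
let result = Stream.of_list [1; 2; 3] |> Stream.into sum_of_squares
```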

val fold_while : ('r -> bool) -> ('r -> 'a -> 'r) -> 'r -> ('a, 'r) t

fold_while full f init is similar to fold, but terminates early as soon as full returns true for the accumulator.
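
A sketch of early termination, assuming open Streaming: the running total reaches 12 after the third element, so the remaining elements are never consumed. The sink name is hypothetical.

```ocaml
open Streaming

(* Keep adding elements until the running total is at least 10. *)
let sum_until_10 = Sink.fold_while (fun total -> total >= 10) ( + ) 0

(* 3 + 4 = 7, then 7 + 5 = 12 >= 10, so 6 and 7 are not consumed. *)
let result = Stream.of_list [3; 4; 5; 6; 7] |> Stream.into sum_until_10
```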

val make : init:(unit -> 'acc) -> push:('acc -> 'a -> 'acc) -> ?full:('acc -> bool) -> stop:('acc -> 'r) -> unit -> ('a, 'r) t

Creates a sink from an init function that creates the initial state, a push function that updates that state with each input element, and a stop function that produces the final result value from the state. Optionally, a full function can be passed to decide when the sink should terminate early.

Note: Calls to full should be cheap, as this function is called to avoid allocating unnecessary resources. If deciding whether the sink is full is expensive, consider caching that decision whenever possible.
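
The list_sink example from "Creating a sink" can be extended with the optional full argument. The sketch below, assuming open Streaming, defines a hypothetical take_list n sink that stops after n elements; its state pairs a counter with the reversed list so that full is a cheap integer comparison.

```ocaml
open Streaming

(* Hypothetical sink: collect at most [n] elements into a list. *)
let take_list n =
  let init () = (0, []) in
  let push (count, acc) x = (count + 1, x :: acc) in
  (* Cheap check: one integer comparison per call. *)
  let full (count, _) = count >= n in
  let stop (_, acc) = List.rev acc in
  Sink.make ~init ~push ~full ~stop ()

let result = Stream.of_list [1; 2; 3; 4; 5] |> Stream.into (take_list 3)
```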

Basic sinks

val full : ('a, unit) t

A full sink that will not consume any input and will not produce any results.

val is_empty : ('a, bool) t

is_empty is true if the sink did not consume any elements and false otherwise.

val each : ('a -> unit) -> ('a, unit) t

Applies an effectful action to all input elements, producing nothing.

val len : ('a, int) t

Consumes and counts all input elements.

val first : ('a, 'a option) t

The first input element, or None if the sink did not receive enough input.

Equivalent to nth 0.

val last : ('a, 'a option) t

The last input element, or None if the sink did not receive enough input.

val nth : int -> ('a, 'a option) t

nth n is the n-th input element (counting from 0), or None if the sink did not receive enough input.

val drain : ('a, unit) t

Consumes all elements producing nothing. Useful for triggering actions in effectful streams.
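
A short sketch of the basic sinks, assuming open Streaming. Each value consumes a fresh stream built from the same list; the reference total is a hypothetical way to observe the effect performed by each.

```ocaml
open Streaming

let input = [10; 20; 30]

let count = Stream.of_list input |> Stream.into Sink.len
let first = Stream.of_list input |> Stream.into Sink.first
let last = Stream.of_list input |> Stream.into Sink.last
let second = Stream.of_list input |> Stream.into (Sink.nth 1)
let missing = Stream.of_list input |> Stream.into (Sink.nth 5)
let empty = Stream.of_list ([] : int list) |> Stream.into Sink.is_empty

(* [each] runs an effect per element; here it accumulates into a reference. *)
let total = ref 0
let () = Stream.of_list input |> Stream.into (Sink.each (fun x -> total := !total + x))
```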

Finding elements

val contains : where:('a -> bool) -> ('a, bool) t

contains ~where:pred is true if at least one input element satisfies pred, and false otherwise.

val find : where:('a -> bool) -> ('a, 'a option) t

find ~where:pred finds the first element that satisfies pred returning None if there is no such element.

val index : where:('a -> bool) -> ('a, int option) t

Similar to find but returns the index of the element that satisfies the predicate.

val minimum : by:('a -> 'a -> bool) -> ('a, 'a option) t

Finds the minimum element in the sequence, using the given predicate as the comparison between the input elements.

val maximum : by:('a -> 'a -> bool) -> ('a, 'a option) t

Finds the maximum element in the sequence, using the given predicate as the comparison between the input elements.
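
A sketch of find, index and contains on the same input, assuming open Streaming and that index counts positions from 0, as nth does. The helper is_even is hypothetical.

```ocaml
open Streaming

let is_even x = x mod 2 = 0
let input = [1; 3; 4; 6]

(* The first even element is 4, at position 2. *)
let found = Stream.of_list input |> Stream.into (Sink.find ~where:is_even)
let position = Stream.of_list input |> Stream.into (Sink.index ~where:is_even)
let has_even = Stream.of_list input |> Stream.into (Sink.contains ~where:is_even)
let has_negative =
  Stream.of_list input |> Stream.into (Sink.contains ~where:(fun x -> x < 0))
```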

Logical predicates

val all : where:('a -> bool) -> ('a, bool) t

all ~where:pred is true if all input elements satisfy pred. Stops consuming elements as soon as an element that does not satisfy pred is found. Results in true for empty input.

val any : where:('a -> bool) -> ('a, bool) t

any ~where:pred is true if at least one input element satisfies pred. Will stop consuming elements when such an element is found. Results in false for empty input.
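
A sketch of both predicates, assuming open Streaming, including the documented results for empty input.

```ocaml
open Streaming

let all_positive =
  Stream.of_list [1; 2; 3] |> Stream.into (Sink.all ~where:(fun x -> x > 0))
let any_large =
  Stream.of_list [1; 2; 3] |> Stream.into (Sink.any ~where:(fun x -> x > 2))

(* Empty input: [all] is true and [any] is false. *)
let all_empty =
  Stream.of_list ([] : int list) |> Stream.into (Sink.all ~where:(fun x -> x > 0))
let any_empty =
  Stream.of_list ([] : int list) |> Stream.into (Sink.any ~where:(fun x -> x > 0))
```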

Data sinks

val list : ('a, 'a list) t

Puts all input elements into a list.

val array : ('a, 'a array) t

Puts all input elements into an array.

val buffer : int -> ('a, 'a array) t

buffer n is similar to array, but will only consume n elements.

val queue : ('a, 'a Queue.t) t

Puts all input elements into a queue.

val string : (string, string) t

Consumes and concatenates strings.

val bytes : (bytes, bytes) t

Consumes and concatenates bytes.
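
A sketch of the data sinks, assuming open Streaming. Note that buffer 2 becomes full after two elements, so the third element is never consumed.

```ocaml
open Streaming

let as_list = Stream.of_list [1; 2; 3] |> Stream.into Sink.list
let as_array = Stream.of_list [1; 2; 3] |> Stream.into Sink.array
(* Only the first two elements are consumed. *)
let first_two = Stream.of_list [1; 2; 3] |> Stream.into (Sink.buffer 2)
let joined = Stream.of_list ["a"; "b"; "c"] |> Stream.into Sink.string
```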

IO sinks

val print : (string, unit) t

Prints all input string elements to standard output as lines.

val file : string -> (string, unit) t

file path is a sink that writes input strings as lines into a file located at path.

val stdout : (string, unit) t

A sink that writes input strings as lines to STDOUT.

val stderr : (string, unit) t

A sink that writes input strings as lines to STDERR.
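
A sketch of the file sink, assuming open Streaming and that the sink closes its output channel when it stops, as the resource guarantee above describes. The temporary path and the read_lines helper are illustrative and only use the standard library to check the written lines.

```ocaml
open Streaming

(* Write two lines to a fresh temporary file. *)
let path = Filename.temp_file "sink_example" ".txt"
let () = Stream.of_list ["first"; "second"] |> Stream.into (Sink.file path)

(* Hypothetical helper: read all lines of a file with the standard library. *)
let read_lines path =
  let ic = open_in path in
  let rec loop acc =
    match input_line ic with
    | line -> loop (line :: acc)
    | exception End_of_file -> close_in ic; List.rev acc
  in
  loop []

let lines = read_lines path
```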

Numeric computations

val sum : (int, int) t

Adds all input integer values.

val product : (int, int) t

Product of input integer values. Stops if any input element is 0.

val mean : (float, float) t

Computes a numerically stable arithmetic mean of all input elements.
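
A sketch of the numeric sinks, assuming open Streaming. The mean is compared with a small tolerance rather than exact equality, since the exact floating-point steps of the implementation are not specified here.

```ocaml
open Streaming

let total = Stream.of_list [1; 2; 3] |> Stream.into Sink.sum
let prod = Stream.of_list [2; 3; 4] |> Stream.into Sink.product
(* [product] stops at the first 0; the result is 0. *)
let prod_zero = Stream.of_list [2; 0; 5] |> Stream.into Sink.product
let avg = Stream.of_list [1.0; 2.0; 3.0] |> Stream.into Sink.mean
```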

Combining sinks

val zip : ('a, 'r1) t -> ('a, 'r2) t -> ('a, 'r1 * 'r2) t

zip left right computes both left and right at the same time, sending the same input to both sinks, and produces the results of both sinks as a pair.

val zip_left : ('a, 'r) t -> ('a, _) t -> ('a, 'r) t

zip_left left right is similar to zip, but only produces the result of the left sink.

val zip_right : ('a, _) t -> ('a, 'r) t -> ('a, 'r) t

zip_right left right is similar to zip, but only produces the result of the right sink.

val zip_with : ('r1 -> 'r2 -> 'r) -> ('a, 'r1) t -> ('a, 'r2) t -> ('a, 'r) t

zip_with f left right is similar to zip, but applies the aggregation function f to the results produced by left and right.

val (<&>) : ('a, 'r1) t -> ('a, 'r2) t -> ('a, 'r1 * 'r2) t

left <&> right is an operator version of zip left right.

val (<&) : ('a, 'r) t -> ('a, _) t -> ('a, 'r) t

left <& right is an operator version of zip_left left right.

val (&>) : ('a, _) t -> ('a, 'r) t -> ('a, 'r) t

left &> right is an operator version of zip_right left right.
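
A sketch of zipping, assuming open Streaming: both sinks see every element, so the input is traversed once. The zip_with call computes an integer mean purely for illustration.

```ocaml
open Streaming

let sum_and_count = Stream.of_list [1; 2; 3] |> Stream.into Sink.(zip sum len)
(* Only the left result is kept. *)
let only_sum = Stream.of_list [1; 2; 3] |> Stream.into Sink.(sum <& len)
let integer_mean =
  Stream.of_list [2; 4; 6]
  |> Stream.into (Sink.zip_with (fun s n -> s / n) Sink.sum Sink.len)
```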

val unzip : ('a, 'r1) t -> ('b, 'r2) t -> ('a * 'b, 'r1 * 'r2) t

unzip left right is a sink that receives pairs 'a * 'b, sending the first element into left and the second into right. Both sinks are computed at the same time and their results returned as an output pair.

The sink becomes full when either left or right becomes full.

val unzip_left : ('a, 'r) t -> ('b, _) t -> ('a * 'b, 'r) t

unzip_left left right is similar to unzip, but only produces the result of the left sink.

If right terminates first, left will be forced to terminate.

val unzip_right : ('a, _) t -> ('b, 'r) t -> ('a * 'b, 'r) t

unzip_right left right is similar to unzip, but only produces the result of the right sink.

If left terminates first, right will be forced to terminate.

val unzip_with : ('r1 -> 'r2 -> 'r) -> ('a, 'r1) t -> ('b, 'r2) t -> ('a * 'b, 'r) t

unzip_with f left right is similar to unzip, but applies the aggregation function f to the results produced by left and right.

val (<*>) : ('a, 'r1) t -> ('b, 'r2) t -> ('a * 'b, 'r1 * 'r2) t

left <*> right is an operator version of unzip left right.

val (<*) : ('a, 'r) t -> ('b, _) t -> ('a * 'b, 'r) t

left <* right is an operator version of unzip_left left right.

val ( *> ) : ('a, _) t -> ('b, 'r) t -> ('a * 'b, 'r) t

left *> right is an operator version of unzip_right left right.
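
A sketch of unzipping pairs, assuming open Streaming: first components go to sum and second components go to the string sink.

```ocaml
open Streaming

let pairs = [(1, "a"); (2, "b"); (3, "c")]

let both = Stream.of_list pairs |> Stream.into Sink.(unzip sum list)
(* Keep only the result of the right sink. *)
let strings = Stream.of_list pairs |> Stream.into Sink.(sum *> string)
```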

val distribute : ('a, 'r1) t -> ('a, 'r2) t -> ('a, 'r1 * 'r2) t

distribute left right is similar to zip, but distributes the consumed elements over left and right, alternating in a round-robin fashion.

type ('a, 'b) race =
  | Left of 'a
  | Right of 'b
  | Both of 'a * 'b

Type for race result values.

val race : ('a, 'r1) t -> ('a, 'r2) t -> ('a, ('r1, 'r2) race) t

race left right runs both left and right sinks at the same time, producing the result of the one that fills first.

If the sink is terminated prematurely, before either left or right is filled, Both of their values are produced.

Examples

  let sink = Sink.(race (find ~where:(fun x -> x > 10)) (nth 8)) in
  let result = Stream.of_list [1; 9; 0; 8; 30; 4] |> Stream.into sink in
  assert (result = Sink.Left (Some 30))

val (<|>) : ('a, 'r1) t -> ('a, 'r2) t -> ('a, ('r1, 'r2) race) t

left <|> right is the operator version of race left right.

val seq : ('a, 'r1) t -> ('a, 'r2) t -> ('a, 'r1 * 'r2) t

seq left right runs left and then right sequentially, producing both of their results.

If the resulting sink is stopped before right has started, right is forced to initialize and terminate.

val seq_left : ('a, 'r) t -> ('a, _) t -> ('a, 'r) t

seq_left left right is similar to seq, but only produces the result of the left sink.

val seq_right : ('a, _) t -> ('a, 'r) t -> ('a, 'r) t

seq_right left right is similar to seq, but only produces the result of the right sink.

val (<+>) : ('a, 'r1) t -> ('a, 'r2) t -> ('a, 'r1 * 'r2) t

left <+> right is an operator version of seq left right.

val (<+) : ('a, 'r) t -> ('a, _) t -> ('a, 'r) t

left <+ right is an operator version of seq_left left right.

val (+>) : ('a, _) t -> ('a, 'r) t -> ('a, 'r) t

left +> right is an operator version of seq_right left right.
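
A sketch of sequencing, assuming open Streaming and that elements arriving after left becomes full are passed to right: buffer 2 takes the first two elements and sum receives the rest.

```ocaml
open Streaming

(* [1; 2] go to [buffer 2]; [3; 4] go to [sum]. *)
let split = Stream.of_list [1; 2; 3; 4] |> Stream.into Sink.(seq (buffer 2) sum)
(* Keep only the result of the second sink. *)
let rest = Stream.of_list [1; 2; 3; 4] |> Stream.into Sink.(buffer 2 +> sum)
```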

Mapping and filtering sinks

val map : ('r1 -> 'r2) -> ('a, 'r1) t -> ('a, 'r2) t

map f sink is the sink sink with its result transformed by f.

val (<@>) : ('a -> 'b) -> ('c, 'a) t -> ('c, 'b) t

f <@> sink is the operator version of map f sink.

val premap : ('b -> 'a) -> ('a, 'r) t -> ('b, 'r) t

premap f sink is a sink that applies f to each input value before passing it to sink.

Examples

If sink consumes integers, but we have an input with strings, we can provide a conversion from strings to integers to premap:

  let sink = Sink.(premap int_of_string sum) in
  let result = Stream.of_list ["1"; "2"; "3"] |> Stream.into sink in
  assert (result = 6)

val prefilter : ('a -> bool) -> ('a, 'r) t -> ('a, 'r) t

prefilter predicate sink is a sink that only passes the input values satisfying predicate on to sink.
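
A sketch contrasting map and prefilter, assuming open Streaming: map transforms the final result, while prefilter drops input elements before they reach the sink.

```ocaml
open Streaming

let as_text = Stream.of_list [1; 2; 3] |> Stream.into (Sink.map string_of_int Sink.sum)
(* Only 2 and 4 reach [sum]. *)
let even_sum =
  Stream.of_list [1; 2; 3; 4]
  |> Stream.into (Sink.prefilter (fun x -> x mod 2 = 0) Sink.sum)
```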

Resource management

val dispose : ('a, 'r) t -> 'r

Closes the sink and produces the currently accumulated result. Any internal state will be terminated.
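
A sketch of dispose, assuming open Streaming and that disposing a sink that never received input yields the result of its initial state. The type annotation on nothing only fixes the element type.

```ocaml
open Streaming

(* No input was consumed, so the list sink produces an empty list. *)
let nothing : int list = Sink.dispose Sink.list
(* [fill] produces its result without consuming input. *)
let constant = Sink.dispose (Sink.fill 42)
```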

Syntax definitions

In addition to using the sinks and operations defined above, it is possible to create sinks with the convenient let+ and and+ binding operators from Sink.Syntax.

A common example of a composed sink is the sink that computes the arithmetic mean:

  let mean =
    let open Sink.Syntax in
    let+ total = Sink.sum
    and+ count = Sink.len in
    (* Avoid Division_by_zero when the input is empty. *)
    if count = 0 then 0 else total / count

The resulting sink has type (int, int) sink and will only consume the input once!
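
The same notation scales to more than two sinks by chaining and+. The sketch below, assuming open Streaming, computes a hypothetical spread sink (maximum minus minimum) from three sinks in a single pass; returning 0 for empty input is an illustrative choice.

```ocaml
open Streaming

let spread =
  let open Sink.Syntax in
  let+ lo = Sink.fold min max_int
  and+ hi = Sink.fold max min_int
  and+ n = Sink.len in
  (* Illustrative choice: an empty input has a spread of 0. *)
  if n = 0 then 0 else hi - lo

(* 9 - 1 *)
let result = Stream.of_list [3; 9; 1; 7] |> Stream.into spread
```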

module Syntax : sig ... end

Module with syntax definitions for sinks.

Interface implementations

module Functor : sig ... end

Module that implements the "Functor" interface.

module Applicative : sig ... end

Module that implements the "Applicative" interface.
