package streaming

Module Streaming.Stream

Module with definitions for streams.

Stream is a purely functional abstraction for incremental, push-based, sequential processing of elements. Streams can be easily and efficiently transformed and concatenated.

Stream operations do not leak resources. This is guaranteed in the presence of early termination (when not all stream elements are consumed) and in case of exceptions in the streaming pipeline.

Streams are built to be compatible with sources, sinks and flows. To create a stream that produces all elements from a source, use Stream.from; to consume a stream with a sink, use Stream.into; and to transform stream elements with a flow, use Stream.via. For more sophisticated pipelines that might leave source leftovers, use Stream.run.

A simple echo program that loops over standard input and prints every line to standard output until Ctrl-D is hit:

  # Stream.stdin |> Stream.stdout;;
  hello<Enter>
  hello
  world<Enter>
  world
  <Ctrl+d>
  - : unit = ()

Sourcetype 'a t = 'a stream

Type for streams with elements of type 'a.

Creating a stream

Sourceval empty : 'a t

Empty stream with no elements.

Sourceval single : 'a -> 'a t

single a is a stream with a single element a.

Sourceval double : 'a -> 'a -> 'a t

double a b is a stream with two elements: a and b.

Sourceval triple : 'a -> 'a -> 'a -> 'a t

triple a b c is a stream with elements: a, b and c.

Sourceval count : int -> int t

count n is an infinite stream with integers starting from n.

Sourceval range : ?by:int -> int -> int -> int t

range ~by:step n m is a stream of integers from n up to, but excluding, m, incremented by step. The range is open on the right side.
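
The half-open behaviour can be illustrated with a small model built on Stdlib.Seq. This is only a sketch of the documented semantics, not the library's implementation: the range function below is a hypothetical stand-in, and it assumes a positive step (the text above does not say what happens otherwise).

```ocaml
(* Hypothetical stand-in for Stream.range, modelled with Stdlib.Seq.
   Assumption: [by] is positive; a step <= 0 would never terminate here. *)
let range ?(by = 1) n m =
  Seq.unfold (fun i -> if i < m then Some (i, i + by) else None) n

let a = List.of_seq (range 0 5)        (* the bound 5 itself is excluded *)
let b = List.of_seq (range ~by:2 1 8)  (* 1, 3, 5, 7 *)
let c = List.of_seq (range 3 3)        (* empty, because n = m *)
```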

Sourceval iota : int -> int t

iota n is range ~by:1 0 n, that is a range from 0 to n incremented by 1.

Sourceval (-<) : int -> int -> int t

n -< m is range n m.

Sourceval (--) : int -> int -> int t

n -- m is range n (m - 1).

Sourceval generate : len:int -> (int -> 'a) -> 'a t

generate ~len f generates a stream of length len, mapping each index to an element with f.

Sourceval repeat : ?times:int -> 'a -> 'a t

repeat ~times:n x produces a stream by repeating x n times. If times is omitted, x is repeated ad infinitum.

Sourceval repeatedly : ?times:int -> (unit -> 'a) -> 'a t

repeatedly ~times:n f produces a stream by repeatedly calling f () n times. If times is omitted, f is called ad infinitum.

Sourceval iterate : 'a -> ('a -> 'a) -> 'a t

iterate x f is an infinite stream where the first item is calculated by applying f to x, the second item by applying f to the previous result, and so on.

Sourceval unfold : 's -> ('s -> ('a * 's) option) -> 'a t

unfold seed next is a stream created from a seed state and a function that produces elements and an updated state. The stream will terminate when next produces None.
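
As an illustration of the seed-and-step pattern, here is a minimal sketch using Stdlib.Seq as a stand-in for the stream type. The unfold wrapper is an assumption made so that the argument order matches the signature above (seed first, then next); Seq.unfold itself takes them the other way round.

```ocaml
(* Stand-in with the documented argument order: seed first, then next. *)
let unfold seed next = Seq.unfold next seed

(* The state is a pair of consecutive Fibonacci numbers; returning None
   ends the sequence once the next value would reach 50. *)
let fib_below_50 =
  unfold (0, 1) (fun (a, b) -> if a >= 50 then None else Some (a, (b, a + b)))
  |> List.of_seq
```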

Sourceval yield : 'a -> 'a t

yield x is a stream with a single element x.

Stream converters

Sourceval of_list : 'a list -> 'a t

of_list items is a stream with all elements in the list items.

Sourceval to_list : 'a t -> 'a list

to_list stream converts stream into a list.

Sourceval of_array : 'a array -> 'a t

of_array items is a stream with all elements in the array items.

Sourceval to_array : 'a t -> 'a array

to_array stream converts stream into an array.

Sourceval of_string : string -> char t

of_string string is a stream with all characters in string.

Sourceval to_string : char t -> string

to_string stream converts stream of characters into a string.

Transforming a stream

Sourceval map : ('a -> 'b) -> 'a t -> 'b t

A stream with all elements transformed with a mapping function.

Sourceval filter : ('a -> bool) -> 'a t -> 'a t

A stream that includes only the elements that satisfy a predicate.

Sourceval take : int -> 'a t -> 'a t

Take first n elements from the stream and discard the rest.

Sourceval take_while : ('a -> bool) -> 'a t -> 'a t

Take first elements from the stream that satisfy a predicate and discard the rest.
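
The sketch below models how take_while lets a pipeline over an infinite stream finish. It uses Stdlib.Seq, which is pull-based, unlike the push-based streams described here, so it reproduces only the observable result. The count and take_while definitions are hypothetical stand-ins for the functions of the same name.

```ocaml
(* Stand-in for Stream.count: an infinite sequence n, n+1, n+2, ... *)
let count n = Seq.unfold (fun i -> Some (i, i + 1)) n

(* Stand-in for Stream.take_while: stops at the first element that
   fails the predicate and never looks at anything after it. *)
let rec take_while p (s : 'a Seq.t) : 'a Seq.t = fun () ->
  match s () with
  | Seq.Cons (x, rest) when p x -> Seq.Cons (x, take_while p rest)
  | _ -> Seq.Nil

let small_squares =
  count 1
  |> Seq.map (fun x -> x * x)
  |> take_while (fun x -> x < 50)
  |> List.of_seq
```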

Sourceval drop_while : ('a -> bool) -> 'a t -> 'a t

Drop first elements from the stream that satisfy a predicate and keep the rest.

Sourceval drop : int -> 'a t -> 'a t

Drop first n elements from the stream and keep the rest.

Sourceval rest : 'a t -> 'a t

Drops the first element of the stream.

Sourceval indexed : 'a t -> (int * 'a) t

Adds an index to each element in the stream.

Combining streams

Sourceval concat : 'a t -> 'a t -> 'a t

concat stream1 stream2 is a stream that exhausts all elements from stream1 and then all elements from stream2.

Examples

  # let stream1 = Stream.of_list ['a'; 'b'; 'c'] in
    let stream2 = Stream.of_list ['d'; 'e'; 'f'] in
    Stream.to_list (Stream.concat stream1 stream2)
  - : char list = ['a'; 'b'; 'c'; 'd'; 'e'; 'f']

Sourceval (++) : 'a t -> 'a t -> 'a t

stream1 ++ stream2 is the infix operator version of concat stream1 stream2.

Sourceval append : 'a -> 'a t -> 'a t

append x stream adds the element x to the end of stream.

Sourceval prepend : 'a -> 'a t -> 'a t

prepend x stream adds the element x to the beginning of stream.
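
To make the ordering concrete, here is a small model of append and prepend over Stdlib.Seq. These are hypothetical stand-ins that follow the documented argument order (element first, stream second); they are not the library's definitions.

```ocaml
(* Stdlib.Seq stand-ins; argument order follows the documented signatures. *)
let append x s = Seq.append s (Seq.return x)
let prepend x s = Seq.append (Seq.return x) s

let base () = List.to_seq [1; 2; 3]

let appended = List.of_seq (append 4 (base ()))
let prepended = List.of_seq (prepend 0 (base ()))

(* Taking the stream as the last argument lets both compose with |>. *)
let piped = base () |> prepend 0 |> append 4 |> List.of_seq
```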

Sourceval flatten : 'a t t -> 'a t

Concatenates a stream of streams.

Sourceval flat_map : ('a -> 'b t) -> 'a t -> 'b t

flat_map f stream is a stream concatenated from sub-streams produced by applying f to all elements of stream.

Examples

  let duplicated =
    [1; 2; 3]
    |> Stream.of_list
    |> Stream.flat_map (fun x -> Stream.of_list [x; x])
    |> Stream.to_list in
  assert (duplicated = [1; 1; 2; 2; 3; 3])

Sourceval cycle : ?times:int -> 'a t -> 'a t

cycle ~times:n stream produces a stream by repeating all elements from stream n times. If times is omitted, stream is repeated ad infinitum.

Sourceval interpose : 'a -> 'a t -> 'a t

Inserts a separator element between each stream element.
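
A list-based model shows the intended shape of the result. This is a sketch under the assumption that "between each stream element" means no leading or trailing separator; the interpose function below is a hypothetical stand-in operating on lists rather than streams.

```ocaml
(* List model of interpose: separators appear only between elements. *)
let rec interpose sep = function
  | [] -> []
  | [x] -> [x]
  | x :: rest -> x :: sep :: interpose sep rest

let numbers = interpose 0 [1; 2; 3]
let single = interpose ',' ['a']
let none = interpose 0 []
```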

Grouping and splitting

Sourceval partition : int -> 'a t -> 'a t t

partition n partitions the stream into sub-streams of size n.
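
The following list-based sketch models fixed-size partitioning. Two points are assumptions, since the description above does not cover them: a final group shorter than n is kept rather than dropped, and a non-positive n is rejected. The function is a hypothetical stand-in working on lists, not the library's implementation.

```ocaml
(* List model of partition. Assumptions: a trailing short group is kept,
   and n must be positive. *)
let partition n items =
  if n <= 0 then invalid_arg "partition: n must be positive";
  let rec go acc chunk k = function
    | [] -> List.rev (if chunk = [] then acc else List.rev chunk :: acc)
    | x :: rest ->
      if k = n - 1 then go (List.rev (x :: chunk) :: acc) [] 0 rest
      else go acc (x :: chunk) (k + 1) rest
  in
  go [] [] 0 items

let uneven = partition 2 [1; 2; 3; 4; 5]
let exact = partition 3 [1; 2; 3]
```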

Sourceval split : by:('a -> bool) -> 'a t -> 'a t t

split ~by:predicate stream splits stream whe

Consumers

Operations that traverse the stream computing a single result value.

If the stream is infinite and the consumer accumulates the elements, the processing will not terminate, and memory use may grow without bound.

Sourceval len : 'a t -> int

len stream counts the number of elements in stream.

Will exhaust the stream during processing.

Examples

  # Stream.len (Stream.of_list ['a'; 'b'; 'c']);;
  - : int = 3

Sourceval each : ('a -> unit) -> 'a t -> unit

each f stream applies an effectful function f to all elements of stream.

Sourceval fold : ('r -> 'a -> 'r) -> 'r -> 'a t -> 'r

fold step init stream reduces the values of stream with the step function, starting with init.

If the step function raises an exception, the stream will be properly terminated.
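
One way such a guarantee could be provided is sketched below with Fun.protect from the standard library. The source record, make, and this fold are hypothetical names introduced for illustration; the library's internals are not shown in this page and may differ.

```ocaml
(* Hypothetical resource-backed source: [pull] yields the next element,
   [close] releases whatever the source holds. *)
type 'a source = { pull : unit -> 'a option; close : unit -> unit }

let fold step init src =
  let rec loop acc =
    match src.pull () with
    | None -> acc
    | Some x -> loop (step acc x)
  in
  (* Fun.protect runs [close] on normal return and on exceptions alike. *)
  Fun.protect ~finally:src.close (fun () -> loop init)

(* Counts how many sources have been closed so far. *)
let closed = ref 0

let make items =
  let rest = ref items in
  { pull = (fun () ->
      match !rest with
      | [] -> None
      | x :: tl -> rest := tl; Some x);
    close = (fun () -> incr closed) }

let total = fold ( + ) 0 (make [1; 2; 3])

(* The step function raises on the second element; close still runs. *)
let failed =
  try
    ignore (fold (fun _ x -> if x = 2 then failwith "boom" else x) 0 (make [1; 2; 3]));
    false
  with Failure _ -> true
```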

Sourceval is_empty : 'a t -> bool

is_empty stream is true if the stream has no elements and false otherwise. This operation consumes the first element of the stream.

Sourceval first : 'a t -> 'a option

Return the first element in the stream.

Sourceval last : 'a t -> 'a option

Return the last element in the stream, in linear time.

Sourceval drain : 'a t -> unit

IO Streams

Sourceval of_file : string -> string t

of_file path is a stream of lines read from the file located at path.

The file is opened lazily only when the stream is consumed and will be closed even if the stream processing terminates with an exception.

Sourceval to_file : string -> string t -> unit

to_file path stream writes lines from stream into the file located at path.

Sourceval stdin : string t

The stream that reads lines from the standard input channel.

Sourceval stdout : string t -> unit

Writes the lines of a stream to the standard output channel.

Sourceval stderr : string t -> unit

Writes the lines of a stream to the standard error channel.

Adaptors

Integration adaptors for sources, sinks and flows.

Sourceval from : 'a source -> 'a t

from source is a stream created from a source.

Examples

  # Stream.len (Stream.from (Source.list [0; 1; 2]))
  - : int = 3

Sourceval into : ('a, 'b) sink -> 'a t -> 'b

into sink stream is the result value produced by streaming all elements of stream into sink.

Examples

  # Stream.into Sink.sum (Stream.of_list [0; 1; 2])
  - : int = 3

fill sink stream is similar to into but, in addition to the result value produced by sink, will optionally return a leftover stream with elements that were not consumed by sink.

Sourceval via : ('a, 'b) flow -> 'a stream -> 'b stream

via flow stream is the stream produced by transforming all elements of stream via flow.

Examples

  Stream.count 100
  |> Stream.via (Flow.filter (fun x -> x mod 2 = 0))
  |> Stream.via (Flow.take 50)
  |> Stream.into Sink.sum

Sourceval run : from:'a source -> via:('a, 'b) flow -> into:('b, 'r) sink -> 'r * 'a source option

Fuses sources, sinks and flows and produces a result and a leftover.

let (r, leftover) = Stream.run ~from:source ~via:flow ~into:sink

Streams elements from source into sink via a stream transformer flow. In addition to the result value r produced by sink, a leftover source is returned, if source was not exhausted.

Warning: If a leftover source is produced, it must either be consumed or have its resources disposed of manually. Not doing so might lead to resource leaks.

Examples

# let (x, leftover) =
    let source = Source.list ["1"; "2"; "3"] in
    let flow = Flow.map int_of_string in
    Stream.run ~from:source ~via:flow ~into:Sink.first
val x : int option = Some 1
val leftover : string source option = Some <abstr>
# match leftover with
  | Some source -> Source.dispose source
  | None -> print_endline "No leftover"
- : unit = ()

Syntax definitions

Streams can be constructed with the let-binding syntax which is similar to list comprehensions. The following example demonstrates this feature:

open Stream.Syntax

let items =
  let* n = Stream.range 1 3 in
  let* c = Stream.of_list ['x'; 'y'] in
  yield (n, c) in
assert (Stream.to_list items = [(1, 'x'); (1, 'y'); (2, 'x'); (2, 'y')])

Sourcemodule Syntax : sig ... end

Module with syntax definitions for streams.
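
The contents of Syntax are not listed here, but the comprehension example above behaves as if let* were flat_map with its arguments flipped. The sketch below reproduces that example on Stdlib.Seq under this assumption; the let*, yield and range definitions are hypothetical stand-ins, and binding operators require OCaml 4.08 or later.

```ocaml
(* Hypothetical definitions over Stdlib.Seq; the real Stream.Syntax may differ. *)
let ( let* ) s f = Seq.flat_map f s
let yield x = Seq.return x

(* Half-open integer range, standing in for Stream.range. *)
let range n m = Seq.unfold (fun i -> if i < m then Some (i, i + 1) else None) n

(* Each let* iterates over one sequence; nesting yields every combination. *)
let items =
  let* n = range 1 3 in
  let* c = List.to_seq ['x'; 'y'] in
  yield (n, c)

let result = List.of_seq items
```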
