Page
Library
Module
Module type
Parameter
Class
Class type
Source
Streaming.Stream
SourceModule with defintions for streams.
Stream is a purely functional abstraction for incremental, push-based, sequential processing of elements. Streams can be easily and efficiently transformed and concatenated.
Stream operations do not leak resources. This is guaranteed in the presence of early termination (when not all stream elements are consumed) and in case of exceptions in the streaming pipeline.
Streams are built to be compatible with sources, sinks and flows. To create a stream that produces all elements from a source use Stream.from
, to consume a stream with a sink use Stream.into
and to transform stream elements with a flow use Stream.via
. For more sophisticated pipelines that might have source leftovers, run
can be used.
A simple echo program that loops over standard input and prints every line to standard output until Ctrl-D is hit:
# Stream.stdin |> Stream.stdout;;
hello<Enter>
hello
world<Enter>
world
<Ctrl+d>
- : unit = ()
range ~by:step n m
is a sequence of integers starting from n
to m
(excluding m
) incremented by step
. The range is open on the right side.
iota n
is range ~by:1 0 n
, that is a range from 0
to n
incremented by 1
.
generate ~len f
generates a stream of length n
mapping each index to an element with f
.
repeat ~times:n x
produces a stream by repeating x
n
times. If times
is omitted, x
is repeated ad infinitum.
repeatedly ~times:n f
produces a stream by repeatedly calling f ()
n
times. If times
is omitted, f
is called ad infinitum.
iterate x f
is an infinite source where the first item is calculated by applying f
to x
, the second item by applying the function on the previous result and so on.
unfold seed next
is a stream created from a seed
state and a function that produces elements and an updated state. The stream will terminate when next
produces None
.
of_array items
is a stream with all elements in the array items
.
A stream with all elements transformed with a mapping function.
A stream that includes only the elements that satisfy a predicate.
Take first elements from the stream that satisfy a predicate and discard the rest.
Drpo first elements from the stream that satisfy a predicate and keep the rest.
concat stream1 stream2
is a stream that exhausts all elements from stream1
and then all elements from stream2
.
Examples
# let stream1 = Stream.of_list ['a'; 'b'; 'c'] in
let stream2 = Stream.of_list ['d'; 'e'; 'f'] in
Stream.to_list (Stream.concat stream1 stream2)
- : char list = ['a'; 'b'; 'c'; 'd'; 'e'; 'f']
stream1 ++ stream2
is the infix operator version of concat stream1 stream2
.
prepend x stream
adds the element x
to the beginning of stream
.
flat_map f stream
is a stream concatenated from sub-streams produced by applying f
to all elements of stream
.
let duplicated =
[1; 2; 3]
|> String.of_list
|> String.flat_map (fun x -> Stream.of_list [x; x])
|> Stream.to_list in
assert (duplicated = [1; 1; 2; 2; 3; 3])
cycle ~times:n stream
produces a stream by repeating all elements from stream
n
times. If times
is omitted, x
is repeated ad infinitum.
partition n
partitions the stream into sub-streams of size n
.
Operations that traverse the the stream computing a single result value.
If the stream is infinite and the consumer accumulates the elements, the processing will not terminate, potentially resulting in a memory leak.
len stream
counts the number of elements in stream
.
Will exhaust the stream during processing.
Examples
# Stream.len (Stream.of_list ['a'; 'b'; 'c']);
- : int = 3
each f stream
applies an effectful function f
to all elements of stream
.
fold step init stream
reduces the values of stream
with the step
function, starting with init
.
If the step
function raises an exception, the stream will be properly terminated.
is_empty stream
is true
if the stream has no elements and false
otherwise. This operations consumes the first elements of the stream.
of_file path
is a stream of lines read from the file located at path
.
The file is opened lazily only when the stream is consumed and will be closed even if the stream processing terminates with an exception.
to_file path stream
writes lines from stream
into the file located at path
.
Integration adaptors for sources, sinks and flows.
from source
is a stream created from a source.
Examples
# Stream.len (Stream.from (Source.list [0; 1; 2]))
- : int = 3
into sink stream
is the result value produced by streaming all elements of stream
into sink
.
Examples
# Stream.into Sink.sum (Stream.of_list [0; 1; 2])
- : int = 3
fill sink stream
is similar to into
but, in addition to the result value produced by sink
, will optionally return a leftover stream with elements that were not consumed by sink
.
via flow stream
is stream produced by transforming all elements of stream
via flow
.
Examples
Stream.count 100
|> Stream.via (Flow.filter (fun x -> x mod 2 = 0))
|> Stream.via (Flow.take 50)
|> Stream.into Sink.sum
Fuses sources, sinks and flows and produces a result and a leftover.
let (r, leftover) = Stream.run ~from:source via:flow ~into:sink
Streams elements from source
into sink
via a stream transformer flow
. In addition to the result value r
produced by sink
, a leftover
source is returned, if source
was not exhausted.
Warning: If a leftover source is produced, it is required to either consume it or manually dispose its resources. Not doing so might lead to resource leaks.
Examples
# let (x, leftover) =
let source = Source.list ["1"; "2"; "3"] in
let flow = Flow.map int_of_string in
Stream.run ~from:source ~via:flow ~into:Sink.first
val x : int option = Some 1
val leftover : string source option = Some <abstr>
# match leftover with
| Some source -> Source.dispose source
| None -> print_endline "No leftover"
- : unit = ()
Streams can be constructed with the let-binding syntax which is similar to list comprehensions. The following example demonstrates this feature:
open Stream.Syntax
let items =
let* n = Stream.range 1 3 in
let* c = Stream.of_list ['x'; 'y'] in
yield (n, c) in
assert (Strea.to_list items = [(1, 'x'); (1, 'y'); (2, 'x'); (2, 'y')])