package streaming

  1. Overview
  2. Docs

Module Streaming.SourceSource

Module with defintions for sources.

Elements are pulled from a source when needed. A source can have an internal state that will be lazily initialized when (and if) a consumer requests elements. The internal state will be safely disposed when the source runs out of elements, when the consumer terminates, or if an exception is raised at any point in the streaming pipeline.

Sources are a great way to define decoupled producers that can be consumed with Stream.from.

Sources are "single shot" and will haver their input exhausted by most operations. Consider buffering sources if you need to reuse their input.

Sourcetype 'a t = 'a source

The type for sources that produce elements of type 'a.

Creating a source

Implementing your own custom sources enables access to many useful operations. The most flexible way to create a source is with the Source.make function.

The following example creates a source that counts down to zero:

  let countdown n =
    let init () = n in
    let pull i =
      if i = 0 then None
      else Some (i, i - 1)) in
    Source.make ~init ~pull

Alternatively, existing list/array/seq/string sources, or others listed below, can be used.

Sourceval empty : 'a t

zero is an empty source.

Sourceval single : 'a -> 'a t

single a is a source with a single element a.

Sourceval list : 'a list -> 'a t

list items is a source with all elements from the items list.

Sourceval seq : 'a Seq.t -> 'a t

seq items is a source with all elements from the items sequence.

Sourceval array : 'a array -> 'a t

array items is a source with all elements from the items array.

Sourceval string : string -> char t

string str is a source with all characters from the str string.

Sourceval bytes : bytes -> char t

bytes b is a source with all characters from the b bytes.

Sourceval queue : 'a Queue.t -> 'a t

queue q is a source with all characters from the q queue.

Sourceval generate : len:int -> (int -> 'a) -> 'a t

generate ~len f generates a source of length len mapping each index to an element with f.

Sourceval count : int -> int t

count n is an infinite source with integers starting from n.

Sourceval iterate : 'a -> ('a -> 'a) -> 'a t

iterate x f is an infinite source where the first item is calculated by applying f to x, the second item by applying the function on the previous result and so on.

Sourceval unfold : 's -> ('s -> ('a * 's) option) -> 'a t

unfold seed next is a finite source created from a seed state and a function that produces elements and an updated state.

Sourceval make : init:(unit -> 's) -> pull:('s -> ('a * 's) option) -> ?stop:('s -> unit) -> unit -> 'a t

make ~init ~pull ~stop () is a value source created from the init, pull and stop. This function is similar to unfold but without lazy state initialization and state termination functions.

Note: For better performance, it is recommended that the pull function caches the termination condition in case it is expensive.

Zipping sources

Sourceval zip_with : ('a -> 'b -> 'c) -> 'a t -> 'b t -> 'c t

zip_with f src1 src2 is a source that pulls elements from src1 and src2 one by one, combining them with f.

Sourceval zip : 'a t -> 'b t -> ('a * 'b) t

zip src1 src2 is a source of pairs with elements elements pulled from src1 and src2 one by one.

Equivalent to zip_with (fun x y -> (x, y)) src1 src2.

Transforming a source

Note: Instead of applying the transformation functions at the source, consider using Stream.from or defining your compuation as a Flow to make it reusable.

Sourceval map : ('a -> 'b) -> 'a t -> 'b t

A source with all elements transformed with a mapping function.

Sourceval filter : ('a -> bool) -> 'a t -> 'a t

A source that includes only the elements that satisfy a predicate.

Sourceval take : int -> 'a t -> 'a t

Take first n elements from the source and discard the rest.

Similar to take but takes the last n elements.

Sourceval take_while : ('a -> bool) -> 'a t -> 'a t

Take first elements from the source that satisfy a predicate and discard the rest.

Sourceval drop : int -> 'a t -> 'a t

Drop first n elements from the source and keep the rest.

Similar to drop but drops the last n elements.

Sourceval drop_while : ('a -> bool) -> 'a t -> 'a t

Drpo first elements from the source that satisfy a predicate and keep the rest.

Consuming a source

Many consumers are available in the Sink module. You can consume any source using a sink with:

let source = Source.count 10 in
source
|> Stream.from
|> Stream.into Sink.last

Alternatively use the source consumers below for simple operations.

Sourceval fold : ('r -> 'a -> 'r) -> 'r -> 'a t -> 'r

fold step init source reduces the values of source with the step function, starting with init.

If the step function raises an exception, the source will be properly terminated.

Sourceval len : 'a t -> int

len src is the count of elements in src.

Sourceval each : ('a -> unit) -> 'a t -> unit

each f src applies an effectful function f to all elements in src.

Resource handling

Sourceval dispose : 'a t -> unit

dispose source forces the termination of the source state. This function is useful in situations when a leftover source is produced in Stream.run.

Note: If the source is not already initialized, calling this function will first initialize its state before it is terminated.

OCaml

Innovation. Community. Security.