package picos

  1. Overview
  2. Docs
Pico scheduler interface

Install

Dune Dependency

Authors

Maintainers

Sources

picos-0.4.0.tbz
sha256=343a8b4759239ca0c107145b8e2cc94c14625fecc0b0887d3c40a9ab7537b8da
sha512=db22b0a5b3adc603c0e815c9011c779f892b9ace76be018b2198d3e24a7d96727c999701025fe5a5fd07d0b452cb7286fc50c939aba0e4dce809941e9ebc12a6

doc/picos.structured/Picos_structured/index.html

Module Picos_structuredSource

Basic structured concurrency primitives for Picos.

This library essentially provides one application programming interface for structuring fibers with any Picos compatible scheduler. This library is both meant to serve as an example of what can be done and to also provide practical means for programming with fibers. Hopefully there will be many more libraries implemented in Picos like this providing different approaches, patterns, and idioms for structuring concurrent programs.

For the examples we open some modules:

  open Picos_structured.Finally
  open Picos_structured
  open Picos_stdio
  open Picos_sync

Modules

Sourcemodule Finally : sig ... end

Syntax for avoiding resource leaks.

Sourcemodule Control : sig ... end

Basic control operations and exceptions for structured concurrency.

Sourcemodule Promise : sig ... end

A cancelable promise.

Sourcemodule Bundle : sig ... end

A dynamic bundle of fibers guaranteed to be joined at the end.

Sourcemodule Run : sig ... end

Operations for running fibers in specific patterns.

Examples

Understanding cancelation

Consider the following program:

  let main () =
    Bundle.join_after begin fun bundle ->
      let promise =
        Bundle.fork_as_promise bundle
        @@ fun () -> Control.block ()
      in

      Bundle.fork bundle begin fun () ->
        Promise.await promise
      end;

      Bundle.fork bundle begin fun () ->
        let condition = Condition.create ()
        and mutex = Mutex.create () in
        Mutex.protect mutex begin fun () ->
          while true do
            Condition.wait condition mutex
          done
        end
      end;

      Bundle.fork bundle begin fun () ->
        let@ inn, out = finally
          Unix.close_pair @@ fun () ->
          Unix.socketpair ~cloexec:true
            PF_UNIX SOCK_STREAM 0
        in
        Unix.set_nonblock inn;
        let n =
          Unix.read inn (Bytes.create 1)
            0 1
        in
        assert (n = 1)
      end;

      Bundle.fork bundle begin fun () ->
        let a_month =
          60.0 *. 60.0 *. 24.0 *. 30.0
        in
        Control.sleep ~seconds:a_month
      end;

      (* Let the children get stuck *)
      Control.yield ();

      Bundle.terminate bundle
    end

First of all, note that above the Mutex and Condition modules come from the Picos_sync library and the Unix module comes from the Picos_stdio library. They do not come from the standard OCaml libraries.

The above program creates a bundle of fibers and forks several fibers to the bundle that all block in various ways. In detail,

Fibers forked to a bundle can be canceled in various ways. In the above program we call Bundle.terminate to cancel all the fibers and effectively close the bundle. This allows the program to return normally immediately and without leaking or leaving anything in an invalid state:

  # Picos_fifos.run main
  - : unit = ()

Now, the point of the above example isn't that you should just call terminate when your program gets stuck. 😅

What the above example hopefully demonstrates is that concurrent abstractions like mutexes and condition variables, asynchronous IO libraries, and others can be designed to support cancelation.

Cancelation is a control flow mechanism that allows structured concurrent abstractions, like the Bundle abstraction, to (hopefully) gracefully tear down concurrent fibers in case of errors. Indeed, one of the basic ideas behind the Bundle abstraction is that in case any fiber forked to the bundle raises an unhandled exception, the whole bundle will be terminated and the error will raised from the bundle, which allows you to understand what went wrong, instead of having to debug a program that mysteriously gets stuck, for example.

Cancelation can also, with some care, be used as a mechanism to terminate fibers once they are no longer needed. However, just like sleep, for example, cancelation is inherently prone to races, i.e. it is difficult to understand the exact point and state at which a fiber gets canceled and it is usually non-deterministic, and therefore cancelation is not recommended for use as a general synchronization or communication mechanism.

A simple echo server and clients

Let's build a simple TCP echo server and run it with some clients.

We first define a function for the server:

  let run_server server_fd =
    Unix.listen server_fd 8;

    Bundle.join_after begin fun bundle ->
      while true do
        let^ client_fd =
          finally Unix.close @@ fun () ->
          Unix.accept
            ~cloexec:true server_fd |> fst
        in

        (* Fork a fiber for client *)
        Bundle.fork bundle begin fun () ->
          let@ client_fd =
            move client_fd
          in
          Unix.set_nonblock client_fd;

          let bs = Bytes.create 100 in
          let n =
            Unix.read client_fd bs 0
              (Bytes.length bs)
          in
          Unix.write client_fd bs 0 n
          |> ignore
        end
      done
    end

The server function expects a bound socket and starts listening. For each accepted client the server forks a new fiber to handle it. The client socket is moved from the server fiber to the client fiber to avoid leaks and to ensure that the socket will be closed.

Let's then define a function for the clients:

  let run_client server_addr =
    let@ socket =
      finally Unix.close @@ fun () ->
      Unix.socket ~cloexec:true
        PF_INET SOCK_STREAM 0
    in
    Unix.set_nonblock socket;
    Unix.connect socket server_addr;

    let msg = "Hello!" in
    Unix.write_substring
      socket msg 0 (String.length msg)
    |> ignore;

    let bytes =
      Bytes.create (String.length msg)
    in
    let n =
      Unix.read socket bytes 0
        (Bytes.length bytes)
    in

    Printf.printf "Received: %s\n%!"
      (Bytes.sub_string bytes 0 n)

The client function takes the address of the server and connects a socket to the server address. It then writes a message to the server and reads a reply from the server and prints it.

Here is the main program:

  let main () =
    let@ server_fd =
      finally Unix.close @@ fun () ->
      Unix.socket ~cloexec:true
        PF_INET SOCK_STREAM 0
    in
    Unix.set_nonblock server_fd;

    (* Let system determine the port *)
    Unix.bind server_fd Unix.(
      ADDR_INET(inet_addr_loopback, 0));

    let server_addr =
      Unix.getsockname server_fd
    in

    Bundle.join_after begin fun bundle ->
      (* Start server *)
      let server =
        Bundle.fork_as_promise bundle
        @@ fun () -> run_server server_fd
      in

      (* Run clients concurrently *)
      Bundle.join_after begin fun bundle ->
        for _ = 1 to 5 do
          Bundle.fork bundle @@ fun () ->
            run_client server_addr
        done
      end;

      (* Stop server *)
      Promise.terminate server
    end

The main program creates a socket for the server and binds it. The server is then started as a new fiber. Then the clients are started to run concurrently. Finally the server is terminated.

Finally we run the main program with a scheduler:

  # Picos_fifos.run main
  Received: Hello!
  Received: Hello!
  Received: Hello!
  Received: Hello!
  Received: Hello!
  - : unit = ()

As an exercise, you might want to refactor the server to avoid moving the file descriptors and use a recursive accept loop instead. You could also terminate the whole bundle at the end instead of just terminating the server.

OCaml

Innovation. Community. Security.