package core_extended

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file shared.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
open Core

(* Empirically, a single read or write against a socket or a disk never moved
   more than roughly 65k at once, so the buffer is sized at ten such chunks
   (65 * 1024 * 10 = 665_600). *)
let buffer_size = 65 * 1024 * 10

(** Common shape of a reader entry point: a function that accepts the optional
    arguments [strip], [skip_lines] and [on_parse_error], then a mandatory
    [header] of type ['a], and yields a ['b].

    [on_parse_error] is either [`Raise] or [`Handle f], where [f] is given a
    [string Queue.t] together with the exception and answers [`Continue] or
    [`Finish].

    NOTE(review): the precise meaning of each option is fixed by the functions
    implementing this type, which are not visible here. Presumably the queue
    passed to [f] holds the fields of the offending row and [skip_lines] counts
    leading input lines to drop; confirm against the implementations. *)
type ('a, 'b) reader =
  ?strip:bool
  -> ?skip_lines:int
  -> ?on_parse_error:
       [ `Raise | `Handle of string Queue.t -> exn -> [ `Continue | `Finish ] ]
  -> header:'a
  -> 'b

(** [strip_buffer buf] returns the contents of [buf] with leading and trailing
    whitespace (as judged by [Char.is_whitespace]) removed. [buf] itself is not
    modified. A buffer that is empty or holds only whitespace yields [""]. *)
let strip_buffer buf =
  let len = Buffer.length buf in
  let is_blank i = Char.is_whitespace (Buffer.nth buf i) in
  (* Smallest index >= [i] holding a non-whitespace character, or [len] when
     there is none. *)
  let rec skip_forward i = if i < len && is_blank i then skip_forward (i + 1) else i in
  (* Largest index <= [i] holding a non-whitespace character. Only called once
     [skip_forward] has found such a character, so the scan always stops at or
     after that index and never runs below 0. *)
  let rec skip_backward i = if is_blank i then skip_backward (i - 1) else i in
  let start = skip_forward 0 in
  if start >= len
  then ""
  else (
    let stop = skip_backward (len - 1) in
    Buffer.To_string.sub buf ~pos:start ~len:(stop - start + 1))
;;

(** [make_emit_field ~strip current_row field] builds a staged thunk. Each time
    the thunk runs it appends the current text of [field] to [current_row]
    (passed through [strip_buffer] when [strip] is true, taken verbatim
    otherwise) and then empties [field] so it can be reused. *)
let make_emit_field ~strip current_row field =
  (* [strip] is fixed for the lifetime of the thunk, so pick the extractor once. *)
  let extract = if strip then strip_buffer else Buffer.contents in
  stage (fun () ->
    Queue.enqueue current_row (extract field);
    Buffer.clear field)
;;

(** [set_headers header_index headers] records, for every [Some name] found at
    position [i] of [headers], the binding [name -> i] in [header_index].
    [None] entries are skipped but still occupy a position.

    Raises [Failure] if a name is already present in [header_index]; the
    message reports both the earlier and the current position. *)
let set_headers header_index headers =
  let register position name =
    match Hashtbl.find header_index name with
    | Some earlier ->
      failwithf "header %s duplicated at position %i and %i" name earlier position ()
    | None -> Hashtbl.set header_index ~key:name ~data:position
  in
  List.iteri headers ~f:(fun position -> function
    | None -> ()
    | Some name -> register position name)
;;

(** [make_emit_row current_row row_queue header ~lineno] returns a pair
    [(`on_eof check, emit)].

    - [emit ()] consumes the fields accumulated in [current_row]. When [header]
      says a header line is expected and none has been seen yet, the row is
      used to populate the name-to-position index; otherwise the row is turned
      into a [Row.t] and pushed onto [row_queue]. In both cases [lineno] is
      incremented and [current_row] is cleared afterwards.
    - [check ()] raises [Failure "Header line was not found"] if a header line
      was expected but [emit] was never called.

    With [`Require names], [emit] raises [Failure] when one of [names] is
    missing from the header row. *)
let make_emit_row current_row row_queue header ~lineno =
  (* [`Replace] and [`Add] supply the column names up front; every other mode
     starts from an empty index. *)
  let header_index =
    match (header : Header.t) with
    | `Replace names | `Add names ->
      String.Table.of_alist_exn (List.mapi names ~f:(fun pos name -> name, pos))
    | `No | `Yes | `Require _ | `Transform _ | `Filter_map _ ->
      String.Table.create () ~size:1
  in
  (* True while the first row still has to be consumed as the header line. *)
  let awaiting_header =
    ref
      (match header with
       | `Require _ | `Replace _ | `Transform _ | `Filter_map _ | `Yes -> true
       | `No | `Add _ -> false)
  in
  let row_fields () = Queue.to_list current_row in
  let consume_header_row () =
    match header with
    | `No | `Add _ -> assert false (* these modes never wait for a header *)
    | `Replace _ -> () (* index was filled from the replacement names above *)
    | `Yes -> set_headers header_index (List.map (row_fields ()) ~f:Option.some)
    | `Transform f -> set_headers header_index (List.map (f (row_fields ())) ~f:Option.some)
    | `Filter_map f -> set_headers header_index (f (row_fields ()))
    | `Require required ->
      let fields = row_fields () in
      (* Only the required names are indexed. *)
      List.iter required ~f:(fun name ->
        match List.findi fields ~f:(fun _ field -> String.equal field name) with
        | Some (pos, _) -> Hashtbl.set header_index ~key:name ~data:pos
        | None ->
          failwithf
            "The required header '%s' was not found in '%s' (lineno=%d)"
            name
            (String.concat ~sep:"," fields)
            !lineno
            ())
  in
  let check_header_seen () =
    if !awaiting_header then failwith "Header line was not found"
  in
  let emit_row () =
    if !awaiting_header
    then (
      (* Flip the flag before processing so a failure below does not cause the
         next row to be treated as a header again. *)
      awaiting_header := false;
      consume_header_row ())
    else Queue.enqueue row_queue (Row.create header_index (Queue.to_array current_row));
    lineno := !lineno + 1;
    Queue.clear current_row
  in
  `on_eof check_header_seen, emit_row
;;
OCaml

Innovation. Community. Security.