Source file positional.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
open Core
open Async
open Shared
module Row = Delimited_kernel.Read.Row
let ~strict =
let = List.sort header ~compare:(fun (_, a, _) (_, b, _) -> Int.compare a b) in
let = String.Table.create () in
let col2str (name, pos, len) = sprintf "(name: %s, start:%i, length:%i)" name pos len in
let rec loop i l =
match l with
| [] -> Ok (header, header_index)
| [ (name, _, _) ] ->
(match Hashtbl.add header_index ~key:name ~data:i with
| `Ok -> Ok (header, header_index)
| `Duplicate -> Or_error.error_string ("Duplicate column name: " ^ name))
| (name1, pos1, len1) :: (name2, pos2, len2) :: l ->
if pos1 + len1 > pos2
then
Or_error.error_string
("Overlapping columns :"
^ col2str (name1, pos1, len1)
^ col2str (name2, pos2, len2))
else if strict && pos1 + len1 <> pos2
then
Or_error.error_string
("Gap between columns :"
^ col2str (name1, pos1, len1)
^ col2str (name2, pos2, len2))
else (
match Hashtbl.add header_index ~key:name1 ~data:i with
| `Ok -> loop (i + 1) ((name2, pos2, len2) :: l)
| `Duplicate -> Or_error.error_string ("Duplicate column name: " ^ name1))
in
loop 0 header
;;
let of_reader
?(strip = false)
?(skip_lines = 0)
?(on_parse_error = `Raise)
~
?(strict = true)
reader
=
match process_header header ~strict with
| Error e -> Error e
| Ok (, ) ->
let pipe_r, pipe_w = Pipe.create () in
let n_cols = List.length header in
let parse_line line =
let data = Array.create ~len:n_cols "" in
try
List.iter header ~f:(fun (name, pos, len) ->
let i = Hashtbl.find_exn header_index name in
if strip
then data.(i) <- String.strip (String.sub line ~pos ~len)
else data.(i) <- String.sub line ~pos ~len);
Ok (Row.create header_index data)
with
| e -> Error e
in
let close () =
don't_wait_for (Reader.close reader);
Pipe.close pipe_w
in
let rec loop () =
Reader.read_line reader
>>> function
| `Eof -> close ()
| `Ok line ->
(match parse_line line with
| Ok row -> Pipe.write pipe_w row >>> loop
| Error e ->
(match on_parse_error with
| `Raise ->
close ();
raise e
| `Handle f ->
(match f (Queue.create ()) e with
| `Continue -> loop ()
| `Finish -> close ())))
in
upon (drop_lines reader skip_lines) loop;
Ok pipe_r
;;
let create_reader ?strip ?skip_lines ?on_parse_error ~ ?strict filename =
let%map r = Reader.open_file filename in
of_reader ?strip ?skip_lines ?on_parse_error ~header ?strict r
;;
let rec write_line w next_pos line =
match header, line with
| [], [] -> Writer.write w "\r\n"
| (_, pos, len) :: , field :: line ->
let pre_fill =
if next_pos < pos then String.init (pos - next_pos) ~f:(const ' ') else ""
in
let fill =
if String.length field < len
then String.init (len - String.length field) ~f:(const ' ')
else ""
in
Writer.write w (pre_fill ^ field ^ fill);
write_line w header (pos + len) line
| [], _ -> raise (Invalid_argument "Too many fields given")
| _, [] -> raise (Invalid_argument "Too few fields given")
;;
let of_writer writer ?(strict = true) =
match process_header header ~strict with
| Error e -> Error e
| Ok (, _) ->
let pipe_r, pipe_w = Pipe.create () in
don't_wait_for (Pipe.iter_without_pushback pipe_r ~f:(write_line writer header 0));
upon (Pipe.closed pipe_w) (fun () -> don't_wait_for (Writer.close writer));
Ok pipe_w
;;
let create_writer filename ?strict =
let%map w = Writer.open_file filename in
of_writer w header ?strict
;;