Source file stores.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
(** Error carrying the hash of an L2 block that could not be encoded for
    on-disk storage. *)
type error += Cannot_encode_block of L2block.hash

(* Register [Cannot_encode_block] as a permanent error, serialized as an
   object with a single "block" field holding the block hash. *)
let () =
  let pp fmt hash =
    Format.fprintf
      fmt
      "The L2 block %a cannot be encoded to be stored on disk."
      L2block.Hash.pp
      hash
  in
  let payload_encoding =
    Data_encoding.(obj1 (req "block" L2block.Hash.encoding))
  in
  let unwrap = function Cannot_encode_block hash -> Some hash | _ -> None in
  let wrap hash = Cannot_encode_block hash in
  register_error_kind
    ~id:"tx_rollup.node.cannot_encode_block"
    ~title:"An L2 block cannot be encoded"
    ~description:"An L2 block cannot be encoded to be stored on disk."
    ~pp
    `Permanent
    payload_encoding
    unwrap
    wrap
(** Error carrying the name of a piece of data that could not be encoded for
    on-disk storage. *)
type error += Cannot_encode_data of string

(* Register [Cannot_encode_data] as a permanent error, serialized as an
   object with a single "name" string field. *)
let () =
  let pp fmt data_name =
    Format.fprintf
      fmt
      "Data %s cannot be encoded to be stored on disk."
      data_name
  in
  let payload_encoding = Data_encoding.(obj1 (req "name" string)) in
  let unwrap = function
    | Cannot_encode_data data_name -> Some data_name
    | _ -> None
  in
  let wrap data_name = Cannot_encode_data data_name in
  register_error_kind
    ~id:"tx_rollup.node.cannot_encode_data"
    ~title:"Data cannot be encoded"
    ~description:"Data cannot be encoded to be stored on disk."
    ~pp
    `Permanent
    payload_encoding
    unwrap
    wrap
(** Error carrying the name of a file that could not be written to disk. *)
type error += Cannot_write_file of string

(* Register [Cannot_write_file] as a permanent error, serialized as an
   object with a single "name" string field. *)
let () =
  let pp fmt file_name =
    Format.fprintf fmt "File %s cannot be written to disk." file_name
  in
  let payload_encoding = Data_encoding.(obj1 (req "name" string)) in
  let unwrap = function
    | Cannot_write_file file_name -> Some file_name
    | _ -> None
  in
  let wrap file_name = Cannot_write_file file_name in
  register_error_kind
    ~id:"tx_rollup.node.cannot_write_file"
    ~title:"File cannot be written"
    ~description:"File cannot be written to disk."
    ~pp
    `Permanent
    payload_encoding
    unwrap
    wrap
(** [blit ~src ~dst offset] copies the whole of [src] into [dst] starting at
    position [offset] and returns the position just after the copied bytes.
    @raise Invalid_argument if [src] does not fit in [dst] at [offset]. *)
let blit ~src ~dst offset =
  let copied = Bytes.length src in
  let () = Bytes.blit src 0 dst offset copied in
  offset + copied
(** [bytes_set_int64 ~src ~dst offset] stores the 64-bit integer [src] in
    big-endian order in [dst] at [offset] and returns [offset + 8]. *)
let bytes_set_int64 ~src ~dst offset =
  let () = Bytes.set_int64_be dst offset src in
  offset + 8
(** [bytes_set_int32 ~src ~dst offset] stores the 32-bit integer [src] in
    big-endian order in [dst] at [offset] and returns [offset + 4]. *)
let bytes_set_int32 ~src ~dst offset =
  let () = Bytes.set_int32_be dst offset src in
  offset + 4
(** [bytes_set_int8 ~src ~dst offset] stores the low byte of [src] in [dst]
    at [offset] and returns [offset + 1]. *)
let bytes_set_int8 ~src ~dst offset =
  let () = Bytes.set_int8 dst offset src in
  offset + 1
(** [read_str str ~offset ~len decode] extracts the [len] characters of
    [str] starting at [offset], applies [decode] to them and returns the
    decoded value together with the position following the extracted span.
    @raise Invalid_argument if the span is not within [str]. *)
let read_str str ~offset ~len decode =
  let chunk = String.sub str offset len in
  let next = offset + len in
  (decode chunk, next)
(** [read_int64 str offset] reads a 64-bit integer from [str] at [offset]
    with [TzEndian.get_int64_string] and returns it with [offset + 8]. *)
let read_int64 str offset =
  let next = offset + 8 in
  (TzEndian.get_int64_string str offset, next)
(** [read_int32 str offset] reads a 32-bit integer from [str] at [offset]
    with [TzEndian.get_int32_string] and returns it with [offset + 4]. *)
let read_int32 str offset =
  let next = offset + 4 in
  (TzEndian.get_int32_string str offset, next)
(** [read_int8 str offset] reads an 8-bit integer from [str] at [offset]
    with [TzEndian.get_int8_string] and returns it with [offset + 1]. *)
let read_int8 str offset =
  let next = offset + 1 in
  (TzEndian.get_int8_string str offset, next)
(** Signature of a store exposing read/write/delete on one value.
    [Make_singleton] in this file provides functions of the same shape. *)
module type SINGLETON_STORE = sig
(** Handle on the store. *)
type t
(** Type of the value kept in the store. *)
type value
(** [read store] returns the stored value, or [None] when no value can be
    produced. *)
val read : t -> value option Lwt.t
(** [write store v] stores [v]; failures are reported in the [tzresult]. *)
val write : t -> value -> unit tzresult Lwt.t
(** [delete store] removes the stored value. *)
val delete : t -> unit Lwt.t
end
(** Signature of a key/value store supporting membership test, lookup and
    insertion. [Make_indexable] in this file provides functions of the same
    shape. *)
module type INDEXABLE_STORE = sig
(** Handle on the store. *)
type t
(** Type of the keys. *)
type key
(** Type of the values bound to keys. *)
type value
(** [mem store k] tells whether [k] is bound in [store]. *)
val mem : t -> key -> bool Lwt.t
(** [find store k] returns the value bound to [k], if any. *)
val find : t -> key -> value option Lwt.t
(** [add ?flush store k v] binds [k] to [v]. [flush] presumably controls
    whether the store is flushed to disk right away (this is what
    [Make_indexable.add] does) — confirm against the implementation used. *)
val add : ?flush:bool -> t -> key -> value -> unit Lwt.t
end
(** Extension of [INDEXABLE_STORE] with removal of bindings. *)
module type INDEXABLE_REMOVABLE_STORE = sig
include INDEXABLE_STORE
(** [remove ?flush store k] removes the binding of [k] from [store]. *)
val remove : ?flush:bool -> t -> key -> unit Lwt.t
end
(** [Make_indexable (K) (V) (P)] builds a key/value store on top of an
    [Index_unix] index located at [P.path ~data_dir]. Reads go through
    [Lwt_idle_waiter.task] and writes through [Lwt_idle_waiter.force_idle]
    on a scheduler owned by the store. *)
module Make_indexable
    (K : Index.Key.S)
    (V : Index.Value.S) (P : sig
      val path : data_dir:string -> string
    end) =
struct
  module I = Index_unix.Make (K) (V) (Index.Cache.Unbounded)

  type t = {index : I.t; scheduler : Lwt_idle_waiter.t}

  (* Value passed as [~log_size] when opening the index. *)
  let log_size = 10_000

  (** [mem store key] tells whether [key] is present in the index. *)
  let mem {index; scheduler} key =
    Lwt_idle_waiter.task scheduler (fun () -> Lwt.return (I.mem index key))

  (** [find store key] returns the value bound to [key]; exceptions raised
      by the lookup are turned into [None] by [Option.catch_os]. *)
  let find {index; scheduler} key =
    Lwt_idle_waiter.task scheduler (fun () ->
        Option.catch_os (fun () -> Lwt.return_some (I.find index key)))

  (** [add ?flush store key value] binds [key] to [value] (replacing any
      previous binding) and flushes the index unless [flush] is [false]. *)
  let add ?(flush = true) {index; scheduler} key value =
    Lwt_idle_waiter.force_idle scheduler (fun () ->
        I.replace index key value ;
        if flush then I.flush index ;
        Lwt.return_unit)

  (** [init ~data_dir ~readonly] opens the index at [P.path ~data_dir] and
      creates a fresh scheduler for it. *)
  let init ~data_dir ~readonly =
    let location = P.path ~data_dir in
    let index = I.v ~log_size ~readonly location in
    let scheduler = Lwt_idle_waiter.create () in
    Lwt.return {index; scheduler}

  (** [close store] closes the index; an already closed index is ignored. *)
  let close {index; scheduler} =
    Lwt_idle_waiter.force_idle scheduler (fun () ->
        (match I.close index with
        | () -> ()
        | exception Index.Closed -> ()) ;
        Lwt.return_unit)
end
(** Variant of [Make_indexable] supporting removal. Values are stored as
    [V.t option]: a removed binding is kept in the index but overwritten
    with [None]. *)
module Make_indexable_removable
    (K : Index.Key.S)
    (V : Index.Value.S) (P : sig
      val path : data_dir:string -> string
    end) =
struct
  (* Fixed-size encoding of an optional value: one tag byte (0 for [None],
     1 for [Some]) followed by [V.encoded_size] bytes of payload, which are
     zeros for [None]. *)
  module V_opt = struct
    type t = V.t option

    let t = Repr.option V.t

    let encoded_size = 1 + V.encoded_size

    let encode v =
      let tag, payload =
        match v with
        | Some inner -> (1, Bytes.unsafe_of_string (V.encode inner))
        | None -> (0, Bytes.make V.encoded_size '\000')
      in
      let dst = Bytes.create encoded_size in
      let payload_offset = bytes_set_int8 ~src:tag ~dst 0 in
      let (_ : int) = blit ~src:payload ~dst payload_offset in
      Bytes.unsafe_to_string dst

    let decode str offset =
      match read_int8 str offset with
      | 0, _ -> None
      | 1, payload_offset -> Some (V.decode str payload_offset)
      | _ -> assert false
  end

  include Make_indexable (K) (V_opt) (P)

  (** [find store k] returns the value bound to [k]; both an absent key and
      a removed one yield [None]. *)
  let find store k =
    Lwt.map
      (function Some (Some _ as found) -> found | Some None | None -> None)
      (find store k)

  (** [mem store hash] holds when [find store hash] returns a value. *)
  let mem store hash = Lwt.map Option.is_some (find store hash)

  (** [add ?flush store k v] binds [k] to [Some v] in the underlying store. *)
  let add ?flush store k v = add ?flush store k (Some v)

  (** [remove ?flush store k] overwrites the binding of [k] with [None] when
      [k] is present in the index, flushing unless [flush] is [false]. *)
  let remove ?(flush = true) store k =
    Lwt_idle_waiter.force_idle store.scheduler (fun () ->
        if I.mem store.index k then (
          I.replace store.index k None ;
          if flush then I.flush store.index) ;
        Lwt.return_unit)
end
(** [Make_singleton (S)] builds a store holding one value of type [S.t] in
    a file named [S.name] inside the node's store directory, serialized
    with [S.encoding]. *)
module Make_singleton (S : sig
  type t

  val name : string

  val encoding : t Data_encoding.t
end) =
struct
  type t = {file : string}

  (** [read store] returns the decoded content of the store file. It
      returns [None] when the file does not exist or when its content
      cannot be decoded with [S.encoding]. *)
  let read store =
    let open Lwt_syntax in
    let* exists = Lwt_unix.file_exists store.file in
    match exists with
    | false -> return_none
    | true ->
        Lwt_io.with_file
          ~flags:[Unix.O_RDONLY; O_CLOEXEC]
          ~mode:Input
          store.file
        @@ fun channel ->
        let+ bytes = Lwt_io.read channel in
        Data_encoding.Binary.of_bytes_opt
          S.encoding
          (Bytes.unsafe_of_string bytes)

  (** [write store x] serializes [x] and writes it to the store file
      through [Lwt_utils_unix.with_atomic_open_out].

      Fails with [Cannot_encode_data S.name] when [x] cannot be encoded and
      with [Cannot_write_file S.name] when the atomic write reports an
      error. *)
  let write store x =
    let open Lwt_result_syntax in
    (* Encode before opening anything: an encoding failure must leave the
       existing file untouched. Encoding inside the writer callback would
       let the atomic writer finish normally on an empty temporary file. *)
    match Data_encoding.Binary.to_bytes_opt S.encoding x with
    | None -> tzfail (Cannot_encode_data S.name)
    | Some bytes -> (
        let*! res =
          Lwt_utils_unix.with_atomic_open_out ~overwrite:true store.file
          @@ fun fd -> Lwt_utils_unix.write_bytes fd bytes
        in
        match res with
        | Ok () -> return_unit
        | Error _ -> tzfail (Cannot_write_file S.name))

  (** [delete store] removes the store file if it exists. *)
  let delete store =
    let open Lwt_syntax in
    let* exists = Lwt_unix.file_exists store.file in
    match exists with
    | false -> return_unit
    | true -> Lwt_unix.unlink store.file

  (** [init ~data_dir] computes the path of the store file; it does not
      touch the file system. *)
  let init ~data_dir =
    let file = Filename.Infix.(Node_data.store_dir data_dir // S.name) in
    Lwt.return {file}
end
(** L2 block hashes equipped with the [hash_size], [t], [encode],
    [encoded_size] and [decode] members needed to instantiate the index
    functors in this file, as key or as value. *)
module L2_block_key = struct
  include L2block.Hash

  let hash_size = 30

  (* Runtime type representation, built as a mapping over fixed-size bytes. *)
  let t =
    Repr.(
      map
        (bytes_of (`Fixed hash_size))
        (fun raw -> of_bytes_exn raw)
        (fun block_hash -> to_bytes block_hash))

  let encode block_hash = to_string block_hash

  let encoded_size = size

  (* Decodes the [encoded_size] characters of [str] starting at [off]. *)
  let decode str off = of_string_exn (String.sub str off encoded_size)
end
(** L2 levels usable as index keys, encoded on 4 bytes through their
    [int32] representation. *)
module L2_level_key = struct
  type t = L2block.level

  let to_int32 = Protocol.Alpha_context.Tx_rollup_level.to_int32

  (* Conversion from [int32]; raises (through [get_ok]) when the protocol
     rejects the value. *)
  let of_int32 l =
    Protocol.Alpha_context.Tx_rollup_level.of_int32 l
    |> WithExceptions.Result.get_ok ~loc:__LOC__

  let t = Repr.(map int32 of_int32 to_int32)

  let equal x y =
    let left = to_int32 x and right = to_int32 y in
    Compare.Int32.equal left right

  let hash = Stdlib.Hashtbl.hash

  let hash_size = 30

  let encoded_size = 4

  let encode l =
    let buf = Bytes.create encoded_size in
    let () = TzEndian.set_int32 buf 0 (to_int32 l) in
    Bytes.unsafe_to_string buf

  let decode str i = of_int32 (TzEndian.get_int32_string str i)
end
(** Operation hashes extended with a [hash_size] and a runtime type
    representation [t]. *)
module Operation_key = struct
  include Operation_hash

  let hash_size = 30

  (* Runtime type representation, built as a mapping over fixed-size bytes. *)
  let t =
    Repr.(
      map
        (bytes_of (`Fixed hash_size))
        (fun raw -> of_bytes_exn raw)
        (fun op_hash -> to_bytes op_hash))
end
(** Commitment hashes usable as index keys. *)
module Commitment_key = struct
  include Protocol.Alpha_context.Tx_rollup_commitment_hash

  let hash = Stdlib.Hashtbl.hash

  let hash_size = 30

  (* Runtime type representation, built as a mapping over fixed-size bytes. *)
  let t =
    Repr.(
      map
        (bytes_of (`Fixed hash_size))
        (fun raw -> of_bytes_exn raw)
        (fun commitment_hash -> to_bytes commitment_hash))

  let encode commitment_hash =
    Bytes.unsafe_to_string (to_bytes commitment_hash)

  let encoded_size = size

  (* Decodes the [encoded_size] characters of [str] starting at [off]. *)
  let decode str off =
    let raw = String.sub str off encoded_size in
    of_bytes_exn (Bytes.unsafe_of_string raw)
end
(** Index value describing a stored L2 block: [offset] (used by
    [L2_block_store] as the block's position in the blocks file), the
    optional predecessor hash and the context hash.

    Binary layout: 8 bytes for the offset, then the predecessor hash (the
    [zero] hash stands for [None]), then the context hash. *)
module L2_block_info = struct
  type t = {
    offset : int;
    predecessor : L2block.hash option;
    context : Protocol.Tx_rollup_l2_context_hash.t;
  }

  let encoded_size =
    8 + L2block.Hash.size + Protocol.Tx_rollup_l2_context_hash.size

  let l2_context_hash_repr =
    Repr.(
      map
        (bytes_of (`Fixed 31))
        (fun raw -> Protocol.Tx_rollup_l2_context_hash.of_bytes_exn raw)
        (fun ctxt_hash -> Protocol.Tx_rollup_l2_context_hash.to_bytes ctxt_hash))

  let t =
    Repr.(
      map
        (triple int (option L2_block_key.t) l2_context_hash_repr)
        (fun (offset, predecessor, context) -> {offset; predecessor; context})
        (fun {offset; predecessor; context} -> (offset, predecessor, context)))

  let encode {offset = file_offset; predecessor; context} =
    let dst = Bytes.create encoded_size in
    let pos = bytes_set_int64 ~src:(Int64.of_int file_offset) ~dst 0 in
    (* An absent predecessor is written as the zero hash. *)
    let pred_bytes =
      match predecessor with
      | Some pred -> L2block.Hash.to_bytes pred
      | None -> L2block.Hash.(to_bytes zero)
    in
    let pos = blit ~src:pred_bytes ~dst pos in
    let context_bytes = Protocol.Tx_rollup_l2_context_hash.to_bytes context in
    let (_ : int) = blit ~src:context_bytes ~dst pos in
    Bytes.unsafe_to_string dst

  let decode str offset =
    let context_of_string raw =
      Protocol.Tx_rollup_l2_context_hash.of_bytes_exn
        (Bytes.unsafe_of_string raw)
    in
    let file_offset, pos = read_int64 str offset in
    let pred_hash, pos =
      read_str
        str
        ~offset:pos
        ~len:L2block.Hash.size
        L2block.Hash.of_string_exn
    in
    (* The zero hash is the on-disk marker for "no predecessor". *)
    let predecessor =
      if L2block.Hash.(pred_hash = zero) then None else Some pred_hash
    in
    let context, (_ : int) =
      read_str
        str
        ~offset:pos
        ~len:Protocol.Tx_rollup_l2_context_hash.size
        context_of_string
    in
    {offset = Int64.to_int file_offset; predecessor; context}
end
(** Index value attached to a Tezos block: an optional L2 block hash, a
    level and the predecessor block hash.

    Binary layout: the L2 block hash (the [zero] hash stands for [None]),
    then 4 bytes for the level, then the predecessor hash. *)
module Tezos_block_info = struct
  type t = {
    l2_block : L2block.hash option;
    level : int32;
    predecessor : Block_hash.t;
  }

  let t =
    Repr.(
      map
        (triple (option L2_block_key.t) int32 Tezos_store_shared.Block_key.t)
        (fun (l2_block, level, predecessor) -> {l2_block; level; predecessor})
        (fun {l2_block; level; predecessor} -> (l2_block, level, predecessor)))

  let encoded_size = L2block.Hash.size + 4 + Block_hash.size

  let encode {l2_block; level; predecessor} =
    let dst = Bytes.create encoded_size in
    (* An absent L2 block is written as the zero hash. *)
    let l2_block_bytes =
      match l2_block with
      | None -> L2block.Hash.(to_bytes zero)
      | Some block_hash -> L2block.Hash.to_bytes block_hash
    in
    let pos = blit ~src:l2_block_bytes ~dst 0 in
    let pos = bytes_set_int32 ~src:level ~dst pos in
    let (_ : int) = blit ~src:(Block_hash.to_bytes predecessor) ~dst pos in
    Bytes.unsafe_to_string dst

  let decode str offset =
    (* The zero hash is the on-disk marker for "no L2 block". *)
    let l2_block_of_string raw =
      let block_hash = L2block.Hash.of_string_exn raw in
      if L2block.Hash.(block_hash = zero) then None else Some block_hash
    in
    let l2_block, pos =
      read_str str ~offset ~len:L2block.Hash.size l2_block_of_string
    in
    let level, pos = read_int32 str pos in
    let predecessor, (_ : int) =
      read_str str ~offset:pos ~len:Block_hash.size Block_hash.of_string_exn
    in
    {l2_block; level; predecessor}
end
(** Index value attached to a commitment: a block hash followed by an
    operation hash, concatenated in that order in the binary form. *)
module Commitment_info = struct
  type t = {block : Block_hash.t; operation : Operation_hash.t}

  let t =
    Repr.(
      map
        (pair Tezos_store_shared.Block_key.t Operation_key.t)
        (fun (block, operation) -> {block; operation})
        (fun {block; operation} -> (block, operation)))

  let encoded_size = Block_hash.size + Operation_hash.size

  let encode {block; operation} =
    let dst = Bytes.create encoded_size in
    let pos = blit ~src:(Block_hash.to_bytes block) ~dst 0 in
    let (_ : int) = blit ~src:(Operation_hash.to_bytes operation) ~dst pos in
    Bytes.unsafe_to_string dst

  let decode str offset =
    let block, pos =
      read_str str ~offset ~len:Block_hash.size Block_hash.of_string_exn
    in
    let operation, (_ : int) =
      read_str
        str
        ~offset:pos
        ~len:Operation_hash.size
        Operation_hash.of_string_exn
    in
    {block; operation}
end
(** Indexed store mapping Tezos block hashes to [Tezos_block_info.t]
    values, located at [Node_data.tezos_blocks_index]. *)
module Tezos_block_store = struct
  (* Re-export of the record type so its fields are accessible through
     this module. *)
  type value = Tezos_block_info.t = {
    l2_block : L2block.hash option;
    level : int32;
    predecessor : Block_hash.t;
  }

  include
    Make_indexable (Tezos_store_shared.Block_key) (Tezos_block_info)
      (struct
        let path ~data_dir:dir = Node_data.tezos_blocks_index dir
      end)
end
(** Indexed, removable store mapping L2 levels to L2 block hashes, located
    at [Node_data.levels_index]. *)
module Level_store =
  Make_indexable_removable (L2_level_key) (L2_block_key)
    (struct
      let path ~data_dir:dir = Node_data.levels_index dir
    end)
(** Indexed, removable store mapping commitment hashes to
    [Commitment_info.t] values, located at [Node_data.commitments_index]. *)
module Commitment_store = struct
  (* Re-export of the record type so its fields are accessible through
     this module. *)
  type value = Commitment_info.t = {
    block : Block_hash.t;
    operation : Operation_hash.t;
  }

  include
    Make_indexable_removable (Commitment_key) (Commitment_info)
      (struct
        let path ~data_dir:dir = Node_data.commitments_index dir
      end)
end
(** Store of L2 blocks. Encoded blocks are appended to a single data file;
    an index maps each block hash to an [L2_block_info.t] (file offset,
    predecessor, context), and a cache keeps blocks in memory. Reads go
    through [Lwt_idle_waiter.task] and writes through
    [Lwt_idle_waiter.force_idle] on the store's scheduler. *)
module L2_block_store = struct
  open L2_block_info

  (* In-memory block cache keyed by L2 block hash (LRU instantiation). *)
  module Cache =
    Aches_lwt.Lache.Make_option
      (Aches.Rache.Transfer (Aches.Rache.LRU) (L2block.Hash))

  module L2_block_index =
    Index_unix.Make (L2_block_key) (L2_block_info) (Index.Cache.Unbounded)

  (* Access to the data file holding the encoded blocks. *)
  module L2_blocks_file = struct
    (* Blocks are stored with a [`Uint30] size prefix. *)
    let encoding = Data_encoding.dynamic_size ~kind:`Uint30 L2block.encoding

    (** [pread_block_exn fd ~file_offset] reads the block stored at
        [file_offset] in [fd] and returns it with the total number of bytes
        it occupies (the 4-byte length header included). Any read or
        decoding failure surfaces as an exception. *)
    let pread_block_exn fd ~file_offset =
      let open Lwt_syntax in
      (* First read the 4-byte big-endian length header. *)
      let length_bytes = Bytes.create 4 in
      let* () =
        Lwt_utils_unix.read_bytes ~file_offset ~pos:0 ~len:4 fd length_bytes
      in
      let block_length_int32 = Bytes.get_int32_be length_bytes 0 in
      let block_length = Int32.to_int block_length_int32 in
      (* Keep the header at the front of the buffer: the size-prefixed
         [encoding] expects it when decoding. *)
      let block_bytes = Bytes.extend length_bytes 0 block_length in
      let* () =
        Lwt_utils_unix.read_bytes
          ~file_offset:(file_offset + 4)
          ~pos:4
          ~len:block_length
          fd
          block_bytes
      in
      Lwt.return
        ( Data_encoding.Binary.of_bytes_exn encoding block_bytes,
          4 + block_length )

    (** Same as [pread_block_exn] but returns [None] instead of raising. *)
    let pread_block fd ~file_offset =
      Option.catch_s (fun () -> pread_block_exn fd ~file_offset)
  end

  type t = {
    index : L2_block_index.t;
    fd : Lwt_unix.file_descr;
    scheduler : Lwt_idle_waiter.t;
    cache : L2block.t Cache.t;
  }

  (* Value passed as [~log_size] when opening the index. *)
  let blocks_log_size = 10_000

  (** [mem store hash] tells whether [hash] is present in the index. *)
  let mem store hash =
    Lwt_idle_waiter.task store.scheduler @@ fun () ->
    Lwt.return (L2_block_index.mem store.index hash)

  (** [predecessor store hash] returns the predecessor recorded in the
      index for [hash]; [None] when [hash] is unknown or has none. *)
  let predecessor store hash =
    Lwt_idle_waiter.task store.scheduler @@ fun () ->
    try
      let {predecessor; _} = L2_block_index.find store.index hash in
      Lwt.return predecessor
    with Not_found -> Lwt.return_none

  (** [context store hash] returns the context hash recorded in the index
      for [hash], or [None] when [hash] is unknown. *)
  let context store hash =
    Lwt_idle_waiter.task store.scheduler @@ fun () ->
    try
      let {context; _} = L2_block_index.find store.index hash in
      Lwt.return_some context
    with Not_found -> Lwt.return_none

  (** [read_block store hash] returns the block with hash [hash], going
      through the cache and reading the data file at the indexed offset
      when needed. Failures are turned into [None] by [Option.catch_os]. *)
  let read_block store hash =
    let open Lwt_syntax in
    Lwt_idle_waiter.task store.scheduler @@ fun () ->
    Option.catch_os @@ fun () ->
    let read_from_disk hash =
      let {offset; _} = L2_block_index.find store.index hash in
      let* o = L2_blocks_file.pread_block store.fd ~file_offset:offset in
      match o with
      | Some (block, _) -> Lwt.return_some block
      | None -> Lwt.return_none
    in
    Cache.bind_or_put store.cache hash read_from_disk Lwt.return

  (** [locked_write_block store ~offset ~block ~hash] encodes [block],
      writes it to the data file and records [hash] in the index with
      [offset]. Returns the number of bytes written, or
      [Cannot_encode_block hash] when the block cannot be encoded (nothing
      is written in that case). The name suggests callers are expected to
      hold the scheduler's exclusive section. *)
  let locked_write_block store ~offset ~block ~hash =
    let open Lwt_result_syntax in
    let* block_bytes =
      match
        Data_encoding.Binary.to_bytes_opt L2_blocks_file.encoding block
      with
      | None -> tzfail (Cannot_encode_block hash)
      | Some bytes -> return bytes
    in
    let block_length = Bytes.length block_bytes in
    let*! () =
      Lwt_utils_unix.write_bytes ~pos:0 ~len:block_length store.fd block_bytes
    in
    L2_block_index.replace
      store.index
      hash
      {
        offset;
        predecessor = block.header.predecessor;
        context = block.header.context;
      } ;
    return block_length

  (** [append_block ?flush store block] appends [block] at the end of the
      data file, indexes it and caches it. The index is flushed unless
      [flush] is [false].

      The block is cached only once it has been written and indexed, so
      that a failed write cannot leave a cached block that is neither on
      disk nor in the index. The signature returns [unit Lwt.t], so an
      encoding error reported by [locked_write_block] cannot be propagated
      to the caller; it is dropped here explicitly. *)
  let append_block ?(flush = true) store (block : L2block.t) =
    let open Lwt_syntax in
    Lwt_idle_waiter.force_idle store.scheduler @@ fun () ->
    let hash = block.hash in
    let* offset = Lwt_unix.lseek store.fd 0 Unix.SEEK_END in
    let* written = locked_write_block store ~offset ~block ~hash in
    (match written with
    | Ok (_ : int) -> Cache.put store.cache hash (return_some block)
    | Error _ ->
        (* NOTE(review): the failure is invisible to callers; consider
           returning a [tzresult] in a future interface change. *)
        ()) ;
    if flush then L2_block_index.flush store.index ;
    Lwt.return_unit

  (** [init ~data_dir ~readonly ~cache_size] opens (creating it if needed)
      the blocks data file and the blocks index, and creates the scheduler
      and a cache of size [cache_size]. *)
  let init ~data_dir ~readonly ~cache_size =
    let open Lwt_syntax in
    let flag, perms =
      if readonly then (Unix.O_RDONLY, 0o444) else (Unix.O_RDWR, 0o644)
    in
    let* fd =
      Lwt_unix.openfile
        (Node_data.l2blocks_data data_dir)
        [Unix.O_CREAT; O_CLOEXEC; flag]
        perms
    in
    let index =
      L2_block_index.v
        ~log_size:blocks_log_size
        ~readonly
        (Node_data.l2blocks_index data_dir)
    in
    let scheduler = Lwt_idle_waiter.create () in
    let cache = Cache.create cache_size in
    Lwt.return {index; fd; scheduler; cache}

  (** [close store] closes the index (ignoring an already closed index) and
      then the data file descriptor, ignoring the result of the latter. *)
  let close store =
    let open Lwt_syntax in
    Lwt_idle_waiter.force_idle store.scheduler @@ fun () ->
    (try L2_block_index.close store.index with Index.Closed -> ()) ;
    let* _ignore = Lwt_utils_unix.safe_close store.fd in
    Lwt.return_unit
end
(** Singleton store holding an L2 block hash in the file named "head". *)
module Head_store = Make_singleton (struct
  type t = L2block.hash

  let encoding = L2block.Hash.encoding

  let name = "head"
end)
(** Singleton store holding a Tezos block hash in the file named
    "tezos_head". *)
module Tezos_head_store = Make_singleton (struct
  type t = Block_hash.t

  let encoding = Block_hash.encoding

  let name = "tezos_head"
end)
(** Information about a rollup, as persisted by [Rollup_info_store]. *)
type rollup_info = {
(* Identifier of the rollup. *)
rollup_id : Protocol.Alpha_context.Tx_rollup.t;
(* Optional level; presumably the Tezos level at which the rollup was
   originated, [None] when unknown — TODO confirm with callers. *)
origination_level : int32 option;
}
(** Singleton store holding a [rollup_info] in the file named
    "rollup_info", encoded as an object with a required "rollup_id" field
    and an optional "origination_level" field. *)
module Rollup_info_store = Make_singleton (struct
  type t = rollup_info

  let name = "rollup_info"

  let encoding =
    let to_pair {rollup_id; origination_level} =
      (rollup_id, origination_level)
    in
    let of_pair (rollup_id, origination_level) =
      {rollup_id; origination_level}
    in
    Data_encoding.(
      conv
        to_pair
        of_pair
        (obj2
           (req "rollup_id" Protocol.Alpha_context.Tx_rollup.encoding)
           (opt "origination_level" int32)))
end)
(** Singleton store holding a rollup level in the file named
    "finalized_level". *)
module Finalized_level_store = Make_singleton (struct
  type t = Protocol.Alpha_context.Tx_rollup_level.t

  let encoding = Protocol.Alpha_context.Tx_rollup_level.encoding

  let name = "finalized_level"
end)
(** Bundle of all the stores defined in this file; built by [init]. *)
type t = {
(* L2 blocks (data file, index and cache). *)
blocks : L2_block_store.t;
(* Index from Tezos block hashes to [Tezos_block_info.t]. *)
tezos_blocks : Tezos_block_store.t;
(* Index from L2 levels to L2 block hashes. *)
levels : Level_store.t;
(* Index from commitment hashes to [Commitment_info.t]. *)
commitments : Commitment_store.t;
(* Singleton file "head" (an L2 block hash). *)
head : Head_store.t;
(* Singleton file "tezos_head" (a Tezos block hash). *)
tezos_head : Tezos_head_store.t;
(* Singleton file "rollup_info". *)
rollup_info : Rollup_info_store.t;
(* Singleton file "finalized_level". *)
finalized_level : Finalized_level_store.t;
}
(** [init ~data_dir ~readonly ~blocks_cache_size] creates the store
    directory with [Node_data.mk_store_dir] and then initializes every
    store of the bundle. [readonly] is forwarded to the indexed stores and
    [blocks_cache_size] sizes the L2 block cache. *)
let init ~data_dir ~readonly ~blocks_cache_size =
  let open Lwt_syntax in
  let* () = Node_data.mk_store_dir data_dir in
  let* l2_blocks =
    L2_block_store.init ~data_dir ~readonly ~cache_size:blocks_cache_size
  and* l1_blocks = Tezos_block_store.init ~data_dir ~readonly
  and* level_index = Level_store.init ~data_dir ~readonly
  and* commitment_index = Commitment_store.init ~data_dir ~readonly
  and* l2_head = Head_store.init ~data_dir
  and* l1_head = Tezos_head_store.init ~data_dir
  and* info = Rollup_info_store.init ~data_dir
  and* finalized = Finalized_level_store.init ~data_dir in
  return
    {
      blocks = l2_blocks;
      tezos_blocks = l1_blocks;
      commitments = commitment_index;
      levels = level_index;
      head = l2_head;
      tezos_head = l1_head;
      rollup_info = info;
      finalized_level = finalized;
    }
(** [close stores] closes the four index-backed stores of the bundle (L2
    blocks, Tezos blocks, levels and commitments). The singleton stores are
    not closed here. *)
let close {blocks; tezos_blocks; levels; commitments; _} =
  let open Lwt_syntax in
  let* () = L2_block_store.close blocks
  and* () = Tezos_block_store.close tezos_blocks
  and* () = Level_store.close levels
  and* () = Commitment_store.close commitments in
  return_unit