Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Page
Library
Module
Module type
Parameter
Class
Class type
Source
mpmc_relaxed_queue.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
include Saturn_lockfree.Relaxed_queue

(* [Spin] Re-exports, under an explicit name, the [push] and [pop] brought
   into scope by the [include] above. *)
module Spin = struct
  let push = push
  let pop = pop
end

(* [ccas] A slightly nicer CAS. Reads the cell first and only attempts the
   real compare-and-set when the value read is still [seen]; in the words of
   the original author, it tries without taking the microarch lock first.
   Use on indices.

   Returns [true] iff the cell was updated from [seen] to [v].

   The pre-check is deliberately physical ([!=]): [Atomic.compare_and_set]
   compares physically as well, so both checks agree. *)
let ccas cell seen v =
  if Atomic.get cell != seen then false else Atomic.compare_and_set cell seen v

(* [Not_lockfree] Variants of push/pop that spin on their slot only a bounded
   number of times and then try to repair the indices so that they can return
   without having completed the operation. *)
module Not_lockfree = struct
  (* [spin_threshold] Number of times to spin on a slot before trying an exit
     strategy. *)
  let spin_threshold = 30

  (* [try_other_exit_every_n] There are two strategies that push/pop can take
     to fix state (to be able to return without completion). Generally, we
     want to try to do "rollback" more than "push forward", as the latter adds
     contention to the side that might already not be keeping up. *)
  let try_other_exit_every_n = 10

  (* [time_to_try_push_forward n] is [true] on attempt 0 and then on every
     [try_other_exit_every_n]-th attempt. *)
  let time_to_try_push_forward n = n mod try_other_exit_every_n = 0

  (* [push t item] reserves the next tail index and tries to store [item] in
     the corresponding slot.

     Returns [true] if the item was stored. Returns [false] if the slot stayed
     occupied for [spin_threshold] attempts and the reservation could be
     undone, either by rolling [tail] back or by moving [head] forward. *)
  let push { array; tail; head; mask; _ } item =
    let tail_val = Atomic.fetch_and_add tail 1 in
    let index = tail_val land mask in
    let cell = Array.get array index in

    (* Spin for a bit. The loop stops early, leaving [!i < spin_threshold],
       as soon as the store succeeds. *)
    let i = ref 0 in
    while
      !i < spin_threshold
      && not (Atomic.compare_and_set cell None (Some item))
    do
      incr i
    done;

    (* Clean-up: keep trying to either complete the push or undo the
       reservation made by [fetch_and_add] above. *)
    let rec take_or_rollback nth_attempt =
      if Atomic.compare_and_set cell None (Some item) then
        (* succeeded to push *)
        true
      else if ccas tail (tail_val + 1) tail_val then
        (* rolled back tail: it was still exactly one past our index *)
        false
      else if
        time_to_try_push_forward nth_attempt
        && ccas head tail_val (tail_val + 1)
      then
        (* pushed forward head past the index we are giving up on *)
        false
      else
        (* retry *)
        take_or_rollback (nth_attempt + 1)
    in

    (* If the spin loop succeeded return [true], otherwise clean up. *)
    if !i < spin_threshold then true else take_or_rollback 0

  (* [take_item cell] reads [cell] and, if it holds [Some _], tries to swap
     that exact value for [None]. Returns the value taken, or [None] when the
     cell was empty or the swap lost a race. *)
  let take_item cell =
    let value = Atomic.get cell in
    if Option.is_some value && Atomic.compare_and_set cell value None then value
    else None

  (* [pop t] returns [None] immediately when [head] is not behind [tail].
     Otherwise it reserves the next head index and tries to take the item from
     the corresponding slot.

     Returns [Some item] if an item was taken. Returns [None] if the slot
     stayed empty for [spin_threshold] attempts and the reservation could be
     undone, either by rolling [head] back or by moving [tail] forward. *)
  let pop queue =
    let ({ array; head; tail; mask; _ } : 'a t) = queue in
    let head_value = Atomic.get head in
    let tail_value = Atomic.get tail in
    if head_value - tail_value >= 0 then None
    else
      let old_head = Atomic.fetch_and_add head 1 in
      let cell = Array.get array (old_head land mask) in

      (* Spin for a bit. *)
      let i = ref 0 in
      let item = ref None in
      while !i < spin_threshold && Option.is_none !item do
        item := take_item cell;
        incr i
      done;

      (* Clean-up: keep trying to either complete the pop or undo the
         reservation made by [fetch_and_add] above. *)
      let rec take_or_rollback nth_attempt =
        match take_item cell with
        | Some _ as taken ->
            (* dequeued an item, return it *)
            taken
        | None ->
            if ccas head (old_head + 1) old_head then
              (* rolled back head: it was still exactly one past our index *)
              None
            else if
              time_to_try_push_forward nth_attempt
              && ccas tail old_head (old_head + 1)
            then
              (* pushed tail forward past the index we are giving up on *)
              None
            else take_or_rollback (nth_attempt + 1)
      in

      (* Return if we got an item, clean up otherwise. *)
      match !item with
      | Some _ as taken -> taken
      | None -> take_or_rollback 0

  (* [CAS_interface] Variants that check for a full/empty queue first and
     reserve their index with a CAS (retrying on contention) instead of an
     unconditional [fetch_and_add]. Once an index is reserved they spin on the
     slot until the operation completes. *)
  module CAS_interface = struct
    (* [push t item] returns [false] if the queue looked full
       ([tail - head >= mask + 1]) and [true] once [item] has been stored. *)
    let rec push ({ array; tail; head; mask; _ } as t) item =
      let tail_val = Atomic.get tail in
      let head_val = Atomic.get head in
      let size = mask + 1 in
      if tail_val - head_val >= size then false
      else if ccas tail tail_val (tail_val + 1) then (
        let index = tail_val land mask in
        let cell = Array.get array index in
        (*
          Given that code above checks for overlap, is this CAS needed?

          Yes. Even though a thread cannot explicitly enter overlap,
          it can still occur just because enqueuer may theoretically be
          unscheduled for unbounded amount of time between incrementing
          index and filling the slot.

          I doubt we'd observe that case in real-life (outside some
          extreme circumstances), but this optimization has to be left
          for the user to decide. After all, algorithm would not pass
          model-checking without it.

          Incidentally, it also makes this method interoperable with
          standard interface.
        *)
        while not (Atomic.compare_and_set cell None (Some item)) do
          ()
        done;
        true)
      else
        (* lost the race for this tail index, start over *)
        push t item

    (* [pop t] returns [None] if the queue looked empty ([head >= tail]) and
       [Some item] once an item has been taken from the reserved slot. *)
    let rec pop ({ array; tail; head; mask; _ } as t) =
      let tail_val = Atomic.get tail in
      let head_val = Atomic.get head in
      if head_val - tail_val >= 0 then None
      else if ccas head head_val (head_val + 1) then
        let index = head_val land mask in
        let cell = Array.get array index in
        (* Spin until the slot holds an item and we manage to take it. *)
        let rec take () =
          match take_item cell with
          | Some _ as taken -> taken
          | None -> take ()
        in
        take ()
      else
        (* lost the race for this head index, start over *)
        pop t
  end
end