package kafka
OCaml bindings for Kafka
Install
Dune Dependency
Authors
Maintainers
Sources
kafka-0.5.tbz
sha256=7ec32681c104062a4fad43e2736e206128eb88273118071a044081abbc082255
sha512=7485d83cb20705f21b39c7e40cc6564cee30dba2c1993dc93c2791b4527488a33ef557e9fdaa47d3c0777986468f42460bb16c4d6d4076b1c43443f2cb5c6e3f
doc/src/kafka.helpers/kafka_consumer.ml.html
Source file kafka_consumer.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
open Kafka.Metadata

(** [protect ~finally f x] calls [f x] and ensures that [finally ()] is called
    before returning the result of [f x] or re-raising the exception raised by
    [f x]. If [finally ()] itself raises, that exception propagates instead.

    Adapted from
    http://stackoverflow.com/questions/11276985/emulating-try-with-finally-in-ocaml. *)
let protect ~finally f x =
  match f x with
  | result ->
    let () = finally () in
    result
  | exception e ->
    let () = finally () in
    raise e

(** [with_resource ~release acquire use] calls [acquire ()], passes the
    acquired value to [use], and calls [release] on that value once [use] has
    returned or raised. [release] is not called when [acquire ()] raises,
    since there is then nothing to release. *)
let with_resource ~release acquire use =
  let resource = acquire () in
  protect ~finally:(fun () -> release resource) use resource

(** [fold_partition topic_name partition update offset seed] consumes
    [partition] of the topic [topic_name] starting at [offset], and folds
    [update] over the consumed messages, starting from [seed].

    [update acc msg] is called for every [Kafka.Message] and for every
    [Kafka.PartitionEnd] returned by [Kafka.consume].

    @param consumer_props consumer properties;
      [("enable.partition.eof", "true")] is always prepended to them.
    @param topic_props properties given to [Kafka.new_topic].
    @param timeout_ms timeout given to each [Kafka.consume] call.
    @param stop_at_end when [true], the fold returns the accumulator after the
      first [Kafka.PartitionEnd] (once [update] has been applied to it) or on
      the first [Kafka.TIMED_OUT] error. When [false] (the default), the fold
      only ends when an exception is raised.

    The consumer, the topic and the consumption are released in reverse order
    of acquisition, whether the fold returns or raises, including when a
    later acquisition step fails.

    Any exception other than [Kafka.Error (Kafka.TIMED_OUT, _)] raised by
    [Kafka.consume] or by [update] is propagated to the caller. *)
let fold_partition
    ?(consumer_props = ["metadata.broker.list","localhost:9092"])
    ?(topic_props = [])
    ?(timeout_ms = 1000)
    ?(stop_at_end = false)
    topic_name partition update offset seed
  =
  (* enable.partition.eof must be set to true to catch partition end *)
  let consumer_props = ("enable.partition.eof", "true")::consumer_props in
  let fold topic () =
    let rec loop acc =
      match Kafka.consume ~timeout_ms topic partition with
      | Kafka.Message _ as msg ->
        loop (update acc msg)
      | Kafka.PartitionEnd _ as msg ->
        let acc = update acc msg in
        if stop_at_end then acc else loop acc
      | exception Kafka.Error(Kafka.TIMED_OUT,_) ->
        if stop_at_end then acc else loop acc
    in
    loop seed
  in
  with_resource
    ~release:Kafka.destroy_handler
    (fun () -> Kafka.new_consumer consumer_props)
    (fun consumer ->
       with_resource
         ~release:Kafka.destroy_topic
         (fun () -> Kafka.new_topic consumer topic_name topic_props)
         (fun topic ->
            (* The consumption is only stopped if it has been started. *)
            with_resource
              ~release:(fun () -> Kafka.consume_stop topic partition)
              (fun () -> Kafka.consume_start topic partition offset)
              (fold topic)))

(** [fold_queue_for_ever queue timeout_ms update seed] folds [update] over
    every message returned by [Kafka.consume_queue] on [queue], both
    [Kafka.Message] and [Kafka.PartitionEnd], starting from [seed].

    [Kafka.TIMED_OUT] errors are ignored and the queue is polled again.
    The function never returns normally: it only ends when
    [Kafka.consume_queue] or [update] raises another exception. *)
let fold_queue_for_ever queue timeout_ms update seed =
  let rec loop acc =
    match Kafka.consume_queue ~timeout_ms queue with
    | (Kafka.Message _ | Kafka.PartitionEnd _) as msg ->
      loop (update acc msg)
    | exception Kafka.Error(Kafka.TIMED_OUT,_) ->
      loop acc
  in
  loop seed

(** Partition identifiers, ordered as integers. *)
module Partition = struct
  type t = int

  (* Not [a - b]: the subtraction can overflow and yield a wrong order. *)
  let compare (a : int) (b : int) = compare a b
end

module PartitionSet : Set.S with type elt = int = Set.Make(Partition)

(** [fold_queue_upto_end queue timeout_ms update partitions seed] folds
    [update] over the messages returned by [Kafka.consume_queue] on [queue]
    until no partition is left pending.

    The set of pending partitions starts as [partitions]. A [Kafka.Message]
    adds its partition to the set; a [Kafka.PartitionEnd] removes its
    partition from the set. [update] is applied to both kinds of message.
    The accumulator is returned as soon as the set is empty after a
    [Kafka.PartitionEnd].

    [Kafka.TIMED_OUT] errors are ignored and the queue is polled again, so
    the function does not return while some partition is pending. *)
let fold_queue_upto_end queue timeout_ms update partitions seed =
  let rec loop pending acc =
    match Kafka.consume_queue ~timeout_ms queue with
    | Kafka.Message (_,partition,_,_,_) as msg ->
      loop (PartitionSet.add partition pending) (update acc msg)
    | Kafka.PartitionEnd (_,partition,_) as msg ->
      let pending = PartitionSet.remove partition pending in
      let acc = update acc msg in
      if PartitionSet.is_empty pending then acc else loop pending acc
    | exception Kafka.Error(Kafka.TIMED_OUT,_) ->
      loop pending acc
  in
  loop (PartitionSet.of_list partitions) seed

(** [find_offset partition_offsets partition] is the offset associated to
    [partition] in the association list [partition_offsets], or [0L] when
    [partition] has no binding. *)
let find_offset partition_offsets partition =
  match List.assoc partition partition_offsets with
  | offset -> offset
  | exception Not_found -> 0L

(** [fold_topic topic_name partitions update partition_offsets seed] consumes
    the given [partitions] of the topic [topic_name] through a single queue
    and folds [update] over the consumed messages, starting from [seed].

    When [partitions] is empty, the partitions listed by
    [Kafka.topic_metadata] for the topic are used instead. Each partition is
    consumed from the offset bound to it in [partition_offsets], or from [0L]
    when it has no binding (see {!find_offset}).

    @param consumer_props consumer properties;
      [("enable.partition.eof", "true")] is always prepended to them.
    @param topic_props properties given to [Kafka.new_topic].
    @param timeout_ms timeout given to each [Kafka.consume_queue] call.
    @param stop_at_end when [true], the fold is done by
      {!fold_queue_upto_end}; when [false] (the default), by
      {!fold_queue_for_ever}.

    Only the partitions whose consumption has actually been started are
    stopped. The queue, the topic and the consumer are then destroyed, in that
    order, whether the fold returns or raises, including when a later setup
    step fails. *)
let fold_topic
    ?(consumer_props = ["metadata.broker.list","localhost:9092"])
    ?(topic_props = [])
    ?(timeout_ms = 1000)
    ?(stop_at_end = false)
    topic_name partitions update partition_offsets seed
  =
  (* enable.partition.eof must be set to true to catch partition end *)
  let consumer_props = ("enable.partition.eof", "true")::consumer_props in
  let fold topic partitions queue =
    (* Partitions successfully started so far, most recent first. *)
    let started = ref [] in
    let start_consuming () =
      List.iter
        (fun partition ->
           let offset = find_offset partition_offsets partition in
           Kafka.consume_start_queue queue topic partition offset;
           started := partition :: !started)
        partitions
    in
    let stop_consuming () =
      List.iter (Kafka.consume_stop topic) (List.rev !started)
    in
    let run seed =
      start_consuming ();
      if stop_at_end
      then fold_queue_upto_end queue timeout_ms update partitions seed
      else fold_queue_for_ever queue timeout_ms update seed
    in
    protect ~finally:stop_consuming run seed
  in
  with_resource
    ~release:Kafka.destroy_handler
    (fun () -> Kafka.new_consumer consumer_props)
    (fun consumer ->
       with_resource
         ~release:Kafka.destroy_topic
         (fun () -> Kafka.new_topic consumer topic_name topic_props)
         (fun topic ->
            let partitions =
              match partitions with
              | [] -> (Kafka.topic_metadata consumer topic).topic_partitions
              | _ -> partitions
            in
            with_resource
              ~release:Kafka.destroy_queue
              (fun () -> Kafka.new_queue consumer)
              (fold topic partitions)))

module TopicMap : Map.S with type key = string = Map.Make(String)

(** [fold_queue topic_partition_pairs update topic_partition_offsets seed]
    consumes partitions of several topics through a single queue and folds
    [update] over the consumed messages with {!fold_queue_for_ever}, starting
    from [seed]. It therefore only ends when an exception is raised.

    One topic handle is created per distinct topic name found in
    [topic_partition_pairs]. Consumption is started for each
    [(topic_name, partition, offset)] of [topic_partition_offsets]; a
    partition that only appears in [topic_partition_pairs] is not consumed.

    @param consumer_props consumer properties;
      [("enable.partition.eof", "true")] is always prepended to them.
    @param topic_props properties given to each [Kafka.new_topic] call.
    @param timeout_ms timeout given to each [Kafka.consume_queue] call.
    @param stop_at_end accepted but currently ignored.
    @raise Not_found if [topic_partition_offsets] names a topic that does not
      appear in [topic_partition_pairs].

    Only the partitions whose consumption has actually been started are
    stopped. The queue, every created topic and the consumer are then
    destroyed, in that order, including when a setup step fails. *)
let fold_queue
    ?(consumer_props = ["metadata.broker.list","localhost:9092"])
    ?(topic_props = [])
    ?(timeout_ms = 1000)
    ?stop_at_end:(_stop_at_end = false)
    topic_partition_pairs update topic_partition_offsets seed
  =
  (* enable.partition.eof must be set to true to catch partition end *)
  let consumer_props = ("enable.partition.eof", "true")::consumer_props in
  let fold offsets queue =
    (* (topic, partition) pairs successfully started so far, most recent first. *)
    let started = ref [] in
    let start_consuming () =
      List.iter
        (fun (topic,partition,offset) ->
           Kafka.consume_start_queue queue topic partition offset;
           started := (topic,partition) :: !started)
        offsets
    in
    let stop_consuming () =
      List.iter
        (fun (topic,partition) -> Kafka.consume_stop topic partition)
        (List.rev !started)
    in
    let run seed =
      start_consuming ();
      fold_queue_for_ever queue timeout_ms update seed
    in
    protect ~finally:stop_consuming run seed
  in
  with_resource
    ~release:Kafka.destroy_handler
    (fun () -> Kafka.new_consumer consumer_props)
    (fun consumer ->
       (* Topics created so far; kept in a reference so that the topics
          already created are destroyed even if a later creation fails. *)
       let topics = ref TopicMap.empty in
       let create_topics () =
         List.iter
           (fun (topic_name, (_ : int)) ->
              if not (TopicMap.mem topic_name !topics) then begin
                let topic = Kafka.new_topic consumer topic_name topic_props in
                topics := TopicMap.add topic_name topic !topics
              end)
           topic_partition_pairs
       in
       let destroy_topics () =
         TopicMap.iter (fun _ topic -> Kafka.destroy_topic topic) !topics
       in
       let run () =
         create_topics ();
         (* Resolve every topic before starting anything: an unknown topic
            name raises Not_found while no partition is being consumed. *)
         let offsets =
           List.map
             (fun (topic_name,partition,offset) ->
                (TopicMap.find topic_name !topics, partition, offset))
             topic_partition_offsets
         in
         with_resource
           ~release:Kafka.destroy_queue
           (fun () -> Kafka.new_queue consumer)
           (fold offsets)
       in
       protect ~finally:destroy_topics run ())
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>