package kafka
OCaml bindings for Kafka
Install
Dune Dependency
Authors
Maintainers
Sources
0.3.2.tar.gz
sha256=709c482a2477da1790e1e8393fcf3c614ce79a2e183e3ffad4d772f3aded3858
md5=49bb99a375ed791cc9700bac18001965
doc/src/kafka.helpers/kafka_producer.ml.html
Source file kafka_producer.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
type ('a,'b) sink = unit -> ('a -> unit) * (unit -> 'b) type 'a iterable = ('a -> unit) -> unit let stream_to open_sink iterable = let (push,close) = open_sink () in try iterable push ; close () with error -> close (); raise error type 'a push_error_handler = ('a -> unit) -> 'a -> exn -> unit let retry_on_error push msg error = push msg let raise_on_error push msg error = raise error let partition_sink ?(producer_props = ["metadata.broker.list","localhost:9092"]) ?(topic_props = []) ?(delivery_error_handler = raise_on_error) topic_name partition = fun () -> let producer = Kafka.new_producer producer_props in let topic = Kafka.new_topic producer topic_name topic_props in let rec push msg = let wait_and_push msg = let max_outq_len = ((Kafka.outq_len producer) * 4)/5 in Kafka.wait_delivery ~max_outq_len producer; push msg in try Kafka.produce topic partition msg with error -> delivery_error_handler wait_and_push msg error in let term () = (Kafka.wait_delivery producer; Kafka.destroy_topic topic; Kafka.destroy_handler producer) in (push,term) let topic_sink ?(producer_props = ["metadata.broker.list","localhost:9092"]) ?(topic_props = []) ?(delivery_error_handler = raise_on_error) topic_name = fun () -> let producer = Kafka.new_producer producer_props in let topic = Kafka.new_topic producer topic_name topic_props in let partition_count = List.length (Kafka.topic_metadata producer topic).Kafka.Metadata.topic_partitions in let rec push (partition,msg) = let wait_and_push p_msg = let max_outq_len = ((Kafka.outq_len producer) * 4)/5 in Kafka.wait_delivery ~max_outq_len producer; push p_msg in try Kafka.produce topic (partition mod partition_count) msg with error -> delivery_error_handler wait_and_push (partition,msg) error in let term () = (Kafka.wait_delivery producer; Kafka.destroy_topic topic; Kafka.destroy_handler producer) in (push,term)
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>