package kafka
OCaml bindings for Kafka
Install
Dune Dependency
Authors
Maintainers
Sources
kafka-0.5.tbz
sha256=7ec32681c104062a4fad43e2736e206128eb88273118071a044081abbc082255
sha512=7485d83cb20705f21b39c7e40cc6564cee30dba2c1993dc93c2791b4527488a33ef557e9fdaa47d3c0777986468f42460bb16c4d6d4076b1c43443f2cb5c6e3f
doc/src/kafka.helpers/kafka_producer.ml.html
Source file kafka_producer.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
type ('a,'b) sink = unit -> ('a -> unit) * (unit -> 'b) type 'a iterable = ('a -> unit) -> unit let stream_to : ('a, 'b) sink -> 'a iterable -> 'b = fun open_sink iterable -> let (push,close) = open_sink () in try iterable push ; close () with error -> ignore (close ()); raise error type 'a push_error_handler = ('a -> unit) -> 'a -> exn -> unit let retry_on_error push msg _error = push msg let raise_on_error _push _msg error = raise error let partition_sink ?(producer_props = ["metadata.broker.list","localhost:9092"]) ?(topic_props = []) ?(delivery_error_handler = raise_on_error) topic_name partition = fun () -> let producer = Kafka.new_producer producer_props in let topic = Kafka.new_topic producer topic_name topic_props in let rec push msg = let wait_and_push msg = let max_outq_len = ((Kafka.outq_len producer) * 4)/5 in Kafka.wait_delivery ~max_outq_len producer; push msg in try Kafka.produce topic partition msg with error -> delivery_error_handler wait_and_push msg error in let term () = (Kafka.wait_delivery producer; Kafka.destroy_topic topic; Kafka.destroy_handler producer) in (push,term) let topic_sink ?(producer_props = ["metadata.broker.list","localhost:9092"]) ?(topic_props = []) ?(delivery_error_handler = raise_on_error) topic_name = fun () -> let producer = Kafka.new_producer producer_props in let topic = Kafka.new_topic producer topic_name topic_props in let partition_count = List.length (Kafka.topic_metadata producer topic).Kafka.Metadata.topic_partitions in let rec push (partition,msg) = let wait_and_push p_msg = let max_outq_len = ((Kafka.outq_len producer) * 4)/5 in Kafka.wait_delivery ~max_outq_len producer; push p_msg in try Kafka.produce topic (partition mod partition_count) msg with error -> delivery_error_handler wait_and_push (partition,msg) error in let term () = (Kafka.wait_delivery producer; Kafka.destroy_topic topic; Kafka.destroy_handler producer) in (push,term)
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>