package kafka
OCaml bindings for Kafka
Install
Dune Dependency
Authors
Maintainers
Sources
0.4.tar.gz
sha256=baf6b799d20221aaeb60e7e2cca0ce82a80980ca6d355bff20024fbaed5339a5
md5=2904b2cce3c2496054bb7c01003a65e3
doc/src/kafka.lwt/kafka_lwt.ml.html
Source file kafka_lwt.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
open Kafka external consume_job : topic -> partition -> int -> message Lwt_unix.job = "ocaml_kafka_consume_job" let consume ?(timeout_ms = 1000) topic partition = Lwt_unix.run_job (consume_job topic partition timeout_ms) external consume_queue_job : queue -> int -> message Lwt_unix.job = "ocaml_kafka_consume_queue_job" let consume_queue ?(timeout_ms = 1000) queue = Lwt_unix.run_job (consume_queue_job queue timeout_ms) external consume_batch_job : topic -> partition -> int -> int -> message list Lwt_unix.job = "ocaml_kafka_consume_batch_job" let consume_batch ?(timeout_ms = 1000) ?(msg_count = 1024) topic partition = Lwt_unix.run_job (consume_batch_job topic partition timeout_ms msg_count) external consume_batch_queue_job : queue -> int -> int -> message list Lwt_unix.job = "ocaml_kafka_consume_batch_queue_job" let consume_batch_queue ?(timeout_ms = 1000) ?(msg_count = 1024) queue = Lwt_unix.run_job (consume_batch_queue_job queue timeout_ms msg_count) let pending_msg = Hashtbl.create (8*1024) let next_msg_id = let n = ref 1 in let get_next () = let id = !n in n := id + 1 ; id in get_next let produce topic partition ?key msg = let msg_id = next_msg_id () in let waiter, wakener = Lwt.wait () in Hashtbl.add pending_msg msg_id wakener; Kafka.produce topic partition ?key ~msg_id msg; waiter let delivery_callback msg_id error = try let wakener = Hashtbl.find pending_msg msg_id in Hashtbl.remove pending_msg msg_id; match error with | None -> Lwt.wakeup wakener () | Some error -> Lwt.wakeup_exn wakener (Kafka.Error (error,"Failed to produce message")) with Not_found -> () let poll_delivery period_ms producer = let timeout_s = (float_of_int period_ms) /. 1000.0 in let rec loop () = Lwt.( Lwt_unix.sleep timeout_s >>= fun () -> Kafka.poll_events ~timeout_ms:0 producer |> fun _ -> loop () ) in loop let set_option options option value = let options = List.remove_assoc option options in (option,value)::options let new_producer ?(delivery_check_period_ms=100) options = let options = set_option options "delivery.report.only.error" "false" in let producer = Kafka.new_producer ~delivery_callback options in Lwt.async (poll_delivery delivery_check_period_ms producer); producer let wait_delivery ?(timeout_ms = 100) ?(max_outq_len = 0) producer = let timeout_s = (float_of_int timeout_ms) /. 1000.0 in let rec loop () = Lwt.( if Kafka.outq_len producer > max_outq_len then Lwt_unix.sleep timeout_s >>= loop else return_unit ) in loop ()
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>