package kafka
OCaml bindings for Kafka
Install
Dune Dependency
Authors
Maintainers
Sources
kafka-0.5.tbz
sha256=7ec32681c104062a4fad43e2736e206128eb88273118071a044081abbc082255
sha512=7485d83cb20705f21b39c7e40cc6564cee30dba2c1993dc93c2791b4527488a33ef557e9fdaa47d3c0777986468f42460bb16c4d6d4076b1c43443f2cb5c6e3f
doc/src/kafka/kafka.ml.html
Source file kafka.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
(* OCaml bindings to the librdkafka Apache Kafka client library.
   Opaque handles below wrap librdkafka objects allocated by the C stubs. *)

type handler   (* producer or consumer handle *)
type topic     (* topic handle, created from a [handler] *)
type queue     (* message queue handle, created from a [handler] *)

type partition = int
type offset = int64

type message =
  | Message of topic * partition * offset * string * string option
      (* topic, partition, offset, payload, optional key *)
  | PartitionEnd of topic * partition * offset
      (* topic, partition, offset *)

type msg_id = int

type error =
  (* Internal errors to rdkafka: *)
  | BAD_MSG                  (* Received message is incorrect *)
  | BAD_COMPRESSION          (* Bad/unknown compression *)
  | DESTROY                  (* Broker is going away *)
  | FAIL                     (* Generic failure *)
  | TRANSPORT                (* Broker transport error *)
  | CRIT_SYS_RESOURCE        (* Critical system resource failure *)
  | RESOLVE                  (* Failed to resolve broker. *)
  | MSG_TIMED_OUT            (* Produced message timed out. *)
  | UNKNOWN_PARTITION        (* Permanent: Partition does not exist in cluster. *)
  | FS                       (* File or filesystem error *)
  | UNKNOWN_TOPIC            (* Permanent: Topic does not exist in cluster. *)
  | ALL_BROKERS_DOWN         (* All broker connections are down. *)
  | INVALID_ARG              (* Invalid argument, or invalid configuration *)
  | TIMED_OUT                (* Operation timed out *)
  | QUEUE_FULL               (* Queue is full *)
  | ISR_INSUFF               (* ISR count < required.acks *)
  (* Standard Kafka errors: *)
  | UNKNOWN
  | OFFSET_OUT_OF_RANGE
  | INVALID_MSG
  | UNKNOWN_TOPIC_OR_PART
  | INVALID_MSG_SIZE
  | LEADER_NOT_AVAILABLE
  | NOT_LEADER_FOR_PARTITION
  | REQUEST_TIMED_OUT
  | BROKER_NOT_AVAILABLE
  | REPLICA_NOT_AVAILABLE
  | MSG_SIZE_TOO_LARGE
  | STALE_CTRL_EPOCH
  | OFFSET_METADATA_TOO_LARGE
  (* Configuration errors *)
  | CONF_UNKNOWN             (* Unknown configuration name. *)
  | CONF_INVALID             (* Invalid configuration value. *)

exception Error of error * string

(* Register the exception under a stable name so the C stubs can raise it.
   Was [let _ = ... ;] — the stray [;] before the next structure item is not
   valid OCaml at structure level, and [let ()] asserts the unit type. *)
let () =
  Callback.register_exception "kafka.error" (Error (UNKNOWN, "msg string"))

external new_consumer : (string * string) list -> handler
  = "ocaml_kafka_new_consumer"

external new_producer :
  ?delivery_callback:(msg_id -> error option -> unit)
  -> (string * string) list
  -> handler
  = "ocaml_kafka_new_producer"

external destroy_handler : handler -> unit = "ocaml_kafka_destroy_handler"

external handler_name : handler -> string = "ocaml_kafka_handler_name"

external new_topic :
  ?partitioner_callback:(int -> string -> partition)
  -> handler
  -> string
  -> (string * string) list
  -> topic
  = "ocaml_kafka_new_topic"

external destroy_topic : topic -> unit = "ocaml_kafka_destroy_topic"

external topic_name : topic -> string = "ocaml_kafka_topic_name"

(* Note that the id is restricted to be some int value.
   While the underlying library, librdkafka, allows any void* msg_opaque data.
   This is to avoid issues with the garbage collector. *)
external produce_idmsg :
  topic -> partition -> ?key:string -> msg_id -> string -> unit
  = "ocaml_kafka_produce"

(* [produce topic partition ?key ?msg_id msg] enqueues [msg] for delivery.
   [msg_id] defaults to 0 and is echoed back to the delivery callback. *)
let produce topic partition ?key ?(msg_id = 0) msg =
  produce_idmsg topic partition ?key msg_id msg

external outq_len : handler -> int = "ocaml_kafka_outq_len"

external poll : handler -> int -> int = "ocaml_kafka_poll"

(* Serve queued callbacks (delivery reports, ...), waiting at most
   [timeout_ms] (default 1000); returns the number of events served. *)
let poll_events ?(timeout_ms = 1000) handler = poll handler timeout_ms

(* Poll until the producer's outbound queue has drained to [max_outq_len]
   messages or fewer, polling in slices of [timeout_ms]. *)
let wait_delivery ?(timeout_ms = 100) ?(max_outq_len = 0) handler =
  let rec loop () =
    if outq_len handler > max_outq_len then begin
      ignore (poll_events ~timeout_ms handler);
      loop ()
    end
  in
  loop ()

external consume_start : topic -> partition -> offset -> unit
  = "ocaml_kafka_consume_start"

external consume_stop : topic -> partition -> unit
  = "ocaml_kafka_consume_stop"

(* Sentinel values mirroring librdkafka's RD_KAFKA_PARTITION_UA and
   RD_KAFKA_OFFSET_{BEGINNING,END,STORED,TAIL_BASE} constants. *)
let partition_unassigned = -1
let offset_beginning = -2L
let offset_end = -1L
let offset_stored = -1000L

(* [offset_tail n]: start [n] messages before the end of the partition. *)
let offset_tail n = Int64.sub (-2000L) (Int64.of_int n)

external consume : ?timeout_ms:int -> topic -> partition -> message
  = "ocaml_kafka_consume"

external consume_batch :
  ?timeout_ms:int -> ?msg_count:int -> topic -> partition -> message list
  = "ocaml_kafka_consume_batch"

external store_offset : topic -> partition -> offset -> unit
  = "ocaml_kafka_store_offset"

external new_queue : handler -> queue = "ocaml_kafka_new_queue"

external destroy_queue : queue -> unit = "ocaml_kafka_destroy_queue"

external consume_start_queue : queue -> topic -> partition -> offset -> unit
  = "ocaml_kafka_consume_start_queue"

external consume_queue : ?timeout_ms:int -> queue -> message
  = "ocaml_kafka_consume_queue"

external consume_batch_queue :
  ?timeout_ms:int -> ?msg_count:int -> queue -> message list
  = "ocaml_kafka_consume_batch_queue"

module Metadata = struct
  (* Cluster-reported description of a topic and its partitions. *)
  type topic_metadata = {
    topic_name : string;
    topic_partitions : partition list;
  }
end

external get_topic_metadata : handler -> topic -> int -> Metadata.topic_metadata
  = "ocaml_kafka_get_topic_metadata"

external get_topics_metadata :
  handler -> bool -> int -> Metadata.topic_metadata list
  = "ocaml_kafka_get_topics_metadata"

(* Metadata for a single topic; [timeout_ms] defaults to 1000. *)
let topic_metadata ?(timeout_ms = 1000) handler topic =
  get_topic_metadata handler topic timeout_ms

(* Metadata for locally known topics only ([false] = not all topics). *)
let local_topics_metadata ?(timeout_ms = 1000) handler =
  get_topics_metadata handler false timeout_ms

(* Metadata for all topics in the cluster ([true] = all topics). *)
let all_topics_metadata ?(timeout_ms = 1000) handler =
  get_topics_metadata handler true timeout_ms
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>