package mqtt
- Overview
- No Docs
You can search for identifiers within the package.
in-package search v0.2.0
OCaml MQTT client
Install
Dune Dependency
Authors
Maintainers
Sources
0.2.2.tar.gz
md5=1af0b40f0d73e2ed69b515a7c43d2e5f
sha512=f2640262b929bece15b51abd3bfcd708eb495fb7371c23e30c2a69f12a1633385d8de90153ee11aaeacb71b3f782f59b3a2f1ce9d783b20c959d9ce6f2f9d96a
doc/src/mqtt.client/Mqtt_packet.ml.html
Source file Mqtt_packet.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462
module BE = EndianBytes.BigEndian open Mqtt_core let _msgid = ref 0 let gen_id () = let () = incr _msgid in if !_msgid >= 0xFFFF then _msgid := 1; !_msgid let int16be n = let s = Bytes.create 2 in BE.set_int16 s 0 n; s let int8be n = let s = Bytes.create 1 in BE.set_int8 s 0 n; s type messages = | Connect_pkt | Connack_pkt | Publish_pkt | Puback_pkt | Pubrec_pkt | Pubrel_pkt | Pubcomp_pkt | Subscribe_pkt | Suback_pkt | Unsubscribe_pkt | Unsuback_pkt | Pingreq_pkt | Pingresp_pkt | Disconnect_pkt type cxn_flags = Will_retain | Will_qos of qos | Clean_session type cxn_data = { clientid : string; credentials : credentials option; will : (string * string) option; flags : cxn_flags list; keep_alive : int; } type client_options = { ping_timeout : float; cxn_data : cxn_data } type connection_status = | Accepted | Unacceptable_protocol_version | Identifier_rejected | Bad_username_or_password let connection_status_to_string = function | Accepted -> "Accepted" | Unacceptable_protocol_version -> "Unacceptable_protocol_version" | Identifier_rejected -> "Identifier_rejected" | Server_unavailable -> "Server_unavailable" | Bad_username_or_password -> "Bad_username_or_password" | Not_authorized -> "Not_authorized" let connection_status_to_int = function | Accepted -> 0 | Unacceptable_protocol_version -> 1 | Identifier_rejected -> 2 | Server_unavailable -> 3 | Bad_username_or_password -> 4 | Not_authorized -> 5 let connection_status_of_int = function | 0 -> Accepted | 1 -> Unacceptable_protocol_version | 2 -> Identifier_rejected | 3 -> Server_unavailable | 4 -> Bad_username_or_password | 5 -> Not_authorized | _ -> raise (Invalid_argument "Invalid connection status code") type t = | Connect of cxn_data | Connack of { session_present : bool; connection_status : connection_status } | Subscribe of (int * (string * qos) list) | Suback of (int * (qos, unit) result list) | Unsubscribe of (int * string list) | Unsuback of int | Publish of (int option * string * string) | Puback of int | Pubrec of int | Pubrel of int | Pubcomp of int | Pingreq | Pingresp | Disconnect type options = bool * qos * bool let bits_of_message = function | Connect_pkt -> 1 | Connack_pkt -> 2 | Publish_pkt -> 3 | Puback_pkt -> 4 | Pubrec_pkt -> 5 | Pubrel_pkt -> 6 | Pubcomp_pkt -> 7 | Subscribe_pkt -> 8 | Suback_pkt -> 9 | Unsubscribe_pkt -> 10 | Unsuback_pkt -> 11 | Pingreq_pkt -> 12 | Pingresp_pkt -> 13 | Disconnect_pkt -> 14 let message_of_bits = function | 1 -> Connect_pkt | 2 -> Connack_pkt | 3 -> Publish_pkt | 4 -> Puback_pkt | 5 -> Pubrec_pkt | 6 -> Pubrel_pkt | 7 -> Pubcomp_pkt | 8 -> Subscribe_pkt | 9 -> Suback_pkt | 10 -> Unsubscribe_pkt | 11 -> Unsuback_pkt | 12 -> Pingreq_pkt | 13 -> Pingresp_pkt | 14 -> Disconnect_pkt | _ -> raise (Invalid_argument "invalid bits in message") let bits_of_qos = function | Atmost_once -> 0 | Atleast_once -> 1 | Exactly_once -> 2 let qos_of_bits = function | 0 -> Atmost_once | 1 -> Atleast_once | 2 -> Exactly_once | b -> raise (Invalid_argument ("invalid qos number: " ^ string_of_int b)) let suback_qos_of_bits = function 0x80 -> Error () | b -> Ok (qos_of_bits b) let bit_of_bool = function true -> 1 | false -> 0 let bool_of_bit = function | 1 -> true | 0 -> false | n -> raise (Invalid_argument ("expected zero or one, but got " ^ string_of_int n)) let trunc str = (* truncate leading zeroes *) let len = String.length str in let rec loop count = if count >= len || str.[count] <> '\000' then count else loop (count + 1) in let leading = loop 0 in if leading = len then "\000" else String.sub str leading (len - leading) let addlen s = let len = String.length s in if len > 0xFFFF then raise (Invalid_argument "string too long"); Bytes.to_string (int16be len) ^ s let opt_with s n = function Some a -> s a | None -> n let puback id = Puback id let pubrec id = Pubrec id let pubcomp id = Pubcomp id module Encoder = struct let encode_length len = let rec loop ll digits = if ll <= 0 then digits else let incr = Int32.logor (Int32.of_int 0x80) in let shft = Int32.logor (Int32.shift_left digits 8) in let getdig x dig = if x > 0 then incr dig else dig in let quotient = ll / 128 in let digit = getdig quotient (Int32.of_int (ll mod 128)) in let digits = shft digit in loop quotient digits in loop len 0l let fixed_header typ ?(flags = 0) body_len = let msgid = bits_of_message typ lsl 4 in let hdr = Bytes.create 1 in let len = Bytes.create 4 in BE.set_int8 hdr 0 (msgid + flags); BE.set_int32 len 0 (encode_length body_len); let len = trunc (Bytes.to_string len) in Bytes.to_string hdr ^ len let unsubscribe ~id topics = let accum acc i = acc + 2 + String.length i in let tl = List.fold_left accum 2 topics in (* +2 for msgid *) let buf = Buffer.create (tl + 5) in (* ~5 for fixed header *) let addtopic t = addlen t |> Buffer.add_string buf in let msgid = int16be id |> Bytes.to_string in let hdr = fixed_header Unsubscribe_pkt ~flags:2 tl in Buffer.add_string buf hdr; Buffer.add_string buf msgid; List.iter addtopic topics; Buffer.contents buf let unsuback id = let msgid = int16be id |> Bytes.to_string in let hdr = fixed_header Unsuback_pkt 2 in hdr ^ msgid let simple_pkt typ = fixed_header typ 0 let pingreq () = simple_pkt Pingreq_pkt let pingresp () = simple_pkt Pingresp_pkt let pubpkt ?flags typ id = let hdr = fixed_header ?flags typ 2 in let msgid = int16be id |> Bytes.to_string in let buf = Buffer.create 4 in Buffer.add_string buf hdr; Buffer.add_string buf msgid; Buffer.contents buf let pubrec = pubpkt Pubrec_pkt let pubrel = pubpkt ~flags:2 Pubrel_pkt let pubcomp = pubpkt Pubcomp_pkt let suback id qoses = let paylen = List.length qoses + 2 in let buf = Buffer.create (paylen + 5) in let msgid = int16be id |> Bytes.to_string in let q2i q = bits_of_qos q |> int8be |> Bytes.to_string in let blit q = Buffer.add_string buf (q2i q) in let hdr = fixed_header Suback_pkt paylen in Buffer.add_string buf hdr; Buffer.add_string buf msgid; List.iter blit qoses; Buffer.contents buf let puback = pubpkt Puback_pkt let disconnect () = simple_pkt Disconnect_pkt let subscribe ~id topics = let accum acc (i, _) = acc + 3 + String.length i in let tl = List.fold_left accum 0 topics in let tl = tl + 2 in (* add msgid to total len *) let buf = Buffer.create (tl + 5) in (* ~5 for fixed header *) let addtopic (t, q) = Buffer.add_string buf (addlen t); Buffer.add_string buf (Bytes.to_string @@ int8be (bits_of_qos q)) in let msgid = int16be id |> Bytes.to_string in let hdr = fixed_header Subscribe_pkt ~flags:2 tl in Buffer.add_string buf hdr; Buffer.add_string buf msgid; List.iter addtopic topics; Buffer.contents buf let publish ~dup ~qos ~retain ~id ~topic payload = let id_data = if qos = Atleast_once || qos = Exactly_once then Bytes.to_string (int16be id) else "" in let dup = if qos = Atmost_once then false else dup in let topic = addlen topic in let sl = String.length in let tl = sl topic + sl payload + sl id_data in let buf = Buffer.create (tl + 5) in let flags = let dup = bit_of_bool dup lsl 3 in let qos = bits_of_qos qos lsl 1 in let retain = bit_of_bool retain in dup + qos + retain in let hdr = fixed_header Publish_pkt ~flags tl in Buffer.add_string buf hdr; Buffer.add_string buf topic; Buffer.add_string buf id_data; Buffer.add_string buf payload; Buffer.contents buf let connect_payload ?credentials ?will ?(flags = []) ?(keep_alive = 10) id = let name = addlen "MQTT" in let version = "\004" in if keep_alive > 0xFFFF then raise (Invalid_argument "keep_alive too large"); let addhdr2 flag term (flags, hdr) = match term with | None -> (flags, hdr) | Some (a, b) -> (flags lor flag, hdr ^ addlen a ^ addlen b) in let adduserpass term (flags, hdr) = match term with | None -> (flags, hdr) | Some (Username s) -> (flags lor 0x80, hdr ^ addlen s) | Some (Credentials (u, p)) -> addhdr2 0xC0 (Some (u, p)) (flags, hdr) in let flag_nbr = function | Clean_session -> 0x02 | Will_qos qos -> bits_of_qos qos lsl 3 | Will_retain -> 0x20 in let accum a acc = acc lor flag_nbr a in let flags, pay = (List.fold_right accum flags 0, addlen id) |> addhdr2 0x04 will |> adduserpass credentials in let tbuf = int16be keep_alive in let fbuf = Bytes.create 1 in BE.set_int8 fbuf 0 flags; let accum acc a = acc + String.length a in let fields = [ name; version; Bytes.to_string fbuf; Bytes.to_string tbuf; pay ] in let lens = List.fold_left accum 0 fields in let buf = Buffer.create lens in List.iter (Buffer.add_string buf) fields; Buffer.contents buf let connect ?credentials ?will ?flags ?keep_alive id = let cxn_pay = connect_payload ?credentials ?will ?flags ?keep_alive id in let hdr = fixed_header Connect_pkt (String.length cxn_pay) in hdr ^ cxn_pay let connect_data d = let clientid = d.clientid in let credentials = d.credentials in let will = d.will in let flags = d.flags in let keep_alive = d.keep_alive in connect_payload ?credentials ?will ~flags ~keep_alive clientid let connack ~session_present status = let fixed_header = fixed_header Connack_pkt 2 in let flags = Bytes.to_string (int8be (bit_of_bool session_present)) in let connection_status = Bytes.to_string (int8be (connection_status_to_int status)) in let variable_header = flags ^ connection_status in fixed_header ^ variable_header end module Decoder = struct let decode_connect rb = let lead = Read_buffer.read rb 9 in if "\000\004MQTT\004" <> lead then raise (Invalid_argument "invalid MQTT or version"); let hdr = Read_buffer.read_uint8 rb in let keep_alive = Read_buffer.read_uint16 rb in let has_username = 0 <> hdr land 0x80 in let has_password = 0 <> hdr land 0xC0 in let will_flag = bool_of_bit ((hdr land 0x04) lsr 2) in let will_retain = will_flag && 0 <> hdr land 0x20 in let will_qos = if will_flag then Some (qos_of_bits ((hdr land 0x18) lsr 3)) else None in let clean_session = bool_of_bit ((hdr land 0x02) lsr 1) in let rs = Read_buffer.read_string in let clientid = rs rb in let will = if will_flag then let t = rs rb in let m = rs rb in Some (t, m) else None in let credentials = if has_password then let u = rs rb in let p = rs rb in Some (Credentials (u, p)) else if has_username then Some (Username (rs rb)) else None in let flags = if clean_session then [ Clean_session ] else [] in let flags = opt_with (fun qos -> Will_qos qos :: flags) flags will_qos in let flags = if will_retain then Will_retain :: flags else flags in Connect { clientid; credentials; will; flags; keep_alive } let decode_connack rb = let flags = Read_buffer.read_uint8 rb in let session_present = bool_of_bit flags in let connection_status = connection_status_of_int (Read_buffer.read_uint8 rb) in Connack { session_present; connection_status } let decode_publish (_, qos, _) rb = let topic = Read_buffer.read_string rb in let msgid = if qos = Atleast_once || qos = Exactly_once then Some (Read_buffer.read_uint16 rb) else None in let payload = Read_buffer.len rb |> Read_buffer.read rb in Publish (msgid, topic, payload) let decode_puback rb = Puback (Read_buffer.read_uint16 rb) let decode_pubrec rb = Pubrec (Read_buffer.read_uint16 rb) let decode_pubrel rb = Pubrel (Read_buffer.read_uint16 rb) let decode_pubcomp rb = Pubcomp (Read_buffer.read_uint16 rb) let decode_subscribe rb = let id = Read_buffer.read_uint16 rb in let get_topic rb = let topic = Read_buffer.read_string rb in let qos = Read_buffer.read_uint8 rb |> qos_of_bits in (topic, qos) in let topics = Read_buffer.read_all rb get_topic in Subscribe (id, topics) let decode_suback rb = let id = Read_buffer.read_uint16 rb in let get_qos rb = Read_buffer.read_uint8 rb |> suback_qos_of_bits in let qoses = Read_buffer.read_all rb get_qos in Suback (id, List.rev qoses) let decode_unsub rb = let id = Read_buffer.read_uint16 rb in let topics = Read_buffer.read_all rb Read_buffer.read_string in Unsubscribe (id, topics) let decode_unsuback rb = Unsuback (Read_buffer.read_uint16 rb) let decode_pingreq _rb = Pingreq let decode_pingresp _rb = Pingresp let decode_disconnect _rb = Disconnect let decode_packet opts = function | Connect_pkt -> decode_connect | Connack_pkt -> decode_connack | Publish_pkt -> decode_publish opts | Puback_pkt -> decode_puback | Pubrec_pkt -> decode_pubrec | Pubrel_pkt -> decode_pubrel | Pubcomp_pkt -> decode_pubcomp | Subscribe_pkt -> decode_subscribe | Suback_pkt -> decode_suback | Unsubscribe_pkt -> decode_unsub | Unsuback_pkt -> decode_unsuback | Pingreq_pkt -> decode_pingreq | Pingresp_pkt -> decode_pingresp | Disconnect_pkt -> decode_disconnect let decode_fixed_header byte : messages * options = let typ = (byte land 0xF0) lsr 4 in let dup = (byte land 0x08) lsr 3 in let qos = (byte land 0x06) lsr 1 in let retain = byte land 0x01 in let typ = message_of_bits typ in let dup = bool_of_bit dup in let qos = qos_of_bits qos in let retain = bool_of_bit retain in (typ, (dup, qos, retain)) end
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>