package mqtt

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file Mqtt_packet.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
module BE = EndianBytes.BigEndian
open Mqtt_core

let _msgid = ref 0

let gen_id () =
  let () = incr _msgid in
  if !_msgid >= 0xFFFF then _msgid := 1;
  !_msgid

let int16be n =
  let s = Bytes.create 2 in
  BE.set_int16 s 0 n;
  s

let int8be n =
  let s = Bytes.create 1 in
  BE.set_int8 s 0 n;
  s

type messages =
  | Connect_pkt
  | Connack_pkt
  | Publish_pkt
  | Puback_pkt
  | Pubrec_pkt
  | Pubrel_pkt
  | Pubcomp_pkt
  | Subscribe_pkt
  | Suback_pkt
  | Unsubscribe_pkt
  | Unsuback_pkt
  | Pingreq_pkt
  | Pingresp_pkt
  | Disconnect_pkt

type cxn_flags = Will_retain | Will_qos of qos | Clean_session

type cxn_data = {
  clientid : string;
  credentials : credentials option;
  will : (string * string) option;
  flags : cxn_flags list;
  keep_alive : int;
}

type client_options = { ping_timeout : float; cxn_data : cxn_data }

type connection_status =
  | Accepted
  | Unacceptable_protocol_version
  | Identifier_rejected
  | Server_unavailable
  | Bad_username_or_password
  | Not_authorized

let connection_status_to_string = function
  | Accepted -> "Accepted"
  | Unacceptable_protocol_version -> "Unacceptable_protocol_version"
  | Identifier_rejected -> "Identifier_rejected"
  | Server_unavailable -> "Server_unavailable"
  | Bad_username_or_password -> "Bad_username_or_password"
  | Not_authorized -> "Not_authorized"

let connection_status_to_int = function
  | Accepted -> 0
  | Unacceptable_protocol_version -> 1
  | Identifier_rejected -> 2
  | Server_unavailable -> 3
  | Bad_username_or_password -> 4
  | Not_authorized -> 5

let connection_status_of_int = function
  | 0 -> Accepted
  | 1 -> Unacceptable_protocol_version
  | 2 -> Identifier_rejected
  | 3 -> Server_unavailable
  | 4 -> Bad_username_or_password
  | 5 -> Not_authorized
  | _ -> raise (Invalid_argument "Invalid connection status code")

type t =
  | Connect of cxn_data
  | Connack of { session_present : bool; connection_status : connection_status }
  | Subscribe of (int * (string * qos) list)
  | Suback of (int * (qos, unit) result list)
  | Unsubscribe of (int * string list)
  | Unsuback of int
  | Publish of (int option * string * string)
  | Puback of int
  | Pubrec of int
  | Pubrel of int
  | Pubcomp of int
  | Pingreq
  | Pingresp
  | Disconnect

type options = bool * qos * bool

let bits_of_message = function
  | Connect_pkt -> 1
  | Connack_pkt -> 2
  | Publish_pkt -> 3
  | Puback_pkt -> 4
  | Pubrec_pkt -> 5
  | Pubrel_pkt -> 6
  | Pubcomp_pkt -> 7
  | Subscribe_pkt -> 8
  | Suback_pkt -> 9
  | Unsubscribe_pkt -> 10
  | Unsuback_pkt -> 11
  | Pingreq_pkt -> 12
  | Pingresp_pkt -> 13
  | Disconnect_pkt -> 14

let message_of_bits = function
  | 1 -> Connect_pkt
  | 2 -> Connack_pkt
  | 3 -> Publish_pkt
  | 4 -> Puback_pkt
  | 5 -> Pubrec_pkt
  | 6 -> Pubrel_pkt
  | 7 -> Pubcomp_pkt
  | 8 -> Subscribe_pkt
  | 9 -> Suback_pkt
  | 10 -> Unsubscribe_pkt
  | 11 -> Unsuback_pkt
  | 12 -> Pingreq_pkt
  | 13 -> Pingresp_pkt
  | 14 -> Disconnect_pkt
  | _ -> raise (Invalid_argument "invalid bits in message")

let bits_of_qos = function
  | Atmost_once -> 0
  | Atleast_once -> 1
  | Exactly_once -> 2

let qos_of_bits = function
  | 0 -> Atmost_once
  | 1 -> Atleast_once
  | 2 -> Exactly_once
  | b -> raise (Invalid_argument ("invalid qos number: " ^ string_of_int b))

let suback_qos_of_bits = function 0x80 -> Error () | b -> Ok (qos_of_bits b)
let bit_of_bool = function true -> 1 | false -> 0

let bool_of_bit = function
  | 1 -> true
  | 0 -> false
  | n ->
    raise
      (Invalid_argument ("expected zero or one, but got " ^ string_of_int n))

let trunc str =
  (* truncate leading zeroes *)
  let len = String.length str in
  let rec loop count =
    if count >= len || str.[count] <> '\000' then count else loop (count + 1)
  in
  let leading = loop 0 in
  if leading = len then "\000" else String.sub str leading (len - leading)

let addlen s =
  let len = String.length s in
  if len > 0xFFFF then raise (Invalid_argument "string too long");
  Bytes.to_string (int16be len) ^ s

let opt_with s n = function Some a -> s a | None -> n
let puback id = Puback id
let pubrec id = Pubrec id
let pubcomp id = Pubcomp id

module Encoder = struct
  let encode_length len =
    let rec loop ll digits =
      if ll <= 0 then digits
      else
        let incr = Int32.logor (Int32.of_int 0x80) in
        let shft = Int32.logor (Int32.shift_left digits 8) in
        let getdig x dig = if x > 0 then incr dig else dig in
        let quotient = ll / 128 in
        let digit = getdig quotient (Int32.of_int (ll mod 128)) in
        let digits = shft digit in
        loop quotient digits
    in
    loop len 0l

  let fixed_header typ ?(flags = 0) body_len =
    let msgid = bits_of_message typ lsl 4 in
    let hdr = Bytes.create 1 in
    let len = Bytes.create 4 in
    BE.set_int8 hdr 0 (msgid + flags);
    BE.set_int32 len 0 (encode_length body_len);
    let len = trunc (Bytes.to_string len) in
    Bytes.to_string hdr ^ len

  let unsubscribe ~id topics =
    let accum acc i = acc + 2 + String.length i in
    let tl = List.fold_left accum 2 topics in
    (* +2 for msgid *)
    let buf = Buffer.create (tl + 5) in
    (* ~5 for fixed header *)
    let addtopic t = addlen t |> Buffer.add_string buf in
    let msgid = int16be id |> Bytes.to_string in
    let hdr = fixed_header Unsubscribe_pkt ~flags:2 tl in
    Buffer.add_string buf hdr;
    Buffer.add_string buf msgid;
    List.iter addtopic topics;
    Buffer.contents buf

  let unsuback id =
    let msgid = int16be id |> Bytes.to_string in
    let hdr = fixed_header Unsuback_pkt 2 in
    hdr ^ msgid

  let simple_pkt typ = fixed_header typ 0
  let pingreq () = simple_pkt Pingreq_pkt
  let pingresp () = simple_pkt Pingresp_pkt

  let pubpkt ?flags typ id =
    let hdr = fixed_header ?flags typ 2 in
    let msgid = int16be id |> Bytes.to_string in
    let buf = Buffer.create 4 in
    Buffer.add_string buf hdr;
    Buffer.add_string buf msgid;
    Buffer.contents buf

  let pubrec = pubpkt Pubrec_pkt
  let pubrel = pubpkt ~flags:2 Pubrel_pkt
  let pubcomp = pubpkt Pubcomp_pkt

  let suback id qoses =
    let paylen = List.length qoses + 2 in
    let buf = Buffer.create (paylen + 5) in
    let msgid = int16be id |> Bytes.to_string in
    let q2i q = bits_of_qos q |> int8be |> Bytes.to_string in
    let blit q = Buffer.add_string buf (q2i q) in
    let hdr = fixed_header Suback_pkt paylen in
    Buffer.add_string buf hdr;
    Buffer.add_string buf msgid;
    List.iter blit qoses;
    Buffer.contents buf

  let puback = pubpkt Puback_pkt
  let disconnect () = simple_pkt Disconnect_pkt

  let subscribe ~id topics =
    let accum acc (i, _) = acc + 3 + String.length i in
    let tl = List.fold_left accum 0 topics in
    let tl = tl + 2 in
    (* add msgid to total len *)
    let buf = Buffer.create (tl + 5) in
    (* ~5 for fixed header *)
    let addtopic (t, q) =
      Buffer.add_string buf (addlen t);
      Buffer.add_string buf (Bytes.to_string @@ int8be (bits_of_qos q))
    in
    let msgid = int16be id |> Bytes.to_string in
    let hdr = fixed_header Subscribe_pkt ~flags:2 tl in
    Buffer.add_string buf hdr;
    Buffer.add_string buf msgid;
    List.iter addtopic topics;
    Buffer.contents buf

  let publish ~dup ~qos ~retain ~id ~topic payload =
    let id_data =
      if qos = Atleast_once || qos = Exactly_once then
        Bytes.to_string (int16be id)
      else ""
    in
    let dup = if qos = Atmost_once then false else dup in
    let topic = addlen topic in
    let sl = String.length in
    let tl = sl topic + sl payload + sl id_data in
    let buf = Buffer.create (tl + 5) in
    let flags =
      let dup = bit_of_bool dup lsl 3 in
      let qos = bits_of_qos qos lsl 1 in
      let retain = bit_of_bool retain in
      dup + qos + retain
    in
    let hdr = fixed_header Publish_pkt ~flags tl in
    Buffer.add_string buf hdr;
    Buffer.add_string buf topic;
    Buffer.add_string buf id_data;
    Buffer.add_string buf payload;
    Buffer.contents buf

  let connect_payload ?credentials ?will ?(flags = []) ?(keep_alive = 10) id =
    let name = addlen "MQTT" in
    let version = "\004" in
    if keep_alive > 0xFFFF then raise (Invalid_argument "keep_alive too large");
    let addhdr2 flag term (flags, hdr) =
      match term with
      | None -> (flags, hdr)
      | Some (a, b) -> (flags lor flag, hdr ^ addlen a ^ addlen b)
    in
    let adduserpass term (flags, hdr) =
      match term with
      | None -> (flags, hdr)
      | Some (Username s) -> (flags lor 0x80, hdr ^ addlen s)
      | Some (Credentials (u, p)) -> addhdr2 0xC0 (Some (u, p)) (flags, hdr)
    in
    let flag_nbr = function
      | Clean_session -> 0x02
      | Will_qos qos -> bits_of_qos qos lsl 3
      | Will_retain -> 0x20
    in
    let accum a acc = acc lor flag_nbr a in
    let flags, pay =
      (List.fold_right accum flags 0, addlen id)
      |> addhdr2 0x04 will
      |> adduserpass credentials
    in
    let tbuf = int16be keep_alive in
    let fbuf = Bytes.create 1 in
    BE.set_int8 fbuf 0 flags;
    let accum acc a = acc + String.length a in
    let fields =
      [ name; version; Bytes.to_string fbuf; Bytes.to_string tbuf; pay ]
    in
    let lens = List.fold_left accum 0 fields in
    let buf = Buffer.create lens in
    List.iter (Buffer.add_string buf) fields;
    Buffer.contents buf

  let connect ?credentials ?will ?flags ?keep_alive id =
    let cxn_pay = connect_payload ?credentials ?will ?flags ?keep_alive id in
    let hdr = fixed_header Connect_pkt (String.length cxn_pay) in
    hdr ^ cxn_pay

  let connect_data d =
    let clientid = d.clientid in
    let credentials = d.credentials in
    let will = d.will in
    let flags = d.flags in
    let keep_alive = d.keep_alive in
    connect_payload ?credentials ?will ~flags ~keep_alive clientid

  let connack ~session_present status =
    let fixed_header = fixed_header Connack_pkt 2 in
    let flags = Bytes.to_string (int8be (bit_of_bool session_present)) in
    let connection_status =
      Bytes.to_string (int8be (connection_status_to_int status))
    in
    let variable_header = flags ^ connection_status in
    fixed_header ^ variable_header
end

module Decoder = struct
  let decode_connect rb =
    let lead = Read_buffer.read rb 9 in
    if "\000\004MQTT\004" <> lead then
      raise (Invalid_argument "invalid MQTT or version");
    let hdr = Read_buffer.read_uint8 rb in
    let keep_alive = Read_buffer.read_uint16 rb in
    let has_username = 0 <> hdr land 0x80 in
    let has_password = 0 <> hdr land 0xC0 in
    let will_flag = bool_of_bit ((hdr land 0x04) lsr 2) in
    let will_retain = will_flag && 0 <> hdr land 0x20 in
    let will_qos =
      if will_flag then Some (qos_of_bits ((hdr land 0x18) lsr 3)) else None
    in
    let clean_session = bool_of_bit ((hdr land 0x02) lsr 1) in
    let rs = Read_buffer.read_string in
    let clientid = rs rb in
    let will =
      if will_flag then
        let t = rs rb in
        let m = rs rb in
        Some (t, m)
      else None
    in
    let credentials =
      if has_password then
        let u = rs rb in
        let p = rs rb in
        Some (Credentials (u, p))
      else if has_username then Some (Username (rs rb))
      else None
    in
    let flags = if clean_session then [ Clean_session ] else [] in
    let flags = opt_with (fun qos -> Will_qos qos :: flags) flags will_qos in
    let flags = if will_retain then Will_retain :: flags else flags in
    Connect { clientid; credentials; will; flags; keep_alive }

  let decode_connack rb =
    let flags = Read_buffer.read_uint8 rb in
    let session_present = bool_of_bit flags in
    let connection_status =
      connection_status_of_int (Read_buffer.read_uint8 rb)
    in
    Connack { session_present; connection_status }

  let decode_publish (_, qos, _) rb =
    let topic = Read_buffer.read_string rb in
    let msgid =
      if qos = Atleast_once || qos = Exactly_once then
        Some (Read_buffer.read_uint16 rb)
      else None
    in
    let payload = Read_buffer.len rb |> Read_buffer.read rb in
    Publish (msgid, topic, payload)

  let decode_puback rb = Puback (Read_buffer.read_uint16 rb)
  let decode_pubrec rb = Pubrec (Read_buffer.read_uint16 rb)
  let decode_pubrel rb = Pubrel (Read_buffer.read_uint16 rb)
  let decode_pubcomp rb = Pubcomp (Read_buffer.read_uint16 rb)

  let decode_subscribe rb =
    let id = Read_buffer.read_uint16 rb in
    let get_topic rb =
      let topic = Read_buffer.read_string rb in
      let qos = Read_buffer.read_uint8 rb |> qos_of_bits in
      (topic, qos)
    in
    let topics = Read_buffer.read_all rb get_topic in
    Subscribe (id, topics)

  let decode_suback rb =
    let id = Read_buffer.read_uint16 rb in
    let get_qos rb = Read_buffer.read_uint8 rb |> suback_qos_of_bits in
    let qoses = Read_buffer.read_all rb get_qos in
    Suback (id, List.rev qoses)

  let decode_unsub rb =
    let id = Read_buffer.read_uint16 rb in
    let topics = Read_buffer.read_all rb Read_buffer.read_string in
    Unsubscribe (id, topics)

  let decode_unsuback rb = Unsuback (Read_buffer.read_uint16 rb)
  let decode_pingreq _rb = Pingreq
  let decode_pingresp _rb = Pingresp
  let decode_disconnect _rb = Disconnect

  let decode_packet opts = function
    | Connect_pkt -> decode_connect
    | Connack_pkt -> decode_connack
    | Publish_pkt -> decode_publish opts
    | Puback_pkt -> decode_puback
    | Pubrec_pkt -> decode_pubrec
    | Pubrel_pkt -> decode_pubrel
    | Pubcomp_pkt -> decode_pubcomp
    | Subscribe_pkt -> decode_subscribe
    | Suback_pkt -> decode_suback
    | Unsubscribe_pkt -> decode_unsub
    | Unsuback_pkt -> decode_unsuback
    | Pingreq_pkt -> decode_pingreq
    | Pingresp_pkt -> decode_pingresp
    | Disconnect_pkt -> decode_disconnect

  let decode_fixed_header byte : messages * options =
    let typ = (byte land 0xF0) lsr 4 in
    let dup = (byte land 0x08) lsr 3 in
    let qos = (byte land 0x06) lsr 1 in
    let retain = byte land 0x01 in
    let typ = message_of_bits typ in
    let dup = bool_of_bit dup in
    let qos = qos_of_bits qos in
    let retain = bool_of_bit retain in
    (typ, (dup, qos, retain))
end
OCaml

Innovation. Community. Security.