Source file Mqtt_packet.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
module BE = EndianBytes.BigEndian
open Mqtt_core
let _msgid = ref 0
let gen_id () =
let () = incr _msgid in
if !_msgid >= 0xFFFF then _msgid := 1;
!_msgid
let int16be n =
let s = Bytes.create 2 in
BE.set_int16 s 0 n;
s
let int8be n =
let s = Bytes.create 1 in
BE.set_int8 s 0 n;
s
type messages =
| Connect_pkt
| Connack_pkt
| Publish_pkt
| Puback_pkt
| Pubrec_pkt
| Pubrel_pkt
| Pubcomp_pkt
| Subscribe_pkt
| Suback_pkt
| Unsubscribe_pkt
| Unsuback_pkt
| Pingreq_pkt
| Pingresp_pkt
| Disconnect_pkt
type cxn_flags = Will_retain | Will_qos of qos | Clean_session
type cxn_data = {
clientid : string;
credentials : credentials option;
will : (string * string) option;
flags : cxn_flags list;
keep_alive : int;
}
type client_options = { ping_timeout : float; cxn_data : cxn_data }
type connection_status =
| Accepted
| Unacceptable_protocol_version
| Identifier_rejected
| Server_unavailable
| Bad_username_or_password
| Not_authorized
let connection_status_to_string = function
| Accepted -> "Accepted"
| Unacceptable_protocol_version -> "Unacceptable_protocol_version"
| Identifier_rejected -> "Identifier_rejected"
| Server_unavailable -> "Server_unavailable"
| Bad_username_or_password -> "Bad_username_or_password"
| Not_authorized -> "Not_authorized"
let connection_status_to_int = function
| Accepted -> 0
| Unacceptable_protocol_version -> 1
| Identifier_rejected -> 2
| Server_unavailable -> 3
| Bad_username_or_password -> 4
| Not_authorized -> 5
let connection_status_of_int = function
| 0 -> Accepted
| 1 -> Unacceptable_protocol_version
| 2 -> Identifier_rejected
| 3 -> Server_unavailable
| 4 -> Bad_username_or_password
| 5 -> Not_authorized
| _ -> raise (Invalid_argument "Invalid connection status code")
type t =
| Connect of cxn_data
| Connack of { session_present : bool; connection_status : connection_status }
| Subscribe of (int * (string * qos) list)
| Suback of (int * (qos, unit) result list)
| Unsubscribe of (int * string list)
| Unsuback of int
| Publish of (int option * string * string)
| Puback of int
| Pubrec of int
| Pubrel of int
| Pubcomp of int
| Pingreq
| Pingresp
| Disconnect
type options = bool * qos * bool
let bits_of_message = function
| Connect_pkt -> 1
| Connack_pkt -> 2
| Publish_pkt -> 3
| Puback_pkt -> 4
| Pubrec_pkt -> 5
| Pubrel_pkt -> 6
| Pubcomp_pkt -> 7
| Subscribe_pkt -> 8
| Suback_pkt -> 9
| Unsubscribe_pkt -> 10
| Unsuback_pkt -> 11
| Pingreq_pkt -> 12
| Pingresp_pkt -> 13
| Disconnect_pkt -> 14
let message_of_bits = function
| 1 -> Connect_pkt
| 2 -> Connack_pkt
| 3 -> Publish_pkt
| 4 -> Puback_pkt
| 5 -> Pubrec_pkt
| 6 -> Pubrel_pkt
| 7 -> Pubcomp_pkt
| 8 -> Subscribe_pkt
| 9 -> Suback_pkt
| 10 -> Unsubscribe_pkt
| 11 -> Unsuback_pkt
| 12 -> Pingreq_pkt
| 13 -> Pingresp_pkt
| 14 -> Disconnect_pkt
| _ -> raise (Invalid_argument "invalid bits in message")
let bits_of_qos = function
| Atmost_once -> 0
| Atleast_once -> 1
| Exactly_once -> 2
let qos_of_bits = function
| 0 -> Atmost_once
| 1 -> Atleast_once
| 2 -> Exactly_once
| b -> raise (Invalid_argument ("invalid qos number: " ^ string_of_int b))
let suback_qos_of_bits = function 0x80 -> Error () | b -> Ok (qos_of_bits b)
let bit_of_bool = function true -> 1 | false -> 0
let bool_of_bit = function
| 1 -> true
| 0 -> false
| n ->
raise
(Invalid_argument ("expected zero or one, but got " ^ string_of_int n))
let trunc str =
let len = String.length str in
let rec loop count =
if count >= len || str.[count] <> '\000' then count else loop (count + 1)
in
let leading = loop 0 in
if leading = len then "\000" else String.sub str leading (len - leading)
let addlen s =
let len = String.length s in
if len > 0xFFFF then raise (Invalid_argument "string too long");
Bytes.to_string (int16be len) ^ s
let opt_with s n = function Some a -> s a | None -> n
let puback id = Puback id
let pubrec id = Pubrec id
let pubcomp id = Pubcomp id
module Encoder = struct
let encode_length len =
let rec loop ll digits =
if ll <= 0 then digits
else
let incr = Int32.logor (Int32.of_int 0x80) in
let shft = Int32.logor (Int32.shift_left digits 8) in
let getdig x dig = if x > 0 then incr dig else dig in
let quotient = ll / 128 in
let digit = getdig quotient (Int32.of_int (ll mod 128)) in
let digits = shft digit in
loop quotient digits
in
loop len 0l
let typ ?(flags = 0) body_len =
let msgid = bits_of_message typ lsl 4 in
let hdr = Bytes.create 1 in
let len = Bytes.create 4 in
BE.set_int8 hdr 0 (msgid + flags);
BE.set_int32 len 0 (encode_length body_len);
let len = trunc (Bytes.to_string len) in
Bytes.to_string hdr ^ len
let unsubscribe ~id topics =
let accum acc i = acc + 2 + String.length i in
let tl = List.fold_left accum 2 topics in
let buf = Buffer.create (tl + 5) in
let addtopic t = addlen t |> Buffer.add_string buf in
let msgid = int16be id |> Bytes.to_string in
let hdr = fixed_header Unsubscribe_pkt ~flags:2 tl in
Buffer.add_string buf hdr;
Buffer.add_string buf msgid;
List.iter addtopic topics;
Buffer.contents buf
let unsuback id =
let msgid = int16be id |> Bytes.to_string in
let hdr = fixed_header Unsuback_pkt 2 in
hdr ^ msgid
let simple_pkt typ = fixed_header typ 0
let pingreq () = simple_pkt Pingreq_pkt
let pingresp () = simple_pkt Pingresp_pkt
let pubpkt ?flags typ id =
let hdr = fixed_header ?flags typ 2 in
let msgid = int16be id |> Bytes.to_string in
let buf = Buffer.create 4 in
Buffer.add_string buf hdr;
Buffer.add_string buf msgid;
Buffer.contents buf
let pubrec = pubpkt Pubrec_pkt
let pubrel = pubpkt ~flags:2 Pubrel_pkt
let pubcomp = pubpkt Pubcomp_pkt
let suback id qoses =
let paylen = List.length qoses + 2 in
let buf = Buffer.create (paylen + 5) in
let msgid = int16be id |> Bytes.to_string in
let q2i q = bits_of_qos q |> int8be |> Bytes.to_string in
let blit q = Buffer.add_string buf (q2i q) in
let hdr = fixed_header Suback_pkt paylen in
Buffer.add_string buf hdr;
Buffer.add_string buf msgid;
List.iter blit qoses;
Buffer.contents buf
let puback = pubpkt Puback_pkt
let disconnect () = simple_pkt Disconnect_pkt
let subscribe ~id topics =
let accum acc (i, _) = acc + 3 + String.length i in
let tl = List.fold_left accum 0 topics in
let tl = tl + 2 in
let buf = Buffer.create (tl + 5) in
let addtopic (t, q) =
Buffer.add_string buf (addlen t);
Buffer.add_string buf (Bytes.to_string @@ int8be (bits_of_qos q))
in
let msgid = int16be id |> Bytes.to_string in
let hdr = fixed_header Subscribe_pkt ~flags:2 tl in
Buffer.add_string buf hdr;
Buffer.add_string buf msgid;
List.iter addtopic topics;
Buffer.contents buf
let publish ~dup ~qos ~retain ~id ~topic payload =
let id_data =
if qos = Atleast_once || qos = Exactly_once then
Bytes.to_string (int16be id)
else ""
in
let dup = if qos = Atmost_once then false else dup in
let topic = addlen topic in
let sl = String.length in
let tl = sl topic + sl payload + sl id_data in
let buf = Buffer.create (tl + 5) in
let flags =
let dup = bit_of_bool dup lsl 3 in
let qos = bits_of_qos qos lsl 1 in
let retain = bit_of_bool retain in
dup + qos + retain
in
let hdr = fixed_header Publish_pkt ~flags tl in
Buffer.add_string buf hdr;
Buffer.add_string buf topic;
Buffer.add_string buf id_data;
Buffer.add_string buf payload;
Buffer.contents buf
let connect_payload ?credentials ?will ?(flags = []) ?(keep_alive = 10) id =
let name = addlen "MQTT" in
let version = "\004" in
if keep_alive > 0xFFFF then raise (Invalid_argument "keep_alive too large");
let addhdr2 flag term (flags, hdr) =
match term with
| None -> (flags, hdr)
| Some (a, b) -> (flags lor flag, hdr ^ addlen a ^ addlen b)
in
let adduserpass term (flags, hdr) =
match term with
| None -> (flags, hdr)
| Some (Username s) -> (flags lor 0x80, hdr ^ addlen s)
| Some (Credentials (u, p)) -> addhdr2 0xC0 (Some (u, p)) (flags, hdr)
in
let flag_nbr = function
| Clean_session -> 0x02
| Will_qos qos -> bits_of_qos qos lsl 3
| Will_retain -> 0x20
in
let accum a acc = acc lor flag_nbr a in
let flags, pay =
(List.fold_right accum flags 0, addlen id)
|> addhdr2 0x04 will
|> adduserpass credentials
in
let tbuf = int16be keep_alive in
let fbuf = Bytes.create 1 in
BE.set_int8 fbuf 0 flags;
let accum acc a = acc + String.length a in
let fields =
[ name; version; Bytes.to_string fbuf; Bytes.to_string tbuf; pay ]
in
let lens = List.fold_left accum 0 fields in
let buf = Buffer.create lens in
List.iter (Buffer.add_string buf) fields;
Buffer.contents buf
let connect ?credentials ?will ?flags ?keep_alive id =
let cxn_pay = connect_payload ?credentials ?will ?flags ?keep_alive id in
let hdr = fixed_header Connect_pkt (String.length cxn_pay) in
hdr ^ cxn_pay
let connect_data d =
let clientid = d.clientid in
let credentials = d.credentials in
let will = d.will in
let flags = d.flags in
let keep_alive = d.keep_alive in
connect_payload ?credentials ?will ~flags ~keep_alive clientid
let connack ~session_present status =
let = fixed_header Connack_pkt 2 in
let flags = Bytes.to_string (int8be (bit_of_bool session_present)) in
let connection_status =
Bytes.to_string (int8be (connection_status_to_int status))
in
let = flags ^ connection_status in
fixed_header ^ variable_header
end
module Decoder = struct
let decode_connect rb =
let lead = Read_buffer.read rb 9 in
if "\000\004MQTT\004" <> lead then
raise (Invalid_argument "invalid MQTT or version");
let hdr = Read_buffer.read_uint8 rb in
let keep_alive = Read_buffer.read_uint16 rb in
let has_username = 0 <> hdr land 0x80 in
let has_password = 0 <> hdr land 0xC0 in
let will_flag = bool_of_bit ((hdr land 0x04) lsr 2) in
let will_retain = will_flag && 0 <> hdr land 0x20 in
let will_qos =
if will_flag then Some (qos_of_bits ((hdr land 0x18) lsr 3)) else None
in
let clean_session = bool_of_bit ((hdr land 0x02) lsr 1) in
let rs = Read_buffer.read_string in
let clientid = rs rb in
let will =
if will_flag then
let t = rs rb in
let m = rs rb in
Some (t, m)
else None
in
let credentials =
if has_password then
let u = rs rb in
let p = rs rb in
Some (Credentials (u, p))
else if has_username then Some (Username (rs rb))
else None
in
let flags = if clean_session then [ Clean_session ] else [] in
let flags = opt_with (fun qos -> Will_qos qos :: flags) flags will_qos in
let flags = if will_retain then Will_retain :: flags else flags in
Connect { clientid; credentials; will; flags; keep_alive }
let decode_connack rb =
let flags = Read_buffer.read_uint8 rb in
let session_present = bool_of_bit flags in
let connection_status =
connection_status_of_int (Read_buffer.read_uint8 rb)
in
Connack { session_present; connection_status }
let decode_publish (_, qos, _) rb =
let topic = Read_buffer.read_string rb in
let msgid =
if qos = Atleast_once || qos = Exactly_once then
Some (Read_buffer.read_uint16 rb)
else None
in
let payload = Read_buffer.len rb |> Read_buffer.read rb in
Publish (msgid, topic, payload)
let decode_puback rb = Puback (Read_buffer.read_uint16 rb)
let decode_pubrec rb = Pubrec (Read_buffer.read_uint16 rb)
let decode_pubrel rb = Pubrel (Read_buffer.read_uint16 rb)
let decode_pubcomp rb = Pubcomp (Read_buffer.read_uint16 rb)
let decode_subscribe rb =
let id = Read_buffer.read_uint16 rb in
let get_topic rb =
let topic = Read_buffer.read_string rb in
let qos = Read_buffer.read_uint8 rb |> qos_of_bits in
(topic, qos)
in
let topics = Read_buffer.read_all rb get_topic in
Subscribe (id, topics)
let decode_suback rb =
let id = Read_buffer.read_uint16 rb in
let get_qos rb = Read_buffer.read_uint8 rb |> suback_qos_of_bits in
let qoses = Read_buffer.read_all rb get_qos in
Suback (id, List.rev qoses)
let decode_unsub rb =
let id = Read_buffer.read_uint16 rb in
let topics = Read_buffer.read_all rb Read_buffer.read_string in
Unsubscribe (id, topics)
let decode_unsuback rb = Unsuback (Read_buffer.read_uint16 rb)
let decode_pingreq _rb = Pingreq
let decode_pingresp _rb = Pingresp
let decode_disconnect _rb = Disconnect
let decode_packet opts = function
| Connect_pkt -> decode_connect
| Connack_pkt -> decode_connack
| Publish_pkt -> decode_publish opts
| Puback_pkt -> decode_puback
| Pubrec_pkt -> decode_pubrec
| Pubrel_pkt -> decode_pubrel
| Pubcomp_pkt -> decode_pubcomp
| Subscribe_pkt -> decode_subscribe
| Suback_pkt -> decode_suback
| Unsubscribe_pkt -> decode_unsub
| Unsuback_pkt -> decode_unsuback
| Pingreq_pkt -> decode_pingreq
| Pingresp_pkt -> decode_pingresp
| Disconnect_pkt -> decode_disconnect
let byte : messages * options =
let typ = (byte land 0xF0) lsr 4 in
let dup = (byte land 0x08) lsr 3 in
let qos = (byte land 0x06) lsr 1 in
let retain = byte land 0x01 in
let typ = message_of_bits typ in
let dup = bool_of_bit dup in
let qos = qos_of_bits qos in
let retain = bool_of_bit retain in
(typ, (dup, qos, retain))
end