package frenetic
The Frenetic Programming Language and Runtime System
Install
Dune Dependency
Authors
Maintainers
Sources
5.0.5.tar.gz
md5=baf754df13a759c32f2c86a1b6f328da
sha512=80140900e7009ccab14b25e244fe7edab87d858676f8a4b3799b4fea16825013cf68363fe5faec71dd54ba825bb4ea2f812c2c666390948ab217ffa75d9cbd29
doc/src/frenetic.async/OpenFlow0x01_Plugin.ml.html
Source file OpenFlow0x01_Plugin.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390
open Core
open Async
open Frenetic_kernel.OpenFlow

module OF10 = Frenetic_kernel.OpenFlow0x01

(* TODO: See openflow.ml for discussion.  This is transitional.

   IF YOU CHANGE THE PROTOCOL HERE, YOU MUST ALSO CHANGE IT IN openflow.ml *)

(* Status carried by the reply constructors of [rpc_command]. *)
type rpc_ack = RpcOk | RpcEof

(* Messages marshalled over the sockets to the OpenFlow server process.
   Each request constructor is listed directly above the reply that the
   functions in this file expect for it. *)
type rpc_command =
  | GetSwitches
  | SwitchesReply of OF10.switchId list
  | GetSwitchFeatures of OF10.switchId
  | SwitchFeaturesReply of OF10.SwitchFeatures.t option
  | Send of OF10.switchId * OF10.xid * OF10.Message.t
  | SendReply of rpc_ack
  | SendBatch of OF10.switchId * OF10.xid * OF10.Message.t list
  | BatchReply of rpc_ack
  | GetEvents
  | EventsReply of event
  | SendTrx of OF10.switchId * OF10.Message.t
  | TrxReply of rpc_ack * OF10.Message.t list
  | Finished of unit (* This is not sent by the client explicitly *)

(* Marshals [rpc] with marshalling flags [f] and queues the bytes on
   [writer]. *)
let write_marshal writer ~flags:f rpc =
  Writer.write writer (Marshal.to_string rpc f)

let chan = Ivar.create ()

(* Pipe of events; it is fed by the events loop started in
   [LowLevel.start]. *)
let (events, events_writer) = Pipe.create ()

(* Filled by [LowLevel.start] once the first connection to the OpenFlow
   server has been established. *)
let server_sock_addr = Ivar.create ()
let server_reader = Ivar.create ()
let server_writer = Ivar.create ()

(* Hand-rolled lock around the shared request/reply socket:
   [read_outstanding] is true while a caller is between
   [LowLevel.ready_to_process] and [LowLevel.signal_read];
   [read_finished] wakes the waiters. *)
let read_outstanding = ref false
let read_finished = Condition.create ()

module LowLevel = struct
  module OF10 = Frenetic_kernel.OpenFlow0x01

  (* Path of the [frenetic.openflow] program, looked up in the directory of
     the running executable.  Fails with [Failure] when the file is not
     reported as existing. *)
  let openflow_executable () =
    let prog = Filename.dirname(Sys.executable_name) ^ "/frenetic.openflow" in
    Sys.file_exists prog
    >>= function
    | `Yes -> return prog
    | `No | `Unknown ->
      failwith (Printf.sprintf "Can't find OpenFlow executable %s!" prog)

  (* Launches the OpenFlow server as a child process (it is given [port]
     through [-p] and the fixed local socket port 8984 through [-s]), then,
     in the background:
     - copies the child's stdout to our stdout,
     - connects a first socket, retrying every second, and publishes its
       reader and writer through [server_reader] / [server_writer],
     - connects a second socket, sends [GetEvents] on it and forwards every
       [EventsReply] into [events_writer] until end of file.
     Returns immediately; failures surface in the enclosing monitor. *)
  let start port =
    Logging.info "Calling create!";
    let sock_port = 8984 in
    let sock_addr = `Inet (Unix.Inet_addr.localhost, sock_port) in
    let args = ["-s"; string_of_int sock_port;
                "-p"; string_of_int port;
                "-v"] in
    don't_wait_for (
      Logging.info "Current uid: %n" (Unix.getuid ());
      Logging.flushed () >>= fun () ->
      openflow_executable () >>= fun prog ->
      Process.create ~prog ~args ()
      >>= function
      | Error err ->
        Logging.error "Failed to launch openflow server %s!" prog;
        raise (Core_kernel.Error.to_exn err)
      | Ok proc ->
        Logging.info "Successfully launched OpenFlow controller with pid %s"
          (Pid.to_string (Process.pid proc));
        (* Redirect stdout of the child proc to out stdout for logging *)
        let buf = Bytes.create 1000 in
        don't_wait_for (Deferred.repeat_until_finished () (fun () ->
            Reader.read (Process.stdout proc) buf
            >>| function
            | `Eof -> `Finished ()
            | `Ok n ->
              `Repeat (Writer.write (Lazy.force Writer.stdout) ~len:n
                         (Bytes.to_string buf))));
        Logging.info "Connecting to first OpenFlow server socket";
        (* The server may not be listening yet, so keep retrying. *)
        let rec wait_for_server () =
          Monitor.try_with ~extract_exn:true (fun () ->
              Socket.connect (Socket.create Socket.Type.tcp) sock_addr)
          >>= function
          | Ok sock -> return sock
          | Error exn ->
            Logging.info "Failed to open socket to OpenFlow server: %s"
              (Exn.to_string exn);
            Logging.info "Retrying in 1 second";
            after (Time.Span.of_sec 1.)
            >>= wait_for_server in
        wait_for_server () >>= fun sock ->
        Ivar.fill server_sock_addr sock_addr;
        Logging.info "Successfully connected to first OpenFlow server socket";
        Ivar.fill server_reader (Reader.create (Socket.fd sock));
        Ivar.fill server_writer (Writer.create (Socket.fd sock));
        (* We open a second socket to get the events stream *)
        Logging.info "Connecting to second OpenFlow server socket";
        Socket.connect (Socket.create Socket.Type.tcp) sock_addr
        >>= fun sock ->
        Logging.info "Successfully connected to second OpenFlow server socket";
        let reader = Reader.create (Socket.fd sock) in
        let writer = Writer.create (Socket.fd sock) in
        (* TODO(jnfoster): replace with a different call *)
        write_marshal writer ~flags:[] GetEvents;
        Deferred.repeat_until_finished () (fun () ->
            Reader.read_marshal reader
            >>= function
            | `Eof ->
              Logging.info "OpenFlow controller closed events socket";
              Pipe.close events_writer;
              Socket.shutdown sock `Both;
              return (`Finished ())
            | `Ok (EventsReply evt) ->
              Pipe.write events_writer evt >>| fun () ->
              `Repeat ()
            | `Ok (_) ->
              Logging.error "Got a message that's not an EventsReply. WTF? Dropping.";
              return (`Repeat ())))

  (* Waits until no request is in flight on the shared socket, then marks
     one as in flight. *)
  let rec clear_to_read () =
    if (!read_outstanding)
    then Condition.wait read_finished >>= clear_to_read
    else return (read_outstanding := true)

  (* Marks the in-flight request as finished and wakes every waiter. *)
  let signal_read () =
    read_outstanding := false;
    Condition.broadcast read_finished ()

  (* Takes the shared-socket lock and returns a [(read, write)] pair bound
     to the first server socket.  The caller must call [signal_read] once
     its reply has been read.  On end of file [read] releases the lock
     itself before failing, so that later callers fail as well instead of
     waiting forever on a lock nobody will release. *)
  let ready_to_process () =
    Ivar.read server_reader >>= fun reader ->
    Ivar.read server_writer >>= fun writer ->
    clear_to_read () >>= fun () ->
    let read () =
      Reader.read_marshal reader
      >>| function
      | `Eof ->
        signal_read ();
        Logging.error "OpenFlow server socket shutdown unexpectedly!";
        failwith "Can not reach OpenFlow server!"
      | `Ok a -> a in
    let write = write_marshal writer ~flags:[] in
    return (read, write)

  (* Sends [msg] with transaction id [xid] to switch [swid] and returns the
     acknowledgement.  The lock is released as soon as a reply has been
     read, whatever the reply is; an unexpected reply is logged and then
     trips [assert false]. *)
  let send swid xid msg =
    ready_to_process () >>= fun (recv, send) ->
    send (Send (swid,xid,msg));
    recv () >>| fun reply ->
    signal_read ();
    match reply with
    | SendReply resp -> resp
    | _ ->
      Logging.error "Received a reply that's not SendReply to a Send";
      assert false

  (* Same as [send] for a list of messages sent as one [SendBatch]. *)
  let send_batch swid xid msgs =
    ready_to_process () >>= fun (recv, send) ->
    send (SendBatch (swid,xid,msgs));
    recv () >>| fun reply ->
    signal_read ();
    match reply with
    | BatchReply resp -> resp
    | _ ->
      Logging.error "Received a reply that's not BatchReply to a SendBatch";
      assert false

  (* We open a new socket for each send_txn call so that we can block on the reply.
     The result is always a [TrxReply]: the server's messages with [RpcOk],
     or [TrxReply (RpcEof, [])] on end of file, on an [RpcEof] reply, or on
     a reply of any other shape.  The socket is shut down on every one of
     these paths. *)
  let send_txn swid msg =
    Ivar.read server_sock_addr >>= fun sock_addr ->
    Socket.connect (Socket.create Socket.Type.tcp) sock_addr >>= fun sock ->
    let reader = Reader.create (Socket.fd sock) in
    let writer = Writer.create (Socket.fd sock) in
    write_marshal writer ~flags:[] (SendTrx (swid,msg));
    Reader.read_marshal reader >>| fun resp ->
    Socket.shutdown sock `Both;
    match resp with
    | `Ok (TrxReply (RpcOk, msgs)) -> TrxReply (RpcOk, msgs)
    | `Eof -> TrxReply (RpcEof,[])
    | `Ok (TrxReply (RpcEof,_)) -> TrxReply (RpcEof,[])
    | `Ok _ ->
      Logging.debug "send_txn returned something unintelligible";
      TrxReply (RpcEof,[])

  let events = events
end

(* Starts the OpenFlow server helper; see [LowLevel.start]. *)
let start port =
  LowLevel.start port

(* Asks the server for the features of [switch_id] and converts them with
   [From0x01.from_switch_features]; [None] is passed through unchanged.
   The lock is released once the reply has been read; an unexpected reply
   is logged and then trips [assert false]. *)
let switch_features (switch_id : switchId) =
  LowLevel.ready_to_process () >>= fun (recv, send) ->
  send (GetSwitchFeatures switch_id);
  recv () >>| fun reply ->
  LowLevel.signal_read ();
  match reply with
  | SwitchFeaturesReply resp ->
    Option.map resp ~f:From0x01.from_switch_features
  | _ ->
    Logging.error "Received a reply that's not SwitchFeaturesReply to a GetSwitchFeatures";
    assert false

(* We just brute-force this, even though there's significant overlap with from_action.
   Only [Mod] policies yield an action; every other policy, and the header
   values listed in the last arm, yield [None].  The mask of an IPv4
   modification is dropped. *)
let action_from_policy (pol:Frenetic_netkat.Syntax.policy) : action option =
  match pol with
  | Mod hv ->
    begin
      match hv with
      | Location location ->
        begin
          match location with
          | Physical p -> Some (Output (Physical p))
          | FastFail _ -> None
          | Pipe _ -> Some (Output (Controller 128))
          | Query _ -> None
        end
      | EthSrc dlAddr -> Some (Modify(SetEthSrc dlAddr))
      | EthDst dlAddr -> Some (Modify(SetEthDst dlAddr))
      | Vlan n -> Some (Modify(SetVlan (Some n)))
      | VlanPcp pcp -> Some (Modify(SetVlanPcp pcp))
      | EthType dlTyp -> Some (Modify(SetEthTyp dlTyp))
      | IPProto nwProto -> Some (Modify(SetIPProto nwProto))
      | IP4Src (nwAddr, _) -> Some (Modify(SetIP4Src nwAddr))
      | IP4Dst (nwAddr, _) -> Some (Modify(SetIP4Dst nwAddr))
      | TCPSrcPort tpPort -> Some (Modify(SetTCPSrcPort tpPort))
      | TCPDstPort tpPort -> Some (Modify(SetTCPDstPort tpPort))
      | Switch _ | VSwitch _ | VPort _ | VFabric _ | Meta _
      | From _ | AbstractLoc _ -> None
    end
  | _ -> None

(* Translates each policy with [action_from_policy], keeping only the ones
   that produce an action. *)
let actions_from_policies pol_list =
  List.filter_map pol_list ~f:action_from_policy

(* Sends [payload] out of switch [swid] with the actions derived from
   [pol_list].  Both acknowledgement values are treated the same way, so a
   failed send is not reported to the caller. *)
let packet_out
    (swid:int64)
    (ingress_port:portId option)
    (payload:payload)
    (pol_list:Frenetic_netkat.Syntax.policy list) =
  (* Turn this into a generic PktOut event, then run it through OF10 translator *)
  let actions = actions_from_policies pol_list in
  let openflow_generic_pkt_out = (payload, ingress_port, actions) in
  let pktout0x01 =
    Frenetic_kernel.OpenFlow.To0x01.from_packetOut openflow_generic_pkt_out in
  LowLevel.send swid 0l (OF10.Message.PacketOutMsg pktout0x01)
  >>| function
  | RpcEof | RpcOk -> ()

(* Placeholder returned by [flow_stats] when no usable reply is available;
   also the base record that [collapse_stats] fills in. *)
let bogus_flow_stats = {
  flow_table_id = 66L; flow_pattern = Pattern.match_all;
  flow_actions = []; flow_duration_sec = 0L; flow_duration_nsec = 0L;
  flow_priority = 0L; flow_idle_timeout = 0L; flow_hard_timeout = 0L;
  flow_packet_count = 0L; flow_byte_count = 0L
}

(* We aggregate all the OF10 stats and convert them to a generic OpenFlow at the same time.
   Only the packet and byte counts are summed; every other field comes
   from [bogus_flow_stats]. *)
let collapse_stats ifrl =
  let open OF10 in
  { bogus_flow_stats with
    flow_packet_count = List.sum (module Int64) ifrl ~f:(fun stat -> stat.packet_count)
  ; flow_byte_count = List.sum (module Int64) ifrl ~f:(fun stat -> stat.byte_count)
  }

(* Requests the individual flow statistics matching [pat] on switch
   [sw_id] and collapses them into one record.  Returns [bogus_flow_stats]
   when the reply list does not hold exactly one flow statistics message.
   NOTE(review): an [RpcEof] transaction reply, which [LowLevel.send_txn]
   produces on end of file, still trips [assert false] here. *)
let flow_stats (sw_id : switchId) (pat: Pattern.t) : flowStats Deferred.t =
  let pat0x01 = To0x01.from_pattern pat in
  let req = OF10.IndividualRequest
      { sr_of_match = pat0x01; sr_table_id = 0xff; sr_out_port = None } in
  LowLevel.send_txn sw_id (OF10.Message.StatsRequestMsg req)
  >>= function
  | TrxReply (RpcEof, _) -> assert false
  | TrxReply (RpcOk, l) ->
    (match l with
     | [] ->
       Logging.info "Got an empty list";
       return bogus_flow_stats
     | [hd] ->
       (match hd with
        | StatsReplyMsg (IndividualFlowRep ifrl) ->
          return (collapse_stats ifrl)
        | _ ->
          Logging.error "Got a reply, but the type is wrong";
          return bogus_flow_stats)
     | _ :: _ :: _ ->
       Logging.info "Got a list with more than one element";
       return bogus_flow_stats)
  | _ ->
    Logging.error "Received a reply that's not TrxReply to a SendTrx";
    assert false

(* Placeholder returned by [port_stats] when no usable reply is available. *)
let bogus_port_stats = {
  port_no = 666L
; port_rx_packets = 0L
; port_tx_packets = 0L
; port_rx_bytes = 0L
; port_tx_bytes = 0L
; port_rx_dropped = 0L
; port_tx_dropped = 0L
; port_rx_errors = 0L
; port_tx_errors = 0L
; port_rx_frame_err = 0L
; port_rx_over_err = 0L
; port_rx_crc_err = 0L
; port_collisions = 0L
}

(* Requests the statistics of port [pid] on switch [sw_id] and converts the
   first entry of the reply.  Returns [bogus_port_stats] when the reply
   list does not hold exactly one port statistics message, or when that
   message carries no entry.
   NOTE(review): an [RpcEof] transaction reply, which [LowLevel.send_txn]
   produces on end of file, still trips [assert false] here. *)
let port_stats (sw_id : switchId) (pid : portId) : portStats Deferred.t =
  let pt = Int32.(to_int_exn pid) in
  let req = OF10.PortRequest (Some (PhysicalPort pt)) in
  LowLevel.send_txn sw_id (OF10.Message.StatsRequestMsg req)
  >>= function
  | TrxReply (RpcEof, _) -> assert false
  | TrxReply (RpcOk, l) ->
    (match l with
     | [] ->
       Logging.info "Got an empty list";
       return bogus_port_stats
     | [hd] ->
       (match hd with
        | StatsReplyMsg (PortRep psl) ->
          (* An empty entry list used to raise through List.hd_exn. *)
          (match List.hd psl with
           | Some ps ->
             return (Frenetic_kernel.OpenFlow.From0x01.from_port_stats ps)
           | None ->
             Logging.error "Got a port stats reply with no entries";
             return bogus_port_stats)
        | _ ->
          Logging.error "Got a reply, but the type is wrong";
          return bogus_port_stats)
     | _ :: _ :: _ ->
       Logging.info "Got a list with more than one element";
       return bogus_port_stats)
  | _ ->
    Logging.error "Received a reply that's not TrxReply to a SendTrx";
    assert false

(* Asks the server for the list of switch ids.  The lock is released once
   the reply has been read; an unexpected reply is logged and then trips
   [assert false]. *)
let get_switches () =
  LowLevel.ready_to_process () >>= fun (recv, send) ->
  send GetSwitches;
  recv () >>| fun reply ->
  LowLevel.signal_read ();
  match reply with
  | SwitchesReply resp -> resp
  | _ ->
    Logging.error "Received a reply that's not SwitchesReply to a GetSwitches";
    assert false

(* TODO: The following is ripped out of Frenetic_netkat.Updates.  Turns out you can't call
   stuff in that because of a circular dependency.  In a later version, we should implement
   generic commands in Frenetic_kernel.OpenFlow (similar to events, but going the opposite
   directions), and let openflow.ml translate these to the specifc version of OpenFlow.  That
   way, we can simply pass a plugin instance where the update can write to. *)
module BestEffortUpdate0x01 = struct
  module Comp = Frenetic_netkat.Local_compiler
  module M = OF10.Message
  open Frenetic_kernel.OpenFlow.To0x01

  exception UpdateError

  let current_compiler_options = ref Comp.default_compiler_options

  let restrict sw_id repr =
    Comp.restrict Frenetic_netkat.Syntax.(Switch sw_id) repr

  (* Sends [table] to [sw_id] as one batch of flow mods.  Priorities count
     down from 65535 in table order, so earlier entries get the higher
     priority.  Raises [UpdateError] on an [RpcEof] acknowledgement. *)
  let install_flows_for sw_id table =
    let to_flow_mod p f = M.FlowModMsg (from_flow p f) in
    let flows =
      List.mapi table ~f:(fun i flow -> to_flow_mod (65535 - i) flow) in
    LowLevel.send_batch sw_id 0l flows
    >>= function
    | RpcEof -> raise UpdateError
    | RpcOk -> return ()

  (* Deletes every flow on [sw_id].  Raises [UpdateError] on an [RpcEof]
     acknowledgement. *)
  let delete_flows_for sw_id =
    let delete_flows = M.FlowModMsg OF10.delete_all_flows in
    LowLevel.send sw_id 5l delete_flows
    >>= function
    | RpcEof -> raise UpdateError
    | RpcOk -> return ()

  (* Compiles [new_r] for [sw_id] with the current compiler options, then
     replaces the switch's flows with the result.  Best effort: any
     exception raised while doing so is logged and swallowed. *)
  let bring_up_switch (sw_id : switchId) new_r =
    let table = Comp.to_table ~options:!current_compiler_options sw_id new_r in
    Logging.debug "Setting up flow table\n%s"
      (Frenetic_kernel.OpenFlow.string_of_flowTable ~label:(Int64.to_string sw_id) table);
    Monitor.try_with ~name:"BestEffort.bring_up_switch" (fun () ->
        delete_flows_for sw_id >>= fun _ ->
        install_flows_for sw_id table)
    >>= function
    | Ok x -> return x
    | Error exn ->
      Logging.debug
        "switch %Lu: disconnected while attempting to bring up... skipping"
        sw_id;
      Logging.flushed () >>| fun () ->
      Logging.error "%s\n%!" (Exn.to_string exn)

  (* Brings up every switch reported by [get_switches] with [repr]. *)
  let implement_policy repr =
    (get_switches ()) >>= fun switches ->
    Deferred.List.iter switches ~f:(fun sw_id ->
        bring_up_switch sw_id repr)

  let set_current_compiler_options opt =
    current_compiler_options := opt
end

(* Installs [compiler] on every known switch. *)
let update (compiler: Frenetic_netkat.Local_compiler.t) =
  BestEffortUpdate0x01.implement_policy compiler

(* Installs [compiler] on the single switch [swid]. *)
let update_switch (swid: switchId) (compiler: Frenetic_netkat.Local_compiler.t) =
  BestEffortUpdate0x01.bring_up_switch swid compiler
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>