Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file OpenFlow0x01_Plugin.ml
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390openCoreopenAsyncopenFrenetic_kernel.OpenFlowmoduleOF10=Frenetic_kernel.OpenFlow0x01(* TODO: See openflow.ml for discussion. This is transitional.
IF YOU CHANGE THE PROTOCOL HERE, YOU MUST ALSO CHANGE IT IN openflow.ml
*)typerpc_ack=RpcOk|RpcEoftyperpc_command=|GetSwitches|SwitchesReplyofOF10.switchIdlist|GetSwitchFeaturesofOF10.switchId|SwitchFeaturesReplyofOF10.SwitchFeatures.toption|SendofOF10.switchId*OF10.xid*OF10.Message.t|SendReplyofrpc_ack|SendBatchofOF10.switchId*OF10.xid*OF10.Message.tlist|BatchReplyofrpc_ack|GetEvents|EventsReplyofevent|SendTrxofOF10.switchId*OF10.Message.t|TrxReplyofrpc_ack*OF10.Message.tlist|Finishedofunit(* This is not sent by the client explicitly *)letwrite_marshalwriter~flags:frpc=Writer.writewriter(Marshal.to_stringrpcf)letchan=Ivar.create()let(events,events_writer)=Pipe.create()letserver_sock_addr=Ivar.create()letserver_reader=Ivar.create()letserver_writer=Ivar.create()letread_outstanding=reffalseletread_finished=Condition.create()moduleLowLevel=structmoduleOF10=Frenetic_kernel.OpenFlow0x01letopenflow_executable()=letprog=Filename.dirname(Sys.executable_name)^"/frenetic.openflow"inSys.file_existsprog>>=function|`Yes->returnprog|_->failwith(Printf.sprintf"Can't find OpenFlow executable %s!"prog)letstartport=Logging.info"Calling create!";letsock_port=8984inletsock_addr=`Inet(Unix.Inet_addr.localhost,sock_port)inletargs=["-s";string_of_intsock_port;"-p";string_of_intport;"-v"]indon't_wait_for(Logging.info"Current uid: %n"(Unix.getuid());Logging.flushed()>>=fun()->openflow_executable()>>=funprog->Process.create~prog~args()>>=function|Errorerr->Logging.error"Failed to launch openflow server %s!"prog;raise(Core_kernel.Error.to_exnerr)|Okproc->Logging.info"Successfully launched OpenFlow controller with pid %s"(Pid.to_string(Process.pidproc));(* Redirect stdout of the child proc to out stdout for logging *)letbuf=Bytes.create1000indon't_wait_for(Deferred.repeat_until_finished()(fun()->Reader.read(Process.stdoutproc)buf>>|function|`Eof->`Finished()|`Okn->`Repeat(Writer.write(Lazy.forceWriter.stdout)~len:n(Bytes.to_stringbuf))));Logging.info"Connecting to first OpenFlow server socket";letrecwait_for_server()=Monitor.try_with~extract_exn:true(fun()->Socket.connect(Socket.createSocket.Type.tcp)sock_addr)>>=function|Oksock->returnsock|Errorexn->Logging.info"Failed to open socket to OpenFlow server: %s"(Exn.to_stringexn);Logging.info"Retrying in 1 second";after(Time.Span.of_sec1.)>>=wait_for_serverinwait_for_server()>>=funsock->Ivar.fillserver_sock_addrsock_addr;Logging.info"Successfully connected to first OpenFlow server socket";Ivar.fillserver_reader(Reader.create(Socket.fdsock));Ivar.fillserver_writer(Writer.create(Socket.fdsock));(* We open a second socket to get the events stream *)Logging.info"Connecting to second OpenFlow server socket";Socket.connect(Socket.createSocket.Type.tcp)sock_addr>>=funsock->Logging.info"Successfully connected to second OpenFlow server socket";letreader=Reader.create(Socket.fdsock)inletwriter=Writer.create(Socket.fdsock)in(* TODO(jnfoster): replace with a different call *)write_marshalwriter~flags:[]GetEvents;Deferred.repeat_until_finished()(fun()->Reader.read_marshalreader>>=function|`Eof->Logging.info"OpenFlow controller closed events socket";Pipe.closeevents_writer;Socket.shutdownsock`Both;return(`Finished())|`Ok(EventsReplyevt)->Pipe.writeevents_writerevt>>|fun()->`Repeat()|`Ok(_)->Logging.error"Got a message that's not an EventsReply. WTF? Dropping.";return(`Repeat())))letrecclear_to_read()=if(!read_outstanding)thenCondition.waitread_finished>>=clear_to_readelsereturn(read_outstanding:=true)letsignal_read()=read_outstanding:=false;Condition.broadcastread_finished()letready_to_process()=Ivar.readserver_reader>>=funreader->Ivar.readserver_writer>>=funwriter->clear_to_read()>>=fun()->letread()=Reader.read_marshalreader>>|function|`Eof->Logging.error"OpenFlow server socket shutdown unexpectedly!";failwith"Can not reach OpenFlow server!"|`Oka->ainletwrite=write_marshalwriter~flags:[]inreturn(read,write)letsendswidxidmsg=ready_to_process()>>=fun(recv,send)->send(Send(swid,xid,msg));recv()>>|function|SendReplyresp->signal_read();resp|_->Logging.error"Received a reply that's not SendReply to a Send";assertfalseletsend_batchswidxidmsgs=ready_to_process()>>=fun(recv,send)->send(SendBatch(swid,xid,msgs));recv()>>|function|BatchReplyresp->signal_read();resp|_->Logging.error"Received a reply that's not BatchReply to a SendBatch";assertfalse(* We open a new socket for each send_txn call so that we can block on the reply *)letsend_txnswidmsg=Ivar.readserver_sock_addr>>=funsock_addr->Socket.connect(Socket.createSocket.Type.tcp)sock_addr>>=funsock->letreader=Reader.create(Socket.fdsock)inletwriter=Writer.create(Socket.fdsock)inwrite_marshalwriter~flags:[](SendTrx(swid,msg));Reader.read_marshalreader>>|funresp->matchrespwith|`Eof->Socket.shutdownsock`Both;TrxReply(RpcEof,[])|`Ok(TrxReply(RpcEof,_))->Socket.shutdownsock`Both;TrxReply(RpcEof,[])|`Ok(TrxReply(RpcOk,resp))->Socket.shutdownsock`Both;TrxReply(RpcOk,resp)|_->Logging.debug"send_txn returned something unintelligible";TrxReply(RpcEof,[])letevents=eventsendletstartport=LowLevel.startportletswitch_features(switch_id:switchId)=LowLevel.ready_to_process()>>=fun(recv,send)->send(GetSwitchFeaturesswitch_id);recv()>>=function|SwitchFeaturesReplyresp->LowLevel.signal_read();(matchrespwith|Somesf->return(Some(From0x01.from_switch_featuressf))|None->returnNone)|_->Logging.error"Received a reply that's not SwitchFeaturesReply to a GetSwitchFeatures";assertfalse(* We just brute-force this, even though there's significant overlap with from_action *)letaction_from_policy(pol:Frenetic_netkat.Syntax.policy):actionoption=matchpolwith|Modhv->beginmatchhvwith|Locationlocation->beginmatchlocationwith|Physicalp->Some(Output(Physicalp))|FastFail_->None|Pipe_->Some(Output(Controller128))|Queryq->Noneend|EthSrcdlAddr->Some(Modify(SetEthSrcdlAddr))|EthDstdlAddr->Some(Modify(SetEthDstdlAddr))|Vlann->Some(Modify(SetVlan(Somen)))|VlanPcppcp->Some(Modify(SetVlanPcppcp))|EthTypedlTyp->Some(Modify(SetEthTypdlTyp))|IPProtonwProto->Some(Modify(SetIPProtonwProto))|IP4Src(nwAddr,mask)->Some(Modify(SetIP4SrcnwAddr))|IP4Dst(nwAddr,mask)->Some(Modify(SetIP4DstnwAddr))|TCPSrcPorttpPort->Some(Modify(SetTCPSrcPorttpPort))|TCPDstPorttpPort->Some(Modify(SetTCPDstPorttpPort))|Switch_|VSwitch_|VPort_|VFabric_|Meta_|From_|AbstractLoc_->Noneend|_->Noneletactions_from_policiespol_list=List.filter_mappol_list~f:action_from_policyletpacket_out(swid:int64)(ingress_port:portIdoption)(payload:payload)(pol_list:Frenetic_netkat.Syntax.policylist)=(* Turn this into a generic PktOut event, then run it through OF10 translator *)letactions=actions_from_policiespol_listinletopenflow_generic_pkt_out=(payload,ingress_port,actions)inletpktout0x01=Frenetic_kernel.OpenFlow.To0x01.from_packetOutopenflow_generic_pkt_outinLowLevel.sendswid0l(OF10.Message.PacketOutMsgpktout0x01)>>=function|RpcEof->return()|RpcOk->return()letbogus_flow_stats={flow_table_id=66L;flow_pattern=Pattern.match_all;flow_actions=[];flow_duration_sec=0L;flow_duration_nsec=0L;flow_priority=0L;flow_idle_timeout=0L;flow_hard_timeout=0L;flow_packet_count=0L;flow_byte_count=0L}(* We aggregate all the OF10 stats and convert them to a generic OpenFlow at the same time *)letcollapse_statsifrl=letopenOF10in{bogus_flow_statswithflow_packet_count=List.sum(moduleInt64)ifrl~f:(funstat->stat.packet_count);flow_byte_count=List.sum(moduleInt64)ifrl~f:(funstat->stat.byte_count)}letflow_stats(sw_id:switchId)(pat:Pattern.t):flowStatsDeferred.t=letpat0x01=To0x01.from_patternpatinletreq=OF10.IndividualRequest{sr_of_match=pat0x01;sr_table_id=0xff;sr_out_port=None}inLowLevel.send_txnsw_id(OF10.Message.StatsRequestMsgreq)>>=function|TrxReply(RpcEof,_)->assertfalse|TrxReply(RpcOk,l)->(matchlwith|[]->Logging.info"Got an empty list";returnbogus_flow_stats|[hd]->(matchhdwith|StatsReplyMsg(IndividualFlowRepifrl)->return(collapse_statsifrl)|_->Logging.error"Got a reply, but the type is wrong";returnbogus_flow_stats)|hd::tl->Logging.info"Got a > 2 element list";returnbogus_flow_stats)|_->Logging.error"Received a reply that's not TrxReply to a SendTrx";assertfalseletbogus_port_stats={port_no=666L;port_rx_packets=0L;port_tx_packets=0L;port_rx_bytes=0L;port_tx_bytes=0L;port_rx_dropped=0L;port_tx_dropped=0L;port_rx_errors=0L;port_tx_errors=0L;port_rx_frame_err=0L;port_rx_over_err=0L;port_rx_crc_err=0L;port_collisions=0L}letport_stats(sw_id:switchId)(pid:portId):portStatsDeferred.t=letpt=Int32.(to_int_exnpid)inletreq=OF10.PortRequest(Some(PhysicalPortpt))inLowLevel.send_txnsw_id(OF10.Message.StatsRequestMsgreq)>>=function|TrxReply(RpcEof,_)->assertfalse|TrxReply(RpcOk,l)->(matchlwith|[]->Logging.info"Got an empty list";returnbogus_port_stats|[hd]->(matchhdwith|StatsReplyMsg(PortReppsl)->return(Frenetic_kernel.OpenFlow.From0x01.from_port_stats(List.hd_exnpsl))|_->Logging.error"Got a reply, but the type is wrong";returnbogus_port_stats)|hd::tl->Logging.info"Got a > 2 element list";returnbogus_port_stats)|_->Logging.error"Received a reply that's not TrxReply to a SendTrx";assertfalseletget_switches()=LowLevel.ready_to_process()>>=fun(recv,send)->sendGetSwitches;recv()>>|function|SwitchesReplyresp->LowLevel.signal_read();resp|_->Logging.error"Received a reply that's not SwitchesReply to a GetSwitches";assertfalse(* TODO: The following is ripped out of Frenetic_netkat.Updates. Turns out you can't call
stuff in that because of a circular dependency. In a later version, we should implement
generic commands in Frenetic_kernel.OpenFlow (similar to events, but going the opposite
directions), and let openflow.ml translate these to the specifc version of OpenFlow. That
way, we can simply pass a plugin instance where the update can write to. *)moduleBestEffortUpdate0x01=structmoduleComp=Frenetic_netkat.Local_compilermoduleM=OF10.MessageopenFrenetic_kernel.OpenFlow.To0x01exceptionUpdateErrorletcurrent_compiler_options=refComp.default_compiler_optionsletrestrictsw_idrepr=Comp.restrictFrenetic_netkat.Syntax.(Switchsw_id)reprletinstall_flows_forsw_idtable=letto_flow_modpf=M.FlowModMsg(from_flowpf)inletpriority=ref65536inletflows=List.maptable~f:(funflow->decrpriority;to_flow_mod!priorityflow)inLowLevel.send_batchsw_id0lflows>>=function|RpcEof->raiseUpdateError|RpcOk->return()letdelete_flows_forsw_id=letdelete_flows=M.FlowModMsgOF10.delete_all_flowsinLowLevel.sendsw_id5ldelete_flows>>=function|RpcEof->raiseUpdateError|RpcOk->return()letbring_up_switch(sw_id:switchId)new_r=lettable=Comp.to_table~options:!current_compiler_optionssw_idnew_rinLogging.debug"Setting up flow table\n%s"(Frenetic_kernel.OpenFlow.string_of_flowTable~label:(Int64.to_stringsw_id)table);Monitor.try_with~name:"BestEffort.bring_up_switch"(fun()->delete_flows_forsw_id>>=fun_->install_flows_forsw_idtable)>>=function|Okx->returnx|Error_exn->Logging.debug"switch %Lu: disconnected while attempting to bring up... skipping"sw_id;Logging.flushed()>>|fun()->Logging.error"%s\n%!"(Exn.to_string_exn)letimplement_policyrepr=(get_switches())>>=funswitches->Deferred.List.iterswitches(funsw_id->bring_up_switchsw_idrepr)letset_current_compiler_optionsopt=current_compiler_options:=optendletupdate(compiler:Frenetic_netkat.Local_compiler.t)=BestEffortUpdate0x01.implement_policycompilerletupdate_switch(swid:switchId)(compiler:Frenetic_netkat.Local_compiler.t)=BestEffortUpdate0x01.bring_up_switchswidcompiler