Multiple Controller: Pyretic + Pox

 

[Description]

  When the pyretic controller do the count_packets() or count_bytes() functions, all the packets will be sent to controller for processing. This will degrade the performance. So in this example, I will show how to use pyretic to do the routing ( using Dijkstra's Algorithm) and use pox to do the monitoring jobs.

 

[Topology]

h1----s1-----s2----h2

 

[Mininet Script: test_multicontroller.py]

#!/usr/bin/python

 

"""

Script created by VND - Visual Network Description (SDN version)

"""

from mininet.net import Mininet

from mininet.node import Controller, RemoteController, OVSKernelSwitch, OVSLegacyKernelSwitch, UserSwitch

from mininet.cli import CLI

from mininet.log import setLogLevel

from mininet.link import Link, TCLink

import time

 

def topology():

    "Create a network."

    net = Mininet( controller=RemoteController, link=TCLink, switch=OVSKernelSwitch )

 

    print "*** Creating nodes"

    s1 = net.addSwitch( 's1', listenPort=6673, mac='00:00:00:00:00:01' )

    s2 = net.addSwitch( 's2', listenPort=6674, mac='00:00:00:00:00:02' )

    h1 = net.addHost( 'h1', mac='00:00:00:00:00:01', ip='10.0.0.1/24' )

    h2 = net.addHost( 'h2', mac='00:00:00:00:00:02', ip='10.0.0.2/24' )

    c10 = net.addController( 'c10', controller=RemoteController, ip='127.0.0.1', port=6633 )

    c11 = net.addController( 'c11', controller=RemoteController, ip='127.0.0.1', port=5566 )

 

    print "*** Creating links"

    linkBW = {'bw':1}

    net.addLink(s1, h1, cls=TCLink, **linkBW)

    net.addLink(s2, h2, cls=TCLink, **linkBW)

    net.addLink(s1, s2, cls=TCLink, **linkBW)

 

    print "*** Starting network"

    net.build()

    c10.start()

    c11.start()

    s1.start( [c10,c11] )

    s2.start( [c10,c11] )

    CLI( net )

 

    print "*** Stopping network"

    net.stop()

 

if __name__ == '__main__':

    setLogLevel( 'info' )

    topology()

 

 

[Pyretic: myroute_dijkstra.py]

from pyretic.lib.corelib import*

from pyretic.lib.std import *

from multiprocessing import Lock

from pyretic.lib.query import *

from collections import defaultdict

 

#switches

switches = []

 

#myhost[srcmac]->(switch, port)

myhost={}

 

#adjacency map [sw1][sw2]->port from sw1 to sw2

adjacency=defaultdict(lambda:defaultdict(lambda:None))

 

def minimum_distance(distance, Q):

  min = float('Inf')

  node = 0

  for v in Q:

    if distance[v] < min:

      min = distance[v]

      node = v

  return node

 

def get_path (src,dst,first_port,final_port):

  #Dijkstra's algorithm

  print "src=",src," dst=",dst, " first_port=", first_port, " final_port=", final_port

  distance = {}

  previous = {}

 

  for dpid in switches:

    distance[dpid] = 9999

    previous[dpid] = None

 

  distance[src]=0

  Q=set(switches)

   

  while len(Q)>0:

    u = minimum_distance(distance, Q)

    Q.remove(u)

   

    for p in switches:

      if adjacency[u][p]!=None:

        w = 1

        if distance[u] + w < distance[p]:

          distance[p] = distance[u] + w

          previous[p] = u

 

  r=[]

  p=dst

  r.append(p)

  q=previous[p]

  while q is not None:

    if q == src:

      r.append(q)

      break

    p=q

    r.append(p)

    q=previous[p]

 

  r.reverse()

  if src==dst:

    path=[src]

  else:

    path=r

 

  # Now add the ports

  r = []

  in_port = first_port

  for s1,s2 in zip(path[:-1],path[1:]):

    out_port = adjacency[s1][s2]

    r.append((s1,in_port,out_port))

    in_port = adjacency[s2][s1]

  r.append((dst,in_port,final_port))

  return r

 

class find_route(DynamicPolicy):

  def __init__(self):

    super(find_route,self).__init__()

    self.flood = flood()

    self.set_initial_state()

 

  def set_initial_state(self):

    self.query = packets(1,['srcmac','dstmac', 'srcip', 'dstip'])

    self.query.register_callback(self.myroute)

    self.forward = self.flood

    self.update_policy()

 

  def set_network(self,network):

    self.set_initial_state()

 

  def update_policy(self):

    self.policy = self.forward + self.query

 

  def myroute(self,pkt):

    #print pkt['srcmac'], pkt['dstmac'], pkt['srcip'], pkt['dstip']

    if (pkt['srcmac'] not in myhost.keys()) or (pkt['dstmac'] not in myhost.keys()):

      return

    p1 = get_path(myhost[pkt['srcmac']][0], myhost[pkt['dstmac']][0],myhost[pkt['srcmac']][1], myhost[pkt['dstmac']][1])

    print p1

   

    r1 = parallel([(match(switch=a,srcip=pkt['srcip'],dstip=pkt['dstip']) >> fwd(c)) for a,b,c in p1])

    print r1   

   

    self.forward = if_(match(dstip=pkt['dstip'],srcip=pkt['srcip']),r1,self.forward)

    self.update_policy()

 

 

def find_host():

   q = packets(1,['srcmac','switch'])

   q.register_callback(mymac_learner)

   return q

 

def mymac_learner(pkt):

   print pkt['srcmac'], pkt['dstmac'], pkt['switch'], pkt['inport']

   #if match(ethtype=ARP_TYPE):

   # print "arp packet"

  

   if pkt['srcmac'] not in myhost.keys():

      myhost[pkt['srcmac']]=( pkt['switch'], pkt['inport'])

 

   #for a in myhost.keys():

    print a, myhost[a][0], myhost[a][1]

  

class find_switch(DynamicPolicy):

    def __init__(self):

        self.last_topology = None

        self.lock = Lock()

        super(find_switch,self).__init__()

 

    def set_network(self, network):

        with self.lock:

            for x in network.switch_list():

              switches.append(x)

            for (s1,s2,data) in network.topology.edges(data=True):

              adjacency[s1][s2]=data[s1]

              adjacency[s2][s1]=data[s2]

            self.last_topology = network.topology

          

 

def arp_and_ip():

  return if_(match(ethtype=ARP_TYPE), flood(), find_route())

        

def main():

  return ( find_switch() + find_host() + arp_and_ip())

 

 

 

[Pox: with_pyretic.py]

from pox.core import core

import pox.openflow.libopenflow_01 as of

from pox.lib.util import dpidToStr

from pox.lib.addresses import IPAddr, EthAddr

from pox.lib.revent import *

from collections import defaultdict

from pox.openflow.discovery import Discovery

from pox.lib.util import dpidToStr

 

# include as part of the betta branch

from pox.openflow.of_json import *

from pox.lib.recoco import Timer

import time

 

log = core.getLogger()

 

#adjacency map [sw1][sw2]->port from sw1 to sw2

adjacency=defaultdict(lambda:defaultdict(lambda:None))

 

switches = []

 

#link bandwidth consumption [sw1][sw2]->bandwidth consumed

link_bw=defaultdict(lambda:defaultdict(lambda:None))

byte=defaultdict(lambda:defaultdict(lambda:None))

clock=defaultdict(lambda:defaultdict(lambda:None))

 

def _timer_func ():

  for connection in core.openflow._connections.values():

    connection.send(of.ofp_stats_request(body=of.ofp_port_stats_request()))

 

def _handle_portstats_received (event):

  print "switch=", dpidToStr(event.connection.dpid)

  for f in event.stats:

    if int(f.port_no)<65534:

      for p in switches:

        if adjacency[event.connection.dpid][p]!=None and adjacency[event.connection.dpid][p]==f.port_no:

          #print "   PortNo:", f.port_no, " Fwd's Pkts:", f.tx_packets, " Fwd's Bytes:", f.tx_bytes

          if byte[event.connection.dpid][p]>0:  

              link_bw[event.connection.dpid][p]=(f.tx_bytes - byte[event.connection.dpid][p]) * 8.0 / (time.time()-clock[event.connection.dpid][p])

              print "link[",event.connection.dpid,"][",p,"]=",link_bw[event.connection.dpid][p], "bps"

          byte[event.connection.dpid][p]=f.tx_bytes

          clock[event.connection.dpid][p]=time.time()

  print "============================================================================"

   

def _handle_ConnectionUp (event):

  switches.append(event.connection.dpid)

  #print "dpid=", dpidToStr(event.connection.dpid)

 

class l2_multi (EventMixin):

  def __init__ (self):

    # Listen to dependencies

    def startup ():

      core.openflow.addListeners(self, priority=0)

      core.openflow_discovery.addListeners(self)

    core.call_when_ready(startup, ('openflow','openflow_discovery'))

 

  def _handle_LinkEvent (self, event):

    def flip (link):

      return Discovery.Link(link[2],link[3], link[0],link[1])

 

    l = event.link

    sw1 = l.dpid1

    sw2 = l.dpid2

 

    if adjacency[sw1][sw2] is None:

     if flip(l) in core.openflow_discovery.adjacency:

       adjacency[sw1][sw2] = l.port1

       adjacency[sw2][sw1] = l.port2

 

def launch ():

  core.registerNew(l2_multi)

  core.openflow.addListenerByName("ConnectionUp", _handle_ConnectionUp)

  core.openflow.addListenerByName("PortStatsReceived", _handle_portstats_received)

  Timer(1, _timer_func, recurring=True)

 

[Execution]

 

use xterm h1 h2 to open two terminals for h1 and h2 and run the iperf server on h2, and iperf client on h1.

 

From the pyretic window, we can see that the path is found via switch 1 and switch 2.

 

From the pox window, we can see that the throughput for link[1][2] (the link between switch 1  and switch 2) is around 1Mbps.

 

Dr. Chih-Heng Ke (smallko@gmail.com)

Department of Computer Science and Information Engineering,

National Quemoy University, Kinmen, Taiwan.