P4-utils: Using an SVM to classify traffic as normal or as a malicious ICMP attack

 

This lab is based on https://github.com/GAR-Project/project, so please read that project first; its authors provide a lot of useful information. Please also install InfluxDB and Logstash before you start. The main differences between this lab and that project are: a) I use Logstash to collect packet information, while that project uses Telegraf; b) that project uses an Open vSwitch-based network, while I use a P4-based network.

 

[Topology]

 

H1-----S1 ------H2

 

H1 sends traffic to H2. S1 clones each packet to its CPU port so that the controller can determine whether the traffic is normal or malicious. If the traffic is malicious, the controller rings the alarm.

 

Before starting this lab, please create a database “mydb” and a user “telegraf” with password “telegraf”.
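The database and user can be created in the influx shell. As a minimal Python sketch, the helpers below (hypothetical, not part of the original lab) build the InfluxQL statements and send them through any client object that has a query() method. They assume InfluxDB 1.x and the influxdb-python client; passing method="POST" assumes influxdb-python 5.x, where query() accepts that argument.

```python
def setup_statements(db="mydb", user="telegraf", password="telegraf"):
    """Return the InfluxQL statements that create the lab database and user."""
    return [
        "CREATE DATABASE {}".format(db),
        "CREATE USER {} WITH PASSWORD '{}'".format(user, password),
        "GRANT ALL ON {} TO {}".format(db, user),
    ]


def create_lab_db(client, **kwargs):
    """Run the setup statements; `client` is e.g. an influxdb.InfluxDBClient.

    CREATE and GRANT change server state, so they are sent with HTTP POST.
    """
    for stmt in setup_statements(**kwargs):
        client.query(stmt, method="POST")
```

For example, create_lab_db(influxdb.InfluxDBClient('127.0.0.1', 8086)), run against a server where you are allowed to create users.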

 

[basic.p4]

/* -*- P4_16 -*- */
#include <core.p4>
#include <v1model.p4>

/*************************************************************************
*********************** H E A D E R S  ***********************************
*************************************************************************/

struct metadata {
    /* empty */
}

/* No headers are parsed. Forwarding depends only on the ingress port, and
   the unparsed packet is passed through to the egress port unchanged. */
struct headers {
}

/*************************************************************************
*********************** P A R S E R  ***********************************
*************************************************************************/

parser MyParser(packet_in packet,
                out headers hdr,
                inout metadata meta,
                inout standard_metadata_t standard_metadata) {
    state start {
        transition accept;
    }
}

/*************************************************************************
************   C H E C K S U M    V E R I F I C A T I O N   *************
*************************************************************************/

control MyVerifyChecksum(inout headers hdr, inout metadata meta) {
    apply {  }
}

/*************************************************************************
**************  I N G R E S S   P R O C E S S I N G   *******************
*************************************************************************/

control MyIngress(inout headers hdr,
                  inout metadata meta,
                  inout standard_metadata_t standard_metadata) {
    action drop() {
        mark_to_drop(standard_metadata);
    }

    action forward(bit<9> port) {
        standard_metadata.egress_spec = port;
    }

    /* Port-based forwarding; the entries are installed from cmd.txt. */
    table phy_forward {
        key = {
            standard_metadata.ingress_port: exact;
        }
        actions = {
            forward;
            drop;
        }
        size = 1024;
        default_action = drop();
    }

    apply {
        phy_forward.apply();
    }
}

/*************************************************************************
****************  E G R E S S   P R O C E S S I N G   *******************
*************************************************************************/

control MyEgress(inout headers hdr,
                 inout metadata meta,
                 inout standard_metadata_t standard_metadata) {
    apply {
        /* instance_type 0 marks an original (non-cloned) packet. Clone it to
           mirror session 100, which cmd.txt maps to the CPU port. */
        if (standard_metadata.instance_type == 0) {
            clone(CloneType.E2E, 100);
        }
    }
}

/*************************************************************************
*************   C H E C K S U M    C O M P U T A T I O N   **************
*************************************************************************/

control MyComputeChecksum(inout headers hdr, inout metadata meta) {
    apply {
    }
}

/*************************************************************************
***********************  D E P A R S E R  *******************************
*************************************************************************/

control MyDeparser(packet_out packet, in headers hdr) {
    apply {
    }
}

/*************************************************************************
***********************  S W I T C H  *******************************
*************************************************************************/

V1Switch(
MyParser(),
MyVerifyChecksum(),
MyIngress(),
MyEgress(),
MyComputeChecksum(),
MyDeparser()
) main;

 

[cmd.txt]

table_add phy_forward forward 1 => 2
table_add phy_forward forward 2 => 1
mirroring_add 100 3
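The same three commands can also be issued from a Python controller. The sketch below is illustrative only: it parses the two command forms used in cmd.txt and replays them on an API object. It assumes that object exposes table_add(table, action, match_keys, action_params), the form used in controller.py, and mirroring_add(session_id, port). Check both against your p4-utils version. The helper names are hypothetical.

```python
def parse_cli_line(line):
    """Parse one cmd.txt line into (method_name, args), or None for a blank line.

    Only the two simple_switch_CLI commands used in cmd.txt are handled:
      table_add <table> <action> <match ...> [=> <action params ...>]
      mirroring_add <session id> <egress port>
    """
    words = line.split()
    if not words:
        return None
    cmd = words[0]
    if cmd == "table_add":
        table, action, rest = words[1], words[2], words[3:]
        if "=>" in rest:
            i = rest.index("=>")
            match, params = rest[:i], rest[i + 1:]
        else:
            match, params = rest, []
        return cmd, (table, action, match, params)
    if cmd == "mirroring_add":
        return cmd, (int(words[1]), int(words[2]))
    raise ValueError("unsupported command: " + cmd)


def apply_cli_lines(api, lines):
    """Replay cmd.txt lines as method calls on a switch API object."""
    for line in lines:
        parsed = parse_cli_line(line)
        if parsed is not None:
            name, args = parsed
            getattr(api, name)(*args)
```

For example, inside the controller: with open("cmd.txt") as f: apply_cli_lines(self.controllers["s1"], f).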

 

[p4app.json]

{
  "program": "basic.p4",
  "switch": "simple_switch",
  "compiler": "p4c",
  "options": "--target bmv2 --arch v1model --std p4-16",
  "switch_cli": "simple_switch_CLI",
  "cli": true,
  "pcap_dump": true,
  "enable_log": true,
  "topo_module": {
    "file_path": "",
    "module_name": "p4utils.mininetlib.apptopo",
    "object_name": "AppTopoStrategies"
  },
  "controller_module": null,
  "topodb_module": {
    "file_path": "",
    "module_name": "p4utils.utils.topology",
    "object_name": "Topology"
  },
  "mininet_module": {
    "file_path": "",
    "module_name": "p4utils.mininetlib.p4net",
    "object_name": "P4Mininet"
  },
  "topology": {
    "assignment_strategy": "l2",
    "links": [["h1", "s1"], ["h2", "s1"]],
    "hosts": {
      "h1": {
      },
      "h2": {
      }
    },
    "switches": {
      "s1": {
        "cli_input": "cmd.txt",
        "program": "basic.p4",
        "cpu_port": true
      }
    }
  }
}

 

[receive.py] The controller uses this program to capture ICMP packets. For each ICMP echo request, it sends the source IP, destination IP, and IP packet length to localhost:6666. Logstash listens on port 6666 and forwards this information to InfluxDB.

#!/usr/bin/env python
from __future__ import print_function

import os
import sys

from scapy.all import sniff, IP, ICMP


def handle_pkt(pkt):
    print("Controller got a packet")
    print(pkt.summary())
    # Report only ICMP echo requests (type 8), so each ping is counted once.
    if ICMP in pkt and pkt[ICMP].type == 8:
        ip_src = pkt[IP].src
        ip_dst = pkt[IP].dst
        ip_len = pkt[IP].len
        print(ip_src, ip_dst, ip_len)
        # Send "src dst len" to the Logstash TCP input on port 6666.
        os.system("echo %s %s %s | nc localhost 6666" % (ip_src, ip_dst, ip_len))


def main():
    # By default, sniff on the CPU-port interface that p4-utils creates for s1.
    if len(sys.argv) < 2:
        iface = 's1-cpu-eth1'
    else:
        iface = sys.argv[1]

    print("sniffing on %s" % iface)
    sys.stdout.flush()
    sniff(iface=iface, prn=handle_pkt)


if __name__ == '__main__':
    main()
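Each os.system() call above starts a shell and an nc process for every packet, which is relatively expensive when hping3 floods the link. A minimal alternative sketch sends the same line over a Python socket instead. It assumes the Logstash tcp input on localhost:6666 from myicmp.conf, which by default treats each newline-terminated line as one event; format_record and send_record are hypothetical helpers.

```python
import socket


def format_record(ip_src, ip_dst, ip_len):
    """Build the 'src dst len' line that the grok filter in myicmp.conf expects."""
    return "{} {} {}\n".format(ip_src, ip_dst, ip_len)


def send_record(line, host="localhost", port=6666, timeout=2.0):
    """Send one line to the Logstash TCP input without spawning a shell."""
    sock = socket.create_connection((host, port), timeout=timeout)
    try:
        sock.sendall(line.encode("ascii"))
    finally:
        sock.close()
```

In handle_pkt, the os.system() line would then become send_record(format_record(ip_src, ip_dst, ip_len)). Keeping one connection open across packets would be cheaper still, at the cost of reconnect handling.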

 

[myicmp.conf] configuration file for logstash

input {
  tcp {
    type => "tcp"
    port => 6666
    mode => "server"
  }
}

filter {
  grok {
    match => ["message", "%{IP:srcIP} %{IP:destIP} %{INT:length}"]
  }
}

output {
  influxdb {
    db => "mydb"
    host => "localhost"
    port => "8086"
    user => "telegraf"
    password => "telegraf"
    measurement => "net"
    allow_time_override => true
    flush_size => "1"
    data_points => {
      "srcip" => "%{srcIP}"
      "dstip" => "%{destIP}"
      "length" => "%{length}"
    }
    coerce_values => {
      "length" => "integer"
    }
  }
  stdout { codec => rubydebug }
}
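To check offline which lines the grok filter will accept, the sketch below is a rough Python equivalent of the pattern %{IP:srcIP} %{IP:destIP} %{INT:length}. It is an approximation: it handles only IPv4 (grok's IP pattern also matches IPv6), and parse_message is a hypothetical helper.

```python
import re

# Rough equivalent of the grok pattern "%{IP:srcIP} %{IP:destIP} %{INT:length}",
# restricted to IPv4 addresses.
_OCTET = r"(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)"
_IPV4 = r"(?:{0}\.){{3}}{0}".format(_OCTET)
LINE_RE = re.compile(
    r"(?P<srcIP>{ip}) (?P<destIP>{ip}) (?P<length>[+-]?\d+)".format(ip=_IPV4)
)


def parse_message(message):
    """Return the fields Logstash would extract, or None if the pattern fails."""
    m = LINE_RE.search(message)
    if m is None:
        return None
    fields = m.groupdict()
    # Mirrors coerce_values => {"length" => "integer"} in the influxdb output.
    fields["length"] = int(fields["length"])
    return fields
```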

 

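[data_gathering.py] This script labels collected traffic as normal or malicious for training. It summarizes recent records from InfluxDB and writes them, tagged with the class given on the command line, to a CSV file.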

import sys

import influxdb

# Note: xxxx is a placeholder for a timestamp; it is explained later.
QUERY = """select count(length) as a,mean(length) as b from net where time <= xxxx group by time(3s) order by time desc limit 5"""

if __name__ == "__main__":
    if len(sys.argv) < 2:
        sys.exit("usage: python data_gathering.py <class>  (0 = normal, 1 = malicious)")
    db = influxdb.InfluxDBClient('127.0.0.1', 8086, 'telegraf', 'telegraf', 'mydb')
    measurement_class = sys.argv[1]

    with open("ICMP_data_class_{}.csv".format(measurement_class), "w") as out_file:
        for measurement in db.query(QUERY).get_points(measurement='net'):
            cnt = measurement["a"]
            meanlen = measurement["b"]
            # Skip empty 3 s windows: their mean is None, which train_svm()
            # in controller.py cannot parse as a number.
            if meanlen is None:
                continue
            out_file.write("{}, {}, {}\n".format(cnt, meanlen, measurement_class))

    print("Finished generating a class {} training dataset!".format(measurement_class))
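To make the two features concrete, the sketch below reproduces in plain Python what the query computes: the number of packets per 3 s window (a) and their mean IP length (b). It is an illustration that assumes each record is a (timestamp in nanoseconds, IP length) pair. It fills empty windows only between the first and last records, whereas InfluxDB decides the window range from the query itself. window_features is a hypothetical name.

```python
def window_features(records, window_ns=3 * 10**9):
    """Summarize (timestamp_ns, ip_len) records per fixed time window, newest first.

    Returns (window_start_ns, count, mean_len) tuples, i.e. the a and b
    features of the InfluxQL query. A window with no packets gets count 0 and
    mean None, which the controller prints as "ICMP info: 0 None".
    """
    if not records:
        return []
    buckets = {}
    for ts, length in records:
        # Align each timestamp to an epoch-based window start, as GROUP BY time() does.
        start = ts - ts % window_ns
        buckets.setdefault(start, []).append(length)
    first, last = min(buckets), max(buckets)
    summary = []
    for start in range(last, first - 1, -window_ns):
        lengths = buckets.get(start, [])
        mean_len = float(sum(lengths)) / len(lengths) if lengths else None
        summary.append((start, len(lengths), mean_len))
    return summary
```

Normal pings yield small counts with a mean near 84, while a flood yields large counts with a much larger mean. That separation is what lets a linear SVM draw a boundary between the two classes.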

 

[controller.py]

from __future__ import print_function

import signal
import sys
import time

import influxdb
from sklearn import svm
from p4utils.utils.topology import Topology
from p4utils.utils.sswitch_API import SimpleSwitchAPI

blockip = []


class myController(object):

    def __init__(self):
        self.topo = Topology(db="topology.db")
        self.controllers = {}
        self.connect_to_switches()

    def connect_to_switches(self):
        # One Thrift API handle per P4 switch in the topology (only s1 here).
        for p4switch in self.topo.get_p4switches():
            thrift_port = self.topo.get_thrift_port(p4switch)
            self.controllers[p4switch] = SimpleSwitchAPI(thrift_port)


class gar_py:

    def __init__(self, db_host='localhost', port=8086, db='mydb', kern_type='linear', dbg=False):
        self.debug = dbg
        self.host = db_host
        self.port = port
        self.dbname = db
        self.client = influxdb.InfluxDBClient(self.host, self.port, 'telegraf', 'telegraf', self.dbname)
        self.svm_inst = svm.SVC(kernel=kern_type)
        self.training_files = ["./ICMP_data_class_0.csv", "./ICMP_data_class_1.csv"]
        # a = ICMP echo requests per 3 s window, b = their mean IP length.
        self.query = """select count(length) as a,mean(length) as b from net group by time(3s) order by time desc limit 3"""
        self.train_svm()
        self.controller = myController()

    def train_svm(self):
        features, labels = [], []
        for fname in self.training_files:
            with open(fname, "rt") as meal:
                for line in meal:
                    if not line.strip():
                        continue  # skip blank lines
                    # Each line is "count, mean_length, class".
                    data_list = line.split(", ")
                    features.append([float(data_list[0]), float(data_list[1])])
                    labels.append(int(data_list[2]))
        print("features=", features)
        print("labels=", labels)
        self.svm_inst.fit(features, labels)

    def work_time(self):
        last_entry_time = "0"
        while True:
            for new_entry in self.get_data(self.query).get_points(measurement='net'):
                # Only classify windows newer than the last one seen.
                if new_entry['time'] > last_entry_time:
                    last_entry_time = new_entry['time']
                    if self.debug:
                        print("\n** New entry **\n\tICMP info: " + str(new_entry['a']) + " " + str(new_entry['b']))
                    self.ring_the_alarm(self.under_attack(new_entry['a'], new_entry['b']))
            time.sleep(3)

    def under_attack(self, a, b):
        # An empty window has no mean length; treat it as normal traffic.
        if b is None:
            return False
        prediction = self.svm_inst.predict([[a, b]])[0]
        if self.debug:
            print("\tCurrent prediction: " + str(prediction))
        return bool(prediction == 1)  # class 1 = malicious

    def get_data(self, petition):
        return self.client.query(petition)

    def ring_the_alarm(self, should_i_ring):
        if should_i_ring:
            print("ring_the_alarm")
            # Optional blocking step. It requires a "block_pkt" table with a
            # "drop" action, which basic.p4 above does not define.
            # if "10.0.0.1" not in blockip:
            #     self.controller.controllers["s1"].table_add("block_pkt", "drop", [str("10.0.0.1")], [])
            #     blockip.append("10.0.0.1")


def ctrl_c_handler(s, f):
    print("\b\bShutting down MR. SVM... Bye!")
    sys.exit(0)


if __name__ == "__main__":
    signal.signal(signal.SIGINT, ctrl_c_handler)
    ai_bot = gar_py(db_host='127.0.0.1', dbg=True)
    ai_bot.work_time()
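Because the query returns windows newest first (order by time desc) and work_time() updates last_entry_time as soon as it sees a newer entry, each poll classifies at most one window: the newest unseen one. The sketch below isolates that selection step so it can be checked without InfluxDB. newest_unseen is a hypothetical name. Like work_time(), it relies on the returned time strings sorting in chronological order.

```python
def newest_unseen(points, last_time):
    """Mimic one pass of work_time() over points ordered newest first.

    Returns (entries_to_classify, new_last_time). Since the first point is
    the newest, every later point compares as not newer than last_time.
    """
    selected = []
    for p in points:
        if p["time"] > last_time:
            last_time = p["time"]
            selected.append(p)
    return selected, last_time
```

If every window in the result should be classified instead, the query could use order by time asc, or the loop could iterate over reversed(points).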

 

[execution]

Start the Mininet environment (with p4-utils, run “sudo p4run” in the directory that contains p4app.json).

 

Open another terminal and start Logstash with myicmp.conf (for example, “logstash -f myicmp.conf”). Startup may take a few minutes, so please be patient.

You should see output like the following figure, which means that Logstash is ready.
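Instead of watching the console, you can also check readiness by testing whether port 6666 accepts TCP connections. The following is a minimal sketch using only the standard library. wait_for_port is a hypothetical helper, and the 300 s default timeout is an arbitrary choice that allows for the slow startup.

```python
import socket
import time


def wait_for_port(host="localhost", port=6666, timeout=300.0, interval=2.0):
    """Poll until a TCP server accepts connections on host:port.

    Returns True once a connection succeeds, or False after `timeout` seconds.
    """
    deadline = time.time() + timeout
    while True:
        try:
            socket.create_connection((host, port), timeout=interval).close()
            return True
        except (socket.error, OSError):
            if time.time() >= deadline:
                return False
            time.sleep(interval)
```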

 

Open another terminal and run receive.py as root, because it sniffs on the switch's CPU interface (s1-cpu-eth1 by default).

 

Open another terminal for the InfluxDB shell (influx) and select the database with “use mydb”.

 

We need to train the SVM first. Start by sending normal ping packets from h1 to h2, then use “data_gathering.py” to label those packets as normal.



After h1 pings h2, go to the InfluxDB terminal and run “select * from net”.

 

You should see records like the following. Note the timestamp of the last record; in this example, it is 1593228364299000000.
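The timestamp can also be read with the influxdb-python client. A sketch, assuming InfluxDBClient.query() is called with epoch='ns' so that times come back as integer nanoseconds like the value shown in the influx shell; both function names are hypothetical.

```python
def latest_timestamp(points):
    """Return the largest 'time' value among query points, or None if there are none."""
    times = [p["time"] for p in points]
    return max(times) if times else None


def fetch_latest_timestamp(client):
    """Query the newest record in 'net'; `client` is an InfluxDBClient bound to mydb."""
    result = client.query("select * from net order by time desc limit 1", epoch="ns")
    return latest_timestamp(result.get_points(measurement="net"))
```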

 

Modify data_gathering.py by replacing xxxx in QUERY with the timestamp you noted. In this example:

QUERY = """select count(length) as a,mean(length) as b from net where time <= 1593228364299000000 group by time(3s) order by time desc limit 5"""
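Rather than editing xxxx by hand for each class, the query could be built from a timestamp passed in, for example as an extra command-line argument (an assumption, not how the original script works). In InfluxQL, a bare integer in a time comparison is read as nanoseconds since the epoch, which matches the value shown in the influx shell. build_query is a hypothetical helper.

```python
QUERY_TEMPLATE = ("select count(length) as a,mean(length) as b from net "
                  "where time <= {ts} group by time(3s) order by time desc limit 5")


def build_query(ts_ns):
    """Fill a nanosecond timestamp into the data_gathering.py query."""
    ts_ns = int(ts_ns)  # normalize to an integer (raises ValueError for non-numeric strings)
    return QUERY_TEMPLATE.format(ts=ts_ns)
```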

 

 

Open another terminal and run “python data_gathering.py 0”. It writes ICMP_data_class_0.csv, which holds the training data for normal ICMP packets (class 0).

 

Next, use hping3 on h1 to send malicious ICMP attack packets to h2 for training, for example “h1 hping3 -V -1 -d 1200 --faster h2”. Send for about 10 seconds, then stop hping3.

 

Go to the InfluxDB terminal, run “select * from net” again, and note the timestamp of the last record.

 

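Modify data_gathering.py again, replacing the timestamp in QUERY with this new one. Then run “python data_gathering.py 1” to write ICMP_data_class_1.csv, the training data for malicious traffic (class 1).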

 

Now open another terminal and run controller.py. When no traffic is sent between h1 and h2, it shows “ICMP info: 0 None”.

 

When h1 pings h2, you should see something like “ICMP info: 2 84”.

 

When you run “h1 hping3 -V -1 -d 1200 --faster h2”, the controller prints the “ring_the_alarm” message.
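The length values the SVM sees follow from simple header arithmetic. A sketch, assuming IPv4 headers without options, an 8-byte ICMP echo header, and Linux ping's default 56-byte payload. Under these assumptions, a normal ping gives the 84 seen above, while hping3 with -d 1200 gives 1228. hping3 with --faster also sends far more often than ping's one packet per second, so the count feature grows as well.

```python
IP_HEADER = 20    # IPv4 header without options
ICMP_HEADER = 8   # ICMP echo header: type, code, checksum, identifier, sequence


def icmp_ip_length(payload_bytes):
    """IPv4 total length of an ICMP echo packet carrying `payload_bytes` of data."""
    return IP_HEADER + ICMP_HEADER + payload_bytes


normal = icmp_ip_length(56)    # default payload of Linux ping
attack = icmp_ip_length(1200)  # hping3 -d 1200
```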

 

Dr. Chih-Heng Ke

Department of Computer Science and Information Engineering, National Quemoy University, Kinmen, Taiwan

Email: smallko@gmail.com