P4-utils: Using an SVM to classify traffic as normal or as a malicious ICMP attack
This lab is based on https://github.com/GAR-Project/project, so please read that project first; its authors provide a lot of useful information. Also, please install InfluxDB and Logstash beforehand. This lab differs from that project in two main ways: a) I use Logstash to collect packet information, while that project uses Telegraf; b) that project uses an Open vSwitch based network, while I use a P4 based network.
[Topology]
H1-----S1 ------H2
H1 sends traffic to H2. S1 clones each packet to the controller so that the controller can determine whether the traffic is normal or malicious. If it is malicious, the controller rings the alarm.
Before starting this lab, please create a database “mydb” and a user “telegraf” with password “telegraf”.
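The database and user can also be created from Python with the influxdb client that the later scripts import. The following is only a sketch: setup_database is a hypothetical helper name, it assumes an InfluxDB 1.x server, and the privilege grant only matters if authentication is enabled on that server.

```python
def setup_database(client, db="mydb", user="telegraf", password="telegraf"):
    """Create the database and user that this lab expects.

    `client` is assumed to be an influxdb.InfluxDBClient (InfluxDB 1.x).
    setup_database is a hypothetical helper, not part of any library.
    """
    client.create_database(db)
    client.create_user(user, password)
    # Only relevant when authentication is enabled on the server.
    client.grant_privilege("all", db, user)
```

A possible call would be setup_database(influxdb.InfluxDBClient('127.0.0.1', 8086)), run once before starting Logstash.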
[basic.p4]
/* -*- P4_16 -*- */
#include <core.p4>
#include <v1model.p4>

/*************************************************************************
*********************** H E A D E R S  ***********************************
*************************************************************************/

struct metadata {
    /* empty */
}

struct headers {
}

/*************************************************************************
*********************** P A R S E R  ***********************************
*************************************************************************/

parser MyParser(packet_in packet,
                out headers hdr,
                inout metadata meta,
                inout standard_metadata_t standard_metadata) {

    state start {
        transition accept;
    }
}

/*************************************************************************
************   C H E C K S U M    V E R I F I C A T I O N   *************
*************************************************************************/

control MyVerifyChecksum(inout headers hdr, inout metadata meta) {
    apply { }
}

/*************************************************************************
**************  I N G R E S S   P R O C E S S I N G   *******************
*************************************************************************/

control MyIngress(inout headers hdr,
                  inout metadata meta,
                  inout standard_metadata_t standard_metadata) {

    action drop() {
        mark_to_drop(standard_metadata);
    }

    action forward(bit<9> port) {
        standard_metadata.egress_spec = port;
    }

    table phy_forward {
        key = {
            standard_metadata.ingress_port: exact;
        }
        actions = {
            forward;
            drop;
        }
        size = 1024;
        default_action = drop();
    }

    apply {
        phy_forward.apply();
    }
}

/*************************************************************************
****************  E G R E S S   P R O C E S S I N G   *******************
*************************************************************************/

control MyEgress(inout headers hdr,
                 inout metadata meta,
                 inout standard_metadata_t standard_metadata) {
    apply {
        /* instance_type 0 is a normal (not cloned) packet: copy it to
           mirroring session 100, which cmd.txt maps to port 3. */
        if (standard_metadata.instance_type == 0) {
            clone(CloneType.E2E, 100);
        }
    }
}

/*************************************************************************
*************   C H E C K S U M    C O M P U T A T I O N   **************
*************************************************************************/

control MyComputeChecksum(inout headers hdr, inout metadata meta) {
    apply { }
}

/*************************************************************************
***********************  D E P A R S E R  *******************************
*************************************************************************/

control MyDeparser(packet_out packet, in headers hdr) {
    apply { }
}

/*************************************************************************
***********************  S W I T C H  *******************************
*************************************************************************/

V1Switch(
    MyParser(),
    MyVerifyChecksum(),
    MyIngress(),
    MyEgress(),
    MyComputeChecksum(),
    MyDeparser()
) main;
[cmd.txt]
table_add phy_forward forward 1 => 2
table_add phy_forward forward 2 => 1
mirroring_add 100 3
[p4app.json]
{
  "program": "basic.p4",
  "switch": "simple_switch",
  "compiler": "p4c",
  "options": "--target bmv2 --arch v1model --std p4-16",
  "switch_cli": "simple_switch_CLI",
  "cli": true,
  "pcap_dump": true,
  "enable_log": true,
  "topo_module": {
    "file_path": "",
    "module_name": "p4utils.mininetlib.apptopo",
    "object_name": "AppTopoStrategies"
  },
  "controller_module": null,
  "topodb_module": {
    "file_path": "",
    "module_name": "p4utils.utils.topology",
    "object_name": "Topology"
  },
  "mininet_module": {
    "file_path": "",
    "module_name": "p4utils.mininetlib.p4net",
    "object_name": "P4Mininet"
  },
  "topology": {
    "assignment_strategy": "l2",
    "links": [["h1", "s1"], ["h2", "s1"]],
    "hosts": {
      "h1": {},
      "h2": {}
    },
    "switches": {
      "s1": {
        "cli_input": "cmd.txt",
        "program": "basic.p4",
        "cpu_port": true
      }
    }
  }
}
[receive.py] The controller uses this file to capture ICMP packets. For each ICMP echo request, the program sends the source IP, destination IP, and packet length to localhost:6666. Logstash listens on port 6666 and forwards the information to InfluxDB.
#!/usr/bin/env python
import sys
import struct
import os

from scapy.all import sniff
from scapy.all import Packet, IPOption, Ether
from scapy.all import IP, UDP, ICMP, Raw, ls


def handle_pkt(pkt):
    print("Controller got a packet")
    print(pkt.summary())
    # Only ICMP echo requests (type 8) are reported.
    if ICMP in pkt and pkt[ICMP].type == 8:
        ip_src = pkt[IP].src
        ip_dst = pkt[IP].dst
        ip_len = pkt[IP].len
        print("%s %s %s" % (ip_src, ip_dst, ip_len))
        # Hand the record to Logstash, which listens on TCP port 6666.
        os.system(" echo %s %s %s | nc localhost 6666" % (ip_src, ip_dst, ip_len))


def main():
    if len(sys.argv) < 2:
        iface = 's1-cpu-eth1'
    else:
        iface = sys.argv[1]

    print("sniffing on %s" % iface)
    sys.stdout.flush()
    sniff(iface = iface,
          prn = lambda x: handle_pkt(x))


if __name__ == '__main__':
    main()
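receive.py hands each record to Logstash by shelling out to echo and nc. The same line can be sent directly over a TCP socket, which avoids starting a shell for every packet. The following is a minimal sketch, not the author's method; format_record and send_record are hypothetical helper names.

```python
import socket


def format_record(ip_src, ip_dst, ip_len):
    """Build the one-line record that the Logstash grok pattern expects."""
    return "%s %s %s\n" % (ip_src, ip_dst, ip_len)


def send_record(ip_src, ip_dst, ip_len, host="localhost", port=6666):
    """Send one record over TCP, much like `echo ... | nc localhost 6666`."""
    sock = socket.create_connection((host, port), timeout=2)
    try:
        sock.sendall(format_record(ip_src, ip_dst, ip_len).encode("ascii"))
    finally:
        sock.close()
```

Inside handle_pkt, the os.system call could then be swapped for send_record(ip_src, ip_dst, ip_len), assuming Logstash is already listening on port 6666.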
[myicmp.conf] Configuration file for Logstash.
input {
  tcp {
    type => "tcp"
    port => 6666
    mode => "server"
  }
}

filter {
  grok {
    match => ["message", "%{IP:srcIP} %{IP:destIP} %{INT:length}"]
  }
}

output {
  influxdb {
    db => "mydb"
    host => "localhost"
    port => "8086"
    user => "telegraf"
    password => "telegraf"
    measurement => "net"
    allow_time_override => true
    flush_size => "1"
    data_points => {
      "srcip" => "%{srcIP}"
      "dstip" => "%{destIP}"
      "length" => "%{length}"
    }
    coerce_values => {
      "length" => "integer"
    }
  }
  stdout {
    codec => rubydebug
  }
}
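To see what the grok filter in myicmp.conf extracts from each line that receive.py sends, the pattern can be imitated with a Python regular expression. This is only an illustration: the IPv4 expression below is a simplified stand-in for grok's %{IP} pattern, and parse_line is a hypothetical helper name.

```python
import re

# Simplified IPv4-only stand-in for grok's %{IP} (assumption: the real grok
# pattern is stricter about octet ranges and also accepts IPv6).
IPV4 = r"\d{1,3}(?:\.\d{1,3}){3}"
LINE_RE = re.compile(
    r"(?P<srcIP>%s) (?P<destIP>%s) (?P<length>[+-]?\d+)" % (IPV4, IPV4)
)


def parse_line(message):
    """Return the InfluxDB fields for one record, or None if it does not match."""
    m = LINE_RE.search(message)
    if m is None:
        return None
    return {
        "srcip": m.group("srcIP"),
        "dstip": m.group("destIP"),
        "length": int(m.group("length")),  # mirrors coerce_values => integer
    }
```

For example, the line "10.0.0.1 10.0.0.2 84" yields the fields srcip, dstip, and an integer length, which are the values written to the "net" measurement.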
[data_gathering.py] This file labels the captured traffic as normal or malicious for training purposes.
import influxdb, sys

# Note xxxx will be replaced. I will explain later.
QUERY = """select count(length) as a,mean(length) as b from net where time <= xxxx group by time(3s) order by time desc limit 5"""

if __name__ == "__main__":
    db = influxdb.InfluxDBClient('127.0.0.1', 8086, 'telegraf', 'telegraf', 'mydb')
    measurement_class = sys.argv[1]
    out_file = open("ICMP_data_class_{}.csv".format(measurement_class), "w+")

    for measurement in db.query(QUERY).get_points(measurement = 'net'):
        cnt = measurement["a"]
        meanlen = measurement["b"]
        out_file.write("{}, {}, {}\n".format(cnt, meanlen, measurement_class))

    out_file.close()
    print("Finished generating a class {} training dataset!".format(measurement_class))
    exit(0)
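The query in data_gathering.py turns raw packet records into two features per 3-second window: the packet count and the mean packet length. The sketch below reproduces that grouping in plain Python on made-up samples, purely to illustrate the idea. window_features is a hypothetical name, and unlike InfluxDB it leaves out empty windows instead of reporting a count of 0 with no mean.

```python
from collections import defaultdict


def window_features(samples, window_s=3):
    """Group (timestamp_seconds, length) samples into fixed-size windows.

    Returns {window_start: (count, mean_length)} for non-empty windows only.
    """
    buckets = defaultdict(list)
    for ts, length in samples:
        # Windows are aligned to multiples of window_s.
        start = int(ts // window_s) * window_s
        buckets[start].append(length)
    return {
        start: (len(vals), sum(vals) / float(len(vals)))
        for start, vals in buckets.items()
    }
```

Each (count, mean_length) pair corresponds to one row of the training CSV, before the class label is appended.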
[controller.py]
from p4utils.utils.topology import Topology
from p4utils.utils.sswitch_API import SimpleSwitchAPI
import influxdb, datetime, time, os, signal
from sklearn import svm

blockip = []


class myController(object):
    def __init__(self):
        self.topo = Topology(db="topology.db")
        self.controllers = {}
        self.connect_to_switches()

    def connect_to_switches(self):
        for p4switch in self.topo.get_p4switches():
            thrift_port = self.topo.get_thrift_port(p4switch)
            self.controllers[p4switch] = SimpleSwitchAPI(thrift_port)


class gar_py:
    def __init__(self, db_host = 'localhost', port = 8086, db = 'mydb', kern_type = 'linear', dbg = False):
        self.debug = dbg
        self.host = db_host
        self.port = port
        self.dbname = db
        self.client = influxdb.InfluxDBClient(self.host, self.port, 'telegraf', 'telegraf', self.dbname)
        self.svm_inst = svm.SVC(kernel = kern_type)
        self.training_files = ["./ICMP_data_class_0.csv", "./ICMP_data_class_1.csv"]
        self.query = """select count(length) as a,mean(length) as b from net group by time(3s) order by time desc limit 3"""
        self.train_svm()
        self.controller = myController()

    def train_svm(self):
        features, labels = [], []
        for fname in self.training_files:
            meal = open(fname, "rt")
            for line in meal:
                # Each row is "count, mean, class".
                data_list = line.rsplit(", ")
                for i in range(len(data_list)):
                    if i < 2:
                        data_list[i] = float(data_list[i])
                    else:
                        data_list[i] = int(data_list[i])
                features.append(data_list[:2])
                labels.append(data_list[2])
            meal.close()
        print("features= %s" % (features,))
        print("labels= %s" % (labels,))
        self.svm_inst.fit(features, labels)

    def work_time(self):
        last_entry_time = "0"
        while True:
            for new_entry in list(self.get_data(self.query).get_points(measurement = 'net')):
                if new_entry['time'] > last_entry_time:
                    last_entry_time = new_entry['time']
                    if self.debug:
                        print("\n** New entry **\n\tICMP info: " + str(new_entry['a']) + " " + str(new_entry['b']))
                    self.ring_the_alarm(self.under_attack(new_entry['a'], new_entry['b']))
            time.sleep(3)

    def under_attack(self, a, b):
        # An empty window has no mean length, so there is nothing to classify.
        if b is None:
            return False
        if self.debug:
            print("\tCurrent prediction: " + str(self.svm_inst.predict([[a,b]])[0]))
        if self.svm_inst.predict([[a,b]])[0] == 1:
            return True
        else:
            return False

    def get_data(self, petition):
        return self.client.query(petition)

    def ring_the_alarm(self, should_i_ring):
        if should_i_ring:
            print("ring_the_alarm")
            #if "10.0.0.1" not in blockip:
            #    self.controller.controllers["s1"].table_add("block_pkt", "drop", [str("10.0.0.1")], [])
            #    blockip.append("10.0.0.1")


def ctrl_c_handler(s, f):
    print("\b\bShutting down MR. SVM... Bye!")
    exit(0)


if __name__ == "__main__":
    signal.signal(signal.SIGINT, ctrl_c_handler)
    ai_bot = gar_py(db_host = '127.0.0.1', dbg = True)
    ai_bot.work_time()
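One thing to watch in the training step: data_gathering.py writes the text None for the mean of an empty window, and train_svm in controller.py would fail on float("None"). The sketch below parses the same "count, mean, class" rows but skips such rows. The skip is an assumption of this sketch and is not something controller.py does; load_rows is a hypothetical helper name.

```python
def load_rows(lines):
    """Parse 'count, mean, label' lines into (features, labels).

    Rows whose mean is 'None' (empty windows) are skipped; that skip is an
    assumption of this sketch, not the behavior of controller.py.
    """
    features, labels = [], []
    for line in lines:
        parts = [p.strip() for p in line.split(",")]
        if len(parts) != 3 or parts[1] == "None":
            continue
        features.append([float(parts[0]), float(parts[1])])
        labels.append(int(parts[2]))
    return features, labels
```

The returned lists have the same shape that train_svm passes to svm_inst.fit(features, labels).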
[execution]
Start the Mininet environment.
Open another terminal to start Logstash. (It takes a few minutes, so please be patient.)
You should see output like the following figure, which means that Logstash is ready.
Open another terminal for receive.py.
Open another terminal for InfluxDB.
Training comes first, so send normal ping packets and use “data_gathering.py” to label them as normal.
After h1 pings h2, go to the InfluxDB terminal and run “select * from net”.
You should see records like the following. Note the timestamp of the last record; in this example it is 1593228364299000000.
Modify data_gathering.py by replacing xxxx in the query with that timestamp:
QUERY = """select count(length) as a,mean(length) as b from net where time <= xxxx group by time(3s) order by time desc limit 5"""
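Instead of editing the file by hand for every capture, the timestamp could be filled into the query programmatically. This is a sketch under the assumption that the timestamp is the nanosecond epoch value shown by “select * from net”; QUERY_TEMPLATE and build_query are hypothetical names.

```python
QUERY_TEMPLATE = (
    "select count(length) as a,mean(length) as b from net "
    "where time <= {ts} group by time(3s) order by time desc limit 5"
)


def build_query(last_timestamp_ns):
    """Fill the placeholder with a nanosecond epoch timestamp."""
    return QUERY_TEMPLATE.format(ts=int(last_timestamp_ns))
```

With the example above, build_query(1593228364299000000) produces the query with xxxx replaced by that timestamp.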
Open another terminal and run data_gathering.py with 0 as its argument. (The resulting ICMP_data_class_0.csv holds the normal ICMP packets.)
Next, use hping3 on h1 to send malicious ICMP attack packets to h2 as training data. (Send for around 10 seconds, then stop hping3.)
Go to InfluxDB and run “select * from net” again. Note the timestamp of the last record.
Modify data_gathering.py again, replacing the timestamp in the query with the new one, and run it with 1 as its argument to write ICMP_data_class_1.csv.
Now open another terminal to run controller.py. When no traffic is sent between h1 and h2, it shows “ICMP info: 0 None”.
When h1 pings h2, you should see something like “ICMP info: 2 84”.
When you run “h1 hping3 -V -1 -d 1200 --faster h2”, the controller shows the “ring_the_alarm” message.
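The length value in “ICMP info” is the IP total length, which explains the 84 seen for a normal ping. The arithmetic below is a small worked example, assuming an IPv4 header without options and the usual 56-byte default payload of Linux ping; ip_total_length is a hypothetical helper name.

```python
IPV4_HEADER_BYTES = 20  # assumes an IPv4 header with no options
ICMP_HEADER_BYTES = 8   # ICMP echo header


def ip_total_length(icmp_payload_bytes):
    """IP total length of an ICMP echo packet with the given payload size."""
    return IPV4_HEADER_BYTES + ICMP_HEADER_BYTES + icmp_payload_bytes


print(ip_total_length(56))    # default ping payload
print(ip_total_length(1200))  # hping3 -d 1200
```

Under these assumptions a normal ping gives 84 and the hping3 packets should give 1228, so the attack differs from normal traffic in both features: far more packets per window and a much larger mean length.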
Dr. Chih-Heng Ke
Department of Computer Science and Information Engineering, National Quemoy University, Kinmen, Taiwan
Email: smallko@gmail.com