Wednesday, 3 August 2016

KDD CUP 1999 Using Spark and Python


Datasets:

We will be using datasets from the KDD Cup 1999.


This work is on reference with https://github.com/jadianes/spark-py-notebooks


Dataset Description :

Since 1999, KDD’99 has been the most wildly used data set for the evaluation of anomaly detection methods. This data set is prepared by Stolfo  and is built based on the data captured in DARPA’98 IDS evaluation program . DARPA’98 is about 4 gigabytes of compressed raw (binary) tcpdump data of 7 weeks of network traffic, which can be processed into about 5 million connection records, each with about 100 bytes. The two weeks of test data have around 2 million connection records. KDD training dataset consists of approximately 4,900,000 single connection vectors each of which contains 41 features and is labeled as either normal or an attack, with exactly one specific attack type. 

The simulated attacks fall in one of the following four categories:


1) Denial of Service Attack (DoS): is an attack in which the attacker makes some computing or memory resource too busy or too full to handle legitimate requests, or denies legitimate users access to a machine. 


2) User to Root Attack (U2R): is a class of exploit in which the attacker starts out with access to a normal user account on the system (perhaps gained by sniffing passwords, a dictionary attack, or social engineering) and is able to exploit some vulnerability to gain root access to the system. 


3) Remote to Local Attack (R2L): occurs when an attacker who has the ability to send packets to a machine over a network but who does not have an account on that machine exploits some vulnerability to gain local access as a user of that machine. 


4) Probing Attack: is an attempt to gather information about a network of computers for the apparent purpose of circumventing its security controls. 


This work is divided into 10 tasks which includes RDD creation,RDD sampling,RDD set operations,MLIB logistic regression,etc.




RDD creation

The Resilient Distributed Dataset or RDD. An RDD is a distributed collection of elements. All work in Spark is expressed as either creating new RDDs, transforming existing RDDs, or calling actions on RDDs to compute a result. Spark automatically distributes the data contained in RDDs across your cluster and parallelizes the operations you perform on them.

References

The reference book for these and other Spark related topics is Learning Spark by Holden Karau, Andy Konwinski, Patrick Wendell, and Matei Zaharia.

The KDD Cup 1999 competition dataset is described in detail here.

Getting the data files

In this notebook we will use the reduced dataset (10 percent) provided for the KDD Cup 1999, containing nearly half million network interactions. The file is provided as a Gzip file that we will download locally.

In [31]:
import urllib
f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")

Creating a RDD from a file

The most common way of creating an RDD is to load it from a file. Notice that Spark's textFile can handle compressed files directly.

In [32]:
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

Now we have our data file loaded into the raw_data RDD.

Without getting into Spark transformations and actions, the most basic thing we can do to check that we got our RDD contents right is to count() the number of lines loaded from the file into the RDD.

In [33]:
raw_data.count()
Out[33]:
494021

We can also check the first few entries in our data.

In [34]:
raw_data.take(5)
Out[34]:
[u'0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.',
 u'0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.',
 u'0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,29,29,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
 u'0,tcp,http,SF,219,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,39,39,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
 u'0,tcp,http,SF,217,2032,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,49,49,1.00,0.00,0.02,0.00,0.00,0.00,0.00,0.00,normal.']

In the following notebooks, we will use this raw data to learn about the different Spark transformations and actions.

Creating and RDD using parallelize

Another way of creating an RDD is to parallelize an already existing list.

In [35]:
a = range(100)

data = sc.parallelize(a)

As we did before, we can count() the number of elements in the RDD.

In [36]:
data.count()
Out[36]:
100

As before, we can access the first few elements on our RDD.

In [37]:
data.take(5)
Out[37]:
[0, 1, 2, 3, 4]

RDD basics

This notebook will introduce three basic but essential Spark operations. Two of them are the transformations map and filter. The other is the action collect. At the same time we will introduce the concept of persistence in Spark.

Getting the data and creating the RDD

As we did in our first notebook, we will use the reduced dataset (10 percent) provided for the KDD Cup 1999, containing nearly half million network interactions. The file is provided as a Gzip file that we will download locally.

In [ ]:
import urllib
f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")

Now we can use this file to create our RDD.

In [1]:
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

The filter transformation

This transformation can be applied to RDDs in order to keep just elements that satisfy a certain condition. More concretely, a function is evaluated on every element in the original RDD. The new resulting RDD will contain just those elements that make the function return True.

For example, imagine we want to count how many normal. interactions we have in our dataset. We can filter our raw_data RDD as follows.

In [2]:
normal_raw_data = raw_data.filter(lambda x: 'normal.' in x)

Now we can count how many elements we have in the new RDD.

In [3]:
from time import time
t0 = time()
normal_count = normal_raw_data.count()
tt = time() - t0
print "There are {} 'normal' interactions".format(normal_count)
print "Count completed in {} seconds".format(round(tt,3))
There are 97278 'normal' interactions
Count completed in 5.951 seconds

Remember from notebook 1 that we have a total of 494021 in our 10 percent dataset. Here we can see that 97278 contain the normal. tag word.

Notice that we have measured the elapsed time for counting the elements in the RDD. We have done this because we wanted to point out that actual (distributed) computations in Spark take place when we execute actions and not transformations. In this case count is the action we execute on the RDD. We can apply as many transformations as we want on a our RDD and no computation will take place until we call the first action that, in this case takes a few seconds to complete.

The map transformation

By using the map transformation in Spark, we can apply a function to every element in our RDD. Python's lambdas are specially expressive for this particular.

In this case we want to read our data file as a CSV formatted one. We can do this by applying a lambda function to each element in the RDD as follows.

In [4]:
from pprint import pprint
csv_data = raw_data.map(lambda x: x.split(","))
t0 = time()
head_rows = csv_data.take(5)
tt = time() - t0
print "Parse completed in {} seconds".format(round(tt,3))
pprint(head_rows[0])
Parse completed in 1.715 seconds
[u'0',
 u'tcp',
 u'http',
 u'SF',
 u'181',
 u'5450',
 u'0',
 u'0',
 u'0',
 u'0',
 u'0',
 u'1',
 u'0',
 u'0',
 u'0',
 u'0',
 u'0',
 u'0',
 u'0',
 u'0',
 u'0',
 u'0',
 u'8',
 u'8',
 u'0.00',
 u'0.00',
 u'0.00',
 u'0.00',
 u'1.00',
 u'0.00',
 u'0.00',
 u'9',
 u'9',
 u'1.00',
 u'0.00',
 u'0.11',
 u'0.00',
 u'0.00',
 u'0.00',
 u'0.00',
 u'0.00',
 u'normal.']

Again, all action happens once we call the first Spark action (i.e. take in this case). What if we take a lot of elements instead of just the first few?

In [5]:
t0 = time()
head_rows = csv_data.take(100000)
tt = time() - t0
print "Parse completed in {} seconds".format(round(tt,3))
Parse completed in 8.629 seconds

We can see that it takes longer. The map function is applied now in a distributed way to a lot of elements on the RDD, hence the longer execution time.

Using map and predefined functions

Of course we can use predefined functions with map. Imagine we want to have each element in the RDD as a key-value pair where the key is the tag (e.g. normal) and the value is the whole list of elements that represents the row in the CSV formatted file. We could proceed as follows.

In [6]:
def parse_interaction(line):
    elems = line.split(",")
    tag = elems[41]
    return (tag, elems)

key_csv_data = raw_data.map(parse_interaction)
head_rows = key_csv_data.take(5)
pprint(head_rows[0])
(u'normal.',
 [u'0',
  u'tcp',
  u'http',
  u'SF',
  u'181',
  u'5450',
  u'0',
  u'0',
  u'0',
  u'0',
  u'0',
  u'1',
  u'0',
  u'0',
  u'0',
  u'0',
  u'0',
  u'0',
  u'0',
  u'0',
  u'0',
  u'0',
  u'8',
  u'8',
  u'0.00',
  u'0.00',
  u'0.00',
  u'0.00',
  u'1.00',
  u'0.00',
  u'0.00',
  u'9',
  u'9',
  u'1.00',
  u'0.00',
  u'0.11',
  u'0.00',
  u'0.00',
  u'0.00',
  u'0.00',
  u'0.00',
  u'normal.'])

That was easy, wasn't it?

In our notebook about working with key-value pairs we will use this type of RDDs to do data aggregations (e.g. count by key).

The collect action

So far we have used the actions count and take. Another basic action we need to learn is collect. Basically it will get all the elements in the RDD into memory for us to work with them. For this reason it has to be used with care, specially when working with large RDDs.

An example using our raw data.

In [9]:
t0 = time()
all_raw_data = raw_data.collect()
tt = time() - t0
print "Data collected in {} seconds".format(round(tt,3))
Data collected in 17.927 seconds

That took longer as any other action we used before, of course. Every Spark worker node that has a fragment of the RDD has to be coordinated in order to retrieve its part, and then reduce everything together.

As a last example combining all the previous, we want to collect all the normal interactions as key-value pairs.

In [13]:
# get data from file
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

# parse into key-value pairs
key_csv_data = raw_data.map(parse_interaction)

# filter normal key interactions
normal_key_interactions = key_csv_data.filter(lambda x: x[0] == "normal.")

# collect all
t0 = time()
all_normal = normal_key_interactions.collect()
tt = time() - t0
normal_count = len(all_normal)
print "Data collected in {} seconds".format(round(tt,3))
print "There are {} 'normal' interactions".format(normal_count)
Data collected in 12.485 seconds
There are 97278 normal interactions

This count matches with the previous count for normal interactions. The new procedure is more time consuming. This is because we retrieve all the data with collect and then use Python's len on the resulting list. Before we were just counting the total number of elements in the RDD by using count.

Sampling RDDs

So far we have introduced RDD creation together with some basic transformations such as map and filter and some actions such as count, take, and collect.

This notebook will show how to sample RDDs. Regarding transformations, sample will be introduced since it will be useful in many statistical learning scenarios. Then we will compare results with the takeSample action.

Getting the data and creating the RDD

In this case we will use the complete dataset provided for the KDD Cup 1999, containing nearly half million network interactions. The file is provided as a Gzip file that we will download locally.

In [1]:
import urllib
f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz", "kddcup.data.gz")

Now we can use this file to create our RDD.

In [2]:
data_file = "./kddcup.data.gz"
raw_data = sc.textFile(data_file)

Sampling RDDs

In Spark, there are two sampling operations, the transformation sample and the action takeSample. By using a transformation we can tell Spark to apply successive transformation on a sample of a given RDD. By using an action we retrieve a given sample and we can have it in local memory to be used by any other standard library (e.g. Scikit-learn).

The sample transformation

The sample transformation takes up to three parameters. First is whether the sampling is done with replacement or not. Second is the sample size as a fraction. Finally we can optionally provide a random seed.

In [3]:
raw_data_sample = raw_data.sample(False, 0.1, 1234)
sample_size = raw_data_sample.count()
total_size = raw_data.count()
print "Sample size is {} of {}".format(sample_size, total_size)
Sample size is 489957 of 4898431

But the power of sampling as a transformation comes from doing it as part of a sequence of additional transformations. This will show more powerful once we start doing aggregations and key-value pairs operations, and will be specially useful when using Spark's machine learning library MLlib.

In the meantime, imagine we want to have an approximation of the proportion of normal. interactions in our dataset. We could do this by counting the total number of tags as we did in previous notebooks. However we want a quicker response and we don't need the exact answer but just an approximation. We can do it as follows.

In [4]:
from time import time

# transformations to be applied
raw_data_sample_items = raw_data_sample.map(lambda x: x.split(","))
sample_normal_tags = raw_data_sample_items.filter(lambda x: "normal." in x)

# actions + time
t0 = time()
sample_normal_tags_count = sample_normal_tags.count()
tt = time() - t0

sample_normal_ratio = sample_normal_tags_count / float(sample_size)
print "The ratio of 'normal' interactions is {}".format(round(sample_normal_ratio,3)) 
print "Count done in {} seconds".format(round(tt,3))
The ratio of 'normal' interactions is 0.199
Count done in 44.523 seconds

Let's compare this with calculating the ratio without sampling.

In [5]:
# transformations to be applied
raw_data_items = raw_data.map(lambda x: x.split(","))
normal_tags = raw_data_items.filter(lambda x: "normal." in x)

# actions + time
t0 = time()
normal_tags_count = normal_tags.count()
tt = time() - t0

normal_ratio = normal_tags_count / float(total_size)
print "The ratio of 'normal' interactions is {}".format(round(normal_ratio,3)) 
print "Count done in {} seconds".format(round(tt,3))
The ratio of 'normal' interactions is 0.199
Count done in 91.09 seconds

We can see a gain in time. The more transformations we apply after the sampling the bigger this gain. This is because without sampling all the transformations are applied to the complete set of data.

The takeSample action

If what we need is to grab a sample of raw data from our RDD into local memory in order to be used by other non-Spark libraries, takeSample can be used.

The syntax is very similar, but in this case we specify the number of items instead of the sample size as a fraction of the complete data size.

In [6]:
t0 = time()
raw_data_sample = raw_data.takeSample(False, 400000, 1234)
normal_data_sample = [x.split(",") for x in raw_data_sample if "normal." in x]
tt = time() - t0

normal_sample_size = len(normal_data_sample)

normal_ratio = normal_sample_size / 400000.0
print "The ratio of 'normal' interactions is {}".format(normal_ratio)
print "Count done in {} seconds".format(round(tt,3))
The ratio of 'normal' interactions is 0.1988025
Count done in 76.166 seconds

The process was very similar as before. We obtained a sample of about 10 percent of the data, and then filter and split.

However, it took longer, even with a slightly smaller sample. The reason is that Spark just distributed the execution of the sampling process. The filtering and splitting of the results were done locally in a single node.

Set operations on RDDs

Spark supports many of the operations we have in mathematical sets, such as union and intersection, even when the RDDs themselves are not properly sets. It is important to note that these operations require that the RDDs being operated on are of the same type.

Set operations are quite straightforward to understand as it work as expected. The only consideration comes from the fact that RDDs are not real sets, and therefore operations such as the union of RDDs doesn't remove duplicates. In this notebook we will have a brief look at subtract, distinct, and cartesian.

Getting the data and creating the RDD

As we did in our first notebook, we will use the reduced dataset (10 percent) provided for the KDD Cup 1999, containing nearly half million network interactions. The file is provided as a Gzip file that we will download locally.

In [1]:
import urllib
f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")
In [2]:
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

Getting attack interactions using subtract

For illustrative purposes, imagine we already have our RDD with non attack (normal) interactions from some previous analysis.

In [3]:
normal_raw_data = raw_data.filter(lambda x: "normal." in x)

We can obtain attack interactions by subtracting normal ones from the original unfiltered RDD as follows.

In [4]:
attack_raw_data = raw_data.subtract(normal_raw_data)

Let's do some counts to check our results.

In [5]:
from time import time

# count all
t0 = time()
raw_data_count = raw_data.count()
tt = time() - t0
print "All count in {} secs".format(round(tt,3))
All count in 5.261 secs
In [6]:
# count normal
t0 = time()
normal_raw_data_count = normal_raw_data.count()
tt = time() - t0
print "Normal count in {} secs".format(round(tt,3))
Normal count in 5.571 secs
In [7]:
# count attacks
t0 = time()
attack_raw_data_count = attack_raw_data.count()
tt = time() - t0
print "Attack count in {} secs".format(round(tt,3))
Attack count in 12.075 secs
In [8]:
print "There are {} normal interactions and {} attacks, \
from a total of {} interactions".format(normal_raw_data_count,attack_raw_data_count,raw_data_count)
There are 97278 normal interactions and 396743 attacks, from a total of 494021 interactions

So now we have two RDDs, one with normal interactions and another one with attacks.

Protocol and service combinations using cartesian

We can compute the Cartesian product between two RDDs by using the cartesian transformation. It returns all possible pairs of elements between two RDDs. In our case we will use it to generate all the possible combinations between service and protocol in our network interactions.

First of all we need to isolate each collection of values in two separate RDDs. For that we will use distinct on the CSV-parsed dataset. From the dataset description we know that protocol is the second column and service is the third (tag is the last one and not the first as appears in the page).

So first, let's get the protocols.

In [9]:
csv_data = raw_data.map(lambda x: x.split(","))
protocols = csv_data.map(lambda x: x[1]).distinct()
protocols.collect()
Out[9]:
[u'udp', u'icmp', u'tcp']

Now we do the same for services.

In [10]:
services = csv_data.map(lambda x: x[2]).distinct()
services.collect()
Out[10]:
[u'domain',
 u'http_443',
 u'Z39_50',
 u'smtp',
 u'urp_i',
 u'private',
 u'echo',
 u'shell',
 u'red_i',
 u'eco_i',
 u'sunrpc',
 u'ftp_data',
 u'urh_i',
 u'pm_dump',
 u'pop_3',
 u'pop_2',
 u'systat',
 u'ftp',
 u'uucp',
 u'whois',
 u'netbios_dgm',
 u'efs',
 u'remote_job',
 u'daytime',
 u'ntp_u',
 u'finger',
 u'ldap',
 u'netbios_ns',
 u'kshell',
 u'iso_tsap',
 u'ecr_i',
 u'nntp',
 u'printer',
 u'domain_u',
 u'uucp_path',
 u'courier',
 u'exec',
 u'time',
 u'netstat',
 u'telnet',
 u'gopher',
 u'rje',
 u'sql_net',
 u'link',
 u'auth',
 u'netbios_ssn',
 u'csnet_ns',
 u'X11',
 u'IRC',
 u'tftp_u',
 u'login',
 u'supdup',
 u'name',
 u'nnsp',
 u'mtp',
 u'http',
 u'bgp',
 u'ctf',
 u'hostnames',
 u'klogin',
 u'vmnet',
 u'tim_i',
 u'discard',
 u'imap4',
 u'other',
 u'ssh']

A longer list in this case.

Now we can do the cartesian product.

In [11]:
product = protocols.cartesian(services).collect()
print "There are {} combinations of protocol X service".format(len(product))
There are 198 combinations of protocol X service

Obviously, for such small RDDs doesn't really make sense to use Spark cartesian product. We could have perfectly collected the values after using distinct and do the cartesian product locally. Moreover, distinct and cartesian are expensive operations so they must be used with care when the operating datasets are large.

Data aggregations on RDDs

We can aggregate RDD data in Spark by using three different actions: reduce, fold, and aggregate. The last one is the more general one and someway includes the first two.

Getting the data and creating the RDD

As we did in our first notebook, we will use the reduced dataset (10 percent) provided for the KDD Cup 1999, containing nearly half million nework interactions. The file is provided as a Gzip file that we will download locally.

In [1]:
import urllib
f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")
In [2]:
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

Inspecting interaction duration by tag

Both fold and reduce take a function as an argument that is applied to two elements of the RDD. The fold action differs from reduce in that it gets and additional initial zero value to be used for the initial call. This value should be the identity element for the function provided.

As an example, imagine we want to know the total duration of our interactions for normal and attack interactions. We can use reduce as follows.

In [3]:
# parse data
csv_data = raw_data.map(lambda x: x.split(","))

# separate into different RDDs
normal_csv_data = csv_data.filter(lambda x: x[41]=="normal.")
attack_csv_data = csv_data.filter(lambda x: x[41]!="normal.")

The function that we pass to reduce gets and returns elements of the same type of the RDD. If we want to sum durations we need to extract that element into a new RDD.

In [4]:
normal_duration_data = normal_csv_data.map(lambda x: int(x[0]))
attack_duration_data = attack_csv_data.map(lambda x: int(x[0]))

Now we can reduce these new RDDs.

In [5]:
total_normal_duration = normal_duration_data.reduce(lambda x, y: x + y)
total_attack_duration = attack_duration_data.reduce(lambda x, y: x + y)

print "Total duration for 'normal' interactions is {}".\
    format(total_normal_duration)
print "Total duration for 'attack' interactions is {}".\
    format(total_attack_duration)
Total duration for 'normal' interactions is 21075991
Total duration for 'attack' interactions is 2626792

We can go further and use counts to calculate duration means.

In [6]:
normal_count = normal_duration_data.count()
attack_count = attack_duration_data.count()

print "Mean duration for 'normal' interactions is {}".\
    format(round(total_normal_duration/float(normal_count),3))
print "Mean duration for 'attack' interactions is {}".\
    format(round(total_attack_duration/float(attack_count),3))
Mean duration for 'normal' interactions is 216.657
Mean duration for 'attack' interactions is 6.621

We have a first (and too simplistic) approach to identify attack interactions.

A better way, using aggregate

The aggregate action frees us from the constraint of having the return be the same type as the RDD we are working on. Like with fold, we supply an initial zero value of the type we want to return. Then we provide two functions. The first one is used to combine the elements from our RDD with the accumulator. The second function is needed to merge two accumulators. Let's see it in action calculating the mean we did before.

In [7]:
normal_sum_count = normal_duration_data.aggregate(
    (0,0), # the initial value
    (lambda acc, value: (acc[0] + value, acc[1] + 1)), # combine value with acc
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])) # combine accumulators
)

print "Mean duration for 'normal' interactions is {}".\
    format(round(normal_sum_count[0]/float(normal_sum_count[1]),3))
Mean duration for 'normal' interactions is 216.657

In the previous aggregation, the accumulator first element keeps the total sum, while the second element keeps the count. Combining an accumulator with an RDD element consists in summing up the value and incrementing the count. Combining two accumulators requires just a pairwise sum.

We can do the same with attack type interactions.

In [8]:
attack_sum_count = attack_duration_data.aggregate(
    (0,0), # the initial value
    (lambda acc, value: (acc[0] + value, acc[1] + 1)), # combine value with acc
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])) # combine accumulators
)

print "Mean duration for 'attack' interactions is {}".\
    format(round(attack_sum_count[0]/float(attack_sum_count[1]),3))
Mean duration for 'attack' interactions is 6.621

Working with key/value pair RDDs

Spark provides specific functions to deal with RDDs which elements are key/value pairs. They are usually used to perform aggregations and other processings by key.

In this notebook we will show how, by working with key/value pairs, we can process our network interactions dataset in a more practical and powerful way than that used in previous notebooks. Key/value pair aggregations will show to be particularly effective when trying to explore each type of tag in our network attacks, in an individual way.

Getting the data and creating the RDD

As we did in our first notebook, we will use the reduced dataset (10 percent) provided for the KDD Cup 1999, containing nearly half million network interactions. The file is provided as a Gzip file that we will download locally.

In [1]:
import urllib
f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")
In [1]:
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

Creating a pair RDD for interaction types

In this notebook we want to do some exploratory data analysis on our network interactions dataset. More concretely we want to profile each network interaction type in terms of some of its variables such as duration. In order to do so, we first need to create the RDD suitable for that, where each interaction is parsed as a CSV row representing the value, and is put together with its corresponding tag as a key.

Normally we create key/value pair RDDs by applying a function using map to the original data. This function returns the corresponding pair for a given RDD element. We can proceed as follows.

In [2]:
csv_data = raw_data.map(lambda x: x.split(","))
key_value_data = csv_data.map(lambda x: (x[41], x)) # x[41] contains the network interaction tag

We have now our key/value pair data ready to be used. Let's get the first element in order to see how it looks like.

In [3]:
key_value_data.take(1)
Out[3]:
[(u'normal.',
  [u'0',
   u'tcp',
   u'http',
   u'SF',
   u'181',
   u'5450',
   u'0',
   u'0',
   u'0',
   u'0',
   u'0',
   u'1',
   u'0',
   u'0',
   u'0',
   u'0',
   u'0',
   u'0',
   u'0',
   u'0',
   u'0',
   u'0',
   u'8',
   u'8',
   u'0.00',
   u'0.00',
   u'0.00',
   u'0.00',
   u'1.00',
   u'0.00',
   u'0.00',
   u'9',
   u'9',
   u'1.00',
   u'0.00',
   u'0.11',
   u'0.00',
   u'0.00',
   u'0.00',
   u'0.00',
   u'0.00',
   u'normal.'])]

Data aggregations with key/value pair RDDs

We can use all the transformations and actions available for normal RDDs with key/value pair RDDs. We just need to make the functions work with pair elements. Additionally, Spark provides specific functions to work with RDDs containing pair elements. They are very similar to those available for general RDDs.

For example, we have a reduceByKey transformation that we can use as follows to calculate the total duration of each network interaction type.

In [4]:
key_value_duration = csv_data.map(lambda x: (x[41], float(x[0]))) 
durations_by_key = key_value_duration.reduceByKey(lambda x, y: x + y)

durations_by_key.collect()
Out[4]:
[(u'guess_passwd.', 144.0),
 (u'nmap.', 0.0),
 (u'warezmaster.', 301.0),
 (u'rootkit.', 1008.0),
 (u'warezclient.', 627563.0),
 (u'smurf.', 0.0),
 (u'pod.', 0.0),
 (u'neptune.', 0.0),
 (u'normal.', 21075991.0),
 (u'spy.', 636.0),
 (u'ftp_write.', 259.0),
 (u'phf.', 18.0),
 (u'portsweep.', 1991911.0),
 (u'teardrop.', 0.0),
 (u'buffer_overflow.', 2751.0),
 (u'land.', 0.0),
 (u'imap.', 72.0),
 (u'loadmodule.', 326.0),
 (u'perl.', 124.0),
 (u'multihop.', 1288.0),
 (u'back.', 284.0),
 (u'ipsweep.', 43.0),
 (u'satan.', 64.0)]

We have a specific counting action for key/value pairs.

In [5]:
counts_by_key = key_value_data.countByKey()
counts_by_key
Out[5]:
defaultdict(<type 'int'>, {u'guess_passwd.': 53, u'nmap.': 231, u'warezmaster.': 20, u'rootkit.': 10, u'warezclient.': 1020, u'smurf.': 280790, u'pod.': 264, u'neptune.': 107201, u'normal.': 97278, u'spy.': 2, u'ftp_write.': 8, u'phf.': 4, u'portsweep.': 1040, u'teardrop.': 979, u'buffer_overflow.': 30, u'land.': 21, u'imap.': 12, u'loadmodule.': 9, u'perl.': 3, u'multihop.': 7, u'back.': 2203, u'ipsweep.': 1247, u'satan.': 1589})

Using combineByKey

This is the most general of the per-key aggregation functions. Most of the other per-key combiners are implemented using it. We can think about it as the aggregate equivalent since it allows the user to return values that are not the same type as our input data.

For example, we can use it to calculate per-type average durations as follows.

In [6]:
sum_counts = key_value_duration.combineByKey(
    (lambda x: (x, 1)), # the initial value, with value x and count 1
    (lambda acc, value: (acc[0]+value, acc[1]+1)), # how to combine a pair value with the accumulator: sum value, and increment count
    (lambda acc1, acc2: (acc1[0]+acc2[0], acc1[1]+acc2[1])) # combine accumulators
)

sum_counts.collectAsMap()
Out[6]:
{u'back.': (284.0, 2203),
 u'buffer_overflow.': (2751.0, 30),
 u'ftp_write.': (259.0, 8),
 u'guess_passwd.': (144.0, 53),
 u'imap.': (72.0, 12),
 u'ipsweep.': (43.0, 1247),
 u'land.': (0.0, 21),
 u'loadmodule.': (326.0, 9),
 u'multihop.': (1288.0, 7),
 u'neptune.': (0.0, 107201),
 u'nmap.': (0.0, 231),
 u'normal.': (21075991.0, 97278),
 u'perl.': (124.0, 3),
 u'phf.': (18.0, 4),
 u'pod.': (0.0, 264),
 u'portsweep.': (1991911.0, 1040),
 u'rootkit.': (1008.0, 10),
 u'satan.': (64.0, 1589),
 u'smurf.': (0.0, 280790),
 u'spy.': (636.0, 2),
 u'teardrop.': (0.0, 979),
 u'warezclient.': (627563.0, 1020),
 u'warezmaster.': (301.0, 20)}

We can see that the arguments are pretty similar to those passed to aggregate in the previous notebook. The result associated to each type is in the form of a pair. If we want to actually get the averages, we need to do the division before collecting the results.

In [7]:
duration_means_by_type = sum_counts.map(lambda (key,value): (key, round(value[0]/value[1],3))).collectAsMap()

# Print them sorted
for tag in sorted(duration_means_by_type, key=duration_means_by_type.get, reverse=True):
    print tag, duration_means_by_type[tag]
portsweep. 1915.299
warezclient. 615.258
spy. 318.0
normal. 216.657
multihop. 184.0
rootkit. 100.8
buffer_overflow. 91.7
perl. 41.333
loadmodule. 36.222
ftp_write. 32.375
warezmaster. 15.05
imap. 6.0
phf. 4.5
guess_passwd. 2.717
back. 0.129
satan. 0.04
ipsweep. 0.034
nmap. 0.0
smurf. 0.0
pod. 0.0
neptune. 0.0
teardrop. 0.0
land. 0.0

A small step into understanding what makes a network interaction be considered an attack.

MLlib: Basic Statistics and Exploratory Data Analysis

So far we have used different map and aggregation functions, on simple and key/value pair RDD's, in order to get simple statistics that help us understand our datasets. In this notebook we will introduce Spark's machine learning library MLlib through its basic statistics functionality in order to better understand our dataset. We will use the reduced 10-percent KDD Cup 1999 datasets through the notebook.

Getting the data and creating the RDD

As we did in our first notebook, we will use the reduced dataset (10 percent) provided for the KDD Cup 1999, containing nearly half million network interactions. The file is provided as a Gzip file that we will download locally.

In [1]:
import urllib
f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")
In [2]:
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

Local vectors

A local vector is often used as a base type for RDDs in Spark MLlib. A local vector has integer-typed and 0-based indices and double-typed values, stored on a single machine. MLlib supports two types of local vectors: dense and sparse. A dense vector is backed by a double array representing its entry values, while a sparse vector is backed by two parallel arrays: indices and values.

For dense vectors, MLlib uses either Python lists or the NumPy array type. The later is recommended, so you can simply pass NumPy arrays around.

For sparse vectors, users can construct a SparseVector object from MLlib or pass SciPy scipy.sparse column vectors if SciPy is available in their environment. The easiest way to create sparse vectors is to use the factory methods implemented in Vectors.

An RDD of dense vectors

Let's represent each network interaction in our dataset as a dense vector. For that we will use the NumPy array type.

In [3]:
import numpy as np

def parse_interaction(line):
    line_split = line.split(",")
    # keep just numeric and logical values
    symbolic_indexes = [1,2,3,41]
    clean_line_split = [item for i,item in enumerate(line_split) if i not in symbolic_indexes]
    return np.array([float(x) for x in clean_line_split])

vector_data = raw_data.map(parse_interaction)

Summary statistics

Spark's MLlib provides column summary statistics for RDD[Vector] through the function colStats available in Statistics. The method returns an instance of MultivariateStatisticalSummary, which contains the column-wise max, min, mean, variance, and number of nonzeros, as well as the total count.

In [4]:
from pyspark.mllib.stat import Statistics 
from math import sqrt 

# Compute column summary statistics.
summary = Statistics.colStats(vector_data)

print "Duration Statistics:"
print " Mean: {}".format(round(summary.mean()[0],3))
print " St. deviation: {}".format(round(sqrt(summary.variance()[0]),3))
print " Max value: {}".format(round(summary.max()[0],3))
print " Min value: {}".format(round(summary.min()[0],3))
print " Total value count: {}".format(summary.count())
print " Number of non-zero values: {}".format(summary.numNonzeros()[0])
Duration Statistics:
 Mean: 47.979
 St. deviation: 707.746
 Max value: 58329.0
 Min value: 0.0
 Total value count: 494021
 Number of non-zero values: 12350.0

Summary statistics by label

The interesting part of summary statistics, in our case, comes from being able to obtain them by the type of network attack or 'label' in our dataset. By doing so we will be able to better characterise our dataset dependent variable in terms of the independent variables range of values.

If we want to do such a thing we could filter our RDD containing labels as keys and vectors as values. For that we just need to adapt our parse_interaction function to return a tuple with both elements.

In [5]:
def parse_interaction_with_key(line):
    line_split = line.split(",")
    # keep just numeric and logical values
    symbolic_indexes = [1,2,3,41]
    clean_line_split = [item for i,item in enumerate(line_split) if i not in symbolic_indexes]
    return (line_split[41], np.array([float(x) for x in clean_line_split]))

label_vector_data = raw_data.map(parse_interaction_with_key)

The next step is not very sophisticated. We use filter on the RDD to leave out other labels but the one we want to gather statistics from.

In [6]:
normal_label_data = label_vector_data.filter(lambda x: x[0]=="normal.")

Now we can use the new RDD to call colStats on the values.

In [7]:
normal_summary = Statistics.colStats(normal_label_data.values())

And collect the results as we did before.

In [8]:
print "Duration Statistics for label: {}".format("normal")
print " Mean: {}".format(normal_summary.mean()[0],3)
print " St. deviation: {}".format(round(sqrt(normal_summary.variance()[0]),3))
print " Max value: {}".format(round(normal_summary.max()[0],3))
print " Min value: {}".format(round(normal_summary.min()[0],3))
print " Total value count: {}".format(normal_summary.count())
print " Number of non-zero values: {}".format(normal_summary.numNonzeros()[0])
Duration Statistics for label: normal
 Mean: 216.657322313
 St. deviation: 1359.213
 Max value: 58329.0
 Min value: 0.0
 Total value count: 97278
 Number of non-zero values: 11690.0

Instead of working with a key/value pair we could have just filter our raw data split using the label in column 41. Then we can parse the results as we did before. This will work as well. However having our data organised as key/value pairs will open the door to better manipulations. Since values() is a transformation on an RDD, and not an action, we don't perform any computation until we call colStats anyway.

But lets wrap this within a function so we can reuse it with any label.

In [9]:
def summary_by_label(raw_data, label):
    label_vector_data = raw_data.map(parse_interaction_with_key).filter(lambda x: x[0]==label)
    return Statistics.colStats(label_vector_data.values())

Let's give it a try with the "normal." label again.

In [10]:
normal_sum = summary_by_label(raw_data, "normal.")

print "Duration Statistics for label: {}".format("normal")
print " Mean: {}".format(normal_sum.mean()[0],3)
print " St. deviation: {}".format(round(sqrt(normal_sum.variance()[0]),3))
print " Max value: {}".format(round(normal_sum.max()[0],3))
print " Min value: {}".format(round(normal_sum.min()[0],3))
print " Total value count: {}".format(normal_sum.count())
print " Number of non-zero values: {}".format(normal_sum.numNonzeros()[0])
Duration Statistics for label: normal
 Mean: 216.657322313
 St. deviation: 1359.213
 Max value: 58329.0
 Min value: 0.0
 Total value count: 97278
 Number of non-zero values: 11690.0

Let's try now with some network attack. We have all of them listed here.

In [11]:
guess_passwd_summary = summary_by_label(raw_data, "guess_passwd.")

print "Duration Statistics for label: {}".format("guess_password")
print " Mean: {}".format(guess_passwd_summary.mean()[0],3)
print " St. deviation: {}".format(round(sqrt(guess_passwd_summary.variance()[0]),3))
print " Max value: {}".format(round(guess_passwd_summary.max()[0],3))
print " Min value: {}".format(round(guess_passwd_summary.min()[0],3))
print " Total value count: {}".format(guess_passwd_summary.count())
print " Number of non-zero values: {}".format(guess_passwd_summary.numNonzeros()[0])
Duration Statistics for label: guess_password
 Mean: 2.71698113208
 St. deviation: 11.88
 Max value: 60.0
 Min value: 0.0
 Total value count: 53
 Number of non-zero values: 4.0

We can see that this type of attack is shorter in duration than a normal interaction. We could build a table with duration statistics for each type of interaction in our dataset. First we need to get a list of labels as described in the first line here.

In [12]:
label_list = ["back.","buffer_overflow.","ftp_write.","guess_passwd.",
              "imap.","ipsweep.","land.","loadmodule.","multihop.",
              "neptune.","nmap.","normal.","perl.","phf.","pod.","portsweep.",
              "rootkit.","satan.","smurf.","spy.","teardrop.","warezclient.",
              "warezmaster."]

Then we get a list of statistics for each label.

In [13]:
stats_by_label = [(label, summary_by_label(raw_data, label)) for label in label_list]

Now we get the duration column, first in our dataset (i.e. index 0).

In [14]:
duration_by_label = [ 
    (stat[0], np.array([float(stat[1].mean()[0]), float(sqrt(stat[1].variance()[0])), float(stat[1].min()[0]), float(stat[1].max()[0]), int(stat[1].count())])) 
    for stat in stats_by_label]

That we can put into a Pandas data frame.

In [15]:
import pandas as pd
pd.set_option('display.max_columns', 50)

stats_by_label_df = pd.DataFrame.from_items(duration_by_label, columns=["Mean", "Std Dev", "Min", "Max", "Count"], orient='index')

And print it.

In [16]:
print "Duration statistics, by label"
stats_by_label_df
Duration statistics, by label
Out[16]:
Mean Std Dev Min Max Count
back. 0.128915 1.110062 0 14 2203
buffer_overflow. 91.700000 97.514685 0 321 30
ftp_write. 32.375000 47.449033 0 134 8
guess_passwd. 2.716981 11.879811 0 60 53
imap. 6.000000 14.174240 0 41 12
ipsweep. 0.034483 0.438439 0 7 1247
land. 0.000000 0.000000 0 0 21
loadmodule. 36.222222 41.408869 0 103 9
multihop. 184.000000 253.851006 0 718 7
neptune. 0.000000 0.000000 0 0 107201
nmap. 0.000000 0.000000 0 0 231
normal. 216.657322 1359.213469 0 58329 97278
perl. 41.333333 14.843629 25 54 3
phf. 4.500000 5.744563 0 12 4
pod. 0.000000 0.000000 0 0 264
portsweep. 1915.299038 7285.125159 0 42448 1040
rootkit. 100.800000 216.185003 0 708 10
satan. 0.040277 0.522433 0 11 1589
smurf. 0.000000 0.000000 0 0 280790
spy. 318.000000 26.870058 299 337 2
teardrop. 0.000000 0.000000 0 0 979
warezclient. 615.257843 2207.694966 0 15168 1020
warezmaster. 15.050000 33.385271 0 156 20

In order to reuse this code and get a dataframe from any variable in our dataset we will define a function.

In [17]:
def get_variable_stats_df(stats_by_label, column_i):
    column_stats_by_label = [
        (stat[0], np.array([float(stat[1].mean()[column_i]), float(sqrt(stat[1].variance()[column_i])), float(stat[1].min()[column_i]), float(stat[1].max()[column_i]), int(stat[1].count())])) 
        for stat in stats_by_label
    ]
    return pd.DataFrame.from_items(column_stats_by_label, columns=["Mean", "Std Dev", "Min", "Max", "Count"], orient='index')

Let's try for duration again.

In [18]:
get_variable_stats_df(stats_by_label,0)
Out[18]:
Mean Std Dev Min Max Count
back. 0.128915 1.110062 0 14 2203
buffer_overflow. 91.700000 97.514685 0 321 30
ftp_write. 32.375000 47.449033 0 134 8
guess_passwd. 2.716981 11.879811 0 60 53
imap. 6.000000 14.174240 0 41 12
ipsweep. 0.034483 0.438439 0 7 1247
land. 0.000000 0.000000 0 0 21
loadmodule. 36.222222 41.408869 0 103 9
multihop. 184.000000 253.851006 0 718 7
neptune. 0.000000 0.000000 0 0 107201
nmap. 0.000000 0.000000 0 0 231
normal. 216.657322 1359.213469 0 58329 97278
perl. 41.333333 14.843629 25 54 3
phf. 4.500000 5.744563 0 12 4
pod. 0.000000 0.000000 0 0 264
portsweep. 1915.299038 7285.125159 0 42448 1040
rootkit. 100.800000 216.185003 0 708 10
satan. 0.040277 0.522433 0 11 1589
smurf. 0.000000 0.000000 0 0 280790
spy. 318.000000 26.870058 299 337 2
teardrop. 0.000000 0.000000 0 0 979
warezclient. 615.257843 2207.694966 0 15168 1020
warezmaster. 15.050000 33.385271 0 156 20

Now for the next numeric column in the dataset, src_bytes.

In [19]:
print "src_bytes statistics, by label"
get_variable_stats_df(stats_by_label,1)
src_bytes statistics, by label
Out[19]:
Mean Std Dev Min Max Count
back. 54156.355878 3159.360232 13140 54540 2203
buffer_overflow. 1400.433333 1337.132616 0 6274 30
ftp_write. 220.750000 267.747616 0 676 8
guess_passwd. 125.339623 3.037860 104 126 53
imap. 347.583333 629.926036 0 1492 12
ipsweep. 10.083400 5.231658 0 18 1247
land. 0.000000 0.000000 0 0 21
loadmodule. 151.888889 127.745298 0 302 9
multihop. 435.142857 540.960389 0 1412 7
neptune. 0.000000 0.000000 0 0 107201
nmap. 24.116883 59.419871 0 207 231
normal. 1157.047524 34226.124718 0 2194619 97278
perl. 265.666667 4.932883 260 269 3
phf. 51.000000 0.000000 51 51 4
pod. 1462.651515 125.098044 564 1480 264
portsweep. 666707.436538 21500665.866700 0 693375640 1040
rootkit. 294.700000 538.578180 0 1727 10
satan. 1.337319 42.946200 0 1710 1589
smurf. 935.772300 200.022386 520 1032 280790
spy. 174.500000 88.388348 112 237 2
teardrop. 28.000000 0.000000 28 28 979
warezclient. 300219.562745 1200905.243130 30 5135678 1020
warezmaster. 49.300000 212.155132 0 950 20

And so on. By reusing the summary_by_label and get_variable_stats_df functions we can perform some exploratory data analysis in large datasets with Spark.

Correlations

Spark's MLlib supports Pearson’s and Spearman’s to calculate pairwise correlation methods among many series. Both of them are provided by the corr method in the Statistics package.

We have two options as input. Either two RDD[Double]s or an RDD[Vector]. In the first case the output will be a Double value, while in the second a whole correlation Matrix. Due to the nature of our data, we will obtain the second.

In [20]:
from pyspark.mllib.stat import Statistics 
correlation_matrix = Statistics.corr(vector_data, method="spearman")

Once we have the correlations ready, we can start inspecting their values.

In [21]:
import pandas as pd
pd.set_option('display.max_columns', 50)

col_names = ["duration","src_bytes","dst_bytes","land","wrong_fragment",
             "urgent","hot","num_failed_logins","logged_in","num_compromised",
             "root_shell","su_attempted","num_root","num_file_creations",
             "num_shells","num_access_files","num_outbound_cmds",
             "is_hot_login","is_guest_login","count","srv_count","serror_rate",
             "srv_serror_rate","rerror_rate","srv_rerror_rate","same_srv_rate",
             "diff_srv_rate","srv_diff_host_rate","dst_host_count","dst_host_srv_count",
             "dst_host_same_srv_rate","dst_host_diff_srv_rate","dst_host_same_src_port_rate",
             "dst_host_srv_diff_host_rate","dst_host_serror_rate","dst_host_srv_serror_rate",
             "dst_host_rerror_rate","dst_host_srv_rerror_rate"]

corr_df = pd.DataFrame(correlation_matrix, index=col_names, columns=col_names)

corr_df
Out[21]:
duration src_bytes dst_bytes land wrong_fragment urgent hot num_failed_logins logged_in num_compromised root_shell su_attempted num_root num_file_creations num_shells num_access_files num_outbound_cmds is_hot_login is_guest_login count srv_count serror_rate srv_serror_rate rerror_rate srv_rerror_rate same_srv_rate diff_srv_rate srv_diff_host_rate dst_host_count dst_host_srv_count dst_host_same_srv_rate dst_host_diff_srv_rate dst_host_same_src_port_rate dst_host_srv_diff_host_rate dst_host_serror_rate dst_host_srv_serror_rate dst_host_rerror_rate dst_host_srv_rerror_rate
duration 1.000000 0.014196 0.299189 -0.001068 -0.008025 0.017883 0.108639 0.014363 0.159564 0.010687 0.040425 0.026015 0.013401 0.061099 0.008632 0.019407 -0.000019 -0.000010 0.205606 -0.259032 -0.250139 -0.074211 -0.073663 -0.025936 -0.026420 0.062291 -0.050875 0.123621 -0.161107 -0.217167 -0.211979 0.231644 -0.065202 0.100692 -0.056753 -0.057298 -0.007759 -0.013891
src_bytes 0.014196 1.000000 -0.167931 -0.009404 -0.019358 0.000094 0.113920 -0.008396 -0.089702 0.118562 0.003067 0.002282 -0.002050 0.027710 0.014403 -0.001497 0.000010 0.000019 0.027511 0.666230 0.722609 -0.657460 -0.652391 -0.342180 -0.332977 0.744046 -0.739988 -0.104042 0.130377 0.741979 0.729151 -0.712965 0.815039 -0.140231 -0.645920 -0.641792 -0.297338 -0.300581
dst_bytes 0.299189 -0.167931 1.000000 -0.003040 -0.022659 0.007234 0.193156 0.021952 0.882185 0.169772 0.026054 0.012192 -0.003884 0.034154 -0.000054 0.065776 -0.000031 0.000041 0.085947 -0.639157 -0.497683 -0.205848 -0.198715 -0.100958 -0.081307 0.229677 -0.222572 0.521003 -0.611972 0.024124 0.055033 -0.035073 -0.396195 0.578557 -0.167047 -0.158378 -0.003042 0.001621
land -0.001068 -0.009404 -0.003040 1.000000 -0.000333 -0.000065 -0.000539 -0.000076 -0.002785 -0.000447 -0.000093 -0.000049 -0.000230 -0.000150 -0.000076 -0.000211 -0.002881 0.002089 -0.000250 -0.010939 -0.010128 0.014160 0.014342 -0.000451 -0.001690 0.002153 -0.001846 0.020678 -0.019923 -0.012341 0.002576 -0.001803 0.004265 0.016171 0.013566 0.012265 0.000389 -0.001816
wrong_fragment -0.008025 -0.019358 -0.022659 -0.000333 1.000000 -0.000150 -0.004042 -0.000568 -0.020911 -0.003370 -0.000528 -0.000248 -0.001727 -0.001160 -0.000507 -0.001519 -0.000147 0.000441 -0.001869 -0.057711 -0.029117 -0.008849 -0.023382 0.000430 -0.012676 0.010218 -0.009386 0.012117 -0.029149 -0.058225 -0.049560 0.055542 -0.015449 0.007306 0.010387 -0.024117 0.046656 -0.013666
urgent 0.017883 0.000094 0.007234 -0.000065 -0.000150 1.000000 0.008594 0.063009 0.006821 0.031765 0.067437 0.000020 0.061994 0.061383 -0.000066 0.023380 0.012879 0.005162 -0.000100 -0.004778 -0.004799 -0.001338 -0.001327 -0.000705 -0.000726 0.001521 -0.001522 -0.000788 -0.005894 -0.005698 -0.004078 0.005208 -0.001939 -0.000976 -0.001381 -0.001370 -0.000786 -0.000782
hot 0.108639 0.113920 0.193156 -0.000539 -0.004042 0.008594 1.000000 0.112560 0.189126 0.811529 0.101983 -0.000400 0.003096 0.028694 0.009146 0.004224 -0.000393 -0.000248 0.463706 -0.120847 -0.114735 -0.035487 -0.034934 0.013468 0.052003 0.041342 -0.040555 0.032141 -0.074178 -0.017960 0.018783 -0.017198 -0.086998 -0.014141 -0.004706 -0.010721 0.199019 0.189142
num_failed_logins 0.014363 -0.008396 0.021952 -0.000076 -0.000568 0.063009 0.112560 1.000000 -0.002190 0.004619 0.016895 0.072748 0.010060 0.015211 -0.000093 0.005581 0.003431 -0.001560 -0.000428 -0.018024 -0.018027 -0.003674 -0.004027 0.035324 0.034876 0.005716 -0.005538 -0.003096 -0.028369 -0.015092 0.003004 -0.002960 -0.006617 -0.002588 0.014713 0.014914 0.032395 0.032151
logged_in 0.159564 -0.089702 0.882185 -0.002785 -0.020911 0.006821 0.189126 -0.002190 1.000000 0.161190 0.025293 0.011813 0.082533 0.055530 0.024354 0.072698 0.000079 0.000127 0.089318 -0.578287 -0.438947 -0.187114 -0.180122 -0.091962 -0.072287 0.216969 -0.214019 0.503807 -0.682721 0.080352 0.114526 -0.093565 -0.359506 0.659078 -0.143283 -0.132474 0.007236 0.012979
num_compromised 0.010687 0.118562 0.169772 -0.000447 -0.003370 0.031765 0.811529 0.004619 0.161190 1.000000 0.085558 0.048985 0.028557 0.031223 0.011256 0.006977 0.001048 -0.000438 -0.002504 -0.097212 -0.091154 -0.030516 -0.030264 0.008573 0.054006 0.035253 -0.034953 0.036497 -0.041615 0.003465 0.038980 -0.039091 -0.078843 -0.020979 -0.005019 -0.004504 0.214115 0.217858
root_shell 0.040425 0.003067 0.026054 -0.000093 -0.000528 0.067437 0.101983 0.016895 0.025293 0.085558 1.000000 0.233486 0.094512 0.140650 0.132056 0.069353 0.011462 -0.006602 -0.000405 -0.016409 -0.015174 -0.004952 -0.004923 -0.001104 -0.001143 0.004946 -0.004553 0.002286 -0.021367 -0.011906 0.000515 -0.000916 -0.004617 0.008631 -0.003498 -0.003032 0.002763 0.002151
su_attempted 0.026015 0.002282 0.012192 -0.000049 -0.000248 0.000020 -0.000400 0.072748 0.011813 0.048985 0.233486 1.000000 0.119326 0.053110 0.040487 0.081272 -0.018896 0.012927 -0.000219 -0.008279 -0.008225 -0.002318 -0.002295 -0.001227 -0.001253 0.002634 -0.002649 0.000348 -0.006697 -0.006288 -0.005738 0.006687 -0.005020 0.001052 0.001974 0.002893 0.003173 0.001731
num_root 0.013401 -0.002050 -0.003884 -0.000230 -0.001727 0.061994 0.003096 0.010060 0.082533 0.028557 0.094512 0.119326 1.000000 0.047521 0.034405 0.014513 0.001524 -0.002585 -0.001281 -0.054721 -0.053530 -0.016031 -0.015936 -0.008610 -0.008708 0.013881 -0.011337 0.006316 -0.078717 -0.038689 -0.038935 0.047414 -0.015968 0.061030 -0.008457 -0.007096 -0.000421 -0.005012
num_file_creations 0.061099 0.027710 0.034154 -0.000150 -0.001160 0.061383 0.028694 0.015211 0.055530 0.031223 0.140650 0.053110 0.047521 1.000000 0.068660 0.031042 -0.004081 -0.001664 0.013242 -0.036467 -0.034598 -0.009703 -0.010390 -0.005069 -0.004775 0.009784 -0.008711 0.014412 -0.049529 -0.026890 -0.021731 0.027092 -0.015018 0.030590 -0.002257 -0.004295 0.000626 -0.001096
num_shells 0.008632 0.014403 -0.000054 -0.000076 -0.000507 -0.000066 0.009146 -0.000093 0.024354 0.011256 0.132056 0.040487 0.034405 0.068660 1.000000 0.019438 -0.002592 -0.006631 -0.000405 -0.013938 -0.011784 -0.004343 -0.004740 -0.002541 -0.002572 0.004282 -0.003743 0.001096 -0.021200 -0.012017 -0.009962 0.010761 -0.003521 0.015882 -0.001588 -0.002357 -0.000617 -0.002020
num_access_files 0.019407 -0.001497 0.065776 -0.000211 -0.001519 0.023380 0.004224 0.005581 0.072698 0.006977 0.069353 0.081272 0.014513 0.031042 0.019438 1.000000 -0.001597 -0.002850 0.002466 -0.045282 -0.040497 -0.013945 -0.013572 -0.007581 0.001874 0.015499 -0.015112 0.024266 -0.023865 -0.023657 -0.021358 0.026703 -0.033288 0.011765 -0.011197 -0.011487 -0.004743 -0.004552
num_outbound_cmds -0.000019 0.000010 -0.000031 -0.002881 -0.000147 0.012879 -0.000393 0.003431 0.000079 0.001048 0.011462 -0.018896 0.001524 -0.004081 -0.002592 -0.001597 1.000000 0.822890 0.000924 -0.000076 0.000100 0.000167 0.000209 0.000536 0.000346 0.000208 0.000328 -0.000141 -0.000424 -0.000280 -0.000503 -0.000181 -0.000455 0.000288 -0.000011 -0.000372 -0.000823 -0.001038
is_hot_login -0.000010 0.000019 0.000041 0.002089 0.000441 0.005162 -0.000248 -0.001560 0.000127 -0.000438 -0.006602 0.012927 -0.002585 -0.001664 -0.006631 -0.002850 0.822890 1.000000 0.001512 0.000036 0.000064 0.000102 -0.000302 -0.000550 0.000457 -0.000159 -0.000235 -0.000360 -0.000106 0.000206 0.000229 -0.000004 0.000283 0.000538 -0.000076 -0.000007 -0.000435 -0.000529
is_guest_login 0.205606 0.027511 0.085947 -0.000250 -0.001869 -0.000100 0.463706 -0.000428 0.089318 -0.002504 -0.000405 -0.000219 -0.001281 0.013242 -0.000405 0.002466 0.000924 0.001512 1.000000 -0.062340 -0.062713 -0.017343 -0.017240 -0.008867 -0.009193 0.018042 -0.017000 -0.008878 -0.055453 -0.044366 -0.041749 0.044640 -0.038092 -0.012578 -0.001066 -0.016885 0.025282 -0.004292
count -0.259032 0.666230 -0.639157 -0.010939 -0.057711 -0.004778 -0.120847 -0.018024 -0.578287 -0.097212 -0.016409 -0.008279 -0.054721 -0.036467 -0.013938 -0.045282 -0.000076 0.000036 -0.062340 1.000000 0.950587 -0.303538 -0.308923 -0.213824 -0.221352 0.346718 -0.361737 -0.384010 0.547443 0.586979 0.539698 -0.546869 0.776906 -0.496554 -0.331571 -0.335290 -0.261194 -0.256176
srv_count -0.250139 0.722609 -0.497683 -0.010128 -0.029117 -0.004799 -0.114735 -0.018027 -0.438947 -0.091154 -0.015174 -0.008225 -0.053530 -0.034598 -0.011784 -0.040497 0.000100 0.000064 -0.062713 0.950587 1.000000 -0.428185 -0.421424 -0.281468 -0.284034 0.517227 -0.511998 -0.239057 0.442611 0.720746 0.681955 -0.673916 0.812280 -0.391712 -0.449096 -0.442823 -0.313442 -0.308132
serror_rate -0.074211 -0.657460 -0.205848 0.014160 -0.008849 -0.001338 -0.035487 -0.003674 -0.187114 -0.030516 -0.004952 -0.002318 -0.016031 -0.009703 -0.004343 -0.013945 0.000167 0.000102 -0.017343 -0.303538 -0.428185 1.000000 0.990888 -0.091157 -0.095285 -0.851915 0.828012 -0.121489 0.165350 -0.724317 -0.745745 0.719708 -0.650336 -0.153568 0.973947 0.965663 -0.103198 -0.105434
srv_serror_rate -0.073663 -0.652391 -0.198715 0.014342 -0.023382 -0.001327 -0.034934 -0.004027 -0.180122 -0.030264 -0.004923 -0.002295 -0.015936 -0.010390 -0.004740 -0.013572 0.000209 -0.000302 -0.017240 -0.308923 -0.421424 0.990888 1.000000 -0.110664 -0.115286 -0.839315 0.815305 -0.112222 0.160322 -0.713313 -0.734334 0.707753 -0.646256 -0.148072 0.967214 0.970617 -0.122630 -0.124656
rerror_rate -0.025936 -0.342180 -0.100958 -0.000451 0.000430 -0.000705 0.013468 0.035324 -0.091962 0.008573 -0.001104 -0.001227 -0.008610 -0.005069 -0.002541 -0.007581 0.000536 -0.000550 -0.008867 -0.213824 -0.281468 -0.091157 -0.110664 1.000000 0.978813 -0.327986 0.345571 -0.017902 -0.067857 -0.330391 -0.303126 0.308722 -0.278465 0.073061 -0.094076 -0.110646 0.910225 0.911622
srv_rerror_rate -0.026420 -0.332977 -0.081307 -0.001690 -0.012676 -0.000726 0.052003 0.034876 -0.072287 0.054006 -0.001143 -0.001253 -0.008708 -0.004775 -0.002572 0.001874 0.000346 0.000457 -0.009193 -0.221352 -0.284034 -0.095285 -0.115286 0.978813 1.000000 -0.316568 0.333439 0.011285 -0.072595 -0.323032 -0.294328 0.300186 -0.282239 0.075178 -0.096146 -0.114341 0.904591 0.914904
same_srv_rate 0.062291 0.744046 0.229677 0.002153 0.010218 0.001521 0.041342 0.005716 0.216969 0.035253 0.004946 0.002634 0.013881 0.009784 0.004282 0.015499 0.000208 -0.000159 0.018042 0.346718 0.517227 -0.851915 -0.839315 -0.327986 -0.316568 1.000000 -0.982109 0.140660 -0.190121 0.848754 0.873551 -0.844537 0.732841 0.179040 -0.830067 -0.819335 -0.282487 -0.282913
diff_srv_rate -0.050875 -0.739988 -0.222572 -0.001846 -0.009386 -0.001522 -0.040555 -0.005538 -0.214019 -0.034953 -0.004553 -0.002649 -0.011337 -0.008711 -0.003743 -0.015112 0.000328 -0.000235 -0.017000 -0.361737 -0.511998 0.828012 0.815305 0.345571 0.333439 -0.982109 1.000000 -0.138293 0.185942 -0.844028 -0.868580 0.850911 -0.727031 -0.176930 0.807205 0.795844 0.299041 0.298904
srv_diff_host_rate 0.123621 -0.104042 0.521003 0.020678 0.012117 -0.000788 0.032141 -0.003096 0.503807 0.036497 0.002286 0.000348 0.006316 0.014412 0.001096 0.024266 -0.000141 -0.000360 -0.008878 -0.384010 -0.239057 -0.121489 -0.112222 -0.017902 0.011285 0.140660 -0.138293 1.000000 -0.445051 0.035010 0.068648 -0.050472 -0.222707 0.433173 -0.097973 -0.092661 0.022585 0.024722
dst_host_count -0.161107 0.130377 -0.611972 -0.019923 -0.029149 -0.005894 -0.074178 -0.028369 -0.682721 -0.041615 -0.021367 -0.006697 -0.078717 -0.049529 -0.021200 -0.023865 -0.000424 -0.000106 -0.055453 0.547443 0.442611 0.165350 0.160322 -0.067857 -0.072595 -0.190121 0.185942 -0.445051 1.000000 0.022731 -0.070448 0.044338 0.189876 -0.918894 0.123881 0.113845 -0.125142 -0.125273
dst_host_srv_count -0.217167 0.741979 0.024124 -0.012341 -0.058225 -0.005698 -0.017960 -0.015092 0.080352 0.003465 -0.011906 -0.006288 -0.038689 -0.026890 -0.012017 -0.023657 -0.000280 0.000206 -0.044366 0.586979 0.720746 -0.724317 -0.713313 -0.330391 -0.323032 0.848754 -0.844028 0.035010 0.022731 1.000000 0.970072 -0.955178 0.769481 0.043668 -0.722607 -0.708392 -0.312040 -0.300787
dst_host_same_srv_rate -0.211979 0.729151 0.055033 0.002576 -0.049560 -0.004078 0.018783 0.003004 0.114526 0.038980 0.000515 -0.005738 -0.038935 -0.021731 -0.009962 -0.021358 -0.000503 0.000229 -0.041749 0.539698 0.681955 -0.745745 -0.734334 -0.303126 -0.294328 0.873551 -0.868580 0.068648 -0.070448 0.970072 1.000000 -0.980245 0.771158 0.107926 -0.742045 -0.725272 -0.278068 -0.264383
dst_host_diff_srv_rate 0.231644 -0.712965 -0.035073 -0.001803 0.055542 0.005208 -0.017198 -0.002960 -0.093565 -0.039091 -0.000916 0.006687 0.047414 0.027092 0.010761 0.026703 -0.000181 -0.000004 0.044640 -0.546869 -0.673916 0.719708 0.707753 0.308722 0.300186 -0.844537 0.850911 -0.050472 0.044338 -0.955178 -0.980245 1.000000 -0.766402 -0.088665 0.719275 0.701149 0.287476 0.271067
dst_host_same_src_port_rate -0.065202 0.815039 -0.396195 0.004265 -0.015449 -0.001939 -0.086998 -0.006617 -0.359506 -0.078843 -0.004617 -0.005020 -0.015968 -0.015018 -0.003521 -0.033288 -0.000455 0.000283 -0.038092 0.776906 0.812280 -0.650336 -0.646256 -0.278465 -0.282239 0.732841 -0.727031 -0.222707 0.189876 0.769481 0.771158 -0.766402 1.000000 -0.175310 -0.658737 -0.652636 -0.299273 -0.297100
dst_host_srv_diff_host_rate 0.100692 -0.140231 0.578557 0.016171 0.007306 -0.000976 -0.014141 -0.002588 0.659078 -0.020979 0.008631 0.001052 0.061030 0.030590 0.015882 0.011765 0.000288 0.000538 -0.012578 -0.496554 -0.391712 -0.153568 -0.148072 0.073061 0.075178 0.179040 -0.176930 0.433173 -0.918894 0.043668 0.107926 -0.088665 -0.175310 1.000000 -0.118697 -0.103715 0.114971 0.120767
dst_host_serror_rate -0.056753 -0.645920 -0.167047 0.013566 0.010387 -0.001381 -0.004706 0.014713 -0.143283 -0.005019 -0.003498 0.001974 -0.008457 -0.002257 -0.001588 -0.011197 -0.000011 -0.000076 -0.001066 -0.331571 -0.449096 0.973947 0.967214 -0.094076 -0.096146 -0.830067 0.807205 -0.097973 0.123881 -0.722607 -0.742045 0.719275 -0.658737 -0.118697 1.000000 0.968015 -0.087531 -0.096899
dst_host_srv_serror_rate -0.057298 -0.641792 -0.158378 0.012265 -0.024117 -0.001370 -0.010721 0.014914 -0.132474 -0.004504 -0.003032 0.002893 -0.007096 -0.004295 -0.002357 -0.011487 -0.000372 -0.000007 -0.016885 -0.335290 -0.442823 0.965663 0.970617 -0.110646 -0.114341 -0.819335 0.795844 -0.092661 0.113845 -0.708392 -0.725272 0.701149 -0.652636 -0.103715 0.968015 1.000000 -0.111578 -0.110532
dst_host_rerror_rate -0.007759 -0.297338 -0.003042 0.000389 0.046656 -0.000786 0.199019 0.032395 0.007236 0.214115 0.002763 0.003173 -0.000421 0.000626 -0.000617 -0.004743 -0.000823 -0.000435 0.025282 -0.261194 -0.313442 -0.103198 -0.122630 0.910225 0.904591 -0.282487 0.299041 0.022585 -0.125142 -0.312040 -0.278068 0.287476 -0.299273 0.114971 -0.087531 -0.111578 1.000000 0.950964
dst_host_srv_rerror_rate -0.013891 -0.300581 0.001621 -0.001816 -0.013666 -0.000782 0.189142 0.032151 0.012979 0.217858 0.002151 0.001731 -0.005012 -0.001096 -0.002020 -0.004552 -0.001038 -0.000529 -0.004292 -0.256176 -0.308132 -0.105434 -0.124656 0.911622 0.914904 -0.282913 0.298904 0.024722 -0.125273 -0.300787 -0.264383 0.271067 -0.297100 0.120767 -0.096899 -0.110532 0.950964 1.000000

We have used a Pandas DataFrame here to render the correlation matrix in a more comprehensive way. Now we want those variables that are highly correlated. For that we do a bit of dataframe manipulation.

In [22]:
# get a boolean dataframe where true means that a pair of variables is highly correlated
highly_correlated_df = (abs(corr_df) > .8) & (corr_df < 1.0)
# get the names of the variables so we can use them to slice the dataframe
correlated_vars_index = (highly_correlated_df==True).any()
correlated_var_names = correlated_vars_index[correlated_vars_index==True].index
# slice it
highly_correlated_df.loc[correlated_var_names,correlated_var_names]
Out[22]:
src_bytes dst_bytes hot logged_in num_compromised num_outbound_cmds is_hot_login count srv_count serror_rate srv_serror_rate rerror_rate srv_rerror_rate same_srv_rate diff_srv_rate dst_host_count dst_host_srv_count dst_host_same_srv_rate dst_host_diff_srv_rate dst_host_same_src_port_rate dst_host_srv_diff_host_rate dst_host_serror_rate dst_host_srv_serror_rate dst_host_rerror_rate dst_host_srv_rerror_rate
src_bytes False False False False False False False False False False False False False False False False False False False True False False False False False
dst_bytes False False False True False False False False False False False False False False False False False False False False False False False False False
hot False False False False True False False False False False False False False False False False False False False False False False False False False
logged_in False True False False False False False False False False False False False False False False False False False False False False False False False
num_compromised False False True False False False False False False False False False False False False False False False False False False False False False False
num_outbound_cmds False False False False False False True False False False False False False False False False False False False False False False False False False
is_hot_login False False False False False True False False False False False False False False False False False False False False False False False False False
count False False False False False False False False True False False False False False False False False False False False False False False False False
srv_count False False False False False False False True False False False False False False False False False False False True False False False False False
serror_rate False False False False False False False False False False True False False True True False False False False False False True True False False
srv_serror_rate False False False False False False False False False True False False False True True False False False False False False True True False False
rerror_rate False False False False False False False False False False False False True False False False False False False False False False False True True
srv_rerror_rate False False False False False False False False False False False True False False False False False False False False False False False True True
same_srv_rate False False False False False False False False False True True False False False True False True True True False False True True False False
diff_srv_rate False False False False False False False False False True True False False True False False True True True False False True False False False
dst_host_count False False False False False False False False False False False False False False False False False False False False True False False False False
dst_host_srv_count False False False False False False False False False False False False False True True False False True True False False False False False False
dst_host_same_srv_rate False False False False False False False False False False False False False True True False True False True False False False False False False
dst_host_diff_srv_rate False False False False False False False False False False False False False True True False True True False False False False False False False
dst_host_same_src_port_rate True False False False False False False False True False False False False False False False False False False False False False False False False
dst_host_srv_diff_host_rate False False False False False False False False False False False False False False False True False False False False False False False False False
dst_host_serror_rate False False False False False False False False False True True False False True True False False False False False False False True False False
dst_host_srv_serror_rate False False False False False False False False False True True False False True False False False False False False False True False False False
dst_host_rerror_rate False False False False False False False False False False False True True False False False False False False False False False False False True
dst_host_srv_rerror_rate False False False False False False False False False False False True True False False False False False False False False False False True False

Conclusions and possible model selection hints

The previous dataframe showed us which variables are highly correlated. We have kept just those variables with at least one strong correlation. We can use as we please, but a good way could be to do some model selection. That is, if we have a group of variables that are highly correlated, we can keep just one of them to represent the group under the assumption that they convey similar information as predictors. Reducing the number of variables will not improve our model accuracy, but it will make it easier to understand and also more efficient to compute.

For example, from the description of the KDD Cup 99 task we know that the variable dst_host_same_src_port_rate references the percentage of the last 100 connections to the same port, for the same destination host. In our correlation matrix (and auxiliar dataframes) we find that this one is highly and positively correlated to src_bytes and srv_count. The former is the number of bytes sent form source to destination. The later is the number of connections to the same service as the current connection in the past 2 seconds. We might decide not to include dst_host_same_src_port_rate in our model if we include the other two, as a way to reduce the number of variables and later one better interpret our models.

Later on, in those notebooks dedicated to build predictive models, we will make use of this information to build more interpretable models.

MLlib: Classification with Logistic Regression

In this notebook we will use Spark's machine learning library MLlib to build a Logistic Regression classifier for network attack detection. We will use the complete KDD Cup 1999 datasets in order to test Spark capabilities with large datasets.

Additionally, we will introduce two ways of performing model selection: by using a correlation matrix and by using hypothesis testing.

At the time of processing this notebook, our Spark cluster contains:

  • Eight nodes, with one of them acting as master and the rest as workers.
  • Each node contains 8Gb of RAM, with 6Gb being used for each node.
  • Each node has a 2.4Ghz Intel dual core processor.

Getting the data and creating the RDD

As we said, this time we will use the complete dataset provided for the KDD Cup 1999, containing nearly half million network interactions. The file is provided as a Gzip file that we will download locally.

In [1]:
import urllib
f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz", "kddcup.data.gz")
In [2]:
data_file = "./kddcup.data.gz"
raw_data = sc.textFile(data_file)

print "Train data size is {}".format(raw_data.count())
Train data size is 4898431

The KDD Cup 1999 also provide test data that we will load in a separate RDD.

In [3]:
ft = urllib.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/corrected.gz", "corrected.gz")
In [4]:
test_data_file = "./corrected.gz"
test_raw_data = sc.textFile(test_data_file)

print "Test data size is {}".format(test_raw_data.count())
Test data size is 311029

Labeled Points

A labeled point is a local vector associated with a label/response. In MLlib, labeled points are used in supervised learning algorithms and they are stored as doubles. For binary classification, a label should be either 0 (negative) or 1 (positive).

Preparing the training data

In our case, we are interested in detecting network attacks in general. We don't need to detect which type of attack we are dealing with. Therefore we will tag each network interaction as non attack (i.e. 'normal' tag) or attack (i.e. anything else but 'normal').

In [5]:
from pyspark.mllib.regression import LabeledPoint
from numpy import array

def parse_interaction(line):
    line_split = line.split(",")
    # leave_out = [1,2,3,41]
    clean_line_split = line_split[0:1]+line_split[4:41]
    attack = 1.0
    if line_split[41]=='normal.':
        attack = 0.0
    return LabeledPoint(attack, array([float(x) for x in clean_line_split]))

training_data = raw_data.map(parse_interaction)

Preparing the test data

Similarly, we process our test data file.

In [6]:
test_data = test_raw_data.map(parse_interaction)

Detecting network attacks using Logistic Regression

Logistic regression is widely used to predict a binary response. Spark implements two algorithms to solve logistic regression: mini-batch gradient descent and L-BFGS. L-BFGS is recommended over mini-batch gradient descent for faster convergence.

Training a classifier

In [7]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from time import time

# Build the model
t0 = time()
logit_model = LogisticRegressionWithLBFGS.train(training_data)
tt = time() - t0

print "Classifier trained in {} seconds".format(round(tt,3))
Classifier trained in 365.599 seconds

Evaluating the model on new data

In order to measure the classification error on our test data, we use map on the test_data RDD and the model to predict each test point class.

In [8]:
labels_and_preds = test_data.map(lambda p: (p.label, logit_model.predict(p.features)))

Classification results are returned in pars, with the actual test label and the predicted one. This is used to calculate the classification error by using filter and count as follows.

In [9]:
t0 = time()
test_accuracy = labels_and_preds.filter(lambda (v, p): v == p).count() / float(test_data.count())
tt = time() - t0

print "Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4))
Prediction made in 34.291 seconds. Test accuracy is 0.9164

That's a decent accuracy. We know that there is space for improvement with a better variable selection and also by including categorical variables (e.g. we have excluded 'protocol' and 'service').

Model selection

Model or feature selection helps us building more interpretable and efficient models (or a classifier in this case). For illustrative purposes, we will follow two different approaches, correlation matrices and hypothesis testing.

Using a correlation matrix

In a previous notebook we calculated a correlation matrix in order to find predictors that are highly correlated. There are many possible choices there in order to simplify our model. We can pick different combinations of correlated variables and leave just those that represent them. The reader can try different combinations. Here we will choose the following for illustrative purposes:

  • From the description of the KDD Cup 99 task we know that the variable dst_host_same_src_port_rate references the percentage of the last 100 connections to the same port, for the same destination host. In our correlation matrix (and auxiliary dataframes) we find that this one is highly and positively correlated to src_bytes and srv_count. The former is the number of bytes sent form source to destination. The later is the number of connections to the same service as the current connection in the past 2 seconds. We decide not to include dst_host_same_src_port_rate in our model since we include the other two.
  • Variables serror_rate and srv_error_rate (% of connections that have SYN errors for same host and same service respectively) are highly positively correlated. Moreover, the set of variables that they highly correlate with are pretty much the same. They look like contributing very similarly to our model. We will keep just serror_rate.
  • A similar situation happens with rerror_rate and srv_rerror_rate (% of connections that have REJ errors) so we will keep just rerror_rate.
  • Same thing with variables prefixed with dst_host_ for the previous ones (e.g. dst_host_srv_serror_rate).

We will stop here, although the reader can keep experimenting removing correlated variables has before (e.g. same_srv_rate and diff_srv_rate are good candidates. Our list of variables we will drop includes:

  • dst_host_same_src_port_rate, (column 35).
  • srv_serror_rate (column 25).
  • srv_rerror_rate (column 27).
  • dst_host_srv_serror_rate (column 38).
  • dst_host_srv_rerror_rate (column 40).

Evaluating the new model

Let's proceed with the evaluation of our reduced model. First we need to provide training and testing datasets containing just the selected variables. For that we will define a new function to parse the raw data that keeps just what we need.

In [10]:
def parse_interaction_corr(line):
    line_split = line.split(",")
    # leave_out = [1,2,3,25,27,35,38,40,41]
    clean_line_split = line_split[0:1]+line_split[4:25]+line_split[26:27]+line_split[28:35]+line_split[36:38]+line_split[39:40]
    attack = 1.0
    if line_split[41]=='normal.':
        attack = 0.0
    return LabeledPoint(attack, array([float(x) for x in clean_line_split]))

corr_reduced_training_data = raw_data.map(parse_interaction_corr)
corr_reduced_test_data = test_raw_data.map(parse_interaction_corr)

Note: when selecting elements in the split, a list comprehension with a leave_out list for filtering is more Pythonic than slicing and concatenation indeed, but we have found it less efficient. This is very important when dealing with large datasets. The parse_interaction functions will be called for every element in the RDD, so we need to make them as efficient as possible.

Now we can train the model.

In [11]:
# Build the model
t0 = time()
logit_model_2 = LogisticRegressionWithLBFGS.train(corr_reduced_training_data)
tt = time() - t0

print "Classifier trained in {} seconds".format(round(tt,3))
Classifier trained in 319.124 seconds

And evaluate its accuracy on the test data.

In [12]:
labels_and_preds = corr_reduced_test_data.map(lambda p: (p.label, logit_model_2.predict(p.features)))
t0 = time()
test_accuracy = labels_and_preds.filter(lambda (v, p): v == p).count() / float(corr_reduced_test_data.count())
tt = time() - t0

print "Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4))
Prediction made in 34.102 seconds. Test accuracy is 0.8599

As expected, we have reduced accuracy and also training time. However this doesn't seem a good trade! At least not for logistic regression and considering the predictors we decided to leave out. We have lost quite a lot of accuracy and have not gained a lot of execution time during training. Moreover prediction time didn't improve.

Using hypothesis testing

Hypothesis testing is a powerful tool in statistical inference and learning to determine whether a result is statistically significant. MLlib supports Pearson's chi-squared (χ2) tests for goodness of fit and independence. The goodness of fit test requires an input type of Vector, whereas the independence test requires a Matrix as input. Moreover, MLlib also supports the input type RDD[LabeledPoint] to enable feature selection via chi-squared independence tests. Again, these methods are part of the Statistics package.

In our case we want to perform some sort of feature selection, so we will provide an RDD of LabeledPoint. Internally, MLlib will calculate a contingency matrix and perform the Persons's chi-squared (χ2) test. Features need to be categorical. Real-valued features will be treated as categorical in each of its different values. There is a limit of 1000 different values, so we need either to leave out some features or categorise them. In this case, we will consider just features that either take boolean values or just a few different numeric values in our dataset. We could overcome this limitation by defining a more complex parse_interaction function that categorises each feature properly.

In [13]:
feature_names = ["land","wrong_fragment",
             "urgent","hot","num_failed_logins","logged_in","num_compromised",
             "root_shell","su_attempted","num_root","num_file_creations",
             "num_shells","num_access_files","num_outbound_cmds",
             "is_hot_login","is_guest_login","count","srv_count","serror_rate",
             "srv_serror_rate","rerror_rate","srv_rerror_rate","same_srv_rate",
             "diff_srv_rate","srv_diff_host_rate","dst_host_count","dst_host_srv_count",
             "dst_host_same_srv_rate","dst_host_diff_srv_rate","dst_host_same_src_port_rate",
             "dst_host_srv_diff_host_rate","dst_host_serror_rate","dst_host_srv_serror_rate",
             "dst_host_rerror_rate","dst_host_srv_rerror_rate"]
In [14]:
def parse_interaction_categorical(line):
    line_split = line.split(",")
    clean_line_split = line_split[6:41]
    attack = 1.0
    if line_split[41]=='normal.':
        attack = 0.0
    return LabeledPoint(attack, array([float(x) for x in clean_line_split]))

training_data_categorical = raw_data.map(parse_interaction_categorical)
In [15]:
from pyspark.mllib.stat import Statistics

chi = Statistics.chiSqTest(training_data_categorical)

Now we can check the resulting values after putting them into a Pandas data frame.

In [16]:
import pandas as pd
pd.set_option('display.max_colwidth', 30)

records = [(result.statistic, result.pValue) for result in chi]

chi_df = pd.DataFrame(data=records, index= feature_names, columns=["Statistic","p-value"])

chi_df 
Out[16]:
Statistic p-value
land 0.464984 0.495304
wrong_fragment 306.855508 0.000000
urgent 38.718442 0.000000
hot 19463.314328 0.000000
num_failed_logins 127.769106 0.000000
logged_in 3273098.055702 0.000000
num_compromised 2011.862724 0.000000
root_shell 1044.917853 0.000000
su_attempted 433.999968 0.000000
num_root 22871.675646 0.000000
num_file_creations 9179.739210 0.000000
num_shells 1380.027801 0.000000
num_access_files 18734.770753 0.000000
num_outbound_cmds 0.000000 1.000000
is_hot_login 8.070987 0.004498
is_guest_login 13500.511066 0.000000
count 4546398.359513 0.000000
srv_count 2296059.697747 0.000000
serror_rate 268419.919232 0.000000
srv_serror_rate 302626.967659 0.000000
rerror_rate 9860.453313 0.000000
srv_rerror_rate 32476.385256 0.000000
same_srv_rate 399912.444046 0.000000
diff_srv_rate 390999.770051 0.000000
srv_diff_host_rate 1365458.716670 0.000000
dst_host_count 2520478.944345 0.000000
dst_host_srv_count 1439086.154193 0.000000
dst_host_same_srv_rate 1237931.905888 0.000000
dst_host_diff_srv_rate 1339002.406545 0.000000
dst_host_same_src_port_rate 2915195.296767 0.000000
dst_host_srv_diff_host_rate 2226290.874366 0.000000
dst_host_serror_rate 407454.601492 0.000000
dst_host_srv_serror_rate 455099.001618 0.000000
dst_host_rerror_rate 136478.956325 0.000000
dst_host_srv_rerror_rate 254547.369785 0.000000

From that we conclude that predictors land and num_outbound_cmds could be removed from our model without affecting our accuracy dramatically. Let's try this.

Evaluating the new model

So the only modification to our first parse_interaction function will be to remove columns 6 and 19, corresponding to the two predictors that we want not to be part of our model.

In [17]:
def parse_interaction_chi(line):
    line_split = line.split(",")
    # leave_out = [1,2,3,6,19,41]
    clean_line_split = line_split[0:1] + line_split[4:6] + line_split[7:19] + line_split[20:41]
    attack = 1.0
    if line_split[41]=='normal.':
        attack = 0.0
    return LabeledPoint(attack, array([float(x) for x in clean_line_split]))

training_data_chi = raw_data.map(parse_interaction_chi)
test_data_chi = test_raw_data.map(parse_interaction_chi)

Now we build the logistic regression classifier again.

In [18]:
# Build the model
t0 = time()
logit_model_chi = LogisticRegressionWithLBFGS.train(training_data_chi)
tt = time() - t0

print "Classifier trained in {} seconds".format(round(tt,3))
Classifier trained in 356.507 seconds

And evaluate in test data.

In [19]:
labels_and_preds = test_data_chi.map(lambda p: (p.label, logit_model_chi.predict(p.features)))
t0 = time()
test_accuracy = labels_and_preds.filter(lambda (v, p): v == p).count() / float(test_data_chi.count())
tt = time() - t0

print "Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4))
Prediction made in 34.656 seconds. Test accuracy is 0.9164

So we can see that, by using hypothesis testing, we have been able to remove two predictors without diminishing testing accuracy at all. Training time improved a bit as well. This might now seem like a big model reduction, but it is something when dealing with big data sources. Moreover, we should be able to categorise those five predictors we have left out for different reasons and, either include them in the model or leave them out if they aren't statistically significant.

Additionally, we could try to remove some of those predictors that are highly correlated, trying not to reduce accuracy too much. In the end, we should end up with a model easier to understand and use.

MLlib: Decision Trees

In this notebook we will use Spark's machine learning library MLlib to build a Decision Tree classifier for network attack detection. We will use the complete KDD Cup 1999 datasets in order to test Spark capabilities with large datasets.

Decision trees are a popular machine learning tool in part because they are easy to interpret, handle categorical features, extend to the multiclass classification setting, do not require feature scaling, and are able to capture non-linearities and feature interactions. In this notebook, we will first train a classification tree including every single predictor. Then we will use our results to perform model selection. Once we find out the most important ones (the main splits in the tree) we will build a minimal tree using just three of them (the first two levels of the tree in order to compare performance and accuracy.

At the time of processing this notebook, our Spark cluster contains:

  • Eight nodes, with one of them acting as master and the rest as workers.
  • Each node contains 8Gb of RAM, with 6Gb being used for each node.
  • Each node has a 2.4Ghz Intel dual core processor.
  • Running Apache Spark 1.3.1.

Getting the data and creating the RDD

As we said, this time we will use the complete dataset provided for the KDD Cup 1999, containing nearly half million network interactions. The file is provided as a Gzip file that we will download locally.

In [1]:
import urllib
In [2]:
f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz", "kddcup.data.gz")
In [3]:
data_file = "./kddcup.data.gz"
raw_data = sc.textFile(data_file)

print "Train data size is {}".format(raw_data.count())
Train data size is 4898431

The KDD Cup 1999 also provide test data that we will load in a separate RDD.

In [4]:
ft = urllib.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/corrected.gz", "corrected.gz")
In [5]:
test_data_file = "./corrected.gz"
test_raw_data = sc.textFile(test_data_file)

print "Test data size is {}".format(test_raw_data.count())
Test data size is 311029

Detecting network attacks using Decision Trees

In this section we will train a classification tree that, as we did with logistic regression, will predict if a network interaction is either normal or attack.

Training a classification tree using MLlib requires some parameters:

  • Training data
  • Num classes
  • Categorical features info: a map from column to categorical variables arity. This is optional, although it should increase model accuracy. However it requires that we know the levels in our categorical variables in advance. second we need to parse our data to convert labels to integer values within the arity range.
  • Impurity metric
  • Tree maximum depth
  • And tree maximum number of bins

In the next section we will see how to obtain all the labels within a dataset and convert them to numerical factors.

Preparing the data

As we said, in order to benefits from trees ability to seamlessly with categorical variables, we need to convert them to numerical factors. But first we need to obtain all the possible levels. We will use set transformations on a csv parsed RDD.

In [6]:
from pyspark.mllib.regression import LabeledPoint
from numpy import array

csv_data = raw_data.map(lambda x: x.split(","))
test_csv_data = test_raw_data.map(lambda x: x.split(","))

protocols = csv_data.map(lambda x: x[1]).distinct().collect()
services = csv_data.map(lambda x: x[2]).distinct().collect()
flags = csv_data.map(lambda x: x[3]).distinct().collect()

And now we can use this Python lists in our create_labeled_point function. If a factor level is not in the training data, we assign an especial level. Remember that we cannot use testing data for training our model, not even the factor levels. The testing data represents the unknown to us in a real case.

In [7]:
def create_labeled_point(line_split):
    # leave_out = [41]
    clean_line_split = line_split[0:41]
    
    # convert protocol to numeric categorical variable
    try: 
        clean_line_split[1] = protocols.index(clean_line_split[1])
    except:
        clean_line_split[1] = len(protocols)
        
    # convert service to numeric categorical variable
    try:
        clean_line_split[2] = services.index(clean_line_split[2])
    except:
        clean_line_split[2] = len(services)
    
    # convert flag to numeric categorical variable
    try:
        clean_line_split[3] = flags.index(clean_line_split[3])
    except:
        clean_line_split[3] = len(flags)
    
    # convert label to binary label
    attack = 1.0
    if line_split[41]=='normal.':
        attack = 0.0
        
    return LabeledPoint(attack, array([float(x) for x in clean_line_split]))

training_data = csv_data.map(create_labeled_point)
test_data = test_csv_data.map(create_labeled_point)

Training a classifier

We are now ready to train our classification tree. We will keep the maxDepth value small. This will lead to smaller accuracy, but we will obtain less splits so later on we can better interpret the tree. In a production system we will try to increase this value in order to find a better accuracy.

In [8]:
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from time import time

# Build the model
t0 = time()
tree_model = DecisionTree.trainClassifier(training_data, numClasses=2, 
                                          categoricalFeaturesInfo={1: len(protocols), 2: len(services), 3: len(flags)},
                                          impurity='gini', maxDepth=4, maxBins=100)
tt = time() - t0

print "Classifier trained in {} seconds".format(round(tt,3))
Classifier trained in 439.971 seconds

Evaluating the model

In order to measure the classification error on our test data, we use map on the test_data RDD and the model to predict each test point class.

In [9]:
predictions = tree_model.predict(test_data.map(lambda p: p.features))
labels_and_preds = test_data.map(lambda p: p.label).zip(predictions)

Classification results are returned in pars, with the actual test label and the predicted one. This is used to calculate the classification error by using filter and count as follows.

In [10]:
t0 = time()
test_accuracy = labels_and_preds.filter(lambda (v, p): v == p).count() / float(test_data.count())
tt = time() - t0

print "Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4))
Prediction made in 38.651 seconds. Test accuracy is 0.9155

NOTE: the zip transformation doesn't work properly with pySpark 1.2.1. It does in 1.3

Interpreting the model

Understanding our tree splits is a great exercise in order to explain our classification labels in terms of predictors and the values they take. Using the toDebugString method in our three model we can obtain a lot of information regarding splits, nodes, etc.

In [11]:
print "Learned classification tree model:"
print tree_model.toDebugString()
Learned classification tree model:
DecisionTreeModel classifier of depth 4 with 27 nodes
  If (feature 22 <= 89.0)
   If (feature 3 in {2.0,3.0,4.0,7.0,9.0,10.0})
    If (feature 36 <= 0.43)
     If (feature 28 <= 0.19)
      Predict: 1.0
     Else (feature 28 > 0.19)
      Predict: 0.0
    Else (feature 36 > 0.43)
     If (feature 2 in {0.0,3.0,15.0,26.0,27.0,36.0,42.0,58.0,67.0})
      Predict: 0.0
     Else (feature 2 not in {0.0,3.0,15.0,26.0,27.0,36.0,42.0,58.0,67.0})
      Predict: 1.0
   Else (feature 3 not in {2.0,3.0,4.0,7.0,9.0,10.0})
    If (feature 2 in {50.0,51.0})
     Predict: 0.0
    Else (feature 2 not in {50.0,51.0})
     If (feature 32 <= 168.0)
      Predict: 1.0
     Else (feature 32 > 168.0)
      Predict: 0.0
  Else (feature 22 > 89.0)
   If (feature 5 <= 0.0)
    If (feature 11 <= 0.0)
     If (feature 31 <= 253.0)
      Predict: 1.0
     Else (feature 31 > 253.0)
      Predict: 1.0
    Else (feature 11 > 0.0)
     If (feature 2 in {12.0})
      Predict: 0.0
     Else (feature 2 not in {12.0})
      Predict: 1.0
   Else (feature 5 > 0.0)
    If (feature 29 <= 0.08)
     If (feature 2 in {3.0,4.0,26.0,36.0,42.0,58.0,68.0})
      Predict: 0.0
     Else (feature 2 not in {3.0,4.0,26.0,36.0,42.0,58.0,68.0})
      Predict: 1.0
    Else (feature 29 > 0.08)
     Predict: 1.0

For example, a network interaction with the following features (see description here) will be classified as an attack by our model:

  • count, the number of connections to the same host as the current connection in the past two seconds, being greater than 32.
  • dst_bytes, the number of data bytes from destination to source, is 0.
  • service is neither level 0 nor 52.
  • logged_in is false.
    From our services list we know that:
In [12]:
print "Service 0 is {}".format(services[0])
print "Service 52 is {}".format(services[52])
Service 0 is urp_i
Service 52 is tftp_u

So we can characterise network interactions with more than 32 connections to the same server in the last 2 seconds, transferring zero bytes from destination to source, where service is neither urp_i nor tftp_u, and not logged in, as network attacks. A similar approach can be used for each tree terminal node.

We can see that count is the first node split in the tree. Remember that each partition is chosen greedily by selecting the best split from a set of possible splits, in order to maximize the information gain at a tree node (see more here). At a second level we find variables flag (normal or error status of the connection) and dst_bytes (the number of data bytes from destination to source) and so on.

This explaining capability of a classification (or regression) tree is one of its main benefits. Understaining data is a key factor to build better models.

Building a minimal model using the three main splits

So now that we know the main features predicting a network attack, thanks to our classification tree splits, let's use them to build a minimal classification tree with just the main three variables: count, dst_bytes, and flag.

We need to define the appropriate function to create labeled points.

In [13]:
def create_labeled_point_minimal(line_split):
    # leave_out = [41]
    clean_line_split = line_split[3:4] + line_split[5:6] + line_split[22:23]
    
    # convert flag to numeric categorical variable
    try:
        clean_line_split[0] = flags.index(clean_line_split[0])
    except:
        clean_line_split[0] = len(flags)
    
    # convert label to binary label
    attack = 1.0
    if line_split[41]=='normal.':
        attack = 0.0
        
    return LabeledPoint(attack, array([float(x) for x in clean_line_split]))

training_data_minimal = csv_data.map(create_labeled_point_minimal)
test_data_minimal = test_csv_data.map(create_labeled_point_minimal)

That we use to train the model.

In [14]:
# Build the model
t0 = time()
tree_model_minimal = DecisionTree.trainClassifier(training_data_minimal, numClasses=2, 
                                          categoricalFeaturesInfo={0: len(flags)},
                                          impurity='gini', maxDepth=3, maxBins=32)
tt = time() - t0

print "Classifier trained in {} seconds".format(round(tt,3))
Classifier trained in 226.519 seconds

Now we can predict on the testing data and calculate accuracy.

In [17]:
predictions_minimal = tree_model_minimal.predict(test_data_minimal.map(lambda p: p.features))
labels_and_preds_minimal = test_data_minimal.map(lambda p: p.label).zip(predictions_minimal)
In [18]:
t0 = time()
test_accuracy = labels_and_preds_minimal.filter(lambda (v, p): v == p).count() / float(test_data_minimal.count())
tt = time() - t0

print "Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4))
Prediction made in 23.202 seconds. Test accuracy is 0.909

So we have trained a classification tree with just the three most important predictors, in half of the time, and with a not so bad accuracy. In fact, a classification tree is a very good model selection tool!

Spark SQL and Data Frames

This notebook will introduce Spark capabilities to deal with data in a structured way. Basically, everything turns around the concept of Data Frame and using SQL language to query them. We will see how the data frame abstraction, very popular in other data analytics ecosystems (e.g. R and Python/Pandas), it is very powerful when performing exploratory data analysis. In fact, it is very easy to express data queries when used together with the SQL language. Moreover, Spark distributes this column-based data structure transparently, in order to make the querying process as efficient as possible.

Getting the data and creating the RDD

As we did in previous notebooks, we will use the reduced dataset (10 percent) provided for the KDD Cup 1999, containing nearly half million nework interactions. The file is provided as a Gzip file that we will download locally.

In [1]:
import urllib
f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")
In [2]:
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file).cache()

Getting a Data Frame

A Spark DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R or Pandas. They can be constructed from a wide array of sources such as a existing RDD in our case.

The entry point into all SQL functionality in Spark is the SQLContext class. To create a basic instance, all we need is a SparkContext reference. Since we are running Spark in shell mode (using pySpark) we can use the global context object sc for this purpose.

In [3]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

Inferring the schema

With a SQLContext, we are ready to create a DataFrame from our existing RDD. But first we need to tell Spark SQL the schema in our data.

Spark SQL can convert an RDD of Row objects to a DataFrame. Rows are constructed by passing a list of key/value pairs as kwargs to the Row class. The keys define the column names, and the types are inferred by looking at the first row. Therefore, it is important that there is no missing data in the first row of the RDD in order to properly infer the schema.

In our case, we first need to split the comma separated data, and then use the information in KDD's 1999 task description to obtain the column names.

In [4]:
from pyspark.sql import Row

csv_data = raw_data.map(lambda l: l.split(","))
row_data = csv_data.map(lambda p: Row(
    duration=int(p[0]), 
    protocol_type=p[1],
    service=p[2],
    flag=p[3],
    src_bytes=int(p[4]),
    dst_bytes=int(p[5])
    )
)

Once we have our RDD of Row we can infer and register the schema.

In [5]:
interactions_df = sqlContext.createDataFrame(row_data)
interactions_df.registerTempTable("interactions")

Now we can run SQL queries over our data frame that has been registered as a table.

In [6]:
# Select tcp network interactions with more than 1 second duration and no transfer from destination
tcp_interactions = sqlContext.sql("""
    SELECT duration, dst_bytes FROM interactions WHERE protocol_type = 'tcp' AND duration > 1000 AND dst_bytes = 0
""")
tcp_interactions.show()
duration dst_bytes
5057     0        
5059     0        
5051     0        
5056     0        
5051     0        
5039     0        
5062     0        
5041     0        
5056     0        
5064     0        
5043     0        
5061     0        
5049     0        
5061     0        
5048     0        
5047     0        
5044     0        
5063     0        
5068     0        
5062     0        

The results of SQL queries are RDDs and support all the normal RDD operations.

In [7]:
# Output duration together with dst_bytes
tcp_interactions_out = tcp_interactions.map(lambda p: "Duration: {}, Dest. bytes: {}".format(p.duration, p.dst_bytes))
for ti_out in tcp_interactions_out.collect():
  print ti_out
Duration: 5057, Dest. bytes: 0
Duration: 5059, Dest. bytes: 0
Duration: 5051, Dest. bytes: 0
Duration: 5056, Dest. bytes: 0
Duration: 5051, Dest. bytes: 0
Duration: 5039, Dest. bytes: 0
Duration: 5062, Dest. bytes: 0
Duration: 5041, Dest. bytes: 0
Duration: 5056, Dest. bytes: 0
Duration: 5064, Dest. bytes: 0
Duration: 5043, Dest. bytes: 0
Duration: 5061, Dest. bytes: 0
Duration: 5049, Dest. bytes: 0
Duration: 5061, Dest. bytes: 0
Duration: 5048, Dest. bytes: 0
Duration: 5047, Dest. bytes: 0
Duration: 5044, Dest. bytes: 0
Duration: 5063, Dest. bytes: 0
Duration: 5068, Dest. bytes: 0
Duration: 5062, Dest. bytes: 0
Duration: 5046, Dest. bytes: 0
Duration: 5052, Dest. bytes: 0
Duration: 5044, Dest. bytes: 0
Duration: 5054, Dest. bytes: 0
Duration: 5039, Dest. bytes: 0
Duration: 5058, Dest. bytes: 0
Duration: 5051, Dest. bytes: 0
Duration: 5032, Dest. bytes: 0
Duration: 5063, Dest. bytes: 0
Duration: 5040, Dest. bytes: 0
Duration: 5051, Dest. bytes: 0
Duration: 5066, Dest. bytes: 0
Duration: 5044, Dest. bytes: 0
Duration: 5051, Dest. bytes: 0
Duration: 5036, Dest. bytes: 0
Duration: 5055, Dest. bytes: 0
Duration: 2426, Dest. bytes: 0
Duration: 5047, Dest. bytes: 0
Duration: 5057, Dest. bytes: 0
Duration: 5037, Dest. bytes: 0
Duration: 5057, Dest. bytes: 0
Duration: 5062, Dest. bytes: 0
Duration: 5051, Dest. bytes: 0
Duration: 5051, Dest. bytes: 0
Duration: 5053, Dest. bytes: 0
Duration: 5064, Dest. bytes: 0
Duration: 5044, Dest. bytes: 0
Duration: 5051, Dest. bytes: 0
Duration: 5033, Dest. bytes: 0
Duration: 5066, Dest. bytes: 0
Duration: 5063, Dest. bytes: 0
Duration: 5056, Dest. bytes: 0
Duration: 5042, Dest. bytes: 0
Duration: 5063, Dest. bytes: 0
Duration: 5060, Dest. bytes: 0
Duration: 5056, Dest. bytes: 0
Duration: 5049, Dest. bytes: 0
Duration: 5043, Dest. bytes: 0
Duration: 5039, Dest. bytes: 0
Duration: 5041, Dest. bytes: 0
Duration: 42448, Dest. bytes: 0
Duration: 42088, Dest. bytes: 0
Duration: 41065, Dest. bytes: 0
Duration: 40929, Dest. bytes: 0
Duration: 40806, Dest. bytes: 0
Duration: 40682, Dest. bytes: 0
Duration: 40571, Dest. bytes: 0
Duration: 40448, Dest. bytes: 0
Duration: 40339, Dest. bytes: 0
Duration: 40232, Dest. bytes: 0
Duration: 40121, Dest. bytes: 0
Duration: 36783, Dest. bytes: 0
Duration: 36674, Dest. bytes: 0
Duration: 36570, Dest. bytes: 0
Duration: 36467, Dest. bytes: 0
Duration: 36323, Dest. bytes: 0
Duration: 36204, Dest. bytes: 0
Duration: 32038, Dest. bytes: 0
Duration: 31925, Dest. bytes: 0
Duration: 31809, Dest. bytes: 0
Duration: 31709, Dest. bytes: 0
Duration: 31601, Dest. bytes: 0
Duration: 31501, Dest. bytes: 0
Duration: 31401, Dest. bytes: 0
Duration: 31301, Dest. bytes: 0
Duration: 31194, Dest. bytes: 0
Duration: 31061, Dest. bytes: 0
Duration: 30935, Dest. bytes: 0
Duration: 30835, Dest. bytes: 0
Duration: 30735, Dest. bytes: 0
Duration: 30619, Dest. bytes: 0
Duration: 30518, Dest. bytes: 0
Duration: 30418, Dest. bytes: 0
Duration: 30317, Dest. bytes: 0
Duration: 30217, Dest. bytes: 0
Duration: 30077, Dest. bytes: 0
Duration: 25420, Dest. bytes: 0
Duration: 22921, Dest. bytes: 0
Duration: 22821, Dest. bytes: 0
Duration: 22721, Dest. bytes: 0
Duration: 22616, Dest. bytes: 0
Duration: 22516, Dest. bytes: 0
Duration: 22416, Dest. bytes: 0
Duration: 22316, Dest. bytes: 0
Duration: 22216, Dest. bytes: 0
Duration: 21987, Dest. bytes: 0
Duration: 21887, Dest. bytes: 0
Duration: 21767, Dest. bytes: 0
Duration: 21661, Dest. bytes: 0
Duration: 21561, Dest. bytes: 0
Duration: 21455, Dest. bytes: 0
Duration: 21334, Dest. bytes: 0
Duration: 21223, Dest. bytes: 0
Duration: 21123, Dest. bytes: 0
Duration: 20983, Dest. bytes: 0
Duration: 14682, Dest. bytes: 0
Duration: 14420, Dest. bytes: 0
Duration: 14319, Dest. bytes: 0
Duration: 14198, Dest. bytes: 0
Duration: 14098, Dest. bytes: 0
Duration: 13998, Dest. bytes: 0
Duration: 13898, Dest. bytes: 0
Duration: 13796, Dest. bytes: 0
Duration: 13678, Dest. bytes: 0
Duration: 13578, Dest. bytes: 0
Duration: 13448, Dest. bytes: 0
Duration: 13348, Dest. bytes: 0
Duration: 13241, Dest. bytes: 0
Duration: 13141, Dest. bytes: 0
Duration: 13033, Dest. bytes: 0
Duration: 12933, Dest. bytes: 0
Duration: 12833, Dest. bytes: 0
Duration: 12733, Dest. bytes: 0
Duration: 12001, Dest. bytes: 0
Duration: 5678, Dest. bytes: 0
Duration: 5010, Dest. bytes: 0
Duration: 1298, Dest. bytes: 0
Duration: 1031, Dest. bytes: 0
Duration: 36438, Dest. bytes: 0

We can easily have a look at our data frame schema using printSchema.

In [8]:
interactions_df.printSchema()
root
 |-- dst_bytes: long (nullable = true)
 |-- duration: long (nullable = true)
 |-- flag: string (nullable = true)
 |-- protocol_type: string (nullable = true)
 |-- service: string (nullable = true)
 |-- src_bytes: long (nullable = true)

Queries as DataFrame operations

Spark DataFrame provides a domain-specific language for structured data manipulation. This language includes methods we can concatenate in order to do selection, filtering, grouping, etc. For example, let's say we want to count how many interactions are there for each protocol type. We can proceed as follows.

In [9]:
from time import time

t0 = time()
interactions_df.select("protocol_type", "duration", "dst_bytes").groupBy("protocol_type").count().show()
tt = time() - t0

print "Query performed in {} seconds".format(round(tt,3))
protocol_type count 
udp           20354 
tcp           190065
icmp          283602
Query performed in 20.568 seconds

Now imagine that we want to count how many interactions last more than 1 second, with no data transfer from destination, grouped by protocol type. We can just add to filter calls to the previous.

In [10]:
t0 = time()
interactions_df.select("protocol_type", "duration", "dst_bytes").filter(interactions_df.duration>1000).filter(interactions_df.dst_bytes==0).groupBy("protocol_type").count().show()
tt = time() - t0

print "Query performed in {} seconds".format(round(tt,3))
protocol_type count
tcp           139  
Query performed in 16.641 seconds

We can use this to perform some exploratory data analysis. Let's count how many attack and normal interactions we have. First we need to add the label column to our data.

In [11]:
def get_label_type(label):
    if label!="normal.":
        return "attack"
    else:
        return "normal"
    
row_labeled_data = csv_data.map(lambda p: Row(
    duration=int(p[0]), 
    protocol_type=p[1],
    service=p[2],
    flag=p[3],
    src_bytes=int(p[4]),
    dst_bytes=int(p[5]),
    label=get_label_type(p[41])
    )
)
interactions_labeled_df = sqlContext.createDataFrame(row_labeled_data)

This time we don't need to register the schema since we are going to use the OO query interface.

Let's check the previous actually works by counting attack and normal data in our data frame.

In [12]:
t0 = time()
interactions_labeled_df.select("label").groupBy("label").count().show()
tt = time() - t0

print "Query performed in {} seconds".format(round(tt,3))
label  count 
attack 396743
normal 97278 
Query performed in 17.325 seconds

Now we want to count them by label and protocol type, in order to see how important the protocol type is to detect when an interaction is or not an attack.

In [13]:
t0 = time()
interactions_labeled_df.select("label", "protocol_type").groupBy("label", "protocol_type").count().show()
tt = time() - t0

print "Query performed in {} seconds".format(round(tt,3))
label  protocol_type count 
attack udp           1177  
attack tcp           113252
attack icmp          282314
normal udp           19177 
normal tcp           76813 
normal icmp          1288  
Query performed in 17.253 seconds

At first sight it seems that udp interactions are in lower proportion between network attacks versus other protocol types.

And we can do much more sophisticated groupings. For example, add to the previous a "split" based on data transfer from target.

In [14]:
t0 = time()
interactions_labeled_df.select("label", "protocol_type", "dst_bytes").groupBy("label", "protocol_type", interactions_labeled_df.dst_bytes==0).count().show()
tt = time() - t0

print "Query performed in {} seconds".format(round(tt,3))
label  protocol_type (dst_bytes = 0) count 
normal icmp          true            1288  
attack udp           true            1166  
attack udp           false           11    
normal udp           true            3594  
normal udp           false           15583 
attack tcp           true            110583
attack tcp           false           2669  
normal tcp           true            9313  
normal tcp           false           67500 
attack icmp          true            282314
Query performed in 17.284 seconds

We see how relevant is this new split to determine if a network interaction is an attack.

We will stop here, but we can see how powerful this type of queries are in order to explore our data. Actually we can replicate all the splits we saw in previous notebooks, when introducing classification trees, just by selecting, groping, and filtering our dataframe. For a more detailed (but less real-world) list of Spark's DataFrame operations and data sources, have a look at the official documentation here.

No comments:

Post a Comment