Saturday, 9 July 2016

Bike Sharing Demand Kaggle Competition with Spark and Python

Bike Sharing Demand Kaggle Competition with Spark and Python


Forecast use of a city bikeshare system


Bike sharing systems are a means of renting bicycles where the process of obtaining membership, rental, and bike return is automated via a network of kiosk locations throughout a city. Using these systems, people are able rent a bike from a one location and return it to a different place on an as-needed basis. Currently, there are over 500 bike-sharing programs around the world.
The data generated by these systems makes them attractive for researchers because the duration of travel, departure location, arrival location, and time elapsed is explicitly recorded. Bike sharing systems therefore function as a sensor network, which can be used for studying mobility in a city. In this competition, participants are asked to combine historical usage patterns with weather data in order to forecast bike rental demand in the Capital Bikeshare program in Washington, D.C.

The dataset for the same can be downloaded from

Once you have downloaded the Bike-Sharing-Dataset.zip file, unzip it. This will create a directory called Bike-Sharing-Dataset, which contains the day.csv, hour.csv, and the Readme.txt files.
The Readme.txt file contains information on the dataset, including the variable names and descriptions. Take a look at the file, and you will see that we have the following variables available:
• instant: This is the record ID
• dteday: This is the raw date
• season: This is different seasons such as spring, summer, winter, and fall
• yr: This is the year (2011 or 2012)
• mnth: This is the month of the year
• hr: This is the hour of the day
• holiday: This is whether the day was a holiday or not
• weekday: This is the day of the week
• workingday: This is whether the day was a working day or not
• weathersit: This is a categorical variable that describes the weather at a
particular time
• temp: This is the normalized temperature
• atemp: This is the normalized apparent temperature
• hum: This is the normalized humidity
• windspeed: This is the normalized wind speed
• cnt: This is the target variable, that is, the count of bike rentals for that hour
We will work with the hourly data contained in hour.csv. If you look at the first line of the dataset, you will see that it contains the column names as a header. You can do this by running the following command:
Before we work with the data in Spark, we will again remove the header from the first line of the file using the same sed command that we used previously to create a new file called hour_noheader.csv:


If you prefer to use IPython Notebook, you can start it with the following command:

The .ipynb file for this project can be found on the following link:

Regression and CART in Spark and Python for BikeSharing Dataset Kaggle
In [6]:
path = "/home/osboxes/new_hour.csv"
raw_data = sc.textFile(path)
num_data = raw_data.count()
We'll start as usual by loading the dataset and inspecting it:
In [7]:
records = raw_data.map(lambda x: x.split(","))
first = records.first()
print first
[u'1', u'2011-01-01', u'1', u'0', u'1', u'0', u'0', u'6', u'0', u'1', u'0.24', u'0.2879', u'0.81', u'0', u'3', u'13', u'16']
In [8]:
print num_data
17379
So, we have 17,379 hourly records in our dataset. We have inspected the column names already. We will ignore the record ID and raw date columns. We will also ignore the casual and registered count target variables and focus on the overall count variable, cnt (which is the sum of the other two counts). We are left with 12 variables. The first eight are categorical, while the last 4 are normalized real-valued variables. To deal with the eight categorical variables, we will use the binary encoding approach with which you should be quite familiar by now. The four real-valued variables will be left as is. We will first cache our dataset, since we will be reading from it many times:
In [9]:
records.cache()
Out[9]:
PythonRDD[10] at RDD at PythonRDD.scala:43
In order to extract each categorical feature into a binary vector form, we will need to know the feature mapping of each feature value to the index of the nonzero value in our binary vector. Let's define a function that will extract this mapping from our dataset for a given column:
In [13]:
def get_mapping(rdd, idx):
    return rdd.map(lambda fields: fields[idx]).distinct().zipWithIndex().collectAsMap()
Our function first maps the field to its unique values and then uses the zipWithIndex transformation to zip the value up with a unique index such that a key-value RDD is formed, where the key is the variable and the value is the index. This index will be the index of the nonzero entry in the binary vector representation of the feature. We will finally collect this RDD back to the driver as a Python dictionary. We can test our function on the third variable column (index 2):
In [14]:
print "Mapping of first categorical feasture column: %s" % get_mapping(records, 2)
Mapping of first categorical feasture column: {u'1': 0, u'3': 1, u'2': 2, u'4': 3}
Now, we can apply this function to each categorical column (that is, for variable indices 2 to 9):
In [15]:
mappings = [get_mapping(records, i) for i in range(2,10)]
cat_len = sum(map(len, mappings))
num_len = len(records.first()[11:15])
total_len = num_len + cat_len
We now have the mappings for each variable, and we can see how many values in total we need for our binary vector representation:
In [16]:
print "Feature vector length for categorical features: %d" % cat_len
print "Feature vector length for numerical features: %d" % num_len
print "Total feature vector length: %d" % total_len
Feature vector length for categorical features: 57
Feature vector length for numerical features: 4
Total feature vector length: 61

Creating feature vectors for the linear model

The next step is to use our extracted mappings to convert the categorical features to binary-encoded features. Again, it will be helpful to create a function that we can apply to each record in our dataset for this purpose. We will also create a function to extract the target variable from each record. We will need to import numpy for linear algebra utilities and MLlib's LabeledPoint class to wrap our feature vectors and target variables:
In [17]:
from pyspark.mllib.regression import LabeledPoint
import numpy as np
In [18]:
def extract_features(record):
    cat_vec = np.zeros(cat_len)
    i = 0
    step = 0
    for field in record[2:9]:
        m = mappings[i]
        idx = m[field]
        cat_vec[idx + step] = 1
        i = i + 1
        step = step + len(m)
    num_vec = np.array([float(field) for field in record[10:14]])
    return np.concatenate((cat_vec, num_vec))
In [19]:
def extract_label(record):
    return float(record[-1])
In the preceding extract_features function, we ran through each column in the row of data. We extracted the binary encoding for each variable in turn from the mappings we created previously. The step variable ensures that the nonzero feature index in the full feature vector is correct (and is somewhat more efficient than, say, creating many smaller binary vectors and concatenating them). The numeric vector is created directly by first converting the data to floating point numbers and wrapping these in a numpy array. The resulting two vectors are then concatenated. The extract_label function simply converts the last column variable (the count) into a float. With our utility functions defined, we can proceed with extracting feature vectors and labels from our data records:
In [20]:
data = records.map(lambda r: LabeledPoint(extract_label(r), extract_features(r)))
Let's inspect the first record in the extracted feature RDD:
In [21]:
first_point = data.first()
print "Raw data: " + str(first[2:])
print "Label: " + str(first_point.label)
print "Linear Model feature vector:\n" + str(first_point.features)
print "Linear Model feature vector length: " + str(len(first_point.features))
Raw data: [u'1', u'0', u'1', u'0', u'0', u'6', u'0', u'1', u'0.24', u'0.2879', u'0.81', u'0', u'3', u'13', u'16']
Label: 16.0
Linear Model feature vector:
[1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.24,0.2879,0.81,0.0]
Linear Model feature vector length: 61
As we can see, we converted the raw data into a feature vector made up of the binary categorical and real numeric features, and we indeed have a total vector length of 61 .

Creating feature vectors for the decision tree

As we have seen, decision tree models typically work on raw features (that is, it is not required to convert categorical features into a binary vector encoding; they can, instead, be used directly). Therefore, we will create a separate function to extract the decision tree feature vector, which simply converts all the values to floats and wraps them in a numpy array:
In [22]:
def extract_features_dt(record):
    return np.array(map(float, record[2:14]))
In [24]:
data_dt = records.map(lambda r: LabeledPoint(extract_label(r),extract_features_dt(r)))
first_point_dt = data_dt.first()
print "Decision Tree feature vector: " + str(first_point_dt.features)
print "Decision Tree feature vector length: " + str(len(first_point_dt.features))
Decision Tree feature vector: [1.0,0.0,1.0,0.0,0.0,6.0,0.0,1.0,0.24,0.2879,0.81,0.0]
Decision Tree feature vector length: 12

Training and using regression models

Training for regression models using decision trees and linear models follows the same procedure as for classification models. We simply pass the training data contained in a [LabeledPoint] RDD to the relevant train method. Note that in Scala, if we wanted to customize the various model parameters (such as regularization and step size for the SGD optimizer), we are required to instantiate a new model instance and use the optimizer field to access these available parameter setters.
In [25]:
from pyspark.mllib.regression import LinearRegressionWithSGD
from pyspark.mllib.tree import DecisionTree
help(LinearRegressionWithSGD.train)
Help on method train in module pyspark.mllib.regression:

train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0, initialWeights=None, regParam=0.0, regType=None, intercept=False, validateData=True, convergenceTol=0.001) method of __builtin__.type instance
    Train a linear regression model using Stochastic Gradient
    Descent (SGD).
    This solves the least squares regression formulation
    
        f(weights) = 1/(2n) ||A weights - y||^2,
    
    which is the mean squared error.
    Here the data matrix has n rows, and the input RDD holds the
    set of rows of A, each with its corresponding right hand side
    label y. See also the documentation for the precise formulation.
    
    :param data:              The training data, an RDD of
                              LabeledPoint.
    :param iterations:        The number of iterations
                              (default: 100).
    :param step:              The step parameter used in SGD
                              (default: 1.0).
    :param miniBatchFraction: Fraction of data to be used for each
                              SGD iteration (default: 1.0).
    :param initialWeights:    The initial weights (default: None).
    :param regParam:          The regularizer parameter
                              (default: 0.0).
    :param regType:           The type of regularizer used for
                              training our model.
    
                              :Allowed values:
                                 - "l1" for using L1 regularization (lasso),
                                 - "l2" for using L2 regularization (ridge),
                                 - None for no regularization
    
                                 (default: None)
    
    :param intercept:         Boolean parameter which indicates the
                              use or not of the augmented representation
                              for training data (i.e. whether bias
                              features are activated or not,
                              default: False).
    :param validateData:      Boolean parameter which indicates if
                              the algorithm should validate data
                              before training. (default: True)
    :param convergenceTol:    A condition which decides iteration termination.
                              (default: 0.001)
    
    .. versionadded:: 0.9.0

In [26]:
help(DecisionTree.trainRegressor)
Help on method trainRegressor in module pyspark.mllib.tree:

trainRegressor(cls, data, categoricalFeaturesInfo, impurity='variance', maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0) method of __builtin__.type instance
    Train a DecisionTreeModel for regression.
    
    :param data: Training data: RDD of LabeledPoint.
                 Labels are real numbers.
    :param categoricalFeaturesInfo: Map from categorical feature
             index to number of categories.
             Any feature not in this map is treated as continuous.
    :param impurity: Supported values: "variance"
    :param maxDepth: Max depth of tree.
             E.g., depth 0 means 1 leaf node.
             Depth 1 means 1 internal node + 2 leaf nodes.
    :param maxBins: Number of bins used for finding splits at each
             node.
    :param minInstancesPerNode: Min number of instances required at
             child nodes to create the parent split
    :param minInfoGain: Min info gain required to create a split
    :return: DecisionTreeModel
    
    Example usage:
    
    >>> from pyspark.mllib.regression import LabeledPoint
    >>> from pyspark.mllib.tree import DecisionTree
    >>> from pyspark.mllib.linalg import SparseVector
    >>>
    >>> sparse_data = [
    ...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
    ...     LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
    ...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
    ...     LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
    ... ]
    >>>
    >>> model = DecisionTree.trainRegressor(sc.parallelize(sparse_data), {})
    >>> model.predict(SparseVector(2, {1: 1.0}))
    1.0
    >>> model.predict(SparseVector(2, {1: 0.0}))
    0.0
    >>> rdd = sc.parallelize([[0.0, 1.0], [0.0, 0.0]])
    >>> model.predict(rdd).collect()
    [1.0, 0.0]
    
    .. versionadded:: 1.1.0

Training a regression model on the bike sharing dataset

We're ready to use the features we have extracted to train our models on the bike sharing data. First, we'll train the linear regression model and take a look at the first few predictions that the model makes on the data:
In [27]:
linear_model = LinearRegressionWithSGD.train(data, iterations=10,step=0.1, intercept=False)
Note that we have not used the default settings for iterations and step here. We've changed the number of iterations so that the model does not take too long to train. As for the step size, you will see why this has been changed from the default a little later. You will see the following output:
In [28]:
true_vs_predicted = data.map(lambda p: (p.label, linear_model.predict(p.features)))
print "Linear Model predictions: " + str(true_vs_predicted.take(5))
Linear Model predictions: [(16.0, 117.89250386724844), (40.0, 116.22496123192109), (32.0, 116.02369145779232), (13.0, 115.67088016754431), (1.0, 115.56315650834316)]
Next, we will train the decision tree model simply using the default arguments to the trainRegressor method (which equates to using a tree depth of 5). Note that we need to pass in the other form of the dataset, data_dt , that we created from the raw feature values (as opposed to the binary encoded features that we used for the preceding linear model). We also need to pass in an argument for categoricalFeaturesInfo . This is a dictionary that maps the categorical feature index to the number of categories for the feature. If a feature is not in this mapping, it will be treated as continuous. For our purposes, we will leave this as is, passing in an empty mapping:
In [29]:
dt_model = DecisionTree.trainRegressor(data_dt,{})
preds = dt_model.predict(data_dt.map(lambda p: p.features))
actual = data.map(lambda p: p.label)
true_vs_predicted_dt = actual.zip(preds)
print "Decision Tree predictions: " + str(true_vs_predicted_dt.take(5))
print "Decision Tree depth: " + str(dt_model.depth())
print "Decision Tree number of nodes: " + str(dt_model.numNodes())
Decision Tree predictions: [(16.0, 54.913223140495866), (40.0, 54.913223140495866), (32.0, 53.171052631578945), (13.0, 14.284023668639053), (1.0, 14.284023668639053)]
Decision Tree depth: 5
Decision Tree number of nodes: 63

Evaluating the performance of regression models

In [30]:
def squared_error(actual, pred):
    return (pred - actual)**2
In [31]:
def abs_error(actual, pred):
    return np.abs(pred - actual)
In [32]:
def squared_log_error(pred, actual):
    return (np.log(pred + 1) - np.log(actual + 1))**2
In [33]:
mse = true_vs_predicted.map(lambda (t, p): squared_error(t, p)).mean()
mae = true_vs_predicted.map(lambda (t, p): abs_error(t, p)).mean()
rmsle = np.sqrt(true_vs_predicted.map(lambda (t, p): squared_log_error(t, p)).mean())
In [34]:
print "Linear Model - Mean Squared Error: %2.4f" % mse
print "Linear Model - Mean Absolute Error: %2.4f" % mae
print "Linear Model - Root Mean Squared Log Error: %2.4f" % rmsle
Linear Model - Mean Squared Error: 30679.4539
Linear Model - Mean Absolute Error: 130.6429
Linear Model - Root Mean Squared Log Error: 1.4653
In [36]:
mse_dt = true_vs_predicted_dt.map(lambda (t, p): squared_error(t, p)).mean()
mae_dt = true_vs_predicted_dt.map(lambda (t, p): abs_error(t, p)).mean()
rmsle_dt = np.sqrt(true_vs_predicted_dt.map(lambda (t, p): squared_log_error(t, p)).mean())
print "Decision Tree - Mean Squared Error: %2.4f" % mse_dt
print "Decision Tree - Mean Absolute Error: %2.4f" % mae_dt
print "Decision Tree - Root Mean Squared Log Error: %2.4f" % rmsle_dt
Decision Tree - Mean Squared Error: 11560.7978
Decision Tree - Mean Absolute Error: 71.0969
Decision Tree - Root Mean Squared Log Error: 0.6259
In [ ]:
 

1 comment:

  1. This comment has been removed by a blog administrator.

    ReplyDelete