Bike Sharing Demand Kaggle Competition with Spark and Python
Forecast use of a city bikeshare system
Bike sharing systems are a means of renting bicycles where the process of obtaining membership, rental, and bike return is automated via a network of kiosk locations throughout a city. Using these systems, people are able rent a bike from a one location and return it to a different place on an as-needed basis. Currently, there are over 500 bike-sharing programs around the world.
The data generated by these systems makes them attractive for researchers because the duration of travel, departure location, arrival location, and time elapsed is explicitly recorded. Bike sharing systems therefore function as a sensor network, which can be used for studying mobility in a city. In this competition, participants are asked to combine historical usage patterns with weather data in order to forecast bike rental demand in the Capital Bikeshare program in Washington, D.C.
The dataset for the same can be downloaded from
Once you have downloaded the Bike-Sharing-Dataset.zip file, unzip it. This will create a directory called Bike-Sharing-Dataset, which contains the day.csv, hour.csv, and the Readme.txt files.
The Readme.txt file contains information on the dataset, including the variable names and descriptions. Take a look at the file, and you will see that we have the following variables available:
• instant: This is the record ID
• dteday: This is the raw date
• season: This is different seasons such as spring, summer, winter, and fall
• yr: This is the year (2011 or 2012)
• mnth: This is the month of the year
• hr: This is the hour of the day
• holiday: This is whether the day was a holiday or not
• weekday: This is the day of the week
• workingday: This is whether the day was a working day or not
• weathersit: This is a categorical variable that describes the weather at a
particular time
• temp: This is the normalized temperature
• atemp: This is the normalized apparent temperature
• hum: This is the normalized humidity
• windspeed: This is the normalized wind speed
• cnt: This is the target variable, that is, the count of bike rentals for that hour
We will work with the hourly data contained in hour.csv. If you look at the first line of the dataset, you will see that it contains the column names as a header. You can do this by running the following command:
Before we work with the data in Spark, we will again remove the header from the first line of the file using the same sed command that we used previously to create a new file called hour_noheader.csv:
If you prefer to use IPython Notebook, you can start it with the following command:
The .ipynb file for this project can be found on the following link:
In [6]:
path = "/home/osboxes/new_hour.csv"
raw_data = sc.textFile(path)
num_data = raw_data.count()
We'll start as usual by loading the dataset and inspecting it:
In [7]:
records = raw_data.map(lambda x: x.split(","))
first = records.first()
print first
In [8]:
print num_data
So, we have 17,379 hourly records in our dataset. We have inspected the column
names already. We will ignore the record ID and raw date columns. We will also
ignore the casual and registered count target variables and focus on the overall
count variable, cnt (which is the sum of the other two counts). We are left with
12 variables. The first eight are categorical, while the last 4 are normalized
real-valued variables.
To deal with the eight categorical variables, we will use the binary encoding
approach with which you should be quite familiar by now. The four real-valued
variables will be left as is.
We will first cache our dataset, since we will be reading from it many times:
In [9]:
records.cache()
Out[9]:
In order to extract each categorical feature into a binary vector form, we will need
to know the feature mapping of each feature value to the index of the nonzero value
in our binary vector. Let's define a function that will extract this mapping from our
dataset for a given column:
In [13]:
def get_mapping(rdd, idx):
return rdd.map(lambda fields: fields[idx]).distinct().zipWithIndex().collectAsMap()
Our function first maps the field to its unique values and then uses the
zipWithIndex transformation to zip the value up with a unique index such that a
key-value RDD is formed, where the key is the variable and the value is the index.
This index will be the index of the nonzero entry in the binary vector representation
of the feature. We will finally collect this RDD back to the driver as a Python
dictionary.
We can test our function on the third variable column (index 2):
In [14]:
print "Mapping of first categorical feasture column: %s" % get_mapping(records, 2)
Now, we can apply this function to each categorical column (that is, for variable
indices 2 to 9):
In [15]:
mappings = [get_mapping(records, i) for i in range(2,10)]
cat_len = sum(map(len, mappings))
num_len = len(records.first()[11:15])
total_len = num_len + cat_len
We now have the mappings for each variable, and we can see how many values in
total we need for our binary vector representation:
In [16]:
print "Feature vector length for categorical features: %d" % cat_len
print "Feature vector length for numerical features: %d" % num_len
print "Total feature vector length: %d" % total_len
Creating feature vectors for the linear model¶
The next step is to use our extracted mappings to convert the categorical features to binary-encoded features. Again, it will be helpful to create a function that we can apply to each record in our dataset for this purpose. We will also create a function to extract the target variable from each record. We will need to import numpy for linear algebra utilities and MLlib's LabeledPoint class to wrap our feature vectors and target variables:
In [17]:
from pyspark.mllib.regression import LabeledPoint
import numpy as np
In [18]:
def extract_features(record):
cat_vec = np.zeros(cat_len)
i = 0
step = 0
for field in record[2:9]:
m = mappings[i]
idx = m[field]
cat_vec[idx + step] = 1
i = i + 1
step = step + len(m)
num_vec = np.array([float(field) for field in record[10:14]])
return np.concatenate((cat_vec, num_vec))
In [19]:
def extract_label(record):
return float(record[-1])
In the preceding extract_features function, we ran through each column in the
row of data. We extracted the binary encoding for each variable in turn from the
mappings we created previously. The step variable ensures that the nonzero feature
index in the full feature vector is correct (and is somewhat more efficient than,
say, creating many smaller binary vectors and concatenating them). The numeric
vector is created directly by first converting the data to floating point numbers and
wrapping these in a numpy array. The resulting two vectors are then concatenated.
The extract_label function simply converts the last column variable (the count)
into a float.
With our utility functions defined, we can proceed with extracting feature vectors
and labels from our data records:
In [20]:
data = records.map(lambda r: LabeledPoint(extract_label(r), extract_features(r)))
Let's inspect the first record in the extracted feature RDD:
In [21]:
first_point = data.first()
print "Raw data: " + str(first[2:])
print "Label: " + str(first_point.label)
print "Linear Model feature vector:\n" + str(first_point.features)
print "Linear Model feature vector length: " + str(len(first_point.features))
As we can see, we converted the raw data into a feature vector made up of the binary
categorical and real numeric features, and we indeed have a total vector length of 61 .
Creating feature vectors for the decision tree¶
As we have seen, decision tree models typically work on raw features (that is, it is not required to convert categorical features into a binary vector encoding; they can, instead, be used directly). Therefore, we will create a separate function to extract the decision tree feature vector, which simply converts all the values to floats and wraps them in a numpy array:
In [22]:
def extract_features_dt(record):
return np.array(map(float, record[2:14]))
In [24]:
data_dt = records.map(lambda r: LabeledPoint(extract_label(r),extract_features_dt(r)))
first_point_dt = data_dt.first()
print "Decision Tree feature vector: " + str(first_point_dt.features)
print "Decision Tree feature vector length: " + str(len(first_point_dt.features))
Training and using regression models¶
Training for regression models using decision trees and linear models follows the same procedure as for classification models. We simply pass the training data contained in a [LabeledPoint] RDD to the relevant train method. Note that in Scala, if we wanted to customize the various model parameters (such as regularization and step size for the SGD optimizer), we are required to instantiate a new model instance and use the optimizer field to access these available parameter setters.
In [25]:
from pyspark.mllib.regression import LinearRegressionWithSGD
from pyspark.mllib.tree import DecisionTree
help(LinearRegressionWithSGD.train)
In [26]:
help(DecisionTree.trainRegressor)
Training a regression model on the bike sharing dataset¶
We're ready to use the features we have extracted to train our models on the bike sharing data. First, we'll train the linear regression model and take a look at the first few predictions that the model makes on the data:
In [27]:
linear_model = LinearRegressionWithSGD.train(data, iterations=10,step=0.1, intercept=False)
Note that we have not used the default settings for iterations and step here.
We've changed the number of iterations so that the model does not take too long to
train. As for the step size, you will see why this has been changed from the default a
little later. You will see the following output:
In [28]:
true_vs_predicted = data.map(lambda p: (p.label, linear_model.predict(p.features)))
print "Linear Model predictions: " + str(true_vs_predicted.take(5))
Next, we will train the decision tree model simply using the default arguments to
the trainRegressor method (which equates to using a tree depth of 5). Note that
we need to pass in the other form of the dataset, data_dt , that we created from the
raw feature values (as opposed to the binary encoded features that we used for the
preceding linear model).
We also need to pass in an argument for categoricalFeaturesInfo . This is a
dictionary that maps the categorical feature index to the number of categories for the
feature. If a feature is not in this mapping, it will be treated as continuous. For our
purposes, we will leave this as is, passing in an empty mapping:
In [29]:
dt_model = DecisionTree.trainRegressor(data_dt,{})
preds = dt_model.predict(data_dt.map(lambda p: p.features))
actual = data.map(lambda p: p.label)
true_vs_predicted_dt = actual.zip(preds)
print "Decision Tree predictions: " + str(true_vs_predicted_dt.take(5))
print "Decision Tree depth: " + str(dt_model.depth())
print "Decision Tree number of nodes: " + str(dt_model.numNodes())
Evaluating the performance of regression models¶
In [30]:
def squared_error(actual, pred):
return (pred - actual)**2
In [31]:
def abs_error(actual, pred):
return np.abs(pred - actual)
In [32]:
def squared_log_error(pred, actual):
return (np.log(pred + 1) - np.log(actual + 1))**2
In [33]:
mse = true_vs_predicted.map(lambda (t, p): squared_error(t, p)).mean()
mae = true_vs_predicted.map(lambda (t, p): abs_error(t, p)).mean()
rmsle = np.sqrt(true_vs_predicted.map(lambda (t, p): squared_log_error(t, p)).mean())
In [34]:
print "Linear Model - Mean Squared Error: %2.4f" % mse
print "Linear Model - Mean Absolute Error: %2.4f" % mae
print "Linear Model - Root Mean Squared Log Error: %2.4f" % rmsle
In [36]:
mse_dt = true_vs_predicted_dt.map(lambda (t, p): squared_error(t, p)).mean()
mae_dt = true_vs_predicted_dt.map(lambda (t, p): abs_error(t, p)).mean()
rmsle_dt = np.sqrt(true_vs_predicted_dt.map(lambda (t, p): squared_log_error(t, p)).mean())
print "Decision Tree - Mean Squared Error: %2.4f" % mse_dt
print "Decision Tree - Mean Absolute Error: %2.4f" % mae_dt
print "Decision Tree - Root Mean Squared Log Error: %2.4f" % rmsle_dt
In [ ]:
This comment has been removed by a blog administrator.
ReplyDelete