Predicting Water Point Functionality in Tanzania

Caitlin Snyder
Jun 16, 2021

1) Introduction

As I wrap up Module 3 of Flatiron School’s Data Science bootcamp, I will be tackling a DrivenData competition, Pump It Up: Data Mining the Water Table.

Follow along below, or take a look at the Jupyter notebook and repo on GitHub.

The competition provides a dataset of water points in Tanzania and their associated characteristics. It is our job to predict, using the supplied training labels, whether a pump is functional, non-functional, or functional but in need of repair.

Below, I’ll build a model to predict water point status for a supplied test dataset. Let’s get started!

import sys
import re
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import plot_confusion_matrix
from sklearn.utils import resample
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import RandomizedSearchCV, GridSearchCV
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split as tts
from imblearn.pipeline import Pipeline as imbPipeline
from imblearn.over_sampling import RandomOverSampler

# !{sys.executable} -m pip install xgboost
from xgboost import XGBClassifier

2) Define the relevant classes

I’ll be taking an object-oriented approach to this project and will begin by defining the classes and constants I’ll need.

2A) Accessing the data

Paths. We’ll define our path strings within a simple dictionary for easy loading:

paths = {
    'train_values': 'data/training_set_values.csv',
    'train_labels': 'data/training_set_labels.csv',
    'test_values': 'data/test_set_values.csv'
}
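
Since a bad path will otherwise only surface as a read error deep inside the loader, a quick sanity check up front can be worthwhile. A minimal sketch using the paths dictionary above:

# Fail fast if any expected CSV is missing
for name, path in paths.items():
    if not os.path.exists(path):
        raise FileNotFoundError(f"Missing {name} file at {path}")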

Data Loader. The Data Loader will load the appropriate CSVs. This helper class includes an option (run_type_dev) to downsample our dataset as needed:

class DataLoader:
    def __init__(self):
        pass

    def load(self, outcome, run_type_dev, sample_size):
        X_train = self.load_from_path(paths['train_values'])
        y_train = self.load_from_path(paths['train_labels'])

        if run_type_dev:
            print(f"Sample size = {sample_size}")
            X_train = X_train.iloc[0:sample_size]
            y_train = y_train.iloc[0:sample_size]

        df = pd.concat([X_train, y_train], axis=1, join="inner")
        self.outcome_values = np.unique(df[outcome])
        return df

    def load_from_path(self, path):
        df = pd.read_csv(path)
        df.set_index('id', inplace=True)
        return df
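
A quick usage sketch (a hypothetical call, matching how the Data Manager invokes the loader later on): with run_type_dev=True, only the first sample_size rows are kept, which speeds up iteration during development:

# Development run: keep just the first 1,000 rows
df = DataLoader().load(outcome='status_group', run_type_dev=True, sample_size=1000)
print(df.shape)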

2B) Cleaning the data

VizHelper. The Viz Helper will output relevant visualizations to inform iterative cleaning and analysis:

class VizHelper:
    def __init__(self):
        pass

    def show_visualizations(self, df, outcome):
        cont_features = df.select_dtypes(exclude=['object']).columns
        self.check_outcome_distribution(df, outcome)
        self.generate_heat_map(df, cont_features)
        self.show_outliers(df, cont_features)
        self.show_basic_correlations(df, cont_features, outcome)
        self.show_outcome_dist(df, outcome)

    def check_outcome_distribution(self, df, outcome):
        labels = df[outcome].value_counts().index
        cnts = df[outcome].value_counts().values

        df_temp = pd.DataFrame({'labels': labels, 'counts': cnts})
        ax = df_temp.plot.bar(x='labels', y='counts', rot=0)
        ax.set_title("Frequency of outcome values")

    def generate_heat_map(self, df, features):
        plt.figure(figsize=(7, 6))
        sns.heatmap(df[features].corr(), center=0)
        plt.show()

    def show_outliers(self, df, cols):
        fig, axes = plt.subplots(2, 3, figsize=(9, 6))
        axe = axes.ravel()
        for i, xcol in enumerate(cols):
            sns.boxplot(x=df[xcol], ax=axe[i])
        plt.show()

    def show_basic_correlations(self, df, cols, outcome):
        preds = [i for i in cols if i != outcome]
        fig, axes = plt.subplots(2, 3, figsize=(9, 6))
        axe = axes.ravel()
        for i, xcol in enumerate(preds):
            df.plot(kind='scatter', x=xcol, y=outcome, alpha=0.4, color='b', ax=axe[i])
        plt.show()

    def show_outcome_dist(self, df, outcome):
        df[outcome].value_counts().plot(kind='bar')

    def show_confusion_matrix(self, clf, X_test, y_test, outcome, title):
        labels = y_test[outcome].unique()
        disp = plot_confusion_matrix(clf, X_test, y_test,
                                     display_labels=labels,
                                     cmap=plt.cm.Greens,
                                     xticks_rotation='vertical')
        disp.ax_.set_title(title)
        plt.show()
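
Once a DataFrame is loaded, rendering every exploratory plot is a single call (a usage sketch, assuming the df loaded above):

# Generate all exploratory visualizations for the outcome column
VizHelper().show_visualizations(df, 'status_group')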

Cleaner. Our Cleaner will perform basic cleaning tasks (eliminate impossible 0 values, correct column data types) on our raw data:

class Cleaner():
    def __init__(self):
        self.viz_helper = VizHelper()

    def clean_df(self, df):
        df.drop(['num_private', 'amount_tsh'], axis=1, inplace=True)
        self.convert_to_string(df)
        self.replace_nan(df)
        self.bin_date(df)
        self.bin_categorical_features(df)
        return df

    def convert_to_string(self, df):
        cols = [
            'region_code',
            'district_code',
            'public_meeting',
            'permit'
        ]
        list(map(lambda col: self.change_type(df, col, str), cols))

    def change_type(self, df, col, new_type):
        df[col] = df[col].astype(new_type)

    def replace_nan(self, df):
        self.replace_null_strings(df, "nan")
        self.replace_null_strings(df, "none")
        self.replace_zeros(df, "longitude")
        self.replace_zeros(df, "latitude")
        self.replace_zeros(df, 'construction_year')
        self.replace_zeros(df, 'population')

    def replace_null_strings(self, df, null_str):
        df.replace(to_replace=null_str, value="unknown", inplace=True)

    def replace_zeros(self, df, col):
        # A zero in these columns is physically impossible, so treat it as missing
        df[col] = df.apply(lambda row: np.nan
                           if row[col] == 0 else row[col], axis=1)

    def bin_date(self, df):
        df['year'] = [x.split("-")[0] for x in df['date_recorded']]
        df['month'] = [x.split("-")[1] for x in df['date_recorded']]
        df.drop(['date_recorded'], axis=1, inplace=True)

    def bin_categorical_features(self, df):
        # Keep only the 10 most frequent values of each high-cardinality
        # categorical column; everything else becomes "Other"
        cols = df.select_dtypes(include=['object']).columns.values.tolist()
        for col in cols:
            top_10 = df[col].value_counts().index[:10].tolist()
            if len(top_10) == 10:
                df[col] = [x if x in top_10 else "Other" for x in df[col]]
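
The top-10 binning deserves a closer look, since it is what keeps one-hot encoding tractable on high-cardinality columns like funder and installer. A toy sketch of the same idea (the values are made up for illustration):

# Toy demonstration of top-N binning on a categorical column
s = pd.Series(['a', 'a', 'a', 'b', 'b', 'c', 'd'])
top_2 = s.value_counts().index[:2].tolist()       # ['a', 'b']
print([x if x in top_2 else "Other" for x in s])  # ['a', 'a', 'a', 'b', 'b', 'Other', 'Other']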

2C) Build the pipeline

Splits Manager. The Splits Manager will allow us to easily access our train-test datasets (avoiding the typo-prone string keys that come with plain dictionary access):

class SplitsManager:
    def __init__(self):
        self.X_train = None
        self.X_test = None
        self.y_train = None
        self.y_test = None

Pre-Processor. The Pre-Processor defines the transformations our pipeline will use. Since our dataset is imbalanced (a histogram of our outcome variable shows that ‘functional’ is vastly over-represented relative to the other two outcomes), we will oversample our minority outcomes to achieve a more balanced dataset:

class PreProcessor():
    def __init__(self):
        pass

    def get_preprocessor(self, df):
        return {
            'sampler': self.get_resampler(),
            'col': self.get_col_transformer(df)
        }

    def get_resampler(self):
        return RandomOverSampler(random_state=42)

    def get_col_transformer(self, df):
        # Numeric features: impute with the median, then standardize
        cont_transformer = Pipeline(steps=[
            ('imputer', SimpleImputer(strategy='median')),
            ('scaler', StandardScaler())])
        # Categorical features: impute a constant, then one-hot encode
        cat_transformer = Pipeline(steps=[
            ('imputer', SimpleImputer(strategy='constant', fill_value='unknown')),
            ('onehot', OneHotEncoder(handle_unknown='ignore'))])
        return ColumnTransformer(
            transformers=[
                ('num', cont_transformer, self.get_cont_features(df)),
                ('cat', cat_transformer, self.get_cat_features(df))])

    def get_cat_features(self, df):
        return df.select_dtypes(include=['object']).columns

    def get_cont_features(self, df):
        return df.select_dtypes(exclude=['object']).columns
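
To see what the oversampler actually does, here is a minimal sketch on a deliberately imbalanced toy dataset (the numbers are illustrative, not from the competition data):

# Toy illustration: 6 'functional' rows vs. 2 'non functional' rows
X_toy = pd.DataFrame({'feature': range(8)})
y_toy = pd.Series(['functional'] * 6 + ['non functional'] * 2)

X_res, y_res = RandomOverSampler(random_state=42).fit_resample(X_toy, y_toy)
print(pd.Series(y_res).value_counts())  # both classes now have 6 rows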

Classifiers. We can store our classifiers in a dictionary so that we can easily iterate over them during analysis. Notice that SVM has been commented out of the list of classifiers in circulation; my local computing resources couldn’t handle the workload:

classifiers = {
    'decision_tree': DecisionTreeClassifier(),
    'random_forest': RandomForestClassifier(),
    # 'svm': SVC(),
    'knn': KNeighborsClassifier(),
    'xgboost': XGBClassifier()
}

Param Grids. We’ll also store the corresponding parameter grid for each classifier in a dictionary. Note that, again, some elements have been iteratively disabled (XGBoost’s this time) due to computing constraints:

param_grids = {
    'decision_tree': {
        'clf__criterion': ['gini', 'entropy'],
        'clf__max_depth': [None, 2, 3, 4, 5, 6],
        'clf__min_samples_split': [2, 5, 10],
        'clf__min_samples_leaf': [1, 2, 3, 4, 5, 6]
    },
    'random_forest': {
        'clf__n_estimators': [10, 20, 30],
        'clf__criterion': ['gini', 'entropy'],
        'clf__max_depth': [5, 10, 20, 30, 35],
        'clf__min_samples_split': [2, 5, 10],
        'clf__min_samples_leaf': [2, 3, 6]
    },
    'svm': {
        'clf__C': [0.1, 1, 10],
        'clf__gamma': [1, 0.1, 0.01],
        'clf__kernel': ['rbf']
    },
    'knn': {
        'clf__n_neighbors': [3, 5, 7, 11, 19],
        'clf__weights': ['uniform', 'distance'],
        'clf__metric': ['euclidean', 'manhattan']
    },
    'xgboost': {
        'clf__max_depth': [10],
        # 'clf__min_child_weight': [1],
        # 'clf__eta': [.3],
        # 'clf__subsample': [1],
        # 'clf__colsample_bytree': [1],
        # 'clf__objective': ['reg:linear'],
    }
}
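
A note on the clf__ prefixes: GridSearchCV routes each parameter to a pipeline step by name, with a double underscore separating the step name from the parameter. Since our pipeline (defined below in the Report Manager) names its final step "clf", the grid keys become clf__max_depth and so on. A quick sketch of how to list the valid names for any pipeline:

# Inspect the parameter names a pipeline exposes to GridSearchCV
pipe = Pipeline([('scaler', StandardScaler()), ('clf', DecisionTreeClassifier())])
print([k for k in pipe.get_params().keys() if k.startswith('clf__')])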

Results Manager. The Results Manager will store and organize the results of each grid search:

class ResultsManager:
    def __init__(self, key, results):
        self.clf_name = key.replace("_", " ").upper()
        self.best_estimator = results.best_estimator_
        self.best_score = results.best_score_
        self.best_params = results.best_params_

Report Manager. The Report Manager will use the shared keys in these two dictionaries to iterate over each classifier and score its performance:

class ReportManager:
    def __init__(self, outcome):
        self.splits = None
        self.results = []
        self.outcome = outcome

    def run_reports(self, preprocessor, splits: SplitsManager):
        self.splits = splits
        list(map(lambda key: self.execute_pipeline(preprocessor, key), classifiers.keys()))
        list(map(lambda x: self.display_predictions(x), self.results))
        self.display_results()

    def execute_pipeline(self, preprocessor, key):
        print(f"Beginning {key}...")
        param_grid = param_grids[key]
        param_grid['col__num__imputer__strategy'] = ['mean', 'median']
        # Oversampling happens inside the pipeline, so it is applied
        # only to the training folds during cross-validation
        pipe = imbPipeline([
            ('col', preprocessor['col']),
            ('sampler', preprocessor['sampler']),
            ("clf", classifiers[key])
        ])
        gs = GridSearchCV(pipe,
                          param_grid,
                          cv=3,
                          scoring="accuracy",
                          n_jobs=-1)
        gs_results = gs.fit(self.splits.X_train, self.splits.y_train.values.ravel())
        self.results.append(ResultsManager(key, gs_results))

    def display_results(self):
        rows = list(map(lambda x: [x.clf_name, x.best_score, x.best_params], self.results))
        df = pd.DataFrame(rows, columns=[
            'clf_name',
            'best_score',
            'best_params'
        ]).sort_values(by='best_score', ascending=False)
        print(df)
        print('---------\nAccuracy reports\n---------')
        for result in self.results:
            print(f"{result.clf_name}: {result.best_score:.2%}")

    def display_predictions(self, result: ResultsManager):
        title = f"{result.clf_name} ({result.best_score:.2%})"
        VizHelper().show_confusion_matrix(result.best_estimator, self.splits.X_test,
                                          self.splits.y_test, self.outcome, title)

2D) Tying it all together

Data Manager. The Data Manager is responsible for coordinating the different helper classes responsible for cleaning, pipeline-creation, visualization, and analysis:

class DataManager:
    def __init__(self, sample_size, run_type_dev=True):
        self.sample_size = sample_size
        self.outcome = 'status_group'
        self.splits = SplitsManager()
        self.process_data(run_type_dev)

    def process_data(self, run_type_dev):
        raw_df = DataLoader().load(self.outcome, run_type_dev, self.sample_size)
        df = Cleaner().clean_df(raw_df)
        VizHelper().show_visualizations(df, self.outcome)
        self.split_data(df, self.outcome)

    def split_data(self, df, outcome):
        X = df[[i for i in df.columns if i != outcome]]
        y = df[[outcome]]

        self.splits.X_train, self.splits.X_test, \
            self.splits.y_train, self.splits.y_test = \
            tts(X, y, test_size=0.2, random_state=42)

    def get_report(self):
        preprocessor = PreProcessor().get_preprocessor(self.splits.X_train)
        ReportManager(self.outcome).run_reports(preprocessor, self.splits)

3) Analyze Results & Take-aways

Now that we’ve defined our classes, we can kick off the analysis:

dfs = DataManager(sample_size=500, run_type_dev=False)
dfs.get_report()

Let’s first consider the implications of our exploratory visualizations:

1) Our predictors are not highly correlated. We do not need to drop features out of concern for multicollinearity.

2) Our outcome variable is extremely imbalanced. We will compensate for this by oversampling the underrepresented classes in our pipeline (see the sketch after this list).

3) Our boxplots indicate that our predictors are fairly free of outliers.
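
For reference, the imbalance from point 2 is easy to quantify directly (a minimal sketch, assuming df is the cleaned training DataFrame):

# Share of each outcome class in the training data
print(df['status_group'].value_counts(normalize=True))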

Turning to our confusion matrices, let’s review the relative performance of our different classifiers:

1) Both our Decision Tree and K-Nearest Neighbors classifiers performed well (73.7% and 75.1% accuracy, respectively).

2) Random Forest and XGBoost performed best, with XGBoost (77.6%) taking a slight lead over Random Forest (77.0%).

3) Support Vector Machine could not be completed locally with the available computing resources. In future projects, I will train my models using cloud-based servers.
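
One loose end: the competition is scored on predictions for the supplied test set, which this walkthrough loads but never submits. Below is a hedged sketch of how a submission file could be produced; the report_manager variable (which would require get_report to return its ReportManager) and the submission column names are assumptions based on the DrivenData format, not code from the notebook:

# Hypothetical submission sketch
X_comp = DataLoader().load_from_path(paths['test_values'])
X_comp = Cleaner().clean_df(X_comp)

best = max(report_manager.results, key=lambda r: r.best_score)  # assumed ReportManager instance
submission = pd.DataFrame(
    {'status_group': best.best_estimator.predict(X_comp)},
    index=X_comp.index)
submission.to_csv('submission.csv')

One caveat with this sketch: the Cleaner bins each categorical column using that dataset’s own top-10 values, so the test set’s “Other” groupings may differ slightly from the training set’s. The OneHotEncoder’s handle_unknown='ignore' keeps this from erroring, but a production version would fit the binning on the training data only.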

Additional avenues for exploration

In the future, it would be exciting to explore how the frequency of conflict incidents related to water-resource usage might affect the model’s performance!
