updates to function framework

Andy Eschbacher 2017-01-30 18:01:14 -05:00
parent 1f2eb6ccfd
commit ee723aa3dc


@ -2,21 +2,16 @@
Segmentation creation and prediction
"""
import sklearn
import numpy as np
import plpy
from sklearn.ensemble import GradientBoostingRegressor
from sklearn import metrics
from sklearn.cross_validation import train_test_split
from crankshaft.analysis_data_provider import AnalysisDateProvider
from crankshaft.analysis_data_provider import AnalysisDataProvider
# Lower level functions
# ---------------------
# NOTE: added optional param here
class Segmentation:
class Segmentation(object):
def __init__(self, data_provider=None):
if data_provider is None:
@ -24,52 +19,7 @@ class Segmentation:
else:
self.data_provider = data_provider
def clean_data(self, query, variable, feature_columns):
params = {"subquery": query,
"target": variable,
"features": feature_columns}
data = self.data_provider.get_model_data(params)
# extract target data from plpy object
target = np.array(data[0]['target'])
# put n feature data arrays into an n x m array of arrays
features = np.column_stack([np.array(data[0][col], dtype=float)
for col in feature_columns])
features, feature_means = replace_nan_with_mean(features)
target, target_mean = replace_nan_with_mean(target)
return target, features, target_mean, feature_means
def replace_nan_with_mean(array, means=None):
"""
Input:
@param array: an array of floats which may have null-valued
entries
Output:
array with nans filled in with the mean of the dataset
"""
# TODO: update code to take in avgs parameter
# returns an array of rows and column indices
indices = np.where(np.isnan(array))
if not means:
for col in np.shape(array)[1]:
means[col] = np.mean(array[~np.isnan(array[:, col]), col])
# iterate through entries which have nan values
for row, col in zip(*indices):
array[row, col] = means[col]
return array, means
# High level interface
# --------------------
def create_and_predict_segment_agg(target, features, target_features,
def create_and_predict_segment_agg(self, target, features, target_features,
target_ids, model_parameters):
"""
Version of create_and_predict_segment that works on arrays that come
@ -94,12 +44,13 @@ class Segmentation:
model, accuracy = train_model(clean_target, clean_features,
model_parameters, 0.2)
prediction = model.predict(target_features)
accuracy_array = [accuracy]*prediction.shape[0]
accuracy_array = [accuracy] * prediction.shape[0]
return zip(target_ids, prediction,
np.full(prediction.shape, accuracy_array))
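# Example (hypothetical values): model_parameters is handed straight to
# sklearn's GradientBoostingRegressor, so any of its keyword arguments are
# valid keys, e.g.
#     {'n_estimators': 100, 'learning_rate': 0.1, 'max_depth': 3,
#      'subsample': 1.0, 'min_samples_leaf': 1}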
def create_and_predict_segment(query, variable, feature_columns,
target_query, model_params):
def create_and_predict_segment(self, query, variable, feature_columns,
target_query, model_params,
id_col='cartodb_id'):
"""
generate a segment with machine learning
Stuart Lynn
@ -114,42 +65,119 @@ class Segmentation:
"""
params = {"subquery": target_query,
"id_col": "cartodb_id"}
"id_col": id_col}
target, features, target_mean, feature_means = \
    self.clean_data(query, variable, feature_columns)
model, accuracy = train_model(target, features, model_params, 0.2)
result = predict_segment(model, feature_columns, target_query,
feature_means)
result = self.predict_segment(model, feature_columns, target_query,
feature_means)
accuracy_array = [accuracy] * result.shape[0]
cartodb_ids = self.data_provider.get_segment_data(params)
rowid = self.data_provider.get_segmentation_data(params)
return zip(cartodb_ids, result, accuracy_array)
return zip(rowid, result, accuracy_array)
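# Hypothetical usage sketch (illustrative table names, columns, and
# parameters; assumes a provider wired to a live database):
#     seg = Segmentation(data_provider=AnalysisDataProvider())
#     rows = seg.create_and_predict_segment(
#         'SELECT * FROM training_table',
#         'price',
#         ['bedrooms', 'bathrooms', 'sqft'],
#         'SELECT * FROM target_table',
#         {'n_estimators': 100, 'max_depth': 3})
# Each returned row pairs an id from id_col with its prediction and the
# model's held-out error estimate.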
def train_model(target, features, model_params, test_split):
def predict_segment(self, model, feature_columns, target_query, feature_means):
"""
Train the Gradient Boosting model on the provided data to calculate
the accuracy of the model
Use the provided model to predict the values for the new feature set
Input:
@param target: 1D Array of the variable that the model is to be
trained to predict
@param features: 2D Array NSamples *NFeatures to use in trining
the model
@param model_params: A dictionary of model parameters, the full
specification can be found on the
scikit learn page for [GradientBoostingRegressor]
(http://scikit-learn.org/stable/modules/generated/sklearn.ensemble.GradientBoostingRegressor.html)
@parma test_split: The fraction of the data to be withheld for
testing the model / calculating the accuray
@param model: The pretrained model
@param feature_columns: A list of features to use in the
model prediction (list of column names)
@param target_query: The query to run to obtain the data to predict
on and the cartodb_ids associated with it.
"""
features_train, features_test, target_train, target_test = \
    train_test_split(features, target, test_size=test_split)
model = GradientBoostingRegressor(**model_params)
model.fit(features_train, target_train)
accuracy = calculate_model_accuracy(model, features_test, target_test)
return model, accuracy
batch_size = 1000
params = {"subquery": target_query,
"feature_columns": feature_columns}
results = []
cursors = self.data_provider.get_segmentation_predict_data(params)
while True:
rows = cursors.fetch(batch_size)
if not rows:
break
batch = np.row_stack([np.array(row['features'], dtype=float)
for row in rows])
# Need to fix this to global mean. This will cause weird effects
batch, _ = replace_nan_with_mean(batch, feature_means)
prediction = model.predict(batch)
results.append(prediction)
# NOTE: we removed the cartodb_ids calculation in here
return np.concatenate(results)
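# The loop above streams feature rows from the database cursor in chunks of
# batch_size, imputes missing values with the training-time feature means,
# and stitches the per-batch predictions back together. A minimal in-memory
# sketch of the same pattern (hypothetical names, no plpy cursor):
#     results = []
#     for start in range(0, len(feature_rows), batch_size):
#         batch = np.row_stack(feature_rows[start:start + batch_size])
#         batch, _ = replace_nan_with_mean(batch, feature_means)
#         results.append(model.predict(batch))
#     prediction = np.concatenate(results)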
def clean_data(self, query, variable, feature_columns):
params = {"subquery": query,
"target": variable,
"features": feature_columns}
data = self.data_provider.get_segmentation_model_data(params)
# extract target data from plpy object
target = np.array(data[0]['target'])
# put n feature data arrays into an n x m array of arrays
features = np.column_stack([np.array(data[0][col], dtype=float)
for col in feature_columns])
features, feature_means = replace_nan_with_mean(features)
target, target_mean = replace_nan_with_mean(target)
return target, features, target_mean, feature_means
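# clean_data expects the provider to return a single-row result keyed by
# 'target' plus one key per feature column, each holding an array of equal
# length, e.g. (hypothetical values):
#     data = [{'target': [1.0, 2.0], 'bedrooms': [2, 3], 'sqft': [70, 90]}]
# which becomes a 1D target array and a 2-column feature matrix here.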
def replace_nan_with_mean(array, means=None):
"""
Input:
@param array: an array of floats which may have null-valued
entries
Output:
array with nans filled in with the mean of the dataset
"""
# TODO: update code to take in avgs parameter
# returns an array of rows and column indices
indices = np.where(np.isnan(array))
if means is None:
    means = {}
    for col in range(np.shape(array)[1]):
        means[col] = np.mean(array[~np.isnan(array[:, col]), col])
# iterate through entries which have nan values
for row, col in zip(*indices):
array[row, col] = means[col]
return array, means
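# Hypothetical helper (illustration only) showing replace_nan_with_mean on a
# tiny array: NaNs in a column are filled with that column's mean over the
# non-missing entries.
def _example_replace_nan_with_mean():
    filled, means = replace_nan_with_mean(
        np.array([[1.0, np.nan], [3.0, 4.0]]))
    # the NaN in column 1 is replaced by that column's mean, 4.0
    assert filled[0, 1] == 4.0
    return filled, means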
def train_model(target, features, model_params, test_split):
"""
Train the Gradient Boosting model on the provided data to calculate
the accuracy of the model
Input:
@param target: 1D Array of the variable that the model is to be
trained to predict
@param features: 2D Array NSamples * NFeatures to use in training
the model
@param model_params: A dictionary of model parameters, the full
specification can be found on the
scikit-learn page for [GradientBoostingRegressor]
(http://scikit-learn.org/stable/modules/generated/sklearn.ensemble.GradientBoostingRegressor.html)
@param test_split: The fraction of the data to be withheld for
testing the model / calculating the accuracy
"""
features_train, features_test, target_train, target_test = \
    train_test_split(features, target, test_size=test_split)
model = GradientBoostingRegressor(**model_params)
model.fit(features_train, target_train)
accuracy = calculate_model_accuracy(model, features_test, target_test)
return model, accuracy
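# Hypothetical helper (illustration only) showing train_model on synthetic
# data; the parameter values below are arbitrary examples.
def _example_train_model():
    rng = np.random.RandomState(0)
    features = rng.rand(200, 3)
    target = 2.0 * features[:, 0] + 0.1 * rng.rand(200)
    model, error = train_model(target, features,
                               {'n_estimators': 50, 'max_depth': 2}, 0.2)
    return model, error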
def calculate_model_accuracy(model, features_test, target_test):
@ -164,37 +192,3 @@ def calculate_model_accuracy(model, features_test, target_test):
"""
prediction = model.predict(features_test)
return metrics.mean_squared_error(prediction, target_test)
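# Note: despite the name, the value above is a mean squared error, so lower
# means a better fit. Tiny illustrative check with hypothetical values:
#     metrics.mean_squared_error([2.0, 4.0], [1.0, 2.0]) == 2.5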
def predict_segment(model, features_columns, target_query, feature_means):
"""
Use the provided model to predict the values for the new feature set
Input:
@param model: The pretrained model
@features_col: A list of features to use in the
model prediction (list of column names)
@target_query: The query to run to obtain the data to predict
on and the cartdb_ids associated with it.
"""
batch_size = 1000
params = {"subquery": target_query,
"feature": feature_columns}
results = []
cursors = self.data_provider.get_predict_data(params)
while True:
rows = cursors.fetch(batch_size)
if not rows:
break
batch = np.row_stack([np.array(row['features'], dtype=float)
for row in rows])
# Need to fix this to global mean. This will cause weird effects
batch = replace_nan_with_mean(batch, feature_means)
prediction = model.predict(batch)
results.append(prediction)
# NOTE: we removed the cartodb_ids calculation in here
return np.concatenate(results)