refactoring segmentation function
This commit is contained in:
parent
e5f1f92ce1
commit
cee8967274
@ -51,3 +51,21 @@ AS $$
|
|||||||
model_params = {'n_estimators': n_estimators, 'max_depth':max_depth, 'subsample' : subsample, 'learning_rate': learning_rate, 'min_samples_leaf' : min_samples_leaf}
|
model_params = {'n_estimators': n_estimators, 'max_depth':max_depth, 'subsample' : subsample, 'learning_rate': learning_rate, 'min_samples_leaf' : min_samples_leaf}
|
||||||
return create_and_predict_segment(query,variable_name,target_table, model_params)
|
return create_and_predict_segment(query,variable_name,target_table, model_params)
|
||||||
$$ LANGUAGE plpythonu;
|
$$ LANGUAGE plpythonu;
|
||||||
|
|
||||||
|
-- Overload of CDB_CreateAndPredictSegment that takes an explicit list of
-- feature columns. It trains the segmentation model on `query` and returns one
-- (cartodb_id, prediction, accuracy) row per row of `target_table`.
--
--   query            subquery the training data is pulled from
--   variable_name    name of the target variable column
--   target_table     query providing the rows to predict
--   feature_columns  names of the columns used as model features
--   n_estimators, max_depth, subsample, learning_rate, min_samples_leaf
--                    model parameters, forwarded unchanged as a dictionary
CREATE OR REPLACE FUNCTION
  CDB_CreateAndPredictSegment (
      query TEXT,
      variable_name TEXT,
      target_table TEXT,
      feature_columns TEXT[],
      n_estimators INTEGER DEFAULT 1200,
      max_depth INTEGER DEFAULT 3,
      subsample DOUBLE PRECISION DEFAULT 0.5,
      learning_rate DOUBLE PRECISION DEFAULT 0.01,
      min_samples_leaf INTEGER DEFAULT 1)
RETURNS TABLE (cartodb_id TEXT, prediction NUMERIC, accuracy NUMERIC)
AS $$
  from crankshaft.segmentation import create_and_predict_segment

  model_params = {'n_estimators': n_estimators,
                  'max_depth': max_depth,
                  'subsample': subsample,
                  'learning_rate': learning_rate,
                  'min_samples_leaf': min_samples_leaf}

  # feature_columns has to be forwarded: the refactored Python entry point
  # takes (query, variable, feature_columns, target_query, model_params), so
  # the previous four-argument call no longer matched its signature.
  return create_and_predict_segment(query, variable_name, feature_columns,
                                    target_table, model_params)
$$ LANGUAGE plpythonu;
|
||||||
|
@ -65,3 +65,44 @@ class AnalysisDataProvider:
|
|||||||
return data
|
return data
|
||||||
except plpy.SPIError, err:
|
except plpy.SPIError, err:
|
||||||
plpy.error('Analysis failed: %s' % err)
|
plpy.error('Analysis failed: %s' % err)
|
||||||
|
|
||||||
|
def get_model_data(self, params):
    """Fetch the training data for a segmentation model.

    Input:
        @param params: dict with
            'subquery': SQL text the training rows are selected from
            'target': name of the target variable column
            'features' (or 'feature_columns' / 'feature'): list of
                feature column names
    Output:
        the plpy result of a single-row query; the row holds the
        aggregated target values under `target` and one aggregated
        array per feature column, keyed by the column name
    """
    # Callers in this module spell the feature-list key differently, so
    # accept every spelling instead of failing with a KeyError.
    for key in ('feature_columns', 'features', 'feature'):
        if key in params:
            feature_columns = params[key]
            break
    else:
        raise KeyError('feature_columns')

    # Build the select list as one joined sequence so that an empty feature
    # list cannot leave a dangling comma in front of FROM.
    select_list = ['array_agg("{target}") As target'.format(
        target=params['target'])]
    select_list.extend('array_agg("{col}") As "{col}"'.format(col=col)
                       for col in feature_columns)

    query = 'SELECT {columns} FROM ({subquery}) As q'.format(
        columns=', '.join(select_list),
        subquery=params['subquery'])

    try:
        return plpy.execute(query)
    except plpy.SPIError as err:
        plpy.error('Failed to build segmentation model: %s' % err)
|
||||||
|
|
||||||
|
def get_segment_data(self, params):
    """Fetch the ordered ids of the rows that are being predicted.

    Input:
        @param params: dict with
            'subquery': SQL text selecting the rows to predict
            'id_col': name of the id column (e.g. "cartodb_id")
    Output:
        the plpy result of a single-row query whose `ids` column holds
        every id value, sorted by the id column
    """
    # The literals are separated by explicit spaces and there is no comma
    # before FROM; the previous concatenation produced invalid SQL
    # ("SELECTarray_agg(...) as ids,FROM ...").
    query = ('SELECT array_agg({id_col} ORDER BY {id_col}) As ids '
             'FROM ({subquery}) As q').format(**params)

    try:
        return plpy.execute(query)
    except plpy.SPIError as err:
        plpy.error('Failed to build segmentation model: %s' % err)
|
||||||
|
|
||||||
|
def get_predict_data(self, params):
    """Open a cursor over the feature rows that are to be predicted.

    Input:
        @param params: dict with
            'subquery': SQL text selecting the rows to predict
            'feature_columns' (or 'features' / 'feature'): list of
                feature column names
    Output:
        a plpy cursor; every row it yields has one column, `features`,
        holding that row's feature values cast to numeric, in the
        order of the feature list
    """
    # The feature list comes from params (the old body referenced an
    # undefined name). Callers in this module spell the key differently,
    # so accept every spelling.
    for key in ('feature_columns', 'features', 'feature'):
        if key in params:
            feature_columns = params[key]
            break
    else:
        raise KeyError('feature_columns')

    joined_features = ', '.join('"{0}"::numeric'.format(col)
                                for col in feature_columns)

    # Array[...] is the array constructor for a list of expressions;
    # Array(...) would require a subquery.
    query = ('SELECT Array[{joined_features}] As features '
             'FROM ({subquery}) As q').format(
                 joined_features=joined_features,
                 subquery=params['subquery'])

    try:
        return plpy.cursor(query)
    except plpy.SPIError as err:
        plpy.error('Failed to build segmentation model: %s' % err)
|
||||||
|
@ -8,56 +8,28 @@ import plpy
|
|||||||
from sklearn.ensemble import GradientBoostingRegressor
|
from sklearn.ensemble import GradientBoostingRegressor
|
||||||
from sklearn import metrics
|
from sklearn import metrics
|
||||||
from sklearn.cross_validation import train_test_split
|
from sklearn.cross_validation import train_test_split
|
||||||
|
from crankshaft.analysis_data_provider import AnalysisDateProvider
|
||||||
|
|
||||||
# Lower level functions
|
# Lower level functions
|
||||||
# ---------------------
|
# ---------------------
|
||||||
|
|
||||||
# NOTE: added optional param here
|
# NOTE: added optional param here
|
||||||
def replace_nan_with_mean(array, avgs=None):
|
|
||||||
"""
|
|
||||||
Input:
|
|
||||||
@param array: an array of floats which may have null-valued entries
|
|
||||||
Output:
|
|
||||||
array with nans filled in with the mean of the dataset
|
|
||||||
"""
|
|
||||||
# TODO: update code to take in avgs parameter
|
|
||||||
|
|
||||||
# returns an array of rows and column indices
|
|
||||||
indices = np.where(np.isnan(array))
|
|
||||||
|
|
||||||
# iterate through entries which have nan values
|
|
||||||
for row, col in zip(*indices):
|
|
||||||
array[row, col] = np.mean(array[~np.isnan(array[:, col]), col])
|
|
||||||
|
|
||||||
return array
|
|
||||||
|
|
||||||
|
|
||||||
def get_data(variable, feature_columns, query):
|
class Segmentation:
|
||||||
"""
|
|
||||||
Fetch data from the database, clean, and package into
|
|
||||||
numpy arrays
|
|
||||||
Input:
|
|
||||||
@param variable: name of the target variable
|
|
||||||
@param feature_columns: list of column names
|
|
||||||
@param query: subquery that data is pulled from for the packaging
|
|
||||||
Output:
|
|
||||||
prepared data, packaged into NumPy arrays
|
|
||||||
"""
|
|
||||||
|
|
||||||
columns = ','.join(['array_agg("{col}") As "{col}"'.format(col=col)
|
def __init__(self, data_provider=None):
|
||||||
for col in feature_columns])
|
if data_provider is None:
|
||||||
|
self.data_provider = AnalysisDataProvider()
|
||||||
|
else:
|
||||||
|
self.data_provider = data_provider
|
||||||
|
|
||||||
try:
|
def clean_data(self, query, variable, feature_columns):
|
||||||
data = plpy.execute('''
|
params = {"subquery": query,
|
||||||
SELECT
|
"target": variable,
|
||||||
array_agg("{variable}") As target,
|
"features": feature_columns}
|
||||||
{columns}
|
|
||||||
FROM ({query}) As a'''.format(
|
data = self.data_provider.get_model_data(params)
|
||||||
variable=variable,
|
|
||||||
columns=columns,
|
|
||||||
query=query))
|
|
||||||
except Exception, e:
|
|
||||||
plpy.error('Failed to access data to build segmentation model: %s' % e)
|
|
||||||
|
|
||||||
# extract target data from plpy object
|
# extract target data from plpy object
|
||||||
target = np.array(data[0]['target'])
|
target = np.array(data[0]['target'])
|
||||||
@ -66,12 +38,37 @@ def get_data(variable, feature_columns, query):
|
|||||||
features = np.column_stack([np.array(data[0][col], dtype=float)
|
features = np.column_stack([np.array(data[0][col], dtype=float)
|
||||||
for col in feature_columns])
|
for col in feature_columns])
|
||||||
|
|
||||||
return replace_nan_with_mean(target), replace_nan_with_mean(features)
|
features, feature_means = replace_nan_with_mean(features)
|
||||||
|
target, target_mean = replace_nan_with_mean(target)
|
||||||
|
return target, features, target_mean, feature_means
|
||||||
|
|
||||||
|
def replace_nan_with_mean(array, means=None):
    """
    Fill the NaN entries of an array with per-column means.

    Input:
        @param array: 1D or 2D array of floats which may have null-valued
          (NaN) entries. A float ndarray is modified in place; any other
          input is first converted to a float ndarray.
        @param means: optional means to fill with: one value per column
          for a 2D array, a single value for a 1D array. When omitted,
          the means are computed from the non-NaN entries of `array`.
    Output:
        (array, means): the array with NaNs filled in, and the means that
        were used, so that they can be re-applied to later batches
    """
    array = np.asarray(array, dtype=float)
    nan_mask = np.isnan(array)

    if means is None:
        # nanmean ignores the NaN entries; axis=0 gives one mean per column
        # for 2D input and a single scalar for 1D input.
        means = np.nanmean(array, axis=0)

    # copyto broadcasts the means across the rows and writes only where the
    # mask is set, leaving every non-NaN entry untouched.
    np.copyto(array, means, where=nan_mask)

    return array, means
|
||||||
|
|
||||||
|
|
||||||
# High level interface
|
# High level interface
|
||||||
# --------------------
|
# --------------------
|
||||||
|
|
||||||
|
|
||||||
def create_and_predict_segment_agg(target, features, target_features,
|
def create_and_predict_segment_agg(target, features, target_features,
|
||||||
target_ids, model_parameters):
|
target_ids, model_parameters):
|
||||||
"""
|
"""
|
||||||
@ -79,15 +76,15 @@ def create_and_predict_segment_agg(target, features, target_features,
|
|||||||
straight form the SQL calling the function.
|
straight form the SQL calling the function.
|
||||||
|
|
||||||
Input:
|
Input:
|
||||||
@param target: The 1D array of lenth NSamples containing the target
|
@param target: The 1D array of lenth NSamples containing the
|
||||||
variable we want the model to predict
|
target variable we want the model to predict
|
||||||
@param features: The 2D array of size NSamples * NFeatures that
|
@param features: The 2D array of size NSamples * NFeatures that
|
||||||
form the imput to the model
|
form the imput to the model
|
||||||
@param target_ids: A 1D array of target_ids that will be used to
|
@param target_ids: A 1D array of target_ids that will be used
|
||||||
associate the results of the prediction with the rows which
|
to associate the results of the prediction with the rows which
|
||||||
they come from
|
they come from
|
||||||
@param model_parameters: A dictionary containing parameters for the
|
@param model_parameters: A dictionary containing parameters for
|
||||||
model.
|
the model.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
clean_target = replace_nan_with_mean(target)
|
clean_target = replace_nan_with_mean(target)
|
||||||
@ -101,46 +98,38 @@ def create_and_predict_segment_agg(target, features, target_features,
|
|||||||
return zip(target_ids, prediction,
|
return zip(target_ids, prediction,
|
||||||
np.full(prediction.shape, accuracy_array))
|
np.full(prediction.shape, accuracy_array))
|
||||||
|
|
||||||
|
def create_and_predict_segment(query, variable, feature_columns,
|
||||||
def create_and_predict_segment(query, variable, target_query, model_params):
|
target_query, model_params):
|
||||||
"""
|
"""
|
||||||
generate a segment with machine learning
|
generate a segment with machine learning
|
||||||
Stuart Lynn
|
Stuart Lynn
|
||||||
|
@param query: subquery that data is pulled from for packaging
|
||||||
|
@param variable: name of the target variable
|
||||||
|
@param feature_columns: list of column names
|
||||||
|
@target_query: The query to run to obtain the data to predict
|
||||||
|
@param model_params: A dictionary of model parameters, the full
|
||||||
|
specification can be found on the
|
||||||
|
scikit learn page for [GradientBoostingRegressor]
|
||||||
|
(http://scikit-learn.org/stable/modules/generated/sklearn.ensemble.GradientBoostingRegressor.html)
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# fetch column names
|
params = {"subquery": target_query,
|
||||||
try:
|
"id_col": "cartodb_id"}
|
||||||
columns = plpy.execute('''
|
|
||||||
SELECT *
|
|
||||||
FROM ({query}) As a
|
|
||||||
LIMIT 1'''.format(query=query))[0].keys()
|
|
||||||
except Exception, e:
|
|
||||||
plpy.error('Failed to build segmentation model: %s' % e)
|
|
||||||
|
|
||||||
# extract column names to be used in building the segmentation model
|
|
||||||
feature_columns = set(columns) - set([variable, 'cartodb_id',
|
|
||||||
'the_geom', 'the_geom_webmercator'])
|
|
||||||
# get data from database
|
|
||||||
target, features = get_data(variable, feature_columns, query)
|
|
||||||
|
|
||||||
|
target, features, target_mean,
|
||||||
|
feature_means = clean_data(variable, feature_columns, query)
|
||||||
model, accuracy = train_model(target, features, model_params, 0.2)
|
model, accuracy = train_model(target, features, model_params, 0.2)
|
||||||
result = predict_segment(model, feature_columns, target_query)
|
result = predict_segment(model, feature_columns, target_query,
|
||||||
|
feature_means)
|
||||||
accuracy_array = [accuracy] * result.shape[0]
|
accuracy_array = [accuracy] * result.shape[0]
|
||||||
|
|
||||||
# cartodb_id plpy.execute code here instead of in predict_segment
|
cartodb_ids = self.data_provider.get_segment_data(params)
|
||||||
try:
|
|
||||||
cartodb_ids = plpy.execute('''
|
|
||||||
SELECT array_agg(cartodb_id ORDER BY cartodb_id) As cartodb_ids
|
|
||||||
FROM ({0}) As a'''.format(target_query))[0]['cartodb_ids']
|
|
||||||
except Exception, err:
|
|
||||||
plpy.error('Failed to build segmentation model: %s' % err)
|
|
||||||
|
|
||||||
return zip(cartodb_ids, result, accuracy_array)
|
return zip(cartodb_ids, result, accuracy_array)
|
||||||
|
|
||||||
|
|
||||||
def train_model(target, features, model_params, test_split):
|
def train_model(target, features, model_params, test_split):
|
||||||
"""
|
"""
|
||||||
Train the Gradient Boosting model on the provided data and calculate
|
Train the Gradient Boosting model on the provided data to calculate
|
||||||
the accuracy of the model
|
the accuracy of the model
|
||||||
Input:
|
Input:
|
||||||
@param target: 1D Array of the variable that the model is to be
|
@param target: 1D Array of the variable that the model is to be
|
||||||
@ -154,8 +143,9 @@ def train_model(target, features, model_params, test_split):
|
|||||||
@parma test_split: The fraction of the data to be withheld for
|
@parma test_split: The fraction of the data to be withheld for
|
||||||
testing the model / calculating the accuray
|
testing the model / calculating the accuray
|
||||||
"""
|
"""
|
||||||
features_train, features_test, target_train, target_test =
|
features_train, features_test,
|
||||||
train_test_split(features, target, test_size=test_split)
|
target_train, target_test = train_test_split(features, target,
|
||||||
|
test_size=test_split)
|
||||||
model = GradientBoostingRegressor(**model_params)
|
model = GradientBoostingRegressor(**model_params)
|
||||||
model.fit(features_train, target_train)
|
model.fit(features_train, target_train)
|
||||||
accuracy = calculate_model_accuracy(model, features_test, target_test)
|
accuracy = calculate_model_accuracy(model, features_test, target_test)
|
||||||
@ -167,16 +157,16 @@ def calculate_model_accuracy(model, features_test, target_test):
|
|||||||
Calculate the mean squared error of the model prediction
|
Calculate the mean squared error of the model prediction
|
||||||
Input:
|
Input:
|
||||||
@param model: model trained from input features
|
@param model: model trained from input features
|
||||||
@param features_test: test features set to make a prediction from
|
@param features_test: test features set to make prediction from
|
||||||
@param target_target: test target set to compare predictions to
|
@param target_target: test target set to compare predictions to
|
||||||
Output:
|
Output:
|
||||||
mean squared error of the model prection compared to target_test
|
mean squared error of the model prection compared target_test
|
||||||
"""
|
"""
|
||||||
prediction = model.predict(features_test)
|
prediction = model.predict(features_test)
|
||||||
return metrics.mean_squared_error(prediction, target_test)
|
return metrics.mean_squared_error(prediction, target_test)
|
||||||
|
|
||||||
|
|
||||||
def predict_segment(model, features_col, target_query):
|
def predict_segment(model, features_columns, target_query, feature_means):
|
||||||
"""
|
"""
|
||||||
Use the provided model to predict the values for the new feature set
|
Use the provided model to predict the values for the new feature set
|
||||||
Input:
|
Input:
|
||||||
@ -188,37 +178,21 @@ def predict_segment(model, features_col, target_query):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
batch_size = 1000
|
batch_size = 1000
|
||||||
joined_features = ','.join(['"{0}"::numeric'.format(a)
|
params = {"subquery": target_query,
|
||||||
for a in features_col])
|
"feature": feature_columns}
|
||||||
|
|
||||||
try:
|
|
||||||
cursor = plpy.cursor('''
|
|
||||||
SELECT Array[{joined_features}] As features
|
|
||||||
FROM ({target_query}) As a'''.format(
|
|
||||||
joined_features=joined_features,
|
|
||||||
target_query=target_query))
|
|
||||||
except Exception, err:
|
|
||||||
plpy.error('Failed to build segmentation model: %s' % err)
|
|
||||||
|
|
||||||
# TODO: is this a good solution for finding the averages?
|
|
||||||
# r = plpy.execute('''
|
|
||||||
# SELECT {cols}
|
|
||||||
# FROM ({target_query}) As a
|
|
||||||
# '''.format(cols=', '.join(['avg({c}) As {c}'.format(c=c)
|
|
||||||
# for c in joined_features]),
|
|
||||||
# target_query=target_query))
|
|
||||||
# avgs = [r[0][c] for c in joined_features]
|
|
||||||
results = []
|
results = []
|
||||||
|
cursors = self.data_provider.get_predict_data(params)
|
||||||
while True:
|
while True:
|
||||||
rows = cursor.fetch(batch_size)
|
rows = cursors.fetch(batch_size)
|
||||||
if not rows:
|
if not rows:
|
||||||
break
|
break
|
||||||
batch = np.row_stack([np.array(row['features'], dtype=float)
|
batch = np.row_stack([np.array(row['features'], dtype=float)
|
||||||
for row in rows])
|
for row in rows])
|
||||||
|
|
||||||
# Need to fix this to global mean. This will cause weird effects
|
# Need to fix this to global mean. This will cause weird effects
|
||||||
batch = replace_nan_with_mean(batch)
|
|
||||||
|
batch = replace_nan_with_mean(batch, feature_means)
|
||||||
prediction = model.predict(batch)
|
prediction = model.predict(batch)
|
||||||
results.append(prediction)
|
results.append(prediction)
|
||||||
|
|
||||||
|
@ -42,6 +42,9 @@ class MockPlPy:
|
|||||||
def info(self, msg):
|
def info(self, msg):
|
||||||
self.infos.append(msg)
|
self.infos.append(msg)
|
||||||
|
|
||||||
|
def error(self, msg):
    """Record an error message in the mock's `infos` list."""
    recorded = self.infos
    recorded.append(msg)
|
||||||
|
|
||||||
def cursor(self, query):
|
def cursor(self, query):
|
||||||
data = self.execute(query)
|
data = self.execute(query)
|
||||||
return MockCursor(data)
|
return MockCursor(data)
|
||||||
|
Loading…
Reference in New Issue
Block a user