diff --git a/src/pg/sql/05_segmentation.sql b/src/pg/sql/05_segmentation.sql
index 7dac003..d2de727 100644
--- a/src/pg/sql/05_segmentation.sql
+++ b/src/pg/sql/05_segmentation.sql
@@ -51,3 +51,22 @@ AS $$
     model_params = {'n_estimators': n_estimators, 'max_depth':max_depth, 'subsample' : subsample, 'learning_rate': learning_rate, 'min_samples_leaf' : min_samples_leaf}
     return create_and_predict_segment(query,variable_name,target_table, model_params)
 $$ LANGUAGE plpythonu;
+
+CREATE OR REPLACE FUNCTION
+  CDB_CreateAndPredictSegment (
+      query TEXT,
+      variable_name TEXT,
+      target_table TEXT,
+      feature_columns TEXT[],
+      n_estimators INTEGER DEFAULT 1200,
+      max_depth INTEGER DEFAULT 3,
+      subsample DOUBLE PRECISION DEFAULT 0.5,
+      learning_rate DOUBLE PRECISION DEFAULT 0.01,
+      min_samples_leaf INTEGER DEFAULT 1)
+RETURNS TABLE (cartodb_id TEXT, prediction NUMERIC, accuracy NUMERIC)
+AS $$
+    from crankshaft.segmentation import Segmentation
+    seg = Segmentation()
+    model_params = {'n_estimators': n_estimators, 'max_depth': max_depth, 'subsample': subsample, 'learning_rate': learning_rate, 'min_samples_leaf': min_samples_leaf}
+    return seg.create_and_predict_segment(query, variable_name, feature_columns, target_table, model_params)
+$$ LANGUAGE plpythonu;
diff --git a/src/py/crankshaft/crankshaft/analysis_data_provider.py b/src/py/crankshaft/crankshaft/analysis_data_provider.py
index cbc27bc..b03d29b 100644
--- a/src/py/crankshaft/crankshaft/analysis_data_provider.py
+++ b/src/py/crankshaft/crankshaft/analysis_data_provider.py
@@ -65,3 +65,47 @@ class AnalysisDataProvider:
             return data
         except plpy.SPIError, err:
             plpy.error('Analysis failed: %s' % err)
+
+    def get_model_data(self, params):
+        """fetch the target and feature arrays used to build a model"""
+        columns = ','.join(['array_agg("{col}") As "{col}"'.format(col=col)
+                            for col in params['features']])
+
+        query = ("SELECT "
+                 "array_agg({target}) As target, "
+                 "{columns} "
+                 "FROM ({subquery}) As q"
+                 ).format(target=params['target'],
+                          columns=columns,
+                          subquery=params['subquery'])
+        try:
+            data = plpy.execute(query)
+            return data
+        except plpy.SPIError, err:
+            plpy.error('Failed to build segmentation model: %s' % err)
+
+    def get_segment_data(self, params):
+        """fetch the cartodb_ids of the rows to be predicted"""
+        query = ("SELECT "
+                 "array_agg({id_col} ORDER BY {id_col}) As ids "
+                 "FROM ({subquery}) As q").format(**params)
+        try:
+            data = plpy.execute(query)
+            return data
+        except plpy.SPIError, err:
+            plpy.error('Failed to build segmentation model: %s' % err)
+
+    def get_predict_data(self, params):
+        """open a cursor over the feature data to predict on"""
+        joined_features = ','.join(['"{0}"::numeric'.format(a)
+                                    for a in params['feature_columns']])
+        query = ("SELECT "
+                 "Array[{joined_features}] As features "
+                 "FROM ({subquery}) As q"
+                 ).format(joined_features=joined_features,
+                          subquery=params['subquery'])
+        try:
+            cursor = plpy.cursor(query)
+            return cursor
+        except plpy.SPIError, err:
+            plpy.error('Failed to build segmentation model: %s' % err)
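To make the new data-provider contract concrete, this is roughly what get_model_data issues and what plpy.execute hands back. A minimal sketch; the table and column names are invented for illustration:

    # hypothetical params, as assembled by Segmentation.clean_data below
    params = {'subquery': 'SELECT * FROM census_blocks',
              'target': 'median_income',
              'features': ['m1', 'm2']}

    # get_model_data(params) then executes, roughly:
    #   SELECT array_agg(median_income) As target,
    #          array_agg("m1") As "m1",array_agg("m2") As "m2"
    #   FROM (SELECT * FROM census_blocks) As q
    # and plpy.execute returns a single row of parallel arrays:
    #   [{'target': [...], 'm1': [...], 'm2': [...]}]

Aggregating whole columns into arrays keeps the model build to a single round trip into the database.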
diff --git a/src/py/crankshaft/crankshaft/segmentation/segmentation.py b/src/py/crankshaft/crankshaft/segmentation/segmentation.py
index 91bf41b..ff97aa7 100644
--- a/src/py/crankshaft/crankshaft/segmentation/segmentation.py
+++ b/src/py/crankshaft/crankshaft/segmentation/segmentation.py
@@ -8,158 +8,148 @@ import plpy
 from sklearn.ensemble import GradientBoostingRegressor
 from sklearn import metrics
 from sklearn.cross_validation import train_test_split
+from crankshaft.analysis_data_provider import AnalysisDataProvider

 # Lower level functions
 # ---------------------

-# NOTE: added optional param here
-def replace_nan_with_mean(array, avgs=None):
+def replace_nan_with_mean(array, means=None):
     """
     Input:
         @param array: an array of floats which may have null-valued entries
     Output:
-        array with nans filled in with the mean of the dataset
+        array with nans filled in, plus the means used to fill them so
+        they can be reused on the prediction data
     """
-    # TODO: update code to take in avgs parameter
+    # coerce to a float ndarray so database NULLs arrive as nans
+    array = np.array(array, dtype=float)

     # returns an array of rows and column indices
     indices = np.where(np.isnan(array))

+    # compute the means once (at training time) unless they are supplied
+    if means is None:
+        if array.ndim == 1:
+            means = [np.mean(array[~np.isnan(array)])]
+        else:
+            means = [np.mean(array[~np.isnan(array[:, col]), col])
+                     for col in range(array.shape[1])]
+
     # iterate through entries which have nan values
-    for row, col in zip(*indices):
-        array[row, col] = np.mean(array[~np.isnan(array[:, col]), col])
+    if array.ndim == 1:
+        for row in indices[0]:
+            array[row] = means[0]
+    else:
+        for row, col in zip(*indices):
+            array[row, col] = means[col]

-    return array
+    return array, means
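The reworked helper now has a two-phase contract: compute the fill means on the training features, then reuse them on every prediction batch. A minimal sketch, with made-up array values and the import path assumed from the module layout above:

    import numpy as np
    from crankshaft.segmentation.segmentation import replace_nan_with_mean

    # training time: means are computed from the data itself
    features = np.array([[1.0, np.nan],
                         [3.0, 4.0]])
    features, feature_means = replace_nan_with_mean(features)
    # feature_means == [2.0, 4.0]; the nan was filled with 4.0

    # prediction time: the stored means are reused, so a sparse batch
    # cannot shift the fill values away from the training distribution
    batch = np.array([[np.nan, 10.0]])
    batch, _ = replace_nan_with_mean(batch, feature_means)
    # batch == [[2.0, 10.0]]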

-def get_data(variable, feature_columns, query):
-    """
-    Fetch data from the database, clean, and package into
-    numpy arrays
-    Input:
-        @param variable: name of the target variable
-        @param feature_columns: list of column names
-        @param query: subquery that data is pulled from for the packaging
-    Output:
-        prepared data, packaged into NumPy arrays
-    """
-    columns = ','.join(['array_agg("{col}") As "{col}"'.format(col=col)
-                        for col in feature_columns])
-
-    try:
-        data = plpy.execute('''
-            SELECT
-                array_agg("{variable}") As target,
-                {columns}
-            FROM ({query}) As a'''.format(
-                variable=variable,
-                columns=columns,
-                query=query))
-    except Exception, e:
-        plpy.error('Failed to access data to build segmentation model: %s' % e)
-
-    # extract target data from plpy object
-    target = np.array(data[0]['target'])
-
-    # put n feature data arrays into an n x m array of arrays
-    features = np.column_stack([np.array(data[0][col], dtype=float)
-                                for col in feature_columns])
-
-    return replace_nan_with_mean(target), replace_nan_with_mean(features)
+class Segmentation(object):
+
+    def __init__(self, data_provider=None):
+        if data_provider is None:
+            self.data_provider = AnalysisDataProvider()
+        else:
+            self.data_provider = data_provider
+
+    def clean_data(self, query, variable, feature_columns):
+        """fetch the data and fill in its missing values"""
+        params = {"subquery": query,
+                  "target": variable,
+                  "features": feature_columns}
+
+        data = self.data_provider.get_model_data(params)
+
+        # extract target data from plpy object
+        target = np.array(data[0]['target'])
+
+        # put n feature data arrays into an n x m array of arrays
+        features = np.column_stack([np.array(data[0][col], dtype=float)
+                                    for col in feature_columns])
+
+        features, feature_means = replace_nan_with_mean(features)
+        target, target_mean = replace_nan_with_mean(target)
+        return target, features, target_mean, feature_means

 # High level interface
 # --------------------

-def create_and_predict_segment_agg(target, features, target_features,
-                                   target_ids, model_parameters):
-    """
-    Version of create_and_predict_segment that works on arrays that come
-    straight form the SQL calling the function.
-
-    Input:
-        @param target: The 1D array of lenth NSamples containing the target
-            variable we want the model to predict
-        @param features: The 2D array of size NSamples * NFeatures that
-            form the imput to the model
-        @param target_ids: A 1D array of target_ids that will be used to
-            associate the results of the prediction with the rows which
-            they come from
-        @param model_parameters: A dictionary containing parameters for the
-            model.
-    """
-    clean_target = replace_nan_with_mean(target)
-    clean_features = replace_nan_with_mean(features)
-    target_features = replace_nan_with_mean(target_features)
-
-    model, accuracy = train_model(clean_target, clean_features,
-                                  model_parameters, 0.2)
-    prediction = model.predict(target_features)
-    accuracy_array = [accuracy]*prediction.shape[0]
-    return zip(target_ids, prediction,
-               np.full(prediction.shape, accuracy_array))
+    def create_and_predict_segment_agg(self, target, features,
+                                       target_features, target_ids,
+                                       model_parameters):
+        """
+        Version of create_and_predict_segment that works on arrays that
+        come straight from the SQL calling the function.
+
+        Input:
+            @param target: The 1D array of length NSamples containing the
+                target variable we want the model to predict
+            @param features: The 2D array of size NSamples * NFeatures
+                that form the input to the model
+            @param target_ids: A 1D array of target_ids that will be used
+                to associate the results of the prediction with the rows
+                which they come from
+            @param model_parameters: A dictionary containing parameters
+                for the model.
+        """
+        clean_target, _ = replace_nan_with_mean(target)
+        clean_features, feature_means = replace_nan_with_mean(features)
+        target_features, _ = replace_nan_with_mean(target_features,
+                                                   feature_means)
+
+        model, accuracy = self.train_model(clean_target, clean_features,
+                                           model_parameters, 0.2)
+        prediction = model.predict(target_features)
+        accuracy_array = [accuracy] * prediction.shape[0]
+        return zip(target_ids, prediction, accuracy_array)

-def create_and_predict_segment(query, variable, target_query, model_params):
-    """
-    generate a segment with machine learning
-    Stuart Lynn
-    """
-    # fetch column names
-    try:
-        columns = plpy.execute('''
-            SELECT *
-            FROM ({query}) As a
-            LIMIT 1'''.format(query=query))[0].keys()
-    except Exception, e:
-        plpy.error('Failed to build segmentation model: %s' % e)
-
-    # extract column names to be used in building the segmentation model
-    feature_columns = set(columns) - set([variable, 'cartodb_id',
-                                          'the_geom', 'the_geom_webmercator'])
-    # get data from database
-    target, features = get_data(variable, feature_columns, query)
-
-    model, accuracy = train_model(target, features, model_params, 0.2)
-    result = predict_segment(model, feature_columns, target_query)
-    accuracy_array = [accuracy] * result.shape[0]
-
-    # cartodb_id plpy.execute code here instead of in predict_segment
-    try:
-        cartodb_ids = plpy.execute('''
-            SELECT array_agg(cartodb_id ORDER BY cartodb_id) As cartodb_ids
-            FROM ({0}) As a'''.format(target_query))[0]['cartodb_ids']
-    except Exception, err:
-        plpy.error('Failed to build segmentation model: %s' % err)
-
-    return zip(cartodb_ids, result, accuracy_array)
+    def create_and_predict_segment(self, query, variable, feature_columns,
+                                   target_query, model_params):
+        """
+        generate a segment with machine learning
+        Stuart Lynn
+        @param query: subquery that data is pulled from for packaging
+        @param variable: name of the target variable
+        @param feature_columns: list of column names
+        @param target_query: the query to run to obtain the data to predict
+        @param model_params: A dictionary of model parameters, the full
+            specification can be found on the scikit-learn page for
+            [GradientBoostingRegressor](http://scikit-learn.org/stable/modules/generated/sklearn.ensemble.GradientBoostingRegressor.html)
+        """
+        params = {"subquery": target_query,
+                  "id_col": "cartodb_id"}
+
+        (target, features, target_mean,
+         feature_means) = self.clean_data(query, variable, feature_columns)
+
+        model, accuracy = self.train_model(target, features,
+                                           model_params, 0.2)
+        result = predict_segment(model, feature_columns, target_query,
+                                 feature_means, self.data_provider)
+        accuracy_array = [accuracy] * result.shape[0]
+
+        cartodb_ids = self.data_provider.get_segment_data(params)[0]['ids']
+
+        return zip(cartodb_ids, result, accuracy_array)
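For orientation, this is how the new class-based entry point is meant to be driven; it is effectively what the added SQL function does, with data_provider left at its default so AnalysisDataProvider talks to the database. The queries and column names are invented:

    from crankshaft.segmentation import Segmentation

    seg = Segmentation()  # tests can inject a fake data provider here
    model_params = {'n_estimators': 1200, 'max_depth': 3, 'subsample': 0.5,
                    'learning_rate': 0.01, 'min_samples_leaf': 1}
    rows = seg.create_and_predict_segment(
        'SELECT * FROM training_data',    # subquery with known targets
        'median_income',                  # target column to learn
        ['m1', 'm2'],                     # feature columns
        'SELECT * FROM incoming_data',    # rows to predict
        model_params)
    # rows is a sequence of (cartodb_id, prediction, accuracy) tuples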

-def train_model(target, features, model_params, test_split):
-    """
-    Train the Gradient Boosting model on the provided data and calculate
-    the accuracy of the model
-    Input:
-        @param target: 1D Array of the variable that the model is to be
-            trained to predict
-        @param features: 2D Array NSamples * NFeatures to use in trining
-            the model
-        @param model_params: A dictionary of model parameters, the full
-            specification can be found on the
-            scikit learn page for [GradientBoostingRegressor]
-            (http://scikit-learn.org/stable/modules/generated/sklearn.ensemble.GradientBoostingRegressor.html)
-        @parma test_split: The fraction of the data to be withheld for
-            testing the model / calculating the accuray
-    """
-    features_train, features_test, target_train, target_test =
-        train_test_split(features, target, test_size=test_split)
-    model = GradientBoostingRegressor(**model_params)
-    model.fit(features_train, target_train)
-    accuracy = calculate_model_accuracy(model, features_test, target_test)
-    return model, accuracy
+    def train_model(self, target, features, model_params, test_split):
+        """
+        Train the Gradient Boosting model on the provided data and
+        calculate the accuracy of the model
+        Input:
+            @param target: 1D Array of the variable that the model is to
+                be trained to predict
+            @param features: 2D Array NSamples * NFeatures to use in
+                training the model
+            @param model_params: A dictionary of model parameters, the
+                full specification can be found on the scikit-learn page
+                for [GradientBoostingRegressor](http://scikit-learn.org/stable/modules/generated/sklearn.ensemble.GradientBoostingRegressor.html)
+            @param test_split: The fraction of the data to be withheld
+                for testing the model / calculating the accuracy
+        """
+        (features_train, features_test,
+         target_train, target_test) = train_test_split(
+            features, target, test_size=test_split)
+        model = GradientBoostingRegressor(**model_params)
+        model.fit(features_train, target_train)
+        accuracy = calculate_model_accuracy(model, features_test,
+                                            target_test)
+        return model, accuracy
@@ -167,16 +157,16 @@ def calculate_model_accuracy(model, features_test, target_test):
     Calculate the mean squared error of the model prediction
     Input:
         @param model: model trained from input features
         @param features_test: test features set to make a prediction from
-        @param target_target: test target set to compare predictions to
+        @param target_test: test target set to compare predictions to
     Output:
-        mean squared error of the model prection compared to target_test
+        mean squared error of the model prediction compared to target_test
     """
     prediction = model.predict(features_test)
     return metrics.mean_squared_error(prediction, target_test)

-def predict_segment(model, features_col, target_query):
+def predict_segment(model, feature_columns, target_query, feature_means,
+                    data_provider):
     """
     Use the provided model to predict the values for the new feature set
     Input:
         @param model: model trained from input features
+        @param feature_means: means of the training features, used to
+            fill nans in the prediction data
+        @param data_provider: AnalysisDataProvider to fetch the data with
@@ -188,37 +178,21 @@
     """
     batch_size = 1000
-    joined_features = ','.join(['"{0}"::numeric'.format(a)
-                                for a in features_col])
-
-    try:
-        cursor = plpy.cursor('''
-            SELECT Array[{joined_features}] As features
-            FROM ({target_query}) As a'''.format(
-            joined_features=joined_features,
-            target_query=target_query))
-    except Exception, err:
-        plpy.error('Failed to build segmentation model: %s' % err)
-
-    # TODO: is this a good solution for finding the averages?
-    # r = plpy.execute('''
-    #     SELECT {cols}
-    #     FROM ({target_query}) As a
-    #     '''.format(cols=', '.join(['avg({c}) As {c}'.format(c=c)
-    #                                for c in joined_features]),
-    #                target_query=target_query))
-    # avgs = [r[0][c] for c in joined_features]
+    params = {"subquery": target_query,
+              "feature_columns": feature_columns}

     results = []
+    cursor = data_provider.get_predict_data(params)
     while True:
         rows = cursor.fetch(batch_size)
         if not rows:
             break
         batch = np.row_stack([np.array(row['features'], dtype=float)
                               for row in rows])

-        # Need to fix this to global mean. This will cause weird effects
-        batch = replace_nan_with_mean(batch)
+        # fill nans with the training means so that the train and predict
+        # stages treat missing values the same way
+        batch, _ = replace_nan_with_mean(batch, feature_means)
         prediction = model.predict(batch)
         results.append(prediction)
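A reviewer aid rather than part of the change: the "accuracy" that train_model returns, and that create_and_predict_segment fans out per row, is the mean squared error from calculate_model_accuracy, so lower values mean a better fit. A self-contained illustration with invented numbers:

    import numpy as np
    from sklearn import metrics

    # errors of 0 and 2 -> MSE = (0**2 + 2**2) / 2 = 2.0
    prediction = np.array([1.0, 2.0])
    target_test = np.array([1.0, 4.0])
    print(metrics.mean_squared_error(prediction, target_test))  # 2.0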
diff --git a/src/py/crankshaft/test/mock_plpy.py b/src/py/crankshaft/test/mock_plpy.py
index e8a279d..7bea700 100644
--- a/src/py/crankshaft/test/mock_plpy.py
+++ b/src/py/crankshaft/test/mock_plpy.py
@@ -42,6 +42,9 @@ class MockPlPy:
     def info(self, msg):
         self.infos.append(msg)

+    def error(self, msg):
+        self.infos.append(msg)
+
     def cursor(self, query):
         data = self.execute(query)
         return MockCursor(data)
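A sketch of how the new MockPlPy.error hook is likely to be used in tests; this assumes MockPlPy() starts with an empty infos list, as its info method implies. Note that, unlike the real plpy.error, the mock records the message instead of raising, so code under test continues past the error call:

    from mock_plpy import MockPlPy

    plpy_mock = MockPlPy()
    plpy_mock.error('Failed to build segmentation model: bad data')
    assert 'Failed to build segmentation model: bad data' in plpy_mock.infos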