diff --git a/src/pg/sql/11_kmeans.sql b/src/pg/sql/11_kmeans.sql index c9ae131..1dc6d00 100644 --- a/src/pg/sql/11_kmeans.sql +++ b/src/pg/sql/11_kmeans.sql @@ -3,8 +3,9 @@ CREATE OR REPLACE FUNCTION CDB_KMeans(query text, no_clusters integer, no_init integer default 20) RETURNS table (cartodb_id integer, cluster_no integer) as $$ - from crankshaft.clustering import kmeans - return kmeans(query, no_clusters, no_init) + from crankshaft.clustering import Kmeans + kmeans = Kmeans() + return kmeans.spatial(query, no_clusters, no_init) $$ LANGUAGE plpythonu; @@ -20,8 +21,9 @@ CREATE OR REPLACE FUNCTION CDB_KMeansNonspatial( ) RETURNS TABLE(cluster_label text, cluster_center json, silhouettes numeric, rowid bigint) AS $$ - from crankshaft.clustering import kmeans_nonspatial - return kmeans_nonspatial(query, colnames, num_clusters, + from crankshaft.clustering import Kmeans + kmeans = Kmeans() + return kmeans.nonspatial(query, colnames, num_clusters, id_colname, standarize) $$ LANGUAGE plpythonu; diff --git a/src/py/crankshaft/crankshaft/clustering/kmeans.py b/src/py/crankshaft/crankshaft/clustering/kmeans.py index 383584e..48b9bd3 100644 --- a/src/py/crankshaft/crankshaft/clustering/kmeans.py +++ b/src/py/crankshaft/crankshaft/clustering/kmeans.py @@ -3,101 +3,135 @@ import plpy import numpy as np -def kmeans(query, no_clusters, no_init=20): - """ - find centers based on clusteres of latitude/longitude pairs - query: SQL query that has a WGS84 geometry (the_geom) - """ - full_query = ("SELECT array_agg(cartodb_id ORDER BY cartodb_id) as ids," - "array_agg(ST_X(the_geom) ORDER BY cartodb_id) xs," - "array_agg(ST_Y(the_geom) ORDER BY cartodb_id) ys " - "FROM ({query}) As a " - "WHERE the_geom IS NOT NULL").format(query=query) - try: - data = plpy.execute(full_query) - except plpy.SPIError, err: - plpy.error("k-means (spatial) cluster analysis failed: %s" % err) +class QueryRunner: + def get_moran(self, query): + """fetch data for moran's i analyses""" + try: + result = plpy.execute(query) + # if there are no neighbors, exit + if len(result) == 0: + return pu.empty_zipped_array(2) + except plpy.SPIError, e: + plpy.error('Analysis failed: %s' % e) + return pu.empty_zipped_array(2) - # Unpack query response - xs = data[0]['xs'] - ys = data[0]['ys'] - ids = data[0]['ids'] + def get_columns(self, query, standarize): + """fetch data for non-spatial kmeans""" + try: + db_resp = plpy.execute(query) + except plpy.SPIError, err: + plpy.error('Analysis failed: %s' % err) - km = KMeans(n_clusters=no_clusters, n_init=no_init) - labels = km.fit_predict(zip(xs, ys)) - return zip(ids, labels) + return db_resp + + def get_result(self, query): + """fetch data for spatial kmeans""" + try: + data = plpy.execute(query) + except plpy.SPIError, err: + plpy.error("Analysis failed: %s" % err) + return data -def kmeans_nonspatial(query, colnames, num_clusters=5, - id_col='cartodb_id', standarize=True): - """ - query (string): A SQL query to retrieve the data required to do the - k-means clustering analysis, like so: - SELECT * FROM iris_flower_data - colnames (list): a list of the column names which contain the data of - interest, like so: ["sepal_width", "petal_width", - "sepal_length", "petal_length"] - num_clusters (int): number of clusters (greater than zero) - id_col (string): name of the input id_column - """ - import json - from sklearn import metrics +class Kmeans: + def __init__(self, query_runner=None): + if query_runner is None: + self.query_runner = QueryRunner() + else: + self.query_runner = query_runner - out_id_colname = 'rowids' - # TODO: need a random seed? + def spatial(self, query, no_clusters, no_init=20): + """ + find centers based on clusters of latitude/longitude pairs + query: SQL query that has a WGS84 geometry (the_geom) + """ + full_query = ("SELECT " + "array_agg(cartodb_id ORDER BY cartodb_id) as ids," + "array_agg(ST_X(the_geom) ORDER BY cartodb_id) xs," + "array_agg(ST_Y(the_geom) ORDER BY cartodb_id) ys " + "FROM ({query}) As a " + "WHERE the_geom IS NOT NULL").format(query=query) - full_query = ''' - SELECT {cols}, array_agg({id_col}) As {out_id_colname} - FROM ({query}) As a - '''.format(query=query, - id_col=id_col, - out_id_colname=out_id_colname, - cols=', '.join(['array_agg({0}) As col{1}'.format(val, idx) - for idx, val in enumerate(colnames)])) + data = self.query_runner.get_result(full_query) - try: - db_resp = plpy.execute(full_query) - except plpy.SPIError, err: - plpy.error("k-means (non-spatial) cluster analysis failed: %s" % err) + # Unpack query response + xs = data[0]['xs'] + ys = data[0]['ys'] + ids = data[0]['ids'] - # fill array with values for k-means clustering - if standarize: - cluster_columns = _scale_data( - _extract_columns(db_resp, out_id_colname)) - else: - cluster_columns = _extract_columns(db_resp, out_id_colname) + km = KMeans(n_clusters=no_clusters, n_init=no_init) + labels = km.fit_predict(zip(xs, ys)) + return zip(ids, labels) - # TODO: decide on optimal parameters for most cases - # Are there ways of deciding parameters based on inputs? - kmeans = KMeans(n_clusters=num_clusters, - random_state=0).fit(cluster_columns) + def nonspatial(self, query, colnames, num_clusters=5, + id_col='cartodb_id', standarize=True): + """ + query (string): A SQL query to retrieve the data required to do the + k-means clustering analysis, like so: + SELECT * FROM iris_flower_data + colnames (list): a list of the column names which contain the data + of interest, like so: ["sepal_width", + "petal_width", + "sepal_length", + "petal_length"] + num_clusters (int): number of clusters (greater than zero) + id_col (string): name of the input id_column + """ + import json + from sklearn import metrics - centers = [json.dumps(dict(zip(colnames, c))) - for c in kmeans.cluster_centers_[kmeans.labels_]] + out_id_colname = 'rowids' + # TODO: need a random seed? - silhouettes = metrics.silhouette_samples(cluster_columns, - kmeans.labels_, - metric='sqeuclidean') + full_query = ''' + SELECT {cols}, array_agg({id_col}) As {out_id_colname} + FROM ({query}) As a + '''.format(query=query, + id_col=id_col, + out_id_colname=out_id_colname, + cols=', '.join(['array_agg({0}) As col{1}'.format(val, idx) + for idx, val in enumerate(colnames)])) - return zip(kmeans.labels_, - centers, - silhouettes, - db_resp[0][out_id_colname]) + db_resp = self.query_runner.get_columns(full_query, standarize) + + # fill array with values for k-means clustering + if standarize: + cluster_columns = _scale_data( + _extract_columns(db_resp, colnames)) + else: + cluster_columns = _extract_columns(db_resp, colnames) + + print str(cluster_columns) + # TODO: decide on optimal parameters for most cases + # Are there ways of deciding parameters based on inputs? + kmeans = KMeans(n_clusters=num_clusters, + random_state=0).fit(cluster_columns) + + centers = [json.dumps(dict(zip(colnames, c))) + for c in kmeans.cluster_centers_[kmeans.labels_]] + + silhouettes = metrics.silhouette_samples(cluster_columns, + kmeans.labels_, + metric='sqeuclidean') + + return zip(kmeans.labels_, + centers, + silhouettes, + db_resp[0][out_id_colname]) -def _extract_columns(db_resp, id_col_name): +# -- Preprocessing steps + +def _extract_columns(db_resp, colnames): """ Extract the features from the query and pack them into a NumPy array db_resp (plpy data object): result of the kmeans request id_col_name (string): name of column which has the row id (not a feature of the analysis) """ - return np.array([db_resp[0][c] for c in db_resp.colnames() - if c != id_col_name], + return np.array([db_resp[0][c] for c in colnames], dtype=float).T -# -- Preprocessing steps - def _scale_data(features): """ diff --git a/src/py/crankshaft/test/test_clustering_kmeans.py b/src/py/crankshaft/test/test_clustering_kmeans.py index 03cbd0a..8e5c9b4 100644 --- a/src/py/crankshaft/test/test_clustering_kmeans.py +++ b/src/py/crankshaft/test/test_clustering_kmeans.py @@ -7,17 +7,31 @@ import numpy as np # # import sys # sys.modules['plpy'] = plpy -from helper import plpy, fixture_file, MockDBResponse +from helper import plpy, fixture_file +from crankshaft.clustering import Kmeans +from crankshaft.clustering import QueryRunner import crankshaft.clustering as cc + +from crankshaft import random_seeds import json from collections import OrderedDict +class FakeQueryRunner(QueryRunner): + def __init__(self, mocked_result): + self.mocked_result = mocked_result + + def get_result(self, query): + return self.mocked_result + + def get_columns(self, query, standarize): + return self.mocked_result + + class KMeansTest(unittest.TestCase): """Testing class for k-means spatial""" def setUp(self): - plpy._reset() self.cluster_data = json.loads( open(fixture_file('kmeans.json')).read()) self.params = {"subquery": "select * from table", @@ -30,8 +44,9 @@ class KMeansTest(unittest.TestCase): 'ys': d['ys'], 'ids': d['ids']} for d in self.cluster_data] - plpy._define_result('select', data) - clusters = cc.kmeans('subquery', 2) + random_seeds.set_random_seeds(1234) + kmeans = Kmeans(FakeQueryRunner(data)) + clusters = kmeans.spatial('subquery', 2) labels = [a[1] for a in clusters] c1 = [a for a in clusters if a[1] == 0] c2 = [a for a in clusters if a[1] == 1] @@ -47,9 +62,6 @@ class KMeansNonspatialTest(unittest.TestCase): def setUp(self): plpy._reset() - # self.cluster_data = json.loads( - # open(fixture_file('kmeans-nonspatial.json')).read()) - self.params = {"subquery": "SELECT * FROM TABLE", "n_clusters": 5} @@ -57,20 +69,23 @@ class KMeansNonspatialTest(unittest.TestCase): """ test for k-means non-spatial """ + # data from: + # http://scikit-learn.org/stable/modules/generated/sklearn.cluster.KMeans.html#sklearn-cluster-kmeans data_raw = [OrderedDict([("col1", [1, 1, 1, 4, 4, 4]), ("col2", [2, 4, 0, 2, 4, 0]), ("rowids", [1, 2, 3, 4, 5, 6])])] - data_obj = MockDBResponse(data_raw, [k for k in data_raw[0] - if k != 'rowids']) - plpy._define_result('select', data_obj) - clusters = cc.kmeans_nonspatial('subquery', ['col1', 'col2'], 4) + random_seeds.set_random_seeds(1234) + kmeans = Kmeans(FakeQueryRunner(data_raw)) + print 'asfasdfasd' + clusters = kmeans.nonspatial('subquery', ['col1', 'col2'], 2) + print str([c[0] for c in clusters]) - cl1 = clusters[0][1] - cl2 = clusters[3][1] + cl1 = clusters[0][0] + cl2 = clusters[3][0] for idx, val in enumerate(clusters): if idx < 3: - self.assertEqual(val[1], cl1) + self.assertEqual(val[0], cl1) else: - self.assertEqual(val[1], cl2) + self.assertEqual(val[0], cl2)