Creates a class-based approach to the analysis methods

This commit is contained in:
Andy Eschbacher 2016-11-18 17:24:18 +00:00
parent 7eee4faac1
commit 83f1900512
3 changed files with 143 additions and 92 deletions

View File

@ -3,8 +3,9 @@
CREATE OR REPLACE FUNCTION CDB_KMeans(query text, no_clusters integer, no_init integer default 20) CREATE OR REPLACE FUNCTION CDB_KMeans(query text, no_clusters integer, no_init integer default 20)
RETURNS table (cartodb_id integer, cluster_no integer) as $$ RETURNS table (cartodb_id integer, cluster_no integer) as $$
from crankshaft.clustering import kmeans from crankshaft.clustering import Kmeans
return kmeans(query, no_clusters, no_init) kmeans = Kmeans()
return kmeans.spatial(query, no_clusters, no_init)
$$ LANGUAGE plpythonu; $$ LANGUAGE plpythonu;
@ -20,8 +21,9 @@ CREATE OR REPLACE FUNCTION CDB_KMeansNonspatial(
) )
RETURNS TABLE(cluster_label text, cluster_center json, silhouettes numeric, rowid bigint) AS $$ RETURNS TABLE(cluster_label text, cluster_center json, silhouettes numeric, rowid bigint) AS $$
from crankshaft.clustering import kmeans_nonspatial from crankshaft.clustering import Kmeans
return kmeans_nonspatial(query, colnames, num_clusters, kmeans = Kmeans()
return kmeans.nonspatial(query, colnames, num_clusters,
id_colname, standarize) id_colname, standarize)
$$ LANGUAGE plpythonu; $$ LANGUAGE plpythonu;

View File

@ -3,101 +3,135 @@ import plpy
import numpy as np import numpy as np
def kmeans(query, no_clusters, no_init=20): class QueryRunner:
""" def get_moran(self, query):
find centers based on clusteres of latitude/longitude pairs """fetch data for moran's i analyses"""
query: SQL query that has a WGS84 geometry (the_geom) try:
""" result = plpy.execute(query)
full_query = ("SELECT array_agg(cartodb_id ORDER BY cartodb_id) as ids," # if there are no neighbors, exit
"array_agg(ST_X(the_geom) ORDER BY cartodb_id) xs," if len(result) == 0:
"array_agg(ST_Y(the_geom) ORDER BY cartodb_id) ys " return pu.empty_zipped_array(2)
"FROM ({query}) As a " except plpy.SPIError, e:
"WHERE the_geom IS NOT NULL").format(query=query) plpy.error('Analysis failed: %s' % e)
try: return pu.empty_zipped_array(2)
data = plpy.execute(full_query)
except plpy.SPIError, err:
plpy.error("k-means (spatial) cluster analysis failed: %s" % err)
# Unpack query response def get_columns(self, query, standarize):
xs = data[0]['xs'] """fetch data for non-spatial kmeans"""
ys = data[0]['ys'] try:
ids = data[0]['ids'] db_resp = plpy.execute(query)
except plpy.SPIError, err:
plpy.error('Analysis failed: %s' % err)
km = KMeans(n_clusters=no_clusters, n_init=no_init) return db_resp
labels = km.fit_predict(zip(xs, ys))
return zip(ids, labels) def get_result(self, query):
"""fetch data for spatial kmeans"""
try:
data = plpy.execute(query)
except plpy.SPIError, err:
plpy.error("Analysis failed: %s" % err)
return data
def kmeans_nonspatial(query, colnames, num_clusters=5, class Kmeans:
id_col='cartodb_id', standarize=True): def __init__(self, query_runner=None):
""" if query_runner is None:
query (string): A SQL query to retrieve the data required to do the self.query_runner = QueryRunner()
k-means clustering analysis, like so: else:
SELECT * FROM iris_flower_data self.query_runner = query_runner
colnames (list): a list of the column names which contain the data of
interest, like so: ["sepal_width", "petal_width",
"sepal_length", "petal_length"]
num_clusters (int): number of clusters (greater than zero)
id_col (string): name of the input id_column
"""
import json
from sklearn import metrics
out_id_colname = 'rowids' def spatial(self, query, no_clusters, no_init=20):
# TODO: need a random seed? """
find centers based on clusters of latitude/longitude pairs
query: SQL query that has a WGS84 geometry (the_geom)
"""
full_query = ("SELECT "
"array_agg(cartodb_id ORDER BY cartodb_id) as ids,"
"array_agg(ST_X(the_geom) ORDER BY cartodb_id) xs,"
"array_agg(ST_Y(the_geom) ORDER BY cartodb_id) ys "
"FROM ({query}) As a "
"WHERE the_geom IS NOT NULL").format(query=query)
full_query = ''' data = self.query_runner.get_result(full_query)
SELECT {cols}, array_agg({id_col}) As {out_id_colname}
FROM ({query}) As a
'''.format(query=query,
id_col=id_col,
out_id_colname=out_id_colname,
cols=', '.join(['array_agg({0}) As col{1}'.format(val, idx)
for idx, val in enumerate(colnames)]))
try: # Unpack query response
db_resp = plpy.execute(full_query) xs = data[0]['xs']
except plpy.SPIError, err: ys = data[0]['ys']
plpy.error("k-means (non-spatial) cluster analysis failed: %s" % err) ids = data[0]['ids']
# fill array with values for k-means clustering km = KMeans(n_clusters=no_clusters, n_init=no_init)
if standarize: labels = km.fit_predict(zip(xs, ys))
cluster_columns = _scale_data( return zip(ids, labels)
_extract_columns(db_resp, out_id_colname))
else:
cluster_columns = _extract_columns(db_resp, out_id_colname)
# TODO: decide on optimal parameters for most cases def nonspatial(self, query, colnames, num_clusters=5,
# Are there ways of deciding parameters based on inputs? id_col='cartodb_id', standarize=True):
kmeans = KMeans(n_clusters=num_clusters, """
random_state=0).fit(cluster_columns) query (string): A SQL query to retrieve the data required to do the
k-means clustering analysis, like so:
SELECT * FROM iris_flower_data
colnames (list): a list of the column names which contain the data
of interest, like so: ["sepal_width",
"petal_width",
"sepal_length",
"petal_length"]
num_clusters (int): number of clusters (greater than zero)
id_col (string): name of the input id_column
"""
import json
from sklearn import metrics
centers = [json.dumps(dict(zip(colnames, c))) out_id_colname = 'rowids'
for c in kmeans.cluster_centers_[kmeans.labels_]] # TODO: need a random seed?
silhouettes = metrics.silhouette_samples(cluster_columns, full_query = '''
kmeans.labels_, SELECT {cols}, array_agg({id_col}) As {out_id_colname}
metric='sqeuclidean') FROM ({query}) As a
'''.format(query=query,
id_col=id_col,
out_id_colname=out_id_colname,
cols=', '.join(['array_agg({0}) As col{1}'.format(val, idx)
for idx, val in enumerate(colnames)]))
return zip(kmeans.labels_, db_resp = self.query_runner.get_columns(full_query, standarize)
centers,
silhouettes, # fill array with values for k-means clustering
db_resp[0][out_id_colname]) if standarize:
cluster_columns = _scale_data(
_extract_columns(db_resp, colnames))
else:
cluster_columns = _extract_columns(db_resp, colnames)
print str(cluster_columns)
# TODO: decide on optimal parameters for most cases
# Are there ways of deciding parameters based on inputs?
kmeans = KMeans(n_clusters=num_clusters,
random_state=0).fit(cluster_columns)
centers = [json.dumps(dict(zip(colnames, c)))
for c in kmeans.cluster_centers_[kmeans.labels_]]
silhouettes = metrics.silhouette_samples(cluster_columns,
kmeans.labels_,
metric='sqeuclidean')
return zip(kmeans.labels_,
centers,
silhouettes,
db_resp[0][out_id_colname])
def _extract_columns(db_resp, id_col_name): # -- Preprocessing steps
def _extract_columns(db_resp, colnames):
""" """
Extract the features from the query and pack them into a NumPy array Extract the features from the query and pack them into a NumPy array
db_resp (plpy data object): result of the kmeans request db_resp (plpy data object): result of the kmeans request
id_col_name (string): name of column which has the row id (not a id_col_name (string): name of column which has the row id (not a
feature of the analysis) feature of the analysis)
""" """
return np.array([db_resp[0][c] for c in db_resp.colnames() return np.array([db_resp[0][c] for c in colnames],
if c != id_col_name],
dtype=float).T dtype=float).T
# -- Preprocessing steps
def _scale_data(features): def _scale_data(features):
""" """

View File

@ -7,17 +7,31 @@ import numpy as np
# #
# import sys # import sys
# sys.modules['plpy'] = plpy # sys.modules['plpy'] = plpy
from helper import plpy, fixture_file, MockDBResponse from helper import plpy, fixture_file
from crankshaft.clustering import Kmeans
from crankshaft.clustering import QueryRunner
import crankshaft.clustering as cc import crankshaft.clustering as cc
from crankshaft import random_seeds
import json import json
from collections import OrderedDict from collections import OrderedDict
class FakeQueryRunner(QueryRunner):
def __init__(self, mocked_result):
self.mocked_result = mocked_result
def get_result(self, query):
return self.mocked_result
def get_columns(self, query, standarize):
return self.mocked_result
class KMeansTest(unittest.TestCase): class KMeansTest(unittest.TestCase):
"""Testing class for k-means spatial""" """Testing class for k-means spatial"""
def setUp(self): def setUp(self):
plpy._reset()
self.cluster_data = json.loads( self.cluster_data = json.loads(
open(fixture_file('kmeans.json')).read()) open(fixture_file('kmeans.json')).read())
self.params = {"subquery": "select * from table", self.params = {"subquery": "select * from table",
@ -30,8 +44,9 @@ class KMeansTest(unittest.TestCase):
'ys': d['ys'], 'ys': d['ys'],
'ids': d['ids']} for d in self.cluster_data] 'ids': d['ids']} for d in self.cluster_data]
plpy._define_result('select', data) random_seeds.set_random_seeds(1234)
clusters = cc.kmeans('subquery', 2) kmeans = Kmeans(FakeQueryRunner(data))
clusters = kmeans.spatial('subquery', 2)
labels = [a[1] for a in clusters] labels = [a[1] for a in clusters]
c1 = [a for a in clusters if a[1] == 0] c1 = [a for a in clusters if a[1] == 0]
c2 = [a for a in clusters if a[1] == 1] c2 = [a for a in clusters if a[1] == 1]
@ -47,9 +62,6 @@ class KMeansNonspatialTest(unittest.TestCase):
def setUp(self): def setUp(self):
plpy._reset() plpy._reset()
# self.cluster_data = json.loads(
# open(fixture_file('kmeans-nonspatial.json')).read())
self.params = {"subquery": "SELECT * FROM TABLE", self.params = {"subquery": "SELECT * FROM TABLE",
"n_clusters": 5} "n_clusters": 5}
@ -57,20 +69,23 @@ class KMeansNonspatialTest(unittest.TestCase):
""" """
test for k-means non-spatial test for k-means non-spatial
""" """
# data from:
# http://scikit-learn.org/stable/modules/generated/sklearn.cluster.KMeans.html#sklearn-cluster-kmeans
data_raw = [OrderedDict([("col1", [1, 1, 1, 4, 4, 4]), data_raw = [OrderedDict([("col1", [1, 1, 1, 4, 4, 4]),
("col2", [2, 4, 0, 2, 4, 0]), ("col2", [2, 4, 0, 2, 4, 0]),
("rowids", [1, 2, 3, 4, 5, 6])])] ("rowids", [1, 2, 3, 4, 5, 6])])]
data_obj = MockDBResponse(data_raw, [k for k in data_raw[0] random_seeds.set_random_seeds(1234)
if k != 'rowids']) kmeans = Kmeans(FakeQueryRunner(data_raw))
plpy._define_result('select', data_obj) print 'asfasdfasd'
clusters = cc.kmeans_nonspatial('subquery', ['col1', 'col2'], 4) clusters = kmeans.nonspatial('subquery', ['col1', 'col2'], 2)
print str([c[0] for c in clusters])
cl1 = clusters[0][1] cl1 = clusters[0][0]
cl2 = clusters[3][1] cl2 = clusters[3][0]
for idx, val in enumerate(clusters): for idx, val in enumerate(clusters):
if idx < 3: if idx < 3:
self.assertEqual(val[1], cl1) self.assertEqual(val[0], cl1)
else: else:
self.assertEqual(val[1], cl2) self.assertEqual(val[0], cl2)