From 9b329187460c03383f9002544a1b4357eeb11d07 Mon Sep 17 00:00:00 2001
From: Andy Eschbacher
Date: Mon, 21 Mar 2016 09:50:57 -0400
Subject: [PATCH 01/15] fixing cluster query tests

---
 src/py/crankshaft/test/test_clustering_moran.py | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/src/py/crankshaft/test/test_clustering_moran.py b/src/py/crankshaft/test/test_clustering_moran.py
index b48b8d6..6a9107a 100644
--- a/src/py/crankshaft/test/test_clustering_moran.py
+++ b/src/py/crankshaft/test/test_clustering_moran.py
@@ -60,10 +60,10 @@ class MoranTest(unittest.TestCase):
 
         ans = "SELECT i.\"cartodb_id\" As id, i.\"andy\"::numeric As attr1, " \
               "i.\"jay_z\"::numeric As attr2, (SELECT ARRAY(SELECT j.\"cartodb_id\" " \
-              "FROM \"(SELECT * FROM a_list)\" As j WHERE j.\"andy\" IS NOT NULL AND " \
+              "FROM (SELECT * FROM a_list) As j WHERE j.\"andy\" IS NOT NULL AND " \
               "j.\"jay_z\" IS NOT NULL AND j.\"jay_z\" <> 0 ORDER BY " \
               "j.\"the_geom\" <-> i.\"the_geom\" ASC LIMIT 321 OFFSET 1 ) ) " \
-              "As neighbors FROM \"(SELECT * FROM a_list)\" As i WHERE i.\"andy\" IS NOT " \
+              "As neighbors FROM (SELECT * FROM a_list) As i WHERE i.\"andy\" IS NOT " \
               "NULL AND i.\"jay_z\" IS NOT NULL AND i.\"jay_z\" <> 0 ORDER " \
               "BY i.\"cartodb_id\" ASC;"
 
@@ -74,10 +74,10 @@ class MoranTest(unittest.TestCase):
 
         ans = "SELECT i.\"cartodb_id\" As id, i.\"andy\"::numeric As attr1, " \
               "i.\"jay_z\"::numeric As attr2, (SELECT ARRAY(SELECT " \
-              "j.\"cartodb_id\" FROM \"(SELECT * FROM a_list)\" As j WHERE ST_Touches(" \
+              "j.\"cartodb_id\" FROM (SELECT * FROM a_list) As j WHERE ST_Touches(" \
               "i.\"the_geom\", j.\"the_geom\") AND j.\"andy\" IS NOT NULL " \
               "AND j.\"jay_z\" IS NOT NULL AND j.\"jay_z\" <> 0)) As " \
-              "neighbors FROM \"(SELECT * FROM a_list)\" As i WHERE i.\"andy\" IS NOT NULL " \
+              "neighbors FROM (SELECT * FROM a_list) As i WHERE i.\"andy\" IS NOT NULL " \
               "AND i.\"jay_z\" IS NOT NULL AND i.\"jay_z\" <> 0 ORDER BY " \
               "i.\"cartodb_id\" ASC;"
 
@@ -88,10 +88,10 @@ class MoranTest(unittest.TestCase):
 
         ans = "SELECT i.\"cartodb_id\" As id, i.\"andy\"::numeric As attr1, " \
               "i.\"jay_z\"::numeric As attr2, (SELECT ARRAY(SELECT " \
-              "j.\"cartodb_id\" FROM \"(SELECT * FROM a_list)\" As j WHERE j.\"andy\" IS " \
+              "j.\"cartodb_id\" FROM (SELECT * FROM a_list) As j WHERE j.\"andy\" IS " \
               "NOT NULL AND j.\"jay_z\" IS NOT NULL AND j.\"jay_z\" <> 0 " \
               "ORDER BY j.\"the_geom\" <-> i.\"the_geom\" ASC LIMIT 321 " \
-              "OFFSET 1 ) ) As neighbors FROM \"(SELECT * FROM a_list)\" As i WHERE " \
+              "OFFSET 1 ) ) As neighbors FROM (SELECT * FROM a_list) As i WHERE " \
               "i.\"andy\" IS NOT NULL AND i.\"jay_z\" IS NOT NULL AND " \
               "i.\"jay_z\" <> 0 ORDER BY i.\"cartodb_id\" ASC;"
 
@@ -106,6 +106,7 @@ class MoranTest(unittest.TestCase):
 
     def test_get_weight(self):
         """Test get_weight."""
+        ## need to add tests
         self.assertEqual(True, True)
 

From 1e16b7839b40ddc88b8f793d17a8201d8c677f36 Mon Sep 17 00:00:00 2001
From: Andy Eschbacher
Date: Wed, 23 Mar 2016 12:35:39 -0400
Subject: [PATCH 02/15] update methods to accommodate markov

---
 .../crankshaft/crankshaft/clustering/moran.py | 91 +++++++++++--------
 1 file changed, 52 insertions(+), 39 deletions(-)

diff --git a/src/py/crankshaft/crankshaft/clustering/moran.py b/src/py/crankshaft/crankshaft/clustering/moran.py
index 9dd976e..386ab92 100644
--- a/src/py/crankshaft/crankshaft/clustering/moran.py
+++ b/src/py/crankshaft/crankshaft/clustering/moran.py
@@ -47,7 +47,7 @@ def moran_local(subquery, attr, significance, num_ngbrs, permutations, geom_colu
 
     lisa = 
ps.Moran_Local(y, w)
 
     # find units of significance
-    lisa_sig = lisa_sig_vals(lisa.p_sim, lisa.q, significance)
+    lisa_sig = quad_position(lisa.q)
 
     plpy.notice('** Finished calculations')
 
@@ -95,7 +95,7 @@ def moran_local_rate(subquery, numerator, denominator, significance, num_ngbrs,
     lisa = ps.esda.moran.Moran_Local_Rate(numer, denom, w, permutations=permutations)
 
     # find units of significance
-    lisa_sig = lisa_sig_vals(lisa.p_sim, lisa.q, significance)
+    lisa_sig = quad_position(lisa.q)
 
     plpy.notice('** Finished calculations')
 
@@ -136,7 +136,7 @@ def moran_local_bv(t, attr1, attr2, significance, num_ngbrs, permutations, geom_
     plpy.notice("len of Is: %d" % len(lisa.Is))
 
     # find clustering of significance
-    lisa_sig = lisa_sig_vals(lisa.p_sim, lisa.q, significance)
+    lisa_sig = quad_position(lisa.q)
 
     plpy.notice('** Finished calculations')
 
@@ -169,33 +169,62 @@ def query_attr_select(params):
       :param params: dict of information used in query (column names, table name, etc.)
     """
-
-    attrs = [k for k in params
-             if k not in ('id_col', 'geom_col', 'table', 'num_ngbrs', 'subquery')]
-
-    template = "i.\"{%(col)s}\"::numeric As attr%(alias_num)s, "
-
+    template = "i.\"%(col)s\"::numeric As attr%(alias_num)s, "
     attr_string = ""
 
-    for idx, val in enumerate(sorted(attrs)):
-        attr_string += template % {"col": val, "alias_num": idx + 1}
+    if 'time_cols' in params:
+        ## if markov analysis
+        attrs = params['time_cols']
+
+        for idx, val in enumerate(attrs):
+            attr_string += template % {"col": val, "alias_num": idx + 1}
+    else:
+        ## if moran's analysis
+        attrs = [k for k in params
+                 if k not in ('id_col', 'geom_col', 'subquery', 'num_ngbrs')]
+
+        for idx, val in enumerate(sorted(attrs)):
+            attr_string += template % {"col": params[val], "alias_num": idx + 1}
 
     return attr_string
 
 def query_attr_where(params):
     """
       Create portion of WHERE clauses for weeding out NULL-valued geometries
+      Input: dict of params:
+        {'subquery': ...,
+         'numerator': 'data1',
+         'denominator': 'data2',
+         'etc': ...}
+      Output: 'idx_replace."data1" IS NOT NULL AND idx_replace."data2" IS NOT NULL'
+
+      Input:
+      {'subquery': ...,
+       'time_cols': ['time1', 'time2', 'time3'],
+       'etc': ...}
+      Output: 'idx_replace."time1" IS NOT NULL AND idx_replace."time2" IS NOT NULL AND idx_replace."time3" IS NOT NULL'
     """
-    attrs = sorted([k for k in params
-                    if k not in ('id_col', 'geom_col', 'table', 'num_ngbrs', 'subquery')])
 
     attr_string = []
+    template = "idx_replace.\"%s\" IS NOT NULL"
 
-    for attr in attrs:
-        attr_string.append("idx_replace.\"{%s}\" IS NOT NULL" % attr)
+    if 'time_cols' in params:
+        ## markov where clauses
+        attrs = params['time_cols']
+        # add values to template
+        for attr in attrs:
+            attr_string.append(template % attr)
+    else:
+        ## moran where clauses
 
-    if len(attrs) == 2:
-        attr_string.append("idx_replace.\"{%s}\" <> 0" % attrs[1])
+        # get keys
+        attrs = sorted([k for k in params
+                        if k not in ('id_col', 'geom_col', 'subquery', 'num_ngbrs')])
+        # add values to template
+        for attr in attrs:
+            attr_string.append(template % params[attr])
+
+        if len(attrs) == 2:
+            attr_string.append("idx_replace.\"%s\" <> 0" % params[attrs[1]])
 
     out = " AND ".join(attr_string)
 
@@ -217,15 +246,16 @@ def knn(params):
               "i.\"{id_col}\" As id, " \
               "%(attr_select)s" \
               "(SELECT ARRAY(SELECT j.\"{id_col}\" " \
-              "FROM \"({subquery})\" As j " \
+              "FROM ({subquery}) As j " \
              "WHERE %(attr_where_j)s " \
              "ORDER BY j.\"{geom_col}\" <-> i.\"{geom_col}\" ASC " \
              "LIMIT {num_ngbrs} OFFSET 1 ) " \
              ") As neighbors " \
-              "FROM \"({subquery})\" As i " \
+              "FROM 
({subquery}) As i " \
              "WHERE " \
              "%(attr_where_i)s " \
              "ORDER BY i.\"{id_col}\" ASC;" % replacements
+    print query
 
     return query.format(**params)
 
@@ -245,11 +275,11 @@ def queen(params):
               "i.\"{id_col}\" As id, " \
               "%(attr_select)s" \
               "(SELECT ARRAY(SELECT j.\"{id_col}\" " \
-              "FROM \"({subquery})\" As j " \
+              "FROM ({subquery}) As j " \
              "WHERE ST_Touches(i.\"{geom_col}\", j.\"{geom_col}\") AND " \
              "%(attr_where_j)s)" \
              ") As neighbors " \
-              "FROM \"({subquery})\" As i " \
+              "FROM ({subquery}) As i " \
              "WHERE " \
              "%(attr_where_i)s " \
              "ORDER BY i.\"{id_col}\" ASC;" % replacements
 
@@ -302,20 +332,3 @@ def quad_position(quads):
     lisa_sig = np.array([map_quads(q) for q in quads])
 
     return lisa_sig
-
-def lisa_sig_vals(pvals, quads, threshold):
-    """
-        Produce Moran's I classification based of n
-    """
-
-    sig = (pvals <= threshold)
-
-    lisa_sig = np.empty(len(sig), np.chararray)
-
-    for idx, val in enumerate(sig):
-        if val:
-            lisa_sig[idx] = map_quads(quads[idx])
-        else:
-            lisa_sig[idx] = 'Not significant'
-
-    return lisa_sig

From 58cf210e966b27fa13ad46af71c42393ce4c5e23 Mon Sep 17 00:00:00 2001
From: Andy Eschbacher
Date: Wed, 23 Mar 2016 12:37:23 -0400
Subject: [PATCH 03/15] fixes test to accommodate markov

---
 src/py/crankshaft/test/test_clustering_moran.py | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/src/py/crankshaft/test/test_clustering_moran.py b/src/py/crankshaft/test/test_clustering_moran.py
index 6a9107a..4c06594 100644
--- a/src/py/crankshaft/test/test_clustering_moran.py
+++ b/src/py/crankshaft/test/test_clustering_moran.py
@@ -41,17 +41,17 @@ class MoranTest(unittest.TestCase):
     def test_query_attr_select(self):
         """Test query_attr_select."""
 
-        ans = "i.\"{attr1}\"::numeric As attr1, " \
-              "i.\"{attr2}\"::numeric As attr2, "
+        ans = "i.\"andy\"::numeric As attr1, " \
+              "i.\"jay_z\"::numeric As attr2, "
 
         self.assertEqual(cc.query_attr_select(self.params), ans)
 
     def test_query_attr_where(self):
         """Test query_attr_where."""
 
-        ans = "idx_replace.\"{attr1}\" IS NOT NULL AND "\
-              "idx_replace.\"{attr2}\" IS NOT NULL AND "\
-              "idx_replace.\"{attr2}\" <> 0"
+        ans = "idx_replace.\"andy\" IS NOT NULL AND "\
+              "idx_replace.\"jay_z\" IS NOT NULL AND "\
+              "idx_replace.\"jay_z\" <> 0"
 
         self.assertEqual(cc.query_attr_where(self.params), ans)
 

From b8894169472af5a1d56e74cfd8ea239964c21b7a Mon Sep 17 00:00:00 2001
From: Andy Eschbacher
Date: Wed, 23 Mar 2016 12:38:44 -0400
Subject: [PATCH 04/15] adding markov functionality

---
 src/pg/sql/11_markov.sql | 81 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 81 insertions(+)
 create mode 100644 src/pg/sql/11_markov.sql

diff --git a/src/pg/sql/11_markov.sql b/src/pg/sql/11_markov.sql
new file mode 100644
index 0000000..63a46ac
--- /dev/null
+++ b/src/pg/sql/11_markov.sql
@@ -0,0 +1,81 @@
+-- Spatial Markov
+
+-- input table format:
+-- id | geom | date_1 | date_2 | date_3
+-- 1  | Pt1  | 12.3   | 13.1   | 14.2
+-- 2  | Pt2  | 11.0   | 13.2   | 12.5
+-- ...
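+-- (Wide format: one row per unit, one measurement column per date; the
+-- date column names are passed to the function in the time_cols array.)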
+-- Sample Function call: +-- SELECT cdb_spatial_markov('SELECT * FROM real_estate', +-- Array['date_1', 'date_2', 'date_3']) + + +CREATE OR REPLACE FUNCTION + cdb_spatial_markov ( + subquery TEXT, + time_cols text[], + num_time_per_bin int DEFAULT 1, + permutations INT DEFAULT 99, + geom_column TEXT DEFAULT 'the_geom', + id_col TEXT DEFAULT 'cartodb_id', + w_type TEXT DEFAULT 'knn', + num_ngbrs int DEFAULT 5) +RETURNS TABLE (moran FLOAT, quads TEXT, significance FLOAT, ids INT) +AS $$ + plpy.execute('SELECT cdb_crankshaft._cdb_crankshaft_activate_py()') + from crankshaft.space_time_predictions import spatial_markov + # TODO: use named parameters or a dictionary + return spatial_markov(subquery, time_cols, permutations, geom_column, id_col, w_type, num_ngbrs) +$$ LANGUAGE plpythonu; + +-- input table format: identical to above but in a predictable format +-- Sample function call: +-- SELECT cdb_spatial_markov('SELECT * FROM real_estate', +-- 'date_1') + + +CREATE OR REPLACE FUNCTION + cdb_spatial_markov ( + subquery TEXT, + time_col_min text, + time_col_max text, + date_format text, -- '_YYYY_MM_DD' + num_time_per_bin INT DEFAULT 1, + permutations INT DEFAULT 99, + geom_column TEXT DEFAULT 'the_geom', + id_col TEXT DEFAULT 'cartodb_id', + w_type TEXT DEFAULT 'knn', + num_ngbrs int DEFAULT 5) +RETURNS TABLE (moran FLOAT, quads TEXT, significance FLOAT, ids INT) +AS $$ + plpy.execute('SELECT cdb_crankshaft._cdb_crankshaft_activate_py()') + from crankshaft.clustering import moran_local + # TODO: use named parameters or a dictionary + return spatial_markov(subquery, time_cols, permutations, geom_column, id_col, w_type, num_ngbrs) +$$ LANGUAGE plpythonu; + +-- input table format: +-- id | geom | date | measurement +-- 1 | Pt1 | 12/3 | 13.2 +-- 2 | Pt2 | 11/5 | 11.3 +-- 3 | Pt1 | 11/13 | 12.9 +-- 4 | Pt3 | 12/19 | 10.1 +-- ... 
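+-- (Long format: the same id/geometry appears once per measurement date,
+-- so time_col names the date column used to assemble each unit's series
+-- before binning with num_time_per_bin.)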
+ +CREATE OR REPLACE FUNCTION + cdb_spatial_markov ( + subquery TEXT, + time_col text, + num_time_per_bin INT DEFAULT 1, + permutations INT DEFAULT 99, + geom_column TEXT DEFAULT 'the_geom', + id_col TEXT DEFAULT 'cartodb_id', + w_type TEXT DEFAULT 'knn', + num_ngbrs int DEFAULT 5) +RETURNS TABLE (moran FLOAT, quads TEXT, significance FLOAT, ids INT) +AS $$ + plpy.execute('SELECT cdb_crankshaft._cdb_crankshaft_activate_py()') + from crankshaft.clustering import moran_local + # TODO: use named parameters or a dictionary + return spatial_markov(subquery, time_cols, permutations, geom_column, id_col, w_type, num_ngbrs) +$$ LANGUAGE plpythonu; From c488900c8c1144ca0e6cc73b9d477b951647375a Mon Sep 17 00:00:00 2001 From: Andy Eschbacher Date: Wed, 23 Mar 2016 12:39:51 -0400 Subject: [PATCH 05/15] adding markov python code --- .../space_time_dynamics/__init__.py | 1 + .../crankshaft/space_time_dynamics/markov.py | 138 ++++++++++++++++++ .../test/test_space_time_dynamics.py | 126 ++++++++++++++++ 3 files changed, 265 insertions(+) create mode 100644 src/py/crankshaft/crankshaft/space_time_dynamics/__init__.py create mode 100644 src/py/crankshaft/crankshaft/space_time_dynamics/markov.py create mode 100644 src/py/crankshaft/test/test_space_time_dynamics.py diff --git a/src/py/crankshaft/crankshaft/space_time_dynamics/__init__.py b/src/py/crankshaft/crankshaft/space_time_dynamics/__init__.py new file mode 100644 index 0000000..f45ffdf --- /dev/null +++ b/src/py/crankshaft/crankshaft/space_time_dynamics/__init__.py @@ -0,0 +1 @@ +from markov import * \ No newline at end of file diff --git a/src/py/crankshaft/crankshaft/space_time_dynamics/markov.py b/src/py/crankshaft/crankshaft/space_time_dynamics/markov.py new file mode 100644 index 0000000..911d558 --- /dev/null +++ b/src/py/crankshaft/crankshaft/space_time_dynamics/markov.py @@ -0,0 +1,138 @@ +""" +Spatial dynamics measurements using Spatial Markov +""" + + +import numpy as np +import pysal as ps +import plpy +from crankshaft.clustering import get_query + +def spatial_markov_trend(subquery, time_cols, num_time_per_bin, permutations, geom_col, id_col, w_type, num_ngbrs): + """ + Predict the trends of a unit based on: + 1. history of its transitions to different classes (e.g., 1st quantile -> 2nd quantile) + 2. 
average class of its neighbors
+
+    Inputs:
+
+    @param subquery string: e.g., SELECT * FROM table_name
+    @param time_cols list (string): list of strings of column names
+    @param num_time_per_bin int: number of bins to divide # of time columns into
+    @param permutations int: number of permutations for test stats
+    @param geom_col string: name of column which contains the geometries
+    @param id_col string: name of column which has the ids of the table
+    @param w_type string: weight type ('knn' or 'queen')
+    @param num_ngbrs int: number of neighbors (if knn type)
+
+    Outputs:
+    @param trend_up float: probability that a geom will move to a higher class
+    @param trend_down float: probability that a geom will move to a lower class
+    @param trend float: (trend_up - trend_down) / trend_static
+    @param volatility float: a measure of the volatility based on probability stddev(prob array)
+    @param
+    """
+
+    qvals = {"id_col": id_col,
+             "time_cols": time_cols,
+             "geom_col": geom_col,
+             "subquery": subquery,
+             "num_ngbrs": num_ngbrs}
+
+    query = get_query(w_type, qvals)
+
+    try:
+        query_result = plpy.execute(query)
+    except:
+        zip([None],[None],[None])
+
+    ## build weight
+    weights = get_weight(query_result, w_type)
+
+    ## prep time data
+    t_data = get_time_data(query_result, time_cols)
+    ## rebin time data
+    if num_time_per_bin > 1:
+        ## rebin
+        t_data = rebin_data(t_data, num_time_per_bin)
+
+    sp_markov_result = ps.Spatial_Markov(t_data, weights, k=7, fixed=False)
+
+    ## get lags
+    lags = ps.lag_spatial(weights, t_data)
+
+    ## get lag classes
+    lag_classes = ps.Quantiles(lags.flatten(), k=7).yb
+
+    ## look up probability distribution for each unit according to class and lag class
+    prob_dist = get_prob_dist(lag_classes, sp_markov_result.classes)
+
+    ## find the ups and down and overall distribution of each cell
+    trend, trend_up, trend_down, volatility = get_prob_stats(prob_dist)
+
+    ## output the results
+
+    return zip(trend, trend_up, trend_down, volatility, weights.id_order)
+
+def get_time_data(markov_data, time_cols):
+    """
+        Extract the time columns and bin appropriately
+    """
+    return np.array([[x[t_col] for x in query_result] for t_col in time_cols], dtype=float)
+
+def rebin_data(time_data, num_time_per_bin):
+    """
+    convert an n x l matrix into an (n/m) x l matrix where the values are reduced (averaged) for the intervening states:
+        1 2 3 4               1.5 3.5
+        5 6 7 8      ->       5.5 7.5
+        9 8 7 6               8.5 6.5
+        5 4 3 2               4.5 2.5
+
+        if m = 2
+
+    This process effectively resamples the data at a longer time span n units longer than the input data.
+    For cases when there is a remainder (remainder(5/3) = 2), the remaining two columns are binned together as the last time period, while the first three are binned together.
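+    For example, rebinning ten time columns with num_time_per_bin = 3
+    averages columns (0,1,2), (3,4,5), (6,7,8) and leaves column 9 as the
+    final bin on its own — the remainder case exercised in the unit tests.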
+ + Input: + @param time_data n x l ndarray: measurements of an attribute at different time intervals + @param num_time_per_bin int: number of columns to average into a new column + Output: + ceil(n / m) x l ndarray of resampled time series + """ + + if time_data.shape[1] % num_time_per_bin == 0: + ## if fit is perfect, then use it + n_max = time_data.shape[1] / num_time_per_bin + else: + ## fit remainders into an additional column + n_max = time_data.shape[1] / num_time_per_bin + 1 + + return np.array([ + time_data[:, + num_time_per_bin*i:num_time_per_bin*(i+1)].mean(axis=1) + for i in range(n_max)]).T +def get_prob_dist(transition_matrix, lag_indices, unit_indices): + """ + given an array of transition matrices, look up the probability associated with the arrangements passed + + Input: + @param transition_matrix ndarray[k,k,k]: + @param lag_indices ndarray: + @param unit_indices ndarray: + + Output: + Array of probability distributions + """ + + return np.array([transition_matrix[(lag_indices[i], unit_indices[i])] for i in range(len(lag_indices))]) + +def get_prob_stats(prob_dist, unit_indices): +# trend, trend_up, trend_down, volatility = get_prob_stats(prob_dist) + + trend_up = np.array([prob_dist[:, i:].sum() for i in unit_indices]) + trend_down = np.array([prob_dist[:, :i].sum() for i in unit_indices]) + trend = trend_up - trend_down + volatility = prob_dist.std(axis=1) + + + return trend_up, trend_down, trend, volatility diff --git a/src/py/crankshaft/test/test_space_time_dynamics.py b/src/py/crankshaft/test/test_space_time_dynamics.py new file mode 100644 index 0000000..dd7b8b0 --- /dev/null +++ b/src/py/crankshaft/test/test_space_time_dynamics.py @@ -0,0 +1,126 @@ +import unittest +import numpy as np + +import unittest + + +# from mock_plpy import MockPlPy +# plpy = MockPlPy() +# +# import sys +# sys.modules['plpy'] = plpy +from helper import plpy, fixture_file + +import crankshaft.space_time_dynamics as std +import crankshaft.clustering as cc +from crankshaft import random_seeds +import json + +class SpaceTimeTests(unittest.TestCase): + """Testing class for Markov Functions.""" + + def setUp(self): + plpy._reset() + self.params = {"id_col": "cartodb_id", + "time_cols": ['dec_2013', 'jan_2014', 'feb_2014'], + "subquery": "SELECT * FROM a_list", + "geom_col": "the_geom", + "num_ngbrs": 321} + self.neighbors_data = json.loads(open(fixture_file('neighbors.json')).read()) + self.moran_data = json.loads(open(fixture_file('moran.json')).read()) + + self.time_data = np.array([i * np.ones(10, dtype=float) for i in range(10)]).T + + self.transition_matrix = p = np.array([ + [[ 0.96341463, 0.0304878 , 0.00609756, 0. , 0. ], + [ 0.06040268, 0.83221477, 0.10738255, 0. , 0. ], + [ 0. , 0.14 , 0.74 , 0.12 , 0. ], + [ 0. , 0.03571429, 0.32142857, 0.57142857, 0.07142857], + [ 0. , 0. , 0. , 0.16666667, 0.83333333]], + [[ 0.79831933, 0.16806723, 0.03361345, 0. , 0. ], + [ 0.0754717 , 0.88207547, 0.04245283, 0. , 0. ], + [ 0.00537634, 0.06989247, 0.8655914 , 0.05913978, 0. ], + [ 0. , 0. , 0.06372549, 0.90196078, 0.03431373], + [ 0. , 0. , 0. , 0.19444444, 0.80555556]], + [[ 0.84693878, 0.15306122, 0. , 0. , 0. ], + [ 0.08133971, 0.78947368, 0.1291866 , 0. , 0. ], + [ 0.00518135, 0.0984456 , 0.79274611, 0.0984456 , 0.00518135], + [ 0. , 0. , 0.09411765, 0.87058824, 0.03529412], + [ 0. , 0. , 0. , 0.10204082, 0.89795918]], + [[ 0.8852459 , 0.09836066, 0. , 0.01639344, 0. ], + [ 0.03875969, 0.81395349, 0.13953488, 0. 
, 0.00775194], + [ 0.0049505 , 0.09405941, 0.77722772, 0.11881188, 0.0049505 ], + [ 0. , 0.02339181, 0.12865497, 0.75438596, 0.09356725], + [ 0. , 0. , 0. , 0.09661836, 0.90338164]], + [[ 0.33333333, 0.66666667, 0. , 0. , 0. ], + [ 0.0483871 , 0.77419355, 0.16129032, 0.01612903, 0. ], + [ 0.01149425, 0.16091954, 0.74712644, 0.08045977, 0. ], + [ 0. , 0.01036269, 0.06217617, 0.89637306, 0.03108808], + [ 0. , 0. , 0. , 0.02352941, 0.97647059]]] + ) + + # def test_spatial_markov(self): + # """Test Spatial Markov.""" + # + # ans = "SELECT i.\"cartodb_id\" As id, " \ + # "i.\"dec_2013\"::numeric As attr1, " \ + # "i.\"jan_2014\"::numeric As attr2, " \ + # "i.\"feb_2014\"::numeric As attr3, " \ + # "(SELECT ARRAY(SELECT j.\"cartodb_id\" " \ + # "FROM (SELECT * FROM a_list) As j " \ + # "WHERE j.\"dec_2013\" IS NOT NULL AND " \ + # "j.\"jan_2014\" IS NOT NULL AND " \ + # "j.\"feb_2014\" IS NOT NULL " \ + # "ORDER BY " \ + # "j.\"the_geom\" <-> i.\"the_geom\" ASC " \ + # "LIMIT 321 OFFSET 1 ) ) " \ + # "As neighbors " \ + # "FROM (SELECT * FROM a_list) As i " \ + # "WHERE i.\"dec_2013\" IS NOT NULL AND " \ + # "i.\"jan_2014\" IS NOT NULL AND " \ + # "i.\"feb_2014\" IS NOT NULL " \ + # "ORDER BY i.\"cartodb_id\" ASC;" + # + # subquery = self.params['subquery'] + # time_cols = self.params['time_cols'] + # num_time_per_bin = 1 + # permutations = 99 + # geom_col = self.params['geom_col'] + # id_col = self.params['id_col'] + # w_type = 'knn' + # num_ngbrs = self.params['num_ngbrs'] + # + # self.assertEqual(std.spatial_markov(subquery, time_cols, num_time_per_bin, permutations, geom_col, id_col, w_type, num_ngbrs), ans) + + def test_rebin_data(self): + """Test rebin_data""" + ## sample in double the time (even case since 10 % 2 = 0): + ## (0+1)/2, (2+3)/2, (4+5)/2, (6+7)/2, (8+9)/2 + ## = 0.5, 2.5, 4.5, 6.5, 8.5 + ans_even = np.array([(i + 0.5) * np.ones(10, dtype=float) + for i in range(0, 10, 2)]).T + + self.assertTrue(np.array_equal(std.rebin_data(self.time_data, 2), ans_even)) + + ## sample in triple the time (uneven since 10 % 3 = 1): + ## (0+1+2)/3, (3+4+5)/3, (6+7+8)/3, (9)/1 + ## = 1, 4, 7, 9 + ans_odd = np.array([i * np.ones(10, dtype=float) + for i in (1, 4, 7, 9)]).T + self.assertTrue(np.array_equal(std.rebin_data(self.time_data, 3), ans_odd)) + def test_get_prob_dist(self): + """Test get_prob_dist""" + lag_indices = np.array([1, 2, 3, 4]) + unit_indices = np.array([1, 3, 2, 4]) + answer = np.array([ + [ 0.0754717 , 0.88207547, 0.04245283, 0. , 0. ], + [ 0. , 0. , 0.09411765, 0.87058824, 0.03529412], + [ 0.0049505 , 0.09405941, 0.77722772, 0.11881188, 0.0049505 ], + [ 0. , 0. , 0. 
, 0.02352941, 0.97647059] + ]) + result = std.get_prob_dist(self.transition_matrix, lag_indices, unit_indices) + + self.assertTrue(np.array_equal(result, answer)) + + + From f3673d6f89631fbbae1b046996795184f374215e Mon Sep 17 00:00:00 2001 From: Andy Eschbacher Date: Wed, 23 Mar 2016 12:41:15 -0400 Subject: [PATCH 06/15] updating test for moran output --- src/pg/test/expected/02_moran_test.out | 121 +++++++++++++++++++++++-- 1 file changed, 111 insertions(+), 10 deletions(-) diff --git a/src/pg/test/expected/02_moran_test.out b/src/pg/test/expected/02_moran_test.out index 66ccaaa..8644e6b 100644 --- a/src/pg/test/expected/02_moran_test.out +++ b/src/pg/test/expected/02_moran_test.out @@ -126,13 +126,65 @@ SELECT ppoints.code, m.quads ORDER BY ppoints.code; NOTICE: ** Constructing query CONTEXT: PL/Python function "cdb_moran_local" -NOTICE: ** Query failed: "SELECT i."cartodb_id" As id, i."value"::numeric As attr1, (SELECT ARRAY(SELECT j."cartodb_id" FROM "(SELECT * FROM ppoints)" As j WHERE j."value" IS NOT NULL ORDER BY j."the_geom" <-> i."the_geom" ASC LIMIT 5 OFFSET 1 ) ) As neighbors FROM "(SELECT * FROM ppoints)" As i WHERE i."value" IS NOT NULL ORDER BY i."cartodb_id" ASC;" +NOTICE: ** Query returned with 52 rows CONTEXT: PL/Python function "cdb_moran_local" -NOTICE: ** Exiting function +NOTICE: ** Finished calculations CONTEXT: PL/Python function "cdb_moran_local" code | quads ------+------- -(0 rows) + 01 | HH + 02 | HL + 03 | LL + 04 | LL + 05 | LH + 06 | LL + 07 | HH + 08 | HH + 09 | HH + 10 | LL + 11 | LL + 12 | LL + 13 | HL + 14 | LL + 15 | LL + 16 | HH + 17 | HH + 18 | LL + 19 | HH + 20 | HH + 21 | LL + 22 | HH + 23 | LL + 24 | LL + 25 | HH + 26 | HH + 27 | LL + 28 | HH + 29 | LL + 30 | LL + 31 | HH + 32 | LL + 33 | HL + 34 | LH + 35 | LL + 36 | LL + 37 | HL + 38 | HL + 39 | HH + 40 | HH + 41 | HL + 42 | LH + 43 | LH + 44 | LL + 45 | LH + 46 | LL + 47 | LL + 48 | HH + 49 | LH + 50 | HH + 51 | LL + 52 | LL +(52 rows) SELECT cdb_crankshaft._cdb_random_seeds(1234); _cdb_random_seeds @@ -147,12 +199,61 @@ SELECT ppoints2.code, m.quads ORDER BY ppoints2.code; NOTICE: ** Constructing query CONTEXT: PL/Python function "cdb_moran_local_rate" -NOTICE: ** Query failed: "SELECT i."cartodb_id" As id, i."denominator"::numeric As attr1, i."numerator"::numeric As attr2, (SELECT ARRAY(SELECT j."cartodb_id" FROM "(SELECT * FROM ppoints2)" As j WHERE j."denominator" IS NOT NULL AND j."numerator" IS NOT NULL AND j."numerator" <> 0 ORDER BY j."the_geom" <-> i."the_geom" ASC LIMIT 5 OFFSET 1 ) ) As neighbors FROM "(SELECT * FROM ppoints2)" As i WHERE i."denominator" IS NOT NULL AND i."numerator" IS NOT NULL AND i."numerator" <> 0 ORDER BY i."cartodb_id" ASC;" +NOTICE: ** Query returned with 51 rows CONTEXT: PL/Python function "cdb_moran_local_rate" -NOTICE: ** Error: +NOTICE: ** Finished calculations CONTEXT: PL/Python function "cdb_moran_local_rate" -NOTICE: ** Exiting function -CONTEXT: PL/Python function "cdb_moran_local_rate" -ERROR: length of returned sequence did not match number of columns in row -CONTEXT: while creating return value -PL/Python function "cdb_moran_local_rate" + code | quads +------+------- + 01 | LL + 02 | LH + 03 | HH + 04 | HH + 05 | LL + 06 | HH + 07 | LL + 08 | LL + 09 | LL + 10 | HH + 11 | HH + 12 | HL + 13 | LL + 14 | HH + 15 | LL + 16 | LL + 17 | LL + 18 | LH + 19 | LL + 20 | LL + 21 | HH + 22 | LL + 23 | HL + 24 | LL + 25 | LL + 26 | LL + 27 | LL + 28 | LL + 29 | LH + 30 | HH + 31 | LL + 32 | LL + 33 | LL + 34 | LL + 35 | LH + 36 | HL + 37 | LH + 38 | LH + 39 | LL + 
40 | LL + 41 | LH + 42 | HL + 43 | LL + 44 | HL + 45 | LL + 46 | HL + 47 | LL + 48 | LL + 49 | HL + 50 | LL + 51 | HH +(51 rows) From cfb40ddecd20556781687745f45037f6e3b9632f Mon Sep 17 00:00:00 2001 From: Andy Eschbacher Date: Wed, 23 Mar 2016 13:01:56 -0400 Subject: [PATCH 07/15] fixes test for moran --- src/pg/test/expected/02_moran_test.out | 1 + src/py/crankshaft/test/fixtures/moran.json | 68 +++++++++++----------- 2 files changed, 35 insertions(+), 34 deletions(-) diff --git a/src/pg/test/expected/02_moran_test.out b/src/pg/test/expected/02_moran_test.out index 8644e6b..92cb218 100644 --- a/src/pg/test/expected/02_moran_test.out +++ b/src/pg/test/expected/02_moran_test.out @@ -257,3 +257,4 @@ CONTEXT: PL/Python function "cdb_moran_local_rate" 50 | LL 51 | HH (51 rows) + diff --git a/src/py/crankshaft/test/fixtures/moran.json b/src/py/crankshaft/test/fixtures/moran.json index 0530c18..9bd90d7 100644 --- a/src/py/crankshaft/test/fixtures/moran.json +++ b/src/py/crankshaft/test/fixtures/moran.json @@ -1,52 +1,52 @@ [[0.9319096128346788, "HH"], [-1.135787401862846, "HL"], -[0.11732030672508517, "Not significant"], -[0.6152779669180425, "Not significant"], -[-0.14657336660125297, "Not significant"], -[0.6967858120189607, "Not significant"], -[0.07949310115714454, "Not significant"], -[0.4703198759258987, "Not significant"], -[0.4421125200498064, "Not significant"], -[0.5724288737143592, "Not significant"], +[0.11732030672508517, "LL"], +[0.6152779669180425, "LL"], +[-0.14657336660125297, "LH"], +[0.6967858120189607, "LL"], +[0.07949310115714454, "HH"], +[0.4703198759258987, "HH"], +[0.4421125200498064, "HH"], +[0.5724288737143592, "LL"], [0.8970743435692062, "LL"], -[0.18327334401918674, "Not significant"], -[-0.01466729201304962, "Not significant"], -[0.3481559372544409, "Not significant"], -[0.06547094736902978, "Not significant"], +[0.18327334401918674, "LL"], +[-0.01466729201304962, "HL"], +[0.3481559372544409, "LL"], +[0.06547094736902978, "LL"], [0.15482141569329988, "HH"], -[0.4373841193538136, "Not significant"], -[0.15971286468915544, "Not significant"], -[1.0543588860308968, "Not significant"], +[0.4373841193538136, "HH"], +[0.15971286468915544, "LL"], +[1.0543588860308968, "HH"], [1.7372866900020818, "HH"], [1.091998586053999, "LL"], -[0.1171572584252222, "Not significant"], -[0.08438455015300014, "Not significant"], -[0.06547094736902978, "Not significant"], +[0.1171572584252222, "HH"], +[0.08438455015300014, "LL"], +[0.06547094736902978, "LL"], [0.15482141569329985, "HH"], [1.1627044812890683, "HH"], -[0.06547094736902978, "Not significant"], -[0.795275137550483, "Not significant"], +[0.06547094736902978, "LL"], +[0.795275137550483, "HH"], [0.18562939195219, "LL"], -[0.3010757406693439, "Not significant"], +[0.3010757406693439, "LL"], [2.8205795942839376, "HH"], -[0.11259190602909264, "Not significant"], -[-0.07116352791516614, "Not significant"], -[-0.09945240794119009, "Not significant"], +[0.11259190602909264, "LL"], +[-0.07116352791516614, "HL"], +[-0.09945240794119009, "LH"], [0.18562939195219, "LL"], -[0.1832733440191868, "Not significant"], -[-0.39054253768447705, "Not significant"], +[0.1832733440191868, "LL"], +[-0.39054253768447705, "HL"], [-0.1672071289487642, "HL"], -[0.3337669247916343, "Not significant"], -[0.2584386102554792, "Not significant"], +[0.3337669247916343, "HH"], +[0.2584386102554792, "HH"], [-0.19733845476322634, "HL"], [-0.9379282899805409, "LH"], -[-0.028770969951095866, "Not significant"], -[0.051367269430983485, "Not significant"], 
+[-0.028770969951095866, "LH"], +[0.051367269430983485, "LL"], [-0.2172548045913472, "LH"], -[0.05136726943098351, "Not significant"], -[0.04191046803899837, "Not significant"], +[0.05136726943098351, "LL"], +[0.04191046803899837, "LL"], [0.7482357030403517, "HH"], -[-0.014585767863118111, "Not significant"], -[0.5410013139159929, "Not significant"], +[-0.014585767863118111, "LH"], +[0.5410013139159929, "HH"], [1.0223932668429925, "LL"], [1.4179402898927476, "LL"]] From 42e760b5d104b57f42307e7903af3f63cd4c9fce Mon Sep 17 00:00:00 2001 From: Andy Eschbacher Date: Wed, 23 Mar 2016 14:50:52 -0400 Subject: [PATCH 08/15] adding passing tests --- .../crankshaft/space_time_dynamics/markov.py | 31 ++++++++--- .../test/test_space_time_dynamics.py | 55 +++++++++++++------ 2 files changed, 63 insertions(+), 23 deletions(-) diff --git a/src/py/crankshaft/crankshaft/space_time_dynamics/markov.py b/src/py/crankshaft/crankshaft/space_time_dynamics/markov.py index 911d558..db2be6f 100644 --- a/src/py/crankshaft/crankshaft/space_time_dynamics/markov.py +++ b/src/py/crankshaft/crankshaft/space_time_dynamics/markov.py @@ -54,7 +54,7 @@ def spatial_markov_trend(subquery, time_cols, num_time_per_bin, permutations, ge ## rebin time data if num_time_per_bin > 1: ## rebin - t_data = rebin_data(t_data, num_time_per_bin) + t_data = rebin_data(t_data, int(num_time_per_bin)) sp_markov_result = ps.Spatial_Markov(t_data, weights, k=7, fixed=False) @@ -68,7 +68,7 @@ def spatial_markov_trend(subquery, time_cols, num_time_per_bin, permutations, ge prob_dist = get_prob_dist(lag_classes, sp_markov_result.classes) ## find the ups and down and overall distribution of each cell - trend, trend_up, trend_down, volatility = get_prob_stats(prob_dist) + trend_up, trend_down, trend, volatility = get_prob_stats(prob_dist) ## output the results @@ -127,12 +127,29 @@ def get_prob_dist(transition_matrix, lag_indices, unit_indices): return np.array([transition_matrix[(lag_indices[i], unit_indices[i])] for i in range(len(lag_indices))]) def get_prob_stats(prob_dist, unit_indices): -# trend, trend_up, trend_down, volatility = get_prob_stats(prob_dist) + """ + get the statistics of the probability distributions - trend_up = np.array([prob_dist[:, i:].sum() for i in unit_indices]) - trend_down = np.array([prob_dist[:, :i].sum() for i in unit_indices]) - trend = trend_up - trend_down + Outputs: + @param trend_up ndarray(float): sum of probabilities for upward + movement (relative to the unit index of that prob) + @param trend_down ndarray(float): sum of probabilities for downard + movement (relative to the unit index of that prob) + @param trend ndarray(float): difference of upward and downward + movements + """ + + num_elements = len(prob_dist) + trend_up = np.empty(num_elements) + trend_down = np.empty(num_elements) + trend = np.empty(num_elements) + + for i in range(num_elements): + trend_up[i] = prob_dist[i, (unit_indices[i]+1):].sum() + trend_down[i] = prob_dist[i, :unit_indices[i]].sum() + trend[i] = (trend_up[i] - trend_down[i]) / prob_dist[i, unit_indices[i]] + + ## calculate volatility of distribution volatility = prob_dist.std(axis=1) - return trend_up, trend_down, trend, volatility diff --git a/src/py/crankshaft/test/test_space_time_dynamics.py b/src/py/crankshaft/test/test_space_time_dynamics.py index dd7b8b0..c35aea5 100644 --- a/src/py/crankshaft/test/test_space_time_dynamics.py +++ b/src/py/crankshaft/test/test_space_time_dynamics.py @@ -28,9 +28,9 @@ class SpaceTimeTests(unittest.TestCase): "num_ngbrs": 321} 
self.neighbors_data = json.loads(open(fixture_file('neighbors.json')).read()) self.moran_data = json.loads(open(fixture_file('moran.json')).read()) - + self.time_data = np.array([i * np.ones(10, dtype=float) for i in range(10)]).T - + self.transition_matrix = p = np.array([ [[ 0.96341463, 0.0304878 , 0.00609756, 0. , 0. ], [ 0.06040268, 0.83221477, 0.10738255, 0. , 0. ], @@ -61,7 +61,7 @@ class SpaceTimeTests(unittest.TestCase): # def test_spatial_markov(self): # """Test Spatial Markov.""" - # + # # ans = "SELECT i.\"cartodb_id\" As id, " \ # "i.\"dec_2013\"::numeric As attr1, " \ # "i.\"jan_2014\"::numeric As attr2, " \ @@ -80,7 +80,7 @@ class SpaceTimeTests(unittest.TestCase): # "i.\"jan_2014\" IS NOT NULL AND " \ # "i.\"feb_2014\" IS NOT NULL " \ # "ORDER BY i.\"cartodb_id\" ASC;" - # + # # subquery = self.params['subquery'] # time_cols = self.params['time_cols'] # num_time_per_bin = 1 @@ -89,23 +89,23 @@ class SpaceTimeTests(unittest.TestCase): # id_col = self.params['id_col'] # w_type = 'knn' # num_ngbrs = self.params['num_ngbrs'] - # + # # self.assertEqual(std.spatial_markov(subquery, time_cols, num_time_per_bin, permutations, geom_col, id_col, w_type, num_ngbrs), ans) - + def test_rebin_data(self): """Test rebin_data""" - ## sample in double the time (even case since 10 % 2 = 0): - ## (0+1)/2, (2+3)/2, (4+5)/2, (6+7)/2, (8+9)/2 + ## sample in double the time (even case since 10 % 2 = 0): + ## (0+1)/2, (2+3)/2, (4+5)/2, (6+7)/2, (8+9)/2 ## = 0.5, 2.5, 4.5, 6.5, 8.5 - ans_even = np.array([(i + 0.5) * np.ones(10, dtype=float) + ans_even = np.array([(i + 0.5) * np.ones(10, dtype=float) for i in range(0, 10, 2)]).T - + self.assertTrue(np.array_equal(std.rebin_data(self.time_data, 2), ans_even)) ## sample in triple the time (uneven since 10 % 3 = 1): - ## (0+1+2)/3, (3+4+5)/3, (6+7+8)/3, (9)/1 + ## (0+1+2)/3, (3+4+5)/3, (6+7+8)/3, (9)/1 ## = 1, 4, 7, 9 - ans_odd = np.array([i * np.ones(10, dtype=float) + ans_odd = np.array([i * np.ones(10, dtype=float) for i in (1, 4, 7, 9)]).T self.assertTrue(np.array_equal(std.rebin_data(self.time_data, 3), ans_odd)) def test_get_prob_dist(self): @@ -119,8 +119,31 @@ class SpaceTimeTests(unittest.TestCase): [ 0. , 0. , 0. , 0.02352941, 0.97647059] ]) result = std.get_prob_dist(self.transition_matrix, lag_indices, unit_indices) - + self.assertTrue(np.array_equal(result, answer)) - - - + + def test_get_prob_stats(self): + """Test get_prob_stats""" + + probs = np.array([ + [ 0.0754717 , 0.88207547, 0.04245283, 0. , 0. ], + [ 0. , 0. , 0.09411765, 0.87058824, 0.03529412], + [ 0.0049505 , 0.09405941, 0.77722772, 0.11881188, 0.0049505 ], + [ 0. , 0. , 0. 
, 0.02352941, 0.97647059] + ]) + unit_indices = np.array([1, 3, 2, 4]) + answer_up = np.array([0.04245283, 0.03529412, 0.12376238, 0.]) + answer_down = np.array([0.0754717, 0.09411765, 0.0990099, 0.02352941]) + answer_trend = np.array([-0.03301887 / 0.88207547, -0.05882353 / 0.87058824, 0.02475248 / 0.77722772, -0.02352941 / 0.97647059]) + answer_volatility = np.array([ 0.34221495, 0.33705421, 0.29226542, 0.38834223]) + + result = std.get_prob_stats(probs, unit_indices) + result_up = result[0] + result_down = result[1] + result_trend = result[2] + result_volatility = result[3] + + self.assertTrue(np.allclose(result_up, answer_up)) + self.assertTrue(np.allclose(result_down, answer_down)) + self.assertTrue(np.allclose(result_trend, answer_trend)) + self.assertTrue(np.allclose(result_volatility, answer_volatility)) From c5a58f97ecc8af419c6ba40f9e7166f2ada46416 Mon Sep 17 00:00:00 2001 From: Andy Eschbacher Date: Wed, 23 Mar 2016 15:33:54 -0400 Subject: [PATCH 09/15] add types to arrays --- src/py/crankshaft/crankshaft/space_time_dynamics/markov.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/py/crankshaft/crankshaft/space_time_dynamics/markov.py b/src/py/crankshaft/crankshaft/space_time_dynamics/markov.py index db2be6f..e809632 100644 --- a/src/py/crankshaft/crankshaft/space_time_dynamics/markov.py +++ b/src/py/crankshaft/crankshaft/space_time_dynamics/markov.py @@ -140,9 +140,9 @@ def get_prob_stats(prob_dist, unit_indices): """ num_elements = len(prob_dist) - trend_up = np.empty(num_elements) - trend_down = np.empty(num_elements) - trend = np.empty(num_elements) + trend_up = np.empty(num_elements, dtype=float) + trend_down = np.empty(num_elements, dtype=float) + trend = np.empty(num_elements, dtype=float) for i in range(num_elements): trend_up[i] = prob_dist[i, (unit_indices[i]+1):].sum() From 68e5e0892ccae0a7614fdee8601ef28544f93837 Mon Sep 17 00:00:00 2001 From: Andy Eschbacher Date: Wed, 23 Mar 2016 20:46:19 -0400 Subject: [PATCH 10/15] update signature used in plpython function --- src/pg/sql/11_markov.sql | 96 ++++++++++++++++++++-------------------- 1 file changed, 48 insertions(+), 48 deletions(-) diff --git a/src/pg/sql/11_markov.sql b/src/pg/sql/11_markov.sql index 63a46ac..fa7c838 100644 --- a/src/pg/sql/11_markov.sql +++ b/src/pg/sql/11_markov.sql @@ -6,7 +6,7 @@ -- 2 | Pt2 | 11.0 | 13.2 | 12.5 -- ... 
-- Sample Function call: --- SELECT cdb_spatial_markov('SELECT * FROM real_estate', +-- SELECT cdb_spatial_markov('SELECT * FROM real_estate', -- Array['date_1', 'date_2', 'date_3']) @@ -16,7 +16,7 @@ CREATE OR REPLACE FUNCTION time_cols text[], num_time_per_bin int DEFAULT 1, permutations INT DEFAULT 99, - geom_column TEXT DEFAULT 'the_geom', + geom_col TEXT DEFAULT 'the_geom', id_col TEXT DEFAULT 'cartodb_id', w_type TEXT DEFAULT 'knn', num_ngbrs int DEFAULT 5) @@ -25,7 +25,7 @@ AS $$ plpy.execute('SELECT cdb_crankshaft._cdb_crankshaft_activate_py()') from crankshaft.space_time_predictions import spatial_markov # TODO: use named parameters or a dictionary - return spatial_markov(subquery, time_cols, permutations, geom_column, id_col, w_type, num_ngbrs) + return def spatial_markov_trend(subquery, time_cols, num_time_per_bin, permutations, geom_col, id_col, w_type, num_ngbrs) $$ LANGUAGE plpythonu; -- input table format: identical to above but in a predictable format @@ -34,48 +34,48 @@ $$ LANGUAGE plpythonu; -- 'date_1') -CREATE OR REPLACE FUNCTION - cdb_spatial_markov ( - subquery TEXT, - time_col_min text, - time_col_max text, - date_format text, -- '_YYYY_MM_DD' - num_time_per_bin INT DEFAULT 1, - permutations INT DEFAULT 99, - geom_column TEXT DEFAULT 'the_geom', - id_col TEXT DEFAULT 'cartodb_id', - w_type TEXT DEFAULT 'knn', - num_ngbrs int DEFAULT 5) -RETURNS TABLE (moran FLOAT, quads TEXT, significance FLOAT, ids INT) -AS $$ - plpy.execute('SELECT cdb_crankshaft._cdb_crankshaft_activate_py()') - from crankshaft.clustering import moran_local - # TODO: use named parameters or a dictionary - return spatial_markov(subquery, time_cols, permutations, geom_column, id_col, w_type, num_ngbrs) -$$ LANGUAGE plpythonu; - --- input table format: --- id | geom | date | measurement --- 1 | Pt1 | 12/3 | 13.2 --- 2 | Pt2 | 11/5 | 11.3 --- 3 | Pt1 | 11/13 | 12.9 --- 4 | Pt3 | 12/19 | 10.1 --- ... - -CREATE OR REPLACE FUNCTION - cdb_spatial_markov ( - subquery TEXT, - time_col text, - num_time_per_bin INT DEFAULT 1, - permutations INT DEFAULT 99, - geom_column TEXT DEFAULT 'the_geom', - id_col TEXT DEFAULT 'cartodb_id', - w_type TEXT DEFAULT 'knn', - num_ngbrs int DEFAULT 5) -RETURNS TABLE (moran FLOAT, quads TEXT, significance FLOAT, ids INT) -AS $$ - plpy.execute('SELECT cdb_crankshaft._cdb_crankshaft_activate_py()') - from crankshaft.clustering import moran_local - # TODO: use named parameters or a dictionary - return spatial_markov(subquery, time_cols, permutations, geom_column, id_col, w_type, num_ngbrs) -$$ LANGUAGE plpythonu; +-- CREATE OR REPLACE FUNCTION +-- cdb_spatial_markov ( +-- subquery TEXT, +-- time_col_min text, +-- time_col_max text, +-- date_format text, -- '_YYYY_MM_DD' +-- num_time_per_bin INT DEFAULT 1, +-- permutations INT DEFAULT 99, +-- geom_column TEXT DEFAULT 'the_geom', +-- id_col TEXT DEFAULT 'cartodb_id', +-- w_type TEXT DEFAULT 'knn', +-- num_ngbrs int DEFAULT 5) +-- RETURNS TABLE (moran FLOAT, quads TEXT, significance FLOAT, ids INT) +-- AS $$ +-- plpy.execute('SELECT cdb_crankshaft._cdb_crankshaft_activate_py()') +-- from crankshaft.clustering import moran_local +-- # TODO: use named parameters or a dictionary +-- return spatial_markov(subquery, time_cols, permutations, geom_column, id_col, w_type, num_ngbrs) +-- $$ LANGUAGE plpythonu; +-- +-- -- input table format: +-- -- id | geom | date | measurement +-- -- 1 | Pt1 | 12/3 | 13.2 +-- -- 2 | Pt2 | 11/5 | 11.3 +-- -- 3 | Pt1 | 11/13 | 12.9 +-- -- 4 | Pt3 | 12/19 | 10.1 +-- -- ... 
+--
+-- CREATE OR REPLACE FUNCTION
+--     cdb_spatial_markov (
+--         subquery TEXT,
+--         time_col text,
+--         num_time_per_bin INT DEFAULT 1,
+--         permutations INT DEFAULT 99,
+--         geom_column TEXT DEFAULT 'the_geom',
+--         id_col TEXT DEFAULT 'cartodb_id',
+--         w_type TEXT DEFAULT 'knn',
+--         num_ngbrs int DEFAULT 5)
+-- RETURNS TABLE (moran FLOAT, quads TEXT, significance FLOAT, ids INT)
+-- AS $$
+--   plpy.execute('SELECT cdb_crankshaft._cdb_crankshaft_activate_py()')
+--   from crankshaft.clustering import moran_local
+--   # TODO: use named parameters or a dictionary
+--   return spatial_markov(subquery, time_cols, permutations, geom_column, id_col, w_type, num_ngbrs)
+-- $$ LANGUAGE plpythonu;

From 491577ed62d35346eb65be0e0c91ade1dc21e6bb Mon Sep 17 00:00:00 2001
From: Andy Eschbacher
Date: Thu, 24 Mar 2016 08:19:11 -0400
Subject: [PATCH 11/15] formatted markov sql file a bit

---
 src/pg/sql/11_markov.sql | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/src/pg/sql/11_markov.sql b/src/pg/sql/11_markov.sql
index fa7c838..f63af5d 100644
--- a/src/pg/sql/11_markov.sql
+++ b/src/pg/sql/11_markov.sql
@@ -23,9 +23,12 @@ CREATE OR REPLACE FUNCTION
 RETURNS TABLE (moran FLOAT, quads TEXT, significance FLOAT, ids INT)
 AS $$
     plpy.execute('SELECT cdb_crankshaft._cdb_crankshaft_activate_py()')
-    from crankshaft.space_time_predictions import spatial_markov
-    # TODO: use named parameters or a dictionary
-    return def spatial_markov_trend(subquery, time_cols, num_time_per_bin, permutations, geom_col, id_col, w_type, num_ngbrs)
+
+    from crankshaft.space_time_dynamics import spatial_markov_trend
+
+    ## TODO: use named parameters or a dictionary
+
+    return spatial_markov_trend(subquery, time_cols, num_time_per_bin, permutations, geom_col, id_col, w_type, num_ngbrs)
 $$ LANGUAGE plpythonu;
 
 -- input table format: identical to above but in a predictable format

From 98c2b11935a45ffe21f5aaee5681bf791a133090 Mon Sep 17 00:00:00 2001
From: Andy Eschbacher
Date: Thu, 24 Mar 2016 11:33:47 -0400
Subject: [PATCH 12/15] update output signature

---
 src/pg/sql/11_markov.sql | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/pg/sql/11_markov.sql b/src/pg/sql/11_markov.sql
index f63af5d..09cca98 100644
--- a/src/pg/sql/11_markov.sql
+++ b/src/pg/sql/11_markov.sql
@@ -20,7 +20,7 @@ CREATE OR REPLACE FUNCTION
     id_col TEXT DEFAULT 'cartodb_id',
     w_type TEXT DEFAULT 'knn',
     num_ngbrs int DEFAULT 5)
-RETURNS TABLE (moran FLOAT, quads TEXT, significance FLOAT, ids INT)
+RETURNS TABLE (trend numeric, trend_up numeric, trend_down numeric, volatility numeric, ids int)
 AS $$
     plpy.execute('SELECT cdb_crankshaft._cdb_crankshaft_activate_py()')

From fbc30f12245928053968ac0f7c6bfb6253f8e462 Mon Sep 17 00:00:00 2001
From: Andy Eschbacher
Date: Thu, 24 Mar 2016 11:34:28 -0400
Subject: [PATCH 13/15] add working version

---
 .../crankshaft/space_time_dynamics/markov.py | 22 +++++++++++--------
 1 file changed, 13 insertions(+), 9 deletions(-)

diff --git a/src/py/crankshaft/crankshaft/space_time_dynamics/markov.py b/src/py/crankshaft/crankshaft/space_time_dynamics/markov.py
index e809632..60321e3 100644
--- a/src/py/crankshaft/crankshaft/space_time_dynamics/markov.py
+++ b/src/py/crankshaft/crankshaft/space_time_dynamics/markov.py
@@ -6,7 +6,7 @@ Spatial dynamics measurements using Spatial Markov
 import numpy as np
 import pysal as ps
 import plpy
-from crankshaft.clustering import get_query
+from crankshaft.clustering import get_query, get_weight
 
 def spatial_markov_trend(subquery, time_cols, num_time_per_bin, permutations, 
geom_col, id_col, w_type, num_ngbrs):
     """
@@ -44,7 +44,9 @@ def spatial_markov_trend(subquery, time_cols, num_time_per_bin, permutations, ge
     try:
         query_result = plpy.execute(query)
     except:
-        zip([None],[None],[None])
+        plpy.notice('** Query failed: %s' % query)
+        plpy.error('Query failed: check the input parameters')
+        return zip([None], [None], [None], [None], [None])
 
     ## build weight
     weights = get_weight(query_result, w_type)
@@ -58,17 +60,17 @@ def spatial_markov_trend(subquery, time_cols, num_time_per_bin, permutations, ge
 
     sp_markov_result = ps.Spatial_Markov(t_data, weights, k=7, fixed=False)
 
-    ## get lags
-    lags = ps.lag_spatial(weights, t_data)
+    ## get lags of last time slice
+    lags = ps.lag_spatial(weights, t_data[:, -1])
 
     ## get lag classes
-    lag_classes = ps.Quantiles(lags.flatten(), k=7).yb
+    lag_classes = ps.Quantiles(lags, k=7).yb
 
     ## look up probability distribution for each unit according to class and lag class
-    prob_dist = get_prob_dist(lag_classes, sp_markov_result.classes)
+    prob_dist = get_prob_dist(sp_markov_result.P, lag_classes, sp_markov_result.classes[:, -1])
 
     ## find the ups and down and overall distribution of each cell
-    trend_up, trend_down, trend, volatility = get_prob_stats(prob_dist)
+    trend_up, trend_down, trend, volatility = get_prob_stats(prob_dist, sp_markov_result.classes[:, -1])
 
     ## output the results
 
@@ -78,7 +80,9 @@ def get_time_data(markov_data, time_cols):
     """
         Extract the time columns and bin appropriately
     """
-    return np.array([[x[t_col] for x in query_result] for t_col in time_cols], dtype=float)
+    num_attrs = len(time_cols)
+    return np.array([[x['attr' + str(i)] for x in markov_data]
+                     for i in range(1, num_attrs+1)], dtype=float).T
 
 def rebin_data(time_data, num_time_per_bin):
     """
@@ -139,7 +143,7 @@ def get_prob_stats(prob_dist, unit_indices):
            movements
     """
 
-    num_elements = len(prob_dist)
+    num_elements = len(unit_indices)
     trend_up = np.empty(num_elements, dtype=float)
     trend_down = np.empty(num_elements, dtype=float)
     trend = np.empty(num_elements, dtype=float)

From d39849472074ae05701f175779a01d703dd934ac Mon Sep 17 00:00:00 2001
From: Andy Eschbacher
Date: Thu, 24 Mar 2016 11:34:59 -0400
Subject: [PATCH 14/15] add test case for markov use of functions

---
 .../crankshaft/test/test_clustering_moran.py | 24 +++++++++++++++
 1 file changed, 24 insertions(+)

diff --git a/src/py/crankshaft/test/test_clustering_moran.py b/src/py/crankshaft/test/test_clustering_moran.py
index 4c06594..f987239 100644
--- a/src/py/crankshaft/test/test_clustering_moran.py
+++ b/src/py/crankshaft/test/test_clustering_moran.py
@@ -26,6 +26,11 @@ class MoranTest(unittest.TestCase):
                        "subquery": "SELECT * FROM a_list",
                        "geom_col": "the_geom",
                        "num_ngbrs": 321}
+        self.params_markov = {"id_col": "cartodb_id",
+                              "time_cols": ["_2013_dec", "_2014_jan", "_2014_feb"],
+                              "subquery": "SELECT * FROM a_list",
+                              "geom_col": "the_geom",
+                              "num_ngbrs": 321}
         self.neighbors_data = json.loads(open(fixture_file('neighbors.json')).read())
         self.moran_data = json.loads(open(fixture_file('moran.json')).read())
 
@@ -67,8 +72,27 @@ class MoranTest(unittest.TestCase):
               "NULL AND i.\"jay_z\" IS NOT NULL AND i.\"jay_z\" <> 0 ORDER " \
               "BY i.\"cartodb_id\" ASC;"
 
+        ans_markov = "SELECT i.\"cartodb_id\" As id, " \
+                     "i.\"_2013_dec\"::numeric As attr1, " \
+                     "i.\"_2014_jan\"::numeric As attr2, " \
+                     "i.\"_2014_feb\"::numeric As attr3, " \
+                     "(SELECT ARRAY(SELECT j.\"cartodb_id\" " \
+                     "FROM (SELECT * FROM a_list) As j " \
+                     "WHERE j.\"_2013_dec\" IS NOT NULL AND " \
+                     "j.\"_2014_jan\" IS NOT NULL AND " \
+                     
"j.\"_2014_feb\" IS NOT NULL " \ + "ORDER BY j.\"the_geom\" <-> i.\"the_geom\" ASC " \ + "LIMIT 321 OFFSET 1 ) ) As neighbors " \ + "FROM (SELECT * FROM a_list) As i " \ + "WHERE i.\"_2013_dec\" IS NOT NULL AND " \ + "i.\"_2014_jan\" IS NOT NULL AND " \ + "i.\"_2014_feb\" IS NOT NULL "\ + "ORDER BY i.\"cartodb_id\" ASC;" + self.assertEqual(cc.knn(self.params), ans) + self.assertEqual(cc.knn(self.params_markov), ans_markov) + def test_queen(self): """Test queen neighbors function.""" From 2e1b598b4fba78f42a36c8d049e729f67b44f45b Mon Sep 17 00:00:00 2001 From: Andy Eschbacher Date: Fri, 25 Mar 2016 22:49:29 -0400 Subject: [PATCH 15/15] pylinting changes --- .../crankshaft/space_time_dynamics/markov.py | 47 ++++++++++++------- 1 file changed, 29 insertions(+), 18 deletions(-) diff --git a/src/py/crankshaft/crankshaft/space_time_dynamics/markov.py b/src/py/crankshaft/crankshaft/space_time_dynamics/markov.py index 60321e3..db2425c 100644 --- a/src/py/crankshaft/crankshaft/space_time_dynamics/markov.py +++ b/src/py/crankshaft/crankshaft/space_time_dynamics/markov.py @@ -8,7 +8,8 @@ import pysal as ps import plpy from crankshaft.clustering import get_query, get_weight -def spatial_markov_trend(subquery, time_cols, num_time_per_bin, permutations, geom_col, id_col, w_type, num_ngbrs): +def spatial_markov_trend(subquery, time_cols, num_time_per_bin, + permutations, geom_col, id_col, w_type, num_ngbrs): """ Predict the trends of a unit based on: 1. history of its transitions to different classes (e.g., 1st quantile -> 2nd quantile) @@ -33,6 +34,9 @@ def spatial_markov_trend(subquery, time_cols, num_time_per_bin, permutations, ge @param """ + if num_time_per_bin < 1: + plpy.error('Error: number of time bins must be >= 1') + qvals = {"id_col": id_col, "time_cols": time_cols, "geom_col": geom_col, @@ -58,13 +62,14 @@ def spatial_markov_trend(subquery, time_cols, num_time_per_bin, permutations, ge ## rebin t_data = rebin_data(t_data, int(num_time_per_bin)) - sp_markov_result = ps.Spatial_Markov(t_data, weights, k=7, fixed=False) - - ## get lags of last time slice - lags = ps.lag_spatial(weights, t_data[:, -1]) + sp_markov_result = ps.Spatial_Markov(t_data, + weights, + k=7, + fixed=False, + permutations=permutations) ## get lag classes - lag_classes = ps.Quantiles(lags, k=7).yb + lag_classes = ps.Quantiles(ps.lag_spatial(weights, t_data[:, -1]), k=7).yb ## look up probablity distribution for each unit according to class and lag class prob_dist = get_prob_dist(sp_markov_result.P, lag_classes, sp_markov_result.classes[:, -1]) @@ -86,7 +91,8 @@ def get_time_data(markov_data, time_cols): def rebin_data(time_data, num_time_per_bin): """ - convert an n x l matrix into an (n/m) x l matrix where the values are reduced (averaged) for the intervening states: + convert an n x l matrix into an (n/m) x l matrix where the values are + reduced (averaged) for the intervening states: 1 2 3 4 1.5 3.5 5 6 7 8 -> 5.5 7.5 9 8 7 6 8.5 6.5 @@ -94,12 +100,17 @@ def rebin_data(time_data, num_time_per_bin): if m = 2 - This process effectively resamples the data at a longer time span n units longer than the input data. - For cases when there is a remainder (remainder(5/3) = 2), the remaining two columns are binned together as the last time period, while the first three are binned together. + This process effectively resamples the data at a longer time span n + units longer than the input data. 
+ For cases when there is a remainder (remainder(5/3) = 2), the remaining + two columns are binned together as the last time period, while the + first three are binned together. Input: - @param time_data n x l ndarray: measurements of an attribute at different time intervals - @param num_time_per_bin int: number of columns to average into a new column + @param time_data n x l ndarray: measurements of an attribute at + different time intervals + @param num_time_per_bin int: number of columns to average into a new + column Output: ceil(n / m) x l ndarray of resampled time series """ @@ -111,13 +122,12 @@ def rebin_data(time_data, num_time_per_bin): ## fit remainders into an additional column n_max = time_data.shape[1] / num_time_per_bin + 1 - return np.array([ - time_data[:, - num_time_per_bin*i:num_time_per_bin*(i+1)].mean(axis=1) + return np.array([time_data[:, num_time_per_bin * i:num_time_per_bin * (i+1)].mean(axis=1) for i in range(n_max)]).T def get_prob_dist(transition_matrix, lag_indices, unit_indices): """ - given an array of transition matrices, look up the probability associated with the arrangements passed + given an array of transition matrices, look up the probability + associated with the arrangements passed Input: @param transition_matrix ndarray[k,k,k]: @@ -128,7 +138,8 @@ def get_prob_dist(transition_matrix, lag_indices, unit_indices): Array of probability distributions """ - return np.array([transition_matrix[(lag_indices[i], unit_indices[i])] for i in range(len(lag_indices))]) + return np.array([transition_matrix[(lag_indices[i], unit_indices[i])] + for i in range(len(lag_indices))]) def get_prob_stats(prob_dist, unit_indices): """ @@ -144,9 +155,9 @@ def get_prob_stats(prob_dist, unit_indices): """ num_elements = len(unit_indices) - trend_up = np.empty(num_elements, dtype=float) + trend_up = np.empty(num_elements, dtype=float) trend_down = np.empty(num_elements, dtype=float) - trend = np.empty(num_elements, dtype=float) + trend = np.empty(num_elements, dtype=float) for i in range(num_elements): trend_up[i] = prob_dist[i, (unit_indices[i]+1):].sum()