diff --git a/src/py/crankshaft/crankshaft/space_time_dynamics/__init__.py b/src/py/crankshaft/crankshaft/space_time_dynamics/__init__.py new file mode 100644 index 0000000..f45ffdf --- /dev/null +++ b/src/py/crankshaft/crankshaft/space_time_dynamics/__init__.py @@ -0,0 +1 @@ +from markov import * \ No newline at end of file diff --git a/src/py/crankshaft/crankshaft/space_time_dynamics/markov.py b/src/py/crankshaft/crankshaft/space_time_dynamics/markov.py new file mode 100644 index 0000000..911d558 --- /dev/null +++ b/src/py/crankshaft/crankshaft/space_time_dynamics/markov.py @@ -0,0 +1,138 @@ +""" +Spatial dynamics measurements using Spatial Markov +""" + + +import numpy as np +import pysal as ps +import plpy +from crankshaft.clustering import get_query + +def spatial_markov_trend(subquery, time_cols, num_time_per_bin, permutations, geom_col, id_col, w_type, num_ngbrs): + """ + Predict the trends of a unit based on: + 1. history of its transitions to different classes (e.g., 1st quantile -> 2nd quantile) + 2. average class of its neighbors + + Inputs: + + @param subquery string: e.g., SELECT * FROM table_name + @param time_cols list (string): list of strings of column names + @param num_time_per_bin int: number of bins to divide # of time columns into + @param permutations int: number of permutations for test stats + @param geom_col string: name of column which contains the geometries + @param id_col string: name of column which has the ids of the table + @param w_type string: weight type ('knn' or 'queen') + @param num_ngbrs int: number of neighbors (if knn type) + + Outputs: + @param trend_up float: probablity that a geom will move to a higher class + @param trend_down float: probablity that a geom will move to a lower class + @param trend float: (trend_up - trend_down) / trend_static + @param volatility float: a measure of the volatility based on probability stddev(prob array) + @param + """ + + qvals = {"id_col": id_col, + "time_cols": time_cols, + "geom_col": geom_col, + "subquery": subquery, + "num_ngbrs": num_ngbrs} + + query = get_query(w_type, qvals) + + try: + query_result = plpy.execute(query) + except: + zip([None],[None],[None]) + + ## build weight + weights = get_weight(query_result, w_type) + + ## prep time data + t_data = get_time_data(query_result, time_cols) + ## rebin time data + if num_time_per_bin > 1: + ## rebin + t_data = rebin_data(t_data, num_time_per_bin) + + sp_markov_result = ps.Spatial_Markov(t_data, weights, k=7, fixed=False) + + ## get lags + lags = ps.lag_spatial(weights, t_data) + + ## get lag classes + lag_classes = ps.Quantiles(lags.flatten(), k=7).yb + + ## look up probablity distribution for each unit according to class and lag class + prob_dist = get_prob_dist(lag_classes, sp_markov_result.classes) + + ## find the ups and down and overall distribution of each cell + trend, trend_up, trend_down, volatility = get_prob_stats(prob_dist) + + ## output the results + + return zip(trend, trend_up, trend_down, volatility, weights.id_order) + +def get_time_data(markov_data, time_cols): + """ + Extract the time columns and bin appropriately + """ + return np.array([[x[t_col] for x in query_result] for t_col in time_cols], dtype=float) + +def rebin_data(time_data, num_time_per_bin): + """ + convert an n x l matrix into an (n/m) x l matrix where the values are reduced (averaged) for the intervening states: + 1 2 3 4 1.5 3.5 + 5 6 7 8 -> 5.5 7.5 + 9 8 7 6 8.5 6.5 + 5 4 3 2 4.5 2.5 + + if m = 2 + + This process effectively resamples the data at a longer time span n units longer than the input data. + For cases when there is a remainder (remainder(5/3) = 2), the remaining two columns are binned together as the last time period, while the first three are binned together. + + Input: + @param time_data n x l ndarray: measurements of an attribute at different time intervals + @param num_time_per_bin int: number of columns to average into a new column + Output: + ceil(n / m) x l ndarray of resampled time series + """ + + if time_data.shape[1] % num_time_per_bin == 0: + ## if fit is perfect, then use it + n_max = time_data.shape[1] / num_time_per_bin + else: + ## fit remainders into an additional column + n_max = time_data.shape[1] / num_time_per_bin + 1 + + return np.array([ + time_data[:, + num_time_per_bin*i:num_time_per_bin*(i+1)].mean(axis=1) + for i in range(n_max)]).T +def get_prob_dist(transition_matrix, lag_indices, unit_indices): + """ + given an array of transition matrices, look up the probability associated with the arrangements passed + + Input: + @param transition_matrix ndarray[k,k,k]: + @param lag_indices ndarray: + @param unit_indices ndarray: + + Output: + Array of probability distributions + """ + + return np.array([transition_matrix[(lag_indices[i], unit_indices[i])] for i in range(len(lag_indices))]) + +def get_prob_stats(prob_dist, unit_indices): +# trend, trend_up, trend_down, volatility = get_prob_stats(prob_dist) + + trend_up = np.array([prob_dist[:, i:].sum() for i in unit_indices]) + trend_down = np.array([prob_dist[:, :i].sum() for i in unit_indices]) + trend = trend_up - trend_down + volatility = prob_dist.std(axis=1) + + + return trend_up, trend_down, trend, volatility diff --git a/src/py/crankshaft/test/test_space_time_dynamics.py b/src/py/crankshaft/test/test_space_time_dynamics.py new file mode 100644 index 0000000..dd7b8b0 --- /dev/null +++ b/src/py/crankshaft/test/test_space_time_dynamics.py @@ -0,0 +1,126 @@ +import unittest +import numpy as np + +import unittest + + +# from mock_plpy import MockPlPy +# plpy = MockPlPy() +# +# import sys +# sys.modules['plpy'] = plpy +from helper import plpy, fixture_file + +import crankshaft.space_time_dynamics as std +import crankshaft.clustering as cc +from crankshaft import random_seeds +import json + +class SpaceTimeTests(unittest.TestCase): + """Testing class for Markov Functions.""" + + def setUp(self): + plpy._reset() + self.params = {"id_col": "cartodb_id", + "time_cols": ['dec_2013', 'jan_2014', 'feb_2014'], + "subquery": "SELECT * FROM a_list", + "geom_col": "the_geom", + "num_ngbrs": 321} + self.neighbors_data = json.loads(open(fixture_file('neighbors.json')).read()) + self.moran_data = json.loads(open(fixture_file('moran.json')).read()) + + self.time_data = np.array([i * np.ones(10, dtype=float) for i in range(10)]).T + + self.transition_matrix = p = np.array([ + [[ 0.96341463, 0.0304878 , 0.00609756, 0. , 0. ], + [ 0.06040268, 0.83221477, 0.10738255, 0. , 0. ], + [ 0. , 0.14 , 0.74 , 0.12 , 0. ], + [ 0. , 0.03571429, 0.32142857, 0.57142857, 0.07142857], + [ 0. , 0. , 0. , 0.16666667, 0.83333333]], + [[ 0.79831933, 0.16806723, 0.03361345, 0. , 0. ], + [ 0.0754717 , 0.88207547, 0.04245283, 0. , 0. ], + [ 0.00537634, 0.06989247, 0.8655914 , 0.05913978, 0. ], + [ 0. , 0. , 0.06372549, 0.90196078, 0.03431373], + [ 0. , 0. , 0. , 0.19444444, 0.80555556]], + [[ 0.84693878, 0.15306122, 0. , 0. , 0. ], + [ 0.08133971, 0.78947368, 0.1291866 , 0. , 0. ], + [ 0.00518135, 0.0984456 , 0.79274611, 0.0984456 , 0.00518135], + [ 0. , 0. , 0.09411765, 0.87058824, 0.03529412], + [ 0. , 0. , 0. , 0.10204082, 0.89795918]], + [[ 0.8852459 , 0.09836066, 0. , 0.01639344, 0. ], + [ 0.03875969, 0.81395349, 0.13953488, 0. , 0.00775194], + [ 0.0049505 , 0.09405941, 0.77722772, 0.11881188, 0.0049505 ], + [ 0. , 0.02339181, 0.12865497, 0.75438596, 0.09356725], + [ 0. , 0. , 0. , 0.09661836, 0.90338164]], + [[ 0.33333333, 0.66666667, 0. , 0. , 0. ], + [ 0.0483871 , 0.77419355, 0.16129032, 0.01612903, 0. ], + [ 0.01149425, 0.16091954, 0.74712644, 0.08045977, 0. ], + [ 0. , 0.01036269, 0.06217617, 0.89637306, 0.03108808], + [ 0. , 0. , 0. , 0.02352941, 0.97647059]]] + ) + + # def test_spatial_markov(self): + # """Test Spatial Markov.""" + # + # ans = "SELECT i.\"cartodb_id\" As id, " \ + # "i.\"dec_2013\"::numeric As attr1, " \ + # "i.\"jan_2014\"::numeric As attr2, " \ + # "i.\"feb_2014\"::numeric As attr3, " \ + # "(SELECT ARRAY(SELECT j.\"cartodb_id\" " \ + # "FROM (SELECT * FROM a_list) As j " \ + # "WHERE j.\"dec_2013\" IS NOT NULL AND " \ + # "j.\"jan_2014\" IS NOT NULL AND " \ + # "j.\"feb_2014\" IS NOT NULL " \ + # "ORDER BY " \ + # "j.\"the_geom\" <-> i.\"the_geom\" ASC " \ + # "LIMIT 321 OFFSET 1 ) ) " \ + # "As neighbors " \ + # "FROM (SELECT * FROM a_list) As i " \ + # "WHERE i.\"dec_2013\" IS NOT NULL AND " \ + # "i.\"jan_2014\" IS NOT NULL AND " \ + # "i.\"feb_2014\" IS NOT NULL " \ + # "ORDER BY i.\"cartodb_id\" ASC;" + # + # subquery = self.params['subquery'] + # time_cols = self.params['time_cols'] + # num_time_per_bin = 1 + # permutations = 99 + # geom_col = self.params['geom_col'] + # id_col = self.params['id_col'] + # w_type = 'knn' + # num_ngbrs = self.params['num_ngbrs'] + # + # self.assertEqual(std.spatial_markov(subquery, time_cols, num_time_per_bin, permutations, geom_col, id_col, w_type, num_ngbrs), ans) + + def test_rebin_data(self): + """Test rebin_data""" + ## sample in double the time (even case since 10 % 2 = 0): + ## (0+1)/2, (2+3)/2, (4+5)/2, (6+7)/2, (8+9)/2 + ## = 0.5, 2.5, 4.5, 6.5, 8.5 + ans_even = np.array([(i + 0.5) * np.ones(10, dtype=float) + for i in range(0, 10, 2)]).T + + self.assertTrue(np.array_equal(std.rebin_data(self.time_data, 2), ans_even)) + + ## sample in triple the time (uneven since 10 % 3 = 1): + ## (0+1+2)/3, (3+4+5)/3, (6+7+8)/3, (9)/1 + ## = 1, 4, 7, 9 + ans_odd = np.array([i * np.ones(10, dtype=float) + for i in (1, 4, 7, 9)]).T + self.assertTrue(np.array_equal(std.rebin_data(self.time_data, 3), ans_odd)) + def test_get_prob_dist(self): + """Test get_prob_dist""" + lag_indices = np.array([1, 2, 3, 4]) + unit_indices = np.array([1, 3, 2, 4]) + answer = np.array([ + [ 0.0754717 , 0.88207547, 0.04245283, 0. , 0. ], + [ 0. , 0. , 0.09411765, 0.87058824, 0.03529412], + [ 0.0049505 , 0.09405941, 0.77722772, 0.11881188, 0.0049505 ], + [ 0. , 0. , 0. , 0.02352941, 0.97647059] + ]) + result = std.get_prob_dist(self.transition_matrix, lag_indices, unit_indices) + + self.assertTrue(np.array_equal(result, answer)) + + +