mirror of
https://github.com/CartoDB/crankshaft.git
synced 2024-11-01 10:20:48 +08:00
adding markov python code
This commit is contained in:
parent
8cccb18eed
commit
9943d4de58
@ -0,0 +1 @@
|
||||
from markov import *
|
138
src/py/crankshaft/crankshaft/space_time_dynamics/markov.py
Normal file
138
src/py/crankshaft/crankshaft/space_time_dynamics/markov.py
Normal file
@ -0,0 +1,138 @@
|
||||
"""
|
||||
Spatial dynamics measurements using Spatial Markov
|
||||
"""
|
||||
|
||||
|
||||
import numpy as np
|
||||
import pysal as ps
|
||||
import plpy
|
||||
from crankshaft.clustering import get_query
|
||||
|
||||
def spatial_markov_trend(subquery, time_cols, num_time_per_bin, permutations, geom_col, id_col, w_type, num_ngbrs):
|
||||
"""
|
||||
Predict the trends of a unit based on:
|
||||
1. history of its transitions to different classes (e.g., 1st quantile -> 2nd quantile)
|
||||
2. average class of its neighbors
|
||||
|
||||
Inputs:
|
||||
|
||||
@param subquery string: e.g., SELECT * FROM table_name
|
||||
@param time_cols list (string): list of strings of column names
|
||||
@param num_time_per_bin int: number of bins to divide # of time columns into
|
||||
@param permutations int: number of permutations for test stats
|
||||
@param geom_col string: name of column which contains the geometries
|
||||
@param id_col string: name of column which has the ids of the table
|
||||
@param w_type string: weight type ('knn' or 'queen')
|
||||
@param num_ngbrs int: number of neighbors (if knn type)
|
||||
|
||||
Outputs:
|
||||
@param trend_up float: probablity that a geom will move to a higher class
|
||||
@param trend_down float: probablity that a geom will move to a lower class
|
||||
@param trend float: (trend_up - trend_down) / trend_static
|
||||
@param volatility float: a measure of the volatility based on probability stddev(prob array)
|
||||
@param
|
||||
"""
|
||||
|
||||
qvals = {"id_col": id_col,
|
||||
"time_cols": time_cols,
|
||||
"geom_col": geom_col,
|
||||
"subquery": subquery,
|
||||
"num_ngbrs": num_ngbrs}
|
||||
|
||||
query = get_query(w_type, qvals)
|
||||
|
||||
try:
|
||||
query_result = plpy.execute(query)
|
||||
except:
|
||||
zip([None],[None],[None])
|
||||
|
||||
## build weight
|
||||
weights = get_weight(query_result, w_type)
|
||||
|
||||
## prep time data
|
||||
t_data = get_time_data(query_result, time_cols)
|
||||
## rebin time data
|
||||
if num_time_per_bin > 1:
|
||||
## rebin
|
||||
t_data = rebin_data(t_data, num_time_per_bin)
|
||||
|
||||
sp_markov_result = ps.Spatial_Markov(t_data, weights, k=7, fixed=False)
|
||||
|
||||
## get lags
|
||||
lags = ps.lag_spatial(weights, t_data)
|
||||
|
||||
## get lag classes
|
||||
lag_classes = ps.Quantiles(lags.flatten(), k=7).yb
|
||||
|
||||
## look up probablity distribution for each unit according to class and lag class
|
||||
prob_dist = get_prob_dist(lag_classes, sp_markov_result.classes)
|
||||
|
||||
## find the ups and down and overall distribution of each cell
|
||||
trend, trend_up, trend_down, volatility = get_prob_stats(prob_dist)
|
||||
|
||||
## output the results
|
||||
|
||||
return zip(trend, trend_up, trend_down, volatility, weights.id_order)
|
||||
|
||||
def get_time_data(markov_data, time_cols):
|
||||
"""
|
||||
Extract the time columns and bin appropriately
|
||||
"""
|
||||
return np.array([[x[t_col] for x in query_result] for t_col in time_cols], dtype=float)
|
||||
|
||||
def rebin_data(time_data, num_time_per_bin):
|
||||
"""
|
||||
convert an n x l matrix into an (n/m) x l matrix where the values are reduced (averaged) for the intervening states:
|
||||
1 2 3 4 1.5 3.5
|
||||
5 6 7 8 -> 5.5 7.5
|
||||
9 8 7 6 8.5 6.5
|
||||
5 4 3 2 4.5 2.5
|
||||
|
||||
if m = 2
|
||||
|
||||
This process effectively resamples the data at a longer time span n units longer than the input data.
|
||||
For cases when there is a remainder (remainder(5/3) = 2), the remaining two columns are binned together as the last time period, while the first three are binned together.
|
||||
|
||||
Input:
|
||||
@param time_data n x l ndarray: measurements of an attribute at different time intervals
|
||||
@param num_time_per_bin int: number of columns to average into a new column
|
||||
Output:
|
||||
ceil(n / m) x l ndarray of resampled time series
|
||||
"""
|
||||
|
||||
if time_data.shape[1] % num_time_per_bin == 0:
|
||||
## if fit is perfect, then use it
|
||||
n_max = time_data.shape[1] / num_time_per_bin
|
||||
else:
|
||||
## fit remainders into an additional column
|
||||
n_max = time_data.shape[1] / num_time_per_bin + 1
|
||||
|
||||
return np.array([
|
||||
time_data[:,
|
||||
num_time_per_bin*i:num_time_per_bin*(i+1)].mean(axis=1)
|
||||
for i in range(n_max)]).T
|
||||
def get_prob_dist(transition_matrix, lag_indices, unit_indices):
|
||||
"""
|
||||
given an array of transition matrices, look up the probability associated with the arrangements passed
|
||||
|
||||
Input:
|
||||
@param transition_matrix ndarray[k,k,k]:
|
||||
@param lag_indices ndarray:
|
||||
@param unit_indices ndarray:
|
||||
|
||||
Output:
|
||||
Array of probability distributions
|
||||
"""
|
||||
|
||||
return np.array([transition_matrix[(lag_indices[i], unit_indices[i])] for i in range(len(lag_indices))])
|
||||
|
||||
def get_prob_stats(prob_dist, unit_indices):
|
||||
# trend, trend_up, trend_down, volatility = get_prob_stats(prob_dist)
|
||||
|
||||
trend_up = np.array([prob_dist[:, i:].sum() for i in unit_indices])
|
||||
trend_down = np.array([prob_dist[:, :i].sum() for i in unit_indices])
|
||||
trend = trend_up - trend_down
|
||||
volatility = prob_dist.std(axis=1)
|
||||
|
||||
|
||||
return trend_up, trend_down, trend, volatility
|
126
src/py/crankshaft/test/test_space_time_dynamics.py
Normal file
126
src/py/crankshaft/test/test_space_time_dynamics.py
Normal file
@ -0,0 +1,126 @@
|
||||
import unittest
|
||||
import numpy as np
|
||||
|
||||
import unittest
|
||||
|
||||
|
||||
# from mock_plpy import MockPlPy
|
||||
# plpy = MockPlPy()
|
||||
#
|
||||
# import sys
|
||||
# sys.modules['plpy'] = plpy
|
||||
from helper import plpy, fixture_file
|
||||
|
||||
import crankshaft.space_time_dynamics as std
|
||||
import crankshaft.clustering as cc
|
||||
from crankshaft import random_seeds
|
||||
import json
|
||||
|
||||
class SpaceTimeTests(unittest.TestCase):
|
||||
"""Testing class for Markov Functions."""
|
||||
|
||||
def setUp(self):
|
||||
plpy._reset()
|
||||
self.params = {"id_col": "cartodb_id",
|
||||
"time_cols": ['dec_2013', 'jan_2014', 'feb_2014'],
|
||||
"subquery": "SELECT * FROM a_list",
|
||||
"geom_col": "the_geom",
|
||||
"num_ngbrs": 321}
|
||||
self.neighbors_data = json.loads(open(fixture_file('neighbors.json')).read())
|
||||
self.moran_data = json.loads(open(fixture_file('moran.json')).read())
|
||||
|
||||
self.time_data = np.array([i * np.ones(10, dtype=float) for i in range(10)]).T
|
||||
|
||||
self.transition_matrix = p = np.array([
|
||||
[[ 0.96341463, 0.0304878 , 0.00609756, 0. , 0. ],
|
||||
[ 0.06040268, 0.83221477, 0.10738255, 0. , 0. ],
|
||||
[ 0. , 0.14 , 0.74 , 0.12 , 0. ],
|
||||
[ 0. , 0.03571429, 0.32142857, 0.57142857, 0.07142857],
|
||||
[ 0. , 0. , 0. , 0.16666667, 0.83333333]],
|
||||
[[ 0.79831933, 0.16806723, 0.03361345, 0. , 0. ],
|
||||
[ 0.0754717 , 0.88207547, 0.04245283, 0. , 0. ],
|
||||
[ 0.00537634, 0.06989247, 0.8655914 , 0.05913978, 0. ],
|
||||
[ 0. , 0. , 0.06372549, 0.90196078, 0.03431373],
|
||||
[ 0. , 0. , 0. , 0.19444444, 0.80555556]],
|
||||
[[ 0.84693878, 0.15306122, 0. , 0. , 0. ],
|
||||
[ 0.08133971, 0.78947368, 0.1291866 , 0. , 0. ],
|
||||
[ 0.00518135, 0.0984456 , 0.79274611, 0.0984456 , 0.00518135],
|
||||
[ 0. , 0. , 0.09411765, 0.87058824, 0.03529412],
|
||||
[ 0. , 0. , 0. , 0.10204082, 0.89795918]],
|
||||
[[ 0.8852459 , 0.09836066, 0. , 0.01639344, 0. ],
|
||||
[ 0.03875969, 0.81395349, 0.13953488, 0. , 0.00775194],
|
||||
[ 0.0049505 , 0.09405941, 0.77722772, 0.11881188, 0.0049505 ],
|
||||
[ 0. , 0.02339181, 0.12865497, 0.75438596, 0.09356725],
|
||||
[ 0. , 0. , 0. , 0.09661836, 0.90338164]],
|
||||
[[ 0.33333333, 0.66666667, 0. , 0. , 0. ],
|
||||
[ 0.0483871 , 0.77419355, 0.16129032, 0.01612903, 0. ],
|
||||
[ 0.01149425, 0.16091954, 0.74712644, 0.08045977, 0. ],
|
||||
[ 0. , 0.01036269, 0.06217617, 0.89637306, 0.03108808],
|
||||
[ 0. , 0. , 0. , 0.02352941, 0.97647059]]]
|
||||
)
|
||||
|
||||
# def test_spatial_markov(self):
|
||||
# """Test Spatial Markov."""
|
||||
#
|
||||
# ans = "SELECT i.\"cartodb_id\" As id, " \
|
||||
# "i.\"dec_2013\"::numeric As attr1, " \
|
||||
# "i.\"jan_2014\"::numeric As attr2, " \
|
||||
# "i.\"feb_2014\"::numeric As attr3, " \
|
||||
# "(SELECT ARRAY(SELECT j.\"cartodb_id\" " \
|
||||
# "FROM (SELECT * FROM a_list) As j " \
|
||||
# "WHERE j.\"dec_2013\" IS NOT NULL AND " \
|
||||
# "j.\"jan_2014\" IS NOT NULL AND " \
|
||||
# "j.\"feb_2014\" IS NOT NULL " \
|
||||
# "ORDER BY " \
|
||||
# "j.\"the_geom\" <-> i.\"the_geom\" ASC " \
|
||||
# "LIMIT 321 OFFSET 1 ) ) " \
|
||||
# "As neighbors " \
|
||||
# "FROM (SELECT * FROM a_list) As i " \
|
||||
# "WHERE i.\"dec_2013\" IS NOT NULL AND " \
|
||||
# "i.\"jan_2014\" IS NOT NULL AND " \
|
||||
# "i.\"feb_2014\" IS NOT NULL " \
|
||||
# "ORDER BY i.\"cartodb_id\" ASC;"
|
||||
#
|
||||
# subquery = self.params['subquery']
|
||||
# time_cols = self.params['time_cols']
|
||||
# num_time_per_bin = 1
|
||||
# permutations = 99
|
||||
# geom_col = self.params['geom_col']
|
||||
# id_col = self.params['id_col']
|
||||
# w_type = 'knn'
|
||||
# num_ngbrs = self.params['num_ngbrs']
|
||||
#
|
||||
# self.assertEqual(std.spatial_markov(subquery, time_cols, num_time_per_bin, permutations, geom_col, id_col, w_type, num_ngbrs), ans)
|
||||
|
||||
def test_rebin_data(self):
|
||||
"""Test rebin_data"""
|
||||
## sample in double the time (even case since 10 % 2 = 0):
|
||||
## (0+1)/2, (2+3)/2, (4+5)/2, (6+7)/2, (8+9)/2
|
||||
## = 0.5, 2.5, 4.5, 6.5, 8.5
|
||||
ans_even = np.array([(i + 0.5) * np.ones(10, dtype=float)
|
||||
for i in range(0, 10, 2)]).T
|
||||
|
||||
self.assertTrue(np.array_equal(std.rebin_data(self.time_data, 2), ans_even))
|
||||
|
||||
## sample in triple the time (uneven since 10 % 3 = 1):
|
||||
## (0+1+2)/3, (3+4+5)/3, (6+7+8)/3, (9)/1
|
||||
## = 1, 4, 7, 9
|
||||
ans_odd = np.array([i * np.ones(10, dtype=float)
|
||||
for i in (1, 4, 7, 9)]).T
|
||||
self.assertTrue(np.array_equal(std.rebin_data(self.time_data, 3), ans_odd))
|
||||
def test_get_prob_dist(self):
|
||||
"""Test get_prob_dist"""
|
||||
lag_indices = np.array([1, 2, 3, 4])
|
||||
unit_indices = np.array([1, 3, 2, 4])
|
||||
answer = np.array([
|
||||
[ 0.0754717 , 0.88207547, 0.04245283, 0. , 0. ],
|
||||
[ 0. , 0. , 0.09411765, 0.87058824, 0.03529412],
|
||||
[ 0.0049505 , 0.09405941, 0.77722772, 0.11881188, 0.0049505 ],
|
||||
[ 0. , 0. , 0. , 0.02352941, 0.97647059]
|
||||
])
|
||||
result = std.get_prob_dist(self.transition_matrix, lag_indices, unit_indices)
|
||||
|
||||
self.assertTrue(np.array_equal(result, answer))
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user