many more (commented out) tests for complete geom coverage, plus getuscensusmeasure tests

This commit is contained in:
John Krauss 2016-06-01 18:16:50 -04:00
parent 62b23be2e0
commit 63448d6214

View File

@ -42,6 +42,36 @@ WHERE type ILIKE 'geometry'
AND weight > 0
''', is_meta=True).json()['rows']]
US_CENSUS_MEASURE_COLUMNS = [(r['name'], ) for r in query('''
SELECT c.name FROM obs_column c, obs_column_tag ct
WHERE type ILIKE 'numeric'
AND c.id = ct.column_id
AND ct.tag_id LIKE 'us.census%'
AND weight > 0
''', is_meta=True).json()['rows']]
def default_geometry_id(column_id):
'''
Returns default test point for the column_id.
'''
if column_id == 'whosonfirst.wof_disputed_geom':
return 'CDB_LatLng(33.78, 76.57)'
elif column_id == 'whosonfirst.wof_marinearea_geom':
return 'CDB_LatLng(43.33, -68.47)'
elif column_id in ('us.census.tiger.school_district_elementary',
'us.census.tiger.school_district_secondary',
'us.census.tiger.school_district_elementary_clipped',
'us.census.tiger.school_district_secondary_clipped'):
return 'CDB_LatLng(40.7025, -73.7067)'
elif column_id.startswith('es.ine'):
return 'CDB_LatLng(42.8226119029222, -2.51141249535454)'
elif column_id.startswith('us.zillow'):
return 'CDB_LatLng(28.3305906291771, -81.3544048197256)'
else:
return 'CDB_LatLng(40.7, -73.9)'
def default_point(column_id):
'''
Returns default test point for the column_id.
@ -63,8 +93,43 @@ def default_point(column_id):
return 'CDB_LatLng(40.7, -73.9)'
#def default_area(column_id):
# '''
# Returns default test area for the column_id
# '''
# point = default_point(column_id)
# area = 'ST_Transform(ST_Buffer(ST_Transform({point}, 3857), 1000), 4326)'.format(
# point=point)
# return area
@parameterized(US_CENSUS_MEASURE_COLUMNS)
def test_get_us_census_measure_points(name):
resp = query('''
SELECT * FROM {schema}OBS_GetUSCensusMeasure({point}, '{name}')
'''.format(name=name.replace("'", "''"),
schema='cdb_observatory.' if USE_SCHEMA else '',
point=default_point('')))
assert_equal(resp.status_code, 200)
rows = resp.json()['rows']
assert_equal(1, len(rows))
assert_is_not_none(rows[0].values()[0])
#@parameterized(MEASURE_COLUMNS)
#def test_get_measure_areas(column_id):
# resp = query('''
#SELECT * FROM {schema}OBS_GetMeasure({area}, '{column_id}')
# '''.format(column_id=column_id,
# schema='cdb_observatory.' if USE_SCHEMA else '',
# area=default_area(column_id)))
# assert_equal(resp.status_code, 200)
# rows = resp.json()['rows']
# assert_equal(1, len(rows))
# assert_is_not_none(rows[0].values()[0])
@parameterized(MEASURE_COLUMNS)
def test_measure_points(column_id):
def test_get_measure_points(column_id):
resp = query('''
SELECT * FROM {schema}OBS_GetMeasure({point}, '{column_id}')
'''.format(column_id=column_id,
@ -75,8 +140,20 @@ SELECT * FROM {schema}OBS_GetMeasure({point}, '{column_id}')
assert_equal(1, len(rows))
assert_is_not_none(rows[0].values()[0])
#@parameterized(CATEGORY_COLUMNS)
#def test_get_category_areas(column_id):
# resp = query('''
#SELECT * FROM {schema}OBS_GetCategory({area}, '{column_id}')
# '''.format(column_id=column_id,
# schema='cdb_observatory.' if USE_SCHEMA else '',
# area=default_area(column_id)))
# assert_equal(resp.status_code, 200)
# rows = resp.json()['rows']
# assert_equal(1, len(rows))
# assert_is_not_none(rows[0].values()[0])
@parameterized(CATEGORY_COLUMNS)
def test_category_points(column_id):
def test_get_category_points(column_id):
resp = query('''
SELECT * FROM {schema}OBS_GetCategory({point}, '{column_id}')
'''.format(column_id=column_id,
@ -87,14 +164,62 @@ SELECT * FROM {schema}OBS_GetCategory({point}, '{column_id}')
assert_equal(1, len(rows))
assert_is_not_none(rows[0].values()[0])
@parameterized(BOUNDARY_COLUMNS)
def test_boundary_points(column_id):
resp = query('''
SELECT * FROM {schema}OBS_GetBoundary({point}, '{column_id}')
'''.format(column_id=column_id,
schema='cdb_observatory.' if USE_SCHEMA else '',
point=default_point(column_id)))
assert_equal(resp.status_code, 200)
rows = resp.json()['rows']
assert_equal(1, len(rows))
assert_is_not_none(rows[0].values()[0])
#@parameterized(BOUNDARY_COLUMNS)
#def test_get_boundaries_by_geometry(column_id):
# resp = query('''
#SELECT * FROM {schema}OBS_GetBoundariesByGeometry({area}, '{column_id}')
# '''.format(column_id=column_id,
# schema='cdb_observatory.' if USE_SCHEMA else '',
# area=default_area(column_id)))
# assert_equal(resp.status_code, 200)
# rows = resp.json()['rows']
# assert_equal(1, len(rows))
# assert_is_not_none(rows[0].values()[0])
#@parameterized(BOUNDARY_COLUMNS)
#def test_get_points_by_geometry(column_id):
# resp = query('''
#SELECT * FROM {schema}OBS_GetPointsByGeometry({area}, '{column_id}')
# '''.format(column_id=column_id,
# schema='cdb_observatory.' if USE_SCHEMA else '',
# area=default_area(column_id)))
# assert_equal(resp.status_code, 200)
# rows = resp.json()['rows']
# assert_equal(1, len(rows))
# assert_is_not_none(rows[0].values()[0])
#@parameterized(BOUNDARY_COLUMNS)
#def test_get_boundary_points(column_id):
# resp = query('''
#SELECT * FROM {schema}OBS_GetBoundary({point}, '{column_id}')
# '''.format(column_id=column_id,
# schema='cdb_observatory.' if USE_SCHEMA else '',
# point=default_point(column_id)))
# assert_equal(resp.status_code, 200)
# rows = resp.json()['rows']
# assert_equal(1, len(rows))
# assert_is_not_none(rows[0].values()[0])
#@parameterized(BOUNDARY_COLUMNS)
#def test_get_boundary_id(column_id):
# resp = query('''
#SELECT * FROM {schema}OBS_GetBoundaryId({point}, '{column_id}')
# '''.format(column_id=column_id,
# schema='cdb_observatory.' if USE_SCHEMA else '',
# point=default_point(column_id)))
# assert_equal(resp.status_code, 200)
# rows = resp.json()['rows']
# assert_equal(1, len(rows))
# assert_is_not_none(rows[0].values()[0])
#@parameterized(BOUNDARY_COLUMNS)
#def test_get_boundary_by_id(column_id):
# resp = query('''
#SELECT * FROM {schema}OBS_GetBoundaryById({geometry_id}, '{column_id}')
# '''.format(column_id=column_id,
# schema='cdb_observatory.' if USE_SCHEMA else '',
# geometry_id=default_geometry_id(column_id)))
# assert_equal(resp.status_code, 200)
# rows = resp.json()['rows']
# assert_equal(1, len(rows))
# assert_is_not_none(rows[0].values()[0])