additional options for testing

This commit is contained in:
John Krauss 2016-05-16 11:52:41 -04:00
parent 02a529f0fc
commit 8563ca7e45

View File

@ -7,35 +7,40 @@ import requests
HOSTNAME = os.environ['OBS_HOSTNAME'] HOSTNAME = os.environ['OBS_HOSTNAME']
API_KEY = os.environ['OBS_API_KEY'] API_KEY = os.environ['OBS_API_KEY']
META_HOSTNAME = os.environ.get('OBS_META_HOSTNAME', HOSTNAME)
META_API_KEY = os.environ.get('OBS_META_API_KEY', API_KEY)
USE_SCHEMA = 'OBS_USE_SCHEMA' in os.environ
def query(q, **options):
def query(q, is_meta=False, **options):
''' '''
Query the account. Returned is the response, wrapped by the requests Query the account. Returned is the response, wrapped by the requests
library. library.
''' '''
url = 'https://{hostname}/api/v2/sql'.format(hostname=HOSTNAME) url = 'https://{hostname}/api/v2/sql'.format(
hostname=META_HOSTNAME if is_meta else HOSTNAME)
params = options.copy() params = options.copy()
params['q'] = re.sub(r'\s+', ' ', q) params['q'] = re.sub(r'\s+', ' ', q)
params['api_key'] = API_KEY params['api_key'] = META_API_KEY if is_meta else API_KEY
return requests.get(url, params=params) return requests.get(url, params=params)
MEASURE_COLUMNS = [(r['id'], ) for r in query(''' MEASURE_COLUMNS = [(r['id'], ) for r in query('''
SELECT id FROM observatory.obs_column SELECT id FROM observatory.obs_column
WHERE type ILIKE 'numeric' WHERE type ILIKE 'numeric'
AND weight > 0 AND weight > 0
''').json()['rows']] ''', is_meta=True).json()['rows']]
CATEGORY_COLUMNS = [(r['id'], ) for r in query(''' CATEGORY_COLUMNS = [(r['id'], ) for r in query('''
SELECT id FROM observatory.obs_column SELECT id FROM observatory.obs_column
WHERE type ILIKE 'text' WHERE type ILIKE 'text'
AND weight > 0 AND weight > 0
''').json()['rows']] ''', is_meta=True).json()['rows']]
BOUNDARY_COLUMNS = [(r['id'], ) for r in query(''' BOUNDARY_COLUMNS = [(r['id'], ) for r in query('''
SELECT id FROM observatory.obs_column SELECT id FROM observatory.obs_column
WHERE type ILIKE 'geometry' WHERE type ILIKE 'geometry'
AND weight > 0 AND weight > 0
''').json()['rows']] ''', is_meta=True).json()['rows']]
def default_point(column_id): def default_point(column_id):
''' '''
@ -59,8 +64,10 @@ def default_point(column_id):
@parameterized(MEASURE_COLUMNS) @parameterized(MEASURE_COLUMNS)
def test_measure_points(column_id): def test_measure_points(column_id):
resp = query(''' resp = query('''
SELECT * FROM cdb_observatory.OBS_GetMeasure({point}, '{column_id}') SELECT * FROM {schema}OBS_GetMeasure({point}, '{column_id}')
'''.format(column_id=column_id, point=default_point(column_id))) '''.format(column_id=column_id,
schema='cdb_observatory.' if USE_SCHEMA else '',
point=default_point(column_id)))
assert_equal(resp.status_code, 200) assert_equal(resp.status_code, 200)
rows = resp.json()['rows'] rows = resp.json()['rows']
assert_equal(1, len(rows)) assert_equal(1, len(rows))
@ -69,8 +76,10 @@ SELECT * FROM cdb_observatory.OBS_GetMeasure({point}, '{column_id}')
@parameterized(CATEGORY_COLUMNS) @parameterized(CATEGORY_COLUMNS)
def test_category_points(column_id): def test_category_points(column_id):
resp = query(''' resp = query('''
SELECT * FROM cdb_observatory.OBS_GetCategory({point}, '{column_id}') SELECT * FROM {schema}OBS_GetCategory({point}, '{column_id}')
'''.format(column_id=column_id, point=default_point(column_id))) '''.format(column_id=column_id,
schema='cdb_observatory.' if USE_SCHEMA else '',
point=default_point(column_id)))
assert_equal(resp.status_code, 200) assert_equal(resp.status_code, 200)
rows = resp.json()['rows'] rows = resp.json()['rows']
assert_equal(1, len(rows)) assert_equal(1, len(rows))
@ -79,8 +88,10 @@ SELECT * FROM cdb_observatory.OBS_GetCategory({point}, '{column_id}')
@parameterized(BOUNDARY_COLUMNS) @parameterized(BOUNDARY_COLUMNS)
def test_boundary_points(column_id): def test_boundary_points(column_id):
resp = query(''' resp = query('''
SELECT * FROM cdb_observatory.OBS_GetBoundary({point}, '{column_id}') SELECT * FROM {schema}OBS_GetBoundary({point}, '{column_id}')
'''.format(column_id=column_id, point=default_point(column_id))) '''.format(column_id=column_id,
schema='cdb_observatory.' if USE_SCHEMA else '',
point=default_point(column_id)))
assert_equal(resp.status_code, 200) assert_equal(resp.status_code, 200)
rows = resp.json()['rows'] rows = resp.json()['rows']
assert_equal(1, len(rows)) assert_equal(1, len(rows))