Created helper to be used in all the tests
This commit is contained in:
parent
1397b3fcaf
commit
f552f2db12
0
test/__init__.py
Normal file
0
test/__init__.py
Normal file
0
test/helpers/__init__.py
Normal file
0
test/helpers/__init__.py
Normal file
30
test/helpers/integration_test_helper.py
Normal file
30
test/helpers/integration_test_helper.py
Normal file
@ -0,0 +1,30 @@
|
||||
import os
|
||||
import requests
|
||||
import json
|
||||
|
||||
class IntegrationTestHelper:
|
||||
|
||||
@classmethod
|
||||
def get_environment_variables(cls):
|
||||
username = os.environ["GEOCODER_API_TEST_USERNAME"]
|
||||
api_key = os.environ["GEOCODER_API_TEST_API_KEY"]
|
||||
host = os.environ["GEOCODER_API_TEST_HOST"]
|
||||
table_name = os.environ["GEOCODER_API_TEST_TABLE_NAME"]
|
||||
|
||||
return {
|
||||
"username": username,
|
||||
"api_key": api_key,
|
||||
"host": host,
|
||||
"table_name": table_name
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def execute_query(cls, sql_api_url, query):
|
||||
query_url = "{0}&q={1}".format(sql_api_url, query)
|
||||
print "Executing query: {0}".format(query_url)
|
||||
query_response = requests.get(query_url)
|
||||
if query_response.status_code != 200:
|
||||
raise Exception("Error executing SQL API query")
|
||||
query_response_data = json.loads(query_response.text)
|
||||
|
||||
return query_response_data['rows'][0]['geometry']
|
0
test/integration/__init__.py
Normal file
0
test/integration/__init__.py
Normal file
@ -1,29 +1,22 @@
|
||||
import os, time, requests, json
|
||||
from unittest import TestCase
|
||||
from nose.tools import assert_raises
|
||||
from nose.tools import assert_not_equal
|
||||
from ..helpers.integration_test_helper import IntegrationTestHelper
|
||||
|
||||
|
||||
class TestConfigHelper(TestCase):
|
||||
class TestAdmin0Functions(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
username = os.environ["GEOCODER_API_TEST_USERNAME"]
|
||||
api_key = os.environ["GEOCODER_API_TEST_API_KEY"]
|
||||
host = os.environ["GEOCODER_API_TEST_HOST"]
|
||||
self.table_name = os.environ["GEOCODER_API_TEST_TABLE_NAME"]
|
||||
self.sql_api_url = "https://{0}.{1}/api/v2/sql?api_key={2}".format(username, host, api_key)
|
||||
def setUp(self):
|
||||
self.env_variables = IntegrationTestHelper.get_environment_variables()
|
||||
self.sql_api_url = "https://{0}.{1}/api/v2/sql?api_key={2}".format(
|
||||
self.env_variables['username'],
|
||||
self.env_variables['host'],
|
||||
self.env_variables['api_key']
|
||||
)
|
||||
|
||||
def test_if_select_with_admin0_is_ok(self):
|
||||
query = "SELECT cdb_geocode_admin0_polygon(name) as geometry FROM {0} LIMIT 1".format(self.table_name)
|
||||
geometry = self.execute_query(query)
|
||||
assert geometry != None
|
||||
|
||||
def build_sql_api_query_url(self, query):
|
||||
return "{0}&q={1}".format(self.sql_api_url,query)
|
||||
|
||||
def execute_query(self, query):
|
||||
query_url = self.build_sql_api_query_url(query)
|
||||
query_response = requests.get(query_url)
|
||||
if query_response.status_code != 200:
|
||||
raise Exception("Error executing SQL API query")
|
||||
query_response_data = json.loads(query_response.text)
|
||||
return query_response_data['rows'][0]['geometry']
|
||||
def test_if_select_with_admin0_is_ok(self):
|
||||
query = "SELECT cdb_geocode_admin0_polygon(name) as geometry " \
|
||||
"FROM {0} LIMIT 1".format(
|
||||
self.env_variables['table_name'])
|
||||
geometry = IntegrationTestHelper.execute_query(self.sql_api_url, query)
|
||||
assert_not_equal(geometry, None)
|
||||
|
@ -1,11 +1,18 @@
|
||||
import getopt, sys, requests, time, json, subprocess, os
|
||||
import getopt
|
||||
import sys
|
||||
import requests
|
||||
import time
|
||||
import json
|
||||
import subprocess
|
||||
import os
|
||||
|
||||
|
||||
def main():
|
||||
opts, args = getopt.getopt(sys.argv[1:], "h", ["help", "host="])
|
||||
|
||||
if len(args) < 2:
|
||||
usage()
|
||||
sys.exit()
|
||||
usage()
|
||||
sys.exit()
|
||||
|
||||
host = "cartodb.com"
|
||||
username = args[0]
|
||||
@ -16,76 +23,95 @@ def main():
|
||||
usage()
|
||||
sys.exit()
|
||||
elif o in ("--host"):
|
||||
host = opts[0][1]
|
||||
host = opts[0][1]
|
||||
else:
|
||||
assert False, "unhandled option"
|
||||
|
||||
try:
|
||||
table_name = import_test_dataset(username, api_key, host)
|
||||
set_environment_variables(username, api_key, table_name, host)
|
||||
execute_tests()
|
||||
table_name = import_test_dataset(username, api_key, host)
|
||||
set_environment_variables(username, api_key, table_name, host)
|
||||
execute_tests()
|
||||
finally:
|
||||
clean_environment_variables()
|
||||
clean_test_dataset(username, api_key, table_name, host)
|
||||
clean_environment_variables()
|
||||
clean_test_dataset(username, api_key, table_name, host)
|
||||
|
||||
|
||||
def usage():
|
||||
print """Usage: run_tests.py [options] username api_key
|
||||
print """Usage: run_tests.py [options] username api_key
|
||||
Options:
|
||||
-h: Show this help
|
||||
--host: take that host as base (by default is cartodb.com). Eg. cartodb.com"""
|
||||
--host: take that host as base (by default is cartodb.com)"""
|
||||
|
||||
|
||||
def import_test_dataset(username, api_key, host):
|
||||
url = "https://{0}.{1}/api/v1/imports/?api_key={2}".format(username, host, api_key)
|
||||
dataset = {'file': open('fixtures/geocoder_api_test_dataset.csv', 'rb')}
|
||||
response = requests.post(url, files=dataset)
|
||||
response_json = json.loads(response.text)
|
||||
if not response_json['success']:
|
||||
print "Error importing the test dataset: {0}".format(response.text)
|
||||
sys.exit(1)
|
||||
while(True):
|
||||
table_name = get_imported_table_name(username, host, api_key, response_json['item_queue_id'])
|
||||
if table_name:
|
||||
return table_name
|
||||
else:
|
||||
time.sleep(5)
|
||||
url = "https://{0}.{1}/api/v1/imports/?api_key={2}".format(
|
||||
username, host, api_key
|
||||
)
|
||||
dataset = {'file': open('fixtures/geocoder_api_test_dataset.csv', 'rb')}
|
||||
response = requests.post(url, files=dataset)
|
||||
response_json = json.loads(response.text)
|
||||
if not response_json['success']:
|
||||
print "Error importing the test dataset: {0}".format(response.text)
|
||||
sys.exit(1)
|
||||
while(True):
|
||||
table_name = get_imported_table_name(
|
||||
username,
|
||||
host,
|
||||
api_key,
|
||||
response_json['item_queue_id']
|
||||
)
|
||||
if table_name:
|
||||
return table_name
|
||||
else:
|
||||
time.sleep(5)
|
||||
|
||||
|
||||
def get_imported_table_name(username, host, api_key, import_id):
|
||||
import_data_url = "https://{0}.{1}/api/v1/imports/{2}?api_key={3}".format(
|
||||
username, host, import_id, api_key
|
||||
)
|
||||
import_data_response = requests.get(import_data_url)
|
||||
if import_data_response.status_code != 200:
|
||||
print "Error getting the table name from the import data: {0}".format(import_data_response.text)
|
||||
sys.exit(1)
|
||||
import_data_json = json.loads(import_data_response.text)
|
||||
return import_data_json['table_name']
|
||||
import_data_url = "https://{0}.{1}/api/v1/imports/{2}?api_key={3}".format(
|
||||
username, host, import_id, api_key
|
||||
)
|
||||
import_data_response = requests.get(import_data_url)
|
||||
if import_data_response.status_code != 200:
|
||||
print "Error getting the table name from the import data: {0}".format(
|
||||
import_data_response.text
|
||||
)
|
||||
sys.exit(1)
|
||||
import_data_json = json.loads(import_data_response.text)
|
||||
|
||||
return import_data_json['table_name']
|
||||
|
||||
|
||||
def execute_tests():
|
||||
print "Start testing..."
|
||||
process = subprocess.Popen(["nosetests", "--where=integration/"])
|
||||
process.wait()
|
||||
print "Testing finished!"
|
||||
print "Start testing..."
|
||||
process = subprocess.Popen(["nosetests", "--where=integration/"])
|
||||
process.wait()
|
||||
print "Testing finished!"
|
||||
|
||||
|
||||
def set_environment_variables(username, api_key, table_name, host):
|
||||
os.environ["GEOCODER_API_TEST_USERNAME"] = username
|
||||
os.environ["GEOCODER_API_TEST_API_KEY"] = api_key
|
||||
os.environ["GEOCODER_API_TEST_TABLE_NAME"] = table_name
|
||||
os.environ["GEOCODER_API_TEST_HOST"] = host
|
||||
os.environ["GEOCODER_API_TEST_USERNAME"] = username
|
||||
os.environ["GEOCODER_API_TEST_API_KEY"] = api_key
|
||||
os.environ["GEOCODER_API_TEST_TABLE_NAME"] = table_name
|
||||
os.environ["GEOCODER_API_TEST_HOST"] = host
|
||||
|
||||
|
||||
def clean_environment_variables():
|
||||
print "Cleaning test dataset environment variables..."
|
||||
del os.environ["GEOCODER_API_TEST_USERNAME"]
|
||||
del os.environ["GEOCODER_API_TEST_API_KEY"]
|
||||
del os.environ["GEOCODER_API_TEST_TABLE_NAME"]
|
||||
del os.environ["GEOCODER_API_TEST_HOST"]
|
||||
print "Cleaning test dataset environment variables..."
|
||||
del os.environ["GEOCODER_API_TEST_USERNAME"]
|
||||
del os.environ["GEOCODER_API_TEST_API_KEY"]
|
||||
del os.environ["GEOCODER_API_TEST_TABLE_NAME"]
|
||||
del os.environ["GEOCODER_API_TEST_HOST"]
|
||||
|
||||
|
||||
def clean_test_dataset(username, api_key, table_name, host):
|
||||
print "Cleaning test dataset {0}...".format(table_name)
|
||||
url = "https://{0}.{1}/api/v2/sql?q=drop table {2}&api_key={3}".format(username, host, table_name, api_key)
|
||||
response = requests.get(url)
|
||||
if response.status_code != 200:
|
||||
print "Error cleaning the test dataset: {0}".format(response.text)
|
||||
sys.exit(1)
|
||||
print "Cleaning test dataset {0}...".format(table_name)
|
||||
url = "https://{0}.{1}/api/v2/sql?q=drop table {2}&api_key={3}".format(
|
||||
username, host, table_name, api_key
|
||||
)
|
||||
response = requests.get(url)
|
||||
if response.status_code != 200:
|
||||
print "Error cleaning the test dataset: {0}".format(response.text)
|
||||
sys.exit(1)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
main()
|
||||
|
Loading…
Reference in New Issue
Block a user