Merge pull request #57 from CartoDB/integration_test_automation
First iteration to automate smoke tests
This commit is contained in:
commit
c4dca9c713
0
test/__init__.py
Normal file
0
test/__init__.py
Normal file
3
test/fixtures/geocoder_api_test_dataset.csv
vendored
Normal file
3
test/fixtures/geocoder_api_test_dataset.csv
vendored
Normal file
@ -0,0 +1,3 @@
|
||||
id,country,province,city,postalcode,ip
|
||||
1,Spain,Castilla y León,Valladolid,47010,8.8.8.8
|
||||
2,USA,New York,Manhattn,10001,8.8.8.8
|
|
0
test/helpers/__init__.py
Normal file
0
test/helpers/__init__.py
Normal file
45
test/helpers/import_helper.py
Normal file
45
test/helpers/import_helper.py
Normal file
@ -0,0 +1,45 @@
|
||||
import os
|
||||
import requests
|
||||
import json
|
||||
|
||||
import time
|
||||
|
||||
class ImportHelper:
|
||||
|
||||
@classmethod
|
||||
def import_test_dataset(cls, username, api_key, host):
|
||||
url = "https://{0}.{1}/api/v1/imports/"\
|
||||
"?type_guessing=false&api_key={2}".format(
|
||||
username, host, api_key)
|
||||
dataset = {
|
||||
'file': open('fixtures/geocoder_api_test_dataset.csv', 'rb')}
|
||||
response = requests.post(url, files=dataset)
|
||||
response_json = json.loads(response.text)
|
||||
if not response_json['success']:
|
||||
print "Error importing the test dataset: {0}".format(response.text)
|
||||
sys.exit(1)
|
||||
while(True):
|
||||
table_name = ImportHelper.get_imported_table_name(
|
||||
username,
|
||||
host,
|
||||
api_key,
|
||||
response_json['item_queue_id']
|
||||
)
|
||||
if table_name:
|
||||
return table_name
|
||||
else:
|
||||
time.sleep(5)
|
||||
|
||||
@classmethod
|
||||
def get_imported_table_name(cls, username, host, api_key, import_id):
|
||||
import_url = "https://{0}.{1}/api/v1/imports/{2}?api_key={3}".format(
|
||||
username, host, import_id, api_key)
|
||||
import_data_response = requests.get(import_url)
|
||||
if import_data_response.status_code != 200:
|
||||
print "Error getting the table name from " \
|
||||
"the import data: {0}".format(
|
||||
import_data_response.text)
|
||||
sys.exit(1)
|
||||
import_data_json = json.loads(import_data_response.text)
|
||||
|
||||
return import_data_json['table_name']
|
31
test/helpers/integration_test_helper.py
Normal file
31
test/helpers/integration_test_helper.py
Normal file
@ -0,0 +1,31 @@
|
||||
import os
|
||||
import requests
|
||||
import json
|
||||
|
||||
|
||||
class IntegrationTestHelper:
|
||||
|
||||
@classmethod
|
||||
def get_environment_variables(cls):
|
||||
username = os.environ["GEOCODER_API_TEST_USERNAME"]
|
||||
api_key = os.environ["GEOCODER_API_TEST_API_KEY"]
|
||||
host = os.environ["GEOCODER_API_TEST_HOST"]
|
||||
table_name = os.environ["GEOCODER_API_TEST_TABLE_NAME"]
|
||||
|
||||
return {
|
||||
"username": username,
|
||||
"api_key": api_key,
|
||||
"host": host,
|
||||
"table_name": table_name
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def execute_query(cls, sql_api_url, query):
|
||||
query_url = "{0}?q={1}".format(sql_api_url, query)
|
||||
print "Executing query: {0}".format(query_url)
|
||||
query_response = requests.get(query_url)
|
||||
if query_response.status_code != 200:
|
||||
raise Exception(json.loads(query_response.text)['error'])
|
||||
query_response_data = json.loads(query_response.text)
|
||||
|
||||
return query_response_data['rows'][0]['geometry']
|
0
test/integration/__init__.py
Normal file
0
test/integration/__init__.py
Normal file
32
test/integration/test_admin0_functions.py
Normal file
32
test/integration/test_admin0_functions.py
Normal file
@ -0,0 +1,32 @@
|
||||
from unittest import TestCase
|
||||
from nose.tools import assert_raises
|
||||
from nose.tools import assert_not_equal, assert_equal
|
||||
from ..helpers.integration_test_helper import IntegrationTestHelper
|
||||
|
||||
|
||||
class TestAdmin0Functions(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.env_variables = IntegrationTestHelper.get_environment_variables()
|
||||
self.sql_api_url = "https://{0}.{1}/api/v2/sql".format(
|
||||
self.env_variables['username'],
|
||||
self.env_variables['host'],
|
||||
self.env_variables['api_key']
|
||||
)
|
||||
|
||||
def test_if_select_with_admin0_is_ok(self):
|
||||
query = "SELECT cdb_geocode_admin0_polygon(country) as geometry " \
|
||||
"FROM {0} LIMIT 1&api_key={1}".format(
|
||||
self.env_variables['table_name'],
|
||||
self.env_variables['api_key'])
|
||||
geometry = IntegrationTestHelper.execute_query(self.sql_api_url, query)
|
||||
assert_not_equal(geometry, None)
|
||||
|
||||
def test_if_select_with_admin0_without_api_key_raise_error(self):
|
||||
query = "SELECT cdb_geocode_admin0_polygon(country) as geometry " \
|
||||
"FROM {0} LIMIT 1".format(
|
||||
self.env_variables['table_name'])
|
||||
try:
|
||||
IntegrationTestHelper.execute_query(self.sql_api_url, query)
|
||||
except Exception as e:
|
||||
assert_equal(e.message[0], "The api_key must be provided")
|
40
test/integration/test_admin1_functions.py
Normal file
40
test/integration/test_admin1_functions.py
Normal file
@ -0,0 +1,40 @@
|
||||
from unittest import TestCase
|
||||
from nose.tools import assert_raises
|
||||
from nose.tools import assert_not_equal, assert_equal
|
||||
from ..helpers.integration_test_helper import IntegrationTestHelper
|
||||
|
||||
|
||||
class TestAdmin1Functions(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.env_variables = IntegrationTestHelper.get_environment_variables()
|
||||
self.sql_api_url = "https://{0}.{1}/api/v2/sql".format(
|
||||
self.env_variables['username'],
|
||||
self.env_variables['host'],
|
||||
self.env_variables['api_key']
|
||||
)
|
||||
|
||||
def test_if_select_with_admin1_without_country_is_ok(self):
|
||||
query = "SELECT cdb_geocode_admin1_polygon(province) as geometry " \
|
||||
"FROM {0} LIMIT 1&api_key={1}".format(
|
||||
self.env_variables['table_name'],
|
||||
self.env_variables['api_key'])
|
||||
geometry = IntegrationTestHelper.execute_query(self.sql_api_url, query)
|
||||
assert_not_equal(geometry, None)
|
||||
|
||||
def test_if_select_with_admin1_with_country_is_ok(self):
|
||||
query = "SELECT cdb_geocode_admin1_polygon(province,country)" \
|
||||
"as geometry FROM {0} LIMIT 1&api_key={1}".format(
|
||||
self.env_variables['table_name'],
|
||||
self.env_variables['api_key'])
|
||||
geometry = IntegrationTestHelper.execute_query(self.sql_api_url, query)
|
||||
assert_not_equal(geometry, None)
|
||||
|
||||
def test_if_select_with_admin1_without_api_key_raise_error(self):
|
||||
query = "SELECT cdb_geocode_admin1_polygon(province) as geometry " \
|
||||
"FROM {0} LIMIT 1".format(
|
||||
self.env_variables['table_name'])
|
||||
try:
|
||||
IntegrationTestHelper.execute_query(self.sql_api_url, query)
|
||||
except Exception as e:
|
||||
assert_equal(e.message[0], "The api_key must be provided")
|
32
test/integration/test_ipaddress_functions.py
Normal file
32
test/integration/test_ipaddress_functions.py
Normal file
@ -0,0 +1,32 @@
|
||||
from unittest import TestCase
|
||||
from nose.tools import assert_raises
|
||||
from nose.tools import assert_not_equal, assert_equal
|
||||
from ..helpers.integration_test_helper import IntegrationTestHelper
|
||||
|
||||
|
||||
class TestPostalcodeFunctions(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.env_variables = IntegrationTestHelper.get_environment_variables()
|
||||
self.sql_api_url = "https://{0}.{1}/api/v2/sql".format(
|
||||
self.env_variables['username'],
|
||||
self.env_variables['host'],
|
||||
self.env_variables['api_key']
|
||||
)
|
||||
|
||||
def test_if_select_with_ipaddress_point_is_ok(self):
|
||||
query = "SELECT cdb_geocode_ipaddress_point(ip) " \
|
||||
"as geometry FROM {0} LIMIT 1&api_key={1}".format(
|
||||
self.env_variables['table_name'],
|
||||
self.env_variables['api_key'])
|
||||
geometry = IntegrationTestHelper.execute_query(self.sql_api_url, query)
|
||||
assert_not_equal(geometry, None)
|
||||
|
||||
def test_if_select_with_ipaddress_without_api_key_raise_error(self):
|
||||
query = "SELECT cdb_geocode_ipaddress_point(ip) " \
|
||||
"as geometry FROM {0} LIMIT 1".format(
|
||||
self.env_variables['table_name'])
|
||||
try:
|
||||
IntegrationTestHelper.execute_query(self.sql_api_url, query)
|
||||
except Exception as e:
|
||||
assert_equal(e.message[0], "The api_key must be provided")
|
48
test/integration/test_namedplace_functions.py
Normal file
48
test/integration/test_namedplace_functions.py
Normal file
@ -0,0 +1,48 @@
|
||||
from unittest import TestCase
|
||||
from nose.tools import assert_raises
|
||||
from nose.tools import assert_not_equal, assert_equal
|
||||
from ..helpers.integration_test_helper import IntegrationTestHelper
|
||||
|
||||
|
||||
class TestNameplaceFunctions(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.env_variables = IntegrationTestHelper.get_environment_variables()
|
||||
self.sql_api_url = "https://{0}.{1}/api/v2/sql".format(
|
||||
self.env_variables['username'],
|
||||
self.env_variables['host'],
|
||||
self.env_variables['api_key']
|
||||
)
|
||||
|
||||
def test_if_select_with_namedplace_city_is_ok(self):
|
||||
query = "SELECT cdb_geocode_namedplace_point(city) as geometry " \
|
||||
"FROM {0} LIMIT 1&api_key={1}".format(
|
||||
self.env_variables['table_name'],
|
||||
self.env_variables['api_key'])
|
||||
geometry = IntegrationTestHelper.execute_query(self.sql_api_url, query)
|
||||
assert_not_equal(geometry, None)
|
||||
|
||||
def test_if_select_with_namedplace_city_country_is_ok(self):
|
||||
query = "SELECT cdb_geocode_namedplace_point(city,country) " \
|
||||
"as geometry FROM {0} LIMIT 1&api_key={1}".format(
|
||||
self.env_variables['table_name'],
|
||||
self.env_variables['api_key'])
|
||||
geometry = IntegrationTestHelper.execute_query(self.sql_api_url, query)
|
||||
assert_not_equal(geometry, None)
|
||||
|
||||
def test_if_select_with_namedplace_city_province_country_is_ok(self):
|
||||
query = "SELECT cdb_geocode_namedplace_point(city,province,country) " \
|
||||
"as geometry FROM {0} LIMIT 1&api_key={1}".format(
|
||||
self.env_variables['table_name'],
|
||||
self.env_variables['api_key'])
|
||||
geometry = IntegrationTestHelper.execute_query(self.sql_api_url, query)
|
||||
assert_not_equal(geometry, None)
|
||||
|
||||
def test_if_select_with_namedplace_without_api_key_raise_error(self):
|
||||
query = "SELECT cdb_geocode_namedplace_point(city) as geometry " \
|
||||
"FROM {0} LIMIT 1".format(
|
||||
self.env_variables['table_name'])
|
||||
try:
|
||||
IntegrationTestHelper.execute_query(self.sql_api_url, query)
|
||||
except Exception as e:
|
||||
assert_equal(e.message[0], "The api_key must be provided")
|
40
test/integration/test_postalcode_functions.py
Normal file
40
test/integration/test_postalcode_functions.py
Normal file
@ -0,0 +1,40 @@
|
||||
from unittest import TestCase
|
||||
from nose.tools import assert_raises
|
||||
from nose.tools import assert_not_equal, assert_equal
|
||||
from ..helpers.integration_test_helper import IntegrationTestHelper
|
||||
|
||||
|
||||
class TestPostalcodeFunctions(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.env_variables = IntegrationTestHelper.get_environment_variables()
|
||||
self.sql_api_url = "https://{0}.{1}/api/v2/sql".format(
|
||||
self.env_variables['username'],
|
||||
self.env_variables['host'],
|
||||
self.env_variables['api_key']
|
||||
)
|
||||
|
||||
def test_if_select_with_postalcode_polygon_is_ok(self):
|
||||
query = "SELECT cdb_geocode_postalcode_polygon(postalcode, country) " \
|
||||
"as geometry FROM {0} WHERE country='USA'&api_key={1}".format(
|
||||
self.env_variables['table_name'],
|
||||
self.env_variables['api_key'])
|
||||
geometry = IntegrationTestHelper.execute_query(self.sql_api_url, query)
|
||||
assert_not_equal(geometry, None)
|
||||
|
||||
def test_if_select_with_postalcode_point_is_ok(self):
|
||||
query = "SELECT cdb_geocode_postalcode_point(postalcode, country) " \
|
||||
"as geometry FROM {0} WHERE country='Spain'&api_key={1}".format(
|
||||
self.env_variables['table_name'],
|
||||
self.env_variables['api_key'])
|
||||
geometry = IntegrationTestHelper.execute_query(self.sql_api_url, query)
|
||||
assert_not_equal(geometry, None)
|
||||
|
||||
def test_if_select_with_postalcode_without_api_key_raise_error(self):
|
||||
query = "SELECT cdb_geocode_postalcode_polygon(postalcode, country) " \
|
||||
"as geometry FROM {0} WHERE country = 'USA'".format(
|
||||
self.env_variables['table_name'])
|
||||
try:
|
||||
IntegrationTestHelper.execute_query(self.sql_api_url, query)
|
||||
except Exception as e:
|
||||
assert_equal(e.message[0], "The api_key must be provided")
|
83
test/run_tests.py
Normal file
83
test/run_tests.py
Normal file
@ -0,0 +1,83 @@
|
||||
import getopt
|
||||
import sys
|
||||
import requests
|
||||
import time
|
||||
import json
|
||||
import subprocess
|
||||
import os
|
||||
from helpers.import_helper import ImportHelper
|
||||
|
||||
|
||||
def main():
|
||||
opts, args = getopt.getopt(sys.argv[1:], "h", ["help", "host="])
|
||||
|
||||
if len(args) < 2:
|
||||
usage()
|
||||
sys.exit()
|
||||
|
||||
host = "cartodb.com"
|
||||
username = args[0]
|
||||
api_key = args[1]
|
||||
table_name = "geocoder_api_test_dataset_".format(int(time.time()))
|
||||
for o, a in opts:
|
||||
if o in ("-h", "--help"):
|
||||
usage()
|
||||
sys.exit()
|
||||
elif o in ("--host"):
|
||||
host = opts[0][1]
|
||||
else:
|
||||
assert False, "unhandled option"
|
||||
|
||||
try:
|
||||
table_name = ImportHelper.import_test_dataset(username, api_key, host)
|
||||
set_environment_variables(username, api_key, table_name, host)
|
||||
execute_tests()
|
||||
except Exception as e:
|
||||
print e.message
|
||||
sys.exit(1)
|
||||
finally:
|
||||
clean_environment_variables()
|
||||
clean_test_dataset(username, api_key, table_name, host)
|
||||
|
||||
|
||||
def usage():
|
||||
print """Usage: run_tests.py [options] username api_key
|
||||
Options:
|
||||
-h: Show this help
|
||||
--host: take that host as base (by default is cartodb.com)"""
|
||||
|
||||
|
||||
def execute_tests():
|
||||
print "Start testing..."
|
||||
process = subprocess.Popen(["nosetests", "--where=integration/"])
|
||||
process.wait()
|
||||
print "Testing finished!"
|
||||
|
||||
|
||||
def set_environment_variables(username, api_key, table_name, host):
|
||||
os.environ["GEOCODER_API_TEST_USERNAME"] = username
|
||||
os.environ["GEOCODER_API_TEST_API_KEY"] = api_key
|
||||
os.environ["GEOCODER_API_TEST_TABLE_NAME"] = table_name
|
||||
os.environ["GEOCODER_API_TEST_HOST"] = host
|
||||
|
||||
|
||||
def clean_environment_variables():
|
||||
print "Cleaning test dataset environment variables..."
|
||||
del os.environ["GEOCODER_API_TEST_USERNAME"]
|
||||
del os.environ["GEOCODER_API_TEST_API_KEY"]
|
||||
del os.environ["GEOCODER_API_TEST_TABLE_NAME"]
|
||||
del os.environ["GEOCODER_API_TEST_HOST"]
|
||||
|
||||
|
||||
def clean_test_dataset(username, api_key, table_name, host):
|
||||
print "Cleaning test dataset {0}...".format(table_name)
|
||||
url = "https://{0}.{1}/api/v2/sql?q=drop table {2}&api_key={3}".format(
|
||||
username, host, table_name, api_key
|
||||
)
|
||||
response = requests.get(url)
|
||||
if response.status_code != 200:
|
||||
print "Error cleaning the test dataset: {0}".format(response.text)
|
||||
sys.exit(1)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Loading…
Reference in New Issue
Block a user