Cached redis connections
This commit is contained in:
parent
904336a298
commit
a375e9dcfb
@ -5,15 +5,25 @@ class QuotaService:
|
||||
""" Class to manage all the quota operation for the Geocoder SQL API Extension """
|
||||
|
||||
GEOCODING_QUOTA_KEY = "geocoding_quota"
|
||||
REDIS_CONNECTION_KEY = "redis_connection"
|
||||
REDIS_CONNECTION_HOST = "redis_host"
|
||||
REDIS_CONNECTION_PORT = "redis_port"
|
||||
REDIS_CONNECTION_DB = "redis_db"
|
||||
|
||||
def __init__(self, logger, user_id, transaction_id, redis_host='localhost', redis_port=6379, redis_db = 5):
|
||||
def __init__(self, logger, user_id, transaction_id, **kwargs):
|
||||
self.logger = logger
|
||||
self.user_id = user_id
|
||||
self.transaction_id = transaction_id
|
||||
self.redis_host = redis_host
|
||||
self.redis_port = redis_port
|
||||
self.redis_db = redis_db
|
||||
self.redis_conn = self.__get_redis_connection()
|
||||
self.cache = {}
|
||||
|
||||
if self.REDIS_CONNECTION_KEY in kwargs:
|
||||
self.redis_connection = self.__get_redis_connection(redis_connection=kwargs[self.REDIS_CONNECTION_KEY])
|
||||
else:
|
||||
if self.REDIS_CONNECTION_HOST not in kwargs:
|
||||
raise "You have to provide redis configuration"
|
||||
redis_config = self.__build_redis_config(kwargs)
|
||||
self.redis_connection = self.__get_redis_connection(redis_config = redis_config)
|
||||
|
||||
|
||||
def check_user_quota(self):
|
||||
""" Check if the current user quota surpasses the current quota """
|
||||
@ -25,25 +35,51 @@ class QuotaService:
|
||||
|
||||
def get_user_quota(self):
|
||||
# Check for exceptions or redis timeout
|
||||
user_quota = self.redis_conn.hget(self.__get_user_redis_key(), self.GEOCODING_QUOTA_KEY)
|
||||
return int(user_quota)
|
||||
user_quota = self.redis_connection.hget(self.__get_user_redis_key(), self.GEOCODING_QUOTA_KEY)
|
||||
return int(user_quota) if user_quota else 0
|
||||
|
||||
def get_current_used_quota(self):
|
||||
""" Recover the used quota for the user in the current month """
|
||||
# Check for exceptions or redis timeout
|
||||
current_used = 0
|
||||
for _, value in self.redis_conn.hscan_iter(self.__get_month_redis_key()):
|
||||
for _, value in self.redis_connection.hscan_iter(self.__get_month_redis_key()):
|
||||
current_used += int(value)
|
||||
return current_used
|
||||
|
||||
def increment_georeference_use(self, amount=1):
|
||||
""" Increment the geocoder use in 1 """
|
||||
# TODO Manage exceptions or timeout
|
||||
self.redis_conn.hincrby(self.__get_month_redis_key(), self.transaction_id,amount)
|
||||
self.redis_connection.hincrby(self.__get_month_redis_key(), self.transaction_id,amount)
|
||||
|
||||
def __get_redis_connection(self):
|
||||
pool = redis.ConnectionPool(host=self.redis_host, port=self.redis_port, db=self.redis_db)
|
||||
return redis.Redis(connection_pool=pool)
|
||||
def get_redis_connection(self):
|
||||
return self.redis_connection
|
||||
|
||||
def __get_redis_connection(self, redis_connection=None, redis_config=None):
|
||||
if redis_connection:
|
||||
self.__add_redis_connection_to_cache(redis_connection)
|
||||
conn = redis_connection
|
||||
else:
|
||||
conn = self.__create_redis_connection(redis_config)
|
||||
|
||||
return conn
|
||||
|
||||
def __create_redis_connection(self, redis_config):
|
||||
# Pool not needed
|
||||
# Try to not create a connection every time, add it to a cache
|
||||
self.logger.debug("Connecting to redis...")
|
||||
pool = redis.ConnectionPool(host=redis_config['host'], port=redis_config['port'], db=redis_config['db'])
|
||||
conn = redis.Redis(connection_pool=pool)
|
||||
self.__add_redis_connection_to_cache(conn)
|
||||
return conn
|
||||
|
||||
def __build_redis_config(self, config):
|
||||
redis_host = config[self.REDIS_CONNECTION_HOST] if self.REDIS_CONNECTION_HOST in config else 'localhost'
|
||||
redis_port = config[self.REDIS_CONNECTION_PORT] if self.REDIS_CONNECTION_PORT in config else 6379
|
||||
redis_db = config[self.REDIS_CONNECTION_DB] if self.REDIS_CONNECTION_DB in config else 5
|
||||
return {'host': redis_host, 'port': redis_port, 'db': redis_db}
|
||||
|
||||
def __add_redis_connection_to_cache(self, connection):
|
||||
""" Cache the redis connection to avoid reach the limit of connections """
|
||||
self.cache = {self.transaction_id: {self.REDIS_CONNECTION_KEY: connection}}
|
||||
|
||||
def __get_month_redis_key(self):
|
||||
today = date.today()
|
||||
|
@ -10,15 +10,21 @@ $$
|
||||
LOG_FILENAME = '/tmp/plpython.log'
|
||||
logging.basicConfig(filename=LOG_FILENAME,level=logging.DEBUG)
|
||||
|
||||
qs = quota_service.QuotaService(logging, user_id, tx_id)
|
||||
if user_id in SD and tx_id in SD[user_id] and 'redis_connection' in SD[user_id][tx_id]:
|
||||
logging.debug("Using redis cached connection...")
|
||||
qs = quota_service.QuotaService(logging, user_id, tx_id, redis_connection=SD[user_id][tx_id]['redis_connection'])
|
||||
else:
|
||||
qs = quota_service.QuotaService(logging, user_id, tx_id, redis_host='localhost', redis_port=6379, redis_db=5)
|
||||
|
||||
if qs.check_user_quota():
|
||||
result = plpy.execute("SELECT geom FROM geocode_admin0_polygons(Array[\'{0}\']::text[])".format(search))
|
||||
logging.debug("Number of rows: {0} --- Status: {1}".format(result.nrows(), result.status()))
|
||||
if result.status() == 5 and result.nrows() == 1:
|
||||
qs.increment_georeference_use()
|
||||
SD[user_id] = {tx_id: {'redis_connection': qs.get_redis_connection()}}
|
||||
return result[0]["geom"]
|
||||
else:
|
||||
raise Exception('Something wrong with the georefence operation')
|
||||
else:
|
||||
raise Exception('Not enough quota for this user')
|
||||
|
||||
$$ LANGUAGE plpythonu;
|
Loading…
Reference in New Issue
Block a user