124 lines
4.5 KiB
Ruby
124 lines
4.5 KiB
Ruby
module Carto
|
||
class DoLicensingService
|
||
|
||
AVAILABLE_STORAGES = %w(bq bigtable carto).freeze
|
||
PRESELECTED_STORAGE = 'bq'.freeze
|
||
|
||
def initialize(username)
|
||
@user = Carto::User.find_by(username: username)
|
||
@doss = Carto::DoSyncServiceFactory.get_for_user(@user)
|
||
@redis_key = "do:#{@user.username}:datasets"
|
||
end
|
||
|
||
def subscribe(dataset)
|
||
Cartodb::Central.new.create_do_datasets(username: @user.username, datasets: [dataset])
|
||
add_to_redis(dataset)
|
||
end
|
||
|
||
def unsubscribe(dataset_id)
|
||
Cartodb::Central.new.remove_do_dataset(username: @user.username, id: dataset_id)
|
||
remove_from_redis(dataset_id)
|
||
end
|
||
|
||
def subscriptions
|
||
JSON.parse($users_metadata.hget(@redis_key, PRESELECTED_STORAGE) || '[]').map { |s| present_subscription(s) }
|
||
end
|
||
|
||
def subscription(subscription_id)
|
||
subscriptions.find{ |s| s['id'] == subscription_id}
|
||
end
|
||
|
||
def add_to_redis(dataset)
|
||
value = AVAILABLE_STORAGES.map { |storage| [storage, insert_redis_value(dataset, storage)] }.flatten
|
||
$users_metadata.hmset(@redis_key, value)
|
||
end
|
||
|
||
def remove_from_redis(dataset_id)
|
||
value = AVAILABLE_STORAGES.map { |storage| [storage, remove_redis_value(dataset_id, storage)] }.flatten
|
||
$users_metadata.hmset(@redis_key, value)
|
||
end
|
||
|
||
def get_sync_status(subscription_id)
|
||
return @doss.sync(subscription_id)
|
||
end
|
||
|
||
private
|
||
|
||
def present_subscription(subscription)
|
||
parsed_entity_id = @doss.parsed_entity_id(subscription['dataset_id'])
|
||
expires_at = Time.parse(subscription['expires_at']) if subscription['expires_at'].present?
|
||
subscription_data = subscription.merge(parsed_entity_id).merge({
|
||
id: subscription['dataset_id'],
|
||
status: (expires_at && (Time.now >= expires_at)) ? 'expired' : subscription['status']
|
||
})
|
||
subscription_data.with_indifferent_access
|
||
end
|
||
|
||
def insert_redis_value(dataset, storage)
|
||
redis_value = JSON.parse($users_metadata.hget(@redis_key, storage) || '[]')
|
||
if dataset[:available_in].include?(storage)
|
||
# Remove a previous dataset if exists
|
||
redis_value = redis_value.reject { |d| d['dataset_id'] == dataset[:dataset_id] }
|
||
|
||
# Initial sync status
|
||
sync_status = dataset[:sync_status]
|
||
unsyncable_reason = dataset[:unsyncable_reason]
|
||
entity_info = (dataset[:status] != 'requested') ? get_entity_info(dataset[:dataset_id]) : {}
|
||
if sync_status.nil? then
|
||
sync_status, unsyncable_reason = get_initial_sync_status(dataset, entity_info)
|
||
end
|
||
|
||
# Create the new entry
|
||
new_value = [{
|
||
dataset_id: dataset[:dataset_id],
|
||
created_at: dataset[:created_at].to_s,
|
||
expires_at: dataset[:expires_at].to_s,
|
||
status: dataset[:status],
|
||
available_in: dataset[:available_in],
|
||
type: dataset[:type],
|
||
estimated_size: entity_info[:estimated_size].to_i || 0,
|
||
estimated_row_count: entity_info[:estimated_row_count].to_i || 0,
|
||
estimated_columns_count: entity_info[:estimated_columns_count].to_i || 0,
|
||
num_bytes: entity_info[:num_bytes].to_i || 0,
|
||
sync_status: sync_status,
|
||
unsyncable_reason: unsyncable_reason,
|
||
unsynced_errors: dataset[:unsynced_errors] || nil,
|
||
sync_table: dataset[:sync_table] || nil,
|
||
sync_table_id: dataset[:sync_table_id] || nil,
|
||
synchronization_id: dataset[:synchronization_id] || nil
|
||
}]
|
||
# Append to the current one
|
||
redis_value = redis_value + new_value
|
||
end
|
||
|
||
redis_value.to_json
|
||
end
|
||
|
||
def remove_redis_value(dataset_id, storage)
|
||
redis_value = JSON.parse($users_metadata.hget(@redis_key, storage) || '[]')
|
||
redis_value.reject { |dataset| dataset["dataset_id"] == dataset_id }.to_json
|
||
end
|
||
|
||
def get_initial_sync_status(dataset, entity_info)
|
||
sync_info = @doss.check_syncable(dataset) || @doss.check_sync_limits(dataset.merge({
|
||
estimated_size: entity_info[:estimated_size].to_i || 0,
|
||
estimated_row_count: entity_info[:estimated_row_count].to_i || 0,
|
||
estimated_columns_count: entity_info[:estimated_columns_count].to_i || 0,
|
||
num_bytes: entity_info[:num_bytes].to_i || 0
|
||
}))
|
||
if sync_info then
|
||
return sync_info[:sync_status], sync_info[:unsyncable_reason]
|
||
elsif dataset[:status] == 'requested' then
|
||
return 'unsynced', nil
|
||
else
|
||
return 'syncing', nil
|
||
end
|
||
end
|
||
|
||
def get_entity_info(dataset_id)
|
||
@doss.entity_info(dataset_id)
|
||
end
|
||
|
||
end
|
||
end
|