# -*- coding: utf-8 -*-
"""
Copyright (c) 2017 beyond-blockchain.org.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import json
import hashlib
import time
import threading
import queue
import os
import sys
sys.path.extend(["../../", os.path.abspath(os.path.dirname(__file__))])
from bbc1.core.data_handler import DataHandler
from bbc1.core.bbc_stats import BBcStats
from bbc1.core import bbclib
from bbc1.core.message_key_types import PayloadType, KeyType, InfraMessageCategory
from bbc1.core import logger
[docs]class RepairManager:
"""Data repair manager for forged transaction/asset"""
REQUEST_REPAIR_TRANSACTION = 0
REQUEST_REPAIR_ASSET_FILE = 1
REQUEST_TO_SEND_TRANSACTION_DATA = 2
RESPONSE_TRANSACTION_DATA = 3
REQUEST_TO_SEND_ASSET_FILE = 4
RESPONSE_ASSET_FILE = 5
def __init__(self, network=None, domain_id=None, workingdir=".", loglevel="all", logname=None):
if network is not None:
self.network = network
self.core = network.core
self.stats = network.core.stats
self.data_handler = network.domains[domain_id]['data']
else:
self.stats = BBcStats()
self.repair_log = os.path.join(workingdir, domain_id.hex(), "repair_log.json")
self.logger = logger.get_logger(key="repair_manager", level=loglevel, logname=logname)
self.domain_id = domain_id
self.queue = queue.Queue()
self.requesting_list = dict()
self.loop_flag = True
th_nw_loop = threading.Thread(target=self._manager_loop)
th_nw_loop.setDaemon(True)
th_nw_loop.start()
def _output_log(self, repair_info):
"""Output log in json format"""
with open(self.repair_log, "a") as f:
f.write(json.dumps(repair_info)+"\n")
[docs] def exit_loop(self):
"""Exit the manager loop"""
self.loop_flag = False
self.put_message()
def _manager_loop(self):
"""Main loop"""
while self.loop_flag:
msg = self.queue.get()
if msg is None:
continue
if msg[KeyType.command] == RepairManager.REQUEST_REPAIR_TRANSACTION:
self._repair_transaction_data(msg[KeyType.transaction_id])
elif msg[KeyType.command] == RepairManager.REQUEST_REPAIR_ASSET_FILE:
self._repair_asset_file(msg[KeyType.asset_group_id], msg[KeyType.asset_id])
elif msg[KeyType.command] == RepairManager.REQUEST_TO_SEND_TRANSACTION_DATA:
self._send_transaction_data(msg)
elif msg[KeyType.command] == RepairManager.RESPONSE_TRANSACTION_DATA:
self._receive_transaction_data_from_others(msg)
elif msg[KeyType.command] == RepairManager.REQUEST_TO_SEND_ASSET_FILE:
self._send_asset_file(msg)
elif msg[KeyType.command] == RepairManager.RESPONSE_ASSET_FILE:
self._receive_asset_file_from_others(msg)
[docs] def put_message(self, msg=None):
"""append a message to the queue"""
self.queue.put(msg)
def _repair_transaction_data(self, transaction_id):
"""Repair forged transaction_data or asset_file by getting legitimate one from other nodes
Args:
transaction_id (bytes): target transaction_id
"""
#print("_repair_transaction_data:")
self.stats.update_stats_increment("transaction", "repair_request", 1)
forged_asset_files = set()
if len(self.data_handler.db_adaptors) > 1:
valid_txobj = None
db_nums_with_invalid_data = list()
for idx in range(1, len(self.data_handler.db_adaptors)):
result_txobj, result_asset_files = self.data_handler.search_transaction(transaction_id=transaction_id, db_num=idx)
txobj_is_valid, valid_assets, invalid_assets = bbclib.validate_transaction_object(result_txobj[0],
result_asset_files)
if txobj_is_valid and valid_txobj is None:
valid_txobj = result_txobj[0]
if not txobj_is_valid:
db_nums_with_invalid_data.append(idx)
if len(invalid_assets) > 0:
for ent in invalid_assets:
forged_asset_files.add(ent)
if valid_txobj is None:
self.stats.update_stats_increment("transaction", "fail_to_repair_in_local", 1)
self.logger.fatal("Failed to repair transaction locally (transaction_id=%s in domain=%s)" %
(transaction_id.hex(), self.domain_id.hex()))
else:
for i in db_nums_with_invalid_data:
self.data_handler.restore_transaction_data(db_num=i, transaction_id=transaction_id, txobj=valid_txobj)
self.stats.update_stats_increment("transaction", "success_repair", 1)
self._output_log({"transaction_id": transaction_id.hex(), "request_at": int(time.time()),
"repaired_by": "locally", "repaired_at": int(time.time())})
if len(forged_asset_files) > 0:
for asgid, ast in forged_asset_files:
self._repair_asset_file(asset_group_id=asgid, asset_id=ast, need_check=False)
if self.data_handler.replication_strategy == DataHandler.REPLICATION_EXT:
return
random_nonce = bbclib.get_random_value(4)
while random_nonce in self.requesting_list:
random_nonce = bbclib.get_random_value(4)
self.requesting_list[random_nonce] = {
"transaction_id": transaction_id.hex(),
"request_at": int(time.time())
}
msg = {
KeyType.domain_id: self.domain_id,
KeyType.infra_msg_type: InfraMessageCategory.CATEGORY_DATA,
KeyType.infra_command: DataHandler.REPAIR_TRANSACTION_DATA,
KeyType.command: RepairManager.REQUEST_TO_SEND_TRANSACTION_DATA,
KeyType.transaction_id: transaction_id,
KeyType.nonce: random_nonce,
}
self.network.broadcast_message_in_network(domain_id=self.domain_id,
payload_type=PayloadType.Type_any, msg=msg)
return
def _repair_asset_file(self, asset_group_id, asset_id, need_check=True):
"""Repair forged asset_file by getting legitimate one from other nodes
Args:
asset_group_id (bytes): asset_group_id of the asset
asset_id (bytes): asset_id of the asset
need_check (bool): If True, check the digest of the asset file
"""
#print("_repair_asset_file:")
if self.data_handler.use_external_storage:
return
if need_check:
asset_file = self.data_handler.get_in_storage(asset_group_id, asset_id)
if asset_file is not None and asset_id == hashlib.sha256(asset_file).digest():
return
random_nonce = bbclib.get_random_value(4)
while random_nonce in self.requesting_list:
random_nonce = bbclib.get_random_value(4)
self.requesting_list[random_nonce] = {
"asset_group_id": asset_group_id.hex(),
"asset_id": asset_id.hex(),
"request_at": int(time.time())
}
msg = {
KeyType.domain_id: self.domain_id,
KeyType.infra_msg_type: InfraMessageCategory.CATEGORY_DATA,
KeyType.infra_command: DataHandler.REPAIR_TRANSACTION_DATA,
KeyType.command: RepairManager.REQUEST_TO_SEND_ASSET_FILE,
KeyType.asset_group_id: asset_group_id,
KeyType.asset_id: asset_id,
KeyType.nonce: random_nonce,
}
self.network.broadcast_message_in_network(domain_id=self.domain_id,
payload_type=PayloadType.Type_any, msg=msg)
def _send_transaction_data(self, dat):
"""Send transaction data if having valid one"""
#print("_send_transaction_data::")
transaction_id = dat[KeyType.transaction_id]
for idx in range(len(self.data_handler.db_adaptors)):
result_txobj, result_asset_files = self.data_handler.search_transaction(transaction_id=transaction_id, db_num=idx)
txobj_is_valid, valid_assets, invalid_assets = bbclib.validate_transaction_object(result_txobj[transaction_id])
if txobj_is_valid:
dat[KeyType.command] = RepairManager.RESPONSE_TRANSACTION_DATA
dat[KeyType.transaction_data] = result_txobj[transaction_id].transaction_data
dat[KeyType.destination_node_id] = dat[KeyType.source_node_id]
self.network.send_message_in_network(None, domain_id=self.domain_id, msg=dat)
return
def _receive_transaction_data_from_others(self, dat):
"""Receive transaction data from other core_nodes and check its validity
Args:
dat (dict): received message
"""
#print("_receive_transaction_data_from_others:")
if KeyType.transaction_data not in dat or KeyType.transaction_id not in dat or KeyType.nonce not in dat:
return
if dat[KeyType.nonce] not in self.requesting_list:
return
asset_files = dict()
if KeyType.all_asset_files in dat:
asset_files = dat[KeyType.all_asset_files]
txobj = bbclib.BBcTransaction(deserialize=dat[KeyType.transaction_data])
if txobj.transaction_data is None:
return
txobj_is_valid, valid_assets, invalid_assets = bbclib.validate_transaction_object(txobj, asset_files)
if txobj_is_valid:
self.stats.update_stats_increment("transaction", "success_repair", 1)
for idx in range(len(self.data_handler.db_adaptors)):
self.data_handler.restore_transaction_data(db_num=idx, transaction_id=txobj.transaction_id, txobj=txobj)
add_info = {
"repaired_by": dat[KeyType.source_node_id].hex(),
"repaired_at": int(time.time())
}
self.requesting_list[dat[KeyType.nonce]].update(add_info)
self._output_log(self.requesting_list[dat[KeyType.nonce]])
del self.requesting_list[dat[KeyType.nonce]]
def _send_asset_file(self, dat):
"""Send the asset file if having valid one
Args:
dat (dict): received message
"""
#print("_send_asset_file::")
asset_group_id = dat[KeyType.asset_group_id]
asset_id = dat[KeyType.asset_id]
asset_file = self.data_handler.get_in_storage(asset_group_id, asset_id)
if asset_file is None:
return
result_txobj, result_asset_files = self.data_handler.search_transaction(asset_group_id=asset_group_id,
asset_id=asset_id)
txobj = next(iter(result_txobj.values()))
txobj_is_valid, valid_assets, invalid_assets = bbclib.validate_transaction_object(txobj, result_asset_files)
if (asset_group_id, asset_id) in valid_assets:
dat[KeyType.command] = RepairManager.RESPONSE_ASSET_FILE
dat[KeyType.asset_group_id] = asset_group_id
dat[KeyType.asset_id] = asset_id
dat[KeyType.asset_file] = asset_file
dat[KeyType.destination_node_id] = dat[KeyType.source_node_id]
self.network.send_message_in_network(None, domain_id=self.domain_id, msg=dat)
def _receive_asset_file_from_others(self, dat):
"""Receive asset file from other core_nodes and check its validity
Args:
dat (dict): received message
"""
#print("_receive_asset_file_from_others:")
if KeyType.nonce not in dat or dat[KeyType.nonce] not in self.requesting_list:
return
if KeyType.asset_group_id not in dat or KeyType.asset_id not in dat or KeyType.asset_file not in dat:
return
asset_group_id = dat[KeyType.asset_group_id]
asset_id = dat[KeyType.asset_id]
asset_file = dat[KeyType.asset_file]
if asset_file is None:
return
asset_files = {asset_id: asset_file}
result_txobj, result_asset_files = self.data_handler.search_transaction(asset_group_id=asset_group_id,
asset_id=asset_id)
txobj = next(iter(result_txobj.values()))
txobj_is_valid, valid_assets, invalid_assets = bbclib.validate_transaction_object(txobj, asset_files)
if (asset_group_id, asset_id) in valid_assets:
self.data_handler.store_in_storage(asset_group_id, asset_id, asset_file, do_overwrite=True)
add_info = {
"repaired_by": dat[KeyType.source_node_id].hex(),
"repaired_at": int(time.time())
}
self.requesting_list[dat[KeyType.nonce]].update(add_info)
self._output_log(self.requesting_list[dat[KeyType.nonce]])
del self.requesting_list[dat[KeyType.nonce]]