#!/bin/sh
""":" .
exec python "$0" "$@"
"""
# -*- coding: utf-8 -*-
"""
Copyright (c) 2017 beyond-blockchain.org.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from gevent import monkey
monkey.patch_all()
from gevent.pool import Pool
from gevent.server import StreamServer
import socket as py_socket
from gevent.socket import wait_read
import gevent
import os
import signal
import hashlib
import binascii
import traceback
import json
import copy
import sys
sys.path.extend(["../../"])
from bbc1.core import bbclib
from bbc1.core.message_key_types import KeyType, to_2byte
from bbc1.core.bbclib import BBcTransaction, MsgType
from bbc1.core import bbc_network, user_message_routing, data_handler, repair_manager, message_key_types, logger
from bbc1.core import domain0_manager, query_management, bbc_stats
from bbc1.core.bbc_config import BBcConfig
from bbc1.core.data_handler import InfraMessageCategory
from bbc1.core import command
from bbc1.core.bbc_error import *
# Version string of this core implementation.
VERSION = "core version 1.0"
# PID file path (not referenced in the part of the module shown here).
PID_FILE = "/tmp/bbc1.pid"
# Size of the gevent Pool that _start_server() hands to the StreamServer.
POOL_SIZE = 1000
# NOTE(review): the next three constants are not used in the visible code;
# presumably retry/timeout tuning for GET-style queries -- confirm before use.
DURATION_GIVEUP_GET = 10
GET_RETRY_COUNT = 3
INTERVAL_RETRY = 3
# anycast_ttl value that _process() writes into MESSAGE commands that carry
# KeyType.is_anycast.
DEFAULT_ANYCAST_TTL = 5
# Ticker object obtained from query_management.
ticker = query_management.get_ticker()
# Placeholder for a module-wide service instance (not assigned in the visible code).
core_service = None
# Cached ledger_subsystem module; filled in by activate_ledgersubsystem().
ledger_subsystem_module = None
# Commands that _process() treats as admin messages: their admin_info is
# expanded into the message and, when a node key is in use, the signature over
# admin_info is verified first.
admin_message_commands = (
MsgType.REQUEST_GET_STATS, MsgType.REQUEST_GET_NEIGHBORLIST,
MsgType.REQUEST_GET_CONFIG, MsgType.REQUEST_GET_DOMAINLIST,
MsgType.REQUEST_GET_FORWARDING_LIST, MsgType.REQUEST_GET_USERS,
MsgType.REQUEST_GET_NODEID, MsgType.REQUEST_GET_NOTIFICATION_LIST,
MsgType.REQUEST_SETUP_DOMAIN, MsgType.REQUEST_CLOSE_DOMAIN,
MsgType.NOTIFY_DOMAIN_KEY_UPDATE,
MsgType.DOMAIN_PING, MsgType.REQUEST_SET_STATIC_NODE,
MsgType.REQUEST_MANIP_LEDGER_SUBSYS
)
def activate_ledgersubsystem():
    """Load the optional ledger_subsystem module if it is installed.

    On success the imported module is cached in the module-level
    ``ledger_subsystem_module``; if the import fails the global stays None.
    Calling this again after a successful load does nothing.
    """
    global ledger_subsystem_module
    if ledger_subsystem_module is not None:
        return
    try:
        ledger_subsystem_module = __import__("ledger_subsystem")
    except Exception:
        # The subsystem is optional, so any failure while importing it simply
        # means "not available".  Catching Exception (instead of a bare
        # except) lets SystemExit / KeyboardInterrupt propagate.
        ledger_subsystem_module = None
def _make_message_structure(domain_id, cmd, dstid, qid):
    """Build the skeleton of an outgoing message.

    The returned dictionary always starts out with status ESUCCESS; callers
    add command-specific fields (or overwrite the status) afterwards.

    Args:
        domain_id (bytes): the target domain_id
        cmd (bytes): command type in message_key_types.KeyType
        dstid (bytes): destination user_id
        qid (bytes): query_id to include in the message
    Returns:
        dict: message
    """
    message = dict()
    message[KeyType.domain_id] = domain_id
    message[KeyType.command] = cmd
    message[KeyType.destination_user_id] = dstid
    message[KeyType.query_id] = qid
    message[KeyType.status] = ESUCCESS
    return message
def _create_search_result(txobj_dict, asset_files_dict):
    """Sort found transactions and asset files into valid / compromised buckets.

    Args:
        txobj_dict (dict): {transaction_id: transaction object} as found in storage
        asset_files_dict (dict): {asset_id: content} belonging to those transactions
    Returns:
        dict: response fields; a key is present only when at least one item
            landed in the corresponding bucket
    """
    result = dict()

    def _mark_compromised(tx_id, tx):
        # Record both the serialized data and the id it was stored under.
        result.setdefault(KeyType.compromised_transactions, list()).append(tx.transaction_data)
        result.setdefault(KeyType.compromised_transaction_ids, list()).append(tx_id)

    for tx_id, tx in txobj_dict.items():
        # The id under which the transaction was stored must match its own id.
        if tx_id != tx.transaction_id:
            _mark_compromised(tx_id, tx)
            continue

        is_valid, good_assets, bad_assets = bbclib.validate_transaction_object(tx, asset_files_dict)
        if is_valid:
            result.setdefault(KeyType.transactions, list()).append(tx.transaction_data)
        else:
            _mark_compromised(tx_id, tx)

        buckets = (
            (KeyType.all_asset_files, good_assets),
            (KeyType.compromised_asset_files, bad_assets),
        )
        for bucket_key, asset_refs in buckets:
            if len(asset_refs) > 0:
                bucket = result.setdefault(bucket_key, dict())
                # Each entry is an (asset_group_id, asset_id) pair; only the
                # asset_id is used as the key.
                for _, asset_id in asset_refs:
                    bucket[asset_id] = asset_files_dict[asset_id]
    return result
[docs]class BBcCoreService:
"""Base service object of BBc-1"""
def __init__(self, p2p_port=None, core_port=None, use_domain0=False, ip4addr=None, ip6addr=None,
             workingdir=".bbc1", configfile=None, use_nodekey=None, use_ledger_subsystem=False,
             default_conffile=None, loglevel="all", logname="-", server_start=True):
    """Set up configuration, networking and domains, then optionally serve.

    Args:
        p2p_port (int): port handed to BBcNetwork for node-to-node traffic
        core_port (int): TCP port for clients; when None the value in the
            configuration (conf['client']['port']) is used
        use_domain0 (bool): if False, domain_global_0 found in the
            configuration is not created
        ip4addr (str): passed to BBcNetwork as external_ip4addr
        ip6addr (str): passed to BBcNetwork as external_ip6addr
        workingdir (str): working directory given to BBcConfig
        configfile (str): configuration file given to BBcConfig
        use_nodekey (bool): when not None, overrides conf['client']['use_node_key']
        use_ledger_subsystem (bool): if True, try to attach a ledger
            subsystem to every configured domain
        default_conffile (str): default configuration file given to BBcConfig
        loglevel (str): log level for the loggers created here
        logname (str): log name for the loggers created here
        server_start (bool): if True, start the client TCP server
            (this call then blocks in serve_forever)
    """
    self.logger = logger.get_logger(key="core", level=loglevel, logname=logname)
    self.stats = bbc_stats.BBcStats()
    self.config = BBcConfig(workingdir, configfile, default_conffile)
    conf = self.config.get_config()

    # An explicitly given core_port overrides the configured client port.
    # (This must test core_port, not p2p_port: otherwise an explicit
    # core_port is discarded whenever p2p_port is omitted, and None is
    # written into the configuration when only p2p_port is given.)
    if core_port is not None:
        conf['client']['port'] = core_port
    else:
        core_port = conf['client']['port']

    self.node_key = None
    if use_nodekey is not None:
        conf['client']['use_node_key'] = bool(use_nodekey)
    if 'use_node_key' in conf['client'] and conf['client']['use_node_key']:
        self._get_node_key()

    self.logger.debug("config = %s" % conf)
    self.search_max_count = conf['search_config']['max_count']
    self.traverse_max_count = conf['search_config']['max_traverse']
    self.test_tx_obj = BBcTransaction()
    # {domain_id: {asset_group_id: set(user_id)}}, maintained by
    # _register_to_notification_list / _remove_notification_entry.
    self.insert_notification_user_list = dict()
    self.networking = bbc_network.BBcNetwork(self.config, core=self, p2p_port=p2p_port,
                                             external_ip4addr=ip4addr, external_ip6addr=ip6addr,
                                             loglevel=loglevel, logname=logname)
    self.ledger_subsystems = dict()

    # Re-create every domain recorded in the configuration.
    for domain_id_str in conf['domains'].keys():
        domain_id = bbclib.convert_idstring_to_bytes(domain_id_str)
        if not use_domain0 and domain_id == bbclib.domain_global_0:
            continue
        c = self.config.get_domain_config(domain_id)
        self.networking.create_domain(domain_id=domain_id, config=c)
        for nd, info in c['static_nodes'].items():
            node_id, ipv4, ipv6, port = bbclib.convert_idstring_to_bytes(nd), info[0], info[1], info[2]
            self.networking.add_neighbor(domain_id, node_id, ipv4, ipv6, port, is_static=True)
        if ('use_ledger_subsystem' in c and c['use_ledger_subsystem']) or use_ledger_subsystem:
            activate_ledgersubsystem()
            if ledger_subsystem_module is not None:
                self.ledger_subsystems[domain_id] = ledger_subsystem_module.LedgerSubsystem(
                    self.config,
                    networking=self.networking,
                    domain_id=domain_id,
                    loglevel=loglevel,
                    logname=logname)
            else:
                self.logger.info("Failed to load ledger_subsystem module")

    gevent.signal(signal.SIGINT, self.quit_program)
    if server_start:
        self._start_server(core_port)
def quit_program(self):
    """Shut the core down: save state, then leave the process.

    Registered in __init__ as the SIGINT handler.
    """
    # Save the static node lists first, then update the configuration.
    self.networking.save_all_static_node_list()
    self.config.update_config()
    # Leave immediately with status 0.
    os._exit(0)
def _start_server(self, port):
    """Run the client-facing TCP server until interrupted.

    Every accepted connection is served by self._handler in a greenlet
    taken from a pool of POOL_SIZE.

    Args:
        port (int): TCP port to listen on
    """
    workers = Pool(POOL_SIZE)
    # NOTE(review): "::" presumably is the networking layer's placeholder for
    # "no IPv6 address"; in that case listen on IPv4 only -- confirm in bbc_network.
    bind_host = "0.0.0.0" if self.networking.ip6_address == "::" else "::"
    listener = StreamServer((bind_host, port), self._handler, spawn=workers)
    try:
        listener.serve_forever()
    except KeyboardInterrupt:
        pass
def _error_reply(self, msg=None, err_code=EINVALID_COMMAND, txt=""):
    """Mark a message as failed and send it through the domain's user routing.

    The message is modified in place (status and reason are set) even when
    it cannot be delivered, so the caller can still send it another way.

    Args:
        msg (dict): message to send
        err_code (int): error code defined in bbc_error.py
        txt (str): error message
    Returns:
        bool: True if the message was handed to the domain's user routing,
            False if there is no message, no domain_id in it, or the domain
            is unknown to this node
    """
    if msg is None:
        return False
    msg[KeyType.status] = err_code
    msg[KeyType.reason] = txt
    # _param_check() reports messages that lack mandatory fields through this
    # method, so domain_id itself may be the missing field: look it up
    # defensively instead of raising KeyError.
    domain_id = msg.get(KeyType.domain_id)
    if domain_id is None or domain_id not in self.networking.domains:
        return False
    self.networking.domains[domain_id]['user'].send_message_to_user(msg)
    return True
def _handler(self, socket, address):
    """Message wait loop for a client.

    Reads from the socket until the peer closes it or an error occurs,
    feeding the bytes to a message parser and dispatching every complete
    message to _process().  On exit the user registered over this
    connection (if any) is unregistered and the socket is closed.

    Args:
        socket: connected client socket handed over by the StreamServer
        address: peer address (unused)
    """
    # self.logger.debug("New connection")
    self.stats.update_stats_increment("client", "total_num", 1)
    user_info = None  # (domain_id, user_id) once REGISTER has been processed
    msg_parser = message_key_types.Message()
    try:
        while True:
            wait_read(socket.fileno())
            buf = socket.recv(8192)
            if len(buf) == 0:
                # Peer closed the connection.
                break
            msg_parser.recv(buf)
            while True:
                msg = msg_parser.parse()
                if msg is None:
                    break
                disconnection, new_info = self._process(socket, msg, msg_parser.payload_type)
                if disconnection:
                    # NOTE(review): this leaves only the inner parse loop;
                    # the connection keeps being read until the peer closes it.
                    break
                if new_info is not None:
                    user_info = new_info
    except Exception as e:
        self.logger.info("TCP disconnect: %s" % e)
        traceback.print_exc()

    self.logger.debug("closing socket")
    if user_info is not None:
        # The domain may be unknown here (REGISTER on a domain this node does
        # not serve goes through the dummy routing object) or may have been
        # closed meanwhile; an unguarded lookup would raise KeyError and skip
        # the socket cleanup and the stats decrement below.
        if user_info[0] in self.networking.domains:
            self.networking.domains[user_info[0]]['user'].unregister_user(user_info[1], socket)
    # shutdown() fails on an already broken connection; close() must still run.
    try:
        socket.shutdown(py_socket.SHUT_RDWR)
    except Exception:
        pass
    try:
        socket.close()
    except Exception:
        pass
    self.logger.debug("connection closed")
    self.stats.update_stats_decrement("client", "total_num", 1)
def _get_node_key(self):
    """Get or create node key for creating a domain by bbc_app.

    Loads the private key from ``node_key.pem`` in the working directory.
    If the file does not exist or cannot be loaded, a new key pair is
    generated and written to that file.
    """
    self.logger.info("The core use node_key to check signature on admin command message")
    keypath = os.path.join(self.config.working_dir, "node_key.pem")
    self.node_key = bbclib.KeyPair()
    if os.path.exists(keypath):
        try:
            with open(keypath, "r") as f:
                self.node_key.mk_keyobj_from_private_key_pem(f.read())
            return
        except Exception as e:
            # Best effort: fall through and regenerate, but leave a trace,
            # because the existing key file is about to be overwritten.
            self.logger.error("Failed to load node key from %s (%s); generating a new one" % (keypath, e))
    self.node_key.generate()
    with open(keypath, "wb") as f:
        f.write(self.node_key.get_private_key_in_pem())
    return
def _check_signature_by_nodekey(self, dat):
    """Verify signature in the message.

    The signature is checked against the SHA-256 digest of the raw
    admin_info field.  On success the TLV-encoded admin_info is expanded
    and merged into ``dat``.

    Args:
        dat (dict): received message that includes KeyType.admin command
    Returns:
        bool: True if check is successful.  False when this node has no
            node key, when admin_info or the signature is missing, or when
            the signature does not verify.
    """
    if self.node_key is None:
        # Without a key nothing can be verified, so the check cannot succeed
        # (fail closed rather than report an unverified message as valid).
        return False
    if KeyType.admin_info not in dat:
        return False
    if KeyType.nodekey_signature not in dat:
        # A missing signature is a failed check, not a KeyError.
        return False
    digest = hashlib.sha256(dat[KeyType.admin_info]).digest()
    if not self.node_key.verify(digest, dat[KeyType.nodekey_signature]):
        return False
    admin_info = message_key_types.make_dictionary_from_TLV_format(dat[KeyType.admin_info])
    dat.update(admin_info)
    return True
def _param_check(self, param, dat):
    """Verify that the mandatory key(s) are present in a received message.

    At the first missing key an error reply is issued via _error_reply()
    and the check stops.

    Args:
        param (bytes|list): Commands that must be included in the message
        dat (dict): received message
    Returns:
        bool: True if check is successful
    """
    # Treat a single key as a one-element list so both forms share one loop.
    required = param if isinstance(param, list) else [param]
    for key in required:
        if key in dat:
            continue
        self._error_reply(msg=dat, err_code=EINVALID_COMMAND, txt="lack of mandatory params")
        return False
    return True
def _process(self, socket, dat, payload_type):
    """Process received message.

    Validates the message, expands/verifies admin information for admin
    commands, and then dispatches on the command type.  Replies are sent
    either through the domain's user message routing or directly over the
    given socket.

    Args:
        socket (Socket): server socket
        dat (dict): received message
        payload_type (bytes): PayloadType value of msg
    Returns:
        bool: True if disconnection is detected
        list: return user info (domain_id, user_id) when a new user_id is coming
    """
    self.stats.update_stats_increment("client", "num_message_receive", 1)
    #self.logger.debug("process message from %s: %s" % (binascii.b2a_hex(dat[KeyType.source_user_id]), dat))
    if not self._param_check([KeyType.command, KeyType.source_user_id], dat):
        self.logger.debug("message has bad format")
        return False, None

    # Admin commands carry their parameters in admin_info; with a node key
    # in use the signature over admin_info has to verify first.
    if dat[KeyType.command] in admin_message_commands:
        if self.node_key is None and KeyType.admin_info in dat:
            admin_info = message_key_types.make_dictionary_from_TLV_format(dat[KeyType.admin_info])
            dat.update(admin_info)
        else:
            if not self._check_signature_by_nodekey(dat):
                self.logger.error("Illegal access to core node")
                return False, None

    # Pick the routing object of the domain; an unknown domain gets a dummy.
    domain_id = dat.get(KeyType.domain_id, None)
    umr = None
    if domain_id is not None:
        if domain_id in self.networking.domains:
            umr = self.networking.domains[domain_id]['user']
        else:
            umr = user_message_routing.UserMessageRoutingDummy(networking=self.networking, domain_id=domain_id)

    cmd = dat[KeyType.command]
    if cmd == MsgType.REQUEST_SEARCH_TRANSACTION:
        if not self._param_check([KeyType.domain_id, KeyType.transaction_id], dat):
            self.logger.debug("REQUEST_SEARCH_TRANSACTION: bad format")
            return False, None
        retmsg = _make_message_structure(domain_id, MsgType.RESPONSE_SEARCH_TRANSACTION,
                                         dat[KeyType.source_user_id], dat[KeyType.query_id])
        txinfo = self._search_transaction_by_txid(domain_id, dat[KeyType.transaction_id])
        if txinfo is None:
            if not self._error_reply(msg=retmsg, err_code=ENOTRANSACTION, txt="Cannot find transaction"):
                user_message_routing.direct_send_to_user(socket, retmsg)
            return False, None
        if KeyType.compromised_transaction_data in txinfo or KeyType.compromised_asset_files in txinfo:
            retmsg[KeyType.status] = EBADTRANSACTION
        retmsg.update(txinfo)
        umr.send_message_to_user(retmsg)

    elif cmd == MsgType.REQUEST_SEARCH_WITH_CONDITIONS:
        if not self._param_check([KeyType.domain_id], dat):
            self.logger.debug("REQUEST_SEARCH_WITH_CONDITIONS: bad format")
            return False, None
        retmsg = _make_message_structure(domain_id, MsgType.RESPONSE_SEARCH_WITH_CONDITIONS,
                                         dat[KeyType.source_user_id], dat[KeyType.query_id])
        txinfo = self.search_transaction_with_condition(domain_id,
                                                        asset_group_id=dat.get(KeyType.asset_group_id, None),
                                                        asset_id=dat.get(KeyType.asset_id, None),
                                                        user_id=dat.get(KeyType.user_id, None),
                                                        direction=dat.get(KeyType.direction, 0),
                                                        count=dat.get(KeyType.count, 1))
        if txinfo is None or KeyType.transactions not in txinfo:
            if not self._error_reply(msg=retmsg, err_code=ENOTRANSACTION, txt="Cannot find transaction"):
                user_message_routing.direct_send_to_user(socket, retmsg)
        else:
            retmsg.update(txinfo)
            umr.send_message_to_user(retmsg)

    elif cmd == MsgType.REQUEST_COUNT_TRANSACTIONS:
        if not self._param_check([KeyType.domain_id], dat):
            self.logger.debug("REQUEST_COUNT_TRANSACTIONS: bad format")
            return False, None
        retmsg = _make_message_structure(domain_id, MsgType.RESPONSE_COUNT_TRANSACTIONS,
                                         dat[KeyType.source_user_id], dat[KeyType.query_id])
        count = self.count_transactions(domain_id, asset_group_id=dat.get(KeyType.asset_group_id, None),
                                        asset_id=dat.get(KeyType.asset_id, None),
                                        user_id=dat.get(KeyType.user_id, None))
        retmsg[KeyType.count] = count
        umr.send_message_to_user(retmsg)

    elif cmd == MsgType.REQUEST_TRAVERSE_TRANSACTIONS:
        if not self._param_check([KeyType.domain_id, KeyType.transaction_id,
                                  KeyType.direction, KeyType.hop_count], dat):
            self.logger.debug("REQUEST_TRAVERSE_TRANSACTIONS: bad format")
            return False, None
        retmsg = _make_message_structure(domain_id, MsgType.RESPONSE_TRAVERSE_TRANSACTIONS,
                                         dat[KeyType.source_user_id], dat[KeyType.query_id])
        retmsg[KeyType.transaction_id] = dat[KeyType.transaction_id]
        asset_group_id = dat.get(KeyType.asset_group_id, None)
        user_id = dat.get(KeyType.user_id, None)
        all_included, txtree, asset_files = self._traverse_transactions(domain_id, dat[KeyType.transaction_id],
                                                                        asset_group_id=asset_group_id,
                                                                        user_id=user_id,
                                                                        direction=dat[KeyType.direction],
                                                                        hop_count=dat[KeyType.hop_count])
        if txtree is None or len(txtree) == 0:
            if not self._error_reply(msg=retmsg, err_code=ENOTRANSACTION, txt="Cannot find transaction"):
                user_message_routing.direct_send_to_user(socket, retmsg)
        else:
            retmsg[KeyType.transaction_tree] = txtree
            retmsg[KeyType.all_included] = all_included
            if len(asset_files) > 0:
                retmsg[KeyType.all_asset_files] = asset_files
            umr.send_message_to_user(retmsg)

    elif cmd == MsgType.REQUEST_GATHER_SIGNATURE:
        if not self._param_check([KeyType.domain_id, KeyType.transaction_data], dat):
            self.logger.debug("REQUEST_GATHER_SIGNATURE: bad format")
            return False, None
        if not self._distribute_transaction_to_gather_signatures(dat[KeyType.domain_id], dat):
            retmsg = _make_message_structure(domain_id, MsgType.RESPONSE_GATHER_SIGNATURE,
                                             dat[KeyType.source_user_id], dat[KeyType.query_id])
            if not self._error_reply(msg=retmsg, err_code=EINVALID_COMMAND, txt="Fail to forward transaction"):
                user_message_routing.direct_send_to_user(socket, retmsg)

    elif cmd == MsgType.REQUEST_INSERT:
        if not self._param_check([KeyType.domain_id, KeyType.transaction_data, KeyType.all_asset_files], dat):
            self.logger.debug("REQUEST_INSERT: bad format")
            return False, None
        transaction_data = dat[KeyType.transaction_data]
        asset_files = dat[KeyType.all_asset_files]
        retmsg = _make_message_structure(domain_id, MsgType.RESPONSE_INSERT,
                                         dat[KeyType.source_user_id], dat[KeyType.query_id])
        ret = self.insert_transaction(dat[KeyType.domain_id], transaction_data, asset_files)
        # insert_transaction() returns an error string on failure.
        if isinstance(ret, str):
            if not self._error_reply(msg=retmsg, err_code=EINVALID_COMMAND, txt=ret):
                user_message_routing.direct_send_to_user(socket, retmsg)
        else:
            retmsg.update(ret)
            umr.send_message_to_user(retmsg)

    elif cmd == MsgType.RESPONSE_SIGNATURE:
        if not self._param_check([KeyType.domain_id, KeyType.destination_user_id, KeyType.source_user_id], dat):
            self.logger.debug("RESPONSE_SIGNATURE: bad format")
            return False, None
        retmsg = _make_message_structure(domain_id, MsgType.RESPONSE_GATHER_SIGNATURE,
                                         dat[KeyType.destination_user_id], dat[KeyType.query_id])
        if KeyType.signature in dat:
            retmsg[KeyType.transaction_data_format] = dat[KeyType.transaction_data_format]
            retmsg[KeyType.signature] = dat[KeyType.signature]
            retmsg[KeyType.ref_index] = dat[KeyType.ref_index]
        elif KeyType.status not in dat:
            retmsg[KeyType.status] = EOTHER
            retmsg[KeyType.reason] = dat[KeyType.reason]
        elif dat[KeyType.status] < ESUCCESS:
            retmsg[KeyType.status] = dat[KeyType.status]
            retmsg[KeyType.reason] = dat[KeyType.reason]
        retmsg[KeyType.source_user_id] = dat[KeyType.source_user_id]
        umr.send_message_to_user(retmsg)

    elif cmd == MsgType.MESSAGE:
        if not self._param_check([KeyType.domain_id, KeyType.source_user_id, KeyType.destination_user_id], dat):
            self.logger.debug("MESSAGE: bad format")
            return False, None
        if KeyType.is_anycast in dat:
            dat[KeyType.anycast_ttl] = DEFAULT_ANYCAST_TTL
        umr.send_message_to_user(dat)

    elif cmd == MsgType.REQUEST_CROSS_REF_VERIFY:
        if not self._param_check([KeyType.domain_id, KeyType.source_user_id, KeyType.transaction_id], dat):
            self.logger.debug("REQUEST_CROSS_REF_VERIFY: bad format")
            return False, None
        dat[KeyType.command] = domain0_manager.Domain0Manager.REQUEST_VERIFY
        self.networking.send_message_to_a_domain0_manager(domain_id, dat)

    elif cmd == MsgType.REQUEST_CROSS_REF_LIST:
        if not self._param_check([KeyType.domain_id, KeyType.source_user_id], dat):
            self.logger.debug("REQUEST_CROSS_REF_LIST: bad format")
            return False, None
        retmsg = _make_message_structure(domain_id, MsgType.RESPONSE_CROSS_REF_LIST,
                                         dat[KeyType.source_user_id], dat[KeyType.query_id])
        domain_list = self.networking.domains[domain_id]['data'].search_domain_having_cross_ref()
        # domain_list = list of ["id", "transaction_id", "outer_domain_id", "txid_having_cross_ref"]
        retmsg[KeyType.transaction_id_list] = [row[1] for row in domain_list]
        umr.send_message_to_user(retmsg)

    elif cmd == MsgType.REQUEST_REGISTER_HASH_IN_SUBSYS:
        if not self._param_check([KeyType.transaction_id], dat):
            self.logger.debug("REQUEST_REGISTER_HASH_IN_SUBSYS: bad format")
            return False, None
        retmsg = _make_message_structure(domain_id, MsgType.RESPONSE_REGISTER_HASH_IN_SUBSYS,
                                         dat[KeyType.source_user_id], dat[KeyType.query_id])
        if domain_id in self.ledger_subsystems:
            transaction_id = dat[KeyType.transaction_id]
            self.ledger_subsystems[domain_id].register_transaction(transaction_id=transaction_id)
            umr.send_message_to_user(retmsg)
        else:
            self._error_reply(msg=retmsg, err_code=ENOSUBSYSTEM, txt="Ledger_subsystem is not activated")

    elif cmd == MsgType.REQUEST_VERIFY_HASH_IN_SUBSYS:
        if not self._param_check([KeyType.transaction_id], dat):
            # Log the command actually being handled (was a copy-paste of
            # the REGISTER branch's message).
            self.logger.debug("REQUEST_VERIFY_HASH_IN_SUBSYS: bad format")
            return False, None
        retmsg = _make_message_structure(domain_id, MsgType.RESPONSE_VERIFY_HASH_IN_SUBSYS,
                                         dat[KeyType.source_user_id], dat[KeyType.query_id])
        if domain_id in self.ledger_subsystems:
            transaction_id = dat[KeyType.transaction_id]
            result = self.ledger_subsystems[domain_id].verify_transaction(transaction_id=transaction_id)
            retmsg[KeyType.merkle_tree] = result
            umr.send_message_to_user(retmsg)
        else:
            self._error_reply(msg=retmsg, err_code=ENOSUBSYSTEM, txt="Ledger_subsystem is not activated")

    elif cmd == MsgType.REGISTER:
        if domain_id is None:
            return False, None
        if not self._param_check([KeyType.domain_id, KeyType.source_user_id], dat):
            self.logger.debug("REGISTER: bad format")
            return False, None
        user_id = dat[KeyType.source_user_id]
        self.logger.debug("[%s] register_user: %s" % (binascii.b2a_hex(domain_id[:2]),
                                                      binascii.b2a_hex(user_id[:4])))
        umr.register_user(user_id, socket, on_multiple_nodes=dat.get(KeyType.on_multinodes, False))
        # Tell the caller which user is now bound to this connection.
        return False, (domain_id, user_id)

    elif cmd == MsgType.UNREGISTER:
        if umr is not None:
            umr.unregister_user(dat[KeyType.source_user_id], socket)
        return True, None

    elif cmd == MsgType.REQUEST_INSERT_NOTIFICATION:
        self._register_to_notification_list(domain_id, dat[KeyType.asset_group_id], dat[KeyType.source_user_id])

    elif cmd == MsgType.CANCEL_INSERT_NOTIFICATION:
        self.remove_from_notification_list(domain_id, dat[KeyType.asset_group_id], dat[KeyType.source_user_id])

    elif cmd == MsgType.REQUEST_GET_STATS:
        retmsg = _make_message_structure(domain_id, MsgType.RESPONSE_GET_STATS,
                                         dat[KeyType.source_user_id], dat[KeyType.query_id])
        retmsg[KeyType.stats] = copy.deepcopy(self.stats.get_stats())
        user_message_routing.direct_send_to_user(socket, retmsg)

    elif cmd == MsgType.NOTIFY_DOMAIN_KEY_UPDATE:
        if domain_id is not None:
            self.networking.get_domain_keypair(domain_id)

    elif cmd == MsgType.REQUEST_REPAIR:
        if KeyType.transaction_id in dat:
            dat[KeyType.command] = repair_manager.RepairManager.REQUEST_REPAIR_TRANSACTION
            self.networking.domains[domain_id]['repair'].put_message(dat)
        elif KeyType.asset_group_id in dat and KeyType.asset_id in dat:
            dat[KeyType.command] = repair_manager.RepairManager.REQUEST_REPAIR_ASSET_FILE
            self.networking.domains[domain_id]['repair'].put_message(dat)
        else:
            self.logger.debug("REQUEST_REPAIR: bad format")
            return False, None

    elif cmd == MsgType.REQUEST_GET_NEIGHBORLIST:
        retmsg = _make_message_structure(domain_id, MsgType.RESPONSE_GET_NEIGHBORLIST,
                                         dat[KeyType.source_user_id], dat[KeyType.query_id])
        if domain_id in self.networking.domains:
            retmsg[KeyType.domain_id] = domain_id
            retmsg[KeyType.neighbor_list] = self.networking.domains[domain_id]['topology'].make_neighbor_list()
        else:
            retmsg[KeyType.status] = False
            retmsg[KeyType.reason] = "No such domain"
        user_message_routing.direct_send_to_user(socket, retmsg)

    elif cmd == MsgType.REQUEST_GET_CONFIG:
        retmsg = _make_message_structure(domain_id, MsgType.RESPONSE_GET_CONFIG,
                                         dat[KeyType.source_user_id], dat[KeyType.query_id])
        jsondat = self.config.get_json_config()
        retmsg[KeyType.bbc_configuration] = jsondat
        user_message_routing.direct_send_to_user(socket, retmsg)

    elif cmd == MsgType.REQUEST_GET_DOMAINLIST:
        retmsg = _make_message_structure(domain_id, MsgType.RESPONSE_GET_DOMAINLIST,
                                         dat[KeyType.source_user_id], dat[KeyType.query_id])
        # Layout: 2-byte count followed by the domain_ids.
        data = bytearray()
        data.extend(to_2byte(len(self.networking.domains)))
        # Separate loop name so the request's own domain_id is not shadowed.
        for listed_domain_id in self.networking.domains:
            data.extend(listed_domain_id)
        retmsg[KeyType.domain_list] = bytes(data)
        user_message_routing.direct_send_to_user(socket, retmsg)

    elif cmd == MsgType.REQUEST_GET_FORWARDING_LIST:
        retmsg = _make_message_structure(domain_id, MsgType.RESPONSE_GET_FORWARDING_LIST,
                                         dat[KeyType.source_user_id], dat[KeyType.query_id])
        # Layout: 2-byte entry count, then per entry the user_id, a 2-byte
        # node count and the node_ids.
        data = bytearray()
        data.extend(to_2byte(len(umr.forwarding_entries)))
        for user_id in umr.forwarding_entries:
            data.extend(user_id)
            data.extend(to_2byte(len(umr.forwarding_entries[user_id]['nodes'])))
            for node_id in umr.forwarding_entries[user_id]['nodes']:
                data.extend(node_id)
        retmsg[KeyType.forwarding_list] = bytes(data)
        user_message_routing.direct_send_to_user(socket, retmsg)

    elif cmd == MsgType.REQUEST_GET_USERS:
        retmsg = _make_message_structure(domain_id, MsgType.RESPONSE_GET_USERS,
                                         dat[KeyType.source_user_id], dat[KeyType.query_id])
        data = bytearray()
        data.extend(to_2byte(len(umr.registered_users)))
        for user_id in umr.registered_users.keys():
            data.extend(user_id)
        retmsg[KeyType.user_list] = bytes(data)
        user_message_routing.direct_send_to_user(socket, retmsg)

    elif cmd == MsgType.REQUEST_GET_NODEID:
        retmsg = _make_message_structure(domain_id, MsgType.RESPONSE_GET_NODEID,
                                         dat[KeyType.source_user_id], dat[KeyType.query_id])
        data = bytearray()
        data.extend(self.networking.domains[domain_id]['topology'].my_node_id)
        retmsg[KeyType.node_id] = bytes(data)
        user_message_routing.direct_send_to_user(socket, retmsg)

    elif cmd == MsgType.REQUEST_GET_NOTIFICATION_LIST:
        retmsg = _make_message_structure(domain_id, MsgType.RESPONSE_GET_NOTIFICATION_LIST,
                                         dat[KeyType.source_user_id], dat[KeyType.query_id])
        # A domain without any subscription has no entry at all; report an
        # empty list for it instead of raising KeyError.
        notification_groups = self.insert_notification_user_list.get(domain_id, dict())
        # Layout: 2-byte group count, then per group the asset_group_id, a
        # 2-byte user count and the user_ids.
        data = bytearray()
        data.extend(to_2byte(len(notification_groups)))
        for asset_group_id in notification_groups.keys():
            data.extend(asset_group_id)
            data.extend(to_2byte(len(notification_groups[asset_group_id])))
            for user_id in notification_groups[asset_group_id]:
                data.extend(user_id)
        retmsg[KeyType.notification_list] = bytes(data)
        user_message_routing.direct_send_to_user(socket, retmsg)

    elif cmd == MsgType.REQUEST_SETUP_DOMAIN:
        if not self._param_check([KeyType.domain_id], dat):
            self.logger.debug("REQUEST_SETUP_DOMAIN: bad format")
            return False, None
        conf = None
        if KeyType.bbc_configuration in dat:
            conf = json.loads(dat[KeyType.bbc_configuration])
        retmsg = _make_message_structure(None, MsgType.RESPONSE_SETUP_DOMAIN,
                                         dat[KeyType.source_user_id], dat[KeyType.query_id])
        retmsg[KeyType.result] = self.networking.create_domain(domain_id=domain_id, config=conf)
        if not retmsg[KeyType.result]:
            retmsg[KeyType.reason] = "Already exists"
        retmsg[KeyType.domain_id] = domain_id
        self.config.update_config()
        user_message_routing.direct_send_to_user(socket, retmsg)

    elif cmd == MsgType.REQUEST_CLOSE_DOMAIN:
        retmsg = _make_message_structure(None, MsgType.RESPONSE_CLOSE_DOMAIN,
                                         dat[KeyType.source_user_id], dat[KeyType.query_id])
        retmsg[KeyType.result] = self.networking.remove_domain(domain_id)
        if not retmsg[KeyType.result]:
            retmsg[KeyType.reason] = "No such domain"
        user_message_routing.direct_send_to_user(socket, retmsg)

    elif cmd == MsgType.REQUEST_ECDH_KEY_EXCHANGE:
        retmsg = _make_message_structure(None, MsgType.RESPONSE_ECDH_KEY_EXCHANGE,
                                         dat[KeyType.source_user_id], dat[KeyType.query_id])
        privatekey_for_ecdh, peer_pub_key_to_send, my_keyname = message_key_types.get_ECDH_parameters()
        if privatekey_for_ecdh is None:
            return False, None
        nonce = dat[KeyType.nonce]
        rand = dat[KeyType.random]
        shared_key = message_key_types.derive_shared_key(privatekey_for_ecdh, dat[KeyType.ecdh], rand)
        retmsg[KeyType.ecdh] = peer_pub_key_to_send
        retmsg[KeyType.nonce] = nonce
        retmsg[KeyType.random] = rand
        retmsg[KeyType.hint] = my_keyname
        # The response is sent before the cipher is registered.
        user_message_routing.direct_send_to_user(socket, retmsg)
        message_key_types.set_cipher(shared_key, nonce, my_keyname, dat[KeyType.hint])
        umr.set_aes_name(socket, my_keyname)

    elif cmd == MsgType.DOMAIN_PING:
        if not self._param_check([KeyType.domain_id, KeyType.source_user_id, KeyType.port_number], dat):
            return False, None
        ipv4 = dat.get(KeyType.ipv4_address, None)
        ipv6 = dat.get(KeyType.ipv6_address, None)
        if ipv4 is None and ipv6 is None:
            return False, None
        port = dat[KeyType.port_number]
        self.networking.send_domain_ping(domain_id, ipv4, ipv6, port)

    elif cmd == MsgType.REQUEST_SET_STATIC_NODE:
        retmsg = _make_message_structure(domain_id, MsgType.RESPONSE_SET_STATIC_NODE,
                                         dat[KeyType.source_user_id], dat[KeyType.query_id])
        retmsg[KeyType.domain_id] = domain_id
        node_info = dat.get(KeyType.node_info, None)
        if node_info is None:
            retmsg[KeyType.result] = False
        else:
            self.networking.add_neighbor(domain_id, *node_info, is_static=True)
            self.config.update_config()
            retmsg[KeyType.result] = True
        user_message_routing.direct_send_to_user(socket, retmsg)

    elif cmd == MsgType.REQUEST_MANIP_LEDGER_SUBSYS:
        retmsg = _make_message_structure(domain_id, MsgType.RESPONSE_MANIP_LEDGER_SUBSYS,
                                         dat[KeyType.source_user_id], dat[KeyType.query_id])
        # Use .get(): indexing would raise KeyError for a domain without a
        # subsystem and the error reply below could never be sent.
        if self.ledger_subsystems.get(domain_id) is not None:
            if dat[KeyType.ledger_subsys_manip]:
                self.ledger_subsystems[domain_id].enable()
            else:
                self.ledger_subsystems[domain_id].disable()
            user_message_routing.direct_send_to_user(socket, retmsg)
        else:
            self._error_reply(msg=retmsg, err_code=ENOSUBSYSTEM, txt="Ledger_subsystem is not installed")

    else:
        self.logger.error("Bad command/response: %s" % cmd)
    return False, None
def _register_to_notification_list(self, domain_id, asset_group_id, user_id):
    """Subscribe a user to insert-completion notifications of an asset group.

    The user is recorded in insert_notification_user_list and a permanent
    multicast join for the asset group is sent through the domain's user
    message routing.

    Args:
        domain_id (bytes): target domain_id
        asset_group_id (bytes): target asset_group_id of which you want to get notification about the insertion
        user_id (bytes): user_id that registers in the list
    """
    groups = self.insert_notification_user_list.setdefault(domain_id, dict())
    groups.setdefault(asset_group_id, set()).add(user_id)
    router = self.networking.domains[domain_id]['user']
    router.send_multicast_join(asset_group_id, permanent=True)
def remove_from_notification_list(self, domain_id, asset_group_id, user_id):
    """Remove entry from insert completion notification list.

    This method only decides which entries are affected; the removal
    itself is done by _remove_notification_entry().  When asset_group_id
    is None, the removal is attempted for every asset group of the domain.

    Args:
        domain_id (bytes): target domain_id
        asset_group_id (bytes): target asset_group_id of which you want to get notification about the insertion
        user_id (bytes): user_id that registers in the list
    """
    registry = self.insert_notification_user_list
    if domain_id not in registry:
        return
    if asset_group_id is None:
        # Iterate over a snapshot: entries are deleted while we walk them.
        for group_id in list(registry[domain_id]):
            self._remove_notification_entry(domain_id, group_id, user_id)
        return
    if asset_group_id in registry[domain_id]:
        self._remove_notification_entry(domain_id, asset_group_id, user_id)
def _remove_notification_entry(self, domain_id, asset_group_id, user_id):
    """Remove entry from insert completion notification list.

    Removes the user from one asset group.  When the group has no user
    left it is dropped and a multicast leave is sent; when the domain has
    no group left its entry is dropped as well.  A user that is not in the
    group is ignored.

    Args:
        domain_id (bytes): target domain_id
        asset_group_id (bytes): target asset_group_id of which you want to get notification about the insertion
        user_id (bytes): user_id that registers in the list
    """
    groups = self.insert_notification_user_list[domain_id]
    # discard(), not remove(): remove_from_notification_list() calls this for
    # every group of the domain when asset_group_id is None, and the user is
    # usually not a member of all of them.
    groups[asset_group_id].discard(user_id)
    if len(groups[asset_group_id]) == 0:
        groups.pop(asset_group_id, None)
        # The domain may no longer be served by this node; the list
        # bookkeeping must still be completed in that case.
        if domain_id in self.networking.domains:
            umr = self.networking.domains[domain_id]['user']
            umr.send_multicast_leave(asset_group_id)
    if len(groups) == 0:
        self.insert_notification_user_list.pop(domain_id, None)
def validate_transaction(self, txdata, asset_files=None):
    """Deserialize a transaction and validate it together with its asset files.

    Failures are counted in the statistics ("transaction"/"invalid" and
    "asset_file"/"invalid").

    Args:
        txdata (bytes): serialized transaction data
        asset_files (dict): dictionary of {asset_id: content} for the transaction
    Returns:
        BBcTransaction: if validation fails, None returns.
    """
    candidate = BBcTransaction()
    if not candidate.deserialize(txdata):
        self.stats.update_stats_increment("transaction", "invalid", 1)
        self.logger.error("Fail to deserialize transaction data")
        return None
    candidate.digest()

    is_valid, _accepted, rejected = bbclib.validate_transaction_object(candidate, asset_files)
    has_bad_assets = len(rejected) > 0
    if not is_valid:
        self.stats.update_stats_increment("transaction", "invalid", 1)
    if has_bad_assets:
        self.stats.update_stats_increment("asset_file", "invalid", 1)

    # Both the transaction and all of its asset files must be fine.
    if not is_valid or has_bad_assets:
        return None
    return candidate
def insert_transaction(self, domain_id, txdata, asset_files):
    """Insert transaction into ledger.

    The transaction is validated, stored through the domain's data
    handler and, on success, an insert notification is sent.  Every
    failure increments the "insert_fail_count" statistic.

    Args:
        domain_id (bytes): target domain_id
        txdata (bytes): serialized transaction data
        asset_files (dict): dictionary of {asset_id: content} for the transaction
    Returns:
        dict|str: inserted transaction_id or error message
    """
    self.stats.update_stats_increment("transaction", "insert_count", 1)
    if domain_id is None:
        self.stats.update_stats_increment("transaction", "insert_fail_count", 1)
        self.logger.error("No such domain")
        return "Set up the domain, first!"
    if domain_id == bbclib.domain_global_0:
        self.stats.update_stats_increment("transaction", "insert_fail_count", 1)
        self.logger.error("Insert is not allowed in domain_global_0")
        return "Insert is not allowed in domain_global_0"
    if domain_id not in self.networking.domains:
        # A domain this node does not serve must yield the error string the
        # caller expects, not a KeyError from the lookups below.
        self.stats.update_stats_increment("transaction", "insert_fail_count", 1)
        self.logger.error("No such domain")
        return "Set up the domain, first!"

    txobj = self.validate_transaction(txdata, asset_files)
    if txobj is None:
        self.stats.update_stats_increment("transaction", "insert_fail_count", 1)
        self.logger.error("Bad transaction format")
        return "Bad transaction format"

    domain = self.networking.domains[domain_id]
    self.logger.debug("[node:%s] insert_transaction %s" %
                      (domain['name'], binascii.b2a_hex(txobj.transaction_id[:4])))
    asset_group_ids = domain['data'].insert_transaction(txdata, txobj=txobj, asset_files=asset_files)
    if asset_group_ids is None:
        self.stats.update_stats_increment("transaction", "insert_fail_count", 1)
        self.logger.error("[%s] Fail to insert a transaction into the ledger" % domain['name'])
        return "Failed to insert a transaction into the ledger"

    self.send_inserted_notification(domain_id, asset_group_ids, txobj.transaction_id)
    return {KeyType.transaction_id: txobj.transaction_id}
def send_inserted_notification(self, domain_id, asset_group_ids, transaction_id, only_registered_user=False):
    """Broadcast NOTIFY_INSERTED.

    Local users subscribed to any of the asset groups are notified
    directly; a user that cannot be reached is removed from the
    notification list.  Unless only_registered_user is set, the nodes
    found in the routing object's forwarding entries for those asset
    groups are notified as well.

    Args:
        domain_id (bytes): target domain_id
        asset_group_ids (list): list of asset_group_ids
        transaction_id (bytes): transaction_id that has just inserted
        only_registered_user (bool): If True, notification is not sent to other nodes
    """
    router = self.networking.domains[domain_id]['user']
    local_targets = set()
    remote_targets = set()
    for group_id in asset_group_ids:
        if domain_id in self.insert_notification_user_list:
            subscribers = self.insert_notification_user_list[domain_id]
            if group_id in subscribers:
                local_targets.update(subscribers[group_id])
        if not only_registered_user and group_id in router.forwarding_entries:
            remote_targets.update(router.forwarding_entries[group_id]['nodes'])

    if len(local_targets) == 0 and len(remote_targets) == 0:
        return

    # One message object is reused; only the destination changes per send.
    notification = dict()
    notification[KeyType.domain_id] = domain_id
    notification[KeyType.infra_command] = data_handler.DataHandler.NOTIFY_INSERTED
    notification[KeyType.command] = MsgType.NOTIFY_INSERTED
    notification[KeyType.transaction_id] = transaction_id
    notification[KeyType.asset_group_ids] = list(asset_group_ids)

    for user_id in local_targets:
        notification[KeyType.destination_user_id] = user_id
        delivered = router.send_message_to_user(msg=notification, direct_only=True)
        if not delivered:
            self.remove_from_notification_list(domain_id, None, user_id)

    notification[KeyType.infra_msg_type] = InfraMessageCategory.CATEGORY_DATA
    for node_id in remote_targets:  # TODO: need test (multiple asset_groups are bundled)
        notification[KeyType.destination_node_id] = node_id
        self.networking.send_message_in_network(domain_id=domain_id, msg=notification)
def _distribute_transaction_to_gather_signatures(self, domain_id, dat):
    """Send a REQUEST_SIGNATURE message to each destination user other than the requester

    The message carries the transaction data from the request and, when present in the
    request, the hint, the referenced transactions and all asset files.

    Args:
        domain_id (bytes): target domain_id
        dat (dict): message to send
    Returns:
        bool: True
    """
    recipients = dat[KeyType.destination_user_ids]
    request = _make_message_structure(domain_id, MsgType.REQUEST_SIGNATURE, None, dat[KeyType.query_id])
    requester = dat[KeyType.source_user_id]
    request[KeyType.source_user_id] = requester
    user_routing = self.networking.domains[domain_id]['user']

    for recipient in recipients:
        if recipient == requester:
            # the requester does not need to receive its own sign request
            continue
        request[KeyType.destination_user_id] = recipient
        if KeyType.hint in dat:
            request[KeyType.hint] = dat[KeyType.hint]
        request[KeyType.transaction_data] = dat[KeyType.transaction_data]
        # optional payloads are copied only when the request carries them
        for optional_key in (KeyType.transactions, KeyType.all_asset_files):
            if optional_key in dat:
                request[optional_key] = dat[optional_key]
        user_routing.send_message_to_user(request)
    return True
def _search_transaction_by_txid(self, domain_id, transaction_id):
    """Look up a single transaction by its transaction_id

    The list-valued entry produced by _create_search_result (transactions, or otherwise
    compromised_transactions) is collapsed to its first element and stored under the
    corresponding singular key.

    Args:
        domain_id (bytes): target domain_id
        transaction_id (bytes): transaction_id to search
    Returns:
        dict: dictionary having transaction_id, serialized transaction data, asset files
              (None if an argument is None or nothing is found)
    """
    self.stats.update_stats_increment("transaction", "search_count", 1)

    required_args = (
        (domain_id, "No such domain"),
        (transaction_id, "Transaction_id must not be None"),
    )
    for value, complaint in required_args:
        if value is None:
            self.logger.error(complaint)
            return None

    handler = self.networking.domains[domain_id]['data']
    found, found_files = handler.search_transaction(transaction_id=transaction_id)
    if found is None or len(found) == 0:
        return None

    result = _create_search_result(found, found_files)
    result[KeyType.transaction_id] = transaction_id

    # Only the first matching list is collapsed; a valid transaction takes precedence
    # over a compromised one.
    collapse_rules = (
        (KeyType.transactions, KeyType.transaction_data),
        (KeyType.compromised_transactions, KeyType.compromised_transaction_data),
    )
    for list_key, single_key in collapse_rules:
        if list_key in result:
            result[single_key] = result[list_key][0]
            del result[list_key]
            break
    return result
def search_transaction_with_condition(self, domain_id, asset_group_id=None, asset_id=None, user_id=None,
                                      direction=0, count=1):
    """Find transactions satisfying all of the given conditions

    Conditions that are given together are combined with AND.

    Args:
        domain_id (bytes): target domain_id
        asset_group_id (bytes): asset_group_id that target transactions should have
        asset_id (bytes): asset_id that target transactions should have
        user_id (bytes): user_id that target transactions should have
        direction (int): 0: descend, 1: ascend
        count (int): The maximum number of transactions to retrieve (self.search_max_count is the upper bound)
    Returns:
        dict: dictionary having transaction_id, serialized transaction data, asset files
              (None if domain_id is None or nothing is found)
    """
    if domain_id is None:
        self.logger.error("No such domain")
        return None

    # never ask the data handler for more than the configured maximum
    limit = min(count, self.search_max_count)
    handler = self.networking.domains[domain_id]['data']
    found, found_files = handler.search_transaction(
        asset_group_id=asset_group_id,
        asset_id=asset_id,
        user_id=user_id,
        direction=direction,
        count=limit,
    )
    if found is None or len(found) == 0:
        return None
    return _create_search_result(found, found_files)
def count_transactions(self, domain_id, asset_group_id=None, asset_id=None, user_id=None):
    """Return how many transactions satisfy all of the given conditions

    Conditions that are given together are combined with AND. The counting itself is
    delegated to the data handler of the domain.

    Args:
        domain_id (bytes): target domain_id
        asset_group_id (bytes): asset_group_id that target transactions should have
        asset_id (bytes): asset_id that target transactions should have
        user_id (bytes): user_id that target transactions should have
    Returns:
        int: the number of transactions (None if domain_id is None)
    """
    if domain_id is not None:
        handler = self.networking.domains[domain_id]['data']
        return handler.count_transactions(
            asset_group_id=asset_group_id,
            asset_id=asset_id,
            user_id=user_id,
        )
    self.logger.error("No such domain")
    return None
def _traverse_transactions(self, domain_id, transaction_id, asset_group_id=None, user_id=None, direction=1, hop_count=3):
    """Get transaction tree from the specified transaction_id with given condition

    If both asset_group_id and user_id are specified, they are treated as AND condition.
    A transaction that does not satisfy the condition is neither returned nor traversed further.

    The transaction tree in the return value is a list of levels. The first level holds the
    base transaction itself, the next level the transactions one hop away from it, and so on:
        [ [serialized base transaction], [serialized transactions in 1-hop], [... in 2-hop], ... ]
    Levels in which no transaction was collected are omitted.

    Args:
        domain_id (bytes): target domain_id
        transaction_id (bytes): the base transaction_id from which traverse starts
        asset_group_id (bytes): asset_group_id that target transactions should have
        user_id (bytes): user_id that target transactions should have
        direction (int): 1:backward, non-1:forward
        hop_count (int): number of levels to traverse (self.traverse_max_count * 2 is the upper bound)
    Returns:
        tuple: (include_all_flag, transaction tree, asset_files), or None if domain_id or
               transaction_id is None. include_all_flag is False when the traverse was cut
               short because it would look up more than self.traverse_max_count transactions.
    """
    self.stats.update_stats_increment("transaction", "search_count", 1)
    if domain_id is None:
        self.logger.error("No such domain")
        return None
    if transaction_id is None:
        self.logger.error("Transaction_id must not be None")
        return None

    dh = self.networking.domains[domain_id]['data']
    traverse_to_past = direction == 1
    need_filter = asset_group_id is not None or user_id is not None

    txtree = []
    asset_files = {}
    visited = set()
    tx_count = 0
    include_all_flag = True
    current_txids = [transaction_id]
    hop_count = min(hop_count, self.traverse_max_count * 2)

    for _ in range(hop_count):
        if not current_txids:
            # nothing left to visit; the remaining hops would be no-ops
            break
        if tx_count + len(current_txids) > self.traverse_max_count:
            include_all_flag = False
            break

        tx_brothers = []
        next_txids = []
        queued = set()  # ids already placed in next_txids during this hop
        for txid in current_txids:
            if txid in visited:
                continue
            visited.add(txid)
            tx_count += 1

            ret_txobj, ret_asset_files = dh.search_transaction(transaction_id=txid)
            if ret_txobj is None or len(ret_txobj) == 0:
                continue

            if need_filter:
                # keep the transaction only if at least one of its assets satisfies
                # every condition that was given
                matched = any(
                    (asset_group_id is None or asgid == asset_group_id) and
                    (user_id is None or uid == user_id)
                    for asgid, _asset_id, uid, _fileflag, _filedigest in dh.get_asset_info(ret_txobj[txid])
                )
                if not matched:
                    continue

            tx_brothers.append(ret_txobj[txid].transaction_data)
            if ret_asset_files:
                asset_files.update(ret_asset_files)

            ret = dh.search_transaction_topology(transaction_id=txid, traverse_to_past=traverse_to_past)
            for topology in ret or ():
                next_txid = topology[2] if traverse_to_past else topology[1]
                # A transaction reachable through several parents must be queued only once;
                # otherwise duplicates inflate the size check at the top of the next hop and
                # the traverse is reported as incomplete although everything would fit.
                if next_txid not in visited and next_txid not in queued:
                    queued.add(next_txid)
                    next_txids.append(next_txid)

        if tx_brothers:
            txtree.append(tx_brothers)
        # ids that were visited later in this same hop must not count against the limit
        current_txids = [txid for txid in next_txids if txid not in visited]

    return include_all_flag, txtree, asset_files
def daemonize(pidfile=PID_FILE):
    """Run in background

    Forks twice. The original process exits after the first fork, the first child calls
    setsid(), and after the second fork the intermediate process writes the pid of the
    final process to pidfile and exits. The surviving process then resets its umask to 0.

    Args:
        pidfile (str): path of the file in which the pid of the background process is stored
    """
    if os.fork() > 0:
        # original process: leave immediately
        os._exit(0)
    os.setsid()
    pid = os.fork()
    if pid > 0:
        # intermediate process: record the pid of the final process, then leave.
        # The context manager guarantees the file is closed (and flushed) even if the
        # write fails; os._exit() itself does not flush buffers.
        with open(pidfile, 'w') as pid_out:
            pid_out.write(str(pid) + "\n")
        os._exit(0)
    os.umask(0)
if __name__ == '__main__':
    argresult = command.parser()

    if argresult.kill:
        import subprocess
        # Signal the process whose pid is recorded in the PID file, then delete the file.
        for shell_command in ("kill `cat " + PID_FILE + "`", "rm -f " + PID_FILE):
            subprocess.call(shell_command, shell=True)
        sys.exit(0)

    if argresult.daemon:
        daemonize()

    # Three-state option: False when explicitly disabled, True when explicitly enabled,
    # None when neither flag was given.
    if argresult.no_nodekey:
        use_nodekey = False
    elif argresult.nodekey:
        use_nodekey = True
    else:
        use_nodekey = None

    service_options = dict(
        p2p_port=argresult.p2pport,
        core_port=argresult.coreport,
        workingdir=argresult.workingdir,
        configfile=argresult.config,
        use_nodekey=use_nodekey,
        use_domain0=argresult.domain0,
        use_ledger_subsystem=argresult.ledgersubsystem,
        ip4addr=argresult.ip4addr,
        ip6addr=argresult.ip6addr,
        default_conffile=argresult.default_config,
        logname=argresult.log,
        loglevel=argresult.verbose_level,
    )
    BBcCoreService(**service_options)