# -*- coding: utf-8 -*-
"""
Copyright (c) 2017 beyond-blockchain.org.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import gevent
from gevent import monkey
monkey.patch_all()
from gevent import socket
import traceback
import queue
import hashlib
import bson
import os
import sys
sys.path.append("../../")
from bbc1.core import bbclib
from bbc1.core import message_key_types, logger
from bbc1.core.bbclib import MsgType
from bbc1.core.message_key_types import KeyType, PayloadType
from bbc1.core.bbc_error import *
# Default TCP port used by BBcAppClient when connecting to the core node.
DEFAULT_CORE_PORT = 9000
# Default port advertised in domain ping messages.
DEFAULT_P2P_PORT = 6641
# Commands for which no query_id is attached to the outgoing message
# (and therefore no per-query response queue is created).
MESSAGE_WITH_NO_RESPONSE = (
    MsgType.MESSAGE,
    MsgType.REGISTER,
    MsgType.UNREGISTER,
    MsgType.DOMAIN_PING,
    MsgType.REQUEST_INSERT_NOTIFICATION,
    MsgType.CANCEL_INSERT_NOTIFICATION,
    MsgType.REQUEST_REPAIR,
)
def _parse_one_level_list(dat):
    """Decode a flat list of fixed-size entries from a received message

    The first two bytes hold the big-endian number of entries; each entry
    that follows is sliced out as a 32-byte chunk.

    Args:
        dat (bytes): received message data
    Returns:
        list: list of information
    """
    entry_len = 32
    num_entries = int.from_bytes(dat[:2], 'big')
    # Entries start right after the 2-byte counter.
    offsets = range(2, 2 + entry_len * num_entries, entry_len)
    return [dat[off:off + entry_len] for off in offsets]
def _parse_two_level_dict(dat):
    """Decode a two-level (id -> list of ids) structure from a received message

    Layout: a 2-byte big-endian count of outer entries, then for each outer
    entry a 32-byte id, a 2-byte big-endian count, and that many 32-byte ids.

    Args:
        dat (bytes): received message data
    Returns:
        dict: dictionary of information list
    """
    id_len = 32
    parsed = {}
    pos = 2
    for _ in range(int.from_bytes(dat[:2], 'big')):
        outer_id = dat[pos:pos + id_len]
        pos += id_len
        inner_count = int.from_bytes(dat[pos:pos + 2], 'big')
        pos += 2
        inner_end = pos + id_len * inner_count
        parsed[outer_id] = [dat[p:p + id_len] for p in range(pos, inner_end, id_len)]
        pos = inner_end
    return parsed
class BBcAppClient:
    """Basic functions for a client of bbc_core"""

    def __init__(self, host='127.0.0.1', port=DEFAULT_CORE_PORT, multiq=True, logname="-", loglevel="none"):
        """Connect to the core node and start the receiver loop

        Args:
            host (str): address of the core node
            port (int): TCP port of the core node
            multiq (bool): if True, a response queue per query_id is created for each request
            logname (str): passed to logger.get_logger as logname
            loglevel (str): passed to logger.get_logger as level
        """
        self.logger = logger.get_logger(key="bbc_app", level=loglevel, logname=logname)
        self.connection = socket.create_connection((host, port))
        self.callback = Callback(log=self.logger)
        self.callback.set_client(self)
        self.keypair = None
        self.node_keypair = None
        self.use_query_id_based_message_wait = multiq
        self.user_id = None
        self.domain_id = None
        self.query_id = (0).to_bytes(2, 'little')
        self.privatekey_for_ecdh = None
        self.aes_key_name = None
        self.is_secure_connection = False
        self.cross_ref_list = list()
        self.start_receiver_loop()

    def set_callback(self, callback_obj):
        """Set callback object that implements message processing functions

        Args:
            callback_obj (obj): callback method object
        """
        self.callback = callback_obj
        self.callback.set_logger(self.logger)
        self.callback.set_client(self)

    def set_domain_id(self, domain_id):
        """Set domain_id to this client to include it in all messages

        Args:
            domain_id (bytes): domain_id to join in
        """
        self.domain_id = domain_id

    def set_user_id(self, identifier):
        """Set user_id of the object

        Args:
            identifier (bytes): user_id of this clients
        """
        self.user_id = identifier

    def set_keypair(self, keypair):
        """Set keypair for the user

        Args:
            keypair (KeyPair): KeyPair object for signing
        """
        self.keypair = keypair

    def set_node_key(self, pem_file=None):
        """Set node_key to this client

        If pem_file is None, or the file cannot be read/parsed, the node key
        is cleared so that admin messages are never signed with an empty key.

        Args:
            pem_file (str): path string for the pem file
        """
        if pem_file is None:
            self.node_keypair = None
            return
        try:
            keypair = bbclib.KeyPair()
            with open(pem_file, "r") as f:
                keypair.mk_keyobj_from_private_key_pem(f.read())
        except Exception:
            # Do not leave a half-initialized KeyPair behind.
            self.logger.error(traceback.format_exc())
            self.node_keypair = None
            return
        self.node_keypair = keypair

    def include_admin_info(self, dat, admin_info, keypair):
        """Put admin_info into the message, signed if a keypair is given

        With a keypair, admin_info is TLV-formatted, stored under
        KeyType.admin_info and its SHA-256 digest is signed into
        KeyType.nodekey_signature. Without one, the entries of admin_info are
        merged directly into the message.

        Args:
            dat (dict): message object to modify in place
            admin_info (dict): administrative parameters
            keypair (KeyPair): node key for signing, or None
        """
        if keypair is None:
            dat.update(admin_info)
            return
        dat[KeyType.admin_info] = message_key_types.make_TLV_formatted_message(admin_info)
        digest = hashlib.sha256(dat[KeyType.admin_info]).digest()
        dat[KeyType.nodekey_signature] = keypair.sign(digest)

    def _make_message_structure(self, cmd):
        """Make a base message structure for sending to the core node

        The client's query_id is incremented (wrapping at 65536) on every call.

        Args:
            cmd (bytes): command type defined in bbclib.MsgType class
        Returns:
            dict: base message
        """
        next_id = (int.from_bytes(self.query_id, 'little') + 1) % 65536
        self.query_id = next_id.to_bytes(2, 'little')
        msg = {
            KeyType.command: cmd,
            KeyType.domain_id: self.domain_id,
            KeyType.source_user_id: self.user_id,
            KeyType.status: ESUCCESS,
        }
        if cmd not in MESSAGE_WITH_NO_RESPONSE:
            msg[KeyType.query_id] = self.query_id
            # Only messages that expect a response need a waiting queue.
            if self.use_query_id_based_message_wait:
                if self.query_id not in self.callback.query_queue:
                    self.callback.create_queue(self.query_id)
        return msg

    def _send_msg(self, dat):
        """Send the message to the core node

        Args:
            dat (dict): message object to send
        Returns:
            bytes: query ID for request/response type message (None on failure)
        """
        if KeyType.domain_id not in dat or KeyType.source_user_id not in dat:
            self.logger.warn("Message must include domain_id and source_id")
            return None
        try:
            if self.is_secure_connection:
                msg = message_key_types.make_message(PayloadType.Type_encrypted_msgpack, dat,
                                                     key_name=self.aes_key_name)
            else:
                msg = message_key_types.make_message(PayloadType.Type_msgpack, dat)
            self.connection.sendall(msg)
        except Exception:
            self.logger.error(traceback.format_exc())
            return None
        return self.query_id

    def _send_admin_query(self, cmd):
        """Send a request whose admin_info holds only a value from bbclib.get_random_value(32)

        Args:
            cmd (bytes): command type defined in bbclib.MsgType class
        Returns:
            bytes: query_id
        """
        dat = self._make_message_structure(cmd)
        admin_info = {
            KeyType.random: bbclib.get_random_value(32)
        }
        self.include_admin_info(dat, admin_info, self.node_keypair)
        return self._send_msg(dat)

    def exchange_key(self):
        """Perform ECDH (key exchange algorithm)

        Returns:
            bytes: query_id (None if domain_id is not set)
        """
        if self.domain_id is None:
            self.logger.error("Need to set domain first!")
            return None
        dat = self._make_message_structure(MsgType.REQUEST_ECDH_KEY_EXCHANGE)
        self.privatekey_for_ecdh, dat[KeyType.ecdh], self.aes_key_name = message_key_types.get_ECDH_parameters()
        dat[KeyType.nonce] = os.urandom(16)
        dat[KeyType.hint] = self.aes_key_name
        dat[KeyType.random] = os.urandom(8)
        return self._send_msg(dat)

    def domain_setup(self, domain_id, config=None):
        """Set up domain with the specified network module and storage

        This method should be used by a system administrator.

        Args:
            domain_id (bytes): domain_id to create
            config (str): system config in json format
        Returns:
            bytes: query_id
        """
        dat = self._make_message_structure(MsgType.REQUEST_SETUP_DOMAIN)
        admin_info = {
            KeyType.domain_id: domain_id,
            KeyType.random: bbclib.get_random_value(32)
        }
        if config is not None:
            admin_info[KeyType.bbc_configuration] = config
        self.include_admin_info(dat, admin_info, self.node_keypair)
        return self._send_msg(dat)

    def domain_close(self, domain_id=None):
        """Close domain leading to remove_domain in the core

        Args:
            domain_id (bytes): domain_id to delete (defaults to the client's domain_id)
        Returns:
            bytes: query_id (None if no domain_id is available)
        """
        if domain_id is None and self.domain_id is not None:
            domain_id = self.domain_id
        if domain_id is None:
            return None
        dat = self._make_message_structure(MsgType.REQUEST_CLOSE_DOMAIN)
        admin_info = {
            KeyType.domain_id: domain_id,
            KeyType.random: bbclib.get_random_value(32)
        }
        self.include_admin_info(dat, admin_info, self.node_keypair)
        return self._send_msg(dat)

    def get_node_id(self):
        """Get node_id of the connecting core node

        Returns:
            bytes: query_id
        """
        dat = self._make_message_structure(MsgType.REQUEST_GET_NODEID)
        return self._send_msg(dat)

    def get_domain_neighborlist(self, domain_id):
        """Get peer list of the domain from the core node

        This method should be used by a system administrator.

        Args:
            domain_id (bytes): domain_id of the neighbor list
        Returns:
            bytes: query_id
        """
        dat = self._make_message_structure(MsgType.REQUEST_GET_NEIGHBORLIST)
        dat[KeyType.domain_id] = domain_id
        admin_info = {
            KeyType.random: bbclib.get_random_value(32)
        }
        self.include_admin_info(dat, admin_info, self.node_keypair)
        return self._send_msg(dat)

    def set_domain_static_node(self, domain_id, node_id, ipv4, ipv6, port):
        """Set static node to the core node

        IPv6 is used for socket communication if both IPv4 and IPv6 is specified.
        This method should be used by a system administrator.

        Args:
            domain_id (bytes): target domain_id to set static neighbor entry
            node_id (bytes): node_id to register
            ipv4 (str): IPv4 address of the node
            ipv6 (str): IPv6 address of the node
            port (int): Port number to wait messages (UDP/TCP)
        Returns:
            bytes: query_id
        """
        dat = self._make_message_structure(MsgType.REQUEST_SET_STATIC_NODE)
        dat[KeyType.domain_id] = domain_id
        admin_info = {
            KeyType.node_info: [node_id, ipv4, ipv6, port]
        }
        self.include_admin_info(dat, admin_info, self.node_keypair)
        return self._send_msg(dat)

    def send_domain_ping(self, domain_id, ipv4=None, ipv6=None, port=DEFAULT_P2P_PORT):
        """Send domain ping to notify the existence of the node

        This method should be used by a system administrator.

        Args:
            domain_id (bytes): target domain_id to send ping
            ipv4 (str): IPv4 address of the node
            ipv6 (str): IPv6 address of the node
            port (int): Port number to wait messages UDP
        Returns:
            bytes: query_id (None if neither ipv4 nor ipv6 is given)
        """
        if ipv4 is None and ipv6 is None:
            return
        dat = self._make_message_structure(MsgType.DOMAIN_PING)
        dat[KeyType.domain_id] = domain_id
        admin_info = dict()
        # Wildcard addresses are not advertised.
        if ipv4 is not None and ipv4 != "0.0.0.0":
            admin_info[KeyType.ipv4_address] = ipv4
        if ipv6 is not None and ipv6 != "::":
            admin_info[KeyType.ipv6_address] = ipv6
        admin_info[KeyType.port_number] = port
        admin_info[KeyType.static_entry] = True
        self.include_admin_info(dat, admin_info, self.node_keypair)
        return self._send_msg(dat)

    def get_bbc_config(self):
        """Get config file of bbc_core

        This method should be used by a system administrator.

        Returns:
            bytes: query_id
        """
        return self._send_admin_query(MsgType.REQUEST_GET_CONFIG)

    def notify_domain_key_update(self):
        """Notify update of bbc_core

        This method should be used by a system administrator.

        Returns:
            bytes: query_id
        """
        return self._send_admin_query(MsgType.NOTIFY_DOMAIN_KEY_UPDATE)

    def get_domain_list(self):
        """Get domain_id list in bbc_core

        Returns:
            bytes: query_id
        """
        return self._send_admin_query(MsgType.REQUEST_GET_DOMAINLIST)

    def get_user_list(self):
        """Get user_ids in the domain that are connecting to the core node

        Returns:
            bytes: query_id
        """
        return self._send_admin_query(MsgType.REQUEST_GET_USERS)

    def get_forwarding_list(self):
        """Get forwarding_list of the domain in the core node

        Returns:
            bytes: query_id
        """
        return self._send_admin_query(MsgType.REQUEST_GET_FORWARDING_LIST)

    def get_notification_list(self):
        """Get notification_list of the core node

        Returns:
            bytes: query_id
        """
        return self._send_admin_query(MsgType.REQUEST_GET_NOTIFICATION_LIST)

    def manipulate_ledger_subsystem(self, enable=False, domain_id=None):
        """Start/stop ledger_subsystem on the bbc_core

        This method should be used by a system administrator.

        Args:
            enable (bool): True->start, False->stop
            domain_id (bytes): target domain_id to enable/disable ledger_subsystem
        Returns:
            bytes: query_id
        """
        dat = self._make_message_structure(MsgType.REQUEST_MANIP_LEDGER_SUBSYS)
        dat[KeyType.domain_id] = domain_id
        admin_info = {
            KeyType.ledger_subsys_manip: enable,
            KeyType.random: bbclib.get_random_value(32)
        }
        self.include_admin_info(dat, admin_info, self.node_keypair)
        return self._send_msg(dat)

    def register_to_core(self, on_multiple_nodes=False):
        """Register the client (user_id) to the core node

        After that, the client can communicate with the core node.

        Args:
            on_multiple_nodes (bool): True if this user_id is for multicast address
        Returns:
            bool: True
        """
        dat = self._make_message_structure(MsgType.REGISTER)
        if on_multiple_nodes:
            dat[KeyType.on_multinodes] = True
        self._send_msg(dat)
        return True

    def unregister_from_core(self):
        """Unregister and disconnect from the core node

        Also discards the key-exchange state held by this client.

        Returns:
            bool: True
        """
        dat = self._make_message_structure(MsgType.UNREGISTER)
        self._send_msg(dat)
        if self.aes_key_name is not None:
            message_key_types.unset_cipher(self.aes_key_name)
        self.privatekey_for_ecdh = None
        self.aes_key_name = None
        self.is_secure_connection = False
        return True

    def request_insert_completion_notification(self, asset_group_id):
        """Request notification when a transaction has been inserted (as a copy of transaction)

        Args:
            asset_group_id (bytes): asset_group_id for requesting notification about insertion
        Returns:
            bytes: query_id
        """
        dat = self._make_message_structure(MsgType.REQUEST_INSERT_NOTIFICATION)
        dat[KeyType.asset_group_id] = asset_group_id
        return self._send_msg(dat)

    def cancel_insert_completion_notification(self, asset_group_id):
        """Cancel notification when a transaction has been inserted (as a copy of transaction)

        Args:
            asset_group_id (bytes): asset_group_id for requesting notification about insertion
        Returns:
            bytes: query_id
        """
        dat = self._make_message_structure(MsgType.CANCEL_INSERT_NOTIFICATION)
        dat[KeyType.asset_group_id] = asset_group_id
        return self._send_msg(dat)

    def gather_signatures(self, txobj, reference_obj=None, asset_files=None, destinations=None, anycast=False):
        """Request to gather signatures from the specified user_ids

        Args:
            txobj (BBcTransaction):
            reference_obj (BBcReference): BBcReference object that includes the information about destinations
            asset_files (dict): mapping from asset_id to its file content
            destinations (list): list of destination user_ids
            anycast (bool): True if this message is for anycasting
        Returns:
            bytes: query_id (False if neither reference_obj nor destinations is given)
        """
        if reference_obj is None and destinations is None:
            return False
        dat = self._make_message_structure(MsgType.REQUEST_GATHER_SIGNATURE)
        dat[KeyType.transaction_data] = txobj.serialize()
        dat[KeyType.transaction_id] = txobj.transaction_id
        if anycast:
            dat[KeyType.is_anycast] = True
        if reference_obj is not None:
            # reference_obj takes precedence over an explicit destinations list.
            dat[KeyType.destination_user_ids] = reference_obj.get_destinations()
            referred_transactions = dict()
            referred_transactions.update(reference_obj.get_referred_transaction())
            if len(referred_transactions) > 0:
                dat[KeyType.transactions] = referred_transactions
        elif destinations is not None:
            dat[KeyType.destination_user_ids] = destinations
        if isinstance(asset_files, dict):
            dat[KeyType.all_asset_files] = asset_files
        return self._send_msg(dat)

    def sendback_signature(self, dest_user_id=None, transaction_id=None, ref_index=-1, signature=None, query_id=None):
        """Send back the signed transaction to the source

        This method is called if the receiver (signer) approves the transaction.

        Args:
            dest_user_id (bytes): destination user_id to send back
            transaction_id (bytes):
            ref_index (int): (optional) which reference in transaction the signature is for
            signature (BBcSignature): Signature that expresses approval of the transaction with transaction_id
            query_id: The query_id that was in the received SIGN_REQUEST message
        Returns:
            bytes: query_id
        """
        dat = self._make_message_structure(MsgType.RESPONSE_SIGNATURE)
        dat[KeyType.destination_user_id] = dest_user_id
        dat[KeyType.transaction_id] = transaction_id
        dat[KeyType.ref_index] = ref_index
        if signature.format_type in [bbclib.BBcFormat.FORMAT_BSON, bbclib.BBcFormat.FORMAT_BSON_COMPRESS_BZ2]:
            dat[KeyType.signature] = bson.dumps(signature.serialize())
            dat[KeyType.transaction_data_format] = bbclib.BBcFormat.FORMAT_BSON
        else:
            dat[KeyType.signature] = signature.serialize()
            dat[KeyType.transaction_data_format] = bbclib.BBcFormat.FORMAT_BINARY
        if query_id is not None:
            # Echo the requester's query_id instead of the locally generated one.
            dat[KeyType.query_id] = query_id
        return self._send_msg(dat)

    def sendback_denial_of_sign(self, dest_user_id=None, transaction_id=None, reason_text=None, query_id=None):
        """Send back the denial of sign the transaction

        This method is called if the receiver (signer) denies the transaction.

        Args:
            dest_user_id (bytes): destination user_id to send back
            transaction_id (bytes):
            reason_text (str): message to the requester about why the node denies the transaction
            query_id: The query_id that was in the received SIGN_REQUEST message
        Returns:
            bytes: query_id
        """
        dat = self._make_message_structure(MsgType.RESPONSE_SIGNATURE)
        dat[KeyType.destination_user_id] = dest_user_id
        dat[KeyType.transaction_id] = transaction_id
        dat[KeyType.status] = EOTHER
        dat[KeyType.reason] = reason_text
        if query_id is not None:
            dat[KeyType.query_id] = query_id
        return self._send_msg(dat)

    def insert_transaction(self, tx_obj):
        """Request to insert a legitimate transaction

        Asset file contents found in the events and relations of the
        transaction are sent along with it.

        Args:
            tx_obj (BBcTransaction): Transaction object to insert
        Returns:
            bytes: query_id
        """
        if tx_obj.transaction_id is None:
            tx_obj.digest()
        dat = self._make_message_structure(MsgType.REQUEST_INSERT)
        dat[KeyType.transaction_data] = tx_obj.serialize()
        ast = dict()
        for holders in (tx_obj.events, tx_obj.relations):
            for holder in holders:
                if holder.asset is None:
                    continue
                _, content = holder.asset.get_asset_file()
                if content is not None:
                    ast[holder.asset.asset_id] = content
        dat[KeyType.all_asset_files] = ast
        return self._send_msg(dat)

    def search_transaction_with_condition(self, asset_group_id=None, asset_id=None, user_id=None, direction=0, count=1):
        """Search transaction data by asset_group_id/asset_id/user_id

        If multiple conditions are specified, they are considered as AND condition.

        Args:
            asset_group_id (bytes): asset_group_id in BBcEvent and BBcRelations
            asset_id (bytes): asset_id in BBcAsset
            user_id (bytes): user_id in BBcAsset that means the owner of the asset
            direction (int): 0: descend, 1: ascend
            count (int): the number of transactions to retrieve
        Returns:
            bytes: query_id
        """
        dat = self._make_message_structure(MsgType.REQUEST_SEARCH_WITH_CONDITIONS)
        if asset_group_id is not None:
            dat[KeyType.asset_group_id] = asset_group_id
        if asset_id is not None:
            dat[KeyType.asset_id] = asset_id
        if user_id is not None:
            dat[KeyType.user_id] = user_id
        dat[KeyType.direction] = direction
        dat[KeyType.count] = count
        return self._send_msg(dat)

    def search_transaction(self, transaction_id):
        """Search request for a transaction

        Args:
            transaction_id (bytes): the target transaction to retrieve
        Returns:
            bytes: query_id
        """
        dat = self._make_message_structure(MsgType.REQUEST_SEARCH_TRANSACTION)
        dat[KeyType.transaction_id] = transaction_id
        return self._send_msg(dat)

    def count_transactions(self, asset_group_id=None, asset_id=None, user_id=None):
        """Request the number of transactions that match the given conditions

        If multiple conditions are specified, they are considered as AND condition.

        Args:
            asset_group_id (bytes): asset_group_id in BBcEvent and BBcRelations
            asset_id (bytes): asset_id in BBcAsset
            user_id (bytes): user_id in BBcAsset that means the owner of the asset
        Returns:
            bytes: query_id
        """
        dat = self._make_message_structure(MsgType.REQUEST_COUNT_TRANSACTIONS)
        if asset_group_id is not None:
            dat[KeyType.asset_group_id] = asset_group_id
        if asset_id is not None:
            dat[KeyType.asset_id] = asset_id
        if user_id is not None:
            dat[KeyType.user_id] = user_id
        return self._send_msg(dat)

    def traverse_transactions(self, transaction_id, asset_group_id=None, user_id=None, direction=1, hop_count=3):
        """Search request for transactions

        The method traverses the transaction graph in the ledger.
        The response from the bbc_core includes the list of transactions.

        Args:
            transaction_id (bytes): the target transaction to retrieve
            asset_group_id (bytes): asset_group_id that target transactions should have
            user_id (bytes): user_id that target transactions should have
            direction (int): 1:backforward, non-1:forward
            hop_count (int): hop count to traverse from the specified origin point
        Returns:
            bytes: query_id
        """
        dat = self._make_message_structure(MsgType.REQUEST_TRAVERSE_TRANSACTIONS)
        dat[KeyType.transaction_id] = transaction_id
        if asset_group_id is not None:
            dat[KeyType.asset_group_id] = asset_group_id
        if user_id is not None:
            dat[KeyType.user_id] = user_id
        dat[KeyType.direction] = direction
        dat[KeyType.hop_count] = hop_count
        return self._send_msg(dat)

    def request_to_repair_transaction(self, transaction_id):
        """Request to repair compromised transaction data

        Args:
            transaction_id (bytes): the target transaction to repair
        Returns:
            bytes: query_id
        """
        dat = self._make_message_structure(MsgType.REQUEST_REPAIR)
        dat[KeyType.transaction_id] = transaction_id
        return self._send_msg(dat)

    def request_to_repair_asset(self, asset_group_id, asset_id):
        """Request to repair compromised asset file

        Args:
            asset_group_id (bytes): the asset_group_id of the target asset
            asset_id (bytes): the target asset_id
        Returns:
            bytes: query_id
        """
        dat = self._make_message_structure(MsgType.REQUEST_REPAIR)
        dat[KeyType.asset_group_id] = asset_group_id
        dat[KeyType.asset_id] = asset_id
        return self._send_msg(dat)

    def request_verify_by_cross_ref(self, transaction_id):
        """Request to verify the transaction by Cross_ref in transaction of outer domain

        Args:
            transaction_id (bytes): the target transaction_id
        Returns:
            bytes: query_id
        """
        dat = self._make_message_structure(MsgType.REQUEST_CROSS_REF_VERIFY)
        dat[KeyType.transaction_id] = transaction_id
        return self._send_msg(dat)

    def request_cross_ref_holders_list(self):
        """Request the list of transaction_ids that are registered as cross_ref in outer domains

        Returns:
            bytes: query_id
        """
        dat = self._make_message_structure(MsgType.REQUEST_CROSS_REF_LIST)
        # TODO: need to limit the number of entries??
        return self._send_msg(dat)

    def register_in_ledger_subsystem(self, asset_group_id, transaction_id):
        """Register transaction_id in the ledger_subsystem

        Args:
            asset_group_id (bytes):
            transaction_id (bytes): the target transaction_id
        Returns:
            bytes: query_id
        """
        dat = self._make_message_structure(MsgType.REQUEST_REGISTER_HASH_IN_SUBSYS)
        dat[KeyType.transaction_id] = transaction_id
        dat[KeyType.asset_group_id] = asset_group_id
        return self._send_msg(dat)

    def verify_in_ledger_subsystem(self, asset_group_id, transaction_id):
        """Verify transaction_id in the ledger_subsystem

        Args:
            asset_group_id (bytes):
            transaction_id (bytes): the target transaction_id
        Returns:
            bytes: query_id
        """
        dat = self._make_message_structure(MsgType.REQUEST_VERIFY_HASH_IN_SUBSYS)
        dat[KeyType.transaction_id] = transaction_id
        dat[KeyType.asset_group_id] = asset_group_id
        return self._send_msg(dat)

    def get_stats(self):
        """Get statistics of bbc_core

        Returns:
            bytes: query_id
        """
        return self._send_admin_query(MsgType.REQUEST_GET_STATS)

    def send_message(self, msg, dst_user_id, is_anycast=False):
        """Send a message to the specified user_id

        Args:
            msg (dict): message to send
            dst_user_id (bytes): destination user_id
            is_anycast (bool): If true, the message is treated as an anycast message.
        Returns:
            bytes: query_id
        """
        dat = self._make_message_structure(MsgType.MESSAGE)
        dat[KeyType.destination_user_id] = dst_user_id
        dat[KeyType.message] = msg
        if is_anycast:
            dat[KeyType.is_anycast] = True
        return self._send_msg(dat)

    def start_receiver_loop(self):
        """Spawn receiver_loop as a gevent greenlet (not joined)"""
        gevent.spawn(self.receiver_loop)

    def receiver_loop(self):
        """Receive data from the core node and dispatch parsed messages to the callback

        The loop ends when the peer closes the connection or an exception
        occurs; the connection is closed in either case.
        """
        msg_parser = message_key_types.Message()
        try:
            while True:
                buf = self.connection.recv(8192)
                if len(buf) == 0:
                    break
                msg_parser.recv(buf)
                # One recv() may complete several messages; drain them all.
                while True:
                    msg = msg_parser.parse()
                    if msg is None:
                        break
                    self.callback.dispatch(msg, msg_parser.payload_type)
        except Exception as e:
            self.logger.info("TCP disconnect: %s" % e)
            print(traceback.format_exc())
        finally:
            # Also runs when the greenlet is killed (not an Exception subclass).
            self.connection.close()

    def include_cross_ref(self, txobj):
        """Include BBcCrossRef from other domains in the transaction

        If the client object has one or more cross_ref objects, one of them is included in the given transaction.
        This method should be voluntarily called for inter-domain weak collaboration.

        Args:
            txobj (BBcTransaction): Transaction object to include cross_ref
        """
        if len(self.cross_ref_list) > 0:
            txobj.add(cross_ref=self.cross_ref_list.pop(0))
            txobj.cross_ref.format_type = txobj.format_type
[docs]class Callback:
"""Set of callback functions for processing received message
If you want to implement your own way to process messages, inherit this class.
"""
def __init__(self, log=None):
    """Initialize the callback state

    Args:
        log: logger object used by the callback (may be None)
    """
    # Per-query_id response queues, keyed by query_id.
    self.query_queue = {}
    # General-purpose message queue.
    self.queue = queue.Queue()
    self.client = None
    self.logger = log
def set_logger(self, log):
    """Replace the logger used by this callback object

    Args:
        log: logger object
    """
    self.logger = log
def set_client(self, client):
    """Remember the client object this callback belongs to

    Args:
        client: client object (BBcAppClient passes itself)
    """
    self.client = client
def create_queue(self, query_id):
    """Create a response queue for query_id unless one already exists

    Args:
        query_id (bytes): query_id to wait a response for
    """
    if query_id not in self.query_queue:
        self.query_queue[query_id] = queue.Queue()
def get_from_queue(self, query_id, timeout=None, no_delete=False):
    """Take one message from the queue registered for query_id

    Args:
        query_id (bytes): query_id whose queue is read
        timeout: passed to Queue.get as timeout
        no_delete (bool): if True, keep the queue registered after reading
    Returns:
        the message taken from the queue
    """
    received = self.query_queue[query_id].get(timeout=timeout)
    if no_delete:
        return received
    # The queue is dropped once its message has been taken.
    del self.query_queue[query_id]
    return received
[docs] def dispatch(self, dat, payload_type):
#self.logger.debug("Received: %s" % dat)
if KeyType.command not in dat:
self.logger.warn("No command exists")
return
if KeyType.query_id in dat and dat[KeyType.query_id] in self.query_queue:
self.query_queue[dat[KeyType.query_id]].put(dat)
return
if dat[KeyType.command] == MsgType.RESPONSE_SEARCH_TRANSACTION:
self.proc_resp_search_transaction(dat)
elif dat[KeyType.command] == MsgType.RESPONSE_SEARCH_WITH_CONDITIONS:
self.proc_resp_search_with_condition(dat)
elif dat[KeyType.command] == MsgType.RESPONSE_COUNT_TRANSACTIONS:
self.proc_resp_count_transactions(dat)
elif dat[KeyType.command] == MsgType.RESPONSE_TRAVERSE_TRANSACTIONS:
self.proc_resp_traverse_transactions(dat)
elif dat[KeyType.command] == MsgType.RESPONSE_GATHER_SIGNATURE:
self.proc_resp_gather_signature(dat)
elif dat[KeyType.command] == MsgType.REQUEST_SIGNATURE:
self.proc_cmd_sign_request(dat)
elif dat[KeyType.command] == MsgType.RESPONSE_SIGNATURE:
self.proc_resp_sign_request(dat)
elif dat[KeyType.command] == MsgType.RESPONSE_INSERT:
self.proc_resp_insert(dat)
elif dat[KeyType.command] == MsgType.NOTIFY_INSERTED:
self.proc_notify_inserted(dat)
elif dat[KeyType.command] == MsgType.NOTIFY_CROSS_REF:
self.proc_notify_cross_ref(dat)
elif dat[KeyType.command] == MsgType.MESSAGE:
self.proc_user_message(dat)
elif dat[KeyType.command] == MsgType.RESPONSE_CROSS_REF_VERIFY:
self.proc_resp_verify_cross_ref(dat)
elif dat[KeyType.command] == MsgType.RESPONSE_CROSS_REF_LIST:
self.proc_resp_cross_ref_list(dat)
elif dat[KeyType.command] == MsgType.RESPONSE_REGISTER_HASH_IN_SUBSYS:
self.proc_resp_register_hash(dat)
elif dat[KeyType.command] == MsgType.RESPONSE_VERIFY_HASH_IN_SUBSYS:
self.proc_resp_verify_hash(dat)
elif dat[KeyType.command] == MsgType.RESPONSE_GET_STATS:
self.proc_resp_get_stats(dat)
elif dat[KeyType.command] == MsgType.RESPONSE_GET_NEIGHBORLIST:
self.proc_resp_get_neighborlist(dat)
elif dat[KeyType.command] == MsgType.RESPONSE_GET_DOMAINLIST:
self.proc_resp_get_domainlist(dat)
elif dat[KeyType.command] == MsgType.RESPONSE_GET_USERS:
self.proc_resp_get_userlist(dat)
elif dat[KeyType.command] == MsgType.RESPONSE_GET_FORWARDING_LIST:
self.proc_resp_get_forwardinglist(dat)
elif dat[KeyType.command] == MsgType.RESPONSE_GET_NOTIFICATION_LIST:
self.proc_resp_get_notificationlist(dat)
elif dat[KeyType.command] == MsgType.RESPONSE_GET_NODEID:
self.proc_resp_get_node_id(dat)
elif dat[KeyType.command] == MsgType.RESPONSE_GET_CONFIG:
self.proc_resp_get_config(dat)
elif dat[KeyType.command] == MsgType.RESPONSE_MANIP_LEDGER_SUBSYS:
self.proc_resp_ledger_subsystem(dat)
elif dat[KeyType.command] == MsgType.RESPONSE_SETUP_DOMAIN:
self.proc_resp_domain_setup(dat)
elif dat[KeyType.command] == MsgType.RESPONSE_SET_STATIC_NODE:
self.proc_resp_set_neighbor(dat)
elif dat[KeyType.command] == MsgType.RESPONSE_CLOSE_DOMAIN:
self.proc_resp_domain_close(dat)
elif dat[KeyType.command] == MsgType.RESPONSE_ECDH_KEY_EXCHANGE:
self.proc_resp_ecdh_key_exchange(dat)
else:
self.logger.warn("No method to process for command=%d" % dat[KeyType.command])
[docs] def synchronize(self, timeout=None):
"""Wait for receiving message with a common queue
Args:
timeout (int): timeout for waiting a message in seconds
Returns:
dict: a received message
"""
try:
return self.queue.get(timeout=timeout)
except:
return None
[docs] def sync_by_queryid(self, query_id, timeout=None, no_delete_q=False):
"""Wait for the message with specified query_id
This method creates a queue for the query_id and waits for the response
Args:
query_id (byte): timeout for waiting a message in seconds
timeout (int): timeout for waiting a message in seconds
no_delete_q (bool): If True, the queue for the query_id remains after popping a message
Returns:
dict: a received message
"""
try:
if query_id not in self.query_queue:
self.create_queue(query_id)
return self.get_from_queue(query_id, timeout=timeout)
except:
return None
[docs] def proc_notify_cross_ref(self, dat):
"""Callback for message NOTIFY_CROSS_REF
This method must not be overridden.
Args:
dat (dict): received message
"""
cross_ref = bbclib.BBcCrossRef(domain_id=dat[KeyType.cross_ref][0], transaction_id=dat[KeyType.cross_ref][1])
self.client.cross_ref_list.append(cross_ref)
[docs] def proc_cmd_sign_request(self, dat):
"""Callback for message REQUEST_SIGNATURE
This method should be overridden if you want to process the message asynchronously.
Args:
dat (dict): received message
"""
self.queue.put(dat)
[docs] def proc_resp_sign_request(self, dat):
"""Callback for message RESPONSE_SIGNATURE
This method should be overridden if you want to process the message asynchronously.
Args:
dat (dict): received message
"""
self.queue.put(dat)
[docs] def proc_resp_gather_signature(self, dat):
"""Callback for message RESPONSE_GATHER_SIGNATURE
This method should be overridden if you want to process the message asynchronously.
Args:
dat (dict): received message
"""
if KeyType.status not in dat or dat[KeyType.status] < ESUCCESS:
self.queue.put(dat)
return
format_type = dat[KeyType.transaction_data_format]
if format_type in [bbclib.BBcFormat.FORMAT_BSON, bbclib.BBcFormat.FORMAT_BSON_COMPRESS_BZ2]:
sigdata = bson.loads(dat[KeyType.signature])
else:
sigdata = dat[KeyType.signature]
sig = bbclib.recover_signature_object(sigdata, format_type=format_type)
self.queue.put({KeyType.status: ESUCCESS, KeyType.result: (dat[KeyType.ref_index], dat[KeyType.source_user_id], sig)})
[docs] def proc_resp_insert(self, dat):
"""Callback for message RESPONSE_INSERT
This method should be overridden if you want to process the message asynchronously.
Args:
dat (dict): received message
"""
self.queue.put(dat)
[docs] def proc_notify_inserted(self, dat):
"""Callback for message NOTIFY_INSERTED
This method should be overridden if you want to process the message asynchronously.
Args:
dat (dict): received message
"""
self.queue.put(dat)
[docs] def proc_resp_search_with_condition(self, dat):
"""Callback for message RESPONSE_SEARCH_WITH_CONDITIONS
This method should be overridden if you want to process the message asynchronously.
Args:
dat (dict): received message
"""
self.queue.put(dat)
[docs] def proc_resp_search_transaction(self, dat):
"""Callback for message RESPONSE_SEARCH_TRANSACTION
This method should be overridden if you want to process the message asynchronously.
Args:
dat (dict): received message
"""
self.queue.put(dat)
[docs] def proc_resp_count_transactions(self, dat):
"""Callback for message RESPONSE_COUNT_TRANSACTIONS
This method should be overridden if you want to process the message asynchronously.
Args:
dat (dict): received message
"""
self.queue.put(dat)
[docs] def proc_resp_traverse_transactions(self, dat):
"""Callback for message RESPONSE_TRAVERSE_TRANSACTIONS
This method should be overridden if you want to process the message asynchronously.
Args:
dat (dict): received message
"""
self.queue.put(dat)
[docs] def proc_user_message(self, dat):
"""Callback for message MESSAGE
This method should be overridden if you want to process the message asynchronously.
Args:
dat (dict): received message
"""
self.queue.put(dat)
[docs] def proc_resp_verify_cross_ref(self, dat):
"""Callback for message RESPONSE_CROSS_REF_VERIFY
This method should be overridden if you want to process the message asynchronously.
Args:
dat (dict): received message
"""
self.queue.put(dat)
[docs] def proc_resp_cross_ref_list(self, dat):
"""Callback for message RESPONSE_CROSS_REF_LIST
This method should be overridden if you want to process the message asynchronously.
Args:
dat (dict): received message
"""
self.queue.put(dat)
[docs] def proc_resp_ledger_subsystem(self, dat):
"""Callback for message RESPONSE_MANIP_LEDGER_SUBSYS
This method should be overridden if you want to process the message asynchronously.
Args:
dat (dict): received message
"""
self.queue.put(dat)
[docs] def proc_resp_register_hash(self, dat):
"""Callback for message RESPONSE_REGISTER_HASH_IN_SUBSYS
This method should be overridden if you want to process the message asynchronously.
Args:
dat (dict): received message
"""
self.queue.put(dat)
[docs] def proc_resp_verify_hash(self, dat):
"""Callback for message RESPONSE_VERIFY_HASH_IN_SUBSYS
This method should be overridden if you want to process the message asynchronously.
Args:
dat (dict): received message
"""
self.queue.put(dat)
[docs] def proc_resp_domain_setup(self, dat):
"""Callback for message RESPONSE_SETUP_DOMAIN
This method should be overridden if you want to process the message asynchronously.
Args:
dat (dict): received message
"""
self.queue.put(dat)
[docs] def proc_resp_domain_close(self, dat):
"""Callback for message RESPONSE_CLOSE_DOMAIN
This method should be overridden if you want to process the message asynchronously.
Args:
dat (dict): received message
"""
self.queue.put(dat)
[docs] def proc_resp_set_neighbor(self, dat):
"""Callback for message RESPONSE_SET_STATIC_NODE
This method should be overridden if you want to process the message asynchronously.
Args:
dat (dict): received message
"""
self.queue.put(dat)
[docs] def proc_resp_get_config(self, dat):
"""Callback for message RESPONSE_GET_CONFIG
This method should be overridden if you want to process the message asynchronously.
Args:
dat (dict): received message
"""
self.queue.put(dat)
[docs] def proc_resp_get_neighborlist(self, dat):
"""Callback for message RESPONSE_GET_NEIGHBORLIST
List of neighbor node info (the first one is that of the connecting core) is queued rather than message itself.
This method must not be overridden.
Args:
dat (dict): received message
"""
if KeyType.neighbor_list not in dat:
self.queue.put(None)
return
neighbor_list = dat[KeyType.neighbor_list]
results = []
count = int.from_bytes(neighbor_list[:4], 'big')
for i in range(count):
base = 4 + i*(32+4+16+2+1+8)
node_id = neighbor_list[base:base+32]
ipv4 = socket.inet_ntop(socket.AF_INET, neighbor_list[base + 32:base + 36])
ipv6 = socket.inet_ntop(socket.AF_INET6, neighbor_list[base + 36:base + 52])
port = socket.ntohs(int.from_bytes(neighbor_list[base + 52:base + 54], 'big'))
domain0 = True if neighbor_list[base + 54] == 0x01 else False
updated_at = neighbor_list[base+55:base+63]
results.append([node_id, ipv4, ipv6, port, domain0])
self.queue.put(results)
[docs] def proc_resp_get_domainlist(self, dat):
"""Callback for message RESPONSE_GET_DOMAINLIST
List of domain_ids is queued rather than message itself.
This method should be overridden if you want to process the message asynchronously.
Args:
dat (dict): received message
"""
if KeyType.domain_list not in dat:
self.queue.put(None)
return
self.queue.put(_parse_one_level_list(dat[KeyType.domain_list]))
[docs] def proc_resp_get_userlist(self, dat):
"""Callback for message RESPONSE_GET_USERS
List of user_ids is queued rather than message itself.
This method should be overridden if you want to process the message asynchronously.
Args:
dat (dict): received message
"""
if KeyType.user_list not in dat:
self.queue.put(None)
return
self.queue.put(_parse_one_level_list(dat[KeyType.user_list]))
[docs] def proc_resp_get_forwardinglist(self, dat):
"""Callback for message RESPONSE_GET_FORWARDING_LIST
List of user_ids in other core nodes is queued rather than message itself.
This method should be overridden if you want to process the message asynchronously.
Args:
dat (dict): received message
"""
if KeyType.forwarding_list not in dat:
self.queue.put(None)
return
self.queue.put(_parse_two_level_dict(dat[KeyType.forwarding_list]))
[docs] def proc_resp_get_notificationlist(self, dat):
"""Callback for message RESPONSE_GET_NOTIFICATION_LIST
List of user_ids in other core nodes is queued rather than message itself.
This method should be overridden if you want to process the message asynchronously.
Args:
dat (dict): received message
"""
if KeyType.notification_list not in dat:
self.queue.put(None)
return
self.queue.put(_parse_two_level_dict(dat[KeyType.notification_list]))
[docs] def proc_resp_get_node_id(self, dat):
"""Callback for message RESPONSE_GET_NODEID
Node_id is queued rather than message itself.
This method should be overridden if you want to process the message asynchronously.
Args:
dat (dict): received message
"""
if KeyType.node_id not in dat:
self.queue.put(dat)
return
self.queue.put(dat[KeyType.node_id])
[docs] def proc_resp_get_stats(self, dat):
"""Callback for message RESPONSE_GET_STATS
This method should be overridden if you want to process the message asynchronously.
Args:
dat (dict): received message
"""
self.queue.put(dat)
[docs] def proc_resp_ecdh_key_exchange(self, dat):
"""Callback for message RESPONSE_ECDH_KEY_EXCHANGE
This method must not be overridden.
Args:
dat (dict): received message
"""
shared_key = message_key_types.derive_shared_key(self.client.privatekey_for_ecdh,
dat[KeyType.ecdh], dat[KeyType.random])
message_key_types.set_cipher(shared_key, dat[KeyType.nonce], self.client.aes_key_name, dat[KeyType.hint])
self.client.is_secure_connection = False
self.queue.put(True)