# -*- coding: utf-8 -*-
"""
Copyright (c) 2017 beyond-blockchain.org.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from gevent import monkey
monkey.patch_all()
import gevent
import socket
import select
import threading
import random
import binascii
import hashlib
import time
import os
import sys
sys.path.extend(["../../", os.path.abspath(os.path.dirname(__file__))])
from bbc1.core.bbc_config import DEFAULT_P2P_PORT
from bbc1.core.key_exchange_manager import KeyExchangeManager
from bbc1.core.topology_manager import TopologyManagerBase
from bbc1.core.user_message_routing import UserMessageRouting
from bbc1.core.data_handler import DataHandler, DataHandlerDomain0
from bbc1.core.repair_manager import RepairManager
from bbc1.core.domain0_manager import Domain0Manager
from bbc1.core import query_management, message_key_types, logger
import bbclib
from bbc1.core.message_key_types import to_2byte, PayloadType, KeyType, InfraMessageCategory
TCP_THRESHOLD_SIZE = 1300
ZEROS = bytes([0] * 32)
NUM_CROSS_REF_COPY = 2
DURATION_GIVEUP_PUT = 30
INTERVAL_RETRY = 3
GET_RETRY_COUNT = 5
ROUTE_RETRY_COUNT = 1
REFRESH_INTERVAL = 1800 # not sure whether it's appropriate
ALIVE_CHECK_PING_WAIT = 2
ticker = query_management.get_ticker()
def _check_my_IPaddresses(target4='8.8.8.8', target6='2001:4860:4860::8888', port=80):
"""Check IP address by sending DNS query"""
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect((target4, port))
ip4 = s.getsockname()[0]
s.close()
except OSError:
ip4 = "127.0.0.1"
if socket.has_ipv6:
try:
s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
s.connect((target6, port))
ip6 = s.getsockname()[0]
s.close()
except OSError:
ip6 = None
return ip4, ip6
def _send_data_by_tcp(ipv4=None, ipv6=None, port=DEFAULT_P2P_PORT, msg=None):
"""Establish TCP connection and send data"""
def worker():
if ipv6 is not None and ipv6 != "::":
conn = socket.create_connection((ipv6, port))
elif ipv4 is not None and ipv4 != "0.0.0.0":
conn = socket.create_connection((ipv4, port))
else:
return
conn.sendall(msg)
conn.close()
gevent.spawn(worker)
def _convert_to_string(array):
"""Data convert utility"""
for i in range(len(array)):
if isinstance(array[i], bytes):
array[i] = array[i].decode()
return array
[docs]def is_less_than(val_a, val_b):
"""Return True if val_a is less than val_b (evaluate as integer)"""
size = len(val_a)
if size != len(val_b):
return False
for i in reversed(range(size)):
if val_a[i] < val_b[i]:
return True
elif val_a[i] > val_b[i]:
return False
return False
[docs]class BBcNetwork:
"""Socket and thread management for infrastructure layers"""
NOTIFY_LEAVE = to_2byte(0)
REQUEST_KEY_EXCHANGE = to_2byte(1)
RESPONSE_KEY_EXCHANGE = to_2byte(2)
CONFIRM_KEY_EXCHANGE = to_2byte(3)
def __init__(self, config, core=None, p2p_port=None, external_ip4addr=None, external_ip6addr=None,
loglevel="all", logname=None):
self.core = core
self.stats = core.stats
self.logger = logger.get_logger(key="bbc_network", level=loglevel, logname=logname)
self.logname = logname
self.loglevel = loglevel
self.config = config
self.domain0manager = None
conf = self.config.get_config()
self.domains = dict()
self.ip_address, self.ip6_address = _check_my_IPaddresses()
if external_ip4addr is not None:
self.external_ip4addr = external_ip4addr
else:
self.external_ip4addr = self.ip_address
if external_ip6addr is not None:
self.external_ip6addr = external_ip6addr
else:
self.external_ip6addr = self.ip6_address
if p2p_port is not None:
conf['network']['p2p_port'] = p2p_port
self.config.update_config()
self.port = conf['network']['p2p_port']
self.socket_udp = None
self.socket_udp6 = None
if not self.setup_udp_socket():
self.logger.error("** Fail to setup UDP socket **")
return
self.listen_socket = None
self.listen_socket6 = None
self.max_connections = conf['network']['max_connections']
if not self.setup_tcp_server():
self.logger.error("** Fail to setup TCP server **")
return
def _get_my_nodeinfo(self, node_id):
"""Return NodeInfo
Args:
node_id (bytes): my node_id
Returns:
NodeInfo: my NodeInfo
"""
ipv4 = self.ip_address
if self.external_ip4addr is not None:
ipv4 = self.external_ip4addr
if ipv4 is None or len(ipv4) == 0:
ipv4 = "0.0.0.0"
ipv6 = self.ip6_address
if self.external_ip6addr is not None:
ipv6 = self.external_ip6addr
if ipv6 is None or len(ipv6) == 0:
ipv6 = "::"
domain0 = True if self.domain0manager is not None else False
return NodeInfo(node_id=node_id, ipv4=ipv4, ipv6=ipv6, port=self.port, domain0=domain0)
[docs] def include_admin_info_into_message_if_needed(self, domain_id, msg, admin_info):
"""Serialize admin info into one binary object and add signature"""
admin_info[KeyType.message_seq] = self.domains[domain_id]["neighbor"].admin_sequence_number + 1
self.domains[domain_id]["neighbor"].admin_sequence_number += 1
if "keypair" in self.domains[domain_id] and self.domains[domain_id]["keypair"] is not None:
msg[KeyType.admin_info] = message_key_types.make_TLV_formatted_message(admin_info)
digest = hashlib.sha256(msg[KeyType.admin_info]).digest()
msg[KeyType.nodekey_signature] = self.domains[domain_id]["keypair"]['keys'][0].sign(digest)
else:
msg.update(admin_info)
[docs] def send_key_exchange_message(self, domain_id, node_id, command, pubkey, nonce, random_val, key_name):
"""Send ECDH key exchange message"""
if command == "request":
command = BBcNetwork.REQUEST_KEY_EXCHANGE
elif command == "response":
command = BBcNetwork.RESPONSE_KEY_EXCHANGE
else:
command = BBcNetwork.CONFIRM_KEY_EXCHANGE
msg = {
KeyType.infra_msg_type: InfraMessageCategory.CATEGORY_NETWORK,
KeyType.domain_id: domain_id,
KeyType.destination_node_id: node_id,
}
admin_info = {
KeyType.destination_node_id: node_id, # To defend from replay attack
KeyType.command: command,
KeyType.ecdh: pubkey,
KeyType.nonce: nonce,
KeyType.random: random_val,
KeyType.hint: key_name,
}
self.include_admin_info_into_message_if_needed(domain_id, msg, admin_info)
return self.send_message_in_network(None, PayloadType.Type_msgpack, domain_id, msg)
[docs] def create_domain(self, domain_id=ZEROS, config=None):
"""Create domain and register user in the domain
Args:
domain_id (bytes): target domain_id to create
config (dict): configuration for the domain
Returns:
bool:
"""
if domain_id in self.domains:
return False
conf = self.config.get_domain_config(domain_id, create_if_new=True)
if config is not None:
conf.update(config)
if 'node_id' not in conf or conf['node_id'] == "":
node_id = bbclib.get_random_id()
conf['node_id'] = bbclib.convert_id_to_string(node_id)
self.config.update_config()
else:
node_id = bbclib.convert_idstring_to_bytes(conf.get('node_id'))
self.domains[domain_id] = dict()
self.domains[domain_id]['node_id'] = node_id
self.domains[domain_id]['name'] = node_id.hex()[:4]
self.domains[domain_id]['neighbor'] = NeighborInfo(network=self, domain_id=domain_id, node_id=node_id,
my_info=self._get_my_nodeinfo(node_id))
self.domains[domain_id]['topology'] = TopologyManagerBase(network=self, domain_id=domain_id, node_id=node_id,
logname=self.logname, loglevel=self.loglevel)
self.domains[domain_id]['user'] = UserMessageRouting(self, domain_id, logname=self.logname,
loglevel=self.loglevel)
self.get_domain_keypair(domain_id)
workingdir = self.config.get_config()['workingdir']
if domain_id == ZEROS:
self.domains[domain_id]['data'] = DataHandlerDomain0(self, domain_id=domain_id, logname=self.logname,
loglevel=self.loglevel)
self.domain0manager = Domain0Manager(self, node_id=node_id, logname=self.logname, loglevel=self.loglevel)
else:
self.domains[domain_id]['data'] = DataHandler(self, config=conf, workingdir=workingdir,
domain_id=domain_id, logname=self.logname,
loglevel=self.loglevel)
self.domains[domain_id]['repair'] = RepairManager(self, domain_id, workingdir=workingdir,
logname=self.logname, loglevel=self.loglevel)
if self.domain0manager is not None:
self.domain0manager.update_domain_belong_to()
for dm in self.domains.keys():
if dm != ZEROS:
self.domains[dm]['neighbor'].my_info.update(domain0=True)
self.domains[domain_id]['topology'].update_refresh_timer_entry(1)
self.stats.update_stats_increment("network", "num_domains", 1)
self.logger.info("Domain %s is created" % (domain_id.hex()))
return True
[docs] def remove_domain(self, domain_id=ZEROS):
"""Leave the domain and remove it
Args:
domain_id (bytes): target domain_id to remove
Returns:
bool: True if successful
"""
if domain_id not in self.domains:
return False
self.domains[domain_id]['topology'].stop_all_timers()
self.domains[domain_id]['user'].stop_all_timers()
self.domains[domain_id]['repair'].exit_loop()
for nd in self.domains[domain_id]["neighbor"].nodeinfo_list.values():
nd.key_manager.stop_all_timers()
msg = {
KeyType.infra_msg_type: InfraMessageCategory.CATEGORY_NETWORK,
KeyType.domain_id: domain_id,
KeyType.command: BBcNetwork.NOTIFY_LEAVE,
}
admin_info = {
KeyType.source_node_id: self.domains[domain_id]["neighbor"].my_node_id,
KeyType.nonce: bbclib.get_random_value(32) # just for randomization
}
self.include_admin_info_into_message_if_needed(domain_id, msg, admin_info)
self.broadcast_message_in_network(domain_id=domain_id, msg=msg)
if domain_id == ZEROS:
self.domain0manager.stop_all_timers()
for dm in self.domains.keys():
if dm != ZEROS:
self.domains[dm]['neighbor'].my_info.update(domain0=False)
self.domains[dm]['topology'].update_refresh_timer_entry(1)
del self.domains[domain_id]
if self.domain0manager is not None:
self.domain0manager.update_domain_belong_to()
self.config.remove_domain_config(domain_id)
self.stats.update_stats_decrement("network", "num_domains", 1)
self.logger.info("Domain %s is removed" % (domain_id.hex()))
return True
[docs] def save_all_static_node_list(self):
"""Save all static nodes in the config file"""
self.logger.info("Saving the neighbor list")
for domain_id in self.domains.keys():
conf = self.config.get_domain_config(domain_id)
conf['static_node'] = dict()
for node_id, nodeinfo in self.domains[domain_id]['neighbor'].nodeinfo_list.items():
if nodeinfo.is_static:
nid = bbclib.convert_id_to_string(node_id)
info = _convert_to_string([nodeinfo.ipv4, nodeinfo.ipv6, nodeinfo.port])
conf['static_node'][nid] = info
self.config.update_config()
self.logger.info("Done...")
[docs] def send_message_to_a_domain0_manager(self, domain_id, msg):
"""Choose one of domain0_managers and send msg to it
Args:
domain_id (bytes): target domain_id
msg (bytes): message to send
"""
if domain_id not in self.domains:
return
managers = tuple(filter(lambda nd: nd.is_domain0_node,
self.domains[domain_id]['neighbor'].nodeinfo_list.values()))
if len(managers) == 0:
return
dst_manager = random.choice(managers)
msg[KeyType.destination_node_id] = dst_manager.node_id
msg[KeyType.infra_msg_type] = InfraMessageCategory.CATEGORY_DOMAIN0
self.send_message_in_network(dst_manager, PayloadType.Type_msgpack, domain_id, msg)
[docs] def get_domain_keypair(self, domain_id):
"""Get domain_keys (private key and public key)
Args:
domain_id (bytes): target domain_id
"""
keyconfig = self.config.get_config().get('domain_key', None)
if keyconfig is None:
self.domains[domain_id]['keypair'] = None
return
if 'use' not in keyconfig or not keyconfig['use']:
return
if 'directory' not in keyconfig or not os.path.exists(keyconfig['directory']):
self.domains[domain_id]['keypair'] = None
return
domain_id_str = domain_id.hex()
keypair = bbclib.KeyPair()
try:
with open(os.path.join(keyconfig['directory'], domain_id_str+".pem"), "r") as f:
keypair.mk_keyobj_from_private_key_pem(f.read())
except:
self.domains[domain_id]['keypair'] = None
return
self.domains[domain_id].setdefault('keypair', dict())
self.domains[domain_id]['keypair'].setdefault('keys', list())
self.domains[domain_id]['keypair']['keys'].insert(0, keypair)
timer = self.domains[domain_id]['keypair'].setdefault('timer', None)
if timer is None or not timer.active:
self.domains[domain_id]['keypair']['timer'] = query_management.QueryEntry(
expire_after=keyconfig['obsolete_timeout'],
data={KeyType.domain_id: domain_id}, callback_expire=self._delete_obsoleted_domain_keys)
else:
timer.update_expiration_time(keyconfig['obsolete_timeout'])
self.domains[domain_id]['keypair']['keys'].insert(0, keypair)
def _delete_obsoleted_domain_keys(self, query_entry):
domain_id = query_entry.data[KeyType.domain_id]
if self.domains[domain_id]['keypair'] is not None and len(self.domains[domain_id]['keypair']['keys']) > 1:
del self.domains[domain_id]['keypair']['keys'][1:]
[docs] def send_domain_ping(self, domain_id, ipv4, ipv6, port, is_static=False):
"""Send domain ping to the specified node
Args:
domain_id (bytes): target domain_id
ipv4 (str): IPv4 address of the node
ipv6 (str): IPv6 address of the node
port (int): Port number
is_static (bool): If true, the entry is treated as static one and will be saved in config.json
Returns:
bool: True if successful
"""
if domain_id not in self.domains:
return False
if ipv4 is None and ipv6 is None:
return False
node_id = self.domains[domain_id]['neighbor'].my_node_id
nodeinfo = NodeInfo(ipv4=ipv4, ipv6=ipv6, port=port, is_static=is_static)
query_entry = query_management.QueryEntry(expire_after=10,
callback_error=self._domain_ping,
callback_expire=self._invalidate_neighbor,
data={KeyType.domain_id: domain_id,
KeyType.node_id: node_id,
KeyType.node_info: nodeinfo},
retry_count=3)
self._domain_ping(query_entry)
return True
def _domain_ping(self, query_entry):
"""Send domain ping"""
domain_id = query_entry.data[KeyType.domain_id]
msg = {
KeyType.infra_msg_type: InfraMessageCategory.CATEGORY_NETWORK,
KeyType.domain_id: domain_id,
}
admin_info = {
KeyType.node_id: query_entry.data[KeyType.node_id],
KeyType.domain_ping: 0,
KeyType.nonce: query_entry.nonce,
KeyType.static_entry: query_entry.data[KeyType.node_info].is_static,
}
if self.external_ip4addr is not None:
admin_info[KeyType.external_ip4addr] = self.external_ip4addr
else:
admin_info[KeyType.external_ip4addr] = self.ip_address
if self.external_ip6addr is not None:
admin_info[KeyType.external_ip6addr] = self.external_ip6addr
else:
admin_info[KeyType.external_ip6addr] = self.ip6_address
if query_entry.data[KeyType.node_info].ipv6 is not None:
self.logger.debug("Send domain_ping to %s:%d" % (query_entry.data[KeyType.node_info].ipv6,
query_entry.data[KeyType.node_info].port))
else:
self.logger.debug("Send domain_ping to %s:%d" % (query_entry.data[KeyType.node_info].ipv4,
query_entry.data[KeyType.node_info].port))
query_entry.update(fire_after=1)
self.stats.update_stats_increment("network", "domain_ping_send", 1)
self.include_admin_info_into_message_if_needed(domain_id, msg, admin_info)
self.send_message_in_network(query_entry.data[KeyType.node_info], PayloadType.Type_msgpack, domain_id, msg)
def _receive_domain_ping(self, domain_id, port, msg):
"""Process received domain_ping.
If KeyType.domain_ping value is 1, the sender of the ping is registered as static
Args:
domain_id (bytes): target domain_id
port (int): Port number
msg (dict): received message
"""
if KeyType.node_id not in msg:
return
self.stats.update_stats_increment("network", "domain_ping_receive", 1)
node_id = msg[KeyType.node_id]
ipv4 = msg.get(KeyType.external_ip4addr, None)
ipv6 = msg.get(KeyType.external_ip6addr, None)
is_static = msg.get(KeyType.static_entry, False)
self.logger.debug("Receive domain_ping for domain %s from %s" % (binascii.b2a_hex(domain_id[:4]), (ipv4, ipv6)))
self.logger.debug(msg)
if domain_id not in self.domains:
self.logger.debug("no domain_id")
return
if self.domains[domain_id]['neighbor'].my_node_id == node_id:
self.logger.debug("no other node_id")
return
self.add_neighbor(domain_id=domain_id, node_id=node_id, ipv4=ipv4, ipv6=ipv6, port=port, is_static=is_static)
self.stats.update_stats_increment("network", "domain_ping_received", 1)
if msg[KeyType.domain_ping] == 1:
query_entry = ticker.get_entry(msg[KeyType.nonce])
query_entry.deactivate()
else:
nonce = msg[KeyType.nonce]
msg = {
KeyType.infra_msg_type: InfraMessageCategory.CATEGORY_NETWORK,
KeyType.domain_id: domain_id,
}
admin_info = {
KeyType.node_id: self.domains[domain_id]['neighbor'].my_node_id,
KeyType.domain_ping: 1,
KeyType.nonce: nonce,
KeyType.static_entry: is_static,
}
if self.external_ip4addr is not None:
admin_info[KeyType.external_ip4addr] = self.external_ip4addr
else:
admin_info[KeyType.external_ip4addr] = self.ip_address
if self.external_ip6addr is not None:
admin_info[KeyType.external_ip6addr] = self.external_ip6addr
else:
admin_info[KeyType.external_ip6addr] = self.ip6_address
self.include_admin_info_into_message_if_needed(domain_id, msg, admin_info)
nodeinfo = NodeInfo(ipv4=ipv4, ipv6=ipv6, port=port)
self.send_message_in_network(nodeinfo, PayloadType.Type_msgpack, domain_id, msg)
def _invalidate_neighbor(self, query_entry):
"""Set the flag of the nodeinfo false"""
domain_id = query_entry.data[KeyType.domain_id]
node_id = query_entry.data[KeyType.node_id]
try:
self.domains[domain_id]['neighbor'].nodelist[node_id].is_alive = False
except:
pass
[docs] def send_message_in_network(self, nodeinfo=None, payload_type=PayloadType.Type_any, domain_id=None, msg=None):
"""Send message over a domain network
Args:
nodeinfo (NodeInfo): NodeInfo object of the destination
payload_type (bytes): message format type
domain_id (bytes): target domain_id
msg (dict): message to send
Returns:
bool: True if successful
"""
if nodeinfo is None:
if domain_id not in self.domains:
return False
if msg[KeyType.destination_node_id] not in self.domains[domain_id]['neighbor'].nodeinfo_list:
return False
nodeinfo = self.domains[domain_id]['neighbor'].nodeinfo_list[msg[KeyType.destination_node_id]]
msg[KeyType.source_node_id] = self.domains[domain_id]['neighbor'].my_node_id
if payload_type == PayloadType.Type_any:
if nodeinfo.key_manager is not None and nodeinfo.key_manager.key_name is not None and \
nodeinfo.key_manager.key_name in message_key_types.encryptors:
payload_type = PayloadType.Type_encrypted_msgpack
else:
payload_type = PayloadType.Type_msgpack
if payload_type in [PayloadType.Type_msgpack, PayloadType.Type_binary]:
data_to_send = message_key_types.make_message(payload_type, msg)
elif payload_type == PayloadType.Type_encrypted_msgpack:
payload_type = PayloadType.Type_encrypted_msgpack
data_to_send = message_key_types.make_message(payload_type, msg, key_name=nodeinfo.key_manager.key_name)
if data_to_send is None:
self.logger.error("Fail to encrypt message")
return False
else:
return False
if len(data_to_send) > TCP_THRESHOLD_SIZE:
_send_data_by_tcp(ipv4=nodeinfo.ipv4, ipv6=nodeinfo.ipv6, port=nodeinfo.port, msg=data_to_send)
self.stats.update_stats_increment("network", "send_msg_by_tcp", 1)
self.stats.update_stats_increment("network", "sent_data_size", len(data_to_send))
return True
if nodeinfo.ipv6 is not None and self.socket_udp6 is not None:
self.socket_udp6.sendto(data_to_send, (nodeinfo.ipv6, nodeinfo.port))
self.stats.update_stats_increment("network", "send_msg_by_udp6", 1)
self.stats.update_stats_increment("network", "sent_data_size", len(data_to_send))
return True
if nodeinfo.ipv4 is not None and self.socket_udp is not None:
self.socket_udp.sendto(data_to_send, (nodeinfo.ipv4, nodeinfo.port))
self.stats.update_stats_increment("network", "send_msg_by_udp4", 1)
self.stats.update_stats_increment("network", "sent_data_size", len(data_to_send))
return True
[docs] def broadcast_message_in_network(self, domain_id, payload_type=PayloadType.Type_any, msg=None):
"""Send message to all neighbor nodes
Args:
payload_type (bytes): message format type
domain_id (bytes): target domain_id
msg (dict): message to send
Returns:
bool: True if successful
"""
if domain_id not in self.domains:
return
for node_id, nodeinfo in self.domains[domain_id]['neighbor'].nodeinfo_list.items():
msg[KeyType.destination_node_id] = node_id
#print("broadcast:", node_id.hex(), node_id)
self.send_message_in_network(nodeinfo, payload_type, domain_id, msg)
[docs] def add_neighbor(self, domain_id, node_id, ipv4=None, ipv6=None, port=None, is_static=False):
"""Add node in the neighbor list
Args:
domain_id (bytes): target domain_id
node_id (bytes): target node_id
ipv4 (str): IPv4 address of the node
ipv6 (str): IPv6 address of the node
port (int): Port number that the node is waiting at
is_static (bool): If true, the entry is treated as static one and will be saved in config.json
Returns:
bool: True if it is a new entry, None if error.
"""
if domain_id not in self.domains or self.domains[domain_id]['neighbor'].my_node_id == node_id or port is None:
return None
is_new = self.domains[domain_id]['neighbor'].add(node_id=node_id, ipv4=ipv4, ipv6=ipv6, port=port, is_static=is_static)
if is_new is not None and is_new:
nodelist = self.domains[domain_id]['neighbor'].nodeinfo_list
self.domains[domain_id]['topology'].notify_neighbor_update(node_id, is_new=True)
self.stats.update_stats("network", "neighbor_nodes", len(nodelist))
return is_new
[docs] def check_admin_signature(self, domain_id, msg):
"""Check admin signature in the message
Args:
domain_id (bytes): target domain_id
msg (dict): received message
Returns:
bool: True if valid
"""
if domain_id not in self.domains:
return False
if "keypair" not in self.domains[domain_id] or self.domains[domain_id]["keypair"] is None:
return True
if KeyType.nodekey_signature not in msg or KeyType.admin_info not in msg:
return False
digest = hashlib.sha256(msg[KeyType.admin_info]).digest()
flag = False
for key in self.domains[domain_id]["keypair"]['keys']:
if key.verify(digest, msg[KeyType.nodekey_signature]):
flag = True
break
if not flag:
return False
admin_info = message_key_types.make_dictionary_from_TLV_format(msg[KeyType.admin_info])
msg.update(admin_info)
return True
def _process_message_base(self, domain_id, ipv4, ipv6, port, msg):
"""Process received message (common process for any kind of network module)
Args:
domain_id (bytes): target domain_id
ipv4 (str): IPv4 address of the sender node
ipv6 (str): IPv6 address of the sender node
port (int): Port number of the sender
msg (dict): received message
"""
if KeyType.infra_msg_type not in msg:
return
self.logger.debug("[%s] process_message(type=%d)" % (self.domains[domain_id]['name'],
int.from_bytes(msg[KeyType.infra_msg_type], 'big')))
if msg[KeyType.infra_msg_type] == InfraMessageCategory.CATEGORY_NETWORK:
self._process_message(domain_id, ipv4, ipv6, port, msg)
elif msg[KeyType.infra_msg_type] == InfraMessageCategory.CATEGORY_USER:
self.add_neighbor(domain_id, msg[KeyType.source_node_id], ipv4, ipv6, port)
self.domains[domain_id]['user'].process_message(msg)
elif msg[KeyType.infra_msg_type] == InfraMessageCategory.CATEGORY_DATA:
self.add_neighbor(domain_id, msg[KeyType.source_node_id], ipv4, ipv6, port)
self.domains[domain_id]['data'].process_message(msg)
elif msg[KeyType.infra_msg_type] == InfraMessageCategory.CATEGORY_TOPOLOGY:
self.add_neighbor(domain_id, msg[KeyType.source_node_id], ipv4, ipv6, port)
self.domains[domain_id]['topology'].process_message(msg)
elif msg[KeyType.infra_msg_type] == InfraMessageCategory.CATEGORY_DOMAIN0:
self.add_neighbor(domain_id, msg[KeyType.source_node_id], ipv4, ipv6, port)
self.domain0manager.process_message(msg)
def _process_message(self, domain_id, ipv4, ipv6, port, msg):
"""Process received message
Args:
domain_id (bytes): target domain_id
ipv4 (str): IPv4 address of the sender node
ipv6 (str): IPv6 address of the sender node
port (int): Port number of the sender
msg (dict): received message
"""
if not self.check_admin_signature(domain_id, msg):
self.logger.error("Illegal access to domain %s" % domain_id.hex())
return
source_node_id = msg[KeyType.source_node_id]
if source_node_id in self.domains[domain_id]["neighbor"].nodeinfo_list:
admin_msg_seq = msg[KeyType.message_seq]
if self.domains[domain_id]["neighbor"].nodeinfo_list[source_node_id].admin_sequence_number >= admin_msg_seq:
return
self.domains[domain_id]["neighbor"].nodeinfo_list[source_node_id].admin_sequence_number = admin_msg_seq
if KeyType.domain_ping in msg and port is not None:
self._receive_domain_ping(domain_id, port, msg)
elif msg[KeyType.command] == BBcNetwork.REQUEST_KEY_EXCHANGE:
if KeyType.ecdh in msg and KeyType.hint in msg and KeyType.nonce in msg and KeyType.random in msg:
if source_node_id not in self.domains[domain_id]['neighbor'].nodeinfo_list:
self.add_neighbor(domain_id, source_node_id, ipv4, ipv6, port)
nodeinfo = self.domains[domain_id]['neighbor'].nodeinfo_list[source_node_id]
if nodeinfo.key_manager is None:
nodeinfo.key_manager = KeyExchangeManager(self, domain_id, source_node_id)
nodeinfo.key_manager.receive_exchange_request(msg[KeyType.ecdh], msg[KeyType.nonce],
msg[KeyType.random], msg[KeyType.hint])
elif msg[KeyType.command] == BBcNetwork.RESPONSE_KEY_EXCHANGE:
if KeyType.ecdh in msg and KeyType.hint in msg and KeyType.nonce in msg and KeyType.random in msg:
nodeinfo = self.domains[domain_id]['neighbor'].nodeinfo_list[source_node_id]
nodeinfo.key_manager.receive_exchange_response(msg[KeyType.ecdh], msg[KeyType.random], msg[KeyType.hint])
elif msg[KeyType.command] == BBcNetwork.CONFIRM_KEY_EXCHANGE:
nodeinfo = self.domains[domain_id]['neighbor'].nodeinfo_list[source_node_id]
nodeinfo.key_manager.receive_confirmation()
elif msg[KeyType.command] == BBcNetwork.NOTIFY_LEAVE:
if KeyType.source_node_id in msg:
self.domains[domain_id]['topology'].notify_neighbor_update(source_node_id, is_new=False)
self.domains[domain_id]['neighbor'].remove(source_node_id)
[docs] def setup_udp_socket(self):
"""Setup UDP socket"""
try:
self.socket_udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket_udp.bind(("0.0.0.0", self.port))
except OSError:
self.socket_udp = None
self.logger.error("UDP Socket error for IPv4")
if self.ip6_address is not None:
try:
self.socket_udp6 = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
self.socket_udp6.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
self.socket_udp6.bind(("::", self.port))
except OSError:
self.socket_udp6 = None
self.logger.error("UDP Socket error for IPv6")
if self.socket_udp is None and self.socket_udp6 is None:
return False
th_nw_loop = threading.Thread(target=self.udp_message_loop)
th_nw_loop.setDaemon(True)
th_nw_loop.start()
return True
[docs] def udp_message_loop(self):
"""Message loop for UDP socket"""
self.logger.debug("Start udp_message_loop")
msg_parser = message_key_types.Message()
# readfds = set([self.socket_udp, self.socket_udp6])
readfds = set()
if self.socket_udp:
readfds.add(self.socket_udp)
if self.socket_udp6:
readfds.add(self.socket_udp6)
try:
while True:
rready, wready, xready = select.select(readfds, [], [])
for sock in rready:
data = None
ipv4 = None
ipv6 = None
if sock is self.socket_udp:
data, addr = self.socket_udp.recvfrom(1500)
ipv4 = addr[0]
port = addr[1]
elif sock is self.socket_udp6:
data, addr = self.socket_udp6.recvfrom(1500)
ipv6 = addr[0]
port = addr[1]
if data is not None:
self.stats.update_stats_increment("network", "packets_received_by_udp", 1)
msg_parser.recv(data)
msg = msg_parser.parse()
#self.logger.debug("Recv_UDP from %s: data=%s" % (addr, msg))
if KeyType.domain_id not in msg:
continue
if msg[KeyType.domain_id] in self.domains:
self._process_message_base(msg[KeyType.domain_id], ipv4, ipv6, port, msg)
finally:
for sock in readfds:
sock.close()
self.socket_udp = None
self.socket_udp6 = None
[docs] def setup_tcp_server(self):
"""Start tcp server"""
try:
self.listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.listen_socket.bind(("0.0.0.0", self.port))
self.listen_socket.listen(self.max_connections)
except OSError:
self.listen_socket = None
self.logger.error("TCP Socket error for IPv4")
if self.ip6_address is not None:
try:
self.listen_socket6 = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
self.listen_socket6.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
self.listen_socket6.bind(("::", self.port))
self.listen_socket6.listen(self.max_connections)
except OSError:
self.listen_socket6 = None
self.logger.error("TCP Socket error for IPv6")
if self.listen_socket is None and self.listen_socket6 is None:
return False
th_tcp_loop = threading.Thread(target=self.tcpserver_loop)
th_tcp_loop.setDaemon(True)
th_tcp_loop.start()
return True
[docs] def tcpserver_loop(self):
"""Message loop for TCP socket"""
self.logger.debug("Start tcpserver_loop")
msg_parsers = dict()
readfds = set()
if self.listen_socket:
readfds.add(self.listen_socket)
if self.listen_socket6:
readfds.add(self.listen_socket6)
try:
while True:
rready, wready, xready = select.select(readfds, [], [])
for sock in rready:
if sock is self.listen_socket:
conn, address = self.listen_socket.accept()
conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
#print("accept from ipv4: ", address)
readfds.add(conn)
msg_parsers[conn] = message_key_types.Message()
elif sock is self.listen_socket6:
conn, address = self.listen_socket6.accept()
conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
#print("accept from ipv6: ", address)
readfds.add(conn)
msg_parsers[conn] = message_key_types.Message()
else:
buf = sock.recv(8192)
if len(buf) == 0:
del msg_parsers[sock]
sock.close()
readfds.remove(sock)
else:
msg_parsers[sock].recv(buf)
self.stats.update_stats_increment("network", "message_size_received_by_tcy", len(buf))
while True:
msg = msg_parsers[sock].parse()
if msg is None:
break
#self.logger.debug("Recv_TCP at %s: data=%s" % (sock.getsockname(), msg))
if KeyType.destination_node_id not in msg or KeyType.domain_id not in msg:
continue
self._process_message_base(msg[KeyType.domain_id], None, None, None, msg)
finally:
for sock in readfds:
sock.close()
self.listen_socket = None
self.listen_socket6 = None
[docs]class NeighborInfo:
"""Manage information of neighbor nodes"""
PURGE_INTERVAL_SEC = 300
NODEINFO_LIFETIME = 900
def __init__(self, network=None, domain_id=None, node_id=None, my_info=None):
self.networking = network
self.domain_id = domain_id
self.my_node_id = node_id
self.my_info = my_info
self.admin_sequence_number = 0
self.nodeinfo_list = dict()
self.purge_timer = query_management.QueryEntry(expire_after=NeighborInfo.PURGE_INTERVAL_SEC,
callback_expire=self.purge, retry_count=3)
[docs] def purge(self, query_entry):
"""Purge obsoleted entry in nodeinfo_list"""
for node_id in list(self.nodeinfo_list.keys()):
if not self.nodeinfo_list[node_id].is_alive or self.nodeinfo_list[node_id].updated_at + \
NeighborInfo.NODEINFO_LIFETIME < time.time():
self.nodeinfo_list.pop(node_id, None)
self.purge_timer = query_management.QueryEntry(expire_after=NeighborInfo.PURGE_INTERVAL_SEC,
callback_expire=self.purge, retry_count=3)
[docs] def add(self, node_id, ipv4=None, ipv6=None, port=None, is_static=False, domain0=None):
"""Add or update an neighbor node entry"""
if node_id not in self.nodeinfo_list:
self.nodeinfo_list[node_id] = NodeInfo(node_id=node_id, ipv4=ipv4, ipv6=ipv6, port=port,
is_static=is_static, domain0=domain0)
self.nodeinfo_list[node_id].key_manager = KeyExchangeManager(self.networking, self.domain_id, node_id)
rand_time = random.uniform(1, KeyExchangeManager.KEY_EXCHANGE_INVOKE_MAX_BACKOFF)
self.nodeinfo_list[node_id].key_manager.set_invoke_timer(rand_time)
return True
else:
change_flag = self.nodeinfo_list[node_id].update(ipv4=ipv4, ipv6=ipv6, port=port, domain0=domain0)
return change_flag
[docs] def remove(self, node_id):
"""Remove entry in the nodeinfo_list"""
if self.nodeinfo_list[node_id].key_manager is not None:
self.nodeinfo_list[node_id].key_manager.stop_all_timers()
self.nodeinfo_list[node_id].key_manager.unset_cipher()
self.nodeinfo_list.pop(node_id, None)
[docs] def show_list(self):
"""Return nodeinfo list in human readable format"""
result = "========= node list of [%s] ==========\n" % self.my_node_id.hex()
for nodeinfo in self.nodeinfo_list.values():
result += "%s\n" % nodeinfo
return result
[docs]class NodeInfo:
"""Node information entry"""
SECURITY_STATE_NONE = 0
SECURITY_STATE_REQUESTING = 1
SECURITY_STATE_CONFIRMING = 2
SECURITY_STATE_ESTABLISHED = 3
def __init__(self, node_id=None, ipv4=None, ipv6=None, port=None, is_static=False, domain0=False):
self.node_id = node_id
if ipv4 is None or len(ipv4) == 0:
self.ipv4 = None
else:
if isinstance(ipv4, bytes):
self.ipv4 = ipv4.decode()
else:
self.ipv4 = ipv4
if ipv6 is None or len(ipv6) == 0:
self.ipv6 = None
else:
if isinstance(ipv6, bytes):
self.ipv6 = ipv6.decode()
else:
self.ipv6 = ipv6
self.port = port
self.admin_sequence_number = 0
self.is_static = is_static
self.is_domain0_node = domain0
self.created_at = self.updated_at = time.time()
self.is_alive = True
self.disconnect_at = 0
self.key_manager = None
def __lt__(self, other):
if self.is_alive and other.is_alive:
return is_less_than(self.node_id, other.node_id)
elif self.is_alive and not other.is_alive:
return True
elif not self.is_alive and other.is_alive:
return False
else:
return is_less_than(self.node_id, other.node_id)
def __len__(self):
return len(self.node_id)
def __str__(self):
ipv4 = self.ipv4
if ipv4 is None:
ipv4 = "0.0.0.0"
ipv6 = self.ipv6
if ipv6 is None:
ipv6 = "::"
security_state = (self.key_manager is not None and self.key_manager.state == KeyExchangeManager.STATE_ESTABLISHED)
output = "[node_id=%s, ipv4=%s, ipv6=%s, port=%d, seq=%d, " \
"alive=%s, static=%s, encryption=%s, domain0=%s, time=%d]" %\
(binascii.b2a_hex(self.node_id), ipv4, ipv6, self.port, self.admin_sequence_number,
self.is_alive, self.is_static, security_state, self.is_domain0_node, self.updated_at)
return output
[docs] def touch(self):
self.updated_at = time.time()
self.is_alive = True
[docs] def update(self, ipv4=None, ipv6=None, port=None, seq=None, domain0=None):
"""Update the entry
Args:
ipv4 (str): IPv4 address of the sender node
ipv6 (str): IPv6 address of the sender node
port (int): Port number of the sender
sec (int): message sequence number
domain0 (bool or None): If True, the node is domain0 manager
Returns:
bool: True if the entry has changed
"""
change_flag = None
if ipv4 is not None and self.ipv4 != ipv4:
if isinstance(ipv4, bytes):
self.ipv4 = ipv4.decode()
else:
self.ipv4 = ipv4
if self.ipv4 != "127.0.0.1":
change_flag = True
if ipv6 is not None and self.ipv6 != ipv6:
if isinstance(ipv6, bytes):
self.ipv6 = ipv6.decode()
else:
self.ipv6 = ipv6
if self.ipv6 != "::":
change_flag = True
if port is not None and self.port != port:
self.port = port
change_flag = True
if seq is not None and self.admin_sequence_number < seq:
self.admin_sequence_number = seq
if domain0 is not None and self.is_domain0_node != domain0:
self.is_domain0_node = domain0
change_flag = True
self.updated_at = time.time()
self.is_alive = True
return change_flag
[docs] def get_nodeinfo(self):
"""Return a list of node info
Returns:
list: [node_id, ipv4, ipv6, port, domain0_flag, update_at]
"""
if self.ipv4 is not None:
ipv4 = socket.inet_pton(socket.AF_INET, self.ipv4)
else:
ipv4 = socket.inet_pton(socket.AF_INET, "0.0.0.0")
if self.ipv6 is not None:
ipv6 = socket.inet_pton(socket.AF_INET6, self.ipv6)
else:
ipv6 = socket.inet_pton(socket.AF_INET6, "::")
domain0 = int(1).to_bytes(1, 'little') if self.is_domain0_node else int(0).to_bytes(1, 'little')
return self.node_id, ipv4, ipv6, socket.htons(self.port).to_bytes(2, 'big'), domain0, int(self.updated_at).to_bytes(8, 'big')