# -*- coding: utf-8 -*-
"""
Copyright (c) 2017 beyond-blockchain.org.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import random
import os
import sys
sys.path.extend(["../../", os.path.abspath(os.path.dirname(__file__))])
from bbc1.core.message_key_types import to_2byte, PayloadType, KeyType, InfraMessageCategory
from bbc1.core import bbclib
from bbc1.core import query_management, message_key_types, logger
# Module-level ticker obtained from query_management. In this file it is used
# to look up pending QueryEntry objects by nonce via ticker.get_entry().
ticker = query_management.get_ticker()
def direct_send_to_user(sock, msg, name=None):
    """Build a message from *msg* and write it to a socket

    Args:
        sock: object with a ``sendall`` method (the client connection)
        msg: message body handed to message_key_types.make_message
        name: key name passed as ``key_name``; when None the message is built
              as Type_msgpack, otherwise as Type_encrypted_msgpack
    """
    if name is not None:
        payload = message_key_types.make_message(PayloadType.Type_encrypted_msgpack, msg, key_name=name)
    else:
        payload = message_key_types.make_message(PayloadType.Type_msgpack, msg)
    sock.sendall(payload)
class UserMessageRouting:
    """Handle message for clients

    Tracks which user_ids are connected to this node (``registered_users``),
    which remote nodes are known to accommodate other user_ids
    (``forwarding_entries``), and routes user messages either to a local
    socket or to another node through ``networking``.
    """
    # Lifetime of a non-permanent forwarding entry before it expires unless
    # refreshed (handed to QueryEntry as expire_after; presumably seconds).
    REFRESH_FORWARDING_LIST_INTERVAL = 300
    # How long a location-resolution query waits for a response (presumably seconds).
    RESOLVE_TIMEOUT = 5
    # Cross refs are distributed to clients once more than this many are stocked.
    MAX_CROSS_REF_STOCK = 10

    # Values carried in KeyType.infra_command
    RESOLVE_USER_LOCATION = to_2byte(0)
    RESPONSE_USER_LOCATION = to_2byte(1)
    RESPONSE_NO_SUCH_USER = to_2byte(2)
    JOIN_MULTICAST_RECEIVER = to_2byte(3)
    LEAVE_MULTICAST_RECEIVER = to_2byte(4)
    CROSS_REF_ASSIGNMENT = to_2byte(5)

    def __init__(self, networking, domain_id, loglevel="all", logname=None):
        """Set up empty routing tables for one domain

        Args:
            networking: networking object used to send/broadcast messages;
                        its ``core.stats`` is used for statistics
            domain_id (bytes): domain this router belongs to
            loglevel (str): log level passed to logger.get_logger
            logname (str): log name passed to logger.get_logger
        """
        self.networking = networking
        self.stats = networking.core.stats
        self.domain_id = domain_id
        self.logger = logger.get_logger(key="user_message_routing", level=loglevel, logname=logname)
        self.aes_name_list = dict()       # socket -> AES key name
        self.cross_ref_list = list()      # stocked cross refs waiting for distribution
        self.registered_users = dict()    # user_id -> set of local sockets
        # user_id -> {'nodes': set of node_id, 'refresh': QueryEntry}
        # ('refresh' exists only for non-permanent entries)
        self.forwarding_entries = dict()
        self.on_going_timers = set()      # nonces of pending resolve queries

    def stop_all_timers(self):
        """Cancel all running timers"""
        for entry in self.forwarding_entries.values():
            # Permanent entries have no 'refresh' timer at all.
            refresh = entry.get('refresh')
            if refresh is not None:
                refresh.deactivate()
        # Iterate over a snapshot in case deactivation touches the set.
        for nonce in tuple(self.on_going_timers):
            query_entry = ticker.get_entry(nonce)
            if query_entry is not None:
                query_entry.deactivate()

    def set_aes_name(self, socket, name):
        """Set name for specifying AES key for message encryption

        Args:
            socket (Socket): socket for the client
            name (bytes): name of the client (4-byte random value generated in message_key_types.get_ECDH_parameters)
        """
        self.aes_name_list[socket] = name

    def register_user(self, user_id, socket, on_multiple_nodes=False):
        """Register user to forward message

        Args:
            user_id (bytes): user_id of the client
            socket (Socket): socket for the client
            on_multiple_nodes (bool): If True, the user_id is also registered in other nodes, meaning multicasting.
        """
        self.registered_users.setdefault(user_id, set()).add(socket)
        if on_multiple_nodes:
            self.send_multicast_join(user_id)

    def unregister_user(self, user_id, socket):
        """Unregister user from the list and delete AES key if exists

        Args:
            user_id (bytes): user_id of the client
            socket (Socket): socket for the client
        """
        sockets = self.registered_users.get(user_id)
        if sockets is None:
            return
        # discard(): unregistering a socket that was never added is not an error
        sockets.discard(socket)
        if not sockets:
            self.registered_users.pop(user_id, None)
        if socket in self.aes_name_list:
            message_key_types.unset_cipher(self.aes_name_list.pop(socket))
        # NOTE(review): the leave is broadcast even if other sockets of this
        # user remain connected - confirm this is intended.
        self.send_multicast_leave(user_id=user_id)

    def _add_user_for_forwarding(self, user_id, node_id, permanent=False):
        """Register user to forwarding list

        Args:
            user_id (bytes): target user_id
            node_id (bytes): node_id which the client with the user_id connects to
            permanent (bool): If True, the entry won't expire
        """
        entry = self.forwarding_entries.setdefault(user_id, dict())
        if not permanent:
            refresh = entry.get('refresh')
            if refresh is None:
                # First sighting: start the expiration timer for this user_id
                entry['refresh'] = query_management.QueryEntry(
                    expire_after=UserMessageRouting.REFRESH_FORWARDING_LIST_INTERVAL,
                    callback_expire=self._remove_user_from_forwarding,
                    data={
                        KeyType.user_id: user_id,
                    }, retry_count=0)
            else:
                # Already known: push the expiration further into the future
                refresh.update(fire_after=UserMessageRouting.REFRESH_FORWARDING_LIST_INTERVAL)
        entry.setdefault('nodes', set()).add(node_id)
        self.stats.update_stats("user_message", "registered_users_in_forwarding_list", len(self.forwarding_entries))

    def _remove_user_from_forwarding(self, query_entry=None, user_id=None, node_id=None):
        """Unregister user from forwarding list

        Called either as the expiration callback of a refresh timer (then
        ``query_entry`` is given and the whole entry is dropped), or directly
        with ``user_id``/``node_id`` to drop a single node from the entry.

        Args:
            query_entry (QueryEntry): expired refresh timer, its data holds the user_id
            user_id (bytes): target user_id
            node_id (bytes): node_id to remove from the entry
        """
        if query_entry is not None:
            self.forwarding_entries.pop(query_entry.data[KeyType.user_id], None)
        else:
            entry = self.forwarding_entries.get(user_id)
            if entry is None:
                return
            # discard(): a node that was never recorded must not raise KeyError
            entry['nodes'].discard(node_id)
            if not entry['nodes']:
                refresh = entry.get('refresh')
                if refresh is not None:
                    refresh.deactivate()
                self.forwarding_entries.pop(user_id, None)
        self.stats.update_stats("user_message", "registered_users_in_forwarding_list", len(self.forwarding_entries))

    def send_message_to_user(self, msg, direct_only=False):
        """Forward message to connecting user

        Args:
            msg (dict): message to send
            direct_only (bool): If True, _forward_message_to_another_node is not called.
        Returns:
            bool: True if there is no destination_user_id, if the message was
                  handed to forwarding, or if at least one local send
                  succeeded; for anycast, the result of _send_anycast_message
        """
        if KeyType.destination_user_id not in msg:
            return True
        msg[KeyType.infra_msg_type] = InfraMessageCategory.CATEGORY_USER
        if msg.get(KeyType.is_anycast, False):
            return self._send_anycast_message(msg)

        socks = self.registered_users.get(msg[KeyType.destination_user_id], None)
        if socks is None:
            if direct_only:
                return False
            self._forward_message_to_another_node(msg)
            return True
        # Build the full list first so every socket is tried before any() runs
        results = [self._send(s, msg) for s in socks]
        return any(results)

    def _send(self, sock, msg):
        """Raw function to send a message

        Returns:
            bool: False if sending raised an exception, True otherwise
        """
        try:
            direct_send_to_user(sock, msg, name=self.aes_name_list.get(sock))
            self.stats.update_stats_increment("user_message", "sent_msg_to_user", 1)
        except Exception:
            return False
        return True

    def _send_anycast_message(self, msg):
        """Send message as anycast

        One receiver is picked at random among the remote nodes in the
        forwarding entry plus (if the user is also connected here) the local
        sockets. Each attempt consumes one unit of KeyType.anycast_ttl.

        Args:
            msg (dict): message to send
        Returns:
            bool: True if the message was sent to some receiver
        """
        dst_user_id = msg[KeyType.destination_user_id]
        entry = self.forwarding_entries.get(dst_user_id)
        if entry is None:
            return False
        ttl = msg.get(KeyType.anycast_ttl, 0)
        if ttl == 0:
            return False

        remote_nodes = tuple(entry['nodes'])
        local_socks = tuple(self.registered_users.get(dst_user_id, ()))
        # The local node counts as one extra candidate only if the user is here
        num_candidates = len(remote_nodes) + (1 if local_socks else 0)
        if num_candidates == 0:
            return False

        while ttl > 0:
            ttl -= 1
            msg[KeyType.anycast_ttl] = ttl
            idx = random.randrange(num_candidates)
            if idx >= len(remote_nodes):
                # The extra slot past the remote nodes means "deliver locally"
                if self._send(random.choice(local_socks), msg):
                    return True
                continue
            try:
                msg[KeyType.destination_node_id] = remote_nodes[idx]
                self.networking.send_message_in_network(nodeinfo=None, payload_type=PayloadType.Type_any,
                                                        domain_id=self.domain_id, msg=msg)
            except Exception:
                import traceback
                traceback.print_exc()
                continue
            return True
        return False

    def _forward_message_to_another_node(self, msg):
        """Try to forward message to another node

        If the destination user_id is in the forwarding list, the message is
        sent to every node recorded for it. Otherwise a location resolution is
        started and the message is kept until it finishes.

        Args:
            msg (dict): message to forward
        """
        dst_user_id = msg[KeyType.destination_user_id]
        entry = self.forwarding_entries.get(dst_user_id)
        if entry is None:
            self._resolve_accommodating_core_node(dst_user_id, msg[KeyType.source_user_id], msg)
            return
        for dst_node_id in tuple(entry['nodes']):
            msg[KeyType.destination_node_id] = dst_node_id
            try:
                self.networking.send_message_in_network(nodeinfo=None, payload_type=PayloadType.Type_any,
                                                        domain_id=self.domain_id, msg=msg)
            except Exception:
                # Best effort: a failure for one node must not stop the others
                import traceback
                traceback.print_exc()

    def _resolve_accommodating_core_node(self, dst_user_id, src_user_id, orig_msg=None):
        """Resolve which node the user connects to

        Find the node that accommodates the user_id first, and then, send the message to the node.

        Args:
            dst_user_id (bytes): destination user_id
            src_user_id (bytes): source user_id
            orig_msg (dict): message to send
        """
        msg = {
            KeyType.infra_msg_type: InfraMessageCategory.CATEGORY_USER,
            KeyType.domain_id: self.domain_id,
            KeyType.infra_command: UserMessageRouting.RESOLVE_USER_LOCATION,
            KeyType.destination_user_id: dst_user_id,
        }
        if orig_msg is not None:
            # Keep the original message in a query entry; the nonce lets the
            # response be matched to it in process_message.
            query_entry = query_management.QueryEntry(expire_after=UserMessageRouting.RESOLVE_TIMEOUT,
                                                      callback_expire=self._resolve_failure,
                                                      callback=self._resolve_success,
                                                      data={
                                                          KeyType.message: orig_msg,
                                                      },
                                                      retry_count=0)
            self.on_going_timers.add(query_entry.nonce)
            msg[KeyType.nonce] = query_entry.nonce
        if src_user_id is not None:
            msg[KeyType.source_user_id] = src_user_id
        self.networking.broadcast_message_in_network(domain_id=self.domain_id, msg=msg)

    def _resolve_success(self, query_entry):
        """Callback for success of resolving the location"""
        self.on_going_timers.discard(query_entry.nonce)
        self._forward_message_to_another_node(msg=query_entry.data[KeyType.message])

    def _resolve_failure(self, query_entry):
        """Callback for failure of resolving the location

        The original message is turned into an error response and sent back
        to its source user.
        """
        self.on_going_timers.discard(query_entry.nonce)
        msg = query_entry.data[KeyType.message]
        msg[KeyType.destination_user_id] = msg[KeyType.source_user_id]
        msg[KeyType.result] = False
        msg[KeyType.reason] = "Cannot find such user"
        self.send_message_to_user(msg)

    def send_multicast_join(self, user_id, permanent=False):
        """Broadcast JOIN_MULTICAST_RECEIVER

        Args:
            user_id (bytes): user_id joining as a multicast receiver
            permanent (bool): sent as KeyType.static_entry
        """
        msg = {
            KeyType.infra_msg_type: InfraMessageCategory.CATEGORY_USER,
            KeyType.domain_id: self.domain_id,
            KeyType.infra_command: UserMessageRouting.JOIN_MULTICAST_RECEIVER,
            KeyType.user_id: user_id,
            KeyType.static_entry: permanent,
        }
        self.stats.update_stats_increment("multicast", "join", 1)
        self.networking.broadcast_message_in_network(domain_id=self.domain_id, msg=msg)

    def send_multicast_leave(self, user_id):
        """Broadcast LEAVE_MULTICAST_RECEIVER

        Args:
            user_id (bytes): user_id leaving the multicast receivers
        """
        msg = {
            KeyType.domain_id: self.domain_id,
            KeyType.infra_msg_type: InfraMessageCategory.CATEGORY_USER,
            KeyType.infra_command: UserMessageRouting.LEAVE_MULTICAST_RECEIVER,
            KeyType.user_id: user_id,
        }
        self.stats.update_stats_increment("multicast", "leave", 1)
        self.networking.broadcast_message_in_network(domain_id=self.domain_id, msg=msg)

    def _distribute_cross_refs_to_clients(self):
        """Distribute cross ref assigned by the domain0_manager to client

        Every stocked cross ref is sent to a randomly chosen registered user.
        Nothing happens (and the stock is kept) if no user is registered.
        """
        if not self.registered_users:
            return
        user_ids = tuple(self.registered_users)
        try:
            while self.cross_ref_list:
                msg = {
                    KeyType.domain_id: self.domain_id,
                    KeyType.command: bbclib.MsgType.NOTIFY_CROSS_REF,
                    KeyType.destination_user_id: random.choice(user_ids),
                    KeyType.cross_ref: self.cross_ref_list.pop(0),
                }
                self.send_message_to_user(msg)
        except Exception:
            import traceback
            traceback.print_exc()

    def _process_infra_command(self, msg):
        """Handle a message that carries KeyType.infra_command"""
        command = msg[KeyType.infra_command]

        if command == UserMessageRouting.RESOLVE_USER_LOCATION:
            self.stats.update_stats_increment("user_message", "RESOLVE_USER_LOCATION", 1)
            user_id = msg[KeyType.destination_user_id]
            if user_id not in self.registered_users:
                return
            src_node_id = msg[KeyType.source_node_id]
            # source_user_id is optional in a resolve request
            if KeyType.source_user_id in msg:
                self._add_user_for_forwarding(msg[KeyType.source_user_id], src_node_id)
                msg[KeyType.destination_user_id] = msg[KeyType.source_user_id]
            # Reuse the request as the response, addressed back to the asker
            msg[KeyType.destination_node_id] = src_node_id
            msg[KeyType.source_user_id] = user_id
            msg[KeyType.infra_command] = UserMessageRouting.RESPONSE_USER_LOCATION
            self.networking.send_message_in_network(nodeinfo=None, payload_type=PayloadType.Type_any,
                                                    domain_id=self.domain_id, msg=msg)

        elif command == UserMessageRouting.RESPONSE_USER_LOCATION:
            self.stats.update_stats_increment("user_message", "RESPONSE_USER_LOCATION", 1)
            self._add_user_for_forwarding(msg[KeyType.source_user_id], msg[KeyType.source_node_id])
            if KeyType.nonce in msg:
                query_entry = ticker.get_entry(msg[KeyType.nonce])
                if query_entry is not None and query_entry.active:
                    query_entry.callback()

        elif command == UserMessageRouting.RESPONSE_NO_SUCH_USER:
            self.stats.update_stats_increment("user_message", "RESPONSE_NO_SUCH_USER", 1)
            self._remove_user_from_forwarding(user_id=msg[KeyType.user_id], node_id=msg[KeyType.source_node_id])

        elif command == UserMessageRouting.JOIN_MULTICAST_RECEIVER:
            self.stats.update_stats_increment("user_message", "JOIN_MULTICAST_RECEIVER", 1)
            self._add_user_for_forwarding(msg[KeyType.user_id], msg[KeyType.source_node_id],
                                          permanent=msg.get(KeyType.static_entry, False))

        elif command == UserMessageRouting.LEAVE_MULTICAST_RECEIVER:
            self.stats.update_stats_increment("user_message", "LEAVE_MULTICAST_RECEIVER", 1)
            if msg[KeyType.user_id] in self.forwarding_entries:
                self._remove_user_from_forwarding(user_id=msg[KeyType.user_id],
                                                  node_id=msg[KeyType.source_node_id])

        elif command == UserMessageRouting.CROSS_REF_ASSIGNMENT:
            self.stats.update_stats_increment("user_message", "CROSS_REF_ASSIGNMENT", 1)
            if KeyType.cross_ref in msg:
                self.cross_ref_list.append(msg[KeyType.cross_ref])
                if len(self.cross_ref_list) > UserMessageRouting.MAX_CROSS_REF_STOCK:
                    self._distribute_cross_refs_to_clients()

    def process_message(self, msg):
        """Process received message

        Infra commands are handled by _process_infra_command. Any other
        message is a user message from another node: it is delivered to the
        local user, re-sent as anycast, or answered with RESPONSE_NO_SUCH_USER.

        Args:
            msg (dict): received message
        """
        if KeyType.infra_command in msg:
            self._process_infra_command(msg)
            return

        # Traffic from a known user keeps its forwarding entry alive.
        # Permanent entries have no 'refresh' timer, hence the .get().
        src_user_id = msg[KeyType.source_user_id]
        entry = self.forwarding_entries.get(src_user_id)
        if entry is not None:
            refresh = entry.get('refresh')
            if refresh is not None:
                refresh.update(fire_after=UserMessageRouting.REFRESH_FORWARDING_LIST_INTERVAL)

        dst_user_id = msg[KeyType.destination_user_id]
        if dst_user_id not in self.registered_users:
            if msg.get(KeyType.is_anycast, False):
                self._send_anycast_message(msg)
                return
            # Tell the sending node that the user is not (or no longer) here
            retmsg = {
                KeyType.domain_id: self.domain_id,
                KeyType.infra_msg_type: InfraMessageCategory.CATEGORY_USER,
                KeyType.destination_node_id: msg[KeyType.source_node_id],
                KeyType.infra_command: UserMessageRouting.RESPONSE_NO_SUCH_USER,
                KeyType.user_id: dst_user_id,
            }
            self.stats.update_stats_increment("user_message", "fail_to_find_user", 1)
            self.networking.send_message_in_network(nodeinfo=None, payload_type=PayloadType.Type_any,
                                                    domain_id=self.domain_id, msg=retmsg)
            return

        # The destination is local: drop the anycast flag so that
        # send_message_to_user delivers directly instead of re-anycasting.
        msg.pop(KeyType.is_anycast, None)
        self.stats.update_stats_increment("user_message", "send_to_user", 1)
        self.send_message_to_user(msg)
class UserMessageRoutingDummy(UserMessageRouting):
    """Dummy class for bbc_core.py

    Overrides the methods listed below with no-ops that return None.
    Methods not overridden here are inherited from UserMessageRouting.
    """

    def stop_all_timers(self):
        """Do nothing."""

    def register_user(self, user_id, socket, on_multiple_nodes=False):
        """Do nothing."""

    def unregister_user(self, user_id, socket=None):
        """Do nothing."""

    def _add_user_for_forwarding(self, user_id, node_id, permanent=False):
        """Do nothing."""

    def _remove_user_from_forwarding(self, query_entry=None, user_id=None, node_id=None):
        """Do nothing."""

    def send_message_to_user(self, msg, direct_only=False):
        """Do nothing."""

    def _forward_message_to_another_node(self, msg):
        """Do nothing."""

    def _resolve_accommodating_core_node(self, dst_user_id, src_user_id, orig_msg=None):
        """Do nothing."""

    def _resolve_success(self, query_entry):
        """Do nothing."""

    def _resolve_failure(self, query_entry):
        """Do nothing."""

    def send_multicast_join(self, user_id, permanent=False):
        """Do nothing."""

    def process_message(self, msg):
        """Do nothing."""