# Kademlia (My Implementation)

If you haven't yet read it, please see my [overview of Kademlia](./kademlia-the-protocol.html). Kademlia is obviously an idea that I have a fairly deep interest in at this point, so I spent the last couple of weeks writing up another implementation of it. This one managed to solve many of the problems I had in the past by using a slightly different concurrency model, by focusing narrowly on UDP, and by using equivalent alternatives to some structures defined in the Kademlia specification.

What I want to do in this post is walk through how this implementation works, what progress it makes on my larger goal of developing Retrion, and what problems it has that remain to be solved. Code can be found [here](https://github.com/p2p-today/retrion-development), in the files that use protocol version 4. Version 5 is my current work to extend it.

# How It Works

## Concurrency Model

The node's world is divided into four threads (plus the main thread, but that doesn't count). Threads 1 and 2 listen for incoming messages on UDP/IPv4 and UDP/IPv6, respectively. Thread 3, the reaction thread, is responsible for reacting and responding to messages. Thread 4, the heartbeat thread, is responsible for events scheduled in the future. It implements a sort of event loop using the [sched](https://docs.python.org/3/library/sched.html) module.

The listener threads exist primarily out of laziness, since [select](https://docs.python.org/3/library/select.html) would easily have let me consolidate them. It should be noted, however, that having multiple threads like this will allow for future abstraction. If each listening socket gets its own thread, then you don't need to deal as much with how those transport methods differ. A WebSocket, for example, has a very different interface than a UDP socket or a Bluetooth one.

## Serialization

Serialization was done with [u-msgpack-python](https://github.com/vsergeev/u-msgpack-python), which is a particularly nice implementation of the [MessagePack](https://github.com/msgpack/msgpack/blob/master/spec.md) serialization scheme. It's easiest to think of it as "JSON in binary, with some customizability". Particularly useful is that MessagePack lets you define how custom objects should be serialized and deserialized using its concept of "extension objects". Basically, it throws a header onto whatever bytestream you hand it, and that header can later be used to send the stream to the correct factory function.

I'm going to borrow their notation for how to represent these bytestreams:

<pre><code class="plaintext">one byte:
+--------+
|        |
+--------+

a variable number of bytes:
+========+
|        |
+========+

a variable number of objects stored in MessagePack format:
+~~~~~~~~~~~~~~~~~+
|                 |
+~~~~~~~~~~~~~~~~~+</code></pre>

For an example of how this looks, let's show the PING message:

<pre><code class="plaintext">+========+--------+--------+--------+--------+--------+========+========+
| header |   0    |   4    |compress|10010011|   1    |sequence|senderID|
+========+--------+--------+--------+--------+--------+========+========+
                                    |-------------compressed------------|</code></pre>

Let's look at that cell by cell. The header is a few bytes that MessagePack tacks on to the beginning. It essentially says "I am an extension object __ bytes in length." The first value we actually give it is that 0, which is the extension typecode I defined for Message objects. The next cell indicates the compression method used for the remainder of the bytestream. This is negotiated between nodes in a HELLO message. The ones currently defined are none, ZLIB, GZIP, LZMA, and BZ2, taking their values in that order.

The next cell is a header for an array. The first four bits indicate that it is an array, and the second four bits indicate the length of the array (here `0011`, or three elements). The next cell is a 1, which is the message type assigned to PING. The next cell is the sequence number of the packet. This number should monotonically increase and is assigned by the sending node; it is what a response refers back to. The next cell is the sender ID. By default this is just 20 random bytes. In future implementations it will likely be a hash of some node information in a canonical order. This entire array is unpacked and then handed to a factory function to regenerate the message object.
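To make the extension-object mechanism a bit more concrete, here is a simplified sketch of how u-msgpack-python's `ext_handlers` can round-trip a PING-like object. This is not the actual Message code: the typecode 0 and the `[message type, sequence, sender]` array follow the layout above, but the compression byte and everything else are left out.

<pre><code class="python">import umsgpack

MSG_EXT_TYPE = 0  # extension typecode reserved for Message objects
PING = 1          # message type assigned to PING


class PingMessage:
    def __init__(self, seq, sender):
        self.seq = seq
        self.sender = sender

    def __repr__(self):
        return f"PingMessage(seq={self.seq}, sender={self.sender!r})"


# Tell MessagePack how to turn a PingMessage into an extension object...
packed = umsgpack.packb(
    PingMessage(seq=1, sender=b"\x00" * 20),
    ext_handlers={
        PingMessage: lambda msg: umsgpack.Ext(
            MSG_EXT_TYPE, umsgpack.packb([PING, msg.seq, msg.sender])
        )
    },
)

# ...and how to route typecode 0 back to a factory function on the way in.
unpacked = umsgpack.unpackb(
    packed,
    ext_handlers={
        MSG_EXT_TYPE: lambda ext: PingMessage(*umsgpack.unpackb(ext.data)[1:])
    },
)

print(unpacked)  # -> PingMessage(seq=1, sender=b'\x00\x00...')</code></pre>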
## Reactions

When a node reconstructs a message, it finds that each message has a `react()` method. For the simplest example, let's look at the IDENTIFY message:

<pre><code class="python">class Message(BaseMessage):
    ...

    def react(self, node, addr, sock):
        """Delay sending a PING to the sending node, since the connection is clearly active if you got a message."""
        try:
            event = node.timeouts.pop((self.sender, ))
            node.schedule.cancel(event)
        except (KeyError, ValueError):
            pass

        def ping():
            node._send(sock, addr, PingMessage())

        node.timeouts[(self.sender, )] = node.schedule.enter(60, 2, ping)  # enter() takes in delay, priority, callable


@Message.register(int(MessageType.IDENTIFY))
class IdentifyMessage(PingMessage):
    def __init__(self, compress: int = 0, seq: Optional[int] = None, sender: bytes = b''):
        super().__init__(compress, seq, sender)
        self.message_type = MessageType.IDENTIFY

    def react(self, node, addr, sock):
        """Send a HELLO back in lieu of an ACK."""
        node.logger.debug("Got an IDENTIFY request from %r (%r)", addr, self)
        node._send_hello(sock, addr)
        Message.react(self, node, addr, sock)</code></pre>

A node that sends IDENTIFY is saying "hey, I have your address but I don't know who you are. Would you mind introducing yourself?" So this event gets processed in two phases. The first is just the node sending a HELLO message back. The second is to modify the node's schedule. By default, nodes will send a PING after 60 seconds of dead air to make sure that their peer is still alive. If you get a message, this timer needs to be reset, which is what happens in `Message.react()`.
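That timer lives on the heartbeat thread's schedule. As a rough illustration of the pattern (a standalone sketch using the standard-library `sched` module, not code from the implementation), the cancel-and-reschedule dance looks something like this:

<pre><code class="python">import sched
import threading
import time

schedule = sched.scheduler(time.monotonic, time.sleep)
timeouts = {}  # maps a peer key to its pending sched event


def heartbeat_loop():
    # sched.run() returns as soon as the queue is empty, so poll in a loop.
    # In the real node this is what thread 4 spends its life doing.
    while True:
        schedule.run(blocking=True)
        time.sleep(0.1)


def reset_ping_timer(peer_key, send_ping):
    """Cancel any pending PING for this peer and schedule a fresh one 60 seconds out."""
    try:
        schedule.cancel(timeouts.pop(peer_key))
    except (KeyError, ValueError):
        pass
    # enter() takes (delay, priority, action), mirroring node.schedule.enter(60, 2, ping)
    timeouts[peer_key] = schedule.enter(60, 2, send_ping)


threading.Thread(target=heartbeat_loop, daemon=True).start()
reset_ping_timer(b"some peer", lambda: print("PING"))</code></pre>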
## Responses

Several messages also have a method called `react_response()`. The idea here is that if you receive an ACK, it probably means something to some other message. The way this gets triggered looks like:

<pre><code class="python">@Message.register(int(MessageType.ACK))
class AckMessage(Message):
    ...

    def react(self, node, addr, sock):
        """Clear the message from node.awaiting_ack and call the relevant react_response() method."""
        node.logger.debug("Got an %sACK from %r (%r)", 'N' if self.status else '', addr, self)
        super().react(node, addr, sock)
        try:
            node.routing_table.member_info[self.sender].local.hits += 1
        except KeyError:
            pass
        if self.resp_seq in node.awaiting_ack:
            node.awaiting_ack[self.resp_seq].react_response(self, node, addr, sock)
            del node.awaiting_ack[self.resp_seq]</code></pre>

Let's break that down. The first step is just like above, where it resets the PING timer for that peer. After that, we'd like to record that a message successfully went through. So, if we have the sender in our routing table, we look up our local information on that node (as opposed to public data like their ID or listening addresses) and record that there was a hit. This information can be used when pruning peers (which I have not yet implemented).

The last step is to see if the node was expecting this acknowledgement for something. If it was, we call the `react_response()` method of the message in question. Currently there are only two messages that use this feature: FIND\_NODE and FIND\_KEY. Let's look at the former:

<pre><code class="python">@Message.register(int(MessageType.FIND_NODE))
class FindNodeMessage(Message):
    ...

    def react_response(self, ack, node, addr, sock, message_constructor=PingMessage):
        """Attempt to connect to each of the nodes you were told about."""
        node.logger.debug("Got a response to a FIND_NODE from %r (%r, %r)", addr, self, ack)
        for info in ack.data:
            name = info.name
            if name != node.id and name not in node.routing_table:
                for address in info.addresses:
                    try:
                        if node.routing_table.add(name, address.args, address.addr_type):
                            node._send(address.addr_type, address.args, message_constructor())
                            node.routing_table.member_info[name].public = info
                            break
                    except Exception:
                        continue</code></pre>

A FIND\_NODE message expects to get back an array of `GlobalPeerInfo` objects, the details of which don't matter very much here. The only important thing for our purposes is that each contains a list of listening addresses and a list of supported compression methods in order of preference. For each of these objects, we check that it's not ourself or someone we already know about. If not, we try to send a message to each of its addresses in sequence, ignoring errors.

# Design Problems

## The math on $b$

The idea behind $b$, Kademlia's acceleration parameter, is that you can trade between routing efficiency and routing table size by looking at $b$-bit symbols rather than 1-bit ones. So if you are ID 101010..., $b=1$ would have you making a routing table with entries like:

- 0...
- 01...
- 010...
- 0101...
- 01010...
- 010101...
- etc.

If $b=2$, however, you would end up with a routing table like:

- 00...
- 01...
- 11...
- 0100...
- 0101...
- 0111...
- 010100...
- 010101...
- 010111...
- etc.

The trade-off you make here is that the number of queries you need to find a node is $O(\log\_{2^b}(n))$, whereas routing table size is $O(2^b \cdot \log\_{2^b}(n))$. A more precise guess for the routing table is that it would scale as $\lceil\tfrac{(2^b - 1)}{b}\rceil \cdot \log\_{2^b}(n)$, but I haven't actually sat down to check that math, so take it with a grain of salt.

The problem is, I'm doing the math wrong in my routing table for $b > 1$, and I haven't been able to identify where. All I know is that occasionally I get an `IndexError` and my routing table has to fall back to searching over the set of all known peers instead of a particular bucket.

## The Crappy Broadcast Algorithm

The broadcast method I used in this implementation is naive at best. It uses a simple flooding model with the following rules:

1. If you have already seen this broadcast (identified by (sender, sequence) pairs), ignore it
2. For each peer in your routing table, echo this message to them unless
    1. They are the original sender, as identified by the sender field
    2. They are the node that sent it to you, as identified by the addr, sock pair in `react()`

This ends up with $O(n^2)$ messages sent and, given the topology of a Kademlia network, $O(\log(n))$ hops before reaching the final node. This is obviously sub-optimal, and is a topic I will explore further when I manage to do a write-up of *Solution for the broadcasting in Kademlia peer-to-peer overlay* by Czirkos and Hosszú.
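Concretely, those flooding rules amount to something like the toy sketch below. This is not the actual broadcast handler; the routing table is modeled as a plain dict of peer ID to address, and `send` stands in for whatever actually puts bytes on the wire.

<pre><code class="python"># A toy model of the flooding rules above (not the implementation's broadcast code).

def flood(message, sender, seq, relayer_addr, routing_table, seen, send):
    if (sender, seq) in seen:                 # rule 1: already saw this broadcast
        return
    seen.add((sender, seq))
    for peer_id, peer_addr in routing_table.items():
        if peer_id == sender:                 # rule 2.1: skip the original sender
            continue
        if peer_addr == relayer_addr:         # rule 2.2: skip whoever relayed it to us
            continue
        send(peer_addr, message)              # echo to everyone else


# Toy usage: only peer B gets the echo, since A is the original sender.
seen = set()
table = {b"A": ("10.0.0.1", 4000), b"B": ("10.0.0.2", 4000)}
flood(b"hello", b"A", 7, ("10.0.0.3", 4000), table, seen, lambda addr, msg: print(addr, msg))</code></pre>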
## What's This $\alpha$ Parameter?

Kademlia defines a concurrency parameter called $\alpha$, which essentially says "thou shalt send $\alpha$ messages per request." Confusingly, some messages use $k$ for their concurrency parameter instead. The main culprit there is STORE, but rather than figure out where to do what, I just assumed that $k=\alpha$. It's a relatively small change to fix, so presumably I will before I do anything major with this.

# Retrion Progress

Recall the initial goals of Retrion:

1. An object model and serialization standard that can be used easily in most languages
2. A simple high-level API (along with a fine-grained one)
3. $\log(n)$ distributed hash table get/sets
4. Support for disabling DHT support network-wide
5. Support for "edge nodes" which do not store DHT data except as a cache
6. $\log(n)$ broadcast delivery time
7. $\log(n)$ routed message delivery
8. Support for multicast routed messages
9. Support for in-protocol group messages
10. Transparent (to the developer) compression
11. Transparent (to the user) encryption
12. Support for multiple transport protocols (TCP, UDP, etc.)
13. Support for "virtual transports" (using other nodes as a bridge)
14. A well-defined standard configuration space
15. Support for extension subprotocols and implementation-specific configurations

I would say that this implementation likely satisfies 1, 3, 6, 10, 14, and 15. It certainly does not satisfy 4, 5, 7, 8, 9, 11, 12, or 13. That means we need some kind of roadmap for delivering these other properties and addressing other deficiencies.

## 4. Support for disabling DHT support network-wide

This is probably the easiest to do, as it just puts you into a mode similar to how Ethereum already works. It's probably as simple as adding a flag to the NetworkConfiguration object and being done with it.

## 5. Support for "edge nodes" which do not store DHT data except as a cache

This one is harder, as it presents some design challenges. My initial thought would be to have edge node status indicated by having the most significant bit of your ID be 1, but this presents a few problems:

1. How do you guarantee that every edge node is in someone's routing table? Otherwise there is no broadcast support
2. This means that nodes have to decide their status at ID-assignment time, unless we allow nodes to make new IDs
3. The use case for edge nodes is something that runs in a browser or phone. That means we need support for
    1. roaming addresses, and
    2. WebSocket or WebRTC connections

For now, I think this is off the table. It definitely should be revisited in the future, but there are too many problems with the concept as it exists now to implement it.

## 7. $\log(n)$ routed message delivery

This one is trivial to implement; I just haven't gotten to it yet. It's almost a duplicate of the work I already do in FIND\_NODE and STORE anyway; I just need to change the side effect. There are probably optimizable algorithms that I'll need to look for.

## 8. Support for multicast routed messages

A naive implementation of this should be fairly easy. I would just need to change the target field in a message like the one above to hold an array of targets instead of a single target (see the sketch after the next subsection).

## 9. Support for in-protocol group messages

Leveraging the above, it should be possible to make a chatroom context or the like in-protocol. The real question is whether it should use a name registry at the network level which references some key in the DHT, or whether individual nodes should track their own rooms and be able to attach a room name to a multicast routed message. I kinda lean towards the latter, but we'll see.
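To make the multicast idea slightly more concrete, here is a toy sketch of the routing side of it: group the targets by whichever known peer is closest to each one, so a single copy (which a group message could simply label with a room name) goes out per next hop. None of this exists in the implementation; `closest_peer` is a stand-in for a routing table lookup.

<pre><code class="python">from collections import defaultdict
from typing import Callable, Dict, List


def group_targets_by_next_hop(
    targets: List[bytes],
    closest_peer: Callable[[bytes], bytes],
) -> Dict[bytes, List[bytes]]:
    """Group multicast targets by the peer closest to each, so only one copy is sent per next hop."""
    hops: Dict[bytes, List[bytes]] = defaultdict(list)
    for target in targets:
        hops[closest_peer(target)].append(target)
    return dict(hops)


# Toy usage: pretend the first byte of an ID identifies the closest known peer.
print(group_targets_by_next_hop([b"\x01aa", b"\x01bb", b"\x02cc"], lambda t: t[:1]))
# {b'\x01': [b'\x01aa', b'\x01bb'], b'\x02': [b'\x02cc']}</code></pre>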
## 11. Transparent (to the user) encryption

I have no idea how to approach this yet beyond broad concepts. It would be fantastic if you could have a bit in your GlobalPeerInfo object that says "hey, use the OWS protocol with me now", or something similar. The only question is going to be how many methods are easily implementable in many languages. Probably the languages I will choose to care about here are C/C++, JavaScript, Python, and possibly Java or Kotlin.

## 12. Support for multiple transport protocols (TCP, UDP, etc.)

I don't yet know how to approach this in a way that limits its downsides. I'll need a confident idea of how to handle the local resource overhead of connection-based transports (the number of sockets the system lets you have, for example) before this is really approachable. My plan is two-pronged. First, I implement it under the assumption that UDPv4 is the minimum requirement, to avoid network fragmentation. In parallel, I need to do some kind of formal analysis of the network fragmentation risk if some nodes are allowed to have incompatible lists of supported transports. It would be useful if your sandboxed browser code could interact with a large network of external nodes that communicate over more convenient transports than WebSockets, but I'm fairly worried about how many risks and inefficiencies that would expose.

## 13. Support for "virtual transports" (using other nodes as a bridge)

This is an idea that I might want to scrap. If routed messages are workable, then virtual transports are probably trivial to implement. The only question is gonna be how inefficient it is without explicitly setting up a bridge route.

# Conclusions

I've got a long road ahead of me, but this is a pretty good start!

tags: research-progress, hobby-horse, retrion