| ---
tpeertopeer.py (16716B)
---
1 #! /usr/bin/env python
2
3 import socket
4 import time
5 import base64
6 import threading
7 import queue
8 import random
9 from struct import pack, unpack
10 from datetime import datetime
11
12 import electrumpersonalserver.bitcoin as btc
13 from electrumpersonalserver.server.socks import (
14 socksocket,
15 setdefaultproxy,
16 PROXY_TYPE_SOCKS5
17 )
18 from electrumpersonalserver.server.jsonrpc import JsonRpcError
19
# p2p protocol version and user agent advertised in our version message
PROTOCOL_VERSION = 70016
DEFAULT_USER_AGENT = '/Satoshi:0.21.0/'

# service flag bits, see
# https://github.com/bitcoin/bitcoin/blob/master/src/protocol.h
NODE_NETWORK = 1 << 0
NODE_BLOOM = 1 << 2
NODE_WITNESS = 1 << 3
NODE_NETWORK_LIMITED = 1 << 10

# peers with a protocol version above this also send a relay boolean
# at the end of their version message
RELAY_TX_VERSION = 70001

# length in bytes of the header of a bitcoin p2p message
HEADER_LENGTH = 24

# if no message has been seen for this many seconds, send a ping
KEEPALIVE_INTERVAL = 2 * 60

# close the connection if a keepalive ping isn't answered within this
# many seconds
KEEPALIVE_TIMEOUT = 20 * 60
40
41
def ip_to_hex(ip_str):
    """Return the packed 4-byte binary form of a dotted-quad IPv4 string.

    Only IPv4 is handled for now; anything socket.inet_pton() rejects
    for AF_INET raises OSError.
    """
    packed_ipv4 = socket.inet_pton(socket.AF_INET, ip_str)
    return packed_ipv4
45
def create_net_addr(hexip, port):
    """Serialize a bitcoin p2p network address (net_addr) without the time.

    The leading timestamp field described in the bitcoin wiki is
    deliberately left out.

    hexip: packed 4-byte IPv4 address, e.g. from ip_to_hex()
    port: TCP port number, 0..65535

    Returns 26 bytes: services (8 bytes, little-endian), the address as
    an IPv4-mapped IPv6 address (16 bytes) and the port (2 bytes,
    big-endian / network byte order).
    """
    services = 0
    # ::ffff:a.b.c.d, the IPv4-mapped IPv6 form of the address
    mapped_ip = bytes(10) + b'\xFF\xFF' + hexip
    # services and address used to be built and then thrown away, and the
    # port was packed in native byte order; emit the whole structure with
    # explicit byte orders instead
    return pack('<Q16s', services, mapped_ip) + pack('>H', port)
50
def create_var_str(s):
    """Serialize a text string as a bitcoin p2p variable-length string.

    s: str to serialize; it is encoded with str.encode()'s default codec

    Returns the encoded bytes prefixed with their length as produced by
    btc.num_to_var_int().
    """
    encoded = s.encode()
    # the prefix must count encoded bytes, not characters, otherwise any
    # non-ASCII string gets a length that is too short
    return btc.num_to_var_int(len(encoded)) + encoded
53
def read_int(ptr, payload, n, littleendian=True):
    """Read an n-byte unsigned integer from payload and advance the cursor.

    ptr: one-element list holding the current read offset; it is
        advanced by n in place, even if fewer than n bytes were left
    payload: bytes being parsed
    n: number of bytes to read
    littleendian: byte order of the stored integer

    Returns the decoded integer; an empty slice decodes to 0.
    """
    start = ptr[0]
    raw = payload[start : start + n]
    ptr[0] = start + n
    # int.from_bytes handles both byte orders directly, so no manual
    # reversal or base-256 decoding helper is needed
    return int.from_bytes(raw, 'little' if littleendian else 'big')
61
def read_var_int(ptr, payload):
    """Read a bitcoin p2p variable-length integer and advance the cursor.

    ptr: one-element list holding the current read offset, updated in
        place
    payload: bytes being parsed

    A first byte below 253 is the value itself. The marker bytes 253,
    254 and 255 are followed by a 2, 4 or 8 byte integer respectively,
    which is read with read_int().
    """
    first = payload[ptr[0]]
    ptr[0] += 1
    if first >= 253:
        # 253 -> 2 bytes, 254 -> 4 bytes, 255 -> 8 bytes
        return read_int(ptr, payload, 1 << (first - 252))
    return first
68
def read_var_str(ptr, payload):
    """Read a variable-length string from payload and advance the cursor.

    ptr: one-element list holding the current read offset, updated in
        place
    payload: bytes being parsed

    Returns the raw bytes of the string, without decoding them.
    """
    length = read_var_int(ptr, payload)
    start = ptr[0]
    end = start + length
    ptr[0] = end
    return payload[start:end]
74
def ip_hex_to_str(ip_hex):
    """Convert a 16-byte packed address into a printable string.

    ip_hex: bytes holding the 16-byte address field of a p2p message

    Returns a dotted quad for IPv4-mapped addresses, a '.onion' hostname
    for OnionCat-encoded tor addresses, and the textual IPv6 form for
    everything else.

    https://en.wikipedia.org/wiki/IPv6#IPv4-mapped_IPv6_addresses
    https://www.cypherpunk.at/onioncat_trac/wiki/OnionCat
    """
    # the prefixes must be bytes: comparing a bytes slice with a str is
    # always False, which sent every address down the ipv6 branch
    ipv4_mapped_prefix = b'\x00' * 10 + b'\xff' * 2
    onioncat_prefix = b'\xfd\x87\xd8\x7e\xeb\x43'
    if ip_hex[:len(ipv4_mapped_prefix)] == ipv4_mapped_prefix:
        # ipv4 mapped ipv6 addr
        return socket.inet_ntoa(ip_hex[12:])
    if ip_hex[:len(onioncat_prefix)] == onioncat_prefix:
        # b32encode returns bytes, decode before appending the suffix
        return base64.b32encode(ip_hex[6:]).decode('ascii').lower() + '.onion'
    return socket.inet_ntop(socket.AF_INET6, ip_hex)
85
class P2PMessageHandler(object):
    """Handles messages received from a bitcoin p2p peer.

    Answers 'version' with 'verack', answers 'ping' with 'pong' and
    keeps track of when the peer was last heard from so that keepalive
    pings can be sent. Subclasses override the on_* hooks, which do
    nothing by default.
    """

    def __init__(self, logger):
        """logger: object with a debug() method used for diagnostics."""
        # time the last message arrived (initially: time of creation)
        self.last_message = datetime.now()
        # True while a keepalive ping has been sent and nothing has been
        # received since
        self.waiting_for_keepalive = False
        self.logger = logger

    def check_keepalive(self, p2p):
        """Send a keepalive ping to a quiet peer, or drop a dead one.

        p2p: connection object providing sock and create_message()

        Once KEEPALIVE_INTERVAL seconds pass without any message a ping
        is sent. If a ping is already outstanding and KEEPALIVE_TIMEOUT
        seconds pass without any message, the socket is closed.
        """
        idle_seconds = (datetime.now() - self.last_message).total_seconds()
        if self.waiting_for_keepalive:
            if idle_seconds < KEEPALIVE_TIMEOUT:
                return
            self.logger.debug('keepalive timed out, closing')
            p2p.sock.close()
        else:
            if idle_seconds < KEEPALIVE_INTERVAL:
                return
            self.logger.debug('sending keepalive to peer')
            self.waiting_for_keepalive = True
            # the payload is bytes like in every other create_message()
            # call of this class; a str payload cannot be sent on a socket
            p2p.sock.sendall(p2p.create_message('ping', b'\x00' * 8))

    def handle_message(self, p2p, command, length, payload):
        """Process one message received from the peer.

        p2p: connection object providing sock and create_message()
        command: command name as bytes, e.g. b'version'
        length: payload length from the message header (unused here)
        payload: message payload as bytes

        Any message counts as a sign of life and clears a pending
        keepalive. Commands other than version, verack and ping are
        ignored.
        """
        self.last_message = datetime.now()
        self.waiting_for_keepalive = False
        ptr = [0]  # read cursor into payload, advanced by the read_* helpers
        if command == b'version':
            version = read_int(ptr, payload, 4)
            services = read_int(ptr, payload, 8)
            timestamp = read_int(ptr, payload, 8)
            addr_recv_services = read_int(ptr, payload, 8)
            addr_recv_ip = payload[ptr[0] : ptr[0]+16]
            ptr[0] += 16
            # ports are the only big-endian fields
            addr_recv_port = read_int(ptr, payload, 2, False)
            addr_trans_services = read_int(ptr, payload, 8)
            addr_trans_ip = payload[ptr[0] : ptr[0]+16]
            ptr[0] += 16
            addr_trans_port = read_int(ptr, payload, 2, False)
            ptr[0] += 8  # skip over nonce
            user_agent = read_var_str(ptr, payload)
            start_height = read_int(ptr, payload, 4)
            if version > RELAY_TX_VERSION:
                relay = read_int(ptr, payload, 1) != 0
            else:
                # must check node accepts unconfirmed txes before broadcasting
                relay = True
            self.logger.debug(('Received peer version message: version=%d'
                + ' services=0x%x'
                + ' timestamp=%s user_agent=%s start_height=%d relay=%i'
                + ' them=%s:%d us=%s:%d') % (version,
                services, str(datetime.fromtimestamp(timestamp)),
                user_agent, start_height, relay,
                ip_hex_to_str(addr_trans_ip), addr_trans_port,
                ip_hex_to_str(addr_recv_ip), addr_recv_port))
            p2p.sock.sendall(p2p.create_message('verack', b''))
            self.on_recv_version(p2p, version, services, timestamp,
                addr_recv_services, addr_recv_ip, addr_trans_services,
                addr_trans_ip, addr_trans_port, user_agent, start_height,
                relay)
        elif command == b'verack':
            self.on_connected(p2p)
        elif command == b'ping':
            # echo the peer's payload back
            p2p.sock.sendall(p2p.create_message('pong', payload))

    # optional override these in a subclass

    def on_recv_version(self, p2p, version, services, timestamp,
            addr_recv_services, addr_recv_ip, addr_trans_services,
            addr_trans_ip, addr_trans_port, user_agent, start_height, relay):
        """Called with the parsed fields of the peer's version message."""
        pass

    def on_connected(self, p2p):
        """Called when the peer's verack message arrives."""
        pass

    def on_heartbeat(self, p2p):
        """Periodic hook; not called from within this class."""
        pass
160
161
162 class P2PProtocol(object):
def __init__(self, p2p_message_handler, remote_hostport,
        network, logger, notify_queue, user_agent=DEFAULT_USER_AGENT,
        socks5_hostport=("localhost", 9050), connect_timeout=30,
        heartbeat_interval=15, start_height=0):
    """Store the connection settings and pick the network magic.

    p2p_message_handler: handler object that receives parsed messages
    remote_hostport: address of the peer (presumably a (host, port)
        tuple -- confirm against the caller)
    network: "testnet" or "regtest"; any other value selects the
        default magic 0xd9b4bef9
    logger: object used for debug logging
    notify_queue: queue stored for use elsewhere in the class
    user_agent: user agent string to advertise
    socks5_hostport: socks5 proxy setting, stored as given
    connect_timeout, heartbeat_interval: timing settings, stored as given
    start_height: block height to advertise
    """
    magic_by_network = {
        "testnet": 0x0709110b,
        "regtest": 0xdab5bffa,
    }
    self.magic = magic_by_network.get(network, 0xd9b4bef9)

    self.p2p_message_handler = p2p_message_handler
    self.remote_hostport = remote_hostport
    self.logger = logger
    self.notify_queue = notify_queue
    self.user_agent = user_agent
    self.socks5_hostport = socks5_hostport
    self.connect_timeout = connect_timeout
    self.heartbeat_interval = heartbeat_interval
    self.start_height = start_height
    self.closed = False
183
184 def run(self):
185 services = (NODE_NETWORK | NODE_WITNESS | NODE_NETWORK_LIMITED)
186 st = int(time.time())
187 nonce = random.getrandbits(64)
188
189 netaddr_them = create_net_addr(ip_to_hex('0.0.0.0'), 0)
190 netaddr_us = create_net_addr(ip_to_hex('0.0.0.0'), 0)
191 version_message = (pack('= HEADER_LENGTH):
230 net_magic, command, payload_length, checksum =\
231 unpack('= 0 and (len(recv_buffer)
241 >= payload_length):
242 payload = recv_buffer[:payload_length]
243 recv_buffer = recv_buffer[payload_length:]
244 if btc.bin_dbl_sha256(payload)[:4] == checksum:
245 self.p2p_message_handler.handle_message(
246 self, command, payload_length, payload)
247 else:
248 self.logger.debug("wrong checksum, " +
249 "dropping " +
250 "message, cmd=" + command +
251 " payloadlen=" + str(payload_length))
252 payload_length = -1
253 data_remaining = True
254 else:
255 data_remaining = False
256 except socket.timeout:
257 self.p2p_message_handler.check_keepalive(self)
258 self.p2p_message_handler.on_heartbeat(self)
259 except EOFError as e:
260 self.closed = True
261 except IOError as e:
262 import traceback
263 self.logger.debug("logging traceback from %s: \n" %
264 traceback.format_exc())
265 self.closed = True
266 finally:
267 try:
268 self.sock.close()
269 except Exception as _:
270 pass
271
def close(self):
    """Mark the connection as closed by setting the closed flag.

    Only the flag is set here; the socket itself is not touched.
    """
    self.closed = True
274
275 def create_message(self, command, payload):
276 return (pack(" required_address_count:
393 break
394 node_addrs_chunks = chunk_list(
395 node_addrs_witness[:required_address_count],
396 CONNECTION_ATTEMPTS_PER_THREAD
397 )
398 notify_queue = queue.Queue()
399 start_height = rpc.call("getblockcount", [])
400 for node_addrs in node_addrs_chunks:
401 t = threading.Thread(target=broadcaster_thread,
402 args=(txhex, node_addrs, tor_hostport, network, logger,
403 start_height, notify_queue),
404 daemon=True)
405 t.start()
406 try:
407 success = notify_queue.get(block=True, timeout=20)
408 except queue.Empty:
409 logger.debug("Timed out getting notification for broadcasting "
410 + "transaction")
411 #the threads will maybe still continue to try broadcasting even
412 # after this timeout
413 #could time out at 20 seconds for any legitimate reason, tor is slow
414 # so no point failing, this timeout is just so the user doesnt have
415 # to stare at a seemingly-frozen dialog
416 success = True
417 return success |