python區塊鏈實現簡版網絡
說明
本文根據https://github.com/liuchengxu/blockchain-tutorial的內容,用python實現的,但根據個人的理解進行瞭一些修改,大量引用瞭原文的內容。文章末尾有"本節完整源碼實現地址"。
引言
到目前為止,我們所構建的原型已經具備瞭區塊鏈所有的關鍵特性:匿名,安全,隨機生成的地址;區塊鏈數據存儲;工作量證明系統;可靠地存儲交易。盡管這些特性都不可或缺,但是仍有不足。能夠使得這些特性真正發光發熱,使得加密貨幣成為可能的,是網絡(network)。如果實現的這樣一個區塊鏈僅僅運行在單一節點上,有什麼用呢?如果隻有一個用戶,那麼這些基於密碼學的特性,又有什麼用呢?正是由於網絡,才使得整個機制能夠運轉和發光發熱。
你可以將這些區塊鏈特性認為是規則(rule),類似於人類在一起生活,繁衍生息建立的規則,一種社會安排。區塊鏈網絡就是一個程序社區,裡面的每個程序都遵循同樣的規則,正是由於遵循著同一個規則,才使得網絡能夠長存。類似的,當人們都有著同樣的想法,就能夠將拳頭攥在一起構建一個更好的生活。如果有人遵循著不同的規則,那麼他們就將生活在一個分裂的社區(州,公社,等等)中。同樣的,如果有區塊鏈節點遵循不同的規則,那麼也會形成一個分裂的網絡。
重點在於:如果沒有網絡,或者大部分節點都不遵守同樣的規則,那麼規則就會形同虛設,毫無用處!
區塊鏈網絡
區塊鏈網絡是去中心化的,這意味著沒有服務器,客戶端也不需要依賴服務器來獲取或處理數據。在區塊鏈網絡中,有的是節點,每個節點是網絡的一個完全(full-fledged)成員。節點就是一切:它既是一個客戶端,也是一個服務器。這一點需要牢記於心,因為這與傳統的網頁應用非常不同。
區塊鏈網絡是一個 P2P(Peer-to-Peer,端到端)的網絡,即節點直接連接到其他節點。它的拓撲是扁平的,因為在節點的世界中沒有層級之分。下面是它的示意圖:
Business vector created by Dooder – Freepik.com
要實現這樣一個網絡節點更加困難,因為它們必須執行很多操作。每個節點必須與很多其他節點進行交互,它必須請求其他節點的狀態,與自己的狀態進行比較,當狀態過時時進行更新。
kademlia發現協議
kademlia是p2p的一種節點發現協議,其核心是通過計算節點之間的邏輯距離來發現附近節點以實現節點查找的收斂。
kademlia詳細介紹
簡化協議
這裡我們為瞭說明原理盡可能的簡化協議。我們隻實現三種請求:
- 節點握手
- 獲取區塊數據
- 交易廣播為瞭方便,其中又將節點握手作為心跳發送,並根據心跳信息進行區塊同步。
網絡協議方面,借鑒以太坊的做法,UDP做協議發現,TCP做數據傳輸。每當發現一個節點,就通過TCP建立連接,並發送心跳數據,以保證數據的一致性。
消息
定義消息類,分別定義瞭無意義回應和以上三種請求。為瞭方便處理,這裡統一使用字符串而不是二進制數據進行數據傳輸。
class Msg(object): NONE_MSG = 0 HAND_SHAKE_MSG = 1 GET_BLOCK_MSG = 2 TRANSACTION_MSG = 3 def __init__(self, code, data): self.code = code self.data = data
TCP服務端
class TCPServer(object): def __init__(self, ip='0.0.0.0', port=listen_port): self.sock = socket.socket() self.ip = ip self.port = port def listen(self): self.sock.bind((self.ip, self.port)) self.sock.listen(5) def run(self): t = threading.Thread(target=self.listen_loop, args=()) t.start() def handle_loop(self, conn, addr): while True: recv_data = conn.recv(4096) log.info("recv_data:"+str(recv_data)) try: recv_msg = json.loads(recv_data) except ValueError as e: conn.sendall('{"code": 0, "data": ""}'.encode()) send_data = self.handle(recv_msg) log.info("tcpserver_send:"+send_data) conn.sendall(send_data.encode()) def listen_loop(self): while True: conn, addr = self.sock.accept() t = threading.Thread(target=self.handle_loop, args=(conn, addr)) t.start() def handle(self, msg): code = msg.get("code", 0) log.info("code:"+str(code)) if code == Msg.HAND_SHAKE_MSG: res_msg = self.handle_handshake(msg) elif code == Msg.GET_BLOCK_MSG: res_msg = self.handle_get_block(msg) elif code == Msg.TRANSACTION_MSG: res_msg = self.handle_transaction(msg) else: return '{"code": 0, "data":""}' return json.dumps(res_msg.__dict__) def handle_handshake(self, msg): block_chain = BlockChain() block = block_chain.get_last_block() try: genesis_block = block_chain[0] except IndexError as e: genesis_block = None data = { "last_height": -1, "genesis_block": "" } if genesis_block: data = { "last_height": block.block_header.height, "genesis_block": genesis_block.serialize() } msg = Msg(Msg.HAND_SHAKE_MSG, data) return msg def handle_get_block(self, msg): height = msg.get("data", 1) block_chain = BlockChain() block = block_chain.get_block_by_height(height) data = block.serialize() msg = Msg(Msg.GET_BLOCK_MSG, data) return msg def handle_transaction(self, msg): tx_pool = TxPool() txs = msg.get("data", {}) for tx_data in txs: tx = Transaction.deserialize(tx_data) tx_pool.add(tx) if tx_pool.is_full(): bc = BlockChain() bc.add_block(tx_pool.txs) log.info("add block") tx_pool.clear() msg = Msg(Msg.NONE_MSG, "") return msg
TCP端比較簡單,listen_loop方法監聽新的請求並開啟一個新線程處理連接中的數據交互。
handle_loop方法調用瞭handle分發處理請求。
handle_handshake處理握手請求,這裡將最新塊高度和創世塊發送出去瞭,方便和本地數據進行比較,如果遠程數據更新,那麼就獲取新的部分的區塊。
handle_get_block獲取對應的區塊並將數據發送給客戶端。
handle_transaction 處理客戶端發送來的交易信息。把客戶端發送來的交易添加到未確認交易池,如果交易池滿瞭就添加到區塊。這裡是方便處理才這麼做的,實際上,比特幣中並不是這樣做的,而是由礦工根據情況進行打包區塊的。
TCP客戶端
class TCPClient(object): def __init__(self, ip, port): self.txs = [] self.sock = socket.socket() log.info("connect ip:"+ip+"\tport:"+str(port)) self.sock.connect((ip, port)) def add_tx(self, tx): self.txs.append(tx) def send(self, msg): data = json.dumps(msg.__dict__) self.sock.sendall(data.encode()) log.info("send:"+data) recv_data = self.sock.recv(4096) log.info("client_recv_data:"+str(recv_data)) try: recv_msg = json.loads(recv_data) except json.decoder.JSONDecodeError as e: return self.handle(recv_msg) def handle(self, msg): code = msg.get("code", 0) log.info("recv code:"+str(code)) if code == Msg.HAND_SHAKE_MSG: self.handle_shake(msg) elif code == Msg.GET_BLOCK_MSG: self.handle_get_block(msg) elif code == Msg.TRANSACTION_MSG: self.handle_transaction(msg) def shake_loop(self): while True: if self.txs: data = [tx.serialize() for tx in self.txs] msg = Msg(Msg.TRANSACTION_MSG, data) self.send(msg) self.txs.clear() else: log.info("shake") block_chain = BlockChain() block = block_chain.get_last_block() try: genesis_block = block_chain[0] except IndexError as e: genesis_block = None data = { "last_height": -1, "genesis_block": "" } if genesis_block: data = { "last_height": block.block_header.height, "genesis_block": genesis_block.serialize() } msg = Msg(Msg.HAND_SHAKE_MSG, data) self.send(msg) time.sleep(5) def handle_shake(self, msg): data = msg.get("data", "") last_height = data.get("last_height", 0) block_chain = BlockChain() block = block_chain.get_last_block() if block: local_last_height = block.block_header.height else: local_last_height = -1 log.info("local_last_height %d, last_height %d" %(local_last_height, last_height)) for i in range(local_last_height + 1, last_height+1): send_msg = Msg(Msg.GET_BLOCK_MSG, i) self.send(send_msg) def handle_get_block(self, msg): data = msg.get("data", "") block = Block.deserialize(data) bc = BlockChain() try: bc.add_block_from_peers(block) except ValueError as e: log.info(str(e)) def handle_transaction(self, msg): data = msg.get("data", {}) tx = Transaction.deserialize(data) tx_pool = TxPool() tx_pool.add(tx) if tx_pool.is_full(): bc.add_block(tx_pool.txs) log.info("mined a block") tx_pool.clear() def close(self): self.sock.close()
handle_transaction處理服務器發送來的交易,將交易添加到交易池,如果交易池滿瞭就添加到區塊鏈中。
handle_get_block處理服務器發送來的區塊,並將區塊更新到鏈上。
handle_shake處理服務器響應的握手信息,如果發現當前的的區塊高度低於數據中響應的區塊高高度,則發起請求獲取新的幾個區塊。
shake_loop 每間隔10秒發送一次握手信息(5秒同步一次區塊),如果發現有需要廣播的交易則進行交易的廣播。
P2P服務器
p2p節點發現部分,使用瞭kademlia協議,並使用瞭kademlia庫,安裝方法pip3 install kademlia
class P2p(object): def __init__(self): self.server = Server() self.loop = None def run(self): loop = asyncio.get_event_loop() self.loop = loop loop.run_until_complete(self.server.listen(listen_port)) self.loop.run_until_complete(self.server.bootstrap([(bootstrap_host, bootstrap_port)])) loop.run_forever() def get_nodes(self): nodes = [] for bucket in self.server.protocol.router.buckets: nodes.extend(bucket.get_nodes()) return nodes
其中run方法啟動節點監聽並連接一個初始節點,並運行p2p節點監聽。get_nodes方法獲取當前所有的節點。
連接節點
class PeerServer(Singleton): def __init__(self): if not hasattr(self, "peers"): self.peers = [] if not hasattr(self, "nodes"): self.nodes = [] def nodes_find(self, p2p_server): local_ip = socket.gethostbyname(socket.getfqdn(socket.gethostname())) while True: nodes = p2p_server.get_nodes() for node in nodes: if node not in self.nodes: ip = node.ip port = node.port if local_ip == ip: continue client = TCPClient(ip, port) t = threading.Thread(target=client.shake_loop, args=()) t.start() self.peers.append(client) self.nodes.append(node) time.sleep(1) def broadcast_tx(self, tx): for peer in self.peers: peer.add_tx(tx) def run(self, p2p_server): t = threading.Thread(target=self.nodes_find, args=(p2p_server,)) t.start()
nodes_find為節點發現方法,每隔1秒進行查找當前是否有新的節點,並開啟線程進行連接。broadcast_tx為廣播交易的方法,將交易添加到待廣播交易池。
RPC
開啟網絡監聽後,主線程就被p2p網絡占用瞭,我們需要另外的方法進行交互操作。RPC就是常用的方法。我們將命令行操作都通過rpc導出,然後通過rpc調用獲取信息。
class Cli(object): def get_balance(self, addr): bc = BlockChain() balance = 0 utxo = UTXOSet() utxo.reindex(bc) utxos = utxo.find_utxo(addr) print(utxos) for fout in utxos: balance += fout.txoutput.value print('%s balance is %d' %(addr, balance)) return balance def create_wallet(self): w = Wallet.generate_wallet() ws = Wallets() ws[w.address] = w ws.save() return w.address def print_all_wallet(self): ws = Wallets() wallets = [] for k, _ in ws.items(): wallets.append(k) return wallets def send(self, from_addr, to_addr, amount): bc = BlockChain() tx = bc.new_transaction(from_addr, to_addr, amount) # bc.add_block([tx]) tx_pool = TxPool() tx_pool.add(tx) from network import log log.info("tx_pool:"+str(id(tx_pool))) log.info("txs_len:"+str(len(tx_pool.txs))) try: server = PeerServer() server.broadcast_tx(tx) log.info("tx_pool is full:"+str(tx_pool.is_full())) log.info("tx_pool d :"+str(tx_pool)) if tx_pool.is_full(): bc.add_block(tx_pool.txs) log.info("add block") tx_pool.clear() except Exception as e: import traceback msg = traceback.format_exc() log.info("error_msg:"+msg) print('send %d from %s to %s' %(amount, from_addr, to_addr)) def print_chain(self, height): bc = BlockChain() return bc[height].block_header.serialize() def create_genesis_block(self): bc = BlockChain() w = Wallet.generate_wallet() ws = Wallets() ws[w.address] = w ws.save() tx = bc.coin_base_tx(w.address) bc.new_genesis_block(tx) return w.address
RPC導出:
rpc = RPCServer(export_instance=Cli()) rpc.start(False)
測試
分別打開兩臺主機A和B:A主機:
$python3 cli.py start
將B主機的conf.py中的bootstrap_host和bootstrap_port修改為A主機的ip和端口。然後啟動B主機。
$python3 cli.py start
任意一臺主機開啟新的窗口執行生成創世塊:
$python3 cli.py genesis_block Genesis Wallet is: 1LYHea8NjTxaYboXJbR7LemvUZjyQc839r
分別在兩臺機器上查看餘額:
$python3 cli.py balance 1LYHea8NjTxaYboXJbR7LemvUZjyQc839r 1LYHea8NjTxaYboXJbR7LemvUZjyQc839r balance is 1000
分別在兩臺機器上創建地址:
$python3 cli.py createwallet Wallet address is 14sQYjj3n2fReJyVNoqHCmCFjNKEZAVcEB
查看當前機器的所有地址
python3 cli.py printwallet Wallet are: 19zR4zT9eSFsbSNvnQ1RCrhjN71VzPFTnH 1MVUrxPuRgtkyLQvAoma4yEarzcMzvQqym 18kruspe7jAbggR1sUF8fCFsZLn6efSeFk 14sQYjj3n2fReJyVNoqHCmCFjNKEZAVcEB
轉賬(至少要轉兩筆才能確認哦,可以修改txpool.py的SIZE屬性來調整區塊大小)。註意:隻有當前有這個地址(即有這個私鑰)才能作為from轉賬給其他地址。
$python3 cli.py send --from 1LYHea8NjTxaYboXJbR7LemvUZjyQc839r --to 19zR4zT9eSFsbSNvnQ1RCrhjN71VzPFTnH --amount 100 $python3 cli.py send --from 1LYHea8NjTxaYboXJbR7LemvUZjyQc839r --to 19zR4zT9eSFsbSNvnQ1RCrhjN71VzPFTnH --amount 100
分別在兩臺機器上查看餘額:
python3 cli.py balance 1LYHea8NjTxaYboXJbR7LemvUZjyQc839r 1LYHea8NjTxaYboXJbR7LemvUZjyQc839r balance is 1900
註意:這裡因為重復轉瞭兩筆賬,使用瞭同一個UTXO,所以第二筆會失敗,由於1LYHea8NjTxaYboXJbR7LemvUZjyQc839r為被獎勵地址,所以獲得瞭1000得挖礦獎勵所以餘額為:1000-100+900=1900。
打印區塊信息:
$python3 cli.py print 1 {'timestamp': '1551347915.3271294', 'prev_block_hash': '9f12dad81ab988f247884d7d06de46c6951688dcbedb87df2159669594a44f0d', 'hash': 'a9d02b72690398805fb83efd4680cb710ed4f3c67ea7926fe8faab256c1cad1c', 'hash_merkle_root': 'fe768edf1040c504674e8a468c89f00574a181b88ad2297ef29d307695adb38e', 'height': 1, 'nonce': 3}
區塊同步方式
為瞭簡單,區塊采用最簡單的方式進行同步。方法如下:
如果發現對方區塊高度低於自己,則不做處理。
如果發現對方區塊高度高於自己
(1) 當前最新區塊在對應區塊能找到,那麼就更新最新的區塊
(2) 當前最新區塊在對應區塊不能找到,那麼回滾當前區塊,直到回到交叉點,再進行更新區塊。
涉及到的源碼修改較多,這裡就不貼源碼瞭。移步到本節完整實現源碼查看完整源碼。
問題
- 為瞭簡單,將握手和廣播交易合一瞭,這導致瞭廣播交易不及時。
- 新區塊沒有實時進行廣播,而是被動等待同步,這也導致瞭區塊同步較慢。
- 在區塊未確認的情況下用同一個地址的幣進行轉賬有隻有第一筆會成功,後面的都會失敗。這是由目前獲取UTXO的方式決定的。
總結
我們已經實現瞭一個簡版的比特幣,並且實現瞭任意節點加入和區塊的同步等功能。為瞭簡化並說明原理,忽略掉瞭很多細節,並且忽略掉瞭性能問題,但它可以說明區塊鏈的基本原理。
參考:
[1] 本節完整實現源碼
以上就是python區塊鏈實現簡版網絡的詳細內容,更多關於python區塊鏈網絡的資料請關註WalkonNet其它相關文章!
推薦閱讀:
- python區塊鏈基本原型簡版實現示例
- python區塊鏈持久化和命令行接口實現簡版
- Python實現Socket通信建立TCP反向連接
- Python區塊鏈范圍結論及Genesis Block的添加教程
- Python中的socket網絡模塊介紹