Skip to content

RPC Client

Overview

The Kaspa Python SDK provides an asynchronous RPC client for communicating with Kaspa nodes via WebSocket. Features include:

  • Automatic node discovery (PNN) via Resolver.
  • Connection management with reconnection support.
  • Full RPC API coverage, including event subscriptions for real-time updates.

Quick Start

import asyncio
from kaspa import RpcClient, Resolver

async def main():
    # Create client with resolver
    client = RpcClient(
        resolver=Resolver(),
        network_id="mainnet"
    )

    # Connect
    await client.connect()
    print(f"Connected to: {client.url}")

    # Make RPC calls
    info = await client.get_info()
    print(f"Server info: {info}")

    # Disconnect
    await client.disconnect()

asyncio.run(main())

Connection Options

Using a Resolver

The Resolver automatically finds available PNN nodes:

from kaspa import RpcClient, Resolver

# Default resolver (uses public infrastructure)
resolver = Resolver()

# Custom resolver URLs
resolver = Resolver(urls=["https://resolver1.kaspa.org"])

# With TLS configuration
resolver = Resolver(tls=True)

client = RpcClient(resolver=resolver, network_id="mainnet")

Direct Connection

Connect directly to a known node:

from kaspa import RpcClient

client = RpcClient(
    url="wss://node.kaspa.org:17110",
    network_id="mainnet",
    encoding="borsh"  # or "json"
)

Connection Parameters

await client.connect(
    block_async_connect=True,      # Wait for connection
    strategy="fallback",           # Connection strategy
    timeout_duration=30000,        # Timeout in ms
    retry_interval=1000,           # Retry interval in ms
)

Client Properties

# Check connection status
print(f"Connected: {client.is_connected}")

# Get current URL
print(f"URL: {client.url}")

# Get encoding
print(f"Encoding: {client.encoding}")

# Get node ID
print(f"Node ID: {client.node_id}")

# Get resolver
resolver = client.resolver

RPC Methods

Network Information

# Get general info
info = await client.get_info()

# Get block count
count = await client.get_block_count()
print(f"Blocks: {count['blockCount']}, Headers: {count['headerCount']}")

# Get block DAG info
dag_info = await client.get_block_dag_info()
print(f"Network: {dag_info['networkName']}")
print(f"Block count: {dag_info['blockCount']}")

# Get coin supply
supply = await client.get_coin_supply()
print(f"Circulating: {supply['circulatingSompi']}")

# Get current network
network = await client.get_current_network()

# Get sync status
sync = await client.get_sync_status()
print(f"Synced: {sync['isSynced']}")

Balance and UTXOs

# Get balance for single address
balance = await client.get_balance_by_address({
    "address": "kaspa:qz..."
})
print(f"Balance: {balance['balance']} sompi")

# Get balances for multiple addresses
balances = await client.get_balances_by_addresses({
    "addresses": ["kaspa:qz...", "kaspa:qr..."]
})

# Get UTXOs
utxos = await client.get_utxos_by_addresses({
    "addresses": ["kaspa:qz..."]
})
for entry in utxos.get("entries", []):
    print(f"UTXO: {entry['outpoint']} = {entry['utxoEntry']['amount']}")

Blocks

# Get specific block
block = await client.get_block({
    "hash": "block-hash-hex",
    "includeTransactions": True
})

# Get multiple blocks
blocks = await client.get_blocks({
    "lowHash": "starting-hash",
    "includeBlocks": True,
    "includeTransactions": False
})

# Get block template (for mining)
template = await client.get_block_template({
    "payAddress": "kaspa:mining-address...",
    "extraData": []
})

Transactions

# Submit transaction
from kaspa import Transaction

result = await client.submit_transaction({
    "transaction": tx.serialize_to_dict(),
    "allowOrphan": False
})
print(f"Transaction ID: {result['transactionId']}")

# Get mempool entries
mempool = await client.get_mempool_entries({
    "includeOrphanPool": False,
    "filterTransactionPool": True
})

# Get mempool entry by transaction ID
entry = await client.get_mempool_entry({
    "transactionId": "tx-id...",
    "includeOrphanPool": False,
    "filterTransactionPool": True
})

Fees

# Get fee estimate
fee = await client.get_fee_estimate()
print(f"Priority fee: {fee['estimate']['priorityBucket']}")

# Experimental fee estimate with more detail
fee_exp = await client.get_fee_estimate_experimental({
    "verbose": True
})

Peer Management

# Get connected peers
peers = await client.get_connected_peer_info()

# Get peer addresses
addresses = await client.get_peer_addresses()

# Add peer
await client.add_peer({
    "peerAddress": "192.168.1.1:16111",
    "isPermanent": False
})

# Ban/unban peer
await client.ban({"ip": "192.168.1.1"})
await client.unban({"ip": "192.168.1.1"})

System

# Ping node
pong = await client.ping()

# Get server info
server_info = await client.get_server_info()

# Get system info
system_info = await client.get_system_info()

# Get metrics
metrics = await client.get_metrics({
    "processMetrics": True,
    "connectionMetrics": True,
    "bandwidthMetrics": True,
    "consensusMetrics": True,
    "storageMetrics": False,
    "customMetrics": False
})

Event Subscriptions

Subscribe to real-time events.

Available Events

Event Subscription Method
utxos-changed subscribe_utxos_changed()
block-added subscribe_block_added()
virtual-chain-changed subscribe_virtual_chain_changed()
virtual-daa-score-changed subscribe_virtual_daa_score_changed()
sink-blue-score-changed subscribe_sink_blue_score_changed()
finality-conflict subscribe_finality_conflict()
finality-conflict-resolved subscribe_finality_conflict_resolved()
new-block-template subscribe_new_block_template()
pruning-point-utxo-set-override subscribe_pruning_point_utxo_set_override()

UTXO Changes

from kaspa import Address

# Define callback
def on_utxo_change(event):
    print(f"UTXO change: {event}")

# Add listener
client.add_event_listener("utxos-changed", on_utxo_change)

# Subscribe to addresses
await client.subscribe_utxos_changed([
    Address("kaspa:qz...")
])

# Later: unsubscribe
await client.unsubscribe_utxos_changed([
    Address("kaspa:qz...")
])

Block Events

def on_block_added(event):
    print(f"New block: {event['block']['header']['hash']}")

client.add_event_listener("block-added", on_block_added)
await client.subscribe_block_added()

Virtual Chain Changes

def on_chain_change(event):
    print(f"Chain updated: {event}")

client.add_event_listener("virtual-chain-changed", on_chain_change)
await client.subscribe_virtual_chain_changed(
    include_accepted_transaction_ids=True
)

DAA Score Changes

def on_daa_change(event):
    print(f"DAA score: {event['virtualDaaScore']}")

client.add_event_listener("virtual-daa-score-changed", on_daa_change)
await client.subscribe_virtual_daa_score_changed()

Managing Listeners

# Add listener with extra args
client.add_event_listener("block-added", callback, extra_arg)

# Remove specific listener
client.remove_event_listener("block-added", callback)

# Remove all listeners for an event
client.remove_event_listener("block-added")

# Remove all listeners
client.remove_all_event_listeners()

Complete Example: Wallet Monitor

import asyncio
from kaspa import RpcClient, Resolver, Address, sompi_to_kaspa

class WalletMonitor:
    def __init__(self, addresses):
        self.addresses = [Address(a) for a in addresses]
        self.client = RpcClient(
            resolver=Resolver(),
            network_id="mainnet"
        )

    async def start(self):
        await self.client.connect()

        # Set up event handler
        self.client.add_event_listener(
            "utxos-changed",
            self.on_utxo_change
        )

        # Subscribe to address changes
        await self.client.subscribe_utxos_changed(self.addresses)

        # Get initial balances
        await self.check_balances()

        print("Monitoring... Press Ctrl+C to stop")

        # Keep running
        while True:
            await asyncio.sleep(1)

    async def check_balances(self):
        for addr in self.addresses:
            result = await self.client.get_balance_by_address({
                "address": addr.to_string()
            })
            balance = sompi_to_kaspa(result.get("balance", 0))
            print(f"{addr.to_string()[:20]}...: {balance} KAS")

    def on_utxo_change(self, event):
        print(f"UTXO change detected!")
        for added in event.get("added", []):
            amount = sompi_to_kaspa(added["utxoEntry"]["amount"])
            print(f"  + {amount} KAS")
        for removed in event.get("removed", []):
            amount = sompi_to_kaspa(removed["utxoEntry"]["amount"])
            print(f"  - {amount} KAS")

    async def stop(self):
        await self.client.disconnect()

async def main():
    addresses = ["kaspa:qz..."]
    monitor = WalletMonitor(addresses)

    try:
        await monitor.start()
    except KeyboardInterrupt:
        await monitor.stop()

asyncio.run(main())