bittensor.utils.async_substrate_interface#
This library comprises the asyncio-compatible version of the subtensor interface commands we use in bittensor, as well as its helper functions and classes. The docstring for the AsyncSubstrateInterface class explains in more depth how to instantiate and use it.
Attributes#
Exceptions#
TimeoutException | Common base class for all non-exit exceptions.
Classes#
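AsyncSubstrateInterface | The asyncio-compatible version of the subtensor interface commands we use in bittensor. It is important to initialise this class asynchronously in an async context manager.
ExtrinsicReceipt | Object containing information of submitted extrinsic. Block hash where extrinsic is included is required when retrieving triggered events or determining whether the extrinsic was successful.
Preprocessed |
QueryMapResult |
RequestManager |
Runtime |
Websocket | Websocket manager object. Allows for the use of a single websocket connection by multiple calls.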
Functions#
timeout_handler(signum, frame) |
Module Contents#
- class bittensor.utils.async_substrate_interface.AsyncSubstrateInterface(chain_endpoint, use_remote_preset=False, auto_discover=True, ss58_format=None, type_registry=None, chain_name=None)#
The asyncio-compatible version of the subtensor interface commands we use in bittensor. It is important to initialise this class asynchronously in an async context manager using async with AsyncSubstrateInterface(). Otherwise, some (most) methods will not work properly, and may raise exceptions.
- Parameters:
chain_endpoint (str) – the URI of the chain to connect to
use_remote_preset (bool) – whether to pull the preset from GitHub
auto_discover (bool) – whether to automatically pull the presets based on the chain name and type registry
ss58_format (Optional[int]) – the specific SS58 format to use
type_registry (Optional[dict]) – a dict of custom types
chain_name (Optional[str]) – the name of the chain (the result of the rpc request for “system_chain”)
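A minimal usage sketch of the async context manager pattern described above. The endpoint URL is a placeholder, and `finalised_head` and `main` are hypothetical helper names; `get_chain_finalised_head()` is simply one documented method chosen for illustration. The instance is created first and then entered with `async with`, so the sketch does not depend on what `__aenter__` returns.

```python
import asyncio


async def finalised_head(substrate):
    """Enter the async context, then call a documented method.

    `substrate` is expected to be an AsyncSubstrateInterface instance; any
    object exposing the same async context manager and method also works.
    """
    async with substrate:
        return await substrate.get_chain_finalised_head()


async def main():
    # Imported lazily so the helper above can be reused without bittensor.
    from bittensor.utils.async_substrate_interface import AsyncSubstrateInterface

    # Placeholder endpoint (an assumption): replace with your node's URI.
    substrate = AsyncSubstrateInterface(chain_endpoint="ws://127.0.0.1:9944")
    print(await finalised_head(substrate))


# Run with: asyncio.run(main())
```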
- async __aenter__()#
- async __aexit__(exc_type, exc_val, exc_tb)#
- __chain#
- __metadata = None#
- __metadata_cache#
- _forgettable_task = None#
- async _get_block_handler(block_hash, ignore_decoding_errors=False, include_author=False, header_only=False, finalized_only=False, subscription_handler=None)#
- async _get_current_block_hash(block_hash, reuse)#
- _lock#
- async _make_rpc_request(payloads, value_scale_type=None, storage_item=None, runtime=None, result_handler=None)#
- Parameters:
- Return type:
- async _preprocess(query_for, block_hash, storage_function, module)#
Creates a Preprocessed data object for passing to _make_rpc_request
- Parameters:
- Return type:
- async _process_response(response, subscription_id, value_scale_type=None, storage_item=None, runtime=None, result_handler=None)#
Processes the RPC call response by decoding it, returning it as is, or setting a handler for subscriptions, depending on the specific call.
- Parameters:
response (dict) – the RPC call response
subscription_id (Union[int, str]) – the subscription id for subscriptions, used only for subscriptions with a result handler
value_scale_type (Optional[str]) – Scale Type string used for decoding ScaleBytes results
storage_item (Optional[scalecodec.base.ScaleType]) – The ScaleType object used for decoding ScaleBytes results
runtime (Optional[Runtime]) – the runtime object, used for decoding ScaleBytes results
result_handler (Optional[ResultHandler]) – the result handler coroutine used for handling longer-running subscriptions
- Returns:
(decoded response, completion)
- Return type:
- apply_type_registry_presets(use_remote_preset=True, auto_discover=True)#
- property chain#
Returns the substrate chain currently associated with the object.
- chain_endpoint#
- async close()#
Closes the substrate connection, and the websocket connection.
- async compose_call(call_module, call_function, call_params=None, block_hash=None)#
Composes a call payload which can be used in an extrinsic.
- Parameters:
call_module (str) – Name of the runtime module e.g. Balances
call_function (str) – Name of the call function e.g. transfer
call_params (Optional[dict]) – A dict containing the params of the call, e.g. {'dest': 'EaG2CRhJWPb7qmdcJvy3LiWdh26Jreu9Dx6R1rXxPmYXoDk', 'value': 1000000000000}
block_hash (Optional[str]) – Use metadata at given block_hash to compose call
- Returns:
A composed call
- Return type:
scalecodec.types.GenericCall
- config#
- async create_scale_object(type_string, data=None, block_hash=None, **kwargs)#
Convenience method to create a SCALE object of type type_string. This will initialize the runtime automatically at the moment of block_hash, or at the chain tip if omitted.
- Parameters:
- Returns:
The created Scale Type object
- Return type:
scalecodec.base.ScaleType
- async create_signed_extrinsic(call, keypair, era=None, nonce=None, tip=0, tip_asset_id=None, signature=None)#
Creates an extrinsic signed by given account details
- Parameters:
call (scalecodec.types.GenericCall) – GenericCall to create extrinsic for
keypair (bittensor_wallet.Keypair) – Keypair used to sign the extrinsic
era (Optional[dict]) – Specify mortality in blocks in the following format: {'period': [amount_blocks]}. If omitted, the extrinsic is immortal
nonce (Optional[int]) – nonce to include in extrinsics, if omitted the current nonce is retrieved on-chain
tip (int) – The tip for the block author to gain priority during network congestion
tip_asset_id (Optional[int]) – Optional asset ID with which to pay the tip
signature (Optional[Union[bytes, str]]) – Optionally provide signature if externally signed
- Returns:
The signed Extrinsic
- Return type:
scalecodec.GenericExtrinsic
- async create_storage_key(pallet, storage_function, params=None, block_hash=None)#
Create a StorageKey instance providing storage function details. See subscribe_storage().
- Parameters:
- Returns:
StorageKey
- Return type:
substrateinterface.storage.StorageKey
- async decode_scale(type_string, scale_bytes)#
Helper function to decode arbitrary SCALE-bytes (e.g. 0x02000000) according to given RUST type_string (e.g. BlockNumber). The relevant versioning information of the type (if defined) will be applied if block_hash is set
- Parameters:
type_string – the type string of the SCALE object for decoding
scale_bytes (bytes) – the SCALE-bytes representation of the SCALE object to decode
- Returns:
Decoded object
- Return type:
Any
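To make the 0x02000000 example concrete: SCALE encodes fixed-width integers in little-endian byte order. The sketch below decodes that value in plain Python, assuming BlockNumber is a 32-bit unsigned integer (common, but chain-dependent). It illustrates the encoding only and is not how decode_scale is implemented; `decode_u32_le` is a hypothetical helper name.

```python
def decode_u32_le(scale_hex: str) -> int:
    """Decode a SCALE-encoded u32 given as a hex string such as '0x02000000'."""
    raw = bytes.fromhex(scale_hex.removeprefix("0x"))
    if len(raw) != 4:
        raise ValueError("a u32 is exactly 4 bytes")
    # Fixed-width SCALE integers are little-endian: least significant byte first.
    return int.from_bytes(raw, byteorder="little")


print(decode_u32_le("0x02000000"))  # prints 2
```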
- async generate_signature_payload(call, era=None, nonce=0, tip=0, tip_asset_id=None, include_call_length=False)#
- async get_account_nonce(account_address)#
Returns current nonce for given account address
- async get_block(block_hash=None, block_number=None, ignore_decoding_errors=False, include_author=False, finalized_only=False)#
Retrieves a block and decodes its contained extrinsics and log digest items. If block_hash and block_number are omitted, the chain tip will be retrieved, or the finalized head if finalized_only is set to True.
Either block_hash or block_number should be set, or both omitted.
- Parameters:
block_hash (Optional[str]) – the hash of the block to be retrieved
block_number (Optional[int]) – the number of the block to be retrieved
ignore_decoding_errors (bool) – When set this will catch all decoding errors, set the item to None and continue decoding
include_author (bool) – This will retrieve the block author from the validator set and add to the result
finalized_only (bool) – when no block_hash or block_number is set, this will retrieve the finalized head
- Returns:
A dict containing the extrinsic and digest logs data
- Return type:
Optional[dict]
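The selection rule above (block_hash, or block_number, or neither) can be sketched as a small guard in front of get_block. The helper names are hypothetical, and the "extrinsics" key is an assumption about the layout of the returned dict, which this page does not spell out.

```python
import asyncio
from typing import Optional


def check_block_selector(block_hash: Optional[str], block_number: Optional[int]) -> None:
    """Reject the one combination the documentation rules out: both set."""
    if block_hash is not None and block_number is not None:
        raise ValueError("set block_hash or block_number, not both")


async def count_extrinsics(substrate, block_hash=None, block_number=None, finalized_only=False):
    """Fetch a block and count its extrinsics.

    Assumption: the decoded extrinsics sit under an "extrinsics" key;
    .get() keeps the helper safe if that key is absent.
    """
    check_block_selector(block_hash, block_number)
    block = await substrate.get_block(
        block_hash=block_hash,
        block_number=block_number,
        finalized_only=finalized_only,
    )
    if block is None:  # the documented return type is Optional[dict]
        return None
    return len(block.get("extrinsics", []))


# e.g. inside an async context: await count_extrinsics(substrate, finalized_only=True)
```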
- async get_block_metadata(block_hash=None, decode=True)#
A pass-through to the existing JSONRPC method state_getMetadata.
- async get_block_number(block_hash=None)#
Async version of substrateinterface.base.get_block_number method.
- async get_block_runtime_version(block_hash)#
Retrieve the runtime version id of given block_hash
- async get_chain_finalised_head()#
A pass-through to the existing JSONRPC method chain_getFinalizedHead.
- async get_constant(module_name, constant_name, block_hash=None, reuse_block_hash=False)#
Returns the decoded ScaleType object of the constant for the given module name, constant name and block_hash (or chaintip if block_hash is omitted)
- Parameters:
- Returns:
ScaleType from the runtime call
- Return type:
Optional[scalecodec.base.ScaleType]
- async get_events(block_hash=None)#
Convenience method to get events for a certain block (storage call for module ‘System’ and function ‘Events’)
- async get_metadata_call_function(module_name, call_function_name, block_hash=None)#
Retrieves a list of all call functions in metadata active for given block_hash (or chaintip if block_hash is omitted)
- async get_metadata_constant(module_name, constant_name, block_hash=None)#
Retrieves the details of a constant for the given module name, constant name and block_hash (or chaintip if block_hash is omitted)
- Parameters:
module_name – name of the module you are querying
constant_name – name of the constant you are querying
block_hash – hash of the block at which to make the runtime API call
- Returns:
MetadataModuleConstants
- async get_payment_info(call, keypair)#
Retrieves fee estimation via RPC for given extrinsic
- Parameters:
call (scalecodec.types.GenericCall) – Call object to estimate fees for
keypair (bittensor_wallet.Keypair) – Keypair of the sender, does not have to include private key because no valid signature is required
- Returns:
Dict with payment info, e.g. {'class': 'normal', 'partialFee': 151000000, 'weight': {'ref_time': 143322000}}
- Return type:
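A short sketch of reading the fee out of the payment info, based only on the example dict shown above. `partial_fee` and `estimate_fee` are hypothetical helpers, and the "partialFee" key is taken from that example rather than from a guaranteed schema.

```python
import asyncio


def partial_fee(payment_info: dict) -> int:
    """Extract the estimated fee from a payment-info dict like the documented example."""
    return int(payment_info["partialFee"])


async def estimate_fee(substrate, call, keypair) -> int:
    """Ask the node for a fee estimate and return only the partial fee."""
    info = await substrate.get_payment_info(call=call, keypair=keypair)
    return partial_fee(info)


# e.g. inside an async context: fee = await estimate_fee(substrate, call, keypair)
```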
- property implements_scaleinfo: bool | None#
Returns True if the current runtime implements a PortableRegistry (MetadataV14 and higher)
Returns#
bool
- Return type:
Optional[bool]
- async init_runtime(block_hash=None, block_id=None)#
This method is used by all other methods that deal with metadata and types defined in the type registry. It optionally retrieves the block_hash when block_id is given and sets the applicable metadata for that block_hash. Also, it applies all the versioned types at the time of the block_hash.
Because parsing of metadata and type registry is quite heavy, the result will be cached per runtime id. In the future there could be support for caching backends like Redis to make this cache more persistent.
- async initialize()#
Initialize the connection to the chain.
- initialized = False#
- async load_registry()#
- static make_payload(id_, method, params)#
Creates a payload for making an rpc_request with _make_rpc_request
- property metadata#
- metadata_version_hex = '0x0f000000'#
- async query(module, storage_function, params=None, block_hash=None, raw_storage_key=None, subscription_handler=None, reuse_block_hash=False)#
Queries subtensor. This should only be used when making a single request. For multiple requests, you should use self.query_multiple.
- async query_map(module, storage_function, params=None, block_hash=None, max_results=None, start_key=None, page_size=100, ignore_decoding_errors=False, reuse_block_hash=False)#
Iterates over all key-value pairs located at the given module and storage_function. The storage item must be a map.
Example:
```
result = await substrate.query_map("System", "Account", max_results=100)

async for account, account_info in result:
    print(f"Free balance of account '{account.value}': {account_info.value['data']['free']}")
```
Note: it is important that you do not use for x in result.records, as this will sidestep possible pagination. You must do async for x in result.
- Parameters:
module (str) – The module name in the metadata, e.g. System or Balances.
storage_function (str) – The storage function name, e.g. Account or Locks.
params (Optional[list]) – The input parameters in case of for example a DoubleMap storage function
block_hash (Optional[str]) – Optional block hash for result at given block, when left to None the chain tip will be used.
max_results (Optional[int]) – the maximum of results required, if set the query will stop fetching results when number is reached
start_key (Optional[str]) – The storage key used as offset for the results, for pagination purposes
page_size (int) – The results are fetched from the node RPC in chunks of this size
ignore_decoding_errors (bool) – When set this will catch all decoding errors, set the item to None and continue decoding
reuse_block_hash (bool) – use True if you wish to make the query using the last-used block hash. Do not mark True if supplying a block_hash
- Returns:
QueryMapResult object
- Return type:
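The pagination note above is easier to see with a toy model. The class below is a simplified stand-in, not the library's QueryMapResult: it only assumes that `records` holds the page fetched so far while async iteration keeps loading further pages. `PagedResult` and `collect` are hypothetical names.

```python
import asyncio

_DONE = object()  # sentinel marking an exhausted page


class PagedResult:
    """Toy stand-in for a paginated result; NOT the library's QueryMapResult."""

    def __init__(self, pages):
        self._pages = list(pages)  # pages still to be "fetched"
        self.records = self._pages.pop(0) if self._pages else []  # first page only
        self._buffer = iter(self.records)

    def __aiter__(self):
        return self

    async def __anext__(self):
        while True:
            item = next(self._buffer, _DONE)
            if item is not _DONE:
                return item
            if not self._pages:
                raise StopAsyncIteration
            # Simulate fetching the next page from the node.
            self.records = self._pages.pop(0)
            self._buffer = iter(self.records)


async def collect(result):
    """Drain the result with async iteration, crossing page boundaries."""
    return [item async for item in result]
```

In this model, reading `records` right after construction sees only the first page, while `collect` returns every item.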
- async query_multi(storage_keys, block_hash=None)#
Query multiple storage keys in one request.
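Example:
```
storage_keys = [
    await substrate.create_storage_key(
        "System", "Account", ["F4xQKRUagnSGjFqafyhajLs94e7Vvzvr8ebwYJceKpr8R7T"]
    ),
    await substrate.create_storage_key(
        "System", "Account", ["GSEX8kR4Kz5UZGhvRUCJG93D5hhTAoVZ5tAe6Zne7V42DSi"]
    ),
]

# create_storage_key is a coroutine in this async interface, hence the awaits
result = await substrate.query_multi(storage_keys)
```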
- async query_multiple(params, storage_function, module, block_hash=None, reuse_block_hash=False)#
Queries the subtensor. Only use this when making multiple queries, else use self.query.
- reload_type_registry(use_remote_preset=True, auto_discover=True)#
Reload type registry and preset used to instantiate the AsyncSubstrateInterface object. Useful to periodically apply changes in type definitions when a runtime upgrade occurred
- async rpc_request(method, params, block_hash=None, reuse_block_hash=False)#
Makes an RPC request to the subtensor. Use this only if self.query, self.query_multiple and self.query_map do not meet your needs.
- Parameters:
method (str) – the method in the RPC request
params (Optional[list]) – the params in the RPC request
block_hash (Optional[str]) – the hash of the block — only supply this if not supplying the block hash in the params, and not reusing the block hash
reuse_block_hash (bool) – whether to reuse the block hash in the params — only mark as True if not supplying the block hash in the params, or via the block_hash parameter
- Returns:
the response from the RPC request
- Return type:
Any
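A minimal sketch of a raw RPC call, using the "system_chain" method mentioned in the chain_name parameter description. `chain_name` is a hypothetical helper, and the shape of the response (a dict with a "result" key) is an assumption, since the documented return type is Any.

```python
import asyncio


async def chain_name(substrate) -> str:
    """Ask the node for its chain name via the "system_chain" RPC method.

    Assumption: the response is a JSON-RPC style dict carrying the payload
    under "result"; verify this against your node before relying on it.
    """
    response = await substrate.rpc_request("system_chain", [])
    return response["result"]


# e.g. inside an async context: name = await chain_name(substrate)
```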
- runtime = None#
- runtime_cache#
- async runtime_call(api, method, params=None, block_hash=None)#
Calls a runtime API method
- Parameters:
- Returns:
ScaleType from the runtime call
- Return type:
scalecodec.base.ScaleType
- runtime_config#
- runtime_version = None#
- ss58_format#
- async submit_extrinsic(extrinsic, wait_for_inclusion=False, wait_for_finalization=False)#
Submit an extrinsic to the connected node, with the possibility to wait until the extrinsic is included in a block and/or the block is finalized. The receipt returned provides information about the block and triggered events.
- Parameters:
extrinsic (scalecodec.GenericExtrinsic) – The extrinsic to be sent to the network
wait_for_inclusion (bool) – wait until extrinsic is included in a block (only works for websocket connections)
wait_for_finalization (bool) – wait until extrinsic is finalized (only works for websocket connections)
- Returns:
ExtrinsicReceipt object of your submitted extrinsic
- Return type:
- transaction_version = None#
- type_registry#
- type_registry_preset = None#
- async wait_for_block(block, result_handler, task_return=True)#
Executes the result_handler when the chain has reached the block specified.
- Parameters:
block (int) – block number
result_handler (Callable[[dict], Awaitable[Any]]) – coroutine executed upon reaching the block number. This can be basically anything, but must accept one single arg, a dict with the block data; whether you use this data or not is entirely up to you.
task_return (bool) – True to immediately return the result of wait_for_block as an asyncio Task, False to wait for the block to be reached, and return the result of the result handler.
- Returns:
Either an asyncio.Task (which contains the running subscription, and whose result() will contain the return of the result_handler), or the result itself, depending on the task_return flag. Note that if your result_handler returns None, this method will return True; otherwise the return will be the result of your result_handler.
- Return type:
Union[asyncio.Task, Union[bool, Any]]
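A sketch of a result handler and a blocking wait. `block_number_handler` and `wait_and_report` are hypothetical names, and the ["header"]["number"] layout of the block data is an assumption, so the handler reads it defensively.

```python
import asyncio


async def block_number_handler(block_data: dict):
    """Result handler: takes exactly one argument, a dict with the block data.

    Assumption: the block number sits under block_data["header"]["number"].
    If the layout differs this returns None, in which case wait_for_block
    is documented to return True instead.
    """
    return block_data.get("header", {}).get("number")


async def wait_and_report(substrate, target_block: int):
    # task_return=False: wait until the block is reached, then return the
    # handler's result rather than an asyncio.Task.
    return await substrate.wait_for_block(
        target_block, block_number_handler, task_return=False
    )


# e.g. inside an async context: await wait_and_report(substrate, 1_000_000)
```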
- ws#
- class bittensor.utils.async_substrate_interface.ExtrinsicReceipt(substrate, extrinsic_hash=None, block_hash=None, block_number=None, extrinsic_idx=None, finalized=None)#
Object containing information about a submitted extrinsic. The hash of the block in which the extrinsic is included is required when retrieving triggered events or determining whether the extrinsic was successful.
- Parameters:
substrate (AsyncSubstrateInterface) – the AsyncSubstrateInterface instance
extrinsic_hash (Optional[str]) – the hash of the extrinsic
block_hash (Optional[str]) – the hash of the block on which this extrinsic exists
finalized – whether the extrinsic is finalized
block_number (Optional[int])
extrinsic_idx (Optional[int])
- __error_message = None#
- __extrinsic = None#
- __extrinsic_idx#
- static __get_extrinsic_index(block_extrinsics, extrinsic_hash)#
Returns the index of a provided extrinsic
- __getitem__(item)#
- __iter__()#
- __total_fee_amount = None#
- __weight = None#
- block_hash#
- block_number#
- async error_message()#
Returns the error message if the extrinsic failed in format e.g.:
{'type': 'System', 'name': 'BadOrigin', 'docs': 'Bad origin'}
Returns#
dict
- Return type:
Optional[dict]
- extrinsic_hash#
- async extrinsic_idx()#
Retrieves the index of this extrinsic in its containing block
Returns#
int
- Return type:
- finalized#
- get(name)#
- async get_extrinsic_identifier()#
Returns the on-chain identifier for this extrinsic in the format "[block_number]-[extrinsic_idx]", e.g. 134324-2
Returns#
str
- Return type:
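The identifier format is simple enough to build and parse by hand. The two helpers below are hypothetical and not part of the library; they only mirror the "[block_number]-[extrinsic_idx]" format described above.

```python
def format_extrinsic_identifier(block_number: int, extrinsic_idx: int) -> str:
    """Build an identifier such as '134324-2'."""
    return f"{block_number}-{extrinsic_idx}"


def parse_extrinsic_identifier(identifier: str) -> tuple[int, int]:
    """Split an identifier back into (block_number, extrinsic_idx)."""
    block_number, extrinsic_idx = identifier.split("-")
    return int(block_number), int(extrinsic_idx)


print(parse_extrinsic_identifier("134324-2"))  # prints (134324, 2)
```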
- async is_success()#
Returns True if the ExtrinsicSuccess event is triggered, False in case of ExtrinsicFailed. In case of False, error_message will contain more details about the error.
Returns#
bool
- Return type:
- async process_events()#
- async retrieve_extrinsic()#
- substrate#
- async total_fee_amount()#
Contains the total fee costs deducted when executing this extrinsic. This includes the fee for the validator (Balances.Deposit event) and the fee deposited for the treasury (Treasury.Deposit event).
Returns#
int
- Return type:
- class bittensor.utils.async_substrate_interface.Preprocessed#
-
- storage_item: scalecodec.base.ScaleType#
- class bittensor.utils.async_substrate_interface.QueryMapResult(records, page_size, substrate, module=None, storage_function=None, params=None, block_hash=None, last_key=None, max_results=None, ignore_decoding_errors=False)#
- Parameters:
- __aiter__()#
- async __anext__()#
- __getitem__(item)#
- _buffer#
- block_hash#
- ignore_decoding_errors#
- last_key#
- loading_complete = False#
- max_results#
- module#
- page_size#
- params#
- records#
- storage_function#
- substrate#
- class bittensor.utils.async_substrate_interface.RequestManager(payloads)#
- RequestResults#
- add_request(item_id, request_id)#
Adds an outgoing request to the responses map for later retrieval
- Parameters:
item_id (int)
request_id (Any)
- add_response(item_id, response, complete)#
Maps a response to the request for later retrieval
- get_results()#
Generates a dictionary mapping the requests initiated to the responses received.
- Return type:
RequestResults
- overwrite_request(item_id, request_id)#
Overwrites an existing request in the responses map with a new request_id. This is used for multipart responses that generate a subscription id we need to watch, rather than the initial request_id.
- Parameters:
item_id (int)
request_id (Any)
- payloads_count#
- response_map#
- responses#
- bittensor.utils.async_substrate_interface.ResultHandler#
- class bittensor.utils.async_substrate_interface.Runtime(chain, runtime_config, metadata, type_registry)#
- __str__()#
- apply_type_registry_presets(use_remote_preset=True, auto_discover=True)#
Applies type registry presets to the runtime
- cache_region = None#
- chain#
- config#
- property implements_scaleinfo: bool#
Returns True if the current runtime implements a PortableRegistry (MetadataV14 and higher)
- Return type:
- metadata = None#
- reload_type_registry(use_remote_preset=True, auto_discover=True)#
Reload type registry and preset used to instantiate the SubstrateInterface object. Useful to periodically apply changes in type definitions when a runtime upgrade occurred
- runtime_config#
- runtime_version = None#
- transaction_version = None#
- type_registry#
- type_registry_preset = None#
- exception bittensor.utils.async_substrate_interface.TimeoutException#
Bases: Exception
Common base class for all non-exit exceptions.
Initialize self. See help(type(self)) for accurate signature.
- class bittensor.utils.async_substrate_interface.Websocket(ws_url, max_subscriptions=1024, max_connections=100, shutdown_timer=5, options=None)#
Websocket manager object. Allows for the use of a single websocket connection by multiple calls.
- Parameters:
- async __aenter__()#
- async __aexit__(exc_type, exc_val, exc_tb)#
- _attempts = 0#
- _exit_task = None#
- async _exit_with_timer()#
Allows for graceful shutdown of websocket connection after specified number of seconds, allowing for reuse of the websocket connection.
- _in_use = 0#
- _initialized = False#
- _lock#
- _open_subscriptions = 0#
- _options#
- _received#
- _receiving_task = None#
- async _recv()#
- Return type:
None
- async _start_receiving()#
- id = 0#
- max_connections#
- max_subscriptions#
- async retrieve(item_id)#
Retrieves a single item from received responses dict queue
- async send(payload)#
Sends a payload to the websocket connection.
- async shutdown()#
- shutdown_timer#
- ws_url#
- bittensor.utils.async_substrate_interface.timeout_handler(signum, frame)#