Skip to content

Refactor ogmios.py module maintainability #114

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Nov 3, 2022
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ test-single: ## runs tests with "single" markers
qa: ## runs static analyses
poetry run flake8 pycardano
poetry run mypy --install-types --non-interactive pycardano
poetry run black --check .

format: ## runs code style and formatter
poetry run isort .
Expand Down
2 changes: 1 addition & 1 deletion pycardano/address.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from __future__ import annotations

from enum import Enum
from typing import Union, Type
from typing import Type, Union

from pycardano.crypto.bech32 import decode, encode
from pycardano.exception import (
Expand Down
244 changes: 130 additions & 114 deletions pycardano/backend/ogmios.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import json
import time
from enum import Enum
from typing import Any, Dict, List, Optional, Union, Tuple
from typing import Any, Dict, List, Optional, Tuple, Union

import cbor2
import requests
Expand Down Expand Up @@ -88,7 +88,31 @@ def _request(self, method: OgmiosQueryType, args: JSON) -> Any:
)
return json.loads(response)["result"]

def _check_chain_tip_and_update(self):
def _query_current_protocol_params(self) -> JSON:
args = {"query": "currentProtocolParameters"}
return self._request(OgmiosQueryType.Query, args)

def _query_genesis_config(self) -> JSON:
args = {"query": "genesisConfig"}
return self._request(OgmiosQueryType.Query, args)

def _query_current_epoch(self) -> int:
args = {"query": "currentEpoch"}
return self._request(OgmiosQueryType.Query, args)

def _query_chain_tip(self) -> JSON:
args = {"query": "chainTip"}
return self._request(OgmiosQueryType.Query, args)

def _query_utxos_by_address(self, address: str) -> List[List[JSON]]:
args = {"query": {"utxo": [address]}}
return self._request(OgmiosQueryType.Query, args)

def _query_utxos_by_tx_id(self, tx_id: str, index: int) -> List[List[JSON]]:
args = {"query": {"utxo": [{"txId": tx_id, "index": index}]}}
return self._request(OgmiosQueryType.Query, args)

def _is_chain_tip_updated(self):
slot = self.last_block_slot
if self._last_known_block_slot != slot:
self._last_known_block_slot = slot
Expand All @@ -104,84 +128,84 @@ def _fraction_parser(fraction: str) -> float:
@property
def protocol_param(self) -> ProtocolParameters:
"""Get current protocol parameters"""
args = {"query": "currentProtocolParameters"}
if not self._protocol_param or self._check_chain_tip_and_update():
result = self._request(OgmiosQueryType.Query, args)
param = ProtocolParameters(
min_fee_constant=result["minFeeConstant"],
min_fee_coefficient=result["minFeeCoefficient"],
max_block_size=result["maxBlockBodySize"],
max_tx_size=result["maxTxSize"],
max_block_header_size=result["maxBlockHeaderSize"],
key_deposit=result["stakeKeyDeposit"],
pool_deposit=result["poolDeposit"],
pool_influence=self._fraction_parser(result["poolInfluence"]),
monetary_expansion=self._fraction_parser(result["monetaryExpansion"]),
treasury_expansion=self._fraction_parser(result["treasuryExpansion"]),
decentralization_param=self._fraction_parser(
result.get("decentralizationParameter", "0/1")
),
extra_entropy=result.get("extraEntropy", ""),
protocol_major_version=result["protocolVersion"]["major"],
protocol_minor_version=result["protocolVersion"]["minor"],
min_pool_cost=result["minPoolCost"],
price_mem=self._fraction_parser(result["prices"]["memory"]),
price_step=self._fraction_parser(result["prices"]["steps"]),
max_tx_ex_mem=result["maxExecutionUnitsPerTransaction"]["memory"],
max_tx_ex_steps=result["maxExecutionUnitsPerTransaction"]["steps"],
max_block_ex_mem=result["maxExecutionUnitsPerBlock"]["memory"],
max_block_ex_steps=result["maxExecutionUnitsPerBlock"]["steps"],
max_val_size=result["maxValueSize"],
collateral_percent=result["collateralPercentage"],
max_collateral_inputs=result["maxCollateralInputs"],
coins_per_utxo_word=result.get(
"coinsPerUtxoWord", ALONZO_COINS_PER_UTXO_WORD
),
coins_per_utxo_byte=result.get("coinsPerUtxoByte", 0),
cost_models=result.get("costModels", {}),
)
if not self._protocol_param or self._is_chain_tip_updated():
self._protocol_param = self._fetch_protocol_param()
return self._protocol_param

if "plutus:v1" in param.cost_models:
param.cost_models["PlutusV1"] = param.cost_models.pop("plutus:v1")
if "plutus:v2" in param.cost_models:
param.cost_models["PlutusV2"] = param.cost_models.pop("plutus:v2")
def _fetch_protocol_param(self) -> ProtocolParameters:
result = self._query_current_protocol_params()
param = ProtocolParameters(
min_fee_constant=result["minFeeConstant"],
min_fee_coefficient=result["minFeeCoefficient"],
max_block_size=result["maxBlockBodySize"],
max_tx_size=result["maxTxSize"],
max_block_header_size=result["maxBlockHeaderSize"],
key_deposit=result["stakeKeyDeposit"],
pool_deposit=result["poolDeposit"],
pool_influence=self._fraction_parser(result["poolInfluence"]),
monetary_expansion=self._fraction_parser(result["monetaryExpansion"]),
treasury_expansion=self._fraction_parser(result["treasuryExpansion"]),
decentralization_param=self._fraction_parser(
result.get("decentralizationParameter", "0/1")
),
extra_entropy=result.get("extraEntropy", ""),
protocol_major_version=result["protocolVersion"]["major"],
protocol_minor_version=result["protocolVersion"]["minor"],
min_pool_cost=result["minPoolCost"],
price_mem=self._fraction_parser(result["prices"]["memory"]),
price_step=self._fraction_parser(result["prices"]["steps"]),
max_tx_ex_mem=result["maxExecutionUnitsPerTransaction"]["memory"],
max_tx_ex_steps=result["maxExecutionUnitsPerTransaction"]["steps"],
max_block_ex_mem=result["maxExecutionUnitsPerBlock"]["memory"],
max_block_ex_steps=result["maxExecutionUnitsPerBlock"]["steps"],
max_val_size=result["maxValueSize"],
collateral_percent=result["collateralPercentage"],
max_collateral_inputs=result["maxCollateralInputs"],
coins_per_utxo_word=result.get(
"coinsPerUtxoWord", ALONZO_COINS_PER_UTXO_WORD
),
coins_per_utxo_byte=result.get("coinsPerUtxoByte", 0),
cost_models=result.get("costModels", {}),
)

args = {"query": "genesisConfig"}
result = self._request(OgmiosQueryType.Query, args)
param.min_utxo = result["protocolParameters"]["minUtxoValue"]
if "plutus:v1" in param.cost_models:
param.cost_models["PlutusV1"] = param.cost_models.pop("plutus:v1")
if "plutus:v2" in param.cost_models:
param.cost_models["PlutusV2"] = param.cost_models.pop("plutus:v2")

self._protocol_param = param
return self._protocol_param
result = self._query_genesis_config()
param.min_utxo = result["protocolParameters"]["minUtxoValue"]
return param

@property
def genesis_param(self) -> GenesisParameters:
"""Get chain genesis parameters"""
args = {"query": "genesisConfig"}
if not self._genesis_param or self._check_chain_tip_and_update():
result = self._request(OgmiosQueryType.Query, args)
system_start_unix = int(
calendar.timegm(
time.strptime(
result["systemStart"].split(".")[0], "%Y-%m-%dT%H:%M:%S"
),
)
)
self._genesis_param = GenesisParameters(
active_slots_coefficient=self._fraction_parser(
result["activeSlotsCoefficient"]
),
update_quorum=result["updateQuorum"],
max_lovelace_supply=result["maxLovelaceSupply"],
network_magic=result["networkMagic"],
epoch_length=result["epochLength"],
system_start=system_start_unix,
slots_per_kes_period=result["slotsPerKesPeriod"],
slot_length=result["slotLength"],
max_kes_evolutions=result["maxKesEvolutions"],
security_param=result["securityParameter"],
)
if not self._genesis_param or self._is_chain_tip_updated():
self._genesis_param = self._fetch_genesis_param()
return self._genesis_param

def _fetch_genesis_param(self) -> GenesisParameters:
result = self._query_genesis_config()
system_start_unix = int(
calendar.timegm(
time.strptime(result["systemStart"].split(".")[0], "%Y-%m-%dT%H:%M:%S"),
)
)
return GenesisParameters(
active_slots_coefficient=self._fraction_parser(
result["activeSlotsCoefficient"]
),
update_quorum=result["updateQuorum"],
max_lovelace_supply=result["maxLovelaceSupply"],
network_magic=result["networkMagic"],
epoch_length=result["epochLength"],
system_start=system_start_unix,
slots_per_kes_period=result["slotsPerKesPeriod"],
slot_length=result["slotLength"],
max_kes_evolutions=result["maxKesEvolutions"],
security_param=result["securityParameter"],
)

@property
def network(self) -> Network:
"""Get current network"""
Expand All @@ -190,37 +214,29 @@ def network(self) -> Network:
@property
def epoch(self) -> int:
"""Current epoch number"""
args = {"query": "currentEpoch"}
return self._request(OgmiosQueryType.Query, args)
return self._query_current_epoch()

@property
def last_block_slot(self) -> int:
"""Slot number of last block"""
args = {"query": "chainTip"}
return self._request(OgmiosQueryType.Query, args)["slot"]

def _extract_asset_info(self, asset_hash: str) -> Tuple[str, ScriptHash, AssetName]:
policy_hex, asset_name_hex = asset_hash.split(".")
policy = ScriptHash.from_primitive(policy_hex)
asset_name = AssetName.from_primitive(asset_name_hex)
result = self._query_chain_tip()
return result["slot"]

return policy_hex, policy, asset_name

def _check_utxo_unspent(self, tx_id: str, index: int) -> bool:
"""Check whether an UTxO is unspent with Ogmios.
def utxos(self, address: str) -> List[UTxO]:
"""Get all UTxOs associated with an address.

Args:
tx_id (str): transaction id.
index (int): transaction index.
"""

args = {"query": {"utxo": [{"txId": tx_id, "index": index}]}}
results = self._request(OgmiosQueryType.Query, args)
address (str): An address encoded with bech32.

if results:
return True
Returns:
List[UTxO]: A list of UTxOs.
"""
if self._kupo_url:
utxos = self._utxos_kupo(address)
else:
return False
utxos = self._utxos_ogmios(address)

return utxos

def _utxos_kupo(self, address: str) -> List[UTxO]:
"""Get all UTxOs associated with an address with Kupo.
Expand All @@ -234,7 +250,9 @@ def _utxos_kupo(self, address: str) -> List[UTxO]:
List[UTxO]: A list of UTxOs.
"""
if self._kupo_url is None:
raise AssertionError("kupo_url object attribute has not been assigned properly.")
raise AssertionError(
"kupo_url object attribute has not been assigned properly."
)

address_url = self._kupo_url + "/" + address
results = requests.get(address_url).json()
Expand Down Expand Up @@ -288,6 +306,23 @@ def _utxos_kupo(self, address: str) -> List[UTxO]:

return utxos

def _check_utxo_unspent(self, tx_id: str, index: int) -> bool:
"""Check whether an UTxO is unspent with Ogmios.

Args:
tx_id (str): transaction id.
index (int): transaction index.
"""
results = self._query_utxos_by_tx_id(tx_id, index)
return len(results) > 0

def _extract_asset_info(self, asset_hash: str) -> Tuple[str, ScriptHash, AssetName]:
policy_hex, asset_name_hex = asset_hash.split(".")
policy = ScriptHash.from_primitive(policy_hex)
asset_name = AssetName.from_primitive(asset_name_hex)

return policy_hex, policy, asset_name

def _utxos_ogmios(self, address: str) -> List[UTxO]:
"""Get all UTxOs associated with an address with Ogmios.

Expand All @@ -297,12 +332,9 @@ def _utxos_ogmios(self, address: str) -> List[UTxO]:
Returns:
List[UTxO]: A list of UTxOs.
"""

args = {"query": {"utxo": [address]}}
results = self._request(OgmiosQueryType.Query, args)
results = self._query_utxos_by_address(address)

utxos = []

for result in results:
in_ref = result[0]
output = result[1]
Expand Down Expand Up @@ -360,22 +392,6 @@ def _utxos_ogmios(self, address: str) -> List[UTxO]:

return utxos

def utxos(self, address: str) -> List[UTxO]:
"""Get all UTxOs associated with an address.

Args:
address (str): An address encoded with bech32.

Returns:
List[UTxO]: A list of UTxOs.
"""
if self._kupo_url:
utxos = self._utxos_kupo(address)
else:
utxos = self._utxos_ogmios(address)

return utxos

def submit_tx(self, cbor: Union[bytes, str]):
"""Submit a transaction to the blockchain.

Expand Down
2 changes: 1 addition & 1 deletion pycardano/hash.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""All type of hashes in Cardano ledger spec."""

from typing import TypeVar, Union, Type
from typing import Type, TypeVar, Union

from pycardano.serialization import CBORSerializable

Expand Down
2 changes: 1 addition & 1 deletion pycardano/metadata.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, ClassVar, List, Union, Type
from typing import Any, ClassVar, List, Type, Union

from cbor2 import CBORTag
from nacl.encoding import RawEncoder
Expand Down
2 changes: 1 addition & 1 deletion pycardano/nativescript.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from __future__ import annotations

from dataclasses import dataclass, field
from typing import ClassVar, List, Union, Type
from typing import ClassVar, List, Type, Union

from nacl.encoding import RawEncoder
from nacl.hash import blake2b
Expand Down
2 changes: 1 addition & 1 deletion pycardano/plutus.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import json
from dataclasses import dataclass, field, fields
from enum import Enum
from typing import Any, ClassVar, List, Optional, Union, Type
from typing import Any, ClassVar, List, Optional, Type, Union

import cbor2
from cbor2 import CBORTag
Expand Down
Loading