Skip to content

API Reference

This reference documents the main classes and methods in the Ethereal Python SDK.

Client Classes

AsyncRESTClient

The primary asynchronous client for interacting with the Ethereal API via REST endpoints. Recommended for all new applications.

from ethereal import AsyncRESTClient

# Create async client (use within async function)
client = await AsyncRESTClient.create({
    "base_url": "https://api.ethereal.trade",
    "chain_config": {
        "rpc_url": "https://rpc.ethereal.trade",
        "private_key": "your_private_key",  # optional, required for signing
    }
})

# Use the client
products = await client.products()
subaccounts = await client.subaccounts()

# Remember to close when done
await client.close()

ethereal.async_rest_client.AsyncRESTClient

Bases: AsyncHTTPClient

Asynchronous REST client for the Ethereal API.

Notes for maintainers: - This client composes endpoint functions from the ethereal.rest.* modules by assigning them as attributes on the class (see below). Each function expects self to provide get, post, and get_validated from AsyncHTTPClient and to expose _models for the active network. - Network-specific models are accessed via self._models which is set based on the configured network. This avoids global mutation and is predictable. - Use AsyncRESTClient.create(...) to ensure async initialization (RPC config, optional chain client) happens before use. Remember to await client.close() to release the underlying httpx.AsyncClient. - We intentionally avoid “async properties”. For convenience methods that derive indices (e.g., products by ticker/id), use explicit async methods like products_by_ticker() or products_by_id() that fetch fresh data each call to keep behavior lightweight and predictable.

Source code in ethereal/async_rest_client.py
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
class AsyncRESTClient(AsyncHTTPClient):
    """Asynchronous REST client for the Ethereal API.

    Notes for maintainers:
    - This client composes endpoint functions from the `ethereal.rest.*` modules
      by assigning them as attributes on the class (see below). Each function
      expects `self` to provide `get`, `post`, and `get_validated` from
      AsyncHTTPClient and to expose `_models` for the active network.
    - Network-specific models are accessed via `self._models` which is set based
      on the configured network. This avoids global mutation and is predictable.
    - Use `AsyncRESTClient.create(...)` to ensure async initialization (RPC
      config, optional chain client) happens before use. Remember to `await
      client.close()` to release the underlying `httpx.AsyncClient`.
    - We intentionally avoid “async properties”. For convenience methods that
      derive indices (e.g., products by ticker/id), use explicit async methods
      like `products_by_ticker()` or `products_by_id()` that fetch fresh
      data each call to keep behavior lightweight and predictable.
    """

    list_funding = list_funding
    get_projected_funding = get_projected_funding
    list_projected_funding = list_projected_funding
    get_order = get_order
    list_fills = list_fills
    list_orders = list_orders
    list_trades = list_trades
    prepare_order = prepare_order
    sign_order = sign_order
    submit_order = submit_order
    dry_run_order = dry_run_order
    prepare_cancel_order = prepare_cancel_order
    sign_cancel_order = sign_cancel_order
    cancel_order = cancel_order
    get_signer = get_signer
    get_signer_quota = get_signer_quota
    list_signers = list_signers
    prepare_linked_signer = prepare_linked_signer
    sign_linked_signer = sign_linked_signer
    link_linked_signer = link_linked_signer
    prepare_revoke_linked_signer = prepare_revoke_linked_signer
    sign_revoke_linked_signer = sign_revoke_linked_signer
    revoke_linked_signer = revoke_linked_signer
    prepare_refresh_linked_signer = prepare_refresh_linked_signer
    sign_refresh_linked_signer = sign_refresh_linked_signer
    refresh_linked_signer = refresh_linked_signer
    list_positions = list_positions
    get_position = get_position
    list_position_liquidations = list_position_liquidations
    get_market_liquidity = get_market_liquidity
    list_market_prices = list_market_prices
    list_products = list_products
    get_rpc_config = get_rpc_config
    list_subaccounts = list_subaccounts
    get_subaccount = get_subaccount
    get_subaccount_balances = get_subaccount_balances
    get_subaccount_balance_history = get_subaccount_balance_history
    get_subaccount_unrealized_pnl_history = get_subaccount_unrealized_pnl_history
    get_subaccount_volume_history = get_subaccount_volume_history
    get_subaccount_funding_history = get_subaccount_funding_history
    get_subaccount_total_volume = get_subaccount_total_volume
    get_referral_summary = get_referral_summary
    list_referrals = list_referrals
    get_token = get_token
    list_token_withdraws = list_token_withdraws
    list_tokens = list_tokens
    list_token_transfers = list_token_transfers
    prepare_withdraw_token = prepare_withdraw_token
    sign_withdraw_token = sign_withdraw_token
    submit_withdraw_token = submit_withdraw_token

    def __init__(self, config: Union[Dict[str, Any], RESTConfig] = {}):
        self.config = RESTConfig.model_validate(config)
        network = self.config.network or "testnet"

        if not self.config.base_url:
            self.config.base_url = AnyHttpUrl(
                NETWORK_URLS.get(network, NETWORK_URLS["testnet"])
            )

        if not self.config.archive_base_url:
            self.config.archive_base_url = AnyHttpUrl(
                ARCHIVE_NETWORK_URLS.get(network, ARCHIVE_NETWORK_URLS["testnet"])
            )

        self._models = importlib.import_module(
            _MODEL_PATHS.get(network, _MODEL_PATHS["default"])  # type: ignore
        )
        super().__init__(self.config)

        self._archive_base_url = self.config.archive_base_url
        self.chain: Optional[ChainClient] = None
        self.rpc_config: Optional[RpcConfigDto] = None
        self.private_key: Optional[str] = None
        self.provider: Optional[Any] = None
        self.default_time_in_force = self.config.default_time_in_force
        self.default_post_only = self.config.default_post_only
        self._subaccounts: Optional[List[SubaccountDto]] = None
        self._products: Optional[List[ProductDto]] = None
        self._tokens: Optional[List[TokenDto]] = None
        self._products_by_ticker: Optional[Dict[str, ProductDto]] = None
        self._products_by_id: Optional[Dict[UUID, ProductDto]] = None

    @classmethod
    async def create(
        cls, config: Union[Dict[str, Any], RESTConfig] = {}
    ) -> AsyncRESTClient:
        """Factory method to create and asynchronously initialize the client.

        Args:
            config (Union[Dict[str, Any], RESTConfig], optional): Configuration dictionary or RESTConfig object. Optional fields include:
                private_key (str, optional): The private key.
                base_url (str, optional): Base URL for REST requests. Defaults to mainnet.
                timeout (int, optional): Timeout in seconds for REST requests.
                verbose (bool, optional): Enables debug logging. Defaults to False.
                rate_limit_headers (bool, optional): Enables rate limit headers. Defaults to False.
                chain_config (ChainConfig, optional): Chain configuration for signing transactions.

        Returns:
            AsyncRESTClient: Fully initialized async client instance.
        """
        client = cls(config)
        await client._async_init()
        return client

    async def _async_init(self):
        """Asynchronous initialization.

        Loads RPC configuration and (optionally) initializes the chain client if
        `chain_config` is provided. This split constructor pattern keeps
        `__init__` sync, while `create()` ensures the instance is fully ready.
        """
        self.rpc_config = await self.get_rpc_config()
        tokens = await self.list_tokens()
        if self.config.chain_config:
            self._init_chain_client(self.config.chain_config, self.rpc_config, tokens)
        self.private_key = self.chain.private_key if self.chain else None
        self.provider = self.chain.provider if self.chain else None
        # No eager cache priming; caches are populated on first access

    def _init_chain_client(
        self,
        config: Union[Dict[str, Any], ChainConfig],
        rpc_config: Optional[RpcConfigDto] = None,
        tokens: Optional[List[TokenDto]] = None,
    ):
        """Initialize the ChainClient for transaction signing.

        Args:
            config (Union[Dict[str, Any], ChainConfig]): The chain configuration.
            rpc_config (RpcConfigDto, optional): RPC configuration. Defaults to None.
            tokens (List[TokenDto], optional): List of token configurations. Defaults to None.
        """
        config = ChainConfig.model_validate(config)
        try:
            self.chain = ChainClient(config, rpc_config, tokens)
            self.logger.debug("Chain client initialized successfully")
        except Exception as e:
            self.logger.debug(f"Failed to initialize chain client: {e}")

    async def _get_pages(
        self,
        endpoint: str,
        request_model: type[BaseModel],
        response_model: type[BaseModel],
        paginate: bool = False,
        max_pages: int = 10,
        **kwargs,
    ) -> Any:
        """Make a GET request with validated parameters and response, handling pagination.

        Args:
            endpoint (str): API endpoint path (e.g. "order" will be appended to the base URL and prefix to form "/v1/order")
            request_model (type[BaseModel]): Pydantic model for request validation
            response_model (type[BaseModel]): Pydantic model for response validation
            paginate (bool, optional): If True, fetch all pages and return aggregated list. Defaults to False.
            max_pages (int, optional): Maximum number of pages to fetch when paginating. Defaults to 10.

        Other Parameters:
            **kwargs: Parameters to validate and include in the request

        Returns:
            Any: List of validated response objects. Single page if paginate=False, all pages if paginate=True.
        """
        first_page = await self.get_validated(
            url_path=f"{API_PREFIX}/{endpoint}",
            request_model=request_model,
            response_model=response_model,
            **kwargs,
        )
        # `get_validated` already returns an instance of `response_model` with
        # `.data`, `.has_next`, and `.next_cursor`.
        if not paginate:
            return list(getattr(first_page, "data"))

        all_items: List[Any] = list(getattr(first_page, "data"))
        current_cursor = getattr(first_page, "next_cursor", None)
        has_next = getattr(first_page, "has_next", False)

        page_count = 1
        while has_next and current_cursor and page_count <= max_pages:
            page = await self.get_validated(
                url_path=f"{API_PREFIX}/{endpoint}",
                request_model=request_model,
                response_model=response_model,
                cursor=current_cursor,
                **kwargs,
            )
            all_items.extend(getattr(page, "data"))
            has_next = getattr(page, "has_next", False)
            current_cursor = getattr(page, "next_cursor", None)
            page_count += 1

        return all_items

    async def subaccounts(self, refresh: bool = False) -> List[SubaccountDto]:
        """Get the list of subaccounts.

        Args:
            refresh (bool, optional): If True, bypass cache and fetch fresh data. Defaults to False.

        Returns:
            List[SubaccountDto]: List of subaccount objects for the connected wallet address.

        Raises:
            ValueError: If no chain client is configured or address is unavailable.
        """
        if not self.chain or not getattr(self.chain, "address", None):
            raise ValueError("Chain address is required to list subaccounts")
        if refresh or self._subaccounts is None:
            self._subaccounts = await self.list_subaccounts(
                sender=self.chain.address, order_by="createdAt", order="asc"
            )
        return self._subaccounts

    async def products(self, refresh: bool = False) -> List[ProductDto]:
        """Get the list of products.

        Args:
            refresh (bool, optional): If True, bypass cache and fetch fresh data. Defaults to False.

        Returns:
            List[ProductDto]: List of product objects.
        """
        if refresh or self._products is None:
            self._products = await self.list_products()
            self._products_by_id = None
            self._products_by_ticker = None
        return self._products

    async def tokens(self, refresh: bool = False) -> List[TokenDto]:
        """Get the list of tokens.

        Args:
            refresh (bool, optional): If True, bypass cache and fetch fresh data. Defaults to False.

        Returns:
            List[TokenDto]: List of token objects.
        """
        if refresh or self._tokens is None:
            self._tokens = await self.list_tokens()
        return self._tokens

    async def products_by_ticker(self, refresh: bool = False) -> Dict[str, ProductDto]:
        """Get the products indexed by ticker.

        Args:
            refresh (bool, optional): If True, bypass cache and fetch fresh data. Defaults to False.

        Returns:
            Dict[str, ProductDto]: Dictionary of products keyed by ticker.
        """
        if refresh or self._products_by_ticker is None:
            products = await self.products(refresh=refresh)
            self._products_by_ticker = {
                p.ticker: p for p in products if getattr(p, "ticker", None)
            }
        return self._products_by_ticker

    async def products_by_id(self, refresh: bool = False) -> Dict[UUID, ProductDto]:
        """Get the products indexed by ID.

        Args:
            refresh (bool, optional): If True, bypass cache and fetch fresh data. Defaults to False.

        Returns:
            Dict[UUID, ProductDto]: Dictionary of products keyed by ID.
        """
        if refresh or self._products_by_id is None:
            products = await self.products(refresh=refresh)
            self._products_by_id = {p.id: p for p in products}
        return self._products_by_id

    async def get_maintenance_margin(
        self,
        subaccount_id: UUID,
        positions: Optional[List[PositionDto] | List[Dict[str, Any]]] = None,
        products: Optional[List[ProductDto] | List[Dict[str, Any]]] = None,
        product_ids: Optional[List[UUID]] = None,
    ) -> Decimal:
        """Calculate the an account's maintenance margin for specified positions or products.

        Args:
            subaccount_id (str): Fetch positions for this subaccount when ``positions`` is not supplied.
            positions (List[PositionDto] | List[Dict[str, Any]], optional): Pre-fetched positions to use directly.
            products (List[ProductDto] | List[Dict[str, Any]], optional): Pre-fetched products used to filter
                the calculation.
            product_ids (List[str] | List[UUID], optional): Filters the calculation to these product IDs.

        Returns:
            Decimal: Total maintenance margin for the filtered positions.

        Raises:
            ValueError: If neither positions nor subaccount context is provided, or if any
                referenced product cannot be resolved.
        """

        class PartialPosition(BaseModel):
            product_id: UUID
            cost: str

        class PartialProduct(BaseModel):
            id: UUID
            max_leverage: float
            taker_fee: str

        if positions is None:
            positions = await self.list_positions(subaccount_id=subaccount_id)

        if products and product_ids:
            raise ValueError("Can only specify one of products and product_ids")

        if products is not None:
            valid_products = [
                PartialProduct(**p if isinstance(p, dict) else p.model_dump())
                for p in products
            ]
            products_by_id: Dict[UUID, PartialProduct] = {
                product.id: product for product in valid_products
            }
        else:
            raw_products_by_id = await self.products_by_id()
            products_by_id = {
                UUID(str(k)): PartialProduct(
                    **v if isinstance(v, dict) else v.model_dump()
                )
                for k, v in raw_products_by_id.items()
            }

        product_ids_set: Optional[Set[UUID]] = None
        if product_ids is not None:
            product_ids_set = set([UUID(str(pid)) for pid in product_ids])
            products_by_id_keys = set(products_by_id.keys())
            missing_products = product_ids_set - products_by_id_keys
            if missing_products:
                missing = ", ".join(sorted([str(p) for p in missing_products]))
                raise ValueError(f"Products not found for calculation: {missing}")

        valid_positions = [
            PartialPosition(**p if isinstance(p, dict) else p.model_dump())
            for p in positions
        ]
        positions_filtered = [
            position
            for position in valid_positions
            if product_ids_set is None or position.product_id in product_ids_set
        ]

        if not positions_filtered:
            return Decimal("0")

        total_mm = Decimal("0")
        for position in positions_filtered:
            product = products_by_id.get(position.product_id)
            if product is None:
                raise ValueError(
                    f"Product '{position.product_id}' not found for position '{position.product_id}'"
                )

            notional = abs(Decimal(position.cost))
            max_leverage = Decimal(str(product.max_leverage))
            taker_fee_rate = Decimal(product.taker_fee)

            mmr = Decimal("1") / (max_leverage * Decimal("2"))
            total_mm += notional * mmr
            total_mm += notional * taker_fee_rate

        return total_mm

    async def get_tokens(self) -> List[TokenDto]:
        """Return the latest list of tokens (no caching)."""
        return await self.list_tokens()

    async def _resolve_subaccount_name(self, subaccount_id: UUID) -> str:
        """Resolve a subaccount name from its ID, using the cached list first."""
        for sa in await self.subaccounts():
            if sa.id == subaccount_id:
                return sa.name
        return (await self.get_subaccount(id=subaccount_id)).name

    async def _resolve_sender_and_subaccount(
        self, sender: Optional[str] = None, subaccount: Optional[str] = None
    ) -> Tuple[str, str]:
        if sender is None and self.chain:
            sender = self.chain.address
            warnings.warn(
                "Automatically resolving 'sender' parameter is deprecated and will be removed in a future version. "
                "Always explicitly pass 'sender' and 'subaccount' to avoid issues while using multiple account. "
                f"Using sender='{sender}'",
                DeprecationWarning,
                stacklevel=3,
            )
        if sender is None:
            raise ValueError(
                "Could not resolve 'sender'. Either pass 'sender' explicitly or initialize the client with a chain configuration."
            )
        if subaccount is None:
            subaccounts = await self.subaccounts()
            if not subaccounts:
                raise ValueError(
                    "No subaccounts found for this account. Please create a subaccount first."
                )
            self.logger.debug(
                f"First subaccount name: '{subaccounts[0].name}', id: '{subaccounts[0].id}'"
            )
            self.logger.debug(f"All subaccount names: {[s.name for s in subaccounts]}")
            subaccount = subaccounts[0].name
            warnings.warn(
                "Automatically resolving 'subaccount' parameter is deprecated and will be removed in a future version. "
                "Always explicitly pass 'subaccount' and 'subaccount' to avoid issues while using multiple account. "
                f"Using subaccount='{subaccount}'",
                DeprecationWarning,
                stacklevel=3,
            )
        return sender, subaccount

    async def link_signer(
        self,
        signer: str,
        subaccount_id: UUID,
        subaccount: Optional[str] = None,
        sender: Optional[str] = None,
        signer_private_key: Optional[str] = None,
        nonce: Optional[str] = None,
        signed_at: Optional[int] = None,
        sign_sender: bool = True,
        sign_signer: bool = True,
        submit: bool = True,
        **kwargs,
    ) -> Union[SignerDto, LinkSignerDto]:
        """Prepare, sign, and optionally submit a linked signer payload.

        Args:
            signer (str): Address being linked as a delegate signer. Required.
            subaccount_id (UUID): Target subaccount id. Required.
            subaccount (str, optional): Bytes32 subaccount name. If omitted, fetched by id.
            sender (str, optional): Owner address. Defaults to the client's chain address.
            signer_private_key (str, optional): Private key for the signer address.
            nonce (str, optional): Custom nonce for the signature.
            signed_at (int, optional): Seconds since epoch for the signature timestamp.
            sign_sender (bool, optional): Sign with the owner key. Defaults to True.
            sign_signer (bool, optional): Sign with the signer key. Defaults to True.
            submit (bool, optional): Submit to the API. If False, return the prepared DTO.

        Returns:
            Union[SignerDto, LinkSignerDto]: Submitted record or the prepared/signable payload.

        Raises:
            ValueError: If the subaccount name cannot be resolved or signer signature is missing.
        """
        if subaccount is None:
            subaccount = await self._resolve_subaccount_name(subaccount_id)
        sender, subaccount = await self._resolve_sender_and_subaccount(
            sender, subaccount
        )

        link = await self.prepare_linked_signer(
            sender=sender,
            signer=signer,
            subaccount=subaccount,
            subaccount_id=subaccount_id,
            include_signature=False,
            nonce=nonce,
            signed_at=signed_at,
        )

        if sign_sender:
            link = await self.sign_linked_signer(link)

        if sign_signer:
            if signer_private_key:
                link = await self.sign_linked_signer(
                    link, signer_private_key=signer_private_key
                )
            elif not link.signer_signature:
                raise ValueError(
                    "signer_private_key or signer_signature is required when sign_signer=True"
                )

        return link if not submit else await self.link_linked_signer(link, **kwargs)

    async def revoke_signer(
        self,
        signer: str,
        subaccount_id: UUID,
        subaccount: Optional[str] = None,
        sender: Optional[str] = None,
        sign: bool = True,
        submit: bool = True,
        **kwargs,
    ) -> Union[SignerDto, RevokeLinkedSignerDto]:
        """Prepare, sign, and optionally submit a revoke-linked-signer payload.

        Args:
            signer (str): Address of the delegate to revoke. Required.
            subaccount_id (UUID): Target subaccount id. Required.
            subaccount (str, optional): Bytes32 subaccount name. If omitted, fetched by id.
            sender (str, optional): Owner address. Defaults to the client's chain address.
            sign (bool, optional): Sign with the owner key. Defaults to True.
            submit (bool, optional): Submit to the API. If False, return the prepared DTO.

        Returns:
            Union[SignerDto, RevokeLinkedSignerDto]: Submitted record or the prepared/signable payload.

        Raises:
            ValueError: If the subaccount name cannot be resolved.
        """
        if subaccount is None:
            subaccount = await self._resolve_subaccount_name(subaccount_id)
        sender, subaccount = await self._resolve_sender_and_subaccount(
            sender, subaccount
        )

        revoke = await self.prepare_revoke_linked_signer(
            sender=sender,
            signer=signer,
            subaccount=subaccount,
            subaccount_id=subaccount_id,
            include_signature=False,
            **kwargs,
        )

        if sign:
            revoke = await self.sign_revoke_linked_signer(revoke)

        return (
            revoke if not submit else await self.revoke_linked_signer(revoke, **kwargs)
        )

    async def withdraw_token(
        self,
        dto: Optional[InitiateWithdrawDto] = None,
        token_id: Optional[UUID] = None,
        amount: Optional[int] = None,
        destination_address: Optional[str] = None,
        destination_endpoint: Optional[int] = None,
        subaccount: Optional[str] = None,
        account: Optional[str] = None,
        nonce: Optional[str] = None,
        signed_at: Optional[int] = None,
        sign: bool = True,
        submit: bool = True,
        **kwargs,
    ) -> Union[WithdrawDto, InitiateWithdrawDto]:
        """Prepares, signs, and optionally submits a token withdrawal.

        This is the primary method for initiating token withdrawals. It handles
        the complete withdrawal flow: preparing the withdrawal payload, signing
        it with EIP-712, and submitting to the API.

        Example:
            Withdraw 100 USD to the connected wallet::

                result = await client.withdraw_token(
                    token_id=usd_token.id,
                    amount=100,
                    destination_address=client.chain.address,
                    destination_endpoint=0,
                )

            Prepare without submitting (for inspection or manual submission)::

                payload = await client.withdraw_token(
                    token_id=usd_token.id,
                    amount=100,
                    destination_address="0x...",
                    destination_endpoint=0,
                    submit=False,
                )

        Args:
            token_id: UUID of the token to withdraw.
            amount: Amount to withdraw in the token's base units.
            destination_address: The LayerZero destination address where funds
                will be sent. Automatically padded to bytes32 format.
            destination_endpoint: LayerZero endpoint ID for the destination chain.
                Use 0 for same-chain withdrawals.
            subaccount: Hex-encoded subaccount name. If not provided, defaults to
                the first subaccount associated with the connected wallet.
            account: Recipient wallet address. If not provided, defaults to the
                connected wallet's address.
            nonce: Custom nonce for the EIP-712 signature. If not provided, a
                random nonce is generated.
            signed_at: Unix timestamp (seconds) for the signature. If not provided,
                defaults to the current time.
            sign: Whether to sign the withdrawal payload. Defaults to True.
            submit: Whether to submit the withdrawal to the API. Set to False to
                receive the prepared payload without submitting. Defaults to True.
            dto: **Deprecated.** A pre-built InitiateWithdrawDto. This parameter
                exists for backward compatibility and will be removed in a future
                version. Use the individual parameters instead.

        Returns:
            If ``submit=True``, returns a WithdrawDto containing the withdrawal
            record created by the API. If ``submit=False``, returns the prepared
            InitiateWithdrawDto payload.

        Raises:
            ValueError: If required parameters (token_id, amount, destination_address,
                destination_endpoint) are not provided.
            ValueError: If required parameters (token_id, amount, destination_address,
        """
        # Detect deprecated dto path
        if dto is not None:
            warnings.warn(
                "Passing 'dto' to withdraw_token() is deprecated. "
                "Pass friendly parameters directly (amount, token_id, destination_address, etc.) instead. "
                "The dto parameter will be removed in a future version.",
                DeprecationWarning,
                stacklevel=2,
            )
            if token_id is None:
                raise ValueError("token_id is required when using dto parameter")
            if not submit:
                if sign and not dto.signature:
                    dto = await self.sign_withdraw_token(dto)
                return dto

        # New friendly parameter path
        if token_id is None:
            raise ValueError("token_id is required")
        if amount is None:
            raise ValueError("amount is required")
        if destination_address is None:
            raise ValueError("destination_address is required")
        if destination_endpoint is None:
            raise ValueError("destination_endpoint is required")

        # Resolve token address from token_id
        token = await self.get_token(id=token_id)
        token_addr = token.address

        # Resolve account/subaccount
        account_addr, subaccount_name = await self._resolve_sender_and_subaccount(
            account, subaccount
        )

        # Prepare
        withdraw_dto = await self.prepare_withdraw_token(
            subaccount=subaccount_name,
            token=token_addr,
            amount=amount,
            account=account_addr,
            destination_address=destination_address,
            destination_endpoint=destination_endpoint,
            include_signature=False,
            nonce=nonce,
            signed_at=signed_at,
        )

        # Sign
        if sign:
            withdraw_dto = await self.sign_withdraw_token(withdraw_dto)

        # Submit or return
        if not submit:
            return withdraw_dto
        return await self.submit_withdraw_token(
            dto=withdraw_dto, token_id=token_id, **kwargs
        )

    async def refresh_signer(
        self,
        signer: str,
        subaccount_id: UUID,
        subaccount: Optional[str] = None,
        sender: Optional[str] = None,
        sign: bool = True,
        submit: bool = True,
        **kwargs,
    ) -> Union[SignerDto, RefreshLinkedSignerDto]:
        """Prepare, sign, and optionally submit a refresh-linked-signer payload.

        Args:
            signer (str): Address of the delegate to refresh. Required.
            subaccount_id (UUID): Target subaccount id. Required.
            subaccount (str, optional): Bytes32 subaccount name. If omitted, fetched by id.
            sender (str, optional): Owner address. Defaults to the client's chain address.
            sign (bool, optional): Sign with the owner key. Defaults to True.
            submit (bool, optional): Submit to the API. If False, return the prepared DTO.

        Returns:
            Union[SignerDto, RefreshLinkedSignerDto]: Submitted record or the prepared/signable payload.

        Raises:
            ValueError: If the subaccount name cannot be resolved.
        """
        if subaccount is None:
            subaccount = await self._resolve_subaccount_name(subaccount_id)
        sender, subaccount = await self._resolve_sender_and_subaccount(
            sender, subaccount
        )

        refresh = await self.prepare_refresh_linked_signer(
            sender=sender,
            signer=signer,
            subaccount=subaccount,
            subaccount_id=subaccount_id,
            include_signature=False,
            **kwargs,
        )

        if sign:
            refresh = await self.sign_refresh_linked_signer(refresh)

        return (
            refresh
            if not submit
            else await self.refresh_linked_signer(refresh, **kwargs)
        )

    async def create_order(
        self,
        order_type: str,
        quantity: float,
        side: int,
        price: Optional[float] = None,
        ticker: Optional[str] = None,
        product_id: Optional[UUID] = None,
        client_order_id: Optional[str] = None,
        sender: Optional[str] = None,
        subaccount: Optional[str] = None,
        time_in_force: Optional[str] = None,
        post_only: Optional[bool] = None,
        reduce_only: Optional[bool] = False,
        close: Optional[bool] = None,
        stop_price: Optional[float] = None,
        stop_type: Optional[int] = None,
        expires_at: Optional[int] = None,
        group_id: Optional[str] = None,
        group_contingency_type: Optional[int] = None,
        sign: bool = True,
        dry_run: bool = False,
        submit: bool = True,
    ) -> Union[SubmitOrderCreatedDto, DryRunOrderCreatedDto, SubmitOrderDto]:
        """Create and submit an order.

        Args:
            order_type (str): 'LIMIT' or 'MARKET'. Required.
            quantity (float): Order size. Required.
            side (int): 0 for buy, 1 for sell. Required.
            price (float, optional): Limit price for LIMIT orders.
            ticker (str, optional): Ticker of the product.
            product_id (str, optional): UUID of the product.
            client_order_id (str, optional): Subaccount-scoped client-generated id (UUID or <=32 alphanumeric).
            sender (str, optional): Address placing the order. Defaults to chain address.
            subaccount (str, optional): Hex-encoded subaccount name. Defaults to first subaccount.
            time_in_force (str, optional): For LIMIT orders (e.g., 'GTC', 'GTD'). Defaults to 'GTC'.
            post_only (bool, optional): For LIMIT orders; rejects if crossing. Defaults to False.
            reduce_only (bool, optional): If True, order only reduces position. Defaults to False.
            close (bool, optional): For MARKET orders; If True, closes the position.
            stop_price (float, optional): Stop trigger price.
            stop_type (int, optional): Stop type, either 0 (take-profit) or 1 (stop-loss), requires non-zero stopPrice.
            expires_at (int, optional): Expiry timestamp for GTD.
            group_id (str, optional): Group Id (UUID) for linking orders together in OCO/OTO relationships.
            group_contingency_type (int, optional): Contingency type for order groups: 0=OTO (Order-Triggers-Order), 1=OCO (One-Cancels-Other).
            sign (bool, optional): If True, sign the payload immediately. Defaults to True.
            dry_run (bool, optional): If True, validate without execution. Defaults to False.
            submit (bool, optional): If True, submit the order. Defaults to True.

        Returns:
            Union[SubmitOrderCreatedDto, DryRunOrderCreatedDto, SubmitOrderDto]: Created order response, dry-run validation result, or prepared order payload.

        Raises:
            ValueError: If neither product_id nor ticker is provided or if order type is invalid.
        """
        sender, subaccount = await self._resolve_sender_and_subaccount(
            sender, subaccount
        )

        if product_id is not None:
            products_by_id = await self.products_by_id()
            onchain_id = products_by_id[product_id].onchain_id
        elif ticker is not None:
            products_by_ticker = await self.products_by_ticker()
            onchain_id = products_by_ticker[ticker].onchain_id
        else:
            raise ValueError("Either product_id or ticker must be provided")

        order_params: Dict[str, Any] = {
            "subaccount": subaccount,
            "side": side,
            "quantity": quantity,
            "onchain_id": onchain_id,
            "order_type": order_type,
            "client_order_id": client_order_id,
            "reduce_only": reduce_only,
            "close": close,
            "stop_price": stop_price,
            "stop_type": stop_type,
            "group_id": group_id,
            "group_contingency_type": group_contingency_type,
        }

        if order_type == "LIMIT":
            order_params.update(
                {
                    "price": price,
                    "time_in_force": time_in_force or self.default_time_in_force,
                    "post_only": post_only or self.default_post_only,
                    "expires_at": expires_at,
                }
            )
        elif order_type != "MARKET":
            raise ValueError("Invalid order type")

        order = await self.prepare_order(sender, **order_params, include_signature=sign)
        if dry_run:
            return await self.dry_run_order(order)
        elif submit:
            return await self.submit_order(order)
        else:
            return order

    async def cancel_orders(
        self,
        order_ids: List[UUID],
        sender: Optional[str] = None,
        subaccount: Optional[str] = None,
        client_order_ids: List[str] = [],
        sign: bool = True,
        submit: bool = True,
        **kwargs,
    ) -> Union[List[CancelOrderResultDto], CancelOrderDto]:
        """Prepares and optionally submits a request to cancel multiple orders.

        Args:
            order_ids (List[str]): Order UUIDs to cancel. Required.
            sender (str, optional): Address initiating the cancellation. Defaults to chain address.
            subaccount (str, optional): Hex-encoded subaccount name. Defaults to first subaccount.
            client_order_ids (List[str], optional): Client-generated IDs to cancel. Defaults to empty list.
            sign (bool, optional): If True, sign the payload immediately. Defaults to True.
            submit (bool, optional): If True, submit the request to the API. Defaults to True.

        Other Parameters:
            **kwargs: Additional request parameters accepted by the API.

        Returns:
            Union[List[CancelOrderResultDto], CancelOrderDto]: Cancellation results per order id or prepared cancel payload.

        Raises:
            ValueError: If no order IDs or client order IDs provided for cancellation.
        """
        if len(order_ids) == 0 and len(client_order_ids) == 0:
            raise ValueError(
                "No order IDs or client order IDs provided for cancellation"
            )
        sender, subaccount = await self._resolve_sender_and_subaccount(
            sender, subaccount
        )
        try:
            prepared_cancel = await self.prepare_cancel_order(
                order_ids=order_ids,
                client_order_ids=client_order_ids,
                sender=sender,
                subaccount=subaccount,
                include_signature=sign,
                **kwargs,
            )
        except ValueError as e:
            self.logger.warning(f"Could not prepare/sign order cancellation: {e}")
            raise

        if not submit:
            return prepared_cancel

        result = await self.cancel_order(
            prepared_cancel,
            **kwargs,
        )
        return result

    async def cancel_all_orders(
        self,
        subaccount_id: UUID,
        product_ids: Optional[List[UUID]] = None,
        **kwargs,
    ) -> List[CancelOrderResultDto]:
        """Cancel all orders for a given subaccount.

        Args:
            subaccount_id (str): UUID of the subaccount. Required.
            product_ids (List[str], optional): Filter cancellation by product IDs.

        Other Parameters:
            **kwargs: Additional request parameters accepted by the API.

        Returns:
            List[CancelOrderResultDto]: Cancellation results per order id.

        Raises:
            ValueError: If no orders found to cancel or cancellation fails.
        """
        subaccount = await self.get_subaccount(id=subaccount_id)
        query_params = {
            "subaccount_id": subaccount_id,
            "isWorking": True,
            **kwargs,
        }
        if product_ids:
            query_params["product_ids"] = product_ids

        orders = await self._get_pages(
            endpoint="order",
            request_model=self._models.V1OrderGetParametersQuery,
            response_model=self._models.PageOfOrderDtos,
            paginate=True,
            **query_params,
        )
        order_ids = [order.id for order in orders]

        if len(order_ids) == 0:
            raise ValueError("No order IDs provided for cancellation")
        cancel_results = await self.cancel_orders(
            order_ids=order_ids,
            sender=subaccount.account,
            subaccount=subaccount.name,
            sign=True,
            submit=True,
        )
        if not isinstance(cancel_results, list):
            raise ValueError("Failed to cancel orders")
        return cancel_results

    async def replace_order(
        self,
        order: Optional[OrderDto] = None,
        order_id: Optional[UUID] = None,
        quantity: Optional[float] = None,
        price: Optional[float] = None,
        time_in_force: Optional[str] = None,
        post_only: Optional[bool] = None,
        reduce_only: Optional[bool] = False,
    ) -> Tuple[SubmitOrderCreatedDto, bool]:
        """Replace an existing order with new parameters.

        Args:
            order (OrderDto, optional): Existing order object to replace.
            order_id (str, optional): UUID of the order to replace.
            quantity (float, optional): New order size.
            price (float, optional): New limit price.
            time_in_force (str, optional): New time in force.
            post_only (bool, optional): New post-only flag.
            reduce_only (bool, optional): New reduce-only flag. Defaults to False.

        Returns:
            Tuple[SubmitOrderCreatedDto, bool]: Created order response and success flag.

        Raises:
            ValueError: If neither order nor order_id is provided, or both are provided.
        """
        if order is None and order_id is None:
            raise ValueError("Either order or order_id must be provided")
        elif order is not None and order_id is not None:
            raise ValueError("Only one of order or order_id must be provided")
        elif order is not None:
            old_order = order
        elif order_id is not None:
            old_order = await self.get_order(id=order_id)
        subaccount = await self.get_subaccount(id=old_order.subaccount_id)

        quantity = float(old_order.quantity) if quantity is None else quantity
        price = float(old_order.price) if price is None else price
        time_in_force = (
            old_order.time_in_force.value
            if time_in_force is None and old_order.time_in_force
            else time_in_force
        )
        post_only = old_order.post_only if post_only is None else post_only
        reduce_only = old_order.reduce_only if reduce_only is None else reduce_only

        cancel_result = await self.cancel_orders(
            order_ids=[old_order.id],
            sender=old_order.sender,
            subaccount=subaccount.name,
            sign=True,
            submit=True,
        )
        if not isinstance(cancel_result, list) or len(cancel_result) != 1:
            raise ValueError("Failed to cancel order")
        canceled_order = cancel_result[0]

        if not canceled_order.result.value == "Ok":
            raise ValueError(
                f"Failed to cancel order {order_id}: {canceled_order.result.value}"
            )

        new_order = await self.create_order(
            order_type=old_order.type.value,
            quantity=quantity,
            side=old_order.side.value,
            price=price,
            product_id=old_order.product_id,
            sender=old_order.sender,
            subaccount=subaccount.name,
            time_in_force=time_in_force or self.default_time_in_force,
            post_only=post_only or self.default_post_only,
            reduce_only=reduce_only,
            dry_run=False,
        )
        return self._models.SubmitOrderCreatedDto.model_validate(
            new_order
        ), canceled_order.result.value == "Ok"

cancel_all_orders(subaccount_id, product_ids=None, **kwargs) async

Cancel all orders for a given subaccount.

Parameters:

Name Type Description Default
subaccount_id str

UUID of the subaccount. Required.

required
product_ids List[str]

Filter cancellation by product IDs.

None

Other Parameters:

Name Type Description
**kwargs

Additional request parameters accepted by the API.

Returns:

Type Description
List[CancelOrderResultDto]

List[CancelOrderResultDto]: Cancellation results per order id.

Raises:

Type Description
ValueError

If no orders found to cancel or cancellation fails.

Source code in ethereal/async_rest_client.py
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
async def cancel_all_orders(
    self,
    subaccount_id: UUID,
    product_ids: Optional[List[UUID]] = None,
    **kwargs,
) -> List[CancelOrderResultDto]:
    """Cancel all orders for a given subaccount.

    Args:
        subaccount_id (str): UUID of the subaccount. Required.
        product_ids (List[str], optional): Filter cancellation by product IDs.

    Other Parameters:
        **kwargs: Additional request parameters accepted by the API.

    Returns:
        List[CancelOrderResultDto]: Cancellation results per order id.

    Raises:
        ValueError: If no orders found to cancel or cancellation fails.
    """
    subaccount = await self.get_subaccount(id=subaccount_id)
    query_params = {
        "subaccount_id": subaccount_id,
        "isWorking": True,
        **kwargs,
    }
    if product_ids:
        query_params["product_ids"] = product_ids

    orders = await self._get_pages(
        endpoint="order",
        request_model=self._models.V1OrderGetParametersQuery,
        response_model=self._models.PageOfOrderDtos,
        paginate=True,
        **query_params,
    )
    order_ids = [order.id for order in orders]

    if len(order_ids) == 0:
        raise ValueError("No order IDs provided for cancellation")
    cancel_results = await self.cancel_orders(
        order_ids=order_ids,
        sender=subaccount.account,
        subaccount=subaccount.name,
        sign=True,
        submit=True,
    )
    if not isinstance(cancel_results, list):
        raise ValueError("Failed to cancel orders")
    return cancel_results

cancel_orders(order_ids, sender=None, subaccount=None, client_order_ids=[], sign=True, submit=True, **kwargs) async

Prepares and optionally submits a request to cancel multiple orders.

Parameters:

Name Type Description Default
order_ids List[str]

Order UUIDs to cancel. Required.

required
sender str

Address initiating the cancellation. Defaults to chain address.

None
subaccount str

Hex-encoded subaccount name. Defaults to first subaccount.

None
client_order_ids List[str]

Client-generated IDs to cancel. Defaults to empty list.

[]
sign bool

If True, sign the payload immediately. Defaults to True.

True
submit bool

If True, submit the request to the API. Defaults to True.

True

Other Parameters:

Name Type Description
**kwargs

Additional request parameters accepted by the API.

Returns:

Type Description
Union[List[CancelOrderResultDto], CancelOrderDto]

Union[List[CancelOrderResultDto], CancelOrderDto]: Cancellation results per order id or prepared cancel payload.

Raises:

Type Description
ValueError

If no order IDs or client order IDs provided for cancellation.

Source code in ethereal/async_rest_client.py
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
async def cancel_orders(
    self,
    order_ids: List[UUID],
    sender: Optional[str] = None,
    subaccount: Optional[str] = None,
    client_order_ids: List[str] = [],
    sign: bool = True,
    submit: bool = True,
    **kwargs,
) -> Union[List[CancelOrderResultDto], CancelOrderDto]:
    """Prepares and optionally submits a request to cancel multiple orders.

    Args:
        order_ids (List[str]): Order UUIDs to cancel. Required.
        sender (str, optional): Address initiating the cancellation. Defaults to chain address.
        subaccount (str, optional): Hex-encoded subaccount name. Defaults to first subaccount.
        client_order_ids (List[str], optional): Client-generated IDs to cancel. Defaults to empty list.
        sign (bool, optional): If True, sign the payload immediately. Defaults to True.
        submit (bool, optional): If True, submit the request to the API. Defaults to True.

    Other Parameters:
        **kwargs: Additional request parameters accepted by the API.

    Returns:
        Union[List[CancelOrderResultDto], CancelOrderDto]: Cancellation results per order id or prepared cancel payload.

    Raises:
        ValueError: If no order IDs or client order IDs provided for cancellation.
    """
    if len(order_ids) == 0 and len(client_order_ids) == 0:
        raise ValueError(
            "No order IDs or client order IDs provided for cancellation"
        )
    sender, subaccount = await self._resolve_sender_and_subaccount(
        sender, subaccount
    )
    try:
        prepared_cancel = await self.prepare_cancel_order(
            order_ids=order_ids,
            client_order_ids=client_order_ids,
            sender=sender,
            subaccount=subaccount,
            include_signature=sign,
            **kwargs,
        )
    except ValueError as e:
        self.logger.warning(f"Could not prepare/sign order cancellation: {e}")
        raise

    if not submit:
        return prepared_cancel

    result = await self.cancel_order(
        prepared_cancel,
        **kwargs,
    )
    return result

create(config={}) async classmethod

Factory method to create and asynchronously initialize the client.

Parameters:

Name Type Description Default
config Union[Dict[str, Any], RESTConfig]

Configuration dictionary or RESTConfig object. Optional fields include: private_key (str, optional): The private key. base_url (str, optional): Base URL for REST requests. Defaults to mainnet. timeout (int, optional): Timeout in seconds for REST requests. verbose (bool, optional): Enables debug logging. Defaults to False. rate_limit_headers (bool, optional): Enables rate limit headers. Defaults to False. chain_config (ChainConfig, optional): Chain configuration for signing transactions.

{}

Returns:

Name Type Description
AsyncRESTClient AsyncRESTClient

Fully initialized async client instance.

Source code in ethereal/async_rest_client.py
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
@classmethod
async def create(
    cls, config: Union[Dict[str, Any], RESTConfig] = {}
) -> AsyncRESTClient:
    """Factory method to create and asynchronously initialize the client.

    Args:
        config (Union[Dict[str, Any], RESTConfig], optional): Configuration dictionary or RESTConfig object. Optional fields include:
            private_key (str, optional): The private key.
            base_url (str, optional): Base URL for REST requests. Defaults to mainnet.
            timeout (int, optional): Timeout in seconds for REST requests.
            verbose (bool, optional): Enables debug logging. Defaults to False.
            rate_limit_headers (bool, optional): Enables rate limit headers. Defaults to False.
            chain_config (ChainConfig, optional): Chain configuration for signing transactions.

    Returns:
        AsyncRESTClient: Fully initialized async client instance.
    """
    client = cls(config)
    await client._async_init()
    return client

create_order(order_type, quantity, side, price=None, ticker=None, product_id=None, client_order_id=None, sender=None, subaccount=None, time_in_force=None, post_only=None, reduce_only=False, close=None, stop_price=None, stop_type=None, expires_at=None, group_id=None, group_contingency_type=None, sign=True, dry_run=False, submit=True) async

Create and submit an order.

Parameters:

Name Type Description Default
order_type str

'LIMIT' or 'MARKET'. Required.

required
quantity float

Order size. Required.

required
side int

0 for buy, 1 for sell. Required.

required
price float

Limit price for LIMIT orders.

None
ticker str

Ticker of the product.

None
product_id str

UUID of the product.

None
client_order_id str

Subaccount-scoped client-generated id (UUID or <=32 alphanumeric).

None
sender str

Address placing the order. Defaults to chain address.

None
subaccount str

Hex-encoded subaccount name. Defaults to first subaccount.

None
time_in_force str

For LIMIT orders (e.g., 'GTC', 'GTD'). Defaults to 'GTC'.

None
post_only bool

For LIMIT orders; rejects if crossing. Defaults to False.

None
reduce_only bool

If True, order only reduces position. Defaults to False.

False
close bool

For MARKET orders; If True, closes the position.

None
stop_price float

Stop trigger price.

None
stop_type int

Stop type, either 0 (take-profit) or 1 (stop-loss), requires non-zero stopPrice.

None
expires_at int

Expiry timestamp for GTD.

None
group_id str

Group Id (UUID) for linking orders together in OCO/OTO relationships.

None
group_contingency_type int

Contingency type for order groups: 0=OTO (Order-Triggers-Order), 1=OCO (One-Cancels-Other).

None
sign bool

If True, sign the payload immediately. Defaults to True.

True
dry_run bool

If True, validate without execution. Defaults to False.

False
submit bool

If True, submit the order. Defaults to True.

True

Returns:

Type Description
Union[SubmitOrderCreatedDto, DryRunOrderCreatedDto, SubmitOrderDto]

Union[SubmitOrderCreatedDto, DryRunOrderCreatedDto, SubmitOrderDto]: Created order response, dry-run validation result, or prepared order payload.

Raises:

Type Description
ValueError

If neither product_id nor ticker is provided or if order type is invalid.

Source code in ethereal/async_rest_client.py
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
async def create_order(
    self,
    order_type: str,
    quantity: float,
    side: int,
    price: Optional[float] = None,
    ticker: Optional[str] = None,
    product_id: Optional[UUID] = None,
    client_order_id: Optional[str] = None,
    sender: Optional[str] = None,
    subaccount: Optional[str] = None,
    time_in_force: Optional[str] = None,
    post_only: Optional[bool] = None,
    reduce_only: Optional[bool] = False,
    close: Optional[bool] = None,
    stop_price: Optional[float] = None,
    stop_type: Optional[int] = None,
    expires_at: Optional[int] = None,
    group_id: Optional[str] = None,
    group_contingency_type: Optional[int] = None,
    sign: bool = True,
    dry_run: bool = False,
    submit: bool = True,
) -> Union[SubmitOrderCreatedDto, DryRunOrderCreatedDto, SubmitOrderDto]:
    """Create and submit an order.

    Args:
        order_type (str): 'LIMIT' or 'MARKET'. Required.
        quantity (float): Order size. Required.
        side (int): 0 for buy, 1 for sell. Required.
        price (float, optional): Limit price for LIMIT orders.
        ticker (str, optional): Ticker of the product.
        product_id (str, optional): UUID of the product.
        client_order_id (str, optional): Subaccount-scoped client-generated id (UUID or <=32 alphanumeric).
        sender (str, optional): Address placing the order. Defaults to chain address.
        subaccount (str, optional): Hex-encoded subaccount name. Defaults to first subaccount.
        time_in_force (str, optional): For LIMIT orders (e.g., 'GTC', 'GTD'). Defaults to 'GTC'.
        post_only (bool, optional): For LIMIT orders; rejects if crossing. Defaults to False.
        reduce_only (bool, optional): If True, order only reduces position. Defaults to False.
        close (bool, optional): For MARKET orders; If True, closes the position.
        stop_price (float, optional): Stop trigger price.
        stop_type (int, optional): Stop type, either 0 (take-profit) or 1 (stop-loss), requires non-zero stopPrice.
        expires_at (int, optional): Expiry timestamp for GTD.
        group_id (str, optional): Group Id (UUID) for linking orders together in OCO/OTO relationships.
        group_contingency_type (int, optional): Contingency type for order groups: 0=OTO (Order-Triggers-Order), 1=OCO (One-Cancels-Other).
        sign (bool, optional): If True, sign the payload immediately. Defaults to True.
        dry_run (bool, optional): If True, validate without execution. Defaults to False.
        submit (bool, optional): If True, submit the order. Defaults to True.

    Returns:
        Union[SubmitOrderCreatedDto, DryRunOrderCreatedDto, SubmitOrderDto]: Created order response, dry-run validation result, or prepared order payload.

    Raises:
        ValueError: If neither product_id nor ticker is provided or if order type is invalid.
    """
    sender, subaccount = await self._resolve_sender_and_subaccount(
        sender, subaccount
    )

    if product_id is not None:
        products_by_id = await self.products_by_id()
        onchain_id = products_by_id[product_id].onchain_id
    elif ticker is not None:
        products_by_ticker = await self.products_by_ticker()
        onchain_id = products_by_ticker[ticker].onchain_id
    else:
        raise ValueError("Either product_id or ticker must be provided")

    order_params: Dict[str, Any] = {
        "subaccount": subaccount,
        "side": side,
        "quantity": quantity,
        "onchain_id": onchain_id,
        "order_type": order_type,
        "client_order_id": client_order_id,
        "reduce_only": reduce_only,
        "close": close,
        "stop_price": stop_price,
        "stop_type": stop_type,
        "group_id": group_id,
        "group_contingency_type": group_contingency_type,
    }

    if order_type == "LIMIT":
        order_params.update(
            {
                "price": price,
                "time_in_force": time_in_force or self.default_time_in_force,
                "post_only": post_only or self.default_post_only,
                "expires_at": expires_at,
            }
        )
    elif order_type != "MARKET":
        raise ValueError("Invalid order type")

    order = await self.prepare_order(sender, **order_params, include_signature=sign)
    if dry_run:
        return await self.dry_run_order(order)
    elif submit:
        return await self.submit_order(order)
    else:
        return order

get_maintenance_margin(subaccount_id, positions=None, products=None, product_ids=None) async

Calculate the an account's maintenance margin for specified positions or products.

Parameters:

Name Type Description Default
subaccount_id str

Fetch positions for this subaccount when positions is not supplied.

required
positions List[PositionDto] | List[Dict[str, Any]]

Pre-fetched positions to use directly.

None
products List[ProductDto] | List[Dict[str, Any]]

Pre-fetched products used to filter the calculation.

None
product_ids List[str] | List[UUID]

Filters the calculation to these product IDs.

None

Returns:

Name Type Description
Decimal Decimal

Total maintenance margin for the filtered positions.

Raises:

Type Description
ValueError

If neither positions nor subaccount context is provided, or if any referenced product cannot be resolved.

Source code in ethereal/async_rest_client.py
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
async def get_maintenance_margin(
    self,
    subaccount_id: UUID,
    positions: Optional[List[PositionDto] | List[Dict[str, Any]]] = None,
    products: Optional[List[ProductDto] | List[Dict[str, Any]]] = None,
    product_ids: Optional[List[UUID]] = None,
) -> Decimal:
    """Calculate the an account's maintenance margin for specified positions or products.

    Args:
        subaccount_id (str): Fetch positions for this subaccount when ``positions`` is not supplied.
        positions (List[PositionDto] | List[Dict[str, Any]], optional): Pre-fetched positions to use directly.
        products (List[ProductDto] | List[Dict[str, Any]], optional): Pre-fetched products used to filter
            the calculation.
        product_ids (List[str] | List[UUID], optional): Filters the calculation to these product IDs.

    Returns:
        Decimal: Total maintenance margin for the filtered positions.

    Raises:
        ValueError: If neither positions nor subaccount context is provided, or if any
            referenced product cannot be resolved.
    """

    class PartialPosition(BaseModel):
        product_id: UUID
        cost: str

    class PartialProduct(BaseModel):
        id: UUID
        max_leverage: float
        taker_fee: str

    if positions is None:
        positions = await self.list_positions(subaccount_id=subaccount_id)

    if products and product_ids:
        raise ValueError("Can only specify one of products and product_ids")

    if products is not None:
        valid_products = [
            PartialProduct(**p if isinstance(p, dict) else p.model_dump())
            for p in products
        ]
        products_by_id: Dict[UUID, PartialProduct] = {
            product.id: product for product in valid_products
        }
    else:
        raw_products_by_id = await self.products_by_id()
        products_by_id = {
            UUID(str(k)): PartialProduct(
                **v if isinstance(v, dict) else v.model_dump()
            )
            for k, v in raw_products_by_id.items()
        }

    product_ids_set: Optional[Set[UUID]] = None
    if product_ids is not None:
        product_ids_set = set([UUID(str(pid)) for pid in product_ids])
        products_by_id_keys = set(products_by_id.keys())
        missing_products = product_ids_set - products_by_id_keys
        if missing_products:
            missing = ", ".join(sorted([str(p) for p in missing_products]))
            raise ValueError(f"Products not found for calculation: {missing}")

    valid_positions = [
        PartialPosition(**p if isinstance(p, dict) else p.model_dump())
        for p in positions
    ]
    positions_filtered = [
        position
        for position in valid_positions
        if product_ids_set is None or position.product_id in product_ids_set
    ]

    if not positions_filtered:
        return Decimal("0")

    total_mm = Decimal("0")
    for position in positions_filtered:
        product = products_by_id.get(position.product_id)
        if product is None:
            raise ValueError(
                f"Product '{position.product_id}' not found for position '{position.product_id}'"
            )

        notional = abs(Decimal(position.cost))
        max_leverage = Decimal(str(product.max_leverage))
        taker_fee_rate = Decimal(product.taker_fee)

        mmr = Decimal("1") / (max_leverage * Decimal("2"))
        total_mm += notional * mmr
        total_mm += notional * taker_fee_rate

    return total_mm

get_tokens() async

Return the latest list of tokens (no caching).

Source code in ethereal/async_rest_client.py
491
492
493
async def get_tokens(self) -> List[TokenDto]:
    """Return the latest list of tokens (no caching)."""
    return await self.list_tokens()

Prepare, sign, and optionally submit a linked signer payload.

Parameters:

Name Type Description Default
signer str

Address being linked as a delegate signer. Required.

required
subaccount_id UUID

Target subaccount id. Required.

required
subaccount str

Bytes32 subaccount name. If omitted, fetched by id.

None
sender str

Owner address. Defaults to the client's chain address.

None
signer_private_key str

Private key for the signer address.

None
nonce str

Custom nonce for the signature.

None
signed_at int

Seconds since epoch for the signature timestamp.

None
sign_sender bool

Sign with the owner key. Defaults to True.

True
sign_signer bool

Sign with the signer key. Defaults to True.

True
submit bool

Submit to the API. If False, return the prepared DTO.

True

Returns:

Type Description
Union[SignerDto, LinkSignerDto]

Union[SignerDto, LinkSignerDto]: Submitted record or the prepared/signable payload.

Raises:

Type Description
ValueError

If the subaccount name cannot be resolved or signer signature is missing.

Source code in ethereal/async_rest_client.py
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
async def link_signer(
    self,
    signer: str,
    subaccount_id: UUID,
    subaccount: Optional[str] = None,
    sender: Optional[str] = None,
    signer_private_key: Optional[str] = None,
    nonce: Optional[str] = None,
    signed_at: Optional[int] = None,
    sign_sender: bool = True,
    sign_signer: bool = True,
    submit: bool = True,
    **kwargs,
) -> Union[SignerDto, LinkSignerDto]:
    """Prepare, sign, and optionally submit a linked signer payload.

    Args:
        signer (str): Address being linked as a delegate signer. Required.
        subaccount_id (UUID): Target subaccount id. Required.
        subaccount (str, optional): Bytes32 subaccount name. If omitted, fetched by id.
        sender (str, optional): Owner address. Defaults to the client's chain address.
        signer_private_key (str, optional): Private key for the signer address.
        nonce (str, optional): Custom nonce for the signature.
        signed_at (int, optional): Seconds since epoch for the signature timestamp.
        sign_sender (bool, optional): Sign with the owner key. Defaults to True.
        sign_signer (bool, optional): Sign with the signer key. Defaults to True.
        submit (bool, optional): Submit to the API. If False, return the prepared DTO.

    Returns:
        Union[SignerDto, LinkSignerDto]: Submitted record or the prepared/signable payload.

    Raises:
        ValueError: If the subaccount name cannot be resolved or signer signature is missing.
    """
    if subaccount is None:
        subaccount = await self._resolve_subaccount_name(subaccount_id)
    sender, subaccount = await self._resolve_sender_and_subaccount(
        sender, subaccount
    )

    link = await self.prepare_linked_signer(
        sender=sender,
        signer=signer,
        subaccount=subaccount,
        subaccount_id=subaccount_id,
        include_signature=False,
        nonce=nonce,
        signed_at=signed_at,
    )

    if sign_sender:
        link = await self.sign_linked_signer(link)

    if sign_signer:
        if signer_private_key:
            link = await self.sign_linked_signer(
                link, signer_private_key=signer_private_key
            )
        elif not link.signer_signature:
            raise ValueError(
                "signer_private_key or signer_signature is required when sign_signer=True"
            )

    return link if not submit else await self.link_linked_signer(link, **kwargs)

products(refresh=False) async

Get the list of products.

Parameters:

Name Type Description Default
refresh bool

If True, bypass cache and fetch fresh data. Defaults to False.

False

Returns:

Type Description
List[ProductDto]

List[ProductDto]: List of product objects.

Source code in ethereal/async_rest_client.py
337
338
339
340
341
342
343
344
345
346
347
348
349
350
async def products(self, refresh: bool = False) -> List[ProductDto]:
    """Get the list of products.

    Args:
        refresh (bool, optional): If True, bypass cache and fetch fresh data. Defaults to False.

    Returns:
        List[ProductDto]: List of product objects.
    """
    if refresh or self._products is None:
        self._products = await self.list_products()
        self._products_by_id = None
        self._products_by_ticker = None
    return self._products

products_by_id(refresh=False) async

Get the products indexed by ID.

Parameters:

Name Type Description Default
refresh bool

If True, bypass cache and fetch fresh data. Defaults to False.

False

Returns:

Type Description
Dict[UUID, ProductDto]

Dict[UUID, ProductDto]: Dictionary of products keyed by ID.

Source code in ethereal/async_rest_client.py
381
382
383
384
385
386
387
388
389
390
391
392
393
async def products_by_id(self, refresh: bool = False) -> Dict[UUID, ProductDto]:
    """Get the products indexed by ID.

    Args:
        refresh (bool, optional): If True, bypass cache and fetch fresh data. Defaults to False.

    Returns:
        Dict[UUID, ProductDto]: Dictionary of products keyed by ID.
    """
    if refresh or self._products_by_id is None:
        products = await self.products(refresh=refresh)
        self._products_by_id = {p.id: p for p in products}
    return self._products_by_id

products_by_ticker(refresh=False) async

Get the products indexed by ticker.

Parameters:

Name Type Description Default
refresh bool

If True, bypass cache and fetch fresh data. Defaults to False.

False

Returns:

Type Description
Dict[str, ProductDto]

Dict[str, ProductDto]: Dictionary of products keyed by ticker.

Source code in ethereal/async_rest_client.py
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
async def products_by_ticker(self, refresh: bool = False) -> Dict[str, ProductDto]:
    """Get the products indexed by ticker.

    Args:
        refresh (bool, optional): If True, bypass cache and fetch fresh data. Defaults to False.

    Returns:
        Dict[str, ProductDto]: Dictionary of products keyed by ticker.
    """
    if refresh or self._products_by_ticker is None:
        products = await self.products(refresh=refresh)
        self._products_by_ticker = {
            p.ticker: p for p in products if getattr(p, "ticker", None)
        }
    return self._products_by_ticker

refresh_signer(signer, subaccount_id, subaccount=None, sender=None, sign=True, submit=True, **kwargs) async

Prepare, sign, and optionally submit a refresh-linked-signer payload.

Parameters:

Name Type Description Default
signer str

Address of the delegate to refresh. Required.

required
subaccount_id UUID

Target subaccount id. Required.

required
subaccount str

Bytes32 subaccount name. If omitted, fetched by id.

None
sender str

Owner address. Defaults to the client's chain address.

None
sign bool

Sign with the owner key. Defaults to True.

True
submit bool

Submit to the API. If False, return the prepared DTO.

True

Returns:

Type Description
Union[SignerDto, RefreshLinkedSignerDto]

Union[SignerDto, RefreshLinkedSignerDto]: Submitted record or the prepared/signable payload.

Raises:

Type Description
ValueError

If the subaccount name cannot be resolved.

Source code in ethereal/async_rest_client.py
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
async def refresh_signer(
    self,
    signer: str,
    subaccount_id: UUID,
    subaccount: Optional[str] = None,
    sender: Optional[str] = None,
    sign: bool = True,
    submit: bool = True,
    **kwargs,
) -> Union[SignerDto, RefreshLinkedSignerDto]:
    """Prepare, sign, and optionally submit a refresh-linked-signer payload.

    Args:
        signer (str): Address of the delegate to refresh. Required.
        subaccount_id (UUID): Target subaccount id. Required.
        subaccount (str, optional): Bytes32 subaccount name. If omitted, fetched by id.
        sender (str, optional): Owner address. Defaults to the client's chain address.
        sign (bool, optional): Sign with the owner key. Defaults to True.
        submit (bool, optional): Submit to the API. If False, return the prepared DTO.

    Returns:
        Union[SignerDto, RefreshLinkedSignerDto]: Submitted record or the prepared/signable payload.

    Raises:
        ValueError: If the subaccount name cannot be resolved.
    """
    if subaccount is None:
        subaccount = await self._resolve_subaccount_name(subaccount_id)
    sender, subaccount = await self._resolve_sender_and_subaccount(
        sender, subaccount
    )

    refresh = await self.prepare_refresh_linked_signer(
        sender=sender,
        signer=signer,
        subaccount=subaccount,
        subaccount_id=subaccount_id,
        include_signature=False,
        **kwargs,
    )

    if sign:
        refresh = await self.sign_refresh_linked_signer(refresh)

    return (
        refresh
        if not submit
        else await self.refresh_linked_signer(refresh, **kwargs)
    )

replace_order(order=None, order_id=None, quantity=None, price=None, time_in_force=None, post_only=None, reduce_only=False) async

Replace an existing order with new parameters.

Parameters:

Name Type Description Default
order OrderDto

Existing order object to replace.

None
order_id str

UUID of the order to replace.

None
quantity float

New order size.

None
price float

New limit price.

None
time_in_force str

New time in force.

None
post_only bool

New post-only flag.

None
reduce_only bool

New reduce-only flag. Defaults to False.

False

Returns:

Type Description
Tuple[SubmitOrderCreatedDto, bool]

Tuple[SubmitOrderCreatedDto, bool]: Created order response and success flag.

Raises:

Type Description
ValueError

If neither order nor order_id is provided, or both are provided.

Source code in ethereal/async_rest_client.py
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
async def replace_order(
    self,
    order: Optional[OrderDto] = None,
    order_id: Optional[UUID] = None,
    quantity: Optional[float] = None,
    price: Optional[float] = None,
    time_in_force: Optional[str] = None,
    post_only: Optional[bool] = None,
    reduce_only: Optional[bool] = False,
) -> Tuple[SubmitOrderCreatedDto, bool]:
    """Replace an existing order with new parameters.

    Args:
        order (OrderDto, optional): Existing order object to replace.
        order_id (str, optional): UUID of the order to replace.
        quantity (float, optional): New order size.
        price (float, optional): New limit price.
        time_in_force (str, optional): New time in force.
        post_only (bool, optional): New post-only flag.
        reduce_only (bool, optional): New reduce-only flag. Defaults to False.

    Returns:
        Tuple[SubmitOrderCreatedDto, bool]: Created order response and success flag.

    Raises:
        ValueError: If neither order nor order_id is provided, or both are provided.
    """
    if order is None and order_id is None:
        raise ValueError("Either order or order_id must be provided")
    elif order is not None and order_id is not None:
        raise ValueError("Only one of order or order_id must be provided")
    elif order is not None:
        old_order = order
    elif order_id is not None:
        old_order = await self.get_order(id=order_id)
    subaccount = await self.get_subaccount(id=old_order.subaccount_id)

    quantity = float(old_order.quantity) if quantity is None else quantity
    price = float(old_order.price) if price is None else price
    time_in_force = (
        old_order.time_in_force.value
        if time_in_force is None and old_order.time_in_force
        else time_in_force
    )
    post_only = old_order.post_only if post_only is None else post_only
    reduce_only = old_order.reduce_only if reduce_only is None else reduce_only

    cancel_result = await self.cancel_orders(
        order_ids=[old_order.id],
        sender=old_order.sender,
        subaccount=subaccount.name,
        sign=True,
        submit=True,
    )
    if not isinstance(cancel_result, list) or len(cancel_result) != 1:
        raise ValueError("Failed to cancel order")
    canceled_order = cancel_result[0]

    if not canceled_order.result.value == "Ok":
        raise ValueError(
            f"Failed to cancel order {order_id}: {canceled_order.result.value}"
        )

    new_order = await self.create_order(
        order_type=old_order.type.value,
        quantity=quantity,
        side=old_order.side.value,
        price=price,
        product_id=old_order.product_id,
        sender=old_order.sender,
        subaccount=subaccount.name,
        time_in_force=time_in_force or self.default_time_in_force,
        post_only=post_only or self.default_post_only,
        reduce_only=reduce_only,
        dry_run=False,
    )
    return self._models.SubmitOrderCreatedDto.model_validate(
        new_order
    ), canceled_order.result.value == "Ok"

revoke_signer(signer, subaccount_id, subaccount=None, sender=None, sign=True, submit=True, **kwargs) async

Prepare, sign, and optionally submit a revoke-linked-signer payload.

Parameters:

Name Type Description Default
signer str

Address of the delegate to revoke. Required.

required
subaccount_id UUID

Target subaccount id. Required.

required
subaccount str

Bytes32 subaccount name. If omitted, fetched by id.

None
sender str

Owner address. Defaults to the client's chain address.

None
sign bool

Sign with the owner key. Defaults to True.

True
submit bool

Submit to the API. If False, return the prepared DTO.

True

Returns:

Type Description
Union[SignerDto, RevokeLinkedSignerDto]

Union[SignerDto, RevokeLinkedSignerDto]: Submitted record or the prepared/signable payload.

Raises:

Type Description
ValueError

If the subaccount name cannot be resolved.

Source code in ethereal/async_rest_client.py
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
async def revoke_signer(
    self,
    signer: str,
    subaccount_id: UUID,
    subaccount: Optional[str] = None,
    sender: Optional[str] = None,
    sign: bool = True,
    submit: bool = True,
    **kwargs,
) -> Union[SignerDto, RevokeLinkedSignerDto]:
    """Prepare, sign, and optionally submit a revoke-linked-signer payload.

    Args:
        signer (str): Address of the delegate to revoke. Required.
        subaccount_id (UUID): Target subaccount id. Required.
        subaccount (str, optional): Bytes32 subaccount name. If omitted, fetched by id.
        sender (str, optional): Owner address. Defaults to the client's chain address.
        sign (bool, optional): Sign with the owner key. Defaults to True.
        submit (bool, optional): Submit to the API. If False, return the prepared DTO.

    Returns:
        Union[SignerDto, RevokeLinkedSignerDto]: Submitted record or the prepared/signable payload.

    Raises:
        ValueError: If the subaccount name cannot be resolved.
    """
    if subaccount is None:
        subaccount = await self._resolve_subaccount_name(subaccount_id)
    sender, subaccount = await self._resolve_sender_and_subaccount(
        sender, subaccount
    )

    revoke = await self.prepare_revoke_linked_signer(
        sender=sender,
        signer=signer,
        subaccount=subaccount,
        subaccount_id=subaccount_id,
        include_signature=False,
        **kwargs,
    )

    if sign:
        revoke = await self.sign_revoke_linked_signer(revoke)

    return (
        revoke if not submit else await self.revoke_linked_signer(revoke, **kwargs)
    )

subaccounts(refresh=False) async

Get the list of subaccounts.

Parameters:

Name Type Description Default
refresh bool

If True, bypass cache and fetch fresh data. Defaults to False.

False

Returns:

Type Description
List[SubaccountDto]

List[SubaccountDto]: List of subaccount objects for the connected wallet address.

Raises:

Type Description
ValueError

If no chain client is configured or address is unavailable.

Source code in ethereal/async_rest_client.py
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
async def subaccounts(self, refresh: bool = False) -> List[SubaccountDto]:
    """Get the list of subaccounts.

    Args:
        refresh (bool, optional): If True, bypass cache and fetch fresh data. Defaults to False.

    Returns:
        List[SubaccountDto]: List of subaccount objects for the connected wallet address.

    Raises:
        ValueError: If no chain client is configured or address is unavailable.
    """
    if not self.chain or not getattr(self.chain, "address", None):
        raise ValueError("Chain address is required to list subaccounts")
    if refresh or self._subaccounts is None:
        self._subaccounts = await self.list_subaccounts(
            sender=self.chain.address, order_by="createdAt", order="asc"
        )
    return self._subaccounts

tokens(refresh=False) async

Get the list of tokens.

Parameters:

Name Type Description Default
refresh bool

If True, bypass cache and fetch fresh data. Defaults to False.

False

Returns:

Type Description
List[TokenDto]

List[TokenDto]: List of token objects.

Source code in ethereal/async_rest_client.py
352
353
354
355
356
357
358
359
360
361
362
363
async def tokens(self, refresh: bool = False) -> List[TokenDto]:
    """Get the list of tokens.

    Args:
        refresh (bool, optional): If True, bypass cache and fetch fresh data. Defaults to False.

    Returns:
        List[TokenDto]: List of token objects.
    """
    if refresh or self._tokens is None:
        self._tokens = await self.list_tokens()
    return self._tokens

withdraw_token(dto=None, token_id=None, amount=None, destination_address=None, destination_endpoint=None, subaccount=None, account=None, nonce=None, signed_at=None, sign=True, submit=True, **kwargs) async

Prepares, signs, and optionally submits a token withdrawal.

This is the primary method for initiating token withdrawals. It handles the complete withdrawal flow: preparing the withdrawal payload, signing it with EIP-712, and submitting to the API.

Example

Withdraw 100 USD to the connected wallet::

result = await client.withdraw_token(
    token_id=usd_token.id,
    amount=100,
    destination_address=client.chain.address,
    destination_endpoint=0,
)

Prepare without submitting (for inspection or manual submission)::

payload = await client.withdraw_token(
    token_id=usd_token.id,
    amount=100,
    destination_address="0x...",
    destination_endpoint=0,
    submit=False,
)

Parameters:

Name Type Description Default
token_id Optional[UUID]

UUID of the token to withdraw.

None
amount Optional[int]

Amount to withdraw in the token's base units.

None
destination_address Optional[str]

The LayerZero destination address where funds will be sent. Automatically padded to bytes32 format.

None
destination_endpoint Optional[int]

LayerZero endpoint ID for the destination chain. Use 0 for same-chain withdrawals.

None
subaccount Optional[str]

Hex-encoded subaccount name. If not provided, defaults to the first subaccount associated with the connected wallet.

None
account Optional[str]

Recipient wallet address. If not provided, defaults to the connected wallet's address.

None
nonce Optional[str]

Custom nonce for the EIP-712 signature. If not provided, a random nonce is generated.

None
signed_at Optional[int]

Unix timestamp (seconds) for the signature. If not provided, defaults to the current time.

None
sign bool

Whether to sign the withdrawal payload. Defaults to True.

True
submit bool

Whether to submit the withdrawal to the API. Set to False to receive the prepared payload without submitting. Defaults to True.

True
dto Optional[InitiateWithdrawDto]

Deprecated. A pre-built InitiateWithdrawDto. This parameter exists for backward compatibility and will be removed in a future version. Use the individual parameters instead.

None

Returns:

Type Description
Union[WithdrawDto, InitiateWithdrawDto]

If submit=True, returns a WithdrawDto containing the withdrawal

Union[WithdrawDto, InitiateWithdrawDto]

record created by the API. If submit=False, returns the prepared

Union[WithdrawDto, InitiateWithdrawDto]

InitiateWithdrawDto payload.

Raises:

Type Description
ValueError

If required parameters (token_id, amount, destination_address, destination_endpoint) are not provided.

ValueError

If required parameters (token_id, amount, destination_address,

Source code in ethereal/async_rest_client.py
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
async def withdraw_token(
    self,
    dto: Optional[InitiateWithdrawDto] = None,
    token_id: Optional[UUID] = None,
    amount: Optional[int] = None,
    destination_address: Optional[str] = None,
    destination_endpoint: Optional[int] = None,
    subaccount: Optional[str] = None,
    account: Optional[str] = None,
    nonce: Optional[str] = None,
    signed_at: Optional[int] = None,
    sign: bool = True,
    submit: bool = True,
    **kwargs,
) -> Union[WithdrawDto, InitiateWithdrawDto]:
    """Prepares, signs, and optionally submits a token withdrawal.

    This is the primary method for initiating token withdrawals. It handles
    the complete withdrawal flow: preparing the withdrawal payload, signing
    it with EIP-712, and submitting to the API.

    Example:
        Withdraw 100 USD to the connected wallet::

            result = await client.withdraw_token(
                token_id=usd_token.id,
                amount=100,
                destination_address=client.chain.address,
                destination_endpoint=0,
            )

        Prepare without submitting (for inspection or manual submission)::

            payload = await client.withdraw_token(
                token_id=usd_token.id,
                amount=100,
                destination_address="0x...",
                destination_endpoint=0,
                submit=False,
            )

    Args:
        token_id: UUID of the token to withdraw.
        amount: Amount to withdraw in the token's base units.
        destination_address: The LayerZero destination address where funds
            will be sent. Automatically padded to bytes32 format.
        destination_endpoint: LayerZero endpoint ID for the destination chain.
            Use 0 for same-chain withdrawals.
        subaccount: Hex-encoded subaccount name. If not provided, defaults to
            the first subaccount associated with the connected wallet.
        account: Recipient wallet address. If not provided, defaults to the
            connected wallet's address.
        nonce: Custom nonce for the EIP-712 signature. If not provided, a
            random nonce is generated.
        signed_at: Unix timestamp (seconds) for the signature. If not provided,
            defaults to the current time.
        sign: Whether to sign the withdrawal payload. Defaults to True.
        submit: Whether to submit the withdrawal to the API. Set to False to
            receive the prepared payload without submitting. Defaults to True.
        dto: **Deprecated.** A pre-built InitiateWithdrawDto. This parameter
            exists for backward compatibility and will be removed in a future
            version. Use the individual parameters instead.

    Returns:
        If ``submit=True``, returns a WithdrawDto containing the withdrawal
        record created by the API. If ``submit=False``, returns the prepared
        InitiateWithdrawDto payload.

    Raises:
        ValueError: If required parameters (token_id, amount, destination_address,
            destination_endpoint) are not provided.
        ValueError: If required parameters (token_id, amount, destination_address,
    """
    # Detect deprecated dto path
    if dto is not None:
        warnings.warn(
            "Passing 'dto' to withdraw_token() is deprecated. "
            "Pass friendly parameters directly (amount, token_id, destination_address, etc.) instead. "
            "The dto parameter will be removed in a future version.",
            DeprecationWarning,
            stacklevel=2,
        )
        if token_id is None:
            raise ValueError("token_id is required when using dto parameter")
        if not submit:
            if sign and not dto.signature:
                dto = await self.sign_withdraw_token(dto)
            return dto

    # New friendly parameter path
    if token_id is None:
        raise ValueError("token_id is required")
    if amount is None:
        raise ValueError("amount is required")
    if destination_address is None:
        raise ValueError("destination_address is required")
    if destination_endpoint is None:
        raise ValueError("destination_endpoint is required")

    # Resolve token address from token_id
    token = await self.get_token(id=token_id)
    token_addr = token.address

    # Resolve account/subaccount
    account_addr, subaccount_name = await self._resolve_sender_and_subaccount(
        account, subaccount
    )

    # Prepare
    withdraw_dto = await self.prepare_withdraw_token(
        subaccount=subaccount_name,
        token=token_addr,
        amount=amount,
        account=account_addr,
        destination_address=destination_address,
        destination_endpoint=destination_endpoint,
        include_signature=False,
        nonce=nonce,
        signed_at=signed_at,
    )

    # Sign
    if sign:
        withdraw_dto = await self.sign_withdraw_token(withdraw_dto)

    # Submit or return
    if not submit:
        return withdraw_dto
    return await self.submit_withdraw_token(
        dto=withdraw_dto, token_id=token_id, **kwargs
    )

RESTClient

Synchronous wrapper around AsyncRESTClient for backward compatibility. Use AsyncRESTClient for new applications.

from ethereal import RESTClient

client = RESTClient({
    "base_url": "https://api.ethereal.trade",
    "chain_config": {
        "rpc_url": "https://rpc.ethereal.trade",
        "private_key": "your_private_key",  # optional
    }
})

# Property access works for convenience
products = client.products
subaccounts = client.subaccounts

# Remember to close when done
client.close()

ethereal.rest.rpc

get_rpc_config(self, **kwargs) async

Gets RPC configuration for EIP-712 signing and contract info.

Other Parameters:

Name Type Description
**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Name Type Description
RpcConfigDto RpcConfigDto

Domain and signature type definitions for signing.

Source code in ethereal/rest/rpc.py
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
async def get_rpc_config(self, **kwargs) -> RpcConfigDto:
    """Gets RPC configuration for EIP-712 signing and contract info.

    Other Parameters:
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        RpcConfigDto: Domain and signature type definitions for signing.
    """
    endpoint = f"{API_PREFIX}/rpc/config"

    res = await self.get(endpoint, **kwargs)
    domain = self._models.DomainTypeDto(**res["domain"])
    signature_types = self._models.SignatureTypesDto(**res["signatureTypes"])
    return self._models.RpcConfigDto(domain=domain, signatureTypes=signature_types)

ethereal.rest.subaccount

get_subaccount(self, id, **kwargs) async

Gets a specific subaccount by ID.

Parameters:

Name Type Description Default
id str

UUID of the subaccount. Required.

required

Other Parameters:

Name Type Description
**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Name Type Description
SubaccountDto SubaccountDto

Subaccount details.

Source code in ethereal/rest/subaccount.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
async def get_subaccount(self, id: UUID, **kwargs) -> SubaccountDto:
    """Gets a specific subaccount by ID.

    Args:
        id (str): UUID of the subaccount. Required.

    Other Parameters:
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        SubaccountDto: Subaccount details.
    """
    endpoint = f"{API_PREFIX}/subaccount/{id}"
    res = await self.get(endpoint, **kwargs)
    return self._models.SubaccountDto(**res)

get_subaccount_balance_history(self, **kwargs) async

Gets historical subaccount balances from the archive API.

Parameters:

Name Type Description Default
subaccount_id str

UUID of the subaccount. Required.

required

Other Parameters:

Name Type Description
start_time float

Range start time in milliseconds since Unix epoch. Required.

end_time float

Range end time in milliseconds since Unix epoch. Optional.

resolution str

Data resolution (e.g., 'hour1', 'day1'). Required.

order str

Sort order, 'asc' or 'desc'. Optional.

limit float

Maximum number of results to return. Optional.

cursor str

Pagination cursor for fetching the next page. Optional.

order_by str

Field to order by (defaults to 'time'). Optional.

**kwargs

Additional query parameters supported by the API.

Returns:

Type Description
List[BalanceHistoryDto]

List[BalanceHistoryDto]: Historical balance records ordered per request parameters.

Source code in ethereal/rest/subaccount.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
async def get_subaccount_balance_history(self, **kwargs) -> List["BalanceHistoryDto"]:
    """Gets historical subaccount balances from the archive API.

    Args:
        subaccount_id (str): UUID of the subaccount. Required.

    Other Parameters:
        start_time (float): Range start time in milliseconds since Unix epoch. Required.
        end_time (float, optional): Range end time in milliseconds since Unix epoch. Optional.
        resolution (str): Data resolution (e.g., 'hour1', 'day1'). Required.
        order (str, optional): Sort order, 'asc' or 'desc'. Optional.
        limit (float, optional): Maximum number of results to return. Optional.
        cursor (str, optional): Pagination cursor for fetching the next page. Optional.
        order_by (str, optional): Field to order by (defaults to 'time'). Optional.
        **kwargs: Additional query parameters supported by the API.

    Returns:
        List[BalanceHistoryDto]: Historical balance records ordered per request parameters.
    """

    res = await self.get_validated(
        url_path=f"{API_PREFIX}/subaccount/balance",
        request_model=self._models.ArchiveV1SubaccountBalanceGetParametersQuery,
        response_model=self._models.PageOfBalanceHistoryDtos,
        base_url_override=self._archive_base_url,
        **kwargs,
    )

    data = [
        self._models.BalanceHistoryDto(**model.model_dump(by_alias=True))
        for model in res.data
    ]
    return data

get_subaccount_balances(self, **kwargs) async

Gets token balances for a subaccount.

Parameters:

Name Type Description Default
subaccount_id str

UUID of the subaccount. Required.

required

Other Parameters:

Name Type Description
order str

Sort order, 'asc' or 'desc'. Optional.

limit float

Maximum number of results to return. Optional.

cursor str

Pagination cursor for fetching the next page. Optional.

order_by str

Field to order by (e.g., 'tokenName', 'updatedAt', 'amount'). Optional.

**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Type Description
List[SubaccountBalanceDto]

List[SubaccountBalanceDto]: Balances for the subaccount.

Source code in ethereal/rest/subaccount.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
async def get_subaccount_balances(self, **kwargs) -> List[SubaccountBalanceDto]:
    """Gets token balances for a subaccount.

    Args:
        subaccount_id (str): UUID of the subaccount. Required.

    Other Parameters:
        order (str, optional): Sort order, 'asc' or 'desc'. Optional.
        limit (float, optional): Maximum number of results to return. Optional.
        cursor (str, optional): Pagination cursor for fetching the next page. Optional.
        order_by (str, optional): Field to order by (e.g., 'tokenName', 'updatedAt', 'amount'). Optional.
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        List[SubaccountBalanceDto]: Balances for the subaccount.
    """
    res = await self.get_validated(
        url_path=f"{API_PREFIX}/subaccount/balance",
        request_model=self._models.V1SubaccountBalanceGetParametersQuery,
        response_model=self._models.PageOfSubaccountBalanceDtos,
        **kwargs,
    )
    data = [
        self._models.SubaccountBalanceDto(**model.model_dump(by_alias=True))
        for model in res.data
    ]
    return data

get_subaccount_funding_history(self, **kwargs) async

Gets historical funding charges for positions in a subaccount from the archive API.

Parameters:

Name Type Description Default
subaccount_id str

UUID of the subaccount. Required.

required
start_time float

Range start time in milliseconds since Unix epoch. Required.

required

Other Parameters:

Name Type Description
end_time float

Range end time in milliseconds since Unix epoch. Optional.

position_ids List[str]

Filter by specific position IDs (max 128). Optional.

product_ids List[str]

Filter by specific product IDs. Optional.

order str

Sort order, 'asc' or 'desc'. Optional.

limit float

Maximum number of results to return. Optional.

cursor str

Pagination cursor for fetching the next page. Optional.

order_by str

Field to order by (defaults to 'time'). Optional.

**kwargs

Additional query parameters supported by the API.

Returns:

Type Description
List[PositionFundingHistoryDto]

List[PositionFundingHistoryDto]: Historical funding charge records for positions in the subaccount.

Source code in ethereal/rest/subaccount.py
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
async def get_subaccount_funding_history(
    self, **kwargs
) -> List["PositionFundingHistoryDto"]:
    """Gets historical funding charges for positions in a subaccount from the archive API.

    Args:
        subaccount_id (str): UUID of the subaccount. Required.
        start_time (float): Range start time in milliseconds since Unix epoch. Required.

    Other Parameters:
        end_time (float, optional): Range end time in milliseconds since Unix epoch. Optional.
        position_ids (List[str], optional): Filter by specific position IDs (max 128). Optional.
        product_ids (List[str], optional): Filter by specific product IDs. Optional.
        order (str, optional): Sort order, 'asc' or 'desc'. Optional.
        limit (float, optional): Maximum number of results to return. Optional.
        cursor (str, optional): Pagination cursor for fetching the next page. Optional.
        order_by (str, optional): Field to order by (defaults to 'time'). Optional.
        **kwargs: Additional query parameters supported by the API.

    Returns:
        List[PositionFundingHistoryDto]: Historical funding charge records for positions in the subaccount.
    """

    res = await self.get_validated(
        url_path=f"{API_PREFIX}/subaccount/funding",
        request_model=self._models.ArchiveV1SubaccountFundingGetParametersQuery,
        response_model=self._models.PageOfPositionFundingHistoryDtos,
        base_url_override=self._archive_base_url,
        **kwargs,
    )

    data = [
        self._models.PositionFundingHistoryDto(**model.model_dump(by_alias=True))
        for model in res.data
    ]
    return data

get_subaccount_total_volume(self, **kwargs) async

Gets the total trading volume for a subaccount from the archive API.

Parameters:

Name Type Description Default
subaccount_id str

UUID of the subaccount. Required.

required

Other Parameters:

Name Type Description
**kwargs

Additional query parameters supported by the API.

Returns:

Name Type Description
TotalSubaccountVolumeDto TotalSubaccountVolumeDto

Total volume in USD for the subaccount.

Source code in ethereal/rest/subaccount.py
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
async def get_subaccount_total_volume(self, **kwargs) -> "TotalSubaccountVolumeDto":
    """Gets the total trading volume for a subaccount from the archive API.

    Args:
        subaccount_id (str): UUID of the subaccount. Required.

    Other Parameters:
        **kwargs: Additional query parameters supported by the API.

    Returns:
        TotalSubaccountVolumeDto: Total volume in USD for the subaccount.
    """
    res = await self.get_validated(
        url_path=f"{API_PREFIX}/subaccount/total-volume",
        request_model=self._models.ArchiveV1SubaccountTotalVolumeGetParametersQuery,
        response_model=self._models.TotalSubaccountVolumeDto,
        base_url_override=self._archive_base_url,
        **kwargs,
    )
    return res

get_subaccount_unrealized_pnl_history(self, **kwargs) async

Gets historical unrealized PnL for a subaccount from the archive API.

Source code in ethereal/rest/subaccount.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
async def get_subaccount_unrealized_pnl_history(
    self, **kwargs
) -> List["UnrealizedPnlHistoryDto"]:
    """Gets historical unrealized PnL for a subaccount from the archive API."""

    res = await self.get_validated(
        url_path=f"{API_PREFIX}/subaccount/unrealized-pnl",
        request_model=self._models.ArchiveV1SubaccountUnrealizedPnlGetParametersQuery,
        response_model=self._models.PageOfUnrealizedPnlHistoryDtos,
        base_url_override=self._archive_base_url,
        **kwargs,
    )

    data = [
        self._models.UnrealizedPnlHistoryDto(**model.model_dump(by_alias=True))
        for model in res.data
    ]
    return data

get_subaccount_volume_history(self, **kwargs) async

Gets historical trading volume for a subaccount from the archive API.

Source code in ethereal/rest/subaccount.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
async def get_subaccount_volume_history(
    self, **kwargs
) -> List["SubaccountVolumeHistoryDto"]:
    """Gets historical trading volume for a subaccount from the archive API."""

    res = await self.get_validated(
        url_path=f"{API_PREFIX}/subaccount/volume",
        request_model=self._models.ArchiveV1SubaccountVolumeGetParametersQuery,
        response_model=self._models.PageOfSubaccountVolumeHistoryDtos,
        base_url_override=self._archive_base_url,
        **kwargs,
    )

    data = [
        self._models.SubaccountVolumeHistoryDto(**model.model_dump(by_alias=True))
        for model in res.data
    ]
    return data

list_subaccounts(self, **kwargs) async

Lists subaccounts for a given sender (address).

Parameters:

Name Type Description Default
sender str

Wallet address to query subaccounts for. Required.

required

Other Parameters:

Name Type Description
name str

Filter by subaccount name. Optional.

order str

Sort order, 'asc' or 'desc'. Optional.

limit float

Maximum number of results to return. Optional.

cursor str

Pagination cursor for fetching the next page. Optional.

order_by str

Field to order by (e.g., 'createdAt'). Optional.

**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Type Description
List[SubaccountDto]

List[SubaccountDto]: Subaccount records for the sender.

Source code in ethereal/rest/subaccount.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
async def list_subaccounts(self, **kwargs) -> List[SubaccountDto]:
    """Lists subaccounts for a given sender (address).

    Args:
        sender (str): Wallet address to query subaccounts for. Required.

    Other Parameters:
        name (str, optional): Filter by subaccount name. Optional.
        order (str, optional): Sort order, 'asc' or 'desc'. Optional.
        limit (float, optional): Maximum number of results to return. Optional.
        cursor (str, optional): Pagination cursor for fetching the next page. Optional.
        order_by (str, optional): Field to order by (e.g., 'createdAt'). Optional.
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        List[SubaccountDto]: Subaccount records for the sender.
    """
    res = await self.get_validated(
        url_path=f"{API_PREFIX}/subaccount/",
        request_model=self._models.V1SubaccountGetParametersQuery,
        response_model=self._models.PageOfSubaccountDtos,
        **kwargs,
    )
    data = [
        self._models.SubaccountDto(**model.model_dump(by_alias=True))
        for model in res.data
    ]
    return data

ethereal.rest.product

get_market_liquidity(self, **kwargs) async

Gets market liquidity for a product.

Parameters:

Name Type Description Default
product_id str

Id representing the registered product. Required.

required

Other Parameters:

Name Type Description
**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Name Type Description
MarketLiquidityDto MarketLiquidityDto

Top-of-book and depth liquidity metrics.

Source code in ethereal/rest/product.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
async def get_market_liquidity(self, **kwargs) -> MarketLiquidityDto:
    """Gets market liquidity for a product.

    Args:
        product_id (str): Id representing the registered product. Required.

    Other Parameters:
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        MarketLiquidityDto: Top-of-book and depth liquidity metrics.
    """
    res = await self.get_validated(
        url_path=f"{API_PREFIX}/product/market-liquidity",
        request_model=self._models.V1ProductMarketLiquidityGetParametersQuery,
        response_model=self._models.MarketLiquidityDto,
        **kwargs,
    )
    return res

list_market_prices(self, **kwargs) async

Gets market prices for one or more products.

Parameters:

Name Type Description Default
product_ids List[str]

List of product IDs to query. Required.

required

Other Parameters:

Name Type Description
**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Type Description
List[MarketPriceDto]

List[MarketPriceDto]: Best bid/ask prices for the requested products.

Source code in ethereal/rest/product.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
async def list_market_prices(self, **kwargs) -> List[MarketPriceDto]:
    """Gets market prices for one or more products.

    Args:
        product_ids (List[str]): List of product IDs to query. Required.

    Other Parameters:
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        List[MarketPriceDto]: Best bid/ask prices for the requested products.
    """
    res = await self.get_validated(
        url_path=f"{API_PREFIX}/product/market-price",
        request_model=self._models.V1ProductMarketPriceGetParametersQuery,
        response_model=self._models.ListOfMarketPriceDtos,
        **kwargs,
    )
    data = [
        self._models.MarketPriceDto(**model.model_dump(by_alias=True))
        for model in res.data
    ]
    return data

list_products(self, **kwargs) async

Lists all products and their configurations.

Other Parameters:

Name Type Description
order str

Sort order, 'asc' or 'desc'. Optional.

limit float

Maximum number of results to return. Optional.

cursor str

Pagination cursor for fetching the next page. Optional.

order_by str

Field to order by (e.g., 'createdAt'). Optional.

ticker str

Filter by product ticker (e.g., 'ETHUSD'). Optional.

**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Type Description
List[ProductDto]

List[ProductDto]: Product configuration records.

Source code in ethereal/rest/product.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
async def list_products(self, **kwargs) -> List[ProductDto]:
    """Lists all products and their configurations.

    Other Parameters:
        order (str, optional): Sort order, 'asc' or 'desc'. Optional.
        limit (float, optional): Maximum number of results to return. Optional.
        cursor (str, optional): Pagination cursor for fetching the next page. Optional.
        order_by (str, optional): Field to order by (e.g., 'createdAt'). Optional.
        ticker (str, optional): Filter by product ticker (e.g., 'ETHUSD'). Optional.
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        List[ProductDto]: Product configuration records.
    """
    res = await self.get_validated(
        url_path=f"{API_PREFIX}/product",
        request_model=self._models.V1ProductGetParametersQuery,
        response_model=self._models.PageOfProductDtos,
        **kwargs,
    )
    data = [
        self._models.ProductDto(**model.model_dump(by_alias=True)) for model in res.data
    ]
    return data

ethereal.rest.position

get_position(self, id, **kwargs) async

Gets a specific position by ID.

Parameters:

Name Type Description Default
id str

UUID of the position. Required.

required

Other Parameters:

Name Type Description
**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Name Type Description
PositionDto PositionDto

Position information for the specified ID.

Source code in ethereal/rest/position.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
async def get_position(
    self,
    id: UUID,
    **kwargs,
) -> PositionDto:
    """Gets a specific position by ID.

    Args:
        id (str): UUID of the position. Required.

    Other Parameters:
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        PositionDto: Position information for the specified ID.
    """
    endpoint = f"{API_PREFIX}/position/{id}"
    res = await self.get(endpoint, **kwargs)
    return self._models.PositionDto(**res)

list_position_liquidations(self, **kwargs) async

Lists position liquidations.

Other Parameters:

Name Type Description
order str

Sort order, 'asc' or 'desc'. Optional.

limit float

Maximum number of results to return. Optional.

cursor str

Pagination cursor for fetching the next page. Optional.

order_by str

Field to order by (e.g., 'createdAt'). Optional.

**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Type Description
List[PositionLiquidationDto]

List[PositionLiquidationDto]: List of position liquidation records.

Source code in ethereal/rest/position.py
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
async def list_position_liquidations(
    self,
    **kwargs,
) -> List[PositionLiquidationDto]:
    """Lists position liquidations.

    Other Parameters:
        order (str, optional): Sort order, 'asc' or 'desc'. Optional.
        limit (float, optional): Maximum number of results to return. Optional.
        cursor (str, optional): Pagination cursor for fetching the next page. Optional.
        order_by (str, optional): Field to order by (e.g., 'createdAt'). Optional.
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        List[PositionLiquidationDto]: List of position liquidation records.
    """
    res = await self.get_validated(
        url_path=f"{API_PREFIX}/position/liquidation",
        request_model=self._models.V1PositionLiquidationGetParametersQuery,
        response_model=self._models.PageOfPositionLiquidationsDto,
        **kwargs,
    )
    data = [
        self._models.PositionLiquidationDto(**model.model_dump(by_alias=True))
        for model in res.data
    ]
    return data

list_positions(self, **kwargs) async

Lists positions for a subaccount.

Parameters:

Name Type Description Default
subaccount_id str

UUID of the subaccount. Required.

required

Other Parameters:

Name Type Description
product_ids List[str]

UUIDs of products to filter by. Optional.

open bool

Filter for open positions. Optional.

created_after float

Filter positions created after this timestamp. Optional.

created_before float

Filter positions created before this timestamp. Optional.

side int

Filter by position side (0 for buy, 1 for sell). Optional.

is_liquidated bool

Filter liquidated positions. Optional.

order str

Sort order, 'asc' or 'desc'. Optional.

limit float

Maximum number of results to return. Optional.

cursor str

Pagination cursor for fetching the next page. Optional.

order_by str

Field to order by, e.g., 'size', 'createdAt', 'updatedAt', or 'realizedPnl'. Optional.

**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Type Description
List[PositionDto]

List[PositionDto]: List of position information for the subaccount.

Source code in ethereal/rest/position.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
async def list_positions(
    self,
    **kwargs,
) -> List[PositionDto]:
    """Lists positions for a subaccount.

    Args:
        subaccount_id (str): UUID of the subaccount. Required.

    Other Parameters:
        product_ids (List[str], optional): UUIDs of products to filter by. Optional.
        open (bool, optional): Filter for open positions. Optional.
        created_after (float, optional): Filter positions created after this timestamp. Optional.
        created_before (float, optional): Filter positions created before this timestamp. Optional.
        side (int, optional): Filter by position side (0 for buy, 1 for sell). Optional.
        is_liquidated (bool, optional): Filter liquidated positions. Optional.
        order (str, optional): Sort order, 'asc' or 'desc'. Optional.
        limit (float, optional): Maximum number of results to return. Optional.
        cursor (str, optional): Pagination cursor for fetching the next page. Optional.
        order_by (str, optional): Field to order by, e.g., 'size', 'createdAt', 'updatedAt', or 'realizedPnl'. Optional.
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        List[PositionDto]: List of position information for the subaccount.
    """
    res = await self.get_validated(
        url_path=f"{API_PREFIX}/position",
        request_model=self._models.V1PositionGetParametersQuery,
        response_model=self._models.PageOfPositionDtos,
        **kwargs,
    )
    data = [
        self._models.PositionDto(**model.model_dump(by_alias=True))
        for model in res.data
    ]
    return data

ethereal.rest.order

cancel_order(self, order_to_cancel, **kwargs) async

Submits a prepared and signed cancel order request.

Parameters:

Name Type Description Default
order_to_cancel CancelOrderDto

Prepared and signed cancel payload. Required.

required

Other Parameters:

Name Type Description
**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Type Description
List[CancelOrderResultDto]

List[CancelOrderResultDto]: Cancellation results per order id.

Source code in ethereal/rest/order.py
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
async def cancel_order(
    self,
    order_to_cancel: CancelOrderDto,
    **kwargs,
) -> List[CancelOrderResultDto]:
    """Submits a prepared and signed cancel order request.

    Args:
        order_to_cancel (CancelOrderDto): Prepared and signed cancel payload. Required.

    Other Parameters:
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        List[CancelOrderResultDto]: Cancellation results per order id.
    """
    endpoint = f"{API_PREFIX}/order/cancel"
    res = await self.post(
        endpoint,
        data=order_to_cancel.model_dump(mode="json", by_alias=True, exclude_none=True),
        **kwargs,
    )
    return [
        self._models.CancelOrderResultDto.model_validate(item)
        for item in res.get("data", [])
    ]

dry_run_order(self, order, **kwargs) async

Submits a prepared order for validation without execution.

Parameters:

Name Type Description Default
order SubmitOrderDto

Prepared order payload. Required.

required

Other Parameters:

Name Type Description
**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Name Type Description
DryRunOrderCreatedDto DryRunOrderCreatedDto

Dry-run validation result.

Source code in ethereal/rest/order.py
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
async def dry_run_order(
    self,
    order: SubmitOrderDto,
    **kwargs,
) -> DryRunOrderCreatedDto:
    """Submits a prepared order for validation without execution.

    Args:
        order (SubmitOrderDto): Prepared order payload. Required.

    Other Parameters:
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        DryRunOrderCreatedDto: Dry-run validation result.
    """
    submit_payload = self._models.SubmitDryOrderDto.model_validate(
        {"data": order.data.model_dump(mode="json", by_alias=True, exclude_unset=True)}
    )
    endpoint = f"{API_PREFIX}/order/dry-run"
    res = await self.post(
        endpoint,
        data=submit_payload.model_dump(
            mode="json", by_alias=True, exclude_unset=True, exclude_none=True
        ),
        **kwargs,
    )
    return self._models.DryRunOrderCreatedDto.model_validate(res)

get_order(self, id, **kwargs) async

Gets a specific order by ID.

Parameters:

Name Type Description Default
id str

UUID of the order. Required.

required

Other Parameters:

Name Type Description
**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Name Type Description
OrderDto OrderDto

Order details.

Source code in ethereal/rest/order.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
async def get_order(self, id: UUID, **kwargs) -> OrderDto:
    """Gets a specific order by ID.

    Args:
        id (str): UUID of the order. Required.

    Other Parameters:
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        OrderDto: Order details.
    """
    endpoint = f"{API_PREFIX}/order/{id}"
    response = await self.get(endpoint, **kwargs)
    return self._models.OrderDto(**response)

list_fills(self, **kwargs) async

Lists order fills for a subaccount.

Parameters:

Name Type Description Default
subaccount_id str

UUID of the subaccount. Required.

required

Other Parameters:

Name Type Description
product_ids List[str]

Filter by one or more product IDs. Optional.

created_after float

Filter fills created after this timestamp. Optional.

created_before float

Filter fills created before this timestamp. Optional.

side int

Filter by order side (0 for buy, 1 for sell). Optional.

include_self_trades bool

Include self trades in results. Defaults to False. Optional.

order str

Sort order, 'asc' or 'desc'. Optional.

limit float

Maximum number of results to return. Optional.

cursor str

Pagination cursor for fetching the next page. Optional.

order_by str

Field to order by (e.g., 'createdAt', 'productId'). Optional.

**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Type Description
List[OrderFillDto]

List[OrderFillDto]: Fill records for the subaccount.

Source code in ethereal/rest/order.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
async def list_fills(self, **kwargs) -> List[OrderFillDto]:
    """Lists order fills for a subaccount.

    Args:
        subaccount_id (str): UUID of the subaccount. Required.

    Other Parameters:
        product_ids (List[str], optional): Filter by one or more product IDs. Optional.
        created_after (float, optional): Filter fills created after this timestamp. Optional.
        created_before (float, optional): Filter fills created before this timestamp. Optional.
        side (int, optional): Filter by order side (0 for buy, 1 for sell). Optional.
        include_self_trades (bool, optional): Include self trades in results. Defaults to False. Optional.
        order (str, optional): Sort order, 'asc' or 'desc'. Optional.
        limit (float, optional): Maximum number of results to return. Optional.
        cursor (str, optional): Pagination cursor for fetching the next page. Optional.
        order_by (str, optional): Field to order by (e.g., 'createdAt', 'productId'). Optional.
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        List[OrderFillDto]: Fill records for the subaccount.
    """
    res = await self.get_validated(
        url_path=f"{API_PREFIX}/order/fill",
        request_model=self._models.V1OrderFillGetParametersQuery,
        response_model=self._models.PageOfOrderFillDtos,
        **kwargs,
    )
    data = [
        self._models.OrderFillDto(**model.model_dump(by_alias=True))
        for model in res.data
    ]
    return data

list_orders(self, **kwargs) async

Lists orders for a subaccount.

Parameters:

Name Type Description Default
subaccount_id str

UUID of the subaccount. Required.

required

Other Parameters:

Name Type Description
product_ids List[str]

Filter by one or more product IDs. Optional.

client_order_id str

Filter by a client-generated order id. Optional.

statuses List[str]

Filter by status values. Optional.

created_after float

Filter orders created after this timestamp. Optional.

created_before float

Filter orders created before this timestamp. Optional.

side int

Filter by order side (0 for buy, 1 for sell). Optional.

close bool

Filter by close flag. Optional.

stop_types List[int]

Filter by stop types (0 for GAIN, 1 for LOSS). Optional.

is_working bool

Filter working orders. Optional.

is_pending bool

Filter pending orders. Optional.

order str

Sort order, 'asc' or 'desc'. Optional.

limit float

Maximum number of results to return. Optional.

cursor str

Pagination cursor for fetching the next page. Optional.

order_by str

Field to order by (e.g., 'createdAt'). Optional.

**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Type Description
List[OrderDto]

List[OrderDto]: Order records for the subaccount.

Source code in ethereal/rest/order.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
async def list_orders(self, **kwargs) -> List[OrderDto]:
    """Lists orders for a subaccount.

    Args:
        subaccount_id (str): UUID of the subaccount. Required.

    Other Parameters:
        product_ids (List[str], optional): Filter by one or more product IDs. Optional.
        client_order_id (str, optional): Filter by a client-generated order id. Optional.
        statuses (List[str], optional): Filter by status values. Optional.
        created_after (float, optional): Filter orders created after this timestamp. Optional.
        created_before (float, optional): Filter orders created before this timestamp. Optional.
        side (int, optional): Filter by order side (0 for buy, 1 for sell). Optional.
        close (bool, optional): Filter by close flag. Optional.
        stop_types (List[int], optional): Filter by stop types (0 for GAIN, 1 for LOSS). Optional.
        is_working (bool, optional): Filter working orders. Optional.
        is_pending (bool, optional): Filter pending orders. Optional.
        order (str, optional): Sort order, 'asc' or 'desc'. Optional.
        limit (float, optional): Maximum number of results to return. Optional.
        cursor (str, optional): Pagination cursor for fetching the next page. Optional.
        order_by (str, optional): Field to order by (e.g., 'createdAt'). Optional.
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        List[OrderDto]: Order records for the subaccount.
    """
    res = await self.get_validated(
        url_path=f"{API_PREFIX}/order",
        request_model=self._models.V1OrderGetParametersQuery,
        response_model=self._models.PageOfOrderDtos,
        **kwargs,
    )
    data = [
        self._models.OrderDto(**model.model_dump(by_alias=True)) for model in res.data
    ]
    return data

list_trades(self, **kwargs) async

Lists trades for a specific product.

Parameters:

Name Type Description Default
product_id str

Product ID to query trades for. Required.

required

Other Parameters:

Name Type Description
order str

Sort order, 'asc' or 'desc'. Optional.

limit float

Maximum number of results to return. Optional.

cursor str

Pagination cursor for fetching the next page. Optional.

order_by str

Field to order by (e.g., 'createdAt'). Optional.

**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Type Description
List[TradeDto]

List[TradeDto]: Trade records.

Source code in ethereal/rest/order.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
async def list_trades(self, **kwargs) -> List[TradeDto]:
    """Lists trades for a specific product.

    Args:
        product_id (str): Product ID to query trades for. Required.

    Other Parameters:
        order (str, optional): Sort order, 'asc' or 'desc'. Optional.
        limit (float, optional): Maximum number of results to return. Optional.
        cursor (str, optional): Pagination cursor for fetching the next page. Optional.
        order_by (str, optional): Field to order by (e.g., 'createdAt'). Optional.
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        List[TradeDto]: Trade records.
    """
    res = await self.get_validated(
        url_path=f"{API_PREFIX}/order/trade",
        request_model=self._models.V1OrderTradeGetParametersQuery,
        response_model=self._models.PageOfTradeDtos,
        **kwargs,
    )
    data = [
        self._models.TradeDto(**model.model_dump(by_alias=True)) for model in res.data
    ]
    return data

prepare_cancel_order(self, sender, subaccount, order_ids=[], client_order_ids=[], include_signature=False, **kwargs) async

Prepares the payload for canceling one or more orders.

Parameters:

Name Type Description Default
sender str

Address initiating the cancellation. Required.

required
subaccount str

Hex-encoded subaccount name. Required.

required
order_ids List[str]

Order UUIDs to cancel. Optional.

[]
client_order_ids List[str]

Client-generated IDs to cancel. Optional.

[]
include_signature bool

If True, sign the payload immediately. Optional.

False

Other Parameters:

Name Type Description
nonce str

Custom nonce for signing.

**kwargs

Additional request parameters accepted by the API.

Returns:

Name Type Description
CancelOrderDto CancelOrderDto

Prepared (and optionally signed) cancel payload.

Source code in ethereal/rest/order.py
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
async def prepare_cancel_order(
    self,
    sender: str,
    subaccount: str,
    order_ids: List[UUID] = [],
    client_order_ids: List[str] = [],
    include_signature: bool = False,
    **kwargs,
) -> CancelOrderDto:
    """Prepares the payload for canceling one or more orders.

    Args:
        sender (str): Address initiating the cancellation. Required.
        subaccount (str): Hex-encoded subaccount name. Required.
        order_ids (List[str]): Order UUIDs to cancel. Optional.
        client_order_ids (List[str]): Client-generated IDs to cancel. Optional.
        include_signature (bool): If True, sign the payload immediately. Optional.

    Other Parameters:
        nonce (str, optional): Custom nonce for signing.
        **kwargs: Additional request parameters accepted by the API.

    Returns:
        CancelOrderDto: Prepared (and optionally signed) cancel payload.
    """
    nonce = kwargs.get("nonce", None) or generate_nonce()
    uuid_order_ids = [
        UUID(order_id) if isinstance(order_id, str) else order_id
        for order_id in order_ids
    ]
    data_model = self._models.CancelOrderDtoData(
        sender=sender,
        subaccount=subaccount,
        nonce=nonce,
        orderIds=uuid_order_ids,
        clientOrderIds=client_order_ids,
    )
    result = self._models.CancelOrderDto.model_validate(
        {"data": data_model.model_dump(mode="json", by_alias=True), "signature": ""}
    )
    if include_signature:
        return await self.sign_cancel_order(result)
    return result

prepare_order(self, sender, price=None, quantity=None, side=None, subaccount=None, onchain_id=None, order_type=None, client_order_id=None, time_in_force=None, post_only=False, reduce_only=False, close=None, stop_price=None, stop_type=None, group_id=None, group_contingency_type=None, expires_at=None, include_signature=False, **kwargs) async

Prepares the payload for an order, optionally including a signature.

Parameters:

Name Type Description Default
sender str

Address placing the order. Required.

required
price Union[str, float, Decimal]

Limit price for LIMIT orders.

None
quantity Union[str, float, Decimal]

Order size.

None
side int

0 for buy, 1 for sell.

None
subaccount str

Hex-encoded subaccount name.

None
onchain_id float

Product onchain ID.

None
order_type str

'LIMIT' or 'MARKET'.

None
client_order_id str

Subaccount-scoped client-generated id (UUID or <=32 alphanumeric).

None
time_in_force str

For LIMIT orders (e.g., 'GTC', 'GTD').

None
post_only bool

For LIMIT orders; rejects if crossing.

False
reduce_only bool

If True, order only reduces position.

False
close bool

If True, closes the position.

None
stop_price Union[str, float, Decimal]

Stop trigger price.

None
stop_type int

Stop type enum value.

None
group_id str

Contingency group id.

None
group_contingency_type int

Group contingency type.

None
expires_at int

Expiry timestamp for GTD.

None
include_signature bool

If True, sign the payload immediately.

False

Other Parameters:

Name Type Description
nonce str

Custom nonce for signing.

signed_at int

Seconds since epoch for signature timestamp.

**kwargs

Additional request parameters accepted by the API.

Returns:

Name Type Description
SubmitOrderDto SubmitOrderDto

Prepared (and optionally signed) order payload.

Source code in ethereal/rest/order.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
async def prepare_order(
    self,
    sender: str,
    price: Optional[Union[str, float, Decimal]] = None,
    quantity: Optional[Union[str, float, Decimal]] = None,
    side: Optional[int] = None,
    subaccount: Optional[str] = None,
    onchain_id: Optional[int] = None,
    order_type: Optional[str] = None,
    client_order_id: Optional[str] = None,
    time_in_force: Optional[str] = None,
    post_only: Optional[bool] = False,
    reduce_only: Optional[bool] = False,
    close: Optional[bool] = None,
    stop_price: Optional[Union[str, float, Decimal]] = None,
    stop_type: Optional[int] = None,
    group_id: Optional[str] = None,
    group_contingency_type: Optional[int] = None,
    expires_at: Optional[int] = None,
    include_signature: bool = False,
    **kwargs,
) -> SubmitOrderDto:
    """Prepares the payload for an order, optionally including a signature.

    Args:
        sender (str): Address placing the order. Required.
        price (Union[str, float, Decimal], optional): Limit price for LIMIT orders.
        quantity (Union[str, float, Decimal], optional): Order size.
        side (int, optional): 0 for buy, 1 for sell.
        subaccount (str, optional): Hex-encoded subaccount name.
        onchain_id (float, optional): Product onchain ID.
        order_type (str, optional): 'LIMIT' or 'MARKET'.
        client_order_id (str, optional): Subaccount-scoped client-generated id (UUID or <=32 alphanumeric).
        time_in_force (str, optional): For LIMIT orders (e.g., 'GTC', 'GTD').
        post_only (bool, optional): For LIMIT orders; rejects if crossing.
        reduce_only (bool, optional): If True, order only reduces position.
        close (bool, optional): If True, closes the position.
        stop_price (Union[str, float, Decimal], optional): Stop trigger price.
        stop_type (int, optional): Stop type enum value.
        group_id (str, optional): Contingency group id.
        group_contingency_type (int, optional): Group contingency type.
        expires_at (int, optional): Expiry timestamp for GTD.
        include_signature (bool): If True, sign the payload immediately.

    Other Parameters:
        nonce (str, optional): Custom nonce for signing.
        signed_at (int, optional): Seconds since epoch for signature timestamp.
        **kwargs: Additional request parameters accepted by the API.

    Returns:
        SubmitOrderDto: Prepared (and optionally signed) order payload.
    """
    # Generate nonce and signed_at timestamp
    nonce = kwargs.get("nonce", None) or generate_nonce()
    signed_at = kwargs.get("signed_at", None) or int(time.time())

    # Prepare order data with common fields
    order_data = {
        "sender": sender,
        "subaccount": subaccount,
        "quantity": quantity,
        "price": price,
        "side": side,
        "engineType": 0,
        "onchainId": onchain_id,
        "nonce": nonce,
        "type": order_type,
        "clientOrderId": client_order_id,
        "reduceOnly": reduce_only,
        "signedAt": signed_at,
        "close": close,
        "stopPrice": stop_price,
        "stopType": stop_type,
        "groupId": group_id,
        "groupContingencyType": group_contingency_type,
        "expiresAt": expires_at,
    }

    # Declare data_model type
    data_model: Union[SubmitOrderLimitDtoData, SubmitOrderMarketDtoData]

    # Create specific order data based on type
    if order_type == "LIMIT":
        order_data.update(
            {
                "timeInForce": time_in_force,
                "postOnly": post_only,
            }
        )
        data_model = self._models.SubmitOrderLimitDtoData.model_validate(order_data)
    elif order_type == "MARKET":
        data_model = self._models.SubmitOrderMarketDtoData.model_validate(order_data)
    else:
        raise ValueError(f"Invalid order type: {order_type}")

    result = self._models.SubmitOrderDto.model_validate(
        {
            "data": data_model.model_dump(
                mode="json", by_alias=True, exclude_unset=True, exclude_none=True
            ),
            "signature": "",
        }
    )

    if include_signature:
        result = await self.sign_order(result)

    return result

sign_cancel_order(self, order_to_cancel, private_key=None) async

Signs a cancel order payload using EIP-712.

Parameters:

Name Type Description Default
order_to_cancel CancelOrderDto

Prepared cancel payload. Required.

required
private_key str

Private key override. Defaults to client's key.

None

Returns:

Name Type Description
CancelOrderDto CancelOrderDto

The same DTO with signature populated.

Raises:

Type Description
ValueError

If no chain client or private key is available.

Source code in ethereal/rest/order.py
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
async def sign_cancel_order(
    self,
    order_to_cancel: CancelOrderDto,
    private_key: Optional[str] = None,
) -> CancelOrderDto:
    """Signs a cancel order payload using EIP-712.

    Args:
        order_to_cancel (CancelOrderDto): Prepared cancel payload. Required.
        private_key (str, optional): Private key override. Defaults to client's key.

    Returns:
        CancelOrderDto: The same DTO with signature populated.

    Raises:
        ValueError: If no chain client or private key is available.
    """
    if not hasattr(self, "chain") or not self.chain:
        raise ValueError("No chain client available for signing")
    if not private_key and not self.chain.private_key:
        raise ValueError("No private key available for signing")
    elif not private_key:
        private_key = self.chain.private_key

    # Prepare message for signing
    message = order_to_cancel.data.model_dump(mode="json", by_alias=True)

    # For cancel orders, orderIds need to be converted to bytes32 format
    order_ids = (
        [uuid_to_bytes32(str(order_id)) for order_id in order_to_cancel.data.order_ids]
        if order_to_cancel.data.order_ids
        else []
    )
    message["orderIds"] = order_ids

    client_order_ids = (
        [client_order_id_to_bytes32(id) for id in order_to_cancel.data.client_order_ids]
        if order_to_cancel.data.client_order_ids
        else []
    )
    message["clientOrderIds"] = client_order_ids

    # Get domain and types for signing
    primary_type = "CancelOrder"
    domain = self.rpc_config.domain.model_dump(mode="json", by_alias=True)
    types = self.chain.get_signature_types(self.rpc_config, primary_type)

    # Sign the message
    order_to_cancel.signature = self.chain.sign_message(
        private_key, domain, types, primary_type, message
    )
    return order_to_cancel

sign_order(self, order, private_key=None) async

Signs an order payload using EIP-712.

Parameters:

Name Type Description Default
order SubmitOrderDto

Prepared order to sign. Required.

required
private_key str

Private key override. Defaults to client's key.

None

Returns:

Name Type Description
SubmitOrderDto SubmitOrderDto

The same DTO with signature populated.

Raises:

Type Description
ValueError

If no chain client or private key is available.

Source code in ethereal/rest/order.py
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
async def sign_order(
    self, order: SubmitOrderDto, private_key: Optional[str] = None
) -> SubmitOrderDto:
    """Signs an order payload using EIP-712.

    Args:
        order (SubmitOrderDto): Prepared order to sign. Required.
        private_key (str, optional): Private key override. Defaults to client's key.

    Returns:
        SubmitOrderDto: The same DTO with signature populated.

    Raises:
        ValueError: If no chain client or private key is available.
    """
    if not hasattr(self, "chain") or not self.chain:
        raise ValueError("No chain client available for signing")
    if not private_key and not self.chain.private_key:
        raise ValueError("No private key available for signing")
    elif not private_key:
        private_key = self.chain.private_key

    # Update message signedAt
    order.data.signed_at = int(time.time())

    # Prepare message for signing
    message = order.data.model_dump(mode="json", by_alias=True)

    # Make some adjustments to the message
    message["quantity"] = int(Decimal(message["quantity"]) * Decimal("1e9"))
    message["price"] = int(Decimal(message.get("price", 0)) * Decimal("1e9"))
    message["productId"] = int(message["onchainId"])
    message["signedAt"] = int(message["signedAt"])
    if message.get("clientOrderId"):
        message["clientOrderId"] = client_order_id_to_bytes32(message["clientOrderId"])

    # Get domain and types for signing
    primary_type = "TradeOrder"
    domain = self.rpc_config.domain.model_dump(mode="json", by_alias=True)
    types = self.chain.get_signature_types(self.rpc_config, primary_type)

    # Sign the message
    order.signature = self.chain.sign_message(
        private_key, domain, types, primary_type, message
    )
    return order

submit_order(self, order, **kwargs) async

Submits a prepared and signed order.

Parameters:

Name Type Description Default
order SubmitOrderDto

Prepared and signed order payload. Required.

required

Other Parameters:

Name Type Description
**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Name Type Description
SubmitOrderCreatedDto SubmitOrderCreatedDto

Created order response.

Source code in ethereal/rest/order.py
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
async def submit_order(
    self,
    order: SubmitOrderDto,
    **kwargs,
) -> SubmitOrderCreatedDto:
    """Submits a prepared and signed order.

    Args:
        order (SubmitOrderDto): Prepared and signed order payload. Required.

    Other Parameters:
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        SubmitOrderCreatedDto: Created order response.
    """
    endpoint = f"{API_PREFIX}/order"
    res = await self.post(
        endpoint,
        data=order.model_dump(
            mode="json", by_alias=True, exclude_unset=True, exclude_none=True
        ),
        **kwargs,
    )
    return self._models.SubmitOrderCreatedDto.model_validate(res)

ethereal.rest.funding

get_projected_funding(self, **kwargs) async

Gets the projected funding rate for the next period.

Parameters:

Name Type Description Default
product_id str

Id representing the registered product. Required.

required

Other Parameters:

Name Type Description
**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Name Type Description
ProjectedFundingDto ProjectedFundingDto

Projected funding information for the product.

Source code in ethereal/rest/funding.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
async def get_projected_funding(self, **kwargs) -> "ProjectedFundingDto":
    """Gets the projected funding rate for the next period.

    Args:
        product_id (str): Id representing the registered product. Required.

    Other Parameters:
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        ProjectedFundingDto: Projected funding information for the product.
    """
    res = await self.get_validated(
        url_path=f"{API_PREFIX}/funding/projected",
        request_model=self._models.V1FundingProjectedGetParametersQuery,
        response_model=self._models.ProjectedFundingDto,
        **kwargs,
    )
    return res

list_funding(self, **kwargs) async

Lists historical funding rates for a product over a specified time range.

Parameters:

Name Type Description Default
product_id str

Id representing the registered product. Required.

required
range str

Time window to query. One of 'DAY', 'WEEK', or 'MONTH'. Required.

required

Other Parameters:

Name Type Description
order str

Sort order, 'asc' or 'desc'. Optional.

limit float

Maximum number of results to return. Optional.

cursor str

Pagination cursor for fetching the next page. Optional.

order_by str

Field to order by, e.g., 'createdAt'. Optional.

**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Type Description
List[FundingDto]

List[FundingDto]: Funding rate history objects for the product.

Source code in ethereal/rest/funding.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
async def list_funding(self, **kwargs) -> List["FundingDto"]:
    """Lists historical funding rates for a product over a specified time range.

    Args:
        product_id (str): Id representing the registered product. Required.
        range (str): Time window to query. One of 'DAY', 'WEEK', or 'MONTH'. Required.

    Other Parameters:
        order (str, optional): Sort order, 'asc' or 'desc'. Optional.
        limit (float, optional): Maximum number of results to return. Optional.
        cursor (str, optional): Pagination cursor for fetching the next page. Optional.
        order_by (str, optional): Field to order by, e.g., 'createdAt'. Optional.
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        List[FundingDto]: Funding rate history objects for the product.
    """
    res = await self.get_validated(
        url_path=f"{API_PREFIX}/funding",
        request_model=self._models.V1FundingGetParametersQuery,
        response_model=self._models.PageOfFundingDtos,
        **kwargs,
    )
    data = [
        self._models.FundingDto(**model.model_dump(by_alias=True)) for model in res.data
    ]
    return data

list_projected_funding(self, **kwargs) async

Lists projected funding rates for multiple products.

Parameters:

Name Type Description Default
product_ids List[UUID]

List of product IDs (1-10). Required.

required

Other Parameters:

Name Type Description
**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Type Description
List[ProjectedFundingDto]

List[ProjectedFundingDto]: Projected funding rate objects for the products.

Source code in ethereal/rest/funding.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
async def list_projected_funding(self, **kwargs) -> List["ProjectedFundingDto"]:
    """Lists projected funding rates for multiple products.

    Args:
        product_ids (List[UUID]): List of product IDs (1-10). Required.

    Other Parameters:
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        List[ProjectedFundingDto]: Projected funding rate objects for the products.
    """
    res = await self.get_validated(
        url_path=f"{API_PREFIX}/funding/projected-rate",
        request_model=self._models.V1FundingProjectedRateGetParametersQuery,
        response_model=self._models.PageOfProjectedFundingDtos,
        **kwargs,
    )
    data = [
        self._models.ProjectedFundingDto(**model.model_dump(by_alias=True))
        for model in res.data
    ]
    return data

ethereal.rest.linked_signer

get_signer(self, id, **kwargs) async

Gets a specific linked signer by ID.

Parameters:

Name Type Description Default
id str

UUID of the linked signer. Required.

required

Other Parameters:

Name Type Description
**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Name Type Description
SignerDto SignerDto

Linked signer details.

Source code in ethereal/rest/linked_signer.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
async def get_signer(
    self,
    id: UUID,
    **kwargs,
) -> SignerDto:
    """Gets a specific linked signer by ID.

    Args:
        id (str): UUID of the linked signer. Required.

    Other Parameters:
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        SignerDto: Linked signer details.
    """
    endpoint = f"{API_PREFIX}/linked-signer/{id}"
    res = await self.get(endpoint, **kwargs)
    return self._models.SignerDto.model_validate(res)

get_signer_quota(self, **kwargs) async

Gets the signer quota for a subaccount.

Parameters:

Name Type Description Default
subaccount_id str

UUID of the subaccount. Required.

required

Other Parameters:

Name Type Description
**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Name Type Description
AccountSignerQuotaDto AccountSignerQuotaDto

Remaining quota information for the subaccount.

Source code in ethereal/rest/linked_signer.py
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
async def get_signer_quota(
    self,
    **kwargs,
) -> AccountSignerQuotaDto:
    """Gets the signer quota for a subaccount.

    Args:
        subaccount_id (str): UUID of the subaccount. Required.

    Other Parameters:
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        AccountSignerQuotaDto: Remaining quota information for the subaccount.
    """
    res = await self.get_validated(
        url_path=f"{API_PREFIX}/linked-signer/quota",
        request_model=self._models.V1LinkedSignerQuotaGetParametersQuery,
        response_model=self._models.AccountSignerQuotaDto,
        **kwargs,
    )
    return res

Submits a prepared and signed link-signer payload.

Parameters:

Name Type Description Default
dto LinkSignerDto

Prepared and signed link payload. Required.

required

Other Parameters:

Name Type Description
**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Name Type Description
SignerDto SignerDto

Linked signer record after submission.

Source code in ethereal/rest/linked_signer.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
async def link_linked_signer(
    self,
    dto: LinkSignerDto,
    **kwargs,
) -> SignerDto:
    """Submits a prepared and signed link-signer payload.

    Args:
        dto (LinkSignerDto): Prepared and signed link payload. Required.

    Other Parameters:
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        SignerDto: Linked signer record after submission.
    """
    endpoint = f"{API_PREFIX}/linked-signer/link"
    res = await self.post(
        endpoint,
        data=dto.model_dump(mode="json", by_alias=True, exclude_none=True),
        **kwargs,
    )
    return self._models.SignerDto.model_validate(res)

list_signers(self, **kwargs) async

Lists linked signers for a subaccount.

Parameters:

Name Type Description Default
subaccount_id str

UUID of the subaccount. Required.

required

Other Parameters:

Name Type Description
order str

Sort order, 'asc' or 'desc'. Optional.

limit float

Maximum number of results to return. Optional.

cursor str

Pagination cursor for fetching the next page. Optional.

order_by str

Field to order by (e.g., 'createdAt'). Optional.

**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Type Description
List[SignerDto]

List[SignerDto]: Linked signer records.

Source code in ethereal/rest/linked_signer.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
async def list_signers(
    self,
    **kwargs,
) -> List[SignerDto]:
    """Lists linked signers for a subaccount.

    Args:
        subaccount_id (str): UUID of the subaccount. Required.

    Other Parameters:
        order (str, optional): Sort order, 'asc' or 'desc'. Optional.
        limit (float, optional): Maximum number of results to return. Optional.
        cursor (str, optional): Pagination cursor for fetching the next page. Optional.
        order_by (str, optional): Field to order by (e.g., 'createdAt'). Optional.
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        List[SignerDto]: Linked signer records.
    """
    res = await self.get_validated(
        url_path=f"{API_PREFIX}/linked-signer",
        request_model=self._models.V1LinkedSignerGetParametersQuery,
        response_model=self._models.PageOfSignersDto,
        **kwargs,
    )
    data = [
        self._models.SignerDto(**model.model_dump(by_alias=True)) for model in res.data
    ]
    return data

prepare_linked_signer(self, sender, signer, subaccount, subaccount_id, signer_signature='', include_signature=False, **kwargs) async

Prepares the payload for linking a signer, optionally including a signature.

Parameters:

Name Type Description Default
sender str

Owner address initiating the link. Required.

required
signer str

Address of the signer being linked. Required.

required
subaccount str

Hex-encoded subaccount name. Required.

required
subaccount_id str

UUID of the subaccount. Required.

required
signer_signature str

Signature from the signer address. Optional.

''
include_signature bool

If True, sign with the owner's key as well. Optional.

False

Other Parameters:

Name Type Description
nonce str

Custom nonce for signing. Optional.

signed_at int

Seconds since epoch for the signature timestamp. Optional.

**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Name Type Description
LinkSignerDto LinkSignerDto

Prepared (and optionally signed) link payload.

Source code in ethereal/rest/linked_signer.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
async def prepare_linked_signer(
    self,
    sender: str,
    signer: str,
    subaccount: str,
    subaccount_id: UUID,
    signer_signature: str = "",
    include_signature: bool = False,
    **kwargs,
) -> LinkSignerDto:
    """Prepares the payload for linking a signer, optionally including a signature.

    Args:
        sender (str): Owner address initiating the link. Required.
        signer (str): Address of the signer being linked. Required.
        subaccount (str): Hex-encoded subaccount name. Required.
        subaccount_id (str): UUID of the subaccount. Required.
        signer_signature (str): Signature from the signer address. Optional.
        include_signature (bool): If True, sign with the owner's key as well. Optional.

    Other Parameters:
        nonce (str, optional): Custom nonce for signing. Optional.
        signed_at (int, optional): Seconds since epoch for the signature timestamp. Optional.
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        LinkSignerDto: Prepared (and optionally signed) link payload.
    """
    nonce = kwargs.get("nonce") or generate_nonce()
    signed_at = kwargs.get("signed_at") or int(time.time())
    data = {
        "sender": sender,
        "signer": signer,
        "subaccount": subaccount,
        "subaccountId": subaccount_id,
        "nonce": nonce,
        "signedAt": signed_at,
    }
    data_model = self._models.LinkSignerDtoData.model_validate(data)

    # Prepare dto
    dto_data = {
        "data": data_model.model_dump(mode="json", by_alias=True),
        "signature": "",
        "signerSignature": signer_signature,
    }
    dto = self._models.LinkSignerDto.model_validate(dto_data, by_alias=True)
    if include_signature:
        dto = await self.sign_linked_signer(dto, private_key=self.chain.private_key)
    return dto

prepare_refresh_linked_signer(self, sender, signer, subaccount, subaccount_id, include_signature=False, **kwargs) async

Prepares the payload for refreshing a linked signer, optionally signing it.

Parameters:

Name Type Description Default
sender str

Owner address initiating the refresh. Required.

required
signer str

Signer address being refreshed. Required.

required
subaccount str

Hex-encoded subaccount name. Required.

required
subaccount_id str

UUID of the subaccount. Required.

required
include_signature bool

If True, sign with the owner's key. Optional.

False

Other Parameters:

Name Type Description
nonce str

Custom nonce for signing. Optional.

signed_at int

Seconds since epoch for the signature timestamp. Optional.

**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Name Type Description
RefreshLinkedSignerDto RefreshLinkedSignerDto

Prepared (and optionally signed) refresh payload.

Source code in ethereal/rest/linked_signer.py
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
async def prepare_refresh_linked_signer(
    self,
    sender: str,
    signer: str,
    subaccount: str,
    subaccount_id: UUID,
    include_signature: bool = False,
    **kwargs,
) -> RefreshLinkedSignerDto:
    """Prepares the payload for refreshing a linked signer, optionally signing it.

    Args:
        sender (str): Owner address initiating the refresh. Required.
        signer (str): Signer address being refreshed. Required.
        subaccount (str): Hex-encoded subaccount name. Required.
        subaccount_id (str): UUID of the subaccount. Required.
        include_signature (bool): If True, sign with the owner's key. Optional.

    Other Parameters:
        nonce (str, optional): Custom nonce for signing. Optional.
        signed_at (int, optional): Seconds since epoch for the signature timestamp. Optional.
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        RefreshLinkedSignerDto: Prepared (and optionally signed) refresh payload.
    """
    nonce = kwargs.get("nonce") or generate_nonce()
    signed_at = kwargs.get("signed_at") or int(time.time())
    data = {
        "sender": sender,
        "signer": signer,
        "subaccount": subaccount,
        "subaccountId": subaccount_id,
        "nonce": nonce,
        "signedAt": signed_at,
    }
    data_model = self._models.RefreshLinkedSignerDtoData.model_validate(data)
    dto = self._models.RefreshLinkedSignerDto.model_validate(
        {"data": data_model.model_dump(mode="json", by_alias=True), "signature": ""}
    )
    if include_signature:
        dto = await self.sign_refresh_linked_signer(dto)
    return dto

prepare_revoke_linked_signer(self, sender, signer, subaccount, subaccount_id, include_signature=False, **kwargs) async

Prepares the payload for revoking a linked signer, optionally signing it.

Parameters:

Name Type Description Default
sender str

Owner address initiating the revoke. Required.

required
signer str

Signer address being revoked. Required.

required
subaccount str

Hex-encoded subaccount name. Required.

required
subaccount_id str

UUID of the subaccount. Required.

required
include_signature bool

If True, sign with the owner's key. Optional.

False

Other Parameters:

Name Type Description
nonce str

Custom nonce for signing. Optional.

signed_at int

Seconds since epoch for the signature timestamp. Optional.

**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Name Type Description
RevokeLinkedSignerDto RevokeLinkedSignerDto

Prepared (and optionally signed) revoke payload.

Source code in ethereal/rest/linked_signer.py
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
async def prepare_revoke_linked_signer(
    self,
    sender: str,
    signer: str,
    subaccount: str,
    subaccount_id: UUID,
    include_signature: bool = False,
    **kwargs,
) -> RevokeLinkedSignerDto:
    """Prepares the payload for revoking a linked signer, optionally signing it.

    Args:
        sender (str): Owner address initiating the revoke. Required.
        signer (str): Signer address being revoked. Required.
        subaccount (str): Hex-encoded subaccount name. Required.
        subaccount_id (str): UUID of the subaccount. Required.
        include_signature (bool): If True, sign with the owner's key. Optional.

    Other Parameters:
        nonce (str, optional): Custom nonce for signing. Optional.
        signed_at (int, optional): Seconds since epoch for the signature timestamp. Optional.
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        RevokeLinkedSignerDto: Prepared (and optionally signed) revoke payload.
    """
    nonce = kwargs.get("nonce") or generate_nonce()
    signed_at = kwargs.get("signed_at") or int(time.time())
    data = {
        "sender": sender,
        "signer": signer,
        "subaccount": subaccount,
        "subaccountId": subaccount_id,
        "nonce": nonce,
        "signedAt": signed_at,
    }
    data_model = self._models.RevokeLinkedSignerDtoData.model_validate(data)
    dto = self._models.RevokeLinkedSignerDto.model_validate(
        {"data": data_model.model_dump(mode="json", by_alias=True), "signature": ""}
    )
    if include_signature:
        dto = await self.sign_revoke_linked_signer(dto)
    return dto

refresh_linked_signer(self, dto, **kwargs) async

Submits a prepared and signed refresh-linked-signer payload.

Parameters:

Name Type Description Default
dto RefreshLinkedSignerDto

Prepared and signed refresh payload. Required.

required

Other Parameters:

Name Type Description
**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Name Type Description
SignerDto SignerDto

Signer record reflecting the refreshed expiry.

Source code in ethereal/rest/linked_signer.py
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
async def refresh_linked_signer(
    self,
    dto: RefreshLinkedSignerDto,
    **kwargs,
) -> SignerDto:
    """Submits a prepared and signed refresh-linked-signer payload.

    Args:
        dto (RefreshLinkedSignerDto): Prepared and signed refresh payload. Required.

    Other Parameters:
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        SignerDto: Signer record reflecting the refreshed expiry.
    """
    endpoint = f"{API_PREFIX}/linked-signer/refresh"
    res = await self.post(
        endpoint,
        data=dto.model_dump(mode="json", by_alias=True, exclude_none=True),
        **kwargs,
    )
    return self._models.SignerDto.model_validate(res)

revoke_linked_signer(self, dto, **kwargs) async

Submits a prepared and signed revoke-linked-signer payload.

Parameters:

Name Type Description Default
dto RevokeLinkedSignerDto

Prepared and signed revoke payload. Required.

required

Other Parameters:

Name Type Description
**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Name Type Description
SignerDto SignerDto

Signer record reflecting revocation.

Source code in ethereal/rest/linked_signer.py
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
async def revoke_linked_signer(
    self,
    dto: RevokeLinkedSignerDto,
    **kwargs,
) -> SignerDto:
    """Submits a prepared and signed revoke-linked-signer payload.

    Args:
        dto (RevokeLinkedSignerDto): Prepared and signed revoke payload. Required.

    Other Parameters:
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        SignerDto: Signer record reflecting revocation.
    """
    endpoint = f"{API_PREFIX}/linked-signer/revoke"
    res = await self.delete(
        endpoint,
        data=dto.model_dump(mode="json", by_alias=True, exclude_none=True),
        **kwargs,
    )
    return self._models.SignerDto.model_validate(res)

sign_linked_signer(self, link_to_sign, signer_private_key=None, private_key=None) async

Signs the link-signer payload with the signer and/or owner key.

Parameters:

Name Type Description Default
link_to_sign LinkSignerDto

Prepared link payload. Required.

required
signer_private_key str

Signer's private key for cosigning. Optional.

None
private_key str

Owner's private key override. Optional.

None

Returns:

Name Type Description
LinkSignerDto LinkSignerDto

DTO with signature fields populated.

Raises:

Type Description
ValueError

If no chain client or private key is available.

Source code in ethereal/rest/linked_signer.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
async def sign_linked_signer(
    self,
    link_to_sign: LinkSignerDto,
    signer_private_key: Optional[str] = None,
    private_key: Optional[str] = None,
) -> LinkSignerDto:
    """Signs the link-signer payload with the signer and/or owner key.

    Args:
        link_to_sign (LinkSignerDto): Prepared link payload. Required.
        signer_private_key (str, optional): Signer's private key for cosigning. Optional.
        private_key (str, optional): Owner's private key override. Optional.

    Returns:
        LinkSignerDto: DTO with signature fields populated.

    Raises:
        ValueError: If no chain client or private key is available.
    """
    if not hasattr(self, "chain") or not self.chain:
        raise ValueError("No chain client available for signing")
    if not private_key and not self.chain.private_key and not signer_private_key:
        raise ValueError("No private key available for signing")
    elif not private_key:
        private_key = self.chain.private_key

    # Prepare message for signing
    message = link_to_sign.data.model_dump(mode="json", by_alias=True)
    message["signedAt"] = int(message["signedAt"])

    primary_type = "LinkSigner"
    domain = self.rpc_config.domain.model_dump(mode="json", by_alias=True)
    types = self.chain.get_signature_types(self.rpc_config, primary_type)

    if signer_private_key:
        signer_signature = self.chain.sign_message(
            signer_private_key, domain, types, primary_type, message
        )
        link_to_sign.signer_signature = signer_signature
    if private_key:
        signature = self.chain.sign_message(
            private_key, domain, types, primary_type, message
        )
        link_to_sign.signature = signature
    return link_to_sign

sign_refresh_linked_signer(self, refresh_to_sign, private_key=None) async

Signs the refresh-linked-signer payload with the owner's key.

Parameters:

Name Type Description Default
refresh_to_sign RefreshLinkedSignerDto

Prepared refresh payload. Required.

required
private_key str

Private key override. Defaults to client's key.

None

Returns:

Name Type Description
RefreshLinkedSignerDto RefreshLinkedSignerDto

DTO with signature populated.

Raises:

Type Description
ValueError

If no chain client or private key is available.

Source code in ethereal/rest/linked_signer.py
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
async def sign_refresh_linked_signer(
    self,
    refresh_to_sign: RefreshLinkedSignerDto,
    private_key: Optional[str] = None,
) -> RefreshLinkedSignerDto:
    """Signs the refresh-linked-signer payload with the owner's key.

    Args:
        refresh_to_sign (RefreshLinkedSignerDto): Prepared refresh payload. Required.
        private_key (str, optional): Private key override. Defaults to client's key.

    Returns:
        RefreshLinkedSignerDto: DTO with signature populated.

    Raises:
        ValueError: If no chain client or private key is available.
    """
    if not hasattr(self, "chain") or not self.chain:
        raise ValueError("No chain client available for signing")
    if not private_key:
        if not self.chain.private_key:
            raise ValueError("No private key available for signing")
        private_key = self.chain.private_key

    # Prepare message for signing
    message = refresh_to_sign.data.model_dump(mode="json", by_alias=True)
    message["signedAt"] = int(message["signedAt"])

    primary_type = "RefreshLinkedSigner"
    domain = self.rpc_config.domain.model_dump(mode="json", by_alias=True)
    types = self.chain.get_signature_types(self.rpc_config, primary_type)
    refresh_to_sign.signature = self.chain.sign_message(
        private_key, domain, types, primary_type, message
    )
    return refresh_to_sign

sign_revoke_linked_signer(self, revoke_to_sign, private_key=None) async

Signs the revoke-linked-signer payload with the owner's key.

Parameters:

Name Type Description Default
revoke_to_sign RevokeLinkedSignerDto

Prepared revoke payload. Required.

required
private_key str

Private key override. Defaults to client's key.

None

Returns:

Name Type Description
RevokeLinkedSignerDto RevokeLinkedSignerDto

DTO with signature populated.

Raises:

Type Description
ValueError

If no chain client or private key is available.

Source code in ethereal/rest/linked_signer.py
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
async def sign_revoke_linked_signer(
    self,
    revoke_to_sign: RevokeLinkedSignerDto,
    private_key: Optional[str] = None,
) -> RevokeLinkedSignerDto:
    """Signs the revoke-linked-signer payload with the owner's key.

    Args:
        revoke_to_sign (RevokeLinkedSignerDto): Prepared revoke payload. Required.
        private_key (str, optional): Private key override. Defaults to client's key.

    Returns:
        RevokeLinkedSignerDto: DTO with signature populated.

    Raises:
        ValueError: If no chain client or private key is available.
    """
    if not hasattr(self, "chain") or not self.chain:
        raise ValueError("No chain client available for signing")
    if not private_key:
        if not self.chain.private_key:
            raise ValueError("No private key available for signing")
        private_key = self.chain.private_key

    # Prepare message for signing
    message = revoke_to_sign.data.model_dump(mode="json", by_alias=True)
    message["signedAt"] = int(message["signedAt"])

    primary_type = "RevokeLinkedSigner"
    domain = self.rpc_config.domain.model_dump(mode="json", by_alias=True)
    types = self.chain.get_signature_types(self.rpc_config, primary_type)
    revoke_to_sign.signature = self.chain.sign_message(
        private_key, domain, types, primary_type, message
    )
    return revoke_to_sign

ethereal.rest.token

get_token(self, id, **kwargs) async

Gets a specific token by ID.

Parameters:

Name Type Description Default
id str

UUID for the token. Required.

required

Other Parameters:

Name Type Description
**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Name Type Description
TokenDto TokenDto

Token metadata.

Source code in ethereal/rest/token.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
async def get_token(
    self,
    id: UUID,
    **kwargs,
) -> TokenDto:
    """Gets a specific token by ID.

    Args:
        id (str): UUID for the token. Required.

    Other Parameters:
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        TokenDto: Token metadata.
    """
    endpoint = f"{API_PREFIX}/token/{id}"
    res = await self.get(endpoint, **kwargs)
    return self._models.TokenDto(**res)

list_token_transfers(self, **kwargs) async

Lists token transfers for a subaccount.

Parameters:

Name Type Description Default
subaccount_id str

UUID of the subaccount. Required.

required

Other Parameters:

Name Type Description
statuses List[str]

Filter by transfer status (e.g., 'COMPLETED'). Optional.

types List[str]

Filter by transfer type (e.g., 'WITHDRAW'). Optional.

created_after float

Filter transfers created after this timestamp. Optional.

created_before float

Filter transfers created before this timestamp. Optional.

order str

Sort order, 'asc' or 'desc'. Optional.

limit float

Maximum number of results to return. Optional.

cursor str

Pagination cursor for fetching the next page. Optional.

order_by str

Field to order by (e.g., 'createdAt'). Optional.

**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Type Description
List[TransferDto]

List[TransferDto]: Transfer records for the subaccount.

Source code in ethereal/rest/token.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
async def list_token_transfers(
    self,
    **kwargs,
) -> List[TransferDto]:
    """Lists token transfers for a subaccount.

    Args:
        subaccount_id (str): UUID of the subaccount. Required.

    Other Parameters:
        statuses (List[str], optional): Filter by transfer status (e.g., 'COMPLETED'). Optional.
        types (List[str], optional): Filter by transfer type (e.g., 'WITHDRAW'). Optional.
        created_after (float, optional): Filter transfers created after this timestamp. Optional.
        created_before (float, optional): Filter transfers created before this timestamp. Optional.
        order (str, optional): Sort order, 'asc' or 'desc'. Optional.
        limit (float, optional): Maximum number of results to return. Optional.
        cursor (str, optional): Pagination cursor for fetching the next page. Optional.
        order_by (str, optional): Field to order by (e.g., 'createdAt'). Optional.
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        List[TransferDto]: Transfer records for the subaccount.
    """
    res = await self.get_validated(
        url_path=f"{API_PREFIX}/token/transfer",
        request_model=self._models.V1TokenTransferGetParametersQuery,
        response_model=self._models.PageOfTransfersDtos,
        **kwargs,
    )
    data = [
        self._models.TransferDto(**model.model_dump(by_alias=True))
        for model in res.data
    ]
    return data

list_token_withdraws(self, **kwargs) async

Lists token withdrawals for a subaccount.

Parameters:

Name Type Description Default
subaccount_id str

UUID of the subaccount. Required.

required

Other Parameters:

Name Type Description
is_active bool

Filter active withdrawals. Optional.

order str

Sort order, 'asc' or 'desc'. Optional.

limit float

Maximum number of results to return. Optional.

cursor str

Pagination cursor for fetching the next page. Optional.

order_by str

Field to order by (e.g., 'createdAt'). Optional.

**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Type Description
List[WithdrawDto]

List[WithdrawDto]: Withdrawal records for the subaccount.

Source code in ethereal/rest/token.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
async def list_token_withdraws(
    self,
    **kwargs,
) -> List[WithdrawDto]:
    """Lists token withdrawals for a subaccount.

    Args:
        subaccount_id (str): UUID of the subaccount. Required.

    Other Parameters:
        is_active (bool, optional): Filter active withdrawals. Optional.
        order (str, optional): Sort order, 'asc' or 'desc'. Optional.
        limit (float, optional): Maximum number of results to return. Optional.
        cursor (str, optional): Pagination cursor for fetching the next page. Optional.
        order_by (str, optional): Field to order by (e.g., 'createdAt'). Optional.
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        List[WithdrawDto]: Withdrawal records for the subaccount.
    """
    res = await self.get_validated(
        url_path=f"{API_PREFIX}/token/withdraw",
        request_model=self._models.V1TokenWithdrawGetParametersQuery,
        response_model=self._models.PageOfWithdrawDtos,
        **kwargs,
    )
    data = [
        self._models.WithdrawDto(**model.model_dump(by_alias=True))
        for model in res.data
    ]
    return data

list_tokens(self, **kwargs) async

Lists all supported tokens.

Other Parameters:

Name Type Description
deposit_enabled bool

Filter tokens with deposits enabled. Optional.

withdraw_enabled bool

Filter tokens with withdrawals enabled. Optional.

order str

Sort order, 'asc' or 'desc'. Optional.

limit float

Maximum number of results to return. Optional.

cursor str

Pagination cursor for fetching the next page. Optional.

order_by str

Field to order by (e.g., 'createdAt'). Optional.

**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Type Description
List[TokenDto]

List[TokenDto]: Token metadata.

Source code in ethereal/rest/token.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
async def list_tokens(
    self,
    **kwargs,
) -> List[TokenDto]:
    """Lists all supported tokens.

    Other Parameters:
        deposit_enabled (bool, optional): Filter tokens with deposits enabled. Optional.
        withdraw_enabled (bool, optional): Filter tokens with withdrawals enabled. Optional.
        order (str, optional): Sort order, 'asc' or 'desc'. Optional.
        limit (float, optional): Maximum number of results to return. Optional.
        cursor (str, optional): Pagination cursor for fetching the next page. Optional.
        order_by (str, optional): Field to order by (e.g., 'createdAt'). Optional.
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        List[TokenDto]: Token metadata.
    """
    res = await self.get_validated(
        url_path=f"{API_PREFIX}/token",
        request_model=self._models.V1TokenGetParametersQuery,
        response_model=self._models.PageOfTokensDtos,
        **kwargs,
    )
    data = [
        self._models.TokenDto(**model.model_dump(by_alias=True)) for model in res.data
    ]
    return data

prepare_withdraw_token(self, subaccount, token, amount, account, destination_address, destination_endpoint, include_signature=False, nonce=None, signed_at=None) async

Prepares a token withdrawal payload for signing and submission.

Constructs an InitiateWithdrawDto containing all the data required for a withdrawal request. The payload can optionally be signed immediately if include_signature=True.

For most use cases, prefer the higher-level withdraw_token() method on AsyncRESTClient which handles preparation, signing, and submission in a single call.

Parameters:

Name Type Description Default
subaccount str

Hex-encoded (bytes32) subaccount name from which to withdraw.

required
token str

Token contract address on the source chain.

required
amount int

Amount to withdraw in the token's base units.

required
account str

Wallet address that owns the subaccount.

required
destination_address str

LayerZero destination address where funds will be sent. Can be a hex string or bytes; automatically left-padded to bytes32 format.

required
destination_endpoint int

LayerZero endpoint ID for the destination chain. Use 0 for same-chain withdrawals.

required
include_signature bool

If True, signs the payload immediately using the client's configured private key. Defaults to False.

False
nonce Optional[str]

Custom nonce for the EIP-712 signature. If not provided, a random nonce is generated.

None
signed_at Optional[int]

Unix timestamp (seconds) for the signature. If not provided, defaults to the current time.

None

Returns:

Type Description
InitiateWithdrawDto

A prepared withdrawal payload. If include_signature=True, the

InitiateWithdrawDto

signature field will be populated; otherwise it will be empty.

Source code in ethereal/rest/token.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
async def prepare_withdraw_token(
    self,
    subaccount: str,
    token: str,
    amount: int,
    account: str,
    destination_address: str,
    destination_endpoint: int,
    include_signature: bool = False,
    nonce: Optional[str] = None,
    signed_at: Optional[int] = None,
) -> InitiateWithdrawDto:
    """Prepares a token withdrawal payload for signing and submission.

    Constructs an InitiateWithdrawDto containing all the data required for a
    withdrawal request. The payload can optionally be signed immediately if
    ``include_signature=True``.

    For most use cases, prefer the higher-level ``withdraw_token()`` method
    on AsyncRESTClient which handles preparation, signing, and submission
    in a single call.

    Args:
        subaccount: Hex-encoded (bytes32) subaccount name from which to withdraw.
        token: Token contract address on the source chain.
        amount: Amount to withdraw in the token's base units.
        account: Wallet address that owns the subaccount.
        destination_address: LayerZero destination address where funds will be
            sent. Can be a hex string or bytes; automatically left-padded to
            bytes32 format.
        destination_endpoint: LayerZero endpoint ID for the destination chain.
            Use 0 for same-chain withdrawals.
        include_signature: If True, signs the payload immediately using the
            client's configured private key. Defaults to False.
        nonce: Custom nonce for the EIP-712 signature. If not provided, a
            random nonce is generated.
        signed_at: Unix timestamp (seconds) for the signature. If not provided,
            defaults to the current time.

    Returns:
        A prepared withdrawal payload. If ``include_signature=True``, the
        signature field will be populated; otherwise it will be empty.
    """
    nonce = nonce or generate_nonce()
    signed_at = signed_at or int(time.time())
    lz_destination_id = self._models.LzDestinationEid(destination_endpoint)

    data = {
        "account": account,
        "subaccount": subaccount,
        "token": token,
        "amount": amount,
        "nonce": nonce,
        "signedAt": signed_at,
        "lzDestinationAddress": ensure_bytes32_hex(destination_address),
        "lzDestinationEid": lz_destination_id,
    }
    data_model = self._models.InitiateWithdrawDtoData.model_validate(data)
    dto = self._models.InitiateWithdrawDto.model_validate(
        {"data": data_model.model_dump(mode="json", by_alias=True), "signature": ""}
    )
    if include_signature:
        dto = await self.sign_withdraw_token(dto)
    return dto

sign_withdraw_token(self, withdraw_dto, private_key=None) async

Signs a token withdrawal payload using EIP-712 typed data signing.

Generates an EIP-712 signature for the withdrawal payload using either the provided private key or the client's configured chain key.

Parameters:

Name Type Description Default
withdraw_dto InitiateWithdrawDto

A prepared withdrawal payload (from prepare_withdraw_token). The signature field will be populated in-place.

required
private_key Optional[str]

Optional private key to use for signing. If not provided, uses the private key from the client's chain configuration.

None

Returns:

Type Description
InitiateWithdrawDto

The same InitiateWithdrawDto with the signature field populated.

Raises:

Type Description
ValueError

If no chain client is configured on the client.

ValueError

If no private key is available (neither provided nor configured).

Source code in ethereal/rest/token.py
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
async def sign_withdraw_token(
    self,
    withdraw_dto: InitiateWithdrawDto,
    private_key: Optional[str] = None,
) -> InitiateWithdrawDto:
    """Signs a token withdrawal payload using EIP-712 typed data signing.

    Generates an EIP-712 signature for the withdrawal payload using either
    the provided private key or the client's configured chain key.

    Args:
        withdraw_dto: A prepared withdrawal payload (from ``prepare_withdraw_token``).
            The signature field will be populated in-place.
        private_key: Optional private key to use for signing. If not provided,
            uses the private key from the client's chain configuration.

    Returns:
        The same InitiateWithdrawDto with the signature field populated.

    Raises:
        ValueError: If no chain client is configured on the client.
        ValueError: If no private key is available (neither provided nor configured).
    """
    if not hasattr(self, "chain") or not self.chain:
        raise ValueError("No chain client available for signing")
    if not private_key and not self.chain.private_key:
        raise ValueError("No private key available for signing")
    elif not private_key:
        private_key = self.chain.private_key

    # Prepare the message for signing
    message = withdraw_dto.data.model_dump(by_alias=True, mode="json")
    message["amount"] = int(Decimal(str(message["amount"])) * Decimal(1e9))
    message["signedAt"] = int(message["signedAt"])

    destination_address = message["lzDestinationAddress"]
    destination_eid = int(message["lzDestinationEid"])

    message["destinationAddress"] = destination_address
    message["destinationEndpointId"] = destination_eid

    primary_type = "InitiateWithdraw"
    domain = self.rpc_config.domain.model_dump(mode="json", by_alias=True)
    types = self.chain.get_signature_types(self.rpc_config, primary_type)
    withdraw_dto.signature = self.chain.sign_message(
        private_key, domain, types, primary_type, message
    )
    return withdraw_dto

submit_withdraw_token(self, dto, token_id, **kwargs) async

Submits a prepared and signed token withdrawal request to the API.

This is the low-level submission function that sends a pre-constructed InitiateWithdrawDto to the withdrawal endpoint. For most use cases, prefer the higher-level withdraw_token() method on AsyncRESTClient which handles preparation, signing, and submission in a single call.

Parameters:

Name Type Description Default
dto InitiateWithdrawDto

A prepared and signed withdrawal payload containing the withdrawal details and EIP-712 signature.

required
token_id UUID

UUID of the token being withdrawn. Used to construct the API endpoint path.

required

Other Parameters:

Name Type Description
**kwargs

Additional request parameters passed to the underlying HTTP client.

Returns:

Type Description
WithdrawDto

The withdrawal record created by the API, containing transaction

WithdrawDto

details and status information.

Source code in ethereal/rest/token.py
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
async def submit_withdraw_token(
    self,
    dto: InitiateWithdrawDto,
    token_id: UUID,
    **kwargs,
) -> WithdrawDto:
    """Submits a prepared and signed token withdrawal request to the API.

    This is the low-level submission function that sends a pre-constructed
    InitiateWithdrawDto to the withdrawal endpoint. For most use cases,
    prefer the higher-level ``withdraw_token()`` method on AsyncRESTClient
    which handles preparation, signing, and submission in a single call.

    Args:
        dto: A prepared and signed withdrawal payload containing the withdrawal
            details and EIP-712 signature.
        token_id: UUID of the token being withdrawn. Used to construct the
            API endpoint path.

    Other Parameters:
        **kwargs: Additional request parameters passed to the underlying
            HTTP client.

    Returns:
        The withdrawal record created by the API, containing transaction
        details and status information.
    """
    endpoint = f"{API_PREFIX}/token/{token_id}/withdraw"
    res = await self.post(
        endpoint,
        data=dto.model_dump(mode="json", by_alias=True, exclude_none=True),
        **kwargs,
    )
    return self._models.WithdrawDto.model_validate(res)

ethereal.rest.util

client_order_id_to_bytes32(client_order_id)

Converts client_order_id to appropriate bytes32 format.

Parameters:

Name Type Description Default
client_order_id str

Client order ID to convert.

required

Returns:

Name Type Description
str str

Converted client order ID in bytes32 hex format.

Raises:

Type Description
ValueError

If string is longer than 32 characters and not a UUID, or if input is None/empty.

Source code in ethereal/rest/util.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def client_order_id_to_bytes32(client_order_id: str) -> str:
    """Converts client_order_id to appropriate bytes32 format.

    Args:
        client_order_id (str): Client order ID to convert.

    Returns:
        str: Converted client order ID in bytes32 hex format.

    Raises:
        ValueError: If string is longer than 32 characters and not a UUID, or if input is None/empty.
    """
    if client_order_id is None:
        raise ValueError("Client order ID cannot be None")

    if not client_order_id:
        raise ValueError("Client order ID cannot be empty")

    if is_uuid(client_order_id):
        return uuid_to_bytes32(client_order_id)

    if len(client_order_id) > 32:
        raise ValueError(
            f"Client order ID cannot be longer than 32 characters, got {len(client_order_id)}"
        )

    # Convert string to bytes32 hex format
    client_order_bytes = client_order_id.encode("utf-8")
    padded_bytes = client_order_bytes.ljust(32, b"\0")
    return "0x" + padded_bytes.hex()

decode_account_name(hex_name)

Converts hex-encoded subaccount name back to text.

Parameters:

Name Type Description Default
hex_name str

Hex-encoded name with '0x' prefix.

required

Returns:

Name Type Description
str str

Decoded text string with null bytes stripped.

Source code in ethereal/rest/util.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
def decode_account_name(hex_name: str) -> str:
    """Converts hex-encoded subaccount name back to text.

    Args:
        hex_name (str): Hex-encoded name with '0x' prefix.

    Returns:
        str: Decoded text string with null bytes stripped.
    """
    try:
        if hex_name.startswith("0x"):
            return bytes.fromhex(hex_name[2:]).decode("utf-8").rstrip("\x00")
        else:
            return bytes.fromhex(hex_name).decode("utf-8").rstrip("\x00")
    except (ValueError, UnicodeDecodeError):
        # Return original if decoding fails
        return hex_name

encode_account_name(text)

Converts text to hex-encoded subaccount name format.

Parameters:

Name Type Description Default
text str

Text to convert to hex name.

required

Returns:

Name Type Description
str str

Hex-encoded name with '0x' prefix, padded to 32 bytes.

Source code in ethereal/rest/util.py
109
110
111
112
113
114
115
116
117
118
119
120
121
def encode_account_name(text: str) -> str:
    """Converts text to hex-encoded subaccount name format.

    Args:
        text (str): Text to convert to hex name.

    Returns:
        str: Hex-encoded name with '0x' prefix, padded to 32 bytes.
    """
    hex_encoded = text.encode("utf-8").hex()
    # Pad to 64 characters (32 bytes) with zeros on the right
    padded_hex = hex_encoded.ljust(64, "0")
    return "0x" + padded_hex

ensure_bytes32(value)

Return a 32-byte representation of value (zero-padded on the left).

Source code in ethereal/rest/util.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def ensure_bytes32(value):
    """Return a 32-byte representation of ``value`` (zero-padded on the left)."""
    if isinstance(value, (bytes, bytearray)):
        data = bytes(value)
    elif isinstance(value, str):
        raw = value[2:] if value.startswith("0x") else value
        raw = raw if len(raw) % 2 == 0 else "0" + raw
        try:
            data = decode_hex(add_0x_prefix(raw))
        except ValueError as exc:
            raise Bytes32Error(f"Invalid hex string: {value!r}") from exc
    else:
        raise Bytes32Error(f"Unsupported type: {type(value).__name__}")
    if len(data) > 32:
        raise Bytes32Error(f"Expected at most 32 bytes, got {len(data)} bytes")
    return data.rjust(32, b"\x00")

ensure_bytes32_hex(value)

Return value as a 0x-prefixed 64-hex-digit string.

Source code in ethereal/rest/util.py
28
29
30
def ensure_bytes32_hex(value):
    """Return ``value`` as a 0x-prefixed 64-hex-digit string."""
    return encode_hex(ensure_bytes32(value))

generate_nonce()

Generates a timestamp-based nonce.

Returns:

Name Type Description
str str

Current timestamp in nanoseconds as string.

Source code in ethereal/rest/util.py
100
101
102
103
104
105
106
def generate_nonce() -> str:
    """Generates a timestamp-based nonce.

    Returns:
        str: Current timestamp in nanoseconds as string.
    """
    return str(time.time_ns())

is_uuid(value)

Checks if a string is a valid UUID.

Parameters:

Name Type Description Default
value str

String to check.

required

Returns:

Name Type Description
bool bool

True if string is a valid UUID, False otherwise.

Source code in ethereal/rest/util.py
53
54
55
56
57
58
59
60
61
62
63
64
65
def is_uuid(value: str) -> bool:
    """Checks if a string is a valid UUID.

    Args:
        value (str): String to check.

    Returns:
        bool: True if string is a valid UUID, False otherwise.
    """
    try:
        return value == str(uuid.UUID(value))
    except ValueError:
        return False

uuid_to_bytes32(uuid_str)

Converts UUID string to bytes32 hex format.

Parameters:

Name Type Description Default
uuid_str str

UUID string to convert.

required

Returns:

Name Type Description
str str

Bytes32 hex string prefixed with '0x'.

Source code in ethereal/rest/util.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def uuid_to_bytes32(uuid_str: str) -> str:
    """Converts UUID string to bytes32 hex format.

    Args:
        uuid_str (str): UUID string to convert.

    Returns:
        str: Bytes32 hex string prefixed with '0x'.
    """
    uuid_obj = uuid.UUID(uuid_str)

    # remove hyphens and convert to hex
    uuid_hex = uuid_obj.hex

    # pad the hex to make it 32 bytes
    padded_hex = uuid_hex.rjust(64, "0")

    return "0x" + padded_hex

ethereal.async_ws_client.AsyncWSClient

Bases: BaseClient

Ethereal async WebSocket client.

Supports both Socket.IO and raw WebSocket transports via pluggable backends.

Parameters:

Name Type Description Default
config Union[Dict[str, Any], WSConfig]

Configuration dictionary or WSConfig object. - base_url (str): Base URL for websocket requests - verbose (bool): Enables debug logging (default: False) - transport (str): 'socketio' or 'websocket' (default: 'socketio')

required
Source code in ethereal/async_ws_client.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
class AsyncWSClient(BaseClient):
    """Ethereal async WebSocket client.

    Supports both Socket.IO and raw WebSocket transports via pluggable backends.

    Args:
        config: Configuration dictionary or WSConfig object.
            - base_url (str): Base URL for websocket requests
            - verbose (bool): Enables debug logging (default: False)
            - transport (str): 'socketio' or 'websocket' (default: 'socketio')
    """

    config: WSConfig

    def __init__(self, config: Union[Dict[str, Any], WSConfig]):
        super().__init__(config)
        self.config = WSConfig.model_validate(config)
        self._transport = self._create_transport()

    def _create_transport(self) -> AsyncTransport:
        """Create the appropriate transport based on config."""
        base_url = str(self.config.base_url)
        verbose = self.config.verbose

        if self.config.transport == "socketio":
            return AsyncSocketIOTransport(base_url, verbose)
        else:
            return AsyncWebSocketTransport(base_url, verbose)

    @property
    def connected(self) -> bool:
        """Check if the WebSocket is connected."""
        return self._transport.connected

    @property
    def callbacks(self) -> Dict[str, List[Callable]]:
        """Access callbacks on the transport."""
        return self._transport.callbacks

    @callbacks.setter
    def callbacks(self, value: Dict[str, List[Callable]]) -> None:
        self._transport.callbacks = value

    async def open(self, namespaces: Optional[List[str]] = None) -> None:
        """Open the WebSocket connection.

        Args:
            namespaces: Socket.IO namespaces to connect to (socketio transport only)
        """
        await self._transport.open(namespaces)

    async def close(self) -> None:
        """Close the WebSocket connection."""
        await self._transport.close()

    async def wait(self) -> None:
        """Wait for the connection to close."""
        await self._transport.wait()

    async def subscribe(
        self,
        stream_type: str,
        *,
        namespace: str = "/v1/stream",
        **params,
    ) -> Dict[str, Any]:
        """Subscribe to a stream.

        Args:
            stream_type: Type of stream (e.g., "BookDepth", "MarketPrice", "OrderFill")
            namespace: Socket.IO namespace (socketio transport only)
            **params: Subscription parameters (snake_case converted to camelCase)
                - symbol: Product symbol (e.g., "BTCUSD")
                - subaccount_id: Subaccount ID for account-specific streams
                - product_id: Product ID
        """
        data = {"type": stream_type, **prepare_params(**params)}
        return await self._transport.send("subscribe", data, namespace=namespace)

    async def unsubscribe(
        self,
        stream_type: str,
        *,
        namespace: str = "/v1/stream",
        **params,
    ) -> Dict[str, Any]:
        """Unsubscribe from a stream.

        Args:
            stream_type: Type of stream
            namespace: Socket.IO namespace (socketio transport only)
            **params: Unsubscription parameters (snake_case converted to camelCase)
        """
        data = {"type": stream_type, **prepare_params(**params)}
        return await self._transport.send("unsubscribe", data, namespace=namespace)

callbacks property writable

Access callbacks on the transport.

connected property

Check if the WebSocket is connected.

close() async

Close the WebSocket connection.

Source code in ethereal/async_ws_client.py
66
67
68
async def close(self) -> None:
    """Close the WebSocket connection."""
    await self._transport.close()

open(namespaces=None) async

Open the WebSocket connection.

Parameters:

Name Type Description Default
namespaces Optional[List[str]]

Socket.IO namespaces to connect to (socketio transport only)

None
Source code in ethereal/async_ws_client.py
58
59
60
61
62
63
64
async def open(self, namespaces: Optional[List[str]] = None) -> None:
    """Open the WebSocket connection.

    Args:
        namespaces: Socket.IO namespaces to connect to (socketio transport only)
    """
    await self._transport.open(namespaces)

subscribe(stream_type, *, namespace='/v1/stream', **params) async

Subscribe to a stream.

Parameters:

Name Type Description Default
stream_type str

Type of stream (e.g., "BookDepth", "MarketPrice", "OrderFill")

required
namespace str

Socket.IO namespace (socketio transport only)

'/v1/stream'
**params

Subscription parameters (snake_case converted to camelCase) - symbol: Product symbol (e.g., "BTCUSD") - subaccount_id: Subaccount ID for account-specific streams - product_id: Product ID

{}
Source code in ethereal/async_ws_client.py
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
async def subscribe(
    self,
    stream_type: str,
    *,
    namespace: str = "/v1/stream",
    **params,
) -> Dict[str, Any]:
    """Subscribe to a stream.

    Args:
        stream_type: Type of stream (e.g., "BookDepth", "MarketPrice", "OrderFill")
        namespace: Socket.IO namespace (socketio transport only)
        **params: Subscription parameters (snake_case converted to camelCase)
            - symbol: Product symbol (e.g., "BTCUSD")
            - subaccount_id: Subaccount ID for account-specific streams
            - product_id: Product ID
    """
    data = {"type": stream_type, **prepare_params(**params)}
    return await self._transport.send("subscribe", data, namespace=namespace)

unsubscribe(stream_type, *, namespace='/v1/stream', **params) async

Unsubscribe from a stream.

Parameters:

Name Type Description Default
stream_type str

Type of stream

required
namespace str

Socket.IO namespace (socketio transport only)

'/v1/stream'
**params

Unsubscription parameters (snake_case converted to camelCase)

{}
Source code in ethereal/async_ws_client.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
async def unsubscribe(
    self,
    stream_type: str,
    *,
    namespace: str = "/v1/stream",
    **params,
) -> Dict[str, Any]:
    """Unsubscribe from a stream.

    Args:
        stream_type: Type of stream
        namespace: Socket.IO namespace (socketio transport only)
        **params: Unsubscription parameters (snake_case converted to camelCase)
    """
    data = {"type": stream_type, **prepare_params(**params)}
    return await self._transport.send("unsubscribe", data, namespace=namespace)

wait() async

Wait for the connection to close.

Source code in ethereal/async_ws_client.py
70
71
72
async def wait(self) -> None:
    """Wait for the connection to close."""
    await self._transport.wait()

ethereal.ws_client.WSClient

Bases: BaseClient

Ethereal WebSocket client.

Supports both Socket.IO and raw WebSocket transports via pluggable backends.

Parameters:

Name Type Description Default
config Union[Dict[str, Any], WSConfig]

Configuration dictionary or WSConfig object. - base_url (str): Base URL for websocket requests - verbose (bool): Enables debug logging (default: False) - transport (str): 'socketio' or 'websocket' (default: 'socketio')

required
Source code in ethereal/ws_client.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
class WSClient(BaseClient):
    """Ethereal WebSocket client.

    Supports both Socket.IO and raw WebSocket transports via pluggable backends.

    Args:
        config: Configuration dictionary or WSConfig object.
            - base_url (str): Base URL for websocket requests
            - verbose (bool): Enables debug logging (default: False)
            - transport (str): 'socketio' or 'websocket' (default: 'socketio')
    """

    config: WSConfig

    def __init__(self, config: Union[Dict[str, Any], WSConfig]):
        super().__init__(config)
        self.config = WSConfig.model_validate(config)
        self._transport = self._create_transport()

    def _create_transport(self) -> Transport:
        """Create the appropriate transport based on config."""
        base_url = str(self.config.base_url)
        verbose: bool = self.config.verbose

        if self.config.transport == "socketio":
            return SocketIOTransport(base_url, verbose)
        else:
            return WebSocketTransport(base_url, verbose)

    @property
    def connected(self) -> bool:
        """Check if the WebSocket is connected."""
        return self._transport.connected

    @property
    def callbacks(self) -> Dict[str, List[Callable]]:
        """Access callbacks on the transport."""
        return self._transport.callbacks

    @callbacks.setter
    def callbacks(self, value: Dict[str, List[Callable]]) -> None:
        self._transport.callbacks = value

    def open(self, namespaces: Optional[List[str]] = None) -> None:
        """Open the WebSocket connection.

        Args:
            namespaces: Socket.IO namespaces to connect to (socketio transport only)
        """
        self._transport.open(namespaces)

    def close(self) -> None:
        """Close the WebSocket connection."""
        self._transport.close()

    def subscribe(
        self,
        stream_type: str,
        *,
        namespace: str = "/v1/stream",
        **params,
    ) -> Dict[str, Any]:
        """Subscribe to a stream.

        Args:
            stream_type: Type of stream (e.g., "BookDepth", "MarketPrice", "OrderFill")
            namespace: Socket.IO namespace (socketio transport only)
            **params: Subscription parameters (snake_case converted to camelCase)
                - symbol: Product symbol (e.g., "BTCUSD")
                - subaccount_id: Subaccount ID for account-specific streams
                - product_id: Product ID
        """
        data = {"type": stream_type, **prepare_params(**params)}
        return self._transport.send("subscribe", data, namespace=namespace)

    def unsubscribe(
        self,
        stream_type: str,
        *,
        namespace: str = "/v1/stream",
        **params,
    ) -> Dict[str, Any]:
        """Unsubscribe from a stream.

        Args:
            stream_type: Type of stream
            namespace: Socket.IO namespace (socketio transport only)
            **params: Unsubscription parameters (snake_case converted to camelCase)
        """
        data = {"type": stream_type, **prepare_params(**params)}
        return self._transport.send("unsubscribe", data, namespace=namespace)

callbacks property writable

Access callbacks on the transport.

connected property

Check if the WebSocket is connected.

close()

Close the WebSocket connection.

Source code in ethereal/ws_client.py
66
67
68
def close(self) -> None:
    """Close the WebSocket connection."""
    self._transport.close()

open(namespaces=None)

Open the WebSocket connection.

Parameters:

Name Type Description Default
namespaces Optional[List[str]]

Socket.IO namespaces to connect to (socketio transport only)

None
Source code in ethereal/ws_client.py
58
59
60
61
62
63
64
def open(self, namespaces: Optional[List[str]] = None) -> None:
    """Open the WebSocket connection.

    Args:
        namespaces: Socket.IO namespaces to connect to (socketio transport only)
    """
    self._transport.open(namespaces)

subscribe(stream_type, *, namespace='/v1/stream', **params)

Subscribe to a stream.

Parameters:

Name Type Description Default
stream_type str

Type of stream (e.g., "BookDepth", "MarketPrice", "OrderFill")

required
namespace str

Socket.IO namespace (socketio transport only)

'/v1/stream'
**params

Subscription parameters (snake_case converted to camelCase) - symbol: Product symbol (e.g., "BTCUSD") - subaccount_id: Subaccount ID for account-specific streams - product_id: Product ID

{}
Source code in ethereal/ws_client.py
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def subscribe(
    self,
    stream_type: str,
    *,
    namespace: str = "/v1/stream",
    **params,
) -> Dict[str, Any]:
    """Subscribe to a stream.

    Args:
        stream_type: Type of stream (e.g., "BookDepth", "MarketPrice", "OrderFill")
        namespace: Socket.IO namespace (socketio transport only)
        **params: Subscription parameters (snake_case converted to camelCase)
            - symbol: Product symbol (e.g., "BTCUSD")
            - subaccount_id: Subaccount ID for account-specific streams
            - product_id: Product ID
    """
    data = {"type": stream_type, **prepare_params(**params)}
    return self._transport.send("subscribe", data, namespace=namespace)

unsubscribe(stream_type, *, namespace='/v1/stream', **params)

Unsubscribe from a stream.

Parameters:

Name Type Description Default
stream_type str

Type of stream

required
namespace str

Socket.IO namespace (socketio transport only)

'/v1/stream'
**params

Unsubscription parameters (snake_case converted to camelCase)

{}
Source code in ethereal/ws_client.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def unsubscribe(
    self,
    stream_type: str,
    *,
    namespace: str = "/v1/stream",
    **params,
) -> Dict[str, Any]:
    """Unsubscribe from a stream.

    Args:
        stream_type: Type of stream
        namespace: Socket.IO namespace (socketio transport only)
        **params: Unsubscription parameters (snake_case converted to camelCase)
    """
    data = {"type": stream_type, **prepare_params(**params)}
    return self._transport.send("unsubscribe", data, namespace=namespace)

ethereal.chain_client.ChainClient

Bases: BaseClient

Client for interacting with the blockchain using Web3 functionality.

Parameters:

Name Type Description Default
config Union[Dict[str, Any], ChainConfig]

Chain configuration

required
rpc_config RpcConfigDto

RPC configuration. Defaults to None.

None

Raises:

Type Description
Exception

If RPC URL or private key is not specified in the configuration

Source code in ethereal/chain_client.py
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
class ChainClient(BaseClient):
    """Client for interacting with the blockchain using Web3 functionality.

    Args:
        config (Union[Dict[str, Any], ChainConfig]): Chain configuration
        rpc_config (RpcConfigDto, optional): RPC configuration. Defaults to None.

    Raises:
        Exception: If RPC URL or private key is not specified in the configuration
    """

    def __init__(
        self,
        config: Union[Dict[str, Any], ChainConfig],
        rpc_config: Optional[RpcConfigDto] = None,
        tokens: Optional[List[TokenDto]] = None,
    ):
        super().__init__(config)
        self.config = ChainConfig.model_validate(config)
        self.provider = self._setup_provider()
        self.account = self._setup_account()
        if self.account:
            self.address = self.account.address
            self.private_key = self.config.private_key
        else:
            self.address = self.config.address
            self.private_key = None

        self.chain_id = self.provider.eth.chain_id

        if tokens is not None:
            usde_token = next((t for t in tokens if t.name == "USD"), None)
            if usde_token is None:
                self.logger.warning("USD token not found in the provided tokens list.")
            else:
                usde_address = self.provider.to_checksum_address(usde_token.address)
                usde_abi = (
                    read_contract(self.chain_id, "WUSDe", common=True)
                    if usde_token.erc20_name and "Wrapped" in usde_token.erc20_name
                    else read_contract(self.chain_id, "ERC20", common=True)
                )

                self.usde = self.provider.eth.contract(
                    address=usde_address,
                    abi=usde_abi,
                )
        self.rpc_config = rpc_config

    @property
    def exchange_contract(self):
        return self.provider.eth.contract(
            address=self.rpc_config.domain.verifying_contract,
            abi=read_contract(self.chain_id, "ExchangeGateway"),
        )

    def _setup_provider(self):
        """Set up the Web3 provider.

        Returns:
            Web3: The Web3 provider instance

        Raises:
            Exception: If RPC URL is not specified in the configuration
        """
        # TODO: Support other provider types (e.g. WebSocket)
        if self.config.rpc_url is None:
            raise Exception("RPC URL must be specified in the configuration")
        return Web3(Web3.HTTPProvider(self.config.rpc_url))

    def _setup_account(self):
        """Set up the account.

        Returns:
            Account: The Web3 account instance

        Raises:
            Exception: If private key is not specified in the configuration
        """
        if self.config.private_key is None:
            self.logger.debug("Private key not specified in the configuration")
            return None

        account = self.provider.eth.account.from_key(self.config.private_key)
        if not account:
            raise Exception("Failed to create account from private key")
        if self.config.address and account.address != self.config.address:
            raise Exception(
                f"Private key does not match address specified in the config: {self.config.address}"
            )
        return account

    def _get_tx(self, value=0, to=None) -> TxParams:
        """Get default transaction parameters.

        Args:
            value (int, optional): The value to send. Defaults to 0.
            to (str, optional): The recipient address. Defaults to None.

        Returns:
            TxParams: The transaction parameters
        """
        params: TxParams = {
            "from": self.address,
            "chainId": self.chain_id,
            "value": value,
            "nonce": Nonce(self.get_nonce(self.address)),
        }
        if to is not None:
            params["to"] = to
        return params

    def _decode_error(self, error: ContractCustomError) -> str:
        abi_errors = {
            self.provider.to_hex(function_abi_to_4byte_selector(f)): f.get("name")
            for f in self.exchange_contract.abi
            if f.get("type") == "error"
        }
        data = error.data
        error_signature = data
        return abi_errors.get(error_signature, "Unknown error")

    def _get_by_alias(self, model: BaseModel, alias: str):
        """
        Get a field value by its alias from a Pydantic model.

        Args:
            model (BaseModel): The Pydantic model instance to extract the field from.
            alias (str): The alias of the field to retrieve.

        Returns:
            Any: The value of the field with the specified alias.

        Raises:
            KeyError: If the alias is not found in the model.
        """
        for field_name, field in model.__class__.model_fields.items():
            if field.alias == alias:
                return getattr(model, field_name)
        raise KeyError(f"Alias '{alias}' not found in model {model.__class__.__name__}")

    def get_signature_types(self, rpc_config: RpcConfigDto, primary_type: str):
        """Gets EIP-712 signature types.

        Args:
            rpc_config (RpcConfigDto): RPC configuration.
            primary_type (str): Primary type for the signature.

        Returns:
            dict: Dictionary containing signature type definitions.
        """
        return {
            "EIP712Domain": [
                {"name": "name", "type": "string"},
                {"name": "version", "type": "string"},
                {"name": "chainId", "type": "uint256"},
                {"name": "verifyingContract", "type": "address"},
            ],
            primary_type: self.convert_types(
                self._get_by_alias(rpc_config.signature_types, primary_type),
            ),
        }

    def convert_types(self, type_string: str) -> List[Dict[str, str]]:
        """Converts type string into EIP-712 field format.

        Args:
            type_string (str): String containing type definitions.

        Returns:
            List[Dict[str, str]]: List of field definitions.
        """
        fields = [comp.strip() for comp in type_string.split(",")]
        type_fields = []
        for field in fields:
            field_type, field_name = field.rsplit(" ", 1)
            type_fields.append({"name": field_name, "type": field_type})
        return type_fields

    def add_gas_fees(self, tx: TxParams) -> TxParams:
        """Add gas fee parameters to a transaction.

        Args:
            tx (TxParams): The transaction parameters

        Returns:
            TxParams: The transaction parameters with gas fee parameters added
        """
        if "maxFeePerGas" in tx and "maxPriorityFeePerGas" in tx:
            return tx
        try:
            gas_price = self.provider.eth.gas_price
            max_priority_fee = self.provider.eth.max_priority_fee
            tx["maxFeePerGas"] = gas_price
            tx["maxPriorityFeePerGas"] = max_priority_fee
            return tx
        except Web3Exception as e:
            self.logger.error(f"Failed to add gas: {e}")
            return tx

    def add_gas_limit(self, tx: TxParams) -> TxParams:
        """Add gas limit to a transaction.

        Args:
            tx (TxParams): The transaction parameters

        Returns:
            TxParams: The transaction parameters with gas limit added
        """
        if "gas" in tx:
            return tx
        try:
            gas = self.provider.eth.estimate_gas(tx)
            tx["gas"] = gas
            return tx
        except Web3Exception as e:
            self.logger.error(
                f"Failed to add gas limit: {self._decode_error(e) if isinstance(e, ContractCustomError) else e}"
            )
            raise e

    def submit_tx(self, tx: TxParams) -> str:
        """Submit a transaction.

        Args:
            tx (TxParams): The transaction parameters

        Returns:
            str: The transaction hash
        """
        tx = self.add_gas_fees(tx)
        tx = self.add_gas_limit(tx)
        try:
            signed_tx = self.provider.eth.account.sign_transaction(
                tx, private_key=self.private_key
            )
            tx_hash = self.provider.eth.send_raw_transaction(signed_tx.raw_transaction)
            return encode_hex(tx_hash)
        except Web3Exception as e:
            self.logger.error(f"Failed to submit transaction: {e}")
            raise e

    def get_nonce(self, address: str) -> int:
        """Get the nonce for a given address.

        Args:
            address (str): The address to get the nonce for

        Returns:
            int: The nonce, or -1 if failed
        """
        try:
            return self.provider.eth.get_transaction_count(address)
        except Web3Exception as e:
            self.logger.error(f"Failed to get nonce: {e}")
            return -1

    def get_balance(self, address: str) -> int:
        """Get the balance for a given address.

        Args:
            address (str): The address to get the balance for

        Returns:
            int: The balance, or -1 if failed
        """
        try:
            return self.provider.eth.get_balance(address)
        except Web3Exception as e:
            self.logger.error(f"Failed to get balance: {e}")
            return -1

    def get_token_balance(self, address: str, token_address: str) -> int:
        """Get the token balance for a given address.

        Args:
            address (str): The address to get the token balance for
            token_address (str): The token address

        Returns:
            int: The token balance, or -1 if failed
        """
        try:
            contract = self.provider.eth.contract(
                address=token_address,
                abi=read_contract(self.chain_id, "ERC20", common=True),
            )
            return contract.functions.balanceOf(address).call()
        except Web3Exception as e:
            self.logger.error(f"Failed to get token balance: {e}")
            return -1

    def sign_message(
        self,
        private_key: str,
        domain: dict,
        types: dict,
        primary_type: str,
        message: dict,
    ):
        """Sign an EIP-712 typed data message.

        Args:
            private_key (str): private key to sign the message with
            domain (dict): domain parameters including name, version, chainId, and verifyingContract
            types (dict): type definitions for the structured data
            primary_type (str): primary type for the signature
            message (dict): message data to be signed

        Returns:
            str: the hexadecimal signature string prefixed with '0x'
        """
        # A type fix for the domain
        domain["chainId"] = int(domain["chainId"])

        # Preparing the full message as per EIP-712
        full_message = {
            "types": types,
            "primaryType": primary_type,
            "domain": domain,
            "message": message,
        }

        encoded_message = encode_typed_data(full_message=full_message)

        # Signing the message
        signed_message = Account.sign_message(encoded_message, private_key)
        return "0x" + signed_message.signature.hex()

    def deposit_usde(
        self,
        amount: float,
        account_name: str = "primary",
        address: Optional[str] = None,
        submit: bool = False,
        account_name_bytes: Optional[str] = None,
    ) -> Union[TxParams, str]:
        """Submit a deposit transaction.

        Args:
            amount (float): The amount to deposit
            address (str, optional): The address to deposit to. Defaults to None.
            submit (bool, optional): Whether to submit the transaction. Defaults to False.
            account_name (str, optional): The account name. Defaults to "primary".
            account_name_bytes (str, optional): The account name as a hex string (bytes32). Defaults to None.

        Returns:
            Union[TxParams, str]: The transaction parameters or transaction hash if submit=True

        Raises:
            ValueError: If both account_name and account_name_bytes are provided
        """
        if address is None:
            address = self.address

        if account_name is None and account_name_bytes is None:
            account_name = "primary"

        # Validate inputs
        if account_name is not None and account_name_bytes is not None:
            raise ValueError("Cannot provide both account_name and account_name_bytes")

        try:
            # params
            if account_name_bytes is not None:
                subaccount = ensure_bytes32_hex(account_name_bytes)
            else:
                subaccount = self.provider.to_hex(text=account_name).ljust(66, "0")
            amount = self.provider.to_wei(amount, "ether")
            referral_code = self.provider.to_hex(0).ljust(66, "0")

            # prepare the tx
            tx = self._get_tx(to=self.exchange_contract.address, value=amount)
            tx["data"] = self.exchange_contract.encode_abi(
                "depositUsd", args=[subaccount, referral_code]
            )

            if submit:
                return self.submit_tx(tx)
            else:
                return tx

        except Web3Exception as e:
            self.logger.error(
                f"Failed to prepare deposit transaction: {self._decode_error(e) if isinstance(e, ContractCustomError) else e}"
            )
            raise e

    def finalize_withdraw(
        self,
        account_name: str = "primary",
        address: Optional[str] = None,
        submit: Optional[bool] = False,
    ) -> Union[TxParams, str]:
        """Finalize a withdrawal.

        Args:
            address (str, optional): The address to deposit to. Defaults to None.
            submit (bool, optional): Whether to submit the transaction. Defaults to False.
            account_name (str, optional): The name of the account. Defaults to "primary".

        Returns:
            Union[TxParams, str]: The transaction parameters or transaction hash if submit=True
        """
        if address is None:
            address = self.address
        try:
            # params
            subaccount = encode_account_name(account_name)

            # prepare the tx
            tx = self._get_tx(to=self.exchange_contract.address)
            tx["data"] = self.exchange_contract.encode_abi(
                "finalizeWithdraw", args=[address, subaccount]
            )

            if submit:
                return self.submit_tx(tx)
            else:
                return tx

        except Web3Exception as e:
            self.logger.error(
                f"Failed to prepare finalizeWithdraw transaction: {self._decode_error(e) if isinstance(e, ContractCustomError) else e}"
            )
            raise e

add_gas_fees(tx)

Add gas fee parameters to a transaction.

Parameters:

Name Type Description Default
tx TxParams

The transaction parameters

required

Returns:

Name Type Description
TxParams TxParams

The transaction parameters with gas fee parameters added

Source code in ethereal/chain_client.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
def add_gas_fees(self, tx: TxParams) -> TxParams:
    """Add gas fee parameters to a transaction.

    Args:
        tx (TxParams): The transaction parameters

    Returns:
        TxParams: The transaction parameters with gas fee parameters added
    """
    if "maxFeePerGas" in tx and "maxPriorityFeePerGas" in tx:
        return tx
    try:
        gas_price = self.provider.eth.gas_price
        max_priority_fee = self.provider.eth.max_priority_fee
        tx["maxFeePerGas"] = gas_price
        tx["maxPriorityFeePerGas"] = max_priority_fee
        return tx
    except Web3Exception as e:
        self.logger.error(f"Failed to add gas: {e}")
        return tx

add_gas_limit(tx)

Add gas limit to a transaction.

Parameters:

Name Type Description Default
tx TxParams

The transaction parameters

required

Returns:

Name Type Description
TxParams TxParams

The transaction parameters with gas limit added

Source code in ethereal/chain_client.py
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
def add_gas_limit(self, tx: TxParams) -> TxParams:
    """Add gas limit to a transaction.

    Args:
        tx (TxParams): The transaction parameters

    Returns:
        TxParams: The transaction parameters with gas limit added
    """
    if "gas" in tx:
        return tx
    try:
        gas = self.provider.eth.estimate_gas(tx)
        tx["gas"] = gas
        return tx
    except Web3Exception as e:
        self.logger.error(
            f"Failed to add gas limit: {self._decode_error(e) if isinstance(e, ContractCustomError) else e}"
        )
        raise e

convert_types(type_string)

Converts type string into EIP-712 field format.

Parameters:

Name Type Description Default
type_string str

String containing type definitions.

required

Returns:

Type Description
List[Dict[str, str]]

List[Dict[str, str]]: List of field definitions.

Source code in ethereal/chain_client.py
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
def convert_types(self, type_string: str) -> List[Dict[str, str]]:
    """Converts type string into EIP-712 field format.

    Args:
        type_string (str): String containing type definitions.

    Returns:
        List[Dict[str, str]]: List of field definitions.
    """
    fields = [comp.strip() for comp in type_string.split(",")]
    type_fields = []
    for field in fields:
        field_type, field_name = field.rsplit(" ", 1)
        type_fields.append({"name": field_name, "type": field_type})
    return type_fields

deposit_usde(amount, account_name='primary', address=None, submit=False, account_name_bytes=None)

Submit a deposit transaction.

Parameters:

Name Type Description Default
amount float

The amount to deposit

required
address str

The address to deposit to. Defaults to None.

None
submit bool

Whether to submit the transaction. Defaults to False.

False
account_name str

The account name. Defaults to "primary".

'primary'
account_name_bytes str

The account name as a hex string (bytes32). Defaults to None.

None

Returns:

Type Description
Union[TxParams, str]

Union[TxParams, str]: The transaction parameters or transaction hash if submit=True

Raises:

Type Description
ValueError

If both account_name and account_name_bytes are provided

Source code in ethereal/chain_client.py
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
def deposit_usde(
    self,
    amount: float,
    account_name: str = "primary",
    address: Optional[str] = None,
    submit: bool = False,
    account_name_bytes: Optional[str] = None,
) -> Union[TxParams, str]:
    """Submit a deposit transaction.

    Args:
        amount (float): The amount to deposit
        address (str, optional): The address to deposit to. Defaults to None.
        submit (bool, optional): Whether to submit the transaction. Defaults to False.
        account_name (str, optional): The account name. Defaults to "primary".
        account_name_bytes (str, optional): The account name as a hex string (bytes32). Defaults to None.

    Returns:
        Union[TxParams, str]: The transaction parameters or transaction hash if submit=True

    Raises:
        ValueError: If both account_name and account_name_bytes are provided
    """
    if address is None:
        address = self.address

    if account_name is None and account_name_bytes is None:
        account_name = "primary"

    # Validate inputs
    if account_name is not None and account_name_bytes is not None:
        raise ValueError("Cannot provide both account_name and account_name_bytes")

    try:
        # params
        if account_name_bytes is not None:
            subaccount = ensure_bytes32_hex(account_name_bytes)
        else:
            subaccount = self.provider.to_hex(text=account_name).ljust(66, "0")
        amount = self.provider.to_wei(amount, "ether")
        referral_code = self.provider.to_hex(0).ljust(66, "0")

        # prepare the tx
        tx = self._get_tx(to=self.exchange_contract.address, value=amount)
        tx["data"] = self.exchange_contract.encode_abi(
            "depositUsd", args=[subaccount, referral_code]
        )

        if submit:
            return self.submit_tx(tx)
        else:
            return tx

    except Web3Exception as e:
        self.logger.error(
            f"Failed to prepare deposit transaction: {self._decode_error(e) if isinstance(e, ContractCustomError) else e}"
        )
        raise e

finalize_withdraw(account_name='primary', address=None, submit=False)

Finalize a withdrawal.

Parameters:

Name Type Description Default
address str

The address to deposit to. Defaults to None.

None
submit bool

Whether to submit the transaction. Defaults to False.

False
account_name str

The name of the account. Defaults to "primary".

'primary'

Returns:

Type Description
Union[TxParams, str]

Union[TxParams, str]: The transaction parameters or transaction hash if submit=True

Source code in ethereal/chain_client.py
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
def finalize_withdraw(
    self,
    account_name: str = "primary",
    address: Optional[str] = None,
    submit: Optional[bool] = False,
) -> Union[TxParams, str]:
    """Finalize a withdrawal.

    Args:
        address (str, optional): The address to deposit to. Defaults to None.
        submit (bool, optional): Whether to submit the transaction. Defaults to False.
        account_name (str, optional): The name of the account. Defaults to "primary".

    Returns:
        Union[TxParams, str]: The transaction parameters or transaction hash if submit=True
    """
    if address is None:
        address = self.address
    try:
        # params
        subaccount = encode_account_name(account_name)

        # prepare the tx
        tx = self._get_tx(to=self.exchange_contract.address)
        tx["data"] = self.exchange_contract.encode_abi(
            "finalizeWithdraw", args=[address, subaccount]
        )

        if submit:
            return self.submit_tx(tx)
        else:
            return tx

    except Web3Exception as e:
        self.logger.error(
            f"Failed to prepare finalizeWithdraw transaction: {self._decode_error(e) if isinstance(e, ContractCustomError) else e}"
        )
        raise e

get_balance(address)

Get the balance for a given address.

Parameters:

Name Type Description Default
address str

The address to get the balance for

required

Returns:

Name Type Description
int int

The balance, or -1 if failed

Source code in ethereal/chain_client.py
296
297
298
299
300
301
302
303
304
305
306
307
308
309
def get_balance(self, address: str) -> int:
    """Get the balance for a given address.

    Args:
        address (str): The address to get the balance for

    Returns:
        int: The balance, or -1 if failed
    """
    try:
        return self.provider.eth.get_balance(address)
    except Web3Exception as e:
        self.logger.error(f"Failed to get balance: {e}")
        return -1

get_nonce(address)

Get the nonce for a given address.

Parameters:

Name Type Description Default
address str

The address to get the nonce for

required

Returns:

Name Type Description
int int

The nonce, or -1 if failed

Source code in ethereal/chain_client.py
281
282
283
284
285
286
287
288
289
290
291
292
293
294
def get_nonce(self, address: str) -> int:
    """Get the nonce for a given address.

    Args:
        address (str): The address to get the nonce for

    Returns:
        int: The nonce, or -1 if failed
    """
    try:
        return self.provider.eth.get_transaction_count(address)
    except Web3Exception as e:
        self.logger.error(f"Failed to get nonce: {e}")
        return -1

get_signature_types(rpc_config, primary_type)

Gets EIP-712 signature types.

Parameters:

Name Type Description Default
rpc_config RpcConfigDto

RPC configuration.

required
primary_type str

Primary type for the signature.

required

Returns:

Name Type Description
dict

Dictionary containing signature type definitions.

Source code in ethereal/chain_client.py
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
def get_signature_types(self, rpc_config: RpcConfigDto, primary_type: str):
    """Gets EIP-712 signature types.

    Args:
        rpc_config (RpcConfigDto): RPC configuration.
        primary_type (str): Primary type for the signature.

    Returns:
        dict: Dictionary containing signature type definitions.
    """
    return {
        "EIP712Domain": [
            {"name": "name", "type": "string"},
            {"name": "version", "type": "string"},
            {"name": "chainId", "type": "uint256"},
            {"name": "verifyingContract", "type": "address"},
        ],
        primary_type: self.convert_types(
            self._get_by_alias(rpc_config.signature_types, primary_type),
        ),
    }

get_token_balance(address, token_address)

Get the token balance for a given address.

Parameters:

Name Type Description Default
address str

The address to get the token balance for

required
token_address str

The token address

required

Returns:

Name Type Description
int int

The token balance, or -1 if failed

Source code in ethereal/chain_client.py
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
def get_token_balance(self, address: str, token_address: str) -> int:
    """Get the token balance for a given address.

    Args:
        address (str): The address to get the token balance for
        token_address (str): The token address

    Returns:
        int: The token balance, or -1 if failed
    """
    try:
        contract = self.provider.eth.contract(
            address=token_address,
            abi=read_contract(self.chain_id, "ERC20", common=True),
        )
        return contract.functions.balanceOf(address).call()
    except Web3Exception as e:
        self.logger.error(f"Failed to get token balance: {e}")
        return -1

sign_message(private_key, domain, types, primary_type, message)

Sign an EIP-712 typed data message.

Parameters:

Name Type Description Default
private_key str

private key to sign the message with

required
domain dict

domain parameters including name, version, chainId, and verifyingContract

required
types dict

type definitions for the structured data

required
primary_type str

primary type for the signature

required
message dict

message data to be signed

required

Returns:

Name Type Description
str

the hexadecimal signature string prefixed with '0x'

Source code in ethereal/chain_client.py
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
def sign_message(
    self,
    private_key: str,
    domain: dict,
    types: dict,
    primary_type: str,
    message: dict,
):
    """Sign an EIP-712 typed data message.

    Args:
        private_key (str): private key to sign the message with
        domain (dict): domain parameters including name, version, chainId, and verifyingContract
        types (dict): type definitions for the structured data
        primary_type (str): primary type for the signature
        message (dict): message data to be signed

    Returns:
        str: the hexadecimal signature string prefixed with '0x'
    """
    # A type fix for the domain
    domain["chainId"] = int(domain["chainId"])

    # Preparing the full message as per EIP-712
    full_message = {
        "types": types,
        "primaryType": primary_type,
        "domain": domain,
        "message": message,
    }

    encoded_message = encode_typed_data(full_message=full_message)

    # Signing the message
    signed_message = Account.sign_message(encoded_message, private_key)
    return "0x" + signed_message.signature.hex()

submit_tx(tx)

Submit a transaction.

Parameters:

Name Type Description Default
tx TxParams

The transaction parameters

required

Returns:

Name Type Description
str str

The transaction hash

Source code in ethereal/chain_client.py
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
def submit_tx(self, tx: TxParams) -> str:
    """Submit a transaction.

    Args:
        tx (TxParams): The transaction parameters

    Returns:
        str: The transaction hash
    """
    tx = self.add_gas_fees(tx)
    tx = self.add_gas_limit(tx)
    try:
        signed_tx = self.provider.eth.account.sign_transaction(
            tx, private_key=self.private_key
        )
        tx_hash = self.provider.eth.send_raw_transaction(signed_tx.raw_transaction)
        return encode_hex(tx_hash)
    except Web3Exception as e:
        self.logger.error(f"Failed to submit transaction: {e}")
        raise e

ethereal.rest.http_client.HTTPClient

Bases: BaseClient

HTTP client for making API requests.

Parameters:

Name Type Description Default
config Union[Dict[str, Any], HTTPConfig]

Client configuration.

required
Source code in ethereal/rest/http_client.py
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
class HTTPClient(BaseClient):
    """HTTP client for making API requests.

    Args:
        config (Union[Dict[str, Any], HTTPConfig]): Client configuration.
    """

    def __init__(self, config: Union[Dict[str, Any], HTTPConfig]):
        super().__init__(config)
        self.config = HTTPConfig.model_validate(config)
        self.base_url = self.config.base_url
        self.timeout = self.config.timeout
        self.session = self._setup_session()
        self.rate_limit_headers = self.config.rate_limit_headers

    def _setup_session(self):
        """Sets up an HTTP session.

        Returns:
            requests.Session: Configured session object.
        """
        return requests.Session()

    def _handle_exception(self, response):
        """Handles HTTP exceptions.

        Args:
            response (Response): The HTTP response object.

        Raises:
            HTTPError: If response indicates an error occurred.
        """
        http_error_msg = ""
        reason = response.reason

        if 400 <= response.status_code < 500:
            if (
                response.status_code == 403
                and "'error_details':'Missing required scopes'" in response.text
            ):
                http_error_msg = f"{response.status_code} Client Error: Missing Required Scopes. Please verify your API keys include the necessary permissions."
            else:
                http_error_msg = (
                    f"{response.status_code} Client Error: {reason} {response.text}"
                )
        elif 500 <= response.status_code < 600:
            http_error_msg = (
                f"{response.status_code} Server Error: {reason} {response.text}"
            )

        if http_error_msg:
            self.logger.error(f"HTTP Error: {http_error_msg}")
            raise HTTPError(http_error_msg, response=response)

    def get(self, url_path, params: Optional[dict] = None, **kwargs) -> Dict[str, Any]:
        """Sends a GET request.

        Args:
            url_path (str): The URL path. Required.
            params (dict, optional): The query parameters. Optional.
            **kwargs: Additional arguments to pass to the request. Optional.

        Returns:
            Dict[str, Any]: The response data.
        """
        params = params or {}

        if kwargs:
            params.update(kwargs)

        return self.prepare_and_send_request("GET", url_path, params, data=None)

    def get_validated(
        self,
        url_path,
        request_model: Type[BaseModel],
        response_model: Type[BaseModel],
        **kwargs,
    ) -> BaseModel:
        """Sends a GET request including type validation of both the input and output from provided models.

        Args:
            url_path (str): The URL path. Required.
            request_model (Type[BaseModel]): Pydantic model for request validation. Required.
            response_model (Type[BaseModel]): Pydantic model for response validation. Required.
            **kwargs: Includes all arguments to pass to the request. Optional.

        Returns:
            BaseModel: The response data, validated against the response_model.
        """
        try:
            validated_params = request_model.model_validate(kwargs, by_name=True)
        except ValidationError as e:
            raise ValidationException(
                "Request", url_path, request_model, e.errors()
            ) from None
        params = validated_params.model_dump(
            mode="json", exclude_none=True, by_alias=True
        )

        response_data = self.prepare_and_send_request(
            "GET", url_path, params, data=None
        )
        validated_response = response_model.model_validate(response_data)
        return validated_response

    def post(
        self,
        url_path,
        params: Optional[dict] = None,
        data: Optional[dict] = None,
        **kwargs,
    ) -> Dict[str, Any]:
        """Sends a POST request.

        Args:
            url_path (str): The URL path. Required.
            params (dict, optional): The query parameters. Optional.
            data (dict, optional): The request body. Optional.
            **kwargs: Additional arguments to pass to the request. Optional.

        Returns:
            Dict[str, Any]: The response data.
        """
        data = data or {}

        if kwargs:
            data.update(kwargs)

        return self.prepare_and_send_request("POST", url_path, params, data)

    def put(
        self,
        url_path,
        params: Optional[dict] = None,
        data: Optional[dict] = None,
        **kwargs,
    ) -> Dict[str, Any]:
        """Sends a PUT request.

        Args:
            url_path (str): The URL path. Required.
            params (dict, optional): The query parameters. Optional.
            data (dict, optional): The request body. Optional.
            **kwargs: Additional arguments to pass to the request. Optional.

        Returns:
            Dict[str, Any]: The response data.
        """
        data = data or {}

        if kwargs:
            data.update(kwargs)

        return self.prepare_and_send_request("PUT", url_path, params, data)

    def delete(
        self,
        url_path,
        params: Optional[dict] = None,
        data: Optional[dict] = None,
        **kwargs,
    ) -> Dict[str, Any]:
        """Sends a DELETE request.

        Args:
            url_path (str): The URL path. Required.
            params (dict, optional): The query parameters. Optional.
            data (dict, optional): The request body. Optional.
            **kwargs: Additional arguments to pass to the request. Optional.

        Returns:
            Dict[str, Any]: The response data.
        """
        data = data or {}

        if kwargs:
            data.update(kwargs)

        return self.prepare_and_send_request("DELETE", url_path, params, data)

    def prepare_and_send_request(
        self,
        http_method,
        url_path,
        params: Optional[dict] = None,
        data: Optional[dict] = None,
    ):
        """Prepares and sends an HTTP request.

        Args:
            http_method (str): The HTTP method. Required.
            url_path (str): The URL path. Required.
            params (dict, optional): The query parameters. Optional.
            data (dict, optional): The request body. Optional.

        Returns:
            Dict[str, Any]: The response data.
        """
        headers = self.set_headers(http_method, url_path)

        if params is not None:
            params = {
                key: str(value).lower() if isinstance(value, bool) else value
                for key, value in params.items()
                if value is not None
            }

        if data is not None:
            data = {key: value for key, value in data.items() if value is not None}

        return self.send_request(http_method, url_path, params, headers, data=data)

    def send_request(self, http_method, url_path, params, headers, data=None):
        """Sends an HTTP request.

        Args:
            http_method (str): The HTTP method. Required.
            url_path (str): The URL path. Required.
            params (dict): The query parameters. Required.
            headers (dict): The request headers. Required.
            data (dict, optional): The request body. Optional.

        Returns:
            Dict[str, Any]: The response data.

        Raises:
            HTTPError: If the request fails.
        """
        if data is None:
            data = {}

        url = f"{self.base_url}{url_path}"

        self.logger.debug(f"Sending {http_method} request to {url}")

        response = self.session.request(
            http_method,
            url,
            params=params,
            json=data,
            headers=headers,
            timeout=self.timeout,
        )
        self._handle_exception(response)  # Raise an HTTPError for bad responses

        self.logger.debug(f"Raw response: {response.json()}")

        response_data = response.json()

        if self.rate_limit_headers:
            response_headers = dict(response.headers)
            specific_headers = {
                REST_COMMON_FIELDS.get(key, key): response_headers.get(key, None)
                for key in RATE_LIMIT_HEADERS
            }

            response_data = {**response_data, **specific_headers}

        return response_data

    def set_headers(self, method, path):
        """Sets the request headers.

        Args:
            method (str): The HTTP method. Required.
            path (str): The URL path. Required.

        Returns:
            dict: The request headers.
        """

        return {
            "User-Agent": USER_AGENT,
            "Content-Type": "application/json",
        }

delete(url_path, params=None, data=None, **kwargs)

Sends a DELETE request.

Parameters:

Name Type Description Default
url_path str

The URL path. Required.

required
params dict

The query parameters. Optional.

None
data dict

The request body. Optional.

None
**kwargs

Additional arguments to pass to the request. Optional.

{}

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: The response data.

Source code in ethereal/rest/http_client.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
def delete(
    self,
    url_path,
    params: Optional[dict] = None,
    data: Optional[dict] = None,
    **kwargs,
) -> Dict[str, Any]:
    """Sends a DELETE request.

    Args:
        url_path (str): The URL path. Required.
        params (dict, optional): The query parameters. Optional.
        data (dict, optional): The request body. Optional.
        **kwargs: Additional arguments to pass to the request. Optional.

    Returns:
        Dict[str, Any]: The response data.
    """
    data = data or {}

    if kwargs:
        data.update(kwargs)

    return self.prepare_and_send_request("DELETE", url_path, params, data)

get(url_path, params=None, **kwargs)

Sends a GET request.

Parameters:

Name Type Description Default
url_path str

The URL path. Required.

required
params dict

The query parameters. Optional.

None
**kwargs

Additional arguments to pass to the request. Optional.

{}

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: The response data.

Source code in ethereal/rest/http_client.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
def get(self, url_path, params: Optional[dict] = None, **kwargs) -> Dict[str, Any]:
    """Sends a GET request.

    Args:
        url_path (str): The URL path. Required.
        params (dict, optional): The query parameters. Optional.
        **kwargs: Additional arguments to pass to the request. Optional.

    Returns:
        Dict[str, Any]: The response data.
    """
    params = params or {}

    if kwargs:
        params.update(kwargs)

    return self.prepare_and_send_request("GET", url_path, params, data=None)

get_validated(url_path, request_model, response_model, **kwargs)

Sends a GET request including type validation of both the input and output from provided models.

Parameters:

Name Type Description Default
url_path str

The URL path. Required.

required
request_model Type[BaseModel]

Pydantic model for request validation. Required.

required
response_model Type[BaseModel]

Pydantic model for response validation. Required.

required
**kwargs

Includes all arguments to pass to the request. Optional.

{}

Returns:

Name Type Description
BaseModel BaseModel

The response data, validated against the response_model.

Source code in ethereal/rest/http_client.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
def get_validated(
    self,
    url_path,
    request_model: Type[BaseModel],
    response_model: Type[BaseModel],
    **kwargs,
) -> BaseModel:
    """Sends a GET request including type validation of both the input and output from provided models.

    Args:
        url_path (str): The URL path. Required.
        request_model (Type[BaseModel]): Pydantic model for request validation. Required.
        response_model (Type[BaseModel]): Pydantic model for response validation. Required.
        **kwargs: Includes all arguments to pass to the request. Optional.

    Returns:
        BaseModel: The response data, validated against the response_model.
    """
    try:
        validated_params = request_model.model_validate(kwargs, by_name=True)
    except ValidationError as e:
        raise ValidationException(
            "Request", url_path, request_model, e.errors()
        ) from None
    params = validated_params.model_dump(
        mode="json", exclude_none=True, by_alias=True
    )

    response_data = self.prepare_and_send_request(
        "GET", url_path, params, data=None
    )
    validated_response = response_model.model_validate(response_data)
    return validated_response

post(url_path, params=None, data=None, **kwargs)

Sends a POST request.

Parameters:

Name Type Description Default
url_path str

The URL path. Required.

required
params dict

The query parameters. Optional.

None
data dict

The request body. Optional.

None
**kwargs

Additional arguments to pass to the request. Optional.

{}

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: The response data.

Source code in ethereal/rest/http_client.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
def post(
    self,
    url_path,
    params: Optional[dict] = None,
    data: Optional[dict] = None,
    **kwargs,
) -> Dict[str, Any]:
    """Sends a POST request.

    Args:
        url_path (str): The URL path. Required.
        params (dict, optional): The query parameters. Optional.
        data (dict, optional): The request body. Optional.
        **kwargs: Additional arguments to pass to the request. Optional.

    Returns:
        Dict[str, Any]: The response data.
    """
    data = data or {}

    if kwargs:
        data.update(kwargs)

    return self.prepare_and_send_request("POST", url_path, params, data)

prepare_and_send_request(http_method, url_path, params=None, data=None)

Prepares and sends an HTTP request.

Parameters:

Name Type Description Default
http_method str

The HTTP method. Required.

required
url_path str

The URL path. Required.

required
params dict

The query parameters. Optional.

None
data dict

The request body. Optional.

None

Returns:

Type Description

Dict[str, Any]: The response data.

Source code in ethereal/rest/http_client.py
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
def prepare_and_send_request(
    self,
    http_method,
    url_path,
    params: Optional[dict] = None,
    data: Optional[dict] = None,
):
    """Prepares and sends an HTTP request.

    Args:
        http_method (str): The HTTP method. Required.
        url_path (str): The URL path. Required.
        params (dict, optional): The query parameters. Optional.
        data (dict, optional): The request body. Optional.

    Returns:
        Dict[str, Any]: The response data.
    """
    headers = self.set_headers(http_method, url_path)

    if params is not None:
        params = {
            key: str(value).lower() if isinstance(value, bool) else value
            for key, value in params.items()
            if value is not None
        }

    if data is not None:
        data = {key: value for key, value in data.items() if value is not None}

    return self.send_request(http_method, url_path, params, headers, data=data)

put(url_path, params=None, data=None, **kwargs)

Sends a PUT request.

Parameters:

Name Type Description Default
url_path str

The URL path. Required.

required
params dict

The query parameters. Optional.

None
data dict

The request body. Optional.

None
**kwargs

Additional arguments to pass to the request. Optional.

{}

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: The response data.

Source code in ethereal/rest/http_client.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
def put(
    self,
    url_path,
    params: Optional[dict] = None,
    data: Optional[dict] = None,
    **kwargs,
) -> Dict[str, Any]:
    """Sends a PUT request.

    Args:
        url_path (str): The URL path. Required.
        params (dict, optional): The query parameters. Optional.
        data (dict, optional): The request body. Optional.
        **kwargs: Additional arguments to pass to the request. Optional.

    Returns:
        Dict[str, Any]: The response data.
    """
    data = data or {}

    if kwargs:
        data.update(kwargs)

    return self.prepare_and_send_request("PUT", url_path, params, data)

send_request(http_method, url_path, params, headers, data=None)

Sends an HTTP request.

Parameters:

Name Type Description Default
http_method str

The HTTP method. Required.

required
url_path str

The URL path. Required.

required
params dict

The query parameters. Required.

required
headers dict

The request headers. Required.

required
data dict

The request body. Optional.

None

Returns:

Type Description

Dict[str, Any]: The response data.

Raises:

Type Description
HTTPError

If the request fails.

Source code in ethereal/rest/http_client.py
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
def send_request(self, http_method, url_path, params, headers, data=None):
    """Sends an HTTP request.

    Args:
        http_method (str): The HTTP method. Required.
        url_path (str): The URL path. Required.
        params (dict): The query parameters. Required.
        headers (dict): The request headers. Required.
        data (dict, optional): The request body. Optional.

    Returns:
        Dict[str, Any]: The response data.

    Raises:
        HTTPError: If the request fails.
    """
    if data is None:
        data = {}

    url = f"{self.base_url}{url_path}"

    self.logger.debug(f"Sending {http_method} request to {url}")

    response = self.session.request(
        http_method,
        url,
        params=params,
        json=data,
        headers=headers,
        timeout=self.timeout,
    )
    self._handle_exception(response)  # Raise an HTTPError for bad responses

    self.logger.debug(f"Raw response: {response.json()}")

    response_data = response.json()

    if self.rate_limit_headers:
        response_headers = dict(response.headers)
        specific_headers = {
            REST_COMMON_FIELDS.get(key, key): response_headers.get(key, None)
            for key in RATE_LIMIT_HEADERS
        }

        response_data = {**response_data, **specific_headers}

    return response_data

set_headers(method, path)

Sets the request headers.

Parameters:

Name Type Description Default
method str

The HTTP method. Required.

required
path str

The URL path. Required.

required

Returns:

Name Type Description
dict

The request headers.

Source code in ethereal/rest/http_client.py
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
def set_headers(self, method, path):
    """Sets the request headers.

    Args:
        method (str): The HTTP method. Required.
        path (str): The URL path. Required.

    Returns:
        dict: The request headers.
    """

    return {
        "User-Agent": USER_AGENT,
        "Content-Type": "application/json",
    }