Skip to content

API Reference

This reference documents the main classes and methods in the Ethereal Python SDK.

Client Classes

AsyncRESTClient

The primary asynchronous client for interacting with the Ethereal API via REST endpoints. Recommended for all new applications.

from ethereal import AsyncRESTClient

# Create async client (use within async function)
client = await AsyncRESTClient.create({
    "base_url": "https://api.etherealtest.net",
    "chain_config": {
        "rpc_url": "https://rpc.etherealtest.net",
        "private_key": "your_private_key",  # optional, required for signing
    }
})

# Use the client
products = await client.products()
subaccounts = await client.subaccounts()

# Remember to close when done
await client.close()

ethereal.async_rest_client.AsyncRESTClient

Bases: AsyncHTTPClient

Asynchronous REST client for the Ethereal API.

Notes for maintainers:

- This client composes endpoint functions from the ethereal.rest.* modules by assigning them as attributes on the class (see the source below). Each function expects self to provide get, post, and get_validated from AsyncHTTPClient and to expose _models for the active network.
- Network-specific models are accessed via self._models, which is set from the configured network. This avoids global mutation and keeps behavior predictable.
- Use AsyncRESTClient.create(...) so that async initialization (RPC config, optional chain client) completes before use. Remember to await client.close() to release the underlying httpx.AsyncClient.
- We intentionally avoid “async properties”. Derived indices (e.g., products by ticker or id) are exposed as explicit async methods, products_by_ticker() and products_by_id(), which cache their result on the client; pass refresh=True to fetch fresh data.

Source code in ethereal/async_rest_client.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
class AsyncRESTClient(AsyncHTTPClient):
    """Asynchronous REST client for the Ethereal API.

    Notes for maintainers:
    - This client composes endpoint functions from the `ethereal.rest.*` modules
      by assigning them as attributes on the class (see below). Each function
      expects `self` to provide `get`, `post`, and `get_validated` from
      AsyncHTTPClient and to expose `_models` for the active network.
    - Network-specific models are accessed via `self._models` which is set based
      on the configured network. This avoids global mutation and is predictable.
    - Use `AsyncRESTClient.create(...)` to ensure async initialization (RPC
      config, optional chain client) happens before use. Remember to `await
      client.close()` to release the underlying `httpx.AsyncClient`.
    - We intentionally avoid “async properties”. For convenience methods that
      derive indices (e.g., products by ticker/id), use explicit async methods
      like `get_products_by_ticker()` or `get_products_by_id()` that fetch fresh
      data each call to keep behavior lightweight and predictable.
    """

    list_funding = list_funding
    get_projected_funding = get_projected_funding
    get_order = get_order
    list_fills = list_fills
    list_orders = list_orders
    list_trades = list_trades
    prepare_order = prepare_order
    sign_order = sign_order
    submit_order = submit_order
    dry_run_order = dry_run_order
    prepare_cancel_order = prepare_cancel_order
    sign_cancel_order = sign_cancel_order
    cancel_order = cancel_order
    get_signer = get_signer
    get_signer_quota = get_signer_quota
    list_signers = list_signers
    prepare_linked_signer = prepare_linked_signer
    sign_linked_signer = sign_linked_signer
    link_linked_signer = link_linked_signer
    prepare_revoke_linked_signer = prepare_revoke_linked_signer
    sign_revoke_linked_signer = sign_revoke_linked_signer
    revoke_linked_signer = revoke_linked_signer
    list_positions = list_positions
    get_position = get_position
    get_market_liquidity = get_market_liquidity
    list_market_prices = list_market_prices
    list_products = list_products
    get_rpc_config = get_rpc_config
    list_subaccounts = list_subaccounts
    get_subaccount = get_subaccount
    get_subaccount_balances = get_subaccount_balances
    get_subaccount_balance_history = get_subaccount_balance_history
    get_subaccount_unrealized_pnl_history = get_subaccount_unrealized_pnl_history
    get_subaccount_volume_history = get_subaccount_volume_history
    get_token = get_token
    list_token_withdraws = list_token_withdraws
    list_tokens = list_tokens
    list_token_transfers = list_token_transfers
    prepare_withdraw_token = prepare_withdraw_token
    sign_withdraw_token = sign_withdraw_token
    withdraw_token = withdraw_token

    def __init__(self, config: Union[Dict[str, Any], RESTConfig] = {}):
        """Validate the configuration and set up the synchronous client state.

        Only synchronous work happens here; use ``create()`` to also run the
        asynchronous initialization.

        Args:
            config (Union[Dict[str, Any], RESTConfig], optional): Client configuration,
                validated with ``RESTConfig.model_validate``. When ``network`` is unset
                it is treated as "testnet". When ``base_url`` or ``archive_base_url``
                is unset, the URL registered for that network is used, falling back
                to the testnet entry for unknown networks.
        """
        self.config = RESTConfig.model_validate(config)
        cfg = self.config
        network_name = cfg.network or "testnet"

        # Fill in whichever endpoints the caller left blank.
        if not cfg.base_url:
            rest_url = NETWORK_URLS.get(network_name, NETWORK_URLS["testnet"])
            cfg.base_url = AnyHttpUrl(rest_url)
        if not cfg.archive_base_url:
            archive_url = ARCHIVE_NETWORK_URLS.get(
                network_name, ARCHIVE_NETWORK_URLS["testnet"]
            )
            cfg.archive_base_url = AnyHttpUrl(archive_url)

        # Network-specific model module; unknown networks use the "default" entry.
        models_path = _MODEL_PATHS.get(network_name, _MODEL_PATHS["default"])  # type: ignore
        self._models = importlib.import_module(models_path)
        super().__init__(self.config)

        # Settings copied from the (now fully populated) configuration.
        self._archive_base_url = self.config.archive_base_url
        self.default_time_in_force = self.config.default_time_in_force
        self.default_post_only = self.config.default_post_only

        # Chain-related state, filled in by the async initialization.
        self.chain: Optional[ChainClient] = None
        self.rpc_config: Optional[RpcConfigDto] = None
        self.private_key: Optional[str] = None
        self.provider: Optional[Any] = None

        # Lazily populated caches; None means "not fetched yet".
        self._subaccounts: Optional[List[SubaccountDto]] = None
        self._products: Optional[List[ProductDto]] = None
        self._tokens: Optional[List[TokenDto]] = None
        self._products_by_ticker: Optional[Dict[str, ProductDto]] = None
        self._products_by_id: Optional[Dict[str, ProductDto]] = None

    @classmethod
    async def create(
        cls, config: Union[Dict[str, Any], RESTConfig] = {}
    ) -> AsyncRESTClient:
        """Build a client and run its asynchronous initialization.

        Args:
            config (Union[Dict[str, Any], RESTConfig], optional): Configuration dictionary or RESTConfig object. Fields read by this client include:
                network (str, optional): Network name. Treated as "testnet" when unset.
                base_url (str, optional): Base URL for REST requests. Defaults to the URL registered for the configured network.
                archive_base_url (str, optional): Base URL for archive requests. Defaults to the archive URL registered for the configured network.
                chain_config (ChainConfig, optional): Chain configuration for signing transactions.
                default_time_in_force / default_post_only: Defaults applied to LIMIT orders.
                Other RESTConfig fields (e.g. timeout, verbose, rate_limit_headers) are handed to the HTTP base client; see RESTConfig for their meaning.

        Returns:
            AsyncRESTClient: Fully initialized async client instance.
        """
        instance = cls(config)
        # The constructor is synchronous; the awaited step completes the setup.
        await instance._async_init()
        return instance

    async def _async_init(self):
        """Asynchronous initialization.

        Loads RPC configuration and (optionally) initializes the chain client if
        `chain_config` is provided. This split constructor pattern keeps
        `__init__` sync, while `create()` ensures the instance is fully ready.
        """
        self.rpc_config = await self.get_rpc_config()
        tokens = await self.list_tokens()
        if self.config.chain_config:
            self._init_chain_client(self.config.chain_config, self.rpc_config, tokens)
        self.private_key = self.chain.private_key if self.chain else None
        self.provider = self.chain.provider if self.chain else None
        # No eager cache priming; caches are populated on first access

    def _init_chain_client(
        self,
        config: Union[Dict[str, Any], ChainConfig],
        rpc_config: Optional[RpcConfigDto] = None,
        tokens: Optional[List[TokenDto]] = None,
    ):
        """Initialize the ChainClient for transaction signing.

        Initialization is best-effort: if constructing the chain client fails,
        the error is logged as a warning and ``self.chain`` is left unchanged,
        so the REST client itself remains usable. Validation of ``config``
        happens outside the guarded section and may raise.

        Args:
            config (Union[Dict[str, Any], ChainConfig]): The chain configuration.
            rpc_config (RpcConfigDto, optional): RPC configuration. Defaults to None.
            tokens (List[TokenDto], optional): List of token configurations. Defaults to None.
        """
        config = ChainConfig.model_validate(config)
        try:
            self.chain = ChainClient(config, rpc_config, tokens)
        except Exception as e:
            # The caller explicitly asked for a chain client, so a failure must
            # be visible at default log levels rather than hidden in debug.
            self.logger.warning(f"Failed to initialize chain client: {e}")
        else:
            self.logger.debug("Chain client initialized successfully")

    async def _get_pages(
        self,
        endpoint: str,
        request_model: Type[BaseModel],
        response_model: Type[BaseModel],
        paginate: bool = False,
        **kwargs,
    ) -> Any:
        """Make a GET request with validated parameters and response, handling pagination.

        Args:
            endpoint (str): API endpoint path (e.g. "order" will be appended to the base URL and prefix to form "/v1/order")
            request_model (Type[BaseModel]): Pydantic model for request validation
            response_model (Type[BaseModel]): Pydantic model for response validation
            paginate (bool, optional): If True, fetch all pages and return aggregated list. Defaults to False.

        Other Parameters:
            **kwargs: Parameters to validate and include in the request. A
                caller-supplied ``cursor`` selects the first page only; when
                paginating, later requests use the cursor returned by the
                previous page.

        Returns:
            Any: List of validated response objects. Single page if paginate=False, all pages if paginate=True.
        """
        url_path = f"{API_PREFIX}/{endpoint}"
        first_page = await self.get_validated(
            url_path=url_path,
            request_model=request_model,
            response_model=response_model,
            **kwargs,
        )
        # `get_validated` already returns an instance of `response_model` with
        # `.data`, `.has_next`, and `.next_cursor`.
        all_items: List[Any] = list(first_page.data)
        if not paginate:
            return all_items

        current_cursor = getattr(first_page, "next_cursor", None)
        has_next = getattr(first_page, "has_next", False)

        while has_next and current_cursor:
            # Merge rather than pass `cursor=` next to `**kwargs`: a `cursor`
            # key in kwargs would otherwise raise a duplicate-keyword TypeError.
            page_params = {**kwargs, "cursor": current_cursor}
            page = await self.get_validated(
                url_path=url_path,
                request_model=request_model,
                response_model=response_model,
                **page_params,
            )
            all_items.extend(page.data)
            has_next = getattr(page, "has_next", False)
            current_cursor = getattr(page, "next_cursor", None)

        return all_items

    async def subaccounts(self, refresh: bool = False) -> List[SubaccountDto]:
        """Return the subaccounts of the chain client's address, using the cache.

        The list is requested with ``order_by="createdAt"`` and ``order="asc"``
        and stored on the client; later calls reuse it unless ``refresh`` is set.

        Args:
            refresh (bool, optional): If True, bypass cache and fetch fresh data. Defaults to False.

        Returns:
            List[SubaccountDto]: List of subaccount objects for the connected wallet address.

        Raises:
            ValueError: If no chain client is configured or address is unavailable.
        """
        chain = self.chain
        address = getattr(chain, "address", None) if chain else None
        if not address:
            raise ValueError("Chain address is required to list subaccounts")

        cache_is_usable = self._subaccounts is not None and not refresh
        if not cache_is_usable:
            self._subaccounts = await self.list_subaccounts(
                sender=address, order_by="createdAt", order="asc"
            )
        return self._subaccounts

    async def products(self, refresh: bool = False) -> List[ProductDto]:
        """Get the list of products.

        The list is cached on the client. Whenever it is (re)fetched, the
        ticker and id indexes derived from it are dropped so that they are
        rebuilt from the fresh list on their next access instead of serving
        stale entries.

        Args:
            refresh (bool, optional): If True, bypass cache and fetch fresh data. Defaults to False.

        Returns:
            List[ProductDto]: List of product objects.
        """
        if refresh or self._products is None:
            self._products = await self.list_products()
            # Derived indexes would otherwise keep pointing at the old list.
            self._products_by_ticker = None
            self._products_by_id = None
        return self._products

    async def tokens(self, refresh: bool = False) -> List[TokenDto]:
        """Return the token list, fetching it only when needed.

        Args:
            refresh (bool, optional): If True, bypass cache and fetch fresh data. Defaults to False.

        Returns:
            List[TokenDto]: List of token objects.
        """
        cache_hit = self._tokens is not None and not refresh
        if cache_hit:
            return self._tokens
        self._tokens = await self.list_tokens()
        return self._tokens

    async def products_by_ticker(self, refresh: bool = False) -> Dict[str, ProductDto]:
        """Return a ticker -> product mapping, cached on the client.

        Products without a truthy ``ticker`` attribute are left out of the mapping.

        Args:
            refresh (bool, optional): If True, bypass cache and fetch fresh data. Defaults to False.

        Returns:
            Dict[str, ProductDto]: Dictionary of products keyed by ticker.
        """
        if not refresh and self._products_by_ticker is not None:
            return self._products_by_ticker

        index = {}
        for product in await self.products(refresh=refresh):
            if getattr(product, "ticker", None):
                index[product.ticker] = product
        self._products_by_ticker = index
        return self._products_by_ticker

    async def products_by_id(self, refresh: bool = False) -> Dict[str, ProductDto]:
        """Return an id -> product mapping, cached on the client.

        Args:
            refresh (bool, optional): If True, bypass cache and fetch fresh data. Defaults to False.

        Returns:
            Dict[str, ProductDto]: Dictionary of products keyed by ID.
        """
        if not refresh and self._products_by_id is not None:
            return self._products_by_id

        index = {}
        for product in await self.products(refresh=refresh):
            index[product.id] = product
        self._products_by_id = index
        return self._products_by_id

    async def get_maintenance_margin(
        self,
        subaccount_id: str,
        positions: Optional[Union[List[PositionDto], List[Dict[str, Any]]]] = None,
        products: Optional[Union[List[ProductDto], List[Dict[str, Any]]]] = None,
        product_ids: Optional[List[str]] = None,
    ) -> Decimal:
        """Calculate an account's maintenance margin for its positions.

        For every position considered, the contribution is
        ``|cost| * (1 / (2 * max_leverage)) + |cost| * taker_fee``, using the
        position's product; the contributions are summed.

        Args:
            subaccount_id (str): Fetch positions for this subaccount when ``positions`` is not supplied.
            positions (Union[List[PositionDto], List[Dict[str, Any]]], optional): Pre-fetched positions to use directly.
            products (Union[List[ProductDto], List[Dict[str, Any]]], optional): Pre-fetched products to look
                positions up in, instead of the client's product index.
            product_ids (List[str], optional): Restricts the calculation to positions in these product IDs.

        Returns:
            Decimal: Total maintenance margin for the selected positions; ``Decimal("0")``
                when no position is selected.

        Raises:
            ValueError: If both ``products`` and ``product_ids`` are given, if a requested
                product ID is unknown, or if a position's product cannot be found.
        """

        # Minimal views of the DTOs: only the fields the calculation reads.
        class PartialPosition(BaseModel):
            product_id: str
            cost: str

        class PartialProduct(BaseModel):
            id: str
            max_leverage: float
            taker_fee: str

        def as_fields(item):
            # Inputs may be plain dicts or model instances.
            return item if isinstance(item, dict) else item.model_dump()

        if positions is None:
            positions = await self.list_positions(subaccount_id=subaccount_id)

        if products and product_ids:
            raise ValueError("Can only specify one of products and product_ids")

        # Build the product lookup table.
        if products is None:
            fetched = await self.products_by_id()
            catalog = {
                key: PartialProduct(**as_fields(value))
                for key, value in fetched.items()
            }
        else:
            catalog = {}
            for raw_product in products:
                parsed = PartialProduct(**as_fields(raw_product))
                catalog[parsed.id] = parsed

        # Optional product filter; every requested id must be known.
        wanted: Optional[Set[str]] = None
        if product_ids is not None:
            wanted = set(product_ids)
            unknown = wanted.difference(catalog)
            if unknown:
                missing = ", ".join(sorted(unknown))
                raise ValueError(f"Products not found for calculation: {missing}")

        # All positions are validated before the filter is applied.
        selected = [PartialPosition(**as_fields(raw)) for raw in positions]
        if wanted is not None:
            selected = [pos for pos in selected if pos.product_id in wanted]

        margin = Decimal("0")
        if not selected:
            return margin

        one = Decimal("1")
        two = Decimal("2")
        for pos in selected:
            spec = catalog.get(pos.product_id)
            if spec is None:
                raise ValueError(
                    f"Product '{pos.product_id}' not found for position '{pos.product_id}'"
                )

            notional = abs(Decimal(pos.cost))
            # Go through str() so the float's short repr is converted.
            leverage = Decimal(str(spec.max_leverage))
            fee_rate = Decimal(spec.taker_fee)

            margin_rate = one / (leverage * two)
            margin += notional * margin_rate
            margin += notional * fee_rate

        return margin

    async def get_tokens(self) -> List[TokenDto]:
        """Fetch the token list from the API on every call, without touching the cache."""
        latest = await self.list_tokens()
        return latest

    async def create_order(
        self,
        order_type: str,
        quantity: float,
        side: int,
        price: Optional[float] = None,
        ticker: Optional[str] = None,
        product_id: Optional[str] = None,
        client_order_id: Optional[str] = None,
        sender: Optional[str] = None,
        subaccount: Optional[str] = None,
        time_in_force: Optional[str] = None,
        post_only: Optional[bool] = None,
        reduce_only: Optional[bool] = False,
        close: Optional[bool] = None,
        stop_price: Optional[float] = None,
        stop_type: Optional[int] = None,
        expires_at: Optional[int] = None,
        group_id: Optional[str] = None,
        group_contingency_type: Optional[int] = None,
        sign: bool = True,
        dry_run: bool = False,
        submit: bool = True,
    ) -> Union[SubmitOrderCreatedDto, DryRunOrderCreatedDto, SubmitOrderDto]:
        """Create and submit an order.

        Args:
            order_type (str): 'LIMIT' or 'MARKET'. Required.
            quantity (float): Order size. Required.
            side (int): 0 for buy, 1 for sell. Required.
            price (float, optional): Limit price for LIMIT orders.
            ticker (str, optional): Ticker of the product. Used only when ``product_id`` is not given.
            product_id (str, optional): UUID of the product. Takes precedence over ``ticker``.
            client_order_id (str, optional): Subaccount-scoped client-generated id (UUID or <=32 alphanumeric).
            sender (str, optional): Address placing the order. Defaults to chain address.
            subaccount (str, optional): Hex-encoded subaccount name. Defaults to first subaccount.
            time_in_force (str, optional): For LIMIT orders (e.g., 'GTC', 'GTD'). Defaults to the client's ``default_time_in_force``.
            post_only (bool, optional): For LIMIT orders; rejects if crossing. An explicit True or False is
                always honored; None falls back to the client's ``default_post_only``.
            reduce_only (bool, optional): If True, order only reduces position. Defaults to False.
            close (bool, optional): For MARKET orders; If True, closes the position.
            stop_price (float, optional): Stop trigger price.
            stop_type (int, optional): Stop type, either 0 (take-profit) or 1 (stop-loss), requires non-zero stopPrice.
            expires_at (int, optional): Expiry timestamp for GTD.
            group_id (str, optional): Group Id (UUID) for linking orders together in OCO/OTO relationships.
            group_contingency_type (int, optional): Contingency type for order groups: 0=OTO (Order-Triggers-Order), 1=OCO (One-Cancels-Other).
            sign (bool, optional): If True, sign the payload immediately. Defaults to True.
            dry_run (bool, optional): If True, validate without execution; takes precedence over ``submit``. Defaults to False.
            submit (bool, optional): If True, submit the order. Defaults to True.

        Returns:
            Union[SubmitOrderCreatedDto, DryRunOrderCreatedDto, SubmitOrderDto]: Created order response, dry-run validation result, or prepared order payload.

        Raises:
            ValueError: If neither product_id nor ticker is provided, if order type is invalid,
                or if no subaccount is given and the account has none.
            KeyError: If the given product_id or ticker is not in the product index.
        """
        if sender is None and self.chain:
            sender = self.chain.address
        if subaccount is None:
            subaccounts = await self.subaccounts()
            if not subaccounts:
                raise ValueError(
                    "No subaccounts found for this account. Please create a subaccount first."
                )
            # Log at debug level to aid troubleshooting without spamming INFO logs
            self.logger.debug(
                f"First subaccount name: '{subaccounts[0].name}', id: '{subaccounts[0].id}'"
            )
            self.logger.debug(f"All subaccount names: {[s.name for s in subaccounts]}")
            # Try the hex name directly first
            subaccount = subaccounts[0].name

        # Resolve the on-chain product id from whichever identifier was given.
        if product_id is not None:
            products_by_id = await self.products_by_id()
            onchain_id = products_by_id[product_id].onchain_id
        elif ticker is not None:
            products_by_ticker = await self.products_by_ticker()
            onchain_id = products_by_ticker[ticker].onchain_id
        else:
            raise ValueError("Either product_id or ticker must be provided")

        order_params = {
            "sender": sender,
            "subaccount": subaccount,
            "side": side,
            "quantity": quantity,
            "onchain_id": onchain_id,
            "order_type": order_type,
            "client_order_id": client_order_id,
            "reduce_only": reduce_only,
            "close": close,
            "stop_price": stop_price,
            "stop_type": stop_type,
            "group_id": group_id,
            "group_contingency_type": group_contingency_type,
        }

        if order_type == "LIMIT":
            # `post_only or default` would turn an explicit False into the
            # default, so only fall back when the caller passed nothing.
            effective_post_only = (
                self.default_post_only if post_only is None else post_only
            )
            order_params.update(
                {
                    "price": price,
                    "time_in_force": time_in_force or self.default_time_in_force,
                    "post_only": effective_post_only,
                    "expires_at": expires_at,
                }
            )
        elif order_type != "MARKET":
            raise ValueError("Invalid order type")

        order = await self.prepare_order(**order_params, include_signature=sign)
        if dry_run:
            return await self.dry_run_order(order)
        elif submit:
            return await self.submit_order(order)
        else:
            return order

    async def cancel_orders(
        self,
        order_ids: List[str],
        sender: str,
        subaccount: str,
        client_order_ids: List[str] = [],
        sign: bool = True,
        submit: bool = True,
        **kwargs,
    ) -> Union[List[CancelOrderResultDto], CancelOrderDto]:
        """Build a cancellation request for several orders and optionally send it.

        Args:
            order_ids (List[str]): Order UUIDs to cancel. Required.
            sender (str): Address initiating the cancellation. Required.
            subaccount (str): Hex-encoded subaccount name. Required.
            client_order_ids (List[str], optional): Client-generated IDs to cancel. Defaults to empty list.
            sign (bool, optional): If True, sign the payload immediately. Defaults to True.
            submit (bool, optional): If True, submit the request to the API. Defaults to True.

        Other Parameters:
            **kwargs: Additional request parameters; forwarded both when preparing
                the payload and when submitting it.

        Returns:
            Union[List[CancelOrderResultDto], CancelOrderDto]: Cancellation results per order id or prepared cancel payload.

        Raises:
            ValueError: If no order IDs or client order IDs provided for cancellation,
                or re-raised from payload preparation after logging a warning.
        """
        has_targets = len(order_ids) or len(client_order_ids)
        if not has_targets:
            raise ValueError(
                "No order IDs or client order IDs provided for cancellation"
            )

        try:
            payload = await self.prepare_cancel_order(
                order_ids=order_ids,
                client_order_ids=client_order_ids,
                sender=sender,
                subaccount=subaccount,
                include_signature=sign,
                **kwargs,
            )
        except ValueError as err:
            # Surface the reason in the log, then let the caller handle it.
            self.logger.warning(f"Could not prepare/sign order cancellation: {err}")
            raise

        if submit:
            return await self.cancel_order(payload, **kwargs)
        return payload

    async def cancel_all_orders(
        self,
        subaccount_id: str,
        product_ids: Optional[List[str]] = None,
        **kwargs,
    ) -> List[CancelOrderResultDto]:
        """Cancel every open order of a subaccount.

        Orders with status FILLED_PARTIAL, NEW or PENDING are collected across
        all pages and cancelled in a single signed, submitted request.

        Args:
            subaccount_id (str): UUID of the subaccount. Required.
            product_ids (List[str], optional): Filter cancellation by product IDs.

        Other Parameters:
            **kwargs: Additional query parameters for the order listing request.

        Returns:
            List[CancelOrderResultDto]: Cancellation results per order id.

        Raises:
            ValueError: If no orders found to cancel or cancellation fails.
        """
        owner = await self.get_subaccount(id=subaccount_id)

        # Caller-supplied kwargs are applied after the defaults, so they win.
        filters = dict(
            subaccount_id=subaccount_id,
            statuses=["FILLED_PARTIAL", "NEW", "PENDING"],
        )
        filters.update(kwargs)
        if product_ids:
            filters["product_ids"] = product_ids

        open_orders = await self._get_pages(
            endpoint="order",
            request_model=self._models.V1OrderGetParametersQuery,
            response_model=self._models.PageOfOrderDtos,
            paginate=True,
            **filters,
        )
        ids_to_cancel = [open_order.id for open_order in open_orders]
        if not ids_to_cancel:
            raise ValueError("No order IDs provided for cancellation")

        outcome = await self.cancel_orders(
            order_ids=ids_to_cancel,
            sender=owner.account,
            subaccount=owner.name,
            sign=True,
            submit=True,
        )
        # A submitted cancellation yields a list; anything else is a failure.
        if isinstance(outcome, list):
            return outcome
        raise ValueError("Failed to cancel orders")

    async def replace_order(
        self,
        order: Optional[OrderDto] = None,
        order_id: Optional[str] = None,
        quantity: Optional[float] = None,
        price: Optional[float] = None,
        time_in_force: Optional[str] = None,
        post_only: Optional[bool] = None,
        reduce_only: Optional[bool] = False,
    ) -> Tuple[SubmitOrderCreatedDto, bool]:
        """Replace an existing order with new parameters.

        The old order is cancelled first; only when the cancellation reports
        "Ok" is a new order created with the same type, side, product, sender
        and subaccount. Values not supplied here are taken from the old order.

        Args:
            order (OrderDto, optional): Existing order object to replace.
            order_id (str, optional): UUID of the order to replace.
            quantity (float, optional): New order size.
            price (float, optional): New limit price.
            time_in_force (str, optional): New time in force. Falls back to the old order's
                value, then to the client's ``default_time_in_force``.
            post_only (bool, optional): New post-only flag. None inherits the old order's flag;
                the client's ``default_post_only`` is used only if that is None as well.
            reduce_only (bool, optional): New reduce-only flag. Defaults to False; pass None
                explicitly to inherit the old order's flag.

        Returns:
            Tuple[SubmitOrderCreatedDto, bool]: Created order response and success flag.

        Raises:
            ValueError: If neither order nor order_id is provided, or both are provided,
                or if cancelling the old order fails.
        """
        if order is None and order_id is None:
            raise ValueError("Either order or order_id must be provided")
        if order is not None and order_id is not None:
            raise ValueError("Only one of order or order_id must be provided")

        old_order = order if order is not None else await self.get_order(id=order_id)
        subaccount = await self.get_subaccount(id=old_order.subaccount_id)

        quantity = float(old_order.quantity) if quantity is None else quantity
        price = float(old_order.price) if price is None else price
        time_in_force = (
            old_order.time_in_force.value
            if time_in_force is None and old_order.time_in_force
            else time_in_force
        )
        post_only = old_order.post_only if post_only is None else post_only
        if post_only is None:
            # Neither the caller nor the old order specified it.
            post_only = self.default_post_only
        reduce_only = old_order.reduce_only if reduce_only is None else reduce_only

        cancel_result = await self.cancel_orders(
            order_ids=[old_order.id],
            sender=old_order.sender,
            subaccount=subaccount.name,
            sign=True,
            submit=True,
        )
        if not isinstance(cancel_result, list) or len(cancel_result) != 1:
            raise ValueError("Failed to cancel order")
        canceled_order = cancel_result[0]

        if canceled_order.result.value != "Ok":
            # Use the resolved order's id: `order_id` is None when the caller
            # passed an order object instead of an id.
            raise ValueError(
                f"Failed to cancel order {old_order.id}: {canceled_order.result.value}"
            )

        new_order = await self.create_order(
            order_type=old_order.type.value,
            quantity=quantity,
            side=old_order.side.value,
            price=price,
            product_id=old_order.product_id,
            sender=old_order.sender,
            subaccount=subaccount.name,
            time_in_force=time_in_force or self.default_time_in_force,
            # Passed as resolved, so an explicit/inherited False is not
            # replaced by the client default.
            post_only=post_only,
            reduce_only=reduce_only,
            dry_run=False,
        )
        return self._models.SubmitOrderCreatedDto.model_validate(
            new_order
        ), canceled_order.result.value == "Ok"

cancel_all_orders(subaccount_id, product_ids=None, **kwargs) async

Cancel all orders for a given subaccount.

Parameters:

Name Type Description Default
subaccount_id str

UUID of the subaccount. Required.

required
product_ids List[str]

Filter cancellation by product IDs.

None

Other Parameters:

Name Type Description
**kwargs

Additional request parameters accepted by the API.

Returns:

Type Description
List[CancelOrderResultDto]

List[CancelOrderResultDto]: Cancellation results per order id.

Raises:

Type Description
ValueError

If no orders found to cancel or cancellation fails.

Source code in ethereal/async_rest_client.py
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
async def cancel_all_orders(
    self,
    subaccount_id: str,
    product_ids: Optional[List[str]] = None,
    **kwargs,
) -> List[CancelOrderResultDto]:
    """Cancel every open order belonging to a subaccount.

    Open orders (statuses ``FILLED_PARTIAL``, ``NEW`` and ``PENDING``) are
    collected through the paginated ``order`` endpoint and then cancelled in a
    single signed, submitted ``cancel_orders`` call.

    Args:
        subaccount_id (str): UUID of the subaccount. Required.
        product_ids (List[str], optional): Restrict the cancellation to these product IDs.

    Other Parameters:
        **kwargs: Extra query parameters merged into the order lookup; a key
            given here replaces the built-in filter of the same name.

    Returns:
        List[CancelOrderResultDto]: Cancellation results per order id.

    Raises:
        ValueError: If the lookup yields no orders, or if the cancellation
            call does not return a list of results.
    """
    owner = await self.get_subaccount(id=subaccount_id)

    # Built-in filters first, caller overrides second, product filter last.
    filters = dict(
        subaccount_id=subaccount_id,
        statuses=["FILLED_PARTIAL", "NEW", "PENDING"],
    )
    filters.update(kwargs)
    if product_ids:
        filters["product_ids"] = product_ids

    open_orders = await self._get_pages(
        endpoint="order",
        request_model=self._models.V1OrderGetParametersQuery,
        response_model=self._models.PageOfOrderDtos,
        paginate=True,
        **filters,
    )

    open_ids = []
    for open_order in open_orders:
        open_ids.append(open_order.id)
    if not open_ids:
        raise ValueError("No order IDs provided for cancellation")

    outcome = await self.cancel_orders(
        order_ids=open_ids,
        sender=owner.account,
        subaccount=owner.name,
        sign=True,
        submit=True,
    )
    # With submit=True a list of results is expected; anything else is a failure.
    if isinstance(outcome, list):
        return outcome
    raise ValueError("Failed to cancel orders")

cancel_orders(order_ids, sender, subaccount, client_order_ids=[], sign=True, submit=True, **kwargs) async

Prepares and optionally submits a request to cancel multiple orders.

Parameters:

Name Type Description Default
order_ids List[str]

Order UUIDs to cancel. Required.

required
sender str

Address initiating the cancellation. Required.

required
subaccount str

Hex-encoded subaccount name. Required.

required
client_order_ids List[str]

Client-generated IDs to cancel. Defaults to empty list.

[]
sign bool

If True, sign the payload immediately. Defaults to True.

True
submit bool

If True, submit the request to the API. Defaults to True.

True

Other Parameters:

Name Type Description
**kwargs

Additional request parameters accepted by the API.

Returns:

Type Description
Union[List[CancelOrderResultDto], CancelOrderDto]

Union[List[CancelOrderResultDto], CancelOrderDto]: Cancellation results per order id or prepared cancel payload.

Raises:

Type Description
ValueError

If no order IDs or client order IDs provided for cancellation.

Source code in ethereal/async_rest_client.py
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
async def cancel_orders(
    self,
    order_ids: List[str],
    sender: str,
    subaccount: str,
    client_order_ids: List[str] = [],
    sign: bool = True,
    submit: bool = True,
    **kwargs,
) -> Union[List[CancelOrderResultDto], CancelOrderDto]:
    """Build a cancel request for several orders and optionally submit it.

    Args:
        order_ids (List[str]): Order UUIDs to cancel. Required.
        sender (str): Address initiating the cancellation. Required.
        subaccount (str): Hex-encoded subaccount name. Required.
        client_order_ids (List[str], optional): Client-generated IDs to cancel. Defaults to empty list.
        sign (bool, optional): Forwarded as ``include_signature`` when preparing the payload. Defaults to True.
        submit (bool, optional): If True, send the prepared payload to the API. Defaults to True.

    Other Parameters:
        **kwargs: Forwarded both to payload preparation and to the submit call.

    Returns:
        Union[List[CancelOrderResultDto], CancelOrderDto]: The submit result when
        ``submit`` is true, otherwise the prepared cancel payload.

    Raises:
        ValueError: If both ``order_ids`` and ``client_order_ids`` are empty, or
            if preparing the payload raises ``ValueError`` (logged, then re-raised).
    """
    if not (len(order_ids) or len(client_order_ids)):
        raise ValueError(
            "No order IDs or client order IDs provided for cancellation"
        )

    try:
        payload = await self.prepare_cancel_order(
            order_ids=order_ids,
            client_order_ids=client_order_ids,
            sender=sender,
            subaccount=subaccount,
            include_signature=sign,
            **kwargs,
        )
    except ValueError as e:
        # Record the failure for diagnostics, then let the caller handle it.
        self.logger.warning(f"Could not prepare/sign order cancellation: {e}")
        raise

    if submit:
        return await self.cancel_order(payload, **kwargs)
    return payload

create(config={}) async classmethod

Factory method to create and asynchronously initialize the client.

Parameters:

Name Type Description Default
config Union[Dict[str, Any], RESTConfig]

Configuration dictionary or RESTConfig object. Optional fields include: private_key (str, optional): The private key. base_url (str, optional): Base URL for REST requests. Defaults to mainnet. timeout (int, optional): Timeout in seconds for REST requests. verbose (bool, optional): Enables debug logging. Defaults to False. rate_limit_headers (bool, optional): Enables rate limit headers. Defaults to False. chain_config (ChainConfig, optional): Chain configuration for signing transactions.

{}

Returns:

Name Type Description
AsyncRESTClient AsyncRESTClient

Fully initialized async client instance.

Source code in ethereal/async_rest_client.py
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
@classmethod
async def create(
    cls, config: Union[Dict[str, Any], RESTConfig] = {}
) -> AsyncRESTClient:
    """Construct a client and run its asynchronous initialization.

    The instance is built with ``cls(config)`` and returned only after its
    ``_async_init()`` coroutine has completed.

    Args:
        config (Union[Dict[str, Any], RESTConfig], optional): Configuration dictionary or RESTConfig object. Optional fields include:
            private_key (str, optional): The private key.
            base_url (str, optional): Base URL for REST requests. Defaults to mainnet.
            timeout (int, optional): Timeout in seconds for REST requests.
            verbose (bool, optional): Enables debug logging. Defaults to False.
            rate_limit_headers (bool, optional): Enables rate limit headers. Defaults to False.
            chain_config (ChainConfig, optional): Chain configuration for signing transactions.

    Returns:
        AsyncRESTClient: Fully initialized async client instance.
    """
    instance = cls(config)
    await instance._async_init()
    return instance

create_order(order_type, quantity, side, price=None, ticker=None, product_id=None, client_order_id=None, sender=None, subaccount=None, time_in_force=None, post_only=None, reduce_only=False, close=None, stop_price=None, stop_type=None, expires_at=None, group_id=None, group_contingency_type=None, sign=True, dry_run=False, submit=True) async

Create and submit an order.

Parameters:

Name Type Description Default
order_type str

'LIMIT' or 'MARKET'. Required.

required
quantity float

Order size. Required.

required
side int

0 for buy, 1 for sell. Required.

required
price float

Limit price for LIMIT orders.

None
ticker str

Ticker of the product.

None
product_id str

UUID of the product.

None
client_order_id str

Subaccount-scoped client-generated id (UUID or <=32 alphanumeric).

None
sender str

Address placing the order. Defaults to chain address.

None
subaccount str

Hex-encoded subaccount name. Defaults to first subaccount.

None
time_in_force str

For LIMIT orders (e.g., 'GTC', 'GTD'). Defaults to 'GTC'.

None
post_only bool

For LIMIT orders; rejects if crossing. Defaults to False.

None
reduce_only bool

If True, order only reduces position. Defaults to False.

False
close bool

For MARKET orders; If True, closes the position.

None
stop_price float

Stop trigger price.

None
stop_type int

Stop type, either 0 (take-profit) or 1 (stop-loss), requires non-zero stopPrice.

None
expires_at int

Expiry timestamp for GTD.

None
group_id str

Group Id (UUID) for linking orders together in OCO/OTO relationships.

None
group_contingency_type int

Contingency type for order groups: 0=OTO (Order-Triggers-Order), 1=OCO (One-Cancels-Other).

None
sign bool

If True, sign the payload immediately. Defaults to True.

True
dry_run bool

If True, validate without execution. Defaults to False.

False
submit bool

If True, submit the order. Defaults to True.

True

Returns:

Type Description
Union[SubmitOrderCreatedDto, DryRunOrderCreatedDto, SubmitOrderDto]

Union[SubmitOrderCreatedDto, DryRunOrderCreatedDto, SubmitOrderDto]: Created order response, dry-run validation result, or prepared order payload.

Raises:

Type Description
ValueError

If neither product_id nor ticker is provided or if order type is invalid.

Source code in ethereal/async_rest_client.py
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
async def create_order(
    self,
    order_type: str,
    quantity: float,
    side: int,
    price: Optional[float] = None,
    ticker: Optional[str] = None,
    product_id: Optional[str] = None,
    client_order_id: Optional[str] = None,
    sender: Optional[str] = None,
    subaccount: Optional[str] = None,
    time_in_force: Optional[str] = None,
    post_only: Optional[bool] = None,
    reduce_only: Optional[bool] = False,
    close: Optional[bool] = None,
    stop_price: Optional[float] = None,
    stop_type: Optional[int] = None,
    expires_at: Optional[int] = None,
    group_id: Optional[str] = None,
    group_contingency_type: Optional[int] = None,
    sign: bool = True,
    dry_run: bool = False,
    submit: bool = True,
) -> Union[SubmitOrderCreatedDto, DryRunOrderCreatedDto, SubmitOrderDto]:
    """Create and submit an order.

    Args:
        order_type (str): 'LIMIT' or 'MARKET'. Required.
        quantity (float): Order size. Required.
        side (int): 0 for buy, 1 for sell. Required.
        price (float, optional): Limit price for LIMIT orders.
        ticker (str, optional): Ticker of the product. Used only when ``product_id`` is not given.
        product_id (str, optional): UUID of the product. Takes precedence over ``ticker``.
        client_order_id (str, optional): Subaccount-scoped client-generated id (UUID or <=32 alphanumeric).
        sender (str, optional): Address placing the order. Defaults to chain address.
        subaccount (str, optional): Hex-encoded subaccount name. Defaults to first subaccount.
        time_in_force (str, optional): For LIMIT orders (e.g., 'GTC', 'GTD'). Falls back to the
            client's ``default_time_in_force`` when omitted.
        post_only (bool, optional): For LIMIT orders; rejects if crossing. Falls back to the
            client's ``default_post_only`` only when left as ``None``; an explicit ``False`` is honored.
        reduce_only (bool, optional): If True, order only reduces position. Defaults to False.
        close (bool, optional): For MARKET orders; If True, closes the position.
        stop_price (float, optional): Stop trigger price.
        stop_type (int, optional): Stop type, either 0 (take-profit) or 1 (stop-loss), requires non-zero stopPrice.
        expires_at (int, optional): Expiry timestamp for GTD.
        group_id (str, optional): Group Id (UUID) for linking orders together in OCO/OTO relationships.
        group_contingency_type (int, optional): Contingency type for order groups: 0=OTO (Order-Triggers-Order), 1=OCO (One-Cancels-Other).
        sign (bool, optional): If True, sign the payload immediately. Defaults to True.
        dry_run (bool, optional): If True, validate without execution; takes precedence over ``submit``. Defaults to False.
        submit (bool, optional): If True, submit the order. Defaults to True.

    Returns:
        Union[SubmitOrderCreatedDto, DryRunOrderCreatedDto, SubmitOrderDto]: Created order response, dry-run validation result, or prepared order payload.

    Raises:
        ValueError: If neither product_id nor ticker is provided, if order type is invalid,
            or if no subaccount is given and the account has none.
    """
    if sender is None and self.chain:
        sender = self.chain.address
    if subaccount is None:
        subaccounts = await self.subaccounts()
        if not subaccounts:
            raise ValueError(
                "No subaccounts found for this account. Please create a subaccount first."
            )
        # Log at debug level to aid troubleshooting without spamming INFO logs
        self.logger.debug(
            f"First subaccount name: '{subaccounts[0].name}', id: '{subaccounts[0].id}'"
        )
        self.logger.debug(f"All subaccount names: {[s.name for s in subaccounts]}")
        # Fall back to the first subaccount's hex name.
        subaccount = subaccounts[0].name

    # Resolve the on-chain product id; product_id wins when both are supplied.
    if product_id is not None:
        products_by_id = await self.products_by_id()
        onchain_id = products_by_id[product_id].onchain_id
    elif ticker is not None:
        products_by_ticker = await self.products_by_ticker()
        onchain_id = products_by_ticker[ticker].onchain_id
    else:
        raise ValueError("Either product_id or ticker must be provided")

    order_params = {
        "sender": sender,
        "subaccount": subaccount,
        "side": side,
        "quantity": quantity,
        "onchain_id": onchain_id,
        "order_type": order_type,
        "client_order_id": client_order_id,
        "reduce_only": reduce_only,
        "close": close,
        "stop_price": stop_price,
        "stop_type": stop_type,
        "group_id": group_id,
        "group_contingency_type": group_contingency_type,
    }

    if order_type == "LIMIT":
        order_params.update(
            {
                "price": price,
                "time_in_force": time_in_force or self.default_time_in_force,
                # Use an explicit None check: `post_only or default` would
                # silently turn a deliberate False into a True default.
                "post_only": (
                    self.default_post_only if post_only is None else post_only
                ),
                "expires_at": expires_at,
            }
        )
    elif order_type != "MARKET":
        raise ValueError("Invalid order type")

    order = await self.prepare_order(**order_params, include_signature=sign)
    if dry_run:
        return await self.dry_run_order(order)
    if submit:
        return await self.submit_order(order)
    return order

get_maintenance_margin(subaccount_id, positions=None, products=None, product_ids=None) async

Calculate an account's maintenance margin for specified positions or products.

Parameters:

Name Type Description Default
subaccount_id str

Fetch positions for this subaccount when positions is not supplied.

required
positions Union[List[PositionDto], List[Dict[str, Any]]]

Pre-fetched positions to use directly.

None
products Union[List[ProductDto], List[Dict[str, Any]]]

Pre-fetched products used to filter the calculation.

None
product_ids List[str]

Filters the calculation to these product IDs.

None

Returns:

Name Type Description
Decimal Decimal

Total maintenance margin for the filtered positions.

Raises:

Type Description
ValueError

If neither positions nor subaccount context is provided, or if any referenced product cannot be resolved.

Source code in ethereal/async_rest_client.py
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
async def get_maintenance_margin(
    self,
    subaccount_id: str,
    positions: Optional[Union[List[PositionDto], List[Dict[str, Any]]]] = None,
    products: Optional[Union[List[ProductDto], List[Dict[str, Any]]]] = None,
    product_ids: Optional[List[str]] = None,
) -> Decimal:
    """Calculate an account's maintenance margin for specified positions or products.

    For each included position the contribution is
    ``|cost| * (1 / (2 * max_leverage))`` plus ``|cost| * taker_fee``, and the
    contributions are summed.

    Args:
        subaccount_id (str): Subaccount whose positions are fetched when ``positions`` is not supplied.
        positions (Union[List[PositionDto], List[Dict[str, Any]]], optional): Pre-fetched positions to use directly.
        products (Union[List[ProductDto], List[Dict[str, Any]]], optional): Pre-fetched products used
            as the product lookup instead of the client's product index.
        product_ids (List[str], optional): Restricts the calculation to positions in these product IDs.

    Returns:
        Decimal: Total maintenance margin for the included positions; zero when none remain.

    Raises:
        ValueError: If both ``products`` and ``product_ids`` are given, if a requested
            product id is unknown, or if a position references a product that cannot be resolved.
    """

    class PartialPosition(BaseModel):
        product_id: str
        cost: str

    class PartialProduct(BaseModel):
        id: str
        max_leverage: float
        taker_fee: str

    def _fields(item):
        # Dicts are used as-is; model instances are flattened with model_dump().
        return item if isinstance(item, dict) else item.model_dump()

    if positions is None:
        positions = await self.list_positions(subaccount_id=subaccount_id)

    if products and product_ids:
        raise ValueError("Can only specify one of products and product_ids")

    # Build the product lookup from the caller's list or from the client index.
    if products is None:
        catalog = await self.products_by_id()
        products_by_id = {
            key: PartialProduct(**_fields(value)) for key, value in catalog.items()
        }
    else:
        products_by_id = {}
        for raw_product in products:
            parsed = PartialProduct(**_fields(raw_product))
            products_by_id[parsed.id] = parsed

    wanted: Optional[Set[str]] = None
    if product_ids is not None:
        wanted = set(product_ids)
        unknown = wanted.difference(products_by_id)
        if unknown:
            missing = ", ".join(sorted(unknown))
            raise ValueError(f"Products not found for calculation: {missing}")

    # Every position is validated, including ones the filter later drops.
    parsed_positions = [PartialPosition(**_fields(raw)) for raw in positions]
    selected = (
        parsed_positions
        if wanted is None
        else [pos for pos in parsed_positions if pos.product_id in wanted]
    )

    margin = Decimal("0")
    for pos in selected:
        spec = products_by_id.get(pos.product_id)
        if spec is None:
            raise ValueError(
                f"Product '{pos.product_id}' not found for position '{pos.product_id}'"
            )

        notional = abs(Decimal(pos.cost))
        # max_leverage is a float; go through str() before building the Decimal.
        leverage = Decimal(str(spec.max_leverage))
        fee_rate = Decimal(spec.taker_fee)

        rate = Decimal("1") / (leverage * Decimal("2"))
        margin += notional * rate
        margin += notional * fee_rate

    return margin

get_tokens() async

Return the latest list of tokens (no caching).

Source code in ethereal/async_rest_client.py
455
456
457
async def get_tokens(self) -> List[TokenDto]:
    """Fetch the token list via ``list_tokens()`` on every call, without caching."""
    latest = await self.list_tokens()
    return latest

products(refresh=False) async

Get the list of products.

Parameters:

Name Type Description Default
refresh bool

If True, bypass cache and fetch fresh data. Defaults to False.

False

Returns:

Type Description
List[ProductDto]

List[ProductDto]: List of product objects.

Source code in ethereal/async_rest_client.py
308
309
310
311
312
313
314
315
316
317
318
319
async def products(self, refresh: bool = False) -> List[ProductDto]:
    """Return the product list, served from the instance cache when possible.

    Args:
        refresh (bool, optional): If True, ignore the cached list and fetch again. Defaults to False.

    Returns:
        List[ProductDto]: List of product objects.
    """
    must_fetch = refresh or self._products is None
    if not must_fetch:
        return self._products
    # Store the fresh result so later calls can reuse it.
    self._products = await self.list_products()
    return self._products

products_by_id(refresh=False) async

Get the products indexed by ID.

Parameters:

Name Type Description Default
refresh bool

If True, bypass cache and fetch fresh data. Defaults to False.

False

Returns:

Type Description
Dict[str, ProductDto]

Dict[str, ProductDto]: Dictionary of products keyed by ID.

Source code in ethereal/async_rest_client.py
350
351
352
353
354
355
356
357
358
359
360
361
362
async def products_by_id(self, refresh: bool = False) -> Dict[str, ProductDto]:
    """Return products keyed by their ``id``, served from the instance cache when possible.

    Args:
        refresh (bool, optional): If True, rebuild the index from freshly fetched products. Defaults to False.

    Returns:
        Dict[str, ProductDto]: Dictionary of products keyed by ID.
    """
    if not refresh and self._products_by_id is not None:
        return self._products_by_id

    index = {}
    for product in await self.products(refresh=refresh):
        index[product.id] = product
    self._products_by_id = index
    return self._products_by_id

products_by_ticker(refresh=False) async

Get the products indexed by ticker.

Parameters:

Name Type Description Default
refresh bool

If True, bypass cache and fetch fresh data. Defaults to False.

False

Returns:

Type Description
Dict[str, ProductDto]

Dict[str, ProductDto]: Dictionary of products keyed by ticker.

Source code in ethereal/async_rest_client.py
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
async def products_by_ticker(self, refresh: bool = False) -> Dict[str, ProductDto]:
    """Return products keyed by ticker, served from the instance cache when possible.

    Products whose ``ticker`` attribute is missing or falsy are left out of
    the index.

    Args:
        refresh (bool, optional): If True, rebuild the index from freshly fetched products. Defaults to False.

    Returns:
        Dict[str, ProductDto]: Dictionary of products keyed by ticker.
    """
    if not refresh and self._products_by_ticker is not None:
        return self._products_by_ticker

    by_ticker = {}
    for product in await self.products(refresh=refresh):
        if getattr(product, "ticker", None):
            by_ticker[product.ticker] = product
    self._products_by_ticker = by_ticker
    return self._products_by_ticker

replace_order(order=None, order_id=None, quantity=None, price=None, time_in_force=None, post_only=None, reduce_only=False) async

Replace an existing order with new parameters.

Parameters:

Name Type Description Default
order OrderDto

Existing order object to replace.

None
order_id str

UUID of the order to replace.

None
quantity float

New order size.

None
price float

New limit price.

None
time_in_force str

New time in force.

None
post_only bool

New post-only flag.

None
reduce_only bool

New reduce-only flag. Defaults to False.

False

Returns:

Type Description
Tuple[SubmitOrderCreatedDto, bool]

Tuple[SubmitOrderCreatedDto, bool]: Created order response and success flag.

Raises:

Type Description
ValueError

If neither order nor order_id is provided, or both are provided.

Source code in ethereal/async_rest_client.py
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
async def replace_order(
    self,
    order: Optional[OrderDto] = None,
    order_id: Optional[str] = None,
    quantity: Optional[float] = None,
    price: Optional[float] = None,
    time_in_force: Optional[str] = None,
    post_only: Optional[bool] = None,
    reduce_only: Optional[bool] = False,
) -> Tuple[SubmitOrderCreatedDto, bool]:
    """Replace an existing order with new parameters.

    The existing order is cancelled first; only when that cancellation reports
    ``"Ok"`` is a new order created with the merged parameters. The two steps
    are separate requests, so a failure while creating the new order leaves
    the old one cancelled.

    Args:
        order (OrderDto, optional): Existing order object to replace.
        order_id (str, optional): UUID of the order to replace; the order is fetched when given.
        quantity (float, optional): New order size. Inherited from the old order when omitted.
        price (float, optional): New limit price. Inherited from the old order when omitted.
        time_in_force (str, optional): New time in force. Inherited from the old order when omitted,
            else the client's ``default_time_in_force``.
        post_only (bool, optional): New post-only flag. Inherited from the old order when omitted;
            the client's ``default_post_only`` is used only if the value is still ``None``.
        reduce_only (bool, optional): New reduce-only flag. Defaults to False; the old order's
            flag is inherited only when ``None`` is passed explicitly.

    Returns:
        Tuple[SubmitOrderCreatedDto, bool]: Created order response and success flag.

    Raises:
        ValueError: If neither order nor order_id is provided, or both are provided,
            or if cancelling the existing order fails.
    """
    if order is None and order_id is None:
        raise ValueError("Either order or order_id must be provided")
    if order is not None and order_id is not None:
        raise ValueError("Only one of order or order_id must be provided")

    old_order = order if order is not None else await self.get_order(id=order_id)
    subaccount = await self.get_subaccount(id=old_order.subaccount_id)

    # Anything the caller left unset is carried over from the old order.
    quantity = float(old_order.quantity) if quantity is None else quantity
    price = float(old_order.price) if price is None else price
    time_in_force = (
        old_order.time_in_force.value
        if time_in_force is None and old_order.time_in_force
        else time_in_force
    )
    post_only = old_order.post_only if post_only is None else post_only
    reduce_only = old_order.reduce_only if reduce_only is None else reduce_only

    cancel_result = await self.cancel_orders(
        order_ids=[old_order.id],
        sender=old_order.sender,
        subaccount=subaccount.name,
        sign=True,
        submit=True,
    )
    if not isinstance(cancel_result, list) or len(cancel_result) != 1:
        raise ValueError("Failed to cancel order")
    canceled_order = cancel_result[0]

    if canceled_order.result.value != "Ok":
        # Report old_order.id: order_id is None when the caller passed `order`.
        raise ValueError(
            f"Failed to cancel order {old_order.id}: {canceled_order.result.value}"
        )

    new_order = await self.create_order(
        order_type=old_order.type.value,
        quantity=quantity,
        side=old_order.side.value,
        price=price,
        product_id=old_order.product_id,
        sender=old_order.sender,
        subaccount=subaccount.name,
        time_in_force=time_in_force or self.default_time_in_force,
        # Explicit None check so a False flag is not replaced by a True default.
        post_only=self.default_post_only if post_only is None else post_only,
        reduce_only=reduce_only,
        dry_run=False,
    )
    return self._models.SubmitOrderCreatedDto.model_validate(
        new_order
    ), canceled_order.result.value == "Ok"

subaccounts(refresh=False) async

Get the list of subaccounts.

Parameters:

Name Type Description Default
refresh bool

If True, bypass cache and fetch fresh data. Defaults to False.

False

Returns:

Type Description
List[SubaccountDto]

List[SubaccountDto]: List of subaccount objects for the connected wallet address.

Raises:

Type Description
ValueError

If no chain client is configured or address is unavailable.

Source code in ethereal/async_rest_client.py
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
async def subaccounts(self, refresh: bool = False) -> List[SubaccountDto]:
    """Return the subaccounts of the connected wallet address, using the instance cache.

    On a cache miss (or when ``refresh`` is true) the list is fetched for the
    chain address, ordered by ``createdAt`` ascending, and stored for reuse.

    Args:
        refresh (bool, optional): If True, ignore the cached list and fetch again. Defaults to False.

    Returns:
        List[SubaccountDto]: List of subaccount objects for the connected wallet address.

    Raises:
        ValueError: If no chain client is configured or address is unavailable.
    """
    chain = self.chain
    if not (chain and getattr(chain, "address", None)):
        raise ValueError("Chain address is required to list subaccounts")

    if not refresh and self._subaccounts is not None:
        return self._subaccounts

    self._subaccounts = await self.list_subaccounts(
        sender=chain.address, order_by="createdAt", order="asc"
    )
    return self._subaccounts

tokens(refresh=False) async

Get the list of tokens.

Parameters:

Name Type Description Default
refresh bool

If True, bypass cache and fetch fresh data. Defaults to False.

False

Returns:

Type Description
List[TokenDto]

List[TokenDto]: List of token objects.

Source code in ethereal/async_rest_client.py
321
322
323
324
325
326
327
328
329
330
331
332
async def tokens(self, refresh: bool = False) -> List[TokenDto]:
    """Return the token list, served from the instance cache when possible.

    Args:
        refresh (bool, optional): If True, ignore the cached list and fetch again. Defaults to False.

    Returns:
        List[TokenDto]: List of token objects.
    """
    if not refresh and self._tokens is not None:
        return self._tokens
    # Cache miss or forced refresh: fetch and remember the result.
    self._tokens = await self.list_tokens()
    return self._tokens

RESTClient

Synchronous wrapper around AsyncRESTClient for backward compatibility. Use AsyncRESTClient for new applications.

from ethereal import RESTClient

client = RESTClient({
    "base_url": "https://api.etherealtest.net",
    "chain_config": {
        "rpc_url": "https://rpc.etherealtest.net",
        "private_key": "your_private_key",  # optional
    }
})

# Property access works for convenience
products = client.products
subaccounts = client.subaccounts

# Remember to close when done
client.close()

ethereal.rest.rpc

get_rpc_config(self, **kwargs) async

Gets RPC configuration for EIP-712 signing and contract info.

Other Parameters:

Name Type Description
**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Name Type Description
RpcConfigDto RpcConfigDto

Domain and signature type definitions for signing.

Source code in ethereal/rest/rpc.py
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
async def get_rpc_config(self, **kwargs) -> RpcConfigDto:
    """Fetch the RPC configuration used for EIP-712 signing and contract info.

    The ``domain`` and ``signatureTypes`` sections of the response are each
    parsed into their own model before being combined into the result.

    Other Parameters:
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        RpcConfigDto: Domain and signature type definitions for signing.
    """
    payload = await self.get(f"{API_PREFIX}/rpc/config", **kwargs)
    models = self._models
    sections = {
        "domain": models.DomainTypeDto(**payload["domain"]),
        "signatureTypes": models.SignatureTypesDto(**payload["signatureTypes"]),
    }
    return models.RpcConfigDto(**sections)

ethereal.rest.subaccount

get_subaccount(self, id, **kwargs) async

Gets a specific subaccount by ID.

Parameters:

Name Type Description Default
id str

UUID of the subaccount. Required.

required

Other Parameters:

Name Type Description
**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Name Type Description
SubaccountDto SubaccountDto

Subaccount details.

Source code in ethereal/rest/subaccount.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
async def get_subaccount(self, id: str, **kwargs) -> SubaccountDto:
    """Fetch a single subaccount by its ID.

    Args:
        id (str): UUID of the subaccount. Required.

    Other Parameters:
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        SubaccountDto: Subaccount details.
    """
    payload = await self.get(f"{API_PREFIX}/subaccount/{id}", **kwargs)
    return self._models.SubaccountDto(**payload)

get_subaccount_balance_history(self, **kwargs) async

Gets historical subaccount balances from the archive API.

Parameters:

Name Type Description Default
subaccount_id str

UUID of the subaccount. Required.

required

Other Parameters:

Name Type Description
start_time float

Range start time in milliseconds since Unix epoch. Required.

end_time float

Range end time in milliseconds since Unix epoch. Optional.

resolution str

Data resolution (e.g., 'hour1', 'day1'). Required.

order str

Sort order, 'asc' or 'desc'. Optional.

limit float

Maximum number of results to return. Optional.

cursor str

Pagination cursor for fetching the next page. Optional.

order_by str

Field to order by (defaults to 'time'). Optional.

**kwargs

Additional query parameters supported by the API.

Returns:

Type Description
List[BalanceHistoryDto]

List[BalanceHistoryDto]: Historical balance records ordered per request parameters.

Source code in ethereal/rest/subaccount.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
async def get_subaccount_balance_history(self, **kwargs) -> List["BalanceHistoryDto"]:
    """Fetches a subaccount's historical balances from the archive API.

    Args:
        subaccount_id (str): UUID of the subaccount. Required.

    Other Parameters:
        start_time (float): Range start time in milliseconds since Unix epoch. Required.
        end_time (float, optional): Range end time in milliseconds since Unix epoch. Optional.
        resolution (str): Data resolution (e.g., 'hour1', 'day1'). Required.
        order (str, optional): Sort order, 'asc' or 'desc'. Optional.
        limit (float, optional): Maximum number of results to return. Optional.
        cursor (str, optional): Pagination cursor for fetching the next page. Optional.
        order_by (str, optional): Field to order by (defaults to 'time'). Optional.
        **kwargs: Additional query parameters supported by the API.

    Returns:
        List[BalanceHistoryDto]: Historical balance records ordered per request parameters.
    """
    models = self._models

    # base_url_override sends this request to the archive base URL.
    page = await self.get_validated(
        url_path=f"{API_PREFIX}/subaccount/balance",
        request_model=models.ArchiveV1SubaccountBalanceGetParametersQuery,
        response_model=models.PageOfBalanceHistoryDtos,
        base_url_override=self._archive_base_url,
        **kwargs,
    )

    # Rebuild every page entry as a BalanceHistoryDto from its alias-keyed dump.
    return [
        models.BalanceHistoryDto(**record.model_dump(by_alias=True))
        for record in page.data
    ]

get_subaccount_balances(self, **kwargs) async

Gets token balances for a subaccount.

Parameters:

Name Type Description Default
subaccount_id str

UUID of the subaccount. Required.

required

Other Parameters:

Name Type Description
**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Type Description
List[SubaccountBalanceDto]

List[SubaccountBalanceDto]: Balances for the subaccount.

Source code in ethereal/rest/subaccount.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
async def get_subaccount_balances(self, **kwargs) -> List[SubaccountBalanceDto]:
    """Fetches the token balances held by a subaccount.

    Args:
        subaccount_id (str): UUID of the subaccount. Required.

    Other Parameters:
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        List[SubaccountBalanceDto]: Balances for the subaccount.
    """
    models = self._models
    page = await self.get_validated(
        url_path=f"{API_PREFIX}/subaccount/balance",
        request_model=models.V1SubaccountBalanceGetParametersQuery,
        response_model=models.PageOfSubaccountBalanceDtos,
        **kwargs,
    )
    # Rebuild every page entry as a SubaccountBalanceDto from its alias-keyed dump.
    return [
        models.SubaccountBalanceDto(**balance.model_dump(by_alias=True))
        for balance in page.data
    ]

get_subaccount_unrealized_pnl_history(self, **kwargs) async

Gets historical unrealized PnL for a subaccount from the archive API.

Source code in ethereal/rest/subaccount.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
async def get_subaccount_unrealized_pnl_history(
    self, **kwargs
) -> List["UnrealizedPnlHistoryDto"]:
    """Fetches a subaccount's historical unrealized PnL from the archive API.

    Other Parameters:
        **kwargs: Query parameters forwarded to ``get_validated`` alongside the
            ``ArchiveV1SubaccountUnrealizedPnlGetParametersQuery`` request model.
            The accepted fields are defined by that model (not shown here).

    Returns:
        List[UnrealizedPnlHistoryDto]: One record per entry in the response page.
    """
    models = self._models

    # base_url_override sends this request to the archive base URL.
    page = await self.get_validated(
        url_path=f"{API_PREFIX}/subaccount/unrealized-pnl",
        request_model=models.ArchiveV1SubaccountUnrealizedPnlGetParametersQuery,
        response_model=models.PageOfUnrealizedPnlHistoryDtos,
        base_url_override=self._archive_base_url,
        **kwargs,
    )

    # Rebuild every page entry as an UnrealizedPnlHistoryDto from its alias-keyed dump.
    return [
        models.UnrealizedPnlHistoryDto(**record.model_dump(by_alias=True))
        for record in page.data
    ]

get_subaccount_volume_history(self, **kwargs) async

Gets historical trading volume for a subaccount from the archive API.

Source code in ethereal/rest/subaccount.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
async def get_subaccount_volume_history(self, **kwargs) -> List["VolumeHistoryDto"]:
    """Fetches a subaccount's historical trading volume from the archive API.

    Other Parameters:
        **kwargs: Query parameters forwarded to ``get_validated`` alongside the
            ``ArchiveV1SubaccountVolumeGetParametersQuery`` request model.
            The accepted fields are defined by that model (not shown here).

    Returns:
        List[VolumeHistoryDto]: One record per entry in the response page.
    """
    models = self._models

    # base_url_override sends this request to the archive base URL.
    page = await self.get_validated(
        url_path=f"{API_PREFIX}/subaccount/volume",
        request_model=models.ArchiveV1SubaccountVolumeGetParametersQuery,
        response_model=models.PageOfVolumeHistoryDtos,
        base_url_override=self._archive_base_url,
        **kwargs,
    )

    # Rebuild every page entry as a VolumeHistoryDto from its alias-keyed dump.
    return [
        models.VolumeHistoryDto(**record.model_dump(by_alias=True))
        for record in page.data
    ]

list_subaccounts(self, **kwargs) async

Lists subaccounts for a given sender (address).

Parameters:

Name Type Description Default
sender str

Wallet address to query subaccounts for. Required.

required

Other Parameters:

Name Type Description
order str

Sort order, 'asc' or 'desc'. Optional.

limit float

Maximum number of results to return. Optional.

cursor str

Pagination cursor for fetching the next page. Optional.

order_by str

Field to order by (e.g., 'createdAt'). Optional.

**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Type Description
List[SubaccountDto]

List[SubaccountDto]: Subaccount records for the sender.

Source code in ethereal/rest/subaccount.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
async def list_subaccounts(self, **kwargs) -> List[SubaccountDto]:
    """Lists the subaccounts that belong to a sender address.

    Args:
        sender (str): Wallet address to query subaccounts for. Required.

    Other Parameters:
        order (str, optional): Sort order, 'asc' or 'desc'. Optional.
        limit (float, optional): Maximum number of results to return. Optional.
        cursor (str, optional): Pagination cursor for fetching the next page. Optional.
        order_by (str, optional): Field to order by (e.g., 'createdAt'). Optional.
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        List[SubaccountDto]: Subaccount records for the sender.
    """
    models = self._models
    page = await self.get_validated(
        url_path=f"{API_PREFIX}/subaccount/",
        request_model=models.V1SubaccountGetParametersQuery,
        response_model=models.PageOfSubaccountDtos,
        **kwargs,
    )
    # Rebuild every page entry as a SubaccountDto from its alias-keyed dump.
    return [
        models.SubaccountDto(**subaccount.model_dump(by_alias=True))
        for subaccount in page.data
    ]

ethereal.rest.product

get_market_liquidity(self, **kwargs) async

Gets market liquidity for a product.

Parameters:

Name Type Description Default
product_id str

Id representing the registered product. Required.

required

Other Parameters:

Name Type Description
**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Name Type Description
MarketLiquidityDto MarketLiquidityDto

Top-of-book and depth liquidity metrics.

Source code in ethereal/rest/product.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
async def get_market_liquidity(self, **kwargs) -> MarketLiquidityDto:
    """Fetches the market liquidity of a product.

    Args:
        product_id (str): Id representing the registered product. Required.

    Other Parameters:
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        MarketLiquidityDto: Top-of-book and depth liquidity metrics.
    """
    # The result of get_validated is handed back to the caller unchanged.
    return await self.get_validated(
        url_path=f"{API_PREFIX}/product/market-liquidity",
        request_model=self._models.V1ProductMarketLiquidityGetParametersQuery,
        response_model=self._models.MarketLiquidityDto,
        **kwargs,
    )

list_market_prices(self, **kwargs) async

Gets market prices for one or more products.

Parameters:

Name Type Description Default
product_ids List[str]

List of product IDs to query. Required.

required

Other Parameters:

Name Type Description
**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Type Description
List[MarketPriceDto]

List[MarketPriceDto]: Best bid/ask prices for the requested products.

Source code in ethereal/rest/product.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
async def list_market_prices(self, **kwargs) -> List[MarketPriceDto]:
    """Fetches market prices for one or more products.

    Args:
        product_ids (List[str]): List of product IDs to query. Required.

    Other Parameters:
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        List[MarketPriceDto]: Best bid/ask prices for the requested products.
    """
    models = self._models
    listing = await self.get_validated(
        url_path=f"{API_PREFIX}/product/market-price",
        request_model=models.V1ProductMarketPriceGetParametersQuery,
        response_model=models.ListOfMarketPriceDtos,
        **kwargs,
    )
    # Rebuild every listed entry as a MarketPriceDto from its alias-keyed dump.
    return [
        models.MarketPriceDto(**price.model_dump(by_alias=True))
        for price in listing.data
    ]

list_products(self, **kwargs) async

Lists all products and their configurations.

Other Parameters:

Name Type Description
order str

Sort order, 'asc' or 'desc'. Optional.

limit float

Maximum number of results to return. Optional.

cursor str

Pagination cursor for fetching the next page. Optional.

order_by str

Field to order by (e.g., 'createdAt'). Optional.

ticker str

Filter by product ticker (e.g., 'ETHUSD'). Optional.

**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Type Description
List[ProductDto]

List[ProductDto]: Product configuration records.

Source code in ethereal/rest/product.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
async def list_products(self, **kwargs) -> List[ProductDto]:
    """Lists every product together with its configuration.

    Other Parameters:
        order (str, optional): Sort order, 'asc' or 'desc'. Optional.
        limit (float, optional): Maximum number of results to return. Optional.
        cursor (str, optional): Pagination cursor for fetching the next page. Optional.
        order_by (str, optional): Field to order by (e.g., 'createdAt'). Optional.
        ticker (str, optional): Filter by product ticker (e.g., 'ETHUSD'). Optional.
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        List[ProductDto]: Product configuration records.
    """
    models = self._models
    page = await self.get_validated(
        url_path=f"{API_PREFIX}/product",
        request_model=models.V1ProductGetParametersQuery,
        response_model=models.PageOfProductDtos,
        **kwargs,
    )
    # Rebuild every page entry as a ProductDto from its alias-keyed dump.
    return [
        models.ProductDto(**product.model_dump(by_alias=True))
        for product in page.data
    ]

ethereal.rest.position

get_position(self, id, **kwargs) async

Gets a specific position by ID.

Parameters:

Name Type Description Default
id str

UUID of the position. Required.

required

Other Parameters:

Name Type Description
**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Name Type Description
PositionDto PositionDto

Position information for the specified ID.

Source code in ethereal/rest/position.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
async def get_position(
    self,
    id: str,
    **kwargs,
) -> PositionDto:
    """Fetches one position, looked up by its ID.

    Args:
        id (str): UUID of the position. Required.

    Other Parameters:
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        PositionDto: Position information for the specified ID.
    """
    # The ID is embedded in the URL path; kwargs are forwarded to self.get as-is.
    url = f"{API_PREFIX}/position/{id}"
    payload = await self.get(url, **kwargs)
    return self._models.PositionDto(**payload)

list_positions(self, **kwargs) async

Lists positions for a subaccount.

Parameters:

Name Type Description Default
subaccount_id str

UUID of the subaccount. Required.

required

Other Parameters:

Name Type Description
product_ids List[str]

UUIDs of products to filter by. Optional.

open bool

Filter for open positions. Optional.

order str

Sort order, 'asc' or 'desc'. Optional.

limit float

Maximum number of results to return. Optional.

cursor str

Pagination cursor for fetching the next page. Optional.

order_by str

Field to order by, e.g., 'size', 'createdAt', or 'updatedAt'. Optional.

**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Type Description
List[PositionDto]

List[PositionDto]: List of position information for the subaccount.

Source code in ethereal/rest/position.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
async def list_positions(
    self,
    **kwargs,
) -> List[PositionDto]:
    """Lists the positions held by a subaccount.

    Args:
        subaccount_id (str): UUID of the subaccount. Required.

    Other Parameters:
        product_ids (List[str], optional): UUIDs of products to filter by. Optional.
        open (bool, optional): Filter for open positions. Optional.
        order (str, optional): Sort order, 'asc' or 'desc'. Optional.
        limit (float, optional): Maximum number of results to return. Optional.
        cursor (str, optional): Pagination cursor for fetching the next page. Optional.
        order_by (str, optional): Field to order by, e.g., 'size', 'createdAt', or 'updatedAt'. Optional.
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        List[PositionDto]: List of position information for the subaccount.
    """
    models = self._models
    page = await self.get_validated(
        url_path=f"{API_PREFIX}/position",
        request_model=models.V1PositionGetParametersQuery,
        response_model=models.PageOfPositionDtos,
        **kwargs,
    )
    # Rebuild every page entry as a PositionDto from its alias-keyed dump.
    return [
        models.PositionDto(**position.model_dump(by_alias=True))
        for position in page.data
    ]

ethereal.rest.order

cancel_order(self, order_to_cancel, **kwargs) async

Submits a prepared and signed cancel order request.

Parameters:

Name Type Description Default
order_to_cancel CancelOrderDto

Prepared and signed cancel payload. Required.

required

Other Parameters:

Name Type Description
**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Type Description
List[CancelOrderResultDto]

List[CancelOrderResultDto]: Cancellation results per order id.

Source code in ethereal/rest/order.py
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
async def cancel_order(
    self,
    order_to_cancel: CancelOrderDto,
    **kwargs,
) -> List[CancelOrderResultDto]:
    """Posts an already prepared and signed cancel request.

    Args:
        order_to_cancel (CancelOrderDto): Prepared and signed cancel payload. Required.

    Other Parameters:
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        List[CancelOrderResultDto]: Cancellation results per order id.
    """
    url = f"{API_PREFIX}/order/cancel"
    # JSON-mode dump keyed by alias; fields whose value is None are left out.
    body = order_to_cancel.model_dump(mode="json", by_alias=True, exclude_none=True)
    response = await self.post(url, data=body, **kwargs)
    # A response without a "data" key produces an empty result list.
    return [
        self._models.CancelOrderResultDto.model_validate(entry)
        for entry in response.get("data", [])
    ]

dry_run_order(self, order, **kwargs) async

Submits a prepared order for validation without execution.

Parameters:

Name Type Description Default
order SubmitOrderDto

Prepared order payload. Required.

required

Other Parameters:

Name Type Description
**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Name Type Description
DryRunOrderCreatedDto DryRunOrderCreatedDto

Dry-run validation result.

Source code in ethereal/rest/order.py
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
async def dry_run_order(
    self,
    order: SubmitOrderDto,
    **kwargs,
) -> DryRunOrderCreatedDto:
    """Posts a prepared order to the dry-run endpoint for validation only.

    Args:
        order (SubmitOrderDto): Prepared order payload. Required.

    Other Parameters:
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        DryRunOrderCreatedDto: Dry-run validation result.
    """
    # Only order.data is carried over into the dry-run payload; the rest of
    # the SubmitOrderDto is not forwarded.
    dry_order = self._models.SubmitDryOrderDto.model_validate(
        {"data": order.data.model_dump(mode="json", by_alias=True, exclude_unset=True)}
    )
    url = f"{API_PREFIX}/order/dry-run"
    body = dry_order.model_dump(
        mode="json", by_alias=True, exclude_unset=True, exclude_none=True
    )
    response = await self.post(url, data=body, **kwargs)
    return self._models.DryRunOrderCreatedDto.model_validate(response)

get_order(self, id, **kwargs) async

Gets a specific order by ID.

Parameters:

Name Type Description Default
id str

UUID of the order. Required.

required

Other Parameters:

Name Type Description
**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Name Type Description
OrderDto OrderDto

Order details.

Source code in ethereal/rest/order.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
async def get_order(self, id: str, **kwargs) -> OrderDto:
    """Fetches one order, looked up by its ID.

    Args:
        id (str): UUID of the order. Required.

    Other Parameters:
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        OrderDto: Order details.
    """
    # The ID is embedded in the URL path; kwargs are forwarded to self.get as-is.
    url = f"{API_PREFIX}/order/{id}"
    payload = await self.get(url, **kwargs)
    return self._models.OrderDto(**payload)

list_fills(self, **kwargs) async

Lists order fills for a subaccount.

Parameters:

Name Type Description Default
subaccount_id str

UUID of the subaccount. Required.

required

Other Parameters:

Name Type Description
product_ids List[str]

Filter by one or more product IDs. Optional.

order str

Sort order, 'asc' or 'desc'. Optional.

limit float

Maximum number of results to return. Optional.

cursor str

Pagination cursor for fetching the next page. Optional.

order_by str

Field to order by (e.g., 'createdAt'). Optional.

**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Type Description
List[OrderFillDto]

List[OrderFillDto]: Fill records for the subaccount.

Source code in ethereal/rest/order.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
async def list_fills(self, **kwargs) -> List[OrderFillDto]:
    """Lists the order fills recorded for a subaccount.

    Args:
        subaccount_id (str): UUID of the subaccount. Required.

    Other Parameters:
        product_ids (List[str], optional): Filter by one or more product IDs. Optional.
        order (str, optional): Sort order, 'asc' or 'desc'. Optional.
        limit (float, optional): Maximum number of results to return. Optional.
        cursor (str, optional): Pagination cursor for fetching the next page. Optional.
        order_by (str, optional): Field to order by (e.g., 'createdAt'). Optional.
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        List[OrderFillDto]: Fill records for the subaccount.
    """
    models = self._models
    page = await self.get_validated(
        url_path=f"{API_PREFIX}/order/fill",
        request_model=models.V1OrderFillGetParametersQuery,
        response_model=models.PageOfOrderFillDtos,
        **kwargs,
    )
    # Rebuild every page entry as an OrderFillDto from its alias-keyed dump.
    return [
        models.OrderFillDto(**fill.model_dump(by_alias=True))
        for fill in page.data
    ]

list_orders(self, **kwargs) async

Lists orders for a subaccount.

Parameters:

Name Type Description Default
subaccount_id str

UUID of the subaccount. Required.

required

Other Parameters:

Name Type Description
product_ids List[str]

Filter by one or more product IDs. Optional.

client_order_id str

Filter by a client-generated order id. Optional.

statuses List[str]

Filter by status values. Optional.

order str

Sort order, 'asc' or 'desc'. Optional.

limit float

Maximum number of results to return. Optional.

cursor str

Pagination cursor for fetching the next page. Optional.

order_by str

Field to order by (e.g., 'createdAt'). Optional.

**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Type Description
List[OrderDto]

List[OrderDto]: Order records for the subaccount.

Source code in ethereal/rest/order.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
async def list_orders(self, **kwargs) -> List[OrderDto]:
    """Lists the orders placed by a subaccount.

    Args:
        subaccount_id (str): UUID of the subaccount. Required.

    Other Parameters:
        product_ids (List[str], optional): Filter by one or more product IDs. Optional.
        client_order_id (str, optional): Filter by a client-generated order id. Optional.
        statuses (List[str], optional): Filter by status values. Optional.
        order (str, optional): Sort order, 'asc' or 'desc'. Optional.
        limit (float, optional): Maximum number of results to return. Optional.
        cursor (str, optional): Pagination cursor for fetching the next page. Optional.
        order_by (str, optional): Field to order by (e.g., 'createdAt'). Optional.
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        List[OrderDto]: Order records for the subaccount.
    """
    models = self._models
    page = await self.get_validated(
        url_path=f"{API_PREFIX}/order",
        request_model=models.V1OrderGetParametersQuery,
        response_model=models.PageOfOrderDtos,
        **kwargs,
    )
    # Rebuild every page entry as an OrderDto from its alias-keyed dump.
    return [
        models.OrderDto(**order_record.model_dump(by_alias=True))
        for order_record in page.data
    ]

list_trades(self, **kwargs) async

Lists trades for a specific product.

Parameters:

Name Type Description Default
product_id str

Product ID to query trades for. Required.

required

Other Parameters:

Name Type Description
order str

Sort order, 'asc' or 'desc'. Optional.

limit float

Maximum number of results to return. Optional.

cursor str

Pagination cursor for fetching the next page. Optional.

order_by str

Field to order by (e.g., 'createdAt'). Optional.

**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Type Description
List[TradeDto]

List[TradeDto]: Trade records.

Source code in ethereal/rest/order.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
async def list_trades(self, **kwargs) -> List[TradeDto]:
    """Lists the trades executed on a specific product.

    Args:
        product_id (str): Product ID to query trades for. Required.

    Other Parameters:
        order (str, optional): Sort order, 'asc' or 'desc'. Optional.
        limit (float, optional): Maximum number of results to return. Optional.
        cursor (str, optional): Pagination cursor for fetching the next page. Optional.
        order_by (str, optional): Field to order by (e.g., 'createdAt'). Optional.
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        List[TradeDto]: Trade records.
    """
    models = self._models
    page = await self.get_validated(
        url_path=f"{API_PREFIX}/order/trade",
        request_model=models.V1OrderTradeGetParametersQuery,
        response_model=models.PageOfTradeDtos,
        **kwargs,
    )
    # Rebuild every page entry as a TradeDto from its alias-keyed dump.
    return [
        models.TradeDto(**trade.model_dump(by_alias=True))
        for trade in page.data
    ]

prepare_cancel_order(self, sender, subaccount, order_ids=[], client_order_ids=[], include_signature=False, **kwargs) async

Prepares the payload for canceling one or more orders.

Parameters:

Name Type Description Default
sender str

Address initiating the cancellation. Required.

required
subaccount str

Hex-encoded subaccount name. Required.

required
order_ids List[str]

Order UUIDs to cancel. Optional.

[]
client_order_ids List[str]

Client-generated IDs to cancel. Optional.

[]
include_signature bool

If True, sign the payload immediately. Optional.

False

Other Parameters:

Name Type Description
nonce str

Custom nonce for signing.

**kwargs

Additional request parameters accepted by the API.

Returns:

Name Type Description
CancelOrderDto CancelOrderDto

Prepared (and optionally signed) cancel payload.

Source code in ethereal/rest/order.py
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
async def prepare_cancel_order(
    self,
    sender: str,
    subaccount: str,
    order_ids: List[str] = [],
    client_order_ids: List[str] = [],
    include_signature: bool = False,
    **kwargs,
) -> CancelOrderDto:
    """Builds the cancel payload for one or more orders, optionally signed.

    Args:
        sender (str): Address initiating the cancellation. Required.
        subaccount (str): Hex-encoded subaccount name. Required.
        order_ids (List[str]): Order UUIDs to cancel. Optional.
        client_order_ids (List[str]): Client-generated IDs to cancel. Optional.
        include_signature (bool): If True, sign the payload immediately. Optional.

    Other Parameters:
        nonce (str, optional): Custom nonce for signing.
        **kwargs: Additional request parameters accepted by the API.

    Returns:
        CancelOrderDto: Prepared (and optionally signed) cancel payload.
    """
    # A missing or falsy "nonce" kwarg is replaced by a freshly generated one.
    nonce = kwargs.get("nonce") or generate_nonce()

    # String IDs are parsed into UUID objects; any other value passes through.
    parsed_order_ids = [
        order_id if not isinstance(order_id, str) else UUID(order_id)
        for order_id in order_ids
    ]

    cancel_data = self._models.CancelOrderDtoData(
        sender=sender,
        subaccount=subaccount,
        nonce=nonce,
        orderIds=parsed_order_ids,
        clientOrderIds=client_order_ids,
    )

    # The payload starts with an empty signature; signing fills it in on request.
    unsigned = self._models.CancelOrderDto.model_validate(
        {"data": cancel_data.model_dump(mode="json", by_alias=True), "signature": ""}
    )
    if not include_signature:
        return unsigned
    return await self.sign_cancel_order(unsigned)

prepare_order(self, sender, price=None, quantity=None, side=None, subaccount=None, onchain_id=None, order_type=None, client_order_id=None, time_in_force=None, post_only=False, reduce_only=False, close=None, stop_price=None, stop_type=None, group_id=None, group_contingency_type=None, expires_at=None, include_signature=False, **kwargs) async

Prepares the payload for an order, optionally including a signature.

Parameters:

Name Type Description Default
sender str

Address placing the order. Required.

required
price Union[str, float, Decimal]

Limit price for LIMIT orders.

None
quantity Union[str, float, Decimal]

Order size.

None
side int

0 for buy, 1 for sell.

None
subaccount str

Hex-encoded subaccount name.

None
onchain_id float

Product onchain ID.

None
order_type str

'LIMIT' or 'MARKET'.

None
client_order_id str

Subaccount-scoped client-generated id (UUID or <=32 alphanumeric).

None
time_in_force str

For LIMIT orders (e.g., 'GTC', 'GTD').

None
post_only bool

For LIMIT orders; rejects if crossing.

False
reduce_only bool

If True, order only reduces position.

False
close bool

If True, closes the position.

None
stop_price Union[str, float, Decimal]

Stop trigger price.

None
stop_type int

Stop type enum value.

None
group_id str

Contingency group id.

None
group_contingency_type int

Group contingency type.

None
expires_at int

Expiry timestamp for GTD.

None
include_signature bool

If True, sign the payload immediately.

False

Other Parameters:

Name Type Description
nonce str

Custom nonce for signing.

signed_at int

Seconds since epoch for signature timestamp.

**kwargs

Additional request parameters accepted by the API.

Returns:

Name Type Description
SubmitOrderDto SubmitOrderDto

Prepared (and optionally signed) order payload.

Source code in ethereal/rest/order.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
async def prepare_order(
    self,
    sender: str,
    price: Optional[Union[str, float, Decimal]] = None,
    quantity: Optional[Union[str, float, Decimal]] = None,
    side: Optional[int] = None,
    subaccount: Optional[str] = None,
    onchain_id: Optional[float] = None,
    order_type: Optional[str] = None,
    client_order_id: Optional[str] = None,
    time_in_force: Optional[str] = None,
    post_only: Optional[bool] = False,
    reduce_only: Optional[bool] = False,
    close: Optional[bool] = None,
    stop_price: Optional[Union[str, float, Decimal]] = None,
    stop_type: Optional[int] = None,
    group_id: Optional[str] = None,
    group_contingency_type: Optional[int] = None,
    expires_at: Optional[int] = None,
    include_signature: bool = False,
    **kwargs,
) -> SubmitOrderDto:
    """Build a validated order payload and optionally sign it.

    The fields are validated against the LIMIT or MARKET data model of the
    active network, dumped with unset/None fields dropped, and wrapped in a
    ``SubmitOrderDto`` carrying an empty signature.

    Args:
        sender (str): Address placing the order. Required.
        price (Union[str, float, Decimal], optional): Limit price for LIMIT orders.
        quantity (Union[str, float, Decimal], optional): Order size.
        side (int, optional): 0 for buy, 1 for sell.
        subaccount (str, optional): Hex-encoded subaccount name.
        onchain_id (float, optional): Product onchain ID.
        order_type (str, optional): 'LIMIT' or 'MARKET'.
        client_order_id (str, optional): Subaccount-scoped client-generated id (UUID or <=32 alphanumeric).
        time_in_force (str, optional): For LIMIT orders (e.g., 'GTC', 'GTD').
        post_only (bool, optional): For LIMIT orders; rejects if crossing.
        reduce_only (bool, optional): If True, order only reduces position.
        close (bool, optional): If True, closes the position.
        stop_price (Union[str, float, Decimal], optional): Stop trigger price.
        stop_type (int, optional): Stop type enum value.
        group_id (str, optional): Contingency group id.
        group_contingency_type (int, optional): Group contingency type.
        expires_at (int, optional): Expiry timestamp for GTD.
        include_signature (bool): If True, pass the payload through ``sign_order`` before returning.

    Other Parameters:
        nonce (str, optional): Custom nonce; a generated one is used when missing or falsy.
        signed_at (int, optional): Seconds since epoch; the current time is used when missing or falsy.
        **kwargs: Additional request parameters accepted by the API.

    Returns:
        SubmitOrderDto: Prepared (and optionally signed) order payload.

    Raises:
        ValueError: If ``order_type`` is neither 'LIMIT' nor 'MARKET'.
    """
    # Caller-supplied signing metadata wins; anything falsy falls back to a fresh value.
    nonce = kwargs.get("nonce") or generate_nonce()
    signed_at = kwargs.get("signed_at") or int(time.time())

    # Fields shared by both order kinds, keyed by their wire (alias) names.
    payload = dict(
        sender=sender,
        subaccount=subaccount,
        quantity=quantity,
        price=price,
        side=side,
        engineType=0,
        onchainId=onchain_id,
        nonce=nonce,
        type=order_type,
        clientOrderId=client_order_id,
        reduceOnly=reduce_only,
        signedAt=signed_at,
        close=close,
        stopPrice=stop_price,
        stopType=stop_type,
        groupId=group_id,
        groupContingencyType=group_contingency_type,
        expiresAt=expires_at,
    )

    models = self._models
    if order_type == "MARKET":
        validated = models.SubmitOrderMarketDtoData.model_validate(payload)
    elif order_type == "LIMIT":
        # Only LIMIT orders carry time-in-force and post-only settings.
        payload["timeInForce"] = time_in_force
        payload["postOnly"] = post_only
        validated = models.SubmitOrderLimitDtoData.model_validate(payload)
    else:
        raise ValueError(f"Invalid order type: {order_type}")

    wire_data = validated.model_dump(
        mode="json", by_alias=True, exclude_unset=True, exclude_none=True
    )
    dto = models.SubmitOrderDto.model_validate({"data": wire_data, "signature": ""})

    if not include_signature:
        return dto
    return await self.sign_order(dto)

sign_cancel_order(self, order_to_cancel, private_key=None) async

Signs a cancel order payload using EIP-712.

Parameters:

Name Type Description Default
order_to_cancel CancelOrderDto

Prepared cancel payload. Required.

required
private_key str

Private key override. Defaults to client's key.

None

Returns:

Name Type Description
CancelOrderDto CancelOrderDto

The same DTO with signature populated.

Raises:

Type Description
ValueError

If no chain client or private key is available.

Source code in ethereal/rest/order.py
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
async def sign_cancel_order(
    self,
    order_to_cancel: CancelOrderDto,
    private_key: Optional[str] = None,
) -> CancelOrderDto:
    """Signs a cancel order payload using EIP-712.

    The signature is produced with ``private_key`` when one is given and
    with the chain client's key otherwise.

    Args:
        order_to_cancel (CancelOrderDto): Prepared cancel payload. Required.
        private_key (str, optional): Private key override. Defaults to client's key.

    Returns:
        CancelOrderDto: The same DTO instance with ``signature`` populated.

    Raises:
        ValueError: If no chain client or private key is available.
    """
    if not hasattr(self, "chain") or not self.chain:
        raise ValueError("No chain client available for signing")

    # Resolve the key once so the override is the key that actually signs.
    signing_key = private_key or self.chain.private_key
    if not signing_key:
        raise ValueError("No private key available for signing")

    # Prepare message for signing
    message = order_to_cancel.data.model_dump(mode="json", by_alias=True)

    # For cancel orders, orderIds need to be converted to bytes32 format;
    # a missing/empty list is signed as an empty list.
    message["orderIds"] = [
        uuid_to_bytes32(str(order_id))
        for order_id in (order_to_cancel.data.order_ids or [])
    ]
    message["clientOrderIds"] = [
        client_order_id_to_bytes32(client_order_id)
        for client_order_id in (order_to_cancel.data.client_order_ids or [])
    ]

    # Get domain and types for signing
    primary_type = "CancelOrder"
    domain = self.rpc_config.domain.model_dump(mode="json", by_alias=True)
    types = self.chain.get_signature_types(self.rpc_config, primary_type)

    # Sign the message with the resolved key (override or client default)
    order_to_cancel.signature = self.chain.sign_message(
        signing_key, domain, types, primary_type, message
    )
    return order_to_cancel

sign_order(self, order, private_key=None) async

Signs an order payload using EIP-712.

Parameters:

Name Type Description Default
order SubmitOrderDto

Prepared order to sign. Required.

required
private_key str

Private key override. Defaults to client's key.

None

Returns:

Name Type Description
SubmitOrderDto SubmitOrderDto

The same DTO with signature populated.

Raises:

Type Description
ValueError

If no chain client or private key is available.

Source code in ethereal/rest/order.py
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
async def sign_order(
    self, order: SubmitOrderDto, private_key: Optional[str] = None
) -> SubmitOrderDto:
    """Signs an order payload using EIP-712.

    The signature is produced with ``private_key`` when one is given and
    with the chain client's key otherwise. ``order.data.signed_at`` is
    overwritten with the current time before the message is built.

    Args:
        order (SubmitOrderDto): Prepared order to sign. Required.
        private_key (str, optional): Private key override. Defaults to client's key.

    Returns:
        SubmitOrderDto: The same DTO instance with ``signature`` populated.

    Raises:
        ValueError: If no chain client or private key is available.
    """
    if not hasattr(self, "chain") or not self.chain:
        raise ValueError("No chain client available for signing")

    # Resolve the key once so the override is the key that actually signs.
    signing_key = private_key or self.chain.private_key
    if not signing_key:
        raise ValueError("No private key available for signing")

    # Update message signedAt
    order.data.signed_at = int(time.time())

    # Prepare message for signing
    message = order.data.model_dump(mode="json", by_alias=True)

    # The signed message carries quantity and price as integers scaled by 1e9.
    scale = Decimal("1e9")
    message["quantity"] = int(Decimal(message["quantity"]) * scale)
    # The dump keeps None-valued fields, so treat a missing *or* None price
    # as 0 instead of handing None to Decimal.
    message["price"] = int(Decimal(message.get("price") or 0) * scale)
    message["productId"] = int(message["onchainId"])
    message["signedAt"] = int(message["signedAt"])
    if message.get("clientOrderId"):
        message["clientOrderId"] = client_order_id_to_bytes32(message["clientOrderId"])

    # Get domain and types for signing
    primary_type = "TradeOrder"
    domain = self.rpc_config.domain.model_dump(mode="json", by_alias=True)
    types = self.chain.get_signature_types(self.rpc_config, primary_type)

    # Sign the message with the resolved key (override or client default)
    order.signature = self.chain.sign_message(
        signing_key, domain, types, primary_type, message
    )
    return order

submit_order(self, order, **kwargs) async

Submits a prepared and signed order.

Parameters:

Name Type Description Default
order SubmitOrderDto

Prepared and signed order payload. Required.

required

Other Parameters:

Name Type Description
**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Name Type Description
SubmitOrderCreatedDto SubmitOrderCreatedDto

Created order response.

Source code in ethereal/rest/order.py
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
async def submit_order(
    self,
    order: SubmitOrderDto,
    **kwargs,
) -> SubmitOrderCreatedDto:
    """POST a prepared and signed order to the order endpoint.

    Args:
        order (SubmitOrderDto): Prepared and signed order payload. Required.

    Other Parameters:
        **kwargs: Extra keyword arguments forwarded to ``self.post``. Optional.

    Returns:
        SubmitOrderCreatedDto: The response validated as a created-order DTO.
    """
    # Serialize with wire aliases, dropping fields that are unset or None.
    body = order.model_dump(
        mode="json", by_alias=True, exclude_unset=True, exclude_none=True
    )
    response = await self.post(f"{API_PREFIX}/order", data=body, **kwargs)
    return self._models.SubmitOrderCreatedDto.model_validate(response)

ethereal.rest.funding

get_projected_funding(self, **kwargs) async

Gets the projected funding rate for the next period.

Parameters:

Name Type Description Default
product_id str

Id representing the registered product. Required.

required

Other Parameters:

Name Type Description
**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Name Type Description
ProjectedFundingDto ProjectedFundingDto

Projected funding information for the product.

Source code in ethereal/rest/funding.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
async def get_projected_funding(self, **kwargs) -> ProjectedFundingDto:
    """Fetch the projected funding rate for a product.

    Args:
        product_id (str): Id representing the registered product. Required.

    Other Parameters:
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        ProjectedFundingDto: Projected funding information for the product.
    """
    models = self._models
    # get_validated receives the query model and response model for this endpoint.
    return await self.get_validated(
        url_path=f"{API_PREFIX}/funding/projected",
        request_model=models.V1FundingProjectedGetParametersQuery,
        response_model=models.ProjectedFundingDto,
        **kwargs,
    )

list_funding(self, **kwargs) async

Lists historical funding rates for a product over a specified time range.

Parameters:

Name Type Description Default
product_id str

Id representing the registered product. Required.

required
range str

Time window to query. One of 'DAY', 'WEEK', or 'MONTH'. Required.

required

Other Parameters:

Name Type Description
order str

Sort order, 'asc' or 'desc'. Optional.

limit float

Maximum number of results to return. Optional.

cursor str

Pagination cursor for fetching the next page. Optional.

order_by str

Field to order by, e.g., 'createdAt'. Optional.

**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Type Description
List[FundingDto]

List[FundingDto]: Funding rate history objects for the product.

Source code in ethereal/rest/funding.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
async def list_funding(self, **kwargs) -> List[FundingDto]:
    """Fetch one page of historical funding rates for a product.

    Args:
        product_id (str): Id representing the registered product. Required.
        range (str): Time window to query. One of 'DAY', 'WEEK', or 'MONTH'. Required.

    Other Parameters:
        order (str, optional): Sort order, 'asc' or 'desc'.
        limit (float, optional): Maximum number of results to return.
        cursor (str, optional): Pagination cursor for fetching the next page.
        order_by (str, optional): Field to order by, e.g., 'createdAt'.
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        List[FundingDto]: Funding rate history objects for the product.
    """
    models = self._models
    page = await self.get_validated(
        url_path=f"{API_PREFIX}/funding",
        request_model=models.V1FundingGetParametersQuery,
        response_model=models.PageOfFundingDtos,
        **kwargs,
    )
    # Rebuild every page entry as a FundingDto of the active network's models.
    funding_rows = []
    for entry in page.data:
        funding_rows.append(models.FundingDto(**entry.model_dump(by_alias=True)))
    return funding_rows

ethereal.rest.linked_signer

get_signer(self, id, **kwargs) async

Gets a specific linked signer by ID.

Parameters:

Name Type Description Default
id str

UUID of the linked signer. Required.

required

Other Parameters:

Name Type Description
**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Name Type Description
SignerDto SignerDto

Linked signer details.

Source code in ethereal/rest/linked_signer.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
async def get_signer(
    self,
    id: str,
    **kwargs,
) -> SignerDto:
    """Fetch a single linked signer by its ID.

    Args:
        id (str): UUID of the linked signer. Required.

    Other Parameters:
        **kwargs: Extra keyword arguments forwarded to ``self.get``. Optional.

    Returns:
        SignerDto: Linked signer details.
    """
    raw = await self.get(f"{API_PREFIX}/linked-signer/{id}", **kwargs)
    signer_model = self._models.SignerDto
    return signer_model.model_validate(raw)

get_signer_quota(self, **kwargs) async

Gets the signer quota for a subaccount.

Parameters:

Name Type Description Default
subaccount_id str

UUID of the subaccount. Required.

required

Other Parameters:

Name Type Description
**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Name Type Description
AccountSignerQuotaDto AccountSignerQuotaDto

Remaining quota information for the subaccount.

Source code in ethereal/rest/linked_signer.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
async def get_signer_quota(
    self,
    **kwargs,
) -> AccountSignerQuotaDto:
    """Fetch the linked-signer quota for a subaccount.

    Args:
        subaccount_id (str): UUID of the subaccount. Required.

    Other Parameters:
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        AccountSignerQuotaDto: Remaining quota information for the subaccount.
    """
    models = self._models
    return await self.get_validated(
        url_path=f"{API_PREFIX}/linked-signer/quota",
        request_model=models.V1LinkedSignerQuotaGetParametersQuery,
        response_model=models.AccountSignerQuotaDto,
        **kwargs,
    )

link_linked_signer(self, dto, **kwargs) async

Submits a prepared and signed link-signer payload.

Parameters:

Name Type Description Default
dto LinkSignerDto

Prepared and signed link payload. Required.

required

Other Parameters:

Name Type Description
**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Name Type Description
SignerDto SignerDto

Linked signer record after submission.

Source code in ethereal/rest/linked_signer.py
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
async def link_linked_signer(
    self,
    dto: LinkSignerDto,
    **kwargs,
) -> SignerDto:
    """POST a prepared and signed link-signer payload.

    Args:
        dto (LinkSignerDto): Prepared and signed link payload. Required.

    Other Parameters:
        **kwargs: Extra keyword arguments forwarded to ``self.post``. Optional.

    Returns:
        SignerDto: Linked signer record after submission.
    """
    # Serialize with wire aliases; None-valued fields are left out of the body.
    body = dto.model_dump(mode="json", by_alias=True, exclude_none=True)
    response = await self.post(f"{API_PREFIX}/linked-signer/link", data=body, **kwargs)
    return self._models.SignerDto.model_validate(response)

list_signers(self, **kwargs) async

Lists linked signers for a subaccount.

Parameters:

Name Type Description Default
subaccount_id str

UUID of the subaccount. Required.

required

Other Parameters:

Name Type Description
order str

Sort order, 'asc' or 'desc'. Optional.

limit float

Maximum number of results to return. Optional.

cursor str

Pagination cursor for fetching the next page. Optional.

order_by str

Field to order by (e.g., 'createdAt'). Optional.

**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Type Description
List[SignerDto]

List[SignerDto]: Linked signer records.

Source code in ethereal/rest/linked_signer.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
async def list_signers(
    self,
    **kwargs,
) -> List[SignerDto]:
    """Fetch one page of linked signers for a subaccount.

    Args:
        subaccount_id (str): UUID of the subaccount. Required.

    Other Parameters:
        order (str, optional): Sort order, 'asc' or 'desc'.
        limit (float, optional): Maximum number of results to return.
        cursor (str, optional): Pagination cursor for fetching the next page.
        order_by (str, optional): Field to order by (e.g., 'createdAt').
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        List[SignerDto]: Linked signer records.
    """
    models = self._models
    page = await self.get_validated(
        url_path=f"{API_PREFIX}/linked-signer",
        request_model=models.V1LinkedSignerGetParametersQuery,
        response_model=models.PageOfSignersDto,
        **kwargs,
    )
    # Rebuild every page entry as a SignerDto of the active network's models.
    signers = []
    for entry in page.data:
        signers.append(models.SignerDto(**entry.model_dump(by_alias=True)))
    return signers

prepare_linked_signer(self, sender, signer, subaccount, subaccount_id, signer_signature='', include_signature=False, **kwargs) async

Prepares the payload for linking a signer, optionally including a signature.

Parameters:

Name Type Description Default
sender str

Owner address initiating the link. Required.

required
signer str

Address of the signer being linked. Required.

required
subaccount str

Hex-encoded subaccount name. Required.

required
subaccount_id str

UUID of the subaccount. Required.

required
signer_signature str

Signature from the signer address. Optional.

''
include_signature bool

If True, sign with the owner's key as well. Optional.

False

Other Parameters:

Name Type Description
nonce str

Custom nonce for signing. Optional.

signed_at int

Seconds since epoch for the signature timestamp. Optional.

**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Name Type Description
LinkSignerDto LinkSignerDto

Prepared (and optionally signed) link payload.

Source code in ethereal/rest/linked_signer.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
async def prepare_linked_signer(
    self,
    sender: str,
    signer: str,
    subaccount: str,
    subaccount_id: str,
    signer_signature: str = "",
    include_signature: bool = False,
    **kwargs,
) -> LinkSignerDto:
    """Prepares the payload for linking a signer, optionally including a signature.

    Args:
        sender (str): Owner address initiating the link. Required.
        signer (str): Address of the signer being linked. Required.
        subaccount (str): Hex-encoded subaccount name. Required.
        subaccount_id (str): UUID of the subaccount. Required.
        signer_signature (str): Signature from the signer address. Optional.
        include_signature (bool): If True, sign with the owner's key as well. Optional.

    Other Parameters:
        nonce (str, optional): Custom nonce; a generated one is used when missing or falsy.
        signed_at (int, optional): Seconds since epoch; the current time is used when missing or falsy.
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        LinkSignerDto: Prepared (and optionally signed) link payload.

    Raises:
        ValueError: If ``include_signature`` is True and no chain client or
            private key is available (raised by ``sign_linked_signer``).
    """
    nonce = kwargs.get("nonce") or generate_nonce()
    signed_at = kwargs.get("signed_at") or int(time.time())
    data = {
        "sender": sender,
        "signer": signer,
        "subaccount": subaccount,
        "subaccountId": subaccount_id,
        "nonce": nonce,
        "signedAt": signed_at,
    }
    data_model = self._models.LinkSignerDtoData.model_validate(data)

    # Prepare dto
    dto_data = {
        "data": data_model.model_dump(mode="json", by_alias=True),
        "signature": "",
        "signerSignature": signer_signature,
    }
    dto = self._models.LinkSignerDto.model_validate(dto_data, by_alias=True)
    if include_signature:
        # Let sign_linked_signer pick up the client's key itself: it falls back
        # to self.chain.private_key and raises ValueError when the chain client
        # is missing, instead of an AttributeError from touching self.chain here.
        dto = await self.sign_linked_signer(dto)
    return dto

prepare_revoke_linked_signer(self, sender, signer, subaccount, subaccount_id, include_signature=False, **kwargs) async

Prepares the payload for revoking a linked signer, optionally signing it.

Parameters:

Name Type Description Default
sender str

Owner address initiating the revoke. Required.

required
signer str

Signer address being revoked. Required.

required
subaccount str

Hex-encoded subaccount name. Required.

required
subaccount_id str

UUID of the subaccount. Required.

required
include_signature bool

If True, sign with the owner's key. Optional.

False

Other Parameters:

Name Type Description
nonce str

Custom nonce for signing. Optional.

signed_at int

Seconds since epoch for the signature timestamp. Optional.

**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Name Type Description
RevokeLinkedSignerDto RevokeLinkedSignerDto

Prepared (and optionally signed) revoke payload.

Source code in ethereal/rest/linked_signer.py
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
async def prepare_revoke_linked_signer(
    self,
    sender: str,
    signer: str,
    subaccount: str,
    subaccount_id: str,
    include_signature: bool = False,
    **kwargs,
) -> RevokeLinkedSignerDto:
    """Build a validated revoke-linked-signer payload and optionally sign it.

    Args:
        sender (str): Owner address initiating the revoke. Required.
        signer (str): Signer address being revoked. Required.
        subaccount (str): Hex-encoded subaccount name. Required.
        subaccount_id (str): UUID of the subaccount. Required.
        include_signature (bool): If True, pass the payload through
            ``sign_revoke_linked_signer`` before returning. Optional.

    Other Parameters:
        nonce (str, optional): Custom nonce; a generated one is used when missing or falsy.
        signed_at (int, optional): Seconds since epoch; the current time is used when missing or falsy.
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        RevokeLinkedSignerDto: Prepared (and optionally signed) revoke payload.
    """
    # Caller-supplied signing metadata wins; anything falsy falls back to a fresh value.
    nonce = kwargs.get("nonce") or generate_nonce()
    signed_at = kwargs.get("signed_at") or int(time.time())

    models = self._models
    validated = models.RevokeLinkedSignerDtoData.model_validate(
        dict(
            sender=sender,
            signer=signer,
            subaccount=subaccount,
            subaccountId=subaccount_id,
            nonce=nonce,
            signedAt=signed_at,
        )
    )
    # The signature starts empty and is only filled in by the signing step.
    envelope = {
        "data": validated.model_dump(mode="json", by_alias=True),
        "signature": "",
    }
    revoke_dto = models.RevokeLinkedSignerDto.model_validate(envelope)

    if not include_signature:
        return revoke_dto
    return await self.sign_revoke_linked_signer(revoke_dto)

revoke_linked_signer(self, dto, **kwargs) async

Submits a prepared and signed revoke-linked-signer payload.

Parameters:

Name Type Description Default
dto RevokeLinkedSignerDto

Prepared and signed revoke payload. Required.

required

Other Parameters:

Name Type Description
**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Name Type Description
SignerDto SignerDto

Signer record reflecting revocation.

Source code in ethereal/rest/linked_signer.py
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
async def revoke_linked_signer(
    self,
    dto: RevokeLinkedSignerDto,
    **kwargs,
) -> SignerDto:
    """POST a prepared and signed revoke-linked-signer payload.

    Args:
        dto (RevokeLinkedSignerDto): Prepared and signed revoke payload. Required.

    Other Parameters:
        **kwargs: Extra keyword arguments forwarded to ``self.post``. Optional.

    Returns:
        SignerDto: Signer record reflecting revocation.
    """
    # Serialize with wire aliases; None-valued fields are left out of the body.
    body = dto.model_dump(mode="json", by_alias=True, exclude_none=True)
    response = await self.post(
        f"{API_PREFIX}/linked-signer/revoke", data=body, **kwargs
    )
    return self._models.SignerDto.model_validate(response)

sign_linked_signer(self, link_to_sign, signer_private_key=None, private_key=None) async

Signs the link-signer payload with the signer and/or owner key.

Parameters:

Name Type Description Default
link_to_sign LinkSignerDto

Prepared link payload. Required.

required
signer_private_key str

Signer's private key for cosigning. Optional.

None
private_key str

Owner's private key override. Optional.

None

Returns:

Name Type Description
LinkSignerDto LinkSignerDto

DTO with signature fields populated.

Raises:

Type Description
ValueError

If no chain client or private key is available.

Source code in ethereal/rest/linked_signer.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
async def sign_linked_signer(
    self,
    link_to_sign: LinkSignerDto,
    signer_private_key: Optional[str] = None,
    private_key: Optional[str] = None,
) -> LinkSignerDto:
    """Signs the link-signer payload with the signer and/or owner key.

    ``signer_signature`` is set when ``signer_private_key`` is given.
    ``signature`` is set when an owner key is available, either the
    ``private_key`` override or the chain client's key. Fields for which no
    key is available are left untouched.

    Args:
        link_to_sign (LinkSignerDto): Prepared link payload. Required.
        signer_private_key (str, optional): Signer's private key for cosigning. Optional.
        private_key (str, optional): Owner's private key override. Optional.

    Returns:
        LinkSignerDto: The same DTO instance with signature fields populated.

    Raises:
        ValueError: If no chain client is available, or if none of the
            owner override, client key, or signer key is provided.
    """
    if not hasattr(self, "chain") or not self.chain:
        raise ValueError("No chain client available for signing")

    # Owner key: explicit override first, then the client's configured key.
    owner_key = private_key or self.chain.private_key
    if not owner_key and not signer_private_key:
        raise ValueError("No private key available for signing")

    # Prepare message for signing
    message = link_to_sign.data.model_dump(mode="json", by_alias=True)
    message["signedAt"] = int(message["signedAt"])

    primary_type = "LinkSigner"
    domain = self.rpc_config.domain.model_dump(mode="json", by_alias=True)
    types = self.chain.get_signature_types(self.rpc_config, primary_type)

    if signer_private_key:
        link_to_sign.signer_signature = self.chain.sign_message(
            signer_private_key, domain, types, primary_type, message
        )
    # Gate on the resolved owner key, not on the client's key: an explicit
    # override must still sign when the client itself has no key configured.
    if owner_key:
        link_to_sign.signature = self.chain.sign_message(
            owner_key, domain, types, primary_type, message
        )
    return link_to_sign

sign_revoke_linked_signer(self, revoke_to_sign) async

Signs the revoke-linked-signer payload with the owner's key.

Parameters:

Name Type Description Default
revoke_to_sign RevokeLinkedSignerDto

Prepared revoke payload. Required.

required

Returns:

Name Type Description
RevokeLinkedSignerDto RevokeLinkedSignerDto

DTO with signature populated.

Raises:

Type Description
ValueError

If no chain client or private key is available.

Source code in ethereal/rest/linked_signer.py
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
async def sign_revoke_linked_signer(
    self,
    revoke_to_sign: RevokeLinkedSignerDto,
) -> RevokeLinkedSignerDto:
    """Sign a revoke-linked-signer payload with the chain client's key.

    Args:
        revoke_to_sign (RevokeLinkedSignerDto): Prepared revoke payload. Required.

    Returns:
        RevokeLinkedSignerDto: The same DTO instance with ``signature`` populated.

    Raises:
        ValueError: If no chain client or private key is available.
    """
    chain = getattr(self, "chain", None)
    if not chain:
        raise ValueError("No chain client available for signing")
    owner_key = chain.private_key
    if not owner_key:
        raise ValueError("No private key available for signing")

    # Build the message to sign; signedAt is coerced to an int first.
    payload = revoke_to_sign.data.model_dump(mode="json", by_alias=True)
    payload["signedAt"] = int(payload["signedAt"])

    primary_type = "RevokeLinkedSigner"
    domain = self.rpc_config.domain.model_dump(mode="json", by_alias=True)
    types = chain.get_signature_types(self.rpc_config, primary_type)

    signature = chain.sign_message(owner_key, domain, types, primary_type, payload)
    revoke_to_sign.signature = signature
    return revoke_to_sign

ethereal.rest.token

get_token(self, id, **kwargs) async

Gets a specific token by ID.

Parameters:

Name Type Description Default
id str

UUID for the token. Required.

required

Other Parameters:

Name Type Description
**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Name Type Description
TokenDto TokenDto

Token metadata.

Source code in ethereal/rest/token.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
async def get_token(
    self,
    id: str,
    **kwargs,
) -> TokenDto:
    """Fetch a single token by its ID.

    Args:
        id (str): UUID for the token. Required.

    Other Parameters:
        **kwargs: Extra keyword arguments forwarded to ``self.get``. Optional.

    Returns:
        TokenDto: Token metadata.
    """
    raw = await self.get(f"{API_PREFIX}/token/{id}", **kwargs)
    # The response mapping is unpacked straight into the model constructor.
    token_model = self._models.TokenDto
    return token_model(**raw)

list_token_transfers(self, **kwargs) async

Lists token transfers for a subaccount.

Parameters:

Name Type Description Default
subaccount_id str

UUID of the subaccount. Required.

required

Other Parameters:

Name Type Description
order str

Sort order, 'asc' or 'desc'. Optional.

limit float

Maximum number of results to return. Optional.

cursor str

Pagination cursor for fetching the next page. Optional.

order_by str

Field to order by (e.g., 'createdAt'). Optional.

**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Type Description
List[TransferDto]

List[TransferDto]: Transfer records for the subaccount.

Source code in ethereal/rest/token.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
async def list_token_transfers(
    self,
    **kwargs,
) -> List[TransferDto]:
    """Returns the token transfers recorded for a subaccount.

    All filters are supplied as keyword arguments and handed to
    ``get_validated`` together with the transfer query request model.

    Args:
        subaccount_id (str): UUID of the subaccount. Required.

    Other Parameters:
        order (str, optional): Sort order, 'asc' or 'desc'. Optional.
        limit (float, optional): Maximum number of results to return. Optional.
        cursor (str, optional): Pagination cursor for fetching the next page. Optional.
        order_by (str, optional): Field to order by (e.g., 'createdAt'). Optional.
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        List[TransferDto]: Transfer records for the subaccount.
    """
    page = await self.get_validated(
        url_path=f"{API_PREFIX}/token/transfer",
        request_model=self._models.V1TokenTransferGetParametersQuery,
        response_model=self._models.PageOfTransfersDtos,
        **kwargs,
    )
    # Rebuild every page entry as a TransferDto of the active network's models.
    transfers = []
    for entry in page.data:
        transfers.append(self._models.TransferDto(**entry.model_dump(by_alias=True)))
    return transfers

list_token_withdraws(self, **kwargs) async

Lists token withdrawals for a subaccount.

Parameters:

Name Type Description Default
subaccount_id str

UUID of the subaccount. Required.

required

Other Parameters:

Name Type Description
order str

Sort order, 'asc' or 'desc'. Optional.

limit float

Maximum number of results to return. Optional.

cursor str

Pagination cursor for fetching the next page. Optional.

order_by str

Field to order by (e.g., 'createdAt'). Optional.

**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Type Description
List[WithdrawDto]

List[WithdrawDto]: Withdrawal records for the subaccount.

Source code in ethereal/rest/token.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
async def list_token_withdraws(
    self,
    **kwargs,
) -> List[WithdrawDto]:
    """Returns the token withdrawals recorded for a subaccount.

    All filters are supplied as keyword arguments and handed to
    ``get_validated`` together with the withdraw query request model.

    Args:
        subaccount_id (str): UUID of the subaccount. Required.

    Other Parameters:
        order (str, optional): Sort order, 'asc' or 'desc'. Optional.
        limit (float, optional): Maximum number of results to return. Optional.
        cursor (str, optional): Pagination cursor for fetching the next page. Optional.
        order_by (str, optional): Field to order by (e.g., 'createdAt'). Optional.
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        List[WithdrawDto]: Withdrawal records for the subaccount.
    """
    page = await self.get_validated(
        url_path=f"{API_PREFIX}/token/withdraw",
        request_model=self._models.V1TokenWithdrawGetParametersQuery,
        response_model=self._models.PageOfWithdrawDtos,
        **kwargs,
    )
    # Rebuild every page entry as a WithdrawDto of the active network's models.
    withdrawals = []
    for entry in page.data:
        withdrawals.append(self._models.WithdrawDto(**entry.model_dump(by_alias=True)))
    return withdrawals

list_tokens(self, **kwargs) async

Lists all supported tokens.

Other Parameters:

Name Type Description
order str

Sort order, 'asc' or 'desc'. Optional.

limit float

Maximum number of results to return. Optional.

cursor str

Pagination cursor for fetching the next page. Optional.

order_by str

Field to order by (e.g., 'createdAt'). Optional.

**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Type Description
List[TokenDto]

List[TokenDto]: Token metadata.

Source code in ethereal/rest/token.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
async def list_tokens(
    self,
    **kwargs,
) -> List[TokenDto]:
    """Returns the tokens supported by the exchange.

    All filters are supplied as keyword arguments and handed to
    ``get_validated`` together with the token query request model.

    Other Parameters:
        order (str, optional): Sort order, 'asc' or 'desc'. Optional.
        limit (float, optional): Maximum number of results to return. Optional.
        cursor (str, optional): Pagination cursor for fetching the next page. Optional.
        order_by (str, optional): Field to order by (e.g., 'createdAt'). Optional.
        **kwargs: Additional request parameters accepted by the API. Optional.

    Returns:
        List[TokenDto]: Token metadata.
    """
    page = await self.get_validated(
        url_path=f"{API_PREFIX}/token",
        request_model=self._models.V1TokenGetParametersQuery,
        response_model=self._models.PageOfTokensDtos,
        **kwargs,
    )
    # Rebuild every page entry as a TokenDto of the active network's models.
    tokens = []
    for entry in page.data:
        tokens.append(self._models.TokenDto(**entry.model_dump(by_alias=True)))
    return tokens

prepare_withdraw_token(self, subaccount, token, amount, account, include_signature=False, **kwargs) async

Prepares a token withdrawal request, optionally including a signature.

Parameters:

Name Type Description Default
subaccount str

Hex-encoded subaccount name. Required.

required
token str

Token contract address or identifier. Required.

required
amount int

Amount to withdraw (token base units). Required.

required
account str

Recipient address. Required.

required
include_signature bool

If True, sign the payload immediately. Optional.

False

Other Parameters:

Name Type Description
nonce str

Custom nonce for signing. Optional.

signed_at int

Seconds since epoch for the signature timestamp. Optional.

**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Name Type Description
InitiateWithdrawDto InitiateWithdrawDto

Prepared (and optionally signed) withdrawal payload.

Source code in ethereal/rest/token.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
async def prepare_withdraw_token(
    self,
    subaccount: str,
    token: str,
    amount: int,
    account: str,
    include_signature: bool = False,
    **kwargs,
) -> InitiateWithdrawDto:
    """Builds a token withdrawal payload and optionally signs it.

    No request is sent here; the function only assembles (and, on demand,
    signs) the DTO.

    Args:
        subaccount (str): Hex-encoded subaccount name. Required.
        token (str): Token contract address or identifier. Required.
        amount (int): Amount to withdraw (token base units). Required.
        account (str): Recipient address. Required.
        include_signature (bool): If True, sign the payload immediately. Optional.

    Other Parameters:
        nonce (str, optional): Custom nonce for signing. A falsy value is
            replaced by a freshly generated nonce. Optional.
        signed_at (int, optional): Seconds since epoch for the signature
            timestamp. A falsy value is replaced by the current time. Optional.
        **kwargs: Only ``nonce`` and ``signed_at`` are read by this function.

    Returns:
        InitiateWithdrawDto: Prepared (and optionally signed) withdrawal payload.
    """
    nonce = kwargs.get("nonce")
    if not nonce:
        nonce = generate_nonce()

    signed_at = kwargs.get("signed_at")
    if not signed_at:
        signed_at = int(time.time())

    # Validate the raw fields first, then wrap them with an empty signature.
    withdraw_fields = self._models.InitiateWithdrawDtoData.model_validate(
        {
            "account": account,
            "subaccount": subaccount,
            "token": token,
            "amount": amount,
            "nonce": nonce,
            "signedAt": signed_at,
        }
    )
    unsigned = self._models.InitiateWithdrawDto.model_validate(
        {
            "data": withdraw_fields.model_dump(mode="json", by_alias=True),
            "signature": "",
        }
    )
    if not include_signature:
        return unsigned
    return await self.sign_withdraw_token(unsigned)

sign_withdraw_token(self, withdraw_dto, private_key=None) async

Signs the token withdrawal payload using EIP-712.

Parameters:

Name Type Description Default
withdraw_dto InitiateWithdrawDto

Prepared withdrawal payload. Required.

required
private_key str

Private key override. Defaults to the client's chain key.

None

Returns:

Name Type Description
InitiateWithdrawDto InitiateWithdrawDto

The same DTO with signature populated.

Raises:

Type Description
ValueError

If no chain client or private key is available.

Source code in ethereal/rest/token.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
async def sign_withdraw_token(
    self,
    withdraw_dto: InitiateWithdrawDto,
    private_key: Optional[str] = None,
) -> InitiateWithdrawDto:
    """Populates the signature of a withdrawal payload via EIP-712 signing.

    Args:
        withdraw_dto (InitiateWithdrawDto): Prepared withdrawal payload. Required.
        private_key (str, optional): Private key override. Defaults to the client's chain key.

    Returns:
        InitiateWithdrawDto: The same DTO instance, with ``signature`` set.

    Raises:
        ValueError: If no chain client or private key is available.
    """
    if not getattr(self, "chain", None):
        raise ValueError("No chain client available for signing")

    # Fall back to the chain client's key when no override is given.
    if not private_key:
        private_key = self.chain.private_key
        if not private_key:
            raise ValueError("No private key available for signing")

    # Build the typed-data message from the DTO payload.
    message = withdraw_dto.data.model_dump(mode="json", by_alias=True)
    # NOTE(review): the amount is scaled by 1e9 before signing - presumably the
    # fixed-point precision expected by the contract; confirm.
    scaled_amount = Decimal(message["amount"]) * Decimal(1e9)
    message.update(amount=int(scaled_amount), signedAt=int(message["signedAt"]))

    primary_type = "InitiateWithdraw"
    domain = self.rpc_config.domain.model_dump(mode="json", by_alias=True)
    types = self.chain.get_signature_types(self.rpc_config, primary_type)
    signature = self.chain.sign_message(
        private_key, domain, types, primary_type, message
    )
    withdraw_dto.signature = signature
    return withdraw_dto

withdraw_token(self, dto, token_id, **kwargs) async

Submits a prepared and signed token withdrawal request.

Parameters:

Name Type Description Default
dto InitiateWithdrawDto

Prepared and signed withdrawal payload. Required.

required
token_id str

Token ID for the withdraw route parameter. Required.

required

Other Parameters:

Name Type Description
**kwargs

Additional request parameters accepted by the API. Optional.

Returns:

Name Type Description
WithdrawDto WithdrawDto

Withdrawal record created by the API.

Source code in ethereal/rest/token.py
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
async def withdraw_token(
    self,
    dto: InitiateWithdrawDto,
    token_id: str,
    **kwargs,
) -> WithdrawDto:
    """Posts a prepared and signed token withdrawal to the API.

    Args:
        dto (InitiateWithdrawDto): Prepared and signed withdrawal payload. Required.
        token_id (str): Token ID for the withdraw route parameter. Required.

    Other Parameters:
        **kwargs: Additional request parameters forwarded to the POST call. Optional.

    Returns:
        WithdrawDto: Withdrawal record created by the API.
    """
    endpoint = f"{API_PREFIX}/token/{token_id}/withdraw"
    # Fields that are None are left out of the request body.
    body = dto.model_dump(mode="json", by_alias=True, exclude_none=True)
    response = await self.post(endpoint, data=body, **kwargs)
    withdraw_model = self._models.WithdrawDto
    return withdraw_model.model_validate(response)

ethereal.rest.util

client_order_id_to_bytes32(client_order_id)

Converts client_order_id to appropriate bytes32 format.

Parameters:

Name Type Description Default
client_order_id str

Client order ID to convert.

required

Returns:

Name Type Description
str str

Converted client order ID in bytes32 hex format.

Raises:

Type Description
ValueError

If string is longer than 32 characters and not a UUID, or if input is None/empty.

Source code in ethereal/rest/util.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def client_order_id_to_bytes32(client_order_id: str) -> str:
    """Converts client_order_id to appropriate bytes32 format.

    UUID strings are delegated to ``uuid_to_bytes32``. Any other string is
    UTF-8 encoded and right-padded with NUL bytes to exactly 32 bytes.

    Args:
        client_order_id (str): Client order ID to convert.

    Returns:
        str: Converted client order ID in bytes32 hex format.

    Raises:
        ValueError: If input is None/empty, or if a non-UUID string is longer
            than 32 characters or encodes to more than 32 UTF-8 bytes.
    """
    if client_order_id is None:
        raise ValueError("Client order ID cannot be None")

    if not client_order_id:
        raise ValueError("Client order ID cannot be empty")

    if is_uuid(client_order_id):
        return uuid_to_bytes32(client_order_id)

    if len(client_order_id) > 32:
        raise ValueError(
            f"Client order ID cannot be longer than 32 characters, got {len(client_order_id)}"
        )

    # Convert string to bytes32 hex format
    client_order_bytes = client_order_id.encode("utf-8")
    # Multi-byte characters can push the encoded form past 32 bytes even when
    # the character count is within the limit; ljust() would then not truncate
    # and the result would no longer be a valid bytes32 value.
    if len(client_order_bytes) > 32:
        raise ValueError(
            "Client order ID cannot be longer than 32 bytes when UTF-8 encoded, "
            f"got {len(client_order_bytes)}"
        )
    padded_bytes = client_order_bytes.ljust(32, b"\0")
    return "0x" + padded_bytes.hex()

generate_nonce()

Generates a timestamp-based nonce.

Returns:

Name Type Description
str str

Current timestamp in nanoseconds as string.

Source code in ethereal/rest/util.py
72
73
74
75
76
77
78
def generate_nonce() -> str:
    """Produces a nonce derived from the current time.

    Returns:
        str: Decimal string of the current timestamp in nanoseconds.
    """
    nanoseconds = time.time_ns()
    return f"{nanoseconds}"

is_uuid(value)

Checks if a string is a valid UUID.

Parameters:

Name Type Description Default
value str

String to check.

required

Returns:

Name Type Description
bool bool

True if string is a valid UUID, False otherwise.

Source code in ethereal/rest/util.py
25
26
27
28
29
30
31
32
33
34
35
36
37
def is_uuid(value: str) -> bool:
    """Checks if a string is a valid UUID.

    Only the canonical textual form (lowercase, hyphenated) is accepted,
    because the input is compared against ``str(uuid.UUID(value))``.

    Args:
        value (str): String to check. Non-string input is never a UUID.

    Returns:
        bool: True if string is a valid UUID, False otherwise.
    """
    # uuid.UUID() raises TypeError/AttributeError (not ValueError) for None,
    # ints or bytes; treat those as "not a UUID" instead of crashing.
    if not isinstance(value, str):
        return False
    try:
        return value == str(uuid.UUID(value))
    except ValueError:
        return False

uuid_to_bytes32(uuid_str)

Converts UUID string to bytes32 hex format.

Parameters:

Name Type Description Default
uuid_str str

UUID string to convert.

required

Returns:

Name Type Description
str str

Bytes32 hex string prefixed with '0x'.

Source code in ethereal/rest/util.py
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
def uuid_to_bytes32(uuid_str: str) -> str:
    """Renders a UUID string as a left-zero-padded bytes32 hex value.

    Args:
        uuid_str (str): UUID string to convert.

    Returns:
        str: Bytes32 hex string prefixed with '0x'.
    """
    parsed = uuid.UUID(uuid_str)

    # A UUID is 128 bits; format its integer value as 64 hex digits so the
    # upper 16 bytes are zero-filled.
    return f"0x{parsed.int:064x}"

ethereal.ws_client.WSClient

Bases: WSBase

Ethereal websocket client.

Parameters:

Name Type Description Default
config Union[Dict[str, Any], WSConfig]

Configuration dictionary or WSConfig object. Required fields: base_url (str), the base URL for websocket requests. Optional fields: verbose (bool), which enables debug logging and defaults to False.

required
Source code in ethereal/ws_client.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
class WSClient(WSBase):
    """Websocket client for Ethereal streams.

    Args:
        config (Union[Dict[str, Any], WSConfig]): Configuration dictionary or WSConfig object.
            Required fields include:
            - base_url (str): Base URL for websocket requests
            Optional fields include:
            - verbose (bool): Enables debug logging, defaults to False
    """

    def __init__(self, config: Union[Dict[str, Any], WSConfig]):
        super().__init__(config)

    def subscribe(
        self,
        stream_type: str,
        product_id: Optional[str] = None,
        subaccount_id: Optional[str] = None,
        callback: Optional[Callable[[Dict[str, Any]], None]] = None,
        namespace: Optional[str] = "/v1/stream",
    ) -> Dict[str, Any]:
        """Subscribes to a stream and optionally registers a callback.

        Args:
            stream_type (str): Type of stream to subscribe to
            product_id (Optional[str]): Product ID to subscribe to
            subaccount_id (Optional[str]): Subaccount ID, optional
            callback (Optional[Callable]): Callback function to handle incoming messages
            namespace (Optional[str]): Namespace the request is emitted on,
                defaults to "/v1/stream"

        Returns:
            Dict[str, Any]: Subscription response
        """
        # Only truthy identifiers are sent; key order is type, subaccountId, productId.
        optional_fields = (("subaccountId", subaccount_id), ("productId", product_id))
        request = {"type": stream_type}
        request.update({key: value for key, value in optional_fields if value})

        # Callbacks are grouped per stream type.
        if callback:
            self.callbacks.setdefault(stream_type, []).append(callback)

        return self._emit("subscribe", request, namespace=namespace)

    def unsubscribe(
        self,
        stream_type: str,
        product_id: Optional[str] = None,
        subaccount_id: Optional[str] = None,
        namespace: Optional[str] = "/v1/stream",
    ) -> Dict[str, Any]:
        """Unsubscribes from a stream.

        Callbacks registered through ``subscribe`` are not touched here.

        Args:
            stream_type (str): Type of stream to unsubscribe from
            product_id (Optional[str]): Product ID to unsubscribe from
            subaccount_id (Optional[str]): Subaccount ID, optional
            namespace (Optional[str]): Namespace the request is emitted on,
                defaults to "/v1/stream"

        Returns:
            Dict[str, Any]: Unsubscription response
        """
        # Only truthy identifiers are sent; key order is type, subaccountId, productId.
        optional_fields = (("subaccountId", subaccount_id), ("productId", product_id))
        request = {"type": stream_type}
        request.update({key: value for key, value in optional_fields if value})

        return self._emit("unsubscribe", request, namespace=namespace)

subscribe(stream_type, product_id=None, subaccount_id=None, callback=None, namespace='/v1/stream')

Subscribe to a specific stream.

Parameters:

Name Type Description Default
stream_type str

Type of stream to subscribe to

required
product_id Optional[str]

Product ID to subscribe to

None
subaccount_id Optional[str]

Subaccount ID, optional

None
callback Optional[Callable]

Callback function to handle incoming messages

None

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: Subscription response

Source code in ethereal/ws_client.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
def subscribe(
    self,
    stream_type: str,
    product_id: Optional[str] = None,
    subaccount_id: Optional[str] = None,
    callback: Optional[Callable[[Dict[str, Any]], None]] = None,
    namespace: Optional[str] = "/v1/stream",
) -> Dict[str, Any]:
    """Subscribes to a stream and optionally registers a callback.

    Args:
        stream_type (str): Type of stream to subscribe to
        product_id (Optional[str]): Product ID to subscribe to
        subaccount_id (Optional[str]): Subaccount ID, optional
        callback (Optional[Callable]): Callback function to handle incoming messages
        namespace (Optional[str]): Namespace the request is emitted on,
            defaults to "/v1/stream"

    Returns:
        Dict[str, Any]: Subscription response
    """
    # Only truthy identifiers are sent; key order is type, subaccountId, productId.
    optional_fields = (("subaccountId", subaccount_id), ("productId", product_id))
    request = {"type": stream_type}
    request.update({key: value for key, value in optional_fields if value})

    # Callbacks are grouped per stream type.
    if callback:
        self.callbacks.setdefault(stream_type, []).append(callback)

    return self._emit("subscribe", request, namespace=namespace)

unsubscribe(stream_type, product_id=None, subaccount_id=None, namespace='/v1/stream')

Unsubscribe from a specific stream.

Parameters:

Name Type Description Default
stream_type str

Type of stream to unsubscribe from

required
product_id Optional[str]

Product ID to unsubscribe from

None
subaccount_id Optional[str]

Subaccount ID, optional

None

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: Unsubscription response

Source code in ethereal/ws_client.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def unsubscribe(
    self,
    stream_type: str,
    product_id: Optional[str] = None,
    subaccount_id: Optional[str] = None,
    namespace: Optional[str] = "/v1/stream",
) -> Dict[str, Any]:
    """Unsubscribes from a stream.

    Args:
        stream_type (str): Type of stream to unsubscribe from
        product_id (Optional[str]): Product ID to unsubscribe from
        subaccount_id (Optional[str]): Subaccount ID, optional
        namespace (Optional[str]): Namespace the request is emitted on,
            defaults to "/v1/stream"

    Returns:
        Dict[str, Any]: Unsubscription response
    """
    # Only truthy identifiers are sent; key order is type, subaccountId, productId.
    optional_fields = (("subaccountId", subaccount_id), ("productId", product_id))
    request = {"type": stream_type}
    request.update({key: value for key, value in optional_fields if value})

    return self._emit("unsubscribe", request, namespace=namespace)

ethereal.chain_client.ChainClient

Bases: BaseClient

Client for interacting with the blockchain using Web3 functionality.

Parameters:

Name Type Description Default
config Union[Dict[str, Any], ChainConfig]

Chain configuration

required
rpc_config RpcConfigDto

RPC configuration. Defaults to None.

None

Raises:

Type Description
Exception

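If the RPC URL is not specified in the configuration, or if the configured private key does not match the configured address. A missing private key does not raise; the client is then created without a signing key.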

Source code in ethereal/chain_client.py
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
class ChainClient(BaseClient):
    """Client for interacting with the blockchain using Web3 functionality.

    Args:
        config (Union[Dict[str, Any], ChainConfig]): Chain configuration
        rpc_config (RpcConfigDto, optional): RPC configuration. Defaults to None.

    Raises:
        Exception: If RPC URL or private key is not specified in the configuration
    """

    def __init__(
        self,
        config: Union[Dict[str, Any], ChainConfig],
        rpc_config: Optional[RpcConfigDto] = None,
        tokens: Optional[List[TokenDto]] = None,
    ):
        """Set up the provider, the optional signing account and token contract.

        Args:
            config (Union[Dict[str, Any], ChainConfig]): Chain configuration,
                validated into a ``ChainConfig``.
            rpc_config (RpcConfigDto, optional): RPC configuration stored on
                the instance. Defaults to None.
            tokens (List[TokenDto], optional): Token list searched for the
                entry named "USD". Defaults to None.

        Raises:
            Exception: Propagated from ``_setup_provider`` / ``_setup_account``.
        """
        super().__init__(config)
        self.config = ChainConfig.model_validate(config)
        # The provider must exist before the account, which is derived through it.
        self.provider = self._setup_provider()
        self.account = self._setup_account()
        if self.account:
            # A signing account was built from the configured private key.
            self.address = self.account.address
            self.private_key = self.config.private_key
        else:
            # No account: keep only the configured address, without a key.
            self.address = self.config.address
            self.private_key = None

        # Read from the provider (eth.chain_id); used below to pick contract ABIs.
        self.chain_id = self.provider.eth.chain_id

        if tokens is not None:
            # Look for the token whose name is exactly "USD".
            usde_token = next((t for t in tokens if t.name == "USD"), None)
            if usde_token is None:
                # NOTE: self.usde is not assigned on this path (nor when tokens is None).
                self.logger.warning("USD token not found in the provided tokens list.")
            else:
                usde_address = self.provider.to_checksum_address(usde_token.address)
                # Use the "WUSDe" ABI when the ERC-20 name contains "Wrapped",
                # otherwise the generic "ERC20" ABI.
                usde_abi = (
                    read_contract(self.chain_id, "WUSDe", common=True)
                    if usde_token.erc20_name and "Wrapped" in usde_token.erc20_name
                    else read_contract(self.chain_id, "ERC20", common=True)
                )

                self.usde = self.provider.eth.contract(
                    address=usde_address,
                    abi=usde_abi,
                )
        self.rpc_config = rpc_config

    @property
    def exchange_contract(self):
        """Contract handle for the exchange gateway.

        A new contract object is built on every access from the verifying
        contract address in ``rpc_config.domain`` and the "ExchangeGateway"
        ABI for the current chain ID.
        """
        contract_factory = self.provider.eth.contract
        gateway_address = self.rpc_config.domain.verifying_contract
        gateway_abi = read_contract(self.chain_id, "ExchangeGateway")
        return contract_factory(address=gateway_address, abi=gateway_abi)

    def _setup_provider(self):
        """Create the Web3 provider from the configured RPC URL.

        Returns:
            Web3: Web3 instance backed by an HTTP provider

        Raises:
            Exception: If RPC URL is not specified in the configuration
        """
        rpc_url = self.config.rpc_url
        if rpc_url is None:
            raise Exception("RPC URL must be specified in the configuration")
        # TODO: Support other provider types (e.g. WebSocket)
        http_provider = Web3.HTTPProvider(rpc_url)
        return Web3(http_provider)

    def _setup_account(self):
        """Derive the signing account from the configured private key.

        Returns:
            Account: The Web3 account instance, or None when no private key
            is configured.

        Raises:
            Exception: If the account cannot be created from the private key,
                or its address differs from the address in the configuration
        """
        key = self.config.private_key
        if key is None:
            self.logger.debug("Private key not specified in the configuration")
            return None

        derived = self.provider.eth.account.from_key(key)
        if not derived:
            raise Exception("Failed to create account from private key")

        # When an address is configured as well, it must match the derived one.
        expected_address = self.config.address
        if expected_address and derived.address != expected_address:
            raise Exception(
                f"Private key does not match address specified in the config: {self.config.address}"
            )
        return derived

    def _get_tx(self, value=0, to=None) -> TxParams:
        """Build the default transaction parameters for this client's address.

        Args:
            value (int, optional): The value to send. Defaults to 0.
            to (str, optional): The recipient address. Defaults to None.

        Returns:
            TxParams: The transaction parameters; "to" is present only when given
        """
        # NOTE(review): get_nonce() returns -1 on failure, which would be
        # passed through here unchanged.
        tx_nonce = Nonce(self.get_nonce(self.address))
        params: TxParams = {
            "from": self.address,
            "chainId": self.chain_id,
            "value": value,
            "nonce": tx_nonce,
        }
        if to is None:
            return params
        params["to"] = to
        return params

    def _decode_error(self, error: ContractCustomError) -> str:
        """Resolve a custom contract error to its name in the exchange ABI.

        Args:
            error (ContractCustomError): Error whose ``data`` holds the revert
                data as a hex string.

        Returns:
            str: Name of the matching ABI error, or "Unknown error" when the
            data is missing or no ABI error matches.
        """
        revert_data = error.data
        if not isinstance(revert_data, str):
            # No usable revert data (e.g. None): nothing to look up.
            return "Unknown error"

        abi_errors = {
            self.provider.to_hex(function_abi_to_4byte_selector(f)): f.get("name")
            for f in self.exchange_contract.abi
            if f.get("type") == "error"
        }
        # Revert data is the 4-byte selector followed by any ABI-encoded
        # arguments, so match on the selector only ("0x" + 8 hex digits).
        # Matching the full payload would miss every error that has arguments.
        selector = revert_data[:10].lower()
        return abi_errors.get(selector, "Unknown error")

    def _get_by_alias(self, model: BaseModel, alias: str):
        """
        Look up a Pydantic model field through its alias.

        Args:
            model (BaseModel): The Pydantic model instance to extract the field from.
            alias (str): The alias of the field to retrieve.

        Returns:
            Any: Value of the first field whose alias equals ``alias``.

        Raises:
            KeyError: If the alias is not found in the model.
        """
        declared_fields = model.__class__.model_fields
        matching_name = next(
            (name for name, info in declared_fields.items() if info.alias == alias),
            None,
        )
        if matching_name is None:
            raise KeyError(f"Alias '{alias}' not found in model {model.__class__.__name__}")
        return getattr(model, matching_name)

    def get_signature_types(self, rpc_config: RpcConfigDto, primary_type: str):
        """Builds the EIP-712 type definitions for a primary type.

        Args:
            rpc_config (RpcConfigDto): RPC configuration holding the signature types.
            primary_type (str): Primary type for the signature.

        Returns:
            dict: The fixed "EIP712Domain" fields plus the fields of
            ``primary_type`` parsed from the RPC configuration.
        """
        domain_fields = [
            {"name": "name", "type": "string"},
            {"name": "version", "type": "string"},
            {"name": "chainId", "type": "uint256"},
            {"name": "verifyingContract", "type": "address"},
        ]
        # The configuration stores each type as a string, looked up by alias.
        type_definition = self._get_by_alias(rpc_config.signature_types, primary_type)
        return {
            "EIP712Domain": domain_fields,
            primary_type: self.convert_types(type_definition),
        }

    def convert_types(self, type_string: str) -> List[Dict[str, str]]:
        """Converts type string into EIP-712 field format.

        The input is a comma-separated list of ``"<type> <name>"`` pairs, e.g.
        ``"address sender, uint256 amount"``. Type and name may be separated
        by any run of whitespace; blank components (empty input, trailing
        commas) are ignored.

        Args:
            type_string (str): String containing type definitions.

        Returns:
            List[Dict[str, str]]: List of field definitions, each with the
            keys "name" and "type", in input order.

        Raises:
            ValueError: If a component does not contain both a type and a name.
        """
        type_fields = []
        for component in type_string.split(","):
            component = component.strip()
            if not component:
                continue
            # Split on the last whitespace run so the type keeps no stray
            # padding and the name is always the final token.
            parts = component.rsplit(None, 1)
            if len(parts) != 2:
                raise ValueError(
                    f"Invalid type definition {component!r}: expected '<type> <name>'"
                )
            field_type, field_name = parts
            type_fields.append({"name": field_name, "type": field_type})
        return type_fields

    def add_gas_fees(self, tx: TxParams) -> TxParams:
        """Fill in the gas fee fields of a transaction when they are missing.

        Args:
            tx (TxParams): The transaction parameters

        Returns:
            TxParams: The same transaction parameters; left unchanged when both
            fee fields are already present or the fee lookup fails
        """
        already_priced = "maxFeePerGas" in tx and "maxPriorityFeePerGas" in tx
        if not already_priced:
            try:
                eth = self.provider.eth
                gas_price = eth.gas_price
                priority_fee = eth.max_priority_fee
            except Web3Exception as e:
                # Best effort: log and hand the transaction back without fees.
                self.logger.error(f"Failed to add gas: {e}")
            else:
                tx["maxFeePerGas"] = gas_price
                tx["maxPriorityFeePerGas"] = priority_fee
        return tx

    def add_gas_limit(self, tx: TxParams) -> TxParams:
        """Estimate and set the gas limit of a transaction when it is missing.

        Args:
            tx (TxParams): The transaction parameters

        Returns:
            TxParams: The same transaction parameters with "gas" set

        Raises:
            Web3Exception: Re-raised after logging when the estimate fails
        """
        if "gas" not in tx:
            try:
                tx["gas"] = self.provider.eth.estimate_gas(tx)
            except Web3Exception as e:
                # Custom contract errors are logged by their decoded name.
                self.logger.error(
                    f"Failed to add gas limit: {self._decode_error(e) if isinstance(e, ContractCustomError) else e}"
                )
                raise
        return tx

    def submit_tx(self, tx: TxParams) -> str:
        """Sign a transaction with the client's key and broadcast it.

        Gas fees and the gas limit are filled in first when missing.

        Args:
            tx (TxParams): The transaction parameters

        Returns:
            str: The transaction hash

        Raises:
            Web3Exception: Re-raised after logging when signing or sending fails
        """
        tx = self.add_gas_limit(self.add_gas_fees(tx))
        try:
            eth = self.provider.eth
            signed_tx = eth.account.sign_transaction(tx, private_key=self.private_key)
            raw_hash = eth.send_raw_transaction(signed_tx.raw_transaction)
            return encode_hex(raw_hash)
        except Web3Exception as e:
            self.logger.error(f"Failed to submit transaction: {e}")
            raise

    def get_nonce(self, address: str) -> int:
        """Look up the transaction count of an address.

        Args:
            address (str): The address to get the nonce for

        Returns:
            int: The nonce, or -1 if failed
        """
        try:
            count = self.provider.eth.get_transaction_count(address)
        except Web3Exception as e:
            # Failures are logged and reported through the -1 sentinel.
            self.logger.error(f"Failed to get nonce: {e}")
            count = -1
        return count

    def get_balance(self, address: str) -> int:
        """Look up the balance of an address via the provider.

        Args:
            address (str): The address to get the balance for

        Returns:
            int: The balance, or -1 if failed
        """
        try:
            balance = self.provider.eth.get_balance(address)
        except Web3Exception as e:
            # Failures are logged and reported through the -1 sentinel.
            self.logger.error(f"Failed to get balance: {e}")
            balance = -1
        return balance

    def get_token_balance(self, address: str, token_address: str) -> int:
        """Query ``balanceOf(address)`` on a token contract.

        The contract is bound using the common "ERC20" ABI loaded through
        ``read_contract`` for the client's chain id.

        Args:
            address (str): The address to get the token balance for
            token_address (str): The token address

        Returns:
            int: The token balance, or -1 if a Web3Exception was raised
        """
        try:
            token = self.provider.eth.contract(
                address=token_address,
                abi=read_contract(self.chain_id, "ERC20", common=True),
            )
            balance = token.functions.balanceOf(address).call()
        except Web3Exception as exc:
            # Best-effort lookup: failures are logged and reported as -1.
            self.logger.error(f"Failed to get token balance: {exc}")
            return -1
        return balance

    def sign_message(
        self,
        private_key: str,
        domain: dict,
        types: dict,
        primary_type: str,
        message: dict,
    ) -> str:
        """Sign an EIP-712 typed data message.

        The caller's ``domain`` dict is left untouched; ``chainId`` is coerced
        to ``int`` on a shallow copy before encoding.

        Args:
            private_key (str): private key to sign the message with
            domain (dict): domain parameters including name, version, chainId, and verifyingContract
            types (dict): type definitions for the structured data
            primary_type (str): primary type for the signature
            message (dict): message data to be signed

        Returns:
            str: the hexadecimal signature string prefixed with '0x'

        Raises:
            KeyError: If ``domain`` has no "chainId" entry.
        """
        # Normalize chainId on a copy so the caller's dict is not mutated.
        normalized_domain = {**domain, "chainId": int(domain["chainId"])}

        # Preparing the full message as per EIP-712
        full_message = {
            "types": types,
            "primaryType": primary_type,
            "domain": normalized_domain,
            "message": message,
        }

        encoded_message = encode_typed_data(full_message=full_message)

        # Signing the message
        signed_message = Account.sign_message(encoded_message, private_key)
        return "0x" + signed_message.signature.hex()

    def deposit_usde(
        self,
        amount: float,
        address: Optional[str] = None,
        submit: Optional[bool] = False,
        account_name: Optional[str] = "primary",
    ) -> Union[TxParams, str]:
        """Build (and optionally submit) a ``depositUsd`` transaction.

        The amount is converted with ``to_wei(amount, "ether")`` and sent as
        the transaction value to the exchange contract. The subaccount is
        ``account_name`` hex-encoded and right-padded with "0" to 66
        characters; the referral code is a zero value padded the same way.

        Args:
            amount (float): The amount to deposit
            address (str, optional): Defaults to the client's own address.
                NOTE(review): this value is not read again after defaulting
                in this method.
            submit (bool, optional): Whether to submit the transaction. Defaults to False.
            account_name (str, optional): The account name. Defaults to "primary".

        Returns:
            Union[TxParams, str]: The transaction parameters or transaction hash if submit=True

        Raises:
            Web3Exception: Logged and re-raised when preparing or submitting fails.
        """
        if address is None:
            address = self.address
        try:
            # Call arguments
            subaccount_hex = self.provider.to_hex(text=account_name).ljust(66, "0")
            value_wei = self.provider.to_wei(amount, "ether")
            referral_hex = self.provider.to_hex(0).ljust(66, "0")

            # Transaction skeleton plus encoded call data
            tx = self._get_tx(to=self.exchange_contract.address, value=value_wei)
            tx["data"] = self.exchange_contract.encode_abi(
                "depositUsd", args=[subaccount_hex, referral_hex]
            )

            return self.submit_tx(tx) if submit else tx

        except Web3Exception as exc:
            detail = (
                self._decode_error(exc) if isinstance(exc, ContractCustomError) else exc
            )
            self.logger.error(f"Failed to prepare deposit transaction: {detail}")
            raise

    def finalize_withdraw(
        self,
        address: Optional[str] = None,
        submit: Optional[bool] = False,
        account_name: Optional[str] = "primary",
    ) -> Union[TxParams, str]:
        """Build (and optionally submit) a ``finalizeWithdraw`` transaction.

        Args:
            address (str, optional): Address passed as the first argument of the
                contract call. Defaults to the client's own address.
            submit (bool, optional): Whether to submit the transaction. Defaults to False.
            account_name (str, optional): The name of the account. Defaults to "primary".

        Returns:
            Union[TxParams, str]: The transaction parameters or transaction hash if submit=True

        Raises:
            Web3Exception: Logged and re-raised when preparing or submitting fails.
        """
        if address is None:
            address = self.address
        try:
            # Subaccount: hex-encoded name right-padded with "0" to 66 characters
            subaccount_hex = self.provider.to_hex(text=account_name).ljust(66, "0")

            # Transaction skeleton plus encoded call data
            tx = self._get_tx(to=self.exchange_contract.address)
            tx["data"] = self.exchange_contract.encode_abi(
                "finalizeWithdraw", args=[address, subaccount_hex]
            )

            return self.submit_tx(tx) if submit else tx

        except Web3Exception as exc:
            detail = (
                self._decode_error(exc) if isinstance(exc, ContractCustomError) else exc
            )
            self.logger.error(
                f"Failed to prepare finalizeWithdraw transaction: {detail}"
            )
            raise

add_gas_fees(tx)

Add gas fee parameters to a transaction.

Parameters:

Name Type Description Default
tx TxParams

The transaction parameters

required

Returns:

Name Type Description
TxParams TxParams

The transaction parameters with gas fee parameters added

Source code in ethereal/chain_client.py
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
def add_gas_fees(self, tx: TxParams) -> TxParams:
    """Fill in ``maxFeePerGas`` and ``maxPriorityFeePerGas`` on a transaction.

    Nothing is changed when both fields are already present. Otherwise both
    are set from the provider's ``gas_price`` and ``max_priority_fee``. If the
    provider raises a Web3Exception the error is logged and the transaction
    is returned as it was.

    Args:
        tx (TxParams): The transaction parameters

    Returns:
        TxParams: The transaction parameters with gas fee parameters added
    """
    already_priced = "maxFeePerGas" in tx and "maxPriorityFeePerGas" in tx
    if not already_priced:
        try:
            # Read both values before writing so a failure leaves tx untouched.
            fee_cap = self.provider.eth.gas_price
            priority_tip = self.provider.eth.max_priority_fee
            tx["maxFeePerGas"] = fee_cap
            tx["maxPriorityFeePerGas"] = priority_tip
        except Web3Exception as exc:
            self.logger.error(f"Failed to add gas: {exc}")
    return tx

add_gas_limit(tx)

Add gas limit to a transaction.

Parameters:

Name Type Description Default
tx TxParams

The transaction parameters

required

Returns:

Name Type Description
TxParams TxParams

The transaction parameters with gas limit added

Source code in ethereal/chain_client.py
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
def add_gas_limit(self, tx: TxParams) -> TxParams:
    """Fill in the ``gas`` field of a transaction from a provider estimate.

    A transaction that already carries ``gas`` is returned unchanged.

    Args:
        tx (TxParams): The transaction parameters

    Returns:
        TxParams: The transaction parameters with gas limit added

    Raises:
        Web3Exception: Logged and re-raised when gas estimation fails.
    """
    if "gas" not in tx:
        try:
            tx["gas"] = self.provider.eth.estimate_gas(tx)
        except Web3Exception as exc:
            # Custom contract errors are decoded for a readable log line.
            detail = (
                self._decode_error(exc) if isinstance(exc, ContractCustomError) else exc
            )
            self.logger.error(f"Failed to add gas limit: {detail}")
            raise
    return tx

convert_types(type_string)

Converts type string into EIP-712 field format.

Parameters:

Name Type Description Default
type_string str

String containing type definitions.

required

Returns:

Type Description
List[Dict[str, str]]

List[Dict[str, str]]: List of field definitions.

Source code in ethereal/chain_client.py
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
def convert_types(self, type_string: str) -> List[Dict[str, str]]:
    """Converts type string into EIP-712 field format.

    The input is a comma-separated list of ``"<type> <name>"`` pairs, e.g.
    ``"uint256 amount, address account"``. Blank components (empty input,
    trailing commas) are skipped, and any run of whitespace may separate the
    type from the name.

    Args:
        type_string (str): String containing type definitions.

    Returns:
        List[Dict[str, str]]: List of field definitions, each with "name" and
        "type" keys, in input order.

    Raises:
        ValueError: If a non-blank component does not contain both a type and
            a name.
    """
    type_fields = []
    for component in type_string.split(","):
        field = component.strip()
        if not field:
            # Tolerate empty input and stray/trailing commas.
            continue
        # Split on the last whitespace run: the name is the final token,
        # everything before it is the type.
        parts = field.rsplit(None, 1)
        if len(parts) != 2:
            raise ValueError(f"Invalid EIP-712 field definition: {field!r}")
        field_type, field_name = parts
        type_fields.append({"name": field_name, "type": field_type})
    return type_fields

deposit_usde(amount, address=None, submit=False, account_name='primary')

Submit a deposit transaction.

Parameters:

Name Type Description Default
amount float

The amount to deposit

required
address str

The address to deposit to. Defaults to None.

None
submit bool

Whether to submit the transaction. Defaults to False.

False
account_name str

The account name. Defaults to "primary".

'primary'

Returns:

Type Description
Union[TxParams, str]

Union[TxParams, str]: The transaction parameters or transaction hash if submit=True

Source code in ethereal/chain_client.py
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
def deposit_usde(
    self,
    amount: float,
    address: Optional[str] = None,
    submit: Optional[bool] = False,
    account_name: Optional[str] = "primary",
) -> Union[TxParams, str]:
    """Build (and optionally submit) a ``depositUsd`` transaction.

    The amount is converted with ``to_wei(amount, "ether")`` and sent as the
    transaction value to the exchange contract. The subaccount is
    ``account_name`` hex-encoded and right-padded with "0" to 66 characters;
    the referral code is a zero value padded the same way.

    Args:
        amount (float): The amount to deposit
        address (str, optional): Defaults to the client's own address.
            NOTE(review): this value is not read again after defaulting in
            this method.
        submit (bool, optional): Whether to submit the transaction. Defaults to False.
        account_name (str, optional): The account name. Defaults to "primary".

    Returns:
        Union[TxParams, str]: The transaction parameters or transaction hash if submit=True

    Raises:
        Web3Exception: Logged and re-raised when preparing or submitting fails.
    """
    if address is None:
        address = self.address
    try:
        # Call arguments
        subaccount_hex = self.provider.to_hex(text=account_name).ljust(66, "0")
        value_wei = self.provider.to_wei(amount, "ether")
        referral_hex = self.provider.to_hex(0).ljust(66, "0")

        # Transaction skeleton plus encoded call data
        tx = self._get_tx(to=self.exchange_contract.address, value=value_wei)
        tx["data"] = self.exchange_contract.encode_abi(
            "depositUsd", args=[subaccount_hex, referral_hex]
        )

        return self.submit_tx(tx) if submit else tx

    except Web3Exception as exc:
        detail = (
            self._decode_error(exc) if isinstance(exc, ContractCustomError) else exc
        )
        self.logger.error(f"Failed to prepare deposit transaction: {detail}")
        raise

finalize_withdraw(address=None, submit=False, account_name='primary')

Finalize a withdrawal.

Parameters:

Name Type Description Default
address str

The address to finalize the withdrawal for. Defaults to None, in which case the client's own address is used.

None
submit bool

Whether to submit the transaction. Defaults to False.

False
account_name str

The name of the account. Defaults to "primary".

'primary'

Returns:

Type Description
Union[TxParams, str]

Union[TxParams, str]: The transaction parameters or transaction hash if submit=True

Source code in ethereal/chain_client.py
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
def finalize_withdraw(
    self,
    address: Optional[str] = None,
    submit: Optional[bool] = False,
    account_name: Optional[str] = "primary",
) -> Union[TxParams, str]:
    """Build (and optionally submit) a ``finalizeWithdraw`` transaction.

    Args:
        address (str, optional): Address passed as the first argument of the
            contract call. Defaults to the client's own address.
        submit (bool, optional): Whether to submit the transaction. Defaults to False.
        account_name (str, optional): The name of the account. Defaults to "primary".

    Returns:
        Union[TxParams, str]: The transaction parameters or transaction hash if submit=True

    Raises:
        Web3Exception: Logged and re-raised when preparing or submitting fails.
    """
    if address is None:
        address = self.address
    try:
        # Subaccount: hex-encoded name right-padded with "0" to 66 characters
        subaccount_hex = self.provider.to_hex(text=account_name).ljust(66, "0")

        # Transaction skeleton plus encoded call data
        tx = self._get_tx(to=self.exchange_contract.address)
        tx["data"] = self.exchange_contract.encode_abi(
            "finalizeWithdraw", args=[address, subaccount_hex]
        )

        return self.submit_tx(tx) if submit else tx

    except Web3Exception as exc:
        detail = (
            self._decode_error(exc) if isinstance(exc, ContractCustomError) else exc
        )
        self.logger.error(f"Failed to prepare finalizeWithdraw transaction: {detail}")
        raise

get_balance(address)

Get the balance for a given address.

Parameters:

Name Type Description Default
address str

The address to get the balance for

required

Returns:

Name Type Description
int int

The balance, or -1 if failed

Source code in ethereal/chain_client.py
295
296
297
298
299
300
301
302
303
304
305
306
307
308
def get_balance(self, address: str) -> int:
    """Look up the native balance of an address.

    Args:
        address (str): The address to get the balance for

    Returns:
        int: The balance, or -1 if the provider call raised a Web3Exception
    """
    try:
        balance = self.provider.eth.get_balance(address)
    except Web3Exception as exc:
        # Best-effort lookup: failures are logged and reported as -1.
        self.logger.error(f"Failed to get balance: {exc}")
        return -1
    return balance

get_nonce(address)

Get the nonce for a given address.

Parameters:

Name Type Description Default
address str

The address to get the nonce for

required

Returns:

Name Type Description
int int

The nonce, or -1 if failed

Source code in ethereal/chain_client.py
280
281
282
283
284
285
286
287
288
289
290
291
292
293
def get_nonce(self, address: str) -> int:
    """Look up the transaction count (nonce) of an address.

    Args:
        address (str): The address to get the nonce for

    Returns:
        int: The nonce, or -1 if the provider call raised a Web3Exception
    """
    try:
        nonce = self.provider.eth.get_transaction_count(address)
    except Web3Exception as exc:
        # Best-effort lookup: failures are logged and reported as -1.
        self.logger.error(f"Failed to get nonce: {exc}")
        return -1
    return nonce

get_signature_types(rpc_config, primary_type)

Gets EIP-712 signature types.

Parameters:

Name Type Description Default
rpc_config RpcConfigDto

RPC configuration.

required
primary_type str

Primary type for the signature.

required

Returns:

Name Type Description
dict

Dictionary containing signature type definitions.

Source code in ethereal/chain_client.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
def get_signature_types(self, rpc_config: RpcConfigDto, primary_type: str):
    """Gets EIP-712 signature types.

    The result always carries the fixed "EIP712Domain" field list and one
    extra entry keyed by ``primary_type``, whose fields are parsed by
    ``convert_types`` from the matching definition in
    ``rpc_config.signature_types``.

    Args:
        rpc_config (RpcConfigDto): RPC configuration.
        primary_type (str): Primary type for the signature.

    Returns:
        dict: Dictionary containing signature type definitions.
    """
    domain_fields = [
        {"name": "name", "type": "string"},
        {"name": "version", "type": "string"},
        {"name": "chainId", "type": "uint256"},
        {"name": "verifyingContract", "type": "address"},
    ]
    # Raw "<type> <name>, ..." definition looked up by alias on the config.
    raw_definition = self._get_by_alias(rpc_config.signature_types, primary_type)
    return {
        "EIP712Domain": domain_fields,
        primary_type: self.convert_types(raw_definition),
    }

get_token_balance(address, token_address)

Get the token balance for a given address.

Parameters:

Name Type Description Default
address str

The address to get the token balance for

required
token_address str

The token address

required

Returns:

Name Type Description
int int

The token balance, or -1 if failed

Source code in ethereal/chain_client.py
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
def get_token_balance(self, address: str, token_address: str) -> int:
    """Query ``balanceOf(address)`` on a token contract.

    The contract is bound using the common "ERC20" ABI loaded through
    ``read_contract`` for the client's chain id.

    Args:
        address (str): The address to get the token balance for
        token_address (str): The token address

    Returns:
        int: The token balance, or -1 if a Web3Exception was raised
    """
    try:
        token = self.provider.eth.contract(
            address=token_address,
            abi=read_contract(self.chain_id, "ERC20", common=True),
        )
        balance = token.functions.balanceOf(address).call()
    except Web3Exception as exc:
        # Best-effort lookup: failures are logged and reported as -1.
        self.logger.error(f"Failed to get token balance: {exc}")
        return -1
    return balance

sign_message(private_key, domain, types, primary_type, message)

Sign an EIP-712 typed data message.

Parameters:

Name Type Description Default
private_key str

private key to sign the message with

required
domain dict

domain parameters including name, version, chainId, and verifyingContract

required
types dict

type definitions for the structured data

required
primary_type str

primary type for the signature

required
message dict

message data to be signed

required

Returns:

Name Type Description
str

the hexadecimal signature string prefixed with '0x'

Source code in ethereal/chain_client.py
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
def sign_message(
    self,
    private_key: str,
    domain: dict,
    types: dict,
    primary_type: str,
    message: dict,
) -> str:
    """Sign an EIP-712 typed data message.

    The caller's ``domain`` dict is left untouched; ``chainId`` is coerced to
    ``int`` on a shallow copy before encoding.

    Args:
        private_key (str): private key to sign the message with
        domain (dict): domain parameters including name, version, chainId, and verifyingContract
        types (dict): type definitions for the structured data
        primary_type (str): primary type for the signature
        message (dict): message data to be signed

    Returns:
        str: the hexadecimal signature string prefixed with '0x'

    Raises:
        KeyError: If ``domain`` has no "chainId" entry.
    """
    # Normalize chainId on a copy so the caller's dict is not mutated.
    normalized_domain = {**domain, "chainId": int(domain["chainId"])}

    # Preparing the full message as per EIP-712
    full_message = {
        "types": types,
        "primaryType": primary_type,
        "domain": normalized_domain,
        "message": message,
    }

    encoded_message = encode_typed_data(full_message=full_message)

    # Signing the message
    signed_message = Account.sign_message(encoded_message, private_key)
    return "0x" + signed_message.signature.hex()

submit_tx(tx)

Submit a transaction.

Parameters:

Name Type Description Default
tx TxParams

The transaction parameters

required

Returns:

Name Type Description
str str

The transaction hash

Source code in ethereal/chain_client.py
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
def submit_tx(self, tx: TxParams) -> str:
    """Sign a transaction with the client's private key and broadcast it.

    Fee fields and the gas limit are filled in via ``add_gas_fees`` and
    ``add_gas_limit`` before signing.

    Args:
        tx (TxParams): The transaction parameters

    Returns:
        str: The transaction hash, hex-encoded

    Raises:
        Web3Exception: Logged and re-raised when signing or sending fails.
    """
    # Fees first, then the gas limit (same order as before).
    prepared = self.add_gas_limit(self.add_gas_fees(tx))
    try:
        signed = self.provider.eth.account.sign_transaction(
            prepared, private_key=self.private_key
        )
        raw_hash = self.provider.eth.send_raw_transaction(signed.raw_transaction)
    except Web3Exception as exc:
        self.logger.error(f"Failed to submit transaction: {exc}")
        raise
    return encode_hex(raw_hash)

ethereal.rest.http_client.HTTPClient

Bases: BaseClient

HTTP client for making API requests.

Parameters:

Name Type Description Default
config Union[Dict[str, Any], HTTPConfig]

Client configuration.

required
Source code in ethereal/rest/http_client.py
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
class HTTPClient(BaseClient):
    """Synchronous HTTP client for making API requests.

    All requests go through one ``requests.Session`` created at construction
    time. ``get``/``post``/``put``/``delete`` are thin wrappers that merge
    keyword arguments into the query string or body and delegate to
    ``prepare_and_send_request``. Caller-supplied ``params``/``data`` dicts
    are never modified.

    Args:
        config (Union[Dict[str, Any], HTTPConfig]): Client configuration.
    """

    def __init__(self, config: Union[Dict[str, Any], HTTPConfig]):
        super().__init__(config)
        self.config = HTTPConfig.model_validate(config)
        self.base_url = self.config.base_url
        self.timeout = self.config.timeout
        self.session = self._setup_session()
        self.rate_limit_headers = self.config.rate_limit_headers

    def _setup_session(self):
        """Sets up an HTTP session.

        Returns:
            requests.Session: Configured session object.
        """
        return requests.Session()

    def _handle_exception(self, response):
        """Handles HTTP exceptions.

        Does nothing for status codes outside the 4xx/5xx ranges.

        Args:
            response (Response): The HTTP response object.

        Raises:
            HTTPError: If response indicates an error occurred.
        """
        http_error_msg = ""
        reason = response.reason

        if 400 <= response.status_code < 500:
            if (
                response.status_code == 403
                and "'error_details':'Missing required scopes'" in response.text
            ):
                # Dedicated hint for the missing-scopes case.
                http_error_msg = f"{response.status_code} Client Error: Missing Required Scopes. Please verify your API keys include the necessary permissions."
            else:
                http_error_msg = (
                    f"{response.status_code} Client Error: {reason} {response.text}"
                )
        elif 500 <= response.status_code < 600:
            http_error_msg = (
                f"{response.status_code} Server Error: {reason} {response.text}"
            )

        if http_error_msg:
            self.logger.error(f"HTTP Error: {http_error_msg}")
            raise HTTPError(http_error_msg, response=response)

    def get(self, url_path, params: Optional[dict] = None, **kwargs) -> Dict[str, Any]:
        """Sends a GET request.

        Args:
            url_path (str): The URL path. Required.
            params (dict, optional): The query parameters. Not modified. Optional.
            **kwargs: Additional query parameters; these override entries in
                ``params`` with the same key. Optional.

        Returns:
            Dict[str, Any]: The response data.
        """
        # Merge into a fresh dict so the caller's params are left untouched.
        query = {**(params or {}), **kwargs}

        return self.prepare_and_send_request("GET", url_path, query, data=None)

    def get_validated(
        self,
        url_path,
        request_model: Type[BaseModel],
        response_model: Type[BaseModel],
        **kwargs,
    ) -> BaseModel:
        """Sends a GET request including type validation of both the input and output from provided models.

        Args:
            url_path (str): The URL path. Required.
            request_model (Type[BaseModel]): Pydantic model for request validation. Required.
            response_model (Type[BaseModel]): Pydantic model for response validation. Required.
            **kwargs: Includes all arguments to pass to the request. Optional.

        Returns:
            BaseModel: The response data, validated against the response_model.

        Raises:
            ValidationException: If ``kwargs`` do not validate against
                ``request_model``.
        """
        try:
            validated_params = request_model.model_validate(kwargs, by_name=True)
        except ValidationError as e:
            # "from None" hides the pydantic traceback; the details travel in
            # the ValidationException arguments instead.
            raise ValidationException(
                "Request", url_path, request_model, e.errors()
            ) from None
        params = validated_params.model_dump(
            mode="json", exclude_none=True, by_alias=True
        )

        response_data = self.prepare_and_send_request(
            "GET", url_path, params, data=None
        )
        validated_response = response_model.model_validate(response_data)
        return validated_response

    def post(
        self,
        url_path,
        params: Optional[dict] = None,
        data: Optional[dict] = None,
        **kwargs,
    ) -> Dict[str, Any]:
        """Sends a POST request.

        Args:
            url_path (str): The URL path. Required.
            params (dict, optional): The query parameters. Optional.
            data (dict, optional): The request body. Not modified. Optional.
            **kwargs: Additional body fields; these override entries in
                ``data`` with the same key. Optional.

        Returns:
            Dict[str, Any]: The response data.
        """
        # Merge into a fresh dict so the caller's data is left untouched.
        body = {**(data or {}), **kwargs}

        return self.prepare_and_send_request("POST", url_path, params, body)

    def put(
        self,
        url_path,
        params: Optional[dict] = None,
        data: Optional[dict] = None,
        **kwargs,
    ) -> Dict[str, Any]:
        """Sends a PUT request.

        Args:
            url_path (str): The URL path. Required.
            params (dict, optional): The query parameters. Optional.
            data (dict, optional): The request body. Not modified. Optional.
            **kwargs: Additional body fields; these override entries in
                ``data`` with the same key. Optional.

        Returns:
            Dict[str, Any]: The response data.
        """
        # Merge into a fresh dict so the caller's data is left untouched.
        body = {**(data or {}), **kwargs}

        return self.prepare_and_send_request("PUT", url_path, params, body)

    def delete(
        self,
        url_path,
        params: Optional[dict] = None,
        data: Optional[dict] = None,
        **kwargs,
    ) -> Dict[str, Any]:
        """Sends a DELETE request.

        Args:
            url_path (str): The URL path. Required.
            params (dict, optional): The query parameters. Optional.
            data (dict, optional): The request body. Not modified. Optional.
            **kwargs: Additional body fields; these override entries in
                ``data`` with the same key. Optional.

        Returns:
            Dict[str, Any]: The response data.
        """
        # Merge into a fresh dict so the caller's data is left untouched.
        body = {**(data or {}), **kwargs}

        return self.prepare_and_send_request("DELETE", url_path, params, body)

    def prepare_and_send_request(
        self,
        http_method,
        url_path,
        params: Optional[dict] = None,
        data: Optional[dict] = None,
    ):
        """Prepares and sends an HTTP request.

        Entries whose value is ``None`` are dropped from both ``params`` and
        ``data``; boolean query parameters are sent as "true"/"false".

        Args:
            http_method (str): The HTTP method. Required.
            url_path (str): The URL path. Required.
            params (dict, optional): The query parameters. Optional.
            data (dict, optional): The request body. Optional.

        Returns:
            Dict[str, Any]: The response data.
        """
        headers = self.set_headers(http_method, url_path)

        if params is not None:
            params = {
                key: str(value).lower() if isinstance(value, bool) else value
                for key, value in params.items()
                if value is not None
            }

        if data is not None:
            data = {key: value for key, value in data.items() if value is not None}

        return self.send_request(http_method, url_path, params, headers, data=data)

    def send_request(self, http_method, url_path, params, headers, data=None):
        """Sends an HTTP request.

        When ``rate_limit_headers`` is enabled, the headers listed in
        ``RATE_LIMIT_HEADERS`` are merged into the returned data, keyed through
        ``REST_COMMON_FIELDS`` where a mapping exists.

        Args:
            http_method (str): The HTTP method. Required.
            url_path (str): The URL path. Required.
            params (dict): The query parameters. Required.
            headers (dict): The request headers. Required.
            data (dict, optional): The request body. Optional.

        Returns:
            Dict[str, Any]: The response data.

        Raises:
            HTTPError: If the request fails.
        """
        if data is None:
            data = {}

        url = f"{self.base_url}{url_path}"

        self.logger.debug(f"Sending {http_method} request to {url}")

        response = self.session.request(
            http_method,
            url,
            params=params,
            json=data,
            headers=headers,
            timeout=self.timeout,
        )
        self._handle_exception(response)  # Raise an HTTPError for bad responses

        # Decode the body once and reuse it for both logging and the result.
        response_data = response.json()

        self.logger.debug(f"Raw response: {response_data}")

        if self.rate_limit_headers:
            response_headers = dict(response.headers)
            specific_headers = {
                REST_COMMON_FIELDS.get(key, key): response_headers.get(key, None)
                for key in RATE_LIMIT_HEADERS
            }

            response_data = {**response_data, **specific_headers}

        return response_data

    def set_headers(self, method, path):
        """Sets the request headers.

        The same headers are returned for every method and path; the
        arguments are currently not read.

        Args:
            method (str): The HTTP method. Required.
            path (str): The URL path. Required.

        Returns:
            dict: The request headers.
        """

        return {
            "User-Agent": USER_AGENT,
            "Content-Type": "application/json",
        }

delete(url_path, params=None, data=None, **kwargs)

Sends a DELETE request.

Parameters:

Name Type Description Default
url_path str

The URL path. Required.

required
params dict

The query parameters. Optional.

None
data dict

The request body. Optional.

None
**kwargs

Additional arguments to pass to the request. Optional.

{}

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: The response data.

Source code in ethereal/rest/http_client.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
def delete(
    self,
    url_path,
    params: Optional[dict] = None,
    data: Optional[dict] = None,
    **kwargs,
) -> Dict[str, Any]:
    """Sends a DELETE request.

    Args:
        url_path (str): The URL path. Required.
        params (dict, optional): The query parameters. Optional.
        data (dict, optional): The request body. Not modified. Optional.
        **kwargs: Additional body fields; these override entries in ``data``
            with the same key. Optional.

    Returns:
        Dict[str, Any]: The response data.
    """
    # Merge into a fresh dict so the caller's data is left untouched.
    body = {**(data or {}), **kwargs}

    return self.prepare_and_send_request("DELETE", url_path, params, body)

get(url_path, params=None, **kwargs)

Sends a GET request.

Parameters:

Name Type Description Default
url_path str

The URL path. Required.

required
params dict

The query parameters. Optional.

None
**kwargs

Additional arguments to pass to the request. Optional.

{}

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: The response data.

Source code in ethereal/rest/http_client.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
def get(self, url_path, params: Optional[dict] = None, **kwargs) -> Dict[str, Any]:
    """Sends a GET request.

    Args:
        url_path (str): The URL path. Required.
        params (dict, optional): The query parameters. Not modified. Optional.
        **kwargs: Additional query parameters; these override entries in
            ``params`` with the same key. Optional.

    Returns:
        Dict[str, Any]: The response data.
    """
    # Merge into a fresh dict so the caller's params are left untouched.
    query = {**(params or {}), **kwargs}

    return self.prepare_and_send_request("GET", url_path, query, data=None)

get_validated(url_path, request_model, response_model, **kwargs)

Sends a GET request including type validation of both the input and output from provided models.

Parameters:

Name Type Description Default
url_path str

The URL path. Required.

required
request_model Type[BaseModel]

Pydantic model for request validation. Required.

required
response_model Type[BaseModel]

Pydantic model for response validation. Required.

required
**kwargs

Includes all arguments to pass to the request. Optional.

{}

Returns:

Name Type Description
BaseModel BaseModel

The response data, validated against the response_model.

Source code in ethereal/rest/http_client.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
def get_validated(
    self,
    url_path,
    request_model: Type[BaseModel],
    response_model: Type[BaseModel],
    **kwargs,
) -> BaseModel:
    """Sends a GET request, validating the input and the output with the given models.

    The keyword arguments are validated with ``request_model``, serialized
    (JSON mode, aliases applied, ``None`` values dropped) and sent as the
    query parameters. The decoded response is then validated with
    ``response_model``.

    Args:
        url_path (str): The URL path. Required.
        request_model (Type[BaseModel]): Pydantic model for request validation. Required.
        response_model (Type[BaseModel]): Pydantic model for response validation. Required.
        **kwargs: The request arguments to validate and send. Optional.

    Returns:
        BaseModel: The response data, validated against the response_model.

    Raises:
        ValidationException: If ``kwargs`` do not validate against ``request_model``.
    """
    try:
        request = request_model.model_validate(kwargs, by_name=True)
    except ValidationError as exc:
        # ``from None`` drops the original pydantic error from the exception
        # chain; its details are carried over through ``exc.errors()``.
        raise ValidationException(
            "Request", url_path, request_model, exc.errors()
        ) from None

    query = request.model_dump(mode="json", exclude_none=True, by_alias=True)
    raw_response = self.prepare_and_send_request("GET", url_path, query, data=None)

    # Response validation is not wrapped: whatever ``model_validate`` raises
    # here propagates to the caller unchanged.
    return response_model.model_validate(raw_response)

post(url_path, params=None, data=None, **kwargs)

Sends a POST request.

Parameters:

Name Type Description Default
url_path str

The URL path. Required.

required
params dict

The query parameters. Optional.

None
data dict

The request body. Optional.

None
**kwargs

Additional arguments to pass to the request. Optional.

{}

Returns:

Type Description
Dict[str, Any]

The response data.

Source code in ethereal/rest/http_client.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
def post(
    self,
    url_path,
    params: Optional[dict] = None,
    data: Optional[dict] = None,
    **kwargs,
) -> Dict[str, Any]:
    """Sends a POST request.

    Extra keyword arguments are merged into the request body; a keyword
    argument overrides a same-named key in ``data``. The dict passed as
    ``data`` is never modified.

    Args:
        url_path (str): The URL path. Required.
        params (dict, optional): The query parameters. Optional.
        data (dict, optional): The request body. Optional.
        **kwargs: Additional body fields merged on top of ``data``. Optional.

    Returns:
        Dict[str, Any]: The response data.
    """
    # Build a fresh dict instead of calling ``data.update(kwargs)`` so the
    # caller's dict is not altered as a side effect of issuing the request.
    body = {**(data or {}), **kwargs}

    return self.prepare_and_send_request("POST", url_path, params, body)

prepare_and_send_request(http_method, url_path, params=None, data=None)

Prepares and sends an HTTP request.

Parameters:

Name Type Description Default
http_method str

The HTTP method. Required.

required
url_path str

The URL path. Required.

required
params dict

The query parameters. Optional.

None
data dict

The request body. Optional.

None

Returns:

Type Description

Dict[str, Any]: The response data.

Source code in ethereal/rest/http_client.py
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
def prepare_and_send_request(
    self,
    http_method,
    url_path,
    params: Optional[dict] = None,
    data: Optional[dict] = None,
):
    """Prepares and sends an HTTP request.

    Builds the headers via ``set_headers``, drops every ``None`` value from
    ``params`` and ``data``, renders boolean query parameters as the
    lowercase strings ``"true"`` / ``"false"``, and hands the result to
    ``send_request``. A ``params`` or ``data`` of ``None`` is forwarded as
    ``None``.

    Args:
        http_method (str): The HTTP method. Required.
        url_path (str): The URL path. Required.
        params (dict, optional): The query parameters. Optional.
        data (dict, optional): The request body. Optional.

    Returns:
        Dict[str, Any]: The response data.
    """
    request_headers = self.set_headers(http_method, url_path)

    if params is not None:
        cleaned_params = {}
        for name, raw in params.items():
            if raw is None:
                continue
            # Booleans go out as "true"/"false" rather than Python's "True"/"False".
            cleaned_params[name] = str(raw).lower() if isinstance(raw, bool) else raw
        params = cleaned_params

    if data is not None:
        cleaned_body = {}
        for name, raw in data.items():
            if raw is not None:
                cleaned_body[name] = raw
        data = cleaned_body

    return self.send_request(
        http_method, url_path, params, request_headers, data=data
    )

put(url_path, params=None, data=None, **kwargs)

Sends a PUT request.

Parameters:

Name Type Description Default
url_path str

The URL path. Required.

required
params dict

The query parameters. Optional.

None
data dict

The request body. Optional.

None
**kwargs

Additional arguments to pass to the request. Optional.

{}

Returns:

Type Description
Dict[str, Any]

The response data.

Source code in ethereal/rest/http_client.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
def put(
    self,
    url_path,
    params: Optional[dict] = None,
    data: Optional[dict] = None,
    **kwargs,
) -> Dict[str, Any]:
    """Sends a PUT request.

    Extra keyword arguments are merged into the request body; a keyword
    argument overrides a same-named key in ``data``. The dict passed as
    ``data`` is never modified.

    Args:
        url_path (str): The URL path. Required.
        params (dict, optional): The query parameters. Optional.
        data (dict, optional): The request body. Optional.
        **kwargs: Additional body fields merged on top of ``data``. Optional.

    Returns:
        Dict[str, Any]: The response data.
    """
    # Build a fresh dict instead of calling ``data.update(kwargs)`` so the
    # caller's dict is not altered as a side effect of issuing the request.
    body = {**(data or {}), **kwargs}

    return self.prepare_and_send_request("PUT", url_path, params, body)

send_request(http_method, url_path, params, headers, data=None)

Sends an HTTP request.

Parameters:

Name Type Description Default
http_method str

The HTTP method. Required.

required
url_path str

The URL path. Required.

required
params dict

The query parameters. Required.

required
headers dict

The request headers. Required.

required
data dict

The request body. Optional.

None

Returns:

Type Description

Dict[str, Any]: The response data.

Raises:

Type Description
HTTPError

If the request fails.

Source code in ethereal/rest/http_client.py
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
def send_request(self, http_method, url_path, params, headers, data=None):
    """Sends an HTTP request.

    The request goes to ``base_url + url_path`` through ``self.session``,
    with ``data`` sent as the JSON body (an empty dict when ``data`` is
    ``None``). When ``self.rate_limit_headers`` is truthy, the response
    headers named in ``RATE_LIMIT_HEADERS`` are merged into the returned
    dict, keyed through ``REST_COMMON_FIELDS`` where a mapping exists.

    Args:
        http_method (str): The HTTP method. Required.
        url_path (str): The URL path. Required.
        params (dict): The query parameters. Required.
        headers (dict): The request headers. Required.
        data (dict, optional): The request body. Optional.

    Returns:
        Dict[str, Any]: The response data.

    Raises:
        HTTPError: If the request fails.
    """
    if data is None:
        data = {}

    url = f"{self.base_url}{url_path}"

    self.logger.debug(f"Sending {http_method} request to {url}")

    response = self.session.request(
        http_method,
        url,
        params=params,
        json=data,
        headers=headers,
        timeout=self.timeout,
    )
    self._handle_exception(response)  # Raise an HTTPError for bad responses

    # Decode the body once and reuse it for both the debug log and the
    # return value, instead of parsing the JSON payload twice.
    response_data = response.json()

    self.logger.debug(f"Raw response: {response_data}")

    if self.rate_limit_headers:
        response_headers = dict(response.headers)
        # A header missing from the response is still reported, as None.
        specific_headers = {
            REST_COMMON_FIELDS.get(key, key): response_headers.get(key, None)
            for key in RATE_LIMIT_HEADERS
        }

        response_data = {**response_data, **specific_headers}

    return response_data

set_headers(method, path)

Sets the request headers.

Parameters:

Name Type Description Default
method str

The HTTP method. Required.

required
path str

The URL path. Required.

required

Returns:

Name Type Description
dict

The request headers.

Source code in ethereal/rest/http_client.py
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
def set_headers(self, method, path):
    """Sets the request headers.

    The returned headers are the same for every request; ``method`` and
    ``path`` are accepted but not used by this implementation.

    Args:
        method (str): The HTTP method. Required.
        path (str): The URL path. Required.

    Returns:
        dict: The request headers.
    """
    request_headers = {}
    request_headers["User-Agent"] = USER_AGENT
    request_headers["Content-Type"] = "application/json"
    return request_headers