Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added uniswap clones endpoint #2

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions source/covalent_api/constants/maps.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,12 @@
'Fuji C-Chain Testnet': '43113',
'Fantom Opera Mainnet': '250'
}

#Available Dexes
AVAILABLE_DEXES = {
'sushiswap','sushiswap',
'quickswap','quickswap',
'pangolin','pangolin',
'spiritswap','spiritswap',
'spookyswap','spookyswap',
}
247 changes: 247 additions & 0 deletions source/covalent_api/dex.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
from covalent_api import constants

class Dex(object):

@property
def session(self):
return self._session

def __init__(self, session):
self._session = session

# https://api.covalenthq.com/v1/{chain_id}/xy=k/{dexname}/health/
def health(self, chain_id,dexname):
if chain_id not in list(constants.AVAILABLE_CHAIN_IDS.values()):
raise Exception(
"chain_id should be one of {}".format(
list(constants.AVAILABLE_CHAIN_IDS.values())
)
)
if dexname not in list(constants.AVAILABLE_DEXES.values()):
raise Exception(
"dexname should be one of {}".format(
list(constants.AVAILABLE_DEXES.values())
)
)
method_url = (
'/v1/{chain_id}/xy=k/{dexname}/health/'.format(
chain_id=chain_id,
dexname=dexname
)
)
decode = format == 'json'
result = self.session.query(method_url, decode=decode)
return result
# https://api.covalenthq.com/v1/{chain_id}/xy=k/{dexname}/ecosystem/
def ecosystem(self, chain_id,dexname):
if chain_id not in list(constants.AVAILABLE_CHAIN_IDS.values()):
raise Exception(
"chain_id should be one of {}".format(
list(constants.AVAILABLE_CHAIN_IDS.values())
)
)
if dexname not in list(constants.AVAILABLE_DEXES.values()):
raise Exception(
"dexname should be one of {}".format(
list(constants.AVAILABLE_DEXES.values())
)
)
method_url = (
'/v1/{chain_id}/xy=k/{dexname}/ecosystem/'.format(
chain_id=chain_id,
dexname=dexname
)
)
decode = format == 'json'
result = self.session.query(method_url, decode=decode)
return result
# https://api.covalenthq.com/v1/{chain_id}/xy=k/{dexname}/pools/
def pools(self, chain_id,dexname):
if chain_id not in list(constants.AVAILABLE_CHAIN_IDS.values()):
raise Exception(
"chain_id should be one of {}".format(
list(constants.AVAILABLE_CHAIN_IDS.values())
)
)
if dexname not in list(constants.AVAILABLE_DEXES.values()):
raise Exception(
"dexname should be one of {}".format(
list(constants.AVAILABLE_DEXES.values())
)
)
method_url = (
'/v1/{chain_id}/xy=k/{dexname}/pools/'.format(
chain_id=chain_id,
dexname=dexname
)
)
decode = format == 'json'
result = self.session.query(method_url, decode=decode)
return result
# https://api.covalenthq.com/v1/{chain_id}/xy=k/{dexname}/pools/address/{address}/
def poolDataByAddress(self, chain_id,dexname,address):
if chain_id not in list(constants.AVAILABLE_CHAIN_IDS.values()):
raise Exception(
"chain_id should be one of {}".format(
list(constants.AVAILABLE_CHAIN_IDS.values())
)
)
if dexname not in list(constants.AVAILABLE_DEXES.values()):
raise Exception(
"dexname should be one of {}".format(
list(constants.AVAILABLE_DEXES.values())
)
)
method_url = (
'v1/{chain_id}/xy=k/{dexname}/pools/address/{address}/'.format(
chain_id=chain_id,
dexname=dexname,
address=address
)
)
decode = format == 'json'
result = self.session.query(method_url, decode=decode)
return result
# https://api.covalenthq.com/v1/{chain_id}/xy=k/{dexname}/pools/address/{address}/transactions/
def poolTransactionsByAddress(self, chain_id,dexname,address):
if chain_id not in list(constants.AVAILABLE_CHAIN_IDS.values()):
raise Exception(
"chain_id should be one of {}".format(
list(constants.AVAILABLE_CHAIN_IDS.values())
)
)
if dexname not in list(constants.AVAILABLE_DEXES.values()):
raise Exception(
"dexname should be one of {}".format(
list(constants.AVAILABLE_DEXES.values())
)
)
method_url = (
'v1/{chain_id}/xy=k/{dexname}/pools/address/{address}/transactions/'.format(
chain_id=chain_id,
dexname=dexname,
address=address
)
)
decode = format == 'json'
result = self.session.query(method_url, decode=decode)
return result
# https://api.covalenthq.com/v1/{chain_id}/xy=k/{dexname}/tokens/
def tokens(self, chain_id,dexname):
if chain_id not in list(constants.AVAILABLE_CHAIN_IDS.values()):
raise Exception(
"chain_id should be one of {}".format(
list(constants.AVAILABLE_CHAIN_IDS.values())
)
)
if dexname not in list(constants.AVAILABLE_DEXES.values()):
raise Exception(
"dexname should be one of {}".format(
list(constants.AVAILABLE_DEXES.values())
)
)
method_url = (
'/v1/{chain_id}/xy=k/{dexname}/tokens/'.format(
chain_id=chain_id,
dexname=dexname
)
)
decode = format == 'json'
result = self.session.query(method_url, decode=decode)
return result
# https://api.covalenthq.com/v1/{chain_id}/xy=k/{dexname}/tokens/address/{address}/
def tokenDataByAddress(self, chain_id,dexname,address):
if chain_id not in list(constants.AVAILABLE_CHAIN_IDS.values()):
raise Exception(
"chain_id should be one of {}".format(
list(constants.AVAILABLE_CHAIN_IDS.values())
)
)
if dexname not in list(constants.AVAILABLE_DEXES.values()):
raise Exception(
"dexname should be one of {}".format(
list(constants.AVAILABLE_DEXES.values())
)
)
method_url = (
'v1/{chain_id}/xy=k/{dexname}/tokens/address/{address}/'.format(
chain_id=chain_id,
dexname=dexname,
address=address
)
)
decode = format == 'json'
result = self.session.query(method_url, decode=decode)
return result
# https://api.covalenthq.com/v1/{chain_id}/xy=k/{dexname}/tokens/address/{address}/transactions/
def tokenTransactionsByAddress(self, chain_id,dexname,address):
if chain_id not in list(constants.AVAILABLE_CHAIN_IDS.values()):
raise Exception(
"chain_id should be one of {}".format(
list(constants.AVAILABLE_CHAIN_IDS.values())
)
)
if dexname not in list(constants.AVAILABLE_DEXES.values()):
raise Exception(
"dexname should be one of {}".format(
list(constants.AVAILABLE_DEXES.values())
)
)
method_url = (
'v1/{chain_id}/xy=k/{dexname}/tokens/address/{address}/transactions/'.format(
chain_id=chain_id,
dexname=dexname,
address=address
)
)
decode = format == 'json'
result = self.session.query(method_url, decode=decode)
return result
# https://api.covalenthq.com/v1/{chain_id}/xy=k/{dexname}/address/{address}/balances/
def addressExchangesBalances(self, chain_id,dexname,address):
if chain_id not in list(constants.AVAILABLE_CHAIN_IDS.values()):
raise Exception(
"chain_id should be one of {}".format(
list(constants.AVAILABLE_CHAIN_IDS.values())
)
)
if dexname not in list(constants.AVAILABLE_DEXES.values()):
raise Exception(
"dexname should be one of {}".format(
list(constants.AVAILABLE_DEXES.values())
)
)
method_url = (
'v1/{chain_id}/xy=k/{dexname}/address/{address}/balances/'.format(
chain_id=chain_id,
dexname=dexname,
address=address
)
)
decode = format == 'json'
result = self.session.query(method_url, decode=decode)
return result
# https://api.covalenthq.com/v1/{chain_id}/xy=k/{dexname}/address/{address}/transactions/
def addressExchangesLiqudityTransactions(self, chain_id,dexname,address):
if chain_id not in list(constants.AVAILABLE_CHAIN_IDS.values()):
raise Exception(
"chain_id should be one of {}".format(
list(constants.AVAILABLE_CHAIN_IDS.values())
)
)
if dexname not in list(constants.AVAILABLE_DEXES.values()):
raise Exception(
"dexname should be one of {}".format(
list(constants.AVAILABLE_DEXES.values())
)
)
method_url = (
'v1/{chain_id}/xy=k/{dexname}/address/{address}/transactions/'.format(
chain_id=chain_id,
dexname=dexname,
address=address
)
)
decode = format == 'json'
result = self.session.query(method_url, decode=decode)
return result