A python nacos client based on the official open-api.
pip install use-nacos
from use_nacos import NacosClient
client = NacosClient(...)
# publish config
client.config.publish("test_config", "DEFAULT_GROUP", "test_value")
# get config
assert client.config.get("test_config", "DEFAULT_GROUP") == "test_value"
# subscribe config
def config_update(config):
print(config)
client.config.subscribe(
"test_config",
"DEFAULT_GROUP",
callback=config_update
)
from use_nacos import NacosClient
nacos = NacosClient()
nacos.instance.register(
service_name="test",
ip="10.10.10.10",
port=8000,
weight=10.0
)
nacos.instance.heartbeat(
service_name="test",
ip="10.10.10.10",
port=8000,
)
# example: fastapi
from contextlib import asynccontextmanager
import uvicorn
from fastapi import FastAPI
from use_nacos import NacosAsyncClient
def config_update(config):
print(config)
@asynccontextmanager
async def lifespan(app: FastAPI):
nacos = NacosAsyncClient()
config_subscriber = await nacos.config.subscribe(
data_id="test-config",
group="DEFAULT_GROUP",
callback=config_update,
)
yield
config_subscriber.cancel()
app = FastAPI(lifespan=lifespan)
if __name__ == '__main__':
uvicorn.run("in_fastapi:app", host="0.0.0.0", port=1081)
make install # Run `poetry install`
make lint # Runs bandit and black in check mode
make format # Formats you code with Black
make test # run pytest with coverage
make publish # run `poetry publish --build` to build source and wheel package and publish to pypi