From a9090fddbf4b4b308394a0c78706163b38bc32b9 Mon Sep 17 00:00:00 2001 From: Bakar Tavadze Date: Thu, 14 Mar 2024 11:24:34 +0400 Subject: [PATCH 01/35] Postgres can optionally be used as a vector store and retriever. --- .env.example | 6 +++ .env.gcp.yaml.example | 10 ++++ .gitignore | 1 + backend/app/tools.py | 23 ++++++--- backend/app/upload.py | 31 +++++++++++- backend/poetry.lock | 105 +++++++++++++++++++++++++++++++++++++++-- backend/pyproject.toml | 2 + docker-compose.yml | 11 +++++ postgres/Dockerfile | 12 +++++ postgres/init.sql | 1 + 10 files changed, 190 insertions(+), 12 deletions(-) create mode 100644 postgres/Dockerfile create mode 100644 postgres/init.sql diff --git a/.env.example b/.env.example index 9e476bf9..31a7a553 100644 --- a/.env.example +++ b/.env.example @@ -9,3 +9,9 @@ AZURE_OPENAI_API_VERSION=placeholder CONNERY_RUNNER_URL=https://your-personal-connery-runner-url CONNERY_RUNNER_API_KEY=placeholder PROXY_URL=your_proxy_url +POSTGRES_HOST=placeholder +POSTGRES_PORT=placeholder +POSTGRES_DB=placeholder +POSTGRES_USER=placeholder +POSTGRES_PASSWORD=placeholder +VSTORE_TYPE=placeholder \ No newline at end of file diff --git a/.env.gcp.yaml.example b/.env.gcp.yaml.example index 869e5583..ad07178c 100644 --- a/.env.gcp.yaml.example +++ b/.env.gcp.yaml.example @@ -11,3 +11,13 @@ AZURE_OPENAI_API_BASE: your_secret_here AZURE_OPENAI_API_VERSION: your_secret_here AZURE_OPENAI_API_KEY: your_secret_here KAY_API_KEY: your_secret_here +ROBOCORP_ACTION_SERVER_URL: https://dummy-action-server.robocorp.link +ROBOCORP_ACTION_SERVER_KEY: dummy-api-key +CONNERY_RUNNER_URL: https://your-personal-connery-runner-url +CONNERY_RUNNER_API_KEY: your_secret_here +POSTGRES_HOST: your_postgres_host_here +POSTGRES_PORT: your_postgres_port_here +POSTGRES_DB: your_postgres_db_here +POSTGRES_USER: your_postgres_user_here +POSTGRES_PASSWORD: your_postgres_password_here +VSTORE_TYPE: redis_or_postgres diff --git a/.gitignore b/.gitignore index 44904efd..722b4aa3 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ *.env .env.gcp.yaml redis-volume/ +postgres-volume/ backend/ui # Operating System generated files diff --git a/backend/app/tools.py b/backend/app/tools.py index 268258d5..86a30307 100644 --- a/backend/app/tools.py +++ b/backend/app/tools.py @@ -24,8 +24,10 @@ from langchain_community.vectorstores.redis import RedisFilter from langchain_robocorp import ActionServerToolkit from typing_extensions import TypedDict +from langchain_community.vectorstores.redis import Redis +from langchain_community.vectorstores.pgvector import PGVector -from app.upload import vstore +from app.upload import get_vectorstore class DDGInput(BaseModel): @@ -191,12 +193,19 @@ class Retrieval(BaseTool): def get_retriever(assistant_id: str, thread_id: str): - return vstore.as_retriever( - search_kwargs={ - "filter": (RedisFilter.tag("namespace") == assistant_id) - | (RedisFilter.tag("namespace") == thread_id) - } - ) + vstore = get_vectorstore() + if isinstance(vstore, Redis): + return vstore.as_retriever( + search_kwargs={ + "filter": (RedisFilter.tag("namespace") == assistant_id) + | (RedisFilter.tag("namespace") == thread_id) + } + ) + elif isinstance(vstore, PGVector): + return vstore.as_retriever( + search_kwargs={"filter": {"namespace": {"in": [assistant_id, thread_id]}}} + ) + raise Exception("Vector store not supported.") @lru_cache(maxsize=5) diff --git a/backend/app/upload.py b/backend/app/upload.py index 1c7693d9..d5612966 100644 --- a/backend/app/upload.py +++ b/backend/app/upload.py @@ -10,10 +10,12 @@ 
import os from typing import Any, BinaryIO, List, Optional +from functools import lru_cache from langchain.text_splitter import RecursiveCharacterTextSplitter, TextSplitter from langchain_community.document_loaders.blob_loaders import Blob from langchain_community.vectorstores.redis import Redis +from langchain_community.vectorstores.pgvector import PGVector from langchain_core.runnables import ( ConfigurableField, RunnableConfig, @@ -111,7 +113,7 @@ def batch( index_schema = { "tag": [{"name": "namespace"}], } -vstore = Redis( +redis_store = Redis( redis_url=os.environ["REDIS_URL"], index_name="opengpts", embedding=OpenAIEmbeddings(), @@ -119,9 +121,34 @@ def batch( ) +PG_CONNECTION_STRING = PGVector.connection_string_from_db_params( + driver="psycopg2", + host=os.environ.get("POSTGRES_HOST", "localhost"), + port=int(os.environ.get("POSTGRES_PORT", "5432")), + database=os.environ.get("POSTGRES_DB", "postgres"), + user=os.environ.get("POSTGRES_USER", "postgres"), + password=os.environ.get("POSTGRES_PASSWORD", "postgres"), +) +postgres_store = PGVector( + connection_string=PG_CONNECTION_STRING, + embedding_function=OpenAIEmbeddings(), +) + + +@lru_cache(maxsize=1) +def get_vectorstore() -> VectorStore: + """Get the vectorstore.""" + vstore_type = os.getenv("VSTORE_TYPE") + if vstore_type == "redis": + return redis_store + elif vstore_type == "postgres": + return postgres_store + raise ValueError(f"Unknown vector store type (VSTORE_TYPE): {vstore_type}.") + + ingest_runnable = IngestRunnable( text_splitter=RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200), - vectorstore=vstore, + vectorstore=get_vectorstore(), ).configurable_fields( assistant_id=ConfigurableField( id="assistant_id", diff --git a/backend/poetry.lock b/backend/poetry.lock index 4c3af7f5..e4993184 100644 --- a/backend/poetry.lock +++ b/backend/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.1 and should not be changed by hand. 
[[package]] name = "aiohttp" @@ -1812,6 +1812,7 @@ description = "Powerful and Pythonic XML processing library combining libxml2/li optional = false python-versions = ">=3.6" files = [ + {file = "lxml-5.1.0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:704f5572ff473a5f897745abebc6df40f22d4133c1e0a1f124e4f2bd3330ff7e"}, {file = "lxml-5.1.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:9d3c0f8567ffe7502d969c2c1b809892dc793b5d0665f602aad19895f8d508da"}, {file = "lxml-5.1.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:5fcfbebdb0c5d8d18b84118842f31965d59ee3e66996ac842e21f957eb76138c"}, {file = "lxml-5.1.0-cp310-cp310-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:2f37c6d7106a9d6f0708d4e164b707037b7380fcd0b04c5bd9cae1fb46a856fb"}, @@ -1821,6 +1822,7 @@ files = [ {file = "lxml-5.1.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:82bddf0e72cb2af3cbba7cec1d2fd11fda0de6be8f4492223d4a268713ef2147"}, {file = "lxml-5.1.0-cp310-cp310-win32.whl", hash = "sha256:b66aa6357b265670bb574f050ffceefb98549c721cf28351b748be1ef9577d93"}, {file = "lxml-5.1.0-cp310-cp310-win_amd64.whl", hash = "sha256:4946e7f59b7b6a9e27bef34422f645e9a368cb2be11bf1ef3cafc39a1f6ba68d"}, + {file = "lxml-5.1.0-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:14deca1460b4b0f6b01f1ddc9557704e8b365f55c63070463f6c18619ebf964f"}, {file = "lxml-5.1.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:ed8c3d2cd329bf779b7ed38db176738f3f8be637bb395ce9629fc76f78afe3d4"}, {file = "lxml-5.1.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:436a943c2900bb98123b06437cdd30580a61340fbdb7b28aaf345a459c19046a"}, {file = "lxml-5.1.0-cp311-cp311-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:acb6b2f96f60f70e7f34efe0c3ea34ca63f19ca63ce90019c6cbca6b676e81fa"}, @@ -1830,6 +1832,7 @@ files = [ {file = "lxml-5.1.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:f4c9bda132ad108b387c33fabfea47866af87f4ea6ffb79418004f0521e63204"}, {file = "lxml-5.1.0-cp311-cp311-win32.whl", hash = "sha256:bc64d1b1dab08f679fb89c368f4c05693f58a9faf744c4d390d7ed1d8223869b"}, {file = "lxml-5.1.0-cp311-cp311-win_amd64.whl", hash = "sha256:a5ab722ae5a873d8dcee1f5f45ddd93c34210aed44ff2dc643b5025981908cda"}, + {file = "lxml-5.1.0-cp312-cp312-macosx_10_9_universal2.whl", hash = "sha256:9aa543980ab1fbf1720969af1d99095a548ea42e00361e727c58a40832439114"}, {file = "lxml-5.1.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:6f11b77ec0979f7e4dc5ae081325a2946f1fe424148d3945f943ceaede98adb8"}, {file = "lxml-5.1.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:a36c506e5f8aeb40680491d39ed94670487ce6614b9d27cabe45d94cd5d63e1e"}, {file = "lxml-5.1.0-cp312-cp312-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:f643ffd2669ffd4b5a3e9b41c909b72b2a1d5e4915da90a77e119b8d48ce867a"}, @@ -1855,8 +1858,8 @@ files = [ {file = "lxml-5.1.0-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:8f52fe6859b9db71ee609b0c0a70fea5f1e71c3462ecf144ca800d3f434f0764"}, {file = "lxml-5.1.0-cp37-cp37m-win32.whl", hash = "sha256:d42e3a3fc18acc88b838efded0e6ec3edf3e328a58c68fbd36a7263a874906c8"}, {file = "lxml-5.1.0-cp37-cp37m-win_amd64.whl", hash = "sha256:eac68f96539b32fce2c9b47eb7c25bb2582bdaf1bbb360d25f564ee9e04c542b"}, + {file = "lxml-5.1.0-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:ae15347a88cf8af0949a9872b57a320d2605ae069bcdf047677318bc0bba45b1"}, {file = "lxml-5.1.0-cp38-cp38-macosx_10_9_x86_64.whl", hash 
= "sha256:c26aab6ea9c54d3bed716b8851c8bfc40cb249b8e9880e250d1eddde9f709bf5"}, - {file = "lxml-5.1.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:cfbac9f6149174f76df7e08c2e28b19d74aed90cad60383ad8671d3af7d0502f"}, {file = "lxml-5.1.0-cp38-cp38-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:342e95bddec3a698ac24378d61996b3ee5ba9acfeb253986002ac53c9a5f6f84"}, {file = "lxml-5.1.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:725e171e0b99a66ec8605ac77fa12239dbe061482ac854d25720e2294652eeaa"}, {file = "lxml-5.1.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3d184e0d5c918cff04cdde9dbdf9600e960161d773666958c9d7b565ccc60c45"}, @@ -1864,6 +1867,7 @@ files = [ {file = "lxml-5.1.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:6d48fc57e7c1e3df57be5ae8614bab6d4e7b60f65c5457915c26892c41afc59e"}, {file = "lxml-5.1.0-cp38-cp38-win32.whl", hash = "sha256:7ec465e6549ed97e9f1e5ed51c657c9ede767bc1c11552f7f4d022c4df4a977a"}, {file = "lxml-5.1.0-cp38-cp38-win_amd64.whl", hash = "sha256:b21b4031b53d25b0858d4e124f2f9131ffc1530431c6d1321805c90da78388d1"}, + {file = "lxml-5.1.0-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:52427a7eadc98f9e62cb1368a5079ae826f94f05755d2d567d93ee1bc3ceb354"}, {file = "lxml-5.1.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:6a2a2c724d97c1eb8cf966b16ca2915566a4904b9aad2ed9a09c748ffe14f969"}, {file = "lxml-5.1.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:843b9c835580d52828d8f69ea4302537337a21e6b4f1ec711a52241ba4a824f3"}, {file = "lxml-5.1.0-cp39-cp39-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:9b99f564659cfa704a2dd82d0684207b1aadf7d02d33e54845f9fc78e06b7581"}, @@ -2183,6 +2187,19 @@ dev = ["black", "mypy (==0.931)", "nox", "pytest"] docs = ["sphinx", "sphinx-argparse"] image = ["Pillow"] +[[package]] +name = "pgvector" +version = "0.2.5" +description = "pgvector support for Python" +optional = false +python-versions = ">=3.8" +files = [ + {file = "pgvector-0.2.5-py2.py3-none-any.whl", hash = "sha256:5e5e93ec4d3c45ab1fa388729d56c602f6966296e19deee8878928c6d567e41b"}, +] + +[package.dependencies] +numpy = "*" + [[package]] name = "pillow" version = "10.2.0" @@ -2320,6 +2337,87 @@ files = [ {file = "protobuf-4.25.2.tar.gz", hash = "sha256:fe599e175cb347efc8ee524bcd4b902d11f7262c0e569ececcb89995c15f0a5e"}, ] +[[package]] +name = "psycopg2-binary" +version = "2.9.9" +description = "psycopg2 - Python-PostgreSQL Database Adapter" +optional = false +python-versions = ">=3.7" +files = [ + {file = "psycopg2-binary-2.9.9.tar.gz", hash = "sha256:7f01846810177d829c7692f1f5ada8096762d9172af1b1a28d4ab5b77c923c1c"}, + {file = "psycopg2_binary-2.9.9-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:c2470da5418b76232f02a2fcd2229537bb2d5a7096674ce61859c3229f2eb202"}, + {file = "psycopg2_binary-2.9.9-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:c6af2a6d4b7ee9615cbb162b0738f6e1fd1f5c3eda7e5da17861eacf4c717ea7"}, + {file = "psycopg2_binary-2.9.9-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:75723c3c0fbbf34350b46a3199eb50638ab22a0228f93fb472ef4d9becc2382b"}, + {file = "psycopg2_binary-2.9.9-cp310-cp310-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:83791a65b51ad6ee6cf0845634859d69a038ea9b03d7b26e703f94c7e93dbcf9"}, + {file = "psycopg2_binary-2.9.9-cp310-cp310-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = 
"sha256:0ef4854e82c09e84cc63084a9e4ccd6d9b154f1dbdd283efb92ecd0b5e2b8c84"}, + {file = "psycopg2_binary-2.9.9-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ed1184ab8f113e8d660ce49a56390ca181f2981066acc27cf637d5c1e10ce46e"}, + {file = "psycopg2_binary-2.9.9-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:d2997c458c690ec2bc6b0b7ecbafd02b029b7b4283078d3b32a852a7ce3ddd98"}, + {file = "psycopg2_binary-2.9.9-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:b58b4710c7f4161b5e9dcbe73bb7c62d65670a87df7bcce9e1faaad43e715245"}, + {file = "psycopg2_binary-2.9.9-cp310-cp310-musllinux_1_1_ppc64le.whl", hash = "sha256:0c009475ee389757e6e34611d75f6e4f05f0cf5ebb76c6037508318e1a1e0d7e"}, + {file = "psycopg2_binary-2.9.9-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:8dbf6d1bc73f1d04ec1734bae3b4fb0ee3cb2a493d35ede9badbeb901fb40f6f"}, + {file = "psycopg2_binary-2.9.9-cp310-cp310-win32.whl", hash = "sha256:3f78fd71c4f43a13d342be74ebbc0666fe1f555b8837eb113cb7416856c79682"}, + {file = "psycopg2_binary-2.9.9-cp310-cp310-win_amd64.whl", hash = "sha256:876801744b0dee379e4e3c38b76fc89f88834bb15bf92ee07d94acd06ec890a0"}, + {file = "psycopg2_binary-2.9.9-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:ee825e70b1a209475622f7f7b776785bd68f34af6e7a46e2e42f27b659b5bc26"}, + {file = "psycopg2_binary-2.9.9-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:1ea665f8ce695bcc37a90ee52de7a7980be5161375d42a0b6c6abedbf0d81f0f"}, + {file = "psycopg2_binary-2.9.9-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:143072318f793f53819048fdfe30c321890af0c3ec7cb1dfc9cc87aa88241de2"}, + {file = "psycopg2_binary-2.9.9-cp311-cp311-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c332c8d69fb64979ebf76613c66b985414927a40f8defa16cf1bc028b7b0a7b0"}, + {file = "psycopg2_binary-2.9.9-cp311-cp311-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:f7fc5a5acafb7d6ccca13bfa8c90f8c51f13d8fb87d95656d3950f0158d3ce53"}, + {file = "psycopg2_binary-2.9.9-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:977646e05232579d2e7b9c59e21dbe5261f403a88417f6a6512e70d3f8a046be"}, + {file = "psycopg2_binary-2.9.9-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:b6356793b84728d9d50ead16ab43c187673831e9d4019013f1402c41b1db9b27"}, + {file = "psycopg2_binary-2.9.9-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:bc7bb56d04601d443f24094e9e31ae6deec9ccb23581f75343feebaf30423359"}, + {file = "psycopg2_binary-2.9.9-cp311-cp311-musllinux_1_1_ppc64le.whl", hash = "sha256:77853062a2c45be16fd6b8d6de2a99278ee1d985a7bd8b103e97e41c034006d2"}, + {file = "psycopg2_binary-2.9.9-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:78151aa3ec21dccd5cdef6c74c3e73386dcdfaf19bced944169697d7ac7482fc"}, + {file = "psycopg2_binary-2.9.9-cp311-cp311-win32.whl", hash = "sha256:dc4926288b2a3e9fd7b50dc6a1909a13bbdadfc67d93f3374d984e56f885579d"}, + {file = "psycopg2_binary-2.9.9-cp311-cp311-win_amd64.whl", hash = "sha256:b76bedd166805480ab069612119ea636f5ab8f8771e640ae103e05a4aae3e417"}, + {file = "psycopg2_binary-2.9.9-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:8532fd6e6e2dc57bcb3bc90b079c60de896d2128c5d9d6f24a63875a95a088cf"}, + {file = "psycopg2_binary-2.9.9-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:b0605eaed3eb239e87df0d5e3c6489daae3f7388d455d0c0b4df899519c6a38d"}, + {file = "psycopg2_binary-2.9.9-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = 
"sha256:8f8544b092a29a6ddd72f3556a9fcf249ec412e10ad28be6a0c0d948924f2212"}, + {file = "psycopg2_binary-2.9.9-cp312-cp312-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:2d423c8d8a3c82d08fe8af900ad5b613ce3632a1249fd6a223941d0735fce493"}, + {file = "psycopg2_binary-2.9.9-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:2e5afae772c00980525f6d6ecf7cbca55676296b580c0e6abb407f15f3706996"}, + {file = "psycopg2_binary-2.9.9-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6e6f98446430fdf41bd36d4faa6cb409f5140c1c2cf58ce0bbdaf16af7d3f119"}, + {file = "psycopg2_binary-2.9.9-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:c77e3d1862452565875eb31bdb45ac62502feabbd53429fdc39a1cc341d681ba"}, + {file = "psycopg2_binary-2.9.9-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:cb16c65dcb648d0a43a2521f2f0a2300f40639f6f8c1ecbc662141e4e3e1ee07"}, + {file = "psycopg2_binary-2.9.9-cp312-cp312-musllinux_1_1_ppc64le.whl", hash = "sha256:911dda9c487075abd54e644ccdf5e5c16773470a6a5d3826fda76699410066fb"}, + {file = "psycopg2_binary-2.9.9-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:57fede879f08d23c85140a360c6a77709113efd1c993923c59fde17aa27599fe"}, + {file = "psycopg2_binary-2.9.9-cp312-cp312-win32.whl", hash = "sha256:64cf30263844fa208851ebb13b0732ce674d8ec6a0c86a4e160495d299ba3c93"}, + {file = "psycopg2_binary-2.9.9-cp312-cp312-win_amd64.whl", hash = "sha256:81ff62668af011f9a48787564ab7eded4e9fb17a4a6a74af5ffa6a457400d2ab"}, + {file = "psycopg2_binary-2.9.9-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:2293b001e319ab0d869d660a704942c9e2cce19745262a8aba2115ef41a0a42a"}, + {file = "psycopg2_binary-2.9.9-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:03ef7df18daf2c4c07e2695e8cfd5ee7f748a1d54d802330985a78d2a5a6dca9"}, + {file = "psycopg2_binary-2.9.9-cp37-cp37m-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:0a602ea5aff39bb9fac6308e9c9d82b9a35c2bf288e184a816002c9fae930b77"}, + {file = "psycopg2_binary-2.9.9-cp37-cp37m-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:8359bf4791968c5a78c56103702000105501adb557f3cf772b2c207284273984"}, + {file = "psycopg2_binary-2.9.9-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:275ff571376626195ab95a746e6a04c7df8ea34638b99fc11160de91f2fef503"}, + {file = "psycopg2_binary-2.9.9-cp37-cp37m-musllinux_1_1_aarch64.whl", hash = "sha256:f9b5571d33660d5009a8b3c25dc1db560206e2d2f89d3df1cb32d72c0d117d52"}, + {file = "psycopg2_binary-2.9.9-cp37-cp37m-musllinux_1_1_i686.whl", hash = "sha256:420f9bbf47a02616e8554e825208cb947969451978dceb77f95ad09c37791dae"}, + {file = "psycopg2_binary-2.9.9-cp37-cp37m-musllinux_1_1_ppc64le.whl", hash = "sha256:4154ad09dac630a0f13f37b583eae260c6aa885d67dfbccb5b02c33f31a6d420"}, + {file = "psycopg2_binary-2.9.9-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:a148c5d507bb9b4f2030a2025c545fccb0e1ef317393eaba42e7eabd28eb6041"}, + {file = "psycopg2_binary-2.9.9-cp37-cp37m-win32.whl", hash = "sha256:68fc1f1ba168724771e38bee37d940d2865cb0f562380a1fb1ffb428b75cb692"}, + {file = "psycopg2_binary-2.9.9-cp37-cp37m-win_amd64.whl", hash = "sha256:281309265596e388ef483250db3640e5f414168c5a67e9c665cafce9492eda2f"}, + {file = "psycopg2_binary-2.9.9-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:60989127da422b74a04345096c10d416c2b41bd7bf2a380eb541059e4e999980"}, + {file = "psycopg2_binary-2.9.9-cp38-cp38-macosx_11_0_arm64.whl", hash = 
"sha256:246b123cc54bb5361588acc54218c8c9fb73068bf227a4a531d8ed56fa3ca7d6"}, + {file = "psycopg2_binary-2.9.9-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:34eccd14566f8fe14b2b95bb13b11572f7c7d5c36da61caf414d23b91fcc5d94"}, + {file = "psycopg2_binary-2.9.9-cp38-cp38-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:18d0ef97766055fec15b5de2c06dd8e7654705ce3e5e5eed3b6651a1d2a9a152"}, + {file = "psycopg2_binary-2.9.9-cp38-cp38-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:d3f82c171b4ccd83bbaf35aa05e44e690113bd4f3b7b6cc54d2219b132f3ae55"}, + {file = "psycopg2_binary-2.9.9-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ead20f7913a9c1e894aebe47cccf9dc834e1618b7aa96155d2091a626e59c972"}, + {file = "psycopg2_binary-2.9.9-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:ca49a8119c6cbd77375ae303b0cfd8c11f011abbbd64601167ecca18a87e7cdd"}, + {file = "psycopg2_binary-2.9.9-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:323ba25b92454adb36fa425dc5cf6f8f19f78948cbad2e7bc6cdf7b0d7982e59"}, + {file = "psycopg2_binary-2.9.9-cp38-cp38-musllinux_1_1_ppc64le.whl", hash = "sha256:1236ed0952fbd919c100bc839eaa4a39ebc397ed1c08a97fc45fee2a595aa1b3"}, + {file = "psycopg2_binary-2.9.9-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:729177eaf0aefca0994ce4cffe96ad3c75e377c7b6f4efa59ebf003b6d398716"}, + {file = "psycopg2_binary-2.9.9-cp38-cp38-win32.whl", hash = "sha256:804d99b24ad523a1fe18cc707bf741670332f7c7412e9d49cb5eab67e886b9b5"}, + {file = "psycopg2_binary-2.9.9-cp38-cp38-win_amd64.whl", hash = "sha256:a6cdcc3ede532f4a4b96000b6362099591ab4a3e913d70bcbac2b56c872446f7"}, + {file = "psycopg2_binary-2.9.9-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:72dffbd8b4194858d0941062a9766f8297e8868e1dd07a7b36212aaa90f49472"}, + {file = "psycopg2_binary-2.9.9-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:30dcc86377618a4c8f3b72418df92e77be4254d8f89f14b8e8f57d6d43603c0f"}, + {file = "psycopg2_binary-2.9.9-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:31a34c508c003a4347d389a9e6fcc2307cc2150eb516462a7a17512130de109e"}, + {file = "psycopg2_binary-2.9.9-cp39-cp39-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:15208be1c50b99203fe88d15695f22a5bed95ab3f84354c494bcb1d08557df67"}, + {file = "psycopg2_binary-2.9.9-cp39-cp39-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:1873aade94b74715be2246321c8650cabf5a0d098a95bab81145ffffa4c13876"}, + {file = "psycopg2_binary-2.9.9-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3a58c98a7e9c021f357348867f537017057c2ed7f77337fd914d0bedb35dace7"}, + {file = "psycopg2_binary-2.9.9-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:4686818798f9194d03c9129a4d9a702d9e113a89cb03bffe08c6cf799e053291"}, + {file = "psycopg2_binary-2.9.9-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:ebdc36bea43063116f0486869652cb2ed7032dbc59fbcb4445c4862b5c1ecf7f"}, + {file = "psycopg2_binary-2.9.9-cp39-cp39-musllinux_1_1_ppc64le.whl", hash = "sha256:ca08decd2697fdea0aea364b370b1249d47336aec935f87b8bbfd7da5b2ee9c1"}, + {file = "psycopg2_binary-2.9.9-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:ac05fb791acf5e1a3e39402641827780fe44d27e72567a000412c648a85ba860"}, + {file = "psycopg2_binary-2.9.9-cp39-cp39-win32.whl", hash = "sha256:9dba73be7305b399924709b91682299794887cbbd88e38226ed9f6712eabee90"}, + {file = "psycopg2_binary-2.9.9-cp39-cp39-win_amd64.whl", hash = 
"sha256:f7ae5d65ccfbebdfa761585228eb4d0df3a8b15cfb53bd953e713e09fbb12957"}, +] + [[package]] name = "pyasn1" version = "0.5.1" @@ -2686,6 +2784,7 @@ files = [ {file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"}, + {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a08c6f0fe150303c1c6b71ebcd7213c2858041a7e01975da3a99aed1e7a378ef"}, {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"}, {file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"}, {file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"}, @@ -4012,4 +4111,4 @@ multidict = ">=4.0" [metadata] lock-version = "2.0" python-versions = "^3.9.0,<3.12" -content-hash = "9d4661534f61d7755cb455da09414a0491e474d41a82f1896951ff3ee115a3d8" +content-hash = "4411aeb7305111d56d6905f11c895b8ed1055cc1853627f2e73cd603dfc8110a" diff --git a/backend/pyproject.toml b/backend/pyproject.toml index b77cfcde..f0c56355 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -40,6 +40,8 @@ fireworks-ai = "^0.11.2" anthropic = "^0.13.0" httpx = { version = "0.25.2", extras = ["socks"] } unstructured = {extras = ["doc", "docx"], version = "^0.12.5"} +pgvector = "^0.2.5" +psycopg2-binary = "^2.9.9" [tool.poetry.group.dev.dependencies] uvicorn = "^0.23.2" diff --git a/docker-compose.yml b/docker-compose.yml index d190b354..cf4ecd8a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -8,6 +8,17 @@ services: - "6379:6379" volumes: - ./redis-volume:/data + postgres: + container_name: opengpts-postgres + build: + context: postgres + ports: + - "5432:5432" + env_file: + - .env + volumes: + - ./postgres-volume:/var/lib/postgresql/data + - ./postgres/init.sql:/docker-entrypoint-initdb.d/init.sql backend: container_name: opengpts-backend build: diff --git a/postgres/Dockerfile b/postgres/Dockerfile new file mode 100644 index 00000000..db0a967f --- /dev/null +++ b/postgres/Dockerfile @@ -0,0 +1,12 @@ +FROM postgres:latest + +RUN apt-get update && apt-get install -y git build-essential postgresql-server-dev-all + +WORKDIR /tmp +RUN git clone --branch v0.6.1 https://github.com/pgvector/pgvector.git \ + && cd pgvector \ + && make \ + && make install + +RUN apt-get remove -y git build-essential postgresql-server-dev-all && apt-get autoremove -y +RUN rm -rf /var/lib/apt/lists/* /tmp/pgvector \ No newline at end of file diff --git a/postgres/init.sql b/postgres/init.sql new file mode 100644 index 00000000..ffa2e58c --- /dev/null +++ b/postgres/init.sql @@ -0,0 +1 @@ +CREATE EXTENSION vector; \ No newline at end of file From b504de0b92b8fa046b491af52d3dc45ab0491bd9 Mon Sep 17 00:00:00 2001 From: Bakar Tavadze Date: Thu, 14 Mar 2024 12:23:09 +0400 Subject: [PATCH 02/35] Sanitize document content. 
---
 backend/app/ingest.py | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/backend/app/ingest.py b/backend/app/ingest.py
index 58dee61d..e0708045 100644
--- a/backend/app/ingest.py
+++ b/backend/app/ingest.py
@@ -20,6 +20,13 @@ def _update_document_metadata(document: Document, namespace: str) -> None:
     document.metadata["namespace"] = namespace
 
 
+def _sanitize_document_content(document: Document) -> None:
+    """Sanitize the document content in place."""
+    # Without this, PDF ingestion fails with
+    # "A string literal cannot contain NUL (0x00) characters".
+    document.page_content = document.page_content.replace("\x00", "x")
+
+
 # PUBLIC API
 
 
@@ -38,6 +45,7 @@ def ingest_blob(
     for document in parser.lazy_parse(blob):
         docs = text_splitter.split_documents([document])
         for doc in docs:
+            _sanitize_document_content(doc)
             _update_document_metadata(doc, namespace)
         docs_to_index.extend(docs)

From cffa7a91fc4e1b6a769cee28851777301599498d Mon Sep 17 00:00:00 2001
From: Mikko Korpela
Date: Thu, 14 Mar 2024 11:02:46 +0200
Subject: [PATCH 03/35] Use pgvector/pgvector:pg16

---
 .env.example        | 1 -
 README.md           | 6 ++++++
 docker-compose.yml  | 5 ++---
 postgres/Dockerfile | 12 ------------
 4 files changed, 8 insertions(+), 16 deletions(-)
 delete mode 100644 postgres/Dockerfile

diff --git a/.env.example b/.env.example
index 31a7a553..5ec7a3f0 100644
--- a/.env.example
+++ b/.env.example
@@ -9,7 +9,6 @@ AZURE_OPENAI_API_VERSION=placeholder
 CONNERY_RUNNER_URL=https://your-personal-connery-runner-url
 CONNERY_RUNNER_API_KEY=placeholder
 PROXY_URL=your_proxy_url
-POSTGRES_HOST=placeholder
 POSTGRES_PORT=placeholder
 POSTGRES_DB=placeholder
 POSTGRES_USER=placeholder
diff --git a/README.md b/README.md
index 6dd6ca09..ee988731 100644
--- a/README.md
+++ b/README.md
@@ -55,6 +55,12 @@ In order to you use this, you need to a `REDIS_URL` variable.
 ```shell
 export REDIS_URL=...
 ```
 
+For Postgres, you need to set `POSTGRES_HOST` among other variables.
+
+```shell
+export POSTGRES_HOST=...
+``` + **Set up vector database** The backend by default also uses Redis as a vector database, diff --git a/docker-compose.yml b/docker-compose.yml index cf4ecd8a..a2600d77 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,9 +9,7 @@ services: volumes: - ./redis-volume:/data postgres: - container_name: opengpts-postgres - build: - context: postgres + image: pgvector/pgvector:pg16 ports: - "5432:5432" env_file: @@ -33,6 +31,7 @@ services: - ./backend:/backend environment: REDIS_URL: "redis://opengpts-redis:6379" + POSTGRES_HOST: "postgres" command: - --reload frontend: diff --git a/postgres/Dockerfile b/postgres/Dockerfile deleted file mode 100644 index db0a967f..00000000 --- a/postgres/Dockerfile +++ /dev/null @@ -1,12 +0,0 @@ -FROM postgres:latest - -RUN apt-get update && apt-get install -y git build-essential postgresql-server-dev-all - -WORKDIR /tmp -RUN git clone --branch v0.6.1 https://github.com/pgvector/pgvector.git \ - && cd pgvector \ - && make \ - && make install - -RUN apt-get remove -y git build-essential postgresql-server-dev-all && apt-get autoremove -y -RUN rm -rf /var/lib/apt/lists/* /tmp/pgvector \ No newline at end of file From efd3f745667d2600ed4b066db2d9cc6eb4f65a63 Mon Sep 17 00:00:00 2001 From: Mikko Korpela Date: Thu, 14 Mar 2024 11:18:55 +0200 Subject: [PATCH 04/35] add postgres to prod version also --- docker-compose-prod.yml | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/docker-compose-prod.yml b/docker-compose-prod.yml index 226fde53..07c36653 100644 --- a/docker-compose-prod.yml +++ b/docker-compose-prod.yml @@ -8,6 +8,15 @@ services: - "6379:6379" volumes: - ./redis-volume:/data + postgres: + image: pgvector/pgvector:pg16 + ports: + - "5432:5432" + env_file: + - .env + volumes: + - ./postgres-volume:/var/lib/postgresql/data + - ./postgres/init.sql:/docker-entrypoint-initdb.d/init.sql backend: container_name: opengpts-backend image: docker.io/langchain/open-gpts:latest @@ -19,3 +28,4 @@ services: - .env environment: REDIS_URL: "redis://opengpts-redis:6379" + POSTGRES_HOST: "postgres" From 9ff9cfa52760b342a95b6ccc14938c23073644f6 Mon Sep 17 00:00:00 2001 From: Bakar Tavadze Date: Fri, 15 Mar 2024 11:10:40 +0400 Subject: [PATCH 05/35] Add asyncpg package. 
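asyncpg is an asyncio-native PostgreSQL driver; the commits that follow use it for a shared connection pool, the checkpointer, and async storage queries. A hedged sketch of the basic driver pattern being adopted (the connection parameters are placeholders, not project settings):

```python
import asyncio

import asyncpg


async def main() -> None:
    # Placeholder credentials; the app reads real values from the
    # POSTGRES_* environment variables.
    conn = await asyncpg.connect(
        host="localhost",
        port=5432,
        user="postgres",
        password="postgres",
        database="postgres",
    )
    try:
        # fetchval returns the first column of the first row.
        print(await conn.fetchval("SELECT version()"))
    finally:
        await conn.close()


asyncio.run(main())
```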
--- backend/poetry.lock | 59 +++++++++++++++++++++++++++++++++++++++++- backend/pyproject.toml | 1 + 2 files changed, 59 insertions(+), 1 deletion(-) diff --git a/backend/poetry.lock b/backend/poetry.lock index e4993184..fa96c78e 100644 --- a/backend/poetry.lock +++ b/backend/poetry.lock @@ -180,6 +180,63 @@ files = [ {file = "async_timeout-4.0.3-py3-none-any.whl", hash = "sha256:7405140ff1230c310e51dc27b3145b9092d659ce68ff733fb0cefe3ee42be028"}, ] +[[package]] +name = "asyncpg" +version = "0.29.0" +description = "An asyncio PostgreSQL driver" +optional = false +python-versions = ">=3.8.0" +files = [ + {file = "asyncpg-0.29.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:72fd0ef9f00aeed37179c62282a3d14262dbbafb74ec0ba16e1b1864d8a12169"}, + {file = "asyncpg-0.29.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:52e8f8f9ff6e21f9b39ca9f8e3e33a5fcdceaf5667a8c5c32bee158e313be385"}, + {file = "asyncpg-0.29.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a9e6823a7012be8b68301342ba33b4740e5a166f6bbda0aee32bc01638491a22"}, + {file = "asyncpg-0.29.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:746e80d83ad5d5464cfbf94315eb6744222ab00aa4e522b704322fb182b83610"}, + {file = "asyncpg-0.29.0-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:ff8e8109cd6a46ff852a5e6bab8b0a047d7ea42fcb7ca5ae6eaae97d8eacf397"}, + {file = "asyncpg-0.29.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:97eb024685b1d7e72b1972863de527c11ff87960837919dac6e34754768098eb"}, + {file = "asyncpg-0.29.0-cp310-cp310-win32.whl", hash = "sha256:5bbb7f2cafd8d1fa3e65431833de2642f4b2124be61a449fa064e1a08d27e449"}, + {file = "asyncpg-0.29.0-cp310-cp310-win_amd64.whl", hash = "sha256:76c3ac6530904838a4b650b2880f8e7af938ee049e769ec2fba7cd66469d7772"}, + {file = "asyncpg-0.29.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:d4900ee08e85af01adb207519bb4e14b1cae8fd21e0ccf80fac6aa60b6da37b4"}, + {file = "asyncpg-0.29.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:a65c1dcd820d5aea7c7d82a3fdcb70e096f8f70d1a8bf93eb458e49bfad036ac"}, + {file = "asyncpg-0.29.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5b52e46f165585fd6af4863f268566668407c76b2c72d366bb8b522fa66f1870"}, + {file = "asyncpg-0.29.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:dc600ee8ef3dd38b8d67421359779f8ccec30b463e7aec7ed481c8346decf99f"}, + {file = "asyncpg-0.29.0-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:039a261af4f38f949095e1e780bae84a25ffe3e370175193174eb08d3cecab23"}, + {file = "asyncpg-0.29.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:6feaf2d8f9138d190e5ec4390c1715c3e87b37715cd69b2c3dfca616134efd2b"}, + {file = "asyncpg-0.29.0-cp311-cp311-win32.whl", hash = "sha256:1e186427c88225ef730555f5fdda6c1812daa884064bfe6bc462fd3a71c4b675"}, + {file = "asyncpg-0.29.0-cp311-cp311-win_amd64.whl", hash = "sha256:cfe73ffae35f518cfd6e4e5f5abb2618ceb5ef02a2365ce64f132601000587d3"}, + {file = "asyncpg-0.29.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:6011b0dc29886ab424dc042bf9eeb507670a3b40aece3439944006aafe023178"}, + {file = "asyncpg-0.29.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:b544ffc66b039d5ec5a7454667f855f7fec08e0dfaf5a5490dfafbb7abbd2cfb"}, + {file = "asyncpg-0.29.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d84156d5fb530b06c493f9e7635aa18f518fa1d1395ef240d211cb563c4e2364"}, + {file = 
"asyncpg-0.29.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:54858bc25b49d1114178d65a88e48ad50cb2b6f3e475caa0f0c092d5f527c106"}, + {file = "asyncpg-0.29.0-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:bde17a1861cf10d5afce80a36fca736a86769ab3579532c03e45f83ba8a09c59"}, + {file = "asyncpg-0.29.0-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:37a2ec1b9ff88d8773d3eb6d3784dc7e3fee7756a5317b67f923172a4748a175"}, + {file = "asyncpg-0.29.0-cp312-cp312-win32.whl", hash = "sha256:bb1292d9fad43112a85e98ecdc2e051602bce97c199920586be83254d9dafc02"}, + {file = "asyncpg-0.29.0-cp312-cp312-win_amd64.whl", hash = "sha256:2245be8ec5047a605e0b454c894e54bf2ec787ac04b1cb7e0d3c67aa1e32f0fe"}, + {file = "asyncpg-0.29.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:0009a300cae37b8c525e5b449233d59cd9868fd35431abc470a3e364d2b85cb9"}, + {file = "asyncpg-0.29.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:5cad1324dbb33f3ca0cd2074d5114354ed3be2b94d48ddfd88af75ebda7c43cc"}, + {file = "asyncpg-0.29.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:012d01df61e009015944ac7543d6ee30c2dc1eb2f6b10b62a3f598beb6531548"}, + {file = "asyncpg-0.29.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:000c996c53c04770798053e1730d34e30cb645ad95a63265aec82da9093d88e7"}, + {file = "asyncpg-0.29.0-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:e0bfe9c4d3429706cf70d3249089de14d6a01192d617e9093a8e941fea8ee775"}, + {file = "asyncpg-0.29.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:642a36eb41b6313ffa328e8a5c5c2b5bea6ee138546c9c3cf1bffaad8ee36dd9"}, + {file = "asyncpg-0.29.0-cp38-cp38-win32.whl", hash = "sha256:a921372bbd0aa3a5822dd0409da61b4cd50df89ae85150149f8c119f23e8c408"}, + {file = "asyncpg-0.29.0-cp38-cp38-win_amd64.whl", hash = "sha256:103aad2b92d1506700cbf51cd8bb5441e7e72e87a7b3a2ca4e32c840f051a6a3"}, + {file = "asyncpg-0.29.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:5340dd515d7e52f4c11ada32171d87c05570479dc01dc66d03ee3e150fb695da"}, + {file = "asyncpg-0.29.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:e17b52c6cf83e170d3d865571ba574577ab8e533e7361a2b8ce6157d02c665d3"}, + {file = "asyncpg-0.29.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f100d23f273555f4b19b74a96840aa27b85e99ba4b1f18d4ebff0734e78dc090"}, + {file = "asyncpg-0.29.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:48e7c58b516057126b363cec8ca02b804644fd012ef8e6c7e23386b7d5e6ce83"}, + {file = "asyncpg-0.29.0-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:f9ea3f24eb4c49a615573724d88a48bd1b7821c890c2effe04f05382ed9e8810"}, + {file = "asyncpg-0.29.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:8d36c7f14a22ec9e928f15f92a48207546ffe68bc412f3be718eedccdf10dc5c"}, + {file = "asyncpg-0.29.0-cp39-cp39-win32.whl", hash = "sha256:797ab8123ebaed304a1fad4d7576d5376c3a006a4100380fb9d517f0b59c1ab2"}, + {file = "asyncpg-0.29.0-cp39-cp39-win_amd64.whl", hash = "sha256:cce08a178858b426ae1aa8409b5cc171def45d4293626e7aa6510696d46decd8"}, + {file = "asyncpg-0.29.0.tar.gz", hash = "sha256:d1c49e1f44fffafd9a55e1a9b101590859d881d639ea2922516f5d9c512d354e"}, +] + +[package.dependencies] +async-timeout = {version = ">=4.0.3", markers = "python_version < \"3.12.0\""} + +[package.extras] +docs = ["Sphinx (>=5.3.0,<5.4.0)", "sphinx-rtd-theme (>=1.2.2)", "sphinxcontrib-asyncio (>=0.3.0,<0.4.0)"] +test = ["flake8 (>=6.1,<7.0)", "uvloop (>=0.15.3)"] + [[package]] name = "attrs" version = "23.1.0" @@ 
-4111,4 +4168,4 @@ multidict = ">=4.0" [metadata] lock-version = "2.0" python-versions = "^3.9.0,<3.12" -content-hash = "4411aeb7305111d56d6905f11c895b8ed1055cc1853627f2e73cd603dfc8110a" +content-hash = "99092c32cccd4a24b7da50f67265e05dfaf95cfce194efe42ee502aadeb12c19" diff --git a/backend/pyproject.toml b/backend/pyproject.toml index f0c56355..8dee8724 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -42,6 +42,7 @@ httpx = { version = "0.25.2", extras = ["socks"] } unstructured = {extras = ["doc", "docx"], version = "^0.12.5"} pgvector = "^0.2.5" psycopg2-binary = "^2.9.9" +asyncpg = "^0.29.0" [tool.poetry.group.dev.dependencies] uvicorn = "^0.23.2" From 29b247b33fadf8d9d2d7942d3b73334afa47f1ff Mon Sep 17 00:00:00 2001 From: Bakar Tavadze Date: Fri, 15 Mar 2024 11:11:44 +0400 Subject: [PATCH 06/35] Create database schema on first startup. --- postgres/init.sql | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/postgres/init.sql b/postgres/init.sql index ffa2e58c..e8920722 100644 --- a/postgres/init.sql +++ b/postgres/init.sql @@ -1 +1,19 @@ -CREATE EXTENSION vector; \ No newline at end of file +CREATE EXTENSION IF NOT EXISTS vector; +CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; + +CREATE TABLE IF NOT EXISTS assistant ( + assistant_id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + user_id VARCHAR(255) NOT NULL, + name VARCHAR(255) NOT NULL, + config JSON NOT NULL, + updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, + public BOOLEAN NOT NULL +); + +CREATE TABLE IF NOT EXISTS thread ( + thread_id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + assistant_id UUID REFERENCES assistant(assistant_id) ON DELETE SET NULL, + user_id VARCHAR(255) NOT NULL, + name VARCHAR(255) NOT NULL, + updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP +); \ No newline at end of file From 5d8cd9a2e819829d56a29a16cb61e7834b99b316 Mon Sep 17 00:00:00 2001 From: Bakar Tavadze Date: Fri, 15 Mar 2024 11:18:33 +0400 Subject: [PATCH 07/35] Add lifespan to the app to support pg pools. 
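The pool is created once at startup through FastAPI's lifespan hook and closed on shutdown; handlers borrow connections from it instead of opening their own. A sketch of the intended consumer pattern (the helper function and query are hypothetical examples, not part of this diff):

```python
from app.lifespan import get_pg_pool


async def count_assistants() -> int:
    # Borrow a pooled connection for the duration of one query.
    async with get_pg_pool().acquire() as conn:
        return await conn.fetchval("SELECT count(*) FROM assistant")
```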
--- backend/app/lifespan.py | 35 +++++++++++++++++++++++++++++++++++ backend/app/server.py | 3 ++- 2 files changed, 37 insertions(+), 1 deletion(-) create mode 100644 backend/app/lifespan.py diff --git a/backend/app/lifespan.py b/backend/app/lifespan.py new file mode 100644 index 00000000..96e65bd1 --- /dev/null +++ b/backend/app/lifespan.py @@ -0,0 +1,35 @@ +from contextlib import asynccontextmanager +import asyncpg +from fastapi import FastAPI +import json +import os + + +_pg_pool = None + + +def get_pg_pool(): + return _pg_pool + + +async def _init_connection(conn): + await conn.set_type_codec( + "json", encoder=json.dumps, decoder=json.loads, schema="pg_catalog" + ) + + +@asynccontextmanager +async def lifespan(app: FastAPI): + global _pg_pool + + _pg_pool = await asyncpg.create_pool( + database=os.environ["POSTGRES_DB"], + user=os.environ["POSTGRES_USER"], + password=os.environ["POSTGRES_PASSWORD"], + host=os.environ["POSTGRES_HOST"], + port=os.environ["POSTGRES_PORT"], + init=_init_connection, + ) + yield + await _pg_pool.close() + _pg_pool = None diff --git a/backend/app/server.py b/backend/app/server.py index 2c9524c2..d4751aa8 100644 --- a/backend/app/server.py +++ b/backend/app/server.py @@ -8,10 +8,11 @@ from app.api import router as api_router from app.upload import ingest_runnable +from app.lifespan import lifespan logger = logging.getLogger(__name__) -app = FastAPI(title="OpenGPTs API") +app = FastAPI(title="OpenGPTs API", lifespan=lifespan) # Get root of app, used to point to directory containing static files From fbe04b69ad63963857a81572ebcac3197542ba72 Mon Sep 17 00:00:00 2001 From: Bakar Tavadze Date: Fri, 15 Mar 2024 11:24:45 +0400 Subject: [PATCH 08/35] Add postgres checkpointer. --- backend/app/agent.py | 4 +-- backend/app/checkpoint.py | 74 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 76 insertions(+), 2 deletions(-) diff --git a/backend/app/agent.py b/backend/app/agent.py index 012a5425..5092da5c 100644 --- a/backend/app/agent.py +++ b/backend/app/agent.py @@ -12,7 +12,7 @@ from app.agent_types.openai_agent import get_openai_agent_executor from app.agent_types.xml_agent import get_xml_agent_executor from app.chatbot import get_chatbot_executor -from app.checkpoint import RedisCheckpoint +from app.checkpoint import PostgresCheckpoint from app.llms import ( get_anthropic_llm, get_google_llm, @@ -67,7 +67,7 @@ class AgentType(str, Enum): DEFAULT_SYSTEM_MESSAGE = "You are a helpful assistant." 
-CHECKPOINTER = RedisCheckpoint(at=CheckpointAt.END_OF_STEP) +CHECKPOINTER = PostgresCheckpoint(at=CheckpointAt.END_OF_STEP) def get_agent_executor( diff --git a/backend/app/checkpoint.py b/backend/app/checkpoint.py index a8a81a8a..f7dce144 100644 --- a/backend/app/checkpoint.py +++ b/backend/app/checkpoint.py @@ -7,8 +7,10 @@ from langgraph.checkpoint import BaseCheckpointSaver from langgraph.checkpoint.base import Checkpoint, empty_checkpoint from redis.client import Redis as RedisType +import asyncpg from app.redis import get_redis_client +from app.lifespan import get_pg_pool def checkpoint_key(user_id: str, thread_id: str): @@ -83,3 +85,75 @@ def get(self, config: RunnableConfig) -> Checkpoint | None: def put(self, config: RunnableConfig, checkpoint: Checkpoint) -> None: return self.client.hmset(self._hash_key(config), _dump(checkpoint)) + + +class PostgresCheckpoint(BaseCheckpointSaver): + pg_pool: Optional[asyncpg.Pool] = None + is_setup: bool = Field(False, init=False, repr=False) + + class Config: + arbitrary_types_allowed = True + + @property + def config_specs(self) -> list[ConfigurableFieldSpec]: + return [ + ConfigurableFieldSpec( + id="thread_id", + annotation=Optional[str], + name="Thread ID", + description=None, + default=None, + is_shared=True, + ), + ] + + async def setup(self) -> None: + if self.is_setup: + return + + if self.pg_pool is None: + self.pg_pool = get_pg_pool() + + try: + async with self.pg_pool.acquire() as conn: + await conn.execute( + """ + CREATE TABLE IF NOT EXISTS checkpoints ( + thread_id TEXT PRIMARY KEY, + checkpoint BYTEA + ); + """ + ) + self.is_setup = True + except BaseException as e: + raise e + + def get(self, config: RunnableConfig) -> Optional[Checkpoint]: + raise NotImplementedError + + def put(self, config: RunnableConfig, checkpoint: Checkpoint) -> None: + raise NotImplementedError + + async def aget(self, config: RunnableConfig) -> Optional[Checkpoint]: + await self.setup() + thread_id = config["configurable"]["thread_id"] + async with self.pg_pool.acquire() as conn: + if value := await conn.fetchrow( + "SELECT checkpoint FROM checkpoints WHERE thread_id = $1", thread_id + ): + return pickle.loads(value[0]) + + async def aput(self, config: RunnableConfig, checkpoint: Checkpoint) -> None: + await self.setup() + thread_id = config["configurable"]["thread_id"] + async with self.pg_pool.acquire() as conn: + await conn.execute( + ( + "INSERT INTO checkpoints (thread_id, checkpoint) " + "VALUES ($1, $2) " + "ON CONFLICT (thread_id) " + "DO UPDATE SET checkpoint = EXCLUDED.checkpoint;" + ), + thread_id, + pickle.dumps(checkpoint), + ) From 833154de68b484dc4b1bdb1c80911a4be1f5af7c Mon Sep 17 00:00:00 2001 From: Bakar Tavadze Date: Fri, 15 Mar 2024 11:35:07 +0400 Subject: [PATCH 09/35] Use postgres for storage and change endpoints to async. 
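Assistant and thread CRUD moves from Redis hashes to the Postgres tables created in init.sql, and the storage helpers plus their API endpoints become coroutines. A sketch of the resulting calling convention (storage.list_threads is from this diff; the surrounding function is a hypothetical consumer):

```python
import app.storage as storage


async def print_thread_names(user_id: str) -> None:
    # Storage helpers are now coroutines backed by the shared asyncpg
    # pool, so callers await them instead of calling them synchronously.
    threads = await storage.list_threads(user_id)
    for thread in threads:
        print(thread["name"])  # asyncpg records support key access
```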
--- backend/app/api/assistants.py | 17 ++-- backend/app/api/runs.py | 4 + backend/app/api/threads.py | 26 +++--- backend/app/schema.py | 9 ++- backend/app/storage.py | 147 ++++++++++++++++++++-------------- 5 files changed, 118 insertions(+), 85 deletions(-) diff --git a/backend/app/api/assistants.py b/backend/app/api/assistants.py index 01297c65..ac7c29ac 100644 --- a/backend/app/api/assistants.py +++ b/backend/app/api/assistants.py @@ -7,6 +7,7 @@ import app.storage as storage from app.schema import Assistant, AssistantWithoutUserId, OpengptsUserId + router = APIRouter() FEATURED_PUBLIC_ASSISTANTS = [] @@ -24,9 +25,9 @@ class AssistantPayload(BaseModel): @router.get("/") -def list_assistants(opengpts_user_id: OpengptsUserId) -> List[AssistantWithoutUserId]: +async def list_assistants(opengpts_user_id: OpengptsUserId) -> List[Assistant]: """List all assistants for the current user.""" - return storage.list_assistants(opengpts_user_id) + return await storage.list_assistants(opengpts_user_id) @router.get("/public/") @@ -42,24 +43,24 @@ def list_public_assistants( @router.get("/{aid}") -def get_asistant( +async def get_assistant( opengpts_user_id: OpengptsUserId, aid: AssistantID, ) -> Assistant: """Get an assistant by ID.""" - assistant = storage.get_assistant(opengpts_user_id, aid) + assistant = await storage.get_assistant(opengpts_user_id, aid) if not assistant: raise HTTPException(status_code=404, detail="Assistant not found") return assistant @router.post("") -def create_assistant( +async def create_assistant( opengpts_user_id: OpengptsUserId, payload: AssistantPayload, ) -> Assistant: """Create an assistant.""" - return storage.put_assistant( + return await storage.put_assistant( opengpts_user_id, str(uuid4()), name=payload.name, @@ -69,13 +70,13 @@ def create_assistant( @router.put("/{aid}") -def upsert_assistant( +async def upsert_assistant( opengpts_user_id: OpengptsUserId, aid: AssistantID, payload: AssistantPayload, ) -> Assistant: """Create or update an assistant.""" - return storage.put_assistant( + return await storage.put_assistant( opengpts_user_id, aid, name=payload.name, diff --git a/backend/app/api/runs.py b/backend/app/api/runs.py index f431a5c5..4e672049 100644 --- a/backend/app/api/runs.py +++ b/backend/app/api/runs.py @@ -43,6 +43,10 @@ async def _run_input_and_config(request: Request, opengpts_user_id: OpengptsUser None, get_assistant, public_user_id, body["assistant_id"] ), ) + assistant, public_assistant = await asyncio.gather( + get_assistant(opengpts_user_id, body["assistant_id"]), + get_assistant(public_user_id, body["assistant_id"]), + ) assistant = assistant or public_assistant if not assistant: raise HTTPException(status_code=404, detail="Assistant not found") diff --git a/backend/app/api/threads.py b/backend/app/api/threads.py index a41632c5..80c6d38f 100644 --- a/backend/app/api/threads.py +++ b/backend/app/api/threads.py @@ -6,7 +6,7 @@ from pydantic import BaseModel, Field import app.storage as storage -from app.schema import OpengptsUserId, Thread, ThreadWithoutUserId +from app.schema import OpengptsUserId, Thread router = APIRouter() @@ -28,49 +28,49 @@ class ThreadMessagesPostRequest(BaseModel): @router.get("/") -def list_threads(opengpts_user_id: OpengptsUserId) -> List[ThreadWithoutUserId]: +async def list_threads(opengpts_user_id: OpengptsUserId) -> List[Thread]: """List all threads for the current user.""" - return storage.list_threads(opengpts_user_id) + return await storage.list_threads(opengpts_user_id) @router.get("/{tid}/messages") -def 
get_thread_messages( +async def get_thread_messages( opengpts_user_id: OpengptsUserId, tid: ThreadID, ): """Get all messages for a thread.""" - return storage.get_thread_messages(opengpts_user_id, tid) + return await storage.get_thread_messages(opengpts_user_id, tid) @router.post("/{tid}/messages") -def add_thread_messages( +async def add_thread_messages( opengpts_user_id: OpengptsUserId, tid: ThreadID, payload: ThreadMessagesPostRequest, ): """Add messages to a thread.""" - return storage.post_thread_messages(opengpts_user_id, tid, payload.messages) + return await storage.post_thread_messages(opengpts_user_id, tid, payload.messages) @router.get("/{tid}") -def get_thread( +async def get_thread( opengpts_user_id: OpengptsUserId, tid: ThreadID, ) -> Thread: """Get a thread by ID.""" - thread = storage.get_thread(opengpts_user_id, tid) + thread = await storage.get_thread(opengpts_user_id, tid) if not thread: raise HTTPException(status_code=404, detail="Thread not found") return thread @router.post("") -def create_thread( +async def create_thread( opengpts_user_id: OpengptsUserId, thread_put_request: ThreadPutRequest, ) -> Thread: """Create a thread.""" - return storage.put_thread( + return await storage.put_thread( opengpts_user_id, str(uuid4()), assistant_id=thread_put_request.assistant_id, @@ -79,13 +79,13 @@ def create_thread( @router.put("/{tid}") -def upsert_thread( +async def upsert_thread( opengpts_user_id: OpengptsUserId, tid: ThreadID, thread_put_request: ThreadPutRequest, ) -> Thread: """Update a thread.""" - return storage.put_thread( + return await storage.put_thread( opengpts_user_id, tid, assistant_id=thread_put_request.assistant_id, diff --git a/backend/app/schema.py b/backend/app/schema.py index 336493cf..3a6e18cc 100644 --- a/backend/app/schema.py +++ b/backend/app/schema.py @@ -1,5 +1,6 @@ from datetime import datetime -from typing import Annotated +from typing import Annotated, Optional +from uuid import UUID from fastapi import Cookie from typing_extensions import TypedDict @@ -8,7 +9,7 @@ class AssistantWithoutUserId(TypedDict): """Assistant model.""" - assistant_id: str + assistant_id: UUID """The ID of the assistant.""" name: str """The name of the assistant.""" @@ -28,9 +29,9 @@ class Assistant(AssistantWithoutUserId): class ThreadWithoutUserId(TypedDict): - thread_id: str + thread_id: UUID """The ID of the thread.""" - assistant_id: str + assistant_id: Optional[UUID] """The assistant that was used in conjunction with this thread.""" name: str """The name of the thread.""" diff --git a/backend/app/storage.py b/backend/app/storage.py index a8b1a620..8ba913a7 100644 --- a/backend/app/storage.py +++ b/backend/app/storage.py @@ -7,10 +7,12 @@ from langgraph.checkpoint.base import empty_checkpoint from langgraph.pregel import _prepare_next_tasks + from app.agent import AgentType, get_agent_executor from app.redis import get_redis_client -from app.schema import Assistant, AssistantWithoutUserId, Thread, ThreadWithoutUserId +from app.schema import Assistant, AssistantWithoutUserId, Thread from app.stream import map_chunk_to_msg +from app.lifespan import get_pg_pool def assistants_list_key(user_id: str) -> str: @@ -42,24 +44,25 @@ def load(keys: list[str], values: list[bytes]) -> dict: return {k: orjson.loads(v) if v is not None else None for k, v in zip(keys, values)} -def list_assistants(user_id: str) -> List[Assistant]: +async def list_assistants(user_id: str) -> List[Assistant]: """List all assistants for the current user.""" - client = get_redis_client() - ids = 
[orjson.loads(id) for id in client.smembers(assistants_list_key(user_id))] - with client.pipeline() as pipe: - for id in ids: - pipe.hmget(assistant_key(user_id, id), *assistant_hash_keys) - assistants = pipe.execute() - return [load(assistant_hash_keys, values) for values in assistants] + + async with get_pg_pool().acquire() as conn: + return await conn.fetch("SELECT * FROM assistant WHERE user_id = $1", user_id) -def get_assistant(user_id: str, assistant_id: str) -> Assistant | None: +async def get_assistant(user_id: str, assistant_id: str) -> Assistant | None: """Get an assistant by ID.""" - client = get_redis_client() - values = client.hmget(assistant_key(user_id, assistant_id), *assistant_hash_keys) - return load(assistant_hash_keys, values) if any(values) else None + + async with get_pg_pool().acquire() as conn: + return await conn.fetchrow( + "SELECT * FROM assistant WHERE assistant_id = $1 AND user_id = $2", + assistant_id, + user_id, + ) +# TODO How should we represent public assistants? def list_public_assistants( assistant_ids: Sequence[str] ) -> List[AssistantWithoutUserId]: @@ -85,7 +88,7 @@ def list_public_assistants( return [load(assistant_hash_keys, values) for values in assistants] -def put_assistant( +async def put_assistant( user_id: str, assistant_id: str, *, name: str, config: dict, public: bool = False ) -> Assistant: """Modify an assistant. @@ -100,52 +103,61 @@ def put_assistant( Returns: return the assistant model if no exception is raised. """ - saved: Assistant = { - "user_id": user_id, # TODO(Nuno): Could we remove this? - "assistant_id": assistant_id, # TODO(Nuno): remove this? + updated_at = datetime.utcnow() + async with get_pg_pool().acquire() as conn: + async with conn.transaction(): + await conn.execute( + ( + "INSERT INTO assistant (assistant_id, user_id, name, config, updated_at, public) VALUES ($1, $2, $3, $4, $5, $6) " + "ON CONFLICT (assistant_id) DO UPDATE SET " + "user_id = EXCLUDED.user_id, " + "name = EXCLUDED.name, " + "config = EXCLUDED.config, " + "updated_at = EXCLUDED.updated_at, " + "public = EXCLUDED.public;" + ), + assistant_id, + user_id, + name, + config, + updated_at, + public, + ) + return { + "assistant_id": assistant_id, + "user_id": user_id, "name": name, "config": config, - "updated_at": datetime.utcnow(), + "updated_at": updated_at, "public": public, } - client = get_redis_client() - with client.pipeline() as pipe: - pipe.sadd(assistants_list_key(user_id), orjson.dumps(assistant_id)) - pipe.hset(assistant_key(user_id, assistant_id), mapping=_dump(saved)) - if public: - pipe.sadd(assistants_list_key(public_user_id), orjson.dumps(assistant_id)) - pipe.hset(assistant_key(public_user_id, assistant_id), mapping=_dump(saved)) - pipe.execute() - return saved -def list_threads(user_id: str) -> List[ThreadWithoutUserId]: +async def list_threads(user_id: str) -> List[Thread]: """List all threads for the current user.""" - client = get_redis_client() - ids = [orjson.loads(id) for id in client.smembers(threads_list_key(user_id))] - with client.pipeline() as pipe: - for id in ids: - pipe.hmget(thread_key(user_id, id), *thread_hash_keys) - threads = pipe.execute() - return [load(thread_hash_keys, values) for values in threads] + async with get_pg_pool().acquire() as conn: + return await conn.fetch("SELECT * FROM thread WHERE user_id = $1", user_id) -def get_thread(user_id: str, thread_id: str) -> Thread | None: +async def get_thread(user_id: str, thread_id: str) -> Thread | None: """Get a thread by ID.""" - client = get_redis_client() - values 
= client.hmget(thread_key(user_id, thread_id), *thread_hash_keys) - return load(thread_hash_keys, values) if any(values) else None + async with get_pg_pool().acquire() as conn: + return await conn.fetchrow( + "SELECT * FROM thread WHERE thread_id = $1 AND user_id = $2", + thread_id, + user_id, + ) # TODO remove hardcoded channel name MESSAGES_CHANNEL_NAME = "__root__" -def get_thread_messages(user_id: str, thread_id: str): +async def get_thread_messages(user_id: str, thread_id: str): """Get all messages for a thread.""" - config = {"configurable": {"user_id": user_id, "thread_id": thread_id}} + config = {"configurable": {"thread_id": thread_id}} app = get_agent_executor([], AgentType.GPT_35_TURBO, "", False) - checkpoint = app.checkpointer.get(config) or empty_checkpoint() + checkpoint = await app.checkpointer.aget(config) or empty_checkpoint() with ChannelsManager(app.channels, checkpoint) as channels: return { "messages": [ @@ -155,34 +167,49 @@ def get_thread_messages(user_id: str, thread_id: str): } -def post_thread_messages(user_id: str, thread_id: str, messages: Sequence[AnyMessage]): +async def post_thread_messages( + user_id: str, thread_id: str, messages: Sequence[AnyMessage] +): """Add messages to a thread.""" - config = {"configurable": {"user_id": user_id, "thread_id": thread_id}} + config = {"configurable": {"thread_id": thread_id}} app = get_agent_executor([], AgentType.GPT_35_TURBO, "", False) - checkpoint = app.checkpointer.get(config) or empty_checkpoint() + checkpoint = await app.checkpointer.aget(config) or empty_checkpoint() with ChannelsManager(app.channels, checkpoint) as channels: channel = channels[MESSAGES_CHANNEL_NAME] channel.update([messages]) checkpoint["channel_values"][MESSAGES_CHANNEL_NAME] = channel.checkpoint() checkpoint["channel_versions"][MESSAGES_CHANNEL_NAME] += 1 - app.checkpointer.put(config, checkpoint) + await app.checkpointer.aput(config, checkpoint) -def put_thread(user_id: str, thread_id: str, *, assistant_id: str, name: str) -> Thread: +async def put_thread( + user_id: str, thread_id: str, *, assistant_id: str, name: str +) -> Thread: """Modify a thread.""" - saved: Thread = { - "user_id": user_id, # TODO(Nuno): Could we remove this? - "thread_id": thread_id, - "assistant_id": assistant_id, - "name": name, - "updated_at": datetime.utcnow(), - } - client = get_redis_client() - with client.pipeline() as pipe: - pipe.sadd(threads_list_key(user_id), orjson.dumps(thread_id)) - pipe.hset(thread_key(user_id, thread_id), mapping=_dump(saved)) - pipe.execute() - return saved + updated_at = datetime.utcnow() + async with get_pg_pool().acquire() as conn: + await conn.execute( + ( + "INSERT INTO thread (thread_id, user_id, assistant_id, name, updated_at) VALUES ($1, $2, $3, $4, $5) " + "ON CONFLICT (thread_id) DO UPDATE SET " + "user_id = EXCLUDED.user_id," + "assistant_id = EXCLUDED.assistant_id, " + "name = EXCLUDED.name, " + "updated_at = EXCLUDED.updated_at;" + ), + thread_id, + user_id, + assistant_id, + name, + updated_at, + ) + return { + "thread_id": thread_id, + "user_id": user_id, + "assistant_id": assistant_id, + "name": name, + "updated_at": updated_at, + } if __name__ == "__main__": From 23a7175d04688c28ffddbfa18a2f48bb7d820193 Mon Sep 17 00:00:00 2001 From: Bakar Tavadze Date: Fri, 15 Mar 2024 12:35:11 +0400 Subject: [PATCH 10/35] Remove redundant code. 
--- backend/app/api/runs.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/backend/app/api/runs.py b/backend/app/api/runs.py index 4e672049..13c6a190 100644 --- a/backend/app/api/runs.py +++ b/backend/app/api/runs.py @@ -35,14 +35,6 @@ async def _run_input_and_config(request: Request, opengpts_user_id: OpengptsUser body = await request.json() except json.JSONDecodeError: raise RequestValidationError(errors=["Invalid JSON body"]) - assistant, public_assistant = await asyncio.gather( - asyncio.get_running_loop().run_in_executor( - None, get_assistant, opengpts_user_id, body["assistant_id"] - ), - asyncio.get_running_loop().run_in_executor( - None, get_assistant, public_user_id, body["assistant_id"] - ), - ) assistant, public_assistant = await asyncio.gather( get_assistant(opengpts_user_id, body["assistant_id"]), get_assistant(public_user_id, body["assistant_id"]), From 122d98079f0bbf8b51f794b62ef8fd9533dd1d8d Mon Sep 17 00:00:00 2001 From: Bakar Tavadze Date: Fri, 15 Mar 2024 13:58:38 +0400 Subject: [PATCH 11/35] Handle public assistants. --- backend/app/api/assistants.py | 8 +++---- backend/app/api/runs.py | 4 ++-- backend/app/storage.py | 42 ++++++++++++++++------------------- 3 files changed, 25 insertions(+), 29 deletions(-) diff --git a/backend/app/api/assistants.py b/backend/app/api/assistants.py index ac7c29ac..af1e9e3d 100644 --- a/backend/app/api/assistants.py +++ b/backend/app/api/assistants.py @@ -5,7 +5,7 @@ from pydantic import BaseModel, Field import app.storage as storage -from app.schema import Assistant, AssistantWithoutUserId, OpengptsUserId +from app.schema import Assistant, OpengptsUserId router = APIRouter() @@ -31,13 +31,13 @@ async def list_assistants(opengpts_user_id: OpengptsUserId) -> List[Assistant]: @router.get("/public/") -def list_public_assistants( +async def list_public_assistants( shared_id: Annotated[ Optional[str], Query(description="ID of a publicly shared assistant.") ] = None, -) -> List[AssistantWithoutUserId]: +) -> List[Assistant]: """List all public assistants.""" - return storage.list_public_assistants( + return await storage.list_public_assistants( FEATURED_PUBLIC_ASSISTANTS + ([shared_id] if shared_id else []) ) diff --git a/backend/app/api/runs.py b/backend/app/api/runs.py index 13c6a190..7e117807 100644 --- a/backend/app/api/runs.py +++ b/backend/app/api/runs.py @@ -16,7 +16,7 @@ from app.agent import agent from app.schema import OpengptsUserId -from app.storage import get_assistant, public_user_id +from app.storage import get_assistant, get_public_assistant from app.stream import astream_messages, to_sse router = APIRouter() @@ -37,7 +37,7 @@ async def _run_input_and_config(request: Request, opengpts_user_id: OpengptsUser raise RequestValidationError(errors=["Invalid JSON body"]) assistant, public_assistant = await asyncio.gather( get_assistant(opengpts_user_id, body["assistant_id"]), - get_assistant(public_user_id, body["assistant_id"]), + get_public_assistant(body["assistant_id"]), ) assistant = assistant or public_assistant if not assistant: diff --git a/backend/app/storage.py b/backend/app/storage.py index 8ba913a7..2e7894c9 100644 --- a/backend/app/storage.py +++ b/backend/app/storage.py @@ -9,8 +9,7 @@ from app.agent import AgentType, get_agent_executor -from app.redis import get_redis_client -from app.schema import Assistant, AssistantWithoutUserId, Thread +from app.schema import Assistant, Thread from app.stream import map_chunk_to_msg from app.lifespan import get_pg_pool @@ -62,30 +61,27 @@ async def get_assistant(user_id: 
str, assistant_id: str) -> Assistant | None: ) -# TODO How should we represent public assistants? -def list_public_assistants( - assistant_ids: Sequence[str] -) -> List[AssistantWithoutUserId]: +async def get_public_assistant(assistant_id: str) -> Assistant | None: + """Get a public assistant by ID.""" + + async with get_pg_pool().acquire() as conn: + return await conn.fetchrow( + "SELECT * FROM assistant WHERE assistant_id = $1 AND public = true", + assistant_id, + ) + + +async def list_public_assistants(assistant_ids: Sequence[str]) -> List[Assistant]: """List all the public assistants.""" - if not assistant_ids: - return [] - client = get_redis_client() - ids = [ - id - for id, is_public in zip( - assistant_ids, - client.smismember( - assistants_list_key(public_user_id), - [orjson.dumps(id) for id in assistant_ids], + async with get_pg_pool().acquire() as conn: + return await conn.fetch( + ( + "SELECT * FROM assistant " + "WHERE assistant_id = ANY($1::uuid[]) " + "AND public = true;" ), + assistant_ids, ) - if is_public - ] - with client.pipeline() as pipe: - for id in ids: - pipe.hmget(assistant_key(public_user_id, id), *assistant_hash_keys) - assistants = pipe.execute() - return [load(assistant_hash_keys, values) for values in assistants] async def put_assistant( From dfe9b4f8fb24b54b49264dd0b7102aef22f677ec Mon Sep 17 00:00:00 2001 From: Bakar Tavadze Date: Fri, 15 Mar 2024 14:02:28 +0400 Subject: [PATCH 12/35] Remove redundant code. --- backend/app/storage.py | 30 ------------------------------ 1 file changed, 30 deletions(-) diff --git a/backend/app/storage.py b/backend/app/storage.py index 2e7894c9..9e61f1e7 100644 --- a/backend/app/storage.py +++ b/backend/app/storage.py @@ -1,7 +1,6 @@ from datetime import datetime from typing import List, Sequence -import orjson from langchain.schema.messages import AnyMessage from langgraph.channels.base import ChannelsManager from langgraph.checkpoint.base import empty_checkpoint @@ -14,35 +13,6 @@ from app.lifespan import get_pg_pool -def assistants_list_key(user_id: str) -> str: - return f"opengpts:{user_id}:assistants" - - -def assistant_key(user_id: str, assistant_id: str) -> str: - return f"opengpts:{user_id}:assistant:{assistant_id}" - - -def threads_list_key(user_id: str) -> str: - return f"opengpts:{user_id}:threads" - - -def thread_key(user_id: str, thread_id: str) -> str: - return f"opengpts:{user_id}:thread:{thread_id}" - - -assistant_hash_keys = ["assistant_id", "name", "config", "updated_at", "public"] -thread_hash_keys = ["assistant_id", "thread_id", "name", "updated_at"] -public_user_id = "eef39817-c173-4eb6-8be4-f77cf37054fb" - - -def _dump(map: dict) -> dict: - return {k: orjson.dumps(v) if v is not None else None for k, v in map.items()} - - -def load(keys: list[str], values: list[bytes]) -> dict: - return {k: orjson.loads(v) if v is not None else None for k, v in zip(keys, values)} - - async def list_assistants(user_id: str) -> List[Assistant]: """List all assistants for the current user.""" From 686ae1064445cc50cce4b2186ca56208d293bdad Mon Sep 17 00:00:00 2001 From: Bakar Tavadze Date: Tue, 19 Mar 2024 19:06:22 +0400 Subject: [PATCH 13/35] Use migration files. 
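golang-migrate walks the migrations directory for versioned `NNNNNN_name.up.sql` /
`NNNNNN_name.down.sql` pairs and applies them in order, replacing the one-shot
`postgres/init.sql` entrypoint script that docker-compose used to mount. A sketch of invoking
it from Python the way this series' test helper later does, assuming the `migrate` CLI is on
PATH and the usual `POSTGRES_*` variables are set:

```python
import os
import subprocess

# Build a DSN from the same POSTGRES_* variables the app reads, then shell
# out to the golang-migrate CLI to apply every pending .up.sql file.
dsn = (
    f"postgres://{os.environ['POSTGRES_USER']}:{os.environ['POSTGRES_PASSWORD']}"
    f"@{os.environ['POSTGRES_HOST']}:{os.environ['POSTGRES_PORT']}"
    f"/{os.environ['POSTGRES_DB']}?sslmode=disable"
)
subprocess.run(["migrate", "-path", "./migrations", "-database", dsn, "up"], check=True)
```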
--- .../000001_create_extensions_and_first_tables.down.sql | 2 ++ .../000001_create_extensions_and_first_tables.up.sql | 4 ++-- docker-compose-prod.yml | 1 - docker-compose.yml | 1 - 4 files changed, 4 insertions(+), 4 deletions(-) create mode 100644 backend/migrations/000001_create_extensions_and_first_tables.down.sql rename postgres/init.sql => backend/migrations/000001_create_extensions_and_first_tables.up.sql (75%) diff --git a/backend/migrations/000001_create_extensions_and_first_tables.down.sql b/backend/migrations/000001_create_extensions_and_first_tables.down.sql new file mode 100644 index 00000000..c3810fcc --- /dev/null +++ b/backend/migrations/000001_create_extensions_and_first_tables.down.sql @@ -0,0 +1,2 @@ +DROP TABLE IF EXISTS thread; +DROP TABLE IF EXISTS assistant; \ No newline at end of file diff --git a/postgres/init.sql b/backend/migrations/000001_create_extensions_and_first_tables.up.sql similarity index 75% rename from postgres/init.sql rename to backend/migrations/000001_create_extensions_and_first_tables.up.sql index e8920722..7209a5e3 100644 --- a/postgres/init.sql +++ b/backend/migrations/000001_create_extensions_and_first_tables.up.sql @@ -6,7 +6,7 @@ CREATE TABLE IF NOT EXISTS assistant ( user_id VARCHAR(255) NOT NULL, name VARCHAR(255) NOT NULL, config JSON NOT NULL, - updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP WITH TIME ZONE DEFAULT (CURRENT_TIMESTAMP AT TIME ZONE 'UTC'), public BOOLEAN NOT NULL ); @@ -15,5 +15,5 @@ CREATE TABLE IF NOT EXISTS thread ( assistant_id UUID REFERENCES assistant(assistant_id) ON DELETE SET NULL, user_id VARCHAR(255) NOT NULL, name VARCHAR(255) NOT NULL, - updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP + updated_at TIMESTAMP WITH TIME ZONE DEFAULT (CURRENT_TIMESTAMP AT TIME ZONE 'UTC') ); \ No newline at end of file diff --git a/docker-compose-prod.yml b/docker-compose-prod.yml index 07c36653..1dcfb684 100644 --- a/docker-compose-prod.yml +++ b/docker-compose-prod.yml @@ -16,7 +16,6 @@ services: - .env volumes: - ./postgres-volume:/var/lib/postgresql/data - - ./postgres/init.sql:/docker-entrypoint-initdb.d/init.sql backend: container_name: opengpts-backend image: docker.io/langchain/open-gpts:latest diff --git a/docker-compose.yml b/docker-compose.yml index a2600d77..cae7169a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -16,7 +16,6 @@ services: - .env volumes: - ./postgres-volume:/var/lib/postgresql/data - - ./postgres/init.sql:/docker-entrypoint-initdb.d/init.sql backend: container_name: opengpts-backend build: From 22e7bbc1c3f78f3272afdb811de89f3300c0f639 Mon Sep 17 00:00:00 2001 From: Bakar Tavadze Date: Tue, 19 Mar 2024 19:11:14 +0400 Subject: [PATCH 14/35] Use only PGVector as a vector store. 
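With Redis gone there is only one store, so the `isinstance` dispatch and the `VSTORE_TYPE`
switch disappear, and retrieval filtering collapses from the `RedisFilter` tag expression to
PGVector's plain dict filter over document metadata. A sketch of the resulting retriever,
assuming the `vstore` configured in `upload.py` and placeholder IDs:

```python
from app.upload import vstore  # the PGVector store configured in this patch

# PGVector takes a dict filter over metadata; "in" matches documents whose
# "namespace" is either the assistant or the thread ID, mirroring the old
# Redis tag OR.
retriever = vstore.as_retriever(
    search_kwargs={"filter": {"namespace": {"in": ["assistant-123", "thread-456"]}}}
)
docs = retriever.get_relevant_documents("what did I upload?")
```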
--- .env.example | 3 +-- .env.gcp.yaml.example | 1 - backend/app/tools.py | 21 ++++----------------- backend/app/upload.py | 28 ++-------------------------- 4 files changed, 7 insertions(+), 46 deletions(-) diff --git a/.env.example b/.env.example index 5ec7a3f0..eb33d4ef 100644 --- a/.env.example +++ b/.env.example @@ -12,5 +12,4 @@ PROXY_URL=your_proxy_url POSTGRES_PORT=placeholder POSTGRES_DB=placeholder POSTGRES_USER=placeholder -POSTGRES_PASSWORD=placeholder -VSTORE_TYPE=placeholder \ No newline at end of file +POSTGRES_PASSWORD=placeholder \ No newline at end of file diff --git a/.env.gcp.yaml.example b/.env.gcp.yaml.example index ad07178c..8f811cdb 100644 --- a/.env.gcp.yaml.example +++ b/.env.gcp.yaml.example @@ -20,4 +20,3 @@ POSTGRES_PORT: your_postgres_port_here POSTGRES_DB: your_postgres_db_here POSTGRES_USER: your_postgres_user_here POSTGRES_PASSWORD: your_postgres_password_here -VSTORE_TYPE: redis_or_postgres diff --git a/backend/app/tools.py b/backend/app/tools.py index 86a30307..9c701f41 100644 --- a/backend/app/tools.py +++ b/backend/app/tools.py @@ -21,13 +21,10 @@ ) from langchain_community.utilities.arxiv import ArxivAPIWrapper from langchain_community.utilities.tavily_search import TavilySearchAPIWrapper -from langchain_community.vectorstores.redis import RedisFilter from langchain_robocorp import ActionServerToolkit from typing_extensions import TypedDict -from langchain_community.vectorstores.redis import Redis -from langchain_community.vectorstores.pgvector import PGVector -from app.upload import get_vectorstore +from app.upload import vstore class DDGInput(BaseModel): @@ -193,19 +190,9 @@ class Retrieval(BaseTool): def get_retriever(assistant_id: str, thread_id: str): - vstore = get_vectorstore() - if isinstance(vstore, Redis): - return vstore.as_retriever( - search_kwargs={ - "filter": (RedisFilter.tag("namespace") == assistant_id) - | (RedisFilter.tag("namespace") == thread_id) - } - ) - elif isinstance(vstore, PGVector): - return vstore.as_retriever( - search_kwargs={"filter": {"namespace": {"in": [assistant_id, thread_id]}}} - ) - raise Exception("Vector store not supported.") + return vstore.as_retriever( + search_kwargs={"filter": {"namespace": {"in": [assistant_id, thread_id]}}} + ) @lru_cache(maxsize=5) diff --git a/backend/app/upload.py b/backend/app/upload.py index d5612966..c9428eeb 100644 --- a/backend/app/upload.py +++ b/backend/app/upload.py @@ -10,11 +10,9 @@ import os from typing import Any, BinaryIO, List, Optional -from functools import lru_cache from langchain.text_splitter import RecursiveCharacterTextSplitter, TextSplitter from langchain_community.document_loaders.blob_loaders import Blob -from langchain_community.vectorstores.redis import Redis from langchain_community.vectorstores.pgvector import PGVector from langchain_core.runnables import ( ConfigurableField, @@ -110,17 +108,6 @@ def batch( return ids -index_schema = { - "tag": [{"name": "namespace"}], -} -redis_store = Redis( - redis_url=os.environ["REDIS_URL"], - index_name="opengpts", - embedding=OpenAIEmbeddings(), - index_schema=index_schema, -) - - PG_CONNECTION_STRING = PGVector.connection_string_from_db_params( driver="psycopg2", host=os.environ.get("POSTGRES_HOST", "localhost"), @@ -129,26 +116,15 @@ def batch( user=os.environ.get("POSTGRES_USER", "postgres"), password=os.environ.get("POSTGRES_PASSWORD", "postgres"), ) -postgres_store = PGVector( +vstore = PGVector( connection_string=PG_CONNECTION_STRING, embedding_function=OpenAIEmbeddings(), ) -@lru_cache(maxsize=1) -def 
get_vectorstore() -> VectorStore: - """Get the vectorstore.""" - vstore_type = os.getenv("VSTORE_TYPE") - if vstore_type == "redis": - return redis_store - elif vstore_type == "postgres": - return postgres_store - raise ValueError(f"Unknown vector store type (VSTORE_TYPE): {vstore_type}.") - - ingest_runnable = IngestRunnable( text_splitter=RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200), - vectorstore=get_vectorstore(), + vectorstore=vstore, ).configurable_fields( assistant_id=ConfigurableField( id="assistant_id", From 52492e70fad406b66c480576bf484c2578688813 Mon Sep 17 00:00:00 2001 From: Bakar Tavadze Date: Tue, 19 Mar 2024 19:13:33 +0400 Subject: [PATCH 15/35] Use orjson instead of json to set asyncpg pool type codec. --- backend/app/lifespan.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/backend/app/lifespan.py b/backend/app/lifespan.py index 96e65bd1..1a5306b0 100644 --- a/backend/app/lifespan.py +++ b/backend/app/lifespan.py @@ -1,20 +1,24 @@ +import os from contextlib import asynccontextmanager + import asyncpg +import orjson from fastapi import FastAPI -import json -import os _pg_pool = None -def get_pg_pool(): +def get_pg_pool() -> asyncpg.pool.Pool: return _pg_pool -async def _init_connection(conn): +async def _init_connection(conn) -> None: await conn.set_type_codec( - "json", encoder=json.dumps, decoder=json.loads, schema="pg_catalog" + "json", + encoder=lambda v: orjson.dumps(v).decode(), + decoder=orjson.loads, + schema="pg_catalog", ) From b6b497aa89a83fc4bbd25e3e2d59194337dfb8a5 Mon Sep 17 00:00:00 2001 From: Bakar Tavadze Date: Tue, 19 Mar 2024 19:14:26 +0400 Subject: [PATCH 16/35] Remove redis checkpointer. --- backend/app/checkpoint.py | 80 +-------------------------------------- 1 file changed, 2 insertions(+), 78 deletions(-) diff --git a/backend/app/checkpoint.py b/backend/app/checkpoint.py index f7dce144..cd4a08b1 100644 --- a/backend/app/checkpoint.py +++ b/backend/app/checkpoint.py @@ -1,92 +1,16 @@ import pickle -from typing import Any, Optional +from typing import Optional from langchain.pydantic_v1 import Field from langchain.schema.runnable import RunnableConfig from langchain.schema.runnable.utils import ConfigurableFieldSpec from langgraph.checkpoint import BaseCheckpointSaver -from langgraph.checkpoint.base import Checkpoint, empty_checkpoint -from redis.client import Redis as RedisType +from langgraph.checkpoint.base import Checkpoint import asyncpg -from app.redis import get_redis_client from app.lifespan import get_pg_pool -def checkpoint_key(user_id: str, thread_id: str): - return f"opengpts:{user_id}:thread:{thread_id}:checkpoint" - - -def _dump(mapping: dict[str, Any]) -> dict: - return {k: pickle.dumps(v) if v is not None else None for k, v in mapping.items()} - - -def _load(mapping: dict[bytes, bytes]) -> dict: - return { - k.decode(): pickle.loads(v) if v is not None else None - for k, v in mapping.items() - } - - -class RedisCheckpoint(BaseCheckpointSaver): - client: RedisType = Field(default_factory=get_redis_client) - - class Config: - arbitrary_types_allowed = True - - @property - def config_specs(self) -> list[ConfigurableFieldSpec]: - # Although the annotations are Optional[str], both fields are actually - # required to create a valid checkpoint key. 
- return [ - ConfigurableFieldSpec( - id="user_id", - annotation=Optional[str], - name="User ID", - description=None, - default=None, - is_shared=True, - ), - ConfigurableFieldSpec( - id="thread_id", - annotation=Optional[str], - name="Thread ID", - description=None, - default=None, - is_shared=True, - ), - ] - - def _hash_key(self, config: RunnableConfig) -> str: - return checkpoint_key( - config["configurable"]["user_id"], config["configurable"]["thread_id"] - ) - - def get(self, config: RunnableConfig) -> Checkpoint | None: - value = _load(self.client.hgetall(self._hash_key(config))) - if value.get("v") == 1: - # langgraph version 1 - return value - elif value.get("__pregel_version") == 1: - # permchain version 1 - value.pop("__pregel_version") - value.pop("__pregel_ts") - checkpoint = empty_checkpoint() - if value.get("messages"): - checkpoint["channel_values"] = {"__root__": value["messages"][1]} - else: - checkpoint["channel_values"] = {} - for key in checkpoint["channel_values"]: - checkpoint["channel_versions"][key] = 1 - return checkpoint - else: - # unknown version - return None - - def put(self, config: RunnableConfig, checkpoint: Checkpoint) -> None: - return self.client.hmset(self._hash_key(config), _dump(checkpoint)) - - class PostgresCheckpoint(BaseCheckpointSaver): pg_pool: Optional[asyncpg.Pool] = None is_setup: bool = Field(False, init=False, repr=False) From 80b7d332f1984dd9ddd546113116d622f1c581ac Mon Sep 17 00:00:00 2001 From: Bakar Tavadze Date: Tue, 19 Mar 2024 19:17:26 +0400 Subject: [PATCH 17/35] Use timezone-aware datetime objects. --- backend/app/storage.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/backend/app/storage.py b/backend/app/storage.py index 9e61f1e7..e808db18 100644 --- a/backend/app/storage.py +++ b/backend/app/storage.py @@ -1,4 +1,4 @@ -from datetime import datetime +import datetime from typing import List, Sequence from langchain.schema.messages import AnyMessage @@ -69,7 +69,7 @@ async def put_assistant( Returns: return the assistant model if no exception is raised. """ - updated_at = datetime.utcnow() + updated_at = datetime.datetime.now(datetime.UTC) async with get_pg_pool().acquire() as conn: async with conn.transaction(): await conn.execute( @@ -152,7 +152,7 @@ async def put_thread( user_id: str, thread_id: str, *, assistant_id: str, name: str ) -> Thread: """Modify a thread.""" - updated_at = datetime.utcnow() + updated_at = datetime.datetime.now(datetime.UTC) async with get_pg_pool().acquire() as conn: await conn.execute( ( From 8a29758413bedc8cb8cf63dbf0834d4622cc5227 Mon Sep 17 00:00:00 2001 From: Bakar Tavadze Date: Tue, 19 Mar 2024 19:18:04 +0400 Subject: [PATCH 18/35] Remove redundant code. --- backend/app/storage.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/backend/app/storage.py b/backend/app/storage.py index e808db18..4397ddf6 100644 --- a/backend/app/storage.py +++ b/backend/app/storage.py @@ -176,10 +176,3 @@ async def put_thread( "name": name, "updated_at": updated_at, } - - -if __name__ == "__main__": - print(list_assistants("133")) - print(list_threads("123")) - put_assistant("123", "i-am-a-test", name="Test Agent", config={"tags": ["hello"]}) - put_thread("123", "i-am-a-test", "test1", name="Test Thread") From 1fe717b368d202c80b9d2dde89ff6e2248c58bd9 Mon Sep 17 00:00:00 2001 From: Bakar Tavadze Date: Tue, 19 Mar 2024 19:24:04 +0400 Subject: [PATCH 19/35] Update tests to use postgres. 
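The new conftest creates a throwaway `test` database, runs the golang-migrate migrations
against it, and truncates every public table before each test. A sketch of what a test looks
like on top of that, assuming pytest-asyncio resolves the session-scoped `pool` fixture:

```python
import asyncpg

async def test_assistant_table_starts_empty(pool: asyncpg.pool.Pool) -> None:
    # The autouse truncation fixture guarantees a clean slate per test.
    async with pool.acquire() as conn:
        assert await conn.fetchval("SELECT count(*) FROM assistant;") == 0
```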
--- backend/tests/unit_tests/app/test_app.py | 94 ++++++++++-------------- backend/tests/unit_tests/conftest.py | 86 +++++++++++++++++++++- backend/tests/unit_tests/test_imports.py | 33 --------- 3 files changed, 125 insertions(+), 88 deletions(-) diff --git a/backend/tests/unit_tests/app/test_app.py b/backend/tests/unit_tests/app/test_app.py index d06d765e..f3105c11 100644 --- a/backend/tests/unit_tests/app/test_app.py +++ b/backend/tests/unit_tests/app/test_app.py @@ -1,13 +1,11 @@ """Test the server and client together.""" -import os from contextlib import asynccontextmanager from typing import Optional, Sequence +import asyncpg +from uuid import uuid4 -import pytest from httpx import AsyncClient -from langchain.utilities.redis import get_client as _get_redis_client -from redis.client import Redis as RedisType from typing_extensions import AsyncGenerator @@ -20,39 +18,20 @@ async def get_client() -> AsyncGenerator[AsyncClient, None]: yield ac -@pytest.fixture(scope="function") -def redis_client() -> RedisType: - """Get a redis client -- and clear it before the test!""" - redis_url = os.environ.get("REDIS_URL") - if "localhost" not in redis_url: - raise ValueError( - "This test is only intended to be run against a local redis instance" - ) - - if not redis_url.endswith("/3"): - raise ValueError( - "This test is only intended to be run against a local redis instance. " - "For testing purposes this is expected to be database #3 (arbitrary)." - ) - - client = _get_redis_client(redis_url) - client.flushdb() - try: - yield client - finally: - client.close() - - def _project(d: dict, *, exclude_keys: Optional[Sequence[str]]) -> dict: """Return a dict with only the keys specified.""" _exclude = set(exclude_keys) if exclude_keys else set() return {k: v for k, v in d.items() if k not in _exclude} -async def test_list_and_create_assistants(redis_client: RedisType) -> None: +async def test_list_and_create_assistants(pool: asyncpg.pool.Pool) -> None: """Test list and create assistants.""" headers = {"Cookie": "opengpts_user_id=1"} - assert sorted(redis_client.keys()) == [] + aid = str(uuid4()) + + async with pool.acquire() as conn: + assert len(await conn.fetch("SELECT * FROM assistant;")) == 0 + async with get_client() as client: response = await client.get( "/assistants/", @@ -64,41 +43,40 @@ async def test_list_and_create_assistants(redis_client: RedisType) -> None: # Create an assistant response = await client.put( - "/assistants/bobby", + f"/assistants/{aid}", json={"name": "bobby", "config": {}, "public": False}, headers=headers, ) assert response.status_code == 200 assert _project(response.json(), exclude_keys=["updated_at"]) == { - "assistant_id": "bobby", + "assistant_id": aid, "config": {}, "name": "bobby", "public": False, "user_id": "1", } - assert sorted(redis_client.keys()) == [ - b"opengpts:1:assistant:bobby", - b"opengpts:1:assistants", - ] + async with pool.acquire() as conn: + assert len(await conn.fetch("SELECT * FROM assistant;")) == 1 response = await client.get("/assistants/", headers=headers) assert [_project(d, exclude_keys=["updated_at"]) for d in response.json()] == [ { - "assistant_id": "bobby", + "assistant_id": aid, "config": {}, "name": "bobby", "public": False, + "user_id": "1", } ] response = await client.put( - "/assistants/bobby", + f"/assistants/{aid}", json={"name": "bobby", "config": {}, "public": False}, headers=headers, ) assert _project(response.json(), exclude_keys=["updated_at"]) == { - "assistant_id": "bobby", + "assistant_id": aid, "config": {}, "name": 
"bobby", "public": False, @@ -112,43 +90,51 @@ async def test_list_and_create_assistants(redis_client: RedisType) -> None: assert response.json() == [] -async def test_threads(redis_client: RedisType) -> None: +async def test_threads() -> None: """Test put thread.""" + headers = {"Cookie": "opengpts_user_id=1"} + aid = str(uuid4()) + tid = str(uuid4()) + async with get_client() as client: response = await client.put( - "/threads/1", - json={"name": "bobby", "assistant_id": "bobby"}, - headers={"Cookie": "opengpts_user_id=1"}, + f"/assistants/{aid}", + json={"name": "assistant", "config": {}, "public": False}, + headers=headers, ) - assert response.status_code == 200, response.text - response = await client.get( - "/threads/1/messages", headers={"Cookie": "opengpts_user_id=1"} + response = await client.put( + f"/threads/{tid}", + json={"name": "bobby", "assistant_id": aid}, + headers=headers, ) + assert response.status_code == 200, response.text + + response = await client.get(f"/threads/{tid}/messages", headers=headers) assert response.status_code == 200 - assert response.json() == {"messages": []} + assert response.json() == {"messages": [], "resumeable": False} + + response = await client.get("/threads/", headers=headers) - response = await client.get( - "/threads/", headers={"Cookie": "opengpts_user_id=1"} - ) assert response.status_code == 200 assert [_project(d, exclude_keys=["updated_at"]) for d in response.json()] == [ { - "assistant_id": "bobby", + "assistant_id": aid, "name": "bobby", - "thread_id": "1", + "thread_id": tid, + "user_id": "1", } ] # Test a bad requests response = await client.put( - "/threads/1", - json={"name": "bobby", "assistant_id": "bobby"}, + f"/threads/{tid}", + json={"name": "bobby", "assistant_id": aid}, ) assert response.status_code == 422 response = await client.put( - "/threads/1", + f"/threads/{tid}", headers={"Cookie": "opengpts_user_id=2"}, ) assert response.status_code == 422 diff --git a/backend/tests/unit_tests/conftest.py b/backend/tests/unit_tests/conftest.py index e980db47..134bc5b3 100644 --- a/backend/tests/unit_tests/conftest.py +++ b/backend/tests/unit_tests/conftest.py @@ -1,5 +1,89 @@ +import asyncio import os +import subprocess + +import asyncpg +import pytest + +from app.lifespan import get_pg_pool, lifespan +from app.server import app # Temporary handling of environment variables for testing -os.environ["REDIS_URL"] = "redis://localhost:6379/3" os.environ["OPENAI_API_KEY"] = "test" + +TEST_DB = "test" +assert os.environ["POSTGRES_DB"] != TEST_DB, "Test and main database conflict." 
+os.environ["POSTGRES_DB"] = TEST_DB + + +async def _get_conn() -> asyncpg.Connection: + return await asyncpg.connect( + user=os.environ["POSTGRES_USER"], + password=os.environ["POSTGRES_PASSWORD"], + host=os.environ["POSTGRES_HOST"], + port=os.environ["POSTGRES_PORT"], + database="postgres", + ) + + +async def _create_test_db() -> None: + """Check if the test database exists and create it if it doesn't.""" + conn = await _get_conn() + exists = await conn.fetchval("SELECT 1 FROM pg_database WHERE datname=$1", TEST_DB) + if not exists: + await conn.execute(f'CREATE DATABASE "{TEST_DB}"') + await conn.close() + + +async def _drop_test_db() -> None: + """Check if the test database exists and if so, drop it.""" + conn = await _get_conn() + exists = await conn.fetchval("SELECT 1 FROM pg_database WHERE datname=$1", TEST_DB) + if exists: + await conn.execute(f'DROP DATABASE "{TEST_DB}" WITH (FORCE)') + await conn.close() + + +def _migrate_test_db() -> None: + dsn = ( + f"postgres://{os.environ['POSTGRES_USER']}:{os.environ['POSTGRES_PASSWORD']}" + f"@localhost:{os.environ['POSTGRES_PORT']}/{TEST_DB}?sslmode=disable" + ) + cmd = ["migrate", "-path", "./migrations", "-database", dsn, "up"] + subprocess.run(cmd, check=True) + + +@pytest.fixture(scope="session") +async def pool(): + await _drop_test_db() # In case previous test session was abruptly terminated + await _create_test_db() + _migrate_test_db() + async with lifespan(app): + yield get_pg_pool() + await _drop_test_db() + + +@pytest.fixture(scope="function", autouse=True) +async def clear_test_db(pool): + """Truncate all tables before each test.""" + async with pool.acquire() as conn: + query = """ + DO + $$ + DECLARE + r RECORD; + BEGIN + FOR r IN (SELECT tablename FROM pg_tables WHERE schemaname = 'public') LOOP + EXECUTE 'TRUNCATE TABLE ' || quote_ident(r.tablename) || ' CASCADE;'; + END LOOP; + END + $$; + """ + await conn.execute(query) + + +@pytest.fixture(scope="session") +def event_loop(request): + loop = asyncio.get_event_loop_policy().new_event_loop() + yield loop + loop.close() diff --git a/backend/tests/unit_tests/test_imports.py b/backend/tests/unit_tests/test_imports.py index 80db0aee..a399d5ed 100644 --- a/backend/tests/unit_tests/test_imports.py +++ b/backend/tests/unit_tests/test_imports.py @@ -1,38 +1,5 @@ """Shallow tests that make sure we can at least import the code.""" -import os - -from pytest import MonkeyPatch - - -def test_redis_url_set() -> None: - """Verify that the redis URL is set.""" - if "REDIS_URL" not in os.environ: - raise AssertionError( - "REDIS_URL not set in environment. " - "You can run docker-compose from the root directory to get redis up and " - # Simplify the instructions for running the tests - "running. Then run the tests with `REDIS_URL=... make test`." 
- ) - # Use database 3 for unit tests - # Poorman's convention for accidentally wiping out an actual database - # Should change ports in a bit and add a fake test password - assert os.environ["REDIS_URL"].endswith("/3") - - -def test_agent_executor() -> None: - """Test agent executor.""" - # Shallow test to verify that teh code can be imported - import agent_executor # noqa: F401 - - -def test_gizmo_agent() -> None: - """Test gizmo agent.""" - # Shallow test to verify that teh code can be imported - with MonkeyPatch.context() as mp: - mp.setenv("OPENAI_API_KEY", "no_such_key") - import gizmo_agent # noqa: F401 - def test_import_app() -> None: """Test import app""" From 0f3296b8f23be57c1d5c6b42e4de0b82af76dba4 Mon Sep 17 00:00:00 2001 From: Bakar Tavadze Date: Tue, 19 Mar 2024 19:24:42 +0400 Subject: [PATCH 20/35] Remove Redis. --- .env.gcp.yaml.example | 1 - .gitignore | 1 - backend/app/redis.py | 20 -------------------- backend/poetry.lock | 20 +------------------- backend/pyproject.toml | 1 - docker-compose-prod.yml | 10 +--------- docker-compose.yml | 10 +--------- 7 files changed, 3 insertions(+), 60 deletions(-) delete mode 100644 backend/app/redis.py diff --git a/.env.gcp.yaml.example b/.env.gcp.yaml.example index 8f811cdb..9a0d86d7 100644 --- a/.env.gcp.yaml.example +++ b/.env.gcp.yaml.example @@ -2,7 +2,6 @@ OPENAI_API_KEY: your_secret_key_here LANGCHAIN_TRACING_V2: "true" LANGCHAIN_PROJECT: langserve-launch-example LANGCHAIN_API_KEY: your_secret_key_here -REDIS_URL: your_secret_here FIREWORKS_API_KEY: your_secret_here AWS_ACCESS_KEY_ID: your_secret_here AWS_SECRET_ACCESS_KEY: your_secret_here diff --git a/.gitignore b/.gitignore index 722b4aa3..0ee44775 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,5 @@ *.env .env.gcp.yaml -redis-volume/ postgres-volume/ backend/ui diff --git a/backend/app/redis.py b/backend/app/redis.py deleted file mode 100644 index e4b088c7..00000000 --- a/backend/app/redis.py +++ /dev/null @@ -1,20 +0,0 @@ -import os - -from langchain.utilities.redis import get_client -from redis.client import Redis as RedisType - -CLIENT: RedisType | None = None - - -def get_redis_client() -> RedisType: - """Get a Redis client.""" - global CLIENT - - if CLIENT is not None: - return CLIENT - - url = os.environ.get("REDIS_URL") - if not url: - raise ValueError("REDIS_URL not set") - CLIENT = get_client(url, socket_keepalive=True) - return CLIENT diff --git a/backend/poetry.lock b/backend/poetry.lock index fa96c78e..0c803851 100644 --- a/backend/poetry.lock +++ b/backend/poetry.lock @@ -2978,24 +2978,6 @@ files = [ [package.extras] full = ["numpy"] -[[package]] -name = "redis" -version = "5.0.1" -description = "Python client for Redis database and key-value store" -optional = false -python-versions = ">=3.7" -files = [ - {file = "redis-5.0.1-py3-none-any.whl", hash = "sha256:ed4802971884ae19d640775ba3b03aa2e7bd5e8fb8dfaed2decce4d0fc48391f"}, - {file = "redis-5.0.1.tar.gz", hash = "sha256:0dab495cd5753069d3bc650a0dde8a8f9edde16fc5691b689a566eda58100d0f"}, -] - -[package.dependencies] -async-timeout = {version = ">=4.0.2", markers = "python_full_version <= \"3.11.2\""} - -[package.extras] -hiredis = ["hiredis (>=1.0.0)"] -ocsp = ["cryptography (>=36.0.1)", "pyopenssl (==20.0.1)", "requests (>=2.26.0)"] - [[package]] name = "regex" version = "2023.10.3" @@ -4168,4 +4150,4 @@ multidict = ">=4.0" [metadata] lock-version = "2.0" python-versions = "^3.9.0,<3.12" -content-hash = "99092c32cccd4a24b7da50f67265e05dfaf95cfce194efe42ee502aadeb12c19" +content-hash = 
"17bcc542ae659359ec64bbc6b1a3d048a87b376a54aed76c1c1bc5defc1b8cfb" diff --git a/backend/pyproject.toml b/backend/pyproject.toml index 8dee8724..dc6d2c7b 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -17,7 +17,6 @@ langserve = "0.0.32" # This will only work for local development though! # langchain = { git = "git@github.com:langchain-ai/langchain.git/", branch = "nc/subclass-runnable-binding" , subdirectory = "libs/langchain"} orjson = "^3.9.10" -redis = "^5.0.1" python-multipart = "^0.0.6" tiktoken = "^0.5.1" langchain = ">=0.0.338" diff --git a/docker-compose-prod.yml b/docker-compose-prod.yml index 1dcfb684..b8bab2a8 100644 --- a/docker-compose-prod.yml +++ b/docker-compose-prod.yml @@ -1,13 +1,6 @@ version: "3" services: - redis: - container_name: opengpts-redis - image: redis/redis-stack-server:latest - ports: - - "6379:6379" - volumes: - - ./redis-volume:/data postgres: image: pgvector/pgvector:pg16 ports: @@ -22,9 +15,8 @@ services: ports: - "8100:8000" # Backend is accessible on localhost:8100 and serves the frontend depends_on: - - redis + - postgres env_file: - .env environment: - REDIS_URL: "redis://opengpts-redis:6379" POSTGRES_HOST: "postgres" diff --git a/docker-compose.yml b/docker-compose.yml index cae7169a..86e59501 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,13 +1,6 @@ version: "3" services: - redis: - container_name: opengpts-redis - image: redis/redis-stack-server:latest - ports: - - "6379:6379" - volumes: - - ./redis-volume:/data postgres: image: pgvector/pgvector:pg16 ports: @@ -23,13 +16,12 @@ services: ports: - "8100:8000" # Backend is accessible on localhost:8100 depends_on: - - redis + - postgres env_file: - .env volumes: - ./backend:/backend environment: - REDIS_URL: "redis://opengpts-redis:6379" POSTGRES_HOST: "postgres" command: - --reload From 9bbb3af154949324d792b513e21cf2817579d9e3 Mon Sep 17 00:00:00 2001 From: Bakar Tavadze Date: Tue, 19 Mar 2024 22:08:42 +0400 Subject: [PATCH 21/35] Format. 
--- backend/app/api/assistants.py | 1 - backend/app/checkpoint.py | 2 +- backend/app/lifespan.py | 1 - backend/app/server.py | 2 +- backend/app/storage.py | 3 +-- backend/tests/unit_tests/app/test_app.py | 2 +- 6 files changed, 4 insertions(+), 7 deletions(-) diff --git a/backend/app/api/assistants.py b/backend/app/api/assistants.py index af1e9e3d..1667c5f4 100644 --- a/backend/app/api/assistants.py +++ b/backend/app/api/assistants.py @@ -7,7 +7,6 @@ import app.storage as storage from app.schema import Assistant, OpengptsUserId - router = APIRouter() FEATURED_PUBLIC_ASSISTANTS = [] diff --git a/backend/app/checkpoint.py b/backend/app/checkpoint.py index cd4a08b1..494c5a63 100644 --- a/backend/app/checkpoint.py +++ b/backend/app/checkpoint.py @@ -1,12 +1,12 @@ import pickle from typing import Optional +import asyncpg from langchain.pydantic_v1 import Field from langchain.schema.runnable import RunnableConfig from langchain.schema.runnable.utils import ConfigurableFieldSpec from langgraph.checkpoint import BaseCheckpointSaver from langgraph.checkpoint.base import Checkpoint -import asyncpg from app.lifespan import get_pg_pool diff --git a/backend/app/lifespan.py b/backend/app/lifespan.py index 1a5306b0..1b685671 100644 --- a/backend/app/lifespan.py +++ b/backend/app/lifespan.py @@ -5,7 +5,6 @@ import orjson from fastapi import FastAPI - _pg_pool = None diff --git a/backend/app/server.py b/backend/app/server.py index d4751aa8..72c1e2cf 100644 --- a/backend/app/server.py +++ b/backend/app/server.py @@ -7,8 +7,8 @@ from fastapi.staticfiles import StaticFiles from app.api import router as api_router -from app.upload import ingest_runnable from app.lifespan import lifespan +from app.upload import ingest_runnable logger = logging.getLogger(__name__) diff --git a/backend/app/storage.py b/backend/app/storage.py index 4397ddf6..1d5e2dae 100644 --- a/backend/app/storage.py +++ b/backend/app/storage.py @@ -6,11 +6,10 @@ from langgraph.checkpoint.base import empty_checkpoint from langgraph.pregel import _prepare_next_tasks - from app.agent import AgentType, get_agent_executor +from app.lifespan import get_pg_pool from app.schema import Assistant, Thread from app.stream import map_chunk_to_msg -from app.lifespan import get_pg_pool async def list_assistants(user_id: str) -> List[Assistant]: diff --git a/backend/tests/unit_tests/app/test_app.py b/backend/tests/unit_tests/app/test_app.py index f3105c11..f2bfdc6c 100644 --- a/backend/tests/unit_tests/app/test_app.py +++ b/backend/tests/unit_tests/app/test_app.py @@ -2,9 +2,9 @@ from contextlib import asynccontextmanager from typing import Optional, Sequence -import asyncpg from uuid import uuid4 +import asyncpg from httpx import AsyncClient from typing_extensions import AsyncGenerator From 723ce17c81c13a18b4d0918450136afc7ae29f4a Mon Sep 17 00:00:00 2001 From: Bakar Tavadze Date: Wed, 20 Mar 2024 10:08:46 +0400 Subject: [PATCH 22/35] Fix: read postgres host from env in tests. 
--- backend/tests/unit_tests/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/tests/unit_tests/conftest.py b/backend/tests/unit_tests/conftest.py index 134bc5b3..1ccb49b2 100644 --- a/backend/tests/unit_tests/conftest.py +++ b/backend/tests/unit_tests/conftest.py @@ -47,7 +47,7 @@ async def _drop_test_db() -> None: def _migrate_test_db() -> None: dsn = ( f"postgres://{os.environ['POSTGRES_USER']}:{os.environ['POSTGRES_PASSWORD']}" - f"@localhost:{os.environ['POSTGRES_PORT']}/{TEST_DB}?sslmode=disable" + f"@{os.environ['POSTGRES_HOST']}:{os.environ['POSTGRES_PORT']}/{TEST_DB}?sslmode=disable" ) cmd = ["migrate", "-path", "./migrations", "-database", dsn, "up"] subprocess.run(cmd, check=True) From 45356fc668f95f6f87494a997b961d77ba0d36f4 Mon Sep 17 00:00:00 2001 From: Bakar Tavadze Date: Wed, 20 Mar 2024 10:16:18 +0400 Subject: [PATCH 23/35] Add go-migrate to dev and prod dockerfiles. --- Dockerfile | 3 ++- backend/Dockerfile | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index 7e9d4a27..323accd0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -16,7 +16,8 @@ RUN yarn build FROM python:3.11 # Install system dependencies -RUN apt-get update && apt-get install -y libmagic1 && rm -rf /var/lib/apt/lists/* +RUN curl -s https://packagecloud.io/install/repositories/golang-migrate/migrate/script.deb.sh | bash +RUN apt-get update && apt-get install -y migrate libmagic1 && rm -rf /var/lib/apt/lists/* # Install Poetry RUN pip install poetry diff --git a/backend/Dockerfile b/backend/Dockerfile index 5139a3cb..1baf7148 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -2,7 +2,8 @@ FROM python:3.11 # Install system dependencies -RUN apt-get update && apt-get install -y libmagic1 && rm -rf /var/lib/apt/lists/* +RUN curl -s https://packagecloud.io/install/repositories/golang-migrate/migrate/script.deb.sh | bash +RUN apt-get update && apt-get install -y migrate libmagic1 && rm -rf /var/lib/apt/lists/* # Install Poetry RUN pip install poetry From ff16b8ab13a1898d4ec2365c6f89058071906c6f Mon Sep 17 00:00:00 2001 From: Bakar Tavadze Date: Wed, 20 Mar 2024 10:24:42 +0400 Subject: [PATCH 24/35] Install all dependencies in dev dockerfile. --- backend/Dockerfile | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/backend/Dockerfile b/backend/Dockerfile index 1baf7148..e7f6e450 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -14,10 +14,9 @@ WORKDIR /backend # Copy only dependencies COPY pyproject.toml poetry.lock* ./ -# Install dependencies -# --only main: Skip installing packages listed in the [tool.poetry.dev-dependencies] section +# Install all dependencies RUN poetry config virtualenvs.create false \ - && poetry install --no-interaction --no-ansi --only main + && poetry install --no-interaction --no-ansi # Copy the rest of application code COPY . . From f6cfc659e9e6a690f18041d551c603d34364e790 Mon Sep 17 00:00:00 2001 From: Bakar Tavadze Date: Wed, 20 Mar 2024 11:15:29 +0400 Subject: [PATCH 25/35] Change env var access style. 
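Switching from `os.environ.get(..., default)` to `os.environ[...]` makes a missing setting
fail loudly at startup instead of silently connecting to `localhost`. The difference in one
short sketch:

```python
import os

host = os.environ.get("POSTGRES_HOST", "localhost")  # old style: silent fallback
host = os.environ["POSTGRES_HOST"]                   # new style: KeyError if unset
```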
--- backend/app/upload.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/backend/app/upload.py b/backend/app/upload.py index c9428eeb..ce32f74f 100644 --- a/backend/app/upload.py +++ b/backend/app/upload.py @@ -110,11 +110,11 @@ def batch( PG_CONNECTION_STRING = PGVector.connection_string_from_db_params( driver="psycopg2", - host=os.environ.get("POSTGRES_HOST", "localhost"), - port=int(os.environ.get("POSTGRES_PORT", "5432")), - database=os.environ.get("POSTGRES_DB", "postgres"), - user=os.environ.get("POSTGRES_USER", "postgres"), - password=os.environ.get("POSTGRES_PASSWORD", "postgres"), + host=os.environ["POSTGRES_HOST"], + port=int(os.environ["POSTGRES_PORT"]), + database=os.environ["POSTGRES_DB"], + user=os.environ["POSTGRES_USER"], + password=os.environ["POSTGRES_PASSWORD"], ) vstore = PGVector( connection_string=PG_CONNECTION_STRING, From 4ee00e41376cc20d4acab43d4d3902b9e554b9ce Mon Sep 17 00:00:00 2001 From: Bakar Tavadze Date: Wed, 20 Mar 2024 11:29:15 +0400 Subject: [PATCH 26/35] Clean up schema. --- backend/app/schema.py | 22 ++++++---------------- 1 file changed, 6 insertions(+), 16 deletions(-) diff --git a/backend/app/schema.py b/backend/app/schema.py index 3a6e18cc..0c0e5923 100644 --- a/backend/app/schema.py +++ b/backend/app/schema.py @@ -6,11 +6,13 @@ from typing_extensions import TypedDict -class AssistantWithoutUserId(TypedDict): +class Assistant(TypedDict): """Assistant model.""" assistant_id: UUID """The ID of the assistant.""" + user_id: str + """The ID of the user that owns the assistant.""" name: str """The name of the assistant.""" config: dict @@ -21,16 +23,11 @@ class AssistantWithoutUserId(TypedDict): """Whether the assistant is public.""" -class Assistant(AssistantWithoutUserId): - """Assistant model.""" - - user_id: str - """The ID of the user that owns the assistant.""" - - -class ThreadWithoutUserId(TypedDict): +class Thread(TypedDict): thread_id: UUID """The ID of the thread.""" + user_id: str + """The ID of the user that owns the thread.""" assistant_id: Optional[UUID] """The assistant that was used in conjunction with this thread.""" name: str @@ -39,13 +36,6 @@ class ThreadWithoutUserId(TypedDict): """The last time the thread was updated.""" -class Thread(ThreadWithoutUserId): - """Thread model.""" - - user_id: str - """The ID of the user that owns the thread.""" - - OpengptsUserId = Annotated[ str, Cookie( From 71610fac938961fce95bd791e28cb8ec515fd821 Mon Sep 17 00:00:00 2001 From: Bakar Tavadze Date: Wed, 20 Mar 2024 11:58:36 +0400 Subject: [PATCH 27/35] Update ci to run tests. 
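The workflow swaps the Redis service container for `pgvector/pgvector:pg16` and gates the job
on a `pg_isready` health check. For local debugging, a rough Python equivalent of that
readiness gate might look like the following sketch (not part of the workflow itself):

```python
import asyncio

import asyncpg

async def wait_for_postgres(dsn: str, attempts: int = 30) -> None:
    # Poll until the server accepts connections, like the pg_isready healthcheck.
    for _ in range(attempts):
        try:
            conn = await asyncpg.connect(dsn)
            await conn.close()
            return
        except (OSError, asyncpg.PostgresError):
            await asyncio.sleep(1)
    raise RuntimeError("Postgres never became ready")

asyncio.run(wait_for_postgres("postgres://postgres:postgres@localhost:5432/postgres"))
```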
--- .github/workflows/ci.yml | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 87c82812..70721bae 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -43,16 +43,20 @@ jobs: name: Python ${{ matrix.python-version }} tests services: # Label used to access the service container - redis: - image: redislabs/redisearch:latest - # Set health checks to wait until redis has started + postgres: + image: pgvector/pgvector:pg16 + env: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + POSTGRES_DB: postgres + # Set health checks to wait until postgres has started options: >- - --health-cmd "redis-cli ping" + --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5 ports: - - "6379:6379" + - "5432:5432" steps: - uses: actions/checkout@v3 - name: Set up Python ${{ matrix.python-version }} + Poetry ${{ env.POETRY_VERSION }} @@ -62,12 +66,21 @@ jobs: poetry-version: ${{ env.POETRY_VERSION }} working-directory: . cache-key: langserve-all - - name: install redis + - name: Install dependencies run: | - pip install redis - - name: check redis is running + poetry install --with test + - name: Install go migrate run: | - python -c "import redis; redis.Redis(host='localhost', port=6379).ping()" + curl -s https://packagecloud.io/install/repositories/golang-migrate/migrate/script.deb.sh | sudo bash + sudo apt-get update && sudo apt-get install -y migrate + - name: Run tests + env: + POSTGRES_HOST: localhost + POSTGRES_PORT: 5432 + POSTGRES_DB: postgres + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + run: make test frontend-lint-and-build: runs-on: ubuntu-latest From d89ec0bcc4cd598d297d8aaf04e789655e3077f6 Mon Sep 17 00:00:00 2001 From: Bakar Tavadze Date: Wed, 20 Mar 2024 12:26:54 +0400 Subject: [PATCH 28/35] Fixes for python 3.9. --- backend/app/storage.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/backend/app/storage.py b/backend/app/storage.py index 1d5e2dae..bf32826e 100644 --- a/backend/app/storage.py +++ b/backend/app/storage.py @@ -1,5 +1,5 @@ -import datetime -from typing import List, Sequence +from datetime import datetime, timezone +from typing import List, Optional, Sequence from langchain.schema.messages import AnyMessage from langgraph.channels.base import ChannelsManager @@ -19,7 +19,7 @@ async def list_assistants(user_id: str) -> List[Assistant]: return await conn.fetch("SELECT * FROM assistant WHERE user_id = $1", user_id) -async def get_assistant(user_id: str, assistant_id: str) -> Assistant | None: +async def get_assistant(user_id: str, assistant_id: str) -> Optional[Assistant]: """Get an assistant by ID.""" async with get_pg_pool().acquire() as conn: @@ -30,7 +30,7 @@ async def get_assistant(user_id: str, assistant_id: str) -> Assistant | None: ) -async def get_public_assistant(assistant_id: str) -> Assistant | None: +async def get_public_assistant(assistant_id: str) -> Optional[Assistant]: """Get a public assistant by ID.""" async with get_pg_pool().acquire() as conn: @@ -68,7 +68,7 @@ async def put_assistant( Returns: return the assistant model if no exception is raised. 
""" - updated_at = datetime.datetime.now(datetime.UTC) + updated_at = datetime.now(timezone.utc) async with get_pg_pool().acquire() as conn: async with conn.transaction(): await conn.execute( @@ -104,7 +104,7 @@ async def list_threads(user_id: str) -> List[Thread]: return await conn.fetch("SELECT * FROM thread WHERE user_id = $1", user_id) -async def get_thread(user_id: str, thread_id: str) -> Thread | None: +async def get_thread(user_id: str, thread_id: str) -> Optional[Thread]: """Get a thread by ID.""" async with get_pg_pool().acquire() as conn: return await conn.fetchrow( @@ -151,7 +151,7 @@ async def put_thread( user_id: str, thread_id: str, *, assistant_id: str, name: str ) -> Thread: """Modify a thread.""" - updated_at = datetime.datetime.now(datetime.UTC) + updated_at = datetime.now(timezone.utc) async with get_pg_pool().acquire() as conn: await conn.execute( ( From a3f429edb306f7f4640f78f6641723bf7f29fb7e Mon Sep 17 00:00:00 2001 From: Bakar Tavadze Date: Wed, 20 Mar 2024 15:04:15 +0400 Subject: [PATCH 29/35] Improve how golang-migrate is installed and add migration command to makefile. --- .github/workflows/ci.yml | 6 +++--- Dockerfile | 5 +++-- backend/Dockerfile | 5 +++-- backend/Makefile | 3 +++ backend/tests/unit_tests/conftest.py | 7 +------ 5 files changed, 13 insertions(+), 13 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 70721bae..ec79392a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -69,10 +69,10 @@ jobs: - name: Install dependencies run: | poetry install --with test - - name: Install go migrate + - name: Install golang-migrate run: | - curl -s https://packagecloud.io/install/repositories/golang-migrate/migrate/script.deb.sh | sudo bash - sudo apt-get update && sudo apt-get install -y migrate + wget -O golang-migrate.deb https://github.com/golang-migrate/migrate/releases/download/v4.17.0/migrate.linux-amd64.deb + sudo dpkg -i golang-migrate.deb && rm golang-migrate.deb - name: Run tests env: POSTGRES_HOST: localhost diff --git a/Dockerfile b/Dockerfile index 323accd0..86f6d9f1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -16,8 +16,9 @@ RUN yarn build FROM python:3.11 # Install system dependencies -RUN curl -s https://packagecloud.io/install/repositories/golang-migrate/migrate/script.deb.sh | bash -RUN apt-get update && apt-get install -y migrate libmagic1 && rm -rf /var/lib/apt/lists/* +RUN wget -O golang-migrate.deb https://github.com/golang-migrate/migrate/releases/download/v4.17.0/migrate.linux-amd64.deb \ + && dpkg -i golang-migrate.deb \ + && rm golang-migrate.deb # Install Poetry RUN pip install poetry diff --git a/backend/Dockerfile b/backend/Dockerfile index e7f6e450..75c4a8d9 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -2,8 +2,9 @@ FROM python:3.11 # Install system dependencies -RUN curl -s https://packagecloud.io/install/repositories/golang-migrate/migrate/script.deb.sh | bash -RUN apt-get update && apt-get install -y migrate libmagic1 && rm -rf /var/lib/apt/lists/* +RUN wget -O golang-migrate.deb https://github.com/golang-migrate/migrate/releases/download/v4.17.0/migrate.linux-amd64.deb \ + && dpkg -i golang-migrate.deb \ + && rm golang-migrate.deb # Install Poetry RUN pip install poetry diff --git a/backend/Makefile b/backend/Makefile index 6ff46f51..3e8dc5b2 100644 --- a/backend/Makefile +++ b/backend/Makefile @@ -16,6 +16,9 @@ TEST_FILE ?= tests/unit_tests/ start: poetry run uvicorn app.server:app --reload --port 8100 +migrate: + migrate -database 
postgres://$(POSTGRES_USER):$(POSTGRES_PASSWORD)@$(POSTGRES_HOST):$(POSTGRES_PORT)/$(POSTGRES_DB)?sslmode=disable -path ./migrations up + test: # We need to update handling of env variables for tests YDC_API_KEY=placeholder OPENAI_API_KEY=placeholder poetry run pytest $(TEST_FILE) diff --git a/backend/tests/unit_tests/conftest.py b/backend/tests/unit_tests/conftest.py index 1ccb49b2..1520a429 100644 --- a/backend/tests/unit_tests/conftest.py +++ b/backend/tests/unit_tests/conftest.py @@ -45,12 +45,7 @@ async def _drop_test_db() -> None: def _migrate_test_db() -> None: - dsn = ( - f"postgres://{os.environ['POSTGRES_USER']}:{os.environ['POSTGRES_PASSWORD']}" - f"@{os.environ['POSTGRES_HOST']}:{os.environ['POSTGRES_PORT']}/{TEST_DB}?sslmode=disable" - ) - cmd = ["migrate", "-path", "./migrations", "-database", dsn, "up"] - subprocess.run(cmd, check=True) + subprocess.run(["make", "migrate"], check=True) @pytest.fixture(scope="session") From 16febab8d641373fc926592402292fcde0ee7f37 Mon Sep 17 00:00:00 2001 From: Bakar Tavadze Date: Wed, 20 Mar 2024 15:12:47 +0400 Subject: [PATCH 30/35] Remove unnecessary env vars from example. --- .env.gcp.yaml.example | 2 -- 1 file changed, 2 deletions(-) diff --git a/.env.gcp.yaml.example b/.env.gcp.yaml.example index 9a0d86d7..1faec225 100644 --- a/.env.gcp.yaml.example +++ b/.env.gcp.yaml.example @@ -10,8 +10,6 @@ AZURE_OPENAI_API_BASE: your_secret_here AZURE_OPENAI_API_VERSION: your_secret_here AZURE_OPENAI_API_KEY: your_secret_here KAY_API_KEY: your_secret_here -ROBOCORP_ACTION_SERVER_URL: https://dummy-action-server.robocorp.link -ROBOCORP_ACTION_SERVER_KEY: dummy-api-key CONNERY_RUNNER_URL: https://your-personal-connery-runner-url CONNERY_RUNNER_API_KEY: your_secret_here POSTGRES_HOST: your_postgres_host_here From 895caced5cc90dda882a47fca6520e37376ec6c8 Mon Sep 17 00:00:00 2001 From: Bakar Tavadze Date: Wed, 20 Mar 2024 15:32:24 +0400 Subject: [PATCH 31/35] Update readme. --- README.md | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index ee988731..8325d1dc 100644 --- a/README.md +++ b/README.md @@ -48,25 +48,24 @@ poetry install **Set up persistence layer** -The backend by default uses Redis for saving agent configurations and chat message history. -In order to you use this, you need to a `REDIS_URL` variable. - -```shell -export REDIS_URL=... -``` - -For postgres you need to set `POSTGRES_HOST` among other variables. +The backend uses Postgres for saving agent configurations and chat message history. +In order to use this, you need to set the following environment variables: ```shell export POSTGRES_HOST=... +export POSTGRES_PORT=... +export POSTGRES_DB=... +export POSTGRES_USER=... +export POSTGRES_PASSWORD=... ``` +Migrations are managed with [golang-migrate](https://github.com/golang-migrate/migrate). Ensure it's installed on your machine and use `make migrate` to run all migrations. For those opting to run the project via Docker, the Dockerfile already includes golang-migrate. + **Set up vector database** -The backend by default also uses Redis as a vector database, +The backend by default also uses Postgres as a vector database, although you can easily switch this out to use any of the 50+ vector databases in LangChain. -If you are using Redis as a vectorstore, the above environment variable should work -(assuming you've enabled `redissearch`) +If you are using Postgres as a vectorstore, the above environment variables should work. 
**Set up language models** @@ -118,7 +117,7 @@ Navigate to [http://localhost:5173/](http://localhost:5173/) and enjoy! ## Installation and Running with Docker -This project supports a Docker-based setup, streamlining installation and execution. It automatically builds images for the frontend and backend and sets up Redis using docker-compose. +This project supports a Docker-based setup, streamlining installation and execution. It automatically builds images for the frontend and backend and sets up Postgres using docker-compose. ### Quick Start @@ -137,7 +136,7 @@ This project supports a Docker-based setup, streamlining installation and execut docker compose up ``` - This command builds the Docker images for the frontend and backend from their respective Dockerfiles and starts all necessary services, including Redis. + This command builds the Docker images for the frontend and backend from their respective Dockerfiles and starts all necessary services, including Postgres. 3. **Access the Application:** With the services running, access the frontend at [http://localhost:5173](http://localhost:5173), substituting `5173` with the designated port number. From 55841cbe3552c6ea2d987244f1e389e9012c7ab8 Mon Sep 17 00:00:00 2001 From: Bakar Tavadze Date: Wed, 20 Mar 2024 15:35:29 +0400 Subject: [PATCH 32/35] Format. --- backend/app/storage.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/backend/app/storage.py b/backend/app/storage.py index bf32826e..4a08132a 100644 --- a/backend/app/storage.py +++ b/backend/app/storage.py @@ -14,14 +14,12 @@ async def list_assistants(user_id: str) -> List[Assistant]: """List all assistants for the current user.""" - async with get_pg_pool().acquire() as conn: return await conn.fetch("SELECT * FROM assistant WHERE user_id = $1", user_id) async def get_assistant(user_id: str, assistant_id: str) -> Optional[Assistant]: """Get an assistant by ID.""" - async with get_pg_pool().acquire() as conn: return await conn.fetchrow( "SELECT * FROM assistant WHERE assistant_id = $1 AND user_id = $2", @@ -32,7 +30,6 @@ async def get_assistant(user_id: str, assistant_id: str) -> Optional[Assistant]: async def get_public_assistant(assistant_id: str) -> Optional[Assistant]: """Get a public assistant by ID.""" - async with get_pg_pool().acquire() as conn: return await conn.fetchrow( "SELECT * FROM assistant WHERE assistant_id = $1 AND public = true", From 6972b36d178b1c9bc5629edf3a8ba8d12f29eb6a Mon Sep 17 00:00:00 2001 From: Bakar Tavadze Date: Wed, 20 Mar 2024 17:45:15 +0400 Subject: [PATCH 33/35] Return libmagic to dockerfiles. 
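`libmagic1` is the shared library behind content-based MIME sniffing of uploaded files, which
the earlier Dockerfile rewrite dropped along with the rest of the apt layer. A sketch of the
kind of call that depends on it, assuming the `python-magic` bindings are what the upload path
ultimately pulls in:

```python
import magic  # python-magic; fails to import without the libmagic1 system library

# Detect a MIME type from raw bytes, with no reliance on a file extension.
mime = magic.from_buffer(b"%PDF-1.7\n...", mime=True)
print(mime)  # expected: "application/pdf"
```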
--- Dockerfile | 1 + backend/Dockerfile | 1 + 2 files changed, 2 insertions(+) diff --git a/Dockerfile b/Dockerfile index 86f6d9f1..af28f025 100644 --- a/Dockerfile +++ b/Dockerfile @@ -16,6 +16,7 @@ RUN yarn build FROM python:3.11 # Install system dependencies +RUN apt-get update && apt-get install -y libmagic1 && rm -rf /var/lib/apt/lists/* RUN wget -O golang-migrate.deb https://github.com/golang-migrate/migrate/releases/download/v4.17.0/migrate.linux-amd64.deb \ && dpkg -i golang-migrate.deb \ && rm golang-migrate.deb diff --git a/backend/Dockerfile b/backend/Dockerfile index 75c4a8d9..075fa2af 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -2,6 +2,7 @@ FROM python:3.11 # Install system dependencies +RUN apt-get update && apt-get install -y libmagic1 && rm -rf /var/lib/apt/lists/* RUN wget -O golang-migrate.deb https://github.com/golang-migrate/migrate/releases/download/v4.17.0/migrate.linux-amd64.deb \ && dpkg -i golang-migrate.deb \ && rm golang-migrate.deb From b17c425071b71c76616af7019cbb8ac55bcd0fd2 Mon Sep 17 00:00:00 2001 From: Bakar Tavadze Date: Wed, 20 Mar 2024 19:26:06 +0400 Subject: [PATCH 34/35] Create the checkpoints table in a migration. --- backend/app/checkpoint.py | 32 ++----------------- ..._create_extensions_and_first_tables.up.sql | 5 +++ 2 files changed, 7 insertions(+), 30 deletions(-) diff --git a/backend/app/checkpoint.py b/backend/app/checkpoint.py index 494c5a63..515809ab 100644 --- a/backend/app/checkpoint.py +++ b/backend/app/checkpoint.py @@ -1,8 +1,6 @@ import pickle from typing import Optional -import asyncpg -from langchain.pydantic_v1 import Field from langchain.schema.runnable import RunnableConfig from langchain.schema.runnable.utils import ConfigurableFieldSpec from langgraph.checkpoint import BaseCheckpointSaver @@ -12,9 +10,6 @@ class PostgresCheckpoint(BaseCheckpointSaver): - pg_pool: Optional[asyncpg.Pool] = None - is_setup: bool = Field(False, init=False, repr=False) - class Config: arbitrary_types_allowed = True @@ -31,27 +26,6 @@ def config_specs(self) -> list[ConfigurableFieldSpec]: ), ] - async def setup(self) -> None: - if self.is_setup: - return - - if self.pg_pool is None: - self.pg_pool = get_pg_pool() - - try: - async with self.pg_pool.acquire() as conn: - await conn.execute( - """ - CREATE TABLE IF NOT EXISTS checkpoints ( - thread_id TEXT PRIMARY KEY, - checkpoint BYTEA - ); - """ - ) - self.is_setup = True - except BaseException as e: - raise e - def get(self, config: RunnableConfig) -> Optional[Checkpoint]: raise NotImplementedError @@ -59,18 +33,16 @@ def put(self, config: RunnableConfig, checkpoint: Checkpoint) -> None: raise NotImplementedError async def aget(self, config: RunnableConfig) -> Optional[Checkpoint]: - await self.setup() thread_id = config["configurable"]["thread_id"] - async with self.pg_pool.acquire() as conn: + async with get_pg_pool().acquire() as conn: if value := await conn.fetchrow( "SELECT checkpoint FROM checkpoints WHERE thread_id = $1", thread_id ): return pickle.loads(value[0]) async def aput(self, config: RunnableConfig, checkpoint: Checkpoint) -> None: - await self.setup() thread_id = config["configurable"]["thread_id"] - async with self.pg_pool.acquire() as conn: + async with get_pg_pool().acquire() as conn: await conn.execute( ( "INSERT INTO checkpoints (thread_id, checkpoint) " diff --git a/backend/migrations/000001_create_extensions_and_first_tables.up.sql b/backend/migrations/000001_create_extensions_and_first_tables.up.sql index 7209a5e3..cb395a74 100644 --- 
a/backend/migrations/000001_create_extensions_and_first_tables.up.sql +++ b/backend/migrations/000001_create_extensions_and_first_tables.up.sql @@ -16,4 +16,9 @@ CREATE TABLE IF NOT EXISTS thread ( user_id VARCHAR(255) NOT NULL, name VARCHAR(255) NOT NULL, updated_at TIMESTAMP WITH TIME ZONE DEFAULT (CURRENT_TIMESTAMP AT TIME ZONE 'UTC') +); + +CREATE TABLE IF NOT EXISTS checkpoints ( + thread_id TEXT PRIMARY KEY, + checkpoint BYTEA ); \ No newline at end of file From 392de43036cb5ef312e24a8db5eccfbb60a1eff3 Mon Sep 17 00:00:00 2001 From: Bakar Tavadze Date: Wed, 20 Mar 2024 20:09:23 +0400 Subject: [PATCH 35/35] Update the get_assistant storage method. --- backend/app/api/runs.py | 9 ++------- backend/app/storage.py | 11 +---------- 2 files changed, 3 insertions(+), 17 deletions(-) diff --git a/backend/app/api/runs.py b/backend/app/api/runs.py index 7e117807..8ec4b813 100644 --- a/backend/app/api/runs.py +++ b/backend/app/api/runs.py @@ -1,4 +1,3 @@ -import asyncio import json from typing import Optional, Sequence @@ -16,7 +15,7 @@ from app.agent import agent from app.schema import OpengptsUserId -from app.storage import get_assistant, get_public_assistant +from app.storage import get_assistant from app.stream import astream_messages, to_sse router = APIRouter() @@ -35,11 +34,7 @@ async def _run_input_and_config(request: Request, opengpts_user_id: OpengptsUser body = await request.json() except json.JSONDecodeError: raise RequestValidationError(errors=["Invalid JSON body"]) - assistant, public_assistant = await asyncio.gather( - get_assistant(opengpts_user_id, body["assistant_id"]), - get_public_assistant(body["assistant_id"]), - ) - assistant = assistant or public_assistant + assistant = await get_assistant(opengpts_user_id, body["assistant_id"]) if not assistant: raise HTTPException(status_code=404, detail="Assistant not found") config: RunnableConfig = { diff --git a/backend/app/storage.py b/backend/app/storage.py index 4a08132a..ac97045c 100644 --- a/backend/app/storage.py +++ b/backend/app/storage.py @@ -22,21 +22,12 @@ async def get_assistant(user_id: str, assistant_id: str) -> Optional[Assistant]: """Get an assistant by ID.""" async with get_pg_pool().acquire() as conn: return await conn.fetchrow( - "SELECT * FROM assistant WHERE assistant_id = $1 AND user_id = $2", + "SELECT * FROM assistant WHERE assistant_id = $1 AND (user_id = $2 OR public = true)", assistant_id, user_id, ) -async def get_public_assistant(assistant_id: str) -> Optional[Assistant]: - """Get a public assistant by ID.""" - async with get_pg_pool().acquire() as conn: - return await conn.fetchrow( - "SELECT * FROM assistant WHERE assistant_id = $1 AND public = true", - assistant_id, - ) - - async def list_public_assistants(assistant_ids: Sequence[str]) -> List[Assistant]: """List all the public assistants.""" async with get_pg_pool().acquire() as conn:
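Folding ownership and public visibility into one `WHERE` clause means call sites such as
`_run_input_and_config` need a single await instead of gathering two lookups. A sketch of the
consolidated call site, with a hypothetical wrapper and the usual FastAPI 404 handling:

```python
from fastapi import HTTPException

from app.storage import get_assistant

async def lookup(user_id: str, assistant_id: str) -> dict:
    # One round trip now covers both "owned by this user" and "public = true".
    assistant = await get_assistant(user_id, assistant_id)
    if assistant is None:
        raise HTTPException(status_code=404, detail="Assistant not found")
    return assistant
```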