The AlloyDB Python Connector is an AlloyDB Connector library designed for use with the Python language.
Using an AlloyDB Connector provides the following benefits:
- IAM Authorization: uses IAM permissions to control who/what can connect to your AlloyDB instances
- Improved Security: uses robust, updated TLS 1.3 encryption and identity verification between the client connector and the server-side proxy, independent of the database protocol.
- Convenience: removes the requirement to use and distribute SSL certificates, as well as manage firewalls or source/destination IP addresses.
- (optionally) IAM DB Authentication: provides support for AlloyDB’s automatic IAM DB AuthN feature.
The AlloyDB Python Connector is a package to be used alongside a database driver. Currently supported drivers are:
- pg8000
- asyncpg
You can install this library with pip install, selecting the extra that matches your database driver.
For pg8000:
pip install "google-cloud-alloydb-connector[pg8000]"
See Synchronous Driver Usage for details.
For asyncpg:
pip install "google-cloud-alloydb-connector[asyncpg]"
See Async Driver Usage for details.
This package requires the following to connect successfully:
- An IAM principal (user, service account, etc.) with the AlloyDB Client role or equivalent. Credentials for the IAM principal are used to authorize connections to an AlloyDB instance.
- The AlloyDB API enabled within your Google Cloud Project. By default, the API will be called in the project associated with the IAM principal.
This library uses the Application Default Credentials (ADC) strategy for resolving credentials. Please see these instructions for how to set your ADC (Google Cloud Application vs Local Development, IAM user vs service account credentials), or consult the google.auth package.
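When a connection fails with a credentials error, it can help to see which local ADC source is present. The following is a minimal sketch that uses only the standard library and checks the two local sources ADC consults before falling back to the attached service account on Google Cloud. The function name find_local_adc_source is hypothetical and is not part of this package or of google.auth, and the file path shown is the Linux/macOS location.

```python
import os
from pathlib import Path
from typing import Mapping, Optional


def find_local_adc_source(
    environ: Mapping[str, str] = os.environ,
    home: Optional[Path] = None,
) -> Optional[str]:
    """Describe the local ADC source that is present, or return None.

    Hypothetical helper for troubleshooting; it does not validate the
    credentials, it only reports where they would be read from.
    """
    # 1. An explicit credentials file takes precedence.
    explicit = environ.get("GOOGLE_APPLICATION_CREDENTIALS")
    if explicit:
        return f"GOOGLE_APPLICATION_CREDENTIALS={explicit}"

    # 2. Credentials written by `gcloud auth application-default login`
    #    (Linux/macOS location; Windows stores them under %APPDATA%/gcloud).
    home = home or Path.home()
    well_known = home / ".config" / "gcloud" / "application_default_credentials.json"
    if well_known.is_file():
        return f"gcloud user credentials at {well_known}"

    # 3. Nothing local: on Google Cloud, ADC would fall back to the
    #    service account attached to the resource.
    return None


if __name__ == "__main__":
    print(find_local_adc_source() or "no local ADC source found")
```

A result of None is not an error by itself: on Cloud Run, GKE, or Compute Engine the attached service account is normally used instead.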
This package provides several functions for authorizing and encrypting connections. These functions are used with your database driver to connect to your AlloyDB instance.
AlloyDB supports network connectivity through public IP addresses and private, internal IP addresses, as well as Private Service Connect (PSC). By default this package will attempt to connect over a private IP connection. When doing so, this package must be run in an environment that is connected to the VPC Network that hosts your AlloyDB private IP address.
Please see Configuring AlloyDB Connectivity for more details.
To connect to AlloyDB using the connector, initialize a Connector object and call its connect method with the proper input parameters.
The Connector itself creates database connection objects by calling its connect method but does not manage database connection pooling. For this reason, it is recommended to use the connector alongside a library that can create connection pools, such as SQLAlchemy. This allows connections to remain open and be reused, reducing connection overhead and the number of connections needed.
In the Connector's connect method below, pass your AlloyDB instance URI as the first positional argument and the name of the database driver as the second positional argument. Pass the remaining connection settings, such as user, password, and db, as keyword arguments.
To use this connector with SQLAlchemy, use the creator argument for sqlalchemy.create_engine:
from google.cloud.alloydb.connector import Connector
import sqlalchemy

# initialize Connector object
connector = Connector()

# function to return the database connection
def getconn():
    conn = connector.connect(
        "projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
        "pg8000",
        user="my-user",
        password="my-password",
        db="my-db-name",
        # NOTE: this assumes private IP by default.
        # Add the following keyword arg to use public IP:
        # ip_type="PUBLIC"
    )
    return conn

# create connection pool
pool = sqlalchemy.create_engine(
    "postgresql+pg8000://",
    creator=getconn,
)
The returned connection pool engine can then be used to query and modify the database.
# insert statement
insert_stmt = sqlalchemy.text(
    "INSERT INTO my_table (id, title) VALUES (:id, :title)",
)

with pool.connect() as db_conn:
    # insert into database
    db_conn.execute(insert_stmt, parameters={"id": "book1", "title": "Book One"})

    # query database
    result = db_conn.execute(sqlalchemy.text("SELECT * from my_table")).fetchall()

    # commit transaction (SQLAlchemy v2.X.X is commit as you go)
    db_conn.commit()

    # Do something with the results
    for row in result:
        print(row)
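The instance URI passed as the first positional argument to connect follows a fixed pattern of project, region, cluster, and instance. As a minimal sketch, the URI can be assembled from its parts so that a missing component fails early instead of at connection time. The helper name build_instance_uri is hypothetical and is not provided by this package.

```python
def build_instance_uri(project: str, region: str, cluster: str, instance: str) -> str:
    """Assemble an AlloyDB instance URI from its four components.

    Hypothetical helper: it only formats the string and rejects empty
    components or components containing a slash.
    """
    parts = {
        "project": project,
        "region": region,
        "cluster": cluster,
        "instance": instance,
    }
    for name, value in parts.items():
        if not value or "/" in value:
            raise ValueError(f"invalid {name}: {value!r}")
    return (
        f"projects/{project}/locations/{region}"
        f"/clusters/{cluster}/instances/{instance}"
    )


if __name__ == "__main__":
    print(build_instance_uri("my-project", "us-central1", "my-cluster", "my-instance"))
```

The resulting string can be passed to connector.connect in place of the literal URI used in the examples.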
To close the Connector object's background resources, call its close() method as follows:
connector.close()
The Connector object can also be used as a context manager to automatically close and clean up resources, removing the need for explicit calls to connector.close().
Connector as a context manager:
from google.cloud.alloydb.connector import Connector
import sqlalchemy

# helper function to return SQLAlchemy connection pool
def init_connection_pool(connector: Connector) -> sqlalchemy.engine.Engine:
    # function used to generate database connection
    def getconn():
        conn = connector.connect(
            "projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
            "pg8000",
            user="my-user",
            password="my-password",
            db="my-db-name",
        )
        return conn

    # create connection pool
    pool = sqlalchemy.create_engine(
        "postgresql+pg8000://",
        creator=getconn,
    )
    return pool

# initialize Connector as context manager
with Connector() as connector:
    # initialize connection pool
    pool = init_connection_pool(connector)

    # insert statement
    insert_stmt = sqlalchemy.text(
        "INSERT INTO my_table (id, title) VALUES (:id, :title)",
    )

    # interact with AlloyDB database using connection pool
    with pool.connect() as db_conn:
        # insert into database
        db_conn.execute(insert_stmt, parameters={"id": "book1", "title": "Book One"})

        # commit transaction (SQLAlchemy v2.X.X is commit as you go)
        db_conn.commit()

        # query database
        result = db_conn.execute(sqlalchemy.text("SELECT * from my_table")).fetchall()

        # Do something with the results
        for row in result:
            print(row)
The AlloyDB Connector is compatible with asyncio to improve the speed and efficiency of database connections through concurrency. The AsyncConnector currently supports the following asyncio database drivers:
- asyncpg
import asyncpg
from google.cloud.alloydb.connector import AsyncConnector

async def main():
    # initialize Connector object for connections to AlloyDB
    connector = AsyncConnector()

    # creation function to generate asyncpg connections as the 'connect' arg
    async def getconn(instance_connection_name, **kwargs) -> asyncpg.Connection:
        return await connector.connect(
            instance_connection_name,
            "asyncpg",
            user="my-user",
            password="my-password",
            db="my-db",
        )

    # initialize connection pool
    pool = await asyncpg.create_pool(
        "projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
        connect=getconn,
    )

    # acquire connection and query AlloyDB database
    async with pool.acquire() as conn:
        res = await conn.fetch("SELECT NOW()")

    # close Connector
    await connector.close()
With a SQLAlchemy async engine, pass the connection function as the async_creator argument:
import asyncpg
import sqlalchemy
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from google.cloud.alloydb.connector import AsyncConnector

async def init_connection_pool(connector: AsyncConnector) -> AsyncEngine:
    # creation function to generate asyncpg connections as 'async_creator' arg
    async def getconn() -> asyncpg.Connection:
        conn: asyncpg.Connection = await connector.connect(
            "projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
            "asyncpg",
            user="my-user",
            password="my-password",
            db="my-db-name",
            # NOTE: this assumes private IP by default.
            # Add the following keyword arg to use public IP:
            # ip_type="PUBLIC"
            # ... additional database driver args
        )
        return conn

    # The AlloyDB Python Connector can be used along with SQLAlchemy using the
    # 'async_creator' argument to 'create_async_engine'
    pool = create_async_engine(
        "postgresql+asyncpg://",
        async_creator=getconn,
    )
    return pool

async def main():
    connector = AsyncConnector()

    # initialize connection pool
    pool = await init_connection_pool(connector)

    # example query
    async with pool.connect() as conn:
        await conn.execute(sqlalchemy.text("SELECT NOW()"))

    # dispose of connection pool
    await pool.dispose()

    # close Connector
    await connector.close()
For more details on additional arguments with an asyncpg.Connection, please visit the official documentation.
The AsyncConnector may also be used as an async context manager, removing the need for explicit calls to connector.close() to clean up resources.
import asyncpg
from google.cloud.alloydb.connector import AsyncConnector

async def main():
    # initialize AsyncConnector object for connections to AlloyDB
    async with AsyncConnector() as connector:
        # creation function to generate asyncpg connections as the 'connect' arg
        async def getconn(instance_connection_name, **kwargs) -> asyncpg.Connection:
            return await connector.connect(
                instance_connection_name,
                "asyncpg",
                user="my-user",
                password="my-password",
                db="my-db",
            )

        # create connection pool
        pool = await asyncpg.create_pool(
            "projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
            connect=getconn,
        )

        # acquire connection and query AlloyDB database
        async with pool.acquire() as conn:
            res = await conn.fetch("SELECT NOW()")
The same pattern with a SQLAlchemy async engine:
import asyncio
import asyncpg
import sqlalchemy
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from google.cloud.alloydb.connector import AsyncConnector

async def init_connection_pool(connector: AsyncConnector) -> AsyncEngine:
    # creation function to generate asyncpg connections as 'async_creator' arg
    async def getconn() -> asyncpg.Connection:
        conn: asyncpg.Connection = await connector.connect(
            "projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
            "asyncpg",
            user="my-user",
            password="my-password",
            db="my-db-name",
            # ... additional database driver args
        )
        return conn

    # The AlloyDB Python Connector can be used along with SQLAlchemy using the
    # 'async_creator' argument to 'create_async_engine'
    pool = create_async_engine(
        "postgresql+asyncpg://",
        async_creator=getconn,
    )
    return pool

async def main():
    # initialize Connector object for connections to AlloyDB
    async with AsyncConnector() as connector:
        # initialize connection pool
        pool = await init_connection_pool(connector)

        # example query
        async with pool.connect() as conn:
            await conn.execute(sqlalchemy.text("SELECT NOW()"))

        # dispose of connection pool
        await pool.dispose()
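The concurrency benefit of the async drivers comes from issuing several queries without waiting for each one to finish. The sketch below illustrates that idea with the standard library only. The names run_concurrently and fake_query are hypothetical, and fake_query stands in for a real call such as fetching on a pooled asyncpg connection. A semaphore caps how many queries are in flight, which is one way (an assumption here, not a requirement of the connector) to avoid exhausting a connection pool.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Sequence


async def run_concurrently(
    run_query: Callable[[str], Awaitable[Any]],
    queries: Sequence[str],
    max_in_flight: int = 5,
) -> List[Any]:
    """Run queries concurrently with at most max_in_flight active at once."""
    semaphore = asyncio.Semaphore(max_in_flight)

    async def guarded(query: str) -> Any:
        # Wait for a free slot before starting the query
        async with semaphore:
            return await run_query(query)

    # gather returns results in the same order as the input queries
    return await asyncio.gather(*(guarded(q) for q in queries))


async def fake_query(query: str) -> str:
    # Stand-in for a real database call; sleeps briefly to simulate I/O
    await asyncio.sleep(0.01)
    return f"result of {query}"


if __name__ == "__main__":
    print(asyncio.run(run_concurrently(fake_query, ["SELECT 1", "SELECT 2"])))
```

With a real pool, run_query would typically acquire a connection, run the statement, and release the connection before returning.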
The Python Connector supports Automatic IAM database authentication.
Make sure to configure your AlloyDB Instance to allow IAM authentication and add an IAM database user.
A Connector or AsyncConnector can be configured to connect to an AlloyDB instance using automatic IAM database authentication with the enable_iam_auth argument set to True.
When configuring the connector.connect call for IAM authentication, the password field can be omitted and the user field should be formatted as follows:
- For an IAM user account, this is the user's email address.
- For a service account, it is the service account's email without the .gserviceaccount.com domain suffix.
For example, to connect with IAM authentication using a service account whose email is <YOUR_SA_NAME>@<YOUR_PROJECT>.iam.gserviceaccount.com:
connector.connect(
    "projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
    "pg8000",  # asyncpg for AsyncConnector
    user="<YOUR_SA_NAME>@<YOUR_PROJECT>.iam",
    db="my-db-name",
    enable_iam_auth=True,
)
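The formatting rule for the user field can be expressed as a small function. This is a minimal sketch: the name iam_db_user is hypothetical and not part of the connector, and the email addresses in the usage lines are made-up examples.

```python
def iam_db_user(email: str) -> str:
    """Format an IAM principal's email as the database user name.

    Service account emails lose the ".gserviceaccount.com" suffix;
    IAM user emails are returned unchanged.
    """
    suffix = ".gserviceaccount.com"
    if email.endswith(suffix):
        return email[: -len(suffix)]
    return email


if __name__ == "__main__":
    # Illustrative addresses only
    print(iam_db_user("my-sa@my-project.iam.gserviceaccount.com"))
    print(iam_db_user("alice@example.com"))
```

The return value is what would be passed as the user keyword argument alongside enable_iam_auth=True.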
The Connector's refresh_strategy argument can be set to "lazy" to configure the Python Connector to retrieve connection info lazily, only when it is needed. Otherwise, a background refresh cycle runs to retrieve the connection info periodically. The lazy setting is useful in environments where the CPU may be throttled outside of a request context, e.g., Cloud Run, Cloud Functions, etc.
To set the refresh strategy, set the refresh_strategy keyword argument when initializing a Connector:
connector = Connector(refresh_strategy="lazy")
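An application that runs both locally and on a serverless platform may want to pick the strategy at startup. The sketch below builds the keyword arguments for Connector from the environment. It assumes that the presence of the K_SERVICE environment variable, which Cloud Run sets, is an acceptable signal for a CPU-throttled environment; that choice and the name connector_kwargs are illustrative and not part of the connector.

```python
import os
from typing import Dict, Mapping


def connector_kwargs(environ: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Return keyword arguments for Connector based on the environment.

    Assumption: K_SERVICE being set means the process runs on Cloud Run,
    where CPU may be throttled between requests, so lazy refresh is chosen.
    Elsewhere the connector's default refresh behavior is left untouched.
    """
    if environ.get("K_SERVICE"):
        return {"refresh_strategy": "lazy"}
    return {}


if __name__ == "__main__":
    # Intended use: connector = Connector(**connector_kwargs())
    print(connector_kwargs())
```

Returning an empty dict outside serverless environments keeps the default behavior without naming it explicitly.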
The AlloyDB Python Connector by default will attempt to establish connections to your instance's private IP. To change this, such as connecting to AlloyDB over a public IP address or Private Service Connect (PSC), set the ip_type keyword argument when initializing a Connector() or when calling connector.connect().
Possible values for ip_type are "PRIVATE" (default value), "PUBLIC", and "PSC".
Example:
from google.cloud.alloydb.connector import Connector
import sqlalchemy

# initialize Connector object
connector = Connector()

# function to return the database connection
def getconn():
    return connector.connect(
        "projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
        "pg8000",
        user="my-user",
        password="my-password",
        db="my-db-name",
        ip_type="PUBLIC",  # use public IP
    )

# create connection pool
pool = sqlalchemy.create_engine(
    "postgresql+pg8000://",
    creator=getconn,
)

# use connection pool...
connector.close()
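When the IP type differs between environments, it may be convenient to read it from configuration and validate it against the three documented values before passing it to the connector. This is a minimal sketch; the environment variable name ALLOYDB_IP_TYPE and the function resolve_ip_type are hypothetical and are not read or provided by the connector itself.

```python
import os
from typing import Mapping

# The three values documented for ip_type
VALID_IP_TYPES = ("PRIVATE", "PUBLIC", "PSC")


def resolve_ip_type(
    environ: Mapping[str, str] = os.environ,
    var: str = "ALLOYDB_IP_TYPE",  # hypothetical variable name
) -> str:
    """Read the IP type from the environment, defaulting to "PRIVATE"."""
    ip_type = environ.get(var, "PRIVATE").strip().upper()
    if ip_type not in VALID_IP_TYPES:
        raise ValueError(
            f"{var} must be one of {', '.join(VALID_IP_TYPES)}, got {ip_type!r}"
        )
    return ip_type


if __name__ == "__main__":
    # Intended use: connector.connect(..., ip_type=resolve_ip_type())
    print(resolve_ip_type())
```

Validating up front turns a typo such as "PUBLC" into an immediate configuration error rather than a failure at connection time.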
The AlloyDB Python Connector uses the standard Python logging module for debug logging support.
Add the below code to your application to enable debug logging with the AlloyDB Python Connector:
import logging
logging.basicConfig(format="%(asctime)s [%(levelname)s]: %(message)s")
logger = logging.getLogger(name="google.cloud.alloydb.connector")
logger.setLevel(logging.DEBUG)
For more details on configuring logging, please refer to the Python logging docs.
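If the application already configures logging elsewhere, a variation is to attach a handler to the connector's logger only, leaving the root logger untouched. The following sketch uses only the standard logging module. The function name enable_connector_debug_logging is hypothetical, and disabling propagation is a design choice made here to avoid duplicate output, not something the connector requires.

```python
import logging


def enable_connector_debug_logging() -> logging.Logger:
    """Send the connector's debug logs to stderr without touching the root logger."""
    logger = logging.getLogger("google.cloud.alloydb.connector")
    logger.setLevel(logging.DEBUG)

    # Add the handler only once, so repeated calls do not duplicate output
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter("%(asctime)s [%(levelname)s]: %(message)s")
        )
        logger.addHandler(handler)

    # Keep these records from also being emitted by root logger handlers
    logger.propagate = False
    return logger


if __name__ == "__main__":
    enable_connector_debug_logging().debug("debug logging enabled")
```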
This project uses semantic versioning, and uses the following lifecycle regarding support for a major version:
- Active: Active versions get all new features and security fixes (that wouldn’t otherwise introduce a breaking change). New major versions are guaranteed to be "active" for a minimum of 1 year.
- Deprecated: Deprecated versions continue to receive security and critical bug fixes, but do not receive new features. Deprecated versions will be publicly supported for 1 year.
- Unsupported: Any major version that has been deprecated for >=1 year is considered publicly unsupported.
We follow the Python Version Support Policy used by Google Cloud Libraries for Python. Changes in supported Python versions will be considered a minor change, and will be listed in the release notes.
This project aims for a minimum monthly release cadence. If no new features or fixes have been added, a new PATCH version with the latest dependencies is released.
We welcome outside contributions. Please see our Contributing Guide for details on how best to contribute.