Skip to content

Commit

Permalink
refactor sendall to check for error, fix int to uint casts, collect c…
Browse files Browse the repository at this point in the history
…overage info at the end of all tests
  • Loading branch information
var77 committed Aug 6, 2024
1 parent 6a4cf73 commit d84c174
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 49 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/benchmark-linux.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:

- name: Build
id: build
run: sudo su -c "PG_VERSION=15 USE_SOURCE=1 ./ci/scripts/build.sh"
run: sudo su -c "PG_VERSION=15 USE_SOURCE=1 USE_SSL=1 ./ci/scripts/build.sh"
env:
BRANCH_NAME: ${{ github.head_ref || github.ref_name }}

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
submodules: "recursive"
- name: Build
id: build
run: sudo sh -c "PG_VERSION=$PG_VERSION USE_SOURCE=1 GITHUB_OUTPUT=$GITHUB_OUTPUT BUILD_PACKAGES=1 ./ci/scripts/build.sh"
run: sudo sh -c "PG_VERSION=$PG_VERSION USE_SOURCE=1 GITHUB_OUTPUT=$GITHUB_OUTPUT BUILD_PACKAGES=1 USE_SSL=1 ./ci/scripts/build.sh"
env:
PG_VERSION: ${{ matrix.postgres }}
- name: Create Archive Package
Expand Down
16 changes: 11 additions & 5 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:

- name: Build without SSL
id: build_without_ssl
run: sudo sh -c "DISABLE_SSL=1 PG_VERSION=$PG_VERSION USE_SOURCE=1 GITHUB_OUTPUT=$GITHUB_OUTPUT INSTALL_CLI=1 ./ci/scripts/build.sh"
run: sudo sh -c "USE_SSL=0 PG_VERSION=$PG_VERSION USE_SOURCE=1 GITHUB_OUTPUT=$GITHUB_OUTPUT INSTALL_CLI=1 ./ci/scripts/build.sh"
env:
PG_VERSION: ${{ matrix.postgres }}
BRANCH_NAME: ${{ github.head_ref || github.ref_name }}
Expand All @@ -51,22 +51,22 @@ jobs:
run: |
# pytest tries to open files with full path and so 'work' home folder must be listable by postgres for it to work
sudo chmod +x /home/runner && \
sudo su postgres -c "LANTERN_CLI_PATH=${{ env.LANTERN_CLI_PATH }} EXTERNAL_INDEX_SECURE=0 /tmp/lantern/cienv/bin/python ./scripts/integration_tests.py -k external_index" &&\
sudo su postgres -c "LANTERN_CLI_PATH=${{ env.LANTERN_CLI_PATH }} USE_SSL=0 /tmp/lantern/cienv/bin/python ./scripts/integration_tests.py -k external_index" &&\
echo "Done with integration tests"
env:
PG_VERSION: ${{ matrix.postgres }}
if: ${{ !startsWith(matrix.os, 'mac') }}

- name: Build
id: build
run: sudo sh -c "PG_VERSION=$PG_VERSION USE_SOURCE=1 GITHUB_OUTPUT=$GITHUB_OUTPUT ENABLE_COVERAGE=$ENABLE_COVERAGE INSTALL_CLI=1 ./ci/scripts/build.sh"
run: sudo sh -c "USE_SSL=1 PG_VERSION=$PG_VERSION USE_SOURCE=1 GITHUB_OUTPUT=$GITHUB_OUTPUT ENABLE_COVERAGE=$ENABLE_COVERAGE ./ci/scripts/build.sh"
env:
PG_VERSION: ${{ matrix.postgres }}
BRANCH_NAME: ${{ github.head_ref || github.ref_name }}
ENABLE_COVERAGE: ${{ (startsWith(matrix.os, 'ubuntu') && matrix.postgres == 15) && '1' || '0' }}
- name: Run tests linux
id: test-linux
run: sudo su postgres -c "PG_VERSION=$PG_VERSION ENABLE_COVERAGE=$ENABLE_COVERAGE ./ci/scripts/run-tests-linux.sh"
run: sudo su postgres -c "PG_VERSION=$PG_VERSION ./ci/scripts/run-tests-linux.sh"
env:
PG_VERSION: ${{ matrix.postgres }}
ENABLE_COVERAGE: ${{ (startsWith(matrix.os, 'ubuntu') && matrix.postgres == 15) && '1' || '0' }}
Expand All @@ -81,11 +81,17 @@ jobs:
# pytest tries to open files with full path and so 'work' home folder must be listable by postgres for it to work
# the stderr redirect has to sit on fuser itself; written after `|| true` it would only apply to `true`
sudo fuser -k 8998/tcp 2>/dev/null || true # kill previously started cli
sudo chmod +x /home/runner && \
sudo su postgres -c "LANTERN_CLI_PATH=${{ env.LANTERN_CLI_PATH }} EXTERNAL_INDEX_SECURE=1 /tmp/lantern/cienv/bin/python ./scripts/integration_tests.py" &&\
sudo su postgres -c "LANTERN_CLI_PATH=${{ env.LANTERN_CLI_PATH }} USE_SSL=1 /tmp/lantern/cienv/bin/python ./scripts/integration_tests.py" &&\
echo "Done with integration tests"
env:
PG_VERSION: ${{ matrix.postgres }}
if: ${{ !startsWith(matrix.os, 'mac') }}

# Runs `make cover` once in the build directory, after the test steps listed above it in this job.
# The `if` condition is the same expression that sets ENABLE_COVERAGE to '1' for the Build step.
- name: Collect coverage files
id: collect-cov-files
run: sudo su postgres -c "cd /tmp/lantern/build && make cover"
if: ${{ startsWith(matrix.os, 'ubuntu') && matrix.postgres == 15 }}

- name: Run update tests linux
id: update-test-linux
run: |
Expand Down
2 changes: 1 addition & 1 deletion ci/scripts/build-linux.sh
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ function install_platform_specific_dependencies() {
if [[ "$INSTALL_CLI" = "1" ]]
then
setup_rust
ORT_STRATEGY=system cargo install --git https://github.com/lanterndata/lantern_extras.git --branch varik/external-indexing-server-ssl --debug --bin lantern-cli --root /tmp
ORT_STRATEGY=system cargo install --git https://github.com/lanterndata/lantern_extras.git --branch main --debug --bin lantern-cli --root /tmp
fi

popd
Expand Down
8 changes: 1 addition & 7 deletions ci/scripts/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,13 @@ function build_and_install() {
mkdir build
cd build

if [ -n "$DISABLE_SSL" ]; then
USE_SSL=0
else
USE_SSL=1
fi

flags="-DBUILD_FOR_DISTRIBUTING=YES -DMARCH_NATIVE=OFF -DCMAKE_COMPILE_WARNING_AS_ERROR=ON \
-DCMAKE_C_COMPILER=$CC -DCMAKE_CXX_COMPILER=$CXX -DUSE_SSL=$USE_SSL"

if [[ "$ENABLE_COVERAGE" == "1" ]]
then
flags="$flags -DCODECOVERAGE=ON"
mv /usr/bin/gcov-12 /usr/bin/gcov
cp /usr/bin/gcov-12 /usr/bin/gcov
fi

# Run cmake
Expand Down
6 changes: 1 addition & 5 deletions ci/scripts/run-tests-linux.sh
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,7 @@ function run_db_tests(){
make test-parallel && \
$WORKDIR/cienv/bin/python ../scripts/test_wal.py && \
run_pgvector_tests && \
stop_current_postgres && \
if [[ "$ENABLE_COVERAGE" == "1" ]]
then
make cover
fi
stop_current_postgres
fi
}

Expand Down
14 changes: 10 additions & 4 deletions scripts/integration_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import socket
import subprocess
import time
from packaging.version import Version

# for pry
import inspect
Expand Down Expand Up @@ -653,7 +654,7 @@ def test_vector_search_with_filter(primary, source_table):
@pytest.fixture
def external_index(request):
cli_path = os.getenv("LANTERN_CLI_PATH")
use_ssl = os.getenv("EXTERNAL_INDEX_SECURE") == "1"
use_ssl = os.getenv("USE_SSL") == "1"
if not cli_path:
pytest.skip("pass 'LANTERN_CLI_PATH' environment variable to run external indexing tests")
return
Expand All @@ -675,7 +676,7 @@ def external_index(request):
def test_external_index(external_index, primary, source_table, quant_bits, distance_metric):
table_name = f"{source_table}_{quant_bits}_{distance_metric}_external_index"
index_name = f"idx_hnsw_{table_name}_{distance_metric}"
use_ssl = "ON" if os.getenv("EXTERNAL_INDEX_SECURE") == "1" else "OFF"
use_ssl = "ON" if os.getenv("USE_SSL") == "1" else "OFF"

data_type = 'INT[]' if distance_metric == 'hamming' else 'REAL[]'

Expand Down Expand Up @@ -717,7 +718,7 @@ def test_external_index(external_index, primary, source_table, quant_bits, dista
@pytest.mark.external_index
def test_external_index_pq(external_index, primary, source_table):
table_name = f"{source_table}_external_index_pq"
use_ssl = "ON" if os.getenv("EXTERNAL_INDEX_SECURE") == "1" else "OFF"
use_ssl = "ON" if os.getenv("USE_SSL") == "1" else "OFF"
primary.execute(
"testdb",
f"CREATE TABLE IF NOT EXISTS {table_name} AS SELECT * FROM {source_table}",
Expand Down Expand Up @@ -753,9 +754,12 @@ def test_external_index_pq(external_index, primary, source_table):
@pytest.mark.parametrize("quant_bits", [32])
@pytest.mark.external_index
def test_external_index_reindex(external_index, primary, source_table, quant_bits, distance_metric):
if primary.version < Version("12.0.0"):
pytest.skip("REINDEX is not supported on postgres 11")

table_name = f"{source_table}_{quant_bits}_{distance_metric}_reindx_external_index"
index_name = f"idx_hnsw_{table_name}_{distance_metric}"
use_ssl = "ON" if os.getenv("EXTERNAL_INDEX_SECURE") == "1" else "OFF"
use_ssl = "ON" if os.getenv("USE_SSL") == "1" else "OFF"

data_type = 'INT[]' if distance_metric == 'hamming' else 'REAL[]'

Expand Down Expand Up @@ -798,7 +802,9 @@ def test_external_index_reindex(external_index, primary, source_table, quant_bit
primary.execute("testdb", f"SELECT _lantern_internal.validate_index('{index_name}')")
primary.execute("testdb", f"REINDEX INDEX {index_name};")
primary.execute("testdb", f"SELECT _lantern_internal.validate_index('{index_name}')")

primary.safe_psql(dbname="testdb", query=f"REINDEX INDEX CONCURRENTLY {index_name};")
primary.execute("testdb", f"SELECT _lantern_internal.validate_index('{index_name}')")

if __name__ == "__main__":
os._exit(pytest.main(["-s", __file__, *sys.argv[1:]]))
48 changes: 23 additions & 25 deletions src/hnsw/external_index_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -119,24 +119,6 @@ static int connect_with_timeout(int sockfd, const struct sockaddr *addr, socklen
return 0;
}

/**
 * Send the whole buffer through the socket's send callback.
 *
 * Calls socket_con->send in a loop until `len` bytes have been accepted.
 *
 * Returns the total number of bytes sent on success. If any send call returns
 * zero or a negative value, that value is returned immediately and the rest of
 * the buffer is not sent; the caller must treat such a result as a failure.
 */
int sendall(external_index_socket_t *socket_con, const char *buf, uint32 len, int flags)
{
    // counters are unsigned to match `len`: no signed/unsigned comparison in the loop
    // condition and no signed overflow of the running total
    uint32 total = 0;
    uint32 bytesleft = len;
    int    n;

    while(total < len) {
        n = socket_con->send(socket_con, buf + total, bytesleft, flags);
        // any non-positive result is a failure; a negative value must never be added to the counters
        if(n <= 0) {
            return n;
        }
        total += (uint32)n;
        bytesleft -= (uint32)n;
    }

    return (int)total;
}

/**
* Check for error received from socket response
* This function will return void or elog(ERROR) and exit process
Expand Down Expand Up @@ -174,6 +156,22 @@ static void check_external_index_request_error(external_index_socket_t *socket_c
elog(ERROR, "external index socket send failed");
}

/**
 * Push `len` bytes of `buf` through the socket's send callback.
 *
 * Each send result is handed to check_external_index_request_error before it
 * is added to the running byte count. Returns the accumulated byte count.
 *
 * NOTE(review): the loop only makes progress if check_external_index_request_error
 * does not return for a zero or negative result -- confirm against that function.
 */
static int sendall_or_error(external_index_socket_t *socket_con, const char *buf, uint32 len, int flags)
{
    uint32 sent = 0;

    while(sent < len) {
        // remaining byte count is derived from the running total instead of a separate counter
        int32 rc = socket_con->send(socket_con, buf + sent, len - sent, flags);

        check_external_index_request_error(socket_con, rc);
        sent += rc;
    }

    return sent;
}

void external_index_send_codebook(external_index_socket_t *socket_con,
float *codebook,
uint32 dimensions,
Expand All @@ -186,12 +184,12 @@ void external_index_send_codebook(external_index_socket_t *socket_con,

for(int i = 0; i < num_centroids; i++) {
memcpy(buf, &codebook[ i * dimensions ], data_size);
bytes_written = sendall(socket_con, buf, data_size, 0);
bytes_written = sendall_or_error(socket_con, buf, data_size, 0);
check_external_index_request_error(socket_con, bytes_written);
}

uint32 end_msg = EXTERNAL_INDEX_END_MSG;
bytes_written = sendall(socket_con, (char *)&end_msg, EXTERNAL_INDEX_MAGIC_MSG_SIZE, 0);
bytes_written = sendall_or_error(socket_con, (char *)&end_msg, EXTERNAL_INDEX_MAGIC_MSG_SIZE, 0);

check_external_index_request_error(socket_con, bytes_written);
}
Expand Down Expand Up @@ -277,8 +275,8 @@ external_index_socket_t *create_external_index_session(const char
uint32 hdr_msg = EXTERNAL_INDEX_INIT_MSG;
memcpy(init_buf, &hdr_msg, EXTERNAL_INDEX_MAGIC_MSG_SIZE);
memcpy(init_buf + EXTERNAL_INDEX_MAGIC_MSG_SIZE, &index_params, sizeof(external_index_params_t));
uint32 bytes_written
= sendall(socket_con, init_buf, sizeof(external_index_params_t) + EXTERNAL_INDEX_MAGIC_MSG_SIZE, 0);
int32 bytes_written
= sendall_or_error(socket_con, init_buf, sizeof(external_index_params_t) + EXTERNAL_INDEX_MAGIC_MSG_SIZE, 0);

check_external_index_request_error(socket_con, bytes_written);

Expand All @@ -287,7 +285,7 @@ external_index_socket_t *create_external_index_session(const char
socket_con, buildstate->pq_codebook, params->dimensions, params->num_centroids, params->num_subvectors);
}

uint32 buf_size = socket_con->read(socket_con, (char *)&init_response, EXTERNAL_INDEX_INIT_BUFFER_SIZE);
int32 buf_size = socket_con->read(socket_con, (char *)&init_response, EXTERNAL_INDEX_INIT_BUFFER_SIZE);

check_external_index_response_error(socket_con, (char *)init_response, buf_size);

Expand All @@ -306,7 +304,7 @@ void external_index_receive_index_file(external_index_socket_t *socket_con,
// disable read timeout while indexing is in progress
set_read_timeout(socket_con->fd, 0);
// send message indicating that we have finished streaming tuples
bytes_written = sendall(socket_con, (char *)&end_msg, EXTERNAL_INDEX_MAGIC_MSG_SIZE, 0);
bytes_written = sendall_or_error(socket_con, (char *)&end_msg, EXTERNAL_INDEX_MAGIC_MSG_SIZE, 0);
check_external_index_request_error(socket_con, bytes_written);

// read how many tuples have been indexed
Expand Down Expand Up @@ -364,6 +362,6 @@ void external_index_send_tuple(
// send tuple over socket if this is external indexing
memcpy(tuple, label, sizeof(usearch_label_t));
memcpy(tuple + sizeof(usearch_label_t), vector, tuple_size - sizeof(usearch_label_t));
bytes_written = sendall(socket_con, tuple, tuple_size, 0);
bytes_written = sendall_or_error(socket_con, tuple, tuple_size, 0);
check_external_index_request_error(socket_con, bytes_written);
}

0 comments on commit d84c174

Please sign in to comment.