diff --git a/chaos/cluster-load.test.js b/chaos/cluster-load.test.js
new file mode 100644
index 00000000..4b669c4f
--- /dev/null
+++ b/chaos/cluster-load.test.js
@@ -0,0 +1,39 @@
+import redis from 'k6/experimental/redis';
+import { check } from 'k6';
+import exec from 'k6/execution';
+
+
+const client = new redis.Client({
+  socket: {
+    dialTimeout: 5000,
+    readTimeout: 5000,
+    writeTimeout: 5000,
+    poolTimeout: 5000,
+  },
+  cluster: {
+    // Cluster options
+    maxRedirects: 3,
+    readOnly: true,
+    routeByLatency: true,
+    routeRandomly: true,
+    nodes: [
+      // Nodes URLs
+    ]
+  },
+});
+
+export const options = {
+  stages: [
+    { duration: '120s', target: 100 },
+  ]
+};
+
+async function callServer() {
+  const vuId = exec.vu.idInTest;
+  const ok = await client.set(`a-${vuId}`, `${vuId}`).then(() => true).catch(() => false);
+  check(ok, { 'set command succeeded': (ok) => ok });
+}
+
+export default function () {
+  callServer();
+}
\ No newline at end of file
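Note: the nodes array in the new k6 script above is committed empty, so the cluster node URLs have to be filled in before the test can do anything. Assuming that has been done, one way to drive it locally is simply (the command below is an illustration, not part of this patch):

    # ramps to 100 virtual users over 120s, as configured in the script's stages
    k6 run chaos/cluster-load.test.js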
diff --git a/falkordb-cluster/node.conf b/falkordb-cluster/node.conf
index 36b55d89..90c21649 100644
--- a/falkordb-cluster/node.conf
+++ b/falkordb-cluster/node.conf
@@ -2,7 +2,45 @@ loadmodule /FalkorDB/bin/src/falkordb.so CACHE_SIZE $FALKORDB_CACHE_SIZE NODE_CREATION_BUFFER $FALKORDB_NODE_CREATION_BUFFER MAX_QUEUED_QUERIES $FALKORDB_MAX_QUEUED_QUERIES TIMEOUT_MAX $FALKORDB_TIMEOUT_MAX TIMEOUT_DEFAULT $FALKORDB_TIMEOUT_DEFAULT RESULTSET_SIZE $FALKORDB_RESULT_SET_SIZE QUERY_MEM_CAPACITY $FALKORDB_QUERY_MEM_CAPACITY
 loglevel $LOG_LEVEL
+# Enable TLS for the Redis Cluster bus
+tls-cluster no
+# For maximum availability, it is possible to set the cluster-replica-validity-factor
+# to a value of 0, which means that replicas will always try to fail over the
+# master regardless of the last time they interacted with the master.
+# (However they'll always try to apply a delay proportional to their
+# offset rank).
+#
+# Zero is the only value able to guarantee that when all the partitions heal
+# the cluster will always be able to continue.
+#
+cluster-replica-validity-factor 0
+# Allows replicas to migrate to orphaned masters (masters with no replicas left),
+# but only if their own master remains with at least one replica.
+cluster-allow-replica-migration yes
+cluster-migration-barrier 1
+# If a hash slot is uncovered (no node serving it) the whole cluster
+# becomes unhealthy and stops accepting queries.
+cluster-require-full-coverage yes
+# When set to yes, this prevents replicas from failing over their master
+# automatically; a failover can still be forced manually.
+cluster-replica-no-failover no
+# Allows replicas to serve read queries while the cluster is in a down state.
+cluster-allow-reads-when-down no
+# This feature should be kept as yes.
+cluster-allow-pubsubshard-when-down yes
+# Cluster link send buffer limit is the limit on the memory usage of an individual
+# cluster bus link's send buffer in bytes. Cluster links would be freed if they exceed
+# this limit. This is primarily to prevent send buffers from growing unbounded on links
+# toward slow peers (e.g. Pub/Sub messages being piled up).
+# This limit is disabled by default. Enable this limit when the 'mem_cluster_links' INFO field
+# and/or 'send-buffer-allocated' entries in the 'CLUSTER LINKS' command output continuously increase.
+# A minimum limit of 1gb is recommended so that a cluster link buffer can fit at least a single
+# Pub/Sub message by default. (client-query-buffer-limit default value is 1gb)
+cluster-link-sendbuf-limit 0
+# Optional nodename to be used in addition to the node ID for
+# debugging and admin information
+cluster-announce-human-nodename ""
 cluster-preferred-endpoint-type hostname
 cluster-port 16379
 cluster-enabled yes
diff --git a/falkordb-node/node-entrypoint.sh b/falkordb-node/node-entrypoint.sh
index 3aacf972..72e85b30 100755
--- a/falkordb-node/node-entrypoint.sh
+++ b/falkordb-node/node-entrypoint.sh
@@ -48,6 +48,7 @@ FALKORDB_RESULT_SET_SIZE=${FALKORDB_RESULT_SET_SIZE:-10000}
 FALKORDB_QUERY_MEM_CAPACITY=${FALKORDB_QUERY_MEM_CAPACITY:-0}
 FALKORDB_TIMEOUT_MAX=${FALKORDB_TIMEOUT_MAX:-0}
 FALKORDB_TIMEOUT_DEFAULT=${FALKORDB_TIMEOUT_DEFAULT:-0}
+FALKORDB_VKEY_MAX_ENTITY_COUNT=${FALKORDB_VKEY_MAX_ENTITY_COUNT:-4611686000000000000}
 MEMORY_LIMIT=${MEMORY_LIMIT:-''}
 # If vars are , set it to 0
 if [[ "$FALKORDB_QUERY_MEM_CAPACITY" == "" ]]; then
@@ -354,12 +355,19 @@ create_user() {
   config_rewrite
 }
 
+
 config_rewrite() {
   # Config rewrite
   echo "Rewriting config"
   redis-cli -p $NODE_PORT -a $ADMIN_PASSWORD --no-auth-warning $TLS_CONNECTION_STRING CONFIG REWRITE
 }
 
+set_configs() {
+  echo "Setting configs"
+  redis-cli -p $NODE_PORT -a $ADMIN_PASSWORD --no-auth-warning $TLS_CONNECTION_STRING GRAPH.CONFIG set VKEY_MAX_ENTITY_COUNT $FALKORDB_VKEY_MAX_ENTITY_COUNT
+  redis-cli -p $NODE_PORT -a $ADMIN_PASSWORD --no-auth-warning $TLS_CONNECTION_STRING GRAPH.CONFIG get VKEY_MAX_ENTITY_COUNT
+}
+
 if [ -f $NODE_CONF_FILE ]; then
   # Get current admin password
   CURRENT_ADMIN_PASSWORD=$(cat $NODE_CONF_FILE | grep -oP '(?<=requirepass ).*' | sed 's/\"//g')
@@ -443,6 +451,7 @@ if [ "$RUN_NODE" -eq "1" ]; then
   sleep 10
 
   create_user
+  set_configs
 
   # If node should be master, add it to sentinel
   if [[ $IS_REPLICA -eq 0 && $RUN_SENTINEL -eq 1 ]]; then
diff --git a/omnistrate.enterprise.yaml b/omnistrate.enterprise.yaml
index 19adc699..a5dcc5db 100644
--- a/omnistrate.enterprise.yaml
+++ b/omnistrate.enterprise.yaml
@@ -1127,6 +1127,8 @@ services:
       instanceTypes:
         - cloudProvider: gcp
           name: e2-small
+        - cloudProvider: aws
+          name: t3a.small
       environment:
         - RUN_NODE=0
         - RUN_SENTINEL=1
@@ -1624,6 +1626,8 @@ services:
      instanceTypes:
         - cloudProvider: gcp
           name: e2-small
+        - cloudProvider: aws
+          name: t3a.small
       environment:
         - RUN_NODE=0
         - RUN_SENTINEL=1
@@ -1946,6 +1950,8 @@ services:
       instanceTypes:
         - cloudProvider: gcp
           name: e2-small
+        - cloudProvider: aws
+          name: t3a.small
       <<: *rebalanceparam
       environment:
         - IS_MULTI_ZONE=0
@@ -2251,6 +2257,8 @@ services:
      instanceTypes:
         - cloudProvider: gcp
           name: e2-small
+        - cloudProvider: aws
+          name: t3a.small
       <<: *rebalanceparam
       environment:
         - IS_MULTI_ZONE=1
diff --git a/omnistrate.pro.yaml b/omnistrate.pro.yaml
index 44a230fd..0bd765b0 100644
--- a/omnistrate.pro.yaml
+++ b/omnistrate.pro.yaml
@@ -1404,6 +1404,8 @@ services:
       instanceTypes:
         - cloudProvider: gcp
           name: e2-small
+        - cloudProvider: aws
+          name: t3a.small
       environment:
         - RUN_NODE=0
         - RUN_SENTINEL=1
@@ -2218,6 +2220,8 @@ services:
      instanceTypes:
         - cloudProvider: gcp
           name: e2-small
+        - cloudProvider: aws
+          name: t3a.small
       environment:
         - RUN_NODE=0
         - RUN_SENTINEL=1
@@ -2566,6 +2570,8 @@ services:
       instanceTypes:
         - cloudProvider: gcp
           name: e2-small
+        - cloudProvider: aws
+          name: t3a.small
       <<: *rebalanceparam
       environment:
         - NODE_HOST=$sys.network.node.externalEndpoint
@@ -2877,6 +2883,8 @@ services:
      instanceTypes:
         - cloudProvider: gcp
           name: e2-small
+        - cloudProvider: aws
+          name: t3a.small
       <<: *rebalanceparam
       environment:
         - IS_MULTI_ZONE=1
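Note: the set_configs step added to falkordb-node/node-entrypoint.sh above applies FALKORDB_VKEY_MAX_ENTITY_COUNT via GRAPH.CONFIG. A rough way to confirm the value took effect on a running node (the port and password below are placeholders, not taken from this patch):

    # read back the virtual-key entity count that set_configs wrote
    redis-cli -p 6379 -a "$ADMIN_PASSWORD" --no-auth-warning GRAPH.CONFIG GET VKEY_MAX_ENTITY_COUNT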
diff --git a/omnistrate_tests/test_replication_replicas.py b/omnistrate_tests/test_replication_replicas.py
index 7a1109df..fb465ebf 100644
--- a/omnistrate_tests/test_replication_replicas.py
+++ b/omnistrate_tests/test_replication_replicas.py
@@ -5,15 +5,10 @@
 from redis import Redis
 from redis.backoff import ExponentialBackoff
 from redis.retry import Retry
-from redis.exceptions import (
-    ConnectionError,
-    TimeoutError,
-    ReadOnlyError
-)
+from redis.exceptions import ConnectionError, TimeoutError, ReadOnlyError
 import threading
 import socket
-
 
 file = Path(__file__).resolve()
 parent, root = file.parent, file.parents[1]
 sys.path.append(str(root))
@@ -55,19 +50,21 @@ parser.add_argument("--rdb-config", required=False, default="medium")
 parser.add_argument("--aof-config", required=False, default="always")
 parser.add_argument("--replica-count", required=False, default="2")
-parser.add_argument("--persist-instance-on-fail",action="store_true")
+parser.add_argument("--persist-instance-on-fail", action="store_true")
 
 parser.set_defaults(tls=False)
 
 args = parser.parse_args()
 
 instance: OmnistrateFleetInstance = None
 
+
 # Intercept exit signals so we can delete the instance before exiting
 def signal_handler(sig, frame):
     if instance:
         instance.delete(False)
     sys.exit(0)
 
+
 if not args.persist_instance_on_fail:
     signal.signal(signal.SIGTERM, signal_handler)
     signal.signal(signal.SIGINT, signal_handler)
@@ -106,7 +103,7 @@ def test_add_remove_replica():
         subscription_id=args.subscription_id,
         deployment_create_timeout_seconds=2400,
         deployment_delete_timeout_seconds=2400,
-        deployment_failover_timeout_seconds=2400
+        deployment_failover_timeout_seconds=2400,
     )
 
     try:
@@ -128,19 +125,22 @@ def test_add_remove_replica():
 
     try:
         ip = resolve_hostname(instance=instance)
-        logging.info(f"Instance endpoint {instance.get_cluster_endpoint()['endpoint']} resolved to {ip}")
+        logging.info(
+            f"Instance endpoint {instance.get_cluster_endpoint()['endpoint']} resolved to {ip}"
+        )
     except TimeoutError as e:
         logging.error(f"DNS resolution failed: {e}")
         raise Exception("Instance endpoint not ready: DNS resolution failed") from e
-    
+
     add_data(instance)
 
     thread_signal = threading.Event()
     error_signal = threading.Event()
     thread = threading.Thread(
-        target=test_zero_downtime, args=(thread_signal, error_signal, instance, args.tls)
+        target=test_zero_downtime,
+        args=(thread_signal, error_signal, instance, args.tls),
     )
-    
+
     thread.start()
 
     check_data(instance)
@@ -149,8 +149,8 @@ def test_add_remove_replica():
 
     test_fail_over(instance)
 
-    change_replica_count(instance,int(args.replica_count))
-    
+    change_replica_count(instance, int(args.replica_count))
+
     check_data(instance)
 
     thread_signal.set()
@@ -179,36 +179,52 @@ def change_replica_count(instance: OmnistrateFleetInstance, new_replica_count: i
         wait_for_ready=True,
     )
 
+
 def test_fail_over(instance: OmnistrateFleetInstance):
     logging.info("Testing failover to the newly created replica")
     endpoint = instance.get_cluster_endpoint()
     password = instance.falkordb_password
     id_key = "sz" if args.resource_key == "single-Zone" else "mz"
-    retry = Retry(ExponentialBackoff(base=1,cap=10), retries=20,supported_errors=(TimeoutError,ConnectionError,ConnectionRefusedError,ReadOnlyError))
+    retry = Retry(
+        ExponentialBackoff(base=1, cap=10),
+        retries=20,
+        supported_errors=(
+            TimeoutError,
+            ConnectionError,
+            ConnectionRefusedError,
+            ReadOnlyError,
+        ),
+    )
     try:
         client = Redis(
-            host=f"{endpoint["endpoint"]}", port=endpoint['ports'][0],
-            username="falkordb",
-            password=password,
-            decode_responses=True,
-            ssl=args.tls,
-            retry=retry,
-            retry_on_error=[TimeoutError,ConnectionError,ConnectionRefusedError,ReadOnlyError]
+            host=f"{endpoint['endpoint']}",
+            port=endpoint["ports"][0],
+            username="falkordb",
+            password=password,
+            decode_responses=True,
+            ssl=args.tls,
+            retry=retry,
+            retry_on_error=[
+                TimeoutError,
+                ConnectionError,
+                ConnectionRefusedError,
+                ReadOnlyError,
+            ],
         )
     except Exception as e:
         logging.exception("Failed to connect to Sentinel!")
         logging.info(e)
-    
+
     tout = time.time() + 600
     while True:
         if time.time() > tout:
             raise Exception(f"Failed to failover to node-{id_key}-2")
         try:
             time.sleep(5)
-            print(client.execute_command('SENTINEL FAILOVER master'))
+            print(client.execute_command("SENTINEL FAILOVER master"))
             time.sleep(10)
-            master = client.execute_command('SENTINEL MASTER master')[3]
+            master = client.execute_command("SENTINEL MASTER master")[3]
             if master.startswith(f"node-{id_key}-2"):
                 break
         except Exception as e:
@@ -216,10 +232,11 @@ def test_fail_over(instance: OmnistrateFleetInstance):
             continue
     time.sleep(15)
     check_data(instance)
-    
+
+
 def add_data(instance: OmnistrateFleetInstance):
     """This function should retrieve the instance host and port for connection, write some data to the DB, then check that the data is there"""
-    logging.info('Added data ....')
+    logging.info("Added data ....")
     # Get instance host and port
     db = instance.create_connection(
         ssl=args.tls,
@@ -237,12 +254,9 @@ def add_data(instance: OmnistrateFleetInstance):
 
 
 def check_data(instance: OmnistrateFleetInstance):
-    logging.info('Retrieving data ....')
+    logging.info("Retrieving data ....")
     # Get instance host and port
-    db = instance.create_connection(
-        ssl=args.tls,
-        force_reconnect=True
-    )
+    db = instance.create_connection(ssl=args.tls, force_reconnect=True)
 
     graph = db.select_graph("test")
 
@@ -251,6 +265,7 @@
     if len(result.result_set) == 0:
         raise Exception("Data did not persist after host count change")
 
+
 def test_zero_downtime(
     thread_signal: threading.Event,
     error_signal: threading.Event,
@@ -272,14 +287,15 @@ def test_zero_downtime(
         logging.exception(e)
         error_signal.set()
         raise e
-    
-def resolve_hostname(instance: OmnistrateFleetInstance,timeout=300, interval=1):
+
+
+def resolve_hostname(instance: OmnistrateFleetInstance, timeout=300, interval=1):
     """Check if the instance's main endpoint is resolvable.
 
     Args:
         instance: The OmnistrateFleetInstance to check
         timeout: Maximum time in seconds to wait for resolution (default: 30)
         interval: Time in seconds between retry attempts (default: 1)
-    
+
     Returns:
         str: The resolved IP address
@@ -290,13 +306,13 @@ def resolve_hostname(instance: OmnistrateFleetInstance,timeout=300, interval=1):
     """
     if interval <= 0 or timeout <= 0:
         raise ValueError("Interval and timeout must be positive")
-    
+
     cluster_endpoint = instance.get_cluster_endpoint()
-    if not cluster_endpoint or 'endpoint' not in cluster_endpoint:
+    if not cluster_endpoint or "endpoint" not in cluster_endpoint:
         raise KeyError("Missing endpoint information in cluster configuration")
 
-    hostname = cluster_endpoint['endpoint']
+    hostname = cluster_endpoint["endpoint"]
 
     start_time = time.time()
     while time.time() - start_time < timeout:
@@ -306,8 +322,11 @@ def resolve_hostname(instance: OmnistrateFleetInstance,timeout=300, interval=1):
         except (socket.gaierror, socket.error) as e:
             logging.debug(f"DNS resolution attempt failed: {e}")
             time.sleep(interval)
-    
-    raise TimeoutError(f"Unable to resolve hostname '{hostname}' within {timeout} seconds.")
+
+    raise TimeoutError(
+        f"Unable to resolve hostname '{hostname}' within {timeout} seconds."
+    )
+
 
 if __name__ == "__main__":
     test_add_remove_replica()
diff --git a/omnistrate_tests/test_upgrade_version.py b/omnistrate_tests/test_upgrade_version.py
index 2bdb624b..f46b3b4d 100644
--- a/omnistrate_tests/test_upgrade_version.py
+++ b/omnistrate_tests/test_upgrade_version.py
@@ -3,7 +3,8 @@ from random import randbytes
 from pathlib import Path  # if you haven't already done so
 import threading
 
-from .utils import get_last_gh_tag
+# pylint: disable=no-name-in-module
+from utils import get_last_gh_tag
 import socket
 
 file = Path(__file__).resolve()