Skip to content

Commit

Permalink
Merge pull request #14 from vexingcodes/read_zk
Browse files Browse the repository at this point in the history
Parse GRPC data in ZooKeeper.
vexingcodes authored Sep 10, 2019
2 parents adb9caf + c1b0fb3 commit cef37dd
Showing 5 changed files with 168 additions and 14 deletions.
3 changes: 3 additions & 0 deletions config/Pipfile
Original file line number Diff line number Diff line change
@@ -9,6 +9,9 @@ verify_ssl = true
docker = "*"
pyaml = "*"
pyexpect = "*"
grpcio = "*"
grpcio-tools = "*"
kazoo = "*"

[requires]
python_version = "2.7"
125 changes: 124 additions & 1 deletion config/Pipfile.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions config/make-ramcloud
Original file line number Diff line number Diff line change
@@ -48,3 +48,10 @@ mv install/bin/server install/bin/rc-server

# Move the libraries to the correct place instead of in the ramcloud subdirectory.
mv install/lib/ramcloud/* install/lib && rmdir install/lib/ramcloud

# Build the proto files so we can read what RAMCloud puts in ZooKeeper.
cd src
PROTO_OUT=/src/RAMCloud/bindings/python
for proto_file in $(ls *.proto); do
python -m grpc_tools.protoc -I. --python_out=${PROTO_OUT} --grpc_python_out=${PROTO_OUT} ${proto_file}
done
8 changes: 7 additions & 1 deletion testing/cluster_test_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import docker
import kazoo.client
import logging
import logging.config
import ramcloud
@@ -16,7 +17,7 @@ def get_host(locator):
return arg_map['basic+udp:host']

def external_storage_string(ensemble):
return 'zk:' + ','.join(['{}:2181'.format(ip) for (_, ip) in ensemble.items()])
return ','.join(['{}:2181'.format(ip) for (_, ip) in ensemble.items()])

def ensemble_servers_string(ensemble):
return ' '.join(['server.{}={}:2888:3888;2181'.format(zkid, ip) for (zkid, ip) in ensemble.items()])
@@ -69,3 +70,8 @@ def launch_node(cluster_name, hostname, zk_servers, external_storage, zkid, ip,

logger.info('Launching node container %s with IP address %s...successful', hostname, ip)
return docker_client.containers.get(container_id)

def get_zookeeper_client(ensemble, read_only=True):
client = kazoo.client.KazooClient(hosts=external_storage_string(ensemble), read_only=read_only)
client.start()
return client
39 changes: 27 additions & 12 deletions testing/test_cluster.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import ramcloud
import os
import ramcloud
import Table_pb2
import unittest
from pyexpect import expect
import cluster_test_utils as ctu
@@ -24,19 +25,19 @@ def createTestValue(self):
def make_cluster(self, num_nodes):
self.assertGreaterEqual(num_nodes, 3)

ensemble = {i: '10.0.1.{}'.format(i) for i in xrange(1, num_nodes + 1)}
zk_servers = ctu.ensemble_servers_string(ensemble)
external_storage = ctu.external_storage_string(ensemble)
self.ensemble = {i: '10.0.1.{}'.format(i) for i in xrange(1, num_nodes + 1)}
zk_servers = ctu.ensemble_servers_string(self.ensemble)
external_storage = 'zk:' + ctu.external_storage_string(self.ensemble)
for i in xrange(1, num_nodes + 1):
hostname = 'ramcloud-node-{}'.format(i)
self.node_containers[ensemble[i]] = ctu.launch_node('main',
hostname,
zk_servers,
external_storage,
i,
ensemble[i],
self.node_image,
self.ramcloud_network)
self.node_containers[self.ensemble[i]] = ctu.launch_node('main',
hostname,
zk_servers,
external_storage,
i,
self.ensemble[i],
self.node_image,
self.ramcloud_network)
self.rc_client.connect(external_storage, 'main')

def simple_recovery(self, kill_command):
@@ -56,6 +57,20 @@ def simple_recovery(self, kill_command):
value = self.rc_client.read(self.table, 'testKey')
expect(value).equals(('testValue', 1))

def test_zookeeper_read(self):
self.make_cluster(num_nodes=4)
self.createTestValue()
zk_client = ctu.get_zookeeper_client(self.ensemble)

# Read the ZooKeeper entry for the table and make sure it looks sane.
# This mostly tests our ability to read from ZooKeeper and parse the
# GRPC contents correctly.
table_data = zk_client.get('/ramcloud/main/tables/test')[0]
table_parsed = Table_pb2.Table()
table_parsed.ParseFromString(table_data)
expect(table_parsed.id).equals(1L)
expect(table_parsed.name).equals("test")

def test_read_write(self):
self.make_cluster(num_nodes=3)
self.rc_client.create_table('test_table')

0 comments on commit cef37dd

Please sign in to comment.