From e8fd4310fd86aaba557a05caed3af8111a0c56f0 Mon Sep 17 00:00:00 2001 From: Bert Date: Wed, 7 Apr 2021 19:48:42 -0500 Subject: [PATCH] initial upload --- CTA_bustracker.py | 48 +++++ README.md | 2 + all.yml | 134 ++++++++++++++ cpu2mqtt.py | 46 +++++ create_aws_instances.py | 172 ++++++++++++++++++ create_connect_docker_compose_dot_yaml.py | 90 +++++++++ create_hosts_dot_yaml.py | 124 +++++++++++++ create_mqtt_config.py | 63 +++++++ ...te_mqtt_connect_docker_compose_dot_yaml.py | 93 ++++++++++ ...metheus_connect_docker_compose_dot_yaml.py | 91 +++++++++ create_prometheus_sink_config.py | 66 +++++++ create_rest_proxy_docker_compose.py | 70 +++++++ make_prometheus_server_config.py | 26 +++ mqtt_config_template.json | 24 +++ prom_server_default.yml | 11 ++ start_mqtt_connect.sh | 13 ++ start_prometheus_sink_connect.sh | 15 ++ 17 files changed, 1088 insertions(+) create mode 100644 CTA_bustracker.py create mode 100644 README.md create mode 100644 all.yml create mode 100644 cpu2mqtt.py create mode 100644 create_aws_instances.py create mode 100644 create_connect_docker_compose_dot_yaml.py create mode 100644 create_hosts_dot_yaml.py create mode 100644 create_mqtt_config.py create mode 100644 create_mqtt_connect_docker_compose_dot_yaml.py create mode 100644 create_prometheus_connect_docker_compose_dot_yaml.py create mode 100644 create_prometheus_sink_config.py create mode 100644 create_rest_proxy_docker_compose.py create mode 100644 make_prometheus_server_config.py create mode 100644 mqtt_config_template.json create mode 100644 prom_server_default.yml create mode 100644 start_mqtt_connect.sh create mode 100644 start_prometheus_sink_connect.sh diff --git a/CTA_bustracker.py b/CTA_bustracker.py new file mode 100644 index 0000000..d5bdd04 --- /dev/null +++ b/CTA_bustracker.py @@ -0,0 +1,48 @@ +import requests +import json + +# Uses the CTA Bus Tracker API (documentation available here): +# https://www.transitchicago.com/assets/1/6/cta_Bus_Tracker_API_Developer_Guide_and_Documentation_20160929.pdf + +# Given a list of Route Designators ('rt' below), return Vehicle IDs +# JSON response will be most recent status for each vehicle + +# Take that JSON response and send it Confluent Kafka REST Proxy + +# TODO: wrap this in a loop so it runs every minute or so (check AUP) + +# Confluent REST Proxy values +rest_proxy_host = "ec2-52-14-116-180.us-east-2.compute.amazonaws.com" +topic = "bustest" +rest_headers = {'Content-Type': 'application/vnd.kafka.json.v2+json', 'Accept': 'application/vnd.kafka.v2+json'} +rest_url = "http://" + rest_proxy_host + ":8082/topics/" + topic + +# CTA Bus Tracker API values +api_key = 'HL76TdXUt6YZGmTKSziz3qXyT' +getvehicles_url = 'http://ctabustracker.com/bustime/api/v2/getvehicles' + +# Format the API request and parse the response +vehicle_params = {'key': api_key, 'format': 'json', 'rt': 'X9,11,12,J14,15,18,19,20,21,22', 'tmres': 's'} +r_vehicles = requests.get(getvehicles_url, params=vehicle_params) +# each JSON object is the latest stats for each vehicle ID (bus). 
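# Shape of the JSON the parsing below relies on (illustrative sketch; only the 'vid' key
# is read explicitly, the rest of each vehicle record is forwarded to Kafka as-is):
#
#   {"bustime-response": {"vehicle": [{"vid": "1234", ...}, {"vid": "7012", ...}]}}
#
# If the API reports a problem (bad key, no active vehicles on the requested routes),
# 'bustime-response' may carry an 'error' list instead of 'vehicle', in which case the
# lookups below raise KeyError.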
+response_dict = r_vehicles.json() +vehicle_dict = response_dict['bustime-response'] +list_of_vids = vehicle_dict['vehicle'] + + +for vid in list_of_vids: + # each vid is a dict + list_of_records = [] + kafka_record = {} + kafka_record['value'] = vid + # use the vehicle ID - vid as the key for each record + kafka_record['key'] = vid["vid"] + list_of_records.append(kafka_record) + send_data = {} + send_data['records'] = list_of_records + send_json = json.dumps(send_data) + print(send_json) + # POST to the REST API + kpost = requests.post(rest_url, headers=rest_headers, data=send_json) + print(kpost.text) +# time.sleep(10) \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..b2f52de --- /dev/null +++ b/README.md @@ -0,0 +1,2 @@ +# iot +A turnkey environment for sending IoT data to Confluent Cloud diff --git a/all.yml b/all.yml new file mode 100644 index 0000000..f9657ec --- /dev/null +++ b/all.yml @@ -0,0 +1,134 @@ +--- +- name: Kafka Provisioning + hosts: all + remote_user: root + gather_facts: false + tags: base_install + tasks: + - name: apt-get update and upgrade + apt: + update_cache: yes + upgrade: full + force_apt_get: yes + - name: install java and docker + apt: + name: openjdk-11-jdk, jq, docker, docker-compose, python3, python3-pip + state: latest + force_apt_get: yes + - name: Installing Python Modules + pip: + name: + - ruamel.yaml, yq, psutil + - name: Adding user ubuntu to Docker group + ansible.builtin.user: + name: ubuntu + append: true + groups: docker +- name: Configure Confluent REST Proxy + hosts: rest_proxy + remote_user: root + gather_facts: false + tags: kafka_rest + tasks: + - name: Set the hostname to rest-proxy + ansible.builtin.hostname: + name: rest-proxy + - name: Copying create_rest_proxy_docker_compose.py + copy: + src: create_rest_proxy_docker_compose.py + dest: /home/ubuntu/create_rest_proxy_docker_compose.py + - name: Copying ccloud-kafka-rest.properties + copy: + src: ccloud-kafka-rest.properties + dest: /home/ubuntu/ccloud-kafka-rest.properties + - name: Creating docker-compose.yml file + script: create_rest_proxy_docker_compose.py + args: + executable: python3 + - name: Starting Docker + shell: + cmd: docker-compose -f rest-docker-compose.yml up -d +- name: Configure MQTT Source Connector + hosts: kafka_connect_mqtt + remote_user: root + gather_facts: false + tags: kafka_connect + tasks: + - name: Set the hostname to mqtt-connect + ansible.builtin.hostname: + name: mqtt-connect + - name: Copying create_mqtt_connect_docker_compose_dot_yaml.py + copy: + src: create_mqtt_connect_docker_compose_dot_yaml.py + dest: /home/ubuntu/create_mqtt_connect_docker_compose_dot_yaml.py + - name: Copying my-connect-distributed.properties + copy: + src: my-connect-distributed.properties + dest: /home/ubuntu/my-connect-distributed.properties + - name: Creating docker-compose.yml file + script: create_mqtt_connect_docker_compose_dot_yaml.py + args: + executable: python3 + - name: Starting Docker + shell: + cmd: docker-compose -f docker-compose.yml up -d +- name: Configure the Prometheus Sink Connector + hosts: kafka_connect_prometheus + remote_user: root + gather_facts: false + tags: kafka_connect_prometheus + tasks: + - name: Set the hostname to prometheus-connect + ansible.builtin.hostname: + name: prometheus-connect + - name: Copying my-connect-distributed.properties to prometheus connect + copy: + src: my-connect-distributed.properties + dest: my-connect-distributed.properties + - name: Copying 
create_prometheus_connect_docker_compose_dot_yaml.py + copy: + src: create_prometheus_connect_docker_compose_dot_yaml.py + dest: /home/ubuntu/create_prometheus_connect_docker_compose_dot_yaml.py + - name: Creating docker-compose.yml file + script: create_prometheus_connect_docker_compose_dot_yaml.py + args: + executable: python3 + - name: Starting Docker + shell: + cmd: docker-compose -f docker-compose.yml up -d +- name: Configure the Prometheus Server + hosts: prometheus_server + remote_user: root + gather_facts: false + tags: prometheus_server + tasks: + - name: Set the hostname to prometheus-server + ansible.builtin.hostname: + name: prometheus-server + - name: Copying hosts.yml + copy: + src: hosts.yml + dest: /home/ubuntu/hosts.yml + - name: Copying prom_server_default.yml + copy: + src: prom_server_default.yml + dest: prom_server_default.yml + - name: Copying make_prometheus_server_config.py + copy: + src: make_prometheus_server_config.py + dest: make_prometheus_server_config.py + - name: Creating prometheus.yml + script: make_prometheus_server_config.py + args: + executable: python3 + - name: installing prometheus + apt: + name: prometheus + state: latest + force_apt_get: yes + - name: moving Prometheus.yml config file + command: mv /home/ubuntu/prometheus.yml /etc/prometheus/prometheus.yml + - name: starting prometheus server + ansible.builtin.command: + cmd: /usr/bin/prometheus --config.file="/home/ubuntu/prometheus.yml" + diff --git a/cpu2mqtt.py b/cpu2mqtt.py new file mode 100644 index 0000000..dc9c2e3 --- /dev/null +++ b/cpu2mqtt.py @@ -0,0 +1,46 @@ +# cpu2mqtt - a simple script to read cpu metrics using +# the psutil library, then send them to an mqtt broker + +# TODO: pull some of these configs from a config file + +import paho.mqtt.client as paho +#import paho.mqtt.publish as publish +import socket +import psutil +import time +import json + +hostname = socket.gethostname() + +def on_connect(client, userdata, flags, rc): + print("CONNACK received with code ") + print(rc) + +def on_publish(client, userdata, mid): + print("mid: " + str(mid)) + +client = paho.Client(client_id=hostname, clean_session=True, userdata=None, protocol=paho.MQTTv31) +client.on_connect = on_connect +client.connect("broker.hivemq.com", 1883) + + +client.on_publish = on_publish +client.loop_start() + +while True: + # cpu_times_percent is the percent + #(rc, mid) = client.publish(“encyclopedia / temperature”, str(temperature), qos = 1) + cpu_times_percent = psutil.cpu_times_percent(interval=1, percpu=False) + cpu_stats = {} + cpu_stats["timestamp"] = time.time() + cpu_stats["user"] = cpu_times_percent.user + cpu_stats["nice"] = cpu_times_percent.nice + cpu_stats["idle"] = cpu_times_percent.idle + cpu_stats["system"] = cpu_times_percent.system + print(cpu_stats) + payload = json.dumps(cpu_stats) + print(payload) + mqtt_topic = hostname + "/cpu_stats" + print(mqtt_topic) + (rc, mid) = client.publish(mqtt_topic, payload=payload) + time.sleep(30) \ No newline at end of file diff --git a/create_aws_instances.py b/create_aws_instances.py new file mode 100644 index 0000000..e9c6272 --- /dev/null +++ b/create_aws_instances.py @@ -0,0 +1,172 @@ +import boto3 +from configparser import ConfigParser + +# create a bunch of new EC2 instance +# TODO: create command line args to specify config file at least +# TODO: clean-up/refactor a bunch of this + +cfg = ConfigParser() +cfg.read('yak_shaving.conf') + + +def create_instance(aws_opts, node_opts, node_count): + ec2 = boto3.resource('ec2') + cluster_name = 
aws_opts['cluster_name'] + node_job = node_opts['node_job'] + security_group_id = aws_opts['security_group_id'] + volume_size = node_opts['volume_size'] + ami = aws_opts['ami'] + InstanceType = node_opts['InstanceType'] + pem = aws_opts['pem'] + Owner_Name = aws_opts['Owner_Name'] + your_email = aws_opts['your_email'] + + for i in range(0, node_count): + iteration = str(i+1) + node_name = cluster_name + "-" + node_job + "-" + iteration + SecurityGroupIds = [] + SecurityGroupIds.append(security_group_id) + print("Creating Instance ", node_name) + ec2.create_instances( + #DryRun=True, + BlockDeviceMappings=[ + { + 'DeviceName': '/dev/sda1', + 'Ebs': { + 'DeleteOnTermination': True, + 'VolumeSize': int(volume_size), + 'VolumeType': 'gp2', + 'Encrypted': False + } + } + ], + ImageId=ami, + MinCount=1, + MaxCount=1, + InstanceType=InstanceType, + KeyName=pem, + SecurityGroupIds=SecurityGroupIds, + TagSpecifications=[ + { + 'ResourceType': 'instance', + 'Tags': [ + { + 'Key': 'Name', + 'Value': node_name + }, + { + 'Key': 'Owner_Name', + 'Value': Owner_Name + }, + { + 'Key': 'Owner_Email', + 'Value': your_email + }, + { + 'Key': 'node_job', + 'Value': node_job + }, + { + 'Key': 'cluster_name', + 'Value': cluster_name + } + ] + } + ] + ) + print(node_name, " has been created") + + +def make_aws_common_config_dict(): + # This makes a dict of all the common aws/ec2 options + security_group_id = cfg.get('aws_common', 'security_group_id') + ami = cfg.get('aws_common', 'ami') + Owner_Name = cfg.get('aws_common', 'Owner_Name') + pem = cfg.get('aws_common', 'your_pem') + your_email = cfg.get('aws_common', 'your_email') + cluster_name = cfg.get('aws_common', 'cluster_name') + + aws_opts = {'security_group_id': security_group_id, + 'cluster_name': cluster_name, + 'ami': ami, + 'Owner_Name': Owner_Name, + 'pem': pem, + 'your_email': your_email + } + + return aws_opts + + +def create_kafka_connect_mqtt_opts(): + InstanceType = cfg.get('kafka_connect_mqtt', 'InstanceType') + volume_size = cfg.get('kafka_connect_mqtt', 'volume_size') + node_job = "kafka_connect_mqtt" + + kafka_connect_mqtt_opts = { + 'InstanceType': InstanceType, + 'volume_size': volume_size, + 'node_job': node_job + } + return(kafka_connect_mqtt_opts) + +def create_kafka_connect_prometheus_opts(): + InstanceType = cfg.get('kafka_connect_prometheus', 'InstanceType') + volume_size = cfg.get('kafka_connect_prometheus', 'volume_size') + node_job = "kafka_connect_prometheus" + + kafka_connect_prometheus_opts = { + 'InstanceType': InstanceType, + 'volume_size': volume_size, + 'node_job': node_job + } + return(kafka_connect_prometheus_opts) + +def create_prometheus_server_opts(): + InstanceType = cfg.get('prometheus_server', 'InstanceType') + volume_size = cfg.get('prometheus_server', 'volume_size') + node_job = "prometheus_server" + + prometheus_server_opts = { + 'InstanceType': InstanceType, + 'volume_size': volume_size, + 'node_job': node_job + } + return(prometheus_server_opts) + +def create_rest_proxy_opts(): + InstanceType = cfg.get('rest_proxy', 'InstanceType') + volume_size = cfg.get('rest_proxy', 'volume_size') + node_job = "rest_proxy" + rest_proxy_opts = { + 'InstanceType': InstanceType, + 'volume_size': volume_size, + 'node_job': node_job + } + return(rest_proxy_opts) + +# TODO: simplify this with a loop? 
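# One possible loop-based version of the four role blocks below (sketch): every role
# section in yak_shaving.conf carries InstanceType, volume_size and node_count, and the
# node_job tag always matches the section name, so the dispatch could be reduced to:
#
#   for section in ('kafka_connect_mqtt', 'kafka_connect_prometheus',
#                   'prometheus_server', 'rest_proxy'):
#       node_count = int(cfg.get(section, 'node_count'))
#       if node_count > 0:
#           node_opts = {
#               'InstanceType': cfg.get(section, 'InstanceType'),
#               'volume_size': cfg.get(section, 'volume_size'),
#               'node_job': section,
#           }
#           create_instance(make_aws_common_config_dict(), node_opts, node_count)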
+if int(cfg.get('kafka_connect_mqtt', 'node_count')) > 0: + node_count = int(cfg.get('kafka_connect_mqtt', 'node_count')) + aws_opts = make_aws_common_config_dict() + node_opts = create_kafka_connect_mqtt_opts() + create_instance(aws_opts, node_opts, node_count) + +if int(cfg.get('kafka_connect_prometheus', 'node_count')) > 0: + node_count = int(cfg.get('kafka_connect_prometheus', 'node_count')) + aws_opts = make_aws_common_config_dict() + node_opts = create_kafka_connect_prometheus_opts() + create_instance(aws_opts, node_opts, node_count) + +if int(cfg.get('prometheus_server', 'node_count')) > 0: + node_count = int(cfg.get('prometheus_server', 'node_count')) + aws_opts = make_aws_common_config_dict() + node_opts = create_prometheus_server_opts() + create_instance(aws_opts, node_opts, node_count) + +if int(cfg.get('rest_proxy', 'node_count')) > 0: + node_count = int(cfg.get('rest_proxy', 'node_count')) + aws_opts = make_aws_common_config_dict() + node_opts = create_rest_proxy_opts() + create_instance(aws_opts, node_opts, node_count) + + diff --git a/create_connect_docker_compose_dot_yaml.py b/create_connect_docker_compose_dot_yaml.py new file mode 100644 index 0000000..12418db --- /dev/null +++ b/create_connect_docker_compose_dot_yaml.py @@ -0,0 +1,90 @@ +#!/usr/local/bin/python3 + +# read a file from command line or a .properties file and +# turn it into a docker-compose.yml file +# or at least format the fields appropriately. + +import os +import re +import ruamel.yaml +import requests + +try: + r = requests.get('http://169.254.169.254/latest/meta-data/public-hostname', timeout=1) + advertised_hostname = r.text +except: + advertised_hostname = '0.0.0.0' + +file = 'connect.properties' +# TODO: Make this not hard coded. + +if os.path.exists(file): + try: + with open(file, 'rt') as input_file: + environment = {} + for line in input_file: + if re.match(r'^\s+', line): + #print("leading whitespace") + continue + elif re.match(r'\w+', line): + line = line.strip() + line = str(line) + # split line on = into field/value + fv = re.match(r'([^\=]*)=(.*)', line) + field = fv.group(1) + field_ = re.sub(r'\.', r'_', field) + ufield = field_.upper() + dcfield = 'CONNECT_' + ufield + value = fv.group(2) + #if re.match(r'^\d', value): + # value = int(value) + environment[dcfield] = value + #print("environment[" + dcfield + "] = " + value) + environment['CONNECT_REST_ADVERTISED_HOST_NAME'] = advertised_hostname + environment['CONNECT_GROUP_ID'] = 's3csv' + environment['CONNECT_CONFIG_STORAGE_TOPIC'] = 's3csv-storage' + environment['CONNECT_STATUS_STORAGE_TOPIC'] = 's3csv-status' + environment['CONNECT_OFFSET_STORAGE_TOPIC'] = 's3csv-offset' + #print(environment) + input_file.close() + except: + print("Can't do stuff here") + exit() + + +env = {} +env['environment'] = environment + +yaml = {} +yaml['version'] = '2' + +version = {} +services = {} +connect = {} +connect['image'] = 'confluentinc/cp-server-connect:6.1.1' +connect['hostname'] = 'ansible-connect' +connect['container_name'] = 'ansible-connect' +connect['ports'] = ["8083:8083"] +connect['environment'] = environment +connect['volumes'] = ["/var/spooldir:/var/spooldir"] + +yaml['version'] = '2' +yaml['services'] = services +services['connect'] = connect + + +docker_compose_file = ruamel.yaml.round_trip_dump(yaml, explicit_start=True) + +forgive_me = ''' command: + - bash + - -c + - | + echo "Installing connector plugins" + confluent-hub install --no-prompt jcustenborder/kafka-connect-spooldir:2.0.46 + echo "Launching Kafka Connect worker" + 
/etc/confluent/docker/run''' + +#print(forgive_me) +with open('docker-compose.yml', 'wt') as dc: + dc.write(str(docker_compose_file)) + dc.write(forgive_me) diff --git a/create_hosts_dot_yaml.py b/create_hosts_dot_yaml.py new file mode 100644 index 0000000..5f348e5 --- /dev/null +++ b/create_hosts_dot_yaml.py @@ -0,0 +1,124 @@ +# create_hosts_dot_yaml.py +# +# read a config file and connect to AWS via boto3 library +# find all running EC2 instances that match config specs +# and turn them into a hosts.yml file for ansible, etc. + +import boto3 +from configparser import ConfigParser +import ruamel.yaml + +output_file = 'hosts.yml' + +cfg = ConfigParser() +cfg.read('yak_shaving.conf') + +pem = cfg.get('aws_common', 'your_pem') +cluster_name = cfg.get('aws_common', 'cluster_name') + +ec2 = boto3.client('ec2') +node_filters = [ + {'Name': 'tag:cluster_name', 'Values': [cluster_name]}, + {'Name': 'key-name', 'Values': [pem]}, + {'Name': 'instance-state-name', 'Values': ['running']} +] + +response = ec2.describe_instances(Filters=node_filters) +#print(response) + +# response is a dictionary +# response['Reservations'] is a list +Reservations = response['Reservations'] + +# Reservations is a list of dictionaries. +# Each dictionary includes a list of Groups and a list of Instances + +nodes_list = [] +jobs_list = [] +hosthash = {} + +for res_dict in Reservations: + instance_list = res_dict['Instances'] + for instance_dict in instance_list: + try: + instance_dict['PrivateIpAddress'] + except KeyError: + private_ip = "NULL" + else: + private_ip = instance_dict['PrivateIpAddress'] + try: + instance_dict['PrivateDnsName'] + except KeyError: + private_dns = "NULL" + else: + private_dns = instance_dict['PrivateDnsName'] + try: + instance_dict['PublicDnsName'] + except KeyError: + public_dns = "NULL" + else: + public_dns = instance_dict['PublicDnsName'] + try: + instance_dict['PublicIpAddress'] + except KeyError: + public_ip = "NULL" + else: + public_ip = instance_dict['PublicIpAddress'] + tags = instance_dict['Tags'] + for tag_dict in tags: + for key in tag_dict: + if tag_dict.get(key) == 'Name': + ec2_name = tag_dict['Value'] + elif tag_dict.get(key) == 'node_job': + node_job = tag_dict['Value'] + elif tag_dict.get(key) == 'cluster_name': + cluster_name = tag_dict['Value'] + + # Is host_info actually used anywhere? Why am I creating it? 
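# (As written, the host_info dict built below is never read again; only hosthash,
#  jobs_list and nodes_list feed the hosts.yml output, so host_info could be dropped.)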
+ host_info = { + 'ec2_name': ec2_name, + 'node_job': node_job, + 'cluster_name': cluster_name, + 'public_ip': public_ip, + 'private_ip': private_ip, + 'public_dns': public_dns, + 'private_dns': private_dns + } + + hosthash[public_dns] = node_job + + jobs_list.append(node_job) + nodes_list.append(public_dns) + +# create a de-duplicated list of node jobs/roles +dedup = list(dict.fromkeys(jobs_list)) + +# create dictionary of 'all' common variables +all = {} +vars = {} +vars['vars'] = { + 'ansible_connection': 'ssh', + 'ansible_user': 'ubuntu', + 'ansible_become': 'true', + 'ansible_ssh_private_key_file': '~/aws.pem' +} + +all['all'] = vars + +common_vars = ruamel.yaml.round_trip_dump(all) + +with open(output_file, 'wt') as output: + for role in dedup: + hosts = {} + for n in nodes_list: + if hosthash[n] == role: + hosts[n] = None + + mcgilla = {} + mcgilla[role] = {'hosts': hosts} + + #print(mcgilla) + nodes_yaml = ruamel.yaml.round_trip_dump(mcgilla) + output.write(str(nodes_yaml)) + + output.write(str(common_vars)) diff --git a/create_mqtt_config.py b/create_mqtt_config.py new file mode 100644 index 0000000..f2b9b72 --- /dev/null +++ b/create_mqtt_config.py @@ -0,0 +1,63 @@ +# create_mqtt_config.py +# +# read a .properties file from Confluent Cloud and create a JSON +# config file to send to the Connect REST API to start a +# Confluent MQTT source connector + +import tempfile +import json +from configparser import ConfigParser +import socket + +connect_conf = "my-connect-distributed.properties" +mqtt_conf_template = "mqtt_config_template.json" +mqtt_generated_config = "mqtt_config.json" +yak_conf = "./yak_shaving.conf" + +with tempfile.NamedTemporaryFile(delete=False, mode='wt') as t: + t.write('[conf]') + path = t.name + #print(path) + + with open(connect_conf, 'r') as f: + for line in f: + t.write(line) + + +cfg = ConfigParser() +cfg.read(path) + +schema_url = cfg.get('conf', 'value.converter.schema.registry.url') +schema_creds = cfg.get('conf', 'value.converter.schema.registry.basic.auth.user.info') +license_broker = cfg.get('conf', 'confluent.topic.bootstrap.servers') +api_key = cfg.get('conf', 'sasl.jaas.config') +#print(schema_url) + +cfg = ConfigParser() +cfg.read(yak_conf) + +mqtt_server = cfg.get('mqtt-connect', 'mqtt_server') +kafka_topic = cfg.get('mqtt-connect', 'kafka_topic') +mqtt_topic = socket.gethostname() +mqtt_topic = mqtt_topic + "/cpu_stats" +mqtt_server_uri = "tcp://" + mqtt_server + ":1883" + +with open(mqtt_conf_template, 'r') as f: + mqtt_dict = json.loads(f.read()) + + +config_dict = mqtt_dict['config'] +config_dict['value.converter.schema.registry.url'] = schema_url +config_dict['key.converter.schema.registry.url'] = schema_url +config_dict['value.converter.schema.registry.basic.auth.user.info'] = schema_creds +config_dict['mqtt.topics'] = mqtt_topic +config_dict['kafka.topic'] = kafka_topic +config_dict['mqtt.server.uri'] = mqtt_server_uri +config_dict['confluent.topic.bootstrap.servers'] = license_broker +config_dict['confluent.topic.sasl.jaas.config'] = api_key + + +#print(json.dumps(mqtt_dict)) + +with open(mqtt_generated_config, 'wt') as c: + c.write(json.dumps(mqtt_dict, indent=4)) diff --git a/create_mqtt_connect_docker_compose_dot_yaml.py b/create_mqtt_connect_docker_compose_dot_yaml.py new file mode 100644 index 0000000..7edc1cb --- /dev/null +++ b/create_mqtt_connect_docker_compose_dot_yaml.py @@ -0,0 +1,93 @@ +#!/usr/local/bin/python3 + +# read a file from command line or a .properties file and +# turn it into a docker-compose.yml file +# or at least 
format the fields appropriately. + +import os +import re +import ruamel.yaml +import requests +import secrets + +try: + r = requests.get('http://169.254.169.254/latest/meta-data/public-hostname', timeout=1) + advertised_hostname = r.text +except: + advertised_hostname = '0.0.0.0' + +file = 'my-connect-distributed.properties' +# TODO: Make this not hard coded. + +if os.path.exists(file): + try: + with open(file, 'rt') as input_file: + environment = {} + for line in input_file: + if re.match(r'^\s+', line): + #print("leading whitespace") + continue + elif re.match(r'\w+', line): + line = line.strip() + line = str(line) + # split line on = into field/value + fv = re.match(r'([^\=]*)=(.*)', line) + field = fv.group(1) + field_ = re.sub(r'\.', r'_', field) + ufield = field_.upper() + dcfield = 'CONNECT_' + ufield + value = fv.group(2) + #if re.match(r'^\d', value): + # value = int(value) + environment[dcfield] = value + #print("environment[" + dcfield + "] = " + value) + environment['CONNECT_REST_ADVERTISED_HOST_NAME'] = advertised_hostname + # create random prefix to avoid previous configs, offsets when running multiple times + prefix = secrets.token_urlsafe(4) + environment['CONNECT_GROUP_ID'] = prefix + '-mqtt' + environment['CONNECT_CONFIG_STORAGE_TOPIC'] = prefix + '-mqtt-storage' + environment['CONNECT_STATUS_STORAGE_TOPIC'] = prefix + '-mqtt-status' + environment['CONNECT_OFFSET_STORAGE_TOPIC'] = prefix + '-mqtt-offset' + #print(environment) + input_file.close() + except: + print("Can't do stuff here") + exit() + + +env = {} +env['environment'] = environment + +yaml = {} +yaml['version'] = '2' + +version = {} +services = {} +connect = {} +connect['image'] = 'confluentinc/cp-server-connect:6.1.0' +connect['hostname'] = 'mqtt-connect' +connect['container_name'] = 'mqtt-connect' +connect['ports'] = ["8083:8083"] +connect['environment'] = environment +#connect['volumes'] = ["/var/spooldir:/var/spooldir"] + +yaml['version'] = '2' +yaml['services'] = services +services['connect'] = connect + + +docker_compose_file = ruamel.yaml.round_trip_dump(yaml, explicit_start=True) + +forgive_me = ''' command: + - bash + - -c + - | + echo "Installing connector plugins" + confluent-hub install --no-prompt confluentinc/kafka-connect-mqtt:1.4.0 + echo "Launching Kafka Connect worker" + /etc/confluent/docker/run''' + +#print(forgive_me) +with open('docker-compose.yml', 'wt') as dc: + dc.write(str(docker_compose_file)) + dc.write(forgive_me) diff --git a/create_prometheus_connect_docker_compose_dot_yaml.py b/create_prometheus_connect_docker_compose_dot_yaml.py new file mode 100644 index 0000000..29c2fb6 --- /dev/null +++ b/create_prometheus_connect_docker_compose_dot_yaml.py @@ -0,0 +1,91 @@ +#!/usr/local/bin/python3 + +# read a file from command line or a .properties file and +# turn it into a docker-compose.yml file +# or at least format the fields appropriately. + +import os +import re +import ruamel.yaml +import requests +import secrets + +try: + r = requests.get('http://169.254.169.254/latest/meta-data/public-hostname', timeout=1) + advertised_hostname = r.text +except: + advertised_hostname = '0.0.0.0' + +file = 'my-connect-distributed.properties' +# TODO: Make this not hard coded. 
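# One way to address the TODO (sketch): take the properties path from the command line
# and fall back to the current default, e.g.
#
#   import sys
#   file = sys.argv[1] if len(sys.argv) > 1 else 'my-connect-distributed.properties'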
+ +if os.path.exists(file): + try: + with open(file, 'rt') as input_file: + environment = {} + for line in input_file: + if re.match(r'^\s+', line): + #print("leading whitespace") + continue + elif re.match(r'\w+', line): + line = line.strip() + line = str(line) + # split line on = into field/value + fv = re.match(r'([^\=]*)=(.*)', line) + field = fv.group(1) + field_ = re.sub(r'\.', r'_', field) + ufield = field_.upper() + dcfield = 'CONNECT_' + ufield + value = fv.group(2) + #if re.match(r'^\d', value): + # value = int(value) + environment[dcfield] = value + #print("environment[" + dcfield + "] = " + value) + environment['CONNECT_REST_ADVERTISED_HOST_NAME'] = advertised_hostname + prefix = secrets.token_urlsafe(4) + environment['CONNECT_GROUP_ID'] = prefix + '-prometheus' + environment['CONNECT_CONFIG_STORAGE_TOPIC'] = prefix + '-prometheus-storage' + environment['CONNECT_STATUS_STORAGE_TOPIC'] = prefix + '-prometheus-status' + environment['CONNECT_OFFSET_STORAGE_TOPIC'] = prefix + '-prometheus-offset' + #print(environment) + input_file.close() + except: + print("Can't do stuff here") + exit() + + +env = {} +env['environment'] = environment + +yaml = {} +yaml['version'] = '2' + +version = {} +services = {} +connect = {} +connect['image'] = 'confluentinc/cp-server-connect:6.1.0' +connect['hostname'] = 'prometheus-connect' +connect['container_name'] = 'prometheus-connect' +connect['ports'] = ["8083:8083", "8889:8889"] +connect['environment'] = environment + +yaml['version'] = '2' +yaml['services'] = services +services['connect'] = connect + + +docker_compose_file = ruamel.yaml.round_trip_dump(yaml, explicit_start=True) + +forgive_me = ''' command: + - bash + - -c + - | + echo "Installing connector plugins" + confluent-hub install --no-prompt confluentinc/kafka-connect-prometheus-metrics:latest + echo "Launching Kafka Connect worker" + /etc/confluent/docker/run''' + +#print(forgive_me) +with open('docker-compose.yml', 'wt') as dc: + dc.write(str(docker_compose_file)) + dc.write(forgive_me) diff --git a/create_prometheus_sink_config.py b/create_prometheus_sink_config.py new file mode 100644 index 0000000..a35abe3 --- /dev/null +++ b/create_prometheus_sink_config.py @@ -0,0 +1,66 @@ +# create_prometheus_sink_config.py +# read a .properties file saved from Confluent Cloud and create a JSON +# config file to POST to the Connect REST API to start the +# Confluent Prometheus sink connector + +import tempfile +import json +from configparser import ConfigParser + +connect_conf = "my-connect-distributed.properties" +prom_conf_template = "prometheus_config_template.json" +prom_generated_config = "prom_config.json" +yak_conf = "./yak_shaving.conf" + +with tempfile.NamedTemporaryFile(delete=False, mode='wt') as t: + t.write('[conf]') + path = t.name + + with open(connect_conf, 'r') as f: + for line in f: + t.write(line) + + +cfg = ConfigParser() +cfg.read(path) + +schema_url = cfg.get('conf', 'value.converter.schema.registry.url') +schema_creds = cfg.get('conf', 'value.converter.schema.registry.basic.auth.user.info') +license_broker = cfg.get('conf', 'confluent.topic.bootstrap.servers') +api_key = cfg.get('conf', 'sasl.jaas.config') +#print(schema_url) + +cfg = ConfigParser() +cfg.read(yak_conf) + +prom_server = cfg.get('kafka_connect_prometheus', 'prom_server') +kafka_topic = cfg.get('kafka_connect_prometheus', 'kafka_topic') + +with open(prom_conf_template, 'r') as f: + prom_dict = json.loads(f.read()) + + +config_dict = prom_dict['config'] +config_dict['value.converter.schema.registry.url'] = 
schema_url +config_dict['key.converter.schema.registry.url'] = schema_url +config_dict['value.converter.schema.registry.basic.auth.user.info'] = schema_creds +#config_dict['prom.topics'] = prom_topic +config_dict['topics'] = kafka_topic +#config_dict['prom.server.uri'] = prom_server_uri +config_dict['confluent.topic.bootstrap.servers'] = license_broker +config_dict['confluent.topic.sasl.jaas.config'] = api_key +config_dict['sasl.jaas.config'] = api_key +config_dict['reporter.bootstrap.servers'] = license_broker +config_dict['confluent.topic.bootstrap.servers'] = license_broker +config_dict['producer.bootstrap.servers'] = license_broker +config_dict['consumer.bootstrap.servers'] = license_broker +config_dict['consumer.sasl.jaas.config'] = api_key +#config_dict['producer.sasl.jaas.config'] = api_key +config_dict['producer.sasl.jaas.config'] = api_key +config_dict['bootstrap.servers'] = license_broker + + +#print(json.dumps(prom_dict)) + +with open(prom_generated_config, 'wt') as c: + c.write(json.dumps(prom_dict, indent=4)) diff --git a/create_rest_proxy_docker_compose.py b/create_rest_proxy_docker_compose.py new file mode 100644 index 0000000..e5d1c78 --- /dev/null +++ b/create_rest_proxy_docker_compose.py @@ -0,0 +1,70 @@ +#!/usr/local/bin/python3 + +# read a file from command line or a .properties file and +# turn it into a docker-compose.yml file +# or at least format the fields appropriately. + +import os +import re +import ruamel.yaml +import requests + +try: + r = requests.get('http://169.254.169.254/latest/meta-data/public-hostname', timeout=1) + advertised_hostname = r.text +except: + advertised_hostname = '0.0.0.0' + +file = 'ccloud-kafka-rest.properties' +# TODO: Make this not hard coded. + +if os.path.exists(file): + try: + with open(file, 'rt') as input_file: + environment = {} + for line in input_file: + if re.match(r'^\s+', line): + #print("leading whitespace") + continue + elif re.match(r'\w+', line): + line = line.strip() + line = str(line) + # split line on = into field/value + fv = re.match(r'([^\=]*)=(.*)', line) + field = fv.group(1) + field_ = re.sub(r'\.', r'_', field) + ufield = field_.upper() + dcfield = 'KAFKA_REST_' + ufield + value = fv.group(2) + environment[dcfield] = value + environment['KAFKA_REST_LISTENERS'] = "http://0.0.0.0:8082" + input_file.close() + except: + print("I am slain!") + exit() + + +env = {} +env['environment'] = environment + +yaml = {} +yaml['version'] = '2' + +version = {} +services = {} +rest = {} +rest['image'] = 'confluentinc/cp-kafka-rest:6.1.0' +rest['hostname'] = 'rest-proxy' +rest['container_name'] = 'kafka-rest-proxy' +rest['ports'] = ["8082:8082"] +rest['environment'] = environment + +yaml['version'] = '2' +yaml['services'] = services +services['rest-proxy'] = rest + + +docker_compose_file = ruamel.yaml.round_trip_dump(yaml, explicit_start=True) + +with open('rest-docker-compose.yml', 'wt') as dc: + dc.write(str(docker_compose_file)) diff --git a/make_prometheus_server_config.py b/make_prometheus_server_config.py new file mode 100644 index 0000000..6c354bc --- /dev/null +++ b/make_prometheus_server_config.py @@ -0,0 +1,26 @@ +# make_prometheus_server_config.py +# +# a simple script to read the hosts.yml file, pull out the +# Prometheus sink connector hostname and use it to replace +# a placeholder in a template file. 
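# The script assumes the hosts.yml layout produced by create_hosts_dot_yaml.py: one
# top-level mapping per node_job with a 'hosts' dict keyed by public DNS name, e.g.
# (hostname is illustrative):
#
#   kafka_connect_prometheus:
#     hosts:
#       ec2-198-51-100-7.us-east-2.compute.amazonaws.com:
#
# It also assumes the Prometheus sink Connect worker is the second scrape job in
# prom_server_default.yml (scrape_configs[1]), whose KAFKA_CONNECT placeholder target
# is replaced with <hostname>:8889.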
+ +from ruamel.yaml import YAML +yaml = YAML() + +hostinfo = open("hosts.yml", "r") +ymldata = yaml.load(hostinfo) + +hostdict = ymldata['kafka_connect_prometheus']['hosts'] +datadict = dict(hostdict) +host = list(datadict.keys())[0] +#print(host) +host_uri = host + ":8889" + +prom = open("prom_server_default.yml", "r") +ymlprom = yaml.load(prom) + +ymlprom['scrape_configs'][1]['static_configs'][0]['targets'][0] = host_uri + +with open("prometheus.yml", "wt") as prom: + yaml.dump(ymlprom, prom) + diff --git a/mqtt_config_template.json b/mqtt_config_template.json new file mode 100644 index 0000000..1db8121 --- /dev/null +++ b/mqtt_config_template.json @@ -0,0 +1,24 @@ +{ + "name": "mqtt-source", + "config": { + "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector", + "tasks.max": "1", + "mqtt.server.uri": "tcp://MQTT-SERVER:1883", + "mqtt.topics": "MQTT-TOPIC", + "kafka.topic": "KAFKA-TOPIC", + "mqtt.qos": "2", + "confluent.topic.bootstrap.servers": "CCLOUD-BOOTSTRAP", + "confluent.topic.replication.factor": "3", + "confluent.license": "", + "confluent.topic.security.protocol": "SASL_SSL", + "confluent.topic.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"API-KEY\" password=\"SECRET\";", + "confluent.topic.ssl.endpoint.identification.algorithm": "https", + "confluent.topic.sasl.mechanism": "PLAIN", + "confluent.topic": "_confluent-command", + "confluent.topic.retry.backoff.ms": "500", + "confluent.topic.request.timeout.ms": "60000", + "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", + "key.converter": "org.apache.kafka.connect.storage.StringConverter" + + } +} \ No newline at end of file diff --git a/prom_server_default.yml b/prom_server_default.yml new file mode 100644 index 0000000..ae530ee --- /dev/null +++ b/prom_server_default.yml @@ -0,0 +1,11 @@ +global: + scrape_interval: 10s +scrape_configs: + - job_name: prometheus + static_configs: + - targets: + - localhost:9090 + - job_name: prometheus-connect + static_configs: + - targets: + - KAFKA_CONNECT \ No newline at end of file diff --git a/start_mqtt_connect.sh b/start_mqtt_connect.sh new file mode 100644 index 0000000..e22a979 --- /dev/null +++ b/start_mqtt_connect.sh @@ -0,0 +1,13 @@ +#!/bin/sh +# start_mqtt_connect.sh +# +# a simple script to connect to the MQTT source connect node and +# POST a JSON file with config properties for the Confluent MQTT source connector + +hostname=$(cat hosts.yml| yq e '.kafka_connect_mqtt.hosts' - | sed s'/.$//') + +echo "hostname is " && echo $hostname +echo "Making Connection to MQTT Server" +curl $hostname:8083/connectors -X POST -H "Content-Type: application/json" -d @mqtt_config.json + + diff --git a/start_prometheus_sink_connect.sh b/start_prometheus_sink_connect.sh new file mode 100644 index 0000000..c968017 --- /dev/null +++ b/start_prometheus_sink_connect.sh @@ -0,0 +1,15 @@ +#!/bin/sh +# start_prometheus_sink_connect.sh +# +# a simple script to read the hosts.yml file, extract the Prometheus +# sink connect hostname, connect to it and POST a JSON config file +# to start the Prometheus sink connector + + +hostname=$(cat hosts.yml| yq e '.kafka_connect_prometheus.hosts' - | sed s'/.$//') + +echo "hostname is " && echo $hostname +echo "Making Connection to Prometheus Connect Sink node" +curl $hostname:8083/connectors -X POST -H "Content-Type: application/json" -d @prom_config.json + +
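The provisioning scripts above all read a yak_shaving.conf that is not part of this patch. The snippet below is a minimal sketch of a skeleton generator for it, assuming the only required sections and keys are the ones the scripts actually pass to cfg.get() (create_aws_instances.py, create_hosts_dot_yaml.py, create_mqtt_config.py, create_prometheus_sink_config.py). Every value is a placeholder to replace; the instance types, sizes and hostnames shown are illustrative and not taken from the original.

from configparser import ConfigParser

# Sections and keys mirror the cfg.get() calls in the scripts; all values are placeholders.
skeleton = {
    'aws_common': {
        'security_group_id': 'sg-REPLACE',
        'ami': 'ami-REPLACE',                 # Ubuntu AMI for your region
        'Owner_Name': 'REPLACE',
        'your_pem': 'REPLACE',                # EC2 key pair name
        'your_email': 'you@example.com',
        'cluster_name': 'iot-demo',           # illustrative
    },
    'kafka_connect_mqtt': {
        'InstanceType': 't3.medium', 'volume_size': '20', 'node_count': '1'},
    'kafka_connect_prometheus': {
        'InstanceType': 't3.medium', 'volume_size': '20', 'node_count': '1',
        'prom_server': 'REPLACE', 'kafka_topic': 'REPLACE'},
    'prometheus_server': {
        'InstanceType': 't3.medium', 'volume_size': '20', 'node_count': '1'},
    'rest_proxy': {
        'InstanceType': 't3.medium', 'volume_size': '20', 'node_count': '1'},
    'mqtt-connect': {
        'mqtt_server': 'broker.hivemq.com',   # same public broker cpu2mqtt.py publishes to
        'kafka_topic': 'REPLACE'},
}

cfg = ConfigParser()
cfg.read_dict(skeleton)
# ConfigParser lowercases option names on write and on get(), so the mixed-case
# lookups in the scripts (e.g. 'InstanceType', 'Owner_Name') still resolve.
with open('yak_shaving.conf', 'wt') as f:
    cfg.write(f)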