develop input stream from kafka #25

Open · wants to merge 1 commit into master
3 changes: 3 additions & 0 deletions README.md
@@ -75,3 +75,6 @@ You can use dsio with your own hand coded anomaly detectors. These should inheri
Naturally we encourage people to use `dsio` in combination with `sklearn`: we have no wish to reinvent the wheel! However, `sklearn` currently supports regression, classification and clustering interfaces, but not anomaly detection as a standalone category. We are trying to correct that by the introduction of the `AnomalyMixin`: an interface for anomaly detection which follows `sklearn` design patterns. When you import an `sklearn` object you can therefore simply define or override certain methods to make it compatible with `dsio`. We have provided an example for you here:

./datastream.io/examples/lof_anomaly_detector.py
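For orientation, here is a minimal sketch of what such a wrapper might look like. It assumes an sklearn-style estimator; the import path and the method names (`fit`, `update`, `score_anomaly`, `flag_anomaly`) are illustrative assumptions about the `AnomalyMixin` contract, not shown in this diff, so treat the bundled example referenced above as authoritative.

```python
import numpy as np
from sklearn.neighbors import LocalOutlierFactor

from dsio.anomaly_detectors import AnomalyMixin  # import path assumed for illustration


class LOFAnomalyDetector(AnomalyMixin):
    """Hypothetical wrapper around sklearn's LocalOutlierFactor.

    Method names are assumptions about the AnomalyMixin contract;
    see examples/lof_anomaly_detector.py for the shipped example.
    """

    def __init__(self, n_neighbors=20):
        self.model = LocalOutlierFactor(n_neighbors=n_neighbors, novelty=True)

    def fit(self, x):
        self.model.fit(np.asarray(x).reshape(-1, 1))

    def update(self, x):
        # LOF has no incremental update; refitting on the latest batch is a simple stopgap.
        self.fit(x)

    def score_anomaly(self, x):
        # Negate sklearn's inlier score so that higher means more anomalous.
        return -self.model.score_samples(np.asarray(x).reshape(-1, 1))

    def flag_anomaly(self, x):
        return self.model.predict(np.asarray(x).reshape(-1, 1)) == -1
```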

### Read data stream from Kafka (optional)
It is possible to read a stream of data from Kafka by adding the `--kafka-uri` parameter with the list of bootstrap servers delimited by `;`. The analysis is performed on a window of `--kafka-window-size` items from the input stream. In this mode the positional input argument is interpreted as the Kafka topic to consume. The `--auto_offset_reset` parameter can be set to `latest` (the default) to read only new messages, or to `earliest` to read the topic from the beginning.
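For example, assuming the `dsio` command-line entry point provided by this project, two illustrative broker addresses, and a topic named `json-topic` (broker addresses and topic name are placeholders, not part of this change), the Kafka mode might be invoked roughly as follows:

```bash
dsio json-topic \
    --kafka-uri "localhost:9092;localhost:9093" \
    --kafka-window-size 100 \
    --auto_offset_reset earliest
```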
2 changes: 1 addition & 1 deletion dsio/dashboard/bokeh.py
@@ -71,7 +71,7 @@ def update():

    io_loop = tornado.ioloop.IOLoop.current()

    if io_loop._running:  # Assume we're in a Jupyter notebook
    if hasattr(io_loop, '_running'):  # Assume we're in a Jupyter notebook
        output_notebook()
        show(app)
    else:  # Otherwise start the bokeh server in a thread and open browser
6 changes: 6 additions & 0 deletions dsio/helpers.py
@@ -31,6 +31,12 @@ def parse_arguments():
default="http://localhost:5601/app/kibana")
parser.add_argument("--bokeh-port", help="Bokeh server port", default="5001")
parser.add_argument("--es-index", help="Elasticsearch index name")
parser.add_argument("--kafka-uri",help="Kafka broker uri",
default=None)
parser.add_argument("--kafka-window-size",help="messages to wait for batch analysis",
default=100)
parser.add_argument("--auto_offset_reset",help="kafka offset strategy: earliest or latest",
default="latest")
parser.add_argument("--entry-type", help="Entry type name",
default="measurement")
parser.add_argument("-v", "--verbose", help="Increase output verbosity",
64 changes: 52 additions & 12 deletions dsio/main.py
@@ -17,6 +17,9 @@
import numpy as np
import pandas as pd

from kafka import KafkaConsumer
import json

from bokeh.plotting import curdoc

from .restream.elastic import init_elasticsearch
@@ -142,18 +145,55 @@ def main():
        if not index_name:
            index_name = 'dsio'

        print('Loading the data...')
        dataframe = pd.read_csv(args.input, sep=',')
        print('Done.\n')

        restream_dataframe(
            dataframe=dataframe, detector=detector,
            sensors=args.sensors, timefield=args.timefield,
            speed=int(float(args.speed)), es_uri=args.es and args.es_uri,
            kibana_uri=args.kibana_uri, index_name=index_name,
            entry_type=args.entry_type, bokeh_port=int(args.bokeh_port),
            cols=int(args.cols)
        )
        if args.kafka_uri:
            print('Kafka input mode...')
            kafka_window_size = int(args.kafka_window_size)
            # In Kafka mode the positional input argument is the topic name.
            consumer = KafkaConsumer(args.input,
                                     group_id='dsio-group',
                                     bootstrap_servers=args.kafka_uri.split(';'),
                                     # decode JSON messages
                                     value_deserializer=lambda m: json.loads(m.decode('ascii')),
                                     auto_offset_reset=args.auto_offset_reset)
            try:
                to_analyze = []
                while True:
                    received = consumer.poll(timeout_ms=1000)
                    for topic_partition, messages in received.items():
                        for m in messages:
                            print(m.value)
                            to_analyze.append(m.value)
                    # wait until at least kafka_window_size items have accumulated
                    if len(to_analyze) >= kafka_window_size:
                        print(to_analyze)
                        dataframe = pd.DataFrame(to_analyze)
                        restream_dataframe(
                            dataframe=dataframe, detector=detector,
                            sensors=args.sensors, timefield=args.timefield,
                            speed=int(float(args.speed)), es_uri=args.es and args.es_uri,
                            kibana_uri=args.kibana_uri, index_name=index_name,
                            entry_type=args.entry_type, bokeh_port=int(args.bokeh_port),
                            cols=int(args.cols)
                        )
                        # reset the window so the batch does not grow without bound
                        to_analyze = []
            except KeyboardInterrupt:
                print('Stopping Kafka consumer...')
            finally:
                consumer.close()

        else:
            print('Loading the data from csv...')
            dataframe = pd.read_csv(args.input, sep=',')
            print('Done.\n')

            restream_dataframe(
                dataframe=dataframe, detector=detector,
                sensors=args.sensors, timefield=args.timefield,
                speed=int(float(args.speed)), es_uri=args.es and args.es_uri,
                kibana_uri=args.kibana_uri, index_name=index_name,
                entry_type=args.entry_type, bokeh_port=int(args.bokeh_port),
                cols=int(args.cols)
            )

    except DsioError as exc:
        print(repr(exc))
30 changes: 30 additions & 0 deletions examples/kafka-producer.py
@@ -0,0 +1,30 @@
from kafka import KafkaProducer
from kafka.errors import KafkaError
import json

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda m: json.dumps(m).encode('ascii'))

messages = [
    {"timestamp": 1522513435, "speed": 30},
    {"timestamp": 1522513535, "speed": 32},
    {"timestamp": 1522513635, "speed": 31},
    {"timestamp": 1522513735, "speed": 29},
    {"timestamp": 1522513835, "speed": 32},
    {"timestamp": 1522513935, "speed": 30},
    {"timestamp": 1522514415, "speed": 600},
    {"timestamp": 1522514425, "speed": 30},
    {"timestamp": 1522514435, "speed": 31},
    {"timestamp": 1522514455, "speed": 29},
]

for m in messages:
    try:
        print('sending {}'.format(m))
        future = producer.send('json-topic', m)
        # block until the broker acknowledges, so failures surface here
        record_metadata = future.get(timeout=10)
    except KafkaError as exc:
        # Decide what to do if the produce request failed...
        print('error sending {}: {}'.format(m, exc))
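As a rough end-to-end check (assuming a local broker on `localhost:9092` and the illustrative topic name `json-topic` used above), one could run this producer script to publish the sample readings and then start `dsio` against the same topic with `--kafka-uri localhost:9092`, as described in the README section above.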
1 change: 1 addition & 0 deletions requirements.txt
@@ -6,3 +6,4 @@ scipy
pandas
kibana-dashboard-api
scikit-learn
kafka-python