-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathstream_producer.py
34 lines (30 loc) · 988 Bytes
/
stream_producer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import random
from time import sleep
from itertools import count
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
label1 = label2 = "Person"
edge_type = "CONNECTED_WITH"
edge_fields = '{}'
for i in count():
fields1 = f'{{id: {i}}}'
for j in random.sample(range(0, i), random.randint(0, min(i, 20))):
fields2 = f'{{id: {j}}}'
messages = [
f'edge|{label1}|{fields1}|{edge_type}|{edge_fields}|{label2}|{fields2}',
f'edge|{label2}|{fields2}|{edge_type}|{edge_fields}|{label1}|{fields1}',
]
for message in messages:
print(message)
producer.send(
'topic',
message.encode('utf-8')
)
print(message)
producer.send(
'topic',
f'node|{label1}|{fields1}|{{name: "person"}}'.encode('utf-8')
)
print(f'node|{label1}|{fields1}|{{name: "person"}}')
producer.flush()
sleep(4)