-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmodify-data-jsonl.py
72 lines (58 loc) · 2.25 KB
/
modify-data-jsonl.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
"""
Modify Spectacular AI data JSONL files
Input JSONL is read from stdin and output is written to stdout
"""
import json
import sys
def modify_jsonl_sorted(in_f, out_f, func):
    """Transform every JSONL record and write the results sorted by time.

    Each non-blank line of ``in_f`` is parsed as JSON and passed to ``func``.
    Records for which ``func`` returns ``None`` are dropped. The remaining
    records are buffered in memory, sorted by their ``'time'`` key and
    written to ``out_f`` as one JSON document per line.

    Args:
        in_f: iterable of text lines (e.g. an open file or ``sys.stdin``).
        out_f: object with a ``write(str)`` method.
        func: callable taking the parsed record and returning the record to
            emit, or ``None`` to drop it.
    """
    records = []
    for line in in_f:
        # Blank / whitespace-only lines are not valid JSON documents and
        # would make json.loads raise; skip them instead of crashing.
        if not line.strip():
            continue
        record = func(json.loads(line))
        if record is not None:
            records.append(record)

    # Records without a 'time' key get a very small sort key so they end up
    # first; the sort is stable, so their relative order is preserved.
    for record in sorted(records, key=lambda d: d.get('time', -1e100)):
        out_f.write(json.dumps(record) + '\n')
def modify_jsonl_stream(in_f, out_f, func):
    """Transform JSONL records one at a time, preserving input order.

    Each non-blank line of ``in_f`` is parsed as JSON and passed to ``func``.
    If the result is not ``None`` it is serialized and written to ``out_f``
    immediately, so nothing is buffered.

    Args:
        in_f: iterable of text lines (e.g. an open file or ``sys.stdin``).
        out_f: object with a ``write(str)`` method.
        func: callable taking the parsed record and returning the record to
            emit, or ``None`` to drop it.

    Returns:
        The ``out_f`` object that was passed in.
    """
    for line in in_f:
        # Blank / whitespace-only lines are not valid JSON documents and
        # would make json.loads raise; skip them instead of crashing.
        if not line.strip():
            continue
        record = func(json.loads(line))
        if record is not None:
            out_f.write(json.dumps(record) + '\n')
    return out_f
def shift_imu_time(data, delta_t):
    """Add ``delta_t`` to the timestamp of a record that has a 'sensor' key.

    Records without a ``'sensor'`` key are returned untouched. The record is
    modified in place and the same object is returned.
    """
    if 'sensor' not in data:
        return data

    data['time'] += delta_t
    return data
def shift_frame_times(data, delta_t):
    """Add ``delta_t`` to the timestamps of a record that has a 'frames' key.

    The record's own ``'time'`` is shifted, and so is the ``'time'`` of every
    entry in ``data['frames']`` that has one. Records without a ``'frames'``
    key are returned untouched. The record is modified in place and the same
    object is returned.
    """
    if 'frames' not in data:
        return data

    data['time'] += delta_t
    for frame in data['frames']:
        # Per-frame timestamps are optional; leave entries without one alone.
        if 'time' not in frame:
            continue
        frame['time'] += delta_t
    return data
def drop_zero_imu_samples(data):
    """Return ``None`` for 'sensor' records whose values are all zero.

    A record is dropped (``None`` is returned) when it has a ``'sensor'`` key
    and every element of ``data['sensor']['values']`` equals ``0.0``. Any
    other record is returned unchanged.
    """
    if 'sensor' not in data:
        return data

    readings = data['sensor']['values']
    if all(value == 0.0 for value in readings):
        return None
    return data
if __name__ == '__main__':
    import argparse

    # The first positional parameter of ArgumentParser is `prog`, so the
    # module docstring must be passed explicitly as the description.
    p = argparse.ArgumentParser(description=__doc__)
    subs = p.add_subparsers(dest='command')

    sub_shift_imu = subs.add_parser('shift_imu_time', help='shift IMU timestamps by given delta_t')
    sub_shift_imu.add_argument('delta_t', help='timestamp shift in seconds', type=float)

    sub_shift_frames = subs.add_parser('shift_frame_times', help='shift frame timestamps by given delta_t')
    sub_shift_frames.add_argument('delta_t', help='timestamp shift in seconds', type=float)

    sub_drop_zero_imus = subs.add_parser('drop_zero_imu_samples', help='remove all-zero IMU samples')

    # Global option: it belongs to the top-level parser, so it has to be
    # given before the sub-command name on the command line.
    p.add_argument('-s', '--sort', action='store_true', help='sort the resulting file based on timestamp')
    args = p.parse_args()

    f_in = sys.stdin
    f_out = sys.stdout

    # Select the per-record transformation for the chosen sub-command.
    if args.command == 'shift_imu_time':
        func = lambda x: shift_imu_time(x, args.delta_t)
    elif args.command == 'shift_frame_times':
        func = lambda x: shift_frame_times(x, args.delta_t)
    elif args.command == 'drop_zero_imu_samples':
        func = drop_zero_imu_samples
    else:
        # No sub-command given. Report a usage error instead of asserting:
        # asserts are stripped under `python -O`, which would leave `func`
        # undefined and fail later with a NameError.
        p.error('a command is required')

    # Sorting needs all records in memory; otherwise process line by line.
    if args.sort:
        modify_jsonl_sorted(f_in, f_out, func)
    else:
        modify_jsonl_stream(f_in, f_out, func)