-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathprocess_file.py
123 lines (96 loc) · 3.6 KB
/
process_file.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import os
import subprocess
import sys
from pathlib import Path
import scenedetect
from pytube import YouTube
import csv
def sizeof_fmt(num, suffix="B"):
    """Format a quantity with binary (1024-based) unit prefixes.

    The value is divided by 1024 until its magnitude drops below 1024, then
    rendered with one decimal place followed by the matching prefix and
    *suffix*. Values too large for the ``Zi`` prefix are reported in ``Yi``.

    Args:
        num: The quantity to format.
        suffix: Unit text appended after the prefix.

    Returns:
        The formatted string, e.g. ``"1.5KiB"``.
    """
    magnitude = num
    for prefix in ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"):
        if abs(magnitude) < 1024.0:
            return "{:3.1f}{}{}".format(magnitude, prefix, suffix)
        magnitude /= 1024.0
    # Every listed prefix was exhausted; the remainder is expressed in Yi.
    return "{:.1f}Yi{}".format(magnitude, suffix)
# (url, filename) pairs handled by get_video_and_cut_times(). The filename is
# a base name without extension: download() appends '.mp4' to it.
YT_URLS = [
    ("https://www.youtube.com/watch?v=tsm1rizxAj8", "beethoven-2nd-symphony"),
]
def convert_timecode_to_seconds(timestamp):
    """Convert an ``H:M:S`` timecode string into a number of seconds.

    Args:
        timestamp: Exactly three ``:``-separated numeric fields (hours,
            minutes, seconds); each field may be fractional.

    Returns:
        The total number of seconds as a float.

    Raises:
        ValueError: If there are not exactly three fields, or a field is not
            a valid number.
    """
    fields = [float(field) for field in timestamp.split(':')]
    hours, minutes, seconds = fields
    return hours * 3600 + minutes * 60 + seconds
def download(url, filename):
    """Download the first stream pytube offers for *url* as ``<filename>.mp4``.

    Currently broken due to YouTube update. Next pytube release will probably fix it.

    Each progress callback prints the ``remaining`` value it receives,
    formatted with ``sizeof_fmt``.

    Args:
        url: Address of the YouTube video.
        filename: Base name of the output file; ``'.mp4'`` is appended.
    """

    def report_remaining(stream, chunk, remaining):
        print(sizeof_fmt(remaining))

    video = YouTube(url, on_progress_callback=report_remaining)
    first_stream = video.streams.first()
    first_stream.download(filename=filename + '.mp4')
def split_scenes(path: str):
    """Run the ``scenedetect`` command-line tool on a video.

    The tool is invoked as
    ``scenedetect -i <path> list-scenes save-images -n 1``.

    Args:
        path: Path of the video file passed to ``scenedetect -i``.

    Returns:
        The exit status of the ``scenedetect`` process (0 means success), or
        127 when the executable could not be started at all, so callers that
        only check for a non-zero status keep working.
    """
    args = ['scenedetect', '-i', path, 'list-scenes', 'save-images', '-n', '1']
    # args += ['time', '--start', '0s', '--end', '10s']  # use these args for testing on parts of the video
    try:
        # Run the argument list directly, without a shell. With shell=True on
        # POSIX only the first item is executed as the command and the
        # remaining items are handed to the shell itself, so scenedetect
        # would be started without any of its arguments.
        completed = subprocess.run(args)
    except OSError as error:
        # Without a shell a missing executable raises instead of yielding a
        # non-zero status; translate it back into one.
        print(f'Could not run scenedetect: {error}', file=sys.stderr)
        return 127
    return completed.returncode
def move_split_output_to_separate_folder(filename):
    """Collect the output files for *filename* into a folder of the same name.

    Every entry in the current directory whose name starts with *filename* is
    moved into the ``<filename>/`` directory (created if missing), except
    ``.mp4`` files and the destination directory itself. Entries inside that
    directory named ``<filename>-<rest>`` are then renamed to ``<rest>``.

    Moving is best effort: an entry that cannot be moved or renamed is
    reported on stderr and skipped.

    Args:
        filename: Base name (without extension) shared by the video and the
            files generated from it.
    """
    target_dir = Path(filename)
    # Snapshot the matches up front so the moves below cannot feed back into
    # a lazily evaluated glob.
    candidates = list(Path().glob(f"{filename}*"))
    target_dir.mkdir(exist_ok=True)

    for path in candidates:
        # Videos stay where they are, and the destination directory (which
        # also matches the pattern) must not be moved into itself.
        if path.name.endswith('.mp4') or path == target_dir:
            continue
        _replace_or_warn(path, target_dir / path.name)

    prefix = filename + '-'
    for path in list(target_dir.glob(prefix + '*')):
        # Strip only the leading prefix; later occurrences of the same text
        # inside the name are part of the name and must be kept.
        stripped_name = path.name[len(prefix):]
        if not stripped_name:
            continue
        _replace_or_warn(path, target_dir / stripped_name)


def _replace_or_warn(source, destination):
    """Move *source* to *destination*, reporting a failure instead of raising."""
    try:
        os.replace(source, destination)
    except OSError as error:
        print(f'Could not move {source} to {destination}: {error}', file=sys.stderr)
def generate_tilia_csvs(filename):
    """Derive ``hierarchies.csv`` and ``labels.csv`` from ``Scenes.csv``.

    All three files live in the ``<filename>/`` directory. ``Scenes.csv`` is
    read after skipping its first two rows; from each remaining row, column 0
    is used as the label and columns 2 and 5 are converted from timecodes to
    seconds for the start and end of a level-1 entry.

    Args:
        filename: Name of the directory holding ``Scenes.csv``; the two
            output files are written next to it.
    """
    folder = Path(filename)

    with open(folder / 'Scenes.csv', 'r', newline='') as source:
        scene_rows = csv.reader(source)
        # The first two rows carry no scene data.
        for _ in range(2):
            next(scene_rows)
        hierarchies = [
            [
                convert_timecode_to_seconds(row[2]),
                convert_timecode_to_seconds(row[5]),
                1,
                row[0],
            ]
            for row in scene_rows
        ]

    with open(folder / 'hierarchies.csv', 'w', newline='') as target:
        out = csv.writer(target)
        out.writerow(['start', 'end', 'level', 'label'])
        out.writerows(hierarchies)

    with open(folder / 'labels.csv', 'w', newline='') as target:
        out = csv.writer(target)
        out.writerow(['scene_number', 'label'])
        # NOTE(review): this writes one more row than there are scenes
        # (numbers 1 .. scenes + 1) -- confirm the extra row is intended.
        out.writerows([number, ''] for number in range(1, len(hierarchies) + 2))
def get_video_and_cut_times():
    """Download and process every ``(url, filename)`` pair in ``YT_URLS``."""
    for url, filename in YT_URLS:
        process_yt_url(url, filename)


def process_yt_url(url, filename):
    """Download one YouTube video, split it into scenes and write the CSVs.

    Processing stops after printing a message when scene splitting reports a
    non-zero status, mirroring ``process_local_file``.

    Args:
        url: Address of the YouTube video.
        filename: Base name (without extension) for the downloaded video and
            for the folder that receives the generated files.
    """
    download(url, filename)
    # download() saves the video as '<filename>.mp4', so that is the path the
    # scene splitter has to be given.
    returncode = split_scenes(filename + '.mp4')
    if returncode != 0:
        print('Failed to split video.')
        return
    move_split_output_to_separate_folder(filename)
    generate_tilia_csvs(filename)
def process_local_file(filename):
    """Split an already-downloaded ``.mp4`` into scenes and write the CSVs.

    Progress is printed before and after processing. When scene splitting
    reports a non-zero status, a failure message is printed and nothing
    further is done.

    Args:
        filename: Name of the video, with or without a trailing ``.mp4``.
    """
    stem = filename[:-4] if filename.endswith('.mp4') else filename
    print(f'Processing {stem}.')

    if split_scenes(f'{stem}.mp4') != 0:
        print('Failed to split video.')
        return

    move_split_output_to_separate_folder(stem)
    generate_tilia_csvs(stem)
    print(f'Finished processing {stem}.')
# When run as a script, download and process every entry in YT_URLS.
if __name__ == "__main__":
    get_video_and_cut_times()