AV1-vs-HEVC/workflows/res.py
2024-01-12 20:33:08 +01:00

503 lines
16 KiB
Python

from jinja2 import Environment, BaseLoader
import glob
import json
import matplotlib.pyplot as plt
import numpy as np
import os
import re
import statistics
import sys
def _summarize_values(values, finalize):
    """Return the max/mean/median/min summary of a non-empty list of numbers.

    ``finalize`` is applied to the mean and the median only; max and min are
    reported exactly as they occur in ``values``.
    """
    return {
        "max": max(values),
        "mean": finalize(sum(values) / len(values)),
        "median": finalize(statistics.median(values)),
        "min": min(values),
    }


def aggregated_metrics(metrics):
    """Add max/mean/median/min summaries of all samples to ``metrics``.

    Reads ``encoding_time``, ``filesize_percentage`` and ``vmaf_score`` from
    every entry of ``metrics["samples"]`` and stores one summary dict per
    metric under the same key on ``metrics`` itself. A summary for
    ``encoding_time_percentage`` is added only if at least one sample carries
    that key.

    Encoding time mean/median are truncated to ``int``; the mean/median of
    all other metrics are rounded to two decimals.

    Args:
        metrics: dict with a ``"samples"`` mapping of sample name to a dict
            of per-sample values.

    Returns:
        The same ``metrics`` object (it is modified in place).

    Raises:
        ValueError: if ``metrics["samples"]`` is empty.
        KeyError: if a sample lacks one of the three mandatory values.
    """
    samples = list(metrics["samples"].values())
    if not samples:
        # max()/min() would raise ValueError anyway; say what is wrong.
        raise ValueError('Cannot aggregate metrics without any samples')

    def two_decimals(value):
        return round(value, 2)

    metrics["encoding_time"] = _summarize_values(
        [sample["encoding_time"] for sample in samples], int
    )
    metrics["filesize_percentage"] = _summarize_values(
        [sample["filesize_percentage"] for sample in samples], two_decimals
    )
    metrics["vmaf_score"] = _summarize_values(
        [sample["vmaf_score"] for sample in samples], two_decimals
    )

    # Optional per-sample value: summarize only the samples that have it.
    encoding_time_percentages = [
        sample["encoding_time_percentage"]
        for sample in samples
        if "encoding_time_percentage" in sample
    ]
    if encoding_time_percentages:
        metrics["encoding_time_percentage"] = _summarize_values(
            encoding_time_percentages, two_decimals
        )
    return metrics
def bootstrap_folder_structure():
    """Create every data, results and templates folder that does not exist yet."""
    required_folders = (
        get_path_data(),
        get_path_data_encodes(),
        get_path_data_encodes_av1(),
        get_path_data_encodes_hevc(),
        get_path_data_samples(),
        get_path_results(),
        get_path_results_aggregations(),
        get_path_results_candidates(),
        get_path_results_diagrams(),
        get_path_results_encoding_time(),
        get_path_results_metrics(),
        get_path_templates(),
    )
    for folder in required_folders:
        # exist_ok makes repeated runs harmless.
        os.makedirs(folder, exist_ok=True)
def generate_diagram_bars(data, title, ylabel):
    """Draw a mean bar and a median bar for every preset/CRF combination.

    ``data`` is read as ``data[preset][crf]['mean']`` and
    ``data[preset][crf]['median']``. The CRF keys of the first preset are
    used for all presets.

    Returns:
        The matplotlib figure holding the bar chart.
    """
    background = get_background_color_for_diagrams()
    fig, ax = plt.subplots()
    fig.patch.set_facecolor(background)
    ax.set_facecolor(background)

    presets = list(data.keys())
    crfs = list(data[presets[0]].keys())
    bar_width = 0.35

    # One x slot per (preset, crf) pair, presets outermost.
    combos = [(preset, crf) for preset in presets for crf in crfs]
    index = np.arange(len(combos))

    for slot, (preset, crf) in zip(index, combos):
        mean_value = data[preset][crf]['mean']
        median_value = data[preset][crf]['median']
        ax.bar(slot + 1 * bar_width, mean_value, bar_width, label=f'Mean ({preset} - {crf})', color='#106daa')
        ax.bar(slot + (1 + 1) * bar_width, median_value, bar_width, label=f'Median ({preset} - {crf})', color='#3B758C')

    # styling
    ax.set_title(title, color='white')
    ax.set_xlabel("Preset - CRF", color='white')
    ax.set_ylabel(ylabel, color='white')
    for side in ('bottom', 'top', 'right', 'left'):
        ax.spines[side].set_color('white')
    for axis in (ax.xaxis, ax.yaxis):
        axis.label.set_color('white')
    for axis_name in ('x', 'y'):
        ax.tick_params(axis=axis_name, colors='white')
    # NOTE(review): the tick offset scales with the number of presets, not
    # with the two bars drawn per slot - confirm this is intended.
    ax.set_xticks(index + bar_width * len(presets) / 2)
    ax.set_xticklabels([f"{preset} - {crf}" for preset, crf in combos], rotation=45, ha='right')

    # legend: only the first mean/median pair, relabelled generically
    legend = ax.legend(handles=ax.containers[:2], loc='upper right')
    frame = legend.get_frame()
    frame.set_facecolor(background)
    frame.set_edgecolor(background)
    for text, caption in zip(legend.get_texts(), ('Mean', 'Median')):
        text.set_text(caption)
        text.set_color('white')
    return fig
def generate_diagram_normal_distribution(values, title, legend_unit, xlabel):
    """Plot a histogram of ``values`` with a fitted normal curve and markers.

    Dashed vertical lines mark the mean, median, minimum and maximum; their
    legend labels show the value rounded to an integer followed by
    ``legend_unit``.

    Returns:
        The matplotlib figure.
    """
    background = get_background_color_for_diagrams()
    fig, ax = plt.subplots()

    # Density histogram so it shares a scale with the normal curve.
    ax.hist(values, bins=20, density=True, alpha=0.6, color='#106daa', edgecolor='black')

    # Normal density from the sample mean/std across the current x-range.
    mu = np.mean(values)
    sigma = np.std(values)
    xmin, xmax = plt.xlim()
    x = np.linspace(xmin, xmax, 100)
    p = np.exp(-(x - mu)**2 / (2 * sigma**2)) / (sigma * np.sqrt(2 * np.pi))
    ax.plot(x, p, linewidth=2, color='white')

    # Order here is the order of the legend entries.
    markers = (
        ('Mean', mu, 'red'),
        ('Median', np.median(values), 'blue'),
        ('Min', min(values), 'green'),
        ('Max', max(values), 'green'),
    )
    for name, position, line_color in markers:
        ax.axvline(
            x=position,
            linestyle='--',
            label=f'{name}: {int(round(position, 0))} {legend_unit}',
            color=line_color,
        )

    # title and labels
    ax.set_title(title, color='white')
    ax.set_xlabel(xlabel, color='white')
    ax.set_ylabel("Frequency", color='white')

    # legend
    legend = ax.legend()
    frame = legend.get_frame()
    frame.set_facecolor(background)
    frame.set_edgecolor(background)
    for text in legend.get_texts():
        text.set_color('white')

    # styling
    fig.patch.set_facecolor(background)
    ax.set_facecolor(background)
    for side in ('bottom', 'top', 'right', 'left'):
        ax.spines[side].set_color('white')
    for axis in (ax.xaxis, ax.yaxis):
        axis.label.set_color('white')
    for axis_name in ('x', 'y'):
        ax.tick_params(axis=axis_name, colors='white')
    return fig
def get_all_diagrams():
    """Return the sorted paths of all ``*.png`` files in the diagrams folder."""
    pattern = os.path.join(get_path_results_diagrams(), "*.png")
    return sorted(glob.glob(pattern))
def get_all_encoded_files_av1():
    """Return the sorted paths of all ``*.mkv`` files in the AV1 encodes folder."""
    pattern = os.path.join(get_path_data_encodes_av1(), "*.mkv")
    return sorted(glob.glob(pattern))
def get_all_encoded_files_hevc():
    """Return the sorted paths of all ``*.mkv`` files in the HEVC encodes folder."""
    pattern = os.path.join(get_path_data_encodes_hevc(), "*.mkv")
    return sorted(glob.glob(pattern))
def get_all_sample_files():
    """Return the sorted paths of all ``*.mkv`` files in the samples folder."""
    pattern = os.path.join(get_path_data_samples(), "*.mkv")
    return sorted(glob.glob(pattern))
def get_background_color_for_diagrams():
    """Return the hex colour used as background of figures, axes and legends."""
    background_hex = "#11171f"
    return background_hex
def get_benchmark_command(f):
    """Build the ffmpeg/libvmaf command line for the encode at path ``f``.

    The command takes the encode and its matching sample file as inputs,
    lets libvmaf write a JSON log to the metric log path of ``f`` and sends
    the output to the null muxer.

    Returns:
        The command as a single space-joined string.

    Raises:
        ValueError: if no sample can be derived from the encode's file name.
    """
    encode_name = os.path.splitext(os.path.basename(f))[0]
    sample_path = get_sample_file_of_encode(encode_name)
    vmaf_filter = (
        f'libvmaf="n_threads={os.cpu_count()}'
        f':log_fmt=json:log_path={get_filepath_metric_log(f)}"'
    )
    arguments = (
        'ffmpeg',
        f'-i "{f}"',
        f'-i "{sample_path}"',
        '-lavfi',
        vmaf_filter,
        '-f',
        'null',
        '-',
    )
    return " ".join(arguments)
def get_diagrams_hevc():
    """Map metric names to the file names of the HEVC diagrams.

    Only diagrams whose file name contains ``hevc`` are considered. Each is
    stored under the first of ``encoding_time``, ``filesize_percentage`` and
    ``vmaf_score`` found in its name; a later diagram for the same metric
    replaces an earlier one.
    """
    metric_names = ("encoding_time", "filesize_percentage", "vmaf_score")
    diagrams = {}
    for path_to_diagram in get_all_diagrams():
        stem = os.path.splitext(os.path.basename(path_to_diagram))[0]
        if "hevc" not in stem:
            continue
        for metric_name in metric_names:
            if metric_name in stem:
                diagrams[metric_name] = f"{stem}.png"
                break
    return diagrams
def get_filepath_metric_log(f):
    """Return the metric log path for ``f``: ``<metrics folder>/<stem of f>.json``."""
    stem, _extension = os.path.splitext(os.path.basename(f))
    log_name = f"{stem}.json"
    return os.path.join(get_path_results_metrics(), log_name)
def get_filesize_percentage(f):
    """Return the size of encode ``f`` as a percentage of its sample file.

    The result is rounded to two decimals.

    Raises:
        ValueError: if no sample can be derived from the encode's file name.
    """
    encode_name = os.path.splitext(os.path.basename(f))[0]
    encode_bytes = os.path.getsize(f)
    sample_bytes = os.path.getsize(get_sample_file_of_encode(encode_name))
    ratio = encode_bytes / sample_bytes
    return round(ratio * 100, 2)
def get_encoding_time_av1(filenameEncode):
    """Look up the recorded AV1 encoding time for an encode file name.

    The results file is indexed by preset, then CRF, then the part of the
    file name before its first dot.

    Raises:
        ValueError: if preset or CRF cannot be parsed from the file name, or
            if any of the three lookup levels is missing in the results file.
    """
    results_file = os.path.join(
        get_path_results_encoding_time(),
        get_filename_results_encoding_time_av1(),
    )
    times_by_preset = read_dict_from_json_file(results_file)

    preset = get_preset_from_encode_filename(filenameEncode)
    if preset not in times_by_preset.keys():
        raise ValueError(
            f'Missing preset "{preset}" in file: {results_file}'
        )
    times_by_crf = times_by_preset[preset]

    crf = get_crf_from_encode_filename(filenameEncode)
    if crf not in times_by_crf.keys():
        raise ValueError(
            f'Missing crf "{crf}" for preset "{preset}" in file: {results_file}'
        )
    times_by_sample = times_by_crf[crf]

    sample_name = filenameEncode.split('.')[0]
    if sample_name not in times_by_sample.keys():
        raise ValueError(
            f'Missing sample filename "{sample_name}" for preset "{preset}" and crf "{crf}" in file: {results_file}'
        )
    return times_by_sample[sample_name]
def get_encoding_time_hevc(filenameEncode):
    """Look up the recorded HEVC encoding time for an encode file name.

    The key is the part of the file name before its first dot.

    Raises:
        ValueError: if the results file has no entry for that key.
    """
    results_file = os.path.join(
        get_path_results_encoding_time(),
        get_filename_results_encoding_time_hevc(),
    )
    times_by_sample = read_dict_from_json_file(results_file)
    sample_name = filenameEncode.split('.')[0]
    if sample_name in times_by_sample.keys():
        return times_by_sample[sample_name]
    raise ValueError(
        f'Missing key "{sample_name}" in file: {results_file}'
    )
def get_filename_results_aggregations_av1():
    """Return the file name of the AV1 aggregation results."""
    filename = "av1.json"
    return filename
def get_filename_results_aggregations_hevc():
    """Return the file name of the HEVC aggregation results."""
    filename = "hevc.json"
    return filename
def get_filename_results_candidates_viable():
    """Return the file name of the viable-candidates results."""
    filename = "viable.json"
    return filename
def get_filename_results_candidates_viable_with_tolerance():
    """Return the file name of the viable-with-tolerance candidates results."""
    filename = "viable_with_tolerance.json"
    return filename
def get_filename_results_encoding_time_av1():
    """Return the file name of the AV1 encoding-time results."""
    filename = "av1.json"
    return filename
def get_filename_results_encoding_time_hevc():
    """Return the file name of the HEVC encoding-time results."""
    filename = "hevc.json"
    return filename
def get_all_values_encoding_time(aggregated_metrics):
    """Return the ``encoding_time`` of every sample, in sample order."""
    return [
        sample["encoding_time"]
        for sample in aggregated_metrics["samples"].values()
    ]
def get_all_values_filesize_percentage(aggregated_metrics):
    """Return the ``filesize_percentage`` of every sample, in sample order."""
    return [
        sample["filesize_percentage"]
        for sample in aggregated_metrics["samples"].values()
    ]
def get_all_values_vmaf_score(aggregated_metrics):
    """Return the ``vmaf_score`` of every sample, in sample order."""
    return [
        sample["vmaf_score"]
        for sample in aggregated_metrics["samples"].values()
    ]
def get_path_data():
    """Return the path of the ``data`` folder inside the project folder."""
    project_folder = get_path_project()
    return os.path.join(project_folder, "data")
def get_path_data_encodes():
    """Return the path of the ``encodes`` folder inside the data folder."""
    data_folder = get_path_data()
    return os.path.join(data_folder, "encodes")
def get_path_data_encodes_av1():
    """Return the path of the ``av1`` folder inside the encodes folder."""
    encodes_folder = get_path_data_encodes()
    return os.path.join(encodes_folder, "av1")
def get_path_data_encodes_hevc():
    """Return the path of the ``hevc`` folder inside the encodes folder."""
    encodes_folder = get_path_data_encodes()
    return os.path.join(encodes_folder, "hevc")
def get_path_data_samples():
    """Return the path of the ``samples`` folder inside the data folder."""
    data_folder = get_path_data()
    return os.path.join(data_folder, "samples")
def get_path_project():
    """Return the directory containing the path reported by ``get_path_script()``."""
    script_path = get_path_script()
    return os.path.dirname(script_path)
def get_path_results():
    """Return the path of the ``results`` folder inside the project folder."""
    project_folder = get_path_project()
    return os.path.join(project_folder, "results")
def get_path_results_aggregations():
    """Return the path of the ``aggregations`` folder inside the results folder."""
    results_folder = get_path_results()
    return os.path.join(results_folder, "aggregations")
def get_path_results_candidates():
    """Return the path of the ``candidates`` folder inside the results folder."""
    results_folder = get_path_results()
    return os.path.join(results_folder, "candidates")
def get_path_results_diagrams():
    """Return the path of the ``diagrams`` folder inside the results folder."""
    results_folder = get_path_results()
    return os.path.join(results_folder, "diagrams")
def get_path_results_encoding_time():
    """Return the path of the ``encoding_time`` folder inside the results folder."""
    results_folder = get_path_results()
    return os.path.join(results_folder, "encoding_time")
def get_path_results_metrics():
    """Return the path of the ``metrics`` folder inside the results folder."""
    results_folder = get_path_results()
    return os.path.join(results_folder, "metrics")
def get_path_script():
    """Return the resolved real path of ``sys.argv[0]``."""
    invoked_as = sys.argv[0]
    return os.path.realpath(invoked_as)
def get_path_templates():
    """Return the path of the ``templates`` folder inside the project folder."""
    project_folder = get_path_project()
    return os.path.join(project_folder, "templates")
def get_crf_from_encode_filename(filename):
    """Extract the CRF digits from a ``....Preset.<preset>.CRF.<crf>`` file name.

    Returns:
        The CRF as a string of digits.

    Raises:
        ValueError: if the file name does not contain that pattern.
    """
    found = re.search(r'.+\.Preset\.\w+\.CRF\.(\d+)', filename)
    if found is not None:
        return found.group(1)
    raise ValueError('Could not determine crf from filename')
def get_preset_from_encode_filename(filename):
    """Extract the preset from a ``....Preset.<preset>.CRF.<crf>`` file name.

    Returns:
        The preset as a string.

    Raises:
        ValueError: if the file name does not contain that pattern.
    """
    found = re.search(r'.+\.Preset\.(\w+)\.CRF\..+', filename)
    if found is not None:
        return found.group(1)
    raise ValueError('Could not determine preset from filename')
def get_sample_file_of_encode(filename):
    """Return the path of the sample ``.mkv`` an encode file name refers to.

    The sample is identified by the first ``sample`` + two digits occurrence
    in ``filename``.

    Raises:
        ValueError: if ``filename`` contains no such occurrence.
    """
    found = re.search(r'(sample\d\d)', filename)
    if found is None:
        raise ValueError('Could not determine sample from filename')
    sample_name = found.group(1)
    return os.path.join(get_path_data_samples(), f"{sample_name}.mkv")
def get_sample_number_from_filename(filename):
    """Extract the digits between ``sample`` and the following dot.

    Returns:
        The sample number as a string, leading zeros included.

    Raises:
        ValueError: if ``filename`` contains no ``sample<digits>.`` part.
    """
    found = re.search(r'sample(\d+)\.', filename)
    if found is not None:
        return found.group(1)
    raise ValueError('Could not determine sample number from filename')
def get_vmaf_score_of_encode(filename):
    """Return the pooled mean VMAF score from the metric log of an encode.

    Reads ``<metrics folder>/<filename>.json``. If the log contains a
    ``frames`` entry, that entry is removed and the log is written back
    without it. The score is ``pooled_metrics.vmaf.mean`` rounded to two
    decimals, or ``0.0`` when any of those three levels is missing.
    """
    metric_file = os.path.join(get_path_results_metrics(), f"{filename}.json")
    metrics = read_dict_from_json_file(metric_file)

    # Persist the log without its "frames" entry.
    if "frames" in metrics.keys():
        del metrics['frames']
        write_dict_to_json_file(metric_file, metrics)

    # Walk down pooled_metrics -> vmaf -> mean, bailing out at the first gap.
    node = metrics
    for key in ("pooled_metrics", "vmaf", "mean"):
        if key not in node.keys():
            return 0.0
        node = node[key]
    return round(node, 2)
def read_dict_from_json_file(path_to_file):
    """Load and return the JSON content of ``path_to_file``.

    If the file does not exist, it is created containing an empty JSON
    object and ``{}`` is returned.

    Raises:
        json.JSONDecodeError: if the file exists but is not valid JSON.
    """
    try:
        # The context manager closes the file; no explicit close() needed.
        with open(path_to_file, 'r', encoding='UTF-8') as f:
            return json.load(f)
    except FileNotFoundError:
        # Try-then-handle avoids the gap between an exists() check and open().
        write_dict_to_json_file(path_to_file, {})
        return {}
def render_template(template_file_path, output_file_path, data):
    """Render a Jinja2 template file with ``data`` and write the result.

    Both files are handled as UTF-8, like the JSON helpers in this module,
    so the result does not depend on the platform's default encoding.

    Args:
        template_file_path: path of the template to read.
        output_file_path: path the rendered text is written to (overwritten).
        data: context passed to the template's ``render``.
    """
    with open(template_file_path, "r", encoding="UTF-8") as template_file:
        template_content = template_file.read()
    template_env = Environment(loader=BaseLoader())
    template = template_env.from_string(template_content)
    # Render before opening the output so a failure leaves it untouched.
    rendered_template = template.render(data)
    with open(output_file_path, "w", encoding="UTF-8") as output_file:
        output_file.write(rendered_template)
def save_diagram_bars(diagram, path_to_file):
    """Resize a bar diagram to 24x18 inches and save it at 300 dpi."""
    diagram.set_size_inches(24, 18)
    save_options = {
        "bbox_inches": "tight",
        "facecolor": get_background_color_for_diagrams(),
        "dpi": 300,
    }
    diagram.savefig(path_to_file, **save_options)
def save_diagram_normal_distribution(diagram, path_to_file):
    """Resize a distribution diagram to 12x9 inches and save it at 300 dpi."""
    diagram.set_size_inches(12, 9)
    save_options = {
        "bbox_inches": "tight",
        "facecolor": get_background_color_for_diagrams(),
        "dpi": 300,
    }
    diagram.savefig(path_to_file, **save_options)
def sort_dict(input_dict):
    """Return a copy of ``input_dict`` with keys in sorted order, recursively.

    Nested dict values are sorted the same way; every other value (lists
    included) is carried over unchanged.
    """
    return {
        key: sort_dict(value) if isinstance(value, dict) else value
        for key, value in sorted(input_dict.items())
    }
def write_dict_to_json_file(path_to_file, data_as_dict):
    """Write ``data_as_dict`` to ``path_to_file`` as key-sorted, indented JSON.

    Keys are ordered with ``sort_dict`` and the output uses an indent of 4.
    An existing file is overwritten.

    Raises:
        TypeError: if the data cannot be sorted or serialized to JSON.
    """
    # Serialize before opening: mode 'w' truncates on open, so a failure
    # here must not wipe the previous content of the file.
    serialized = json.dumps(sort_dict(data_as_dict), indent=4)
    # The context manager closes the file; no explicit close() needed.
    with open(path_to_file, 'w', encoding='UTF-8') as f:
        f.write(serialized)