AV1-vs-HEVC/workflows/res.py

503 lines
16 KiB
Python
Raw Permalink Normal View History

2024-01-12 20:33:08 +01:00
from jinja2 import Environment, BaseLoader
import glob
import json
import matplotlib.pyplot as plt
import numpy as np
import os
import re
import statistics
import sys
def aggregated_metrics(metrics):
    """Add max/mean/median/min summaries of the per-sample values to *metrics*.

    Reads ``metrics["samples"]`` (a mapping of sample name to a dict holding
    ``encoding_time``, ``filesize_percentage`` and ``vmaf_score``, plus an
    optional ``encoding_time_percentage``) and stores one summary dict per
    field directly on *metrics*. The same dict object is returned.

    Encoding-time mean and median are truncated with ``int``; all other
    means and medians are rounded to two decimals. An empty sample mapping
    makes ``max`` raise ``ValueError``.
    """

    def summarize(values, finish):
        # Key order matches the original literal: max, mean, median, min.
        return {
            "max": max(values),
            "mean": finish(sum(values) / len(values)),
            "median": finish(statistics.median(values)),
            "min": min(values),
        }

    def two_decimals(number):
        return round(number, 2)

    required = {
        "encoding_time": [],
        "filesize_percentage": [],
        "vmaf_score": [],
    }
    relative_times = []

    for sample in metrics["samples"].values():
        for field, bucket in required.items():
            bucket.append(sample[field])
        # Only some samples carry a relative encoding time.
        if "encoding_time_percentage" in sample.keys():
            relative_times.append(sample["encoding_time_percentage"])

    metrics["encoding_time"] = summarize(required["encoding_time"], int)
    metrics["filesize_percentage"] = summarize(
        required["filesize_percentage"], two_decimals
    )
    metrics["vmaf_score"] = summarize(required["vmaf_score"], two_decimals)
    if len(relative_times) > 0:
        metrics["encoding_time_percentage"] = summarize(
            relative_times, two_decimals
        )
    return metrics
def bootstrap_folder_structure():
    """Create every data, results and template directory used by this module.

    Directories that already exist are left alone (``exist_ok=True``).
    """
    path_getters = (
        get_path_data,
        get_path_data_encodes,
        get_path_data_encodes_av1,
        get_path_data_encodes_hevc,
        get_path_data_samples,
        get_path_results,
        get_path_results_aggregations,
        get_path_results_candidates,
        get_path_results_diagrams,
        get_path_results_encoding_time,
        get_path_results_metrics,
        get_path_templates,
    )
    for resolve_path in path_getters:
        os.makedirs(resolve_path(), exist_ok=True)
def generate_diagram_bars(data, title, ylabel):
    """Build a bar chart with a mean and a median bar per preset/CRF pair.

    *data* is read as ``data[preset][crf]['mean']`` and
    ``data[preset][crf]['median']``. The CRF keys are taken from the first
    preset and reused for every preset. Returns the matplotlib figure.
    """
    background = get_background_color_for_diagrams()
    fig, ax = plt.subplots()
    fig.patch.set_facecolor(background)
    ax.set_facecolor(background)

    presets = list(data.keys())
    crfs = list(data[presets[0]].keys())
    combinations = [(preset, crf) for preset in presets for crf in crfs]
    bar_width = 0.35
    index = np.arange(len(presets) * len(crfs))

    # One slot per preset/CRF pair; mean bar first, median bar right after.
    for slot, (preset, crf) in zip(index, combinations):
        mean_value = data[preset][crf]['mean']
        median_value = data[preset][crf]['median']
        ax.bar(
            slot + bar_width,
            mean_value,
            bar_width,
            label=f'Mean ({preset} - {crf})',
            color='#106daa',
        )
        ax.bar(
            slot + 2 * bar_width,
            median_value,
            bar_width,
            label=f'Median ({preset} - {crf})',
            color='#3B758C',
        )

    # styling
    ax.set_title(title, color='white')
    ax.set_xlabel("Preset - CRF", color='white')
    ax.set_ylabel(ylabel, color='white')
    for side in ('bottom', 'top', 'right', 'left'):
        ax.spines[side].set_color('white')
    ax.xaxis.label.set_color('white')
    ax.yaxis.label.set_color('white')
    for axis_name in ('x', 'y'):
        ax.tick_params(axis=axis_name, colors='white')
    # NOTE(review): the tick offset scales with the number of presets, not
    # with the two-bar group width — confirm this placement is intended.
    ax.set_xticks(index + bar_width * len(presets) / 2)
    ax.set_xticklabels(
        [f"{preset} - {crf}" for preset, crf in combinations],
        rotation=45,
        ha='right',
    )

    # legend: only the first mean/median pair is listed, with generic labels
    legend = ax.legend(handles=ax.containers[:2], loc='upper right')
    frame = legend.get_frame()
    frame.set_facecolor(background)
    frame.set_edgecolor(background)
    for text, caption in zip(legend.get_texts(), ('Mean', 'Median')):
        text.set_text(caption)
        text.set_color('white')
    return fig
def generate_diagram_normal_distribution(values, title, legend_unit, xlabel):
    """Plot a density histogram of *values* with a fitted normal curve.

    Dashed vertical lines mark mean, median, minimum and maximum; their
    legend entries show the value rounded to an integer followed by
    *legend_unit*. Returns the matplotlib figure.
    """
    background = get_background_color_for_diagrams()
    fig, ax = plt.subplots()

    # add histogram
    ax.hist(values, bins=20, density=True, alpha=0.6, color='#106daa', edgecolor='black')

    # add normal distribution over the x-range the histogram produced
    mu = np.mean(values)
    sigma = np.std(values)
    xmin, xmax = plt.xlim()
    x = np.linspace(xmin, xmax, 100)
    density = np.exp(-(x - mu)**2 / (2 * sigma**2)) / (sigma * np.sqrt(2 * np.pi))
    ax.plot(x, density, linewidth=2, color='white')

    # add mean, median, min and max markers
    markers = (
        ('Mean', mu, 'red'),
        ('Median', np.median(values), 'blue'),
        ('Min', min(values), 'green'),
        ('Max', max(values), 'green'),
    )
    for caption, position, line_color in markers:
        ax.axvline(
            x=position,
            linestyle='--',
            label=f'{caption}: {int(round(position, 0))} {legend_unit}',
            color=line_color,
        )

    # title and labels
    ax.set_title(title, color='white')
    ax.set_xlabel(xlabel, color='white')
    ax.set_ylabel("Frequency", color='white')

    # legend
    legend = ax.legend()
    frame = legend.get_frame()
    frame.set_facecolor(background)
    frame.set_edgecolor(background)
    for entry in legend.get_texts():
        entry.set_color('white')

    # styling
    fig.patch.set_facecolor(background)
    ax.set_facecolor(background)
    for side in ('bottom', 'top', 'right', 'left'):
        ax.spines[side].set_color('white')
    ax.xaxis.label.set_color('white')
    ax.yaxis.label.set_color('white')
    for axis_name in ('x', 'y'):
        ax.tick_params(axis=axis_name, colors='white')
    return fig
def get_all_diagrams():
    """Return the sorted paths of all ``*.png`` files in the diagrams folder."""
    pattern = os.path.join(get_path_results_diagrams(), "*.png")
    return sorted(glob.glob(pattern))
def get_all_encoded_files_av1():
    """Return the sorted paths of all ``*.mkv`` files in the AV1 encodes folder."""
    pattern = os.path.join(get_path_data_encodes_av1(), "*.mkv")
    return sorted(glob.glob(pattern))
def get_all_encoded_files_hevc():
    """Return the sorted paths of all ``*.mkv`` files in the HEVC encodes folder."""
    pattern = os.path.join(get_path_data_encodes_hevc(), "*.mkv")
    return sorted(glob.glob(pattern))
def get_all_sample_files():
    """Return the sorted paths of all ``*.mkv`` files in the samples folder."""
    pattern = os.path.join(get_path_data_samples(), "*.mkv")
    return sorted(glob.glob(pattern))
def get_background_color_for_diagrams():
    """Return the hex colour used as figure, axes and legend background."""
    background_hex = "#11171f"
    return background_hex
def get_benchmark_command(f):
    """Return the ffmpeg/libvmaf command line comparing encode *f* to its sample.

    The encode is the first input and the matching sample file the second.
    libvmaf is told to use ``os.cpu_count()`` threads and to write a JSON
    log to the metric log path of *f*. The result is one space-joined string.
    """
    encode_stem = os.path.splitext(os.path.basename(f))[0]
    sample_path = get_sample_file_of_encode(encode_stem)
    vmaf_filter = f'libvmaf="n_threads={os.cpu_count()}:log_fmt=json:log_path={get_filepath_metric_log(f)}"'
    return " ".join((
        'ffmpeg',
        f'-i "{f}"',
        f'-i "{sample_path}"',
        '-lavfi',
        vmaf_filter,
        '-f',
        'null',
        '-',
    ))
def get_diagrams_hevc():
    """Map each metric name to the file name of its HEVC diagram.

    Only diagrams whose file name contains ``hevc`` are considered. Each is
    filed under the first of ``encoding_time``, ``filesize_percentage`` or
    ``vmaf_score`` found in its name; a later file for the same metric
    overwrites an earlier one.
    """
    metric_names = ("encoding_time", "filesize_percentage", "vmaf_score")
    diagrams = {}
    for path_to_diagram in get_all_diagrams():
        stem = os.path.splitext(os.path.basename(path_to_diagram))[0]
        if "hevc" not in stem:
            continue
        for metric in metric_names:
            if metric in stem:
                diagrams[metric] = f"{stem}.png"
                break
    return diagrams
def get_filepath_metric_log(f):
    """Return the metrics-folder path ``<basename of f without extension>.json``."""
    encode_stem, _extension = os.path.splitext(os.path.basename(f))
    log_name = f"{encode_stem}.json"
    return os.path.join(get_path_results_metrics(), log_name)
def get_filesize_percentage(f):
    """Return the size of encode *f* as a percentage of its sample file.

    The result is rounded to two decimals. A zero-byte sample file raises
    ``ZeroDivisionError``.
    """
    encode_stem = os.path.splitext(os.path.basename(f))[0]
    encode_bytes = os.path.getsize(f)
    sample_bytes = os.path.getsize(get_sample_file_of_encode(encode_stem))
    ratio = encode_bytes / sample_bytes
    return round(ratio * 100, 2)
def get_encoding_time_av1(filenameEncode):
    """Look up the recorded AV1 encoding time for an encode file name.

    The AV1 encoding-time JSON is read as ``preset -> crf -> sample``, where
    preset and CRF are parsed from *filenameEncode* and the sample key is
    everything before its first dot.

    Raises:
        ValueError: if preset, CRF or sample is missing from the file, or
            if preset/CRF cannot be parsed from the name.
    """
    results_file = os.path.join(
        get_path_results_encoding_time(),
        get_filename_results_encoding_time_av1()
    )
    timings = read_dict_from_json_file(results_file)

    preset = get_preset_from_encode_filename(filenameEncode)
    if preset not in timings.keys():
        raise ValueError(
            f'Missing preset "{preset}" in file: {results_file}'
        )
    timings_for_preset = timings[preset]

    crf = get_crf_from_encode_filename(filenameEncode)
    if crf not in timings_for_preset.keys():
        raise ValueError(
            f'Missing crf "{crf}" for preset "{preset}" in file: {results_file}'
        )
    timings_for_crf = timings_for_preset[crf]

    filename_sample = filenameEncode.split('.')[0]
    if filename_sample not in timings_for_crf.keys():
        raise ValueError(
            f'Missing sample filename "{filename_sample}" for preset "{preset}" and crf "{crf}" in file: {results_file}'
        )
    return timings_for_crf[filename_sample]
def get_encoding_time_hevc(filenameEncode):
    """Look up the recorded HEVC encoding time for an encode file name.

    The HEVC encoding-time JSON is keyed by the part of *filenameEncode*
    before its first dot.

    Raises:
        ValueError: if that key is missing from the file.
    """
    results_file = os.path.join(
        get_path_results_encoding_time(),
        get_filename_results_encoding_time_hevc()
    )
    timings = read_dict_from_json_file(results_file)

    filename_sample = filenameEncode.split('.')[0]
    if filename_sample in timings.keys():
        return timings[filename_sample]
    raise ValueError(
        f'Missing key "{filename_sample}" in file: {results_file}'
    )
def get_filename_results_aggregations_av1():
    """Return the file name of the AV1 aggregation results."""
    filename = "av1.json"
    return filename
def get_filename_results_aggregations_hevc():
    """Return the file name of the HEVC aggregation results."""
    filename = "hevc.json"
    return filename
def get_filename_results_candidates_viable():
    """Return the file name of the viable-candidates results."""
    filename = "viable.json"
    return filename
def get_filename_results_candidates_viable_with_tolerance():
    """Return the file name of the viable-with-tolerance candidates results."""
    filename = "viable_with_tolerance.json"
    return filename
def get_filename_results_encoding_time_av1():
    """Return the file name of the AV1 encoding-time results."""
    filename = "av1.json"
    return filename
def get_filename_results_encoding_time_hevc():
    """Return the file name of the HEVC encoding-time results."""
    filename = "hevc.json"
    return filename
def get_all_values_encoding_time(aggregated_metrics):
    """Return every sample's ``encoding_time`` in sample iteration order."""
    samples = aggregated_metrics["samples"]
    return [sample["encoding_time"] for sample in samples.values()]
def get_all_values_filesize_percentage(aggregated_metrics):
    """Return every sample's ``filesize_percentage`` in sample iteration order."""
    samples = aggregated_metrics["samples"]
    return [sample["filesize_percentage"] for sample in samples.values()]
def get_all_values_vmaf_score(aggregated_metrics):
    """Return every sample's ``vmaf_score`` in sample iteration order."""
    samples = aggregated_metrics["samples"]
    return [sample["vmaf_score"] for sample in samples.values()]
def get_path_data():
    """Return the ``data`` directory inside the project directory."""
    project_dir = get_path_project()
    return os.path.join(project_dir, "data")
def get_path_data_encodes():
    """Return the ``encodes`` directory inside the data directory."""
    data_dir = get_path_data()
    return os.path.join(data_dir, "encodes")
def get_path_data_encodes_av1():
    """Return the ``av1`` directory inside the encodes directory."""
    encodes_dir = get_path_data_encodes()
    return os.path.join(encodes_dir, "av1")
def get_path_data_encodes_hevc():
    """Return the ``hevc`` directory inside the encodes directory."""
    encodes_dir = get_path_data_encodes()
    return os.path.join(encodes_dir, "hevc")
def get_path_data_samples():
    """Return the ``samples`` directory inside the data directory."""
    data_dir = get_path_data()
    return os.path.join(data_dir, "samples")
def get_path_project():
    """Return the directory that contains the running script file."""
    script_path = get_path_script()
    return os.path.dirname(script_path)
def get_path_results():
    """Return the ``results`` directory inside the project directory."""
    project_dir = get_path_project()
    return os.path.join(project_dir, "results")
def get_path_results_aggregations():
    """Return the ``aggregations`` directory inside the results directory."""
    results_dir = get_path_results()
    return os.path.join(results_dir, "aggregations")
def get_path_results_candidates():
    """Return the ``candidates`` directory inside the results directory."""
    results_dir = get_path_results()
    return os.path.join(results_dir, "candidates")
def get_path_results_diagrams():
    """Return the ``diagrams`` directory inside the results directory."""
    results_dir = get_path_results()
    return os.path.join(results_dir, "diagrams")
def get_path_results_encoding_time():
    """Return the ``encoding_time`` directory inside the results directory."""
    results_dir = get_path_results()
    return os.path.join(results_dir, "encoding_time")
def get_path_results_metrics():
    """Return the ``metrics`` directory inside the results directory."""
    results_dir = get_path_results()
    return os.path.join(results_dir, "metrics")
def get_path_script():
    """Return the symlink-resolved real path of ``sys.argv[0]``."""
    invoked_as = sys.argv[0]
    return os.path.realpath(invoked_as)
def get_path_templates():
    """Return the ``templates`` directory inside the project directory."""
    project_dir = get_path_project()
    return os.path.join(project_dir, "templates")
def get_crf_from_encode_filename(filename):
    """Return the digits following ``.CRF.`` in an encode file name.

    The name must contain ``.Preset.<word>.CRF.<digits>`` after at least
    one leading character.

    Raises:
        ValueError: if the name does not match that pattern.
    """
    found = re.search(r'.+\.Preset\.\w+\.CRF\.(\d+)', filename)
    if found is None:
        raise ValueError('Could not determine crf from filename')
    return found.group(1)
def get_preset_from_encode_filename(filename):
    """Return the word between ``.Preset.`` and ``.CRF.`` in an encode file name.

    Raises:
        ValueError: if the name does not contain
            ``.Preset.<word>.CRF.`` followed by at least one character.
    """
    found = re.search(r'.+\.Preset\.(\w+)\.CRF\..+', filename)
    if found is None:
        raise ValueError('Could not determine preset from filename')
    return found.group(1)
def get_sample_file_of_encode(filename):
    """Return the samples-folder path of the sample an encode belongs to.

    The first ``sample`` followed by two digits found in *filename* becomes
    the ``.mkv`` file name.

    Raises:
        ValueError: if *filename* contains no such token.
    """
    found = re.search(r'(sample\d\d)', filename)
    if found is None:
        raise ValueError('Could not determine sample from filename')
    sample_name = found.group(1)
    return os.path.join(get_path_data_samples(), f"{sample_name}.mkv")
def get_sample_number_from_filename(filename):
    """Return the digits between ``sample`` and the dot that follows them.

    Raises:
        ValueError: if *filename* has no ``sample<digits>.`` token.
    """
    found = re.search(r'sample(\d+)\.', filename)
    if found is None:
        raise ValueError('Could not determine sample number from filename')
    return found.group(1)
def get_vmaf_score_of_encode(filename):
    """Return the pooled mean VMAF score from an encode's metric log.

    Reads ``<filename>.json`` from the metrics folder. If the log has a
    ``frames`` key it is removed and the file is written back without it
    (presumably to shrink the log — confirm). Returns
    ``pooled_metrics.vmaf.mean`` rounded to two decimals, or ``0.0`` when
    any key on that path is missing.
    """
    metric_file = os.path.join(get_path_results_metrics(), f"{filename}.json")
    metrics = read_dict_from_json_file(metric_file)

    if "frames" in metrics.keys():
        metrics.pop('frames')
        write_dict_to_json_file(metric_file, metrics)

    # Walk down the key path, bailing out at the first missing level.
    node = metrics
    for key in ("pooled_metrics", "vmaf", "mean"):
        if key not in node.keys():
            return 0.0
        node = node[key]
    return round(node, 2)
def read_dict_from_json_file(path_to_file):
    """Load and return the JSON content of *path_to_file*.

    When the file does not exist, a file holding an empty JSON object is
    created at that path and ``{}`` is returned.
    """
    if os.path.exists(path_to_file):
        # The context manager closes the handle; no explicit close needed.
        with open(path_to_file, 'r', encoding='UTF-8') as handle:
            return json.load(handle)
    write_dict_to_json_file(path_to_file, {})
    return {}
def render_template(template_file_path, output_file_path, data):
    """Render a Jinja template file with *data* and write the result to disk.

    Args:
        template_file_path: path of the template source file.
        output_file_path: path the rendered text is written to; an
            existing file is overwritten.
        data: context passed to ``template.render``.

    Both files are opened as UTF-8, matching the JSON helpers in this
    module, so the result does not depend on the platform's default
    encoding.
    """
    with open(template_file_path, "r", encoding="UTF-8") as template_file:
        template_content = template_file.read()

    # The template comes from a string, so a bare BaseLoader is enough.
    template_env = Environment(loader=BaseLoader())
    template = template_env.from_string(template_content)
    rendered_template = template.render(data)

    with open(output_file_path, "w", encoding="UTF-8") as output_file:
        output_file.write(rendered_template)
def save_diagram_bars(diagram, path_to_file):
    """Resize a bar diagram to 24x18 inches and save it at 300 dpi.

    The figure is saved with a tight bounding box and the shared diagram
    background colour as face colour.
    """
    width_inches, height_inches = 24, 18
    diagram.set_size_inches(width_inches, height_inches)
    save_options = {
        "bbox_inches": "tight",
        "facecolor": get_background_color_for_diagrams(),
        "dpi": 300,
    }
    diagram.savefig(path_to_file, **save_options)
def save_diagram_normal_distribution(diagram, path_to_file):
    """Resize a distribution diagram to 12x9 inches and save it at 300 dpi.

    The figure is saved with a tight bounding box and the shared diagram
    background colour as face colour.
    """
    width_inches, height_inches = 12, 9
    diagram.set_size_inches(width_inches, height_inches)
    save_options = {
        "bbox_inches": "tight",
        "facecolor": get_background_color_for_diagrams(),
        "dpi": 300,
    }
    diagram.savefig(path_to_file, **save_options)
def sort_dict(input_dict):
    """Return a copy of *input_dict* with keys in sorted order, recursively.

    Nested dict values are sorted the same way; every other value
    (including lists) is carried over unchanged.
    """
    return {
        key: sort_dict(value) if isinstance(value, dict) else value
        for key, value in sorted(input_dict.items())
    }
def write_dict_to_json_file(path_to_file, data_as_dict):
    """Write *data_as_dict* to *path_to_file* as 4-space-indented JSON.

    Keys are sorted recursively before serialising. The file is opened in
    write mode as UTF-8, so existing content is replaced.
    """
    with open(path_to_file, 'w', encoding='UTF-8') as handle:
        # Serialise after opening, as before: a failing dump still truncates.
        serialized = json.dumps(sort_dict(data_as_dict), indent=4)
        handle.write(serialized)