"""Helper utilities for an AV1/HEVC video-encoding benchmark.

Provides path helpers for the project's data/results folder layout,
metric aggregation, matplotlib diagram generation, VMAF log handling,
and Jinja2 template rendering.
"""

import glob
import json
import os
import re
import statistics
import sys

import matplotlib.pyplot as plt
import numpy as np
from jinja2 import BaseLoader, Environment


def aggregated_metrics(metrics):
    """Add max/mean/median/min aggregates to *metrics* and return it.

    Reads every entry of ``metrics["samples"]`` and writes aggregate
    dicts for ``encoding_time``, ``filesize_percentage``,
    ``vmaf_score`` and — when at least one sample carries it —
    ``encoding_time_percentage`` back onto *metrics* (mutated in place).
    """
    samples = metrics["samples"].values()
    all_encoding_times = [s["encoding_time"] for s in samples]
    all_filesize_percentages = [s["filesize_percentage"] for s in samples]
    all_vmaf_scores = [s["vmaf_score"] for s in samples]
    # "encoding_time_percentage" is optional per sample.
    all_encoding_time_percentages = [
        s["encoding_time_percentage"]
        for s in samples
        if "encoding_time_percentage" in s
    ]

    metrics["encoding_time"] = {
        "max": max(all_encoding_times),
        "mean": int(sum(all_encoding_times) / len(all_encoding_times)),
        "median": int(statistics.median(all_encoding_times)),
        "min": min(all_encoding_times)
    }
    metrics["filesize_percentage"] = {
        "max": max(all_filesize_percentages),
        "mean": round(sum(all_filesize_percentages) / len(all_filesize_percentages), 2),
        "median": round(statistics.median(all_filesize_percentages), 2),
        "min": min(all_filesize_percentages)
    }
    metrics["vmaf_score"] = {
        "max": max(all_vmaf_scores),
        "mean": round(sum(all_vmaf_scores) / len(all_vmaf_scores), 2),
        "median": round(statistics.median(all_vmaf_scores), 2),
        "min": min(all_vmaf_scores)
    }
    if all_encoding_time_percentages:
        metrics["encoding_time_percentage"] = {
            "max": max(all_encoding_time_percentages),
            "mean": round(sum(all_encoding_time_percentages) / len(all_encoding_time_percentages), 2),
            "median": round(statistics.median(all_encoding_time_percentages), 2),
            "min": min(all_encoding_time_percentages)
        }
    return metrics


def bootstrap_folder_structure():
    """Create every data/results folder the benchmark needs (idempotent)."""
    folders = [
        get_path_data(),
        get_path_data_encodes(),
        get_path_data_encodes_av1(),
        get_path_data_encodes_hevc(),
        get_path_data_samples(),
        get_path_results(),
        get_path_results_aggregations(),
        get_path_results_candidates(),
        get_path_results_diagrams(),
        get_path_results_encoding_time(),
        get_path_results_metrics(),
        get_path_templates()
    ]
    for f in folders:
        os.makedirs(f, exist_ok=True)


def generate_diagram_bars(data, title, ylabel):
    """Build a grouped mean/median bar chart figure.

    *data* maps preset -> crf -> {"mean": ..., "median": ...}.
    Returns the matplotlib Figure (caller saves/closes it).
    """
    fig, ax = plt.subplots()
    fig.patch.set_facecolor(get_background_color_for_diagrams())
    ax.set_facecolor(get_background_color_for_diagrams())

    presets = list(data.keys())
    # assumes every preset carries the same set of CRFs — TODO confirm
    crfs = list(data[presets[0]].keys())
    bar_width = 0.35
    index = np.arange(len(presets) * len(crfs))

    for i, preset in enumerate(presets):
        for j, crf in enumerate(crfs):
            mean_value = data[preset][crf]['mean']
            median_value = data[preset][crf]['median']
            pos = index[i * len(crfs) + j]
            ax.bar(pos + bar_width, mean_value, bar_width,
                   label=f'Mean ({preset} - {crf})', color='#106daa')
            ax.bar(pos + 2 * bar_width, median_value, bar_width,
                   label=f'Median ({preset} - {crf})', color='#3B758C')

    # styling
    ax.set_title(title, color='white')
    ax.set_xlabel("Preset - CRF", color='white')
    ax.set_ylabel(ylabel, color='white')
    for side in ('bottom', 'top', 'right', 'left'):
        ax.spines[side].set_color('white')
    ax.xaxis.label.set_color('white')
    ax.yaxis.label.set_color('white')
    ax.tick_params(axis='x', colors='white')
    ax.tick_params(axis='y', colors='white')
    ax.set_xticks(index + bar_width * len(presets) / 2)
    ax.set_xticklabels(
        [f"{preset} - {crf}" for preset in presets for crf in crfs],
        rotation=45, ha='right'
    )

    # legend: only the first mean/median pair, relabelled generically
    legend = ax.legend(handles=ax.containers[:2], loc='upper right')
    frame = legend.get_frame()
    frame.set_facecolor(get_background_color_for_diagrams())
    frame.set_edgecolor(get_background_color_for_diagrams())
    legend_texts = ['Mean', 'Median']
    for i, text in enumerate(legend.get_texts()):
        text.set_text(legend_texts[i])
        text.set_color('white')

    return fig


def generate_diagram_normal_distribution(values, title, legend_unit, xlabel):
    """Build a histogram + fitted normal-distribution figure for *values*.

    Vertical dashed lines mark mean, median, min and max. Returns the
    matplotlib Figure (caller saves/closes it).
    """
    fig, ax = plt.subplots()

    # add histogram
    ax.hist(values, bins=20, density=True, alpha=0.6,
            color='#106daa', edgecolor='black')

    # add normal distribution curve fitted to the sample mean/stddev
    mu, sigma = np.mean(values), np.std(values)
    xmin, xmax = plt.xlim()
    x = np.linspace(xmin, xmax, 100)
    p = np.exp(-(x - mu)**2 / (2 * sigma**2)) / (sigma * np.sqrt(2 * np.pi))
    ax.plot(x, p, linewidth=2, color='white')

    # add mean
    ax.axvline(x=mu, linestyle='--',
               label=f'Mean: {int(round(mu, 0))} {legend_unit}', color='red')

    # add median
    median_value = np.median(values)
    ax.axvline(x=median_value, linestyle='--',
               label=f'Median: {int(round(median_value, 0))} {legend_unit}',
               color='blue')

    # add min and max value
    min_value = min(values)
    max_value = max(values)
    ax.axvline(x=min_value, linestyle='--',
               label=f'Min: {int(round(min_value, 0))} {legend_unit}',
               color='green')
    ax.axvline(x=max_value, linestyle='--',
               label=f'Max: {int(round(max_value, 0))} {legend_unit}',
               color='green')

    # title and labels
    ax.set_title(title, color='white')
    ax.set_xlabel(xlabel, color='white')
    ax.set_ylabel("Frequency", color='white')

    # legend
    legend = ax.legend()
    frame = legend.get_frame()
    frame.set_facecolor(get_background_color_for_diagrams())
    frame.set_edgecolor(get_background_color_for_diagrams())
    for text in legend.get_texts():
        text.set_color('white')

    # styling
    fig.patch.set_facecolor(get_background_color_for_diagrams())
    ax.set_facecolor(get_background_color_for_diagrams())
    for side in ('bottom', 'top', 'right', 'left'):
        ax.spines[side].set_color('white')
    ax.xaxis.label.set_color('white')
    ax.yaxis.label.set_color('white')
    ax.tick_params(axis='x', colors='white')
    ax.tick_params(axis='y', colors='white')

    return fig


def get_all_diagrams():
    """Return all diagram PNG paths in the results folder, sorted."""
    samples = glob.glob(os.path.join(get_path_results_diagrams(), "*.png"))
    samples.sort()
    return samples


def get_all_encoded_files_av1():
    """Return all AV1 encode (.mkv) paths, sorted."""
    encodes = glob.glob(os.path.join(get_path_data_encodes_av1(), "*.mkv"))
    encodes.sort()
    return encodes


def get_all_encoded_files_hevc():
    """Return all HEVC encode (.mkv) paths, sorted."""
    encodes = glob.glob(os.path.join(get_path_data_encodes_hevc(), "*.mkv"))
    encodes.sort()
    return encodes


def get_all_sample_files():
    """Return all sample (.mkv) paths, sorted."""
    samples = glob.glob(os.path.join(get_path_data_samples(), "*.mkv"))
    samples.sort()
    return samples


def get_background_color_for_diagrams():
    """Background color (hex) shared by all diagrams."""
    return "#11171f"


def get_benchmark_command(f):
    """Build the ffmpeg/libvmaf command line comparing encode *f* with its sample.

    NOTE(review): the returned string is meant for a shell and embeds
    file paths unescaped — only safe while filenames are controlled by
    this project, not for untrusted input.
    """
    filename_encode = os.path.splitext(os.path.basename(f))[0]
    cmd_parts = [
        'ffmpeg',
        f'-i "{f}"',
        f'-i "{get_sample_file_of_encode(filename_encode)}"',
        '-lavfi',
        f'libvmaf="n_threads={os.cpu_count()}:log_fmt=json:log_path={get_filepath_metric_log(f)}"',
        '-f', 'null', '-'
    ]
    return " ".join(cmd_parts)


def get_diagrams_hevc():
    """Map metric names to the HEVC diagram filenames found on disk."""
    diagrams = {}
    for path_to_diagram in get_all_diagrams():
        filename = os.path.splitext(os.path.basename(path_to_diagram))[0]
        if "hevc" not in filename:
            continue
        # first matching metric wins; one diagram per metric expected
        for metric in ("encoding_time", "filesize_percentage", "vmaf_score"):
            if metric in filename:
                diagrams[metric] = f"{filename}.png"
                break
    return diagrams


def get_filepath_metric_log(f):
    """Return the JSON metric-log path for encode file *f*."""
    filename_encode = os.path.splitext(os.path.basename(f))[0]
    return os.path.join(
        get_path_results_metrics(),
        f"{filename_encode}.json"
    )


def get_filesize_percentage(f):
    """Return encode size as a percentage of its source sample, 2 decimals."""
    filesize_encode = os.path.getsize(f)
    filesize_sample = os.path.getsize(
        get_sample_file_of_encode(
            os.path.splitext(os.path.basename(f))[0]
        )
    )
    return round((filesize_encode / filesize_sample) * 100, 2)


def get_encoding_time_av1(filenameEncode):
    """Look up the recorded AV1 encoding time for *filenameEncode*.

    Raises ValueError when the preset/crf/sample key path is missing
    from the persisted results file.
    """
    path_to_file_results_encoding_time_av1 = os.path.join(
        get_path_results_encoding_time(),
        get_filename_results_encoding_time_av1()
    )
    encoding_times_av1 = read_dict_from_json_file(
        path_to_file_results_encoding_time_av1
    )
    preset = get_preset_from_encode_filename(filenameEncode)
    if preset not in encoding_times_av1:
        raise ValueError(
            f'Missing preset "{preset}" in file: {path_to_file_results_encoding_time_av1}'
        )
    crf = get_crf_from_encode_filename(filenameEncode)
    if crf not in encoding_times_av1[preset]:
        raise ValueError(
            f'Missing crf "{crf}" for preset "{preset}" in file: {path_to_file_results_encoding_time_av1}'
        )
    filename_sample = filenameEncode.split('.')[0]
    if filename_sample not in encoding_times_av1[preset][crf]:
        raise ValueError(
            f'Missing sample filename "{filename_sample}" for preset "{preset}" and crf "{crf}" in file: {path_to_file_results_encoding_time_av1}'
        )
    return encoding_times_av1[preset][crf][filename_sample]


def get_encoding_time_hevc(filenameEncode):
    """Look up the recorded HEVC encoding time for *filenameEncode*.

    Raises ValueError when the sample key is missing from the persisted
    results file.
    """
    path_to_file_results_encoding_time_hevc = os.path.join(
        get_path_results_encoding_time(),
        get_filename_results_encoding_time_hevc()
    )
    encoding_times_hevc = read_dict_from_json_file(
        path_to_file_results_encoding_time_hevc
    )
    filename_sample = filenameEncode.split('.')[0]
    if filename_sample not in encoding_times_hevc:
        raise ValueError(
            f'Missing key "{filename_sample}" in file: {path_to_file_results_encoding_time_hevc}'
        )
    return encoding_times_hevc[filename_sample]


def get_filename_results_aggregations_av1():
    """Filename of the AV1 aggregation results JSON."""
    return "av1.json"


def get_filename_results_aggregations_hevc():
    """Filename of the HEVC aggregation results JSON."""
    return "hevc.json"


def get_filename_results_candidates_viable():
    """Filename of the viable-candidates JSON."""
    return "viable.json"


def get_filename_results_candidates_viable_with_tolerance():
    """Filename of the viable-with-tolerance candidates JSON."""
    return "viable_with_tolerance.json"


def get_filename_results_encoding_time_av1():
    """Filename of the AV1 encoding-time results JSON."""
    return "av1.json"


def get_filename_results_encoding_time_hevc():
    """Filename of the HEVC encoding-time results JSON."""
    return "hevc.json"


def get_all_values_encoding_time(aggregated_metrics):
    """Collect every per-sample encoding_time value."""
    return [v["encoding_time"] for v in aggregated_metrics["samples"].values()]


def get_all_values_filesize_percentage(aggregated_metrics):
    """Collect every per-sample filesize_percentage value."""
    return [v["filesize_percentage"] for v in aggregated_metrics["samples"].values()]


def get_all_values_vmaf_score(aggregated_metrics):
    """Collect every per-sample vmaf_score value."""
    return [v["vmaf_score"] for v in aggregated_metrics["samples"].values()]


def get_path_data():
    """<project>/data"""
    return os.path.join(get_path_project(), "data")


def get_path_data_encodes():
    """<project>/data/encodes"""
    return os.path.join(get_path_data(), "encodes")


def get_path_data_encodes_av1():
    """<project>/data/encodes/av1"""
    return os.path.join(get_path_data_encodes(), "av1")


def get_path_data_encodes_hevc():
    """<project>/data/encodes/hevc"""
    return os.path.join(get_path_data_encodes(), "hevc")


def get_path_data_samples():
    """<project>/data/samples"""
    return os.path.join(get_path_data(), "samples")


def get_path_project():
    """Project root: the directory containing the running script."""
    return os.path.dirname(get_path_script())


def get_path_results():
    """<project>/results"""
    return os.path.join(get_path_project(), "results")


def get_path_results_aggregations():
    """<project>/results/aggregations"""
    return os.path.join(get_path_results(), "aggregations")


def get_path_results_candidates():
    """<project>/results/candidates"""
    return os.path.join(get_path_results(), "candidates")


def get_path_results_diagrams():
    """<project>/results/diagrams"""
    return os.path.join(get_path_results(), "diagrams")


def get_path_results_encoding_time():
    """<project>/results/encoding_time"""
    return os.path.join(get_path_results(), "encoding_time")


def get_path_results_metrics():
    """<project>/results/metrics"""
    return os.path.join(get_path_results(), "metrics")


def get_path_script():
    """Absolute, symlink-resolved path of the running script."""
    return os.path.realpath(sys.argv[0])


def get_path_templates():
    """<project>/templates"""
    return os.path.join(get_path_project(), "templates")


def get_crf_from_encode_filename(filename):
    """Extract the CRF digits from an encode filename like '...Preset.X.CRF.NN...'.

    Raises ValueError when the pattern does not match.
    """
    match = re.compile(r'.+\.Preset\.\w+\.CRF\.(\d+)').search(filename)
    if match is None:
        raise ValueError('Could not determine crf from filename')
    return ''.join(match.groups())


def get_preset_from_encode_filename(filename):
    """Extract the preset token from an encode filename like '...Preset.X.CRF...'.

    Raises ValueError when the pattern does not match.
    """
    match = re.compile(r'.+\.Preset\.(\w+)\.CRF\..+').search(filename)
    if match is None:
        raise ValueError('Could not determine preset from filename')
    return ''.join(match.groups())


def get_sample_file_of_encode(filename):
    """Return the path of the sampleNN.mkv an encode filename refers to.

    Raises ValueError when no 'sampleNN' token is present.
    """
    match = re.compile(r'(sample\d\d)').search(filename)
    if match is None:
        raise ValueError('Could not determine sample from filename')
    return os.path.join(
        get_path_data_samples(),
        f"{''.join(match.groups())}.mkv"
    )


def get_sample_number_from_filename(filename):
    """Return the digits after 'sample' in *filename* (e.g. '01').

    Raises ValueError when no 'sampleNN.' token is present.
    """
    match = re.compile(r'sample(\d+)\.').search(filename)
    if match is None:
        raise ValueError('Could not determine sample number from filename')
    return ''.join(match.groups())


def get_vmaf_score_of_encode(filename):
    """Return the pooled mean VMAF score for encode *filename*, 2 decimals.

    Strips the bulky per-frame data from the metric log and persists the
    slimmed file. Returns 0.0 when the pooled metrics are absent.
    """
    metric_file = os.path.join(
        get_path_results_metrics(),
        f"{filename}.json"
    )
    metrics = read_dict_from_json_file(metric_file)
    if "frames" in metrics:
        del metrics['frames']
        write_dict_to_json_file(metric_file, metrics)
    pooled_vmaf = metrics.get("pooled_metrics", {}).get("vmaf", {})
    if "mean" not in pooled_vmaf:
        return 0.0
    return round(pooled_vmaf["mean"], 2)


def read_dict_from_json_file(path_to_file):
    """Read a JSON file into a dict, creating an empty file when missing."""
    if not os.path.exists(path_to_file):
        write_dict_to_json_file(path_to_file, {})
        return {}
    with open(path_to_file, 'r', encoding='UTF-8') as f:
        return json.load(f)


def render_template(template_file_path, output_file_path, data):
    """Render the Jinja2 template at *template_file_path* with *data* to *output_file_path*."""
    with open(template_file_path, "r") as template_file:
        template_content = template_file.read()
    template_env = Environment(loader=BaseLoader())
    template = template_env.from_string(template_content)
    rendered_template = template.render(data)
    with open(output_file_path, "w") as output_file:
        output_file.write(rendered_template)


def save_diagram_bars(diagram, path_to_file):
    """Save a bar-chart figure at large size (24x18 in, 300 dpi)."""
    diagram.set_size_inches(24, 18)
    diagram.savefig(
        path_to_file,
        bbox_inches="tight",
        facecolor=get_background_color_for_diagrams(),
        dpi=300
    )


def save_diagram_normal_distribution(diagram, path_to_file):
    """Save a distribution figure at standard size (12x9 in, 300 dpi)."""
    diagram.set_size_inches(12, 9)
    diagram.savefig(
        path_to_file,
        bbox_inches="tight",
        facecolor=get_background_color_for_diagrams(),
        dpi=300
    )


def sort_dict(input_dict):
    """Return a copy of *input_dict* with keys sorted recursively."""
    sorted_dict = {}
    for key, value in sorted(input_dict.items()):
        if isinstance(value, dict):
            sorted_dict[key] = sort_dict(value)
        else:
            sorted_dict[key] = value
    return sorted_dict


def write_dict_to_json_file(path_to_file, data_as_dict):
    """Write *data_as_dict* (recursively key-sorted) as indented JSON."""
    with open(path_to_file, 'w', encoding='UTF-8') as f:
        f.write(
            json.dumps(
                sort_dict(data_as_dict),
                indent=4
            )
        )