Files
wasm-encoding-tool/app/main.py

247 lines
6.2 KiB
Python

from concurrent.futures import ThreadPoolExecutor, Future
import logging
from pathlib import PurePath, Path
import shutil
import config
from .algorithms import compress_with_brotli, compress_with_zstandard, compress_with_gzip, compress_with_deflate
from .utils import get_paths, copy_paths, log
# Configure logging with the level chosen in the project config.
logging.basicConfig(level=config.Main.log_level)
logger = logging.getLogger('WasmEncodingTool')

# Compression targets handled by this module; `compress` iterates them in
# this order.
algorithms = [
    config.Output.Wasm.Brotli,
    config.Output.Wasm.Zstandard,
    config.Output.Wasm.Gzip,
    config.Output.Wasm.Deflate,
]

# Attach each compressor implementation to its config entry so that callers
# can invoke `algorithm.compress(...)` uniformly.
config.Output.Wasm.Brotli.compress = compress_with_brotli
config.Output.Wasm.Zstandard.compress = compress_with_zstandard
config.Output.Wasm.Gzip.compress = compress_with_gzip
config.Output.Wasm.Deflate.compress = compress_with_deflate
def backup():
    """Replace the backup directory with a copy of the current output tree.

    Nothing is copied when backups are disabled in the config or when the
    output path does not exist; in both cases an info message is logged.
    """
    @log(
        logger=logger,
        start_text='Backup',
        success_text='Success backup',
    )
    def run():
        # Drop any previous backup first; removal errors are ignored.
        shutil.rmtree(config.Backup.path, ignore_errors=True)
        shutil.copytree(config.Output.path, config.Backup.path)

    if not config.Backup.enabled:
        logger.info('Backup disabled')
    elif not Path(config.Output.path).exists():
        logger.info('Backup skipped (output path don\'t exists)')
    else:
        run()
def copy_wasm_source(
    input_wasm_paths: list[PurePath],
):
    """Copy the given wasm files from the input directory to the wasm source output.

    Args:
        input_wasm_paths: Paths handed to ``copy_paths`` together with the
            input directory and the wasm source output directory.

    The copy is skipped (with an info log message) when the wasm source
    output is disabled in the config or when the list is empty.
    """
    @log(
        logger=logger,
        start_text='Copy wasm source',
        success_text='Success copy wasm source',
    )
    def run():
        source_dir = config.Input.path
        target_dir = config.Output.Wasm.Source.path
        copy_paths(
            from_dir_path=source_dir,
            to_dir_path=target_dir,
            paths=input_wasm_paths,
        )

    if not config.Output.Wasm.Source.enabled:
        logger.info('Copy wasm source disabled')
    elif not input_wasm_paths:
        logger.info('Copy wasm source skipped (input path don\'t exists)')
    else:
        run()
def copy_other_source(
    input_other_paths: list[PurePath],
):
    """Copy the given non-wasm files from the input directory to the "other" output.

    Args:
        input_other_paths: Paths handed to ``copy_paths`` together with the
            input directory and the "other" output directory.

    The copy is skipped (with an info log message) when the "other" output
    is disabled in the config or when the list is empty.
    """
    @log(
        logger=logger,
        start_text='Copy other source',
        success_text='Success copy other source',
    )
    def run():
        source_dir = config.Input.path
        target_dir = config.Output.Other.path
        copy_paths(
            from_dir_path=source_dir,
            to_dir_path=target_dir,
            paths=input_other_paths,
        )

    if not config.Output.Other.enabled:
        logger.info('Copy other source disabled')
    elif not input_other_paths:
        logger.info('Copy other source skipped (input path don\'t exists)')
    else:
        run()
def compress_file_with_algorithm(
    file_path: PurePath,
    algorithm: config.Algorithm,
):
    """Compress one input file with one algorithm and write the result.

    The file is read from ``config.Input.path / file_path`` and the
    compressed bytes are written to ``algorithm.path / file_path``.

    Args:
        file_path: Path of the file, joined onto both the input directory
            and the algorithm's output directory.
        algorithm: Config entry providing ``name``, ``path``, ``level`` and
            the ``compress(data=..., level=...)`` callable.
    """
    @log(
        logger=logger,
        start_text='Compress %s with %s' % (
            file_path,
            algorithm.name,
        ),
        success_text='Success compress %s with %s' % (
            file_path,
            algorithm.name,
        ),
    )
    def run():
        input_path = Path(config.Input.path, file_path)
        output_path = Path(algorithm.path, file_path)

        # Create the directory that will actually hold the output file, not
        # just the algorithm root, so a `file_path` with sub-directories
        # does not fail on open.
        output_path.parent.mkdir(
            parents=True,
            exist_ok=True,
        )

        # Read and compress before touching the output, so a failure does
        # not leave an empty or truncated file behind.
        compressed = algorithm.compress(
            data=input_path.read_bytes(),
            level=algorithm.level,
        )
        output_path.write_bytes(compressed)

    run()
def compress(
    executor: ThreadPoolExecutor,
    input_wasm_paths: list[PurePath],
):
    """Compress every wasm file with every enabled algorithm on the executor.

    Args:
        executor: Thread pool that receives one
            ``compress_file_with_algorithm`` task per (algorithm, file) pair.
        input_wasm_paths: Files to compress.

    Disabled algorithms are logged and skipped. After all tasks have been
    submitted, each future's ``result()`` is awaited, so an exception raised
    in a task propagates to the caller. When the list is empty nothing is
    submitted and an info message is logged instead.
    """
    @log(
        logger=logger,
        start_text='Compress',
        success_text='Success compress',
    )
    def run():
        pending: list[Future] = []

        for algorithm in algorithms:
            if algorithm.enabled:
                pending.extend(
                    executor.submit(
                        compress_file_with_algorithm,
                        file_path=wasm_path,
                        algorithm=algorithm,
                    )
                    for wasm_path in input_wasm_paths
                )
            else:
                logger.info('Skipped compress with %s' % algorithm.name)

        # Wait for every task; result() re-raises any task exception.
        for job in pending:
            job.result()

    if input_wasm_paths:
        run()
    else:
        logger.info('Compress skipped (input path don\'t exists)')
def run_multithreading_tasks(
    input_wasm_paths: list[PurePath],
    input_other_paths: list[PurePath],
):
    """Rebuild the output tree, running the copy and compress steps on a thread pool.

    The existing output directory is removed first (removal errors are
    ignored). The two copy steps and the compression step then share one
    ``ThreadPoolExecutor`` sized by ``config.Main.max_threads``.

    Args:
        input_wasm_paths: Wasm files to copy and compress.
        input_other_paths: Non-wasm files to copy.

    Raises:
        Any exception raised by a copy or compression task.
    """
    @log(
        logger=logger,
        start_text='Run multithreading tasks',
        success_text='Success run multithreading tasks',
    )
    def run():
        shutil.rmtree(
            config.Output.path,
            ignore_errors=True,
        )

        with ThreadPoolExecutor(
            max_workers=config.Main.max_threads,
        ) as executor:
            # Keep the copy futures: a discarded future swallows any
            # exception raised by its task, which would let a failed copy
            # be reported as a successful run.
            copy_futures: list[Future] = [
                executor.submit(
                    copy_wasm_source,
                    input_wasm_paths=input_wasm_paths,
                ),
                executor.submit(
                    copy_other_source,
                    input_other_paths=input_other_paths,
                ),
            ]

            # Submit the compression work before waiting on the copies so
            # both kinds of task still run concurrently.
            compress(
                executor=executor,
                input_wasm_paths=input_wasm_paths,
            )

            # result() re-raises whatever the copy task raised.
            for future in copy_futures:
                future.result()

    run()
def change_mode():
    """Apply the configured permission modes to the output tree.

    Every directory below ``config.Output.path`` gets
    ``config.Output.dir_mode`` and every regular file gets
    ``config.Output.file_mode``; the output root itself also gets
    ``dir_mode``. When the output path does not exist, an info message is
    logged and nothing is changed.
    """
    @log(
        logger=logger,
        start_text='Change mode',
        success_text='Success change mode',
    )
    def run():
        output_root = Path(config.Output.path)

        for path in output_root.rglob('*'):
            if path.is_dir():
                path.chmod(config.Output.dir_mode)
            elif path.is_file():
                path.chmod(config.Output.file_mode)

        # rglob('*') yields only descendants, never the root, so the root
        # needs its own chmod. Using the loop variable here would hit an
        # arbitrary entry (possibly a file) or fail on an empty tree.
        output_root.chmod(config.Output.dir_mode)

    if not Path(config.Output.path).exists():
        logger.info('Change mode skipped (output path don\'t exists)')
        return

    run()
def run():
    """Run the whole pipeline: backup, rebuild the output, then fix permissions.

    The input directory is scanned twice with ``get_paths``: once for
    ``*.wasm`` files and once for all other regular files. Both lists are
    passed to ``run_multithreading_tasks``; ``change_mode`` runs afterwards.
    """
    @log(
        logger=logger,
        start_text='Start',
        success_text='Done',
    )
    def run():
        backup()

        # Collect the inputs only after the backup step has run, matching
        # the order in which the arguments were originally evaluated.
        wasm_paths = get_paths(
            from_dir_path=config.Input.path,
            pattern='*.wasm',
            check_function=lambda path: path.is_file(),
        )
        other_paths = get_paths(
            from_dir_path=config.Input.path,
            pattern='*',
            check_function=lambda path: path.is_file() and path.suffix != '.wasm',
        )

        run_multithreading_tasks(
            input_wasm_paths=wasm_paths,
            input_other_paths=other_paths,
        )
        change_mode()

    run()