247 lines
6.2 KiB
Python
247 lines
6.2 KiB
Python
from concurrent.futures import ThreadPoolExecutor, Future
|
|
import logging
|
|
from pathlib import PurePath, Path
|
|
import shutil
|
|
|
|
import config
|
|
|
|
from .algorithms import compress_with_brotli, compress_with_zstandard, compress_with_gzip, compress_with_deflate
|
|
from .utils import get_paths, copy_paths, log
|
|
|
|
|
|
logging.basicConfig(
|
|
level=config.Main.log_level,
|
|
)
|
|
logger = logging.getLogger('WasmEncodingTool')
|
|
|
|
config.Output.Wasm.Brotli.compress = compress_with_brotli
|
|
config.Output.Wasm.Zstandard.compress = compress_with_zstandard
|
|
config.Output.Wasm.Gzip.compress = compress_with_gzip
|
|
config.Output.Wasm.Deflate.compress = compress_with_deflate
|
|
|
|
algorithms = [
|
|
config.Output.Wasm.Brotli,
|
|
config.Output.Wasm.Zstandard,
|
|
config.Output.Wasm.Gzip,
|
|
config.Output.Wasm.Deflate,
|
|
]
|
|
|
|
|
|
def backup():
|
|
@log(
|
|
logger=logger,
|
|
start_text='Backup',
|
|
success_text='Success backup',
|
|
)
|
|
def run():
|
|
shutil.rmtree(
|
|
config.Backup.path,
|
|
ignore_errors=True,
|
|
)
|
|
shutil.copytree(
|
|
config.Output.path,
|
|
config.Backup.path,
|
|
)
|
|
|
|
if not config.Backup.enabled:
|
|
logger.info('Backup disabled')
|
|
return
|
|
if not Path(config.Output.path).exists():
|
|
logger.info('Backup skipped (output path don\'t exists)')
|
|
return
|
|
|
|
run()
|
|
|
|
|
|
def copy_wasm_source(
|
|
input_wasm_paths: list[PurePath],
|
|
):
|
|
@log(
|
|
logger=logger,
|
|
start_text='Copy wasm source',
|
|
success_text='Success copy wasm source',
|
|
)
|
|
def run():
|
|
copy_paths(
|
|
from_dir_path=config.Input.path,
|
|
to_dir_path=config.Output.Wasm.Source.path,
|
|
paths=input_wasm_paths,
|
|
)
|
|
|
|
if not config.Output.Wasm.Source.enabled:
|
|
logger.info('Copy wasm source disabled')
|
|
return
|
|
if not len(input_wasm_paths):
|
|
logger.info('Copy wasm source skipped (input path don\'t exists)')
|
|
return
|
|
|
|
run()
|
|
|
|
|
|
def copy_other_source(
|
|
input_other_paths: list[PurePath],
|
|
):
|
|
@log(
|
|
logger=logger,
|
|
start_text='Copy other source',
|
|
success_text='Success copy other source',
|
|
)
|
|
def run():
|
|
copy_paths(
|
|
from_dir_path=config.Input.path,
|
|
to_dir_path=config.Output.Other.path,
|
|
paths=input_other_paths,
|
|
)
|
|
|
|
if not config.Output.Other.enabled:
|
|
logger.info('Copy other source disabled')
|
|
return
|
|
if not len(input_other_paths):
|
|
logger.info('Copy other source skipped (input path don\'t exists)')
|
|
return
|
|
|
|
run()
|
|
|
|
|
|
def compress_file_with_algorithm(
|
|
file_path: PurePath,
|
|
algorithm: config.Algorithm,
|
|
):
|
|
@log(
|
|
logger=logger,
|
|
start_text='Compress %s with %s' % (
|
|
file_path,
|
|
algorithm.name,
|
|
),
|
|
success_text='Success compress %s with %s' % (
|
|
file_path,
|
|
algorithm.name,
|
|
),
|
|
)
|
|
def run():
|
|
Path(algorithm.path).mkdir(
|
|
parents=True,
|
|
exist_ok=True,
|
|
)
|
|
with (
|
|
Path(config.Input.path, file_path).open('rb') as file_in,
|
|
Path(algorithm.path, file_path).open('wb') as file_out,
|
|
):
|
|
file_out.write(algorithm.compress(
|
|
data=file_in.read(),
|
|
level=algorithm.level,
|
|
))
|
|
|
|
run()
|
|
|
|
|
|
def compress(
|
|
executor: ThreadPoolExecutor,
|
|
input_wasm_paths: list[PurePath],
|
|
):
|
|
@log(
|
|
logger=logger,
|
|
start_text='Compress',
|
|
success_text='Success compress',
|
|
)
|
|
def run():
|
|
futures: list[Future] = []
|
|
for algorithm in algorithms:
|
|
if not algorithm.enabled:
|
|
logger.info('Skipped compress with %s' % algorithm.name)
|
|
continue
|
|
for path in input_wasm_paths:
|
|
future = executor.submit(
|
|
compress_file_with_algorithm,
|
|
file_path=path,
|
|
algorithm=algorithm,
|
|
)
|
|
futures.append(future)
|
|
for future in futures:
|
|
future.result()
|
|
|
|
if not len(input_wasm_paths):
|
|
logger.info('Compress skipped (input path don\'t exists)')
|
|
return
|
|
|
|
run()
|
|
|
|
|
|
def run_multithreading_tasks(
|
|
input_wasm_paths: list[PurePath],
|
|
input_other_paths: list[PurePath],
|
|
):
|
|
@log(
|
|
logger=logger,
|
|
start_text='Run multithreading tasks',
|
|
success_text='Success run multithreading tasks',
|
|
)
|
|
def run():
|
|
shutil.rmtree(
|
|
config.Output.path,
|
|
ignore_errors=True,
|
|
)
|
|
with ThreadPoolExecutor(
|
|
max_workers=config.Main.max_threads,
|
|
) as executor:
|
|
executor.submit(
|
|
copy_wasm_source,
|
|
input_wasm_paths=input_wasm_paths,
|
|
)
|
|
executor.submit(
|
|
copy_other_source,
|
|
input_other_paths=input_other_paths,
|
|
)
|
|
compress(
|
|
executor=executor,
|
|
input_wasm_paths=input_wasm_paths,
|
|
)
|
|
|
|
run()
|
|
|
|
|
|
def change_mode():
|
|
@log(
|
|
logger=logger,
|
|
start_text='Change mode',
|
|
success_text='Success change mode',
|
|
)
|
|
def run():
|
|
for path in Path(config.Output.path).rglob('*'):
|
|
if path.is_dir():
|
|
path.chmod(config.Output.dir_mode)
|
|
elif path.is_file():
|
|
path.chmod(config.Output.file_mode)
|
|
path.chmod(config.Output.dir_mode)
|
|
|
|
if not Path(config.Output.path).exists():
|
|
logger.info('Change mode skipped (output path don\'t exists)')
|
|
return
|
|
|
|
run()
|
|
|
|
|
|
def run():
|
|
@log(
|
|
logger=logger,
|
|
start_text='Start',
|
|
success_text='Done',
|
|
)
|
|
def run():
|
|
backup()
|
|
run_multithreading_tasks(
|
|
input_wasm_paths=get_paths(
|
|
from_dir_path=config.Input.path,
|
|
pattern='*.wasm',
|
|
check_function=lambda path: path.is_file(),
|
|
),
|
|
input_other_paths=get_paths(
|
|
from_dir_path=config.Input.path,
|
|
pattern='*',
|
|
check_function=lambda path: path.is_file() and path.suffix != '.wasm',
|
|
),
|
|
)
|
|
change_mode()
|
|
|
|
run()
|