Source code for ocrd.resource_manager

from pathlib import Path
from os.path import join
from os import environ, listdir, getcwd
import re
from shutil import copytree
from datetime import datetime
from tarfile import open as open_tarfile
from urllib.parse import urlparse, unquote

import requests
from yaml import safe_load, safe_dump

from ocrd_validators import OcrdResourceListValidator
from ocrd_utils import getLogger
from ocrd_utils.constants import HOME, XDG_DATA_HOME, XDG_CONFIG_HOME
from ocrd_utils.os import list_all_resources, pushd_popd

from .constants import RESOURCE_LIST_FILENAME, RESOURCE_USER_LIST_COMMENT

[docs]class OcrdResourceManager(): """ Managing processor resources """ def __init__(self): self.log = getLogger('ocrd.resource_manager') self.database = {} self.load_resource_list(Path(RESOURCE_LIST_FILENAME)) self.user_list = Path(XDG_CONFIG_HOME, 'ocrd', 'resources.yml') if not self.user_list.exists(): if not self.user_list.parent.exists(): self.user_list.parent.mkdir(parents=True) with open(str(self.user_list), 'w', encoding='utf-8') as f: f.write(RESOURCE_USER_LIST_COMMENT) self.load_resource_list(self.user_list)
[docs] def load_resource_list(self, list_filename, database=None): if not database: database = self.database if list_filename.is_file(): with open(list_filename, 'r', encoding='utf-8') as f: list_loaded = safe_load(f) or {} report = OcrdResourceListValidator.validate(list_loaded) if not report.is_valid: self.log.error('\n'.join(report.errors)) raise ValueError("Resource list %s is invalid!" % (list_filename)) for executable, resource_list in list_loaded.items(): if executable not in database: database[executable] = [] # Prepend, so user provided is sorted before builtin database[executable] = list_loaded[executable] + database[executable] return database
[docs] def list_available(self, executable=None): """ List models available for download by processor """ if executable: return [(executable, self.database[executable])] return [(x, y) for x, y in self.database.items()]
[docs] def list_installed(self, executable=None): """ List installed resources, matching with registry by ``name`` """ ret = [] if executable: all_executables = [executable] else: # resources we know about all_executables = list(self.database.keys()) # resources in the file system parent_dirs = [join(x, 'ocrd-resources') for x in [XDG_DATA_HOME, '/usr/local/share']] for parent_dir in parent_dirs: if Path(parent_dir).exists(): all_executables += [x for x in listdir(parent_dir) if x.startswith('ocrd-')] for this_executable in set(all_executables): reslist = [] for res_filename in list_all_resources(this_executable): res_name = Path(res_filename).name resdict = [x for x in self.database.get(this_executable, []) if x['name'] == res_name] if not resdict: self.log.info("%s resource '%s' (%s) not a known resource, creating stub in %s'" % (this_executable, res_name, res_filename, self.user_list)) resdict = [self.add_to_user_database(this_executable, res_filename)] resdict[0]['path'] = res_filename reslist.append(resdict[0]) ret.append((this_executable, reslist)) return ret
[docs] def add_to_user_database(self, executable, res_filename, url=None): """ Add a stub entry to the user resource.yml """ res_name = Path(res_filename).name res_size = Path(res_filename).stat().st_size with open(self.user_list, 'r', encoding='utf-8') as f: user_database = safe_load(f) or {} if executable not in user_database: user_database[executable] = [] if not self.find_resources(executable=executable, name=res_name, database=user_database): resdict = { 'name': res_name, 'url': url if url else '???', 'description': 'Found at %s on %s' % (self.resource_dir_to_location(res_filename), datetime.now()), 'version_range': '???', 'size': res_size } user_database[executable].append(resdict) with open(self.user_list, 'w', encoding='utf-8') as f: f.write(RESOURCE_USER_LIST_COMMENT) f.write('\n') f.write(safe_dump(user_database)) return resdict
[docs] def find_resources(self, executable=None, name=None, url=None, database=None): """ Find resources in the registry """ if not database: database = self.database ret = [] if executable and executable not in database.keys(): return ret for executable in [executable] if executable else database.keys(): for resdict in database[executable]: if not name and not url: ret.append((executable, resdict)) elif url and url == resdict['url']: ret.append((executable, resdict)) elif name and name == resdict['name']: ret.append((executable, resdict)) return ret
@property def default_resource_dir(self): return self.location_to_resource_dir('data')
[docs] def location_to_resource_dir(self, location): return '/usr/local/share/ocrd-resources' if location == 'system' else \ join(XDG_DATA_HOME, 'ocrd-resources') if location == 'data' else \ getcwd()
[docs] def resource_dir_to_location(self, resource_path): resource_path = str(resource_path) return 'system' if resource_path.startswith('/usr/local/share/ocrd-resources') else \ 'data' if resource_path.startswith(join(XDG_DATA_HOME, 'ocrd-resources')) else \ 'cwd' if resource_path.startswith(getcwd()) else \ resource_path
[docs] def parameter_usage(self, name, usage='as-is'): if usage == 'as-is': return name if usage == 'without-extension': return Path(name).stem
def _download_impl(self, url, filename, progress_cb=None, size=None): log = getLogger('ocrd.resource_manager._download_impl') log.info("Downloading %s to %s" % (url, filename)) with open(filename, 'wb') as f: with requests.get(url, stream=True) as r: total = size if size else int(r.headers.get('content-length')) for data in r.iter_content(chunk_size=4096): if progress_cb: progress_cb(len(data)) f.write(data) def _copy_impl(self, src_filename, filename, progress_cb=None): log = getLogger('ocrd.resource_manager._copy_impl') log.info("Copying %s" % src_filename) with open(filename, 'wb') as f_out, open(src_filename, 'rb') as f_in: while True: chunk = f_in.read(4096) if chunk: f_out.write(chunk) if progress_cb: progress_cb(len(chunk)) else: break # TODO Proper caching (make head request for size, If-Modified etc)
[docs] def download( self, executable, url, basedir, overwrite=False, no_subdir=False, name=None, resource_type='file', path_in_archive='.', progress_cb=None, size=None, ): """ Download a resource by URL """ log = getLogger('ocrd.resource_manager.download') destdir = Path(basedir) if no_subdir else Path(basedir, executable) if not name: url_parsed = urlparse(url) name = Path(unquote(url_parsed.path)).name fpath = Path(destdir, name) is_url = url.startswith('https://') or url.startswith('http://') if fpath.exists() and not overwrite: log.info("%s to be %s to %s which already exists and overwrite is False" % (url, 'downloaded' if is_url else 'copied', fpath)) return fpath destdir.mkdir(parents=True, exist_ok=True) if resource_type == 'file': if is_url: self._download_impl(url, fpath, progress_cb) else: self._copy_impl(url, fpath, progress_cb) elif resource_type == 'tarball': with pushd_popd(tempdir=True): if is_url: self._download_impl(url, 'download.tar.xx', progress_cb, size) else: self._copy_impl(url, 'download.tar.xx', progress_cb) Path('out').mkdir() with pushd_popd('out'): log.info("Extracting tarball") with open_tarfile('../download.tar.xx', 'r:*') as tar: tar.extractall() log.info("Copying '%s' from tarball to %s" % (path_in_archive, fpath)) copytree(path_in_archive, str(fpath)) # TODO # elif resource_type == 'github-dir': return fpath