Source code for laf.data

import os
import glob
import time
import array
import pickle
import collections
import gzip
from .names import Names, FabricError
from .parse import parse
from .model import model

GZIP_LEVEL = 2
PICKLE_PROTOCOL = 3
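
# These two constants govern how compiled data is serialized in _store_file()
# below. A quick illustrative round-trip (file name hypothetical):
#
#     >>> import gzip, pickle
#     >>> with gzip.open('demo.dct', 'wb', compresslevel=GZIP_LEVEL) as f:
#     ...     pickle.dump({'n1': 'value'}, f, protocol=PICKLE_PROTOCOL)
#     >>> with gzip.open('demo.dct', 'rb') as f:
#     ...     pickle.load(f)
#     {'n1': 'value'}
#
# GZIP_LEVEL = 2 trades compression ratio for speed; PICKLE_PROTOCOL = 3 is
# the binary protocol introduced in Python 3.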

class LafData(object):
    '''Manage the compiling and loading of LAF/GraF data.'''
    def __init__(self):
        self.log = None
        self.clog = None
        self.data_items = {}
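    # data_items is the central in-memory store: it maps data keys (dkeys) to
    # loaded values, whose type depends on the data kind handled in
    # _load_file() below: an array.array('I') for 'arr' data, a dict for
    # 'dct' data, and a str for 'str' data.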
    def prepare_dirs(self, annox):
        env = self.names.env
        try:
            if not os.path.exists(env['m_compiled_dir']):
                os.makedirs(env['m_compiled_dir'])
        except os.error as e:
            raise FabricError("could not create bin directory {}".format(env['m_compiled_dir']), self.stamp, cause=e)
        for anx in annox:
            try:
                a_compiled_dir = env['annox'][anx]['a_compiled_dir']
                if not os.path.exists(a_compiled_dir):
                    os.makedirs(a_compiled_dir)
            except os.error as e:
                raise FabricError("could not create bin directory {}".format(a_compiled_dir), self.stamp, cause=e)
        try:
            if not os.path.exists(env['task_dir']):
                os.makedirs(env['task_dir'])
        except os.error as e:
            raise FabricError("could not create result directory {}".format(env['task_dir']), self.stamp, cause=e)
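    # Side note (illustrative, not used by this module): the exists-then-
    # makedirs pattern above can be collapsed with the exist_ok flag,
    # available since Python 3.2:
    #
    #     >>> os.makedirs('/tmp/laf-demo/bin', exist_ok=True)  # hypothetical path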
    def finish_task(self, show=True):
        '''Close all open files that have been opened by the API'''
        task_dir = self.names.env['task_dir']
        for handle in self.result_files:
            if handle and not handle.closed:
                handle.close()
        self.result_files = []
        self._flush_logfile()
        mg = []
        if show:
            self.stamp.Nmsg("Results directory:\n{}".format(task_dir))
            for name in sorted(os.listdir(path=task_dir)):
                path = "{}/{}".format(task_dir, name)
                size = os.path.getsize(path)
                mtime = time.ctime(os.path.getmtime(path))
                mg.append("{:<30} {:>12} {}".format(name, size, mtime))
            self.stamp.Nmsg("\n" + "\n".join(mg), withtime=False)
        self._finish_logfile()
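    # Illustrative: each line of the results listing has the file name
    # left-justified in 30 columns and the size right-justified in 12,
    # followed by the modification time:
    #
    #     >>> "{:<30} {:>12} {}".format('output.txt', 1024, time.ctime(0))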
    def _finish_logfile(self, compile=None):
        the_log = self.log if compile is None else self.clog
        try:
            the_log.write("\nCLOSED AT:{}".format(time.strftime("%Y-%m-%dT%H-%M-%S", time.gmtime())))
            the_log.close()
        except Exception:
            pass
        self.stamp.disconnect_log()
        if compile is None:
            self.log = None
        else:
            self.clog = None

    def _flush_logfile(self):
        try:
            self.log.flush()
        except Exception:
            pass
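    # Illustrative: the timestamp format used in the log files is
    # filesystem-safe (no colons in the time part):
    #
    #     >>> time.strftime("%Y-%m-%dT%H-%M-%S", time.gmtime(0))
    #     '1970-01-01T00-00-00'
    #
    # compile_all() later reads back the final "CLOSED AT:..." line of a
    # compile log to report when the data was compiled.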
    def add_logfile(self, compile=None):
        env = self.names.env
        log_dir = (
            env['task_dir'] if compile is None
            else env["{}_compiled_dir".format(compile)] if compile == 'm'
            else env['annox'][compile[1:]]["{}_compiled_dir".format(compile[0])]
        )
        log_path = (
            env['log_path'] if compile is None
            else env["{}_compiled_path".format(compile)] if compile == 'm'
            else env['annox'][compile[1:]]["{}_compiled_path".format(compile[0])]
        )
        the_log = self.log if compile is None else self.clog
        try:
            if not os.path.exists(log_dir):
                os.makedirs(log_dir)
        except os.error as e:
            raise FabricError("could not create log directory {}".format(log_dir), self.stamp, cause=e)
        if not the_log:
            the_log = open(log_path, "w", encoding="utf-8")
            the_log.write("OPENED AT:{}\n".format(time.strftime("%Y-%m-%dT%H-%M-%S", time.gmtime())))
        self.stamp.connect_log(the_log)
        self.stamp.Nmsg("LOGFILE={}".format(log_path))
        if compile is None:
            self.log = the_log
        else:
            self.clog = the_log
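    # The chained conditional expressions above select the log target by the
    # `compile` argument:
    #
    #   compile=None        -> the task log at env['log_path']
    #   compile='m'         -> the compile log of the main source,
    #                          env['m_compiled_path']
    #   compile='a' + name  -> the compile log of annox `name`,
    #                          env['annox'][name]['a_compiled_path']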
    def compile_all(self, force):
        env = self.names.env
        compile_uptodate = collections.OrderedDict()
        compile_uptodate['m'] = not os.path.exists(env['m_source_path']) or (
            os.path.exists(env['m_compiled_path']) and
            os.path.getmtime(env['m_compiled_path']) >= os.path.getmtime(env['m_source_path'])
        )
        for anx in sorted(env['annox']):
            uptodate = True
            for afile in glob.glob('{}/*.xml'.format(env['annox'][anx]['a_source_dir'])):
                this_uptodate = (
                    os.path.exists(env['annox'][anx]['a_compiled_path']) and
                    os.path.getmtime(env['annox'][anx]['a_compiled_path']) >= os.path.getmtime(afile)
                )
                if not this_uptodate:
                    uptodate = False
                    break
            compile_uptodate['a{}'.format(anx)] = uptodate
        has_compiled_main = False
        for origin in compile_uptodate:
            origin_type = origin if origin == 'm' else origin[0]
            origin_spec = env['source'] if origin == 'm' else origin[1:]
            if (not compile_uptodate[origin]) or force[origin_type] or has_compiled_main:
                self.stamp.Nmsg("BEGIN COMPILE {}: {}".format(origin_type, origin_spec))
                self._clear_origin_unnec(origin)
                if origin_type == 'a':
                    self._load_extra(['mXnf()', 'mXef()'] + Names.maingroup('G'))
                self._compile_origin(origin)
                if origin_type == 'm':
                    has_compiled_main = True
                self.stamp.Nmsg("END COMPILE {}: {}".format(origin_type, origin_spec))
            else:
                self.stamp.Dmsg("COMPILING {}: {}: UP TO DATE".format(origin_type, origin_spec))
                the_time = 'UNSPECIFIED'
                compiled_path = (
                    env['{}_compiled_path'.format(origin)] if origin_type == 'm'
                    else env['annox'][origin_spec]["{}_compiled_path".format(origin_type)]
                )
                with open(compiled_path) as h:
                    last_line = list(h)[-1]
                if ':' in last_line:
                    the_time = last_line.split(':', 1)[1]
                self.stamp.Nmsg("USING {}: {} DATA COMPILED AT: {}".format(
                    'main' if origin_type == 'm' else 'annox', origin_spec, the_time))
        if has_compiled_main:
            for origin in compile_uptodate:
                self._clear_origin_unnec(origin)
        self.names.setenv()
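    # Illustrative: `force` is indexed by origin type, so a hypothetical call
    #
    #     >>> lafdata.compile_all({'m': False, 'a': True})
    #
    # recompiles every annox but leaves an up-to-date main source alone.
    # Note that once the main source has been recompiled, has_compiled_main
    # forces all subsequent annox packages to be recompiled as well.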
    def _compile_origin(self, origin):
        self.add_logfile(compile=origin)
        self._parse(origin)
        self._model(origin)
        self._store_origin(origin)
        self._finish_logfile(compile=origin)

    def _clear_origin_unnec(self, origin):
        dkeys = list(self.data_items.keys())
        for dkey in sorted(dkeys):
            if dkey in self.names.req_data_items:
                continue
            (dorigin, dgroup, dkind, ddir, dcomps) = Names.decomp_full(dkey)
            if dorigin == origin:
                self._clear_file(dkey)

    def _clear_file(self, dkey):
        if dkey in self.data_items:
            del self.data_items[dkey]
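    # Names.decomp_full(dkey) splits a data key into its components
    # (dorigin, dgroup, dkind, ddir, dcomps); clearing by origin only needs
    # the first component.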
    def unload_all(self):
        self.loadspec = {}
        # Iterate over a copy of the keys: deleting from a dict while
        # iterating over it directly raises RuntimeError in Python 3.
        for dkey in list(self.data_items):
            del self.data_items[dkey]
        self.names.req_data_items = collections.OrderedDict()
        self.names._old_data_items = collections.OrderedDict()
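    # Illustrative: why the copy matters here:
    #
    #     >>> d = {'a': 1}
    #     >>> for k in d: del d[k]
    #     Traceback (most recent call last):
    #       ...
    #     RuntimeError: dictionary changed size during iteration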
    def load_all(self, req_items, prepare, add):
        dkeys = self.names.request_files(req_items, prepare[0])
        self.loadspec = dkeys
        self.prepare_dict = prepare[0]
        self.prepare_init = prepare[1]
        for dkey in dkeys['keep']:
            if dkey not in dkeys['prep']:
                self.stamp.Dmsg("keep {}".format(Names.dmsg(dkey)))
        if not add:
            for dkey in dkeys['clear']:
                if dkey not in dkeys['prep']:
                    self.stamp.Dmsg("clear {}".format(Names.dmsg(dkey)))
                    self._clear_file(dkey)
        for dkey in dkeys['load']:
            if dkey not in dkeys['prep']:
                self.stamp.Dmsg("load {}".format(Names.dmsg(dkey)))
                ism = self.names.dinfo(dkey)[0]
                self._load_file(dkey, accept_missing=not ism)
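    # request_files() buckets the requested dkeys into a dict that is reused
    # by prepare_all(); judging from the usage above, the buckets mean:
    #   'keep'  - loaded and still needed
    #   'clear' - loaded but no longer needed
    #   'load'  - needed but not yet loaded
    #   'prep'  - to be computed by a preparation method (see _load_file)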
    def _load_extra(self, dkeys):
        for dkey in dkeys:
            self.stamp.Dmsg("load {}".format(Names.dmsg(dkey)))
            ism = self.names.dinfo(dkey)[0]
            self._load_file(dkey, accept_missing=not ism)
    def prepare_all(self, api):
        if hasattr(self, 'api'):
            self.api.update(api)
        else:
            self.api = api
        dkeys = self.loadspec
        for dkey in dkeys['keep']:
            if dkey in dkeys['prep']:
                self.stamp.Dmsg("keep {}".format(Names.dmsg(dkey)))
        for dkey in dkeys['clear']:
            if dkey in dkeys['prep']:
                self.stamp.Dmsg("clear {}".format(Names.dmsg(dkey)))
                self._clear_file(dkey)
        for dkey in dkeys['load']:
            if dkey in dkeys['prep']:
                self.stamp.Nmsg("prep {}".format(Names.dmsg(dkey)))
                self._load_file(dkey, accept_missing=False)
    def _load_file(self, dkey, accept_missing=False):
        env = self.names.env
        dprep = self.names.dinfo(dkey)[-1]
        if dprep:
            if dkey not in self.prepare_dict:
                raise FabricError("Cannot prepare data for {}. No preparation method available.".format(Names.dmsg(dkey)), self.stamp)
            self.names.setenv(zspace=self.prepare_dict[dkey][-1])
        (ism, dloc, dfile, dtype, dprep) = self.names.dinfo(dkey)
        dpath = "{}/{}".format(dloc, dfile)
        prep_done = False
        if dprep:
            (method, method_source, replace, zspace) = self.prepare_dict[dkey]
            up_to_date = (
                os.path.exists(dpath) and
                os.path.getmtime(dpath) >= os.path.getmtime(method_source) and
                os.path.getmtime(dpath) >= os.path.getmtime(env['m_compiled_path'])
            )
            if not up_to_date:
                self.stamp.Nmsg("PREPARING {}".format(Names.dmsg(dkey)))
                compiled_dir = env['z_compiled_dir']
                try:
                    if not os.path.exists(compiled_dir):
                        os.makedirs(compiled_dir)
                except os.error as e:
                    raise FabricError("could not create compiled directory {}".format(compiled_dir), self.stamp, cause=e)
                newdata = method(self.api)
                self.data_items[dkey] = newdata
                self.stamp.Nmsg("WRITING {}".format(Names.dmsg(dkey)))
                self._store_file(dkey)
                prep_done = True
        if not os.path.exists(dpath):
            if not accept_missing:
                raise FabricError("Cannot load data for {}: File does not exist: {}.".format(Names.dmsg(dkey), dpath), self.stamp)
            return
        if not prep_done:
            newdata = None
            if dtype == 'arr':
                newdata = array.array('I')
                with gzip.open(dpath, "rb") as f:
                    contents = f.read()
                newdata.frombytes(contents)
            elif dtype == 'dct':
                with gzip.open(dpath, "rb") as f:
                    newdata = pickle.load(f)
            elif dtype == 'str':
                with gzip.open(dpath, "rt", encoding="utf-8") as f:
                    newdata = f.read()
            self.data_items[dkey] = newdata
        if dprep:
            if replace:
                okey = Names.orig_key(dkey)
                if okey not in self.data_items:
                    raise FabricError("There is no original {} to be replaced by {}".format(Names.dmsg(okey), Names.dmsg(dkey)), self.stamp)
                if okey == dkey:
                    self.stamp.Wmsg("Data to be replaced {} is identical to replacement".format(Names.dmsg(okey)))
                else:
                    self.data_items[okey] = self.data_items[dkey]

    def _store_origin(self, origin):
        env = self.names.env
        origin_type = origin if origin == 'm' else origin[0]
        origin_spec = env['source'] if origin == 'm' else origin[1:]
        self.stamp.Nmsg("WRITING RESULT FILES for {}: {}".format(origin_type, origin_spec))
        data_items = self.data_items
        for dkey in sorted(data_items):
            (dorigin, dgroup, dkind, ddir, dcomps) = Names.decomp_full(dkey)
            if dorigin == origin:
                self._store_file(dkey)

    def _store_file(self, dkey):
        (ism, dloc, dfile, dtype, dprep) = self.names.dinfo(dkey)
        # Nothing to store if this data item has no destination on disk.
        if dloc is None or dfile is None:
            return
        dpath = "{}/{}".format(dloc, dfile)
        thedata = self.data_items[dkey]
        self.stamp.Dmsg("write {}".format(Names.dmsg(dkey)))
        if dtype == 'arr':
            with gzip.open(dpath, "wb", compresslevel=GZIP_LEVEL) as f:
                thedata.tofile(f)
        elif dtype == 'dct':
            with gzip.open(dpath, "wb", compresslevel=GZIP_LEVEL) as f:
                pickle.dump(thedata, f, protocol=PICKLE_PROTOCOL)
        elif dtype == 'str':
            with gzip.open(dpath, "wt", encoding="utf-8") as f:
                f.write(thedata)

    def _parse(self, origin):
        env = self.names.env
        origin_type = origin if origin == 'm' else origin[0]
        origin_spec = env['source'] if origin == 'm' else origin[1:]
        self.stamp.Nmsg("PARSING ANNOTATION FILES")
        if origin_type == 'm':
            source_dir = env['{}_source_dir'.format(origin)]
            source_path = env['{}_source_path'.format(origin)]
            compiled_dir = env['{}_compiled_dir'.format(origin)]
        else:
            source_dir = env['annox'][origin_spec]['{}_source_dir'.format(origin_type)]
            source_path = env['annox'][origin_spec]['{}_source_path'.format(origin_type)]
            compiled_dir = env['annox'][origin_spec]['{}_compiled_dir'.format(origin_type)]
        self.cur_dir = os.getcwd()
        if not os.path.exists(source_path):
            raise FabricError("LAF header does not exist: {}".format(source_path), self.stamp)
        try:
            os.chdir(source_dir)
        except os.error as e:
            raise FabricError("could not change to LAF source directory {}".format(source_dir), self.stamp, cause=e)
        try:
            if not os.path.exists(compiled_dir):
                os.makedirs(compiled_dir)
        except os.error as e:
            os.chdir(self.cur_dir)
            raise FabricError("could not create directory for compiled source {}".format(compiled_dir), self.stamp, cause=e)
        parse(
            origin,
            source_path,
            self.stamp,
            self.data_items,
        )
        os.chdir(self.cur_dir)

    def _model(self, origin):
        self.stamp.Nmsg("MODELING RESULT FILES")
        model(origin, self.data_items, self.stamp)

    def __del__(self):
        self.stamp.Nmsg("END")
        for handle in (self.log,):
            if handle and not handle.closed:
                handle.close()
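
# Illustrative end-to-end sketch, not part of the module: LafData expects its
# owner (elsewhere in the laf package) to supply `names`, `stamp`, and
# `result_files`; with `req_items`, `prepare`, and `api` as constructed by
# the calling code, a task then runs roughly as:
#
#     >>> lafdata.compile_all(force={'m': False, 'a': False})
#     >>> lafdata.prepare_dirs(annox=['my-annox'])          # hypothetical annox
#     >>> lafdata.add_logfile()                             # open the task log
#     >>> lafdata.load_all(req_items, prepare, add=False)   # load requested data
#     >>> lafdata.prepare_all(api)                          # compute prepared data
#     >>> ...                                               # perform the task
#     >>> lafdata.finish_task()                             # close files, report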