# Source code for naginterfaces.base.utils

"""
Base utilities for the `naginterfaces` package.
"""

# NAG Copyright 2017-2021.

# pylint: disable=protected-access,too-many-lines

from __future__ import absolute_import

import array as _array
import collections as _collections
import configparser as _configparser
import ctypes as _ctypes
import functools as _functools
import inspect as _inspect
import io as _io
import logging as _logging
import os as _os
import socket as _socket
import struct as _struct
import subprocess as _subprocess
import sys as _sys
import warnings as _warnings

import numpy as _np

from ._nag_engine_data_consts import (
    NAG_ED_COL_MAJOR as _NAG_ED_COL_MAJOR,
    NAG_ED_ROW_MAJOR as _NAG_ED_ROW_MAJOR,
    NAG_ED_YES as _NAG_ED_YES,
)
from ._nag_p_consts import ( # pylint: disable=unused-import
    IERR_ALLOC_FAIL as _IERR_ALLOC_FAIL,
    IERR_HELPER as _IERR_HELPER,
    IERR_NO_LIC as _IERR_NO_LIC,
    IERR_SUCCESS as _IERR_SUCCESS,
    IERR_UNEXPECTED as _IERR_UNEXPECTED,
)

from .. import _THE_SYSTEM, ENGINE_LIBDIR as _ENGINE_LIBDIR, __version__

_PKG_NAME = 'naginterfaces'
_MODULE_NAME = '.'.join([_PKG_NAME, 'base.utils'])

def _join_sequence(
        sequence,
        separator=', ',
        last_separator=' or ',
):
    """
    Return a string joining the input sequence using separator,
    with last_separator between the final pair of elements.
    """
    if len(sequence) <= 1:
        return separator.join(sequence)

    if len(sequence) == 2:
        return last_separator.join(sequence)

    return last_separator.join([
        separator.join(sequence[:-1]),
        sequence[-1]
    ])

def _bad_instance_msg(
        exp_insts,
        allow_none,
):
    """
    Build the message fragment used when a data-type payload is not an
    instance of any of the names listed in exp_insts.

    When allow_none is true the text also mentions that None would
    have been acceptable.
    """
    none_clause = 'None or ' if allow_none else ''
    requirement = 'must be ' + none_clause + 'an instance of'
    return requirement + ' ' + _join_sequence(exp_insts)

def _bad_rank_msg(
        exp_rank,
        actual_rank,
):
    """
    Return a string for when a data-type payload was not of the expected
    rank.
    """
    return ' '.join([
        'has rank',
        str(actual_rank) + ',',
        'but must have rank',
        str(exp_rank),
    ])

def _bad_shape_msg(
        exp_shape,
        actual_shape,
):
    """
    Return a string for when a data-type payload was not of the expected
    shape
    """
    if len(exp_shape) == 1:

        if len(actual_shape) == 1:
            return ' '.join([
                'has length',
                str(actual_shape[0]) + ',',
                'but must have length',
                str(exp_shape[0])
            ])

        return ' '.join([
            'has shape',
            str(actual_shape) + ',',
            'but must be rank 1 with length',
            str(exp_shape[0])
        ])

    str_exp_shape = '(' + ', '.join([
        ':' if extent is None else str(extent) for extent in exp_shape
    ]) + ')'
    return ' '.join([
        'has shape',
        str(actual_shape) + ',',
        'but must have shape',
        str_exp_shape,
    ])

class NagException(Exception):
    """
    Base class for errors from this package.

    Attributes
    ----------
    errno : int or None
        When a call to the underlying NAG Library Engine flags an error or
        warning, this attribute is set to an integer identifier for the exit
        case. The value may be used to cross reference the failure in the
        corresponding NAG Library document for the routine in question.

        The ``__str__`` representation of an exception having
        non-``None`` ``errno`` will display a substring of the form
        ``code x[:y[,z]]`` in which the ``x`` integer is the same as the
        ``errno`` value (and where the ``y`` and ``z`` values, which may be
        absent, only have internal significance to NAG).

    return_data : collections.namedtuple (nag_return_data) or None
        When a call to the underlying NAG Engine flags a non-fatal warning
        and you have configured the Python ``warnings`` module to escalate
        NAG warnings to exceptions, this attribute is populated with the
        return arguments for the routine in question (in argument order).
        Thus after catching the warning you may access the routine's return
        data via this attribute if desired. The tuple subclass - i.e., the
        type of this attribute - is named ``nag_return_data``.

    Notes
    -----
    Concrete subclasses ``NagLicenceError`` and ``NagMemoryError`` are
    potential exceptions that may be raised by any function in the
    `naginterfaces` package.
    """
    # Subclasses set this to False to suppress the trailing
    # 'Please contact NAG.' line in the string representation.
    _contact_nag = True
    _contact_nag_msg = 'Please contact NAG.'

    def __init__(
            self,
            locus,
            msg,
    ):
        """
        Store the exception locus dict and exception message.

        `locus` is a dict that may carry the keys 'fun_name', 'returned',
        'entity', 'element', 'errcodes' and 'return_data'; `msg` is the
        message text.
        """
        Exception.__init__(self, locus, msg)
        self.locus = locus
        self.msg = msg
        self._errcodes = self.locus.get('errcodes')
        self.errno = self._errcodes[0] if self._errcodes is not None else None
        self.return_data = self.locus.get('return_data')

    def __str__(
            self,
    ):
        """
        Return the formatted locus followed by the message and, when
        `_contact_nag` is set, a closing request to contact NAG.
        """
        err_str = []
        l_t_s = self._locus_to_string()

        if l_t_s:
            err_str.append(l_t_s)
            # Separate the locus from a message that continues on the
            # same line; multi-line messages begin with their own newline.
            if self.msg and self.msg[0] != '\n':
                err_str.append(' ')

        err_str.append(self.msg)

        if self._contact_nag:
            # Guard against an empty message: indexing msg[-1] would raise
            # IndexError while the exception is being rendered.
            if self.msg and not self.msg.endswith('.'):
                err_str[-1] += '.'
            err_str.extend([
                '\n',
                '** ' + self._contact_nag_msg
            ])

        return ''.join(err_str)

    def _locus_to_string(
            self
    ):
        """
        Return a string formatting the current locus for printing.

        The result is a parenthesized, comma-separated list of whichever
        locus entries are present, or the empty string if there are none.
        """
        msg = []

        if 'fun_name' in self.locus:
            msg.append('NAG Python function ' + self.locus['fun_name'])

        if 'returned' in self.locus:
            msg.append('returned ' + self.locus['returned'])
        elif 'entity' in self.locus:
            msg.append('argument ' + self.locus['entity'])

        if 'element' in self.locus:
            msg.append('element ' + str(self.locus['element']))

        if 'errcodes' in self.locus:
            codes_str = 'code ' + str(self.errno)
            # Any further codes are appended in the form ':y,z'.
            if len(self.locus['errcodes']) > 1:
                codes_str += ':' + ','.join(
                    [str(code) for code in self.locus['errcodes'][1:]]
                )
            msg.append(codes_str)

        if msg:
            return '(' + ', '.join(msg) + ')'

        return ''
class NagWarning(NagException, Warning):
    # pylint: disable=line-too-long
    """
    Base class for warnings from NAG wrappers.

    Because this is also a subclass of the Python ``Warning`` class, the
    handling of a ``NagWarning`` can be controlled using the methods in
    the core ``warnings`` module.

    Examples
    --------
    To escalate occurrences of ``NagAlgorithmicWarning`` to exceptions and
    to never show occurrences of ``NagWithdrawalWarning`` (which would not
    be recommended in practice):

    >>> import warnings
    >>> from naginterfaces.base import utils
    >>> warnings.simplefilter('error', utils.NagAlgorithmicWarning)
    >>> warnings.simplefilter('ignore', utils.NagWithdrawalWarning)

    Any promoted ``NagAlgorithmicWarning`` exits can then be caught and
    their ``errno`` and ``return_data`` attributes accessed as required:

    >>> try: # doctest: +SKIP
    ...     some_nag_call(...)
    ... except NagAlgorithmicWarning as exc:
    ...     if exc.errno == xyz:
    ...         look at exc.return_data

    See Also
    --------
    :func:`naginterfaces.library.examples.glopt.nlp_multistart_sqp_lsq_ex.main` :
        This example catches a ``NagAlgorithmicWarning`` and retrieves some
        of its ``return_data``.

    :func:`naginterfaces.library.examples.roots.sys_func_rcomm_ex.main` :
        This example promotes instances of ``NagAlgorithmicWarning`` to
        exceptions.
    """
    # pylint: enable=line-too-long
    _contact_nag = False
class NagAlgorithmicWarning(NagWarning):
    """
    Concrete warning category for conditions that the NAG Library Engine
    itself identified during a wrapper call.
    """
    _contact_nag = False
class NagAlgorithmicMajorWarning(NagAlgorithmicWarning):
    """
    Concrete warning category for major warnings that the NAG Library
    Engine identified during a wrapper call.
    """
    _contact_nag = False
class NagAttributeError(NagException, AttributeError):
    """
    Concrete exception for attribute errors raised by NAG wrappers.
    """
    _contact_nag = False
class NagCallbackTerminateWarning(NagAlgorithmicWarning):
    """
    Concrete warning category used when the user triggered early
    termination from inside a callback.

    Attributes
    ----------
    cb_excs : list of UserCallbackTerminate
        The original exception(s) raised by the user.
    """
    _contact_nag = False

    def __init__(
            self,
            locus,
            msg,
    ):
        """
        Initialize as for the parent class, then expose the user's
        original exception(s), read from the 'cb_excs' locus entry.
        """
        NagAlgorithmicWarning.__init__(self, locus, msg)
        self.cb_excs = self.locus['cb_excs']
class NagDeprecatedWarning(NagWarning):
    """
    Concrete warning category for NAG wrappers that are deprecated.

    This category is not suppressed by default.
    """
    _contact_nag = False
class NagIndexError(NagException, IndexError):
    """
    Concrete exception for index errors raised by NAG wrappers.
    """
    _contact_nag = False
class NagKeyError(NagException, KeyError):
    """
    Concrete exception for key errors raised by NAG wrappers.
    """
    _contact_nag = False
class NagLicenceError(NagException):
    """
    Concrete exception for licensing errors raised by NAG wrappers.
    """
class NagMemoryError(NagException, MemoryError):
    """
    Concrete exception for allocation errors raised by NAG wrappers.
    """
    _contact_nag = False
class NagNotImplementedError(NagException, NotImplementedError):
    """
    Concrete exception for not-implemented errors raised by NAG wrappers.
    """
    _contact_nag = True
class NagOSError(NagException, OSError):
    """
    Concrete exception for OS errors raised by NAG wrappers.
    """
    _contact_nag = False
class NagOverflowError(NagException, OverflowError):
    """
    Concrete exception for overflow errors raised by NAG wrappers.
    """
    _contact_nag = False
class NagTypeError(NagException, TypeError):
    """
    Concrete exception for type errors raised by NAG wrappers.
    """
    _contact_nag = False
class NagValueError(NagException, ValueError):
    """
    Concrete exception for value errors raised by NAG wrappers.
    """
    _contact_nag = False
class NagShapeError(NagValueError):
    """
    Concrete exception for shape errors raised by NAG wrappers.
    """
class NagWithdrawalWarning(NagDeprecatedWarning):
    """
    Concrete warning category for NAG wrappers that are scheduled for
    withdrawal.

    This category is not suppressed by default.
    """
class UserCallbackTerminate(Exception):
    """
    Concrete exception that a user raises inside a callback to request
    termination.
    """
# True when ctypes treats C int and C long as the same type.
_IS_IL32 = (_ctypes.c_int is _ctypes.c_long)

# ctypes integers for this platform:
_SYS_CTYPES_INTS = {
    32: (_ctypes.c_long if _IS_IL32 else _ctypes.c_int),
    64: (_ctypes.c_longlong if _IS_IL32 else _ctypes.c_long)
}

def _get_sys_data_model():
    """
    Return a string for the data model used by the Python interpreter
    on this platform.

    The result is 'ILP32' when pointers are 4 bytes wide, otherwise
    'IL32P64' on Windows and 'I32LP64' elsewhere.
    """
    if _struct.calcsize('P') == 4: # pragma: no cover
        return 'ILP32'

    if _THE_SYSTEM == 'Windows':
        return 'IL32P64'

    return 'I32LP64'

class _EngineData(_ctypes.Structure):
    """
    State structure for the NAG Engine.
    """
    # pylint: disable=too-few-public-methods
    _fields_ = [
        ('cpuser', _ctypes.c_void_p),
        ('hlperr', _SYS_CTYPES_INTS[32]),
        ('ad_handle', _ctypes.c_void_p),
        ('storage_order', _SYS_CTYPES_INTS[32]),
        ('reorder_packed', _SYS_CTYPES_INTS[32]),
        ('allocate_workspace', _SYS_CTYPES_INTS[32]),
        ('wrapptr1', _ctypes.c_void_p),
        ('wrapptr2', _ctypes.c_void_p),
        ('y90_print_rec', _ctypes.c_void_p),
        ('num_threads', _SYS_CTYPES_INTS[32]),
        ('thread_num', _SYS_CTYPES_INTS[32]),
        ('hlperr_thread_num', _SYS_CTYPES_INTS[32]),
    ]
    default_sorder = 'C'
    default_spiked_sorder = 'F'

    def __init__(
            self,
            sorder=default_sorder,
            py_c_print_rec=None,
    ):
        """
        Return an initialized _EngineData structure.

        The structure is first passed to the Engine symbol 'y90haan',
        then the storage order, the allocate_workspace flag and the
        optional print callback are set.
        """
        _ctypes.Structure.__init__(self)
        y90haan_ctypes = _EngineState._get_symbol('y90haan')
        y90haan_ctypes.restype = None
        y90haan_ctypes.argtypes = (
            _BYREF_ENGINEDATA,
        )
        y90haan_ctypes(self)
        self._set_sorder(sorder)
        self.allocate_workspace = _NAG_ED_YES
        self._set_print_rec(py_c_print_rec)

    def _set_print_rec(
            self,
            py_c_print_rec,
    ):
        """
        Update the y90_print_rec field.

        Nothing is done when no callback was supplied or when Engine
        debugging is switched off.
        """
        if (
                py_c_print_rec is None or
                _EngineState._debug_level == _EngineState._debug_off
        ):
            return

        # NOTE(review): the FUNCTYPE wrapper is only held locally here;
        # confirm that the cast result keeps it alive for as long as the
        # Engine may call it.
        c_p_r = _C_PRINT_REC_FUNCTYPE(py_c_print_rec)
        self.y90_print_rec = _ctypes.cast(c_p_r, _ctypes.c_void_p)

    def _set_sorder(
            self,
            sorder,
    ):
        """
        Update the storage-order field.

        'F' selects column-major storage; any other value selects
        row-major storage.
        """
        if sorder == 'F':
            self.storage_order = _NAG_ED_COL_MAJOR
        else:
            self.storage_order = _NAG_ED_ROW_MAJOR

_BYREF_ENGINEDATA = _ctypes.POINTER(_EngineData)

def _cast_callback(
        proto,
        callback,
        locus,
):
    """
    Return a ctypes FUNCTYPE for the input callback.

    A None callback is cast to the prototype type directly. Raises
    NagException (using `locus`) if wrapping the callback in `proto`
    fails with TypeError.
    """
    if callback is None:
        return _ctypes.cast(callback, proto)

    try:
        return proto(callback)
    except TypeError:
        pass

    raise NagException( # pragma: no cover
        locus,
        'unexpected failure casting to C function'
    )

def _get_symbol_from_lib_handle(
        libh,
        symbol_name,
):
    """
    Utility for loading symbol_name from library handle libh.

    The name is mangled for this platform before the lookup.
    """
    return getattr(libh, _mangle_sym_name(symbol_name))

def _mangle_sym_name(
        symbol_name,
):
    """
    Mangle symbol_name into a valid shared-library symbol name
    on this platform.

    Surrounding underscores are stripped. On Windows/Cygwin with a
    non-ILP32 Engine the name is upper-cased; otherwise it is
    lower-cased and given a trailing underscore.
    """
    if (
            (
                _THE_SYSTEM == 'Windows' or
                _THE_SYSTEM.startswith('CYGWIN')
            ) and
            _ENGINE_DATA_MODEL != 'ILP32'
    ):
        return symbol_name.strip('_').upper()

    return symbol_name.strip('_').lower() + '_'

# The Engine build is described by a config file shipped alongside
# the library.
_SPEC_CFG_FILE = _os.path.join(
    _os.path.abspath(_ENGINE_LIBDIR), 'lib_spec.cfg',
)
if not _os.path.isfile(_SPEC_CFG_FILE):
    raise NagException(
        {},
        'missing ' + _SPEC_CFG_FILE,
    )
_CF = _configparser.ConfigParser()
_CF.read(_SPEC_CFG_FILE)

# Compiler names keyed by capitalized language ('Fortran', 'C', 'C++').
_ENGINE_COMPILERS_DICT = {}
for _n, _v in _CF.items('compilers'):
    if _n in ['fortran', 'c', 'c++']:
        _dict_n = _n[0].upper() + _n[1:]
    else: # pragma: no cover
        _dict_n = _n
    _ENGINE_COMPILERS_DICT[_dict_n] = _v

_ENGINE_DATA_MODEL = _CF.get('datamodel', 'model')
_ENGINE_INT_BITS = 64 if _ENGINE_DATA_MODEL == 'ILP64' else 32

_ENGINE_BUILD_ID_STR = (
    _CF.get('buildid', 'idnum') if (
        _CF.has_section('buildid') and
        _CF.has_option('buildid', 'idnum')
    )
    else None
)

_ENGINE_PCODE = (
    _CF.get('productcode', 'pcode') if (
        _CF.has_section('productcode') and
        _CF.has_option('productcode', 'pcode')
    )
    else None
)

_ENGINE_LIB_NAME_FROM_SPEC = (
    _CF.get('meta', 'lib_name') if (
        _CF.has_section('meta') and
        _CF.has_option('meta', 'lib_name')
    )
    else None
)

# Fall back to the library name when no product code was recorded.
if _ENGINE_PCODE is None: # pragma: no cover
    _ENGINE_PCODE = _ENGINE_LIB_NAME_FROM_SPEC

if _ENGINE_PCODE is None: # pragma: no cover
    raise NagException(
        {},
        'missing productcode and lib_name in ' + _SPEC_CFG_FILE,
    )

def _get_sys_shlib_ext():
    """
    Return a string for the file extension of shared libraries
    on this platform.

    Raises NagException for a platform other than Darwin, Linux,
    Windows or Cygwin.
    """
    if _THE_SYSTEM == 'Darwin':
        return '.dylib'

    if _THE_SYSTEM == 'Linux':
        return '.so'

    if _THE_SYSTEM == 'Windows' or _THE_SYSTEM.startswith('CYGWIN'):
        return '.dll'

    raise NagException( # pragma: no cover
        {'fun_name': '_get_sys_shlib_ext'},
        'inconceivable platform'
    )

def _is_ascii(
        data,
        locus,
):
    """
    Python 3+ check for data being an ASCII string.

    Accepts str or bytes; raises NagException (using `locus`) for any
    other type.
    """
    if isinstance(data, str):
        try:
            data.encode('ascii')
        except UnicodeError:
            return False
        else:
            return True
    elif isinstance(data, bytes):
        try:
            data.decode('ascii')
        except UnicodeError:
            return False
        else:
            return True
    else:
        raise NagException( # pragma: no cover
            locus,
            'inconceivable string class'
        )

class _EngineExit(object):
    """
    Base class for categorization of error exits from the NAG Engine.
    """
    # pylint: disable=too-few-public-methods
    __slots__ = ['fun_name', 'entity', 'msg_lines', 'errcodes', 'return_data']

    def __init__(
            self,
            fun_name=None,
            entity=None,
            msg_lines=None,
            errcodes=None,
            return_data=None,
    ):
        """
        Store the details describing this exit.
        """
        # pylint: disable=too-many-arguments
        self.fun_name = fun_name
        self.entity = entity
        self.msg_lines = msg_lines
        self.errcodes = errcodes
        self.return_data = return_data

    def manage(self):
        """
        Take the appropriate action for this exit category.
        """
        raise NotImplementedError # pragma: no cover

    def _set_locus(self):
        """
        Construct the exception locus.

        Only attributes that are not None are included, so any of the
        keys may be absent from the returned dict.
        """
        locus = {}

        if self.fun_name is not None:
            locus['fun_name'] = self.fun_name

        if self.errcodes is not None:
            locus['errcodes'] = self.errcodes

        if self.entity is not None:
            locus['entity'] = self.entity

        if self.return_data is not None:
            locus['return_data'] = self.return_data

        return locus

class _EngineNoErrorExit(_EngineExit):
    """
    Concrete class for successful exits from the NAG Engine.
    """
    # pylint: disable=too-few-public-methods
    __slots__ = []

    def manage(self):
        return

class _EngineWarningExit(_EngineExit):
    """
    Concrete class for warning exits from the NAG Engine.
    """
    # pylint: disable=too-few-public-methods
    __slots__ = []

    def manage(self):
        _raise_warning(
            NagAlgorithmicWarning, self.msg_lines, self._set_locus(),
        )

class _EngineMajorWarningExit(_EngineWarningExit):
    """
    Concrete class for major warning exits from the NAG Engine.
    """
    # pylint: disable=too-few-public-methods
    __slots__ = []

    def manage(self):
        _raise_warning(
            NagAlgorithmicMajorWarning, self.msg_lines, self._set_locus(),
        )

class _EngineErrorExit(_EngineExit):
    """
    Concrete class for routine-specific error exits from the NAG Engine.
    """
    # pylint: disable=too-few-public-methods
    __slots__ = []

    def manage(self):
        raise NagValueError(
            self._set_locus(),
            _format_msg_lines(self.msg_lines)
        )

class _EngineAllocationErrorExit(_EngineExit):
    """
    Concrete class for memory-allocation error exits from the NAG Engine.
    """
    # pylint: disable=too-few-public-methods
    __slots__ = []

    def manage(self):
        locus = self._set_locus()
        locus['errcodes'] = (_IERR_ALLOC_FAIL,)
        raise NagMemoryError(locus, _format_msg_lines(
            [
                'Dynamic memory allocation failed.',
            ]
        ))

class _UserCallbackExit(_EngineWarningExit):
    """
    Concrete class for early termination exits from callbacks.
    """
    # pylint: disable=too-few-public-methods
    __slots__ = ['cb_excs']

    def __init__(
            self,
            fun_name=None,
            entity=None,
            msg_lines=None,
            errcodes=None,
            return_data=None,
            cb_excs=None,
    ):
        """
        Store the exit details together with the user's exception(s).
        """
        # pylint: disable=too-many-arguments
        _EngineWarningExit.__init__(
            self, fun_name, entity, msg_lines, errcodes, return_data,
        )
        self.cb_excs = cb_excs

    def manage(self):
        locus = self._set_locus()
        locus['cb_excs'] = self.cb_excs
        _raise_warning(
            NagCallbackTerminateWarning, self.msg_lines, locus,
        )

def _resolve_kpath(k_util):
    """
    Return the absolute path of `k_util`.

    The utility is looked up in the Engine library directory (with an
    '.exe' suffix on Windows/Cygwin). Raises NagException if no such
    file exists.
    """
    k_util_name = k_util

    if _THE_SYSTEM == 'Windows' or _THE_SYSTEM.startswith('CYGWIN'):
        k_util_name += '.exe'

    k_util_path = _os.path.join(
        _os.path.abspath(_ENGINE_LIBDIR), k_util_name
    )

    if not _os.path.isfile(k_util_path):
        raise NagException( # pragma: no cover
            {},
            'Missing utility ' + k_util_path
        )

    return k_util_path

def _capture_hostid():
    """
    Return the output of ``khostid``.

    If the utility exits with a nonzero status a diagnostic is printed
    and the placeholder 'NO_VALID_HOST_ID' is returned instead.
    """
    try:
        return _EngineCharScalarType.to_py(
            _subprocess.check_output(_resolve_kpath('khostid'))
        )
    except _subprocess.CalledProcessError:
        print(
            "khostid returned non zero return code. "
            "Most likely no valid ethernet interface is present."
        )
        return "NO_VALID_HOST_ID"

def _capture_lcheck(pcode, quiet=False):
    """
    Return the output of ``klcheck`` with some postprocessing.

    Product codes not ending in 'L' are reported as not licence managed
    without running the utility. Otherwise certain lines of the utility
    output are replaced by messages that mention `pcode`.
    """
    if not pcode.endswith('L'):
        return (
            'Licence available; this product is not licence managed'
        )

    cmd_list = [_resolve_kpath('klcheck')]

    if quiet:
        cmd_list.append('-quiet')

    cmd_list.append(pcode)

    cmd_output = _subprocess.check_output(cmd_list)

    # Fall back to latin-1 if the output is not valid UTF-8.
    try:
        lcheck_decode = cmd_output.decode(encoding='utf-8')
    except UnicodeDecodeError:
        lcheck_decode = cmd_output.decode(encoding='latin-1')

    lcheck_txt = lcheck_decode.rstrip().splitlines()

    for lcheck_i, lcheck_line in enumerate(lcheck_txt):

        if lcheck_line.startswith('Licence available'):
            lcheck_txt[lcheck_i] = (
                'Licence available; the required ' + pcode + ' licence key '
                'for this product is valid'
            )
        elif lcheck_line in [
                'Error: Licence file not found',
                'Error: No licence found for this product'
        ]:
            lcheck_txt[lcheck_i] = (
                'Error: Licence not found; this product requires a key for ' +
                pcode
            )

    return '\n'.join(lcheck_txt)

def _lcheck_lines(pcode, quiet=False):
    """
    Return a list of lines forming a licensing message.

    When a licence is available the list holds just the licence-check
    output; otherwise that output is wrapped in explanatory text.
    """
    l_line = _capture_lcheck(pcode, quiet=quiet)

    if 'Licence available' in l_line:
        return [l_line]

    lc_lines = [
        'The NAG Library for Python on this platform uses',
        'underlying Library ' + pcode + '.',
        'This Library has been installed as part of the package',
        'and it requires a valid licence key.',
        'No such key could be validated:',
        'the key may not have been installed correctly or',
        'it may have expired.',
        'The Kusari licence-check utility reports the following:',
    ]
    lc_lines.append(l_line)
    lc_lines.append(
        'The above information has been generated on machine ' +
        _socket.gethostname()
    )
    lc_lines.extend([
        'For information on how to obtain a licence, please see',
        'https://www.nag.com/numeric/py/nagdoc_latest/'
        'naginterfaces.kusari.html',
    ])

    return lc_lines

class _EngineLicenceErrorExit(_EngineExit):
    """
    Concrete class for licensing error exits from the NAG Engine.
    """
    # pylint: disable=too-few-public-methods
    __slots__ = []

    def manage(self):
        from .info import impl_details # pylint:disable=bad-option-value,import-outside-toplevel
        pcode = impl_details()['product_code']
        locus = self._set_locus()
        locus['errcodes'] = (_IERR_NO_LIC,)
        msg_lines = _lcheck_lines(pcode)
        msg_lines.extend([
            _capture_hostid(),
            'PRODUCT CODE = ' + pcode,
        ])
        raise NagLicenceError(locus, _format_msg_lines(msg_lines))

class _EngineUnexpectedExit(_EngineExit):
    """
    Concrete class for unexpected exits from the NAG Engine.
    """
    # pylint: disable=too-few-public-methods
    __slots__ = []

    def manage(self):
        locus = self._set_locus()
        new_errcodes = [_IERR_UNEXPECTED]
        # _set_locus omits 'errcodes' when none were supplied, so read it
        # defensively rather than indexing the dict directly.
        old_errcodes = locus.get('errcodes') or ()
        # The leading code is replaced; any trailing codes are retained.
        new_errcodes.extend(old_errcodes[1:])
        locus['errcodes'] = tuple(new_errcodes)
        raise NagException(locus, _format_msg_lines(['Unexpected error.']))

def _format_msg_lines(msg_lines):
    """
    Formats msg_lines into a single string for printing as an
    error/warning message.

    Each line is prefixed with '** ' and the whole message starts with
    a newline.
    """
    return '\n' + '\n'.join(
        ['** ' + line for line in msg_lines]
    )

class _EngineState(object):
    """
    Class for the data state of the underlying NAG Engine on this platform.

    The class body runs at import time: it checks that the Engine and the
    interpreter are compatible, derives the integer and calling
    conventions, and loads the Engine shared library.

    Raises
    ------
    NagOSError
        if running using 32-bit Python with a 64-bit Engine or vice versa.
    NagOSError
        if the underlying Engine is 32-bit and the system is not Windows.
    NagTypeError
        upon instantiation (only the single class is permitted).
    """
    # pylint: disable=too-few-public-methods
    _compilers = _ENGINE_COMPILERS_DICT

    _sys_functype = staticmethod(_ctypes.CFUNCTYPE)

    if (
            _THE_SYSTEM != 'Windows' and
            not _THE_SYSTEM.startswith('CYGWIN') and
            _ENGINE_DATA_MODEL == 'ILP32'
    ): # pragma: no cover
        raise NagOSError(
            {},
            'this product does not support use with a 32-bit NAG library '
            'on this system.'
        )

    # The last two characters of a data-model string give the pointer width.
    if (
            _ENGINE_DATA_MODEL[-2:] != _get_sys_data_model()[-2:]
    ): # pragma: no cover
        raise NagOSError(
            {},
            'the Python interpreter is ' +
            _get_sys_data_model()[-2:] + '-bit '
            'but the underlying NAG library is ' +
            _ENGINE_DATA_MODEL[-2:] +
            '-bit. Please run using a matching pair.'
        )

    _int_model = {
        'bits': _ENGINE_INT_BITS,
        'ctypes': _SYS_CTYPES_INTS[_ENGINE_INT_BITS],
        'numpy': getattr(_np, 'int' + str(_ENGINE_INT_BITS))
    }

    # Type of the hidden character-length arguments.
    if _compilers['Fortran'] == 'Intel' and _ENGINE_DATA_MODEL != 'ILP32':
        _chrlen_ctype = _SYS_CTYPES_INTS[64]
    else:
        _chrlen_ctype = _SYS_CTYPES_INTS[32]

    # Integer representation of logical 'true' for the Fortran compiler.
    if _compilers['Fortran'] == 'Intel':
        _literal_true_int = -1
    elif _compilers['Fortran'] == 'NAG':
        _literal_true_int = 1
    else:
        raise NagException( # pragma: no cover
            {},
            'NYI for Fortran compiler'
        )

    _literal_true_cint = _int_model['ctypes'](_literal_true_int)

    # Whether complex function results come back via a leading argument
    # or as the function's return value.
    if (
            _compilers['Fortran'] == 'Intel' or
            (
                _compilers['Fortran'] == 'NAG' and
                (
                    _THE_SYSTEM == 'Windows' or
                    _THE_SYSTEM.startswith('CYGWIN')
                ) and
                _ENGINE_DATA_MODEL != 'ILP32'
            )
    ):
        _complex_fun_rtn = 'as argument'
    else:
        _complex_fun_rtn = 'as retval'

    _engine_lib_base = (
        _ENGINE_LIB_NAME_FROM_SPEC if _ENGINE_LIB_NAME_FROM_SPEC is not None
        else _ENGINE_PCODE
    )
    _engine_lib = _engine_lib_base + _get_sys_shlib_ext()

    if _THE_SYSTEM == 'Windows' or _THE_SYSTEM.startswith('CYGWIN'):
        _cwd = _os.getcwd()

        if not _os.path.isdir(_ENGINE_LIBDIR): # pragma: no cover
            _have_handle = False
            msg = 'No such directory ' + _ENGINE_LIBDIR
        else:
            # os.add_dll_directory is not available on every Python version.
            try:
                dll_dirs = _os.add_dll_directory(_ENGINE_LIBDIR)
            except AttributeError: # pragma: no cover
                dll_dirs = None

            # Best-effort preload; failure here is deliberately ignored.
            try:
                _ctypes.CDLL("libiomp5md")
            except: # pylint: disable=bare-except
                pass

            # Load the Engine from inside its own directory, then return.
            _os.chdir(_ENGINE_LIBDIR)

            try:
                _engine_handle = _ctypes.CDLL(_engine_lib)
            except OSError as exc:
                _have_handle = False
                msg = str(exc)
            else:
                _have_handle = True

            _os.chdir(_cwd)

            if dll_dirs is not None: # pragma: no cover
                dll_dirs.close()
    else:
        try:
            _engine_handle = _ctypes.CDLL(
                _os.path.join(_ENGINE_LIBDIR, _engine_lib)
            )
        except OSError as exc:
            _have_handle = False
            msg = str(exc)
        else:
            _have_handle = True

    if not _have_handle:
        pretty_bits = ('32' if _struct.calcsize('P') == 4 else '64') + '-bit'

        if _THE_SYSTEM == 'Windows' or _THE_SYSTEM.startswith('CYGWIN'):
            pretty_sys = 'Windows'
        elif _THE_SYSTEM == 'Darwin':
            pretty_sys = 'Mac'
        elif _THE_SYSTEM == 'Linux':
            pretty_sys = 'Linux'
        else:
            raise NagException( # pragma: no cover
                {},
                'inconceivable system ' + _THE_SYSTEM,
            )

        diag_msg_lines = [
            ' '.join([
                'The', pretty_bits, pretty_sys, _PKG_NAME, 'package',
                'version', __version__
            ]),
            'failed to obtain a handle to the NAG library ' +
            _ENGINE_PCODE + '.',
            'It is possible that incompatible Intel MKL or compiler runtimes '
            'have been loaded from the environment.',
            'Please read the compatibility notes at',
            'https://www.nag.com/numeric/py/nagdoc_latest/readme.html'
            '#software-stack-compatibility-notes',
            msg,
        ]
        raise NagException(
            {},
            _format_msg_lines(diag_msg_lines),
        )

    # Separator between the fields of an Engine error buffer.
    _errbuf_sep = b'\1'

    (
        _debug_off,
        _debug_args_short,
        _debug_args_full,
        _debug_naninf_counts
    ) = range(4)
    _debug_level = _debug_off

    _break_ctypes_ptr_ref_cycles = False
    _cast_pointer_to_ctypes_array = True

    def __new__(
            cls,
            *args,
            **kwargs
    ):
        # pylint: disable=unused-argument
        raise NagTypeError(
            {},
            'instances of _EngineState class not permitted'
        )

    @classmethod
    def _call_char_fun(
            cls,
            fun,
            chrlen,
            *args
    ):
        """
        Return the result of calling the fixed-length character-valued
        ctypes function fun with arguments args.

        The result buffer and its length are passed to fun as the first
        two arguments.
        """
        result = _EngineCharScalarType.empty_ctypes(chrlen=chrlen)
        fun(result, chrlen, *args)
        return result

    @classmethod
    def _format_char_cfun_proto(
            cls,
            *args
    ):
        """
        Return a ctypes FUNCTYPE for a callback function having
        arguments args and returning fixed-length character data.
        """
        return _EngineState._sys_functype(
            None,
            *cls._format_char_fun_args(
                *args
            )
        )

    @classmethod
    def _format_char_fun_proto(
            cls,
            fun,
            *args
    ):
        """
        Update the ctypes prototype attributes for function fun having
        arguments args, when fun returns fixed-length character data.
        """
        fun.restype = None
        fun.argtypes = cls._format_char_fun_args(*args)

    @classmethod
    def _format_char_fun_args(
            cls,
            *args
    ):
        """
        Return an argument tuple for a function having
        arguments args and returning fixed-length character data.

        The character result type and the character-length type are
        prepended to args.
        """
        argtypes = [_EngineCharScalarType.c_type, _EngineState._chrlen_ctype]
        argtypes.extend(args)
        return tuple(argtypes)

    @classmethod
    def _call_complex_fun(
            cls,
            fun,
            *args
    ):
        """
        Return the result of calling the complex-valued
        ctypes function fun with arguments args.
        """
        if cls._complex_fun_rtn == 'as argument':
            # The result is returned through a leading pointer argument;
            # instantiate the type that this pointer refers to.
            result = fun.argtypes[0]._type_()
            fun(result, *args)
        else:
            result = fun(*args)

        return result

    @classmethod
    def _format_complex_cfun_proto(
            cls,
            fun_type,
            *args
    ):
        """
        Return a ctypes FUNCTYPE for a callback function having
        arguments args and returning complex data of type fun_type.
        """
        return _EngineState._sys_functype(
            cls._format_complex_fun_rtn(fun_type),
            *cls._format_complex_fun_args(
                fun_type,
                *args
            )
        )

    @classmethod
    def _format_complex_fun_args(
            cls,
            fun_type,
            *args
    ):
        """
        Return an argument tuple for a function having
        arguments args and returning complex data of type fun_type.
        """
        if cls._complex_fun_rtn == 'as argument':
            argtypes = [_ctypes.POINTER(fun_type)]
            argtypes.extend(args)
            return tuple(argtypes)

        return args

    @classmethod
    def _format_complex_fun_proto(
            cls,
            fun,
            fun_type,
            *args
    ):
        """
        Update the ctypes prototype attributes for function fun having
        arguments args, when fun returns complex data of type fun_type.
        """
        fun.restype = cls._format_complex_fun_rtn(fun_type)
        fun.argtypes = cls._format_complex_fun_args(fun_type, *args)

    @classmethod
    def _format_complex_fun_rtn(
            cls,
            fun_type,
    ):
        """
        Return a ctypes function-result type for a function
        returning complex data of type fun_type.
        """
        return None if cls._complex_fun_rtn == 'as argument' else fun_type

    @classmethod
    def _from_bool_to_int(
            cls,
            bool_data,
    ):
        """
        Return an Engine-compatible integer for the input boolean.
        """
        return 0 if not bool_data else cls._literal_true_int

    @classmethod
    def _get_symbol(
            cls,
            symbol_name,
    ):
        """
        Utility for loading symbol_name from the Engine handle.
        """
        return _get_symbol_from_lib_handle(
            cls._engine_handle,
            symbol_name,
        )

    @classmethod
    def _modify_unit(
            cls,
            modify_mode,
            unit_name,
            unit,
    ):
        """
        Set or get the Engine unit_name from unit number unit (a ctypes int).

        Returns the Python integer value of unit after the Engine call.
        Raises NagException if modify_mode or unit_name are invalid.
        """
        allowed_modes = ['set', 'get']

        if modify_mode not in allowed_modes:
            raise NagException( # pragma: no cover
                {
                    'fun_name': '_EngineState._modify_unit',
                    'entity': 'modify_mode'
                },
                ' '.join([
                    'must be',
                    _join_sequence(['"' + a_m + '"' for a_m in allowed_modes])
                ])
            )

        allowed_names = ['adv', 'err']

        if unit_name not in allowed_names:
            raise NagException( # pragma: no cover
                {
                    'fun_name': '_EngineState._modify_unit',
                    'entity': 'unit_name'
                },
                ' '.join([
                    'must be',
                    _join_sequence(['"' + a_n + '"' for a_n in allowed_names])
                ])
            )

        if unit_name == 'adv':
            unit_manager_name = 'x04abft'
        elif unit_name == 'err':
            unit_manager_name = 'x04aaft'
        else:
            raise NagException( # pragma: no cover
                {'fun_name': '_EngineState._modify_unit'},
                'inconceivable unit name'
            )

        unit_manager = _get_symbol_from_lib_handle(
            cls._engine_handle,
            unit_manager_name
        )
        unit_manager.restype = None
        unit_manager.argtypes = (
            _BYREF_ENGINEDATA,
            _BYREF_INT, # Input, iflag.
            _BYREF_INT, # Input/output, nerr/nadv resp.
        )

        _en_data = _EngineData()

        if modify_mode == 'set':
            _iflag = _EngineIntScalarType.c_type(1)
        elif modify_mode == 'get':
            _iflag = _EngineIntScalarType.c_type(0)
        else:
            raise NagException( # pragma: no cover
                {'fun_name': '_EngineState._modify_unit'},
                'inconceivable modify mode'
            )

        # Debugging is switched off around the Engine call; the previous
        # level is restored even if the call raises.
        old_debug_level = cls._debug_level
        cls._set_debug_level(cls._debug_off)
        try:
            unit_manager(_en_data, _iflag, unit)
        finally:
            cls._set_debug_level(old_debug_level)

        return _EngineIntScalarType.to_py(unit)

    @classmethod
    def _set_unit(
            cls,
            unit_name,
            unit,
    ):
        """
        Set the Engine unit_name to unit number unit.
        """
        _unit = _EngineIntScalarType(
            unit,
            {'fun_name': '_EngineState._set_unit', 'entity': 'unit'}
        ).to_ctypes()
        cls._modify_unit('set', unit_name, _unit)

    @classmethod
    def _get_unit(
            cls,
            unit_name,
    ):
        """
        Return an int for the unit number of Engine unit unit_name.
        """
        _unit = _EngineIntScalarType.empty_ctypes()
        return cls._modify_unit('get', unit_name, _unit)

    @classmethod
    def _set_debug_level(
            cls,
            level,
    ):
        """
        Set the current debug printing or NaN/infinity detection flag for
        arguments in NAG computational and dummy callback routines.

        When printing is enabled, each argument is printed on input and/or
        output (depending on its input/output class), and if possible the
        memory address of the argument is also shown.

        Output is done using Python's `logging` module, to the logger named
        'naginterfaces.base.<chapter>.<function>', at logging level `DEBUG`.

        The default is no printing and no NaN/infinity detection, which may
        be explicitly requested by calling with `level` = `_debug_off`.

        A call with `level` = `_debug_args_full` enables printing of
        all arguments on entry and exit in subsequent calls of procedures in
        the NAG Library Engine, and this includes full printing of arrays
        (ignoring array increments). A diagnostic message will be output if
        for any reason the extent of an array is not known at run time.

        A call with `level` = `_debug_args_short` requests elided array
        printing (only the first and last elements being displayed).

        A call with `level` = `_debug_naninf_counts` indicates that no
        printing is to take place, but all floating-point scalar and array
        values are examined and a count of NaN/infinity values maintained.

        Raises NagValueError if `level` is not one of the four levels above.

        Thread-Safety Considerations
        ----------------------------
        The debug printing level in the NAG Engine is managed by a global
        variable, and setting the level using `_set_debug_level` requires
        modifying this global variable.

        Examples
        --------
        To send output from `naginterfaces.library.lapacklin.dgesv` (sic)
        to file 'spam.txt':

        >>> from logging import DEBUG, getLogger
        ...
        >>> logger = getLogger('naginterfaces.base.lapacklin.dgesv')
        >>> logger.setLevel(DEBUG)
        >>> fh = FileHandler('spam.txt') # doctest: +SKIP
        >>> logger.addHandler(fh) # doctest: +SKIP
        """
        if level not in [
                cls._debug_off,
                cls._debug_args_short,
                cls._debug_args_full,
                cls._debug_naninf_counts,
        ]:
            raise NagValueError(
                {
                    'fun_name': '.'.join(
                        [_MODULE_NAME, '_EngineState._set_debug_level']
                    ),
                    'entity': 'level'
                },
                'must be _debug_off, _debug_args_short, ' +
                '_debug_args_full or _debug_naninf_counts'
            )

        y90gcft_ctypes = _get_symbol_from_lib_handle(
            cls._engine_handle, 'y90gcft'
        )
        y90gcft_ctypes.restype = None
        y90gcft_ctypes.argtypes = (
            _ctypes.POINTER(cls._int_model['ctypes']), # Input, ival.
        )
        _level = _EngineIntScalarType(
            level,
            {
                'fun_name': '.'.join(
                    [_MODULE_NAME, '_EngineState._set_debug_level']
                ),
                'entity': 'level',
            },
        ).to_ctypes()
        y90gcft_ctypes(_level)
        # Record the level only after the Engine call has completed.
        cls._debug_level = level

    @classmethod
    def _handle_global_errors(
            cls,
            fun_name,
            errbuf,
            ifail,
    ):
        """
        Process global (allocation, licence, ...) failures.
        Return split errbuf data otherwise.

        On success the result is (0, None); for any other exit it is the
        integer in the second errbuf field together with the list of the
        remaining fields.
        """
        if ifail == _IERR_ALLOC_FAIL:
            _EngineAllocationErrorExit(fun_name=fun_name).manage()
        elif ifail == _IERR_NO_LIC:
            _EngineLicenceErrorExit(fun_name=fun_name).manage()
        elif ifail == _IERR_SUCCESS:
            return 0, None

        eb_data = cls._split_errbuf(errbuf)
        return int(eb_data[1]), eb_data[2:]

    @classmethod
    def _split_errbuf(
            cls,
            errbuf,
    ):
        """
        Return a list of the split contents of the input errbuf
        c_char_Array.

        Empty fields are dropped. Raises NagException if fewer than
        three fields remain.
        """
        split_eb = [
            _EngineCharScalarType.to_py(ed).lstrip() for ed in
            errbuf[:].strip(b'\0 ' + cls._errbuf_sep).split(cls._errbuf_sep)
            if ed
        ]

        if len(split_eb) < 3:
            raise NagException( # pragma: no cover
                {'fun_name': '_split_errbuf'},
                'inconceivable errbuf structure'
            )

        return split_eb
class EngineIntCType(_EngineState._int_model['ctypes']):
    # pylint: disable=too-few-public-methods
    """
    The ``ctypes`` integer type that is compatible with the integer type
    of the NAG Library Engine.

    The base class is platform dependent.

    Examples
    --------
    >>> from naginterfaces.base import utils
    >>> i = utils.EngineIntCType(42)
    """
class EngineIntNumPyType(_EngineState._int_model['numpy']):
    # pylint: disable=too-few-public-methods
    """
    The NumPy integer type that is compatible with the integer type of
    the NAG Library Engine.

    The base class is platform dependent.

    Examples
    --------
    >>> import numpy as np
    >>> from naginterfaces.base import utils
    >>> i = np.array([42, 53], dtype=utils.EngineIntNumPyType)
    """
class _Debug(object):
    """
    Context-management class for NAG Library Engine debug dumping.
    """
    # pylint: disable=too-few-public-methods

    __slots__ = [
        '_filename', '_level', '_old_log_level', '_old_debug_level',
        '_pop_handler',
    ]

    def __init__(self, filename=None, level=_EngineState._debug_args_short):
        """
        Initialize `level`-level Engine debug dumping.

        The default level is `_EngineState._debug_args_short`.
        (See also `_EngineState._set_debug_level`.)

        Examples
        --------
        >>> from naginterfaces.base import utils
        >>> with utils._Debug():
        ...     pass
        """
        self._filename = filename
        self._level = level
        # The saved state is only populated by __enter__.
        self._old_log_level = None
        self._old_debug_level = None
        self._pop_handler = False

    def __enter__(self):
        """
        Activate Engine debug dumping.

        Save the current logging and debug levels.

        Output using Python's `logging` module, via the root logger.
        This is configured using `basicConfig(filename, DEBUG)`; i.e.,
        a root logger with handlers already configured will not be affected.
        Note that if `filename` was not supplied to the instance initializer,
        or was None, this implies that output through a previously
        unconfigured root logger is sent to `sys.stderr`.
        """
        root_logger = _logging.getLogger()
        self._old_log_level = root_logger.getEffectiveLevel()
        self._old_debug_level = _EngineState._debug_level
        # Record whether basicConfig is about to install a handler, so that
        # __exit__ knows whether there is one to take away again.
        self._pop_handler = not root_logger.handlers
        _logging.basicConfig(filename=self._filename, level=_logging.DEBUG)
        _EngineState._set_debug_level(self._level)

    def __exit__(self, *args):
        """
        Restore the previous debug state.

        If on entry the root logger had no pre-configured handlers,
        here the list of root handlers is cleared.
        """
        _EngineState._set_debug_level(self._old_debug_level)
        _logging.getLogger().setLevel(self._old_log_level)
        if self._pop_handler:
            our_handler = _logging.getLogger().handlers.pop()
            our_handler.close()


class _NagLibOutput(object):
    """
    Context-management class for restoring NAG Library output I/O units.

    Thread-Safety Considerations
    ----------------------------
    The default output I/O units in the NAG Engine are managed by global
    variables, and this context manager requires modifying these
    global variables.

    Examples
    --------
    >>> from naginterfaces.base import utils
    >>> with utils._NagLibOutput():
    ...     pass
    """
    # pylint: disable=too-few-public-methods

    __slots__ = []

    def __enter__(self):
        """
        Call `FileObjManager._set_default_units` on entry to the context.
        """
        FileObjManager._set_default_units()

    def __exit__(self, *args):
        """
        Call `FileObjManager._set_ni_units` on exit from the context.
        """
        FileObjManager._set_ni_units()


def _break_ref_cycles(func):
    """
    Decorate with forced breaking of ctypes pointer reference cycles.

    While `func` runs, `_EngineState._break_ctypes_ptr_ref_cycles` is
    set to True. The value it held beforehand is reinstated afterwards,
    whether `func` returns normally or raises.
    """
    @_functools.wraps(func)
    def wrapper(*args, **kwargs):
        # pylint: disable=missing-docstring
        old_break = _EngineState._break_ctypes_ptr_ref_cycles
        _EngineState._break_ctypes_ptr_ref_cycles = True
        try:
            return func(*args, **kwargs)
        finally:
            # The flag is global state: it must not stay forced on if
            # the wrapped call fails.
            _EngineState._break_ctypes_ptr_ref_cycles = old_break
    return wrapper
class Handle(_ctypes.c_void_p):
    """
    Class for communication handles in the NAG Library Engine.

    Overrides ``__copy__``.

    Overrides ``__deepcopy__``.

    Raises
    ------
    NagAttributeError
        Shallow or deep copy was requested; these operations are
        prohibited for this class.

    See Also
    --------
    :func:`naginterfaces.library.examples.blgm.lm_formula_ex.main` :
        This example uses ``Handle`` instances.
    """
    # pylint: disable=too-few-public-methods

    def __copy__(self):
        """
        Refuse a shallow copy by raising NagAttributeError.
        """
        locus = {'fun_name': '.'.join([_MODULE_NAME, 'Handle.__copy__'])}
        raise NagAttributeError(
            locus,
            'shallow copy not permitted'
        )

    def __deepcopy__(self, memo):
        """
        Refuse a deep copy by raising NagAttributeError.

        `memo` is accepted for the ``copy.deepcopy`` protocol
        but is not used.
        """
        locus = {'fun_name': '.'.join([_MODULE_NAME, 'Handle.__deepcopy__'])}
        raise NagAttributeError(
            locus,
            'deep copy not permitted'
        )
_BYREF_HANDLE = _ctypes.POINTER(Handle) _BYREF_C_PTR = _ctypes.POINTER(_ctypes.c_void_p) class _EngineType(object): """ Base class for data passed to the NAG Engine. """ # pylint: disable=too-few-public-methods __slots__ = ['allow_none', 'data', 'locus'] _module_key_ctypes = 'ctypes' _module_key_ni = _MODULE_NAME _module_key_numpy = 'numpy' _module_key_python = 'builtin' def __init__( self, data, locus, allow_none=False, ): """ Store the payload data and wrapper locus. """ self.data = data self.locus = locus self.allow_none = allow_none class _EngineScalarType(_EngineType): """ Base class for scalar NAG Engine data. Extends __init__. """ # pylint: disable=too-few-public-methods __slots__ = [] c_type = lambda *args: None # pragma: no cover numpy_type = lambda *args: None # pragma: no cover type_map = {} def __init__( self, data, locus, allow_none=False, ): """ Raises NagTypeError if the payload is not a compatible instance for scalar Engine data. """ _EngineType.__init__(self, data, locus, allow_none) self._check_scalar_type_compat() @staticmethod def _copy_ctypes_data( data, target, ): """ Copy data from ctypes data to ctypes target. """ if data is not None: target.value = data.value @classmethod def empty_ctypes( cls, chrlen=None, # pylint: disable=unused-argument ): """ Return an empty ctypes scalar with the same data type as the payload. """ return (cls.c_type)() def refresh_ctypes_buffer( self, ctypes_buffer, ): """ Refresh ctypes_buffer with the contents of the payload. """ self._copy_ctypes_data(self.to_ctypes(), ctypes_buffer.contents) def to_ctypes( self, ): """ Return the underlying C-compatible cast of the payload. """ if self.allow_none and self.data is None: return self.data if isinstance(self.data, self.c_type): # pylint: disable=isinstance-second-argument-not-valid-type return self.data return self.c_type(self.data) def _check_scalar_type_compat( self, ): """ Raises NagTypeError if the payload is not an instance of any scalar type in the type_map. 
""" if self.allow_none and self.data is None: return if not isinstance( self.data, tuple([t for ttup in self.type_map.values() for t in ttup]) # pylint: disable=consider-using-generator ): raise NagTypeError( self.locus, _bad_instance_msg( self._type_map_to_strlist(), allow_none=self.allow_none, ) ) def _type_map_to_strlist( self, ): """ Return a list of sorted qualified type names from the type_map. """ type_names = [] for mkey, mval in self.type_map.items(): for type_val in mval: full_name = [] if ( mkey == self._module_key_ctypes and type_val.__name__ == 'Handle' ): continue if mkey != self._module_key_python: full_name.append(mkey) full_name.append(type_val.__name__) type_names.append('.'.join(full_name)) return sorted(type_names) class _EngineCharScalarType(_EngineScalarType): """ Concrete class for character scalar NAG Engine data. Extends __init__. Overrides empty_ctypes. Overrides refresh_ctypes_buffer. Overrides to_ctypes. """ # pylint: disable=too-few-public-methods __slots__ = ['chrlen'] _valid_np_string_classes = (_np.bytes_, _np.str_) _valid_py_string_classes = (bytes, str) numpy_type = _np.str_ c_type = _ctypes.POINTER(_ctypes.c_char) type_map = { _EngineScalarType._module_key_numpy: _valid_np_string_classes, _EngineScalarType._module_key_python: _valid_py_string_classes, } def __init__( self, data, locus, exp_chrlen, allow_none=False, ): """ Raises NagTypeError from the super __init__ if the payload data is not the correct type. Raises NagValueError if the payload contains non-ASCII characters. Raises NagValueError if the payload exceeds a required length. """ _EngineScalarType.__init__(self, data, locus, allow_none) if self.allow_none and self.data is None: self.chrlen = 0 elif exp_chrlen is None: self.chrlen = len(self.data) else: self.chrlen = exp_chrlen self._check_is_ascii() @classmethod def empty_ctypes( cls, chrlen=None, ): """ Return an empty ctypes string buffer with the same data type as the payload. 
""" if chrlen is None or chrlen <= 0: return _NullEngineCharArray() new_arr_cls = _ctypes.c_char*chrlen globals()[new_arr_cls.__name__] = new_arr_cls return new_arr_cls() def refresh_ctypes_buffer( self, ctypes_buffer, ): """ Refresh ctypes_buffer with the contents of the payload. """ if self.allow_none and self.data is None: return ct_data = self.to_ctypes() for i, ct_char in enumerate(ct_data): ctypes_buffer[i] = ct_char def to_ctypes( self, ): """ Return the underlying C-compatible cast of the payload taking care to explicitly encode to ASCII in Python 3+. If the payload is shorter than a non-None chrlen then it is blank padded on the right to this length. """ if self.allow_none and self.data is None: return self.data if isinstance(self.data, (str, _np.str_)): str_to_cast = self.data.encode('ascii') else: str_to_cast = self.data if self.chrlen > len(self.data): str_to_cast = str_to_cast.ljust(self.chrlen) return _ctypes.create_string_buffer( str_to_cast[:self.chrlen], size=self.chrlen ) @classmethod def to_py( cls, data, chrlen=None, ): """ Return the value of ctypes (char_p) data as a str. """ if data is None: return data data_block = ( data[:chrlen] if chrlen is not None else data[:] ) return data_block.decode(encoding='ascii').rstrip() to_py_from_retval = to_py def _check_is_ascii( self, ): """ Raises NagValueError if the payload contains non-ASCII characters. """ if self.allow_none and self.data is None: return if not _is_ascii(self.data[:self.chrlen], self.locus): raise NagValueError( self.locus, 'must ' + ('' if not self.allow_none else 'be None or ') + 'comprise only ASCII characters' ) _BYREF_STRING = _EngineCharScalarType.c_type class _EngineComplexScalarType(_EngineScalarType): # pylint: disable=too-few-public-methods """ Abstract class for complex scalar NAG Engine data. Overrides _copy_ctypes_data. Overrides to_ctypes. """ __slots__ = [] @staticmethod def _copy_ctypes_data( data, target, ): """ Copy data from ctypes data to ctypes target. 
""" if data is None: return target.real = data.real target.imag = data.imag def to_ctypes( self, ): """ Cast to the associated Structure. """ if self.allow_none and self.data is None: return self.data return self.c_type(self.data.real, self.data.imag) @classmethod def _to_py_complex( cls, data, ): """ Cast back from the Structure to Python native complex. """ return complex(data.real, data.imag) def _set_complex_fields(c_type): """ Return a _fields_ list for a complex C structure of type c_type. """ return [ ('real', c_type), ('imag', c_type), ] class _EngineComplexStructure(_ctypes.Structure): # pylint: disable=too-few-public-methods """ Abstract class for complex scalar NAG Engine data. """ _component_c_type = lambda *args: None # pragma: no cover class _EngineComplex64Structure(_EngineComplexStructure): # pylint: disable=too-few-public-methods """ Concrete C structure for single-precision complex. """ _component_c_type = _ctypes.c_float _fields_ = _set_complex_fields(_component_c_type) class _EngineComplex64ScalarType(_EngineComplexScalarType, _EngineScalarType): """ Concrete class for single-precision complex scalar NAG Engine data. """ # pylint: disable=too-few-public-methods __slots__ = [] c_type = _EngineComplex64Structure numpy_type = _np.complex64 type_map = { _EngineScalarType._module_key_numpy: (numpy_type,), } @classmethod def to_py( cls, data, chrlen=None, # pylint: disable=unused-argument ): """ Return the complex64 representation of the input data. """ if data is None: return data return cls.numpy_type(cls._to_py_complex(data)) to_py_from_retval = to_py _BYREF_COMPLEX64 = _ctypes.POINTER(_EngineComplex64ScalarType.c_type) class _EngineComplex128Structure(_EngineComplexStructure): # pylint: disable=too-few-public-methods """ Concrete C structure for double-precision complex. 
""" _component_c_type = _ctypes.c_double _fields_ = _set_complex_fields(_component_c_type) class _EngineComplex128ScalarType(_EngineComplexScalarType, _EngineScalarType): """ Concrete class for double-precision complex scalar NAG Engine data. """ # pylint: disable=too-few-public-methods __slots__ = [] c_type = _EngineComplex128Structure numpy_type = _np.complex128 type_map = { _EngineScalarType._module_key_numpy: (numpy_type,), _EngineScalarType._module_key_python: (complex,), } @classmethod def to_py( cls, data, chrlen=None, # pylint: disable=unused-argument ): """ Return the complex128 representation of the input data. """ if data is None: return data return cls._to_py_complex(data) to_py_from_retval = to_py _BYREF_COMPLEX128 = _ctypes.POINTER(_EngineComplex128ScalarType.c_type) class _EngineCptrScalarType(_EngineScalarType): """ Concrete class for void* scalar NAG Engine data. Overrides to_ctypes. """ # pylint: disable=too-few-public-methods __slots__ = [] c_type = Handle type_map = { _EngineScalarType._module_key_ctypes: (c_type,), _EngineScalarType._module_key_ni: (Handle,), } def to_ctypes( self, ): return self.data @classmethod def to_py( cls, data, chrlen=None, # pylint: disable=unused-argument ): """ Return the original data (which must therefore already be a Handle). """ return data _BUILTIN_INT_TYPES = (int,) _INT_TYPE_MAP = { _EngineScalarType._module_key_ctypes: ( _SYS_CTYPES_INTS[32], _SYS_CTYPES_INTS[64] ), _EngineScalarType._module_key_numpy: (_np.int32, _np.int64), _EngineScalarType._module_key_python: _BUILTIN_INT_TYPES, } _PROMOTABLE_INTS = tuple([ # pylint: disable=consider-using-generator t for tkey in ( _EngineScalarType._module_key_numpy, _EngineScalarType._module_key_python, ) for t in _INT_TYPE_MAP[tkey] ]) def _is_promotable_int(data): """ Return True if we consider scalar int data to be appropriate for promotion to float. 
""" return ( data is not None and isinstance(data, _PROMOTABLE_INTS) ) class _EngineFloatScalarType(_EngineScalarType): # pylint: disable=too-few-public-methods """ Abstract class for float scalar NAG Engine data. Extends __init__. """ __slots__ = [] def __init__( self, data, locus, allow_none=False, ): """ Promotes incoming data if an appropriate int type. """ _EngineScalarType.__init__( self, data, locus, allow_none, ) if _is_promotable_int(self.data): self.data = self.numpy_type(self.data) class _EngineFloat32ScalarType(_EngineFloatScalarType, _EngineScalarType): """ Concrete class for 32-bit float scalar NAG Engine data. """ # pylint: disable=too-few-public-methods __slots__ = [] c_type = _ctypes.c_float numpy_type = _np.float32 _numpy_types = [] _numpy_types.extend(_INT_TYPE_MAP[_EngineScalarType._module_key_numpy]) _numpy_types.append(numpy_type) _builtin_types = [] _builtin_types.extend(_INT_TYPE_MAP[_EngineScalarType._module_key_python]) type_map = { _EngineScalarType._module_key_ctypes: (c_type,), _EngineScalarType._module_key_numpy: tuple(_numpy_types), _EngineScalarType._module_key_python: tuple(_builtin_types), } @classmethod def _to_py_from_float( cls, data, ): """ Return the data as np.float32. """ if data is None: return data return cls.numpy_type(data) to_py_from_retval = _to_py_from_float @classmethod def to_py( cls, data, chrlen=None, # pylint: disable=unused-argument ): """ Return the float32 representation of the input data. """ if data is None: return data return cls._to_py_from_float( data.value if isinstance(data, cls.c_type) else data ) _BYREF_FLOAT32 = _ctypes.POINTER(_EngineFloat32ScalarType.c_type) class _EngineFloat64ScalarType(_EngineFloatScalarType, _EngineScalarType): """ Concrete class for 64-bit float scalar NAG Engine data. 
""" # pylint: disable=too-few-public-methods __slots__ = [] c_type = _ctypes.c_double numpy_type = _np.float64 _numpy_types = [] _numpy_types.extend(_INT_TYPE_MAP[_EngineScalarType._module_key_numpy]) _numpy_types.append(numpy_type) _builtin_types = [float] _builtin_types.extend(_INT_TYPE_MAP[_EngineScalarType._module_key_python]) type_map = { _EngineScalarType._module_key_ctypes: (c_type,), _EngineScalarType._module_key_numpy: tuple(_numpy_types), _EngineScalarType._module_key_python: tuple(_builtin_types), } @classmethod def _to_py_from_float( cls, data, ): """ Return the data as float. """ if data is None: return data return float(data) to_py_from_retval = _to_py_from_float @classmethod def to_py( cls, data, chrlen=None, # pylint: disable=unused-argument ): """ Return the float64 representation of the input data. """ if data is None: return data return cls._to_py_from_float( data.value if isinstance(data, cls.c_type) else data ) _BYREF_FLOAT64 = _ctypes.POINTER(_EngineFloat64ScalarType.c_type) class _EngineIntScalarType(_EngineScalarType): """ Concrete class for integer scalar NAG Engine data. This could be 32- or 64-bits depending on the underlying Engine. Extends __init__. Extends cast. """ # pylint: disable=too-few-public-methods __slots__ = [] _builtin_types = _BUILTIN_INT_TYPES c_type = _EngineState._int_model['ctypes'] numpy_type = _EngineState._int_model['numpy'] _ctypes_int64 = _SYS_CTYPES_INTS[64] type_map = _INT_TYPE_MAP def __init__( self, data, locus, allow_none=False, ): """ Raises NagTypeError from the super __init__ if the payload data is not the correct type. Raises NagOverflowError if the payload overflows the underlying integer type used by the NAG Engine. """ _EngineScalarType.__init__(self, data, locus, allow_none) self._check_is_repr() def to_ctypes( self, ): """ Return the underlying C-compatible cast of the payload taking care to account for 32- and 64-bit input. 
""" if self.allow_none and self.data is None: return self.data if isinstance( self.data, self.type_map[_EngineScalarType._module_key_ctypes] ): return self.c_type(self.data.value) return _EngineScalarType.to_ctypes(self) @classmethod def _to_py_from_int( cls, data, ): """ Return the data as int. """ if data is None: return data return int(data) to_py_from_retval = _to_py_from_int @classmethod def to_py( cls, data, chrlen=None, # pylint: disable=unused-argument ): """ Return the int representation of the input data. """ if data is None: return data return cls._to_py_from_int( data.value if isinstance(data, cls.c_type) else data ) def _check_is_repr( self ): """ Raises NagOverflowError if the payload overflows the underlying integer type used by the NAG Engine. """ if self.allow_none and self.data is None: return if self._payload_overflows(): raise NagOverflowError( self.locus, 'value must not overflow ' + str(_EngineState._int_model['bits']) + '-bit NAG Library Engine integer' ) def _do_overflow_check( self, ): """ Return a bool shortcutting for whether overflow checking is relevant for the payload. """ if isinstance(self.data, int): return True if _EngineState._int_model['bits'] == 64: # We don't allow long on platforms # where the Engine int is 64 bit return False return isinstance( self.data, (_SYS_CTYPES_INTS[64], _np.int64) ) def _payload_overflows( self, ): """ Return a bool for when the payload is outside the range of the integer type used by the NAG Engine. """ if not self._do_overflow_check(): return False if isinstance(self.data, _SYS_CTYPES_INTS[64]): val_to_check = self.data.value else: val_to_check = self.data return ( val_to_check >= 2**(_EngineState._int_model['bits'] - 1) or val_to_check <= -2**(_EngineState._int_model['bits'] - 1) - 1 ) _BYREF_INT = _ctypes.POINTER(_EngineIntScalarType.c_type) class _EngineLogicalScalarType(_EngineScalarType): """ Concrete class for logical scalar NAG Engine data. Overrides to_ctypes. 
""" # pylint: disable=too-few-public-methods __slots__ = [] c_type = _EngineState._int_model['ctypes'] numpy_type = _np.bool_ type_map = { _EngineScalarType._module_key_numpy: (numpy_type,), _EngineScalarType._module_key_python: (bool,), } _f_comp = _EngineState._compilers['Fortran'] def to_ctypes( self, ): """ Return the underlying C-compatible cast using the correct integer representation. """ if self.allow_none and self.data is None: return self.data return self.c_type( _EngineState._from_bool_to_int(self.data) ) @classmethod def _to_py_from_int( cls, data, ): """ Return the bool representation of the input int. """ if data is None: return data if cls._f_comp == 'Intel': return bool(data & 1) if cls._f_comp == 'NAG': return data != 0 raise NagException( # pragma: no cover {'fun_name': '_EngineLogicalScalarType._to_py_from_int'}, 'NYI for Fortran compiler ' + cls._f_comp ) to_py_from_retval = _to_py_from_int @classmethod def to_py( cls, data, chrlen=None, # pylint: disable=unused-argument ): """ Return the bool representation of the input int. """ if data is None: return data return cls._to_py_from_int( data.value if isinstance(data, cls.c_type) else data ) _BYREF_LOGICAL = _ctypes.POINTER(_EngineLogicalScalarType.c_type) _IS_DISCONTIG = lambda a: not ( a.flags.c_contiguous or a.flags.f_contiguous ) _CONTIG_ERR_STR = ( 'must be contiguous, ' + 'in either the C sense, the Fortran sense, or both.' ) def _get_sorder(locus, array): """ Return a string for the storage order used by `array`, or None if not multidimensional. 
""" if ( not isinstance(array, _np.ndarray) or array.ndim == 1 ): return None if array.flags.f_contiguous and array.flags.c_contiguous: return 'A' if array.flags.f_contiguous: return 'F' if array.flags.c_contiguous: return 'C' raise NagTypeError( locus, 'all arrays in the argument list ' + _CONTIG_ERR_STR ) def _get_mutual_sorder( locus, arrays, in_callback=False, spiked_sorder=_EngineData.default_spiked_sorder, ): """ Return a string for the storage order used by the input sequence of arrays, or None if no multidimensional arrays feature. Raises NagTypeError if the order is not uniform across all the arrays. Raises NagTypeError if spiked_sorder is required but not a str. Raises NagValueError if spiked_sorder is required but invalid. """ sorder = None for array, array_name in arrays: this_sorder = _get_sorder(locus, array) if this_sorder is None: if not in_callback: continue locus['returned'] = array_name raise NagException( locus, 'Inconceivable; not a multidimensional ndarray', ) if ( sorder is None or ( sorder == 'A' and sorder != this_sorder ) ): sorder = this_sorder elif this_sorder == 'A': continue elif sorder != this_sorder: locus['entity' if not in_callback else 'returned'] = array_name raise NagTypeError( locus, 'arrays ' + ( 'on input' if not in_callback else 'returned from this callback' ) + ' must use a consistent storage order' ) if sorder == 'A' or not arrays: _spiked_sorder = _EngineCharScalarType( spiked_sorder, locus, exp_chrlen=1, ).to_ctypes() _spiked_sorder_py = _EngineCharScalarType.to_py(_spiked_sorder) if _spiked_sorder_py not in ['C', 'F']: raise NagValueError( locus, 'supplied spiked_sorder was not \'C\' or \'F\'.', ) return _spiked_sorder_py if sorder is None: # Some arrays, all were None: return _EngineData.default_sorder return sorder def _get_size_from_shape( shape, chrlen=None, ): """ Return the product of the shape tuple. 
""" size = 1 for extent in shape: size *= extent if chrlen is not None: size *= chrlen return size def _to_sized_buffer( pointer, buffer_size, ): """ Return the input ctypes pointer type as an addressable buffer. """ return _ctypes.cast( pointer, _ctypes.POINTER(pointer._type_ * buffer_size), ).contents def _from_pointer_to_buffer( pointer, shape, dtype, force_ctypes_cast, chrlen=None, ): """ Return the input ctypes pointer type as a flat ndarray. """ size = _get_size_from_shape(shape, chrlen) if ( force_ctypes_cast or _EngineState._cast_pointer_to_ctypes_array ): sized_buffer = _to_sized_buffer( pointer, size, ) np_buffer = _np.frombuffer(sized_buffer, dtype=dtype) else: np_buffer = _np.ctypeslib.as_array(pointer, shape=(size,)) return np_buffer def _unlink_pointer( pointer, ): """ Break ctypes pointer reference cycles. """ if ( _EngineState._break_ctypes_ptr_ref_cycles and _EngineState._cast_pointer_to_ctypes_array ): del pointer._objects[id(pointer)] def _check_shape_good(data, exp_shape): """ Return a Boolean for `data` having the expected shape `exp_shape`. """ if isinstance(data, _np.ndarray): if ( data.size == 0 and any(extent == 0 for extent in exp_shape) ): return True if data.ndim != len(exp_shape): return False for extent_i, extent in enumerate(exp_shape): if extent is None: continue if data.shape[extent_i] != extent: return False return True if len(exp_shape) != 1: raise NagException( # pragma: no cover {'fun_name': '_check_shape_good'}, 'inconceivable in shape check' ) if exp_shape[0] is None: return True return len(data) == exp_shape[0] def _check_shape_nonneg(locus, shape): """ Raises NagShapeError if an entry in `shape` was negative. """ for dim_i, dim_n in enumerate(shape): if dim_n is None: continue if dim_n < 0: raise NagShapeError( locus, ( 'expected extent in dimension ' + str(dim_i + 1) + ' was ' + str(dim_n) + ' but must not be negative. ' 'Check that all supplied scalar integer arguments ' 'are correct.' 
) ) class _EngineArrayType(_EngineType): """ Base class for array NAG Engine data. Extends __init__. """ # pylint: disable=too-few-public-methods,too-many-instance-attributes __slots__ = [ 'exp_shape', 'check_contents', 'rank', 'shape', 'size', 'sorder', ] containers = tuple() array_typecodes = tuple() _underlying_scalar = lambda *args: None # pragma: no cover c_type = lambda *args: None # pragma: no cover _null_ctypes = None _force_ctypes_cast = False _dtypes_equiv = staticmethod( lambda data_dtype, np_dtype: data_dtype == np_dtype ) _allowed_np_dtypes = (None,) def __init__( self, data, locus, exp_shape, mutual_sorder=None, check_contents=True, allow_none=False, ): """ Store the expected shape tuple. Register whether to iterate over the payload contents in order to deep-check scalar contents for compatibility. Raises NagTypeError if the payload is not a compatible container for array Engine data. Raises NagTypeError if the container contents are not type compatible for the Engine array type. Raises NagShapeError if the payload does not have the expected shape. """ # pylint: disable=too-many-arguments _EngineType.__init__(self, data, locus, allow_none) self.exp_shape = exp_shape self.check_contents = ( False if self.allow_none and data is None else check_contents ) self._check_container_compat( self.data, self.locus, len(self.exp_shape), self.allow_none, ) self._check_shape() ( self.rank, self.shape, self.size, ) = self._get_rank_shape_and_size( self.data, len(self.exp_shape), allow_none, ) self._check_contents_type_compat() self.sorder = mutual_sorder if self.rank > 1 else None @classmethod def empty_ctypes( cls, size, chrlen=None, # pylint: disable=unused-argument ): """ Return an empty ctypes array with the same data type as the payload. 
""" if size <= 0: return cls._null_ctypes new_arr_cls = cls.c_type._type_*size globals()[new_arr_cls.__name__] = new_arr_cls return new_arr_cls() @staticmethod def _make_shape_nonneg(shape): """ Make the input shape tuple have non-negative elements. """ return [max(shape_el, 0) for shape_el in shape] @classmethod def empty_ndarray( cls, shape, sorder=_EngineData.default_sorder, chrlen=None, # pylint: disable=unused-argument ): """ Return an empty ndarray with the same data type as the payload. """ return _np.empty( cls._make_shape_nonneg(shape), dtype=cls._underlying_scalar.numpy_type, order=sorder ) @classmethod def from_pointer( cls, pointer, shape, **kwargs ): """ Return the input ctypes pointer type as an ndarray. """ np_buffer = _from_pointer_to_buffer( pointer, cls._make_shape_nonneg(shape), cls._underlying_scalar.numpy_type, cls._force_ctypes_cast, chrlen=kwargs.get('chrlen'), ) if len(shape) == 1: return np_buffer return np_buffer.reshape( cls._make_shape_nonneg(shape), order=kwargs.get('sorder') ) @classmethod def _check_and_get_element( # pylint: disable=too-many-branches cls, data, locus, element_tuple, validate, ): """ Return element element_tuple from data as Python data. Raises NagIndexError if element_tuple is out of range. Raises exception from the scalar constructor if the element is invalid. 
""" el_tuple = ( element_tuple[0] if len(element_tuple) == 1 else element_tuple ) if not validate: data_el = data[el_tuple] else: try: data_el = data[el_tuple] index_ok = True except IndexError: index_ok = False locus_el = ', '.join([str(i) for i in element_tuple]) if len(element_tuple) > 1: locus_el = '[' + locus_el + ']' if not index_ok: raise NagIndexError( locus, 'requested index value ' + locus_el + ' is out of range' ) if isinstance(data, _np.ndarray): cls._check_dtype(data, locus) if cls is _EngineCharArrayType: locus['element'] = locus_el cls._underlying_scalar(data_el, locus, None) elif ( cls is _EngineIntArrayType and data.dtype != _np.int32 ): locus['element'] = locus_el cls._underlying_scalar(data_el, locus) elif isinstance(data, _array.array): cls._check_typecode(data, locus) if ( cls is _EngineIntArrayType and _EngineState._int_model['bits'] == 32 ): locus['element'] = locus_el cls._underlying_scalar(data_el, locus) elif isinstance(data, _ctypes.Array): cls._check_ctype(data, locus) if ( cls is _EngineIntArrayType and _EngineState._int_model['bits'] == 32 and data._type_ is not _SYS_CTYPES_INTS[32] ): locus['element'] = locus_el cls._underlying_scalar(data_el, locus) elif isinstance(data, list): locus['element'] = locus_el if cls is _EngineCharArrayType: cls._underlying_scalar(data_el, locus, None) else: cls._underlying_scalar(data_el, locus) else: raise NagException( # pragma: no cover locus, 'Inconceivable data instance', ) if ( cls is _EngineCharArrayType and isinstance(data_el, (bytes, _np.bytes_)) ): data_el = data_el.decode(encoding='ascii') return ( data_el.value if ( isinstance( data_el, ( cls._underlying_scalar.c_type, _ctypes.c_int, _ctypes.c_long, _ctypes.c_longlong, ) ) and not isinstance( data_el, ( _EngineComplex64Structure, _EngineComplex128Structure, ) ) ) else data_el ) @classmethod def get_element( cls, data, locus, element_tuple, validate=True, ): """ Return element element_tuple from data. 
""" if validate: cls._check_container_compat( data, locus, exp_rank=len(element_tuple), allow_none=False, ) cls._check_rank( data, locus, exp_rank=len(element_tuple), allow_none=False, ) return cls._check_and_get_element( data, locus, element_tuple, validate, ) @classmethod def get_nd( cls, data, locus, exp_rank, sorder, dim_type, allow_none=False, ): """ Return the lead/second/... dimension for data. """ # pylint: disable=too-many-arguments cls._check_container_compat(data, locus, exp_rank, allow_none) cls._check_rank(data, locus, exp_rank, allow_none) if exp_rank < 2: raise NagException( # pragma: no cover {'fun_name': 'get_nd'}, 'inconceivable dim request', ) if allow_none and data is None: return 0 if dim_type == 'l': if sorder == 'F': shape_index = 0 elif sorder == 'C': shape_index = -1 else: raise NagException( # pragma: no cover {'fun_name': 'get_nd'}, 'inconceivable sorder' ) elif dim_type == 's': if sorder == 'F': shape_index = 1 elif sorder == 'C': shape_index = -2 else: raise NagException( # pragma: no cover {'fun_name': 'get_nd'}, 'inconceivable sorder' ) else: raise NagException( # pragma: no cover {'fun_name': 'get_nd'}, 'inconceivable dim_type' ) return cls._get_rank_shape_and_size( data, exp_rank, allow_none )[1][shape_index] def _get_max( self, start_i, stop_i, ): """ Return the max of the (1d) payload over the requested range. """ if ( not isinstance(self.data, list) or not isinstance( self.data[start_i], ( self._underlying_scalar.c_type, _ctypes.c_int, _ctypes.c_long, _ctypes.c_longlong, ) ) ): return max(self.data[start_i:stop_i + 1]) return max( data_el.value for data_el in self.data[start_i:stop_i + 1] ) def _get_product( self, start_i, stop_i, ): """ Return the product of the (1d) payload over the requested range. 
""" product_val = 1 if ( not isinstance(self.data, list) or not isinstance( self.data[start_i], ( self._underlying_scalar.c_type, _ctypes.c_int, _ctypes.c_long, _ctypes.c_longlong, ) ) ): for data_el in self.data[start_i:stop_i + 1]: product_val *= data_el else: for data_el in self.data[start_i:stop_i + 1]: product_val *= data_el.value return product_val @classmethod def get_shape( cls, data, locus, exp_rank, allow_none=False, validate=True, ): """ Return the shape tuple for data. """ # pylint: disable=too-many-arguments if validate: cls._check_container_compat(data, locus, exp_rank, allow_none) cls._check_rank(data, locus, exp_rank, allow_none) return cls._get_rank_shape_and_size(data, exp_rank, allow_none)[1] def _get_sum( self, start_i, stop_i, ): """ Return the sum of the (1d) payload over the requested range. """ if ( not isinstance(self.data, list) or not isinstance( self.data[start_i], ( self._underlying_scalar.c_type, _ctypes.c_int, _ctypes.c_long, _ctypes.c_longlong, ) ) ): return sum(self.data[start_i:stop_i + 1]) return sum( data_el.value for data_el in self.data[start_i:stop_i + 1] ) def _is_monotonic( self, start_i, stop_i, direction, ): """ Return a Boolean for when the (1d) payload is monotonic over the requested range in the requested direction. 
""" incr = True decr = True take_val = ( isinstance(self.data, list) and isinstance( self.data[start_i], ( self._underlying_scalar.c_type, _ctypes.c_int, _ctypes.c_long, _ctypes.c_longlong, ) ) ) data_el = lambda i: self.data[i].value if take_val else self.data[i] for i in range(start_i, stop_i): if incr: incr = incr and data_el(i) <= data_el(i+1) if decr: decr = decr and data_el(i) >= data_el(i+1) if direction == 'either' and not incr and not decr: return False if direction == 'increasing' and not incr: return False if direction == 'decreasing' and not decr: return False return True def _is_nonnegative( self, start_i, stop_i, ): """ Return a Boolean for when the (1d) payload is non-negative over the requested range. """ take_val = ( isinstance(self.data, list) and isinstance( self.data[start_i], ( self._underlying_scalar.c_type, _ctypes.c_int, _ctypes.c_long, _ctypes.c_longlong, ) ) ) data_el = lambda i: self.data[i].value if take_val else self.data[i] for i in range(start_i, stop_i): if data_el(i) < 0: return False return True def to_ctypes( self, **kwargs ): """ Return the underlying C-compatible cast of the payload. """ if self.allow_none and self.data is None: return None if isinstance(self.data, _np.ndarray): return self._to_ctypes_from_ndarray(**kwargs) if isinstance(self.data, _array.array): return self._to_ctypes_from_array(**kwargs) if isinstance(self.data, _ctypes.Array): return self._to_ctypes_from_ctypes(**kwargs) if isinstance(self.data, list): return self._to_ctypes_from_list(**kwargs) raise NagException( # pragma: no cover self.locus, 'inconceivable container' ) def _to_ctypes_from_array( self, **_kwargs ): """ Return the underlying C-compatible cast of the payload array.array. """ return _ctypes.cast( self.data.buffer_info()[0], self.c_type ) def _to_ctypes_from_ctypes( self, **_kwargs ): """ Return the underlying C-compatible cast of the payload ctypes.Array. 
""" return self.data def _to_ctypes_from_list( self, **kwargs ): """ Return the underlying C-compatible cast of the payload list. """ if kwargs.get('can_reference_contents', True): return (self.c_type._type_*self.size)(*self.data) return self.empty_ctypes(self.size) def _to_ctypes_from_ndarray( self, **_kwargs ): """ Return the underlying C-compatible cast of the payload numpy.ndarray. """ return self.data.ctypes.data_as(self.c_type) @classmethod def _check_container_compat( cls, data, locus, exp_rank, allow_none, ): """ Raises NagTypeError if data is not an instance of any valid container. """ if allow_none and data is None: return if ( exp_rank <= 1 and not isinstance(data, cls.containers) ): raise NagTypeError( locus, _bad_instance_msg(cls._containers_to_strlist(), allow_none) ) if ( exp_rank > 1 and not isinstance(data, _np.ndarray) ): raise NagTypeError( locus, 'multidimensional array data ' + _bad_instance_msg(['numpy.ndarray'], allow_none) ) if isinstance(data, _np.ndarray) and _IS_DISCONTIG(data): raise NagTypeError( locus, 'numpy.ndarray data ' + _CONTIG_ERR_STR, ) def _check_contents_type_compat( self, ): """ Raises NagTypeError if the contents of the payload container are not compatible types for this array class. """ if self.allow_none and self.data is None: return if isinstance(self.data, _np.ndarray): self._check_contents_ndarray() elif isinstance(self.data, _array.array): self._check_contents_array() elif isinstance(self.data, _ctypes.Array): self._check_contents_ctypes() elif isinstance(self.data, list): self._check_contents_list() else: raise NagException( # pragma: no cover self.locus, 'inconceivable container ' + type(self.data).__name__ ) def _check_contents_1dim( self, ): """ Raises NagTypeError if the contents of the rank-1 payload fail their respective type checks. 
""" for arg_i, arg_el in enumerate(self.data): self.locus['element'] = arg_i self._underlying_scalar(arg_el, self.locus) def _check_contents_anydim( self, ): """ Raises NagTypeError if the contents of the payload fail their respective type checks. """ if ( isinstance(self.data, _np.ndarray) and self.data.ndim > 1 ): self._check_contents_multidim() else: self._check_contents_1dim() def _check_contents_multidim( # pylint: disable=no-self-use self, ): """ Raises NagTypeError if the contents of the multidimensional payload fail their respective type checks. """ raise NagException( # pragma: no cover {'fun_name': '_check_contents_multidim'}, 'not yet implemented', ) @classmethod def _check_typecode( cls, data, locus, ): """ Raises NagTypeError if the contents of the data array.array are not compatible types for this array class. """ if data.typecode not in cls.array_typecodes: raise NagTypeError( locus, 'typecode for array.array entity was ' + str(data.typecode) + ' but must be ' + _join_sequence( ['"' + a_t + '"' for a_t in cls.array_typecodes] ) ) def _check_contents_array( self, ): """ Raises NagTypeError if the contents of the payload array.array are not compatible types for this array class. """ self._check_typecode(self.data, self.locus) @classmethod def _check_ctype( cls, data, locus, ): """ Raises NagTypeError if the contents of the data ctypes.Array are not compatible types for this array class. """ allowed_ctypes = cls._underlying_scalar.type_map[ _EngineType._module_key_ctypes ] if data._type_ not in allowed_ctypes: raise NagTypeError( locus, '_type_ for ctypes.Array entity must be ' + _join_sequence( [ ( ctype.__name__ if ctype.__name__ == 'Handle' else '.'.join(['ctypes', ctype.__name__]) ) for ctype in allowed_ctypes ] ) ) def _check_contents_ctypes( self, ): """ Raises NagTypeError if the contents of the payload ctypes.Array are not compatible types for this array class. 
""" self._check_ctype(self.data, self.locus) def _check_contents_list( self, ): """ Raises NagTypeError if the contents of the payload list are not compatible types for this array class. """ if not self.check_contents: return self._check_contents_1dim() if self.size <= 1: return for arg_el in self.data[1:]: if not isinstance(arg_el, type(self.data[0])): raise NagTypeError( self.locus, 'must contain elements all of the same instance' ) def _check_contents_ndarray( self, ): """ Raises NagTypeError if the contents of the payload numpy.ndarray are not compatible types for this array class. """ self._check_dtype(self.data, self.locus) @classmethod def _check_dtype( cls, data, locus, allowed_dtypes=None, ): """ Raises NagTypeError if data is not one of the allowed numpy dtypes for this array class. """ a_dtypes = ( cls._allowed_np_dtypes if allowed_dtypes is None else allowed_dtypes ) for np_dtype in a_dtypes: if cls._dtypes_equiv(data.dtype, np_dtype): return raise NagTypeError( locus, 'dtype for numpy.ndarray entity must be an instance of ' + _join_sequence( ['.'.join(['numpy', ndtype.__name__]) for ndtype in a_dtypes] ) ) @classmethod def _check_rank( cls, data, locus, exp_rank, allow_none, ): """ Check the rank of data. Raises NagShapeError if the payload container does not have the expected rank. """ if allow_none and data is None: return rank, _, _ = cls._get_rank_shape_and_size(data, exp_rank, allow_none) if rank != exp_rank: raise NagShapeError( locus, _bad_rank_msg(exp_rank, rank) ) def _check_shape( self, ): """ Check the payload's shape. Raises NagShapeError if the payload container does not have the expected shape. Raises NagShapeError if any element of the expected shape was negative. 
""" if self.allow_none and self.data is None: return if _check_shape_good(self.data, self.exp_shape): return _check_shape_nonneg(self.locus, self.exp_shape) raise NagShapeError( self.locus, _bad_shape_msg( self.exp_shape, ( self.data.shape if isinstance(self.data, _np.ndarray) else (len(self.data),) ), ), ) @classmethod def _containers_to_strlist( cls, ): """ Return a list of sorted qualified type names from the containers tuple. """ type_names = [] for container in cls.containers: full_name = [] if container is _array.array: full_name.append('array') elif container is _ctypes.Array: full_name.append('ctypes') elif container is _np.ndarray: full_name.append('numpy') elif container is list: pass else: raise NagException( # pragma: no cover {'fun_name': '_containers_to_strlist'}, 'inconceivable container' ) full_name.append(container.__name__) type_names.append('.'.join(full_name)) return sorted(type_names) @staticmethod def _get_rank_shape_and_size( data, exp_rank, allow_none, ): """ Return the rank, shape and size of data. """ if allow_none and data is None: return exp_rank, tuple([0]*exp_rank), 0 if isinstance(data, _np.ndarray): return data.ndim, data.shape, data.size if isinstance(data, (_array.array, _ctypes.Array, list)): return 1, (len(data),), len(data) raise NagException( # pragma: no cover {'fun_name': '_get_rank_shape_and_size'}, 'inconceivable container', ) @classmethod def refresh_ctypes_buffer( cls, _data, _locus, ctypes_buffer, chrlen=None # pylint: disable=unused-argument ): # pylint: disable=too-many-arguments """ Refresh ctypes_buffer with the contents of data. This is the version where no back-conversion is necessary, e.g. for float64. """ def _refresh_py_list( self, data, ): """ Fill the payload list with ctypes `data`. """ self.data[:] = self._to_py_list(data) def _refresh_py_ndarray_copy( self, data, ): """ Fill the payload ndarray with ctypes `data`. This is the copy-back version. 
""" self.data[:] = self._to_py_ndarray( data, self.data.shape, sorder=self.sorder, chrlen=getattr(self, 'chrlen', None), ) def _refresh_py_noop( # pylint: disable=no-self-use self, data, # pylint: disable=unused-argument ): """ Fill the payload with ctypes `data`. This is the no-op version assuming that the memory buffer has been updated directly (and no copying was required). """ return _refresh_py_ndarray = _refresh_py_noop _refresh_py_array = _refresh_py_noop _refresh_py_ctarray = _refresh_py_noop def refresh_py( self, data, ): """ Fill the payload with ctypes `data`. """ if self.allow_none and self.data is None: return if isinstance(self.data, list): self._refresh_py_list(data) elif isinstance(self.data, _np.ndarray): self._refresh_py_ndarray(data) elif isinstance(self.data, _array.array): self._refresh_py_array(data) elif isinstance(self.data, _ctypes.Array): self._refresh_py_ctarray(data) else: raise NagException( # pragma: no cover {'fun_name': 'refresh_py'}, 'inconceivable container' ) def _to_py_list( self, data, ): """ Return ctypes `data` reshaped into a list. """ if not self.data: return [] cast_type = ( self._underlying_scalar.to_py if self.data[0] is None else type(self.data[0]) ) return [cast_type(data[i]) for i in range(self.size)] _to_py_ndarray = from_pointer class _NullEngineCharArray(_ctypes.Array): """ Null array. """ # pylint: disable=too-few-public-methods _length_ = 0 _type_ = _EngineCharScalarType.c_type._type_ def _byteorder_stripped_dtype( data, ): """ Return a stripped copy of the dtype of numpy `data` with byte-order markers removed. """ return data.dtype.str.lstrip('|<>=') class _EngineCharArrayType(_EngineArrayType): """ Concrete class for character array NAG Engine data. Extends __init__. Extends _check_contents_ndarray. Extends _check_contents_type_compat. Overrides _check_contents_1dim. Overrides empty_ctypes. Overrides empty_ndarray. Overrides from_pointer. Overrides refresh_ctypes_buffer. Overrides _refresh_py_ndarray. 
Overrides _to_ctypes_from_list.
    Overrides _to_ctypes_from_ndarray.
    Overrides _to_py_list.
    Overrides _to_py_ndarray.
    Supplies _check_contents_multidim.
    """
    # pylint: disable=too-few-public-methods
    # chrlen: the character length for each element (may start as None).
    # flat_seq: the payload as a flat sequence, set during checking.
    __slots__ = ['chrlen', 'flat_seq']
    containers = (list, _np.ndarray)
    _underlying_scalar = _EngineCharScalarType
    c_type = _underlying_scalar.c_type
    _null_ctypes = _NullEngineCharArray()
    _force_ctypes_cast = True
    _dtypes_equiv = staticmethod(_np.issubdtype)
    _allowed_np_dtypes = _underlying_scalar.type_map[
        _EngineType._module_key_numpy
    ]
    _numpy_kind = 'U'

    def __init__(
            self,
            data,
            locus,
            exp_chrlen,
            exp_shape,
            mutual_sorder=None,
            check_contents=True,
            allow_none=False,
    ):
        """
        Raises NagShapeError from the super __init__ if the payload data is
        not the correct shape.
        Raises NagTypeError from the super __init__ if the payload data is
        not the correct type.
        Raises NagValueError if any payload element exceeds a required
        length.
        """
        # pylint: disable=too-many-arguments
        # Both slots are set before the super __init__ runs.
        self.chrlen = exp_chrlen
        self.flat_seq = None
        _EngineArrayType.__init__(
            self,
            data,
            locus,
            exp_shape,
            mutual_sorder,
            check_contents,
            allow_none,
        )

    def _check_contents_1dim(
            self,
    ):
        """
        Raises NagTypeError if the contents of the rank-1 payload fail their
        respective type checks.
        Each element is checked by the underlying scalar type together
        with this instance's chrlen.
        """
        for arg_i, arg_el in enumerate(self.data):
            # Record the index so a raised error can name the element.
            self.locus['element'] = arg_i
            self._underlying_scalar(
                arg_el,
                self.locus,
                self.chrlen,
            )

    @staticmethod
    def _check_contents_multidim_direct(
            data,
            locus,
            chrlen,
    ):
        """
        Raises NagTypeError if the contents of data fail their respective
        type checks.
""" if data.size == 0: return data_it = _np.nditer(data, flags=['multi_index']) while not data_it.finished: locus['element'] = ', '.join([str(i) for i in data_it.multi_index]) if data.ndim > 1: locus['element'] = '[' + locus['element'] + ']' _EngineCharScalarType( data[data_it.multi_index], locus, chrlen, ) data_it.iternext() def _check_contents_multidim( self, ): """ Raises NagTypeError if the contents of the multidimensional payload fail their respective type checks. """ if self.size == 0: return self._check_contents_multidim_direct( self.data, self.locus, self.chrlen ) def _check_contents_type_compat( self, ): """ Set or compute chrlen before checking. """ self.flat_seq = self._set_flat_seq(self.data, self.allow_none) self._infer_chrlen() _EngineArrayType._check_contents_type_compat(self) @classmethod def empty_ctypes( cls, size, chrlen=None, ): """ Return an empty string buffer. """ if ( size <= 0 or chrlen is None or chrlen <= 0 ): return cls._null_ctypes new_arr_cls = _ctypes.c_char*(size*chrlen) globals()[new_arr_cls.__name__] = new_arr_cls return new_arr_cls() @classmethod def empty_ndarray( cls, shape, sorder=_EngineData.default_sorder, chrlen=None, ): """ Return an empty ndarray with the same data type as the payload. """ return _np.empty( cls._make_shape_nonneg(shape), dtype=( cls._numpy_kind + ('0' if chrlen is None else str(max(chrlen, 0))) ), order=sorder, ) @classmethod def from_pointer( cls, pointer, shape, **kwargs ): """ Return the input ctypes pointer type as an ndarray of byte chars. 
""" chrlen = kwargs.get('chrlen') _np_buffer = _from_pointer_to_buffer( pointer, cls._make_shape_nonneg(shape), 'S' + str(chrlen), cls._force_ctypes_cast, chrlen=chrlen, ) np_buffer = _np_buffer if kwargs.get('can_reference_contents', True): for buff_i, buff_el in enumerate(np_buffer): np_buffer[buff_i] = buff_el.rstrip() else: np_buffer[:] = [b'\0']*len(np_buffer) if len(shape) == 1: return np_buffer return np_buffer.reshape( cls._make_shape_nonneg(shape), order=kwargs.get('sorder') ) def _infer_chrlen(self): """ Compute maximal chrlen from the payload data if not already known. """ if self.chrlen is not None: return if isinstance(self.flat_seq, _np.ndarray): self.chrlen = int(self.flat_seq.dtype.str[2:]) return if isinstance(self.flat_seq, list): self.chrlen = ( 0 if not self.flat_seq else max([ len(c) for c in self.flat_seq if isinstance(c, _collections.abc.Sized) ]) ) return raise NagException( # pragma: no cover {'fun_name': '_infer_chrlen'}, 'inconceivable chrlen inference' ) @classmethod def refresh_ctypes_buffer( cls, data, locus, ctypes_buffer, chrlen=None ): """ This version back-casts to short-enough ASCII correctly. """ # pylint: disable=too-many-arguments cls._check_contents_multidim_direct(data, locus, chrlen) _ctypes_buffer = cls._to_ctypes_from_ndarray_data( data, chrlen, allow_none=False, ) for i in range(data.size*chrlen): ctypes_buffer[i] = _ctypes_buffer[i] _refresh_py_ndarray = _EngineArrayType._refresh_py_ndarray_copy @staticmethod def _set_flat_seq(data, allow_none): """ Return data as a flat ASCII sequence. 
""" if allow_none and data is None: return [] if isinstance(data, _np.ndarray): flat_data = data if data.ndim == 1 else data.flatten(order='K') elif isinstance(data, list): flat_data = data else: raise NagException( # pragma: no cover {}, 'inconceivable instance for _set_flat_seq' ) if ( len(flat_data) and # pylint: disable=len-as-condition isinstance(flat_data[0], (str, _np.str_)) ): strs_to_cast = [ c.encode('ascii') for c in flat_data if ( isinstance(c, (str, _np.str_)) and _is_ascii(c, {}) ) ] else: strs_to_cast = flat_data return ( strs_to_cast if isinstance(data, list) else _np.array( strs_to_cast, dtype='S' + _byteorder_stripped_dtype(data)[1:] ) ) def _to_ctypes_from_list( self, **kwargs ): """ Pad and join the payload to be contiguous. """ if kwargs.get('can_reference_contents', True): casted = self._pad_and_join_to_string_seq( self.flat_seq, self.chrlen ) return _ctypes.create_string_buffer(casted, size=len(casted)) return self.empty_ctypes(self.size, chrlen=self.chrlen) @classmethod def _to_ctypes_from_ndarray_data( cls, data, chrlen, allow_none, ): """ Direct creation of a string buffer for the data. """ casted = cls._pad_and_join_to_string_seq( cls._set_flat_seq(data, allow_none), chrlen, ) return _ctypes.create_string_buffer(casted, size=len(casted)) def _to_ctypes_from_ndarray( self, **kwargs ): """ Return a contiguous string with padding. If the payload is shorter than a non-None chrlen then it is blank padded on the right to this length. """ if kwargs.get('can_reference_contents', True): return self._to_ctypes_from_ndarray_data( self.data, self.chrlen, self.allow_none, ) return self.empty_ctypes(self.size, chrlen=self.chrlen) @classmethod def _to_bytes_list_from_ctypes( cls, data, size, chrlen=None, ): """ Return ctypes `data` reshaped into a list. """ return [ data[(i * chrlen):((i + 1) * chrlen)].rstrip() for i in range(size) ] def _to_py_list( self, data, ): """ Return ctypes `data` reshaped into a list. 
""" bytes_list = self._to_bytes_list_from_ctypes( data, self.size, chrlen=self.chrlen ) if ( not bytes_list or isinstance(self.data[0], bytes) ): return bytes_list return [c.decode(encoding='ascii') for c in bytes_list] def _to_py_ndarray( self, data, shape, **kwargs ): bytes_array = self.from_pointer(data, shape, **kwargs) data_char_class = _byteorder_stripped_dtype(self.data) return ( bytes_array if data_char_class[0] == 'S' else _np.array(bytes_array, dtype=data_char_class) ) def _check_contents_ndarray( self, ): """ Extend the super method to also check for ASCII and short-enough input. """ _EngineArrayType._check_contents_ndarray(self) if not self.check_contents: return self._check_contents_anydim() @staticmethod def _pad_and_join_to_string_seq( strs_to_cast, chrlen, ): """ Return a contiguous string with padding. If the payload is shorter than a non-None chrlen then it is blank padded on the right to this length. """ return b''.join( [c_str.ljust(chrlen)[:chrlen] for c_str in strs_to_cast] ) class _EngineComplexArrayType(_EngineArrayType): """ Abstract class for complex array NAG Engine data. Overrides _to_ctypes_from_list. Overrides _to_py_list. """ __slots__ = [] # Complex ctype arrays become void numpy dtypes when using as_array, so # are useless in that form. _force_ctypes_cast = True def _to_ctypes_from_list( self, **kwargs ): """ Return a ctypes Array of Complex Structures. """ if kwargs.get('can_reference_contents', True): return (self.c_type._type_*self.size)( *[(z.real, z.imag) for z in self.data] ) return self.empty_ctypes(self.size) def _to_py_list( self, data, ): """ Return ctypes `data` reshaped into a list. """ if not self.data: return [] return [ self._underlying_scalar.to_py(data[i]) if self.data[0] is None else type(self.data[0])(self._underlying_scalar.to_py(data[i])) for i in range(self.size) ] class _NullEngineComplex64Array(_ctypes.Array): """ Null array. 
""" # pylint: disable=too-few-public-methods _length_ = 0 _type_ = _EngineComplex64ScalarType.c_type class _EngineComplex64ArrayType(_EngineComplexArrayType, _EngineArrayType): """ Concrete class for single-precision complex array NAG Engine data. """ # pylint: disable=too-few-public-methods __slots__ = [] containers = (list, _np.ndarray) _underlying_scalar = _EngineComplex64ScalarType c_type = _ctypes.POINTER(_underlying_scalar.c_type) _null_ctypes = _NullEngineComplex64Array() _allowed_np_dtypes = _underlying_scalar.type_map[ _EngineType._module_key_numpy ] class _NullEngineComplex128Array(_ctypes.Array): """ Null array. """ # pylint: disable=too-few-public-methods _length_ = 0 _type_ = _EngineComplex128ScalarType.c_type class _EngineComplex128ArrayType(_EngineComplexArrayType, _EngineArrayType): """ Concrete class for double-precision complex array NAG Engine data. """ # pylint: disable=too-few-public-methods __slots__ = [] containers = (list, _np.ndarray) _underlying_scalar = _EngineComplex128ScalarType c_type = _ctypes.POINTER(_underlying_scalar.c_type) _null_ctypes = _NullEngineComplex128Array() _allowed_np_dtypes = _underlying_scalar.type_map[ _EngineType._module_key_numpy ] class _NullEngineCptrArray(_ctypes.Array): """ Null array. """ # pylint: disable=too-few-public-methods _length_ = 0 _type_ = _EngineCptrScalarType.c_type class _EngineCptrArrayType(_EngineArrayType): """ Concrete class for void* array NAG Engine data. Overrides empty_ctypes. Overrides empty_ndarray. Overrides from_pointer. Overrides refresh_ctypes_buffer. Overrides refresh_py. 
""" # pylint: disable=too-few-public-methods __slots__ = [] containers = (_ctypes.Array, list) _underlying_scalar = _EngineCptrScalarType c_type = _ctypes.POINTER(_underlying_scalar.c_type) _null_ctypes = _NullEngineCptrArray() @classmethod def empty_ndarray( cls, shape, sorder=_EngineData.default_sorder, chrlen=None, ): raise NotImplementedError @classmethod def from_pointer( cls, pointer, shape, **kwargs ): raise NotImplementedError @classmethod def refresh_ctypes_buffer( cls, data, locus, ctypes_buffer, chrlen=None, ): raise NotImplementedError def refresh_py( self, data, ): raise NotImplementedError class _EngineFloatArrayType(_EngineArrayType): """ Abstract class for float array NAG Engine data. Overrides _check_contents_1dim. """ __slots__ = [] def _check_contents_1dim( self, ): """ Promote promotable ints. """ for arg_i, arg_el in enumerate(self.data): self.locus['element'] = arg_i self._underlying_scalar(arg_el, self.locus) if ( isinstance(self.data, list) and _is_promotable_int(arg_el) ): self.data[arg_i] = self._underlying_scalar.numpy_type( self.data[arg_i] ) class _NullEngineFloat32Array(_ctypes.Array): """ Null array. """ # pylint: disable=too-few-public-methods _length_ = 0 _type_ = _EngineFloat32ScalarType.c_type class _EngineFloat32ArrayType(_EngineFloatArrayType, _EngineArrayType): """ Concrete class for 32-bit float array NAG Engine data. """ # pylint: disable=too-few-public-methods __slots__ = [] containers = (_array.array, _ctypes.Array, list, _np.ndarray) array_typecodes = ('f',) _underlying_scalar = _EngineFloat32ScalarType c_type = _ctypes.POINTER(_underlying_scalar.c_type) _null_ctypes = _NullEngineFloat32Array() _allowed_np_dtypes = (_underlying_scalar.numpy_type,) class _NullEngineFloat64Array(_ctypes.Array): """ Null array. 
""" # pylint: disable=too-few-public-methods _length_ = 0 _type_ = _EngineFloat64ScalarType.c_type class _EngineFloat64ArrayType(_EngineFloatArrayType, _EngineArrayType): """ Concrete class for 64-bit float array NAG Engine data. """ # pylint: disable=too-few-public-methods __slots__ = [] containers = (_array.array, _ctypes.Array, list, _np.ndarray) array_typecodes = ('d',) _underlying_scalar = _EngineFloat64ScalarType c_type = _ctypes.POINTER(_underlying_scalar.c_type) _null_ctypes = _NullEngineFloat64Array() _allowed_np_dtypes = (_underlying_scalar.numpy_type,) get_max = _EngineArrayType._get_max get_product = _EngineArrayType._get_product get_sum = _EngineArrayType._get_sum is_monotonic = _EngineArrayType._is_monotonic is_nonnegative = _EngineArrayType._is_nonnegative class _NullEngineIntArray(_ctypes.Array): """ Null array. """ # pylint: disable=too-few-public-methods _length_ = 0 _type_ = _EngineIntScalarType.c_type class _EngineIntArrayType(_EngineArrayType): """ Concrete class for integer array NAG Engine data. This could be 32- or 64-bits depending on the underlying Engine. Overrides _to_ctypes_from_list. Extends _check_contents_array. Extends _check_contents_ctypes. Extends _check_contents_ndarray. Extends _refresh_py_array. Extends _refresh_py_ctarray. Extends _refresh_py_ndarray. Extends _to_ctypes_from_array. Extends _to_ctypes_from_ctypes. Extends _to_ctypes_from_ndarray. Supplies _check_contents_multidim (int32 only). 
""" # pylint: disable=too-few-public-methods __slots__ = [] containers = (_array.array, _ctypes.Array, list, _np.ndarray) array_typecodes = ('l', 'q') _underlying_scalar = _EngineIntScalarType c_type = _ctypes.POINTER(_underlying_scalar.c_type) _null_ctypes = _NullEngineIntArray() _allowed_np_dtypes = _underlying_scalar.type_map[ _EngineType._module_key_numpy ] get_max = _EngineArrayType._get_max get_product = _EngineArrayType._get_product get_sum = _EngineArrayType._get_sum is_monotonic = _EngineArrayType._is_monotonic is_nonnegative = _EngineArrayType._is_nonnegative if _EngineState._int_model['bits'] == 32: def _to_ctypes_from_array( self, **kwargs ): """ Override the super method to take care to downcast 64-bit integer input. """ if ( self.data.typecode == 'q' or ( self.data.typecode == 'l' and not _IS_IL32 ) ): if kwargs.get('can_reference_contents', True): data_32 = _np.array( self.data, dtype=_EngineState._int_model['numpy'] ) return data_32.ctypes.data_as(self.c_type) return self.empty_ctypes(self.size) return _EngineArrayType._to_ctypes_from_array(self, **kwargs) def _refresh_py_array( self, data, ): """ Fill the payload array.array with ctypes `data`. This is the mixed copy-back or no-op version. """ if ( self.data.typecode == 'q' or ( self.data.typecode == 'l' and not _IS_IL32 ) ): for i in range(self.size): self.data[i] = data[i] return _EngineArrayType._refresh_py_array(self, data) def _refresh_py_ctarray( self, data, ): """ Fill the payload ctypes.Array with ctypes `data`. This is the mixed copy-back or no-op version. """ if data._type_ is not self.data._type_: self.data[:] = (self.data._type_*self.size)(*data) return _EngineArrayType._refresh_py_ctarray(self, data) def _refresh_py_ndarray( self, data, ): """ Fill the payload ndarray with ctypes `data`. This is the mixed copy-back or no-op version. 
""" refresh_fun = ( self._refresh_py_ndarray_copy if self._has_different_int_width(self.data) else self._refresh_py_noop ) return refresh_fun(data) def _to_ctypes_from_ctypes( self, **kwargs ): """ Override the super method to take care to cast to Engine integer input. """ if self.data._type_ is not self.c_type._type_: return (self.c_type._type_*self.size)(*self.data) return _EngineArrayType._to_ctypes_from_ctypes(self, **kwargs) def _to_ctypes_from_list( self, **kwargs ): """ Return the underlying C-compatible cast of the payload list. """ if kwargs.get('can_reference_contents', True): return (self.c_type._type_*self.size)( *[ _EngineState._int_model['ctypes']( x.value if isinstance(x, ( _ctypes.c_int, _ctypes.c_long, _ctypes.c_longlong )) else x ) for x in self.data ] ) return self.empty_ctypes(self.size) @staticmethod def _has_different_int_width( data, ): """ Return True when numpy int `data` is of a different type to the Engine int. """ return ( ( _EngineState._int_model['bits'] == 32 and data.dtype == _np.int64 ) or ( _EngineState._int_model['bits'] == 64 and data.dtype == _np.int32 ) ) @classmethod def _to_ctypes_from_flat_np_data( cls, data, ): """ Direct conversion of 1d data to Engine integer. """ return (cls.c_type._type_*len(data))( *[ _EngineState._int_model['numpy'](x) for x in data ] ) def _to_ctypes_from_ndarray( self, **kwargs ): """ Override the super method to take care to cast to Engine integer input. """ if self._has_different_int_width(self.data): if kwargs.get('can_reference_contents', True): return self._to_ctypes_from_flat_np_data( self.data.flatten(order='K') ) return self.empty_ctypes(self.size) return _EngineArrayType._to_ctypes_from_ndarray(self, **kwargs) if _EngineState._int_model['bits'] == 32: def _check_contents_array( self, ): """ Extend the super method to also check for in-range input. 
""" _EngineArrayType._check_contents_array(self) if not self.check_contents: return self._check_contents_1dim() def _check_contents_ctypes( self, ): """ Extend the super method to also check for in-range input. """ _EngineArrayType._check_contents_ctypes(self) if not self.check_contents: return if self.data._type_ is _SYS_CTYPES_INTS[32]: return self._check_contents_1dim() def _check_contents_ndarray( self, ): """ Extend the super method to also check for in-range input. """ _EngineArrayType._check_contents_ndarray(self) if not self.check_contents: return if self.data.dtype == _np.int32: return self._check_contents_anydim() def _check_contents_multidim( self, ): if self.size == 0: return data_it = _np.nditer(self.data, flags=['multi_index']) while not data_it.finished: self.locus['element'] = ( '[' + ', '.join( [str(i) for i in data_it.multi_index] ) + ']' ) self._underlying_scalar( self.data[data_it.multi_index], self.locus ) data_it.iternext() class _NullEngineLogicalArray(_ctypes.Array): """ Null array. """ # pylint: disable=too-few-public-methods _length_ = 0 _type_ = _EngineLogicalScalarType.c_type class _EngineLogicalArrayType(_EngineArrayType): """ Concrete class for logical array NAG Engine data. Overrides from_pointer. Overrides refresh_ctypes_buffer. Overrides _refresh_py_ndarray. Overrides _to_ctypes_from_list. Overrides _to_ctypes_from_ndarray. """ # pylint: disable=too-few-public-methods __slots__ = [] containers = (list, _np.ndarray) _underlying_scalar = _EngineLogicalScalarType c_type = _ctypes.POINTER(_underlying_scalar.c_type) _null_ctypes = _NullEngineLogicalArray() _force_ctypes_cast = True _allowed_np_dtypes = _underlying_scalar.type_map[ _EngineType._module_key_numpy ] @classmethod def from_pointer( cls, pointer, shape, **kwargs ): """ Convert from int to bool. 
""" int_buffer = _from_pointer_to_buffer( pointer, cls._make_shape_nonneg(shape), _EngineState._int_model['numpy'], cls._force_ctypes_cast, ) if kwargs.get('can_reference_contents', True): bool_ndarray = _np.array( [ cls._underlying_scalar._to_py_from_int(int_buffer[i]) for i in range(len(int_buffer)) ], dtype=cls._underlying_scalar.numpy_type ) else: bool_ndarray = _np.array( [False]*len(int_buffer), dtype=cls._underlying_scalar.numpy_type ) if len(shape) == 1: return bool_ndarray return bool_ndarray.reshape( cls._make_shape_nonneg(shape), order=kwargs.get('sorder') ) @classmethod def refresh_ctypes_buffer( cls, data, _locus, ctypes_buffer, chrlen=None ): """ This version back-casts to integer correctly. """ # pylint: disable=too-many-arguments _ctypes_buffer = cls._to_ctypes_from_flat_data(data.flatten(order='K')) for i in range(data.size): ctypes_buffer[i] = _ctypes_buffer[i] _refresh_py_ndarray = _EngineArrayType._refresh_py_ndarray_copy def _to_ctypes_from_list( self, **kwargs ): """ Return the underlying C-compatible cast of the payload list. """ if kwargs.get('can_reference_contents', True): return self._to_ctypes_from_flat_data(self.data) return self.empty_ctypes(self.size) @classmethod def _to_ctypes_from_flat_data( cls, data, ): """ Direct conversion of 1d data to Engine integer. """ return (cls.c_type._type_*len(data))( *[ _EngineState._from_bool_to_int(x) for x in data ] ) def _to_ctypes_from_ndarray( self, **kwargs ): """ Convert to Engine int and then return as a pointer. """ if kwargs.get('can_reference_contents', True): return self._to_ctypes_from_flat_data(self.data.flatten(order='K')) return self.empty_ctypes(self.size) _to_py_ndarray = from_pointer class _EngineErrbuf(_ctypes.Array): """ NAG Engine error-state string. """ # pylint: disable=too-few-public-methods _length_ = 200 _type_ = _ctypes.c_char class _EngineRoutineName(_ctypes.Array): """ NAG Engine short routine name using during e.g. option listing. 
""" # pylint: disable=too-few-public-methods _length_ = 6 _type_ = _ctypes.c_char # ctypes function prototype for the Engine's explicitly C-interoperable # print handler. _C_PRINT_REC_FUNCTYPE = _EngineState._sys_functype( None, _BYREF_ENGINEDATA, _BYREF_INT, # Input, nout. _EngineCharArrayType.c_type, # Input, rec(len=1)[200]. _BYREF_INT, # Input, length_rec. _BYREF_INT, # Output, ifail. ) # ctypes args for the Engine's default print handler. _PRINT_REC_ARGS = ( _BYREF_ENGINEDATA, _BYREF_INT, # Input, nout. _BYREF_STRING, # Input, rec(len=*). _BYREF_INT, # Output, ifail. _EngineState._chrlen_ctype, # Input, hidden chrlen of rec. ) # ctypes function prototype for the Engine's default # print handler. _PRINT_REC_FUNCTYPE = _EngineState._sys_functype( None, *_PRINT_REC_ARGS ) _UNUSED_PRINT_REC = _cast_callback( _PRINT_REC_FUNCTYPE, None, {'fun_name': _MODULE_NAME} ) # ctypes function prototype for the Engine's default # print handler helper. _PRINT_RECH_FUNCTYPE = _EngineState._sys_functype( None, _PRINT_REC_FUNCTYPE, *_PRINT_REC_ARGS ) # ctypes args for the Engine's default read handler. _READ_REC_ARGS = ( _BYREF_ENGINEDATA, _BYREF_INT, # Input, nin. _BYREF_STRING, # Output, recin(len=*). _BYREF_STRING, # Output, errbuf(len=200). _BYREF_INT, # Output, ifail. _EngineState._chrlen_ctype, # Input, hidden chrlen of recin. _EngineState._chrlen_ctype, # Input, hidden chrlen of errbuf. ) # ctypes function prototype for the Engine's default # read handler. _READ_REC_FUNCTYPE = _EngineState._sys_functype( None, *_READ_REC_ARGS ) _UNUSED_READ_REC = _cast_callback( _READ_REC_FUNCTYPE, None, {'fun_name': _MODULE_NAME} ) # ctypes function prototype for the Engine's default # read handler helper. _READ_RECH_FUNCTYPE = _EngineState._sys_functype( None, _READ_REC_FUNCTYPE, *_READ_REC_ARGS ) # ctypes args for the Engine's default nonadvancing-read handler. _NONADV_READ_REC_ARGS = ( _BYREF_ENGINEDATA, _BYREF_INT, # Input, nin. _BYREF_STRING, # Output, recin(len=*). 
_BYREF_INT, # Output, readlen. _BYREF_INT, # Output, inform. _EngineState._chrlen_ctype, # Input, hidden chrlen of recin. ) # ctypes function prototype for the Engine's default # nonadvancing-read handler. _NONADV_READ_REC_FUNCTYPE = _EngineState._sys_functype( None, *_NONADV_READ_REC_ARGS ) _UNUSED_NONADV_READ_REC = _cast_callback( _NONADV_READ_REC_FUNCTYPE, None, {'fun_name': _MODULE_NAME} ) # ctypes function prototype for the Engine's default # nonadvancing-read handler helper. _NONADV_READ_RECH_FUNCTYPE = _EngineState._sys_functype( None, _NONADV_READ_REC_FUNCTYPE, *_NONADV_READ_REC_ARGS ) # Dummy callback prototype and definition for unreferenced # Engine callbacks. _UNUSED_FUNCTYPE = _EngineState._sys_functype(None) _UNUSED_NULL_CALLBACK = _cast_callback( _UNUSED_FUNCTYPE, None, {'fun_name': _MODULE_NAME} ) def _unused_callback(): # pragma: no cover return None _UNUSED_NONNULL_CALLBACK = _cast_callback( _UNUSED_FUNCTYPE, _unused_callback, {'fun_name': _MODULE_NAME} ) def _cast_to_unused_callback(user_cb): return ( _UNUSED_NULL_CALLBACK if user_cb is None else _UNUSED_NONNULL_CALLBACK )
class FileObjManager(object):
    # pylint: disable=line-too-long
    """
    Class for translating between Python file objects and NAG Library
    Engine I/O units.

    NAG Engine I/O units are roughly synonymous with Unix file descriptors.

    File objects may be registered with I/O units using the instance
    initializer or the :meth:`register` method.
    Convenience methods :meth:`register_to_errunit` and
    :meth:`register_to_advunit` are supplied to associate file objects
    with the Engine error unit (analogous to standard error, stderr) and
    the Engine advisory unit (analogous to standard output, stdout),
    respectively.

    A Python file object is considered valid for use with this class
    if it has a ``fileno`` method returning its underlying file descriptor.
    In particular, you may use any concrete subclass of ``io.IOBase``
    (including the ``file`` attribute of a file object returned by
    ``tempfile.NamedTemporaryFile``).
    Naturally, the object must also have ``read``/``readline`` and
    ``write`` methods.

    Resource management (opening/closing/deleting/... files) remains the
    responsibility of the caller.
    In particular, objects to be used for output streams must have been
    opened for writing (and, correspondingly, objects to be used for input
    must have been opened for reading) before their associated units are
    used by the Engine.

    To communicate I/O unit numbers to the NAG Engine the
    :meth:`unit_from_fileobj` method must be used. This maps a
    (registered) file object to a valid unit number.

    Writes requested from the Engine to a unit number that does not
    have a file object associated with it will fall back to being
    sent to the file object associated with the advisory unit
    (``sys.stdout`` by default).
    By default written data is prefixed with the name of the calling
    NAG Python function. This behaviour can be disabled when constructing
    an instance by using the initializer's `locus_in_output` argument.

    Reads requested by the Engine from a unit number that does not
    have a file object associated with it will cause **NagOSError** to
    be raised.

    File objects may be removed from the class's registry of I/O units by
    calling the :meth:`deregister` method.
    The special nature of the Engine advisory and error units means that
    these have persistent file-object associations; they are not affected
    by a call to ``deregister``.
    The methods :meth:`deregister_errunit` and :meth:`deregister_advunit`
    are available to restore the associations with ``sys.stderr``
    and ``sys.stdout``, respectively.

    A module-level instance of this class called ``GLOBAL_IO_MANAGER`` is
    supplied for convenience. This is queried as the fallback I/O manager
    whenever the Engine requires an I/O manager but one has not been
    explicitly supplied to the calling Python function.
    It is constructed using the initializer's default argument set: in
    particular, prefixing of output data with NAG Python function names
    is enabled.

    Attributes
    ----------
    errunit : int
        The value denoting the Engine error unit. This attribute should be
        considered read only. By default `errunit` is associated with
        ``sys.stderr``.

    advunit : int
        The value denoting the Engine advisory unit. This attribute
        should be considered read only. By default `advunit` is associated
        with ``sys.stdout``.

    Parameters
    ----------
    fileobjs : None or tuple, optional
        If not ``None``, the file objects to register.

    locus_in_output : bool, optional
        Controls whether output managed by this instance is prefixed with
        the name of the calling NAG Python function.

    Raises
    ------
    NagTypeError
        `fileobjs` is neither ``None`` nor an instance of ``tuple``.

    NagOSError
        An element of the `fileobjs` ``tuple`` is not a valid file-object
        instance.

    Examples
    --------
    To send monitoring from a (fictional) optimization function to
    a temporary file:

    >>> import os
    >>> import tempfile
    >>> t_fd, t_path = tempfile.mkstemp()
    >>> f1 = open(t_path, 'w')
    >>> io_m = FileObjManager((f1,))
    >>> io_m_unit = io_m.unit_from_fileobj(f1)
    >>> OptimizerOptionSet("Monitoring File", io_m_unit) # doctest: +SKIP
    ...
    >>> Optimizer(..., io_manager=io_m) # doctest: +SKIP
    ...
    >>> f1.close()
    >>> os.close(t_fd)
    ...
    >>> os.unlink(t_path)

    To display iteration statistics from another (fictional) optimization
    function, in which the statistics are documented as being sent to the
    advisory unit, delegating handling to ``GLOBAL_IO_MANAGER``:

    >>> AnotherOptimizerOptionSet("Print Level = 1") # doctest: +SKIP
    # Don't supply a value for io_manager:
    >>> AnotherOptimizer(...) # doctest: +SKIP

    See Also
    --------
    :func:`naginterfaces.library.examples.opt.handle_solve_bounds_foas_ex.main` :
        This example uses a ``FileObjManager``.
    """
    # pylint: enable=line-too-long
    __slots__ = ['_locus_in_output', '_mappings']

    # Offset added to a file-descriptor number to form its Engine unit
    # number; the two values just below it are used for the error and
    # advisory units.
    _unit_shift = 10

    # The Engine's own unit numbers are read before being overridden so
    # that _set_default_units can put them back.
    _default_errunit = _EngineState._get_unit('err')
    errunit = _unit_shift - 2
    _EngineState._set_unit('err', errunit)
    _default_advunit = _EngineState._get_unit('adv')
    advunit = _unit_shift - 1
    _EngineState._set_unit('adv', advunit)

    # Values stored in the Engine data's hlperr field by the print/read
    # helpers to identify which kind of I/O failure occurred.
    _print_err = -10000
    _read_err_unassoc = -20000
    _read_err_nonascii = -20001
    _read_err = -20002

    _allowed_fo_insts = (_io.IOBase,)
    _allowed_fo_insts_str = 'io.IOBase'

    def __init__(self, fileobjs=None, locus_in_output=True):
        """
        Register the supplied file objects with this manager instance.

        Parameters
        ----------
        fileobjs : None or tuple, optional
            If not ``None``, the file objects to register.

        locus_in_output : bool, optional
            Controls whether output managed by this instance is prefixed
            with the name of the calling NAG Python function.

        Raises
        ------
        NagTypeError
            `fileobjs` is neither ``None`` nor an instance of ``tuple``.

        NagOSError
            An element of the `fileobjs` ``tuple`` is not a valid
            file-object instance.
        """
        self._locus_in_output = locus_in_output
        self._mappings = {
            self.errunit: _sys.stderr,
            self.advunit: _sys.stdout,
        }

        if fileobjs is None:
            return

        locus = {
            'fun_name': '.'.join([_MODULE_NAME, 'FileObjManager.__init__']),
            'entity': 'fileobjs'
        }
        # Validate here first so that any error is reported against the
        # initializer rather than against register().
        self._check_valid_tuple(fileobjs, locus)

        for fileobj in fileobjs:
            self._check_valid_fo_and_fd(fileobj, locus)

        self.register(fileobjs)

    @classmethod
    def _check_type_compat(cls, data, locus):
        """
        Raises NagTypeError if non-None `data` is not an instance of `cls`.
        """
        if data is None or isinstance(data, cls):
            return

        raise NagTypeError(
            locus,
            _bad_instance_msg(
                ['.'.join([_MODULE_NAME, 'FileObjManager'])],
                allow_none=False,
            )
        )

    @staticmethod
    def _check_valid_fd(fileobj, locus):
        """
        Raises NagOSError if `fileobj` does not use a file descriptor.
        """
        try:
            fileobj.fileno()
        except OSError:
            has_fd = False
        else:
            has_fd = True

        # Raised outside the except clause, so the NagOSError carries no
        # chained exception context.
        if not has_fd:
            raise NagOSError(
                locus,
                'must implement the fileno method'
            )

    def _check_valid_fo_and_fd(self, fileobj, locus):
        """
        Raises NagOSError if `fileobj` is not a valid file-object instance.
        Raises NagOSError if `fileobj` does not use a file descriptor.
        """
        if not isinstance(fileobj, self._allowed_fo_insts):
            raise NagOSError(
                locus,
                'must be an instance of ' + self._allowed_fo_insts_str
            )

        self._check_valid_fd(fileobj, locus)

    @staticmethod
    def _check_valid_tuple(fileobjs, locus):
        """
        Raises NagTypeError if argument `fileobjs` is not an instance of
        tuple.
        """
        if not isinstance(fileobjs, tuple):
            raise NagTypeError(
                locus,
                'must be an instance of tuple'
            )

    def deregister(self, fileobjs):
        """
        Deregister the supplied file objects from this manager instance.

        Parameters
        ----------
        fileobjs : tuple
            The file objects to deregister.

        Raises
        ------
        NagTypeError
            `fileobjs` is not an instance of ``tuple``.
        """
        locus = {
            'fun_name': '.'.join([_MODULE_NAME, 'FileObjManager.deregister']),
            'entity': 'fileobjs'
        }
        self._check_valid_tuple(fileobjs, locus)

        for fileobj in fileobjs:
            unit_to_dereg = self._fileobj_to_unit(
                fileobj, locus, check_range=False
            )
            self._deregister_unit(unit_to_dereg)

    def deregister_advunit(self):
        """
        Remove the current association for the Engine advisory unit
        and restore it to ``sys.stdout``.
        """
        self._mappings[self.advunit] = _sys.stdout

    def deregister_errunit(self):
        """
        Remove the current association for the Engine error unit
        and restore it to ``sys.stderr``.
        """
        self._mappings[self.errunit] = _sys.stderr

    def _deregister_unit(self, unit):
        """
        Deregister the supplied unit number. Unknown units are ignored.
        """
        if unit not in self._mappings:
            return

        del self._mappings[unit]

    @classmethod
    def _fileno_to_unit(cls, fileno, check_range=True):
        """
        Map the input file-descriptor number to a 'safe' (unreserved)
        unit number for the Engine.

        Raises NagException if `check_range` is set and the unit number
        would exceed 2147483647.
        """
        unitno = fileno + cls._unit_shift

        if check_range and unitno > 2147483647:
            raise NagException( # pragma: no cover
                {'fun_name': '_fileno_to_unit'},
                'unit number will be out of range. '
                'Consider closing other open files',
            )

        return unitno

    def _fileobj_from_unit(self, unit):
        """
        Return the file object registered with the input unit number,
        or None.
        """
        return self._mappings.get(unit)

    def _fileobj_to_unit(self, fileobj, locus, check_range=True):
        """
        Return the unit number for the input file object.
        `fileobj` does not need to have been registered.
        Raises NagOSError if argument `fileobj` is not a valid file-object
        instance.
        Raises NagOSError if argument `fileobj` does not use a file
        descriptor.
        """
        self._check_valid_fo_and_fd(fileobj, locus)
        return self._fileno_to_unit(fileobj.fileno(), check_range=check_range)

    def register(self, fileobjs):
        """
        Register the supplied file objects with this manager instance.

        Parameters
        ----------
        fileobjs : tuple
            The file objects to register.

        Raises
        ------
        NagTypeError
            `fileobjs` is not an instance of ``tuple``.

        NagOSError
            An element of the `fileobjs` ``tuple`` is not a valid
            file-object instance.

            An element of the `fileobjs` ``tuple`` does not use a file
            descriptor.
        """
        locus = {
            'fun_name': '.'.join([_MODULE_NAME, 'FileObjManager.register']),
            'entity': 'fileobjs'
        }
        self._check_valid_tuple(fileobjs, locus)

        for fileobj in fileobjs:
            unit_to_reg = self._fileobj_to_unit(fileobj, locus)
            self._register_to_unit(fileobj, unit_to_reg, locus)

    def register_to_advunit(self, fileobj):
        """
        Register the supplied file object with the NAG Library Engine
        advisory output unit (equivalent to stdout).

        Parameters
        ----------
        fileobj : file object
            The file object to register.

        Raises
        ------
        NagOSError
            `fileobj` is not a valid file-object instance.

            `fileobj` does not use a file descriptor.
        """
        self._register_to_unit(
            fileobj,
            self.advunit,
            {
                'fun_name': '.'.join(
                    [_MODULE_NAME, 'FileObjManager.register_to_advunit']
                ),
                'entity': 'fileobj'
            },
        )

    def register_to_errunit(self, fileobj):
        """
        Register the supplied file object with the NAG Library Engine
        error output unit (equivalent to stderr).

        Parameters
        ----------
        fileobj : file object
            The file object to register.

        Raises
        ------
        NagOSError
            `fileobj` is not a valid file-object instance.

            `fileobj` does not use a file descriptor.
        """
        self._register_to_unit(
            fileobj,
            self.errunit,
            {
                'fun_name': '.'.join(
                    [_MODULE_NAME, 'FileObjManager.register_to_errunit']
                ),
                'entity': 'fileobj'
            },
        )

    def _register_to_unit(self, fileobj, unit, locus):
        """
        Register the input file object with the input unit number,
        replacing any object already associated with that unit.
        Raises NagOSError if argument `fileobj` is not a valid file-object
        instance.
        Raises NagOSError if argument `fileobj` does not use a file
        descriptor.
        """
        self._check_valid_fo_and_fd(fileobj, locus)
        self._mappings[unit] = fileobj

    @classmethod
    def _set_default_units(cls):
        """
        Set the default values for the advisory and error units.
        """
        _EngineState._set_unit('err', cls._default_errunit)
        _EngineState._set_unit('adv', cls._default_advunit)

    @classmethod
    def _set_ni_units(cls):
        """
        Set the modified values for the advisory and error units.
        """
        _EngineState._set_unit('err', cls.errunit)
        _EngineState._set_unit('adv', cls.advunit)

    def unit_from_fileobj(self, fileobj):
        """
        Return a NAG Library Engine unit number for the input file object.

        Parameters
        ----------
        fileobj : file object
            The file object to query. It is registered with this manager
            instance unless it is already the object associated with its
            unit number.

        Returns
        -------
        int
            The NAG Engine unit number corresponding to `fileobj`.

        Raises
        ------
        NagOSError
            `fileobj` is not a valid file-object instance.

            `fileobj` does not use a file descriptor.
        """
        locus = {
            'fun_name': '.'.join(
                [_MODULE_NAME, 'FileObjManager.unit_from_fileobj']
            ),
            'entity': 'fileobj'
        }
        unit = self._fileobj_to_unit(fileobj, locus)

        # Unit numbers derive from file-descriptor numbers, which the OS
        # reuses once a file is closed. Compare by identity so that a stale
        # object left registered under the same number is replaced rather
        # than silently kept.
        if self._mappings.get(unit) is not fileobj:
            self._register_to_unit(fileobj, unit, locus)

        return unit
#: The fallback I/O manager, used whenever the NAG Library Engine requires an
#: I/O manager but one has not been explicitly supplied to the calling
#: Python function. It has been constructed such that prefixing of
#: output data with NAG Python function names is enabled.
GLOBAL_IO_MANAGER = FileObjManager()

def _py_print_rec( # pylint: disable=too-many-arguments,too-many-branches
        en_data, nout, rec, io_manager, excs, ifail, logger=None, fun_name=None
):
    """
    Write the input string rec.
    If logger is None, write to Engine unit nout.
    Else write using logger.debug.
    Update the excs exception list with the tuple for any exception
    generated.
    Update ifail.value with the exit flag: 0 on success; a failed write
    is delegated to _print_err.
    If logger is None, the file object registered with nout is extracted
    from the input io_manager if this argument is not None, or from the
    global manager otherwise.
    If logger is None and no file object has been registered the object
    associated with the advisory unit is used instead.
    When writing to a file object the record is prefixed with fun_name if
    the manager requests it and fun_name is not None.
    """
    if io_manager is not None:
        this_io_manager = io_manager
    else:
        this_io_manager = GLOBAL_IO_MANAGER

    if logger is None:
        this_fo = this_io_manager._fileobj_from_unit(nout)

        if this_fo is None:
            this_fo = this_io_manager._fileobj_from_unit(
                this_io_manager.advunit
            )

        writer = this_fo.write
    else:
        writer = logger.debug

    # this_fo only exists when logger is None; the short-circuit keeps the
    # isinstance test from touching it otherwise.
    to_binary_file = (
        logger is None and
        isinstance(this_fo, (_io.BufferedIOBase, _io.RawIOBase))
    )

    if to_binary_file:
        rec_to_write = rec

        # A missing fun_name cannot be used as a prefix, so skip it
        # rather than fail before the guarded write below.
        if this_io_manager._locus_in_output and fun_name is not None:
            rec_to_write = b': '.join([fun_name.encode('ascii'), rec_to_write])

        rec_to_write = rec_to_write.rstrip() + b'\n'
    else:
        rec_to_write = _EngineCharScalarType.to_py(rec)

        if logger is None:
            if this_io_manager._locus_in_output and fun_name is not None:
                rec_to_write = ': '.join([fun_name, rec_to_write])

            # NOTE(review): nesting reconstructed from a whitespace-collapsed
            # listing; confirm the newline is appended only for file output
            # and not for logger output.
            rec_to_write = rec_to_write.rstrip() + '\n'

    try:
        writer(rec_to_write)
    except: # pylint: disable=bare-except
        # Nothing may propagate out of this helper; the failure is
        # recorded in excs instead.
        _print_err(
            _sys.exc_info(),
            excs,
            en_data,
            ifail,
        )
    else:
        ifail.value = 0

def _print_err(exc_info, excs, en_data, ifail):
    """
    Trigger printing error: record exc_info in excs, flag the print
    failure in en_data.hlperr and set ifail.value to 1.
    """
    excs.append(exc_info)
    en_data.hlperr = FileObjManager._print_err
    ifail.value = 1

def _py_read_rec( # pylint: disable=too-many-arguments
        en_data, nin, rec, errbuf, rec_len, io_manager, excs, ifail
):
    """
    Read a record from Engine unit nin.
    rec is a ctypes c_char * rec_len array and will be modified in place.
    errbuf is a ctypes c_char * 200 array and will be modified in place.
    Its contents are not referenced by the caller.
    Update the excs exception list with the tuple for any exception
    generated.
    Update ifail.value with the exit flag: 0 on success, 1 on
    end-of-file or error.
    The file object registered with nin is extracted
    from the input io_manager if this argument is not None, or from the
    global manager otherwise.
    If no file object has been registered with nin in a non-None local
    io_manager, the global manager will be queried instead.
    Records NagOSError in excs if no file object has been registered with
    nin.
    Records NagValueError in excs if the read record contains non-ASCII
    characters.
    """
    locus = {'fun_name': '_py_read_rec'}
    errbuf_a = _ctypes.cast(
        errbuf, _ctypes.POINTER(_ctypes.c_char * 200)
    ).contents
    errbuf_a[:200] = b'\0' * 200

    if io_manager is not None:
        this_io_manager = io_manager
    else:
        this_io_manager = GLOBAL_IO_MANAGER

    this_fo = this_io_manager._fileobj_from_unit(nin)

    if this_fo is None and io_manager is not None:
        this_fo = GLOBAL_IO_MANAGER._fileobj_from_unit(nin)

    if this_fo is None:
        _read_err(
            (
                None,
                NagOSError(
                    locus,
                    'no file object registered with ' +
                    'NAG Library Engine I/O unit ' + str(nin)
                )
            ),
            excs,
            FileObjManager._read_err_unassoc,
            en_data,
            ifail,
        )
        return

    try:
        _rec = this_fo.readline()
    except: # pylint: disable=bare-except
        _read_err(
            _sys.exc_info(),
            excs,
            FileObjManager._read_err,
            en_data,
            ifail,
        )
        return

    # An empty read is reported through ifail only; nothing is added to
    # excs.
    if not _rec:
        ifail.value = 1
        return

    if not _is_ascii(_rec, locus):
        _read_err(
            (
                None,
                NagValueError(
                    locus,
                    'data read from NAG Library Engine I/O unit ' + str(nin) +
                    ' must comprise only ASCII characters'
                )
            ),
            excs,
            FileObjManager._read_err_nonascii,
            en_data,
            ifail,
        )
        return

    if isinstance(_rec, str):
        _rec = _rec.encode('ascii')

    _rec = _rec.rstrip(b'\r\n')
    rec_a = _ctypes.cast(
        rec, _ctypes.POINTER(_ctypes.c_char * rec_len)
    ).contents
    # Copy what fits and blank-pad the remainder of the buffer.
    minlen = min(rec_len, len(_rec))
    rec_a[:minlen] = _rec[:minlen]
    rec_a[minlen:rec_len] = b' ' * (rec_len - minlen)
    ifail.value = 0
    return

def _read_err(exc_info, excs, err_val, en_data, ifail):
    """
    Trigger (advancing) reading error: record exc_info in excs, store
    err_val in en_data.hlperr and set ifail.value to 1.
    """
    excs.append(exc_info)
    en_data.hlperr = err_val
    ifail.value = 1

def _py_nonadv_read_rec( # pylint: disable=too-many-arguments,too-many-statements
        en_data, nin, rec, rec_len, io_manager, excs, inform
):
    """
    Read a record from Engine unit nin. Nonadvancing.
    rec is a ctypes c_char * rec_len array and will be modified in place.
    Update the excs exception list with the tuple for any exception
    generated.
    Update inform.value with the exit flag.
    Return no. chars read.
    The file object registered with nin is extracted
    from the input io_manager if this argument is not None, or from the
    global manager otherwise.
    If no file object has been registered with nin in a non-None local
    io_manager, the global manager will be queried instead.
    Records NagOSError in excs if no file object has been registered with
    nin.
    Records NagValueError in excs if the read record contains non-ASCII
    characters.
    """
    locus = {'fun_name': '_py_nonadv_read_rec'}

    def read_chars():
        """
        Read at most rec_len characters from this_fo, stopping at a
        line feed.
        Return a (data, flag) pair: the bytes read padded to rec_len with
        flag 0; empty bytes with flag 1 for an empty record; empty bytes
        with flag 2 if the end of the file was reached; None with flag 3
        for non-ASCII data.
        """
        the_string = []

        if isinstance(this_fo, (_io.BufferedIOBase, _io.RawIOBase)):
            line_feed = b'\n'
            join_char = b''
        else:
            line_feed = '\n'
            join_char = ''

        i = 0

        while i < rec_len:
            one_char = this_fo.read(1)

            if not one_char:
                return b'', 2

            if one_char == line_feed:
                break

            the_string.append(one_char)
            i += 1

        strng = join_char.join(the_string)

        if not _is_ascii(strng, locus):
            return None, 3

        # Decide on the data actually read rather than on the file
        # object's class, so any stream yielding str is encoded before
        # the bytes-only operations below.
        if isinstance(strng, str):
            strng = strng.encode('ascii')

        strng = strng.rstrip(b'\r')

        if not strng:
            return strng, 1

        strng = strng.ljust(rec_len)
        return strng, 0

    if io_manager is not None:
        this_io_manager = io_manager
    else:
        this_io_manager = GLOBAL_IO_MANAGER

    this_fo = this_io_manager._fileobj_from_unit(nin)

    if this_fo is None and io_manager is not None:
        this_fo = GLOBAL_IO_MANAGER._fileobj_from_unit(nin)

    if this_fo is None:
        return _nonadv_read_err(
            (
                None,
                NagOSError(
                    locus,
                    'no file object registered with ' +
                    'NAG Library Engine I/O unit ' + str(nin)
                )
            ),
            excs,
            FileObjManager._read_err_unassoc,
            en_data,
            inform,
        )

    try:
        _rec, inform.value = read_chars()
    except: # pylint: disable=bare-except
        return _nonadv_read_err(
            _sys.exc_info(),
            excs,
            FileObjManager._read_err,
            en_data,
            inform,
        )

    if _rec is None:
        return _nonadv_read_err(
            (
                None,
                NagValueError(
                    locus,
                    'data read from NAG Library Engine I/O unit ' + str(nin) +
                    ' must comprise only ASCII characters'
                )
            ),
            excs,
            FileObjManager._read_err_nonascii,
            en_data,
            inform,
        )

    # Trailing padding does not count towards the length read.
    readlen = len(_rec.rstrip())

    if not _rec:
        return readlen

    rec_a = _ctypes.cast(
        rec, _ctypes.POINTER(_ctypes.c_char * rec_len)
    ).contents
    rec_a[:readlen] = _rec[:readlen]
    rec_a[readlen:rec_len] = b' ' * (rec_len - readlen)

    if readlen < rec_len:
        inform.value = 1

    return readlen

def _nonadv_read_err(exc_info, excs, err_val, en_data, inform):
    """
    Trigger nonadvancing reading error: record exc_info in excs, store
    err_val in en_data.hlperr and set inform.value to 3.
    Return 0 as the number of characters read.
    """
    excs.append(exc_info)
    en_data.hlperr = err_val
    inform.value = 3
    readlen = 0
    return readlen

def _handle_cb_exception(excs):
    """
    Reraise the first exception stored in the excs list whose leading
    element is not UserCallbackTerminate.
    """
    for exc in excs:
        if exc[0] is not UserCallbackTerminate:
            _raise_exc_info(exc)

def _raise_exc_info(exc_info):
    """
    (Re)raise a sys.exc_info-style exception.
    exc_info is a (type, value, traceback) tuple; the traceback element
    may be absent, as in the (None, exception) pairs recorded by the read
    helpers, in which case the exception is raised without one.
    """
    trace = exc_info[2] if len(exc_info) > 2 else None
    raise exc_info[1].with_traceback(trace)

def _violated_constraint_err_text(constraint):
    """
    Standard text for a violated input-constraint error.
    """
    return ' '.join(
        ['On entry, the following constraint was not satisfied:', constraint]
    )

_lapack_constraint_err_text = _violated_constraint_err_text # pylint: disable=invalid-name

def _raise_warning(warn_class, msg_lines, locus):
    """
    Issue a warn_class warning with formatted `msg_lines`, attributed to
    the stack level given by _get_user_stack_level.
    """
    _warnings.warn(
        warn_class(
            locus,
            _format_msg_lines(msg_lines),
        ),
        stacklevel=_get_user_stack_level(),
    )

_STR_FMT_STR = '{:s}'
_STR_FMT_INT_AS_STR = '{:s}'
_STR_FMT_INT = '{:d}'
_STR_FMT_FLOAT_AS_STR = '{:s}'
_STR_FMT_FLOAT = '{:22.15e}'

def _withdrawal(fun_name, mkwdrn, advice=None):
    """
    Decorate with a deprecation warning if called from outside the package.
    mkwdrn is the Mark of scheduled withdrawal as a string, or None for a
    plain deprecation.
    advice is an optional list of message lines describing a replacement.
    """
    def decorate(func): # pylint: disable=missing-docstring
        @_functools.wraps(func)
        def wrapper(*args, **kwargs): # pylint: disable=missing-docstring
            # The caller is taken to be inside the package when the parent
            # of its file's directory is named after the package.
            caller_dirname = _os.path.dirname(_inspect.stack()[1][1])
            caller_parent_dirbase = _os.path.split(
                _os.path.split(caller_dirname)[0]
            )[-1]

            if caller_parent_dirbase != _PKG_NAME:
                msg_lines = []

                if mkwdrn is None:
                    msg_lines.append(
                        'This function is deprecated.'
                    )
                    warn_cls = NagDeprecatedWarning
                else:
                    msg_lines.append(
                        'This function is scheduled for withdrawal, '
                        'no earlier than Mark ' + mkwdrn + '.',
                    )
                    warn_cls = NagWithdrawalWarning

                if advice is None:
                    msg_lines.append(
                        'There is no replacement for this routine.'
                    )
                else:
                    msg_lines.append(
                        'The following advice is given for making a '
                        'replacement:'
                    )
                    msg_lines.extend(advice)

                _warnings.warn(
                    warn_cls(
                        {'fun_name': fun_name},
                        '\n' + '\n'.join(msg_lines),
                    ),
                    stacklevel=2
                )

            result = func(*args, **kwargs)
            return result
        return wrapper
    return decorate

@_withdrawal('.'.join([_MODULE_NAME, '_withdrawal_test']), '42')
def _withdrawal_test():
    """
    Test of withdrawal decorator.
    """

@_withdrawal('.'.join([_MODULE_NAME, '_withdrawal_warn_test']), '42')
def _withdrawal_warn_test():
    """
    Test of withdrawal decorator on a function that raises a warning.
    """
    _EngineWarningExit(
        fun_name='_withdrawal_warn_test',
        msg_lines=['spam', 'eggs'],
        errcodes=(43, 44, 45),
    ).manage()

@_withdrawal(
    '.'.join([_MODULE_NAME, '_withdrawal_test']),
    None,
    ['Replacement base advice.']
)
def _deprecation_test():
    """
    Test of withdrawal decorator.
    """

def _get_user_stack_level():
    """
    Return the frame number for the first non-naginterfaces stack frame.
    """
    stack_frames = _inspect.stack()

    for frame_i, frame in enumerate(stack_frames):
        frame_dirname = _os.path.dirname(frame[1])
        frame_parent_dirbase = _os.path.split(
            _os.path.split(frame_dirname)[0]
        )[-1]

        if frame_parent_dirbase != _PKG_NAME:
            return frame_i

    raise NagException( # pragma: no cover
        {'fun_name': '_get_user_stack_level'},
        'inconceivable stack'
    )

def _check_comm(comm, locus, req_keys=(), opt_keys=(), allow_none=False):
    """
    Validate the comm blob.
    Return True if opt_keys was non-empty and none were present in comm,
    or if comm is None and that is allowed; return False if the required
    keys, and all of any optional keys, are present.
    Raises NagTypeError if argument comm is not a dict.
    Raises NagKeyError if the dictionary keys in comm are incorrect for
    the communication structure it is supposed to represent.
    Raises NagException if neither req_keys nor opt_keys is supplied.
    """
    # pylint: disable=too-many-branches
    if allow_none and comm is None:
        return True

    if not isinstance(comm, dict):
        raise NagTypeError(
            locus,
            'must be an instance of dict'
        )

    if not comm:
        if req_keys:
            raise NagKeyError(
                locus,
                'must not be empty. '
                'You must pass a communication structure that was returned by '
                'the initializer for this routine (which may be this routine '
                'itself)'
            )

        if opt_keys:
            return True

        raise NagException(
            locus,
            'communication structure seems totally redundant',
        )

    for key in req_keys:
        if key not in comm:
            raise NagKeyError(
                locus,
                'invalid communication structure for this function: ' +
                'key \'' + key + '\' must be present. '
                'You must pass a communication structure that was returned by '
                'the initializer for this routine (which may be this routine '
                'itself)'
            )

    if not opt_keys:
        if not req_keys:
            raise NagException(
                locus,
                'communication structure seems totally redundant',
            )
        return False

    n_opt_keys_present = sum(1 for key in opt_keys if key in comm)

    if n_opt_keys_present == 0:
        return True

    if len(opt_keys) == n_opt_keys_present:
        return False

    # Only some of the optional keys are present: report the first one
    # that is missing.
    for key in opt_keys:
        if key not in comm:
            raise NagKeyError(
                locus,
                'invalid communication structure for this function: ' +
                'key \'' + key + '\' must be present'
            )

    raise NagException( # pragma: no cover
        locus,
        'inconceivable exit from _check_comm',
    )

def _check_callable(cble, locus, allow_none=False):
    """
    Validate the callable `cble`.
    Raises NagTypeError if argument `cble` is not callable, and
    not None if None is permitted.
    """
    if callable(cble) or (allow_none and cble is None):
        return

    msg = 'must be '

    if allow_none:
        msg += 'None or '

    msg += 'callable'
    raise NagTypeError(
        locus,
        msg,
    )

def _d01mazn(handle):
    """
    Query handle and extract problem dimensions.
    Not implemented: always raises NotImplementedError.
    """
    raise NotImplementedError

def _e04ptan(handle):
    """
    Query handle and extract problem dimensions.
    Return the ctypes integers (nvar, nnzu, nnzuc, nnzua) filled in by
    the Engine.
    """
    fun_name = '.'.join([_MODULE_NAME, '_e04ptan'])
    _handle = _EngineCptrScalarType(
        handle, {'fun_name': fun_name, 'entity': 'handle'},
    ).to_ctypes()
    _nvar = _EngineIntScalarType.empty_ctypes()
    _nnzu = _EngineIntScalarType.empty_ctypes()
    _nnzuc = _EngineIntScalarType.empty_ctypes()
    _nnzua = _EngineIntScalarType.empty_ctypes()
    _ierr = _EngineIntScalarType.empty_ctypes()
    engine_ctypes = _EngineState._get_symbol('e04ptan')
    engine_ctypes.restype = None
    engine_ctypes.argtypes = (
        _BYREF_HANDLE, # Input, handle.
        _BYREF_INT, # Output, nvar.
        _BYREF_INT, # Output, nnzu.
        _BYREF_INT, # Output, nnzuc.
        _BYREF_INT, # Output, nnzua.
        _BYREF_INT, # Output, ierr.
    )
    engine_ctypes(
        _handle,
        _nvar,
        _nnzu,
        _nnzuc,
        _nnzua,
        _ierr,
    )
    _ierr_py = _EngineIntScalarType.to_py(_ierr)

    if _ierr_py != _IERR_SUCCESS:
        _EngineErrorExit(
            fun_name=fun_name,
            msg_lines=[
                'handle does not belong to the NAG optimization modelling '
                'suite,',
                'has not been initialized properly or is corrupted.',
            ],
            errcodes=(_ierr_py, _ierr_py, _ierr_py),
        ).manage()

    return _nvar, _nnzu, _nnzuc, _nnzua

def _f08mdan(call_vendor, n): # pylint: disable=invalid-name
    """
    Return the required size of Q and IQ in F08MDF, as the ctypes
    integers (lq, liq) filled in by the Engine.
    """
    # pylint: disable=invalid-name
    fun_name = '.'.join([_MODULE_NAME, '_f08mdan'])
    _n = _EngineIntScalarType(
        n, {'fun_name': fun_name, 'entity': 'n'},
    ).to_ctypes()
    _lq = _EngineIntScalarType.empty_ctypes()
    _liq = _EngineIntScalarType.empty_ctypes()
    engine_ctypes = _EngineState._get_symbol('f08mdan')
    engine_ctypes.restype = None
    engine_ctypes.argtypes = (
        _BYREF_LOGICAL, # Input, call_vendor.
        _BYREF_INT, # Input, n.
        _BYREF_INT, # Output lq.
        _BYREF_INT, # Output liq.
    )
    _call_vendor = _EngineLogicalScalarType(
        call_vendor, {'fun_name': fun_name, 'entity': 'call_vendor'},
    ).to_ctypes()
    engine_ctypes(
        _call_vendor,
        _n,
        _lq,
        _liq,
    )
    return (_lq, _liq)

def _f12jzyn(handle, irevcm, m0_in):
    """
    Query handle and extract problem dimensions.
    Return the ctypes integer m0_out filled in by the Engine.
    """
    fun_name = '.'.join([_MODULE_NAME, '_f12jzyn'])
    _handle = _EngineCptrScalarType(
        handle, {'fun_name': fun_name, 'entity': 'handle'},
    ).to_ctypes()
    _irevcm = _EngineIntScalarType(
        irevcm, {'fun_name': fun_name, 'entity': 'irevcm'},
    ).to_ctypes()
    _m0_in = _EngineIntScalarType(
        m0_in, {'fun_name': fun_name, 'entity': 'm0_in'},
    ).to_ctypes()
    _m0_out = _EngineIntScalarType.empty_ctypes()
    engine_ctypes = _EngineState._get_symbol('f12jzyn')
    engine_ctypes.restype = None
    engine_ctypes.argtypes = (
        _BYREF_HANDLE, # Input, handle.
        _BYREF_INT, # Input, irevcm.
        _BYREF_INT, # Input, m0_in.
        _BYREF_INT, # Output, m0_out.
    )
    engine_ctypes(
        _handle,
        _irevcm,
        _m0_in,
        _m0_out,
    )
    return _m0_out

def _g02zkbn(init, routine_name, ip, ntau, comm, statecomm):
    # pylint: disable=invalid-name,too-many-arguments,too-many-locals
    """
    Extract size of optional arrays from comm.
    Return the ctypes integers (sdres, rip, tdch, liopts, lopts, lstate,
    calcib) filled in by the Engine.
    """
    fun_name = '.'.join([_MODULE_NAME, '_g02zkbn'])
    engine_ctypes = _EngineState._get_symbol('g02zkbn')
    engine_ctypes.restype = None
    engine_ctypes.argtypes = (
        _BYREF_INT, # Input, init.
        _BYREF_STRING, # Input, routine_name(len=6).
        _BYREF_INT, # Input, ip.
        _BYREF_INT, # Input, ntau.
        _EngineIntArrayType.c_type, # Input, iopts[*].
        _EngineIntArrayType.c_type, # Input, state[*].
        _BYREF_INT, # Output, sdres.
        _BYREF_INT, # Output, rip.
        _BYREF_INT, # Output, tdch.
        _BYREF_INT, # Output, liopts.
        _BYREF_INT, # Output, lopts.
        _BYREF_INT, # Output, lstate.
        _BYREF_INT, # Output, calcib.
        _EngineState._chrlen_ctype, # Input, hidden chrlen of routine_name.
    )
    _init = _EngineIntScalarType(
        init, {'fun_name': fun_name, 'entity': 'init'},
    ).to_ctypes()
    _init_py = _EngineIntScalarType.to_py(_init)

    # The caller's routine name is only converted when init is zero;
    # otherwise a fixed six-character placeholder is passed.
    if _init_py == 0:
        _routine_name = _EngineCharScalarType(
            routine_name, {'fun_name': fun_name, 'entity': 'routine_name'},
            exp_chrlen=6,
        ).to_ctypes()
    else:
        _routine_name = _EngineRoutineName(*b'unknow')

    _ip = _EngineIntScalarType(
        ip, {'fun_name': fun_name, 'entity': 'ip'},
    ).to_ctypes()
    _ntau = _EngineIntScalarType(
        ntau, {'fun_name': fun_name, 'entity': 'ntau'},
    ).to_ctypes()

    if comm is None:
        _iopts = None
    else:
        _iopts = comm['iopts']

    if statecomm is None:
        _state = _EngineIntArrayType.empty_ctypes(1)
    else:
        _state = statecomm['state']

    # The value is not used below; the call is kept in case get_shape
    # rejects a state array of the wrong rank.
    _lstate_py = _EngineIntArrayType.get_shape(
        _state, {'fun_name': fun_name, 'entity': '_state'},
        exp_rank=1, allow_none=True, validate=False,
    )[0]
    _sdres = _EngineIntScalarType.empty_ctypes()
    _rip = _EngineIntScalarType.empty_ctypes()
    _tdch = _EngineIntScalarType.empty_ctypes()
    _liopts = _EngineIntScalarType.empty_ctypes()
    _lopts = _EngineIntScalarType.empty_ctypes()
    _lstate = _EngineIntScalarType.empty_ctypes()
    _calcib = _EngineIntScalarType.empty_ctypes()
    engine_ctypes(
        _init,
        _routine_name,
        _ip,
        _ntau,
        _iopts,
        _state,
        _sdres,
        _rip,
        _tdch,
        _liopts,
        _lopts,
        _lstate,
        _calcib,
        _routine_name._length_,
    )
    return (
        _sdres, _rip, _tdch, _liopts, _lopts, _lstate, _calcib
    )

#: A named constant for use with certain functions in the ``blast`` module.
NAG_ONE_NORM = 171
#: A named constant for use with certain functions in the ``blast`` module.
NAG_TWO_NORM = 173
#: A named constant for use with certain functions in the ``blast`` module.
NAG_FROBENIUS_NORM = 174
#: A named constant for use with certain functions in the ``blast`` module.
NAG_INF_NORM = 175
#: A named constant for use with certain functions in the ``blast`` module.
NAG_MAX_NORM = 177