"""
Base utilities for the `naginterfaces` package.
"""
# NAG Copyright 2017-2021.
# pylint: disable=protected-access,too-many-lines
from __future__ import absolute_import
import array as _array
import collections as _collections
import configparser as _configparser
import ctypes as _ctypes
import functools as _functools
import inspect as _inspect
import io as _io
import logging as _logging
import os as _os
import socket as _socket
import struct as _struct
import subprocess as _subprocess
import sys as _sys
import warnings as _warnings
import numpy as _np
from ._nag_engine_data_consts import (
NAG_ED_COL_MAJOR as _NAG_ED_COL_MAJOR,
NAG_ED_ROW_MAJOR as _NAG_ED_ROW_MAJOR,
NAG_ED_YES as _NAG_ED_YES,
)
from ._nag_p_consts import ( # pylint: disable=unused-import
IERR_ALLOC_FAIL as _IERR_ALLOC_FAIL,
IERR_HELPER as _IERR_HELPER,
IERR_NO_LIC as _IERR_NO_LIC,
IERR_SUCCESS as _IERR_SUCCESS,
IERR_UNEXPECTED as _IERR_UNEXPECTED,
)
from .. import _THE_SYSTEM, ENGINE_LIBDIR as _ENGINE_LIBDIR, __version__
_PKG_NAME = 'naginterfaces'
_MODULE_NAME = '.'.join([_PKG_NAME, 'base.utils'])
def _join_sequence(
sequence,
separator=', ',
last_separator=' or ',
):
"""
Return a string joining the input sequence using separator,
with last_separator between the final pair of elements.
"""
if len(sequence) <= 1:
return separator.join(sequence)
if len(sequence) == 2:
return last_separator.join(sequence)
return last_separator.join([
separator.join(sequence[:-1]),
sequence[-1]
])
def _bad_instance_msg(
exp_insts,
allow_none,
):
"""
Return a string for when a data-type payload was not any instance
from the exp_insts list.
"""
return ' '.join([
'must be ' + ('' if not allow_none else 'None or ') +
'an instance of',
_join_sequence(exp_insts)
])
def _bad_rank_msg(
exp_rank,
actual_rank,
):
"""
Return a string for when a data-type payload was not of the expected
rank.
"""
return ' '.join([
'has rank',
str(actual_rank) + ',',
'but must have rank',
str(exp_rank),
])
def _bad_shape_msg(
exp_shape,
actual_shape,
):
"""
Return a string for when a data-type payload was not of the expected
shape
"""
if len(exp_shape) == 1:
if len(actual_shape) == 1:
return ' '.join([
'has length',
str(actual_shape[0]) + ',',
'but must have length',
str(exp_shape[0])
])
return ' '.join([
'has shape',
str(actual_shape) + ',',
'but must be rank 1 with length',
str(exp_shape[0])
])
str_exp_shape = '(' + ', '.join([
':' if extent is None else str(extent) for extent in exp_shape
]) + ')'
return ' '.join([
'has shape',
str(actual_shape) + ',',
'but must have shape',
str_exp_shape,
])
[docs]class NagException(Exception):
"""
Base class for errors from this package.
Attributes
----------
errno : int or None
When a call to the underlying NAG Library Engine flags an error or
warning, this attribute is set to an integer identifier for the exit
case. The value may be used to cross reference the failure in the
corresponding NAG Library document for the routine in question.
The ``__str__`` representation of an exception having non-``None``
``errno`` will display a substring of the form ``code x[:y[,z]]``
in which the ``x`` integer is the same as the ``errno`` value (and
where the ``y`` and ``z`` values, which may be absent, only have
internal significance to NAG).
return_data : collections.namedtuple (nag_return_data) or None
When a call to the underlying NAG Engine flags a non-fatal warning
and you have configured the Python ``warnings`` module to escalate
NAG warnings to exceptions, this attribute is populated with the
return arguments for the routine in question (in argument order).
Thus after catching the warning you may access the routine's return
data via this attribute if desired. The tuple subclass - i.e., the
type of this attribute - is named ``nag_return_data``.
Notes
-----
Concrete subclasses ``NagLicenceError`` and ``NagMemoryError`` are
potential exceptions that may be raised by any function in the
`naginterfaces` package.
"""
_contact_nag = True
_contact_nag_msg = 'Please contact NAG.'
def __init__(
self,
locus,
msg,
):
"""
Store the exception locus dict and exception message.
"""
Exception.__init__(self, locus, msg)
self.locus = locus
self.msg = msg
self._errcodes = (
self.locus['errcodes'] if 'errcodes' in self.locus
else None
)
self.errno = self._errcodes[0] if self._errcodes is not None else None
self.return_data = self.locus.get('return_data')
def __str__(
self,
):
err_str = []
l_t_s = self._locus_to_string()
if l_t_s:
err_str.append(l_t_s)
if self.msg and self.msg[0] != '\n':
err_str.append(' ')
err_str.append(self.msg)
if self._contact_nag:
if self.msg[-1] != '.':
err_str[-1] += '.'
err_str.extend([
'\n',
'** ' + self._contact_nag_msg
])
return ''.join(err_str)
def _locus_to_string(
self
):
"""
Return a string formatting the current locus for printing.
"""
msg = []
if 'fun_name' in self.locus:
msg.append('NAG Python function ' + self.locus['fun_name'])
if 'returned' in self.locus:
msg.append('returned ' + self.locus['returned'])
elif 'entity' in self.locus:
msg.append('argument ' + self.locus['entity'])
if 'element' in self.locus:
msg.append('element ' + str(self.locus['element']))
if 'errcodes' in self.locus:
codes_str = 'code ' + str(self.errno)
if len(self.locus['errcodes']) > 1:
codes_str += ':' + ','.join(
[str(l) for l in self.locus['errcodes'][1:]]
)
msg.append(codes_str)
if msg:
return '(' + ', '.join(msg) + ')'
return ''
[docs]class NagWarning(NagException, Warning):
# pylint: disable=line-too-long
"""
Base class for warnings from NAG wrappers.
As a subclass of the Python ``Warning`` class the behaviour of
a ``NagWarning`` can be controlled using the methods in the core
``warnings`` module.
Examples
--------
To escalate occurrences of ``NagAlgorithmicWarning`` to exceptions
and to never show occurrences of ``NagWithdrawalWarning`` (which
would not be recommended in practice):
>>> import warnings
>>> from naginterfaces.base import utils
>>> warnings.simplefilter('error', utils.NagAlgorithmicWarning)
>>> warnings.simplefilter('ignore', utils.NagWithdrawalWarning)
Any promoted ``NagAlgorithmicWarning`` exits can then be caught and
their ``errno`` and ``return_data`` attributes accessed as
required:
>>> try: # doctest: +SKIP
... some_nag_call(...)
... except NagAlgorithmicWarning as exc:
... if exc.errno == xyz:
... look at exc.return_data
See Also
--------
:func:`naginterfaces.library.examples.glopt.nlp_multistart_sqp_lsq_ex.main` :
This example catches a ``NagAlgorithmicWarning`` and retrieves
some of its ``return_data``.
:func:`naginterfaces.library.examples.roots.sys_func_rcomm_ex.main` :
This example promotes instances of ``NagAlgorithmicWarning`` to
exceptions.
"""
# pylint: enable=line-too-long
_contact_nag = False
[docs]class NagAlgorithmicWarning(NagWarning):
"""
Concrete class for warnings from NAG wrappers that were identified
by the NAG Library Engine.
"""
_contact_nag = False
[docs]class NagAlgorithmicMajorWarning(NagAlgorithmicWarning):
"""
Concrete class for major warnings from NAG wrappers that were identified
by the NAG Library Engine.
"""
_contact_nag = False
[docs]class NagAttributeError(NagException, AttributeError):
"""
Concrete class for attribute errors from NAG wrappers.
"""
_contact_nag = False
[docs]class NagCallbackTerminateWarning(NagAlgorithmicWarning):
"""
Concrete class for warnings when early termination from a
callback was triggered by the user.
Attributes
----------
cb_excs : list of UserCallbackTerminate
The original exception(s) raised by the user.
"""
_contact_nag = False
def __init__(
self,
locus,
msg,
):
"""
Extract the original exception from the locus.
"""
NagAlgorithmicWarning.__init__(self, locus, msg)
self.cb_excs = self.locus['cb_excs']
[docs]class NagDeprecatedWarning(NagWarning):
"""
Concrete class for warnings about NAG wrappers that are
deprecated.
This category is not suppressed by default.
"""
_contact_nag = False
[docs]class NagIndexError(NagException, IndexError):
"""
Concrete class for index errors from NAG wrappers.
"""
_contact_nag = False
[docs]class NagKeyError(NagException, KeyError):
"""
Concrete class for key errors from NAG wrappers.
"""
_contact_nag = False
[docs]class NagLicenceError(NagException):
"""
Concrete class for licensing errors from NAG wrappers.
"""
[docs]class NagMemoryError(NagException, MemoryError):
"""
Concrete class for allocation errors from NAG wrappers.
"""
_contact_nag = False
[docs]class NagNotImplementedError(NagException, NotImplementedError):
"""
Concrete class for not-implemented errors from NAG wrappers.
"""
_contact_nag = True
[docs]class NagOSError(NagException, OSError):
"""
Concrete class for OS errors from NAG wrappers.
"""
_contact_nag = False
[docs]class NagOverflowError(NagException, OverflowError):
"""
Concrete class for overflow errors from NAG wrappers.
"""
_contact_nag = False
[docs]class NagTypeError(NagException, TypeError):
"""
Concrete class for type errors from NAG wrappers.
"""
_contact_nag = False
[docs]class NagValueError(NagException, ValueError):
"""
Concrete class for value errors from NAG wrappers.
"""
_contact_nag = False
[docs]class NagShapeError(NagValueError):
"""
Concrete class for shape errors from NAG wrappers.
"""
[docs]class NagWithdrawalWarning(NagDeprecatedWarning):
"""
Concrete class for warnings about NAG wrappers that are
scheduled for withdrawal.
This category is not suppressed by default.
"""
[docs]class UserCallbackTerminate(Exception):
"""
Concrete class for user-raised callback termination requests.
"""
_IS_IL32 = (_ctypes.c_int is _ctypes.c_long)
# ctypes integers for this platform:
_SYS_CTYPES_INTS = {
32: (_ctypes.c_long if _IS_IL32 else _ctypes.c_int),
64: (_ctypes.c_longlong if _IS_IL32 else _ctypes.c_long)
}
def _get_sys_data_model():
"""
Return a string for the data model used by the Python
interpreter on this platform.
"""
if _struct.calcsize('P') == 4: # pragma: no cover
return 'ILP32'
if _THE_SYSTEM == 'Windows':
return 'IL32P64'
return 'I32LP64'
class _EngineData(_ctypes.Structure):
"""
State structure for the NAG Engine.
"""
# pylint: disable=too-few-public-methods
_fields_ = [
('cpuser', _ctypes.c_void_p),
('hlperr', _SYS_CTYPES_INTS[32]),
('ad_handle', _ctypes.c_void_p),
('storage_order', _SYS_CTYPES_INTS[32]),
('reorder_packed', _SYS_CTYPES_INTS[32]),
('allocate_workspace', _SYS_CTYPES_INTS[32]),
('wrapptr1', _ctypes.c_void_p),
('wrapptr2', _ctypes.c_void_p),
('y90_print_rec', _ctypes.c_void_p),
('num_threads', _SYS_CTYPES_INTS[32]),
('thread_num', _SYS_CTYPES_INTS[32]),
('hlperr_thread_num', _SYS_CTYPES_INTS[32]),
]
default_sorder = 'C'
default_spiked_sorder = 'F'
def __init__(
self,
sorder=default_sorder,
py_c_print_rec=None,
):
"""
Return an initialized _EngineData structure.
"""
_ctypes.Structure.__init__(self)
y90haan_ctypes = _EngineState._get_symbol('y90haan')
y90haan_ctypes.restype = None
y90haan_ctypes.argtypes = (
_BYREF_ENGINEDATA,
)
y90haan_ctypes(self)
self._set_sorder(sorder)
self.allocate_workspace = _NAG_ED_YES
self._set_print_rec(py_c_print_rec)
def _set_print_rec(
self,
py_c_print_rec,
):
"""
Update the y90_print_rec field.
"""
if (
py_c_print_rec is None or
_EngineState._debug_level == _EngineState._debug_off
):
return
c_p_r = _C_PRINT_REC_FUNCTYPE(py_c_print_rec)
self.y90_print_rec = _ctypes.cast(c_p_r, _ctypes.c_void_p)
def _set_sorder(
self,
sorder,
):
"""
Update the storage-order field.
"""
if sorder == 'F':
self.storage_order = _NAG_ED_COL_MAJOR
else:
self.storage_order = _NAG_ED_ROW_MAJOR
_BYREF_ENGINEDATA = _ctypes.POINTER(_EngineData)
def _cast_callback(
proto,
callback,
locus,
):
"""
Return a ctypes FUNCTYPE for the input callback.
"""
if callback is None:
return _ctypes.cast(callback, proto)
try:
return proto(callback)
except TypeError:
pass
raise NagException( # pragma: no cover
locus,
'unexpected failure casting to C function'
)
def _get_symbol_from_lib_handle(
libh,
symbol_name,
):
"""
Utility for loading symbol_name from library handle libh.
"""
return getattr(libh, _mangle_sym_name(symbol_name))
def _mangle_sym_name(
symbol_name,
):
"""
Mangle symbol_name into a valid shared-library symbol name on this
platform.
"""
if (
(
_THE_SYSTEM == 'Windows' or _THE_SYSTEM.startswith('CYGWIN')
) and
_ENGINE_DATA_MODEL != 'ILP32'
):
return symbol_name.strip('_').upper()
return symbol_name.strip('_').lower() + '_'
_SPEC_CFG_FILE = _os.path.join(
_os.path.abspath(_ENGINE_LIBDIR),
'lib_spec.cfg',
)
if not _os.path.isfile(_SPEC_CFG_FILE):
raise NagException(
{},
'missing ' + _SPEC_CFG_FILE,
)
_CF = _configparser.ConfigParser()
_CF.read(_SPEC_CFG_FILE)
_ENGINE_COMPILERS_DICT = {}
for _n, _v in _CF.items('compilers'):
if _n in ['fortran', 'c', 'c++']:
_dict_n = _n[0].upper() + _n[1:]
else: # pragma: no cover
_dict_n = _n
_ENGINE_COMPILERS_DICT[_dict_n] = _v
_ENGINE_DATA_MODEL = _CF.get('datamodel', 'model')
_ENGINE_INT_BITS = 64 if _ENGINE_DATA_MODEL == 'ILP64' else 32
_ENGINE_BUILD_ID_STR = (
_CF.get('buildid', 'idnum') if (
_CF.has_section('buildid') and
_CF.has_option('buildid', 'idnum')
)
else None
)
_ENGINE_PCODE = (
_CF.get('productcode', 'pcode') if (
_CF.has_section('productcode') and
_CF.has_option('productcode', 'pcode')
)
else None
)
_ENGINE_LIB_NAME_FROM_SPEC = (
_CF.get('meta', 'lib_name') if (
_CF.has_section('meta') and _CF.has_option('meta', 'lib_name')
)
else None
)
if _ENGINE_PCODE is None: # pragma: no cover
_ENGINE_PCODE = _ENGINE_LIB_NAME_FROM_SPEC
if _ENGINE_PCODE is None: # pragma: no cover
raise NagException(
{},
'missing productcode and lib_name in ' + _SPEC_CFG_FILE,
)
def _get_sys_shlib_ext():
"""
Return a string for the file extension of shared libraries on this
platform.
"""
if _THE_SYSTEM == 'Darwin':
return '.dylib'
if _THE_SYSTEM == 'Linux':
return '.so'
if _THE_SYSTEM == 'Windows' or _THE_SYSTEM.startswith('CYGWIN'):
return '.dll'
raise NagException( # pragma: no cover
{'fun_name': '_get_sys_shlib_ext'},
'inconceivable platform'
)
def _is_ascii(
data,
locus,
):
"""
Python 3+ check for data being an ASCII string.
"""
if isinstance(data, str):
try:
data.encode('ascii')
except UnicodeError:
return False
else:
return True
elif isinstance(data, bytes):
try:
data.decode('ascii')
except UnicodeError:
return False
else:
return True
else:
raise NagException( # pragma: no cover
locus,
'inconceivable string class'
)
class _EngineExit(object):
"""
Base class for categorization of error exits from the NAG Engine.
"""
# pylint: disable=too-few-public-methods
__slots__ = ['fun_name', 'entity', 'msg_lines', 'errcodes', 'return_data']
def __init__(
self,
fun_name=None,
entity=None,
msg_lines=None,
errcodes=None,
return_data=None,
):
# pylint: disable=too-many-arguments
self.fun_name = fun_name
self.entity = entity
self.msg_lines = msg_lines
self.errcodes = errcodes
self.return_data = return_data
def manage(self):
"""
Take the appropriate action for this exit category.
"""
raise NotImplementedError # pragma: no cover
def _set_locus(self):
"""
Construct the exception locus.
"""
locus = {}
if self.fun_name is not None:
locus['fun_name'] = self.fun_name
if self.errcodes is not None:
locus['errcodes'] = self.errcodes
if self.entity is not None:
locus['entity'] = self.entity
if self.return_data is not None:
locus['return_data'] = self.return_data
return locus
class _EngineNoErrorExit(_EngineExit):
"""
Concrete class for successful exits from the NAG Engine.
"""
# pylint: disable=too-few-public-methods
__slots__ = []
def manage(self):
return
class _EngineWarningExit(_EngineExit):
"""
Concrete class for warning exits from the NAG Engine.
"""
# pylint: disable=too-few-public-methods
__slots__ = []
def manage(self):
_raise_warning(
NagAlgorithmicWarning, self.msg_lines, self._set_locus(),
)
class _EngineMajorWarningExit(_EngineWarningExit):
"""
Concrete class for major warning exits from the NAG Engine.
"""
# pylint: disable=too-few-public-methods
__slots__ = []
def manage(self):
_raise_warning(
NagAlgorithmicMajorWarning, self.msg_lines, self._set_locus(),
)
class _EngineErrorExit(_EngineExit):
"""
Concrete class for routine-specific error exits from the NAG Engine.
"""
# pylint: disable=too-few-public-methods
__slots__ = []
def manage(self):
raise NagValueError(
self._set_locus(), _format_msg_lines(self.msg_lines)
)
class _EngineAllocationErrorExit(_EngineExit):
"""
Concrete class for memory-allocation error exits from the NAG Engine.
"""
# pylint: disable=too-few-public-methods
__slots__ = []
def manage(self):
locus = self._set_locus()
locus['errcodes'] = (_IERR_ALLOC_FAIL,)
raise NagMemoryError(locus, _format_msg_lines(
[
'Dynamic memory allocation failed.',
]
))
class _UserCallbackExit(_EngineWarningExit):
"""
Concrete class for early termination exits from callbacks.
"""
# pylint: disable=too-few-public-methods
__slots__ = ['cb_excs']
def __init__(
self,
fun_name=None,
entity=None,
msg_lines=None,
errcodes=None,
return_data=None,
cb_excs=None,
):
# pylint: disable=too-many-arguments
_EngineWarningExit.__init__(
self, fun_name, entity, msg_lines, errcodes, return_data,
)
self.cb_excs = cb_excs
def manage(self):
locus = self._set_locus()
locus['cb_excs'] = self.cb_excs
_raise_warning(
NagCallbackTerminateWarning, self.msg_lines, locus,
)
def _resolve_kpath(k_util):
"""
Return the absolute path of `k_util`.
"""
k_util_name = k_util
if _THE_SYSTEM == 'Windows' or _THE_SYSTEM.startswith('CYGWIN'):
k_util_name += '.exe'
k_util_path = _os.path.join(
_os.path.abspath(_ENGINE_LIBDIR),
k_util_name
)
if not _os.path.isfile(k_util_path):
raise NagException( # pragma: no cover
{},
'Missing utility ' + k_util_path
)
return k_util_path
def _capture_hostid():
"""
Return the output of ``khostid``.
"""
try:
return _EngineCharScalarType.to_py(
_subprocess.check_output(_resolve_kpath('khostid'))
)
except _subprocess.CalledProcessError:
print(
"khostid returned non zero return code. "
"Most likely no valid ethernet interface is present."
)
return "NO_VALID_HOST_ID"
def _capture_lcheck(pcode, quiet=False):
"""
Return the output of ``klcheck`` with some postprocessing.
"""
if not pcode.endswith('L'):
return (
'Licence available; this product is not licence managed'
)
cmd_list = [_resolve_kpath('klcheck')]
if quiet:
cmd_list.append('-quiet')
cmd_list.append(pcode)
cmd_output = _subprocess.check_output(cmd_list)
try:
lcheck_decode = cmd_output.decode(encoding='utf-8')
except UnicodeDecodeError:
lcheck_decode = cmd_output.decode(encoding='latin-1')
lcheck_txt = lcheck_decode.rstrip().splitlines()
for lcheck_i, lcheck_line in enumerate(lcheck_txt):
if lcheck_line.startswith('Licence available'):
lcheck_txt[lcheck_i] = (
'Licence available; the required ' + pcode + ' licence key '
'for this product is valid'
)
elif lcheck_line in [
'Error: Licence file not found',
'Error: No licence found for this product'
]:
lcheck_txt[lcheck_i] = (
'Error: Licence not found; this product requires a key for ' +
pcode
)
return '\n'.join(lcheck_txt)
def _lcheck_lines(pcode, quiet=False):
"""
Return a list of lines forming a licensing message.
"""
l_line = _capture_lcheck(pcode, quiet=quiet)
if 'Licence available' in l_line:
return [l_line]
lc_lines = [
'The NAG Library for Python on this platform uses',
'underlying Library ' + pcode + '.',
'This Library has been installed as part of the package',
'and it requires a valid licence key.',
'No such key could be validated:',
'the key may not have been installed correctly or',
'it may have expired.',
'The Kusari licence-check utility reports the following:',
]
lc_lines.append(l_line)
lc_lines.append(
'The above information has been generated on machine ' +
_socket.gethostname()
)
lc_lines.extend([
'For information on how to obtain a licence, please see',
'https://www.nag.com/numeric/py/nagdoc_latest/'
'naginterfaces.kusari.html',
])
return lc_lines
class _EngineLicenceErrorExit(_EngineExit):
"""
Concrete class for licensing error exits from the NAG Engine.
"""
# pylint: disable=too-few-public-methods
__slots__ = []
def manage(self):
from .info import impl_details # pylint:disable=bad-option-value,import-outside-toplevel
pcode = impl_details()['product_code']
locus = self._set_locus()
locus['errcodes'] = (_IERR_NO_LIC,)
msg_lines = _lcheck_lines(pcode)
msg_lines.extend([
_capture_hostid(),
'PRODUCT CODE = ' + pcode,
])
raise NagLicenceError(locus, _format_msg_lines(msg_lines))
class _EngineUnexpectedExit(_EngineExit):
"""
Concrete class for unexpected exits from the NAG Engine.
"""
# pylint: disable=too-few-public-methods
__slots__ = []
def manage(self):
locus = self._set_locus()
new_errcodes = [_IERR_UNEXPECTED]
if len(locus['errcodes']) > 1:
new_errcodes.extend(locus['errcodes'][1:])
locus['errcodes'] = tuple(new_errcodes)
raise NagException(locus, _format_msg_lines(['Unexpected error.']))
def _format_msg_lines(msg_lines):
"""
Formats msg_lines into a single string for printing as an
error/warning message.
"""
return '\n' + '\n'.join(
['** ' + line for line in msg_lines]
)
class _EngineState(object):
"""
Class for the data state of the underlying NAG Engine on this platform.
Raises
------
NagOSError if running using 32-bit Python with a 64-bit Engine or vice
versa.
NagOSError if the underlying Engine is 32-bit and the system is not
Windows.
NagTypeError upon instantiation (only the single class is permitted).
"""
# pylint: disable=too-few-public-methods
_compilers = _ENGINE_COMPILERS_DICT
_sys_functype = staticmethod(_ctypes.CFUNCTYPE)
if (
_THE_SYSTEM != 'Windows' and
not _THE_SYSTEM.startswith('CYGWIN') and
_ENGINE_DATA_MODEL == 'ILP32'
): # pragma: no cover
raise NagOSError(
{},
'this product does not support use with a 32-bit NAG library '
'on this system.'
)
if (
_ENGINE_DATA_MODEL[-2:] != _get_sys_data_model()[-2:]
): # pragma: no cover
raise NagOSError(
{},
'the Python interpreter is ' + _get_sys_data_model()[-2:] + '-bit '
'but the underlying NAG library is ' + _ENGINE_DATA_MODEL[-2:] +
'-bit. Please run using a matching pair.'
)
_int_model = {
'bits': _ENGINE_INT_BITS,
'ctypes': _SYS_CTYPES_INTS[_ENGINE_INT_BITS],
'numpy': getattr(_np, 'int' + str(_ENGINE_INT_BITS))
}
if _compilers['Fortran'] == 'Intel' and _ENGINE_DATA_MODEL != 'ILP32':
_chrlen_ctype = _SYS_CTYPES_INTS[64]
else:
_chrlen_ctype = _SYS_CTYPES_INTS[32]
if _compilers['Fortran'] == 'Intel':
_literal_true_int = -1
elif _compilers['Fortran'] == 'NAG':
_literal_true_int = 1
else:
raise NagException( # pragma: no cover
{},
'NYI for Fortran compiler'
)
_literal_true_cint = _int_model['ctypes'](_literal_true_int)
if (
_compilers['Fortran'] == 'Intel' or
(
_compilers['Fortran'] == 'NAG' and
(
_THE_SYSTEM == 'Windows' or
_THE_SYSTEM.startswith('CYGWIN')
) and
_ENGINE_DATA_MODEL != 'ILP32'
)
):
_complex_fun_rtn = 'as argument'
else:
_complex_fun_rtn = 'as retval'
_engine_lib_base = (
_ENGINE_LIB_NAME_FROM_SPEC if _ENGINE_LIB_NAME_FROM_SPEC is not None
else _ENGINE_PCODE
)
_engine_lib = _engine_lib_base + _get_sys_shlib_ext()
if _THE_SYSTEM == 'Windows' or _THE_SYSTEM.startswith('CYGWIN'):
_cwd = _os.getcwd()
if not _os.path.isdir(_ENGINE_LIBDIR): # pragma: no cover
_have_handle = False
msg = 'No such directory ' + _ENGINE_LIBDIR
else:
try:
dll_dirs = _os.add_dll_directory(_ENGINE_LIBDIR)
except AttributeError: # pragma: no cover
dll_dirs = None
try:
_ctypes.CDLL("libiomp5md")
except: # pylint: disable=bare-except
pass
_os.chdir(_ENGINE_LIBDIR)
try:
_engine_handle = _ctypes.CDLL(_engine_lib)
except OSError as exc:
_have_handle = False
msg = str(exc)
else:
_have_handle = True
_os.chdir(_cwd)
if dll_dirs is not None: # pragma: no cover
dll_dirs.close()
else:
try:
_engine_handle = _ctypes.CDLL(
_os.path.join(_ENGINE_LIBDIR, _engine_lib)
)
except OSError as exc:
_have_handle = False
msg = str(exc)
else:
_have_handle = True
if not _have_handle:
pretty_bits = ('32' if _struct.calcsize('P') == 4 else '64') + '-bit'
if _THE_SYSTEM == 'Windows' or _THE_SYSTEM.startswith('CYGWIN'):
pretty_sys = 'Windows'
elif _THE_SYSTEM == 'Darwin':
pretty_sys = 'Mac'
elif _THE_SYSTEM == 'Linux':
pretty_sys = 'Linux'
else:
raise NagException( # pragma: no cover
{},
'inconceivable system ' + _THE_SYSTEM,
)
diag_msg_lines = [
' '.join([
'The', pretty_bits, pretty_sys, _PKG_NAME, 'package',
'version', __version__
]),
'failed to obtain a handle to the NAG library ' +
_ENGINE_PCODE + '.',
'It is possible that incompatible Intel MKL or compiler runtimes '
'have been loaded from the environment.',
'Please read the compatibility notes at',
'https://www.nag.com/numeric/py/nagdoc_latest/readme.html'
'#software-stack-compatibility-notes',
msg,
]
raise NagException(
{},
_format_msg_lines(diag_msg_lines),
)
_errbuf_sep = b'\1'
(
_debug_off,
_debug_args_short,
_debug_args_full,
_debug_naninf_counts
) = range(4)
_debug_level = _debug_off
_break_ctypes_ptr_ref_cycles = False
_cast_pointer_to_ctypes_array = True
def __new__(
cls,
*args,
**kwargs
): # pylint: disable=unused-argument
raise NagTypeError(
{},
'instances of _EngineState class not permitted'
)
@classmethod
def _call_char_fun(
cls,
fun,
chrlen,
*args
):
"""
Return the result of calling the fixed-length character-valued
ctypes function fun with arguments args.
"""
result = _EngineCharScalarType.empty_ctypes(chrlen=chrlen)
fun(result, chrlen, *args)
return result
@classmethod
def _format_char_cfun_proto(
cls,
*args
):
"""
Return a ctypes FUNCTYPE for a callback function having arguments
args and returning fixed-length character data.
"""
return _EngineState._sys_functype(
None,
*cls._format_char_fun_args(
*args
)
)
@classmethod
def _format_char_fun_proto(
cls,
fun,
*args
):
"""
Update the ctypes prototype attributes for function fun having
arguments args, when fun returns fixed-length character data.
"""
fun.restype = None
fun.argtypes = cls._format_char_fun_args(*args)
@classmethod
def _format_char_fun_args(
cls,
*args
):
"""
Return an argument tuple for a function having arguments args and
returning fixed-length character data.
"""
argtypes = [_EngineCharScalarType.c_type, _EngineState._chrlen_ctype]
argtypes.extend(args)
return tuple(argtypes)
@classmethod
def _call_complex_fun(
cls,
fun,
*args
):
"""
Return the result of calling the complex-valued ctypes function fun
with arguments args.
"""
if cls._complex_fun_rtn == 'as argument':
result = fun.argtypes[0]._type_()
fun(result, *args)
else:
result = fun(*args)
return result
@classmethod
def _format_complex_cfun_proto(
cls,
fun_type,
*args
):
"""
Return a ctypes FUNCTYPE for a callback function having arguments
args and returning complex data of type fun_type.
"""
return _EngineState._sys_functype(
cls._format_complex_fun_rtn(fun_type),
*cls._format_complex_fun_args(
fun_type,
*args
)
)
@classmethod
def _format_complex_fun_args(
cls,
fun_type,
*args
):
"""
Return an argument tuple for a function having arguments args and
returning complex data of type fun_type.
"""
if cls._complex_fun_rtn == 'as argument':
argtypes = [_ctypes.POINTER(fun_type)]
argtypes.extend(args)
return tuple(argtypes)
return args
@classmethod
def _format_complex_fun_proto(
cls,
fun,
fun_type,
*args
):
"""
Update the ctypes prototype attributes for function fun having
arguments args, when fun returns complex data of type fun_type.
"""
fun.restype = cls._format_complex_fun_rtn(fun_type)
fun.argtypes = cls._format_complex_fun_args(fun_type, *args)
@classmethod
def _format_complex_fun_rtn(
cls,
fun_type,
):
"""
Return a ctypes function-result type for a function returning complex
data of type fun_type.
"""
return None if cls._complex_fun_rtn == 'as argument' else fun_type
@classmethod
def _from_bool_to_int(
cls,
bool_data,
):
"""
Return an Engine-compatible integer for the input boolean.
"""
return 0 if not bool_data else cls._literal_true_int
@classmethod
def _get_symbol(
cls,
symbol_name,
):
"""
Utility for loading symbol_name from the Engine handle.
"""
return _get_symbol_from_lib_handle(
cls._engine_handle,
symbol_name,
)
@classmethod
def _modify_unit(
cls,
modify_mode,
unit_name,
unit,
):
"""
Set or get the Engine unit_name from unit number unit (a ctypes int).
Raises NagException if modify_mode or unit_name are invalid.
"""
allowed_modes = ['set', 'get']
if modify_mode not in allowed_modes:
raise NagException( # pragma: no cover
{
'fun_name': '_EngineState._modify_unit',
'entity': 'modify_mode'
},
' '.join([
'must be',
_join_sequence(['"' + a_m + '"' for a_m in allowed_modes])
])
)
allowed_names = ['adv', 'err']
if unit_name not in allowed_names:
raise NagException( # pragma: no cover
{
'fun_name': '_EngineState._modify_unit',
'entity': 'unit_name'
},
' '.join([
'must be',
_join_sequence(['"' + a_n + '"' for a_n in allowed_names])
])
)
if unit_name == 'adv':
unit_manager_name = 'x04abft'
elif unit_name == 'err':
unit_manager_name = 'x04aaft'
else:
raise NagException( # pragma: no cover
{'fun_name': '_EngineState._modify_unit'},
'inconceivable unit name'
)
unit_manager = _get_symbol_from_lib_handle(
cls._engine_handle, unit_manager_name
)
unit_manager.restype = None
unit_manager.argtypes = (
_BYREF_ENGINEDATA,
_BYREF_INT, # Input, iflag.
_BYREF_INT, # Input/output, nerr/nadv resp.
)
_en_data = _EngineData()
if modify_mode == 'set':
_iflag = _EngineIntScalarType.c_type(1)
elif modify_mode == 'get':
_iflag = _EngineIntScalarType.c_type(0)
else:
raise NagException( # pragma: no cover
{'fun_name': '_EngineState._modify_unit'},
'inconceivable modify mode'
)
old_debug_level = cls._debug_level
cls._set_debug_level(cls._debug_off)
unit_manager(_en_data, _iflag, unit)
cls._set_debug_level(old_debug_level)
return _EngineIntScalarType.to_py(unit)
@classmethod
def _set_unit(
cls,
unit_name,
unit,
):
"""
Set the Engine unit_name to unit number unit.
"""
_unit = _EngineIntScalarType(
unit,
{'fun_name': '_EngineState._set_unit', 'entity': 'unit'}
).to_ctypes()
cls._modify_unit('set', unit_name, _unit)
@classmethod
def _get_unit(
cls,
unit_name,
):
"""
Return an int for the unit number of Engine unit unit_name.
"""
_unit = _EngineIntScalarType.empty_ctypes()
return cls._modify_unit('get', unit_name, _unit)
@classmethod
def _set_debug_level(
cls,
level,
):
"""
Set the current debug printing or NaN/infinity detection flag for
arguments in NAG computational and dummy callback routines.
When printing is enabled, each argument is printed on input and/or
output (depending on its input/output class), and if possible the
memory address of the argument is also shown.
Output is done using Python's `logging` module, to the logger named
'naginterfaces.base.<chapter>.<function>', at logging level
`DEBUG`.
The default is no printing and no NaN/infinity detection, which may be
explicitly requested by calling with `level` = `_debug_off`.
A call with `level` = `_debug_args_full` enables printing of
all arguments on entry and exit in subsequent calls of
procedures in the NAG Library Engine, and this includes full printing
of arrays (ignoring array increments).
A diagnostic message will be output if for any reason the extent of an
array is not known at run time.
A call with `level` = `_debug_args_short` requests elided array
printing (only the first and last elements being displayed).
A call with `level` = `_debug_naninf_counts` indicates that no
printing is to take place, but all floating-point scalar and array
values are examined and a count of NaN/infinity values maintained.
Thread-Safety Considerations
----------------------------
The debug printing level in the NAG Engine is managed by a global
variable, and setting the level using `_set_debug_level` requires
modifying this global variable.
Examples
--------
To send output from `naginterfaces.library.lapacklin.dgesv` (sic) to
file 'spam.txt':
>>> from logging import DEBUG, getLogger
...
>>> logger = getLogger('naginterfaces.base.lapacklin.dgesv')
>>> logger.setLevel(DEBUG)
>>> fh = FileHandler('spam.txt') # doctest: +SKIP
>>> logger.addHandler(fh) # doctest: +SKIP
"""
if level not in [
cls._debug_off,
cls._debug_args_short,
cls._debug_args_full,
cls._debug_naninf_counts,
]:
raise NagValueError(
{
'fun_name': '.'.join(
[_MODULE_NAME, '_EngineState._set_debug_level']
),
'entity': 'level'
},
'must be _debug_off, _debug_args_short, ' +
'_debug_args_full or _debug_naninf_counts'
)
y90gcft_ctypes = _get_symbol_from_lib_handle(
cls._engine_handle, 'y90gcft'
)
y90gcft_ctypes.restype = None
y90gcft_ctypes.argtypes = (
_ctypes.POINTER(cls._int_model['ctypes']), # Input, ival.
)
_level = _EngineIntScalarType(
level, {
'fun_name': '.'.join(
[_MODULE_NAME, '_EngineState._set_debug_level']
),
'entity': 'level',
},
).to_ctypes()
y90gcft_ctypes(_level)
cls._debug_level = level
@classmethod
def _handle_global_errors(
cls,
fun_name,
errbuf,
ifail,
):
"""
Process global (allocation, licence, ...) failures.
Return split errbuf data otherwise.
"""
if ifail == _IERR_ALLOC_FAIL:
_EngineAllocationErrorExit(fun_name=fun_name).manage()
elif ifail == _IERR_NO_LIC:
_EngineLicenceErrorExit(fun_name=fun_name).manage()
elif ifail == _IERR_SUCCESS:
return 0, None
eb_data = cls._split_errbuf(errbuf)
return int(eb_data[1]), eb_data[2:]
@classmethod
def _split_errbuf(
cls,
errbuf,
):
"""
Return a list of the split contents of the input errbuf c_char_Array.
"""
split_eb = [
_EngineCharScalarType.to_py(ed).lstrip() for ed in
errbuf[:].strip(b'\0 ' + cls._errbuf_sep).split(cls._errbuf_sep)
if ed
]
if len(split_eb) < 3:
raise NagException( # pragma: no cover
{'fun_name': '_split_errbuf'},
'inconceivable errbuf structure'
)
return split_eb
[docs]class EngineIntCType(_EngineState._int_model['ctypes']): # pylint: disable=too-few-public-methods
"""
Class for the ``ctypes`` integer type that is compatible
with the NAG Library Engine's integer type.
The base class is platform dependent.
Examples
--------
>>> from naginterfaces.base import utils
>>> i = utils.EngineIntCType(42)
"""
[docs]class EngineIntNumPyType(_EngineState._int_model['numpy']): # pylint: disable=too-few-public-methods
"""
Class for the NumPy integer type that is compatible
with the NAG Library Engine's integer type.
The base class is platform dependent.
Examples
--------
>>> import numpy as np
>>> from naginterfaces.base import utils
>>> i = np.array([42, 53], dtype=utils.EngineIntNumPyType)
"""
class _Debug(object):
"""
Context-management class for NAG Library Engine debug dumping.
"""
# pylint: disable=too-few-public-methods
__slots__ = [
'_filename', '_level', '_old_log_level', '_old_debug_level',
'_pop_handler',
]
def __init__(self, filename=None, level=_EngineState._debug_args_short):
"""
Initialize `level`-level Engine debug dumping.
The default level is `_EngineState._debug_args_short`.
(See also `_EngineState._set_debug_level`.)
Examples
--------
>>> from naginterfaces.base import utils
>>> with utils._Debug():
... pass
"""
self._filename = filename
self._level = level
self._old_log_level = None
self._old_debug_level = None
self._pop_handler = False
def __enter__(self):
"""
Activate Engine debug dumping.
Save the current logging and debug levels.
Output using Python's `logging` module, via the root logger.
This is configured using
`basicConfig(filename, DEBUG)`;
i.e., a root logger with handlers already configured will not be
affected.
Note that if `filename` was not supplied to the instance initializer,
or was None, this implies that output through a previously unconfigured
root logger is sent to `sys.stderr`.
"""
self._old_log_level = _logging.getLogger().getEffectiveLevel()
self._old_debug_level = _EngineState._debug_level
self._pop_handler = not _logging.getLogger().handlers
_logging.basicConfig(filename=self._filename, level=_logging.DEBUG)
_EngineState._set_debug_level(self._level)
def __exit__(self, *args):
"""
Restore the previous debug state.
If on entry the root logger had no pre-configured handlers, here the
list of root handlers is cleared.
"""
_EngineState._set_debug_level(self._old_debug_level)
_logging.getLogger().setLevel(self._old_log_level)
if self._pop_handler:
our_handler = _logging.getLogger().handlers.pop()
our_handler.close()
class _NagLibOutput(object):
"""
Context-management class for restoring NAG Library output I/O units.
Thread-Safety Considerations
----------------------------
The default output I/O units in the NAG Engine are managed by global
variables, and this context manager requires modifying these global
variables.
Examples
--------
>>> from naginterfaces.base import utils
>>> with utils._NagLibOutput():
... pass
"""
# pylint: disable=too-few-public-methods
__slots__ = []
def __enter__(self):
FileObjManager._set_default_units()
def __exit__(self, *args):
FileObjManager._set_ni_units()
def _break_ref_cycles(func):
"""
Decorate with forced breaking of ctypes pointer reference cycles.
"""
@_functools.wraps(func)
def wrapper(*args, **kwargs):
# pylint: disable=missing-docstring
old_break = _EngineState._break_ctypes_ptr_ref_cycles
_EngineState._break_ctypes_ptr_ref_cycles = True
result = func(*args, **kwargs)
_EngineState._break_ctypes_ptr_ref_cycles = old_break
return result
return wrapper
[docs]class Handle(_ctypes.c_void_p):
"""
Class for communication handles in the NAG Library Engine.
Overrides ``__copy__``.
Overrides ``__deepcopy__``.
Raises
------
NagAttributeError
Shallow or deep copy was requested; these operations are prohibited
for this class.
See Also
--------
:func:`naginterfaces.library.examples.blgm.lm_formula_ex.main` :
This example uses ``Handle`` instances.
"""
# pylint: disable=too-few-public-methods
def __copy__(self):
locus = {'fun_name': '.'.join([_MODULE_NAME, 'Handle.__copy__'])}
raise NagAttributeError(
locus,
'shallow copy not permitted'
)
def __deepcopy__(self, memo):
locus = {'fun_name': '.'.join([_MODULE_NAME, 'Handle.__deepcopy__'])}
raise NagAttributeError(
locus,
'deep copy not permitted'
)
_BYREF_HANDLE = _ctypes.POINTER(Handle)
_BYREF_C_PTR = _ctypes.POINTER(_ctypes.c_void_p)
class _EngineType(object):
"""
Base class for data passed to the NAG Engine.
"""
# pylint: disable=too-few-public-methods
__slots__ = ['allow_none', 'data', 'locus']
_module_key_ctypes = 'ctypes'
_module_key_ni = _MODULE_NAME
_module_key_numpy = 'numpy'
_module_key_python = 'builtin'
def __init__(
self,
data,
locus,
allow_none=False,
):
"""
Store the payload data and wrapper locus.
"""
self.data = data
self.locus = locus
self.allow_none = allow_none
class _EngineScalarType(_EngineType):
"""
Base class for scalar NAG Engine data.
Extends __init__.
"""
# pylint: disable=too-few-public-methods
__slots__ = []
c_type = lambda *args: None # pragma: no cover
numpy_type = lambda *args: None # pragma: no cover
type_map = {}
def __init__(
self,
data,
locus,
allow_none=False,
):
"""
Raises NagTypeError if the payload is not a compatible instance
for scalar Engine data.
"""
_EngineType.__init__(self, data, locus, allow_none)
self._check_scalar_type_compat()
@staticmethod
def _copy_ctypes_data(
data,
target,
):
"""
Copy data from ctypes data to ctypes target.
"""
if data is not None:
target.value = data.value
@classmethod
def empty_ctypes(
cls,
chrlen=None, # pylint: disable=unused-argument
):
"""
Return an empty ctypes scalar with the same data type as the payload.
"""
return (cls.c_type)()
def refresh_ctypes_buffer(
self,
ctypes_buffer,
):
"""
Refresh ctypes_buffer with the contents of the payload.
"""
self._copy_ctypes_data(self.to_ctypes(), ctypes_buffer.contents)
def to_ctypes(
self,
):
"""
Return the underlying C-compatible cast of the payload.
"""
if self.allow_none and self.data is None:
return self.data
if isinstance(self.data, self.c_type): # pylint: disable=isinstance-second-argument-not-valid-type
return self.data
return self.c_type(self.data)
def _check_scalar_type_compat(
self,
):
"""
Raises NagTypeError if the payload is not an instance of any scalar
type in the type_map.
"""
if self.allow_none and self.data is None:
return
if not isinstance(
self.data,
tuple([t for ttup in self.type_map.values() for t in ttup]) # pylint: disable=consider-using-generator
):
raise NagTypeError(
self.locus,
_bad_instance_msg(
self._type_map_to_strlist(), allow_none=self.allow_none,
)
)
def _type_map_to_strlist(
self,
):
"""
Return a list of sorted qualified type names from the type_map.
"""
type_names = []
for mkey, mval in self.type_map.items():
for type_val in mval:
full_name = []
if (
mkey == self._module_key_ctypes and
type_val.__name__ == 'Handle'
):
continue
if mkey != self._module_key_python:
full_name.append(mkey)
full_name.append(type_val.__name__)
type_names.append('.'.join(full_name))
return sorted(type_names)
class _EngineCharScalarType(_EngineScalarType):
"""
Concrete class for character scalar NAG Engine data.
Extends __init__.
Overrides empty_ctypes.
Overrides refresh_ctypes_buffer.
Overrides to_ctypes.
"""
# pylint: disable=too-few-public-methods
__slots__ = ['chrlen']
_valid_np_string_classes = (_np.bytes_, _np.str_)
_valid_py_string_classes = (bytes, str)
numpy_type = _np.str_
c_type = _ctypes.POINTER(_ctypes.c_char)
type_map = {
_EngineScalarType._module_key_numpy: _valid_np_string_classes,
_EngineScalarType._module_key_python: _valid_py_string_classes,
}
def __init__(
self,
data,
locus,
exp_chrlen,
allow_none=False,
):
"""
Raises NagTypeError from the super __init__ if the payload data
is not the correct type.
Raises NagValueError if the payload contains non-ASCII characters.
Raises NagValueError if the payload exceeds a required length.
"""
_EngineScalarType.__init__(self, data, locus, allow_none)
if self.allow_none and self.data is None:
self.chrlen = 0
elif exp_chrlen is None:
self.chrlen = len(self.data)
else:
self.chrlen = exp_chrlen
self._check_is_ascii()
@classmethod
def empty_ctypes(
cls,
chrlen=None,
):
"""
Return an empty ctypes string buffer with the same data type as the
payload.
"""
if chrlen is None or chrlen <= 0:
return _NullEngineCharArray()
new_arr_cls = _ctypes.c_char*chrlen
globals()[new_arr_cls.__name__] = new_arr_cls
return new_arr_cls()
def refresh_ctypes_buffer(
self,
ctypes_buffer,
):
"""
Refresh ctypes_buffer with the contents of the payload.
"""
if self.allow_none and self.data is None:
return
ct_data = self.to_ctypes()
for i, ct_char in enumerate(ct_data):
ctypes_buffer[i] = ct_char
def to_ctypes(
self,
):
"""
Return the underlying C-compatible cast of the payload taking care to
explicitly encode to ASCII in Python 3+.
If the payload is shorter than a non-None chrlen then it is blank
padded on the right to this length.
"""
if self.allow_none and self.data is None:
return self.data
if isinstance(self.data, (str, _np.str_)):
str_to_cast = self.data.encode('ascii')
else:
str_to_cast = self.data
if self.chrlen > len(self.data):
str_to_cast = str_to_cast.ljust(self.chrlen)
return _ctypes.create_string_buffer(
str_to_cast[:self.chrlen], size=self.chrlen
)
@classmethod
def to_py(
cls,
data,
chrlen=None,
):
"""
Return the value of ctypes (char_p) data as a str.
"""
if data is None:
return data
data_block = (
data[:chrlen] if chrlen is not None
else data[:]
)
return data_block.decode(encoding='ascii').rstrip()
to_py_from_retval = to_py
def _check_is_ascii(
self,
):
"""
Raises NagValueError if the payload contains non-ASCII characters.
"""
if self.allow_none and self.data is None:
return
if not _is_ascii(self.data[:self.chrlen], self.locus):
raise NagValueError(
self.locus,
'must ' + ('' if not self.allow_none else 'be None or ') +
'comprise only ASCII characters'
)
_BYREF_STRING = _EngineCharScalarType.c_type
class _EngineComplexScalarType(_EngineScalarType): # pylint: disable=too-few-public-methods
"""
Abstract class for complex scalar NAG Engine data.
Overrides _copy_ctypes_data.
Overrides to_ctypes.
"""
__slots__ = []
@staticmethod
def _copy_ctypes_data(
data,
target,
):
"""
Copy data from ctypes data to ctypes target.
"""
if data is None:
return
target.real = data.real
target.imag = data.imag
def to_ctypes(
self,
):
"""
Cast to the associated Structure.
"""
if self.allow_none and self.data is None:
return self.data
return self.c_type(self.data.real, self.data.imag)
@classmethod
def _to_py_complex(
cls,
data,
):
"""
Cast back from the Structure to Python native complex.
"""
return complex(data.real, data.imag)
def _set_complex_fields(c_type):
"""
Return a _fields_ list for a complex C structure of type c_type.
"""
return [
('real', c_type),
('imag', c_type),
]
class _EngineComplexStructure(_ctypes.Structure): # pylint: disable=too-few-public-methods
"""
Abstract class for complex scalar NAG Engine data.
"""
_component_c_type = lambda *args: None # pragma: no cover
class _EngineComplex64Structure(_EngineComplexStructure): # pylint: disable=too-few-public-methods
"""
Concrete C structure for single-precision complex.
"""
_component_c_type = _ctypes.c_float
_fields_ = _set_complex_fields(_component_c_type)
class _EngineComplex64ScalarType(_EngineComplexScalarType, _EngineScalarType):
"""
Concrete class for single-precision complex scalar NAG Engine data.
"""
# pylint: disable=too-few-public-methods
__slots__ = []
c_type = _EngineComplex64Structure
numpy_type = _np.complex64
type_map = {
_EngineScalarType._module_key_numpy: (numpy_type,),
}
@classmethod
def to_py(
cls,
data,
chrlen=None, # pylint: disable=unused-argument
):
"""
Return the complex64 representation of the input data.
"""
if data is None:
return data
return cls.numpy_type(cls._to_py_complex(data))
to_py_from_retval = to_py
_BYREF_COMPLEX64 = _ctypes.POINTER(_EngineComplex64ScalarType.c_type)
class _EngineComplex128Structure(_EngineComplexStructure): # pylint: disable=too-few-public-methods
"""
Concrete C structure for double-precision complex.
"""
_component_c_type = _ctypes.c_double
_fields_ = _set_complex_fields(_component_c_type)
class _EngineComplex128ScalarType(_EngineComplexScalarType, _EngineScalarType):
"""
Concrete class for double-precision complex scalar NAG Engine data.
"""
# pylint: disable=too-few-public-methods
__slots__ = []
c_type = _EngineComplex128Structure
numpy_type = _np.complex128
type_map = {
_EngineScalarType._module_key_numpy: (numpy_type,),
_EngineScalarType._module_key_python: (complex,),
}
@classmethod
def to_py(
cls,
data,
chrlen=None, # pylint: disable=unused-argument
):
"""
Return the complex128 representation of the input data.
"""
if data is None:
return data
return cls._to_py_complex(data)
to_py_from_retval = to_py
_BYREF_COMPLEX128 = _ctypes.POINTER(_EngineComplex128ScalarType.c_type)
class _EngineCptrScalarType(_EngineScalarType):
"""
Concrete class for void* scalar NAG Engine data.
Overrides to_ctypes.
"""
# pylint: disable=too-few-public-methods
__slots__ = []
c_type = Handle
type_map = {
_EngineScalarType._module_key_ctypes: (c_type,),
_EngineScalarType._module_key_ni: (Handle,),
}
def to_ctypes(
self,
):
return self.data
@classmethod
def to_py(
cls,
data,
chrlen=None, # pylint: disable=unused-argument
):
"""
Return the original data (which must therefore already be a Handle).
"""
return data
_BUILTIN_INT_TYPES = (int,)
_INT_TYPE_MAP = {
_EngineScalarType._module_key_ctypes: (
_SYS_CTYPES_INTS[32],
_SYS_CTYPES_INTS[64]
),
_EngineScalarType._module_key_numpy: (_np.int32, _np.int64),
_EngineScalarType._module_key_python: _BUILTIN_INT_TYPES,
}
_PROMOTABLE_INTS = tuple([ # pylint: disable=consider-using-generator
t for tkey in (
_EngineScalarType._module_key_numpy,
_EngineScalarType._module_key_python,
) for t in _INT_TYPE_MAP[tkey]
])
def _is_promotable_int(data):
"""
Return True if we consider scalar int data to be
appropriate for promotion to float.
"""
return (
data is not None and
isinstance(data, _PROMOTABLE_INTS)
)
class _EngineFloatScalarType(_EngineScalarType): # pylint: disable=too-few-public-methods
"""
Abstract class for float scalar NAG Engine data.
Extends __init__.
"""
__slots__ = []
def __init__(
self,
data,
locus,
allow_none=False,
):
"""
Promotes incoming data if an appropriate int type.
"""
_EngineScalarType.__init__(
self, data, locus, allow_none,
)
if _is_promotable_int(self.data):
self.data = self.numpy_type(self.data)
class _EngineFloat32ScalarType(_EngineFloatScalarType, _EngineScalarType):
"""
Concrete class for 32-bit float scalar NAG Engine data.
"""
# pylint: disable=too-few-public-methods
__slots__ = []
c_type = _ctypes.c_float
numpy_type = _np.float32
_numpy_types = []
_numpy_types.extend(_INT_TYPE_MAP[_EngineScalarType._module_key_numpy])
_numpy_types.append(numpy_type)
_builtin_types = []
_builtin_types.extend(_INT_TYPE_MAP[_EngineScalarType._module_key_python])
type_map = {
_EngineScalarType._module_key_ctypes: (c_type,),
_EngineScalarType._module_key_numpy: tuple(_numpy_types),
_EngineScalarType._module_key_python: tuple(_builtin_types),
}
@classmethod
def _to_py_from_float(
cls,
data,
):
"""
Return the data as np.float32.
"""
if data is None:
return data
return cls.numpy_type(data)
to_py_from_retval = _to_py_from_float
@classmethod
def to_py(
cls,
data,
chrlen=None, # pylint: disable=unused-argument
):
"""
Return the float32 representation of the input data.
"""
if data is None:
return data
return cls._to_py_from_float(
data.value if isinstance(data, cls.c_type) else data
)
_BYREF_FLOAT32 = _ctypes.POINTER(_EngineFloat32ScalarType.c_type)
class _EngineFloat64ScalarType(_EngineFloatScalarType, _EngineScalarType):
"""
Concrete class for 64-bit float scalar NAG Engine data.
"""
# pylint: disable=too-few-public-methods
__slots__ = []
c_type = _ctypes.c_double
numpy_type = _np.float64
_numpy_types = []
_numpy_types.extend(_INT_TYPE_MAP[_EngineScalarType._module_key_numpy])
_numpy_types.append(numpy_type)
_builtin_types = [float]
_builtin_types.extend(_INT_TYPE_MAP[_EngineScalarType._module_key_python])
type_map = {
_EngineScalarType._module_key_ctypes: (c_type,),
_EngineScalarType._module_key_numpy: tuple(_numpy_types),
_EngineScalarType._module_key_python: tuple(_builtin_types),
}
@classmethod
def _to_py_from_float(
cls,
data,
):
"""
Return the data as float.
"""
if data is None:
return data
return float(data)
to_py_from_retval = _to_py_from_float
@classmethod
def to_py(
cls,
data,
chrlen=None, # pylint: disable=unused-argument
):
"""
Return the float64 representation of the input data.
"""
if data is None:
return data
return cls._to_py_from_float(
data.value if isinstance(data, cls.c_type) else data
)
_BYREF_FLOAT64 = _ctypes.POINTER(_EngineFloat64ScalarType.c_type)
class _EngineIntScalarType(_EngineScalarType):
"""
Concrete class for integer scalar NAG Engine data.
This could be 32- or 64-bits depending on the underlying Engine.
Extends __init__.
Extends cast.
"""
# pylint: disable=too-few-public-methods
__slots__ = []
_builtin_types = _BUILTIN_INT_TYPES
c_type = _EngineState._int_model['ctypes']
numpy_type = _EngineState._int_model['numpy']
_ctypes_int64 = _SYS_CTYPES_INTS[64]
type_map = _INT_TYPE_MAP
def __init__(
self,
data,
locus,
allow_none=False,
):
"""
Raises NagTypeError from the super __init__ if the payload data
is not the correct type.
Raises NagOverflowError if the payload overflows the
underlying integer type used by the NAG Engine.
"""
_EngineScalarType.__init__(self, data, locus, allow_none)
self._check_is_repr()
def to_ctypes(
self,
):
"""
Return the underlying C-compatible cast of the payload taking care to
account for 32- and 64-bit input.
"""
if self.allow_none and self.data is None:
return self.data
if isinstance(
self.data, self.type_map[_EngineScalarType._module_key_ctypes]
):
return self.c_type(self.data.value)
return _EngineScalarType.to_ctypes(self)
@classmethod
def _to_py_from_int(
cls,
data,
):
"""
Return the data as int.
"""
if data is None:
return data
return int(data)
to_py_from_retval = _to_py_from_int
@classmethod
def to_py(
cls,
data,
chrlen=None, # pylint: disable=unused-argument
):
"""
Return the int representation of the input data.
"""
if data is None:
return data
return cls._to_py_from_int(
data.value if isinstance(data, cls.c_type) else data
)
def _check_is_repr(
self
):
"""
Raises NagOverflowError if the payload overflows the
underlying integer type used by the NAG Engine.
"""
if self.allow_none and self.data is None:
return
if self._payload_overflows():
raise NagOverflowError(
self.locus,
'value must not overflow ' +
str(_EngineState._int_model['bits']) +
'-bit NAG Library Engine integer'
)
def _do_overflow_check(
self,
):
"""
Return a bool shortcutting for whether overflow checking is relevant
for the payload.
"""
if isinstance(self.data, int):
return True
if _EngineState._int_model['bits'] == 64:
# We don't allow long on platforms
# where the Engine int is 64 bit
return False
return isinstance(
self.data,
(_SYS_CTYPES_INTS[64], _np.int64)
)
def _payload_overflows(
self,
):
"""
Return a bool for when the payload is outside the range of
the integer type used by the NAG Engine.
"""
if not self._do_overflow_check():
return False
if isinstance(self.data, _SYS_CTYPES_INTS[64]):
val_to_check = self.data.value
else:
val_to_check = self.data
return (
val_to_check >= 2**(_EngineState._int_model['bits'] - 1) or
val_to_check <= -2**(_EngineState._int_model['bits'] - 1) - 1
)
_BYREF_INT = _ctypes.POINTER(_EngineIntScalarType.c_type)
class _EngineLogicalScalarType(_EngineScalarType):
"""
Concrete class for logical scalar NAG Engine data.
Overrides to_ctypes.
"""
# pylint: disable=too-few-public-methods
__slots__ = []
c_type = _EngineState._int_model['ctypes']
numpy_type = _np.bool_
type_map = {
_EngineScalarType._module_key_numpy: (numpy_type,),
_EngineScalarType._module_key_python: (bool,),
}
_f_comp = _EngineState._compilers['Fortran']
def to_ctypes(
self,
):
"""
Return the underlying C-compatible cast using the correct integer
representation.
"""
if self.allow_none and self.data is None:
return self.data
return self.c_type(
_EngineState._from_bool_to_int(self.data)
)
@classmethod
def _to_py_from_int(
cls,
data,
):
"""
Return the bool representation of the input int.
"""
if data is None:
return data
if cls._f_comp == 'Intel':
return bool(data & 1)
if cls._f_comp == 'NAG':
return data != 0
raise NagException( # pragma: no cover
{'fun_name': '_EngineLogicalScalarType._to_py_from_int'},
'NYI for Fortran compiler ' + cls._f_comp
)
to_py_from_retval = _to_py_from_int
@classmethod
def to_py(
cls,
data,
chrlen=None, # pylint: disable=unused-argument
):
"""
Return the bool representation of the input int.
"""
if data is None:
return data
return cls._to_py_from_int(
data.value if isinstance(data, cls.c_type) else data
)
_BYREF_LOGICAL = _ctypes.POINTER(_EngineLogicalScalarType.c_type)
_IS_DISCONTIG = lambda a: not (
a.flags.c_contiguous or a.flags.f_contiguous
)
_CONTIG_ERR_STR = (
'must be contiguous, ' +
'in either the C sense, the Fortran sense, or both.'
)
def _get_sorder(locus, array):
"""
Return a string for the storage order used by `array`,
or None if not multidimensional.
"""
if (
not isinstance(array, _np.ndarray) or
array.ndim == 1
):
return None
if array.flags.f_contiguous and array.flags.c_contiguous:
return 'A'
if array.flags.f_contiguous:
return 'F'
if array.flags.c_contiguous:
return 'C'
raise NagTypeError(
locus,
'all arrays in the argument list ' + _CONTIG_ERR_STR
)
def _get_mutual_sorder(
locus, arrays,
in_callback=False,
spiked_sorder=_EngineData.default_spiked_sorder,
):
"""
Return a string for the storage order used by the input sequence of arrays,
or None if no multidimensional arrays feature.
Raises NagTypeError if the order is not uniform across all the arrays.
Raises NagTypeError if spiked_sorder is required but not a str.
Raises NagValueError if spiked_sorder is required but invalid.
"""
sorder = None
for array, array_name in arrays:
this_sorder = _get_sorder(locus, array)
if this_sorder is None:
if not in_callback:
continue
locus['returned'] = array_name
raise NagException(
locus,
'Inconceivable; not a multidimensional ndarray',
)
if (
sorder is None or
(
sorder == 'A' and
sorder != this_sorder
)
):
sorder = this_sorder
elif this_sorder == 'A':
continue
elif sorder != this_sorder:
locus['entity' if not in_callback else 'returned'] = array_name
raise NagTypeError(
locus,
'arrays ' +
(
'on input' if not in_callback
else 'returned from this callback'
) +
' must use a consistent storage order'
)
if sorder == 'A' or not arrays:
_spiked_sorder = _EngineCharScalarType(
spiked_sorder, locus, exp_chrlen=1,
).to_ctypes()
_spiked_sorder_py = _EngineCharScalarType.to_py(_spiked_sorder)
if _spiked_sorder_py not in ['C', 'F']:
raise NagValueError(
locus,
'supplied spiked_sorder was not \'C\' or \'F\'.',
)
return _spiked_sorder_py
if sorder is None:
# Some arrays, all were None:
return _EngineData.default_sorder
return sorder
def _get_size_from_shape(
shape,
chrlen=None,
):
"""
Return the product of the shape tuple.
"""
size = 1
for extent in shape:
size *= extent
if chrlen is not None:
size *= chrlen
return size
def _to_sized_buffer(
pointer,
buffer_size,
):
"""
Return the input ctypes pointer type as an addressable buffer.
"""
return _ctypes.cast(
pointer,
_ctypes.POINTER(pointer._type_ * buffer_size),
).contents
def _from_pointer_to_buffer(
pointer,
shape,
dtype,
force_ctypes_cast,
chrlen=None,
):
"""
Return the input ctypes pointer type as a flat ndarray.
"""
size = _get_size_from_shape(shape, chrlen)
if (
force_ctypes_cast or
_EngineState._cast_pointer_to_ctypes_array
):
sized_buffer = _to_sized_buffer(
pointer,
size,
)
np_buffer = _np.frombuffer(sized_buffer, dtype=dtype)
else:
np_buffer = _np.ctypeslib.as_array(pointer, shape=(size,))
return np_buffer
def _unlink_pointer(
pointer,
):
"""
Break ctypes pointer reference cycles.
"""
if (
_EngineState._break_ctypes_ptr_ref_cycles and
_EngineState._cast_pointer_to_ctypes_array
):
del pointer._objects[id(pointer)]
def _check_shape_good(data, exp_shape):
"""
Return a Boolean for `data` having the expected shape `exp_shape`.
"""
if isinstance(data, _np.ndarray):
if (
data.size == 0 and
any(extent == 0 for extent in exp_shape)
):
return True
if data.ndim != len(exp_shape):
return False
for extent_i, extent in enumerate(exp_shape):
if extent is None:
continue
if data.shape[extent_i] != extent:
return False
return True
if len(exp_shape) != 1:
raise NagException( # pragma: no cover
{'fun_name': '_check_shape_good'},
'inconceivable in shape check'
)
if exp_shape[0] is None:
return True
return len(data) == exp_shape[0]
def _check_shape_nonneg(locus, shape):
"""
Raises NagShapeError if an entry in `shape` was negative.
"""
for dim_i, dim_n in enumerate(shape):
if dim_n is None:
continue
if dim_n < 0:
raise NagShapeError(
locus,
(
'expected extent in dimension ' + str(dim_i + 1) +
' was ' + str(dim_n) + ' but must not be negative. '
'Check that all supplied scalar integer arguments '
'are correct.'
)
)
class _EngineArrayType(_EngineType):
"""
Base class for array NAG Engine data.
Extends __init__.
"""
# pylint: disable=too-few-public-methods,too-many-instance-attributes
__slots__ = [
'exp_shape', 'check_contents',
'rank', 'shape', 'size', 'sorder',
]
containers = tuple()
array_typecodes = tuple()
_underlying_scalar = lambda *args: None # pragma: no cover
c_type = lambda *args: None # pragma: no cover
_null_ctypes = None
_force_ctypes_cast = False
_dtypes_equiv = staticmethod(
lambda data_dtype, np_dtype: data_dtype == np_dtype
)
_allowed_np_dtypes = (None,)
def __init__(
self,
data,
locus,
exp_shape,
mutual_sorder=None,
check_contents=True,
allow_none=False,
):
"""
Store the expected shape tuple.
Register whether to iterate over the payload contents in order to
deep-check scalar contents for compatibility.
Raises NagTypeError if the payload is not a compatible container
for array Engine data.
Raises NagTypeError if the container contents are not type compatible
for the Engine array type.
Raises NagShapeError if the payload does not have the expected shape.
"""
# pylint: disable=too-many-arguments
_EngineType.__init__(self, data, locus, allow_none)
self.exp_shape = exp_shape
self.check_contents = (
False if self.allow_none and data is None
else check_contents
)
self._check_container_compat(
self.data, self.locus, len(self.exp_shape), self.allow_none,
)
self._check_shape()
(
self.rank, self.shape, self.size,
) = self._get_rank_shape_and_size(
self.data, len(self.exp_shape), allow_none,
)
self._check_contents_type_compat()
self.sorder = mutual_sorder if self.rank > 1 else None
@classmethod
def empty_ctypes(
cls,
size,
chrlen=None, # pylint: disable=unused-argument
):
"""
Return an empty ctypes array with the same data type as the payload.
"""
if size <= 0:
return cls._null_ctypes
new_arr_cls = cls.c_type._type_*size
globals()[new_arr_cls.__name__] = new_arr_cls
return new_arr_cls()
@staticmethod
def _make_shape_nonneg(shape):
"""
Make the input shape tuple have non-negative elements.
"""
return [max(shape_el, 0) for shape_el in shape]
@classmethod
def empty_ndarray(
cls,
shape,
sorder=_EngineData.default_sorder,
chrlen=None, # pylint: disable=unused-argument
):
"""
Return an empty ndarray with the same data type as the payload.
"""
return _np.empty(
cls._make_shape_nonneg(shape),
dtype=cls._underlying_scalar.numpy_type,
order=sorder
)
@classmethod
def from_pointer(
cls,
pointer,
shape,
**kwargs
):
"""
Return the input ctypes pointer type as an ndarray.
"""
np_buffer = _from_pointer_to_buffer(
pointer,
cls._make_shape_nonneg(shape),
cls._underlying_scalar.numpy_type,
cls._force_ctypes_cast,
chrlen=kwargs.get('chrlen'),
)
if len(shape) == 1:
return np_buffer
return np_buffer.reshape(
cls._make_shape_nonneg(shape),
order=kwargs.get('sorder')
)
@classmethod
def _check_and_get_element( # pylint: disable=too-many-branches
cls,
data,
locus,
element_tuple,
validate,
):
"""
Return element element_tuple from data as Python data.
Raises NagIndexError if element_tuple is out of range.
Raises exception from the scalar constructor if the element is invalid.
"""
el_tuple = (
element_tuple[0] if len(element_tuple) == 1
else element_tuple
)
if not validate:
data_el = data[el_tuple]
else:
try:
data_el = data[el_tuple]
index_ok = True
except IndexError:
index_ok = False
locus_el = ', '.join([str(i) for i in element_tuple])
if len(element_tuple) > 1:
locus_el = '[' + locus_el + ']'
if not index_ok:
raise NagIndexError(
locus,
'requested index value ' + locus_el + ' is out of range'
)
if isinstance(data, _np.ndarray):
cls._check_dtype(data, locus)
if cls is _EngineCharArrayType:
locus['element'] = locus_el
cls._underlying_scalar(data_el, locus, None)
elif (
cls is _EngineIntArrayType and
data.dtype != _np.int32
):
locus['element'] = locus_el
cls._underlying_scalar(data_el, locus)
elif isinstance(data, _array.array):
cls._check_typecode(data, locus)
if (
cls is _EngineIntArrayType and
_EngineState._int_model['bits'] == 32
):
locus['element'] = locus_el
cls._underlying_scalar(data_el, locus)
elif isinstance(data, _ctypes.Array):
cls._check_ctype(data, locus)
if (
cls is _EngineIntArrayType and
_EngineState._int_model['bits'] == 32 and
data._type_ is not _SYS_CTYPES_INTS[32]
):
locus['element'] = locus_el
cls._underlying_scalar(data_el, locus)
elif isinstance(data, list):
locus['element'] = locus_el
if cls is _EngineCharArrayType:
cls._underlying_scalar(data_el, locus, None)
else:
cls._underlying_scalar(data_el, locus)
else:
raise NagException( # pragma: no cover
locus,
'Inconceivable data instance',
)
if (
cls is _EngineCharArrayType and
isinstance(data_el, (bytes, _np.bytes_))
):
data_el = data_el.decode(encoding='ascii')
return (
data_el.value if (
isinstance(
data_el, (
cls._underlying_scalar.c_type,
_ctypes.c_int, _ctypes.c_long, _ctypes.c_longlong,
)
) and
not isinstance(
data_el, (
_EngineComplex64Structure, _EngineComplex128Structure,
)
)
)
else data_el
)
@classmethod
def get_element(
cls,
data,
locus,
element_tuple,
validate=True,
):
"""
Return element element_tuple from data.
"""
if validate:
cls._check_container_compat(
data, locus, exp_rank=len(element_tuple), allow_none=False,
)
cls._check_rank(
data, locus, exp_rank=len(element_tuple), allow_none=False,
)
return cls._check_and_get_element(
data, locus, element_tuple, validate,
)
@classmethod
def get_nd(
cls,
data,
locus,
exp_rank,
sorder,
dim_type,
allow_none=False,
):
"""
Return the lead/second/... dimension for data.
"""
# pylint: disable=too-many-arguments
cls._check_container_compat(data, locus, exp_rank, allow_none)
cls._check_rank(data, locus, exp_rank, allow_none)
if exp_rank < 2:
raise NagException( # pragma: no cover
{'fun_name': 'get_nd'},
'inconceivable dim request',
)
if allow_none and data is None:
return 0
if dim_type == 'l':
if sorder == 'F':
shape_index = 0
elif sorder == 'C':
shape_index = -1
else:
raise NagException( # pragma: no cover
{'fun_name': 'get_nd'},
'inconceivable sorder'
)
elif dim_type == 's':
if sorder == 'F':
shape_index = 1
elif sorder == 'C':
shape_index = -2
else:
raise NagException( # pragma: no cover
{'fun_name': 'get_nd'},
'inconceivable sorder'
)
else:
raise NagException( # pragma: no cover
{'fun_name': 'get_nd'},
'inconceivable dim_type'
)
return cls._get_rank_shape_and_size(
data, exp_rank, allow_none
)[1][shape_index]
def _get_max(
self,
start_i,
stop_i,
):
"""
Return the max of the (1d) payload over the requested range.
"""
if (
not isinstance(self.data, list) or
not isinstance(
self.data[start_i], (
self._underlying_scalar.c_type,
_ctypes.c_int, _ctypes.c_long, _ctypes.c_longlong,
)
)
):
return max(self.data[start_i:stop_i + 1])
return max(
data_el.value for data_el in self.data[start_i:stop_i + 1]
)
def _get_product(
self,
start_i,
stop_i,
):
"""
Return the product of the (1d) payload over the requested range.
"""
product_val = 1
if (
not isinstance(self.data, list) or
not isinstance(
self.data[start_i], (
self._underlying_scalar.c_type,
_ctypes.c_int, _ctypes.c_long, _ctypes.c_longlong,
)
)
):
for data_el in self.data[start_i:stop_i + 1]:
product_val *= data_el
else:
for data_el in self.data[start_i:stop_i + 1]:
product_val *= data_el.value
return product_val
@classmethod
def get_shape(
cls,
data,
locus,
exp_rank,
allow_none=False,
validate=True,
):
"""
Return the shape tuple for data.
"""
# pylint: disable=too-many-arguments
if validate:
cls._check_container_compat(data, locus, exp_rank, allow_none)
cls._check_rank(data, locus, exp_rank, allow_none)
return cls._get_rank_shape_and_size(data, exp_rank, allow_none)[1]
def _get_sum(
self,
start_i,
stop_i,
):
"""
Return the sum of the (1d) payload over the requested range.
"""
if (
not isinstance(self.data, list) or
not isinstance(
self.data[start_i], (
self._underlying_scalar.c_type,
_ctypes.c_int, _ctypes.c_long, _ctypes.c_longlong,
)
)
):
return sum(self.data[start_i:stop_i + 1])
return sum(
data_el.value for data_el in self.data[start_i:stop_i + 1]
)
def _is_monotonic(
self,
start_i,
stop_i,
direction,
):
"""
Return a Boolean for when the (1d) payload is monotonic over the
requested range in the requested direction.
"""
incr = True
decr = True
take_val = (
isinstance(self.data, list) and
isinstance(
self.data[start_i], (
self._underlying_scalar.c_type,
_ctypes.c_int, _ctypes.c_long, _ctypes.c_longlong,
)
)
)
data_el = lambda i: self.data[i].value if take_val else self.data[i]
for i in range(start_i, stop_i):
if incr:
incr = incr and data_el(i) <= data_el(i+1)
if decr:
decr = decr and data_el(i) >= data_el(i+1)
if direction == 'either' and not incr and not decr:
return False
if direction == 'increasing' and not incr:
return False
if direction == 'decreasing' and not decr:
return False
return True
def _is_nonnegative(
self,
start_i,
stop_i,
):
"""
Return a Boolean for when the (1d) payload is non-negative over the
requested range.
"""
take_val = (
isinstance(self.data, list) and
isinstance(
self.data[start_i], (
self._underlying_scalar.c_type,
_ctypes.c_int, _ctypes.c_long, _ctypes.c_longlong,
)
)
)
data_el = lambda i: self.data[i].value if take_val else self.data[i]
for i in range(start_i, stop_i):
if data_el(i) < 0:
return False
return True
def to_ctypes(
self,
**kwargs
):
"""
Return the underlying C-compatible cast of the payload.
"""
if self.allow_none and self.data is None:
return None
if isinstance(self.data, _np.ndarray):
return self._to_ctypes_from_ndarray(**kwargs)
if isinstance(self.data, _array.array):
return self._to_ctypes_from_array(**kwargs)
if isinstance(self.data, _ctypes.Array):
return self._to_ctypes_from_ctypes(**kwargs)
if isinstance(self.data, list):
return self._to_ctypes_from_list(**kwargs)
raise NagException( # pragma: no cover
self.locus,
'inconceivable container'
)
def _to_ctypes_from_array(
self,
**_kwargs
):
"""
Return the underlying C-compatible cast of the payload array.array.
"""
return _ctypes.cast(
self.data.buffer_info()[0],
self.c_type
)
def _to_ctypes_from_ctypes(
self,
**_kwargs
):
"""
Return the underlying C-compatible cast of the payload ctypes.Array.
"""
return self.data
def _to_ctypes_from_list(
self,
**kwargs
):
"""
Return the underlying C-compatible cast of the payload list.
"""
if kwargs.get('can_reference_contents', True):
return (self.c_type._type_*self.size)(*self.data)
return self.empty_ctypes(self.size)
def _to_ctypes_from_ndarray(
self,
**_kwargs
):
"""
Return the underlying C-compatible cast of the payload numpy.ndarray.
"""
return self.data.ctypes.data_as(self.c_type)
@classmethod
def _check_container_compat(
cls,
data,
locus,
exp_rank,
allow_none,
):
"""
Raises NagTypeError if data is not an instance of any valid
container.
"""
if allow_none and data is None:
return
if (
exp_rank <= 1 and
not isinstance(data, cls.containers)
):
raise NagTypeError(
locus,
_bad_instance_msg(cls._containers_to_strlist(), allow_none)
)
if (
exp_rank > 1 and
not isinstance(data, _np.ndarray)
):
raise NagTypeError(
locus,
'multidimensional array data ' +
_bad_instance_msg(['numpy.ndarray'], allow_none)
)
if isinstance(data, _np.ndarray) and _IS_DISCONTIG(data):
raise NagTypeError(
locus,
'numpy.ndarray data ' + _CONTIG_ERR_STR,
)
def _check_contents_type_compat(
self,
):
"""
Raises NagTypeError if the contents of the payload container are
not compatible types for this array class.
"""
if self.allow_none and self.data is None:
return
if isinstance(self.data, _np.ndarray):
self._check_contents_ndarray()
elif isinstance(self.data, _array.array):
self._check_contents_array()
elif isinstance(self.data, _ctypes.Array):
self._check_contents_ctypes()
elif isinstance(self.data, list):
self._check_contents_list()
else:
raise NagException( # pragma: no cover
self.locus,
'inconceivable container ' + type(self.data).__name__
)
def _check_contents_1dim(
self,
):
"""
Raises NagTypeError if the contents of the rank-1 payload fail
their respective type checks.
"""
for arg_i, arg_el in enumerate(self.data):
self.locus['element'] = arg_i
self._underlying_scalar(arg_el, self.locus)
def _check_contents_anydim(
self,
):
"""
Raises NagTypeError if the contents of the payload fail
their respective type checks.
"""
if (
isinstance(self.data, _np.ndarray) and
self.data.ndim > 1
):
self._check_contents_multidim()
else:
self._check_contents_1dim()
def _check_contents_multidim( # pylint: disable=no-self-use
self,
):
"""
Raises NagTypeError if the contents of the multidimensional payload
fail their respective type checks.
"""
raise NagException( # pragma: no cover
{'fun_name': '_check_contents_multidim'},
'not yet implemented',
)
@classmethod
def _check_typecode(
cls,
data,
locus,
):
"""
Raises NagTypeError if the contents of the data array.array are
not compatible types for this array class.
"""
if data.typecode not in cls.array_typecodes:
raise NagTypeError(
locus,
'typecode for array.array entity was ' +
str(data.typecode) + ' but must be ' +
_join_sequence(
['"' + a_t + '"' for a_t in cls.array_typecodes]
)
)
def _check_contents_array(
self,
):
"""
Raises NagTypeError if the contents of the payload array.array are
not compatible types for this array class.
"""
self._check_typecode(self.data, self.locus)
@classmethod
def _check_ctype(
cls,
data,
locus,
):
"""
Raises NagTypeError if the contents of the data ctypes.Array are
not compatible types for this array class.
"""
allowed_ctypes = cls._underlying_scalar.type_map[
_EngineType._module_key_ctypes
]
if data._type_ not in allowed_ctypes:
raise NagTypeError(
locus,
'_type_ for ctypes.Array entity must be ' +
_join_sequence(
[
(
ctype.__name__ if ctype.__name__ == 'Handle'
else '.'.join(['ctypes', ctype.__name__])
)
for ctype in allowed_ctypes
]
)
)
def _check_contents_ctypes(
self,
):
"""
Raises NagTypeError if the contents of the payload ctypes.Array are
not compatible types for this array class.
"""
self._check_ctype(self.data, self.locus)
def _check_contents_list(
self,
):
"""
Raises NagTypeError if the contents of the payload list are
not compatible types for this array class.
"""
if not self.check_contents:
return
self._check_contents_1dim()
if self.size <= 1:
return
for arg_el in self.data[1:]:
if not isinstance(arg_el, type(self.data[0])):
raise NagTypeError(
self.locus,
'must contain elements all of the same instance'
)
def _check_contents_ndarray(
self,
):
"""
Raises NagTypeError if the contents of the payload numpy.ndarray are
not compatible types for this array class.
"""
self._check_dtype(self.data, self.locus)
@classmethod
def _check_dtype(
cls,
data,
locus,
allowed_dtypes=None,
):
"""
Raises NagTypeError if data is not one of the allowed numpy dtypes for
this array class.
"""
a_dtypes = (
cls._allowed_np_dtypes if allowed_dtypes is None
else allowed_dtypes
)
for np_dtype in a_dtypes:
if cls._dtypes_equiv(data.dtype, np_dtype):
return
raise NagTypeError(
locus,
'dtype for numpy.ndarray entity must be an instance of ' +
_join_sequence(
['.'.join(['numpy', ndtype.__name__]) for ndtype in a_dtypes]
)
)
@classmethod
def _check_rank(
cls,
data,
locus,
exp_rank,
allow_none,
):
"""
Check the rank of data.
Raises NagShapeError if the payload container does not have the
expected rank.
"""
if allow_none and data is None:
return
rank, _, _ = cls._get_rank_shape_and_size(data, exp_rank, allow_none)
if rank != exp_rank:
raise NagShapeError(
locus,
_bad_rank_msg(exp_rank, rank)
)
def _check_shape(
self,
):
"""
Check the payload's shape.
Raises NagShapeError if the payload container does not have the
expected shape.
Raises NagShapeError if any element of the expected shape was negative.
"""
if self.allow_none and self.data is None:
return
if _check_shape_good(self.data, self.exp_shape):
return
_check_shape_nonneg(self.locus, self.exp_shape)
raise NagShapeError(
self.locus,
_bad_shape_msg(
self.exp_shape,
(
self.data.shape if isinstance(self.data, _np.ndarray)
else (len(self.data),)
),
),
)
@classmethod
def _containers_to_strlist(
cls,
):
"""
Return a list of sorted qualified type names from the containers tuple.
"""
type_names = []
for container in cls.containers:
full_name = []
if container is _array.array:
full_name.append('array')
elif container is _ctypes.Array:
full_name.append('ctypes')
elif container is _np.ndarray:
full_name.append('numpy')
elif container is list:
pass
else:
raise NagException( # pragma: no cover
{'fun_name': '_containers_to_strlist'},
'inconceivable container'
)
full_name.append(container.__name__)
type_names.append('.'.join(full_name))
return sorted(type_names)
@staticmethod
def _get_rank_shape_and_size(
data,
exp_rank,
allow_none,
):
"""
Return the rank, shape and size of data.
"""
if allow_none and data is None:
return exp_rank, tuple([0]*exp_rank), 0
if isinstance(data, _np.ndarray):
return data.ndim, data.shape, data.size
if isinstance(data, (_array.array, _ctypes.Array, list)):
return 1, (len(data),), len(data)
raise NagException( # pragma: no cover
{'fun_name': '_get_rank_shape_and_size'},
'inconceivable container',
)
@classmethod
def refresh_ctypes_buffer(
cls,
_data,
_locus,
ctypes_buffer,
chrlen=None # pylint: disable=unused-argument
): # pylint: disable=too-many-arguments
"""
Refresh ctypes_buffer with the contents of data.
This is the version where no back-conversion is necessary, e.g. for
float64.
"""
def _refresh_py_list(
self,
data,
):
"""
Fill the payload list with ctypes `data`.
"""
self.data[:] = self._to_py_list(data)
def _refresh_py_ndarray_copy(
self,
data,
):
"""
Fill the payload ndarray with ctypes `data`.
This is the copy-back version.
"""
self.data[:] = self._to_py_ndarray(
data, self.data.shape,
sorder=self.sorder,
chrlen=getattr(self, 'chrlen', None),
)
def _refresh_py_noop( # pylint: disable=no-self-use
self,
data, # pylint: disable=unused-argument
):
"""
Fill the payload with ctypes `data`.
This is the no-op version assuming that the memory buffer has
been updated directly (and no copying was required).
"""
return
_refresh_py_ndarray = _refresh_py_noop
_refresh_py_array = _refresh_py_noop
_refresh_py_ctarray = _refresh_py_noop
def refresh_py(
self,
data,
):
"""
Fill the payload with ctypes `data`.
"""
if self.allow_none and self.data is None:
return
if isinstance(self.data, list):
self._refresh_py_list(data)
elif isinstance(self.data, _np.ndarray):
self._refresh_py_ndarray(data)
elif isinstance(self.data, _array.array):
self._refresh_py_array(data)
elif isinstance(self.data, _ctypes.Array):
self._refresh_py_ctarray(data)
else:
raise NagException( # pragma: no cover
{'fun_name': 'refresh_py'},
'inconceivable container'
)
def _to_py_list(
self,
data,
):
"""
Return ctypes `data` reshaped into a list.
"""
if not self.data:
return []
cast_type = (
self._underlying_scalar.to_py if self.data[0] is None
else type(self.data[0])
)
return [cast_type(data[i]) for i in range(self.size)]
_to_py_ndarray = from_pointer
class _NullEngineCharArray(_ctypes.Array):
"""
Null array.
"""
# pylint: disable=too-few-public-methods
_length_ = 0
_type_ = _EngineCharScalarType.c_type._type_
def _byteorder_stripped_dtype(
data,
):
"""
Return a stripped copy of the dtype of numpy `data` with
byte-order markers removed.
"""
return data.dtype.str.lstrip('|<>=')
class _EngineCharArrayType(_EngineArrayType):
"""
Concrete class for character array NAG Engine data.
Extends __init__.
Extends _check_contents_ndarray.
Extends _check_contents_type_compat.
Overrides _check_contents_1dim.
Overrides empty_ctypes.
Overrides empty_ndarray.
Overrides from_pointer.
Overrides refresh_ctypes_buffer.
Overrides _refresh_py_ndarray.
Overrides _to_ctypes_from_list.
Overrides _to_ctypes_from_ndarray.
Overrides _to_py_list.
Overrides _to_py_ndarray.
Supplies _check_contents_multidim.
"""
# pylint: disable=too-few-public-methods
__slots__ = ['chrlen', 'flat_seq']
containers = (list, _np.ndarray)
_underlying_scalar = _EngineCharScalarType
c_type = _underlying_scalar.c_type
_null_ctypes = _NullEngineCharArray()
_force_ctypes_cast = True
_dtypes_equiv = staticmethod(_np.issubdtype)
_allowed_np_dtypes = _underlying_scalar.type_map[
_EngineType._module_key_numpy
]
_numpy_kind = 'U'
def __init__(
self,
data,
locus,
exp_chrlen,
exp_shape,
mutual_sorder=None,
check_contents=True,
allow_none=False,
):
"""
Raises NagShapeError from the super __init__ if the payload data
is not the correct shape.
Raises NagTypeError from the super __init__ if the payload data
is not the correct type.
Raises NagValueError if any payload element exceeds a required length.
"""
# pylint: disable=too-many-arguments
self.chrlen = exp_chrlen
self.flat_seq = None
_EngineArrayType.__init__(
self,
data,
locus,
exp_shape,
mutual_sorder,
check_contents,
allow_none,
)
def _check_contents_1dim(
self,
):
"""
Raises NagTypeError if the contents of the rank-1 payload fail
their respective type checks.
"""
for arg_i, arg_el in enumerate(self.data):
self.locus['element'] = arg_i
self._underlying_scalar(
arg_el,
self.locus,
self.chrlen,
)
@staticmethod
def _check_contents_multidim_direct(
data,
locus,
chrlen,
):
"""
Raises NagTypeError if the contents of data fail
their respective type checks.
"""
if data.size == 0:
return
data_it = _np.nditer(data, flags=['multi_index'])
while not data_it.finished:
locus['element'] = ', '.join([str(i) for i in data_it.multi_index])
if data.ndim > 1:
locus['element'] = '[' + locus['element'] + ']'
_EngineCharScalarType(
data[data_it.multi_index],
locus,
chrlen,
)
data_it.iternext()
def _check_contents_multidim(
self,
):
"""
Raises NagTypeError if the contents of the multidimensional payload
fail their respective type checks.
"""
if self.size == 0:
return
self._check_contents_multidim_direct(
self.data, self.locus, self.chrlen
)
def _check_contents_type_compat(
self,
):
"""
Set or compute chrlen before checking.
"""
self.flat_seq = self._set_flat_seq(self.data, self.allow_none)
self._infer_chrlen()
_EngineArrayType._check_contents_type_compat(self)
@classmethod
def empty_ctypes(
cls,
size,
chrlen=None,
):
"""
Return an empty string buffer.
"""
if (
size <= 0 or
chrlen is None or
chrlen <= 0
):
return cls._null_ctypes
new_arr_cls = _ctypes.c_char*(size*chrlen)
globals()[new_arr_cls.__name__] = new_arr_cls
return new_arr_cls()
@classmethod
def empty_ndarray(
cls,
shape,
sorder=_EngineData.default_sorder,
chrlen=None,
):
"""
Return an empty ndarray with the same data type as the payload.
"""
return _np.empty(
cls._make_shape_nonneg(shape),
dtype=(
cls._numpy_kind +
('0' if chrlen is None else str(max(chrlen, 0)))
),
order=sorder,
)
@classmethod
def from_pointer(
cls,
pointer,
shape,
**kwargs
):
"""
Return the input ctypes pointer type as an ndarray of byte chars.
"""
chrlen = kwargs.get('chrlen')
_np_buffer = _from_pointer_to_buffer(
pointer,
cls._make_shape_nonneg(shape),
'S' + str(chrlen),
cls._force_ctypes_cast,
chrlen=chrlen,
)
np_buffer = _np_buffer
if kwargs.get('can_reference_contents', True):
for buff_i, buff_el in enumerate(np_buffer):
np_buffer[buff_i] = buff_el.rstrip()
else:
np_buffer[:] = [b'\0']*len(np_buffer)
if len(shape) == 1:
return np_buffer
return np_buffer.reshape(
cls._make_shape_nonneg(shape),
order=kwargs.get('sorder')
)
def _infer_chrlen(self):
"""
Compute maximal chrlen from the payload data if not already known.
"""
if self.chrlen is not None:
return
if isinstance(self.flat_seq, _np.ndarray):
self.chrlen = int(self.flat_seq.dtype.str[2:])
return
if isinstance(self.flat_seq, list):
self.chrlen = (
0 if not self.flat_seq
else max([
len(c) for c in self.flat_seq
if isinstance(c, _collections.abc.Sized)
])
)
return
raise NagException( # pragma: no cover
{'fun_name': '_infer_chrlen'},
'inconceivable chrlen inference'
)
@classmethod
def refresh_ctypes_buffer(
cls,
data,
locus,
ctypes_buffer,
chrlen=None
):
"""
This version back-casts to short-enough ASCII correctly.
"""
# pylint: disable=too-many-arguments
cls._check_contents_multidim_direct(data, locus, chrlen)
_ctypes_buffer = cls._to_ctypes_from_ndarray_data(
data, chrlen, allow_none=False,
)
for i in range(data.size*chrlen):
ctypes_buffer[i] = _ctypes_buffer[i]
_refresh_py_ndarray = _EngineArrayType._refresh_py_ndarray_copy
@staticmethod
def _set_flat_seq(data, allow_none):
"""
Return data as a flat ASCII sequence.
"""
if allow_none and data is None:
return []
if isinstance(data, _np.ndarray):
flat_data = data if data.ndim == 1 else data.flatten(order='K')
elif isinstance(data, list):
flat_data = data
else:
raise NagException( # pragma: no cover
{},
'inconceivable instance for _set_flat_seq'
)
if (
len(flat_data) and # pylint: disable=len-as-condition
isinstance(flat_data[0], (str, _np.str_))
):
strs_to_cast = [
c.encode('ascii') for c in flat_data
if (
isinstance(c, (str, _np.str_)) and
_is_ascii(c, {})
)
]
else:
strs_to_cast = flat_data
return (
strs_to_cast if isinstance(data, list) else
_np.array(
strs_to_cast, dtype='S' + _byteorder_stripped_dtype(data)[1:]
)
)
def _to_ctypes_from_list(
self,
**kwargs
):
"""
Pad and join the payload to be contiguous.
"""
if kwargs.get('can_reference_contents', True):
casted = self._pad_and_join_to_string_seq(
self.flat_seq, self.chrlen
)
return _ctypes.create_string_buffer(casted, size=len(casted))
return self.empty_ctypes(self.size, chrlen=self.chrlen)
@classmethod
def _to_ctypes_from_ndarray_data(
cls,
data,
chrlen,
allow_none,
):
"""
Direct creation of a string buffer for the data.
"""
casted = cls._pad_and_join_to_string_seq(
cls._set_flat_seq(data, allow_none), chrlen,
)
return _ctypes.create_string_buffer(casted, size=len(casted))
def _to_ctypes_from_ndarray(
self,
**kwargs
):
"""
Return a contiguous string with padding.
If the payload is shorter than a non-None chrlen then it is blank
padded on the right to this length.
"""
if kwargs.get('can_reference_contents', True):
return self._to_ctypes_from_ndarray_data(
self.data, self.chrlen, self.allow_none,
)
return self.empty_ctypes(self.size, chrlen=self.chrlen)
@classmethod
def _to_bytes_list_from_ctypes(
cls,
data,
size,
chrlen=None,
):
"""
Return ctypes `data` reshaped into a list.
"""
return [
data[(i * chrlen):((i + 1) * chrlen)].rstrip()
for i in range(size)
]
def _to_py_list(
self,
data,
):
"""
Return ctypes `data` reshaped into a list.
"""
bytes_list = self._to_bytes_list_from_ctypes(
data, self.size, chrlen=self.chrlen
)
if (
not bytes_list or
isinstance(self.data[0], bytes)
):
return bytes_list
return [c.decode(encoding='ascii') for c in bytes_list]
def _to_py_ndarray(
self,
data,
shape,
**kwargs
):
bytes_array = self.from_pointer(data, shape, **kwargs)
data_char_class = _byteorder_stripped_dtype(self.data)
return (
bytes_array if data_char_class[0] == 'S' else
_np.array(bytes_array, dtype=data_char_class)
)
def _check_contents_ndarray(
self,
):
"""
Extend the super method to also check for ASCII and short-enough input.
"""
_EngineArrayType._check_contents_ndarray(self)
if not self.check_contents:
return
self._check_contents_anydim()
@staticmethod
def _pad_and_join_to_string_seq(
strs_to_cast,
chrlen,
):
"""
Return a contiguous string with padding.
If the payload is shorter than a non-None chrlen then it is blank
padded on the right to this length.
"""
return b''.join(
[c_str.ljust(chrlen)[:chrlen] for c_str in strs_to_cast]
)
class _EngineComplexArrayType(_EngineArrayType):
"""
Abstract class for complex array NAG Engine data.
Overrides _to_ctypes_from_list.
Overrides _to_py_list.
"""
__slots__ = []
# Complex ctype arrays become void numpy dtypes when using as_array, so
# are useless in that form.
_force_ctypes_cast = True
def _to_ctypes_from_list(
self,
**kwargs
):
"""
Return a ctypes Array of Complex Structures.
"""
if kwargs.get('can_reference_contents', True):
return (self.c_type._type_*self.size)(
*[(z.real, z.imag) for z in self.data]
)
return self.empty_ctypes(self.size)
def _to_py_list(
self,
data,
):
"""
Return ctypes `data` reshaped into a list.
"""
if not self.data:
return []
return [
self._underlying_scalar.to_py(data[i]) if self.data[0] is None
else type(self.data[0])(self._underlying_scalar.to_py(data[i]))
for i in range(self.size)
]
class _NullEngineComplex64Array(_ctypes.Array):
"""
Null array.
"""
# pylint: disable=too-few-public-methods
_length_ = 0
_type_ = _EngineComplex64ScalarType.c_type
class _EngineComplex64ArrayType(_EngineComplexArrayType, _EngineArrayType):
"""
Concrete class for single-precision complex array NAG Engine data.
"""
# pylint: disable=too-few-public-methods
__slots__ = []
containers = (list, _np.ndarray)
_underlying_scalar = _EngineComplex64ScalarType
c_type = _ctypes.POINTER(_underlying_scalar.c_type)
_null_ctypes = _NullEngineComplex64Array()
_allowed_np_dtypes = _underlying_scalar.type_map[
_EngineType._module_key_numpy
]
class _NullEngineComplex128Array(_ctypes.Array):
"""
Null array.
"""
# pylint: disable=too-few-public-methods
_length_ = 0
_type_ = _EngineComplex128ScalarType.c_type
class _EngineComplex128ArrayType(_EngineComplexArrayType, _EngineArrayType):
"""
Concrete class for double-precision complex array NAG Engine data.
"""
# pylint: disable=too-few-public-methods
__slots__ = []
containers = (list, _np.ndarray)
_underlying_scalar = _EngineComplex128ScalarType
c_type = _ctypes.POINTER(_underlying_scalar.c_type)
_null_ctypes = _NullEngineComplex128Array()
_allowed_np_dtypes = _underlying_scalar.type_map[
_EngineType._module_key_numpy
]
class _NullEngineCptrArray(_ctypes.Array):
"""
Null array.
"""
# pylint: disable=too-few-public-methods
_length_ = 0
_type_ = _EngineCptrScalarType.c_type
class _EngineCptrArrayType(_EngineArrayType):
"""
Concrete class for void* array NAG Engine data.
Overrides empty_ctypes.
Overrides empty_ndarray.
Overrides from_pointer.
Overrides refresh_ctypes_buffer.
Overrides refresh_py.
"""
# pylint: disable=too-few-public-methods
__slots__ = []
containers = (_ctypes.Array, list)
_underlying_scalar = _EngineCptrScalarType
c_type = _ctypes.POINTER(_underlying_scalar.c_type)
_null_ctypes = _NullEngineCptrArray()
@classmethod
def empty_ndarray(
cls,
shape,
sorder=_EngineData.default_sorder,
chrlen=None,
):
raise NotImplementedError
@classmethod
def from_pointer(
cls,
pointer,
shape,
**kwargs
):
raise NotImplementedError
@classmethod
def refresh_ctypes_buffer(
cls,
data,
locus,
ctypes_buffer,
chrlen=None,
):
raise NotImplementedError
def refresh_py(
self,
data,
):
raise NotImplementedError
class _EngineFloatArrayType(_EngineArrayType):
"""
Abstract class for float array NAG Engine data.
Overrides _check_contents_1dim.
"""
__slots__ = []
def _check_contents_1dim(
self,
):
"""
Promote promotable ints.
"""
for arg_i, arg_el in enumerate(self.data):
self.locus['element'] = arg_i
self._underlying_scalar(arg_el, self.locus)
if (
isinstance(self.data, list) and
_is_promotable_int(arg_el)
):
self.data[arg_i] = self._underlying_scalar.numpy_type(
self.data[arg_i]
)
class _NullEngineFloat32Array(_ctypes.Array):
"""
Null array.
"""
# pylint: disable=too-few-public-methods
_length_ = 0
_type_ = _EngineFloat32ScalarType.c_type
class _EngineFloat32ArrayType(_EngineFloatArrayType, _EngineArrayType):
"""
Concrete class for 32-bit float array NAG Engine data.
"""
# pylint: disable=too-few-public-methods
__slots__ = []
containers = (_array.array, _ctypes.Array, list, _np.ndarray)
array_typecodes = ('f',)
_underlying_scalar = _EngineFloat32ScalarType
c_type = _ctypes.POINTER(_underlying_scalar.c_type)
_null_ctypes = _NullEngineFloat32Array()
_allowed_np_dtypes = (_underlying_scalar.numpy_type,)
class _NullEngineFloat64Array(_ctypes.Array):
"""
Null array.
"""
# pylint: disable=too-few-public-methods
_length_ = 0
_type_ = _EngineFloat64ScalarType.c_type
class _EngineFloat64ArrayType(_EngineFloatArrayType, _EngineArrayType):
"""
Concrete class for 64-bit float array NAG Engine data.
"""
# pylint: disable=too-few-public-methods
__slots__ = []
containers = (_array.array, _ctypes.Array, list, _np.ndarray)
array_typecodes = ('d',)
_underlying_scalar = _EngineFloat64ScalarType
c_type = _ctypes.POINTER(_underlying_scalar.c_type)
_null_ctypes = _NullEngineFloat64Array()
_allowed_np_dtypes = (_underlying_scalar.numpy_type,)
get_max = _EngineArrayType._get_max
get_product = _EngineArrayType._get_product
get_sum = _EngineArrayType._get_sum
is_monotonic = _EngineArrayType._is_monotonic
is_nonnegative = _EngineArrayType._is_nonnegative
class _NullEngineIntArray(_ctypes.Array):
"""
Null array.
"""
# pylint: disable=too-few-public-methods
_length_ = 0
_type_ = _EngineIntScalarType.c_type
class _EngineIntArrayType(_EngineArrayType):
"""
Concrete class for integer array NAG Engine data.
This could be 32- or 64-bits depending on the underlying Engine.
Overrides _to_ctypes_from_list.
Extends _check_contents_array.
Extends _check_contents_ctypes.
Extends _check_contents_ndarray.
Extends _refresh_py_array.
Extends _refresh_py_ctarray.
Extends _refresh_py_ndarray.
Extends _to_ctypes_from_array.
Extends _to_ctypes_from_ctypes.
Extends _to_ctypes_from_ndarray.
Supplies _check_contents_multidim (int32 only).
"""
# pylint: disable=too-few-public-methods
__slots__ = []
containers = (_array.array, _ctypes.Array, list, _np.ndarray)
array_typecodes = ('l', 'q')
_underlying_scalar = _EngineIntScalarType
c_type = _ctypes.POINTER(_underlying_scalar.c_type)
_null_ctypes = _NullEngineIntArray()
_allowed_np_dtypes = _underlying_scalar.type_map[
_EngineType._module_key_numpy
]
get_max = _EngineArrayType._get_max
get_product = _EngineArrayType._get_product
get_sum = _EngineArrayType._get_sum
is_monotonic = _EngineArrayType._is_monotonic
is_nonnegative = _EngineArrayType._is_nonnegative
if _EngineState._int_model['bits'] == 32:
def _to_ctypes_from_array(
self,
**kwargs
):
"""
Override the super method to take care to downcast 64-bit integer
input.
"""
if (
self.data.typecode == 'q' or
(
self.data.typecode == 'l' and
not _IS_IL32
)
):
if kwargs.get('can_reference_contents', True):
data_32 = _np.array(
self.data, dtype=_EngineState._int_model['numpy']
)
return data_32.ctypes.data_as(self.c_type)
return self.empty_ctypes(self.size)
return _EngineArrayType._to_ctypes_from_array(self, **kwargs)
def _refresh_py_array(
self,
data,
):
"""
Fill the payload array.array with ctypes `data`.
This is the mixed copy-back or no-op version.
"""
if (
self.data.typecode == 'q' or
(
self.data.typecode == 'l' and
not _IS_IL32
)
):
for i in range(self.size):
self.data[i] = data[i]
return
_EngineArrayType._refresh_py_array(self, data)
def _refresh_py_ctarray(
self,
data,
):
"""
Fill the payload ctypes.Array with ctypes `data`.
This is the mixed copy-back or no-op version.
"""
if data._type_ is not self.data._type_:
self.data[:] = (self.data._type_*self.size)(*data)
return
_EngineArrayType._refresh_py_ctarray(self, data)
def _refresh_py_ndarray(
self,
data,
):
"""
Fill the payload ndarray with ctypes `data`.
This is the mixed copy-back or no-op version.
"""
refresh_fun = (
self._refresh_py_ndarray_copy
if self._has_different_int_width(self.data)
else self._refresh_py_noop
)
return refresh_fun(data)
def _to_ctypes_from_ctypes(
self,
**kwargs
):
"""
Override the super method to take care to cast to Engine integer
input.
"""
if self.data._type_ is not self.c_type._type_:
return (self.c_type._type_*self.size)(*self.data)
return _EngineArrayType._to_ctypes_from_ctypes(self, **kwargs)
def _to_ctypes_from_list(
self,
**kwargs
):
"""
Return the underlying C-compatible cast of the payload list.
"""
if kwargs.get('can_reference_contents', True):
return (self.c_type._type_*self.size)(
*[
_EngineState._int_model['ctypes'](
x.value if isinstance(x, (
_ctypes.c_int, _ctypes.c_long, _ctypes.c_longlong
))
else x
)
for x in self.data
]
)
return self.empty_ctypes(self.size)
@staticmethod
def _has_different_int_width(
data,
):
"""
Return True when numpy int `data` is of a different type to
the Engine int.
"""
return (
(
_EngineState._int_model['bits'] == 32 and
data.dtype == _np.int64
) or
(
_EngineState._int_model['bits'] == 64 and
data.dtype == _np.int32
)
)
@classmethod
def _to_ctypes_from_flat_np_data(
cls,
data,
):
"""
Direct conversion of 1d data to Engine integer.
"""
return (cls.c_type._type_*len(data))(
*[
_EngineState._int_model['numpy'](x)
for x in data
]
)
def _to_ctypes_from_ndarray(
self,
**kwargs
):
"""
Override the super method to take care to cast to Engine integer
input.
"""
if self._has_different_int_width(self.data):
if kwargs.get('can_reference_contents', True):
return self._to_ctypes_from_flat_np_data(
self.data.flatten(order='K')
)
return self.empty_ctypes(self.size)
return _EngineArrayType._to_ctypes_from_ndarray(self, **kwargs)
if _EngineState._int_model['bits'] == 32:
def _check_contents_array(
self,
):
"""
Extend the super method to also check for in-range input.
"""
_EngineArrayType._check_contents_array(self)
if not self.check_contents:
return
self._check_contents_1dim()
def _check_contents_ctypes(
self,
):
"""
Extend the super method to also check for in-range input.
"""
_EngineArrayType._check_contents_ctypes(self)
if not self.check_contents:
return
if self.data._type_ is _SYS_CTYPES_INTS[32]:
return
self._check_contents_1dim()
def _check_contents_ndarray(
self,
):
"""
Extend the super method to also check for in-range input.
"""
_EngineArrayType._check_contents_ndarray(self)
if not self.check_contents:
return
if self.data.dtype == _np.int32:
return
self._check_contents_anydim()
def _check_contents_multidim(
self,
):
if self.size == 0:
return
data_it = _np.nditer(self.data, flags=['multi_index'])
while not data_it.finished:
self.locus['element'] = (
'[' +
', '.join(
[str(i) for i in data_it.multi_index]
) +
']'
)
self._underlying_scalar(
self.data[data_it.multi_index],
self.locus
)
data_it.iternext()
class _NullEngineLogicalArray(_ctypes.Array):
"""
Null array.
"""
# pylint: disable=too-few-public-methods
_length_ = 0
_type_ = _EngineLogicalScalarType.c_type
class _EngineLogicalArrayType(_EngineArrayType):
"""
Concrete class for logical array NAG Engine data.
Overrides from_pointer.
Overrides refresh_ctypes_buffer.
Overrides _refresh_py_ndarray.
Overrides _to_ctypes_from_list.
Overrides _to_ctypes_from_ndarray.
"""
# pylint: disable=too-few-public-methods
__slots__ = []
containers = (list, _np.ndarray)
_underlying_scalar = _EngineLogicalScalarType
c_type = _ctypes.POINTER(_underlying_scalar.c_type)
_null_ctypes = _NullEngineLogicalArray()
_force_ctypes_cast = True
_allowed_np_dtypes = _underlying_scalar.type_map[
_EngineType._module_key_numpy
]
@classmethod
def from_pointer(
cls,
pointer,
shape,
**kwargs
):
"""
Convert from int to bool.
"""
int_buffer = _from_pointer_to_buffer(
pointer,
cls._make_shape_nonneg(shape),
_EngineState._int_model['numpy'],
cls._force_ctypes_cast,
)
if kwargs.get('can_reference_contents', True):
bool_ndarray = _np.array(
[
cls._underlying_scalar._to_py_from_int(int_buffer[i])
for i in range(len(int_buffer))
],
dtype=cls._underlying_scalar.numpy_type
)
else:
bool_ndarray = _np.array(
[False]*len(int_buffer),
dtype=cls._underlying_scalar.numpy_type
)
if len(shape) == 1:
return bool_ndarray
return bool_ndarray.reshape(
cls._make_shape_nonneg(shape),
order=kwargs.get('sorder')
)
@classmethod
def refresh_ctypes_buffer(
cls,
data,
_locus,
ctypes_buffer,
chrlen=None
):
"""
This version back-casts to integer correctly.
"""
# pylint: disable=too-many-arguments
_ctypes_buffer = cls._to_ctypes_from_flat_data(data.flatten(order='K'))
for i in range(data.size):
ctypes_buffer[i] = _ctypes_buffer[i]
_refresh_py_ndarray = _EngineArrayType._refresh_py_ndarray_copy
def _to_ctypes_from_list(
self,
**kwargs
):
"""
Return the underlying C-compatible cast of the payload list.
"""
if kwargs.get('can_reference_contents', True):
return self._to_ctypes_from_flat_data(self.data)
return self.empty_ctypes(self.size)
@classmethod
def _to_ctypes_from_flat_data(
cls,
data,
):
"""
Direct conversion of 1d data to Engine integer.
"""
return (cls.c_type._type_*len(data))(
*[
_EngineState._from_bool_to_int(x)
for x in data
]
)
def _to_ctypes_from_ndarray(
self,
**kwargs
):
"""
Convert to Engine int and then return as a pointer.
"""
if kwargs.get('can_reference_contents', True):
return self._to_ctypes_from_flat_data(self.data.flatten(order='K'))
return self.empty_ctypes(self.size)
_to_py_ndarray = from_pointer
class _EngineErrbuf(_ctypes.Array):
"""
NAG Engine error-state string.
"""
# pylint: disable=too-few-public-methods
_length_ = 200
_type_ = _ctypes.c_char
class _EngineRoutineName(_ctypes.Array):
"""
NAG Engine short routine name using during e.g.
option listing.
"""
# pylint: disable=too-few-public-methods
_length_ = 6
_type_ = _ctypes.c_char
# ctypes function prototype for the Engine's explicitly C-interoperable
# print handler.
_C_PRINT_REC_FUNCTYPE = _EngineState._sys_functype(
None,
_BYREF_ENGINEDATA,
_BYREF_INT, # Input, nout.
_EngineCharArrayType.c_type, # Input, rec(len=1)[200].
_BYREF_INT, # Input, length_rec.
_BYREF_INT, # Output, ifail.
)
# ctypes args for the Engine's default print handler.
_PRINT_REC_ARGS = (
_BYREF_ENGINEDATA,
_BYREF_INT, # Input, nout.
_BYREF_STRING, # Input, rec(len=*).
_BYREF_INT, # Output, ifail.
_EngineState._chrlen_ctype, # Input, hidden chrlen of rec.
)
# ctypes function prototype for the Engine's default
# print handler.
_PRINT_REC_FUNCTYPE = _EngineState._sys_functype(
None,
*_PRINT_REC_ARGS
)
_UNUSED_PRINT_REC = _cast_callback(
_PRINT_REC_FUNCTYPE, None,
{'fun_name': _MODULE_NAME}
)
# ctypes function prototype for the Engine's default
# print handler helper.
_PRINT_RECH_FUNCTYPE = _EngineState._sys_functype(
None,
_PRINT_REC_FUNCTYPE,
*_PRINT_REC_ARGS
)
# ctypes args for the Engine's default read handler.
_READ_REC_ARGS = (
_BYREF_ENGINEDATA,
_BYREF_INT, # Input, nin.
_BYREF_STRING, # Output, recin(len=*).
_BYREF_STRING, # Output, errbuf(len=200).
_BYREF_INT, # Output, ifail.
_EngineState._chrlen_ctype, # Input, hidden chrlen of recin.
_EngineState._chrlen_ctype, # Input, hidden chrlen of errbuf.
)
# ctypes function prototype for the Engine's default
# read handler.
_READ_REC_FUNCTYPE = _EngineState._sys_functype(
None,
*_READ_REC_ARGS
)
_UNUSED_READ_REC = _cast_callback(
_READ_REC_FUNCTYPE, None,
{'fun_name': _MODULE_NAME}
)
# ctypes function prototype for the Engine's default
# read handler helper.
_READ_RECH_FUNCTYPE = _EngineState._sys_functype(
None,
_READ_REC_FUNCTYPE,
*_READ_REC_ARGS
)
# ctypes args for the Engine's default nonadvancing-read handler.
_NONADV_READ_REC_ARGS = (
_BYREF_ENGINEDATA,
_BYREF_INT, # Input, nin.
_BYREF_STRING, # Output, recin(len=*).
_BYREF_INT, # Output, readlen.
_BYREF_INT, # Output, inform.
_EngineState._chrlen_ctype, # Input, hidden chrlen of recin.
)
# ctypes function prototype for the Engine's default
# nonadvancing-read handler.
_NONADV_READ_REC_FUNCTYPE = _EngineState._sys_functype(
None,
*_NONADV_READ_REC_ARGS
)
_UNUSED_NONADV_READ_REC = _cast_callback(
_NONADV_READ_REC_FUNCTYPE, None,
{'fun_name': _MODULE_NAME}
)
# ctypes function prototype for the Engine's default
# nonadvancing-read handler helper.
_NONADV_READ_RECH_FUNCTYPE = _EngineState._sys_functype(
None,
_NONADV_READ_REC_FUNCTYPE,
*_NONADV_READ_REC_ARGS
)
# Dummy callback prototype and definition for unreferenced
# Engine callbacks.
_UNUSED_FUNCTYPE = _EngineState._sys_functype(None)
_UNUSED_NULL_CALLBACK = _cast_callback(
_UNUSED_FUNCTYPE, None,
{'fun_name': _MODULE_NAME}
)
def _unused_callback(): # pragma: no cover
return None
_UNUSED_NONNULL_CALLBACK = _cast_callback(
_UNUSED_FUNCTYPE, _unused_callback,
{'fun_name': _MODULE_NAME}
)
def _cast_to_unused_callback(user_cb):
return (
_UNUSED_NULL_CALLBACK if user_cb is None else
_UNUSED_NONNULL_CALLBACK
)
[docs]class FileObjManager(object):
# pylint: disable=line-too-long
"""
Class for translating between Python file objects and NAG Library Engine
I/O units.
NAG Engine I/O units are roughly synonymous with Unix file descriptors.
File objects may be registered with I/O units using the instance
initializer or the :meth:`register` method.
Convenience methods :meth:`register_to_errunit` and
:meth:`register_to_advunit` are supplied to associate file objects with the
Engine error unit (analogous to standard error, stderr) and the Engine
advisory unit (analogous to standard output, stdout), respectively.
A Python file object is considered valid for use with this class if it
has a ``fileno`` method returning its underlying file descriptor.
In particular, you may use any concrete subclass of ``io.IOBase``
(including the ``file`` attribute of a file object returned by
``tempfile.NamedTemporaryFile``).
Naturally, the object must also have ``read``/``readline`` and ``write``
methods.
Resource management (opening/closing/deleting/... files) remains the
reponsibility of the caller.
In particular, objects to be used for output streams must have been opened
for writing (mut. mut. for objects to be used for input) before their
associated units are used by the Engine.
To communicate I/O unit numbers to the NAG Engine the
:meth:`unit_from_fileobj` method must be used.
This maps a (registered) file object to a valid unit number.
Writes requested from the Engine to a unit number that does not have a
file object associated with it will fall back to being sent to
``sys.stdout``.
By default written data is prefixed with the name of the calling NAG
Python function. This behaviour can be disabled when constructing an
instance by using the initializer's `locus_in_output` argument.
Reads requested by the Engine from a unit number that does not have a
file object associated with it will cause **NagOSError** to be raised.
File objects may be removed from the class's registry of I/O units
by calling the :meth:`deregister` method.
The special nature of the Engine advisory and error units means that
these have persistent file-object associations; they are not
affected by a call to ``deregister``. The methods
:meth:`deregister_errunit` and :meth:`deregister_advunit` are available to
restore the associations with ``sys.stderr`` and ``sys.stdout``,
respectively.
A module-level instance of this class called ``GLOBAL_IO_MANAGER`` is
supplied for convenience. This is queried as the fallback I/O manager
whenever the Engine requires an I/O manager but one has not been
explicitly supplied to the calling Python function. It is constructed
using the initializer's default argument set: in particular, prefixing
of output data with NAG Python function names is enabled.
Attributes
----------
errunit : int
The value denoting the Engine error unit.
This attribute should be considered read only.
By default `errunit` is associated with ``sys.stderr``.
advunit : int
The value denoting the Engine advisory unit.
This attribute should be considered read only.
By default `advunit` is associated with ``sys.stdout``.
Parameters
----------
fileobjs : None or tuple, optional
If not ``None``, the file objects to register.
locus_in_output : bool, optional
Controls whether output managed by this instance is prefixed with
the name of the calling NAG Python function.
Raises
------
NagTypeError
`fileobjs` is neither ``None`` nor an instance of ``tuple``.
NagOSError
An element of the `fileobjs` ``tuple`` is not a valid file-object
instance.
Examples
--------
To send monitoring from a (fictional) optimization function to a
temporary file:
>>> import os
>>> import tempfile
>>> t_fd, t_path = tempfile.mkstemp()
>>> f1 = open(t_path, 'w')
>>> io_m = FileObjManager((f1,))
>>> io_m_unit = io_m.unit_from_fileobj(f1)
>>> OptimizerOptionSet("Monitoring File", io_m_unit) # doctest: +SKIP
...
>>> Optimizer(..., io_manager=io_m) # doctest: +SKIP
...
>>> f1.close()
>>> os.close(t_fd)
...
>>> os.unlink(t_path)
To display iteration statistics from another (fictional) optimization
function, in which the statistics are documented as being sent to the
advisory unit, delegating handling to ``GLOBAL_IO_MANAGER``:
>>> AnotherOptimizerOptionSet("Print Level = 1") # doctest: +SKIP
# Don't supply a value for io_manager:
>>> AnotherOptimizer(...) # doctest: +SKIP
See Also
--------
:func:`naginterfaces.library.examples.opt.handle_solve_bounds_foas_ex.main` :
This example uses a ``FileObjManager``.
"""
# pylint: enable=line-too-long
__slots__ = ['_locus_in_output', '_mappings']
_unit_shift = 10
_default_errunit = _EngineState._get_unit('err')
errunit = _unit_shift - 2
_EngineState._set_unit('err', errunit)
_default_advunit = _EngineState._get_unit('adv')
advunit = _unit_shift - 1
_EngineState._set_unit('adv', advunit)
_print_err = -10000
_read_err_unassoc = -20000
_read_err_nonascii = -20001
_read_err = -20002
_allowed_fo_insts = (_io.IOBase,)
_allowed_fo_insts_str = 'io.IOBase'
def __init__(self, fileobjs=None, locus_in_output=True):
"""
Register the supplied file objects with this manager instance.
Parameters
----------
fileobjs : None or tuple, optional
If not ``None``, the file objects to register.
locus_in_output : bool, optional
Controls whether output managed by this instance is prefixed with
the name of the calling NAG Python function.
Raises
------
NagTypeError
`fileobjs` is neither ``None`` nor an instance of ``tuple``.
NagOSError
An element of the `fileobjs` ``tuple`` is not a valid file-object
instance.
"""
self._locus_in_output = locus_in_output
self._mappings = {
self.errunit: _sys.stderr,
self.advunit: _sys.stdout,
}
if fileobjs is None:
return
locus = {
'fun_name': '.'.join([_MODULE_NAME, 'FileObjManager.__init__']),
'entity': 'fileobjs'
}
self._check_valid_tuple(fileobjs, locus)
for fileobj in fileobjs:
self._check_valid_fo_and_fd(fileobj, locus)
self.register(fileobjs)
@classmethod
def _check_type_compat(cls, data, locus):
"""
Raises NagTypeError if non-None `data` is not an instance of `cls`.
"""
if (
data is None or
isinstance(data, cls)
):
return
raise NagTypeError(
locus,
_bad_instance_msg(
['.'.join([_MODULE_NAME, 'FileObjManager'])], allow_none=False,
)
)
@staticmethod
def _check_valid_fd(fileobj, locus):
"""
Raises NagOSError if `fileobj` does not use a file descriptor.
"""
try:
fileobj.fileno()
except OSError:
has_fd = False
else:
has_fd = True
if not has_fd:
raise NagOSError(
locus,
'must implement the fileno method'
)
def _check_valid_fo_and_fd(self, fileobj, locus):
"""
Raises NagOSError if `fileobj` is not a valid file-object instance.
Raises NagOSError if `fileobj` does not use a file descriptor.
"""
if not isinstance(fileobj, self._allowed_fo_insts):
raise NagOSError(
locus,
'must be an instance of ' + self._allowed_fo_insts_str
)
self._check_valid_fd(fileobj, locus)
@staticmethod
def _check_valid_tuple(fileobjs, locus):
"""
Raises NagTypeError if argument `fileobjs` is not an instance of tuple.
"""
if not isinstance(fileobjs, tuple):
raise NagTypeError(
locus,
'must be an instance of tuple'
)
[docs] def deregister(self, fileobjs):
"""
Deregister the supplied file objects from this manager instance.
Parameters
----------
fileobjs : tuple
The file objects to deregister.
Raises
------
NagTypeError
`fileobjs` is not an instance of ``tuple``.
"""
locus = {
'fun_name': '.'.join([_MODULE_NAME, 'FileObjManager.deregister']),
'entity': 'fileobjs'
}
self._check_valid_tuple(fileobjs, locus)
for fileobj in fileobjs:
unit_to_dereg = self._fileobj_to_unit(
fileobj,
locus,
check_range=False
)
self._deregister_unit(unit_to_dereg)
[docs] def deregister_advunit(self):
"""
Remove the current association for the Engine advisory unit and
restore it to ``sys.stdout``.
"""
self._mappings[self.advunit] = _sys.stdout
[docs] def deregister_errunit(self):
"""
Remove the current association for the Engine error unit and
restore it to ``sys.stderr``.
"""
self._mappings[self.errunit] = _sys.stderr
def _deregister_unit(self, unit):
"""
Deregister the supplied unit number.
"""
if unit not in self._mappings:
return
del self._mappings[unit]
@classmethod
def _fileno_to_unit(cls, fileno, check_range=True):
"""
Map the input file-descriptor number to a 'safe' (unreserved)
unit number for the Engine.
"""
unitno = fileno + cls._unit_shift
if check_range and unitno > 2147483647:
raise NagException( # pragma: no cover
{'fun_name': '_fileno_to_unit'},
'unit number will be out of range. '
'Consider closing other open files',
)
return unitno
def _fileobj_from_unit(self, unit):
"""
Return the file object registered with the input unit number,
or None.
"""
return self._mappings.get(unit)
def _fileobj_to_unit(self, fileobj, locus, check_range=True):
"""
Return the unit number for the input file object.
`fileobj` does not need to have been registered.
Raises NagOSError if argument `fileobj` is not a valid file-object
instance.
Raises NagOSError if argument `fileobj` does not use a file descriptor.
"""
self._check_valid_fo_and_fd(fileobj, locus)
return self._fileno_to_unit(fileobj.fileno(), check_range=check_range)
[docs] def register(self, fileobjs):
"""
Register the supplied file objects with this manager instance.
Parameters
----------
fileobjs : tuple
The file objects to register.
Raises
------
NagTypeError
`fileobjs` is not an instance of ``tuple``.
NagOSError
An element of the `fileobjs` ``tuple`` is not a valid file-object
instance.
An element of the `fileobjs` ``tuple`` does not use a file
descriptor.
"""
locus = {
'fun_name': '.'.join([_MODULE_NAME, 'FileObjManager.register']),
'entity': 'fileobjs'
}
self._check_valid_tuple(fileobjs, locus)
for fileobj in fileobjs:
unit_to_reg = self._fileobj_to_unit(fileobj, locus)
self._register_to_unit(fileobj, unit_to_reg, locus)
[docs] def register_to_advunit(self, fileobj):
"""
Register the supplied file object with the NAG Library Engine advisory
output unit (equivalent to stdout).
Parameters
----------
fileobj : file object
The file object to register.
Raises
------
NagOSError
`fileobj` is not a valid file-object instance.
`fileobj` does not use a file descriptor.
"""
self._register_to_unit(
fileobj,
self.advunit,
{
'fun_name': '.'.join(
[_MODULE_NAME, 'FileObjManager.register_to_advunit']
),
'entity': 'fileobj'
},
)
[docs] def register_to_errunit(self, fileobj):
"""
Register the supplied file object with the NAG Library Engine error
output unit (equivalent to stderr).
Parameters
----------
fileobj : file object
The file object to register.
Raises
------
NagOSError
`fileobj` is not a valid file-object instance.
`fileobj` does not use a file descriptor.
"""
self._register_to_unit(
fileobj,
self.errunit,
{
'fun_name': '.'.join(
[_MODULE_NAME, 'FileObjManager.register_to_errunit']
),
'entity': 'fileobj'
},
)
def _register_to_unit(self, fileobj, unit, locus):
"""
Register the input file object with the input unit number.
Raises NagOSError if argument `fileobj` is not a valid file-object
instance.
Raises NagOSError if argument `fileobj` does not use a file descriptor.
"""
self._check_valid_fo_and_fd(fileobj, locus)
self._mappings[unit] = fileobj
@classmethod
def _set_default_units(cls):
"""
Set the default values for the advisory and error units.
"""
_EngineState._set_unit('err', cls._default_errunit)
_EngineState._set_unit('adv', cls._default_advunit)
@classmethod
def _set_ni_units(cls):
"""
Set the modified values for the advisory and error units.
"""
_EngineState._set_unit('err', cls.errunit)
_EngineState._set_unit('adv', cls.advunit)
[docs] def unit_from_fileobj(self, fileobj):
"""
Return a NAG Library Engine unit number for the input file object.
Parameters
----------
fileobj : file object
The file object to query.
It is registered with this manager instance if it has not already
been.
Returns
-------
int
The NAG Engine unit number corresponding to `fileobj`.
Raises
------
NagOSError
`fileobj` is not a valid file-object instance.
`fileobj` does not use a file descriptor.
"""
locus = {
'fun_name': '.'.join(
[_MODULE_NAME, 'FileObjManager.unit_from_fileobj']
),
'entity': 'fileobj'
}
unit = self._fileobj_to_unit(fileobj, locus)
if unit not in self._mappings:
self._register_to_unit(fileobj, unit, locus)
return unit
#: The fallback I/O manager, used whenever the NAG Library Engine requires an
#: I/O manager but one has not been explicitly supplied to the calling
#: Python function. It has been constructed such that prefixing of
#: output data with NAG Python function names is enabled.
GLOBAL_IO_MANAGER = FileObjManager()
def _py_print_rec( # pylint: disable=too-many-arguments,too-many-branches
en_data, nout, rec, io_manager, excs, ifail,
logger=None, fun_name=None
):
"""
Write the input string rec.
If logger is None, write to Engine unit nout.
Else write using logger.debug.
Update the excs exception list with the tuple for any exception generated.
Update ifail.value with the exit flag.
If logger is None, the file object registered with nout is
extracted from the input io_manager if this argument is not None,
or from the global manager otherwise.
If logger is None and no file object has been registered the
object associated with the advisory unit is used instead.
"""
if io_manager is not None:
this_io_manager = io_manager
else:
this_io_manager = GLOBAL_IO_MANAGER
if logger is None:
this_fo = this_io_manager._fileobj_from_unit(nout)
if this_fo is None:
this_fo = this_io_manager._fileobj_from_unit(
this_io_manager.advunit
)
writer = this_fo.write
else:
writer = logger.debug
if (
logger is None and
isinstance(this_fo, (_io.BufferedIOBase, _io.RawIOBase))
):
rec_to_write = rec
if this_io_manager._locus_in_output:
rec_to_write = b': '.join([fun_name.encode('ascii'), rec_to_write])
rec_to_write = rec_to_write.rstrip() + b'\n'
elif (
logger is not None or
not isinstance(this_fo, (_io.BufferedIOBase, _io.RawIOBase))
):
rec_to_write = _EngineCharScalarType.to_py(rec)
if logger is None:
if this_io_manager._locus_in_output:
rec_to_write = ': '.join([fun_name, rec_to_write])
rec_to_write = rec_to_write.rstrip() + '\n'
else:
raise NagException( # pragma: no cover
{'fun_name': '_py_print_rec'},
'could not initialize record to write',
)
try:
writer(rec_to_write)
except: # pylint: disable=bare-except
_print_err(
_sys.exc_info(),
excs,
en_data,
ifail,
)
else:
ifail.value = 0
def _print_err(exc_info, excs, en_data, ifail):
"""
Trigger printing error.
"""
excs.append(exc_info)
en_data.hlperr = FileObjManager._print_err
ifail.value = 1
def _py_read_rec( # pylint: disable=too-many-arguments
en_data, nin, rec, errbuf, rec_len, io_manager, excs, ifail
):
"""
Read a record from Engine unit nin.
rec is a ctypes c_char * rec_len array and will be modified in place.
errbuf is a ctypes c_char * 200 array and will be modified in place.
Its contents are not referenced by the caller.
Update the excs exception list with the tuple for any exception generated.
Update ifail.value with the exit flag.
The file object registered with nin is extracted from
the input io_manager if this argument is not None, or from the
global manager otherwise.
If no file object has been registered with nin in a non-None
local io_manager, the global manager will be queried instead.
Raises NagOSError if no file object has been registered with nin.
Raises NagValueError if the read record contains non-ASCII characters.
"""
locus = {'fun_name': '_py_read_rec'}
errbuf_a = _ctypes.cast(
errbuf, _ctypes.POINTER(_ctypes.c_char * 200)
).contents
errbuf_a[:200] = b'\0' * 200
if io_manager is not None:
this_io_manager = io_manager
else:
this_io_manager = GLOBAL_IO_MANAGER
this_fo = this_io_manager._fileobj_from_unit(nin)
if (
this_fo is None and
io_manager is not None
):
this_fo = GLOBAL_IO_MANAGER._fileobj_from_unit(nin)
if this_fo is None:
_read_err(
(
None,
NagOSError(
locus,
'no file object registered with ' +
'NAG Library Engine I/O unit ' +
str(nin)
)
),
excs,
FileObjManager._read_err_unassoc,
en_data,
ifail,
)
return
try:
_rec = this_fo.readline()
except: # pylint: disable=bare-except
_read_err(
_sys.exc_info(),
excs,
FileObjManager._read_err,
en_data,
ifail,
)
return
if not _rec:
ifail.value = 1
return
if not _is_ascii(_rec, locus):
_read_err(
(
None,
NagValueError(
locus,
'data read from NAG Library Engine I/O unit ' +
str(nin) +
' must comprise only ASCII characters'
)
),
excs,
FileObjManager._read_err_nonascii,
en_data,
ifail,
)
return
if isinstance(_rec, str):
_rec = _rec.encode('ascii')
_rec = _rec.rstrip(b'\r\n')
rec_a = _ctypes.cast(
rec, _ctypes.POINTER(_ctypes.c_char * rec_len)
).contents
minlen = min(rec_len, len(_rec))
rec_a[:minlen] = _rec[:minlen]
rec_a[minlen:rec_len] = b' ' * (rec_len - minlen)
ifail.value = 0
return
def _read_err(exc_info, excs, err_val, en_data, ifail):
"""
Trigger (advancing) reading error.
"""
excs.append(exc_info)
en_data.hlperr = err_val
ifail.value = 1
def _py_nonadv_read_rec( # pylint: disable=too-many-arguments,too-many-statements
en_data, nin, rec, rec_len, io_manager, excs, inform
):
"""
Read a record from Engine unit nin. Nonadvancing.
rec is a ctypes c_char * rec_len array and will be modified in place.
Update the excs exception list with the tuple for any exception generated.
Update inform.value with the exit flag.
Return no. chars read.
The file object registered with nin is extracted from
the input io_manager if this argument is not None, or from the
global manager otherwise.
If no file object has been registered with nin in a non-None
local io_manager, the global manager will be queried instead.
Raises NagOSError if no file object has been registered with nin.
Raises NagValueError if the read record contains non-ASCII characters.
"""
locus = {'fun_name': '_py_nonadv_read_rec'}
def read_chars():
"""
Return a bytes string of length rec_len reading from this_fo as far as
'\n'.
"""
the_string = []
if isinstance(this_fo, (_io.BufferedIOBase, _io.RawIOBase)):
line_feed = b'\n'
join_char = b''
else:
line_feed = '\n'
join_char = ''
i = 0
while i < rec_len:
one_char = this_fo.read(1)
if not one_char:
return b'', 2
if one_char == line_feed:
break
the_string.append(one_char)
i += 1
strng = join_char.join(the_string)
if not _is_ascii(strng, locus):
return None, 3
if isinstance(this_fo, _io.TextIOBase):
strng = strng.encode('ascii')
strng = strng.rstrip(b'\r')
if not strng:
return strng, 1
strng = strng.ljust(rec_len)
return strng, 0
if io_manager is not None:
this_io_manager = io_manager
else:
this_io_manager = GLOBAL_IO_MANAGER
this_fo = this_io_manager._fileobj_from_unit(nin)
if (
this_fo is None and
io_manager is not None
):
this_fo = GLOBAL_IO_MANAGER._fileobj_from_unit(nin)
if this_fo is None:
return _nonadv_read_err(
(
None,
NagOSError(
locus,
'no file object registered with ' +
'NAG Library Engine I/O unit ' +
str(nin)
)
),
excs,
FileObjManager._read_err_unassoc,
en_data,
inform,
)
try:
_rec, inform.value = read_chars()
except: # pylint: disable=bare-except
return _nonadv_read_err(
_sys.exc_info(),
excs,
FileObjManager._read_err,
en_data,
inform,
)
if _rec is None:
return _nonadv_read_err(
(
None,
NagValueError(
locus,
'data read from NAG Library Engine I/O unit ' +
str(nin) +
' must comprise only ASCII characters'
)
),
excs,
FileObjManager._read_err_nonascii,
en_data,
inform,
)
readlen = len(_rec.rstrip())
if not _rec:
return readlen
rec_a = _ctypes.cast(
rec, _ctypes.POINTER(_ctypes.c_char * rec_len)
).contents
rec_a[:readlen] = _rec[:readlen]
rec_a[readlen:rec_len] = b' ' * (rec_len - readlen)
if readlen < rec_len:
inform.value = 1
return readlen
def _nonadv_read_err(exc_info, excs, err_val, en_data, inform):
"""
Trigger nonadvancing reading error.
"""
excs.append(exc_info)
en_data.hlperr = err_val
inform.value = 3
readlen = 0
return readlen
def _handle_cb_exception(excs):
"""
Reraise an exception stored in the excs list.
"""
for exc in excs:
if exc[0] is not UserCallbackTerminate:
_raise_exc_info(exc)
def _raise_exc_info(exc_info):
"""
(Re)raise a sys.exc_info-style exception.
"""
raise exc_info[1].with_traceback(exc_info[2])
def _violated_constraint_err_text(constraint):
"""
Standard text for a violated input-constraint error.
"""
return ' '.join(
['On entry, the following constraint was not satisfied:', constraint]
)
_lapack_constraint_err_text = _violated_constraint_err_text # pylint: disable=invalid-name
def _raise_warning(warn_class, msg_lines, locus):
"""
Raises warn_class with formatted `msg_lines`.
"""
_warnings.warn(
warn_class(
locus,
_format_msg_lines(msg_lines),
),
stacklevel=_get_user_stack_level(),
)
_STR_FMT_STR = '{:s}'
_STR_FMT_INT_AS_STR = '{:s}'
_STR_FMT_INT = '{:d}'
_STR_FMT_FLOAT_AS_STR = '{:s}'
_STR_FMT_FLOAT = '{:22.15e}'
def _withdrawal(fun_name, mkwdrn, advice=None):
"""
Decorate with a deprecation warning if called from outside the package.
"""
def decorate(func):
# pylint: disable=missing-docstring
@_functools.wraps(func)
def wrapper(*args, **kwargs):
# pylint: disable=missing-docstring
caller_dirname = _os.path.dirname(_inspect.stack()[1][1])
caller_parent_dirbase = _os.path.split(
_os.path.split(caller_dirname)[0]
)[-1]
if caller_parent_dirbase != _PKG_NAME:
msg_lines = []
if mkwdrn is None:
msg_lines.append(
'This function is deprecated.'
)
warn_cls = NagDeprecatedWarning
else:
msg_lines.append(
'This function is scheduled for withdrawal, '
'no earlier than Mark ' + mkwdrn + '.',
)
warn_cls = NagWithdrawalWarning
if advice is None:
msg_lines.append(
'There is no replacement for this routine.'
)
else:
msg_lines.append(
'The following advice is given for making a '
'replacement:'
)
msg_lines.extend(advice)
_warnings.warn(
warn_cls(
{'fun_name': fun_name},
'\n' + '\n'.join(msg_lines),
),
stacklevel=2
)
result = func(*args, **kwargs)
return result
return wrapper
return decorate
@_withdrawal('.'.join([_MODULE_NAME, '_withdrawal_test']), '42')
def _withdrawal_test():
"""
Test of withdrawal decorator.
"""
@_withdrawal('.'.join([_MODULE_NAME, '_withdrawal_warn_test']), '42')
def _withdrawal_warn_test():
"""
Test of withdrawal decorator on a function that raises a warning.
"""
_EngineWarningExit(
fun_name='_withdrawal_warn_test',
msg_lines=['spam', 'eggs'],
errcodes=(43, 44, 45),
).manage()
@_withdrawal(
'.'.join([_MODULE_NAME, '_withdrawal_test']), None,
['Replacement base advice.']
)
def _deprecation_test():
"""
Test of withdrawal decorator.
"""
def _get_user_stack_level():
"""
Return the frame number for the first non-naginterfaces
stack frame.
"""
stack_frames = _inspect.stack()
for frame_i, frame in enumerate(stack_frames):
frame_dirname = _os.path.dirname(frame[1])
frame_parent_dirbase = _os.path.split(
_os.path.split(frame_dirname)[0]
)[-1]
if frame_parent_dirbase != _PKG_NAME:
return frame_i
raise NagException( # pragma: no cover
{'fun_name': '_get_user_stack_level'},
'inconceivable stack'
)
def _check_comm(comm, locus, req_keys=(), opt_keys=(), allow_none=False):
"""
Validate the comm blob.
Return True if opt_keys was non-empty and none were present in comm.
Raises NagTypeError if argument comm is not a dict.
Raises NagKeyError if the dictionary keys in comm are incorrect for
the communication structure it is supposed to represent.
"""
# pylint: disable=too-many-branches
if allow_none and comm is None:
return True
if not isinstance(comm, dict):
raise NagTypeError(
locus,
'must be an instance of dict'
)
if not comm:
if req_keys:
raise NagKeyError(
locus,
'must not be empty. '
'You must pass a communication structure that was returned by '
'the initializer for this routine (which may be this routine '
'itself)'
)
if opt_keys:
return True
raise NagException(
locus,
'communication structure seems totally redundant',
)
for key in req_keys:
if key not in comm.keys():
raise NagKeyError(
locus,
'invalid communication structure for this function: ' +
'key \'' + key + '\' must be present. '
'You must pass a communication structure that was returned by '
'the initializer for this routine (which may be this routine '
'itself)'
)
if not opt_keys:
if not req_keys:
raise NagException(
locus,
'communication structure seems totally redundant',
)
return False
n_opt_keys_present = len([key for key in opt_keys if key in comm.keys()])
if n_opt_keys_present == 0:
return True
if len(opt_keys) == n_opt_keys_present:
return False
for key in opt_keys:
if key not in comm.keys():
raise NagKeyError(
locus,
'invalid communication structure for this function: ' +
'key \'' + key + '\' must be present'
)
raise NagException( # pragma: no cover
locus,
'inconceivable exit from _check_comm',
)
def _check_callable(cble, locus, allow_none=False):
"""
Validate the callable `cble`.
Raises NagTypeError if argument `cble` is not callable, and not None if
None is permitted.
"""
if (
callable(cble) or
(
allow_none and
cble is None
)
):
return
msg = 'must be '
if allow_none:
msg += 'None or '
msg += 'callable'
raise NagTypeError(
locus,
msg,
)
def _d01mazn(handle):
"""
Query handle and extract problem dimensions.
"""
raise NotImplementedError
def _e04ptan(handle):
"""
Query handle and extract problem dimensions.
"""
fun_name = '.'.join([_MODULE_NAME, '_e04ptan'])
_handle = _EngineCptrScalarType(
handle, {'fun_name': fun_name, 'entity': 'handle'},
).to_ctypes()
_nvar = _EngineIntScalarType.empty_ctypes()
_nnzu = _EngineIntScalarType.empty_ctypes()
_nnzuc = _EngineIntScalarType.empty_ctypes()
_nnzua = _EngineIntScalarType.empty_ctypes()
_ierr = _EngineIntScalarType.empty_ctypes()
engine_ctypes = _EngineState._get_symbol('e04ptan')
engine_ctypes.restype = None
engine_ctypes.argtypes = (
_BYREF_HANDLE, # Input, handle.
_BYREF_INT, # Output, nvar.
_BYREF_INT, # Output, nnzu.
_BYREF_INT, # Output, nnzuc.
_BYREF_INT, # Output, nnzua.
_BYREF_INT, # Output, ierr.
)
engine_ctypes(
_handle,
_nvar,
_nnzu,
_nnzuc,
_nnzua,
_ierr,
)
_ierr_py = _EngineIntScalarType.to_py(_ierr)
if _ierr_py != _IERR_SUCCESS:
_EngineErrorExit(
fun_name=fun_name,
msg_lines=[
'handle does not belong to the NAG optimization modelling '
'suite,',
'has not been initialized properly or is corrupted.',
],
errcodes=(_ierr_py, _ierr_py, _ierr_py),
).manage()
return _nvar, _nnzu, _nnzuc, _nnzua
def _f08mdan(call_vendor, n): # pylint: disable=invalid-name
"""
Return the required size of Q and IQ in F08MDF.
"""
# pylint: disable=invalid-name
fun_name = '.'.join([_MODULE_NAME, '_f08mdan'])
_n = _EngineIntScalarType(
n, {'fun_name': fun_name, 'entity': 'n'},
).to_ctypes()
_lq = _EngineIntScalarType.empty_ctypes()
_liq = _EngineIntScalarType.empty_ctypes()
engine_ctypes = _EngineState._get_symbol('f08mdan')
engine_ctypes.restype = None
engine_ctypes.argtypes = (
_BYREF_LOGICAL, # Input, call_vendor.
_BYREF_INT, # Input, n.
_BYREF_INT, # Output lq.
_BYREF_INT, # Output liq.
)
_call_vendor = _EngineLogicalScalarType(
call_vendor, {'fun_name': fun_name, 'entity': 'call_vendor'},
).to_ctypes()
engine_ctypes(
_call_vendor,
_n,
_lq,
_liq,
)
return (_lq, _liq)
def _f12jzyn(handle, irevcm, m0_in):
"""
Query handle and extract problem dimensions.
"""
fun_name = '.'.join([_MODULE_NAME, '_f12jzyn'])
_handle = _EngineCptrScalarType(
handle, {'fun_name': fun_name, 'entity': 'handle'},
).to_ctypes()
_irevcm = _EngineIntScalarType(
irevcm, {'fun_name': fun_name, 'entity': 'irevcm'},
).to_ctypes()
_m0_in = _EngineIntScalarType(
m0_in, {'fun_name': fun_name, 'entity': 'm0_in'},
).to_ctypes()
_m0_out = _EngineIntScalarType.empty_ctypes()
engine_ctypes = _EngineState._get_symbol('f12jzyn')
engine_ctypes.restype = None
engine_ctypes.argtypes = (
_BYREF_HANDLE, # Input, handle.
_BYREF_INT, # Input, irevcm.
_BYREF_INT, # Input, m0_in.
_BYREF_INT, # Output, m0_out.
)
engine_ctypes(
_handle,
_irevcm,
_m0_in,
_m0_out,
)
return _m0_out
def _g02zkbn(init, routine_name, ip, ntau, comm, statecomm): # pylint: disable=invalid-name,too-many-arguments,too-many-locals
"""
Extract size of optional arrays from comm.
"""
fun_name = '.'.join([_MODULE_NAME, '_g02zkbn'])
engine_ctypes = _EngineState._get_symbol('g02zkbn')
engine_ctypes.restype = None
engine_ctypes.argtypes = (
_BYREF_INT, # Input, init.
_BYREF_STRING, # Input, routine_name(len=6).
_BYREF_INT, # Input, ip.
_BYREF_INT, # Input, ntau.
_EngineIntArrayType.c_type, # Input, iopts[*].
_EngineIntArrayType.c_type, # Input, state[*].
_BYREF_INT, # Output, sdres.
_BYREF_INT, # Output, rip.
_BYREF_INT, # Output, tdch.
_BYREF_INT, # Output, liopts.
_BYREF_INT, # Output, lopts.
_BYREF_INT, # Output, lstate.
_BYREF_INT, # Output, calcib.
_EngineState._chrlen_ctype, # Input, hidden chrlen of routine_name.
)
_init = _EngineIntScalarType(
init, {'fun_name': fun_name, 'entity': 'init'},
).to_ctypes()
_init_py = _EngineIntScalarType.to_py(_init)
if _init_py == 0:
_routine_name = _EngineCharScalarType(
routine_name, {'fun_name': fun_name, 'entity': 'routine_name'},
exp_chrlen=6,
).to_ctypes()
else:
_routine_name = _EngineRoutineName(*b'unknow')
_ip = _EngineIntScalarType(
ip, {'fun_name': fun_name, 'entity': 'ip'},
).to_ctypes()
_ntau = _EngineIntScalarType(
ntau, {'fun_name': fun_name, 'entity': 'ntau'},
).to_ctypes()
if comm is None:
_iopts = None
else:
_iopts = comm['iopts']
if statecomm is None:
_state = _EngineIntArrayType.empty_ctypes(1)
else:
_state = statecomm['state']
_lstate_py = _EngineIntArrayType.get_shape(
_state, {'fun_name': fun_name, 'entity': '_state'},
exp_rank=1, allow_none=True, validate=False,
)[0]
_sdres = _EngineIntScalarType.empty_ctypes()
_rip = _EngineIntScalarType.empty_ctypes()
_tdch = _EngineIntScalarType.empty_ctypes()
_liopts = _EngineIntScalarType.empty_ctypes()
_lopts = _EngineIntScalarType.empty_ctypes()
_lstate = _EngineIntScalarType.empty_ctypes()
_calcib = _EngineIntScalarType.empty_ctypes()
engine_ctypes(
_init,
_routine_name,
_ip,
_ntau,
_iopts,
_state,
_sdres,
_rip,
_tdch,
_liopts,
_lopts,
_lstate,
_calcib,
_routine_name._length_,
)
return (
_sdres, _rip, _tdch, _liopts, _lopts, _lstate, _calcib
)
#: A named constant for use with certain functions in the ``blast`` module.
NAG_ONE_NORM = 171
#: A named constant for use with certain functions in the ``blast`` module.
NAG_TWO_NORM = 173
#: A named constant for use with certain functions in the ``blast`` module.
NAG_FROBENIUS_NORM = 174
#: A named constant for use with certain functions in the ``blast`` module.
NAG_INF_NORM = 175
#: A named constant for use with certain functions in the ``blast`` module.
NAG_MAX_NORM = 177