import os
import argparse
from dataclasses import dataclass
import typing as tp
from numbers import Number
import locale

import numpy as np
import numpy.typing as npt

    import click
except ImportError:
    click = None

from .math import *

__all__ = (
    'TableParameters', 'TableResult', 'well_height_table', 'prime_root_table',
    'well_width', 'build_diag_indices', 'ValidationError', 'TableArray', 'TABLE_DTYPE',

TABLE_DTYPE = np.dtype([
    ('primes', int),
    ('indices', int),
    ('wells', float),
"""A :term:`structured data type` for table results

:param primes: Value from the :func:`prime root sequence <.math.prime_root_seq>`
:param indices: The index of the *primes* value within the sequence
:param wells: The physical well height calculated from *primes* and the
    design wavelength

TableArray = tp.NewType('TableArray', npt.NDArray[TABLE_DTYPE])
"""A structured array of type :data:`TABLE_DTYPE`

[docs]class ValidationError(ValueError): """Raised by :class:`TableParameters` if any parameter values are invalid """ #: Error message msg: str #: Tuples of ``(field_name, value)`` that caused the error fields: tp.Sequence[tp.Tuple[str, tp.Any]] #: Field name(s) that caused the error field_names: tp.Tuple[str] def __init__(self, msg, *fields): self.msg = msg self.fields = fields self.field_names = tuple([fname for fname, fval in fields]) def __str__(self): if len(self.fields) == 1: fname, fval = self.fields[0] return f'Value "{fval}" invalid for "{fname}": {self.msg}' elif len(self.fields): fields = ', '.join(self.field_names) return f'Invalid values for "{fields}": {self.msg}' else: return self.msg
[docs]def build_diag_indices( nrows: int, ncols: int ) -> tp.Tuple[npt.NDArray[int], npt.NDArray[int]]: """Create indices for all diagonals of a 2-d array of shape ``(nrows, ncols)`` The results may be used to directly index an array of shape ``(nrows, ncols)`` along the diagonals. Arguments: nrows: Number of rows in the array (``shape[0]``) ncols: Number of columns in the array (``shape[1]``) """ size = nrows * ncols rows = np.resize(np.arange(nrows), size) cols = np.resize(np.arange(ncols), size) return rows, cols
[docs]def prime_root_table(parameters: 'TableParameters') -> TableArray: """Calculate well indices and prime elements for the given :class:`TableParameters` The returned array will be of shape ``(nrows, ncols)`` """ p = parameters p.validate() root_gen = prime_root_seq(p.prime_num, p.prime_root) result = np.zeros((p.nrows, p.ncols), dtype=TABLE_DTYPE) diag_ix = build_diag_indices(p.nrows, p.ncols) primes = np.fromiter(root_gen, dtype=int) primes = np.resize(primes, result.size) result['primes'][diag_ix] = primes result['indices'][diag_ix] = np.arange(result.size) return result
def calc_hi_frequency( well_width: Number, speed_of_sound: tp.Optional[Number] = SPEED_OF_SOUND ) -> int: """Calculate the highest diffusion frequency for the given well width Arguments: well_width: The well width in centimeters speed_of_sound: Speed of sound in meters per second """ hf = frequency_cm(well_width * 2, speed_of_sound) return round(hf)
[docs]def well_height_table(parameters: 'TableParameters') -> TableArray: """Calculate the well heights in centimeters for the given :class:`TableParameters` The returned array will be of shape ``(nrows, ncols)`` """ p = parameters result = prime_root_table(p) w = wavelength_cm(p.design_freq, p.speed_of_sound) result['wells'] = result['primes'] * w / (p.prime_num*2) return result
[docs]@dataclass class TableParameters: """Parameters used to calculate a :class:`TableResult` :attr:`ncols` and :attr:`nrows` must be coprime factors of :attr:`prime_num` """ #: Number of table columns ncols: int #: Number of table rows nrows: int #: The basis prime number where ``prime_num - 1 == ncols * nrows`` prime_num: int #: A :term:`primitive root` of :attr:`prime_num` used to calculate the sequence prime_root: int #: The lowest frequency (in Hz) the diffusor is designed for design_freq: int #: The width of each well in centimeters well_width: tp.Optional[float] = 3.81 #: Speed of sound in meters per second speed_of_sound: tp.Optional[int] = SPEED_OF_SOUND @property def high_frequency(self) -> int: """The highest diffusion frequency possible with the specified :attr:`well_width` and :attr:`speed_of_sound` """ return calc_hi_frequency(self.well_width, self.speed_of_sound) @property def total_width(self) -> float: """The total width of the diffusor in centimeters """ return self.well_width * self.ncols @property def total_height(self) -> float: """The total height of the diffusor in centimeters """ return self.well_width * self.nrows
[docs] def validate(self) -> None: """Validate the parameters Raises: ValidationError: If any parameters are invalid """ p = self.prime_num r = self.prime_root if not is_prime(p): raise ValidationError( 'Not a prime number', ('prime_num', p), ) if not is_prim_root(r, p): raise ValidationError( f'{r} is not a primitive root of {p}', ('prime_root', r), ) num_wells = self.ncols * self.nrows if num_wells != p - 1: raise ValidationError( 'ncols * nrows must equal prime_num-1', ('ncols', self.ncols), ('nrows', self.nrows), ) if not is_coprime(self.ncols, self.nrows): raise ValidationError( 'ncols and nrows must be coprime', ('ncols', self.ncols), ('nrows', self.nrows), )
[docs] def calculate(self) -> 'TableResult': """Calculate the :func:`well height table <well_height_table>` and return it as a :class:`TableResult` """ data = well_height_table(self) return TableResult(self, data)
[docs]class TableResult: """A calculated table result """ #: The :class:`TableParameters` used to generate the result parameters: TableParameters #: The result array calculated by :func:`well_height_table` data: TableArray #: The well heights in :attr:`data` rounded to the nearest centimeter well_heights: npt.NDArray[int] def __init__(self, parameters: TableParameters, data: TableArray): self.parameters = parameters = data self.well_heights = np.asarray(np.rint(data['wells']), dtype=int) self._line_width = None
[docs] @classmethod def from_parameters(cls, parameters: TableParameters) -> 'TableResult': """Calculate the result from the given :class:`TableParameters` """ return parameters.calculate()
[docs] @classmethod def from_kwargs(cls, **kwargs) -> 'TableResult': """Calculate the result using parameter values as keyword arguments The keyword arguments given must include all necessary values to create a :class:`TableParameters` instance """ parameters = TableParameters(**kwargs) return cls.from_parameters(parameters)
[docs] def get_well_counts(self) -> tp.Dict[int, int]: """Count the total number of each unique well height in :attr:`well_heights` Returns the result as a dict of ``{well_height: count}`` """ heights = self.well_heights bins = np.unique(heights) bins.sort() counts = [heights[heights==h].size for h in bins] return {h:c for h,c in zip(bins, counts)}
def _calc_line_width(self) -> int: line_width = self._line_width if line_width is not None: return line_width line_width = np.get_printoptions()['linewidth'] heights = self.well_heights s = np.array2string(heights, separator=',', max_line_width=line_width) lines = s.splitlines() if not lines[0].endswith('],'): line_width += len(lines[1]) s = np.array2string(heights, separator=',', max_line_width=line_width) self._line_width = line_width return line_width
[docs] def to_csv( self, separator: tp.Optional[str] = ',', offset: tp.Optional[int] = 0 ) -> str: """Format the :attr:`well_heights` array as a multiline string of comma-separated values The *offset* argument will be added to the :attr:`well_heights` before output. For well heights of ``0``, there would be no block attached to the diffusor in that position. If (for aesthetic reasons) this is undesirable, an offset of ``1`` (for example) could be applied to all wells. Arguments: offset: An offset to apply to the well heights. Can be used if wells of height ``0`` are undesired. """ heights = self.well_heights + offset line_width = self._calc_line_width() lines = [] for i in range(heights.shape[0]): row = heights[i] s = np.array2string(row, separator=separator, max_line_width=line_width) s = s.lstrip('[').rstrip(']') lines.append(s) return os.linesep.join(lines)
[docs] def to_rst(self, offset: tp.Optional[int] = 0) -> str: """Format the :attr:`well_heights` array as an :duref:`rST table <grid-tables>` The *offset* argument will be added to the :attr:`well_heights` before output. For well heights of ``0``, there would be no block attached to the diffusor in that position. If (for aesthetic reasons) this is undesirable, an offset of ``1`` (for example) could be applied to all wells. Arguments: offset: An offset to apply to the well heights. Can be used if wells of height ``0`` are undesired. """ nrows, ncols = self.parameters.nrows, self.parameters.ncols value_lines = self.to_csv(offset=offset).splitlines() cell_width = value_lines[0].index(',') + 1 row_sep = '-' * cell_width row_sep = f'+{row_sep}' * ncols row_sep = f'{row_sep}+' lines = [row_sep] for line in value_lines: line = ' |'.join(line.split(',')) line = f'|{line} |' lines.append(line) lines.append(row_sep) return os.linesep.join(lines)
def get_info_str(self, offset: tp.Optional[int] = 0) -> str: lines = ['Well Counts:'] counts = {k+offset:v for k,v in self.get_well_counts().items()} lines.append(os.linesep.join([f'{k:2d}cm: {v:2d}' for k,v in counts.items()])) total_length = (self.well_heights + offset).sum() p = self.parameters lines.extend([ '', f'Total well length: {total_length:n}cm', f'Highest frequency: {p.high_frequency:n} Hz', f'Total size: {p.total_width:.3f}cm x {p.total_height}cm' ]) return os.linesep.join(lines)
def _main_argparse(): locale.setlocale(locale.LC_NUMERIC, '') p = argparse.ArgumentParser() p.add_argument('ncols', type=int) p.add_argument('nrows', type=int) p.add_argument('-p', '--prime', dest='prime_num', type=int) p.add_argument('-r', '--root', dest='prime_root', type=int) p.add_argument('-f', '--freq', dest='design_freq', type=int, required=True) p.add_argument('-w', '--well-width', dest='well_width', type=float, default=3.81, ) p.add_argument('-s', '--sos', dest='speed_of_sound', type=int, help='Speed of sound (in meters per second)', default=SPEED_OF_SOUND, ) p.add_argument( '--offset', dest='offset', type=int, default=0, ) p.add_argument( '--format', dest='format', choices=('csv', 'rst'), default='csv', ) args = p.parse_args() if args.prime_num is None: args.prime_num = args.ncols * args.nrows + 1 print(f'{args.prime_num=}') assert is_prime(args.prime_num) if args.prime_root is None: args.prime_root = min(prim_roots(args.prime_num)) out_fmt = args.format offset = args.offset kwargs = vars(args) del kwargs['format'] del kwargs['offset'] result = TableResult.from_kwargs(**kwargs) if out_fmt == 'csv': print(result.to_csv(offset=offset)) else: print(result.to_rst(offset=offset)) print('') print(result.get_info_str(offset=offset)) print('') return result if click is not None: @click.command() @click.argument('ncols', type=int) @click.argument('nrows', type=int) @click.option('-p', '--prime-num', type=int) @click.option('-r', '--prime-root', type=int) @click.option('-f', '--design-freq', type=int, required=True) @click.option('-w', '--well-width', default=3.81) @click.option('-s', '--speed-of-sound', default=SPEED_OF_SOUND) @click.option('--offset', default=0) @click.option('--format', type=click.Choice(['csv', 'rst'], case_sensitive=False), default='rst', ) def build(**kwargs): offset, out_fmt = kwargs.pop('offset'), kwargs.pop('format') param_kw = kwargs.copy() if not param_kw['prime_num']: param_kw['prime_num'] = param_kw['ncols'] * param_kw['nrows'] + 1 if not param_kw['prime_root']: roots = list(prim_roots(param_kw['prime_num'])) if len(roots) > 10: roots = roots[:11] roots = [str(v) for v in roots] pr = click.prompt( 'Choose a primitive root', type=click.Choice(roots), ) param_kw['prime_root'] = int(pr) result = TableResult.from_kwargs(**param_kw) if out_fmt == 'csv': print(result.to_csv(offset=offset)) else: print(result.to_rst(offset=offset)) print('') print(result.get_info_str(offset=offset)) print('') def main(): if click is not None: click_cli() else: _main_argparse() if __name__ == '__main__': main()