import os
import argparse
from dataclasses import dataclass
import typing as tp
from numbers import Number
import locale
import numpy as np
import numpy.typing as npt
try:
import click
except ImportError:
click = None
from .math import *
__all__ = (
'TableParameters', 'TableResult', 'well_height_table', 'prime_root_table',
'well_width', 'build_diag_indices', 'ValidationError', 'TableArray', 'TABLE_DTYPE',
)
TABLE_DTYPE = np.dtype([
('primes', int),
('indices', int),
('wells', float),
])
"""A :term:`structured data type` for table results
:param primes: Value from the :func:`prime root sequence <.math.prime_root_seq>`
:param indices: The index of the *primes* value within the sequence
:param wells: The physical well height calculated from *primes* and the
design wavelength
"""
TableArray = tp.NewType('TableArray', npt.NDArray[TABLE_DTYPE])
"""A structured array of type :data:`TABLE_DTYPE`
"""
[docs]class ValidationError(ValueError):
"""Raised by :class:`TableParameters` if any parameter values are invalid
"""
#: Error message
msg: str
#: Tuples of ``(field_name, value)`` that caused the error
fields: tp.Sequence[tp.Tuple[str, tp.Any]]
#: Field name(s) that caused the error
field_names: tp.Tuple[str]
def __init__(self, msg, *fields):
self.msg = msg
self.fields = fields
self.field_names = tuple([fname for fname, fval in fields])
def __str__(self):
if len(self.fields) == 1:
fname, fval = self.fields[0]
return f'Value "{fval}" invalid for "{fname}": {self.msg}'
elif len(self.fields):
fields = ', '.join(self.field_names)
return f'Invalid values for "{fields}": {self.msg}'
else:
return self.msg
[docs]def build_diag_indices(
nrows: int, ncols: int
) -> tp.Tuple[npt.NDArray[int], npt.NDArray[int]]:
"""Create indices for all diagonals of a 2-d array of shape ``(nrows, ncols)``
The results may be used to directly index an array of shape
``(nrows, ncols)`` along the diagonals.
Arguments:
nrows: Number of rows in the array (``shape[0]``)
ncols: Number of columns in the array (``shape[1]``)
"""
size = nrows * ncols
rows = np.resize(np.arange(nrows), size)
cols = np.resize(np.arange(ncols), size)
return rows, cols
[docs]def prime_root_table(parameters: 'TableParameters') -> TableArray:
"""Calculate well indices and prime elements for the given
:class:`TableParameters`
The returned array will be of shape ``(nrows, ncols)``
"""
p = parameters
p.validate()
root_gen = prime_root_seq(p.prime_num, p.prime_root)
result = np.zeros((p.nrows, p.ncols), dtype=TABLE_DTYPE)
diag_ix = build_diag_indices(p.nrows, p.ncols)
primes = np.fromiter(root_gen, dtype=int)
primes = np.resize(primes, result.size)
result['primes'][diag_ix] = primes
result['indices'][diag_ix] = np.arange(result.size)
return result
def calc_hi_frequency(
well_width: Number, speed_of_sound: tp.Optional[Number] = SPEED_OF_SOUND
) -> int:
"""Calculate the highest diffusion frequency for the given well width
Arguments:
well_width: The well width in centimeters
speed_of_sound: Speed of sound in meters per second
"""
hf = frequency_cm(well_width * 2, speed_of_sound)
return round(hf)
[docs]def well_height_table(parameters: 'TableParameters') -> TableArray:
"""Calculate the well heights in centimeters for the given
:class:`TableParameters`
The returned array will be of shape ``(nrows, ncols)``
"""
p = parameters
result = prime_root_table(p)
w = wavelength_cm(p.design_freq, p.speed_of_sound)
result['wells'] = result['primes'] * w / (p.prime_num*2)
return result
[docs]@dataclass
class TableParameters:
"""Parameters used to calculate a :class:`TableResult`
:attr:`ncols` and :attr:`nrows` must be coprime factors of :attr:`prime_num`
"""
#: Number of table columns
ncols: int
#: Number of table rows
nrows: int
#: The basis prime number where ``prime_num - 1 == ncols * nrows``
prime_num: int
#: A :term:`primitive root` of :attr:`prime_num` used to calculate the sequence
prime_root: int
#: The lowest frequency (in Hz) the diffusor is designed for
design_freq: int
#: The width of each well in centimeters
well_width: tp.Optional[float] = 3.81
#: Speed of sound in meters per second
speed_of_sound: tp.Optional[int] = SPEED_OF_SOUND
@property
def high_frequency(self) -> int:
"""The highest diffusion frequency possible with the specified
:attr:`well_width` and :attr:`speed_of_sound`
"""
return calc_hi_frequency(self.well_width, self.speed_of_sound)
@property
def total_width(self) -> float:
"""The total width of the diffusor in centimeters
"""
return self.well_width * self.ncols
@property
def total_height(self) -> float:
"""The total height of the diffusor in centimeters
"""
return self.well_width * self.nrows
[docs] def validate(self) -> None:
"""Validate the parameters
Raises:
ValidationError: If any parameters are invalid
"""
p = self.prime_num
r = self.prime_root
if not is_prime(p):
raise ValidationError(
'Not a prime number', ('prime_num', p),
)
if not is_prim_root(r, p):
raise ValidationError(
f'{r} is not a primitive root of {p}', ('prime_root', r),
)
num_wells = self.ncols * self.nrows
if num_wells != p - 1:
raise ValidationError(
'ncols * nrows must equal prime_num-1',
('ncols', self.ncols), ('nrows', self.nrows),
)
if not is_coprime(self.ncols, self.nrows):
raise ValidationError(
'ncols and nrows must be coprime',
('ncols', self.ncols), ('nrows', self.nrows),
)
[docs] def calculate(self) -> 'TableResult':
"""Calculate the :func:`well height table <well_height_table>` and
return it as a :class:`TableResult`
"""
data = well_height_table(self)
return TableResult(self, data)
[docs]class TableResult:
"""A calculated table result
"""
#: The :class:`TableParameters` used to generate the result
parameters: TableParameters
#: The result array calculated by :func:`well_height_table`
data: TableArray
#: The well heights in :attr:`data` rounded to the nearest centimeter
well_heights: npt.NDArray[int]
def __init__(self, parameters: TableParameters, data: TableArray):
self.parameters = parameters
self.data = data
self.well_heights = np.asarray(np.rint(data['wells']), dtype=int)
self._line_width = None
[docs] @classmethod
def from_parameters(cls, parameters: TableParameters) -> 'TableResult':
"""Calculate the result from the given :class:`TableParameters`
"""
return parameters.calculate()
[docs] @classmethod
def from_kwargs(cls, **kwargs) -> 'TableResult':
"""Calculate the result using parameter values as keyword arguments
The keyword arguments given must include all necessary values to create
a :class:`TableParameters` instance
"""
parameters = TableParameters(**kwargs)
return cls.from_parameters(parameters)
[docs] def get_well_counts(self) -> tp.Dict[int, int]:
"""Count the total number of each unique well height in
:attr:`well_heights`
Returns the result as a dict of ``{well_height: count}``
"""
heights = self.well_heights
bins = np.unique(heights)
bins.sort()
counts = [heights[heights==h].size for h in bins]
return {h:c for h,c in zip(bins, counts)}
def _calc_line_width(self) -> int:
line_width = self._line_width
if line_width is not None:
return line_width
line_width = np.get_printoptions()['linewidth']
heights = self.well_heights
s = np.array2string(heights, separator=',', max_line_width=line_width)
lines = s.splitlines()
if not lines[0].endswith('],'):
line_width += len(lines[1])
s = np.array2string(heights, separator=',', max_line_width=line_width)
self._line_width = line_width
return line_width
[docs] def to_csv(
self, separator: tp.Optional[str] = ',', offset: tp.Optional[int] = 0
) -> str:
"""Format the :attr:`well_heights` array as a multiline string of
comma-separated values
The *offset* argument will be added to the :attr:`well_heights` before
output. For well heights of ``0``, there would be no block attached to
the diffusor in that position. If (for aesthetic reasons) this is
undesirable, an offset of ``1`` (for example) could be applied to all
wells.
Arguments:
offset: An offset to apply to the well heights. Can be used if wells
of height ``0`` are undesired.
"""
heights = self.well_heights + offset
line_width = self._calc_line_width()
lines = []
for i in range(heights.shape[0]):
row = heights[i]
s = np.array2string(row, separator=separator, max_line_width=line_width)
s = s.lstrip('[').rstrip(']')
lines.append(s)
return os.linesep.join(lines)
[docs] def to_rst(self, offset: tp.Optional[int] = 0) -> str:
"""Format the :attr:`well_heights` array as an :duref:`rST table <grid-tables>`
The *offset* argument will be added to the :attr:`well_heights` before
output. For well heights of ``0``, there would be no block attached to
the diffusor in that position. If (for aesthetic reasons) this is
undesirable, an offset of ``1`` (for example) could be applied to all
wells.
Arguments:
offset: An offset to apply to the well heights. Can be used if wells
of height ``0`` are undesired.
"""
nrows, ncols = self.parameters.nrows, self.parameters.ncols
value_lines = self.to_csv(offset=offset).splitlines()
cell_width = value_lines[0].index(',') + 1
row_sep = '-' * cell_width
row_sep = f'+{row_sep}' * ncols
row_sep = f'{row_sep}+'
lines = [row_sep]
for line in value_lines:
line = ' |'.join(line.split(','))
line = f'|{line} |'
lines.append(line)
lines.append(row_sep)
return os.linesep.join(lines)
def get_info_str(self, offset: tp.Optional[int] = 0) -> str:
lines = ['Well Counts:']
counts = {k+offset:v for k,v in self.get_well_counts().items()}
lines.append(os.linesep.join([f'{k:2d}cm: {v:2d}' for k,v in counts.items()]))
total_length = (self.well_heights + offset).sum()
p = self.parameters
lines.extend([
'',
f'Total well length: {total_length:n}cm',
f'Highest frequency: {p.high_frequency:n} Hz',
f'Total size: {p.total_width:.3f}cm x {p.total_height}cm'
])
return os.linesep.join(lines)
def _main_argparse():
locale.setlocale(locale.LC_NUMERIC, '')
p = argparse.ArgumentParser()
p.add_argument('ncols', type=int)
p.add_argument('nrows', type=int)
p.add_argument('-p', '--prime', dest='prime_num', type=int)
p.add_argument('-r', '--root', dest='prime_root', type=int)
p.add_argument('-f', '--freq', dest='design_freq', type=int, required=True)
p.add_argument('-w', '--well-width',
dest='well_width', type=float, default=3.81,
)
p.add_argument('-s', '--sos',
dest='speed_of_sound', type=int,
help='Speed of sound (in meters per second)', default=SPEED_OF_SOUND,
)
p.add_argument(
'--offset', dest='offset', type=int, default=0,
)
p.add_argument(
'--format', dest='format', choices=('csv', 'rst'), default='csv',
)
args = p.parse_args()
if args.prime_num is None:
args.prime_num = args.ncols * args.nrows + 1
print(f'{args.prime_num=}')
assert is_prime(args.prime_num)
if args.prime_root is None:
args.prime_root = min(prim_roots(args.prime_num))
out_fmt = args.format
offset = args.offset
kwargs = vars(args)
del kwargs['format']
del kwargs['offset']
result = TableResult.from_kwargs(**kwargs)
if out_fmt == 'csv':
print(result.to_csv(offset=offset))
else:
print(result.to_rst(offset=offset))
print('')
print(result.get_info_str(offset=offset))
print('')
return result
if click is not None:
@click.command()
@click.argument('ncols', type=int)
@click.argument('nrows', type=int)
@click.option('-p', '--prime-num', type=int)
@click.option('-r', '--prime-root', type=int)
@click.option('-f', '--design-freq', type=int, required=True)
@click.option('-w', '--well-width', default=3.81)
@click.option('-s', '--speed-of-sound', default=SPEED_OF_SOUND)
@click.option('--offset', default=0)
@click.option('--format',
type=click.Choice(['csv', 'rst'], case_sensitive=False),
default='rst',
)
def build(**kwargs):
offset, out_fmt = kwargs.pop('offset'), kwargs.pop('format')
param_kw = kwargs.copy()
if not param_kw['prime_num']:
param_kw['prime_num'] = param_kw['ncols'] * param_kw['nrows'] + 1
if not param_kw['prime_root']:
roots = list(prim_roots(param_kw['prime_num']))
if len(roots) > 10:
roots = roots[:11]
roots = [str(v) for v in roots]
pr = click.prompt(
'Choose a primitive root', type=click.Choice(roots),
)
param_kw['prime_root'] = int(pr)
result = TableResult.from_kwargs(**param_kw)
if out_fmt == 'csv':
print(result.to_csv(offset=offset))
else:
print(result.to_rst(offset=offset))
print('')
print(result.get_info_str(offset=offset))
print('')
def main():
if click is not None:
click_cli()
else:
_main_argparse()
if __name__ == '__main__':
main()