| |
| """ |
| .. module:: CPU |
| :synopsis: CPU, a CLASS Plotting Utility |
| .. moduleauthor:: Benjamin Audren <benjamin.audren@gmail.com> |
| .. credits:: Benjamin Audren, Jesus Torrado |
| .. version:: 2.0 |
| |
| This is a small python program aimed to gain time when comparing two spectra, |
| e.g. from CAMB and CLASS, or a non-linear spectrum to a linear one. |
| |
| It is designed to be used in a command line fashion, not being restricted to |
| your CLASS directory, though it recognizes mainly CLASS output format. Far from |
| perfect, or complete, it could use any suggestion for enhancing it, |
| just to avoid losing time on useless matters for others. |
| |
| Be warned that, when comparing with other formats, the following is assumed: |
| there are no empty lines (especially at the end of the file). Gnuplot comment |
| lines (starting with a #) are allowed. Violating this assumption will cause a |
| not very descriptive error in CPU; any suggestion for testing it is welcome. |
| |
| Example of use: |
| - To superimpose two different spectra and see their global shape : |
| python CPU.py output/lcdm_z2_pk.dat output/lncdm_z2_pk.dat |
| - To see in details their ratio: |
| python CPU.py output/lcdm_z2_pk.dat output/lncdm_z2_pk.dat -r |
| |
| The "PlanckScale" is taken with permission from Jesus Torrado's: |
| cosmo_mini_toolbox, available under GPLv3 at |
| https://github.com/JesusTorrado/cosmo_mini_toolbox |
| |
| """ |
|
|
| from __future__ import unicode_literals, print_function |
|
|
| |
| import os |
| import sys |
| import argparse |
|
|
| |
| import numpy as np |
| from numpy import ma |
| from scipy.interpolate import InterpolatedUnivariateSpline |
| from math import floor |
|
|
| |
| import matplotlib.pyplot as plt |
| from matplotlib import scale as mscale |
| from matplotlib.transforms import Transform |
| from matplotlib.ticker import FixedLocator |
|
|
|
|
def CPU_parser():
    """
    Build the command line parser of CPU.

    Returns
    -------
    parser : argparse.ArgumentParser
        Parser exposing the positional ``files`` argument and the options
        ``-r/--ratio``, ``-y/--y-axis``, ``-x/--x-axis``, ``--scale``,
        ``--xlim``, ``--ylim``, ``-p/--print`` (stored as ``printfile``) and
        ``--repeat``.

    """
    parser = argparse.ArgumentParser(
        description=(
            'CPU, a CLASS Plotting Utility, specify wether you want\n'
            'to superimpose, or plot the ratio of different files.'),
        epilog=(
            'A standard usage would be, for instance:\n'
            'python CPU.py output/test_pk.dat output/test_pk_nl_density.dat'
            ' -r\npython CPU.py output/wmap_cl.dat output/planck_cl.dat'),
        formatter_class=argparse.RawDescriptionHelpFormatter)

    parser.add_argument(
        'files', type=str, nargs='*', help='Files to plot')
    parser.add_argument('-r', '--ratio', dest='ratio', action='store_true',
                        help='Plot the ratio of the spectra')
    parser.add_argument('-y', '--y-axis', dest='y_axis', nargs='+',
                        help='specify the fields you want to plot.')
    parser.add_argument('-x', '--x-axis', dest='x_axis', type=str,
                        help='specify the field to be used on the x-axis')
    parser.add_argument('--scale', type=str,
                        choices=['lin', 'loglog', 'loglin', 'george'],
                        help='Specify the scale to use for the plot')
    parser.add_argument('--xlim', dest='xlim', nargs='+', type=float,
                        default=[], help='Specify the x range')
    parser.add_argument('--ylim', dest='ylim', nargs='+', type=float,
                        default=[], help='Specify the y range')
    # The short and long flags must be two separate option strings: a single
    # '-p, --print' string registers one unusable option of that literal name.
    # The option always consumes a file name, so the help only promises that.
    parser.add_argument(
        '-p', '--print',
        dest='printfile', default='',
        help='print the graph directly in the given file')
    parser.add_argument(
        '--repeat',
        dest='repeat', action='store_true', default=False,
        help='repeat the step for all redshifts with same base name')
    return parser
|
|
|
|
def _select_y_columns(names, num_columns, requested):
    """Return the column names of one file that match the requested fields."""
    if requested is None:
        requested = []
    if num_columns == 2:
        # Only one column besides the abscissa: plot it whatever was asked
        requested = [names[1]]
    elif isinstance(requested, str):
        # in case the request was only a string, cast it to a list
        requested = [requested]

    selected = []
    for field in requested:
        selected.extend(
            [name for name in names
             if field in name and name not in selected])
    if not requested:
        # Nothing requested: take everything but the first column
        selected = names[1:]
    return selected


def _find_x_index(names, x_axis):
    """Return the index of the first column matching x_axis (0 by default)."""
    if x_axis:
        for position, name in enumerate(names):
            if x_axis in name:
                return position
    return 0


def plot_CLASS_output(files, x_axis, y_axis, ratio=False, printing='',
                      output_name='', extension='', x_variable='',
                      scale='lin', xlim=None, ylim=None):
    """
    Load the data to numpy arrays, write all the commands for plotting to a
    Python script for further refinement, and display them.

    Inspired heavily by the matlab version by Thomas Tram

    Parameters
    ----------
    files : list
        List of files to plot
    x_axis : str
        name (or part of the name) of the column to use as the x coordinate.
        If empty or None, the first column is used
    y_axis : list, str
        List of items to plot, which should match the way they appear in the
        file, for instance: ['TT', 'BB']. If empty, every column but the
        first one is plotted

    Keyword Arguments
    -----------------
    ratio : bool
        If set, plots the relative difference of each file with respect to
        the first one, after interpolating it on the abscissa of the first
        file. In this mode the generated script does not contain the plotting
        commands
    printing : str
        If not empty, path of the file in which the figure is saved
    output_name : str
        Currently unused
    extension : str
        Currently unused
    x_variable : str
        Currently unused
    scale : str
        One of 'lin', 'loglog', 'loglin' or 'george'. 'loglog' is not
        available for ratios
    xlim : list
        One or two values passed to ``ax.set_xlim``
    ylim : list
        One or two values passed to ``ax.set_ylim``

    Raises
    ------
    InputError
        If the scale is unknown, or is 'loglog' while a ratio is asked
    NumberOfFilesError
        If a ratio is asked with less than two files

    """
    # Avoid sharing mutable default arguments between calls
    xlim = [] if xlim is None else xlim
    ylim = [] if ylim is None else ylim

    # Refuse invalid requests before creating any figure or file
    if scale not in ('lin', 'loglog', 'loglin', 'george'):
        raise InputError("unknown scale '%s'" % scale)
    if ratio and scale == 'loglog':
        raise InputError(
            "loglog plot is not available for ratios")
    if ratio and len(files) < 2:
        raise NumberOfFilesError(
            "At least two files are needed to plot a ratio")

    # Define the python script name
    python_script_path = os.path.splitext(files[0])[0]+'.py'

    # The variable text will contain all the lines to be printed in the end to
    # the python script path, joined with newline characters.
    text = ['import matplotlib.pyplot as plt',
            'import numpy as np',
            'import itertools', '']

    # Load all the graphs
    data = [np.loadtxt(data_file) for data_file in files]

    # Absolute paths, so that the generated script can be run from anywhere
    full_path_files = [os.path.abspath(elem) for elem in files]

    text += ['files = %s' % full_path_files]
    text += ['data = []',
             'for data_file in files:',
             ' data.append(np.loadtxt(data_file))']

    # Recover the base name of the files, everything before the first dot
    roots = [elem.split(os.path.sep)[-1].split('.')[0] for elem in files]
    text += ['roots = [%s]' % ', '.join(["'%s'" % root for root in roots])]

    # Create the figure and ax objects
    fig, ax = plt.subplots()
    text += ['', 'fig, ax = plt.subplots()']

    original_y_axis = y_axis
    legend = []
    if not ratio:
        # Simple superposition of all the requested columns of all files
        for index, curve in enumerate(data):
            num_columns, names, tex_names = extract_headers(files[index])

            text += ['', 'index, curve = %i, data[%i]' % (index, index)]

            selected = _select_y_columns(names, num_columns, original_y_axis)
            y_axis = selected
            x_index = _find_x_index(names, x_axis)

            # Store to text
            text += ['y_axis = %s' % selected]
            text += ['tex_names = %s' % [elem for (elem, name) in
                                         zip(tex_names, names)
                                         if name in selected]]
            text += ["x_axis = '%s'" % tex_names[x_index]]
            text += ["ylim = %s" % ylim]
            text += ["xlim = %s" % xlim]

            for selec in y_axis:
                index_selec = names.index(selec)
                x_values = curve[:, x_index]
                y_values = curve[:, index_selec]
                if scale == 'lin':
                    command = 'plot(curve[:, %i], curve[:, %i])'
                    ax.plot(x_values, y_values)
                elif scale == 'loglog':
                    command = 'loglog(curve[:, %i], abs(curve[:, %i]))'
                    ax.loglog(x_values, abs(y_values))
                elif scale == 'loglin':
                    command = 'semilogx(curve[:, %i], curve[:, %i])'
                    ax.semilogx(x_values, y_values)
                else:
                    # 'george': the custom scale is only applied to the live
                    # figure, the script falls back to a linear plot
                    command = 'plot(curve[:, %i], curve[:, %i])'
                    ax.plot(x_values, y_values)
                    ax.set_xscale('planck')
                text += ['ax.' + command % (x_index, index_selec)]

            legend.extend([roots[index]+': '+elem for elem in y_axis])

        ax.legend(legend, loc='best')
        text += ["",
                 "ax.legend([root+': '+elem for (root, elem) in",
                 " itertools.product(roots, y_axis)], loc='best')",
                 ""]
    else:
        # The first file is the reference of every ratio
        ref = data[0]
        _, ref_curve_names, _ = extract_headers(files[0])
        x_index_ref = _find_x_index(ref_curve_names, x_axis)
        axis = ref[:, x_index_ref]

        # The limits are referenced by name further down in the script
        text += ["ylim = %s" % ylim]
        text += ["xlim = %s" % xlim]

        for idx in range(1, len(data)):
            current = data[idx]
            num_columns, names, tex_names = extract_headers(files[idx])

            selected = _select_y_columns(names, num_columns, original_y_axis)
            y_axis = selected

            text += ['y_axis = %s' % selected]
            text += ['tex_names = %s' % [elem for (elem, name) in
                                         zip(tex_names, names)
                                         if name in selected]]

            x_index = _find_x_index(names, x_axis)

            text += ["x_axis = '%s'" % tex_names[x_index]]
            for selec in y_axis:
                reference = ref[:, ref_curve_names.index(selec)]
                # Interpolate the current curve on the reference abscissa
                interpolated = InterpolatedUnivariateSpline(
                    current[:, x_index], current[:, names.index(selec)])
                relative_difference = interpolated(axis)/reference-1
                if scale == 'lin':
                    ax.plot(axis, relative_difference)
                elif scale == 'loglin':
                    ax.semilogx(axis, relative_difference)
                else:
                    # 'george', 'loglog' having been refused above
                    ax.plot(axis, relative_difference)
                    ax.set_xscale('planck')

    if 'TT' in names:
        ax.set_xlabel('$\\ell$', fontsize=16)
        # raw string in the script, so that '\e' is not read as an escape
        text += ["ax.set_xlabel(r'$\\ell$', fontsize=16)"]
    elif 'P' in names:
        ax.set_xlabel('$k$ [$h$/Mpc]', fontsize=16)
        text += ["ax.set_xlabel('$k$ [$h$/Mpc]', fontsize=16)"]
    else:
        ax.set_xlabel(tex_names[x_index], fontsize=16)
        text += ["ax.set_xlabel('%s', fontsize=16)" % tex_names[x_index]]
    if xlim:
        if len(xlim) > 1:
            ax.set_xlim(xlim)
            text += ["ax.set_xlim(xlim)"]
        else:
            ax.set_xlim(xlim[0])
            text += ["ax.set_xlim(xlim[0])"]
        ax.set_ylim()
        text += ["ax.set_ylim()"]
    if ylim:
        if len(ylim) > 1:
            ax.set_ylim(ylim)
            text += ["ax.set_ylim(ylim)"]
        else:
            ax.set_ylim(ylim[0])
            text += ["ax.set_ylim(ylim[0])"]

    # If the user wants to print the figure to a file, do it once and before
    # the (blocking) display, while the figure is guaranteed to be alive
    if printing:
        fig.savefig(printing)
        text += ["fig.savefig('%s')" % printing]

    text += ['plt.show()']
    plt.show()

    # Write to the python file all the issued commands, to be able to
    # reproduce the plot later on
    with open(python_script_path, 'w') as python_script:
        print('Creating a python script to reproduce the figure')
        print('--> stored in %s' % python_script_path)
        python_script.write('\n'.join(text))
|
|
|
|
class FormatError(Exception):
    """Error signalling a format that is not recognised."""
|
|
|
|
# NOTE(review): this class hides the builtin ``TypeError`` for the whole
# module; it keeps its name only because renaming it would change the public
# interface. Consider a distinct name on the next interface change.
class TypeError(Exception):
    """Error signalling a spectrum type that is not recognised."""
|
|
|
|
class NumberOfFilesError(Exception):
    """Error signalling an invalid number of input files."""
|
|
|
|
class InputError(Exception):
    """Error signalling input requirements that are incompatible."""
|
|
|
|
def replace_scale(string):
    """
    Substitute the second character of the string by "8\\pi G/3".

    The string is expected to start with "(.)", so that the dot is the
    character being replaced; the prefix itself is not checked. A string
    shorter than two characters raises an IndexError.

    >>> print(replace_scale('(.)toto'))
    (8\\pi G/3)toto
    """
    # Indexing (and not slicing) the replaced character keeps the IndexError
    # on strings that are too short
    first, _replaced, remainder = string[:1], string[1], string[2:]
    return first + '8\\pi G/3' + remainder
|
|
|
|
def process_long_names(long_names):
    """
    Given the names extracted from the header, return two lists, one with the
    short version, and one with the tex version.

    The short version drops a leading "(.)" and every whitespace; the tex
    version is the long name, where a leading "(.)" went through
    :func:`replace_scale`.

    >>> names, tex_names = process_long_names(['(.)toto', 'proper time [Gyr]'])
    >>> print(names)
    ['toto', 'propertime[Gyr]']
    >>> print(tex_names)
    ['(8\\pi G/3)toto', 'proper time [Gyr]']

    """
    scale_prefix = '(.)'
    names = []
    tex_names = []
    for long_name in long_names:
        if long_name.startswith(scale_prefix):
            # This can happen in the background file
            short_name = long_name[len(scale_prefix):]
            tex_name = replace_scale(long_name)
        else:
            short_name = tex_name = long_name
        # Remove any spacing from the short version
        names.append(''.join(short_name.split()))
        tex_names.append(tex_name)
    return names, tex_names
|
|
|
|
def extract_headers(header_path):
    """
    Recover the number of columns of a file and their names from its header.

    The last line of the file starting with a '#' is taken as the header, in
    which every column is expected to be announced as "number:name".

    Parameters
    ----------
    header_path : str
        Path of the file to read

    Returns
    -------
    num_columns : int
        Number of ':' found in the header
    names : list
        Short names of the columns, see :func:`process_long_names`
    tex_names : list
        Names of the columns to use as labels

    Raises
    ------
    FormatError
        If the file has no line starting with '#', or if this line does not
        contain any ':'

    """
    header = None
    with open(header_path, 'r') as header_file:
        for line in header_file:
            # Keep the last commented line only
            if line.startswith('#'):
                header = line
    if header is None:
        raise FormatError(
            "No header line (starting with '#') found in %s" % header_path)

    # Each column name starts right after a colon. Thanks Thomas Tram for the
    # trick
    starts = [position+1 for position, character in enumerate(header)
              if character == ':']
    num_columns = len(starts)
    if not num_columns:
        raise FormatError(
            "No column title (of the form 'number:name') found in the header "
            "of %s" % header_path)

    long_names = []
    for column, start in enumerate(starts):
        if column < num_columns-1:
            # Stop three characters before the next name, to drop the colon
            # and the (at most two characters wide) number preceding it
            end = starts[column+1]-3
        else:
            end = None
        long_names.append(header[start:end].strip())

    # Process long_names further to handle special cases, and extract names,
    # that will correspond to the tags specified in "y_axis".
    names, tex_names = process_long_names(long_names)

    return num_columns, names, tex_names
|
|
|
|
def main():
    """
    Entry point of the script: parse the command line, and plot accordingly.

    Prints the usage and returns when no file is given. Raises a
    NumberOfFilesError when a ratio is asked with less than two files.
    """
    print('~~~ Running CPU, a CLASS Plotting Utility ~~~')
    parser = CPU_parser()
    args = parser.parse_args()

    # Without any file, there is nothing to do but to recall the usage
    if not args.files:
        parser.print_usage()
        return

    # When no field is requested, guess the scale from the name of the first
    # file: spectra ('cl' or 'pk' in the name) are shown in loglog
    if args.y_axis:
        guessed_scale = ''
    else:
        first_file = args.files[0]
        is_spectrum = 'cl' in first_file or 'pk' in first_file
        guessed_scale = 'loglog' if is_spectrum else 'lin'
        args.y_axis = []

    # An explicit --scale always wins over the guess
    if not args.scale:
        args.scale = guessed_scale or 'lin'

    # Remove extra spacing in the y_axis list
    args.y_axis = [''.join(field.split()) for field in args.y_axis]

    # A ratio needs something to be compared to
    if args.ratio and len(args.files) < 2:
        raise NumberOfFilesError(
            "If you want me to compute a ratio between two files, "
            "I strongly encourage you to give me at least two of them.")

    # Ratios can not be shown in loglog
    if args.ratio and args.scale == 'loglog':
        print("Defaulting to loglin scale")
        args.scale = 'loglin'

    plot_CLASS_output(args.files, args.x_axis, args.y_axis,
                      ratio=args.ratio, printing=args.printfile,
                      scale=args.scale, xlim=args.xlim, ylim=args.ylim)
|
|
|
|
| |
| |
| |
# Settings of the 'planck' scale defined below.
# How non-positive values are handled: "mask" selects the masking handler
nonpos = "mask"
# Abscissa up to which the scale is logarithmic, and linear above
change = 50.0
# Multiplicative factor applied to the logarithmic part of the scale
factor = 500.0
|
|
|
|
| def _mask_nonpos(a): |
| """ |
| Return a Numpy masked array where all non-positive 1 are |
| masked. If there are no non-positive, the original array |
| is returned. |
| """ |
| mask = a <= 0.0 |
| if mask.any(): |
| return ma.MaskedArray(a, mask=mask) |
| return a |
|
|
|
|
| def _clip_smaller_than_one(a): |
| a[a <= 0.0] = 1e-300 |
| return a |
|
|
|
|
class PlanckScale(mscale.ScaleBase):
    """
    Scale used by the Planck collaboration to plot Temperature power spectra:
    base-10 logarithmic up to l=50, and linear from there on.

    Care is taken so non-positive values are not plotted.
    """
    name = 'planck'

    def __init__(self, axis, **kwargs):
        """
        Accept the arguments given by matplotlib when creating the scale.

        They are ignored, the scale being entirely set by the module level
        variables `nonpos`, `change` and `factor`.
        """
        pass

    def set_default_locators_and_formatters(self, axis):
        """
        Place fixed major and minor ticks on the axis.
        """
        axis.set_major_locator(
            FixedLocator(
                np.concatenate((np.array([2, 10, change]),
                                np.arange(500, 2500, 500)))))
        axis.set_minor_locator(
            FixedLocator(
                np.concatenate((np.arange(2, 10),
                                np.arange(10, 50, 10),
                                np.arange(floor(change/100), 2500, 100)))))

    def get_transform(self):
        """
        Return the :class:`~matplotlib.transforms.Transform` instance
        mapping the data to the scale.
        """
        return self.PlanckTransform(nonpos)

    def limit_range_for_scale(self, vmin, vmax, minpos):
        """
        Limit the domain to positive values.

        A non-positive bound is replaced by `minpos`, itself replaced by a
        tiny positive number when it is not finite.
        """
        if not np.isfinite(minpos):
            minpos = 1e-300
        # Conditional expressions instead of "cond and a or b", which returns
        # the non-positive bound whenever minpos is zero
        return (minpos if vmin <= 0.0 else vmin,
                minpos if vmax <= 0.0 else vmax)

    class PlanckTransform(Transform):
        """
        Map data to the scale: factor*log10(a) up to `change`, and a straight
        line of slope one, continuous at `change`, above.
        """
        input_dims = 1
        output_dims = 1
        is_separable = True
        has_inverse = True

        def __init__(self, nonpos):
            """
            Select the handling of non-positive values: masked if nonpos is
            'mask', replaced by a tiny positive number otherwise.
            """
            Transform.__init__(self)
            if nonpos == 'mask':
                self._handle_nonpos = _mask_nonpos
            else:
                # The clipping helper of this module (the name previously
                # used here was not defined anywhere)
                self._handle_nonpos = _clip_smaller_than_one

        def transform_non_affine(self, a):
            """
            Transform the values element by element.

            The order and the number of the values are preserved, whatever
            their ordering. Masked (non-positive) values come out as NaN.
            """
            values = np.asarray(a, dtype=float)
            in_log_part = values <= change
            # np.where builds a new array, so that a handler working in place
            # never alters the input. The linear part is given a harmless 1.
            log_input = self._handle_nonpos(
                np.where(in_log_part, values, 1.0))
            with np.errstate(divide='ignore', invalid='ignore'):
                if isinstance(log_input, ma.MaskedArray):
                    log_part = ma.log10(log_input).filled(np.nan)
                else:
                    log_part = np.log10(log_input)
            linear_part = factor*np.log10(change) + (values-change)
            return np.where(in_log_part, factor*log_part, linear_part)

        def inverted(self):
            """Return the inverse transform."""
            return PlanckScale.InvertedPlanckTransform()

    class InvertedPlanckTransform(Transform):
        """
        Inverse of :class:`PlanckTransform`.
        """
        input_dims = 1
        output_dims = 1
        is_separable = True
        has_inverse = True

        def transform_non_affine(self, a):
            """
            Transform the values element by element, preserving their order
            and their number.
            """
            values = np.asarray(a, dtype=float)
            threshold = factor*np.log10(change)
            in_log_part = values <= threshold
            # The exponent of the linear part is set to zero, not to overflow
            # on values that are discarded anyway
            log_part = np.power(
                10.0, np.where(in_log_part, values, 0.0)/float(factor))
            linear_part = values + change - threshold
            return np.where(in_log_part, log_part, linear_part)

        def inverted(self):
            """Return the direct transform."""
            # PlanckTransform only exists as an attribute of PlanckScale, and
            # needs to be told how to handle non-positive values
            return PlanckScale.PlanckTransform(nonpos)
|
|
| |
# Register the scale by its name, to be selected with ax.set_xscale('planck')
mscale.register_scale(PlanckScale)


# Run the plotting utility only when executed as a script, and hand the value
# returned by main over to the interpreter as exit status
if __name__ == '__main__':
    sys.exit(main())
|
|