diff --git a/neo/io/neuralynxio.py b/neo/io/neuralynxio.py
index 8469c6a44..925a58013 100644
--- a/neo/io/neuralynxio.py
+++ b/neo/io/neuralynxio.py
@@ -10,7 +10,7 @@
 """

 from neo.io.basefromrawio import BaseFromRaw
-from neo.rawio.neuralynxrawio import NeuralynxRawIO
+from neo.rawio.neuralynxrawio.neuralynxrawio import NeuralynxRawIO


 class NeuralynxIO(NeuralynxRawIO, BaseFromRaw):
diff --git a/neo/rawio/__init__.py b/neo/rawio/__init__.py
index c9f29f85f..bd1fda3aa 100644
--- a/neo/rawio/__init__.py
+++ b/neo/rawio/__init__.py
@@ -120,7 +120,7 @@
 from neo.rawio.examplerawio import ExampleRawIO
 from neo.rawio.intanrawio import IntanRawIO
 from neo.rawio.micromedrawio import MicromedRawIO
-from neo.rawio.neuralynxrawio import NeuralynxRawIO
+from neo.rawio.neuralynxrawio.neuralynxrawio import NeuralynxRawIO
 from neo.rawio.neuroexplorerrawio import NeuroExplorerRawIO
 from neo.rawio.neuroscoperawio import NeuroScopeRawIO
 from neo.rawio.nixrawio import NIXRawIO
diff --git a/neo/rawio/neuralynxrawio.py b/neo/rawio/neuralynxrawio.py
index 818a4c270..c9f29f85f 100644
--- a/neo/rawio/neuralynxrawio.py
+++ b/neo/rawio/neuralynxrawio.py
@@ -1,904 +1,173 @@
 """
-Class for reading data from Neuralynx files.
-This IO supports NCS, NEV, NSE and NTT file formats.
+:mod:`neo.rawio` provides classes for reading electrophysiological data files
+with a low-level API.
+:attr:`neo.rawio.rawiolist` provides a list of successfully imported rawio
+classes.

-NCS contains sampled signal for one channel
-NEV contains events
-NSE contains spikes and waveforms for mono electrodes
-NTT contains spikes and waveforms for tetrodes
+Functions:
+.. autofunction:: neo.rawio.get_rawio_class

-NCS files can contain gaps, which show up as irregularities in the
-timestamps of data blocks. Each gap leads to one new segment.
-Some NCS files may need to be read entirely to detect those gaps, which can be slow.
+Classes:

-Author: Julia Sprenger, Carlos Canova, Samuel Garcia
-"""
-# from __future__ import unicode_literals is not compatible with numpy.dtype on both py2 and py3
+* :attr:`AxographRawIO`
+* :attr:`AxonRawIO`
+* :attr:`BlackrockRawIO`
+* :attr:`BrainVisionRawIO`
+* :attr:`ElanRawIO`
+* :attr:`IntanRawIO`
+* :attr:`MicromedRawIO`
+* :attr:`NeuralynxRawIO`
+* :attr:`NeuroExplorerRawIO`
+* :attr:`NeuroScopeRawIO`
+* :attr:`NIXRawIO`
+* :attr:`OpenEphysRawIO`
+* :attr:`PlexonRawIO`
+* :attr:`RawBinarySignalRawIO`
+* :attr:`RawMCSRawIO`
+* :attr:`Spike2RawIO`
+* :attr:`TdtRawIO`
+* :attr:`WinEdrRawIO`
+* :attr:`WinWcpRawIO`

-from .baserawio import (BaseRawIO, _signal_channel_dtype,
-                        _unit_channel_dtype, _event_channel_dtype)
+.. autoclass:: neo.rawio.AxographRawIO

-import numpy as np
-import os
-import re
-import distutils.version
-import datetime
-from collections import OrderedDict
+    .. autoattribute:: extensions

-BLOCK_SIZE = 512  # number of samples per signal block
+.. autoclass:: neo.rawio.AxonRawIO

+    .. autoattribute:: extensions

-class NeuralynxRawIO(BaseRawIO):
-    """
-    Class for reading datasets recorded by Neuralynx.
+.. autoclass:: neo.rawio.BlackrockRawIO

-    This version only works with the 'one-dir' rawmode, for a single directory.
+    .. autoattribute:: extensions

-    Examples:
-        >>> reader = NeuralynxRawIO(dirname='Cheetah_v5.5.1/original_data')
-        >>> reader.parse_header()
+.. autoclass:: neo.rawio.BrainVisionRawIO

-            Inspect all files in the directory.
+    .. autoattribute:: extensions

-        >>> print(reader)
+.. autoclass:: neo.rawio.ElanRawIO

-            Display all information about signal channels, units, segment size, etc.
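For downstream code, the only user-visible change in the first two hunks is the import location: `neuralynxrawio` becomes a package containing a module of the same name. A minimal sketch of using the relocated class, reusing the example from the old docstring above (the dirname is illustrative):

```python
# Only the module path changes; the class and its API are untouched.
from neo.rawio.neuralynxrawio.neuralynxrawio import NeuralynxRawIO

# Illustrative directory; the reader scans a whole directory of
# .ncs/.nse/.nev/.ntt files in its 'one-dir' raw mode.
reader = NeuralynxRawIO(dirname='Cheetah_v5.5.1/original_data')
reader.parse_header()  # inspect all files in the directory
print(reader)          # summary of signal channels, units and segments
```

Since `neo/rawio/__init__.py` re-exports the class, `from neo.rawio import NeuralynxRawIO` keeps working as well.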
-    """
-    extensions = ['nse', 'ncs', 'nev', 'ntt']
-    rawmode = 'one-dir'
-
-    def __init__(self, dirname='', keep_original_times=False, **kargs):
-        """
-        Parameters
-        ----------
-        dirname: str
-            name of directory containing all files for dataset
-        keep_original_times: bool
-            if True, keep original start time as in files,
-            otherwise set time 0 to the first time in the dataset
-        """
-        self.dirname = dirname
-        self.keep_original_times = keep_original_times
-        BaseRawIO.__init__(self, **kargs)
-
-    def _source_name(self):
-        return self.dirname
-
-    def _parse_header(self):
-
-        sig_channels = []
-        unit_channels = []
-        event_channels = []
-
-        self.ncs_filenames = OrderedDict()  # (chan_name, chan_id): filename
-        self.nse_ntt_filenames = OrderedDict()  # (chan_name, chan_id): filename
-        self.nev_filenames = OrderedDict()  # chan_id: filename
-
-        self._nev_memmap = {}
-        self._spike_memmap = {}
-        self.internal_unit_ids = []  # channel_index > ((channel_name, channel_id), unit_id)
-        self.internal_event_ids = []
-        self._empty_ncs = []  # this list contains filenames of empty files
-        self._empty_nev = []
-        self._empty_nse_ntt = []
-
-        # Explore the directory looking for ncs, nev, nse and ntt files
-        # and construct channel headers.
-        signal_annotations = []
-        unit_annotations = []
-        event_annotations = []
-
-        for filename in sorted(os.listdir(self.dirname)):
-            filename = os.path.join(self.dirname, filename)
-
-            _, ext = os.path.splitext(filename)
-            ext = ext[1:]  # remove dot
-            ext = ext.lower()  # make lower case for comparisons
-            if ext not in self.extensions:
-                continue
-
-            # Skip Ncs files with only a header. Other empty file types
-            # will have an empty dataset constructed later.
-            if (os.path.getsize(filename) <= NlxHeader.HEADER_SIZE) and ext in ['ncs']:
-                self._empty_ncs.append(filename)
-                continue
-
-            # All files have more or less the same header structure.
-            info = NlxHeader.build_for_file(filename)
-            chan_names = info['channel_names']
-            chan_ids = info['channel_ids']
-
-            for idx, chan_id in enumerate(chan_ids):
-                chan_name = chan_names[idx]
-
-                chan_uid = (chan_name, chan_id)
-                if ext == 'ncs':
-                    # a sampled signal channel
-                    units = 'uV'
-                    gain = info['bit_to_microVolt'][idx]
-                    if info.get('input_inverted', False):
-                        gain *= -1
-                    offset = 0.
-                    group_id = 0
-                    sig_channels.append((chan_name, chan_id, info['sampling_rate'],
-                                         'int16', units, gain, offset, group_id))
-                    self.ncs_filenames[chan_uid] = filename
-                    keys = [
-                        'DspFilterDelay_µs',
-                        'recording_opened',
-                        'FileType',
-                        'DspDelayCompensation',
-                        'recording_closed',
-                        'DspLowCutFilterType',
-                        'HardwareSubSystemName',
-                        'DspLowCutNumTaps',
-                        'DSPLowCutFilterEnabled',
-                        'HardwareSubSystemType',
-                        'DspHighCutNumTaps',
-                        'ADMaxValue',
-                        'DspLowCutFrequency',
-                        'DSPHighCutFilterEnabled',
-                        'RecordSize',
-                        'InputRange',
-                        'DspHighCutFrequency',
-                        'input_inverted',
-                        'NumADChannels',
-                        'DspHighCutFilterType',
-                    ]
-                    d = {k: info[k] for k in keys if k in info}
-                    signal_annotations.append(d)
-
-                elif ext in ('nse', 'ntt'):
-                    # nse and ntt are pretty similar except for the waveform shape.
-                    # A file can contain several unit_ids (so several unit channels).
-                    assert chan_id not in self.nse_ntt_filenames, \
-                        'Several nse or ntt files have the same unit_id!!!'
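The `.ncs` branch above derives each channel's scaling from the parsed header: `ADBitVolts` is exposed as `bit_to_microVolt`, and `InputInverted` flips the sign so that raw `int16` samples still scale to physically correct microvolts. A self-contained sketch of that rule; `channel_gain` is a hypothetical helper and the `info` dict stands in for `NlxHeader.build_for_file(...)`:

```python
# Illustrative header values; real ones are parsed from the 16 kB text header.
info = {
    'bit_to_microVolt': [0.0305, 0.0305],
    'input_inverted': True,
    'sampling_rate': 32000.0,
}

def channel_gain(info, idx):
    # microvolts per ADC count; inverted inputs get a negative gain,
    # exactly as in the 'gain *= -1' branch above
    gain = info['bit_to_microVolt'][idx]
    if info.get('input_inverted', False):
        gain *= -1
    return gain

assert channel_gain(info, 0) == -0.0305
```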
-                    self.nse_ntt_filenames[chan_uid] = filename
-
-                    dtype = get_nse_or_ntt_dtype(info, ext)
-
-                    if (os.path.getsize(filename) <= NlxHeader.HEADER_SIZE):
-                        self._empty_nse_ntt.append(filename)
-                        data = np.zeros((0,), dtype=dtype)
-                    else:
-                        data = np.memmap(filename, dtype=dtype, mode='r',
-                                         offset=NlxHeader.HEADER_SIZE)
-
-                    self._spike_memmap[chan_uid] = data
-
-                    unit_ids = np.unique(data['unit_id'])
-                    for unit_id in unit_ids:
-                        # a spike channel for each (chan_id, unit_id)
-                        self.internal_unit_ids.append((chan_uid, unit_id))
-
-                        unit_name = "ch{}#{}#{}".format(chan_name, chan_id, unit_id)
-                        unit_id = '{}'.format(unit_id)
-                        wf_units = 'uV'
-                        wf_gain = info['bit_to_microVolt'][idx]
-                        if info.get('input_inverted', False):
-                            wf_gain *= -1
-                        wf_offset = 0.
-                        wf_left_sweep = -1  # NOT KNOWN
-                        wf_sampling_rate = info['sampling_rate']
-                        unit_channels.append(
-                            (unit_name, '{}'.format(unit_id), wf_units, wf_gain,
-                             wf_offset, wf_left_sweep, wf_sampling_rate))
-                        unit_annotations.append(dict(file_origin=filename))
-
-                elif ext == 'nev':
-                    # an event channel
-                    # each ('event_id', 'ttl_input') pair gives a new event channel
-                    self.nev_filenames[chan_id] = filename
-
-                    if (os.path.getsize(filename) <= NlxHeader.HEADER_SIZE):
-                        self._empty_nev.append(filename)
-                        data = np.zeros((0,), dtype=nev_dtype)
-                        internal_ids = []
-                    else:
-                        data = np.memmap(filename, dtype=nev_dtype, mode='r',
-                                         offset=NlxHeader.HEADER_SIZE)
-                        internal_ids = np.unique(data[['event_id', 'ttl_input']]).tolist()
-                    for internal_event_id in internal_ids:
-                        if internal_event_id not in self.internal_event_ids:
-                            event_id, ttl_input = internal_event_id
-                            name = '{} event_id={} ttl={}'.format(
-                                chan_name, event_id, ttl_input)
-                            event_channels.append((name, chan_id, 'event'))
-                            self.internal_event_ids.append(internal_event_id)
-
-                    self._nev_memmap[chan_id] = data
-
-        sig_channels = np.array(sig_channels, dtype=_signal_channel_dtype)
-        unit_channels = np.array(unit_channels, dtype=_unit_channel_dtype)
-        event_channels = np.array(event_channels, dtype=_event_channel_dtype)
-
-        # require all sampled signals, i.e. ncs files, to have the same sampling rate
-        if sig_channels.size > 0:
-            sampling_rate = np.unique(sig_channels['sampling_rate'])
-            assert sampling_rate.size == 1
-            self._sigs_sampling_rate = sampling_rate[0]
-
-        # set 2 attributes needed later for the header, in case there are no ncs files
-        # in the dataset, e.g. Pegasus
-        self._timestamp_limits = None
-        self._nb_segment = 1
-
-        # Read ncs files for gap detection and nb_segment computation.
-        # :TODO: current algorithm depends on side-effect of read_ncs_files on
-        #   self._sigs_memmap, self._sigs_t_start, self._sigs_t_stop,
-        #   self._sigs_length, self._nb_segment, self._timestamp_limits
-        ncsBlocks = self.read_ncs_files(self.ncs_filenames)
-
-        # Determine timestamp limits in nev, nse files by scanning them.
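Throughout `_parse_header`, spike and event data are never read into memory: each file is memory-mapped past the fixed 16 kB text header, and unit channels are discovered from the mapped records. A sketch of the pattern; `open_spike_file` is hypothetical and the record layout is illustrative (the real dtype comes from `get_nse_or_ntt_dtype(info, ext)`):

```python
import numpy as np

HEADER_SIZE = 2 ** 14  # every Neuralynx file starts with a 16 kB text header

# Illustrative .nse-like record layout, not the dtype neo actually builds.
nse_like_dtype = np.dtype([
    ('timestamp', 'uint64'),
    ('channel_id', 'uint32'),
    ('unit_id', 'uint32'),
    ('params', 'int32', (8,)),
    ('samples', 'int16', (32,)),
])

def open_spike_file(filename):
    # mode='r' keeps the data on disk; records are fetched lazily
    data = np.memmap(filename, dtype=nse_like_dtype, mode='r',
                     offset=HEADER_SIZE)
    # one spike channel is created per unit_id found in the file
    return data, np.unique(data['unit_id'])
```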
-        ts0, ts1 = None, None
-        for _data_memmap in (self._spike_memmap, self._nev_memmap):
-            for _, data in _data_memmap.items():
-                ts = data['timestamp']
-                if ts.size == 0:
-                    continue
-                if ts0 is None:
-                    ts0 = ts[0]
-                    ts1 = ts[-1]
-                ts0 = min(ts0, ts[0])
-                ts1 = max(ts1, ts[-1])
-
-        # decide on segment and global start and stop times based on files available
-        if self._timestamp_limits is None:
-            # case NO ncs but HAVE nev or nse
-            self._timestamp_limits = [(ts0, ts1)]
-            self._seg_t_starts = [ts0 / 1e6]
-            self._seg_t_stops = [ts1 / 1e6]
-            self.global_t_start = ts0 / 1e6
-            self.global_t_stop = ts1 / 1e6
-        elif ts0 is not None:
-            # case HAVE ncs AND HAVE nev or nse
-            self.global_t_start = min(ts0 / 1e6, self._sigs_t_start[0])
-            self.global_t_stop = max(ts1 / 1e6, self._sigs_t_stop[-1])
-            self._seg_t_starts = list(self._sigs_t_start)
-            self._seg_t_starts[0] = self.global_t_start
-            self._seg_t_stops = list(self._sigs_t_stop)
-            self._seg_t_stops[-1] = self.global_t_stop
-        else:
-            # case HAVE ncs but NO nev or nse
-            self._seg_t_starts = self._sigs_t_start
-            self._seg_t_stops = self._sigs_t_stop
-            self.global_t_start = self._sigs_t_start[0]
-            self.global_t_stop = self._sigs_t_stop[-1]
-
-        if self.keep_original_times:
-            self.global_t_stop = self.global_t_stop - self.global_t_start
-            self.global_t_start = 0
-
-        # fill header dictionary
-        self.header = {}
-        self.header['nb_block'] = 1
-        self.header['nb_segment'] = [self._nb_segment]
-        self.header['signal_channels'] = sig_channels
-        self.header['unit_channels'] = unit_channels
-        self.header['event_channels'] = event_channels
-
-        # Annotations
-        self._generate_minimal_annotations()
-        bl_annotations = self.raw_annotations['blocks'][0]
-
-        for seg_index in range(self._nb_segment):
-            seg_annotations = bl_annotations['segments'][seg_index]
-
-            for c in range(sig_channels.size):
-                sig_ann = seg_annotations['signals'][c]
-                sig_ann.update(signal_annotations[c])
-
-            for c in range(unit_channels.size):
-                unit_ann = seg_annotations['units'][c]
-                unit_ann.update(unit_annotations[c])
-
-            for c in range(event_channels.size):
-                # annotations for channel events
-                event_id, ttl_input = self.internal_event_ids[c]
-                chan_id = event_channels[c]['id']
-
-                ev_ann = seg_annotations['events'][c]
-                ev_ann['file_origin'] = self.nev_filenames[chan_id]
-
-                # ~ ev_ann['marker_id'] =
-                # ~ ev_ann['nttl'] =
-                # ~ ev_ann['digital_marker'] =
-                # ~ ev_ann['analog_marker'] =
-
-    # Accessors for segment times which are offset by appropriate global start time
-    def _segment_t_start(self, block_index, seg_index):
-        return self._seg_t_starts[seg_index] - self.global_t_start
-
-    def _segment_t_stop(self, block_index, seg_index):
-        return self._seg_t_stops[seg_index] - self.global_t_start
-
-    def _get_signal_size(self, block_index, seg_index, channel_indexes):
-        return self._sigs_length[seg_index]
-
-    def _get_signal_t_start(self, block_index, seg_index, channel_indexes):
-        return self._sigs_t_start[seg_index] - self.global_t_start
-
-    def _get_analogsignal_chunk(self, block_index, seg_index, i_start, i_stop, channel_indexes):
-        """
-        Retrieve chunk of analog signal, a chunk being a set of contiguous samples.
-
-        PARAMETERS
-        ----------
-        block_index:
-            index of block in dataset, ignored as only 1 block in this implementation
-        seg_index:
-            index of segment to use
-        i_start:
-            sample index of first sample within segment to retrieve
-        i_stop:
-            sample index of last sample within segment to retrieve
-        channel_indexes:
-            list of channel indices to return data for
-
-        RETURNS
-        -------
-        array of samples, with each requested channel in a column
-        """
-        if i_start is None:
-            i_start = 0
-        if i_stop is None:
-            i_stop = self._sigs_length[seg_index]
-
-        block_start = i_start // BLOCK_SIZE
-        block_stop = i_stop // BLOCK_SIZE + 1
-        sl0 = i_start % BLOCK_SIZE
-        sl1 = sl0 + (i_stop - i_start)
-
-        if channel_indexes is None:
-            channel_indexes = slice(None)
-
-        channel_ids = self.header['signal_channels'][channel_indexes]['id']
-        channel_names = self.header['signal_channels'][channel_indexes]['name']
-
-        # create buffer for samples
-        sigs_chunk = np.zeros((i_stop - i_start, len(channel_ids)), dtype='int16')
-
-        for i, chan_uid in enumerate(zip(channel_names, channel_ids)):
-            data = self._sigs_memmap[seg_index][chan_uid]
-            sub = data[block_start:block_stop]
-            sigs_chunk[:, i] = sub['samples'].flatten()[sl0:sl1]
-
-        return sigs_chunk
-
-    def _spike_count(self, block_index, seg_index, unit_index):
-        chan_uid, unit_id = self.internal_unit_ids[unit_index]
-        data = self._spike_memmap[chan_uid]
-        ts = data['timestamp']
-
-        ts0, ts1 = self._timestamp_limits[seg_index]
-
-        # only count spikes inside the timestamp limits, inclusive, and for the specified unit
-        keep = (ts >= ts0) & (ts <= ts1) & (unit_id == data['unit_id'])
-        nb_spike = int(data[keep].size)
-        return nb_spike
-
-    def _get_spike_timestamps(self, block_index, seg_index, unit_index, t_start, t_stop):
-        chan_uid, unit_id = self.internal_unit_ids[unit_index]
-        data = self._spike_memmap[chan_uid]
-        ts = data['timestamp']
-
-        ts0, ts1 = self._timestamp_limits[seg_index]
-        if t_start is not None:
-            ts0 = int((t_start + self.global_t_start) * 1e6)
-        if t_stop is not None:
-            ts1 = int((t_stop + self.global_t_start) * 1e6)
-
-        keep = (ts >= ts0) & (ts <= ts1) & (unit_id == data['unit_id'])
-        timestamps = ts[keep]
-        return timestamps
-
-    def _rescale_spike_timestamp(self, spike_timestamps, dtype):
-        spike_times = spike_timestamps.astype(dtype)
-        spike_times /= 1e6
-        spike_times -= self.global_t_start
-        return spike_times
-
-    def _get_spike_raw_waveforms(self, block_index, seg_index, unit_index,
-                                 t_start, t_stop):
-        chan_uid, unit_id = self.internal_unit_ids[unit_index]
-        data = self._spike_memmap[chan_uid]
-        ts = data['timestamp']
-
-        ts0, ts1 = self._timestamp_limits[seg_index]
-        if t_start is not None:
-            ts0 = int((t_start + self.global_t_start) * 1e6)
-        if t_stop is not None:
-            ts1 = int((t_stop + self.global_t_start) * 1e6)
-
-        keep = (ts >= ts0) & (ts <= ts1) & (unit_id == data['unit_id'])
-
-        wfs = data[keep]['samples']
-        if wfs.ndim == 2:
-            # case for nse
-            waveforms = wfs[:, None, :]
-        else:
-            # case for ntt: change (n, 32, 4) to (n, 4, 32)
-            waveforms = wfs.swapaxes(1, 2)
-
-        return waveforms
-
-    def _event_count(self, block_index, seg_index, event_channel_index):
-        event_id, ttl_input = self.internal_event_ids[event_channel_index]
-        chan_id = self.header['event_channels'][event_channel_index]['id']
-        data = self._nev_memmap[chan_id]
-        ts0, ts1 = self._timestamp_limits[seg_index]
-        ts = data['timestamp']
-        keep = (ts >= ts0) & (ts <= ts1) & (data['event_id'] == event_id) & \
-            (data['ttl_input'] == ttl_input)
-        nb_event = int(data[keep].size)
-        return nb_event
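`_get_analogsignal_chunk` above works in whole 512-sample blocks: it slices the memmap by block index, flattens the per-block sample arrays, and trims to the exact requested range. A sketch of the arithmetic on synthetic data; `chunk_from_blocks` is a hypothetical stand-in for the per-channel loop:

```python
import numpy as np

BLOCK_SIZE = 512  # samples per .ncs data block, as above

def chunk_from_blocks(samples_per_block, i_start, i_stop):
    # samples_per_block has shape (n_blocks, BLOCK_SIZE), like sub['samples']
    block_start = i_start // BLOCK_SIZE
    block_stop = i_stop // BLOCK_SIZE + 1    # block holding the last sample
    sl0 = i_start % BLOCK_SIZE               # offset inside the first block
    sl1 = sl0 + (i_stop - i_start)           # end of slice after flattening
    return samples_per_block[block_start:block_stop].flatten()[sl0:sl1]

# 4 blocks holding the values 0..2047, so the expected chunk is obvious
blocks = np.arange(4 * BLOCK_SIZE, dtype='int16').reshape(4, BLOCK_SIZE)
assert np.array_equal(chunk_from_blocks(blocks, 500, 520), np.arange(500, 520))
```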
-
-    def _get_event_timestamps(self, block_index, seg_index, event_channel_index, t_start, t_stop):
-        event_id, ttl_input = self.internal_event_ids[event_channel_index]
-        chan_id = self.header['event_channels'][event_channel_index]['id']
-        data = self._nev_memmap[chan_id]
-        ts0, ts1 = self._timestamp_limits[seg_index]
-
-        if t_start is not None:
-            ts0 = int((t_start + self.global_t_start) * 1e6)
-        if t_stop is not None:
-            ts1 = int((t_stop + self.global_t_start) * 1e6)
-
-        ts = data['timestamp']
-        keep = (ts >= ts0) & (ts <= ts1) & (data['event_id'] == event_id) & \
-            (data['ttl_input'] == ttl_input)
-
-        subdata = data[keep]
-        timestamps = subdata['timestamp']
-        labels = subdata['event_string'].astype('U')
-        durations = None
-        return timestamps, durations, labels
-
-    def _rescale_event_timestamp(self, event_timestamps, dtype):
-        event_times = event_timestamps.astype(dtype)
-        event_times /= 1e6
-        event_times -= self.global_t_start
-        return event_times
-
-    def read_ncs_files(self, ncs_filenames):
-        """
-        Given a list of ncs files, return a dictionary of NcsBlocks indexed by channel uid.
-
-        :TODO: Current algorithm has side effects on the following attributes:
-            * self._sigs_memmap = [ {} for seg_index in range(self._nb_segment) ]
-            * self._sigs_t_start = []
-            * self._sigs_t_stop = []
-            * self._sigs_length = []
-            * self._nb_segment
-            * self._timestamp_limits
-
-        The first file is read entirely to detect gaps in its timestamps.
-        Each gap leads to a new segment.
-
-        Other files are not read entirely, but we check that their gaps
-        are in the same places.
-
-        gap_indexes can be given (when cached) to avoid a full read.
-        """
-        # :TODO: Needs to account for gaps and start and end times potentially
-        # being different in different groups of channels. These groups typically
-        # correspond to the channels collected by a single ADC card.
-        if len(ncs_filenames) == 0:
-            return None
-
-        good_delta = int(BLOCK_SIZE * 1e6 / self._sigs_sampling_rate)
-        chan_uid0 = list(ncs_filenames.keys())[0]
-        filename0 = ncs_filenames[chan_uid0]
-
-        data0 = np.memmap(filename0, dtype=ncs_dtype, mode='r', offset=NlxHeader.HEADER_SIZE)
-
-        gap_indexes = None
-        lost_indexes = None
-
-        if self.use_cache:
-            gap_indexes = self._cache.get('gap_indexes')
-            lost_indexes = self._cache.get('lost_indexes')
-
-        # detect gaps on the first file
-        if (gap_indexes is None) or (lost_indexes is None):
-
-            # this can be long!!!!
-            timestamps0 = data0['timestamp']
-            deltas0 = np.diff(timestamps0)
-
-            # :TODO: This algorithm needs to account for older style files which had a
-            # rounded-off sampling rate in the header.
-            #
-            # It should be that:
-            #   gap_indexes, = np.nonzero(deltas0 != good_delta)
-            # but in one file many deltas0 were 15999, 16000 or 16001 (for sampling
-            # at 32000), presumably a rounding problem.
-            # So this does the same with a tolerance of 1 or 2 ticks.
-            max_tolerance = 2
-            mask = np.abs((deltas0 - good_delta).astype('int64')) > max_tolerance
-
-            gap_indexes, = np.nonzero(mask)
-
-            if self.use_cache:
-                self.add_in_cache(gap_indexes=gap_indexes)
-
-            # update for lost_indexes
-            # Sometimes NLX writes a faulty block, but it then records how many samples
-            # it actually wrote; the validation field, data0['nb_valid'], should be
-            # equal to BLOCK_SIZE.
-            # :TODO: this algorithm ignores samples in partially filled blocks, which
-            # is not strictly necessary as all channels might have the same partially
-            # filled blocks at the end.
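The gap scan described above boils down to one `np.diff` over the first file's block timestamps: any inter-block delta that strays from the nominal block duration by more than a couple of ticks marks a gap, and hence a segment boundary. A sketch with synthetic timestamps, assuming 32 kHz sampling so a 512-sample block nominally spans 16000 µs:

```python
import numpy as np

BLOCK_SIZE = 512
sampling_rate = 32000.
good_delta = int(BLOCK_SIZE * 1e6 / sampling_rate)  # 16000 µs per block

# Steady 16000 µs steps with the 1-2 tick jitter that rounded header
# rates produce, plus one real gap after the fourth block.
timestamps = np.array([0, 16000, 32001, 47999, 1000000, 1016000], dtype='int64')

deltas = np.diff(timestamps)
max_tolerance = 2  # ticks; absorbs rounding jitter without hiding real gaps
mask = np.abs(deltas - good_delta) > max_tolerance
gap_indexes, = np.nonzero(mask)

# one gap -> two segments: blocks [0..3] and [4..5]
assert list(gap_indexes + 1) == [4]
```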
-
-            lost_indexes, = np.nonzero(data0['nb_valid'] < BLOCK_SIZE)
-
-            if self.use_cache:
-                self.add_in_cache(lost_indexes=lost_indexes)
-
-        gap_candidates = np.unique([0]
-                                   + [data0.size]
-                                   + (gap_indexes + 1).tolist()
-                                   + lost_indexes.tolist())  # linear
-
-        gap_pairs = np.vstack([gap_candidates[:-1], gap_candidates[1:]]).T  # 2D (n_segments, 2)
-
-        # construct proper gap ranges free of lost-sample artifacts
-        minimal_segment_length = 1  # in blocks
-        goodpairs = np.diff(gap_pairs, 1).reshape(-1) > minimal_segment_length
-        gap_pairs = gap_pairs[goodpairs]  # ensures a segment is at least a block wide
-
-        self._nb_segment = len(gap_pairs)
-        self._sigs_memmap = [{} for seg_index in range(self._nb_segment)]
-        self._sigs_t_start = []
-        self._sigs_t_stop = []
-        self._sigs_length = []
-        self._timestamp_limits = []
-
-        # create segments with subdata block/t_start/t_stop/length
-        for chan_uid, ncs_filename in self.ncs_filenames.items():
-
-            data = np.memmap(ncs_filename, dtype=ncs_dtype, mode='r', offset=NlxHeader.HEADER_SIZE)
-            assert data.size == data0.size, 'ncs files do not have the same data length'
-
-            for seg_index, (i0, i1) in enumerate(gap_pairs):
+    .. autoattribute:: extensions

-                assert data[i0]['timestamp'] == data0[i0][
-                    'timestamp'], 'ncs files do not have the same gaps'
-                assert data[i1 - 1]['timestamp'] == data0[i1 - 1][
-                    'timestamp'], 'ncs files do not have the same gaps'
+.. autoclass:: neo.rawio.IntanRawIO

-                subdata = data[i0:i1]
-                self._sigs_memmap[seg_index][chan_uid] = subdata
+    .. autoattribute:: extensions

-                if chan_uid == chan_uid0:
-                    ts0 = subdata[0]['timestamp']
-                    ts1 = subdata[-1]['timestamp'] + \
-                        np.uint64(BLOCK_SIZE / self._sigs_sampling_rate * 1e6)
-                    self._timestamp_limits.append((ts0, ts1))
-                    t_start = ts0 / 1e6
-                    self._sigs_t_start.append(t_start)
-                    t_stop = ts1 / 1e6
-                    self._sigs_t_stop.append(t_stop)
-                    length = subdata.size * BLOCK_SIZE
-                    self._sigs_length.append(length)
-
-
-class NcsBlocks():
-    """
-    Contains information regarding the blocks of records in an Ncs file.
-    Factory methods perform parsing of this information from an Ncs file, or
-    confirmation that a file agrees with the block structure.
-    """
+.. autoclass:: neo.rawio.MicromedRawIO

-    startBlocks = []
-    endBlocks = []
-    sampFreqUsed = 0
-    microsPerSampUsed = 0
+    .. autoattribute:: extensions

+.. autoclass:: neo.rawio.NeuralynxRawIO

-class NlxHeader(OrderedDict):
-    """
-    Representation of basic information in all 16 kbyte Neuralynx file headers,
-    including dates opened and closed if given.
-    """
+    .. autoattribute:: extensions

-    HEADER_SIZE = 2 ** 14  # Neuralynx files have a txt header of 16 kB
-
-    # helper function to interpret boolean keys
-    def _to_bool(txt):
-        if txt == 'True':
-            return True
-        elif txt == 'False':
-            return False
-        else:
-            raise Exception('Cannot convert %s to bool' % (txt))
-
-    # keys that may be present in the header which we parse
-    txt_header_keys = [
-        ('AcqEntName', 'channel_names', None),  # used
-        ('FileType', '', None),
-        ('FileVersion', '', None),
-        ('RecordSize', '', None),
-        ('HardwareSubSystemName', '', None),
-        ('HardwareSubSystemType', '', None),
-        ('SamplingFrequency', 'sampling_rate', float),  # used
-        ('ADMaxValue', '', None),
-        ('ADBitVolts', 'bit_to_microVolt', None),  # used
-        ('NumADChannels', '', None),
-        ('ADChannel', 'channel_ids', None),  # used
-        ('InputRange', '', None),
-        ('InputInverted', 'input_inverted', _to_bool),  # used
-        ('DSPLowCutFilterEnabled', '', None),
-        ('DspLowCutFrequency', '', None),
-        ('DspLowCutNumTaps', '', None),
-        ('DspLowCutFilterType', '', None),
-        ('DSPHighCutFilterEnabled', '', None),
-        ('DspHighCutFrequency', '', None),
-        ('DspHighCutNumTaps', '', None),
-        ('DspHighCutFilterType', '', None),
-        ('DspDelayCompensation', '', None),
-        ('DspFilterDelay_µs', '', None),
-        ('DisabledSubChannels', '', None),
-        ('WaveformLength', '', int),
-        ('AlignmentPt', '', None),
-        ('ThreshVal', '', None),
-        ('MinRetriggerSamples', '', None),
-        ('SpikeRetriggerTime', '', None),
-        ('DualThresholding', '', None),
-        (r'Feature \w+ \d+', '', None),
-        ('SessionUUID', '', None),
-        ('FileUUID', '', None),
-        ('CheetahRev', '', None),  # only for older versions of Cheetah
-        ('ProbeName', '', None),
-        ('OriginalFileName', '', None),
-        ('TimeCreated', '', None),
-        ('TimeClosed', '', None),
-        ('ApplicationName', '', None),  # also includes the version number when present
-        ('AcquisitionSystem', '', None),
-        ('ReferenceChannel', '', None),
-        ('NLX_Base_Class_Type', '', None)  # in version 4 and earlier versions of Cheetah
-    ]
-
-    # Filename and datetime may appear in header lines starting with # at the
-    # beginning of the header, or in later versions as a property. The exact format
-    # used depends on the application name and its version, as well as the
-    # -FileVersion property.
-    #
-    # There are 3 styles understood by this code, and the patterns used for parsing
-    # the items within each are stored in a dictionary. Each dictionary is then
-    # stored in the main dictionary, keyed by an abbreviation for the style.
-    header_pattern_dicts = {
-        # Cheetah before version 5 and BML
-        'bv5': dict(
-            datetime1_regex=r'## Time Opened: \(m/d/y\): (?P<date>\S+)'
-                            r' At Time: (?P