|
29 | 29 | import numpy as np |
30 | 30 | import quantities as pq |
31 | 31 |
|
32 | | -from neo.core.baseneo import BaseNeo, MergeError, merge_annotations |
| 32 | +from neo.core.baseneo import BaseNeo, MergeError, merge_annotations, intersect_annotations |
33 | 33 | from neo.core.dataobject import DataObject |
34 | 34 | from copy import copy, deepcopy |
35 | 35 |
|
@@ -657,3 +657,136 @@ def rectify(self, **kwargs): |
657 | 657 | rectified_signal.array_annotations = self.array_annotations.copy() |
658 | 658 |
|
659 | 659 | return rectified_signal |
| 660 | + |
| 661 | + def concatenate(self, *signals, overwrite=False, padding=False): |
| 662 | + """ |
| 663 | + Concatenate multiple neo.AnalogSignal objects across time. |
| 664 | +
|
| 665 | + Units, sampling_rate and number of signal traces must be the same |
| 666 | + for all signals. Otherwise a ValueError is raised. |
| 667 | + Note that timestamps of concatenated signals might shift in oder to |
| 668 | + align the sampling times of all signals. |
| 669 | +
|
| 670 | + Parameters |
| 671 | + ---------- |
| 672 | + signals: neo.AnalogSignal objects |
| 673 | + AnalogSignals that will be concatenated |
| 674 | + overwrite : bool |
| 675 | + If True, samples of the earlier (lower index in `signals`) |
| 676 | + signals are overwritten by that of later (higher index in `signals`) |
| 677 | + signals. |
| 678 | + If False, samples of the later are overwritten by earlier signal. |
| 679 | + Default: False |
| 680 | + padding : bool, scalar quantity |
| 681 | + Sampling values to use as padding in case signals do not overlap. |
| 682 | + If False, do not apply padding. Signals have to align or |
| 683 | + overlap. If True, signals will be padded using |
| 684 | + np.NaN as pad values. If a scalar quantity is provided, this |
| 685 | + will be used for padding. The other signal is moved |
| 686 | + forward in time by maximum one sampling period to |
| 687 | + align the sampling times of both signals. |
| 688 | + Default: False |
| 689 | +
|
| 690 | + Returns |
| 691 | + ------- |
| 692 | + signal: neo.AnalogSignal |
| 693 | + concatenated output signal |
| 694 | + """ |
| 695 | + |
| 696 | + # Sanity of inputs |
| 697 | + if not hasattr(signals, '__iter__'): |
| 698 | + raise TypeError('signals must be iterable') |
| 699 | + if not all([isinstance(a, AnalogSignal) for a in signals]): |
| 700 | + raise TypeError('Entries of anasiglist have to be of type neo.AnalogSignal') |
| 701 | + if len(signals) == 0: |
| 702 | + return self |
| 703 | + |
| 704 | + signals = [self] + list(signals) |
| 705 | + |
| 706 | + # Check required common attributes: units, sampling_rate and shape[-1] |
| 707 | + shared_attributes = ['units', 'sampling_rate'] |
| 708 | + attribute_values = [tuple((getattr(anasig, attr) for attr in shared_attributes)) |
| 709 | + for anasig in signals] |
| 710 | + # add shape dimensions that do not relate to time |
| 711 | + attribute_values = [(attribute_values[i] + (signals[i].shape[1:],)) |
| 712 | + for i in range(len(signals))] |
| 713 | + if not all([attrs == attribute_values[0] for attrs in attribute_values]): |
| 714 | + raise MergeError( |
| 715 | + f'AnalogSignals have to share {shared_attributes} attributes to be concatenated.') |
| 716 | + units, sr, shape = attribute_values[0] |
| 717 | + |
| 718 | + # find gaps between Analogsignals |
| 719 | + combined_time_ranges = self._concatenate_time_ranges( |
| 720 | + [(s.t_start, s.t_stop) for s in signals]) |
| 721 | + missing_time_ranges = self._invert_time_ranges(combined_time_ranges) |
| 722 | + if len(missing_time_ranges): |
| 723 | + diffs = np.diff(np.asarray(missing_time_ranges), axis=1) |
| 724 | + else: |
| 725 | + diffs = [] |
| 726 | + |
| 727 | + if padding is False and any(diffs > signals[0].sampling_period): |
| 728 | + raise MergeError(f'Signals are not continuous. Can not concatenate signals with gaps. ' |
| 729 | + f'Please provide a padding value.') |
| 730 | + if padding is not False: |
| 731 | + logger.warning('Signals will be padded using {}.'.format(padding)) |
| 732 | + if padding is True: |
| 733 | + padding = np.NaN * units |
| 734 | + if isinstance(padding, pq.Quantity): |
| 735 | + padding = padding.rescale(units).magnitude |
| 736 | + else: |
| 737 | + raise MergeError('Invalid type of padding value. Please provide a bool value ' |
| 738 | + 'or a quantities object.') |
| 739 | + |
| 740 | + t_start = min([a.t_start for a in signals]) |
| 741 | + t_stop = max([a.t_stop for a in signals]) |
| 742 | + n_samples = int(np.rint(((t_stop - t_start) * sr).rescale('dimensionless').magnitude)) |
| 743 | + shape = (n_samples,) + shape |
| 744 | + |
| 745 | + # Collect attributes and annotations across all concatenated signals |
| 746 | + kwargs = {} |
| 747 | + common_annotations = signals[0].annotations |
| 748 | + common_array_annotations = signals[0].array_annotations |
| 749 | + for anasig in signals[1:]: |
| 750 | + common_annotations = intersect_annotations(common_annotations, anasig.annotations) |
| 751 | + common_array_annotations = intersect_annotations(common_array_annotations, |
| 752 | + anasig.array_annotations) |
| 753 | + |
| 754 | + kwargs['annotations'] = common_annotations |
| 755 | + kwargs['array_annotations'] = common_array_annotations |
| 756 | + |
| 757 | + for name in ("name", "description", "file_origin"): |
| 758 | + attr = [getattr(s, name) for s in signals] |
| 759 | + if all([a == attr[0] for a in attr]): |
| 760 | + kwargs[name] = attr[0] |
| 761 | + else: |
| 762 | + kwargs[name] = f'concatenation ({attr})' |
| 763 | + |
| 764 | + conc_signal = AnalogSignal(np.full(shape=shape, fill_value=padding, dtype=signals[0].dtype), |
| 765 | + sampling_rate=sr, t_start=t_start, units=units, **kwargs) |
| 766 | + |
| 767 | + if not overwrite: |
| 768 | + signals = signals[::-1] |
| 769 | + while len(signals) > 0: |
| 770 | + conc_signal.splice(signals.pop(0), copy=False) |
| 771 | + |
| 772 | + return conc_signal |
| 773 | + |
| 774 | + def _concatenate_time_ranges(self, time_ranges): |
| 775 | + time_ranges = sorted(time_ranges) |
| 776 | + new_ranges = time_ranges[:1] |
| 777 | + for t_start, t_stop in time_ranges[1:]: |
| 778 | + # time range are non continuous -> define new range |
| 779 | + if t_start > new_ranges[-1][1]: |
| 780 | + new_ranges.append((t_start, t_stop)) |
| 781 | + # time range is continuous -> extend time range |
| 782 | + elif t_stop > new_ranges[-1][1]: |
| 783 | + new_ranges[-1] = (new_ranges[-1][0], t_stop) |
| 784 | + return new_ranges |
| 785 | + |
| 786 | + def _invert_time_ranges(self, time_ranges): |
| 787 | + i = 0 |
| 788 | + new_ranges = [] |
| 789 | + while i < len(time_ranges) - 1: |
| 790 | + new_ranges.append((time_ranges[i][1], time_ranges[i + 1][0])) |
| 791 | + i += 1 |
| 792 | + return new_ranges |
0 commit comments