|
6 | 6 | import posixpath
|
7 | 7 | import pdb
|
8 | 8 | import struct
|
| 9 | +import sys |
9 | 10 |
|
10 | 11 | from wfdb.io import download
|
11 | 12 | from wfdb.io import _header
|
@@ -2570,20 +2571,22 @@ def rdedfann(record_name, pn_dir=None, delete_file=True, info_only=True,
|
2570 | 2571 | by the original WFDB package. Must not be True if `record_only` is
|
2571 | 2572 | True.
|
2572 | 2573 | record_only : bool, optional
|
2573 |
| - Whether to only return the record information (True) or not (False). |
2574 |
| - If False, this function will generate both a .dat and .hea file. Must |
2575 |
| - not be True if `info_only` is True. |
| 2574 | + Whether to only return the annotation information (True) or not |
| 2575 | + (False). If False, this function will generate a WFDB-formatted |
| 2576 | + annotation file. If True, it will return the object returned if that |
| 2577 | + file were read with `rdann`. Must not be True if `info_only` is True. |
2576 | 2578 | verbose : bool, optional
|
2577 | 2579 | Whether to print all the information read about the file (True) or
|
2578 | 2580 | not (False).
|
2579 | 2581 |
|
2580 | 2582 | Returns
|
2581 | 2583 | -------
|
2582 |
| - record : dict, optional |
2583 |
| - All of the record information needed to generate MIT formatted files. |
2584 |
| - Only returns if 'record_only' is set to True, else generates the |
2585 |
| - corresponding .dat and .hea files. This record file will not match the |
2586 |
| - `rdrecord` output since it will only give us the digital signal for now. |
| 2584 | + N/A : dict, Annotation, optional |
| 2585 | + If 'info_only' is set to True, return all of the annotation |
| 2586 | + information needed to generate WFDB-formatted annotation files. |
| 2587 | + If 'record_only' is set to True, return the WFDB-formatted annotation |
| 2588 | + object generated by the `rdann` output. If none are set to True, write |
| 2589 | + the WFDB-formatted annotation file. |
2587 | 2590 |
|
2588 | 2591 | Notes
|
2589 | 2592 | -----
|
@@ -2707,6 +2710,269 @@ def rdedfann(record_name, pn_dir=None, delete_file=True, info_only=True,
|
2707 | 2710 | fs=fs)
|
2708 | 2711 |
|
2709 | 2712 |
|
| 2713 | +def mrgann(ann_file1, ann_file2, out_file_name='merged_ann.atr', |
| 2714 | + merge_method='combine', chan1=-1, chan2=-1, start_ann=0, |
| 2715 | + end_ann='e', record_only=True, verbose=False): |
| 2716 | + """ |
| 2717 | + This function reads a pair of annotation files (specified by `ann_file1` |
| 2718 | + and `ann_file2`) for the specified record and writes a third annotation |
| 2719 | + file (specified by `out_file_name`) for the same record. The header (.hea) |
| 2720 | + file should be included in the same directory as each annotation file so |
| 2721 | + that the sampling rate can be read. Typical applications of `mrgann` |
| 2722 | + include combining annotation files that apply to different signals within |
| 2723 | + a multi-signal record, and replacing a segment of an annotation file with |
| 2724 | + annotations from another file. For example, setting 'merge_method' to |
| 2725 | + 'combine' will simply blindly merge the annotation files for the specified |
| 2726 | + 'start_ann' and 'end_ann' range while setting 'merge_method' to 'replace1' |
| 2727 | + will replace the contents of the first file with the second in that |
| 2728 | + specified range. Setting 'merge_method' to 'replace2' will replace the |
| 2729 | + contents of the second file with the first in that specified range. |
| 2730 | +
|
| 2731 | + Parameters |
| 2732 | + ---------- |
| 2733 | + ann_file1 : string |
| 2734 | + The file path of the first annotation file (with extension included). |
| 2735 | + ann_file2 : string |
| 2736 | + The file path of the second annotation file (with extension included). |
| 2737 | + out_file_name : string |
| 2738 | + The name of the output file name (with extension included). The |
| 2739 | + default is 'merged_ann.atr'. |
| 2740 | + merge_method : string, optional |
| 2741 | + The method used to merge the two annotation files. The default is |
| 2742 | + 'combine' which simply combines the two files along every attribute; |
| 2743 | + duplicates will be preserved. The other options are 'replace1' which |
| 2744 | + replaces attributes of the first annotation file with attributes of |
| 2745 | + the second for the desired time range, 'replace2' which does the |
| 2746 | + same thing except switched (first file replaces second), and 'delete' |
| 2747 | + which deletes all of the annotations in the desired time range. |
| 2748 | + chan1 : int, optional |
| 2749 | + Sets the value of `chan` for the first annotation file. The default is |
| 2750 | + -1 which means to keep it the same. |
| 2751 | + chan2 : int, optional |
| 2752 | + Sets the value of `chan` for the second annotation file. The default |
| 2753 | + is -1 which means to keep it the same. |
| 2754 | + start_ann : float, int, string, optional |
| 2755 | + The location (sample, time, etc.) to start the annotation filtering. |
| 2756 | + If float, it will be interpreted as time in seconds. If int, it will |
| 2757 | + be interpreted as sample number. If string, it will be interpreted |
| 2758 | + as time formatted in HH:MM:SS format (the same as that in `wfdbtime`). |
| 2759 | + The default is 0 to represent sample number 0. A value of 0.0 would |
| 2760 | + represent 0 seconds instead. |
| 2761 | + end_ann : float, int, string, optional |
| 2762 | + The location (sample, time, etc.) to stop the annotation filtering. |
| 2763 | + If float, it will be interpreted as time in seconds. If int, it will |
| 2764 | + be interpreted as sample number. If string, it will be interpreted |
| 2765 | + as time formatted in HH:MM:SS format (the same as that in `wfdbtime`). |
| 2766 | + The default is 'e' to represent the end of the annotation. |
| 2767 | + record_only : bool, optional |
| 2768 | + Whether to only return the annotation information (True) or not |
| 2769 | + (False). If False, this function will generate a WFDB-formatted |
| 2770 | + annotation file. If True, it will return the object returned if that |
| 2771 | + file was read with `rdann`. |
| 2772 | + verbose : bool, optional |
| 2773 | + Whether to print all the information read about each annotation file |
| 2774 | + and the methodology for merging them (True) or not (False). |
| 2775 | +
|
| 2776 | + Returns |
| 2777 | + ------- |
| 2778 | + N/A : Annotation, optional |
| 2779 | + If 'record_only' is set to True, then return the new WFDB-formatted |
| 2780 | + annotation object which is the same as generated by the `rdann` |
| 2781 | + output. Else, create the WFDB-formatted annotation file. |
| 2782 | +
|
| 2783 | + """ |
| 2784 | + ann1 = rdann(ann_file1.split('.')[0], ann_file1.split('.')[1]) |
| 2785 | + ann2 = rdann(ann_file2.split('.')[0], ann_file2.split('.')[1]) |
| 2786 | + if ann1.fs != ann2.fs: |
| 2787 | + raise Exception('Annotation sample rates do not match up: samples ' |
| 2788 | + 'can be aligned but final sample rate can not be ' |
| 2789 | + 'determined') |
| 2790 | + # Apply the channel mapping if desired |
| 2791 | + if chan1 != -1: |
| 2792 | + if chan1 < -1: |
| 2793 | + raise Exception('Invalid value for `chan1`: must be >= 0') |
| 2794 | + ann1.chan = np.array([chan1] * ann1.ann_len) |
| 2795 | + if chan2 != -1: |
| 2796 | + if chan2 < -1: |
| 2797 | + raise Exception('Invalid value for `chan2`: must be >= 0') |
| 2798 | + ann2.chan = np.array([chan2] * ann2.ann_len) |
| 2799 | + |
| 2800 | + if start_ann == 'e': |
| 2801 | + raise Exception('Start time can not be set to the end of the record') |
| 2802 | + if end_ann == 0: |
| 2803 | + raise Exception('End time can not be set to the start of the record') |
| 2804 | + |
| 2805 | + samples = [] |
| 2806 | + for i,time in enumerate([start_ann, end_ann]): |
| 2807 | + if time == 'e': |
| 2808 | + # End of annotation, set end sample to largest int, roughly |
| 2809 | + sample = sys.maxsize |
| 2810 | + else: |
| 2811 | + if type(time) is int: |
| 2812 | + # Sample number |
| 2813 | + sample = time |
| 2814 | + elif type(time) is float: |
| 2815 | + # Time in seconds |
| 2816 | + sample = int(time * ann1.fs) |
| 2817 | + else: |
| 2818 | + # HH:MM:SS format, loosely |
| 2819 | + time_split = [t if t != '' else '0' for t in time.split(':')] |
| 2820 | + if len(time_split) == 1: |
| 2821 | + seconds = float(time)%60 |
| 2822 | + minutes = int(float(time)//60) |
| 2823 | + hours = int(float(time)//60//60) |
| 2824 | + elif len(time_split) == 2: |
| 2825 | + seconds = float(time_split[1]) |
| 2826 | + minutes = int(time_split[0]) |
| 2827 | + hours = 0 |
| 2828 | + elif len(time_split) == 3: |
| 2829 | + seconds = float(time_split[2]) |
| 2830 | + minutes = int(time_split[1]) |
| 2831 | + hours = int(time_split[0]) |
| 2832 | + if seconds >= 60: |
| 2833 | + raise Exception('Seconds not in correct format') |
| 2834 | + if minutes >= 60: |
| 2835 | + raise Exception('Minutes not in correct format') |
| 2836 | + total_seconds = hours*60*60 + minutes*60 + seconds |
| 2837 | + if (i == 1) and (total_seconds == 0): |
| 2838 | + raise Exception('End time can not be set to the start of ' |
| 2839 | + 'the record') |
| 2840 | + sample = int(total_seconds * ann1.fs) |
| 2841 | + if sample > max([max(ann1.sample), max(ann2.sample)]): |
| 2842 | + if i == 0: |
| 2843 | + raise Exception('Start time can not be set to the ' |
| 2844 | + 'end of the record') |
| 2845 | + else: |
| 2846 | + print("'end_ann' greater than the highest " |
| 2847 | + "annotation... reverting to the highest " |
| 2848 | + "annotation") |
| 2849 | + samples.append(sample) |
| 2850 | + start_sample = samples[0] |
| 2851 | + end_sample = samples[1] |
| 2852 | + if verbose: |
| 2853 | + print(f'Start sample: {start_sample}, end sample: {end_sample}') |
| 2854 | + |
| 2855 | + if (merge_method == 'combine') or (merge_method == 'delete'): |
| 2856 | + if verbose: |
| 2857 | + print('Combining the two files together') |
| 2858 | + # The sample should never be empty but others can (though they |
| 2859 | + # shouldn't be) |
| 2860 | + both_sample = np.concatenate([ann1.sample, ann2.sample]).astype(np.int64) |
| 2861 | + # Generate a list of sorted indices then sort the array |
| 2862 | + sort_indices = np.argsort(both_sample) |
| 2863 | + both_sample = np.sort(both_sample) |
| 2864 | + # Find where to filter the array |
| 2865 | + if merge_method == 'combine': |
| 2866 | + sample_range = ((both_sample >= start_sample) & |
| 2867 | + (both_sample <= end_sample)) |
| 2868 | + if merge_method == 'delete': |
| 2869 | + sample_range = ((both_sample < start_sample) | |
| 2870 | + (both_sample > end_sample)) |
| 2871 | + index_range = np.where(sample_range)[0] |
| 2872 | + both_sample = both_sample[sample_range] |
| 2873 | + # Combine both annotation attributes |
| 2874 | + ann_attr = {} |
| 2875 | + blank_array = np.array([], dtype=np.int64) |
| 2876 | + for cat in ['chan', 'num', 'subtype', 'label_store', 'symbol', |
| 2877 | + 'aux_note']: |
| 2878 | + ann1_cat = ann1.__dict__[cat] |
| 2879 | + ann2_cat = ann2.__dict__[cat] |
| 2880 | + if cat in ['symbol', 'aux_note']: |
| 2881 | + ann1_cat = ann1_cat if ann1_cat is not None else [] |
| 2882 | + ann2_cat = ann2_cat if ann2_cat is not None else [] |
| 2883 | + temp_cat = ann1_cat |
| 2884 | + temp_cat.extend(ann2_cat) |
| 2885 | + if len(temp_cat) == 0: |
| 2886 | + ann_attr[cat] = None |
| 2887 | + else: |
| 2888 | + temp_cat = [temp_cat[i] for i in sort_indices] |
| 2889 | + ann_attr[cat] = [temp_cat[i] for i in index_range] |
| 2890 | + else: |
| 2891 | + ann1_cat = ann1_cat if ann1_cat is not None else blank_array |
| 2892 | + ann2_cat = ann2_cat if ann2_cat is not None else blank_array |
| 2893 | + temp_cat = np.concatenate([ann1_cat, ann2_cat]).astype(np.int64) |
| 2894 | + if temp_cat.shape[0] == 0: |
| 2895 | + ann_attr[cat] = None |
| 2896 | + else: |
| 2897 | + temp_cat = np.array([temp_cat[i] for i in sort_indices]) |
| 2898 | + ann_attr[cat] = np.array([temp_cat[i] for i in index_range]) |
| 2899 | + |
| 2900 | + elif (merge_method == 'replace1') or (merge_method == 'replace2'): |
| 2901 | + if merge_method == 'replace1': |
| 2902 | + if verbose: |
| 2903 | + print('Replacing the contents of the first file with the ' |
| 2904 | + 'contents of the second') |
| 2905 | + keep_ann = ann2 |
| 2906 | + remove_ann = ann1 |
| 2907 | + elif merge_method == 'replace2': |
| 2908 | + if verbose: |
| 2909 | + print('Replacing the contents of the second file with the ' |
| 2910 | + 'contents of the first') |
| 2911 | + keep_ann = ann1 |
| 2912 | + remove_ann = ann2 |
| 2913 | + # Find where to filter the first array |
| 2914 | + keep_sample_range = ((keep_ann.sample >= start_sample) & |
| 2915 | + (keep_ann.sample <= end_sample)) |
| 2916 | + keep_index_range = np.where(keep_sample_range)[0] |
| 2917 | + # Find where to filter the second array |
| 2918 | + remove_sample_range = ((remove_ann.sample < start_sample) | |
| 2919 | + (remove_ann.sample > end_sample)) |
| 2920 | + remove_index_range = np.where(remove_sample_range)[0] |
| 2921 | + # The sample should never be empty but others can (though they |
| 2922 | + # shouldn't be) |
| 2923 | + keep_ann_sample = keep_ann.sample[keep_index_range] |
| 2924 | + remove_ann_sample = remove_ann.sample[remove_index_range] |
| 2925 | + both_sample = np.concatenate([keep_ann_sample, remove_ann_sample]).astype(np.int64) |
| 2926 | + # Generate a list of sorted indices then sort the array |
| 2927 | + sort_indices = np.argsort(both_sample) |
| 2928 | + both_sample = np.sort(both_sample) |
| 2929 | + # Combine both annotation attributes |
| 2930 | + ann_attr = {} |
| 2931 | + blank_array = np.array([], dtype=np.int64) |
| 2932 | + for cat in ['chan', 'num', 'subtype', 'label_store', 'symbol', |
| 2933 | + 'aux_note']: |
| 2934 | + keep_cat = keep_ann.__dict__[cat] |
| 2935 | + remove_cat = remove_ann.__dict__[cat] |
| 2936 | + if cat in ['symbol', 'aux_note']: |
| 2937 | + keep_cat = [keep_cat[i] for i in keep_index_range] if keep_cat is not None else [] |
| 2938 | + remove_cat = [remove_cat[i] for i in remove_index_range] if remove_cat is not None else [] |
| 2939 | + temp_cat = keep_cat |
| 2940 | + temp_cat.extend(remove_cat) |
| 2941 | + if len(temp_cat) == 0: |
| 2942 | + ann_attr[cat] = None |
| 2943 | + else: |
| 2944 | + ann_attr[cat] = [temp_cat[i] for i in sort_indices] |
| 2945 | + else: |
| 2946 | + keep_cat = np.array([keep_cat[i] for i in keep_index_range]) if keep_cat is not None else blank_array |
| 2947 | + remove_cat = np.array([remove_cat[i] for i in remove_index_range]) if remove_cat is not None else blank_array |
| 2948 | + temp_cat = np.concatenate([keep_cat, remove_cat]).astype(np.int64) |
| 2949 | + if temp_cat.shape[0] == 0: |
| 2950 | + ann_attr[cat] = None |
| 2951 | + else: |
| 2952 | + ann_attr[cat] = np.array([temp_cat[i] for i in sort_indices]) |
| 2953 | + else: |
| 2954 | + raise Exception("Invalid value for 'merge_method': options are " |
| 2955 | + "'combine', 'replace1', and 'replace2'") |
| 2956 | + |
| 2957 | + if record_only: |
| 2958 | + if verbose: |
| 2959 | + print('Returning Annotation object') |
| 2960 | + return Annotation(record_name=out_file_name.split('.')[0], |
| 2961 | + extension=out_file_name.split('.')[1], |
| 2962 | + sample=both_sample, symbol=ann_attr['symbol'], |
| 2963 | + subtype=ann_attr['subtype'], chan=ann_attr['chan'], |
| 2964 | + num=ann_attr['num'], aux_note=ann_attr['aux_note'], |
| 2965 | + label_store=ann_attr['label_store'], fs=ann1.fs) |
| 2966 | + else: |
| 2967 | + if verbose: |
| 2968 | + print(f'Creating annotation file called: {out_file_name}') |
| 2969 | + wrann(out_file_name.split('.')[0], out_file_name.split('.')[1], |
| 2970 | + sample=both_sample, symbol=ann_attr['symbol'], |
| 2971 | + subtype=ann_attr['subtype'], chan=ann_attr['chan'], |
| 2972 | + num=ann_attr['num'], aux_note=ann_attr['aux_note'], |
| 2973 | + label_store=ann_attr['label_store'], fs=ann1.fs) |
| 2974 | + |
| 2975 | + |
2710 | 2976 | def _format_ann_from_df(df_in):
|
2711 | 2977 | """
|
2712 | 2978 | Parameters
|
|
0 commit comments