From 9de2d92fc3129ee8ca648e91aa722e328ba23ec0 Mon Sep 17 00:00:00 2001 From: Lukas Tenbrink Date: Thu, 4 Jul 2024 16:34:20 +0200 Subject: [PATCH] Streamline file reading for single segment records, record headers and annotation files. These methods all now stack between the public functions, handling mainly i/o, and the parsing functions. --- wfdb/io/_coreio.py | 15 ++- wfdb/io/_signal.py | 252 +++++++++++++++--------------------------- wfdb/io/annotation.py | 123 ++++++++++++--------- wfdb/io/download.py | 96 ---------------- wfdb/io/record.py | 49 ++++---- 5 files changed, 192 insertions(+), 343 deletions(-) diff --git a/wfdb/io/_coreio.py b/wfdb/io/_coreio.py index 9b3a7876..033424ed 100644 --- a/wfdb/io/_coreio.py +++ b/wfdb/io/_coreio.py @@ -1,14 +1,16 @@ import posixpath +import os from wfdb.io import _url from wfdb.io.download import config def _open_file( - pn_dir, file_name, mode="r", *, + dir_name="", + pn_dir=None, buffering=-1, encoding=None, errors=None, @@ -24,15 +26,18 @@ def _open_file( Parameters ---------- - pn_dir : str or None - The PhysioNet database directory where the file is stored, or None - if file_name is a local path. file_name : str The name of the file, either as a local filesystem path (if `pn_dir` is None) or a URL path (if `pn_dir` is a string.) mode : str, optional The standard I/O mode for the file ("r" by default). If `pn_dir` is not None, this must be "r", "rt", or "rb". + dir_name : str or None + If passed, and pn_dir is None, the directory will be prepended + to the file_name. + pn_dir : str or None + The PhysioNet database directory where the file is stored, or None + if file_name is a local path. buffering : int, optional Buffering policy. encoding : str, optional @@ -48,7 +53,7 @@ def _open_file( """ if pn_dir is None: return open( - file_name, + os.path.join(dir_name, file_name), mode, buffering=buffering, encoding=encoding, diff --git a/wfdb/io/_signal.py b/wfdb/io/_signal.py index 4ee09225..f934334f 100644 --- a/wfdb/io/_signal.py +++ b/wfdb/io/_signal.py @@ -4,7 +4,9 @@ import numpy as np -from wfdb.io import download, _coreio, util +from wfdb.io import download +from wfdb.io import util +from wfdb.io import _coreio MAX_I32 = 2147483647 @@ -1066,7 +1068,6 @@ def _rd_segment( ignore_skew, no_file=False, sig_data=None, - sig_stream=None, return_res=64, ): """ @@ -1196,24 +1197,24 @@ def _rd_segment( signals = [None] * len(channels) for fn in w_file_name: - # Get the list of all signals contained in the dat file - datsignals = _rd_dat_signals( - file_name=fn, - dir_name=dir_name, - pn_dir=pn_dir, - fmt=w_fmt[fn], - n_sig=len(datchannel[fn]), - sig_len=sig_len, - byte_offset=w_byte_offset[fn], - samps_per_frame=w_samps_per_frame[fn], - skew=w_skew[fn], - init_value=w_init_value[fn], - sampfrom=sampfrom, - sampto=sampto, - no_file=no_file, - sig_data=sig_data, - sig_stream=sig_stream, - ) + with _coreio._open_file( + fn, "rb", pn_dir=pn_dir, dir_name=dir_name + ) as io: + # Get the list of all signals contained in the dat file + datsignals = _rd_dat_signals( + io, + fmt=w_fmt[fn], + n_sig=len(datchannel[fn]), + sig_len=sig_len, + byte_offset=w_byte_offset[fn], + samps_per_frame=w_samps_per_frame[fn], + skew=w_skew[fn], + init_value=w_init_value[fn], + sampfrom=sampfrom, + sampto=sampto, + no_file=no_file, + sig_data=sig_data, + ) # Copy over the wanted signals for cn in range(len(out_dat_channel[fn])): @@ -1223,9 +1224,7 @@ def _rd_segment( def _rd_dat_signals( - file_name, - dir_name, - pn_dir, + io, fmt, n_sig, sig_len, @@ -1237,21 +1236,14 @@ def _rd_dat_signals( sampto, no_file=False, sig_data=None, - sig_stream=None, ): """ Read all signals from a WFDB dat file. Parameters ---------- - file_name : str - The name of the dat file. - dir_name : str - The full directory where the dat file(s) are located, if the dat - file(s) are local. - pn_dir : str - The PhysioNet directory where the dat file(s) are located, if - the dat file(s) are remote. + io : io + The io to read the dat file from. fmt : str The format of the dat file. n_sig : int @@ -1327,32 +1319,17 @@ def _rd_dat_signals( if no_file: data_to_read = sig_data elif fmt in COMPRESSED_FMTS: - if sig_stream is not None: - data_to_read = _rd_compressed_stream( - fp=sig_stream, - fmt=fmt, - sample_offset=byte_offset, - n_sig=n_sig, - samps_per_frame=samps_per_frame, - start_frame=sampfrom, - end_frame=sampto, - ) - else: - data_to_read = _rd_compressed_file( - file_name=file_name, - dir_name=dir_name, - pn_dir=pn_dir, - fmt=fmt, - sample_offset=byte_offset, - n_sig=n_sig, - samps_per_frame=samps_per_frame, - start_frame=sampfrom, - end_frame=sampto, - ) - else: - data_to_read = _rd_dat_file( - file_name, dir_name, pn_dir, fmt, start_byte, n_read_samples, sig_stream + data_to_read = _rd_compressed_stream( + io, + fmt=fmt, + sample_offset=byte_offset, + n_sig=n_sig, + samps_per_frame=samps_per_frame, + start_frame=sampfrom, + end_frame=sampto, ) + else: + data_to_read = _rd_dat_stream(io, fmt, start_byte, n_read_samples) if extra_flat_samples: if fmt in UNALIGNED_FMTS: @@ -1591,7 +1568,7 @@ def _required_byte_num(mode, fmt, n_samp): return int(n_bytes) -def _rd_dat_file(file_name, dir_name, pn_dir, fmt, start_byte, n_samp, sig_stream): +def _rd_dat_stream(io, fmt, start_byte, n_samp): """ Read data from a dat file, either local or remote, into a 1d numpy array. @@ -1602,14 +1579,8 @@ def _rd_dat_file(file_name, dir_name, pn_dir, fmt, start_byte, n_samp, sig_strea Parameters ---------- - file_name : str - The name of the dat file. - dir_name : str - The full directory where the dat file(s) are located, if the dat - file(s) are local. - pn_dir : str - The PhysioNet directory where the dat file(s) are located, if - the dat file(s) are remote. + io : io + The io to read the dat file from. fmt : str The format of the dat file. start_byte : int @@ -1649,27 +1620,13 @@ def _rd_dat_file(file_name, dir_name, pn_dir, fmt, start_byte, n_samp, sig_strea element_count = n_samp byte_count = n_samp * BYTES_PER_SAMPLE[fmt] - # Memory Stream - if sig_stream is not None: - sig_stream.seek(start_byte) - sig_data = np.frombuffer( - sig_stream.read(), dtype=np.dtype(DATA_LOAD_TYPES[fmt]), count=element_count - ) - # Local dat file - elif pn_dir is None: - with open(os.path.join(dir_name, file_name), "rb") as fp: - fp.seek(start_byte) - sig_data = np.fromfile( - fp, dtype=np.dtype(DATA_LOAD_TYPES[fmt]), count=element_count - ) - # Stream dat file from Physionet - else: - dtype_in = np.dtype(DATA_LOAD_TYPES[fmt]) - sig_data = download._stream_dat( - file_name, pn_dir, byte_count, start_byte, dtype_in - ) + io.seek(start_byte) + return np.frombuffer( + io.read(byte_count), + dtype=np.dtype(DATA_LOAD_TYPES[fmt]), + count=element_count, + ) - return sig_data def _blocks_to_samples(sig_data, n_samp, fmt): """ @@ -1790,7 +1747,7 @@ def _blocks_to_samples(sig_data, n_samp, fmt): def _rd_compressed_stream( - fp, + io, fmt, sample_offset, n_sig, @@ -1798,12 +1755,52 @@ def _rd_compressed_stream( start_frame, end_frame, ): - signature = fp.read(4) + """ + Read data from a compressed file into a 1D numpy array. + Parameters + ---------- + io : io + The io to read the dat file from. + fmt : str + The format code of the signal file. + sample_offset : int + The sample number in the signal file corresponding to sample 0 of + the WFDB record. + n_sig : int + The number of signals in the file. + samps_per_frame : list + The number of samples per frame for each signal in the file. + start_frame : int + The starting frame number to read. + end_frame : int + The ending frame number to read. + Returns + ------- + signal : ndarray + The data read from the signal file. This is a one-dimensional + array in the same order the samples would be stored in a binary + signal file; `signal[(i*n_sig+j)*samps_per_frame[0]+k]` is sample + number `i*samps_per_frame[0]+k` of signal `j`. + Notes + ----- + Converting the output array into "dat file order" here is inefficient, + but necessary to match the behavior of _rd_dat_file. It would be + better to reorganize _rd_dat_signals to make the reshaping unnecessary. + """ + import soundfile + + if any(spf != samps_per_frame[0] for spf in samps_per_frame): + raise ValueError( + "All channels in a FLAC signal file must have the same " + "sampling rate and samples per frame" + ) + + signature = io.read(4) if signature != b"fLaC": - raise ValueError(f"{fp.name} is not a FLAC file") - fp.seek(0) + raise ValueError(f"{io.name} is not a FLAC file") + io.seek(0) - with soundfile.SoundFile(fp) as sf: + with soundfile.SoundFile(io) as sf: # Determine the actual resolution of the FLAC stream and the # data type will use when reading it. Note that soundfile # doesn't support int8. @@ -1817,18 +1814,18 @@ def _rd_compressed_stream( format_bits = 24 read_dtype = "int32" else: - raise ValueError(f"unknown subtype in {fp.name} ({sf.subtype})") + raise ValueError(f"unknown subtype in {io.name} ({sf.subtype})") max_bits = int(fmt) - 500 if format_bits > max_bits: raise ValueError( - f"wrong resolution in {fp.name} " + f"wrong resolution in {io.name} " f"({format_bits}, expected <= {max_bits})" ) if sf.channels != n_sig: raise ValueError( - f"wrong number of channels in {fp.name} " + f"wrong number of channels in {io.name} " f"({sf.channels}, expected {n_sig})" ) @@ -1906,73 +1903,6 @@ def _rd_compressed_stream( return sig_data.reshape(-1) -def _rd_compressed_file( - file_name, - dir_name, - pn_dir, - fmt, - sample_offset, - n_sig, - samps_per_frame, - start_frame, - end_frame, -): - """ - Read data from a compressed file into a 1D numpy array. - - Parameters - ---------- - file_name : str - The name of the signal file. - dir_name : str - The full directory where the signal file is located, if local. - This argument is ignored if `pn_dir` is not None. - pn_dir : str or None - The PhysioNet database directory where the signal file is located. - fmt : str - The format code of the signal file. - sample_offset : int - The sample number in the signal file corresponding to sample 0 of - the WFDB record. - n_sig : int - The number of signals in the file. - samps_per_frame : list - The number of samples per frame for each signal in the file. - start_frame : int - The starting frame number to read. - end_frame : int - The ending frame number to read. - - Returns - ------- - signal : ndarray - The data read from the signal file. This is a one-dimensional - array in the same order the samples would be stored in a binary - signal file; `signal[(i*n_sig+j)*samps_per_frame[0]+k]` is sample - number `i*samps_per_frame[0]+k` of signal `j`. - - Notes - ----- - Converting the output array into "dat file order" here is inefficient, - but necessary to match the behavior of _rd_dat_file. It would be - better to reorganize _rd_dat_signals to make the reshaping unnecessary. - - """ - import soundfile - - if any(spf != samps_per_frame[0] for spf in samps_per_frame): - raise ValueError( - "All channels in a FLAC signal file must have the same " - "sampling rate and samples per frame" - ) - - if pn_dir is None: - file_name = os.path.join(dir_name, file_name) - - with _coreio._open_file(pn_dir, file_name, "rb") as fp: - return _rd_compressed_stream(fp, fmt, sample_offset, n_sig, samps_per_frame, start_frame, end_frame) - - def _skew_sig( sig, skew, n_sig, read_len, fmt, nan_replace, samps_per_frame=None ): diff --git a/wfdb/io/annotation.py b/wfdb/io/annotation.py index ae86cc9f..fc4284c2 100644 --- a/wfdb/io/annotation.py +++ b/wfdb/io/annotation.py @@ -8,6 +8,7 @@ from wfdb.io import download from wfdb.io import _header +from wfdb.io import _coreio from wfdb.io import record @@ -1883,7 +1884,6 @@ def rdann( pn_dir=None, return_label_elements=["symbol"], summarize_labels=False, - ann_stream=None, ): """ Read a WFDB annotation file record_name.extension and return an @@ -1947,11 +1947,75 @@ def rdann( sampfrom, sampto, return_label_elements ) + with _coreio._open_file( + record_name + "." + extension, + "rb", + pn_dir=pn_dir, + ) as fp: + annotation = _rdann( + fp, + sampfrom, + sampto, + shift_samps, + return_label_elements, + summarize_labels, + ) + + # Try to get fs from the header file if it is not contained in the + # annotation file + if annotation.fs is None: + try: + rec = record.rdheader(record_name, pn_dir) + annotation.fs = rec.fs + except: + pass + + annotation.record_name = os.path.split(record_name)[1] + annotation.extension = extension + + return annotation + + +def _rdann( + io, + sampfrom=0, + sampto=None, + shift_samps=False, + return_label_elements=["symbol"], + summarize_labels=False, +): + """ + Read a WFDB annotation file and return an Annotation object. + + Parameters + ---------- + io : io + The io to read the annotation file from. + sampfrom : int, optional + The minimum sample number for annotations to be returned. + sampto : int, optional + The maximum sample number for annotations to be returned. + shift_samps : bool, optional + Specifies whether to return the sample indices relative to `sampfrom` + (True), or sample 0 (False). + return_label_elements : list, optional + The label elements that are to be returned from reading the annotation + file. A list with at least one of the following options: 'symbol', + 'label_store', 'description'. + summarize_labels : bool, optional + If True, assign a summary table of the set of annotation labels + contained in the file to the 'contained_labels' attribute of the + returned object. This table will contain the columns: + ['label_store', 'symbol', 'description', 'n_occurrences']. + + Returns + ------- + annotation : Annotation + The Annotation object. Call help(wfdb.Annotation) for the attribute + descriptions. + """ # Read the file in byte pairs - if ann_stream is not None: - filebytes = np.frombuffer(ann_stream.read(), " str: - """ - Stream the text of a remote header file. - - Parameters - ---------- - file_name : str - The name of the headerr file to be read. - pn_dir : str - The PhysioNet database directory from which to find the - required header file. eg. For file '100.hea' in - 'http://physionet.org/content/mitdb', pn_dir='mitdb'. - - Returns - ------- - N/A : str - The text contained in the header file - - """ - # Full url of header location - url = posixpath.join(config.db_index_url, pn_dir, file_name) - - # Get the content of the remote file - with _url.openurl(url, "rb") as f: - content = f.read() - - return content.decode("iso-8859-1") - - -def _stream_dat(file_name, pn_dir, byte_count, start_byte, dtype): - """ - Stream data from a remote dat file into a 1d numpy array. - - Parameters - ---------- - file_name : str - The name of the dat file to be read. - pn_dir : str - The PhysioNet directory where the dat file is located. - byte_count : int - The number of bytes to be read. - start_byte : int - The starting byte number to read from. - dtype : str - The numpy dtype to load the data into. - - Returns - ------- - sig_data : ndarray - The data read from the dat file. - - """ - # Full url of dat file - url = posixpath.join(config.db_index_url, pn_dir, file_name) - - # Get the content - with _url.openurl(url, "rb", buffering=0) as f: - f.seek(start_byte) - content = f.read(byte_count) - - # Convert to numpy array - sig_data = np.fromstring(content, dtype=dtype) - - return sig_data - - -def _stream_annotation(file_name, pn_dir): - """ - Stream an entire remote annotation file from Physionet. - - Parameters - ---------- - file_name : str - The name of the annotation file to be read. - pn_dir : str - The PhysioNet directory where the annotation file is located. - - Returns - ------- - ann_data : ndarray - The resulting data stream in numpy array format. - - """ - # Full url of annotation file - url = posixpath.join(config.db_index_url, pn_dir, file_name) - - # Get the content - with _url.openurl(url, "rb") as f: - content = f.read() - - # Convert to numpy array - ann_data = np.fromstring(content, dtype=np.dtype("