Adds Amazon AWS support #268

Lucas-Mc committed Apr 29, 2021
1 parent 701317d commit b56b4b5
Showing 6 changed files with 187 additions and 93 deletions.
1 change: 1 addition & 0 deletions requirements.txt
@@ -16,3 +16,4 @@ scikit-learn==0.22.2.post1
scipy==1.4.1
threadpoolctl==2.1.0
urllib3==1.25.9
+s3fs==2021.4.0
3 changes: 2 additions & 1 deletion setup.py
@@ -72,7 +72,8 @@
        'scikit-learn>=0.18',
        'scipy>=0.17.0',
        'threadpoolctl>=1.0.0',
-        'urllib3>=1.22'
+        'urllib3>=1.22',
+        's3fs>=2021.4.0'
    ],

# List additional groups of dependencies here (e.g. development
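Both dependency lists now pull in s3fs (>= 2021.4.0), which is all the new code needs for anonymous, read-only S3 access. A minimal sketch of what that dependency provides (the bucket name is a hypothetical placeholder):

import s3fs

# Anonymous (unsigned) access, the same mode download.py sets up.
fs = s3fs.S3FileSystem(anon=True)

# List objects and inspect one; 'my-aws-bucket' is illustrative only.
print(fs.ls('s3://my-aws-bucket/'))
with fs.open('s3://my-aws-bucket/mitdb/100.hea') as f:
    print(f.size, f.read(28))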
2 changes: 1 addition & 1 deletion wfdb/io/_signal.py
@@ -2091,7 +2091,7 @@ def _infer_sig_len(file_name, fmt, n_sig, dir_name, pn_dir=None):
        file_size = os.path.getsize(os.path.join(dir_name, file_name))
    else:
        file_size = download._remote_file_size(file_name=file_name,
-                                               pn_dir=pn_dir)
+                                               remote_dir=pn_dir)

    sig_len = int(file_size / (BYTES_PER_SAMPLE[fmt] * n_sig))

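The renamed keyword argument aside, the length inference itself is unchanged: file size divided by bytes-per-sample times channel count. A quick worked check using format 212 (1.5 bytes per sample), the format used by MIT-BIH record 100:

# Format 212 packs two 12-bit samples into three bytes: 1.5 bytes/sample.
BYTES_PER_SAMPLE = {'212': 1.5}

file_size = 1950000   # size of a two-channel, 650000-sample fmt 212 dat file
n_sig = 2
sig_len = int(file_size / (BYTES_PER_SAMPLE['212'] * n_sig))
print(sig_len)        # 650000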
4 changes: 2 additions & 2 deletions wfdb/io/annotation.py
@@ -1620,7 +1620,7 @@ def rdann(record_name, extension, sampfrom=0, sampto=None, shift_samps=False,
    >>> ann = wfdb.rdann('sample-data/100', 'atr', sampto=300000)
    """
-    if (pn_dir is not None) and ('.' not in pn_dir):
+    if (pn_dir is not None) and ('.' not in pn_dir) and (not pn_dir.startswith('s3')):
        dir_list = pn_dir.split('/')
        pn_dir = posixpath.join(dir_list[0],
                                record.get_version(dir_list[0]),
@@ -2255,7 +2255,7 @@ def ann2rr(record_name, extension, pn_dir=None, start_time=None,
    >>> 257
    """
-    if (pn_dir is not None) and ('.' not in pn_dir):
+    if (pn_dir is not None) and ('.' not in pn_dir) and (not pn_dir.startswith('s3')):
        dir_list = pn_dir.split('/')
        pn_dir = posixpath.join(dir_list[0], record.get_version(dir_list[0]),
                                *dir_list[1:])
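The added startswith('s3') guard keeps PhysioNet's version-resolution step away from S3 URIs. A standalone sketch of the two paths (the version string is illustrative; the real code queries record.get_version()):

import posixpath

def resolve_pn_dir(pn_dir):
    # Mirrors the updated rdann/ann2rr logic: only bare PhysioNet slugs
    # (no '.', not an S3 URI) get a version segment inserted.
    if (pn_dir is not None) and ('.' not in pn_dir) and (not pn_dir.startswith('s3')):
        dir_list = pn_dir.split('/')
        return posixpath.join(dir_list[0], '1.0.0', *dir_list[1:])
    return pn_dir

print(resolve_pn_dir('mitdb'))                     # mitdb/1.0.0
print(resolve_pn_dir('s3://my-aws-bucket/mitdb'))  # unchanged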
152 changes: 104 additions & 48 deletions wfdb/io/download.py
@@ -49,7 +49,7 @@ def set_db_index_url(db_index_url=PN_INDEX_URL):
    config.db_index_url = db_index_url


-def _remote_file_size(url=None, file_name=None, pn_dir=None):
+def _remote_file_size(url=None, file_name=None, remote_dir=None):
    """
    Get the remote file size in bytes.
@@ -59,11 +59,16 @@ def _remote_file_size(url=None, file_name=None, pn_dir=None):
        The full url of the file. Use this option to explicitly
        state the full url.
    file_name : str, optional
-        The base file name. Use this argument along with pn_dir if you
+        The base file name. Use this argument along with remote_dir if you
        want the full url to be constructed.
-    pn_dir : str, optional
-        The base file name. Use this argument along with file_name if
-        you want the full url to be constructed.
+    remote_dir : str, optional
+        The remote directory, one of two things:
+        (1) The S3 URI from which to find the required file. This
+        should always begin with 's3' in order to work correctly. An
+        example input would be: 's3://my-aws-bucket/'
+        (2) The PhysioNet database directory from which to find the
+        required file. eg. For file '100.hea' in
+        'http://physionet.org/content/mitdb', remote_dir='mitdb'.
    Returns
    -------
@@ -72,8 +77,17 @@
    """
    # Option to construct the url
-    if file_name and pn_dir:
-        url = posixpath.join(config.db_index_url, pn_dir, file_name)
+    if file_name and remote_dir:
+        if remote_dir.startswith('s3'):
+            # Set up the remote AWS S3 file system
+            import s3fs
+            fs = s3fs.S3FileSystem(anon=True)
+            file_dir = posixpath.join(remote_dir, file_name)
+            # Open the remote file and return its size
+            with fs.open(file_dir) as f:
+                return f.size
+        else:
+            url = posixpath.join(config.db_index_url, remote_dir, file_name)

    response = requests.head(url, headers={'Accept-Encoding': 'identity'})
    # Raise HTTPError if invalid url
@@ -85,18 +99,22 @@
    return remote_file_size
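With the new branch, the file size comes from the open S3 file handle rather than an HTTP HEAD request. A sketch of both call styles (this is a private helper; the bucket name is a hypothetical placeholder):

from wfdb.io import download

# PhysioNet: size via HTTP HEAD against the database index URL.
n_bytes = download._remote_file_size(file_name='100.dat',
                                     remote_dir='mitdb/1.0.0')

# S3: size via the s3fs file handle; 'my-aws-bucket' is illustrative.
n_bytes = download._remote_file_size(file_name='100.dat',
                                     remote_dir='s3://my-aws-bucket/mitdb')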


-def _stream_header(file_name, pn_dir):
+def _stream_header(file_name, remote_dir):
    """
    Stream the lines of a remote header file.

    Parameters
    ----------
    file_name : str
        The name of the header file to be read.
-    pn_dir : str
-        The PhysioNet database directory from which to find the
-        required header file. eg. For file '100.hea' in
-        'http://physionet.org/content/mitdb', pn_dir='mitdb'.
+    remote_dir : str
+        The remote directory, one of two things:
+        (1) The S3 URI from which to find the required header file. This
+        should always begin with 's3' in order to work correctly. An
+        example input would be: 's3://my-aws-bucket/'
+        (2) The PhysioNet database directory from which to find the
+        required header file. eg. For file '100.hea' in
+        'http://physionet.org/content/mitdb', remote_dir='mitdb'.
    Returns
    -------
@@ -106,15 +124,23 @@
        All of the comment header lines.
    """
-    # Full url of header location
-    url = posixpath.join(config.db_index_url, pn_dir, file_name)
-    response = requests.get(url)
-
-    # Raise HTTPError if invalid url
-    response.raise_for_status()
-
-    # Get each line as a string
-    filelines = response.content.decode('iso-8859-1').splitlines()
+    if remote_dir.startswith('s3'):
+        # Set up the remote AWS S3 file system
+        import s3fs
+        fs = s3fs.S3FileSystem(anon=True)
+        file_dir = posixpath.join(remote_dir, file_name)
+        # Read and decode the lines
+        with fs.open(file_dir) as f:
+            filelines = f.readlines()
+            filelines = [l.decode('iso-8859-1') for l in filelines]
+    else:
+        # Full url of header location
+        url = posixpath.join(config.db_index_url, remote_dir, file_name)
+        response = requests.get(url)
+        # Raise HTTPError if invalid url
+        response.raise_for_status()
+        # Get each line as a string
+        filelines = response.content.decode('iso-8859-1').splitlines()

    # Separate content into header and comment lines
    header_lines = []
@@ -139,16 +165,22 @@
    return (header_lines, comment_lines)
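Either branch feeds the same header/comment separation below. A usage sketch (private helper; the S3 bucket is a hypothetical placeholder):

from wfdb.io import download

# PhysioNet: streamed over HTTP.
header_lines, comment_lines = download._stream_header('100.hea',
                                                      'mitdb/1.0.0')

# S3: streamed anonymously via s3fs.
header_lines, comment_lines = download._stream_header(
    '100.hea', 's3://my-aws-bucket/mitdb')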


-def _stream_dat(file_name, pn_dir, byte_count, start_byte, dtype):
+def _stream_dat(file_name, remote_dir, byte_count, start_byte, dtype):
    """
    Stream data from a remote dat file into a 1d numpy array.

    Parameters
    ----------
    file_name : str
        The name of the dat file to be read.
-    pn_dir : str
-        The PhysioNet directory where the dat file is located.
+    remote_dir : str
+        The remote directory, one of two things:
+        (1) The S3 URI from which to find the required dat file. This
+        should always begin with 's3' in order to work correctly. An
+        example input would be: 's3://my-aws-bucket/'
+        (2) The PhysioNet database directory from which to find the
+        required dat file. eg. For file '100.dat' in
+        'http://physionet.org/content/mitdb', remote_dir='mitdb'.
    byte_count : int
        The number of bytes to be read.
    start_byte : int
@@ -162,58 +194,82 @@
        The data read from the dat file.
    """
-    # Full url of dat file
-    url = posixpath.join(config.db_index_url, pn_dir, file_name)
+    if remote_dir.startswith('s3'):
+        # Set up the remote AWS S3 file system
+        import s3fs
+        fs = s3fs.S3FileSystem(anon=True)
+        file_dir = posixpath.join(remote_dir, file_name)
+        # Read the file contents
+        with fs.open(file_dir) as f:
+            file_content = f.read()
+    else:
+        # Full url of dat file
+        url = posixpath.join(config.db_index_url, remote_dir, file_name)

-    # Specify the byte range
-    end_byte = start_byte + byte_count - 1
-    headers = {"Range":"bytes=%d-%d" % (start_byte, end_byte),
-               'Accept-Encoding': '*'}
+        # Specify the byte range
+        end_byte = start_byte + byte_count - 1
+        headers = {"Range":"bytes=%d-%d" % (start_byte, end_byte),
+                   'Accept-Encoding': '*'}

-    # Get the content
-    response = requests.get(url, headers=headers, stream=True)
+        # Get the content
+        response = requests.get(url, headers=headers, stream=True)

-    # Raise HTTPError if invalid url
-    response.raise_for_status()
+        # Raise HTTPError if invalid url
+        response.raise_for_status()
+        file_content = response.content

    # Convert to numpy array
    if type(dtype) == str:
        # Convert 24-bit to 16-bit then proceed
-        temp_data = np.frombuffer(response.content, 'b').reshape(-1,3)[:,1:].flatten().view('i2')
+        temp_data = np.frombuffer(file_content, 'b').reshape(-1,3)[:,1:].flatten().view('i2')
        sig_data = np.fromstring(temp_data, dtype='i2')
    else:
-        sig_data = np.fromstring(response.content, dtype=dtype)
+        sig_data = np.fromstring(file_content, dtype=dtype)

    return sig_data
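The 24-bit branch above drops the low byte of each three-byte sample and reinterprets the remaining pairs as little-endian int16. A self-contained illustration of that frombuffer/reshape trick on synthetic bytes:

import numpy as np

# Three little-endian 24-bit samples packed as raw bytes (synthetic values).
raw = bytes([0x00, 0x00, 0x00,
             0x00, 0x02, 0x00,
             0x00, 0x00, 0x03])

# Keep the upper two bytes of each 3-byte sample, then view pairs as int16.
samples = np.frombuffer(raw, 'b').reshape(-1, 3)[:, 1:].flatten().view('i2')
print(samples)  # [  0   2 768]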


-def _stream_annotation(file_name, pn_dir):
+def _stream_annotation(file_name, remote_dir):
    """
    Stream an entire remote annotation file from PhysioNet.

    Parameters
    ----------
    file_name : str
        The name of the annotation file to be read.
-    pn_dir : str
-        The PhysioNet directory where the annotation file is located.
+    remote_dir : str
+        The remote directory, one of two things:
+        (1) The S3 URI from which to find the required annotation file.
+        This should always begin with 's3' in order to work correctly.
+        An example input would be: 's3://my-aws-bucket/'
+        (2) The PhysioNet database directory from which to find the
+        required annotation file. eg. For file '100.atr' in
+        'http://physionet.org/content/mitdb', remote_dir='mitdb'.
    Returns
    -------
    ann_data : ndarray
        The resulting data stream in numpy array format.
    """
-    # Full url of annotation file
-    url = posixpath.join(config.db_index_url, pn_dir, file_name)
-
-    # Get the content
-    response = requests.get(url)
-    # Raise HTTPError if invalid url
-    response.raise_for_status()
-
+    if remote_dir.startswith('s3'):
+        # Set up the remote AWS S3 file system
+        import s3fs
+        fs = s3fs.S3FileSystem(anon=True)
+        file_dir = posixpath.join(remote_dir, file_name)
+        # Read the file contents
+        with fs.open(file_dir) as f:
+            file_content = f.read()
+    else:
+        # Full url of annotation file
+        url = posixpath.join(config.db_index_url, remote_dir, file_name)
+        # Get the content
+        response = requests.get(url)
+        # Raise HTTPError if invalid url
+        response.raise_for_status()
+        file_content = response.content
    # Convert to numpy array
-    ann_data = np.fromstring(response.content, dtype=np.dtype('<u1'))
+    ann_data = np.fromstring(file_content, dtype=np.dtype('<u1'))

    return ann_data
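Combined with the annotation.py changes above, an S3-hosted record can be read with the same public API; a hedged sketch (the bucket is a hypothetical placeholder, and the data must be laid out WFDB-style under it):

import wfdb

# PhysioNet, unchanged: the version lookup still runs.
ann = wfdb.rdann('100', 'atr', pn_dir='mitdb')

# S3: the version-lookup branch is skipped because pn_dir starts with 's3'.
ann = wfdb.rdann('100', 'atr', pn_dir='s3://my-aws-bucket/mitdb')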

