Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix TextIO not fully reading a GCS file when decompressive transcoding happens #33384

Merged
merged 10 commits into from
Jan 9, 2025
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@

## Bugfixes

* Fix data loss issues when reading gzipped files with TextIO (Python) ([#18390](https://github.com/apache/beam/issues/18390), [#31040](https://github.com/apache/beam/issues/31040)).
* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

## Security Fixes
Expand Down
25 changes: 23 additions & 2 deletions sdks/python/apache_beam/io/gcp/gcsio.py
Original file line number Diff line number Diff line change
Expand Up @@ -589,8 +589,29 @@ def __init__(
blob,
chunk_size=DEFAULT_READ_BUFFER_SIZE,
enable_read_bucket_metric=False,
retry=DEFAULT_RETRY):
super().__init__(blob, chunk_size=chunk_size, retry=retry)
retry=DEFAULT_RETRY,
raw_download=True):
# By default, we always request to retrieve raw data from GCS even if the
# object meets the criteria of decompressive transcoding
# (https://cloud.google.com/storage/docs/transcoding).
super().__init__(
blob, chunk_size=chunk_size, retry=retry, raw_download=raw_download)
shunping marked this conversation as resolved.
Show resolved Hide resolved
# TODO: Remove this after
# https://github.com/googleapis/python-storage/issues/1406 is fixed.
# As a workaround, we manually trigger a reload here. Otherwise, an internal
# call of reader.seek() will cause an exception if raw_download is set
# when initializing BlobReader().
blob.reload()

# TODO: Currently there is a bug on the GCS server side when a client requests
# a file with "content-encoding=gzip" and "content-type=application/gzip" or
# "content-type=application/x-gzip", which will lead to an infinite loop.
# We skip the support of this type of files until the GCS bug is fixed.
# Internal bug id: 203845981.
if (blob.content_encoding == "gzip" and
blob.content_type in ["application/gzip", "application/x-gzip"]):
raise NotImplementedError("Doubly compressed files not supported.")

self.enable_read_bucket_metric = enable_read_bucket_metric
self.mode = "r"

Expand Down
71 changes: 71 additions & 0 deletions sdks/python/apache_beam/io/gcp/gcsio_integration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,22 @@
import logging
import unittest
import uuid
import zlib

import mock
import pytest
from parameterized import parameterized
from parameterized import parameterized_class

from apache_beam import Create
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.filesystems import FileSystems
from apache_beam.io.textio import ReadAllFromText
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.transforms.combiners import Count

try:
from apache_beam.io.gcp import gcsio
Expand Down Expand Up @@ -230,6 +238,69 @@ def test_create_default_bucket(self, mock_default_gcs_bucket_name):
self.assertIsNone(self.gcsio.get_bucket(overridden_bucket_name))


class GcsIOReadGzipTest(unittest.TestCase):
  """Integration tests for reading gzip files stored on GCS with TextIO.

  Each test object is a 1000-line text file; the objects differ in the
  content-type and content-encoding metadata they were uploaded with, which
  determines whether GCS applies decompressive transcoding on download.
  """

  # All test objects live under this bucket prefix.
  gcs_path_prefix = "gs://apache-beam-samples/textio/"
  # File names encode the content-type / content-encoding combination each
  # object carries on GCS.
  gzip_test_files = [
      "textio-test-data.content-type-gzip-content-encoding-gzip.1k.txt.gz",
      "textio-test-data.content-type-gzip-content-encoding-none.1k.txt.gz",
      "textio-test-data.content-type-none-content-encoding-gzip.1k.txt.gz",
      "textio-test-data.content-type-none-content-encoding-none.1k.txt.gz",
      "textio-test-data.content-type-text-content-encoding-gzip.1k.txt.gz",
      "textio-test-data.content-type-text-content-encoding-none.1k.txt.gz",
      "textio-test-data.default.1k.txt",
      "textio-test-data.default.1k.txt.gz",
      "textio-test-data.gzip-local.1k.txt.gz",
  ]

  # (file, compression_type, expected exception or None for success).
  @parameterized.expand([
      (gzip_test_files[0], CompressionTypes.UNCOMPRESSED, NotImplementedError),
      (gzip_test_files[0], CompressionTypes.GZIP, NotImplementedError),
      (gzip_test_files[0], CompressionTypes.AUTO, NotImplementedError),
      (gzip_test_files[1], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
      (gzip_test_files[1], CompressionTypes.GZIP, None),
      (gzip_test_files[1], CompressionTypes.AUTO, None),
      (gzip_test_files[2], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
      (gzip_test_files[2], CompressionTypes.GZIP, None),
      (gzip_test_files[2], CompressionTypes.AUTO, None),
      (gzip_test_files[3], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
      (gzip_test_files[3], CompressionTypes.GZIP, None),
      (gzip_test_files[3], CompressionTypes.AUTO, None),
      (gzip_test_files[4], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
      (gzip_test_files[4], CompressionTypes.GZIP, None),
      (gzip_test_files[4], CompressionTypes.AUTO, None),
      (gzip_test_files[5], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
      (gzip_test_files[5], CompressionTypes.GZIP, None),
      (gzip_test_files[5], CompressionTypes.AUTO, None),
      (gzip_test_files[6], CompressionTypes.UNCOMPRESSED, None),
      (gzip_test_files[6], CompressionTypes.GZIP, zlib.error),
      (gzip_test_files[6], CompressionTypes.AUTO, None),
      (gzip_test_files[7], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
      (gzip_test_files[7], CompressionTypes.GZIP, None),
      (gzip_test_files[7], CompressionTypes.AUTO, None),
      (gzip_test_files[8], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
      (gzip_test_files[8], CompressionTypes.GZIP, None),
      (gzip_test_files[8], CompressionTypes.AUTO, None),
  ])
  @unittest.skipIf(NotFound is None, 'GCP dependencies are not installed')
  def test_read_gzip_file(self, file_name, compression_type, exception):
    """Read one GCS object with the given compression type and count lines.

    The pipeline must yield exactly 1000 lines; when ``exception`` is not
    None, running the pipeline is instead expected to raise that exception
    type.
    """
    pipeline = TestPipeline(runner="Direct", is_integration_test=True)
    line_count = (
        pipeline
        | Create([f"{GcsIOReadGzipTest.gcs_path_prefix}{file_name}"])
        | "Read File from GCS" >>
        ReadAllFromText(compression_type=compression_type)
        | Count.Globally())
    assert_that(line_count, equal_to([1000]))

    if exception is None:
      pipeline.run().wait_until_finish()
    else:
      with self.assertRaises(exception):
        pipeline.run().wait_until_finish()

if __name__ == '__main__':
  # Surface INFO-level logs so integration-test progress is visible when this
  # module is run directly.
  logging.getLogger().setLevel(logging.INFO)
  unittest.main()
5 changes: 5 additions & 0 deletions sdks/python/apache_beam/io/gcp/gcsio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ def __init__(
self._fail_when_getting_metadata = fail_when_getting_metadata
self._fail_when_reading = fail_when_reading
self.generation = random.randint(0, (1 << 63) - 1)
self.content_encoding = None
self.content_type = None

def reload(self):
  # No-op stand-in for google.cloud.storage.Blob.reload(): this fake blob
  # keeps its metadata locally, so there is nothing to refresh. Defined so
  # that production code which calls blob.reload() (the BlobReader seek
  # workaround in gcsio.py) works against the fake in unit tests.
  pass

def delete(self):
  # Remove this blob from its parent fake bucket, keyed by blob name.
  self.bucket.delete_blob(self.name)
Expand Down
Loading