From 1392cc8c211de93d072ba6e464bc2a7e167a1230 Mon Sep 17 00:00:00 2001
From: Shunping Huang
Date: Thu, 9 Jan 2025 13:54:08 -0500
Subject: [PATCH] Fix TextIO not fully reading a GCS file when decompressive
 transcoding happens (#33384)

* Add check_splittability in filesystems. For GCS, we determine the
  splittability based on whether the file meets the decompressive transcoding
  criteria. When decompressive transcoding occurs, the size returned from the
  metadata (the gzip file size) does not match the size of the content
  returned (the original data). In this case, we set the source to
  unsplittable to ensure all of its content is read.

* Rename the function and remove an unused one.

* Revert the previous changes and use raw_download to retrieve raw data in
  the GCS client library.

* Raise an exception for doubly compressed GCS objects. Apply yapf.

* Add some comments.

* Add integration tests and fix a unit test failure.

* Fix lints.

* More lints.

* Add a one-line description to CHANGES.md.
---
 CHANGES.md                                    |  1 +
 sdks/python/apache_beam/io/gcp/gcsio.py       | 25 ++++++-
 .../io/gcp/gcsio_integration_test.py          | 71 +++++++++++++++++++
 sdks/python/apache_beam/io/gcp/gcsio_test.py  |  5 ++
 4 files changed, 100 insertions(+), 2 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 2333c835e038..1f8f13305c83 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -80,6 +80,7 @@
 
 ## Bugfixes
 
+* Fix data loss issues when reading gzipped files with TextIO (Python) ([#18390](https://github.com/apache/beam/issues/18390), [#31040](https://github.com/apache/beam/issues/31040)).
 * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
 * [BigQueryIO] Fixed an issue where Storage Write API sometimes doesn't pick up auto-schema updates ([#33231](https://github.com/apache/beam/pull/33231))
 
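Background for the change below: when an object is uploaded with
"Content-Encoding: gzip", GCS serves it decompressed by default (decompressive
transcoding), while the object's metadata still reports the stored, compressed
size. The following sketch, using a hypothetical bucket and object name,
illustrates the size mismatch that truncated reads; raw_download=True asks the
server for the bytes exactly as stored.

  from google.cloud import storage

  client = storage.Client()
  blob = client.bucket("example-bucket").get_blob("example.txt.gz")

  transcoded = blob.download_as_bytes()            # server decompresses
  raw = blob.download_as_bytes(raw_download=True)  # gzip bytes as stored

  assert len(raw) == blob.size
  # Under transcoding, len(transcoded) can exceed blob.size, so a reader
  # that stops after blob.size bytes silently drops the tail of the stream.
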
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index 8056de51f43f..e0dcffa86dff 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -589,8 +589,29 @@ def __init__(
       blob,
       chunk_size=DEFAULT_READ_BUFFER_SIZE,
       enable_read_bucket_metric=False,
-      retry=DEFAULT_RETRY):
-    super().__init__(blob, chunk_size=chunk_size, retry=retry)
+      retry=DEFAULT_RETRY,
+      raw_download=True):
+    # By default, we always request to retrieve raw data from GCS, even if
+    # the object meets the criteria of decompressive transcoding
+    # (https://cloud.google.com/storage/docs/transcoding).
+    super().__init__(
+        blob, chunk_size=chunk_size, retry=retry, raw_download=raw_download)
+    # TODO: Remove this after
+    # https://github.com/googleapis/python-storage/issues/1406 is fixed.
+    # As a workaround, we manually trigger a reload here. Otherwise, an
+    # internal call of reader.seek() will cause an exception if raw_download
+    # is set when initializing BlobReader().
+    blob.reload()
+
+    # TODO: Currently there is a bug on the GCS server side when a client
+    # requests a file with "content-encoding=gzip" and
+    # "content-type=application/gzip" or "content-type=application/x-gzip",
+    # which leads to an infinite loop. We skip support for this type of file
+    # until the GCS bug is fixed. Internal bug id: 203845981.
+    if (blob.content_encoding == "gzip" and
+        blob.content_type in ["application/gzip", "application/x-gzip"]):
+      raise NotImplementedError("Doubly compressed files not supported.")
+
     self.enable_read_bucket_metric = enable_read_bucket_metric
     self.mode = "r"
 
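The integration test below reads fixtures covering each combination of
Content-Type and Content-Encoding metadata. A sketch of how a fixture like
these could be produced (the bucket path and contents here are illustrative,
not the actual test data):

  import gzip
  from google.cloud import storage

  client = storage.Client()
  blob = client.bucket("example-bucket").blob("textio/sample.1k.txt.gz")
  blob.content_encoding = "gzip"  # opts the object into transcoding on read
  blob.upload_from_string(
      gzip.compress(b"0123456789\n" * 1000), content_type="text/plain")
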
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py
index 07a5fb5df553..2ce355060bb4 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py
@@ -31,14 +31,22 @@
 import logging
 import unittest
 import uuid
+import zlib
 
 import mock
 import pytest
+from parameterized import parameterized
 from parameterized import parameterized_class
 
+from apache_beam import Create
+from apache_beam.io.filesystem import CompressionTypes
 from apache_beam.io.filesystems import FileSystems
+from apache_beam.io.textio import ReadAllFromText
 from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+from apache_beam.transforms.combiners import Count
 
 try:
   from apache_beam.io.gcp import gcsio
@@ -230,6 +238,69 @@ def test_create_default_bucket(self, mock_default_gcs_bucket_name):
     self.assertIsNone(self.gcsio.get_bucket(overridden_bucket_name))
 
 
+class GcsIOReadGzipTest(unittest.TestCase):
+  gcs_path_prefix = "gs://apache-beam-samples/textio/"
+  gzip_test_files = [
+      "textio-test-data.content-type-gzip-content-encoding-gzip.1k.txt.gz",
+      "textio-test-data.content-type-gzip-content-encoding-none.1k.txt.gz",
+      "textio-test-data.content-type-none-content-encoding-gzip.1k.txt.gz",
+      "textio-test-data.content-type-none-content-encoding-none.1k.txt.gz",
+      "textio-test-data.content-type-text-content-encoding-gzip.1k.txt.gz",
+      "textio-test-data.content-type-text-content-encoding-none.1k.txt.gz",
+      "textio-test-data.default.1k.txt",
+      "textio-test-data.default.1k.txt.gz",
+      "textio-test-data.gzip-local.1k.txt.gz",
+  ]
+
+  @parameterized.expand([
+      (gzip_test_files[0], CompressionTypes.UNCOMPRESSED, NotImplementedError),
+      (gzip_test_files[0], CompressionTypes.GZIP, NotImplementedError),
+      (gzip_test_files[0], CompressionTypes.AUTO, NotImplementedError),
+      (gzip_test_files[1], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
+      (gzip_test_files[1], CompressionTypes.GZIP, None),
+      (gzip_test_files[1], CompressionTypes.AUTO, None),
+      (gzip_test_files[2], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
+      (gzip_test_files[2], CompressionTypes.GZIP, None),
+      (gzip_test_files[2], CompressionTypes.AUTO, None),
+      (gzip_test_files[3], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
+      (gzip_test_files[3], CompressionTypes.GZIP, None),
+      (gzip_test_files[3], CompressionTypes.AUTO, None),
+      (gzip_test_files[4], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
+      (gzip_test_files[4], CompressionTypes.GZIP, None),
+      (gzip_test_files[4], CompressionTypes.AUTO, None),
+      (gzip_test_files[5], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
+      (gzip_test_files[5], CompressionTypes.GZIP, None),
+      (gzip_test_files[5], CompressionTypes.AUTO, None),
+      (gzip_test_files[6], CompressionTypes.UNCOMPRESSED, None),
+      (gzip_test_files[6], CompressionTypes.GZIP, zlib.error),
+      (gzip_test_files[6], CompressionTypes.AUTO, None),
+      (gzip_test_files[7], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
+      (gzip_test_files[7], CompressionTypes.GZIP, None),
+      (gzip_test_files[7], CompressionTypes.AUTO, None),
+      (gzip_test_files[8], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
+      (gzip_test_files[8], CompressionTypes.GZIP, None),
+      (gzip_test_files[8], CompressionTypes.AUTO, None),
+  ])
+  @unittest.skipIf(NotFound is None, 'GCP dependencies are not installed')
+  def test_read_gzip_file(self, file_name, compression_type, exception):
+    p = TestPipeline(runner="Direct", is_integration_test=True)
+    r = (
+        p
+        | Create([f"{GcsIOReadGzipTest.gcs_path_prefix}{file_name}"])
+        | "Read File from GCS" >>
+        ReadAllFromText(compression_type=compression_type)
+        | Count.Globally())
+    assert_that(r, equal_to([1000]))
+
+    if exception is None:
+      result = p.run()
+      result.wait_until_finish()
+    else:
+      with self.assertRaises(exception):
+        result = p.run()
+        result.wait_until_finish()
+
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py
index 19df15dcf7fa..7b79030b4b71 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py
@@ -178,6 +178,11 @@ def __init__(
     self._fail_when_getting_metadata = fail_when_getting_metadata
     self._fail_when_reading = fail_when_reading
     self.generation = random.randint(0, (1 << 63) - 1)
+    self.content_encoding = None
+    self.content_type = None
+
+  def reload(self):
+    pass
 
   def delete(self):
     self.bucket.delete_blob(self.name)
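With the fix applied, a pipeline along these lines (the object path is
hypothetical) reads all 1000 lines of a transcoded object instead of stopping
at the compressed size:

  import apache_beam as beam
  from apache_beam.io.filesystem import CompressionTypes
  from apache_beam.io.textio import ReadFromText

  with beam.Pipeline() as p:
    _ = (
        p
        | ReadFromText(
            "gs://example-bucket/textio/sample.1k.txt.gz",
            compression_type=CompressionTypes.GZIP)
        | beam.Map(print))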