diff --git a/CHANGES.md b/CHANGES.md index d5cbb76fb3d5..967cd15c25dd 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -78,6 +78,7 @@ ## Bugfixes +* Fixed data loss issues when reading gzipped files with TextIO (Python) ([#18390](https://github.com/apache/beam/issues/18390), [#31040](https://github.com/apache/beam/issues/31040)). * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). ## Security Fixes diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py index 8056de51f43f..e0dcffa86dff 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio.py +++ b/sdks/python/apache_beam/io/gcp/gcsio.py @@ -589,8 +589,29 @@ def __init__( blob, chunk_size=DEFAULT_READ_BUFFER_SIZE, enable_read_bucket_metric=False, - retry=DEFAULT_RETRY): - super().__init__(blob, chunk_size=chunk_size, retry=retry) + retry=DEFAULT_RETRY, + raw_download=True): + # By default, we always request to retrieve raw data from GCS even if the + # object meets the criteria of decompressive transcoding + # (https://cloud.google.com/storage/docs/transcoding). + super().__init__( + blob, chunk_size=chunk_size, retry=retry, raw_download=raw_download) + # TODO: Remove this after + # https://github.com/googleapis/python-storage/issues/1406 is fixed. + # As a workaround, we manually trigger a reload here. Otherwise, an internal + # call of reader.seek() will cause an exception if raw_download is set + # when initializing BlobReader(). + blob.reload() + + # TODO: Currently there is a bug in GCS server side when a client requests + # a file with "content-encoding=gzip" and "content-type=application/gzip" or + # "content-type=application/x-gzip", which will lead to an infinite loop. + # We skip support for this type of file until the GCS bug is fixed. + # Internal bug id: 203845981. 
+ if (blob.content_encoding == "gzip" and + blob.content_type in ["application/gzip", "application/x-gzip"]): + raise NotImplementedError("Doubly compressed files not supported.") + self.enable_read_bucket_metric = enable_read_bucket_metric self.mode = "r" diff --git a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py index 07a5fb5df553..2ce355060bb4 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py @@ -31,14 +31,22 @@ import logging import unittest import uuid +import zlib import mock import pytest +from parameterized import parameterized from parameterized import parameterized_class +from apache_beam import Create +from apache_beam.io.filesystem import CompressionTypes from apache_beam.io.filesystems import FileSystems +from apache_beam.io.textio import ReadAllFromText from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to +from apache_beam.transforms.combiners import Count try: from apache_beam.io.gcp import gcsio @@ -230,6 +238,69 @@ def test_create_default_bucket(self, mock_default_gcs_bucket_name): self.assertIsNone(self.gcsio.get_bucket(overridden_bucket_name)) +class GcsIOReadGzipTest(unittest.TestCase): + gcs_path_prefix = "gs://apache-beam-samples/textio/" + gzip_test_files = [ + "textio-test-data.content-type-gzip-content-encoding-gzip.1k.txt.gz", + "textio-test-data.content-type-gzip-content-encoding-none.1k.txt.gz", + "textio-test-data.content-type-none-content-encoding-gzip.1k.txt.gz", + "textio-test-data.content-type-none-content-encoding-none.1k.txt.gz", + "textio-test-data.content-type-text-content-encoding-gzip.1k.txt.gz", + "textio-test-data.content-type-text-content-encoding-none.1k.txt.gz", + "textio-test-data.default.1k.txt", + 
"textio-test-data.default.1k.txt.gz", + "textio-test-data.gzip-local.1k.txt.gz", + ] + + @parameterized.expand([ + (gzip_test_files[0], CompressionTypes.UNCOMPRESSED, NotImplementedError), + (gzip_test_files[0], CompressionTypes.GZIP, NotImplementedError), + (gzip_test_files[0], CompressionTypes.AUTO, NotImplementedError), + (gzip_test_files[1], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError), + (gzip_test_files[1], CompressionTypes.GZIP, None), + (gzip_test_files[1], CompressionTypes.AUTO, None), + (gzip_test_files[2], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError), + (gzip_test_files[2], CompressionTypes.GZIP, None), + (gzip_test_files[2], CompressionTypes.AUTO, None), + (gzip_test_files[3], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError), + (gzip_test_files[3], CompressionTypes.GZIP, None), + (gzip_test_files[3], CompressionTypes.AUTO, None), + (gzip_test_files[4], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError), + (gzip_test_files[4], CompressionTypes.GZIP, None), + (gzip_test_files[4], CompressionTypes.AUTO, None), + (gzip_test_files[5], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError), + (gzip_test_files[5], CompressionTypes.GZIP, None), + (gzip_test_files[5], CompressionTypes.AUTO, None), + (gzip_test_files[6], CompressionTypes.UNCOMPRESSED, None), + (gzip_test_files[6], CompressionTypes.GZIP, zlib.error), + (gzip_test_files[6], CompressionTypes.AUTO, None), + (gzip_test_files[7], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError), + (gzip_test_files[7], CompressionTypes.GZIP, None), + (gzip_test_files[7], CompressionTypes.AUTO, None), + (gzip_test_files[8], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError), + (gzip_test_files[8], CompressionTypes.GZIP, None), + (gzip_test_files[8], CompressionTypes.AUTO, None), + ]) + @unittest.skipIf(NotFound is None, 'GCP dependencies are not installed') + def test_read_gzip_file(self, file_name, compression_type, exception): + p = TestPipeline(runner="Direct", is_integration_test=True) + r = ( + p + | 
Create([f"{GcsIOReadGzipTest.gcs_path_prefix}{file_name}"]) + | "Read File from GCS" >> + ReadAllFromText(compression_type=compression_type) + | Count.Globally()) + assert_that(r, equal_to([1000])) + + if exception is None: + result = p.run() + result.wait_until_finish() + else: + with self.assertRaises(exception): + result = p.run() + result.wait_until_finish() + + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) unittest.main() diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py index 19df15dcf7fa..7b79030b4b71 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py @@ -178,6 +178,11 @@ def __init__( self._fail_when_getting_metadata = fail_when_getting_metadata self._fail_when_reading = fail_when_reading self.generation = random.randint(0, (1 << 63) - 1) + self.content_encoding = None + self.content_type = None + + def reload(self): + pass def delete(self): self.bucket.delete_blob(self.name)