From 40f9a6d82d9dd875cfc7151c3b6ad5eb9a55d19e Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sun, 15 Dec 2024 00:00:32 -0500 Subject: [PATCH 1/9] Add check_splittability in filesystems. For GCS, we determine the splittability based on whether the file meets decompressive transcoding criteria. When decompressive transcoding occurs, the size returned from metadata (gzip file size) does not match the size of the content returned (original data). In this case, we set the source to unsplittable to ensure all its content is read. --- sdks/python/apache_beam/io/filebasedsource.py | 8 ++++++++ sdks/python/apache_beam/io/filesystem.py | 3 +++ sdks/python/apache_beam/io/filesystems.py | 5 +++++ sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 14 ++++++++++++++ sdks/python/apache_beam/io/gcp/gcsio.py | 2 ++ 5 files changed, 32 insertions(+) diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py index a02bc6de32c7..6dc35e6b6b66 100644 --- a/sdks/python/apache_beam/io/filebasedsource.py +++ b/sdks/python/apache_beam/io/filebasedsource.py @@ -259,7 +259,15 @@ def splittable(self): return self._splittable +def _is_decompressive_transcoding_enabled(file_path): + + return True + + def _determine_splittability_from_compression_type(file_path, compression_type): + if not FileSystems.check_splittability(file_path): + return False + if compression_type == CompressionTypes.AUTO: compression_type = CompressionTypes.detect_compression_type(file_path) diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py index 840fdf3309e7..de9fbdbe286d 100644 --- a/sdks/python/apache_beam/io/filesystem.py +++ b/sdks/python/apache_beam/io/filesystem.py @@ -945,3 +945,6 @@ def report_lineage(self, path, unused_lineage, level=None): Unless override by FileSystem implementations, default to no-op. """ pass + + def check_splittability(self, path): + return True diff --git a/sdks/python/apache_beam/io/filesystems.py b/sdks/python/apache_beam/io/filesystems.py index 87f45f3308ee..e5c31387c869 100644 --- a/sdks/python/apache_beam/io/filesystems.py +++ b/sdks/python/apache_beam/io/filesystems.py @@ -415,3 +415,8 @@ def report_sink_lineage(path, level=None): """ filesystem = FileSystems.get_filesystem(path) filesystem.report_lineage(path, Lineage.sinks(), level=level) + + @staticmethod + def check_splittability(path): + filesystem = FileSystems.get_filesystem(path) + return filesystem.check_splittability(path) diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py index 325f70ddfd96..7ace28dcdc56 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py @@ -377,3 +377,17 @@ def report_lineage(self, path, lineage, level=None): # bucket only components = components[:-1] lineage.add('gcs', *components) + + def check_splittability(self, path): + try: + file_metadata = self._gcsIO()._status(path) + if file_metadata.get('content_encoding', None) == 'gzip': + # When decompressive transcoding is enabled, the file stored in GCS is a + # gzip file but when it is read, the returned data is uncompressed. + # However, the blob size is still the gzipped file size, which does not + # match the size of the returned data. + # Here we disable splittable so later it will use [0, inf] as in range + # tracker. + return False + except Exception as e: # pylint: disable=broad-except + raise BeamIOError("Metadata operation failed", {path: e}) diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py index 8056de51f43f..bfa7b1b3d266 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio.py +++ b/sdks/python/apache_beam/io/gcp/gcsio.py @@ -475,6 +475,8 @@ def _status(self, path): file_status['updated'] = self._updated_to_seconds(gcs_object.updated) if hasattr(gcs_object, 'size'): file_status['size'] = gcs_object.size + if hasattr(gcs_object, 'content_encoding'): + file_status['content_encoding'] = gcs_object.content_encoding return file_status def _gcs_object(self, path): From 7be3418bf9d94a9dcbaf26958e5821c3dd5af87e Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 27 Dec 2024 15:54:40 -0500 Subject: [PATCH 2/9] Rename the function and remove unused one. --- sdks/python/apache_beam/io/filebasedsource.py | 8 +------- sdks/python/apache_beam/io/filesystems.py | 4 ++-- sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 3 ++- 3 files changed, 5 insertions(+), 10 deletions(-) diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py index 6dc35e6b6b66..a2b7adc6c440 100644 --- a/sdks/python/apache_beam/io/filebasedsource.py +++ b/sdks/python/apache_beam/io/filebasedsource.py @@ -258,14 +258,8 @@ def read_records(self, file_name, offset_range_tracker): def splittable(self): return self._splittable - -def _is_decompressive_transcoding_enabled(file_path): - - return True - - def _determine_splittability_from_compression_type(file_path, compression_type): - if not FileSystems.check_splittability(file_path): + if not FileSystems.is_file_splittable(file_path): return False if compression_type == CompressionTypes.AUTO: diff --git a/sdks/python/apache_beam/io/filesystems.py b/sdks/python/apache_beam/io/filesystems.py index e5c31387c869..209d920dcb46 100644 --- a/sdks/python/apache_beam/io/filesystems.py +++ b/sdks/python/apache_beam/io/filesystems.py @@ -417,6 +417,6 @@ def report_sink_lineage(path, level=None): filesystem.report_lineage(path, Lineage.sinks(), level=level) @staticmethod - def check_splittability(path): + def is_file_splittable(path): filesystem = FileSystems.get_filesystem(path) - return filesystem.check_splittability(path) + return filesystem.is_file_splittable(path) diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py index 7ace28dcdc56..3c43bc3aca50 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py @@ -378,7 +378,8 @@ def report_lineage(self, path, lineage, level=None): components = components[:-1] lineage.add('gcs', *components) - def check_splittability(self, path): + def is_file_splittable(self, path): + return True try: file_metadata = self._gcsIO()._status(path) if file_metadata.get('content_encoding', None) == 'gzip': From 06b42314a090d84fe03fc6e501cac373d8081475 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 3 Jan 2025 21:18:23 -0500 Subject: [PATCH 3/9] Revert the previous changes and use raw_download to retrieve raw data in gcs client lib --- sdks/python/apache_beam/io/filebasedsource.py | 4 +--- sdks/python/apache_beam/io/filesystem.py | 3 --- sdks/python/apache_beam/io/filesystems.py | 5 ----- sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 15 --------------- sdks/python/apache_beam/io/gcp/gcsio.py | 12 ++++++++++-- 5 files changed, 11 insertions(+), 28 deletions(-) diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py index a2b7adc6c440..a02bc6de32c7 100644 --- a/sdks/python/apache_beam/io/filebasedsource.py +++ b/sdks/python/apache_beam/io/filebasedsource.py @@ -258,10 +258,8 @@ def read_records(self, file_name, offset_range_tracker): def splittable(self): return self._splittable -def _determine_splittability_from_compression_type(file_path, compression_type): - if not FileSystems.is_file_splittable(file_path): - return False +def _determine_splittability_from_compression_type(file_path, compression_type): if compression_type == CompressionTypes.AUTO: compression_type = CompressionTypes.detect_compression_type(file_path) diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py index de9fbdbe286d..840fdf3309e7 100644 --- a/sdks/python/apache_beam/io/filesystem.py +++ b/sdks/python/apache_beam/io/filesystem.py @@ -945,6 +945,3 @@ def report_lineage(self, path, unused_lineage, level=None): Unless override by FileSystem implementations, default to no-op. """ pass - - def check_splittability(self, path): - return True diff --git a/sdks/python/apache_beam/io/filesystems.py b/sdks/python/apache_beam/io/filesystems.py index 209d920dcb46..87f45f3308ee 100644 --- a/sdks/python/apache_beam/io/filesystems.py +++ b/sdks/python/apache_beam/io/filesystems.py @@ -415,8 +415,3 @@ def report_sink_lineage(path, level=None): """ filesystem = FileSystems.get_filesystem(path) filesystem.report_lineage(path, Lineage.sinks(), level=level) - - @staticmethod - def is_file_splittable(path): - filesystem = FileSystems.get_filesystem(path) - return filesystem.is_file_splittable(path) diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py index 3c43bc3aca50..325f70ddfd96 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py @@ -377,18 +377,3 @@ def report_lineage(self, path, lineage, level=None): # bucket only components = components[:-1] lineage.add('gcs', *components) - - def is_file_splittable(self, path): - return True - try: - file_metadata = self._gcsIO()._status(path) - if file_metadata.get('content_encoding', None) == 'gzip': - # When decompressive transcoding is enabled, the file stored in GCS is a - # gzip file but when it is read, the returned data is uncompressed. - # However, the blob size is still the gzipped file size, which does not - # match the size of the returned data. - # Here we disable splittable so later it will use [0, inf] as in range - # tracker. - return False - except Exception as e: # pylint: disable=broad-except - raise BeamIOError("Metadata operation failed", {path: e}) diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py index bfa7b1b3d266..2114b41f53cc 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio.py +++ b/sdks/python/apache_beam/io/gcp/gcsio.py @@ -591,11 +591,19 @@ def __init__( blob, chunk_size=DEFAULT_READ_BUFFER_SIZE, enable_read_bucket_metric=False, - retry=DEFAULT_RETRY): - super().__init__(blob, chunk_size=chunk_size, retry=retry) + retry=DEFAULT_RETRY, + raw_download=True): + super().__init__(blob, chunk_size=chunk_size, retry=retry, raw_download=raw_download) + # TODO: Remove this after + # https://github.com/googleapis/python-storage/issues/1406 is fixed. + # As a workaround, we manually trigger a reload here. Otherwise, an internal + # call of reader.seek() will cause an exception if raw_download is set + # when initializing BlobReader(), + blob.reload() self.enable_read_bucket_metric = enable_read_bucket_metric self.mode = "r" + def read(self, size=-1): bytesRead = super().read(size) if self.enable_read_bucket_metric: From d1052017cd53cf679cba454d748ed7eb79978818 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 3 Jan 2025 21:40:59 -0500 Subject: [PATCH 4/9] Raise exception for doubly compressed gcs object. Apply yapf. --- sdks/python/apache_beam/io/gcp/gcsio.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py index 2114b41f53cc..032346c388d9 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio.py +++ b/sdks/python/apache_beam/io/gcp/gcsio.py @@ -475,8 +475,6 @@ def _status(self, path): file_status['updated'] = self._updated_to_seconds(gcs_object.updated) if hasattr(gcs_object, 'size'): file_status['size'] = gcs_object.size - if hasattr(gcs_object, 'content_encoding'): - file_status['content_encoding'] = gcs_object.content_encoding return file_status def _gcs_object(self, path): @@ -593,17 +591,27 @@ def __init__( enable_read_bucket_metric=False, retry=DEFAULT_RETRY, raw_download=True): - super().__init__(blob, chunk_size=chunk_size, retry=retry, raw_download=raw_download) + super().__init__( + blob, chunk_size=chunk_size, retry=retry, raw_download=raw_download) # TODO: Remove this after # https://github.com/googleapis/python-storage/issues/1406 is fixed. # As a workaround, we manually trigger a reload here. Otherwise, an internal # call of reader.seek() will cause an exception if raw_download is set # when initializing BlobReader(), blob.reload() + + # TODO: Currently there is a bug in GCS server side when a client request + # a file with "content-encoding=gzip" and "content-type=application/gzip" or + # "content-type=application/x-gzip", which will lead to infinite loop. + # We skip the support of this type of file until GCS bug is fixed. + # Internal bug id: 203845981. + if (blob.content_encoding == "gzip" and + blob.content_type in ["application/gzip", "application/x-gzip"]): + raise NotImplementedError("Doubly compressed files not supported.") + self.enable_read_bucket_metric = enable_read_bucket_metric self.mode = "r" - def read(self, size=-1): bytesRead = super().read(size) if self.enable_read_bucket_metric: From e77904594d4e3794eec0df18a823b9e1351b2514 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 3 Jan 2025 21:46:19 -0500 Subject: [PATCH 5/9] Add some comments. --- sdks/python/apache_beam/io/gcp/gcsio.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py index 032346c388d9..e0dcffa86dff 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio.py +++ b/sdks/python/apache_beam/io/gcp/gcsio.py @@ -591,6 +591,9 @@ def __init__( enable_read_bucket_metric=False, retry=DEFAULT_RETRY, raw_download=True): + # By default, we always request to retrieve raw data from GCS even if the + # object meets the criteria of decompressive transcoding + # (https://cloud.google.com/storage/docs/transcoding). super().__init__( blob, chunk_size=chunk_size, retry=retry, raw_download=raw_download) # TODO: Remove this after @@ -600,10 +603,10 @@ def __init__( # when initializing BlobReader(), blob.reload() - # TODO: Currently there is a bug in GCS server side when a client request + # TODO: Currently there is a bug in GCS server side when a client requests # a file with "content-encoding=gzip" and "content-type=application/gzip" or # "content-type=application/x-gzip", which will lead to infinite loop. - # We skip the support of this type of file until GCS bug is fixed. + # We skip the support of this type of files until the GCS bug is fixed. # Internal bug id: 203845981. if (blob.content_encoding == "gzip" and blob.content_type in ["application/gzip", "application/x-gzip"]): From 72b2bcbd62b1b42388055440bf564a9860fac399 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 3 Jan 2025 23:14:00 -0500 Subject: [PATCH 6/9] Add integration tests and fix unit test failure. --- .../io/gcp/gcsio_integration_test.py | 70 +++++++++++++++++++ sdks/python/apache_beam/io/gcp/gcsio_test.py | 5 ++ 2 files changed, 75 insertions(+) diff --git a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py index 07a5fb5df553..6161c4803199 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py @@ -31,14 +31,22 @@ import logging import unittest import uuid +import zlib import mock import pytest +from parameterized import parameterized from parameterized import parameterized_class +from apache_beam import Create +from apache_beam.io.textio import ReadAllFromText +from apache_beam.io.filesystem import CompressionTypes from apache_beam.io.filesystems import FileSystems from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to +from apache_beam.transforms.combiners import Count try: from apache_beam.io.gcp import gcsio @@ -230,6 +238,68 @@ def test_create_default_bucket(self, mock_default_gcs_bucket_name): self.assertIsNone(self.gcsio.get_bucket(overridden_bucket_name)) +class GcsIOReadGzipTest(unittest.TestCase): + gzip_test_files = [ + "gs://apache-beam-samples/textio/textio-test-data.content-type-gzip-content-encoding-gzip.1k.txt.gz", + "gs://apache-beam-samples/textio/textio-test-data.content-type-gzip-content-encoding-none.1k.txt.gz", + "gs://apache-beam-samples/textio/textio-test-data.content-type-none-content-encoding-gzip.1k.txt.gz", + "gs://apache-beam-samples/textio/textio-test-data.content-type-none-content-encoding-none.1k.txt.gz", + "gs://apache-beam-samples/textio/textio-test-data.content-type-text-content-encoding-gzip.1k.txt.gz", + "gs://apache-beam-samples/textio/textio-test-data.content-type-text-content-encoding-none.1k.txt.gz", + "gs://apache-beam-samples/textio/textio-test-data.default.1k.txt", + "gs://apache-beam-samples/textio/textio-test-data.default.1k.txt.gz", + "gs://apache-beam-samples/textio/textio-test-data.gzip-local.1k.txt.gz", + ] + + @parameterized.expand([ + (gzip_test_files[0], CompressionTypes.UNCOMPRESSED, NotImplementedError), + (gzip_test_files[0], CompressionTypes.GZIP, NotImplementedError), + (gzip_test_files[0], CompressionTypes.AUTO, NotImplementedError), + (gzip_test_files[1], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError), + (gzip_test_files[1], CompressionTypes.GZIP, None), + (gzip_test_files[1], CompressionTypes.AUTO, None), + (gzip_test_files[2], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError), + (gzip_test_files[2], CompressionTypes.GZIP, None), + (gzip_test_files[2], CompressionTypes.AUTO, None), + (gzip_test_files[3], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError), + (gzip_test_files[3], CompressionTypes.GZIP, None), + (gzip_test_files[3], CompressionTypes.AUTO, None), + (gzip_test_files[4], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError), + (gzip_test_files[4], CompressionTypes.GZIP, None), + (gzip_test_files[4], CompressionTypes.AUTO, None), + (gzip_test_files[5], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError), + (gzip_test_files[5], CompressionTypes.GZIP, None), + (gzip_test_files[5], CompressionTypes.AUTO, None), + (gzip_test_files[6], CompressionTypes.UNCOMPRESSED, None), + (gzip_test_files[6], CompressionTypes.GZIP, zlib.error), + (gzip_test_files[6], CompressionTypes.AUTO, None), + (gzip_test_files[7], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError), + (gzip_test_files[7], CompressionTypes.GZIP, None), + (gzip_test_files[7], CompressionTypes.AUTO, None), + (gzip_test_files[8], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError), + (gzip_test_files[8], CompressionTypes.GZIP, None), + (gzip_test_files[8], CompressionTypes.AUTO, None), + ]) + @unittest.skipIf(NotFound is None, 'GCP dependencies are not installed') + def test_read_gzip_file(self, file_name, compression_type, exception): + p = TestPipeline(runner="Direct", is_integration_test=True) + r = ( + p + | Create([file_name]) + | "Read File from GCS" >> + ReadAllFromText(compression_type=compression_type) + | Count.Globally()) + assert_that(r, equal_to([1000])) + + if exception is None: + result = p.run() + result.wait_until_finish() + else: + with self.assertRaises(exception): + result = p.run() + result.wait_until_finish() + + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) unittest.main() diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py index 19df15dcf7fa..7b79030b4b71 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py @@ -178,6 +178,11 @@ def __init__( self._fail_when_getting_metadata = fail_when_getting_metadata self._fail_when_reading = fail_when_reading self.generation = random.randint(0, (1 << 63) - 1) + self.content_encoding = None + self.content_type = None + + def reload(self): + pass def delete(self): self.bucket.delete_blob(self.name) From 853d63315d65f9042559fd9a69cb05b5e9686f68 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 3 Jan 2025 23:53:03 -0500 Subject: [PATCH 7/9] Fix lints --- .../io/gcp/gcsio_integration_test.py | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py index 6161c4803199..d8e2d6ee71a9 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py @@ -239,16 +239,17 @@ def test_create_default_bucket(self, mock_default_gcs_bucket_name): class GcsIOReadGzipTest(unittest.TestCase): + gcs_path_prefix = "gs://apache-beam-samples/textio/" gzip_test_files = [ - "gs://apache-beam-samples/textio/textio-test-data.content-type-gzip-content-encoding-gzip.1k.txt.gz", - "gs://apache-beam-samples/textio/textio-test-data.content-type-gzip-content-encoding-none.1k.txt.gz", - "gs://apache-beam-samples/textio/textio-test-data.content-type-none-content-encoding-gzip.1k.txt.gz", - "gs://apache-beam-samples/textio/textio-test-data.content-type-none-content-encoding-none.1k.txt.gz", - "gs://apache-beam-samples/textio/textio-test-data.content-type-text-content-encoding-gzip.1k.txt.gz", - "gs://apache-beam-samples/textio/textio-test-data.content-type-text-content-encoding-none.1k.txt.gz", - "gs://apache-beam-samples/textio/textio-test-data.default.1k.txt", - "gs://apache-beam-samples/textio/textio-test-data.default.1k.txt.gz", - "gs://apache-beam-samples/textio/textio-test-data.gzip-local.1k.txt.gz", + "textio-test-data.content-type-gzip-content-encoding-gzip.1k.txt.gz", + "textio-test-data.content-type-gzip-content-encoding-none.1k.txt.gz", + "textio-test-data.content-type-none-content-encoding-gzip.1k.txt.gz", + "textio-test-data.content-type-none-content-encoding-none.1k.txt.gz", + "textio-test-data.content-type-text-content-encoding-gzip.1k.txt.gz", + "textio-test-data.content-type-text-content-encoding-none.1k.txt.gz", + "textio-test-data.default.1k.txt", + "textio-test-data.default.1k.txt.gz", + "textio-test-data.gzip-local.1k.txt.gz", ] @parameterized.expand([ @@ -285,7 +286,7 @@ def test_read_gzip_file(self, file_name, compression_type, exception): p = TestPipeline(runner="Direct", is_integration_test=True) r = ( p - | Create([file_name]) + | Create([f"{GcsIOReadGzipTest.gcs_path_prefix}{file_name}"]) | "Read File from GCS" >> ReadAllFromText(compression_type=compression_type) | Count.Globally()) From e89b793c16cd02905e9827a3c235b1a44e6a1fd1 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sat, 4 Jan 2025 00:22:05 -0500 Subject: [PATCH 8/9] More lints --- sdks/python/apache_beam/io/gcp/gcsio_integration_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py index d8e2d6ee71a9..2ce355060bb4 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py @@ -39,9 +39,9 @@ from parameterized import parameterized_class from apache_beam import Create -from apache_beam.io.textio import ReadAllFromText from apache_beam.io.filesystem import CompressionTypes from apache_beam.io.filesystems import FileSystems +from apache_beam.io.textio import ReadAllFromText from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that From 78b30d96236b7de18044012aea29a7e17f473cb2 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Thu, 9 Jan 2025 12:38:13 -0500 Subject: [PATCH 9/9] Add a one-line description to CHANGES.md --- CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.md b/CHANGES.md index d5cbb76fb3d5..967cd15c25dd 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -78,6 +78,7 @@ ## Bugfixes +* Fix data loss issues when reading gzipped files with TextIO (Python) ([#18390](https://github.com/apache/beam/issues/18390), [#31040](https://github.com/apache/beam/issues/31040)). * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). ## Security Fixes