Skip to content

Commit

Permalink
Fix TextIO not fully reading a GCS file when decompressive transcodin…
Browse files Browse the repository at this point in the history
…g happens (#33384)

* Add check_splittability in filesystems.

For GCS, we determine the splittability based on whether the file
meets decompressive transcoding criteria.

When decompressive transcoding occurs, the size returned from
metadata (gzip file size) does not match the size of the content
returned (original data). In this case, we set the source to
unsplittable to ensure all its content is read.

* Rename the function and remove unused one.

* Revert the previous changes and use raw_download to retrieve raw data in gcs client lib

* Raise exception for doubly compressed gcs object. Apply yapf.

* Add some comments.

* Add integration tests and fix unit test failure.

* Fix lints

* More lints

* Add a one-line description to CHANGES.md
  • Loading branch information
shunping authored Jan 9, 2025
1 parent d50cc15 commit 1392cc8
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@

## Bugfixes

* Fix data loss issues when reading gzipped files with TextIO (Python) ([#18390](https://github.com/apache/beam/issues/18390), [#31040](https://github.com/apache/beam/issues/31040)).
* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* [BigQueryIO] Fixed an issue where Storage Write API sometimes doesn't pick up auto-schema updates ([#33231](https://github.com/apache/beam/pull/33231))

Expand Down
25 changes: 23 additions & 2 deletions sdks/python/apache_beam/io/gcp/gcsio.py
Original file line number Diff line number Diff line change
Expand Up @@ -589,8 +589,29 @@ def __init__(
blob,
chunk_size=DEFAULT_READ_BUFFER_SIZE,
enable_read_bucket_metric=False,
retry=DEFAULT_RETRY):
super().__init__(blob, chunk_size=chunk_size, retry=retry)
retry=DEFAULT_RETRY,
raw_download=True):
# By default, we always request to retrieve raw data from GCS even if the
# object meets the criteria of decompressive transcoding
# (https://cloud.google.com/storage/docs/transcoding).
super().__init__(
blob, chunk_size=chunk_size, retry=retry, raw_download=raw_download)
# TODO: Remove this after
# https://github.com/googleapis/python-storage/issues/1406 is fixed.
# As a workaround, we manually trigger a reload here. Otherwise, an internal
# call of reader.seek() will cause an exception if raw_download is set
# when initializing BlobReader(),
blob.reload()

# TODO: Currently there is a bug in GCS server side when a client requests
# a file with "content-encoding=gzip" and "content-type=application/gzip" or
# "content-type=application/x-gzip", which will lead to infinite loop.
# We skip the support of this type of files until the GCS bug is fixed.
# Internal bug id: 203845981.
if (blob.content_encoding == "gzip" and
blob.content_type in ["application/gzip", "application/x-gzip"]):
raise NotImplementedError("Doubly compressed files not supported.")

self.enable_read_bucket_metric = enable_read_bucket_metric
self.mode = "r"

Expand Down
71 changes: 71 additions & 0 deletions sdks/python/apache_beam/io/gcp/gcsio_integration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,22 @@
import logging
import unittest
import uuid
import zlib

import mock
import pytest
from parameterized import parameterized
from parameterized import parameterized_class

from apache_beam import Create
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.filesystems import FileSystems
from apache_beam.io.textio import ReadAllFromText
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.transforms.combiners import Count

try:
from apache_beam.io.gcp import gcsio
Expand Down Expand Up @@ -230,6 +238,69 @@ def test_create_default_bucket(self, mock_default_gcs_bucket_name):
self.assertIsNone(self.gcsio.get_bucket(overridden_bucket_name))


class GcsIOReadGzipTest(unittest.TestCase):
gcs_path_prefix = "gs://apache-beam-samples/textio/"
gzip_test_files = [
"textio-test-data.content-type-gzip-content-encoding-gzip.1k.txt.gz",
"textio-test-data.content-type-gzip-content-encoding-none.1k.txt.gz",
"textio-test-data.content-type-none-content-encoding-gzip.1k.txt.gz",
"textio-test-data.content-type-none-content-encoding-none.1k.txt.gz",
"textio-test-data.content-type-text-content-encoding-gzip.1k.txt.gz",
"textio-test-data.content-type-text-content-encoding-none.1k.txt.gz",
"textio-test-data.default.1k.txt",
"textio-test-data.default.1k.txt.gz",
"textio-test-data.gzip-local.1k.txt.gz",
]

@parameterized.expand([
(gzip_test_files[0], CompressionTypes.UNCOMPRESSED, NotImplementedError),
(gzip_test_files[0], CompressionTypes.GZIP, NotImplementedError),
(gzip_test_files[0], CompressionTypes.AUTO, NotImplementedError),
(gzip_test_files[1], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
(gzip_test_files[1], CompressionTypes.GZIP, None),
(gzip_test_files[1], CompressionTypes.AUTO, None),
(gzip_test_files[2], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
(gzip_test_files[2], CompressionTypes.GZIP, None),
(gzip_test_files[2], CompressionTypes.AUTO, None),
(gzip_test_files[3], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
(gzip_test_files[3], CompressionTypes.GZIP, None),
(gzip_test_files[3], CompressionTypes.AUTO, None),
(gzip_test_files[4], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
(gzip_test_files[4], CompressionTypes.GZIP, None),
(gzip_test_files[4], CompressionTypes.AUTO, None),
(gzip_test_files[5], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
(gzip_test_files[5], CompressionTypes.GZIP, None),
(gzip_test_files[5], CompressionTypes.AUTO, None),
(gzip_test_files[6], CompressionTypes.UNCOMPRESSED, None),
(gzip_test_files[6], CompressionTypes.GZIP, zlib.error),
(gzip_test_files[6], CompressionTypes.AUTO, None),
(gzip_test_files[7], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
(gzip_test_files[7], CompressionTypes.GZIP, None),
(gzip_test_files[7], CompressionTypes.AUTO, None),
(gzip_test_files[8], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
(gzip_test_files[8], CompressionTypes.GZIP, None),
(gzip_test_files[8], CompressionTypes.AUTO, None),
])
@unittest.skipIf(NotFound is None, 'GCP dependencies are not installed')
def test_read_gzip_file(self, file_name, compression_type, exception):
p = TestPipeline(runner="Direct", is_integration_test=True)
r = (
p
| Create([f"{GcsIOReadGzipTest.gcs_path_prefix}{file_name}"])
| "Read File from GCS" >>
ReadAllFromText(compression_type=compression_type)
| Count.Globally())
assert_that(r, equal_to([1000]))

if exception is None:
result = p.run()
result.wait_until_finish()
else:
with self.assertRaises(exception):
result = p.run()
result.wait_until_finish()


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()
5 changes: 5 additions & 0 deletions sdks/python/apache_beam/io/gcp/gcsio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ def __init__(
self._fail_when_getting_metadata = fail_when_getting_metadata
self._fail_when_reading = fail_when_reading
self.generation = random.randint(0, (1 << 63) - 1)
self.content_encoding = None
self.content_type = None

def reload(self):
pass

def delete(self):
self.bucket.delete_blob(self.name)
Expand Down

0 comments on commit 1392cc8

Please sign in to comment.