Skip to content

Commit

Permalink
Add integration tests and fix unit test failure.
Browse files Browse the repository at this point in the history
  • Loading branch information
shunping committed Jan 4, 2025
1 parent e779045 commit 72b2bcb
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 0 deletions.
70 changes: 70 additions & 0 deletions sdks/python/apache_beam/io/gcp/gcsio_integration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,22 @@
import logging
import unittest
import uuid
import zlib

import mock
import pytest
from parameterized import parameterized
from parameterized import parameterized_class

from apache_beam import Create
from apache_beam.io.textio import ReadAllFromText
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.filesystems import FileSystems
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.transforms.combiners import Count

try:
from apache_beam.io.gcp import gcsio
Expand Down Expand Up @@ -230,6 +238,68 @@ def test_create_default_bucket(self, mock_default_gcs_bucket_name):
self.assertIsNone(self.gcsio.get_bucket(overridden_bucket_name))


class GcsIOReadGzipTest(unittest.TestCase):
gzip_test_files = [
"gs://apache-beam-samples/textio/textio-test-data.content-type-gzip-content-encoding-gzip.1k.txt.gz",
"gs://apache-beam-samples/textio/textio-test-data.content-type-gzip-content-encoding-none.1k.txt.gz",
"gs://apache-beam-samples/textio/textio-test-data.content-type-none-content-encoding-gzip.1k.txt.gz",
"gs://apache-beam-samples/textio/textio-test-data.content-type-none-content-encoding-none.1k.txt.gz",
"gs://apache-beam-samples/textio/textio-test-data.content-type-text-content-encoding-gzip.1k.txt.gz",
"gs://apache-beam-samples/textio/textio-test-data.content-type-text-content-encoding-none.1k.txt.gz",
"gs://apache-beam-samples/textio/textio-test-data.default.1k.txt",
"gs://apache-beam-samples/textio/textio-test-data.default.1k.txt.gz",
"gs://apache-beam-samples/textio/textio-test-data.gzip-local.1k.txt.gz",
]

@parameterized.expand([
(gzip_test_files[0], CompressionTypes.UNCOMPRESSED, NotImplementedError),
(gzip_test_files[0], CompressionTypes.GZIP, NotImplementedError),
(gzip_test_files[0], CompressionTypes.AUTO, NotImplementedError),
(gzip_test_files[1], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
(gzip_test_files[1], CompressionTypes.GZIP, None),
(gzip_test_files[1], CompressionTypes.AUTO, None),
(gzip_test_files[2], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
(gzip_test_files[2], CompressionTypes.GZIP, None),
(gzip_test_files[2], CompressionTypes.AUTO, None),
(gzip_test_files[3], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
(gzip_test_files[3], CompressionTypes.GZIP, None),
(gzip_test_files[3], CompressionTypes.AUTO, None),
(gzip_test_files[4], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
(gzip_test_files[4], CompressionTypes.GZIP, None),
(gzip_test_files[4], CompressionTypes.AUTO, None),
(gzip_test_files[5], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
(gzip_test_files[5], CompressionTypes.GZIP, None),
(gzip_test_files[5], CompressionTypes.AUTO, None),
(gzip_test_files[6], CompressionTypes.UNCOMPRESSED, None),
(gzip_test_files[6], CompressionTypes.GZIP, zlib.error),
(gzip_test_files[6], CompressionTypes.AUTO, None),
(gzip_test_files[7], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
(gzip_test_files[7], CompressionTypes.GZIP, None),
(gzip_test_files[7], CompressionTypes.AUTO, None),
(gzip_test_files[8], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
(gzip_test_files[8], CompressionTypes.GZIP, None),
(gzip_test_files[8], CompressionTypes.AUTO, None),
])
@unittest.skipIf(NotFound is None, 'GCP dependencies are not installed')
def test_read_gzip_file(self, file_name, compression_type, exception):
p = TestPipeline(runner="Direct", is_integration_test=True)
r = (
p
| Create([file_name])
| "Read File from GCS" >>
ReadAllFromText(compression_type=compression_type)
| Count.Globally())
assert_that(r, equal_to([1000]))

if exception is None:
result = p.run()
result.wait_until_finish()
else:
with self.assertRaises(exception):
result = p.run()
result.wait_until_finish()


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()
5 changes: 5 additions & 0 deletions sdks/python/apache_beam/io/gcp/gcsio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ def __init__(
self._fail_when_getting_metadata = fail_when_getting_metadata
self._fail_when_reading = fail_when_reading
self.generation = random.randint(0, (1 << 63) - 1)
self.content_encoding = None
self.content_type = None

def reload(self):
pass

def delete(self):
self.bucket.delete_blob(self.name)
Expand Down

0 comments on commit 72b2bcb

Please sign in to comment.