From e31b93b2783685480d41b187fdb461f8dffc2734 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 4 Oct 2023 09:07:03 -0400 Subject: [PATCH 01/17] retry get_table on quota errors --- sdks/python/apache_beam/io/gcp/bigquery_tools.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py index 2f9420795288..387a99312561 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py @@ -747,7 +747,7 @@ def _insert_all_rows( @retry.with_exponential_backoff( num_retries=MAX_RETRIES, - retry_filter=retry.retry_on_server_errors_and_timeout_filter) + retry_filter=retry.retry_on_server_errors_timeout_or_quota_issues_filter) def get_table(self, project_id, dataset_id, table_id): """Lookup a table's metadata object. From ad464b194c3e7cf702e081914a3126faf2a46fa3 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 4 Oct 2023 17:01:41 -0400 Subject: [PATCH 02/17] add tests --- .../apache_beam/io/gcp/bigquery_test.py | 131 ++++++++++++++++++ 1 file changed, 131 insertions(+) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py index 7e9c1e634748..3d5099e9a2f2 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py @@ -43,6 +43,7 @@ from apache_beam.internal import pickler from apache_beam.internal.gcp.json_value import to_json_value from apache_beam.io.filebasedsink_test import _TestCaseWithTempDirCleanUp +from apache_beam.io.filesystems import FileSystems from apache_beam.io.gcp import bigquery as beam_bq from apache_beam.io.gcp import bigquery_tools from apache_beam.io.gcp.bigquery import ReadFromBigQuery @@ -83,6 +84,7 @@ try: from apitools.base.py.exceptions import HttpError + from apitools.base.py.exceptions import HttpForbiddenError from google.cloud import bigquery as gcp_bigquery from google.api_core import exceptions except ImportError: @@ -419,6 +421,135 @@ def test_create_temp_dataset_exception(self, exception_type, error_message): mock_insert.assert_called() self.assertIn(error_message, exc.exception.args[0]) + @parameterized.expand([ + # first two attempts return a 403 quotaExceeded error, third attempt passes + param( + responses=[ + HttpForbiddenError( + response={'status': 403}, content="quotaExceeded", url=""), + HttpForbiddenError( + response={'status': 403}, content="quotaExceeded", url="") + ], + expected_retries=2), + # first two attempts returns a 403 rateLimitExceeded error, third attempt passes + param( + responses=[ + HttpForbiddenError( + response={'status': 403}, content="rateLimitExceeded", + url=""), + HttpForbiddenError( + response={'status': 403}, content="rateLimitExceeded", url="") + ], + expected_retries=2), + ]) + def test_get_table_transient_exception(self, responses, expected_retries): + class DummyTable: + class DummySchema: + fields = [] + + # this attribute is call on inside estimate_size + numBytes = 5 + schema = DummySchema() + + with mock.patch('time.sleep'), \ + mock.patch.object(bigquery_v2_client.BigqueryV2.TablesService, 'Get') as mock_get_table, \ + mock.patch.object(BigQueryWrapper, + 'wait_for_bq_job'), \ + mock.patch.object(BigQueryWrapper, + 'perform_extract_job'), \ + mock.patch.object(FileSystems, + 'match'), \ + mock.patch.object(FileSystems, + 'delete'), \ + beam.Pipeline() as p: + call_counter = 0 + + def store_callback(unused_request): + nonlocal call_counter + if call_counter < len(responses): + exception = responses[call_counter] + call_counter += 1 + raise exception + else: + call_counter += 1 + return DummyTable() + + mock_get_table.side_effect = store_callback + _ = p | beam.io.ReadFromBigQuery( + table="project.dataset.table", gcs_location="gs://some_bucket") + + # ReadFromBigQuery export mode calls get_table() twice. Once to get + # metadata (numBytes), and once to retrieve the table's schema + # Any additional calls are retries + self.assertEqual(expected_retries, mock_get_table.call_count - 2) + + @parameterized.expand([ + # first attempt returns a non-transient error and fails. + # second attempt doesn't even get reached. + param( + responses=[ + HttpForbiddenError( + response={'status': 400}, content="invalid", url=""), + HttpForbiddenError( + response={'status': 400}, content="invalid", url="") + ], + expected_retries=0), + # first attempt returns a transient error and retries + # second attempt returns a non-transient error and fails + param( + responses=[ + HttpForbiddenError( + response={'status': 403}, content="rateLimitExceeded", + url=""), + HttpForbiddenError( + response={'status': 400}, content="invalid", url="") + ], + expected_retries=1), + ]) + def test_get_table_non_transient_exception(self, responses, expected_retries): + class DummyTable: + class DummySchema: + fields = [] + + # this attribute is call on inside estimate_size + numBytes = 5 + schema = DummySchema() + + with mock.patch('time.sleep'), \ + mock.patch.object(bigquery_v2_client.BigqueryV2.TablesService, 'Get') as mock_get_table, \ + mock.patch.object(BigQueryWrapper, + 'wait_for_bq_job'), \ + mock.patch.object(BigQueryWrapper, + 'perform_extract_job'), \ + mock.patch.object(FileSystems, + 'match'), \ + mock.patch.object(FileSystems, + 'delete'), \ + self.assertRaises(Exception), \ + beam.Pipeline() as p: + call_counter = 0 + + def store_callback(unused_request): + nonlocal call_counter + if call_counter < len(responses): + exception = responses[call_counter] + call_counter += 1 + raise exception + else: + call_counter += 1 + return DummyTable() + + mock_get_table.side_effect = store_callback + _ = p | beam.io.ReadFromBigQuery( + table="project.dataset.table", gcs_location="gs://some_bucket") + + # ReadFromBigQuery export mode calls get_table() twice. Once to get + # metadata (numBytes), and once to retrieve the table's schema + # However, the second call is never reached because this test will always + # fail before it does so + # After the first call, any additional calls are retries + self.assertEqual(expected_retries, mock_get_table.call_count - 1) + @parameterized.expand([ param( exception_type=exceptions.BadRequest if exceptions else None, From dece14d579f68c8154c42c1c1c873b58b26b98f5 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 4 Oct 2023 17:05:56 -0400 Subject: [PATCH 03/17] tweak test --- sdks/python/apache_beam/io/gcp/bigquery_test.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py index 3d5099e9a2f2..e0f04eaccbce 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py @@ -431,14 +431,16 @@ def test_create_temp_dataset_exception(self, exception_type, error_message): response={'status': 403}, content="quotaExceeded", url="") ], expected_retries=2), - # first two attempts returns a 403 rateLimitExceeded error, third attempt passes + # first attempts returns a 403 rateLimitExceeded error, + # second attempt returns a 408 timeout error, + # third attempt passes param( responses=[ HttpForbiddenError( response={'status': 403}, content="rateLimitExceeded", url=""), HttpForbiddenError( - response={'status': 403}, content="rateLimitExceeded", url="") + response={'status': 408}, content="timeout", url="") ], expected_retries=2), ]) From b3b58c64ad09327a9dc322c8dae5d9ee1c4d6d94 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 4 Oct 2023 17:07:32 -0400 Subject: [PATCH 04/17] remove comments --- sdks/python/apache_beam/io/gcp/bigquery_test.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py index e0f04eaccbce..d42a173a934f 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py @@ -448,8 +448,6 @@ def test_get_table_transient_exception(self, responses, expected_retries): class DummyTable: class DummySchema: fields = [] - - # this attribute is call on inside estimate_size numBytes = 5 schema = DummySchema() @@ -512,8 +510,6 @@ def test_get_table_non_transient_exception(self, responses, expected_retries): class DummyTable: class DummySchema: fields = [] - - # this attribute is call on inside estimate_size numBytes = 5 schema = DummySchema() From 7f7a6b890c4d91a2f13c7930376e55d1f82e5d83 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 4 Oct 2023 17:17:41 -0400 Subject: [PATCH 05/17] use httperror --- sdks/python/apache_beam/io/gcp/bigquery_test.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py index d42a173a934f..b7b16d3c6cb8 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py @@ -448,6 +448,7 @@ def test_get_table_transient_exception(self, responses, expected_retries): class DummyTable: class DummySchema: fields = [] + numBytes = 5 schema = DummySchema() @@ -488,21 +489,18 @@ def store_callback(unused_request): # second attempt doesn't even get reached. param( responses=[ - HttpForbiddenError( - response={'status': 400}, content="invalid", url=""), - HttpForbiddenError( - response={'status': 400}, content="invalid", url="") + HttpError(response={'status': 400}, content="invalid", url=""), + HttpError(response={'status': 400}, content="invalid", url="") ], expected_retries=0), # first attempt returns a transient error and retries # second attempt returns a non-transient error and fails param( responses=[ - HttpForbiddenError( + HttpError( response={'status': 403}, content="rateLimitExceeded", url=""), - HttpForbiddenError( - response={'status': 400}, content="invalid", url="") + HttpError(response={'status': 400}, content="invalid", url="") ], expected_retries=1), ]) @@ -510,6 +508,7 @@ def test_get_table_non_transient_exception(self, responses, expected_retries): class DummyTable: class DummySchema: fields = [] + numBytes = 5 schema = DummySchema() From 753bc6776499a5bd2a3d87ed2be8464d7e61c43d Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 4 Oct 2023 19:47:28 -0400 Subject: [PATCH 06/17] only retry on transient reasons --- .../apache_beam/io/gcp/bigquery_test.py | 87 +++++++++++++++---- sdks/python/apache_beam/utils/retry.py | 32 ++++++- 2 files changed, 99 insertions(+), 20 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py index b7b16d3c6cb8..942a20e61551 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py @@ -422,27 +422,45 @@ def test_create_temp_dataset_exception(self, exception_type, error_message): self.assertIn(error_message, exc.exception.args[0]) @parameterized.expand([ - # first two attempts return a 403 quotaExceeded error, third attempt passes + # first attempt returns a Http 403 error with bad contents and will retry + # second attempt returns a Http 408 error, + # third attempt passes param( responses=[ HttpForbiddenError( - response={'status': 403}, content="quotaExceeded", url=""), + response={'status': 403}, content="bad contents", url=""), HttpForbiddenError( - response={'status': 403}, content="quotaExceeded", url="") + response={'status': 408}, content="bad contents", url="") ], expected_retries=2), - # first attempts returns a 403 rateLimitExceeded error, - # second attempt returns a 408 timeout error, - # third attempt passes + # first attempts returns a 403 rateLimitExceeded error + # second attempt returns a 403 quotaExceeded error + # third attempt returns a Http 403 quotaExceeded error + # fourth attempt passes param( responses=[ + exceptions.Forbidden( + "some message", + errors=({ + "message": "transient", "reason": "rateLimitExceeded" + }, )), + exceptions.Forbidden( + "some message", + errors=({ + "message": "transient", "reason": "quotaExceeded" + }, )), HttpForbiddenError( - response={'status': 403}, content="rateLimitExceeded", + response={'status': 403}, + content={ + "error": { + "errors": [{ + "message": "transient", "reason": "quotaExceeded" + }] + } + }, url=""), - HttpForbiddenError( - response={'status': 408}, content="timeout", url="") ], - expected_retries=2), + expected_retries=3), ]) def test_get_table_transient_exception(self, responses, expected_retries): class DummyTable: @@ -485,15 +503,33 @@ def store_callback(unused_request): self.assertEqual(expected_retries, mock_get_table.call_count - 2) @parameterized.expand([ - # first attempt returns a non-transient error and fails. - # second attempt doesn't even get reached. + # first attempt returns a Http 403 with transient reason and retries + # second attempt returns a Http 403 with non-transient reason and fails param( responses=[ - HttpError(response={'status': 400}, content="invalid", url=""), - HttpError(response={'status': 400}, content="invalid", url="") + HttpForbiddenError( + response={'status': 403}, + content={ + "error": { + "errors": [{ + "message": "transient", "reason": "quotaExceeded" + }] + } + }, + url=""), + HttpForbiddenError( + response={'status': 403}, + content={ + "error": { + "errors": [{ + "message": "transient", "reason": "accessDenied" + }] + } + }, + url="") ], - expected_retries=0), - # first attempt returns a transient error and retries + expected_retries=1), + # first attempt returns a transient 403 error and retries # second attempt returns a non-transient error and fails param( responses=[ @@ -503,6 +539,25 @@ def store_callback(unused_request): HttpError(response={'status': 400}, content="invalid", url="") ], expected_retries=1), + # first attempt returns a transient 403 error and retries + # second attempt returns a 403 error with bad contents and retries + # third attempt returns a 403 with non-transient reason and fails + param( + responses=[ + exceptions.Forbidden( + "some error", + errors=({ + "message": "transient", "reason": "rateLimitExceeded" + }, )), + HttpError( + response={'status': 403}, content="bad contents", url=""), + exceptions.Forbidden( + "some error", + errors=({ + "message": "transient", "reason": "accessDenied" + }, )), + ], + expected_retries=2), ]) def test_get_table_non_transient_exception(self, responses, expected_retries): class DummyTable: diff --git a/sdks/python/apache_beam/utils/retry.py b/sdks/python/apache_beam/utils/retry.py index 6eed2900b9a4..1837b7d87023 100644 --- a/sdks/python/apache_beam/utils/retry.py +++ b/sdks/python/apache_beam/utils/retry.py @@ -33,6 +33,7 @@ import sys import time import traceback +import requests.exceptions from apache_beam.io.filesystem import BeamIOError @@ -42,9 +43,11 @@ try: from apitools.base.py.exceptions import HttpError from google.api_core.exceptions import GoogleAPICallError + from google.api_core import exceptions except ImportError as e: HttpError = None GoogleAPICallError = None # type: ignore + exceptions = None # Protect against environments where aws tools are not available. # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports @@ -57,7 +60,17 @@ # pylint: enable=wrong-import-order, wrong-import-position _LOGGER = logging.getLogger(__name__) - +_RETRYABLE_REASONS = ["rateLimitExceeded", "quotaExceeded", "internalError", "backendError"] +_RETRYABLE_TYPES = ( + exceptions.TooManyRequests, + exceptions.InternalServerError, + exceptions.BadGateway, + exceptions.ServiceUnavailable, + exceptions.DeadlineExceeded, + requests.exceptions.ConnectionError, + requests.exceptions.Timeout, + ConnectionError +) class PermanentException(Exception): """Base class for exceptions that should not be retried.""" @@ -168,15 +181,26 @@ def retry_on_server_errors_and_timeout_filter(exception): def retry_on_server_errors_timeout_or_quota_issues_filter(exception): """Retry on server, timeout and 403 errors. - 403 errors can be accessDenied, billingNotEnabled, and also quotaExceeded, - rateLimitExceeded.""" + 403 errors include both transient (accessDenied, billingNotEnabled) and + non-transient errors (quotaExceeded, rateLimitExceeded). Only retry transient + errors.""" if HttpError is not None and isinstance(exception, HttpError): if exception.status_code == 403: + try: + # attempt to extract the reason and check if it's retryable + return exception.content["error"]["errors"][0]["reason"] in _RETRYABLE_REASONS + except Exception: + _LOGGER.debug("Could not determine if HttpError is non-transient. Will retry: %s", exception) return True if GoogleAPICallError is not None and isinstance(exception, GoogleAPICallError): if exception.code == 403: - return True + if not hasattr(exception, "errors") or len(exception.errors) == 0: + # default to retrying + return True + + reason = exception.errors[0]["reason"] + return reason in _RETRYABLE_REASONS if S3ClientError is not None and isinstance(exception, S3ClientError): if exception.code == 403: return True From be35778a0f35394c68cc16e5a1efe761ab8e8131 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 4 Oct 2023 19:53:02 -0400 Subject: [PATCH 07/17] default exceptions to none at import error --- sdks/python/apache_beam/utils/retry.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/utils/retry.py b/sdks/python/apache_beam/utils/retry.py index 1837b7d87023..9327a2822d3f 100644 --- a/sdks/python/apache_beam/utils/retry.py +++ b/sdks/python/apache_beam/utils/retry.py @@ -62,11 +62,11 @@ _LOGGER = logging.getLogger(__name__) _RETRYABLE_REASONS = ["rateLimitExceeded", "quotaExceeded", "internalError", "backendError"] _RETRYABLE_TYPES = ( - exceptions.TooManyRequests, - exceptions.InternalServerError, - exceptions.BadGateway, - exceptions.ServiceUnavailable, - exceptions.DeadlineExceeded, + exceptions.TooManyRequests if exceptions else None, + exceptions.InternalServerError if exceptions else None, + exceptions.BadGateway if exceptions else None, + exceptions.ServiceUnavailable if exceptions else None, + exceptions.DeadlineExceeded if exceptions else None, requests.exceptions.ConnectionError, requests.exceptions.Timeout, ConnectionError From 06e3f80dabab58964032ca4e73d596b97d6f23fd Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 4 Oct 2023 20:07:49 -0400 Subject: [PATCH 08/17] lint fix --- sdks/python/apache_beam/utils/retry.py | 29 +++++++++++++++----------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/sdks/python/apache_beam/utils/retry.py b/sdks/python/apache_beam/utils/retry.py index 9327a2822d3f..a975d6177a15 100644 --- a/sdks/python/apache_beam/utils/retry.py +++ b/sdks/python/apache_beam/utils/retry.py @@ -60,17 +60,19 @@ # pylint: enable=wrong-import-order, wrong-import-position _LOGGER = logging.getLogger(__name__) -_RETRYABLE_REASONS = ["rateLimitExceeded", "quotaExceeded", "internalError", "backendError"] +_RETRYABLE_REASONS = [ + "rateLimitExceeded", "quotaExceeded", "internalError", "backendError" +] _RETRYABLE_TYPES = ( - exceptions.TooManyRequests if exceptions else None, - exceptions.InternalServerError if exceptions else None, - exceptions.BadGateway if exceptions else None, - exceptions.ServiceUnavailable if exceptions else None, - exceptions.DeadlineExceeded if exceptions else None, - requests.exceptions.ConnectionError, - requests.exceptions.Timeout, - ConnectionError -) + exceptions.TooManyRequests if exceptions else None, + exceptions.InternalServerError if exceptions else None, + exceptions.BadGateway if exceptions else None, + exceptions.ServiceUnavailable if exceptions else None, + exceptions.DeadlineExceeded if exceptions else None, + requests.exceptions.ConnectionError, + requests.exceptions.Timeout, + ConnectionError) + class PermanentException(Exception): """Base class for exceptions that should not be retried.""" @@ -188,9 +190,12 @@ def retry_on_server_errors_timeout_or_quota_issues_filter(exception): if exception.status_code == 403: try: # attempt to extract the reason and check if it's retryable - return exception.content["error"]["errors"][0]["reason"] in _RETRYABLE_REASONS + return exception.content["error"]["errors"][0][ + "reason"] in _RETRYABLE_REASONS except Exception: - _LOGGER.debug("Could not determine if HttpError is non-transient. Will retry: %s", exception) + _LOGGER.debug( + "Could not determine if HttpError is non-transient. Will retry: %s", + exception) return True if GoogleAPICallError is not None and isinstance(exception, GoogleAPICallError): From 351e6e5a9d88d8836b1a5dfb48b1f8107f43a95d Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 5 Oct 2023 10:06:11 -0400 Subject: [PATCH 09/17] lint fix --- sdks/python/apache_beam/io/gcp/bigquery_test.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py index 942a20e61551..04adf41d29fc 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py @@ -471,7 +471,8 @@ class DummySchema: schema = DummySchema() with mock.patch('time.sleep'), \ - mock.patch.object(bigquery_v2_client.BigqueryV2.TablesService, 'Get') as mock_get_table, \ + mock.patch.object(bigquery_v2_client.BigqueryV2.TablesService, + 'Get') as mock_get_table, \ mock.patch.object(BigQueryWrapper, 'wait_for_bq_job'), \ mock.patch.object(BigQueryWrapper, @@ -568,7 +569,8 @@ class DummySchema: schema = DummySchema() with mock.patch('time.sleep'), \ - mock.patch.object(bigquery_v2_client.BigqueryV2.TablesService, 'Get') as mock_get_table, \ + mock.patch.object(bigquery_v2_client.BigqueryV2.TablesService, + 'Get') as mock_get_table, \ mock.patch.object(BigQueryWrapper, 'wait_for_bq_job'), \ mock.patch.object(BigQueryWrapper, From 604a83608bfce5c87b38d298fb0fad39dc7abf12 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 5 Oct 2023 13:36:22 -0400 Subject: [PATCH 10/17] address comments --- .../apache_beam/io/gcp/bigquery_test.py | 18 +++++---- sdks/python/apache_beam/utils/retry.py | 38 ++++++++----------- 2 files changed, 25 insertions(+), 31 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py index 04adf41d29fc..4eedc6d309aa 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py @@ -434,8 +434,8 @@ def test_create_temp_dataset_exception(self, exception_type, error_message): ], expected_retries=2), # first attempts returns a 403 rateLimitExceeded error - # second attempt returns a 403 quotaExceeded error - # third attempt returns a Http 403 quotaExceeded error + # second attempt returns a 429 rateLimitExceeded error + # third attempt returns a Http 403 rateLimitExceeded error # fourth attempt passes param( responses=[ @@ -444,17 +444,18 @@ def test_create_temp_dataset_exception(self, exception_type, error_message): errors=({ "message": "transient", "reason": "rateLimitExceeded" }, )), - exceptions.Forbidden( + exceptions.ResourceExhausted( "some message", errors=({ - "message": "transient", "reason": "quotaExceeded" + "message": "transient", "reason": "rateLimitExceeded" }, )), HttpForbiddenError( response={'status': 403}, content={ "error": { "errors": [{ - "message": "transient", "reason": "quotaExceeded" + "message": "transient", + "reason": "rateLimitExceeded" }] } }, @@ -504,16 +505,17 @@ def store_callback(unused_request): self.assertEqual(expected_retries, mock_get_table.call_count - 2) @parameterized.expand([ - # first attempt returns a Http 403 with transient reason and retries + # first attempt returns a Http 429 with transient reason and retries # second attempt returns a Http 403 with non-transient reason and fails param( responses=[ HttpForbiddenError( - response={'status': 403}, + response={'status': 429}, content={ "error": { "errors": [{ - "message": "transient", "reason": "quotaExceeded" + "message": "transient", + "reason": "rateLimitExceeded" }] } }, diff --git a/sdks/python/apache_beam/utils/retry.py b/sdks/python/apache_beam/utils/retry.py index a975d6177a15..1018f2adbcc9 100644 --- a/sdks/python/apache_beam/utils/retry.py +++ b/sdks/python/apache_beam/utils/retry.py @@ -43,11 +43,9 @@ try: from apitools.base.py.exceptions import HttpError from google.api_core.exceptions import GoogleAPICallError - from google.api_core import exceptions except ImportError as e: HttpError = None GoogleAPICallError = None # type: ignore - exceptions = None # Protect against environments where aws tools are not available. # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports @@ -60,18 +58,7 @@ # pylint: enable=wrong-import-order, wrong-import-position _LOGGER = logging.getLogger(__name__) -_RETRYABLE_REASONS = [ - "rateLimitExceeded", "quotaExceeded", "internalError", "backendError" -] -_RETRYABLE_TYPES = ( - exceptions.TooManyRequests if exceptions else None, - exceptions.InternalServerError if exceptions else None, - exceptions.BadGateway if exceptions else None, - exceptions.ServiceUnavailable if exceptions else None, - exceptions.DeadlineExceeded if exceptions else None, - requests.exceptions.ConnectionError, - requests.exceptions.Timeout, - ConnectionError) +_RETRYABLE_REASONS = ["rateLimitExceeded", "internalError", "backendError"] class PermanentException(Exception): @@ -181,28 +168,33 @@ def retry_on_server_errors_and_timeout_filter(exception): def retry_on_server_errors_timeout_or_quota_issues_filter(exception): - """Retry on server, timeout and 403 errors. + """Retry on server, timeout, 429, and some 403 errors. - 403 errors include both transient (accessDenied, billingNotEnabled) and - non-transient errors (quotaExceeded, rateLimitExceeded). Only retry transient - errors.""" + 403 errors from BigQuery include both non-transient (accessDenied, + billingNotEnabled) and transient errors (rateLimitExceeded). + Only retry transient errors.""" if HttpError is not None and isinstance(exception, HttpError): + if exception.status_code == 429: + return True if exception.status_code == 403: try: # attempt to extract the reason and check if it's retryable return exception.content["error"]["errors"][0][ "reason"] in _RETRYABLE_REASONS - except Exception: + except (KeyError, IndexError, TypeError): _LOGGER.debug( - "Could not determine if HttpError is non-transient. Will retry: %s", + "Could not determine if HttpError is non-transient. " + "Will not retry: %s", exception) - return True + return False if GoogleAPICallError is not None and isinstance(exception, GoogleAPICallError): + if exception.code == 429: + return True if exception.code == 403: if not hasattr(exception, "errors") or len(exception.errors) == 0: - # default to retrying - return True + # default to not retrying + return False reason = exception.errors[0]["reason"] return reason in _RETRYABLE_REASONS From bfdc47cecd29569cc5a2bf83536780e2fbc95ab5 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 6 Oct 2023 10:06:33 -0400 Subject: [PATCH 11/17] fix tests --- .../apache_beam/io/gcp/bigquery_test.py | 36 ++++++++++--------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py index 4eedc6d309aa..183f126d7659 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py @@ -422,19 +422,19 @@ def test_create_temp_dataset_exception(self, exception_type, error_message): self.assertIn(error_message, exc.exception.args[0]) @parameterized.expand([ - # first attempt returns a Http 403 error with bad contents and will retry - # second attempt returns a Http 408 error, + # first attempt returns a Http 500 blank error and retries + # second attempt returns a Http 408 blank error and retries, # third attempt passes param( responses=[ HttpForbiddenError( - response={'status': 403}, content="bad contents", url=""), + response={'status': 500}, content="something", url=""), HttpForbiddenError( - response={'status': 408}, content="bad contents", url="") + response={'status': 408}, content="blank", url="") ], expected_retries=2), # first attempts returns a 403 rateLimitExceeded error - # second attempt returns a 429 rateLimitExceeded error + # second attempt returns a 429 blank error # third attempt returns a Http 403 rateLimitExceeded error # fourth attempt passes param( @@ -444,11 +444,7 @@ def test_create_temp_dataset_exception(self, exception_type, error_message): errors=({ "message": "transient", "reason": "rateLimitExceeded" }, )), - exceptions.ResourceExhausted( - "some message", - errors=({ - "message": "transient", "reason": "rateLimitExceeded" - }, )), + exceptions.ResourceExhausted("some message"), HttpForbiddenError( response={'status': 403}, content={ @@ -533,17 +529,26 @@ def store_callback(unused_request): ], expected_retries=1), # first attempt returns a transient 403 error and retries - # second attempt returns a non-transient error and fails + # second attempt returns a 403 error with bad contents and fails param( responses=[ HttpError( - response={'status': 403}, content="rateLimitExceeded", + response={'status': 403}, + content={ + "error": { + "errors": [{ + "message": "transient", + "reason": "rateLimitExceeded" + }] + } + }, url=""), - HttpError(response={'status': 400}, content="invalid", url="") + HttpError( + response={'status': 403}, content="bad contents", url="") ], expected_retries=1), # first attempt returns a transient 403 error and retries - # second attempt returns a 403 error with bad contents and retries + # second attempt returns a 429 error and retries # third attempt returns a 403 with non-transient reason and fails param( responses=[ @@ -552,8 +557,7 @@ def store_callback(unused_request): errors=({ "message": "transient", "reason": "rateLimitExceeded" }, )), - HttpError( - response={'status': 403}, content="bad contents", url=""), + exceptions.ResourceExhausted("some transient error"), exceptions.Forbidden( "some error", errors=({ From b5d671da8e3c21680ddc77c4ae5ce3f364dc2ee6 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 6 Dec 2023 14:08:32 -0500 Subject: [PATCH 12/17] deserialize content first --- sdks/python/apache_beam/utils/retry.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/utils/retry.py b/sdks/python/apache_beam/utils/retry.py index 1018f2adbcc9..969ca1b36391 100644 --- a/sdks/python/apache_beam/utils/retry.py +++ b/sdks/python/apache_beam/utils/retry.py @@ -28,6 +28,7 @@ # pytype: skip-file import functools +import json import logging import random import sys @@ -179,11 +180,11 @@ def retry_on_server_errors_timeout_or_quota_issues_filter(exception): if exception.status_code == 403: try: # attempt to extract the reason and check if it's retryable - return exception.content["error"]["errors"][0][ - "reason"] in _RETRYABLE_REASONS + content = json.loads(exception.content) + return content["error"]["errors"][0]["reason"] in _RETRYABLE_REASONS except (KeyError, IndexError, TypeError): - _LOGGER.debug( - "Could not determine if HttpError is non-transient. " + _LOGGER.warning( + "Could not determine if HttpError is transient. " "Will not retry: %s", exception) return False From e64019ff0135b2a6155398aa28989a7700f25fec Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 19 Dec 2023 13:26:32 +0300 Subject: [PATCH 13/17] lint --- sdks/python/apache_beam/utils/retry.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/python/apache_beam/utils/retry.py b/sdks/python/apache_beam/utils/retry.py index 969ca1b36391..cef205051b39 100644 --- a/sdks/python/apache_beam/utils/retry.py +++ b/sdks/python/apache_beam/utils/retry.py @@ -34,7 +34,6 @@ import sys import time import traceback -import requests.exceptions from apache_beam.io.filesystem import BeamIOError From 45846c67bc4602b07288f394ca83e9cde7ba737e Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 8 Jan 2024 10:44:34 -0500 Subject: [PATCH 14/17] ignore tests when imports fail, only json load when needed --- sdks/python/apache_beam/io/gcp/bigquery_test.py | 6 ++++-- sdks/python/apache_beam/utils/retry.py | 8 +++++--- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py index 183f126d7659..134b9dad96d4 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py @@ -290,7 +290,9 @@ def test_repeatable_field_is_properly_converted(self): self.assertEqual(expected_row, actual) -@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') +@unittest.skipIf( + HttpError is None or HttpForbiddenError is None, + 'GCP dependencies are not installed') class TestReadFromBigQuery(unittest.TestCase): @classmethod def setUpClass(cls): @@ -532,7 +534,7 @@ def store_callback(unused_request): # second attempt returns a 403 error with bad contents and fails param( responses=[ - HttpError( + HttpForbiddenError( response={'status': 403}, content={ "error": { diff --git a/sdks/python/apache_beam/utils/retry.py b/sdks/python/apache_beam/utils/retry.py index cef205051b39..485fc9d627e9 100644 --- a/sdks/python/apache_beam/utils/retry.py +++ b/sdks/python/apache_beam/utils/retry.py @@ -179,13 +179,15 @@ def retry_on_server_errors_timeout_or_quota_issues_filter(exception): if exception.status_code == 403: try: # attempt to extract the reason and check if it's retryable - content = json.loads(exception.content) + content = exception.content + if not isinstance(content, dict): + content = json.loads(exception.content) return content["error"]["errors"][0]["reason"] in _RETRYABLE_REASONS - except (KeyError, IndexError, TypeError): + except (KeyError, IndexError, TypeError) as e: _LOGGER.warning( "Could not determine if HttpError is transient. " "Will not retry: %s", - exception) + e) return False if GoogleAPICallError is not None and isinstance(exception, GoogleAPICallError): From d70b1a8512c422d141b714073b833b4e50b4f122 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 8 Jan 2024 11:06:34 -0500 Subject: [PATCH 15/17] default HttpForbiddenError to none --- sdks/python/apache_beam/io/gcp/bigquery_test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py index 134b9dad96d4..a547ceb5a715 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py @@ -90,6 +90,7 @@ except ImportError: gcp_bigquery = None HttpError = None + HttpForbiddenError = None exceptions = None # pylint: enable=wrong-import-order, wrong-import-position From a7c1e1d2aeaaae70ffe3ee9650395d720359c2a3 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 8 Jan 2024 11:47:50 -0500 Subject: [PATCH 16/17] default to none when import fails --- sdks/python/apache_beam/io/gcp/bigquery_test.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py index a547ceb5a715..02c94e037892 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py @@ -431,9 +431,11 @@ def test_create_temp_dataset_exception(self, exception_type, error_message): param( responses=[ HttpForbiddenError( - response={'status': 500}, content="something", url=""), + response={'status': 500}, content="something", url="") + if HttpForbiddenError else None, HttpForbiddenError( response={'status': 408}, content="blank", url="") + if HttpForbiddenError else None ], expected_retries=2), # first attempts returns a 403 rateLimitExceeded error @@ -458,7 +460,7 @@ def test_create_temp_dataset_exception(self, exception_type, error_message): }] } }, - url=""), + url="") if HttpForbiddenError else None, ], expected_retries=3), ]) @@ -518,7 +520,7 @@ def store_callback(unused_request): }] } }, - url=""), + url="") if HttpForbiddenError else None, HttpForbiddenError( response={'status': 403}, content={ @@ -528,7 +530,7 @@ def store_callback(unused_request): }] } }, - url="") + url="") if HttpForbiddenError else None ], expected_retries=1), # first attempt returns a transient 403 error and retries @@ -545,9 +547,10 @@ def store_callback(unused_request): }] } }, - url=""), + url="") if HttpForbiddenError else None, HttpError( response={'status': 403}, content="bad contents", url="") + if HttpError else None ], expected_retries=1), # first attempt returns a transient 403 error and retries From 4992a839adacaf96160c6f54fe82ffa3c20fb1ce Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 8 Jan 2024 13:56:16 -0500 Subject: [PATCH 17/17] default to non when import fails --- sdks/python/apache_beam/io/gcp/bigquery_test.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py index 02c94e037892..282e9f34be7f 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py @@ -448,8 +448,9 @@ def test_create_temp_dataset_exception(self, exception_type, error_message): "some message", errors=({ "message": "transient", "reason": "rateLimitExceeded" - }, )), - exceptions.ResourceExhausted("some message"), + }, )) if exceptions else None, + exceptions.ResourceExhausted("some message") + if exceptions else None, HttpForbiddenError( response={'status': 403}, content={ @@ -562,13 +563,14 @@ def store_callback(unused_request): "some error", errors=({ "message": "transient", "reason": "rateLimitExceeded" - }, )), - exceptions.ResourceExhausted("some transient error"), + }, )) if exceptions else None, + exceptions.ResourceExhausted("some transient error") + if exceptions else None, exceptions.Forbidden( "some error", errors=({ "message": "transient", "reason": "accessDenied" - }, )), + }, )) if exceptions else None, ], expected_retries=2), ])