Skip to content

Commit

Permalink
Retry on 503 (#1408)
Browse files Browse the repository at this point in the history
* add default retry on all client factories, which includes 502 and 503 errors
* update retries to use defaults and ensure that a timeout or deadline is set

(cherry picked from commit a219818)
  • Loading branch information
mikealfare authored and colin-rogers-dbt committed Jan 9, 2025
1 parent 069472d commit 1adbf7e
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 22 deletions.
7 changes: 7 additions & 0 deletions .changes/unreleased/Fixes-20241120-163101.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Fixes
body: Fix issue where dbt-bigquery was not retrying in certain retryable scenarios,
e.g. 503's
time: 2024-11-20T16:31:01.60689-05:00
custom:
Author: mikealfare
Issue: "682"
12 changes: 6 additions & 6 deletions dbt/adapters/bigquery/clients.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from google.api_core.client_info import ClientInfo
from google.api_core.client_options import ClientOptions
from google.api_core.retry import Retry
from google.auth.exceptions import DefaultCredentialsError
from google.cloud.bigquery import Client as BigQueryClient
from google.cloud.bigquery import Client as BigQueryClient, DEFAULT_RETRY as BQ_DEFAULT_RETRY
from google.cloud.dataproc_v1 import BatchControllerClient, JobControllerClient
from google.cloud.storage import Client as StorageClient
from google.cloud.storage.retry import DEFAULT_RETRY as GCS_DEFAULT_RETRY

from dbt.adapters.events.logging import AdapterLogger

Expand All @@ -28,23 +28,23 @@ def create_bigquery_client(credentials: BigQueryCredentials) -> BigQueryClient:
return _create_bigquery_client(credentials)


@Retry() # google decorator. retries on transient errors with exponential backoff
@GCS_DEFAULT_RETRY
def create_gcs_client(credentials: BigQueryCredentials) -> StorageClient:
return StorageClient(
project=credentials.execution_project,
credentials=create_google_credentials(credentials),
)


@Retry() # google decorator. retries on transient errors with exponential backoff
# dataproc does not appear to have a default retry like BQ and GCS
def create_dataproc_job_controller_client(credentials: BigQueryCredentials) -> JobControllerClient:
return JobControllerClient(
credentials=create_google_credentials(credentials),
client_options=ClientOptions(api_endpoint=_dataproc_endpoint(credentials)),
)


@Retry() # google decorator. retries on transient errors with exponential backoff
# dataproc does not appear to have a default retry like BQ and GCS
def create_dataproc_batch_controller_client(
credentials: BigQueryCredentials,
) -> BatchControllerClient:
Expand All @@ -54,7 +54,7 @@ def create_dataproc_batch_controller_client(
)


@Retry() # google decorator. retries on transient errors with exponential backoff
@BQ_DEFAULT_RETRY
def _create_bigquery_client(credentials: BigQueryCredentials) -> BigQueryClient:
return BigQueryClient(
credentials.execution_project,
Expand Down
33 changes: 17 additions & 16 deletions dbt/adapters/bigquery/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from google.api_core.future.polling import DEFAULT_POLLING
from google.api_core.retry import Retry
from google.cloud.bigquery.retry import DEFAULT_RETRY, _job_should_retry
from google.cloud.bigquery.retry import DEFAULT_JOB_RETRY, _job_should_retry
from requests.exceptions import ConnectionError

from dbt.adapters.contracts.connection import Connection, ConnectionState
Expand All @@ -15,14 +15,8 @@

_logger = AdapterLogger("BigQuery")


_SECOND = 1.0
_MINUTE = 60 * _SECOND
_HOUR = 60 * _MINUTE
_DAY = 24 * _HOUR
_DEFAULT_INITIAL_DELAY = _SECOND
_DEFAULT_MAXIMUM_DELAY = 3 * _SECOND
_DEFAULT_POLLING_MAXIMUM_DELAY = 10 * _SECOND
_MINUTE = 60.0
_DAY = 24 * 60 * 60.0


class RetryFactory:
Expand All @@ -44,7 +38,7 @@ def create_job_execution_timeout(self, fallback: float = _DAY) -> float:
) # keep _DAY here so it's not overridden by passing fallback=None

def create_retry(self, fallback: Optional[float] = None) -> Retry:
return DEFAULT_RETRY.with_timeout(self._job_execution_timeout or fallback or _DAY)
return DEFAULT_JOB_RETRY.with_timeout(self._job_execution_timeout or fallback or _DAY)

def create_polling(self, model_timeout: Optional[float] = None) -> Retry:
return DEFAULT_POLLING.with_timeout(model_timeout or self._job_execution_timeout or _DAY)
Expand All @@ -53,14 +47,21 @@ def create_reopen_with_deadline(self, connection: Connection) -> Retry:
"""
This strategy mimics what was accomplished with _retry_and_handle
"""
return Retry(
predicate=_DeferredException(self._retries),
initial=_DEFAULT_INITIAL_DELAY,
maximum=_DEFAULT_MAXIMUM_DELAY,
deadline=self._job_deadline,
on_error=_create_reopen_on_error(connection),

retry = DEFAULT_JOB_RETRY.with_delay(maximum=3.0).with_predicate(
_DeferredException(self._retries)
)

# there is no `with_on_error` method, but we want to retain the defaults on `DEFAULT_JOB_RETRY
retry._on_error = _create_reopen_on_error(connection)

# don't override the default deadline to None if the user did not provide one,
# the process will never end
if deadline := self._job_deadline:
return retry.with_deadline(deadline)

return retry


class _DeferredException:
"""
Expand Down
62 changes: 62 additions & 0 deletions hatch.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
[version]
path = "dbt/adapters/bigquery/__version__.py"

[build.targets.sdist]
packages = ["dbt"]

[build.targets.wheel]
packages = ["dbt"]

[envs.default]
python = "3.9"
dependencies = [
"dbt-adapters @ git+https://github.com/dbt-labs/dbt-adapters.git",
"dbt-common @ git+https://github.com/dbt-labs/dbt-common.git",
"dbt-tests-adapter @ git+https://github.com/dbt-labs/dbt-adapters.git#subdirectory=dbt-tests-adapter",
"dbt-core @ git+https://github.com/dbt-labs/dbt-core.git#subdirectory=core",
"ddtrace==2.3.0",
"ipdb~=0.13.13",
"pre-commit==3.7.0",
"freezegun",
"pytest>=7.0,<8.0",
"pytest-csv~=3.0",
"pytest-dotenv",
"pytest-logbook~=1.2",
"pytest-mock",
"pytest-xdist",
]

[envs.default.scripts]
setup = "pre-commit install"
code-quality = "pre-commit run --all-files"
unit-tests = "python -m pytest {args:tests/unit}"
integration-tests = "python -m pytest --profile service_account {args:tests/functional}"
docker-dev = [
"docker build -f docker/dev.Dockerfile -t dbt-bigquery-dev .",
"docker run --rm -it --name dbt-bigquery-dev -v $(shell pwd):/opt/code dbt-bigquery-dev",
]

[envs.build]
detached = true
dependencies = [
"wheel",
"twine",
"check-wheel-contents",
]

[envs.build.scripts]
check-all = [
"- check-wheel",
"- check-sdist",
]
check-wheel = [
"twine check dist/*",
"find ./dist/dbt_bigquery-*.whl -maxdepth 1 -type f | xargs python -m pip install --force-reinstall --find-links=dist/",
"pip freeze | grep dbt-bigquery",
]
check-sdist = [
"check-wheel-contents dist/*.whl --ignore W007,W008",
"find ./dist/dbt_bigquery-*.gz -maxdepth 1 -type f | xargs python -m pip install --force-reinstall --find-links=dist/",
"pip freeze | grep dbt-bigquery",
]
docker-prod = "docker build -f docker/Dockerfile -t dbt-bigquery ."

0 comments on commit 1adbf7e

Please sign in to comment.