Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry on 503 (#1408) #1445

Merged
merged 2 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changes/unreleased/Fixes-20241120-163101.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Fixes
body: Fix issue where dbt-bigquery was not retrying in certain retryable scenarios,
e.g. 503's
time: 2024-11-20T16:31:01.60689-05:00
custom:
Author: mikealfare
Issue: "682"
12 changes: 6 additions & 6 deletions dbt/adapters/bigquery/clients.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from google.api_core.client_info import ClientInfo
from google.api_core.client_options import ClientOptions
from google.api_core.retry import Retry
from google.auth.exceptions import DefaultCredentialsError
from google.cloud.bigquery import Client as BigQueryClient
from google.cloud.bigquery import Client as BigQueryClient, DEFAULT_RETRY as BQ_DEFAULT_RETRY
from google.cloud.dataproc_v1 import BatchControllerClient, JobControllerClient
from google.cloud.storage import Client as StorageClient
from google.cloud.storage.retry import DEFAULT_RETRY as GCS_DEFAULT_RETRY

from dbt.adapters.events.logging import AdapterLogger

Expand All @@ -28,23 +28,23 @@ def create_bigquery_client(credentials: BigQueryCredentials) -> BigQueryClient:
return _create_bigquery_client(credentials)


@Retry() # google decorator. retries on transient errors with exponential backoff
@GCS_DEFAULT_RETRY
def create_gcs_client(credentials: BigQueryCredentials) -> StorageClient:
return StorageClient(
project=credentials.execution_project,
credentials=create_google_credentials(credentials),
)


@Retry() # google decorator. retries on transient errors with exponential backoff
# dataproc does not appear to have a default retry like BQ and GCS
def create_dataproc_job_controller_client(credentials: BigQueryCredentials) -> JobControllerClient:
return JobControllerClient(
credentials=create_google_credentials(credentials),
client_options=ClientOptions(api_endpoint=_dataproc_endpoint(credentials)),
)


@Retry() # google decorator. retries on transient errors with exponential backoff
# dataproc does not appear to have a default retry like BQ and GCS
def create_dataproc_batch_controller_client(
credentials: BigQueryCredentials,
) -> BatchControllerClient:
Expand All @@ -54,7 +54,7 @@ def create_dataproc_batch_controller_client(
)


@Retry() # google decorator. retries on transient errors with exponential backoff
@BQ_DEFAULT_RETRY
def _create_bigquery_client(credentials: BigQueryCredentials) -> BigQueryClient:
return BigQueryClient(
credentials.execution_project,
Expand Down
33 changes: 17 additions & 16 deletions dbt/adapters/bigquery/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from google.api_core.future.polling import DEFAULT_POLLING
from google.api_core.retry import Retry
from google.cloud.bigquery.retry import DEFAULT_RETRY, _job_should_retry
from google.cloud.bigquery.retry import DEFAULT_JOB_RETRY, _job_should_retry
from requests.exceptions import ConnectionError

from dbt.adapters.contracts.connection import Connection, ConnectionState
Expand All @@ -15,14 +15,8 @@

_logger = AdapterLogger("BigQuery")


_SECOND = 1.0
_MINUTE = 60 * _SECOND
_HOUR = 60 * _MINUTE
_DAY = 24 * _HOUR
_DEFAULT_INITIAL_DELAY = _SECOND
_DEFAULT_MAXIMUM_DELAY = 3 * _SECOND
_DEFAULT_POLLING_MAXIMUM_DELAY = 10 * _SECOND
_MINUTE = 60.0
_DAY = 24 * 60 * 60.0


class RetryFactory:
Expand All @@ -44,7 +38,7 @@ def create_job_execution_timeout(self, fallback: float = _DAY) -> float:
) # keep _DAY here so it's not overridden by passing fallback=None

def create_retry(self, fallback: Optional[float] = None) -> Retry:
return DEFAULT_RETRY.with_timeout(self._job_execution_timeout or fallback or _DAY)
return DEFAULT_JOB_RETRY.with_timeout(self._job_execution_timeout or fallback or _DAY)

def create_polling(self, model_timeout: Optional[float] = None) -> Retry:
return DEFAULT_POLLING.with_timeout(model_timeout or self._job_execution_timeout or _DAY)
Expand All @@ -53,14 +47,21 @@ def create_reopen_with_deadline(self, connection: Connection) -> Retry:
"""
This strategy mimics what was accomplished with _retry_and_handle
"""
return Retry(
predicate=_DeferredException(self._retries),
initial=_DEFAULT_INITIAL_DELAY,
maximum=_DEFAULT_MAXIMUM_DELAY,
deadline=self._job_deadline,
on_error=_create_reopen_on_error(connection),

retry = DEFAULT_JOB_RETRY.with_delay(maximum=3.0).with_predicate(
_DeferredException(self._retries)
)

# there is no `with_on_error` method, but we want to retain the defaults on `DEFAULT_JOB_RETRY
retry._on_error = _create_reopen_on_error(connection)

# don't override the default deadline to None if the user did not provide one,
# the process will never end
if deadline := self._job_deadline:
return retry.with_deadline(deadline)

return retry


class _DeferredException:
"""
Expand Down
Loading