Skip to content

prefect.client.base

PrefectResponse

Bases: httpx.Response

A Prefect wrapper for the httpx.Response class.

Provides more informative error messages.

Source code in prefect/client/base.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
class PrefectResponse(httpx.Response):
    """
    An `httpx.Response` subclass used by Prefect clients.

    Its only behavioral difference is that status errors are re-raised as
    `PrefectHTTPStatusError`, which carries more informative error messages.
    """

    def raise_for_status(self) -> None:
        """
        Check the response status and raise a Prefect-specific error on failure.

        Any `HTTPStatusError` raised by the parent implementation is converted into
        a `PrefectHTTPStatusError` via `PrefectHTTPStatusError.from_httpx_error`.
        """
        try:
            outcome = super().raise_for_status()
        except HTTPStatusError as httpx_error:
            prefect_error = PrefectHTTPStatusError.from_httpx_error(httpx_error)
            # Chain from the cause of the httpx error, not the httpx error itself
            raise prefect_error from httpx_error.__cause__
        else:
            return outcome

    @classmethod
    def from_httpx_response(cls: Type[Self], response: httpx.Response) -> Self:
        """
        Build a `PrefectResponse` out of an existing `httpx.Response`.

        A shallow copy of the response is made and its `__class__` is reassigned to
        `cls`, so method lookup goes through `PrefectResponse` while the copied
        instance's attributes are left as they were. The response passed in is not
        modified.
        """
        wrapped = copy.copy(response)
        wrapped.__class__ = cls
        return wrapped

raise_for_status

Raise an exception if the response contains an HTTPStatusError.

The PrefectHTTPStatusError contains useful additional information that is not contained in the HTTPStatusError.

Source code in prefect/client/base.py
120
121
122
123
124
125
126
127
128
129
130
def raise_for_status(self) -> None:
    """
    Check the response status and raise a Prefect-specific error on failure.

    Any `HTTPStatusError` raised by the parent implementation is converted into a
    `PrefectHTTPStatusError` via `PrefectHTTPStatusError.from_httpx_error`.
    """
    try:
        outcome = super().raise_for_status()
    except HTTPStatusError as httpx_error:
        prefect_error = PrefectHTTPStatusError.from_httpx_error(httpx_error)
        # Chain from the cause of the httpx error, not the httpx error itself
        raise prefect_error from httpx_error.__cause__
    else:
        return outcome

from_httpx_response classmethod

Create a PrefectResponse from an httpx.Response.

By changing the __class__ attribute of the Response, we change the method resolution order to look for methods defined in PrefectResponse, while leaving everything else about the original Response instance intact.

Source code in prefect/client/base.py
132
133
134
135
136
137
138
139
140
141
142
143
@classmethod
def from_httpx_response(cls: Type[Self], response: httpx.Response) -> Self:
    """
    Build a `PrefectResponse` out of an existing `httpx.Response`.

    A shallow copy of the response is made and its `__class__` is reassigned to
    `cls`, so method lookup goes through `PrefectResponse` while the copied
    instance's attributes are left as they were. The response passed in is not
    modified.
    """
    wrapped = copy.copy(response)
    wrapped.__class__ = cls
    return wrapped

PrefectHttpxClient

Bases: httpx.AsyncClient

A Prefect wrapper for the async httpx client with support for retry-after headers for:

  • 429 CloudFlare-style rate limiting
  • 503 Service unavailable

Additionally, this client will always call raise_for_status on responses.

For more details on rate limit headers, see: Configuring Cloudflare Rate Limiting

Source code in prefect/client/base.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
class PrefectHttpxClient(httpx.AsyncClient):
    """
    A Prefect wrapper for the async httpx client with support for retry-after headers
    for:

    - 429 CloudFlare-style rate limiting
    - 503 Service unavailable

    Additionally, this client will always call `raise_for_status` on responses.

    For more details on rate limit headers, see:
    [Configuring Cloudflare Rate Limiting](https://support.cloudflare.com/hc/en-us/articles/115001635128-Configuring-Rate-Limiting-from-UI)
    """

    # Maximum number of retries; a request is attempted at most RETRY_MAX + 1 times
    RETRY_MAX = 5

    async def _send_with_retry(
        self,
        request: Callable,
        # NOTE: The default set is shared between calls but is only ever read here
        retry_codes: Set[int] = set(),
        retry_exceptions: Tuple[Type[Exception], ...] = tuple(),
    ):
        """
        Send a request and retry it if it fails.

        Sends the provided request and retries it up to self.RETRY_MAX times if
        the request either raises an exception listed in `retry_exceptions` or receives
        a response with a status code listed in `retry_codes`.

        Retries will be delayed based on either the retry header (preferred) or
        exponential backoff if a retry header is not provided or is not a number of
        seconds. No delay is added after the final attempt.

        Args:
            request: A zero-argument callable returning an awaitable that resolves
                to the response; it is called once per attempt.
            retry_codes: Response status codes that should trigger a retry.
            retry_exceptions: Exception classes that should trigger a retry when
                raised by `request`.

        Returns:
            The first response whose status code is not in `retry_codes`, or the
            last response received if every attempt returned a retryable status.

        Raises:
            Any exception in `retry_exceptions` if it is raised on the final
            attempt. Exceptions not listed in `retry_exceptions` propagate
            immediately.
        """
        try_count = 0
        response = None

        while try_count <= self.RETRY_MAX:
            try_count += 1
            retry_seconds = None
            exc_info = None

            try:
                response = await request()
            except retry_exceptions:
                if try_count > self.RETRY_MAX:
                    raise
                # Otherwise, we will ignore this error but capture the info for logging
                exc_info = sys.exc_info()
            else:
                # We got a response; return immediately if it is not retryable
                if response.status_code not in retry_codes:
                    return response

                # This was the final attempt; stop here instead of announcing and
                # sleeping for a retry that will never be made
                if try_count > self.RETRY_MAX:
                    break

                if "Retry-After" in response.headers:
                    try:
                        retry_seconds = float(response.headers["Retry-After"])
                    except ValueError:
                        # The header may carry an HTTP-date instead of a number of
                        # seconds; fall back to exponential back-off in that case
                        retry_seconds = None

            # Use an exponential back-off if not set in a header
            if retry_seconds is None:
                retry_seconds = 2**try_count

            logger.debug(
                (
                    "Encountered retryable exception during request. "
                    if exc_info
                    else "Received response with retryable status code. "
                )
                + f"Another attempt will be made in {retry_seconds}s. "
                f"This is attempt {try_count}/{self.RETRY_MAX + 1}.",
                exc_info=exc_info,
            )
            await anyio.sleep(retry_seconds)

        assert (
            response is not None
        ), "Retry handling ended without response or exception"

        # We ran out of retries, return the failed response
        return response

    async def send(self, *args, **kwargs) -> Response:
        """
        Send a request with retries and raise on error responses.

        All arguments are forwarded to the parent client's `send`. Responses with a
        429 or 503 status code and the listed transport errors are retried via
        `_send_with_retry`. The final response is converted to a `PrefectResponse`
        and `raise_for_status` is called on it before it is returned.
        """
        api_request = partial(super().send, *args, **kwargs)

        response = await self._send_with_retry(
            request=api_request,
            retry_codes={
                status.HTTP_429_TOO_MANY_REQUESTS,
                status.HTTP_503_SERVICE_UNAVAILABLE,
            },
            retry_exceptions=(
                httpx.ReadTimeout,
                httpx.PoolTimeout,
                # `ConnectionResetError` when reading socket raises as a `ReadError`
                httpx.ReadError,
                # Sockets can be closed during writes resulting in a `WriteError`
                httpx.WriteError,
                # Uvicorn bug, see https://github.com/PrefectHQ/prefect/issues/7512
                httpx.RemoteProtocolError,
                # HTTP2 bug, see https://github.com/PrefectHQ/prefect/issues/7442
                httpx.LocalProtocolError,
            ),
        )

        # Convert to a Prefect response to add nicer error messages
        response = PrefectResponse.from_httpx_response(response)

        # Always raise bad responses
        # NOTE: We may want to remove this and handle responses per route in the
        #       `PrefectClient`
        response.raise_for_status()

        return response

app_lifespan_context async

A context manager that calls startup/shutdown hooks for the given application.

Lifespan contexts are cached per application to avoid calling the lifespan hooks more than once if the context is entered in nested code. A no-op context will be returned if the context for the given application is already being managed.

This manager is robust to concurrent access within the event loop. For example, if you have concurrent contexts for the same application, it is guaranteed that startup hooks will be called before their context starts and shutdown hooks will only be called after their context exits.

A reference count is used to support nested use of clients without running lifespan hooks excessively. The first client context entered will create and enter a lifespan context. Each subsequent client will increment a reference count but will not create a new lifespan context. When each client context exits, the reference count is decremented. When the last client context exits, the lifespan will be closed.

In simple nested cases, the first client context will be the one to exit the lifespan. However, if client contexts are entered concurrently they may not exit in a consistent order. If the first client context was responsible for closing the lifespan, it would have to wait for all other client contexts to exit to avoid firing shutdown hooks while the application is in use. Waiting for the other clients to exit can introduce deadlocks, so, instead, the first client will exit without closing the lifespan context and reference counts will be used to ensure the lifespan is closed once all of the clients are done.

Source code in prefect/client/base.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
@asynccontextmanager
async def app_lifespan_context(app: FastAPI) -> ContextManager[None]:
    """
    A context manager that calls startup/shutdown hooks for the given application.

    Lifespan contexts are cached per application to avoid calling the lifespan hooks
    more than once if the context is entered in nested code. A no-op context will be
    returned if the context for the given application is already being managed.

    This manager is robust to concurrent access within the event loop. For example,
    if you have concurrent contexts for the same application, it is guaranteed that
    startup hooks will be called before their context starts and shutdown hooks will
    only be called after their context exits.

    A reference count is used to support nested use of clients without running
    lifespan hooks excessively. The first client context entered will create and enter
    a lifespan context. Each subsequent client will increment a reference count but will
    not create a new lifespan context. When each client context exits, the reference
    count is decremented. When the last client context exits, the lifespan will be
    closed.

    In simple nested cases, the first client context will be the one to exit the
    lifespan. However, if client contexts are entered concurrently they may not exit
    in a consistent order. If the first client context was responsible for closing
    the lifespan, it would have to wait for all other client contexts to exit to
    avoid firing shutdown hooks while the application is in use. Waiting for the other
    clients to exit can introduce deadlocks, so, instead, the first client will exit
    without closing the lifespan context and reference counts will be used to ensure
    the lifespan is closed once all of the clients are done.

    If entering the lifespan fails, the error propagates to the caller and nothing is
    cached for the application, so a later context will attempt startup again.
    """
    # TODO: A deadlock has been observed during multithreaded use of clients while this
    #       lifespan context is being used. This has only been reproduced on Python 3.7
    #       and while we hope to discourage using multiple event loops in threads, this
    #       bug may emerge again.
    #       See https://github.com/PrefectHQ/orion/pull/1696
    thread_id = threading.get_ident()

    # The id of the application is used instead of the hash so each application instance
    # is managed independently even if they share the same settings. We include the
    # thread id since applications are managed separately per thread.
    key = (thread_id, id(app))

    # On exception, this will be populated with exception details
    exc_info = (None, None, None)

    # Get a lock unique to this thread since anyio locks are not threadsafe
    lock = APP_LIFESPANS_LOCKS[thread_id]

    async with lock:
        if key in APP_LIFESPANS:
            # The lifespan is already being managed, just increment the reference count
            APP_LIFESPANS_REF_COUNTS[key] += 1
        else:
            # Create a new lifespan manager
            context = LifespanManager(app, startup_timeout=30, shutdown_timeout=30)

            # Ensure we enter the context before releasing the lock so startup hooks
            # are complete before another client can be used
            await context.__aenter__()

            # Register the lifespan only once startup has succeeded. If it were
            # registered first and startup raised, the stale entry would make later
            # callers believe the lifespan is managed and skip startup entirely.
            APP_LIFESPANS[key] = context
            APP_LIFESPANS_REF_COUNTS[key] = 1

    try:
        yield
    except BaseException:
        # Capture the error so it can be passed to the lifespan's exit hook
        exc_info = sys.exc_info()
        raise
    finally:
        # If we do not shield against anyio cancellation, the lock will return
        # immediately and the code in its context will not run, leaving the lifespan
        # open
        with anyio.CancelScope(shield=True):
            async with lock:
                # After the consumer exits the context, decrement the reference count
                APP_LIFESPANS_REF_COUNTS[key] -= 1

                # If this is the last context to exit, close the lifespan
                if APP_LIFESPANS_REF_COUNTS[key] <= 0:
                    APP_LIFESPANS_REF_COUNTS.pop(key)
                    context = APP_LIFESPANS.pop(key)
                    await context.__aexit__(*exc_info)