429s got you feeling down? Try the Rate Limit Manager wrapper

Rate Limiter Project

The Rate Limiter Project is a Python module designed to manage rate limits for API calls. It helps users stay below their assigned rate limits, preventing 429 errors.

Overview

The project consists of a single module, rate_limiter.py, which provides a RateLimitManager class. This class manages rate limits across multiple clients using shared state.

Features

  • Manages rate limits for API calls
  • Prevents 429 errors by staying below assigned rate limits
  • Supports multiple rate limits (requests, requests-day, requests-hour)
  • Calculates required delay based on active rate limits
  • Logs current rate limit state

Usage

To use the Rate Limiter Project, simply import the RateLimitManager class and create an instance:

from rate_limiter import RateLimitManager

rate_limiter = RateLimitManager()

You can then use the safe_api_call method to make API calls:

response = rate_limiter.safe_api_call(
    model="DeepSeek-R1",
    messages=[
        {"role": "system", "content": "You are a helpful assistant"},
        {"role": "user", "content": "Hello"},
    ],
    temperature=0.1,
    top_p=0.1,
)

Env Configuration

The RateLimitManager class uses the following env variables:

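SAMBANOVA_API_KEY: The API key for the SambaNova API (required; creating a RateLimitManager fails with a KeyError if it is not set)
DEBUG_MODE: Set to 1 to enable debug output, or 0 to disable it (defaults to 0)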

Dependencies

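The Rate Limiter Project depends on the following libraries (httpx and openai are third-party packages that must be installed; threading, time, and datetime are part of the Python standard library):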

httpx
openai
threading
time
datetime

import os
import time
import logging
import threading
import httpx
from datetime import datetime
from typing import Dict, Union
from collections import defaultdict
from openai import OpenAI


class HeaderCaptureTransport(httpx.HTTPTransport):
    """HTTP transport that remembers the headers of the most recent response.

    The captured headers are exposed as ``last_headers`` (``None`` until the
    first response has been received) so that a caller holding a reference to
    the transport can read response headers after a request completes.
    """

    def __init__(self, *args, **kwargs):
        """Create the transport.

        Any positional or keyword arguments are forwarded unchanged to
        ``httpx.HTTPTransport``; calling with no arguments behaves exactly as
        before.
        """
        super().__init__(*args, **kwargs)
        self.last_headers = None

    def handle_request(self, request):
        """Send ``request`` via the parent transport and record its headers.

        Returns the response produced by the parent transport unchanged.
        """
        response = super().handle_request(request)
        # Overwritten on every request: only the latest response is kept.
        self.last_headers = response.headers
        return response


class RateLimitManager:
    """Pace chat-completion calls using the API's ``x-ratelimit-*`` headers.

    After every response the ``x-ratelimit-{limit,remaining,reset}-<name>``
    headers are parsed into ``self.limits``.  Before the next call the manager
    sleeps until the latest reset time of any limit that has one or zero
    requests remaining.  Reset values are interpreted as Unix timestamps in
    seconds (they are compared against ``time.time()``).

    ``_lock`` is a class attribute, so calls made through any instance in the
    process are serialized: the lock is held while sleeping and while the
    request is in flight.  The counters in ``self.limits`` are per instance;
    ``_shared_state`` is declared but not used by any method of this class.
    """

    _shared_state = defaultdict(dict)
    _lock = threading.Lock()

    # Names of the tracked limits; each doubles as the suffix of its
    # ``x-ratelimit-{limit,remaining,reset}-<name>`` response headers.
    _LIMIT_NAMES = ("requests", "requests-day", "requests-hour")
    # Maximum number of attempts made when the API answers with HTTP 429.
    _MAX_RETRIES = 5
    _RATE_LIMIT_STATUS = 429

    def __init__(self):
        """Build the API client and zeroed limit counters.

        Raises:
            KeyError: if the ``SAMBANOVA_API_KEY`` environment variable is
                not set.
        """
        self.logger = logging.getLogger("RateLimitManager")
        self.transport = HeaderCaptureTransport()
        self.client = OpenAI(
            api_key=os.environ["SAMBANOVA_API_KEY"],
            base_url="https://api.sambanova.ai/v1",
            http_client=httpx.Client(transport=self.transport),
        )
        self.limits = {
            name: {"limit": 0, "remaining": 0, "reset": 0}
            for name in self._LIMIT_NAMES
        }
        self.debug_mode = os.environ.get("DEBUG_MODE", "0") == "1"

    def safe_api_call(self, *args, **kwargs):
        """Make a chat-completion call with coordinated rate limiting.

        All arguments are forwarded to ``client.chat.completions.create``.

        The call first waits out any limit that is already (nearly) exhausted,
        regardless of debug mode.  A successful response is always returned;
        its headers only influence how long the *next* call waits.  An error
        carrying ``status_code == 429`` is retried after a delay, up to
        ``_MAX_RETRIES`` attempts in total.

        Returns:
            The response object returned by the client.

        Raises:
            RuntimeError: if every attempt was answered with HTTP 429 (the
                last such error is attached as ``__cause__``).
            Exception: any other error raised by the client is re-raised
                unchanged.
        """
        with self._lock:
            required_delay = self._calculate_required_delay()
            if required_delay > 0:
                if self.debug_mode:
                    print(f"⏳ Approaching rate limit - sleeping {required_delay:.2f}s")
                # The sleep must not depend on debug mode: it is the throttle.
                time.sleep(required_delay)

            last_error = None
            for attempt in range(self._MAX_RETRIES):
                try:
                    response = self.client.chat.completions.create(*args, **kwargs)
                except Exception as e:
                    if self.debug_mode:
                        print(f"🚨 API call failed: {str(e)}")
                    # Only rate-limit errors are retried; everything else
                    # propagates to the caller untouched.
                    if getattr(e, "status_code", None) != self._RATE_LIMIT_STATUS:
                        raise
                    last_error = e
                    self._wait_after_rate_limit(attempt)
                    continue

                headers = self.transport.last_headers
                if headers is None:
                    if self.debug_mode:
                        print("⚠️ No headers received in response")
                elif self.debug_mode:
                    print("RAW RESPONSE HEADERS:")
                    for key, value in headers.items():
                        print(f"{key}: {value}")

                if self.debug_mode:
                    print("API RESPONSE:")
                    print(response)

                # Without headers the previously known limits are kept.
                if headers is not None:
                    self._update_rate_limits(headers)
                self._log_current_state(headers)

                if self.debug_mode:
                    print("✅ Valid API response received")
                return response

            raise RuntimeError("Max retries exceeded") from last_error

    def _wait_after_rate_limit(self, attempt):
        """Refresh the limits from the last response and sleep before a retry.

        ``attempt`` is the zero-based index of the attempt that just failed.
        Nothing is slept after the final attempt, since no retry follows.
        """
        headers = self.transport.last_headers
        if headers is not None:
            self._update_rate_limits(headers)
        if attempt + 1 >= self._MAX_RETRIES:
            return

        delay = self._calculate_required_delay()
        if delay > 0:
            retry_at = self._get_reset_timestamp()
        else:
            # The headers gave no usable reset time: fall back to exponential
            # backoff rather than retrying immediately.
            delay = float(2 ** attempt)
            retry_at = time.time() + delay
        print(
            f"🔴 Rate limit exceeded, retrying at {self._format_timestamp(retry_at)}"
        )
        time.sleep(delay)

    @staticmethod
    def _format_timestamp(timestamp):
        """Format a Unix timestamp as ``YYYY-MM-DD HH:MM:SS`` in UTC."""
        return time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(timestamp))

    @staticmethod
    def _header_int(headers, name):
        """Read header ``name`` as an int.

        Fractional values are truncated; a missing or unparsable value
        yields 0 instead of raising.
        """
        raw = headers.get(name, 0)
        try:
            return int(raw)
        except (TypeError, ValueError):
            pass
        try:
            return int(float(raw))
        except (TypeError, ValueError, OverflowError):
            return 0

    def _update_rate_limits(self, headers: dict):
        """Update ``self.limits`` with the latest header values.

        ``headers`` is any mapping offering ``get`` and ``in``.  The plain
        ``requests`` limit is always refreshed (missing headers count as 0).
        The day and hour limits are refreshed only when their ``limit`` header
        is present and are reset to zero otherwise.  ``None`` leaves the
        current state untouched.
        """
        if headers is None:
            return

        for name in self._LIMIT_NAMES:
            state = self.limits[name]
            tracked = (
                name == "requests" or f"x-ratelimit-limit-{name}" in headers
            )
            for field in ("limit", "remaining", "reset"):
                state[field] = (
                    self._header_int(headers, f"x-ratelimit-{field}-{name}")
                    if tracked
                    else 0
                )

    def _log_current_state(self, headers):
        """Print the current rate limit state (debug mode only).

        ``headers`` is accepted for call compatibility and is not used.
        """
        if not self.debug_mode:
            return

        labels = {
            "requests": "Requests",
            "requests-day": "Requests (day)",
            "requests-hour": "Requests (hour)",
        }
        print("\n📊 Rate Limit State:")
        # The plain request limit is always shown; the day/hour limits only
        # when the API reported them (limit > 0).
        for name in self._LIMIT_NAMES:
            state = self.limits[name]
            if name == "requests" or state["limit"] > 0:
                print(
                    f"   {labels[name]}: {max(0, state['remaining'])}/{state['limit']}"
                )
        for name in self._LIMIT_NAMES:
            state = self.limits[name]
            shown = name == "requests" or state["limit"] > 0
            if shown and state["reset"] > 0:
                print(
                    f"   Reset ({name}): {self._format_timestamp(state['reset'])}"
                )

    def _calculate_required_delay(self) -> float:
        """Return the seconds to wait until every exhausted limit has reset.

        A limit counts as exhausted when at most one request remains and its
        reset time is still in the future.  Returns 0.0 when none is.
        """
        now = time.time()
        return max(
            (
                state["reset"] - now
                for state in self.limits.values()
                if state["remaining"] <= 1 and state["reset"] > now
            ),
            default=0.0,
        )

    def _is_rate_limited(self, headers):
        """Return True if any reported limit has at most one request left.

        ``headers`` is accepted for call compatibility and is not used.
        """
        return any(
            limit["remaining"] <= 1 and limit["limit"] > 0
            for limit in self.limits.values()
        )

    def _get_reset_timestamp(self):
        """Return the reset timestamp of the most restrictive rate limit.

        That is the latest reset time among the limits that are reported
        (limit > 0) and have at most one request left, which matches the
        moment ``_calculate_required_delay`` waits for.  When no limit is
        exhausted the plain ``requests`` reset is returned.
        """
        exhausted_resets = [
            state["reset"]
            for state in self.limits.values()
            if state["limit"] > 0 and state["remaining"] <= 1
        ]
        if exhausted_resets:
            return max(exhausted_resets)
        return self.limits["requests"]["reset"]

Please let me know if you run into any problems!

Thanks
Seth Kneeland

5 Likes