Rate Limiter Project
The Rate Limiter Project is a Python module designed to manage rate limits for API calls. It helps users stay below their assigned rate limits, preventing 429 errors.
Overview
The project consists of a single module, rate_limiter.py, which provides a RateLimitManager class. This class manages rate limits across multiple clients using shared state.
Features
- Manages rate limits for API calls
- Prevents 429 errors by staying below assigned rate limits
- Supports multiple rate limits (requests, requests-day, requests-hour)
- Calculates required delay based on active rate limits
- Logs current rate limit state
Usage
To use the Rate Limiter Project, import the RateLimitManager class and create an instance:
from rate_limiter import RateLimitManager
rate_limiter = RateLimitManager()
You can then use the safe_api_call method to make API calls:
response = rate_limiter.safe_api_call(
model="DeepSeek-R1",
messages=[
{"role": "system", "content": "You are a helpful assistant"},
{"role": "user", "content": "Hello"},
],
temperature=0.1,
top_p=0.1,
)
Env Configuration
The RateLimitManager class uses the following env variables:
SAMBANOVA_API_KEY: The API key for the Sambanova API
DEBUG_MODE: Set to 1 to enable debug output, or 0 to disable it (defaults to 0 when unset)
Dependencies
The Rate Limiter Project depends on the following libraries:
httpx
openai
threading
time
datetime
import os
import time
import logging
import threading
import httpx
from datetime import datetime
from typing import Dict, Union
from collections import defaultdict
from openai import OpenAI
class HeaderCaptureTransport(httpx.HTTPTransport):
    """HTTP transport that remembers the headers of the most recent response.

    After each request, ``last_headers`` holds the response headers returned
    by the underlying ``httpx.HTTPTransport``. It is ``None`` before the first
    request and whenever the most recent request raised instead of returning
    a response.
    """

    def __init__(self, *args, **kwargs):
        """Create the transport.

        Any positional or keyword arguments are forwarded unchanged to
        ``httpx.HTTPTransport``; calling with no arguments behaves as before.
        """
        super().__init__(*args, **kwargs)
        self.last_headers = None

    def handle_request(self, request):
        """Send ``request`` and record the headers of the response.

        Returns the response produced by the parent transport. Exceptions
        raised by the parent transport propagate unchanged.
        """
        # Clear first so that a request which raises cannot leave the headers
        # of an earlier, unrelated response behind for the caller to read.
        self.last_headers = None
        response = super().handle_request(request)
        self.last_headers = response.headers
        return response
class RateLimitManager:
    """Pace chat-completion calls using the rate-limit headers of each response.

    Every response's ``x-ratelimit-*`` headers are parsed into ``self.limits``
    (one entry each for ``requests``, ``requests-day`` and ``requests-hour``).
    Before a call is sent, the manager sleeps until the reset time of any
    counter that is nearly exhausted.

    The lock is class-level, so calls made through any instance in the process
    are serialised. The tracked limits themselves live on the instance;
    ``_shared_state`` is defined but not used by this class.

    NOTE(review): the ``reset`` headers are treated as Unix epoch seconds
    (they are compared against ``time.time()``) - confirm against the API
    documentation.
    """

    _shared_state = defaultdict(dict)
    _lock = threading.Lock()

    _LIMIT_NAMES = ("requests", "requests-day", "requests-hour")
    _LIMIT_FIELDS = ("limit", "remaining", "reset")
    _MAX_RETRIES = 5
    _TIME_FORMAT = "%Y-%m-%d %H:%M:%S"

    def __init__(self):
        """Build the API client and start with empty (all-zero) limits.

        Raises:
            KeyError: if the ``SAMBANOVA_API_KEY`` environment variable is
                not set.
        """
        self.logger = logging.getLogger("RateLimitManager")
        self.transport = HeaderCaptureTransport()
        self.client = OpenAI(
            api_key=os.environ["SAMBANOVA_API_KEY"],
            base_url="https://api.sambanova.ai/v1",
            http_client=httpx.Client(transport=self.transport),
        )
        self.limits = {
            name: {"limit": 0, "remaining": 0, "reset": 0}
            for name in self._LIMIT_NAMES
        }
        self.debug_mode = os.environ.get("DEBUG_MODE", "0") == "1"

    def safe_api_call(self, *args, **kwargs):
        """Make a chat-completion call with coordinated rate limiting.

        All arguments are forwarded to ``client.chat.completions.create``.
        A successful response is always returned to the caller; if it used up
        the remaining quota, the wait happens at the start of the *next* call.

        A failure whose ``status_code`` attribute is 429 is retried (up to
        ``_MAX_RETRIES`` attempts in total) after sleeping until the reported
        reset time, or after a short exponential backoff when no reset time
        is known.

        Returns:
            The response object returned by the client.

        Raises:
            Exception: whatever the client raised, re-raised unchanged - at
                once for non-429 failures, or after the final attempt for
                429 failures.
        """
        with self._lock:
            required_delay = self._calculate_required_delay()
            if required_delay > 0:
                if self.debug_mode:
                    print(
                        f"⏳ Approaching rate limit - sleeping {required_delay:.2f}s"
                    )
                time.sleep(required_delay)

            for attempt in range(1, self._MAX_RETRIES + 1):
                # Forget headers from any earlier call so that what is read
                # below can only belong to this attempt.
                self.transport.last_headers = None
                try:
                    response = self.client.chat.completions.create(*args, **kwargs)
                except Exception as exc:
                    if self.debug_mode:
                        print(f"🚨 API call failed: {str(exc)}")
                    rate_limited = getattr(exc, "status_code", None) == 429
                    if not rate_limited or attempt == self._MAX_RETRIES:
                        raise
                    # The transport still saw the 429 response, so its headers
                    # (when present) say when the limit resets.
                    self._update_rate_limits(self.transport.last_headers)
                    self._log_current_state(self.transport.last_headers)
                    reset_timestamp = self._get_reset_timestamp()
                    print(
                        f"🔴 Rate limit exceeded, retrying at {self._format_timestamp(reset_timestamp)}"
                    )
                    # Fall back to 1s, 2s, 4s, ... when no reset time is known
                    # so the retry is never immediate.
                    backoff = float(2 ** (attempt - 1))
                    time.sleep(self._calculate_required_delay() or backoff)
                    continue

                headers = self.transport.last_headers
                if self.debug_mode:
                    if headers is None:
                        print("⚠️ No headers received in response")
                    else:
                        print("RAW RESPONSE HEADERS:")
                        for key, value in headers.items():
                            print(f"{key}: {value}")
                    print("API RESPONSE:")
                    print(response)

                self._update_rate_limits(headers)
                self._log_current_state(headers)
                if self.debug_mode:
                    print("✅ Valid API response received")
                return response

    @staticmethod
    def _header_int(headers, key) -> int:
        """Read ``headers[key]`` as an int; missing or malformed values give 0."""
        raw = headers.get(key, 0)
        try:
            return int(raw)
        except (TypeError, ValueError):
            pass
        try:
            # Accept values such as "1700000000.5" by truncating the fraction.
            return int(float(raw))
        except (TypeError, ValueError, OverflowError):
            return 0

    @classmethod
    def _format_timestamp(cls, timestamp) -> str:
        """Format epoch seconds as a ``YYYY-MM-DD HH:MM:SS`` UTC string."""
        return time.strftime(cls._TIME_FORMAT, time.gmtime(timestamp))

    def _update_rate_limits(self, headers):
        """Refresh ``self.limits`` from a response's rate-limit headers.

        The ``requests`` entry is always rewritten (absent headers count as
        0). The ``requests-day`` and ``requests-hour`` entries are rewritten
        only when their ``x-ratelimit-limit-*`` header is present and are
        reset to zeros otherwise. When ``headers`` is ``None`` nothing is
        changed, because no information was received.
        """
        if headers is None:
            return
        for name, state in self.limits.items():
            reported = name == "requests" or f"x-ratelimit-limit-{name}" in headers
            for field in self._LIMIT_FIELDS:
                if reported:
                    state[field] = self._header_int(
                        headers, f"x-ratelimit-{field}-{name}"
                    )
                else:
                    state[field] = 0

    def _log_current_state(self, headers):
        """Print the tracked limits when debug mode is on.

        ``headers`` is accepted for interface compatibility and is not read.
        """
        if not self.debug_mode:
            return

        labels = (
            ("requests", "Requests"),
            ("requests-day", "Requests (day)"),
            ("requests-hour", "Requests (hour)"),
        )
        print("\n📊 Rate Limit State:")
        # The day/hour counters are shown only while the API reports them.
        shown = [
            (name, label)
            for name, label in labels
            if name == "requests" or self.limits[name]["limit"] > 0
        ]
        for name, label in shown:
            state = self.limits[name]
            print(f" {label}: {max(0, state['remaining'])}/{state['limit']}")
        for name, _ in shown:
            reset = self.limits[name]["reset"]
            if reset > 0:
                print(f" Reset ({name}): {self._format_timestamp(reset)}")

    def _calculate_required_delay(self) -> float:
        """Return the seconds to wait before the next call (0.0 if none).

        A counter demands a wait when at most one request remains and its
        reset time is still in the future; the longest such wait wins.
        """
        now = time.time()
        waits = [
            state["reset"] - now
            for state in self.limits.values()
            if state["remaining"] <= 1 and state["reset"] > now
        ]
        return max(waits, default=0.0)

    def _is_rate_limited(self, headers) -> bool:
        """Return True if any reported limit has at most one request left.

        ``headers`` is accepted for interface compatibility and is not read.
        """
        return any(
            state["limit"] > 0 and state["remaining"] <= 1
            for state in self.limits.values()
        )

    def _get_reset_timestamp(self):
        """Return the reset time of the exhausted limit to wait for.

        The hourly limit is checked first, then the daily one; if neither is
        both reported and exhausted, the per-request reset time is returned.
        """
        for name in ("requests-hour", "requests-day"):
            state = self.limits[name]
            if state["limit"] > 0 and state["remaining"] <= 1:
                return state["reset"]
        return self.limits["requests"]["reset"]
Please let me know if you run into any problems!
Thanks
Seth Kneeland