Performance Tuning Guide
This guide covers optimization strategies and best practices for getting the best performance out of the Chess.com API client.
Connection Optimization
Connection Pooling
Configure connection pooling for optimal performance:
import aiohttp
from chess_com_api import ChessComClient
async def create_optimized_client():
    """Build a ChessComClient on top of a tuned aiohttp session.

    The session combines a pooled TCP connector (with DNS caching and
    connection reuse) and explicit timeouts for each phase of a request.

    Returns:
        A ChessComClient constructed with the configured session.
    """
    pool_settings = {
        "limit": 100,  # Maximum concurrent connections
        "ttl_dns_cache": 300,  # DNS cache TTL in seconds
        "use_dns_cache": True,  # Enable DNS caching
        "force_close": False,  # Keep connections alive
        "enable_cleanup_closed": True,
    }
    timeout_settings = {
        "total": 30,  # Total request timeout
        "connect": 10,  # Connection timeout
        "sock_read": 10,  # Socket read timeout
        "sock_connect": 10,  # Socket connect timeout
    }

    # Connector first, then timeout, then the session that owns both.
    session = aiohttp.ClientSession(
        connector=aiohttp.TCPConnector(**pool_settings),
        timeout=aiohttp.ClientTimeout(**timeout_settings),
    )
    return ChessComClient(session=session)
DNS Optimization
import socket
# Resolve host names through an explicitly chosen pair of nameservers
# instead of whatever the system resolver is configured with.
resolver = aiohttp.AsyncResolver(nameservers=["8.8.8.8", "8.8.4.4"])

# Hand the custom resolver to the connector and restrict it to IPv4
# for consistent performance.
connector = aiohttp.TCPConnector(family=socket.AF_INET, resolver=resolver)
Concurrent Operations
Batch Processing
Process multiple requests efficiently:
import asyncio
from typing import List, Optional
async def batch_process(
    usernames: List[str],
    batch_size: int = 50
) -> List[Optional[dict]]:
    """Fetch player data for ``usernames`` in fixed-size concurrent batches.

    Args:
        usernames: Usernames to look up.
        batch_size: Maximum number of requests issued concurrently per
            batch. Must be at least 1.

    Returns:
        One entry per username, in input order: the value returned by
        ``client.get_player``, or ``None`` when that request failed.

    Raises:
        ValueError: If ``batch_size`` is less than 1.
    """
    # A negative step would make range() empty and silently drop every
    # username, so reject bad sizes up front.
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    results: List[Optional[dict]] = []
    async with ChessComClient() as client:
        for start in range(0, len(usernames), batch_size):
            if start:
                # Small delay *between* batches only; sleeping before every
                # batch except the first avoids a pointless trailing delay.
                await asyncio.sleep(0.1)

            batch = usernames[start:start + batch_size]
            batch_results = await asyncio.gather(
                *(client.get_player(username) for username in batch),
                return_exceptions=True,
            )

            # With return_exceptions=True failures come back as values.
            # Check BaseException so a cancelled request (CancelledError is
            # not an Exception subclass) is not mistaken for player data.
            results.extend(
                None if isinstance(result, BaseException) else result
                for result in batch_results
            )
    return results
Concurrent Request Management
from dataclasses import dataclass
from typing import Awaitable, Generic, TypeVar
T = TypeVar('T')


@dataclass
class RequestManager(Generic[T]):
    """Run awaitables with bounded concurrency and a per-operation timeout.

    ``T`` is the type each operation produces when awaited.
    """
    # Maximum number of operations allowed to run at the same time.
    max_concurrent: int = 50
    # Seconds a single operation may take before it is abandoned.
    timeout: float = 30.0

    async def execute(
        self,
        # Quoted so the annotation is not evaluated when the class is
        # defined; the parameter itself is not used by this method.
        client: "ChessComClient",
        operations: List[Awaitable[T]]
    ) -> List[Optional[T]]:
        """Await every operation, at most ``max_concurrent`` at a time.

        Args:
            client: Accepted for call-site symmetry; not used here.
            operations: Awaitables (for example coroutine objects) to run.

        Returns:
            One entry per operation, in input order: its result, or
            ``None`` if it timed out or raised an ``Exception``.
        """
        semaphore = asyncio.Semaphore(self.max_concurrent)

        async def controlled_request(operation: Awaitable[T]) -> Optional[T]:
            # The semaphore caps how many operations are in flight at once.
            async with semaphore:
                try:
                    return await asyncio.wait_for(
                        operation,
                        timeout=self.timeout
                    )
                except asyncio.TimeoutError:
                    return None
                except Exception as e:
                    print(f"Error processing request: {e}")
                    return None

        return await asyncio.gather(
            *(controlled_request(op) for op in operations)
        )
# Usage
async def get_multiple_players(usernames: List[str]):
    """Fetch several players concurrently through a RequestManager.

    Args:
        usernames: Usernames to look up.

    Returns:
        Whatever ``RequestManager.execute`` returns for the lookups.
    """
    async with ChessComClient() as client:
        manager = RequestManager[dict](max_concurrent=50)
        # Build one pending lookup per username; the manager awaits them.
        pending = [client.get_player(name) for name in usernames]
        return await manager.execute(client, pending)
Memory Optimization
Generator-based Processing
Use generators for large datasets:
async def process_large_dataset(usernames: List[str]):
    """Fetch and process players one at a time.

    Each player is handed to ``process_player`` as soon as it is fetched,
    so the full result set is never collected. Lookups that raise are
    reported with ``print`` and skipped.

    Args:
        usernames: Usernames to look up, processed in order.
    """
    async def player_generator():
        # A single client serves every lookup made by this generator.
        async with ChessComClient() as client:
            for username in usernames:
                try:
                    yield await client.get_player(username)
                except Exception as e:
                    print(f"Error processing {username}: {e}")

    async for fetched in player_generator():
        # Process each player with minimal memory usage
        process_player(fetched)
Memory-Efficient Bulk Operations
from itertools import islice
async def memory_efficient_processing(
    usernames: List[str],
    chunk_size: int = 1000
):
    """Process ``usernames`` chunk by chunk, storing results as they arrive.

    Each chunk is passed to ``batch_process`` and its truthy results are
    handed to ``process_and_store`` before the next chunk is fetched.

    Args:
        usernames: Usernames to look up.
        chunk_size: Number of usernames per chunk. Must be at least 1.

    Raises:
        ValueError: If ``chunk_size`` is less than 1.
    """
    # A size of 0 would yield no chunks and silently process nothing.
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")

    def chunks(data: List[str], size: int):
        """Iterate over successive lists of at most ``size`` items."""
        iterator = iter(data)
        # Two-argument iter(): keep calling the lambda until it returns
        # the [] sentinel, i.e. until the underlying iterator is exhausted.
        return iter(lambda: list(islice(iterator, size)), [])

    # No client is opened here: batch_process creates its own.
    for chunk in chunks(usernames, chunk_size):
        chunk_results = await batch_process(chunk)

        # Process results immediately to free memory
        for result in chunk_results:
            if result:
                process_and_store(result)

        # Drop the reference so the chunk's results can be reclaimed.
        chunk_results = None

        # Yield control to the event loop between chunks.
        await asyncio.sleep(0)
Cache Optimization
Simple Cache Implementation
import time
from typing import Dict, Any, Optional
from dataclasses import dataclass
@dataclass
class CacheEntry:
    """A cached value plus the data needed to decide when it expires."""
    # The cached payload.
    data: Any
    # time.time() reading taken when the entry was stored.
    timestamp: float
    # Lifetime in seconds, measured from ``timestamp``.
    ttl: float


class APICache:
    """In-memory key/value cache whose entries expire after a TTL."""

    def __init__(self, default_ttl: float = 300):
        """Create an empty cache.

        Args:
            default_ttl: Lifetime in seconds for entries stored without
                an explicit ``ttl``.
        """
        self.cache: Dict[str, CacheEntry] = {}
        self.default_ttl = default_ttl

    def get(self, key: str) -> Optional[Any]:
        """Return the cached value for ``key``, or ``None``.

        ``None`` is returned when the key is absent or its entry has
        expired; an expired entry is removed as a side effect.
        """
        entry = self.cache.get(key)
        if entry is None:
            return None
        if time.time() - entry.timestamp < entry.ttl:
            return entry.data
        # Expired: evict so the stale value is not kept around.
        del self.cache[key]
        return None

    def set(
        self,
        key: str,
        value: Any,
        ttl: Optional[float] = None
    ) -> None:
        """Store ``value`` under ``key``.

        Args:
            key: Cache key.
            value: Value to cache.
            ttl: Lifetime in seconds; ``None`` selects ``default_ttl``.
                An explicit ``0`` is honoured and yields an entry that is
                already expired.
        """
        self.cache[key] = CacheEntry(
            data=value,
            timestamp=time.time(),
            # Compare against None rather than using ``or`` so that a
            # caller-supplied ttl of 0 is not replaced by the default.
            ttl=self.default_ttl if ttl is None else ttl,
        )

    def cleanup(self) -> None:
        """Remove every entry whose TTL has elapsed."""
        now = time.time()
        # Collect keys first; deleting while iterating the dict is an error.
        expired = [
            key for key, entry in self.cache.items()
            if now - entry.timestamp >= entry.ttl
        ]
        for key in expired:
            del self.cache[key]
# Usage with client
class CachedChessComClient(ChessComClient):
    """ChessComClient that serves repeated player lookups from an APICache."""

    def __init__(self, *args, **kwargs):
        """Forward all arguments to ChessComClient and attach a cache."""
        super().__init__(*args, **kwargs)
        self.cache = APICache()

    async def get_player(self, username: str):
        """Return the player for ``username``, using the cache when possible.

        A cache miss falls through to the parent ``get_player`` and the
        result is stored under ``player:<username>``.
        """
        cache_key = f"player:{username}"
        cached = self.cache.get(cache_key)
        # Test against None, not truthiness: a cached but falsy payload
        # (such as an empty dict) is still a hit and must not be refetched.
        if cached is not None:
            return cached

        result = await super().get_player(username)
        self.cache.set(cache_key, result)
        return result
Performance Monitoring
Request Timing
import time
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from typing import Dict, List
@dataclass
class PerformanceMetrics:
    """Accumulates request counts and durations, overall and per endpoint."""
    total_requests: int = 0
    total_errors: int = 0
    response_times: List[float] = field(default_factory=list)
    endpoint_times: Dict[str, List[float]] = field(default_factory=dict)

    @property
    def average_response_time(self) -> float:
        """Mean of all recorded durations, or 0.0 when none are recorded."""
        recorded = self.response_times
        return sum(recorded) / len(recorded) if recorded else 0.0

    def add_timing(self, endpoint: str, duration: float):
        """Record one request of ``duration`` seconds against ``endpoint``."""
        self.total_requests += 1
        self.response_times.append(duration)
        # setdefault creates the endpoint's list on first use.
        self.endpoint_times.setdefault(endpoint, []).append(duration)

    def print_stats(self):
        """Print the totals followed by the average time per endpoint."""
        print(f"Total Requests: {self.total_requests}")
        print(f"Total Errors: {self.total_errors}")
        print(f"Average Response Time: {self.average_response_time:.3f}s")
        print("\nEndpoint Statistics:")
        for endpoint in self.endpoint_times:
            samples = self.endpoint_times[endpoint]
            avg_time = sum(samples) / len(samples)
            print(f"{endpoint}: {avg_time:.3f}s average")
class MonitoredChessComClient(ChessComClient):
    """ChessComClient that records timing and error metrics per request."""

    def __init__(self, *args, **kwargs):
        """Forward all arguments to ChessComClient and attach metrics."""
        super().__init__(*args, **kwargs)
        self.metrics = PerformanceMetrics()

    @asynccontextmanager
    async def _timed_request(self, endpoint: str):
        """Time the enclosed block and record it under ``endpoint``.

        The duration is recorded whether or not the block raises. A block
        that raises an ``Exception`` also increments ``total_errors``, and
        the exception is re-raised unchanged.
        """
        # perf_counter is a monotonic, high-resolution clock, so measured
        # durations are unaffected by system clock adjustments.
        start_time = time.perf_counter()
        try:
            yield
        except Exception:
            self.metrics.total_errors += 1
            raise
        finally:
            duration = time.perf_counter() - start_time
            self.metrics.add_timing(endpoint, duration)

    async def get_player(self, username: str):
        """Fetch a player via the parent client while recording its timing."""
        # NOTE(review): the metric key includes the username, so every
        # distinct user gets its own "endpoint" entry - confirm intended.
        async with self._timed_request(f"get_player:{username}"):
            return await super().get_player(username)
Best Practices
1. Connection Management
- Reuse client instances
- Configure appropriate timeouts
- Use connection pooling
- Enable keep-alive when appropriate
2. Concurrency
- Use appropriate batch sizes
- Implement rate limiting
- Handle errors gracefully
- Monitor memory usage
3. Memory Management
- Process large datasets in chunks
- Use generators for streaming
- Clear unused references
- Monitor memory usage
4. Caching
- Cache frequently accessed data
- Implement TTL for cache entries
- Clean up the cache regularly
- Monitor cache hit rates
5. Error Handling
- Implement retries with backoff
- Handle timeouts appropriately
- Log errors for monitoring
- Monitor error rates
Performance Checklist
- Client Configuration
  - Configure connection pooling
  - Set appropriate timeouts
  - Enable DNS caching
  - Configure retry strategy
- Request Optimization
  - Batch similar requests
  - Implement concurrent processing
  - Use connection pooling
  - Enable keep-alive when appropriate
- Memory Management
  - Process large datasets in chunks
  - Clear unused references
  - Monitor memory usage
  - Use generators for large datasets
- Error Handling
  - Implement retry strategy
  - Handle timeouts
  - Log errors appropriately
  - Monitor error rates
- Monitoring
  - Track request times
  - Monitor error rates
  - Track memory usage
  - Monitor cache performance