Microservices Monitoring
Monitor distributed microservices architectures
Service Health Monitoring
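Have each microservice report its own health using auto-provisioning: the `?create=1` parameter in the ping URL lets the monitor be created on the first ping, so you don't have to set one up for each service in advance: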
# health_monitor.py
import requests
import os
import socket
from urllib.parse import quote

# An empty SERVICE_NAME is treated like an unset one (matches the Node.js sample).
SERVICE_NAME = os.getenv('SERVICE_NAME') or socket.gethostname()
PROJECT_KEY = os.getenv('TELEMETRY_PROJECT_KEY')
# Quote both path segments so a name containing "/", spaces, etc. cannot alter
# the URL path. Without a project key there is nothing valid to ping, so leave
# the URL unset instead of silently building ".../ping/None/...".
MONITOR_URL = (
    f"https://telemetry.host/ping/{quote(PROJECT_KEY, safe='')}"
    f"/auto/{quote(SERVICE_NAME, safe='')}?create=1"
    if PROJECT_KEY
    else None
)
def report_health(status="success", message="", metadata=None):
    """Report service health to monitoring (best effort, never raises).

    Args:
        status: Status string sent to the monitor, e.g. "success" or "error".
        message: Human-readable detail for this report.
        metadata: Optional dict merged into the report's metadata; its keys
            override the default "service" entry.

    Failures (unreachable monitor, timeout, HTTP 4xx/5xx reply, missing
    monitor URL) are printed so the periodic loop keeps running.
    """
    if not MONITOR_URL:
        print("Failed to report health: monitoring URL is not configured")
        return
    try:
        data = {
            "status": status,
            "message": message,
            "metadata": {"service": SERVICE_NAME, **(metadata or {})},
        }
        response = requests.post(MONITOR_URL, json=data, timeout=5)
        # requests only raises for transport errors; surface HTTP error replies too.
        response.raise_for_status()
    except Exception as e:
        print(f"Failed to report health: {e}")
# Call periodically: run the health checks and report the outcome forever.
if __name__ == "__main__":
    import time

    interval_seconds = 5 * 60  # Every 5 minutes

    while True:
        try:
            # Check service health
            # ...your health checks...
            report_health("success", f"{SERVICE_NAME} healthy")
        except Exception as exc:
            # Any exception raised by the checks is reported as an error.
            report_health("error", str(exc))
        time.sleep(interval_seconds)
FastAPI Health Endpoint
# main.py
from fastapi import FastAPI, BackgroundTasks
import requests
import os
app = FastAPI()
# Full URL of this service's monitor; when unset, reporting is skipped.
MONITOR_URL = os.getenv('TELEMETRY_MONITOR_URL')


def send_health_check():
    """Report a successful health check to the monitoring endpoint.

    Best effort: this runs as a request background task and as a scheduled
    job, so it never raises. A monitoring outage must not fail /health.
    Failures, including HTTP 4xx/5xx replies, are printed instead.
    """
    if not MONITOR_URL:
        # Monitoring is not configured; nothing to report to.
        return
    try:
        response = requests.post(MONITOR_URL, json={"status": "success"}, timeout=2)
        # requests only raises for transport errors; surface HTTP error replies too.
        response.raise_for_status()
    except Exception as exc:
        # Don't fail request if monitoring fails, but leave a trace.
        print(f"Failed to report health: {exc}")
@app.get("/health")
async def health_check(background_tasks: BackgroundTasks):
    """Health check endpoint that also reports to monitoring.

    The monitoring report is queued as a background task instead of being
    sent inline, so this endpoint answers without waiting on the monitor.
    """
    body = {"status": "healthy"}
    # Queue the report; it is not sent as part of building this response.
    background_tasks.add_task(send_health_check)
    return body
# Periodic health reporting: push a report every 5 minutes, independently of
# whether anything calls /health.
from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()
scheduler.add_job(func=send_health_check, trigger='interval', minutes=5)
scheduler.start()
Node.js Service Monitoring
// health-monitor.js
const axios = require('axios');
const { hostname } = require('os');

const MONITOR_URL = process.env.TELEMETRY_MONITOR_URL;
const SERVICE_NAME = process.env.SERVICE_NAME || hostname();

/**
 * Send one health report to the monitoring endpoint.
 * Best effort: any failure is logged to stderr and never rethrown.
 *
 * @param {string} status   Report status, e.g. 'success' or 'error'.
 * @param {string} message  Human-readable detail.
 * @param {object} metadata Extra fields merged after the default `service` key.
 */
async function reportHealth(status = 'success', message = '', metadata = {}) {
  try {
    const payload = {
      status,
      message,
      metadata: { service: SERVICE_NAME, ...metadata },
    };
    await axios.post(MONITOR_URL, payload, { timeout: 5000 });
  } catch (err) {
    console.error('Failed to report health:', err.message);
  }
}
// Report health every 5 minutes
const REPORT_INTERVAL_MS = 5 * 60 * 1000;

// One cycle: run the health checks, then report success, or report the error
// raised by a failing check.
async function runHealthCycle() {
  try {
    // Your health checks here
    await reportHealth('success', `${SERVICE_NAME} healthy`);
  } catch (error) {
    await reportHealth('error', error.message);
  }
}

setInterval(runHealthCycle, REPORT_INTERVAL_MS);

module.exports = { reportHealth };
Service-to-Service Communication
Monitor critical service dependencies by wrapping each outbound call in a decorator that reports whether the call succeeded or failed:
# service_client.py
import requests
from functools import wraps
import os
MONITOR_URL = os.getenv('TELEMETRY_MONITOR_URL')


def _report_service_call(status, message):
    """Send one call-outcome report to the monitor (best effort, never raises).

    Any failure (monitor unreachable, timeout, URL unset) is printed and
    swallowed, so monitoring can never change the outcome of the wrapped call.
    """
    if not MONITOR_URL:
        return
    try:
        requests.post(
            MONITOR_URL,
            json={"status": status, "message": message},
            timeout=2,
        )
    except Exception as exc:
        print(f"Failed to report service call: {exc}")


def monitor_service_call(service_name):
    """Decorator to monitor service-to-service calls.

    Args:
        service_name: Name of the downstream service, used in report messages.

    Returns:
        A decorator whose wrapper reports "success" or "error" for every call.
        The wrapped function's return value and exceptions pass through
        unchanged.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            try:
                result = func(*args, **kwargs)
            except Exception as e:
                # Report failed call, then re-raise the caller's original exception.
                _report_service_call(
                    "error", f"Call to {service_name} failed: {str(e)}"
                )
                raise
            # Report only after func returned, outside the try, so a monitoring
            # problem can't turn a successful call into a reported failure.
            _report_service_call("success", f"Call to {service_name} succeeded")
            return result
        return wrapper
    return decorator
# Usage
@monitor_service_call("payment-service")
def call_payment_service(order_id, timeout=10):
    """Ask the payment service to process *order_id* and return its JSON reply.

    Args:
        order_id: Identifier of the order to process.
        timeout: Seconds to wait for the connection or for response data
            before requests raises Timeout. Without it, the call could hang
            indefinitely.

    Raises:
        requests.HTTPError: on a 4xx/5xx reply. This lets the decorator report
            it as a failed call instead of a success.
    """
    response = requests.post(
        "http://payment-service/process",
        json={"order_id": order_id},
        timeout=timeout,
    )
    response.raise_for_status()
    return response.json()
Message Queue Consumer
Monitor message processing by reporting the outcome of every message a RabbitMQ (pika) consumer handles:
# consumer.py
import pika
import requests
import os
import time
MONITOR_URL = os.getenv('TELEMETRY_MONITOR_URL')
QUEUE_NAME = 'orders'


def _report_message_outcome(status, message, method, duration=None):
    """Send one message-processing report to the monitor (best effort).

    Failures are printed and swallowed. Reporting must never decide whether a
    message is acked or nacked, and must never raise out of the pika callback.
    """
    if not MONITOR_URL:
        return
    try:
        payload = {
            "status": status,
            "message": message,
            "metadata": {
                "queue": QUEUE_NAME,
                "routing_key": method.routing_key,
            },
        }
        if duration is not None:
            payload["duration"] = duration
        requests.post(MONITOR_URL, json=payload, timeout=2)
    except Exception as exc:
        print(f"Failed to report to monitor: {exc}")


def process_message(ch, method, properties, body):
    """Process message and report to monitoring.

    Used as the pika ``on_message_callback``. The message is acked when
    processing succeeds and nacked when it raises. Only the processing outcome
    drives ack/nack; the monitoring report is sent afterwards and cannot
    cause a redelivery.
    """
    start_time = time.time()
    try:
        # Process message
        # ... your processing logic ...
        pass
    except Exception as e:
        # pika's basic_nack requeues by default, so a message that always fails
        # will be redelivered repeatedly; consider requeue=False plus a
        # dead-letter exchange for poison messages.
        ch.basic_nack(delivery_tag=method.delivery_tag)
        _report_message_outcome(
            "error", f"Message processing failed: {str(e)}", method
        )
        return
    duration = time.time() - start_time
    ch.basic_ack(delivery_tag=method.delivery_tag)
    _report_message_outcome(
        "success", "Message processed", method, duration=int(duration)
    )
# Start consuming
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.basic_consume(queue=QUEUE_NAME, on_message_callback=process_message)
try:
    channel.start_consuming()
except KeyboardInterrupt:
    # Ctrl+C: stop the consumer loop so the connection can be closed cleanly.
    channel.stop_consuming()
finally:
    # Close on every exit path; skip if the broker already dropped the connection.
    if connection.is_open:
        connection.close()
gRPC Service Monitoring
# grpc_monitor.py
import grpc
from grpc import ServerInterceptor
import requests
class MonitoringInterceptor(ServerInterceptor):
    """gRPC server interceptor that reports each unary-unary call to monitoring.

    Constructor args:
        monitor_url: Endpoint that receives JSON reports via HTTP POST; when
            falsy, reporting is skipped.

    Only unary-unary RPCs are wrapped. Streaming handlers, and unknown methods
    (for which ``continuation`` returns None), pass through untouched. Reports
    are sent synchronously from the RPC thread, with a 2 s timeout.
    """

    def __init__(self, monitor_url):
        self.monitor_url = monitor_url

    def _report(self, status, message):
        """POST one report; best effort, never raises."""
        if not self.monitor_url:
            return
        try:
            requests.post(
                self.monitor_url,
                json={"status": status, "message": message},
                timeout=2,
            )
        except Exception as exc:
            print(f"Failed to report gRPC call: {exc}")

    def intercept_service(self, continuation, handler_call_details):
        """Intercept gRPC calls and report to monitoring.

        ``continuation`` returns an ``RpcMethodHandler`` (or None), not a
        callable. The unary-unary behavior is therefore wrapped and
        re-packaged with the original (de)serializers, rather than returning
        a bare function that gRPC cannot use.
        """
        handler = continuation(handler_call_details)
        if handler is None or handler.unary_unary is None:
            return handler

        behavior = handler.unary_unary
        method = handler_call_details.method

        def monitored_handler(request, context):
            try:
                response = behavior(request, context)
            except Exception as e:
                # Report failed call, then let gRPC turn it into an error status.
                self._report("error", f"gRPC error in {method}: {str(e)}")
                raise
            # Reported outside the try so a monitoring failure can't fail the RPC.
            self._report("success", f"gRPC call: {method}")
            return response

        return grpc.unary_unary_rpc_method_handler(
            monitored_handler,
            request_deserializer=handler.request_deserializer,
            response_serializer=handler.response_serializer,
        )
# Usage
import os
from concurrent import futures

server = grpc.server(
    futures.ThreadPoolExecutor(max_workers=10),
    interceptors=[MonitoringInterceptor(os.getenv('TELEMETRY_MONITOR_URL'))],
)
# Register your servicers, then e.g.:
#   server.add_insecure_port("[::]:50051"); server.start(); server.wait_for_termination()
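Sidecar Monitoring (Kubernetes)
Run a small sidecar container in the service's pod that probes its `/health` endpoint every 5 minutes and reports the result. Nothing here is Istio-specific; the same pod spec works with or without a service mesh: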
# monitoring-sidecar.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: myservice
spec:
  # apps/v1 Deployments require a selector that matches the pod template labels.
  selector:
    matchLabels:
      app: myservice
  template:
    metadata:
      labels:
        app: myservice
    spec:
      containers:
        - name: myservice
          image: myservice:latest
        - name: health-monitor
          image: curlimages/curl:latest
          env:
            - name: MONITOR_URL
              valueFrom:
                secretKeyRef:
                  name: telemetry-secrets
                  key: monitor-url
            - name: SERVICE_NAME
              value: "myservice"
          command:
            - /bin/sh
            - -c
            - |
              while true; do
                # Probe with curl (this image's tool); -f makes HTTP errors exit non-zero,
                # --max-time keeps a hung service from stalling the loop.
                if curl -fsS -o /dev/null --max-time 10 http://localhost:8080/health; then
                  STATUS=success; STATE=healthy
                else
                  STATUS=error; STATE=unhealthy
                fi
                # curl -d defaults to form encoding, so declare the JSON body explicitly.
                curl -fsS -o /dev/null --max-time 10 -X POST "$MONITOR_URL" \
                  -H "Content-Type: application/json" \
                  -d "{\"status\":\"$STATUS\",\"message\":\"$SERVICE_NAME $STATE\"}" \
                  || echo "Failed to report health" >&2
                sleep 300
              done
API Gateway Monitoring
Monitor gateway performance with an HTTP middleware that reports slow requests (over 5 seconds) and unhandled errors:
# gateway_monitor.py
from fastapi import FastAPI, Request
import httpx
import time
import requests
import os
app = FastAPI()
# Full URL of the gateway's monitor; when unset, reporting is skipped.
MONITOR_URL = os.getenv('TELEMETRY_MONITOR_URL')


async def _report_to_monitor(payload):
    """POST *payload* to the monitor without blocking the event loop.

    Best effort: any failure (monitor down, timeout, HTTP error reply) is
    printed and swallowed, so monitoring can never break a proxied request.
    """
    if not MONITOR_URL:
        return
    try:
        async with httpx.AsyncClient(timeout=2) as client:
            response = await client.post(MONITOR_URL, json=payload)
            response.raise_for_status()
    except Exception as exc:
        print(f"Failed to report to monitor: {exc}")


@app.middleware("http")
async def monitor_requests(request: Request, call_next):
    """Middleware to monitor all requests through gateway.

    Requests slower than 5 seconds are reported as warnings and unhandled
    exceptions as errors; fast successful requests are not reported.
    Reports use an async HTTP client, so other requests keep being served
    while a report is in flight.
    """
    # perf_counter is monotonic: durations are immune to wall-clock changes.
    start_time = time.perf_counter()
    try:
        response = await call_next(request)
    except Exception as e:
        # Report failed request, then let FastAPI's error handling take over.
        await _report_to_monitor({
            "status": "error",
            "message": f"Gateway error: {str(e)}",
            "metadata": {
                "path": request.url.path,
                "method": request.method,
            },
        })
        raise
    duration = time.perf_counter() - start_time
    if duration > 5:  # Only report slow requests
        await _report_to_monitor({
            "status": "warning",
            "message": f"Slow request: {request.url.path}",
            "duration": int(duration),
        })
    return response
Circuit Breaker Monitoring
# circuit_breaker.py
import requests
from enum import Enum
import time
class CircuitState(Enum):
    """States of a CircuitBreaker; values are the strings sent in reports."""

    CLOSED = "closed"        # calls pass through
    OPEN = "open"            # calls are rejected without running
    HALF_OPEN = "half_open"  # trial calls allowed after the recovery timeout


class CircuitOpenError(Exception):
    """Raised by ``CircuitBreaker.call`` while the circuit is OPEN.

    Subclasses Exception, so existing ``except Exception`` handlers still
    catch it, while new callers can handle rejection specifically.
    """


class CircuitBreaker:
    """Circuit breaker that reports its state changes to a monitoring endpoint.

    Args:
        service_name: Prefix for report messages.
        monitor_url: Endpoint receiving JSON reports; falsy disables reporting.
        threshold: Consecutive failures that open the circuit.
        recovery_timeout: Seconds after the last failure before an OPEN
            circuit lets a trial call through (HALF_OPEN).
    """

    def __init__(self, service_name, monitor_url, threshold=5, recovery_timeout=60):
        self.service_name = service_name
        self.monitor_url = monitor_url
        self.threshold = threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0  # consecutive failures since the last success
        self.state = CircuitState.CLOSED
        # time.monotonic() value of the last failure (None until one happens)
        self.last_failure_time = None

    def call(self, func, *args, **kwargs):
        """Execute ``func(*args, **kwargs)`` with circuit breaker protection.

        Returns:
            Whatever *func* returns.

        Raises:
            CircuitOpenError: the circuit is OPEN and the recovery timeout
                has not elapsed yet.
            Exception: anything *func* raises is re-raised after being counted.
        """
        if self.state == CircuitState.OPEN:
            # Monotonic clock: wall-clock jumps must not shorten or extend the wait.
            if time.monotonic() - self.last_failure_time >= self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                self.report("error", "Circuit breaker OPEN")
                raise CircuitOpenError("Circuit breaker is OPEN")
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.on_failure()
            raise
        self.on_success()
        return result

    def on_success(self):
        """Handle successful call: reset the count and close a HALF_OPEN circuit."""
        self.failures = 0
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.CLOSED
            self.report("success", "Circuit breaker CLOSED")

    def on_failure(self):
        """Handle failed call: count it and open the circuit when needed."""
        self.failures += 1
        self.last_failure_time = time.monotonic()
        # A failed trial call in HALF_OPEN reopens immediately; otherwise open
        # once the threshold of consecutive failures is reached.
        if self.state == CircuitState.HALF_OPEN or self.failures >= self.threshold:
            self.state = CircuitState.OPEN
            self.report("error", f"Circuit breaker OPENED after {self.failures} failures")

    def report(self, status, message):
        """Report circuit breaker state (best effort, never raises)."""
        if not self.monitor_url:
            return
        try:
            requests.post(
                self.monitor_url,
                json={
                    "status": status,
                    "message": f"{self.service_name}: {message}",
                    "metadata": {
                        "circuit_state": self.state.value,
                        "failures": self.failures
                    }
                },
                timeout=2
            )
        except Exception as exc:
            # Monitoring must never break the protected call path.
            print(f"Failed to report circuit state: {exc}")
Next Steps
- Learn about Docker health checks
- See CI/CD integration
- Explore backup monitoring