Microservices Monitoring

Monitor distributed microservice architectures.

Service Health Monitoring

Monitor individual microservices using auto-provisioning:

# health_monitor.py
import requests
import os
import socket

# Service identity: an explicit SERVICE_NAME wins, otherwise use the hostname.
SERVICE_NAME = os.environ.get('SERVICE_NAME', socket.gethostname())
# Project key read from the environment; None when the variable is unset.
PROJECT_KEY = os.environ.get('TELEMETRY_PROJECT_KEY')
# Ping endpoint assembled from the project key and the service name.
MONITOR_URL = f"https://telemetry.host/ping/{PROJECT_KEY}/auto/{SERVICE_NAME}?create=1"

def report_health(status="success", message="", metadata=None):
    """Report service health to monitoring.

    Reporting is best-effort: any failure is printed and swallowed so that
    a monitoring problem never takes the service down.

    Args:
        status: Status string sent in the payload (e.g. "success", "error").
        message: Free-form description sent alongside the status.
        metadata: Optional dict of extra fields; an empty dict is sent when
            omitted or falsy.
    """
    payload = {
        "status": status,
        "message": message,
        "metadata": metadata or {},
    }
    try:
        response = requests.post(MONITOR_URL, json=payload, timeout=5)
        # requests does not raise on 4xx/5xx by itself; without this a
        # rejected ping would be indistinguishable from a delivered one.
        response.raise_for_status()
    except Exception as e:
        # Deliberately broad: this is the boundary that keeps monitoring
        # failures (network, HTTP, serialization) away from the caller.
        print(f"Failed to report health: {e}")

# Script entry point: run the health check and report on a fixed interval
if __name__ == "__main__":
    import time

    report_interval_seconds = 300  # Every 5 minutes

    while True:
        try:
            # Check service health
            # ...your health checks...
            report_health("success", f"{SERVICE_NAME} healthy")
        except Exception as check_error:
            # A failing health check is reported instead of stopping the loop.
            report_health("error", str(check_error))

        time.sleep(report_interval_seconds)

FastAPI Health Endpoint

# main.py
from fastapi import FastAPI, BackgroundTasks
import requests
import os

app = FastAPI()

MONITOR_URL = os.getenv('TELEMETRY_MONITOR_URL')

def send_health_check():
    """Background task to report health.

    Posts a "success" status to MONITOR_URL. Request failures are logged and
    swallowed so that a monitoring outage never fails the calling request.
    """
    try:
        requests.post(MONITOR_URL, json={"status": "success"}, timeout=2)
    except requests.RequestException as exc:
        # Only request-level errors are swallowed; a bare ``except`` would
        # also hide KeyboardInterrupt/SystemExit and genuine programming bugs.
        print(f"Failed to report health: {exc}")

@app.get("/health")
async def health_check(background_tasks: BackgroundTasks):
    """Answer the health probe and queue a monitoring report.

    The report is registered as a background task on the request, so this
    handler does not wait for it before returning.
    """
    healthy_response = {"status": "healthy"}
    background_tasks.add_task(send_health_check)
    return healthy_response

# Periodic health reporting
from apscheduler.schedulers.background import BackgroundScheduler

# Schedule send_health_check on a 5-minute interval, independent of requests.
scheduler = BackgroundScheduler()
scheduler.add_job(func=send_health_check, trigger='interval', minutes=5)
scheduler.start()

Node.js Service Monitoring

// health-monitor.js
const axios = require('axios');

const MONITOR_URL = process.env.TELEMETRY_MONITOR_URL;
const SERVICE_NAME = process.env.SERVICE_NAME || require('os').hostname();

/**
 * Send a health report to the monitoring endpoint.
 *
 * Failures are logged to stderr and never thrown to the caller.
 *
 * @param {string} status   Status value sent in the payload.
 * @param {string} message  Free-form description.
 * @param {object} metadata Extra fields merged over the default `service` entry.
 */
async function reportHealth(status = 'success', message = '', metadata = {}) {
  const requestOptions = { timeout: 5000 };

  try {
    // Caller-supplied metadata is spread last, so it may override `service`.
    const payload = {
      status,
      message,
      metadata: { service: SERVICE_NAME, ...metadata }
    };
    await axios.post(MONITOR_URL, payload, requestOptions);
  } catch (err) {
    console.error('Failed to report health:', err.message);
  }
}

// Periodic reporter: runs the health check every 5 minutes
const REPORT_INTERVAL_MS = 5 * 60 * 1000;

async function runScheduledReport() {
  try {
    // Your health checks here
    await reportHealth('success', `${SERVICE_NAME} healthy`);
  } catch (err) {
    // A throwing health check is reported as an error status.
    await reportHealth('error', err.message);
  }
}

setInterval(runScheduledReport, REPORT_INTERVAL_MS);

module.exports = { reportHealth };

Service-to-Service Communication

Monitor critical service dependencies:

# service_client.py
import requests
from functools import wraps
import os

MONITOR_URL = os.getenv('TELEMETRY_MONITOR_URL')

def monitor_service_call(service_name):
    """Decorator to monitor service-to-service calls.

    After the wrapped function runs, a "success" or "error" status is posted
    to MONITOR_URL. Reporting is best-effort: a monitoring failure is printed
    and never changes the outcome of the wrapped call.

    Args:
        service_name: Name of the downstream service, used in the messages.

    Returns:
        A decorator that preserves the wrapped function's return value and
        re-raises any exception it throws.
    """
    def _report(status, message):
        # Kept apart from the call's own try/except so that a monitoring
        # outage can neither turn a successful call into a failure nor
        # replace the original exception of a failed one.
        try:
            requests.post(
                MONITOR_URL,
                json={"status": status, "message": message},
                timeout=2
            )
        except requests.RequestException as exc:
            print(f"Failed to report call to {service_name}: {exc}")

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            try:
                result = func(*args, **kwargs)
            except Exception as e:
                # Report failed call, then let the caller see the real error
                _report("error", f"Call to {service_name} failed: {str(e)}")
                raise

            # Report successful call
            _report("success", f"Call to {service_name} succeeded")
            return result
        return wrapper
    return decorator

# Usage
@monitor_service_call("payment-service")
def call_payment_service(order_id):
    """Ask the payment service to process an order.

    Args:
        order_id: Identifier sent as ``order_id`` in the JSON body.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.RequestException: On connection errors, timeouts, or a
            4xx/5xx response.
    """
    response = requests.post(
        "http://payment-service/process",
        json={"order_id": order_id},
        # Without a timeout requests waits indefinitely on a hung dependency.
        timeout=10,
    )
    # Surface HTTP errors so the monitoring decorator records them as
    # failures instead of reporting "succeeded" for an error response.
    response.raise_for_status()
    return response.json()

Message Queue Consumer

Monitor message processing:

# consumer.py
import pika
import requests
import os
import time

MONITOR_URL = os.getenv('TELEMETRY_MONITOR_URL')
QUEUE_NAME = 'orders'

def _report_processing(payload):
    """Post *payload* to MONITOR_URL; log and swallow request failures."""
    try:
        requests.post(MONITOR_URL, json=payload, timeout=2)
    except requests.RequestException as exc:
        print(f"Failed to report message processing: {exc}")


def process_message(ch, method, properties, body):
    """Process message and report to monitoring.

    Acks the delivery when processing succeeds and nacks it when processing
    raises. Monitoring reports are best-effort, so a monitoring outage can
    neither nack an already-processed message nor prevent the nack of a
    failed one.

    Args:
        ch: Channel the message arrived on; used to ack/nack.
        method: Delivery metadata; ``delivery_tag`` and ``routing_key`` are read.
        properties: Message properties (unused here).
        body: Raw message body.
    """
    start_time = time.time()

    try:
        # Process message
        # ... your processing logic ...

        duration = time.time() - start_time

        # Report success
        _report_processing({
            "status": "success",
            "message": "Message processed",
            "duration": int(duration),
            "metadata": {
                "queue": QUEUE_NAME,
                "routing_key": method.routing_key
            }
        })

        ch.basic_ack(delivery_tag=method.delivery_tag)

    except Exception as e:
        # Report failure
        _report_processing({
            "status": "error",
            "message": f"Message processing failed: {str(e)}",
            "metadata": {
                "queue": QUEUE_NAME
            }
        })

        ch.basic_nack(delivery_tag=method.delivery_tag)

# Connect to the local broker and hand every delivery to process_message
broker_parameters = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(broker_parameters)
channel = connection.channel()
channel.basic_consume(on_message_callback=process_message, queue=QUEUE_NAME)
channel.start_consuming()

gRPC Service Monitoring

# grpc_monitor.py
import grpc
from grpc import ServerInterceptor
import requests

class MonitoringInterceptor(ServerInterceptor):
    """Server interceptor that reports each unary-unary RPC to monitoring.

    Streaming RPCs and unknown methods are passed through unchanged.
    Reporting is best-effort and never alters the outcome of the RPC.
    """

    def __init__(self, monitor_url):
        """Store the URL that reports are posted to."""
        self.monitor_url = monitor_url

    def _report(self, status, message):
        """Post a status/message pair; log and swallow request failures."""
        try:
            requests.post(
                self.monitor_url,
                json={"status": status, "message": message},
                timeout=2
            )
        except requests.RequestException as exc:
            print(f"Failed to report gRPC call: {exc}")

    def intercept_service(self, continuation, handler_call_details):
        """Intercept gRPC calls and report to monitoring.

        Args:
            continuation: Callable returning the next handler in the chain.
            handler_call_details: Call details; ``method`` is used in reports.

        Returns:
            An RpcMethodHandler wrapping the original unary-unary behavior,
            or the original handler (possibly None) when it is not wrapped.
        """
        handler = continuation(handler_call_details)

        # The continuation yields an RpcMethodHandler (or None when the method
        # is not served), not a callable. Only the unary-unary shape is wrapped.
        if handler is None or handler.request_streaming or handler.response_streaming:
            return handler

        inner = handler.unary_unary
        method_name = handler_call_details.method

        def monitored_handler(request, context):
            try:
                response = inner(request, context)
            except Exception as e:
                # Report failed call, then propagate the original error
                self._report("error", f"gRPC error: {str(e)}")
                raise

            # Report successful call
            self._report("success", f"gRPC call: {method_name}")
            return response

        # Rebuild a proper handler so gRPC keeps using the original
        # (de)serializers around the monitored behavior.
        return grpc.unary_unary_rpc_method_handler(
            monitored_handler,
            request_deserializer=handler.request_deserializer,
            response_serializer=handler.response_serializer,
        )

# Usage
# These two names are used below but were never imported in this snippet.
import os
from concurrent import futures

server = grpc.server(
    futures.ThreadPoolExecutor(max_workers=10),
    interceptors=[MonitoringInterceptor(os.getenv('TELEMETRY_MONITOR_URL'))]
)

Service Mesh Integration (Istio)

Monitor via a sidecar container that polls the service's local health endpoint and forwards the result:

# monitoring-sidecar.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: myservice
spec:
  # apps/v1 Deployments require a selector matching the pod template labels.
  selector:
    matchLabels:
      app: myservice
  template:
    metadata:
      labels:
        app: myservice
    spec:
      containers:
      - name: myservice
        image: myservice:latest

      # Sidecar: probes the main container every 5 minutes and reports the result.
      - name: health-monitor
        image: curlimages/curl:latest
        env:
        - name: MONITOR_URL
          valueFrom:
            secretKeyRef:
              name: telemetry-secrets
              key: monitor-url
        - name: SERVICE_NAME
          value: "myservice"
        command:
        - /bin/sh
        - -c
        - |
          while true; do
            # Probe with curl (GET, fail on HTTP errors, bounded wait) so a hung
            # or erroring service is reported as unhealthy.
            if curl -fsS --max-time 5 -o /dev/null http://localhost:8080/health; then
              curl -fsS --max-time 10 -X POST "$MONITOR_URL" \
                -H "Content-Type: application/json" \
                -d "{\"status\":\"success\",\"message\":\"$SERVICE_NAME healthy\"}"
            else
              curl -fsS --max-time 10 -X POST "$MONITOR_URL" \
                -H "Content-Type: application/json" \
                -d "{\"status\":\"error\",\"message\":\"$SERVICE_NAME unhealthy\"}"
            fi
            sleep 300
          done

API Gateway Monitoring

Monitor gateway performance by reporting slow requests (over 5 seconds) and failed requests:

# gateway_monitor.py
from fastapi import FastAPI, Request
import httpx
import time
import requests
import os

app = FastAPI()

MONITOR_URL = os.getenv('TELEMETRY_MONITOR_URL')

async def _report_to_monitor(payload):
    """Post *payload* to MONITOR_URL without blocking the event loop.

    Best-effort: does nothing when MONITOR_URL is unset, and logs instead of
    raising when the request fails.
    """
    if not MONITOR_URL:
        return
    try:
        async with httpx.AsyncClient(timeout=2) as client:
            await client.post(MONITOR_URL, json=payload)
    except (httpx.HTTPError, httpx.InvalidURL) as exc:
        print(f"Failed to report to monitoring: {exc}")


@app.middleware("http")
async def monitor_requests(request: Request, call_next):
    """Middleware to monitor all requests through gateway.

    Reports requests slower than 5 seconds as warnings and requests that
    raise as errors. Reporting never changes the response or the exception
    seen by the client.

    Args:
        request: Incoming request; its path and method are used in reports.
        call_next: Callable that forwards the request down the stack.

    Returns:
        The response produced by ``call_next``.
    """
    start_time = time.time()

    try:
        response = await call_next(request)
    except Exception as e:
        # Report failed request, then re-raise the original error
        await _report_to_monitor({
            "status": "error",
            "message": f"Gateway error: {str(e)}",
            "metadata": {
                "path": request.url.path,
                "method": request.method
            }
        })
        raise

    duration = time.time() - start_time

    # Only slow requests are reported; fast successful ones are not.
    if duration > 5:
        await _report_to_monitor({
            "status": "warning",
            "message": f"Slow request: {request.url.path}",
            "duration": int(duration)
        })

    return response

Circuit Breaker Monitoring

# circuit_breaker.py
import requests
from enum import Enum
import time

class CircuitState(Enum):
    """States a CircuitBreaker can be in."""

    CLOSED = "closed"        # calls pass through
    OPEN = "open"            # calls are rejected without being attempted
    HALF_OPEN = "half_open"  # a trial call is allowed through


class CircuitBreaker:
    """Circuit breaker that reports its state transitions to monitoring."""

    def __init__(self, service_name, monitor_url, threshold=5, recovery_timeout=60):
        """Create a closed breaker.

        Args:
            service_name: Name used as a prefix in monitoring messages.
            monitor_url: URL that state reports are posted to.
            threshold: Consecutive failures that open the circuit.
            recovery_timeout: Seconds to wait after the last failure before
                an open circuit lets a trial call through.
        """
        self.service_name = service_name
        self.monitor_url = monitor_url
        self.threshold = threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.state = CircuitState.CLOSED
        self.last_failure_time = None

    def call(self, func):
        """Execute function with circuit breaker protection.

        Args:
            func: Zero-argument callable to run.

        Returns:
            Whatever ``func`` returns.

        Raises:
            Exception: "Circuit breaker is OPEN" when the call is rejected;
                otherwise any exception raised by ``func`` is re-raised.
        """
        if self.state == CircuitState.OPEN:
            # Check if should try half-open
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                self.report("error", "Circuit breaker OPEN")
                raise Exception("Circuit breaker is OPEN")

        try:
            result = func()
        except Exception:
            self.on_failure()
            raise

        self.on_success()
        return result

    def on_success(self):
        """Handle successful call."""
        self.failures = 0
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.CLOSED
            self.report("success", "Circuit breaker CLOSED")

    def on_failure(self):
        """Handle failed call."""
        self.failures += 1
        self.last_failure_time = time.time()

        if self.failures >= self.threshold:
            self.state = CircuitState.OPEN
            self.report("error", f"Circuit breaker OPENED after {self.failures} failures")

    def report(self, status, message):
        """Report circuit breaker state.

        Best-effort: request failures are logged and swallowed so that
        monitoring problems never affect the protected call.
        """
        try:
            requests.post(
                self.monitor_url,
                json={
                    "status": status,
                    "message": f"{self.service_name}: {message}",
                    "metadata": {
                        "circuit_state": self.state.value,
                        "failures": self.failures
                    }
                },
                timeout=2
            )
        except requests.RequestException as exc:
            # Narrower than a bare ``except`` so KeyboardInterrupt/SystemExit
            # and programming errors are not hidden.
            print(f"Failed to report circuit breaker state: {exc}")

Next Steps