微服务架构部署

今天我们实战部署一个微服务架构的应用,包含API网关、多个微服务、数据库、缓存、消息队列等。

微服务架构设计

┌─────────────────────────────────────────────────────────────┐
│                      API Gateway (Nginx)                      │
│              (反向代理 + 负载均衡 + 限流)                     │
│                    ┌─────────┬─────────┬─────────┐            │
│                    │         │         │         │            │
│              ┌─────┴────┐ ┌──┴────┐ ┌──┴────┐ ┌──┴────┐     │
│              │   用户   │ │  订单 │ │ 商品 │ │ 支付 │     │
│              │  服务    │ │ 服务 │ │ 服务 │ │ 服务 │     │
│              └─────┬────┘ └──┬────┘ └──┬────┘ └──┬────┘     │
│                    │         │         │         │            │
│                    └─────────┴─────────┴─────────┘            │
│                              │                               │
│              ┌───────────────┼───────────────┐              │
│              ↓               ↓               ↓              │
│      ┌──────────────┐ ┌──────────────┐ ┌──────────────┐     │
│      │   MySQL      │ │    Redis     │ │   RabbitMQ   │     │
│      │  (主+从)     │ │    集群       │ │  (消息队列)   │     │
│      └──────────────┘ └──────────────┘ └──────────────┘     │
└─────────────────────────────────────────────────────────────┘

1. API网关

nginx/nginx.conf

events {
    worker_connections 2048;
}

http {
    # Rate limiting, keyed by client address: 100 requests/second per client,
    # plus a separate zone used to cap concurrent connections.
    limit_req_zone $binary_remote_addr zone=api_limit:10m rate=100r/s;
    limit_conn_zone $binary_remote_addr zone=conn_limit:10m;

    # Load balancing.
    # Backends are reached over the Compose network by container name, so the
    # port is always the container port (8001). The host-side ports published
    # in docker-compose.yml (8002, 8003, ...) do not exist at these names.
    upstream user_service {
        least_conn;
        server user-service-1:8001;
        server user-service-2:8001;
        server user-service-3:8001;
    }

    upstream order_service {
        least_conn;
        server order-service-1:8001;
        server order-service-2:8001;
    }

    upstream product_service {
        least_conn;
        server product-service-1:8001;
        server product-service-2:8001;
    }

    server {
        listen 80;
        server_name api.example.com;

        # User service.
        # The Flask apps register their routes without the /api prefix
        # (/users, /orders, /products). Giving proxy_pass a URI makes nginx
        # replace the matched location prefix, so /api/users/1 -> /users/1.
        location /api/users {
            limit_req zone=api_limit burst=200 nodelay;
            limit_conn conn_limit 10;

            proxy_pass http://user_service/users;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Service-Name "user-service";

            # Timeouts
            proxy_connect_timeout 30s;
            proxy_send_timeout 30s;
            proxy_read_timeout 30s;
        }

        # Order service
        location /api/orders {
            limit_req zone=api_limit burst=100 nodelay;

            proxy_pass http://order_service/orders;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Service-Name "order-service";
        }

        # Product service
        location /api/products {
            limit_req zone=api_limit burst=200 nodelay;

            proxy_pass http://product_service/products;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Service-Name "product-service";
        }

        # Payment service.
        # docker-compose.yml does not define a payment-service-1 container.
        # A static `upstream` naming it would make nginx exit at startup with
        # "host not found in upstream". Resolving the name per request through
        # Docker's embedded DNS (127.0.0.11) lets the gateway boot; payment
        # requests return 502 until the service exists.
        location /api/payments {
            limit_req zone=api_limit burst=50 nodelay;

            resolver 127.0.0.11 valid=10s;
            set $payment_backend payment-service-1:8001;

            # proxy_pass with a variable does no prefix replacement, so strip
            # the /api prefix explicitly.
            rewrite ^/api/(.*)$ /$1 break;
            proxy_pass http://$payment_backend;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Service-Name "payment-service";
        }

        # Gateway health check (answered by nginx itself, not proxied).
        location /health {
            access_log off;
            # default_type sets the Content-Type of the `return` body;
            # add_header would emit a second Content-Type header instead.
            default_type text/plain;
            return 200 "OK\n";
        }

        # Logging
        access_log /var/log/nginx/access.log combined;
        error_log /var/log/nginx/error.log warn;
    }
}

2. 用户服务

services/user-service/Dockerfile

FROM python:3.11-slim

# PYTHONUNBUFFERED=1 disables Python's output buffering so log lines are
# emitted immediately.
ENV PYTHONUNBUFFERED=1 \
    SERVICE_NAME=user-service

WORKDIR /app

# System build dependencies; the apt package index is removed in the same
# layer so it does not end up in the image.
# NOTE(review): presumably needed to compile a MySQL client binding listed in
# requirements.txt (not shown) - confirm they are still required.
RUN apt-get update \
    && apt-get install -y --no-install-recommends \
        gcc \
        default-libmysqlclient-dev \
        pkg-config \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies before copying the sources so this layer stays
# cached when only application code changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

# Run as an unprivileged system user that owns the application directory.
RUN groupadd -r appuser \
    && useradd -r -g appuser appuser \
    && chown -R appuser:appuser /app
USER appuser

EXPOSE 8001

CMD ["gunicorn", "-w", "4", "-b", "0.0.0.0:8001", "app:app"]

services/user-service/app.py

from flask import Flask, jsonify, request
from flask_cors import CORS
import mysql.connector
import redis
import os

app = Flask(__name__)
CORS(app)

# Connection settings; every value except the Redis db index can be
# overridden through an environment variable.
db_config = dict(
    host=os.getenv('DB_HOST', 'mysql-master'),
    user=os.getenv('DB_USER', 'root'),
    password=os.getenv('DB_PASSWORD', '123456'),
    database=os.getenv('DB_NAME', 'users_db'),
)

redis_config = dict(
    host=os.getenv('REDIS_HOST', 'redis'),
    port=int(os.getenv('REDIS_PORT', 6379)),
    db=0,
)

@app.route('/health')
def health():
    """Report that the user service process is up."""
    payload = {'status': 'healthy', 'service': 'user-service'}
    return jsonify(payload)

@app.route('/users', methods=['GET'])
def get_users():
    """Return all users, served from Redis when a cached copy exists.

    Returns:
        ``{'source': 'cache' | 'database', 'data': [...]}`` on success, or
        ``({'error': message}, 500)`` if Redis or MySQL fails.
    """
    # Local import: this module's import block does not include json.
    import json

    try:
        # Redis cache
        r = redis.Redis(**redis_config)
        cached = r.get('users')
        if cached:
            # The cache holds JSON text. Never eval() it: anyone able to write
            # to Redis could otherwise execute arbitrary code in this process.
            try:
                cached_users = json.loads(cached)
            except ValueError:
                # Entry written in the old repr() format; treat as a miss so
                # it gets overwritten below.
                cached_users = None
            if cached_users is not None:
                return jsonify({'source': 'cache', 'data': cached_users})

        # Database query; the finally blocks release the cursor and the
        # connection even when the query raises.
        conn = mysql.connector.connect(**db_config)
        try:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute("SELECT id, username, email FROM users")
                users = cursor.fetchall()
            finally:
                cursor.close()
        finally:
            conn.close()

        # Populate the cache for 300 seconds.
        r.setex('users', 300, json.dumps(users))

        return jsonify({'source': 'database', 'data': users})
    except Exception as e:
        return jsonify({'error': str(e)}), 500

@app.route('/users/<int:user_id>', methods=['GET'])
def get_user(user_id):
    """Return one user by id, served from Redis when a cached copy exists.

    Args:
        user_id: Primary key taken from the URL.

    Returns:
        ``{'source': 'cache' | 'database', 'data': {...}}`` on success (the
        same envelope as ``/users``, whichever path served the request),
        ``({'error': 'User not found'}, 404)`` when no row matches, or
        ``({'error': message}, 500)`` if Redis or MySQL fails.
    """
    # Local import: this module's import block does not include json.
    import json

    try:
        r = redis.Redis(**redis_config)
        cache_key = f'user:{user_id}'
        cached = r.get(cache_key)

        if cached:
            # The cache holds JSON text. Never eval() it: anyone able to write
            # to Redis could otherwise execute arbitrary code in this process.
            try:
                cached_user = json.loads(cached)
            except ValueError:
                # Entry written in the old repr() format; treat as a miss.
                cached_user = None
            if cached_user is not None:
                return jsonify({'source': 'cache', 'data': cached_user})

        # The finally blocks release the cursor and the connection even when
        # the query raises.
        conn = mysql.connector.connect(**db_config)
        try:
            cursor = conn.cursor(dictionary=True)
            try:
                # NOTE(review): SELECT * returns every column of `users`; if
                # the table holds credentials they are exposed here - confirm.
                cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
                user = cursor.fetchone()
            finally:
                cursor.close()
        finally:
            conn.close()

        if not user:
            return jsonify({'error': 'User not found'}), 404

        # default=str keeps non-JSON column types (e.g. timestamps)
        # serialisable. The response is built from the same JSON text that is
        # cached, so a cache hit and a database hit return identical data.
        serialized = json.dumps(user, default=str)
        r.setex(cache_key, 3600, serialized)

        return jsonify({'source': 'database', 'data': json.loads(serialized)})
    except Exception as e:
        return jsonify({'error': str(e)}), 500

if __name__ == '__main__':
    # Direct-execution entry point; the Dockerfile starts the app through
    # gunicorn instead.
    app.run(port=8001, host='0.0.0.0')

3. 订单服务

services/order-service/app.py

from flask import Flask, jsonify, request
import pika
import mysql.connector
import json
import os

app = Flask(__name__)

# MySQL connection settings, overridable through environment variables.
db_config = {
    'host': os.getenv('DB_HOST', 'mysql-master'),
    'user': os.getenv('DB_USER', 'root'),
    'password': os.getenv('DB_PASSWORD', '123456'),
    # docker-compose.yml passes DB_NAME to this service; honour it (as the
    # user service does) instead of hard-coding the schema name.
    'database': os.getenv('DB_NAME', 'orders_db')
}

# RabbitMQ connection settings, overridable through environment variables.
rabbitmq_config = {
    'host': os.getenv('RABBITMQ_HOST', 'rabbitmq'),
    'port': int(os.getenv('RABBITMQ_PORT', 5672)),
    'user': os.getenv('RABBITMQ_USER', 'guest'),
    'password': os.getenv('RABBITMQ_PASSWORD', 'guest')
}

def send_to_queue(message):
    """Publish ``message`` as persistent JSON to the durable ``order_events`` queue.

    A new blocking connection is opened for each call and closed afterwards.

    Args:
        message: JSON-serialisable object to publish.

    Raises:
        Whatever pika raises on connection or publish failure, or a
        ``TypeError`` from ``json.dumps`` for a non-serialisable message.
    """
    # ConnectionParameters accepts no `user`/`password` keywords; credentials
    # must be wrapped in a PlainCredentials object. Expanding rabbitmq_config
    # with ** therefore fails with a TypeError.
    params = pika.ConnectionParameters(
        host=rabbitmq_config['host'],
        port=rabbitmq_config['port'],
        credentials=pika.PlainCredentials(
            rabbitmq_config['user'], rabbitmq_config['password']
        ),
    )
    connection = pika.BlockingConnection(params)
    try:
        channel = connection.channel()
        channel.queue_declare(queue='order_events', durable=True)
        channel.basic_publish(
            exchange='',
            routing_key='order_events',
            body=json.dumps(message),
            # delivery_mode=2 marks the message as persistent.
            properties=pika.BasicProperties(delivery_mode=2)
        )
    finally:
        # Release the connection even when declaring or publishing fails.
        connection.close()

@app.route('/health')
def health():
    """Report that the order service process is up."""
    payload = {'status': 'healthy', 'service': 'order-service'}
    return jsonify(payload)

@app.route('/orders', methods=['GET'])
def get_orders():
    """Return the 100 most recently created orders, newest first.

    Returns:
        A JSON list of order rows, or ``({'error': message}, 500)`` if the
        database call fails.
    """
    try:
        conn = mysql.connector.connect(**db_config)
        # The finally blocks release the cursor and the connection even when
        # the query raises.
        try:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute("SELECT * FROM orders ORDER BY created_at DESC LIMIT 100")
                orders = cursor.fetchall()
            finally:
                cursor.close()
        finally:
            conn.close()

        return jsonify(orders)
    except Exception as e:
        return jsonify({'error': str(e)}), 500

@app.route('/orders', methods=['POST'])
def create_order():
    """Insert a pending order and publish an ``order_created`` event.

    Expects a JSON object with ``user_id``, ``product_id`` and an optional
    positive integer ``quantity`` (default 1).

    Returns:
        ``({'order_id': id, 'status': 'pending'}, 201)`` on success,
        ``({'error': message}, 400)`` for a missing or invalid body, or
        ``({'error': message}, 500)`` if MySQL or RabbitMQ fails.
    """
    try:
        # silent=True yields None for a missing or malformed JSON body instead
        # of raising, so bad client input is answered with 400 rather than
        # falling into the generic 500 handler.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400

        user_id = data.get('user_id')
        product_id = data.get('product_id')
        quantity = data.get('quantity', 1)

        if user_id is None or product_id is None:
            return jsonify({'error': 'user_id and product_id are required'}), 400
        # bool is a subclass of int, so it is excluded explicitly.
        if isinstance(quantity, bool) or not isinstance(quantity, int) or quantity < 1:
            return jsonify({'error': 'quantity must be a positive integer'}), 400

        # Create the order; the finally blocks release the cursor and the
        # connection even when the insert raises.
        conn = mysql.connector.connect(**db_config)
        try:
            cursor = conn.cursor()
            try:
                cursor.execute(
                    "INSERT INTO orders (user_id, product_id, quantity, status) VALUES (%s, %s, %s, %s)",
                    (user_id, product_id, quantity, 'pending')
                )
                conn.commit()
                order_id = cursor.lastrowid
            finally:
                cursor.close()
        finally:
            conn.close()

        # Publish the event to the queue.
        # NOTE(review): the row is already committed at this point, so a
        # publish failure returns 500 while the order exists in the database.
        message = {
            'event': 'order_created',
            'order_id': order_id,
            'user_id': user_id,
            'product_id': product_id,
            'quantity': quantity
        }
        send_to_queue(message)

        return jsonify({'order_id': order_id, 'status': 'pending'}), 201
    except Exception as e:
        return jsonify({'error': str(e)}), 500

if __name__ == '__main__':
    # Direct-execution entry point: listen on all interfaces, port 8001.
    app.run(port=8001, host='0.0.0.0')

4. 商品服务

services/product-service/app.py

from flask import Flask, jsonify
import mysql.connector
import os

app = Flask(__name__)

# MySQL connection settings, overridable through environment variables.
db_config = {
    'host': os.getenv('DB_HOST', 'mysql-master'),
    'user': os.getenv('DB_USER', 'root'),
    'password': os.getenv('DB_PASSWORD', '123456'),
    # docker-compose.yml passes DB_NAME to this service; honour it (as the
    # user service does) instead of hard-coding the schema name.
    'database': os.getenv('DB_NAME', 'products_db')
}

@app.route('/health')
def health():
    """Report that the product service process is up."""
    payload = {'status': 'healthy', 'service': 'product-service'}
    return jsonify(payload)

@app.route('/products', methods=['GET'])
def get_products():
    """Return every product whose status is ``'active'``.

    Returns:
        A JSON list of product rows, or ``({'error': message}, 500)`` if the
        database call fails.
    """
    try:
        conn = mysql.connector.connect(**db_config)
        # The finally blocks release the cursor and the connection even when
        # the query raises.
        try:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute("SELECT * FROM products WHERE status = 'active'")
                products = cursor.fetchall()
            finally:
                cursor.close()
        finally:
            conn.close()

        return jsonify(products)
    except Exception as e:
        return jsonify({'error': str(e)}), 500

@app.route('/products/<int:product_id>', methods=['GET'])
def get_product(product_id):
    """Return one product by id.

    Args:
        product_id: Primary key taken from the URL.

    Returns:
        The product row as JSON, ``({'error': 'Product not found'}, 404)``
        when no row matches, or ``({'error': message}, 500)`` if the database
        call fails.
    """
    try:
        conn = mysql.connector.connect(**db_config)
        # The finally blocks release the cursor and the connection even when
        # the query raises.
        try:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute("SELECT * FROM products WHERE id = %s", (product_id,))
                product = cursor.fetchone()
            finally:
                cursor.close()
        finally:
            conn.close()

        if not product:
            return jsonify({'error': 'Product not found'}), 404

        return jsonify(product)
    except Exception as e:
        return jsonify({'error': str(e)}), 500

if __name__ == '__main__':
    # Direct-execution entry point: listen on all interfaces, port 8001.
    app.run(port=8001, host='0.0.0.0')

5. Docker Compose编排

docker-compose.yml

version: '3.8'

services:
  # API gateway
  # NOTE(review): nginx/nginx.conf also routes /api/payments to
  # payment-service-1, which this file does not define.
  gateway:
    image: nginx:alpine
    container_name: api-gateway
    ports:
      - "80:80"
    volumes:
      # Mounted read-only: the container only reads its configuration and
      # should not be able to modify the file on the host.
      - ./nginx/nginx.conf:/etc/nginx/nginx.conf:ro
      - nginx-logs:/var/log/nginx
    depends_on:
      - user-service-1
      - user-service-2
      - user-service-3
      - order-service-1
      - order-service-2
      - product-service-1
      - product-service-2
    networks:
      - microservices
    restart: unless-stopped

  # User service (3 instances). Every instance listens on 8001 inside its
  # container; only the published host port differs.
  user-service-1:
    container_name: user-service-1
    # Short form of build: the Dockerfile name defaults to "Dockerfile".
    build: ./services/user-service
    ports:
      - "8001:8001"
    environment:
      DB_HOST: mysql-master
      DB_PASSWORD: "123456"
      DB_NAME: users_db
      REDIS_HOST: redis
      INSTANCE_ID: "1"
    depends_on: [mysql-master, redis]
    networks: [microservices]
    restart: unless-stopped

  user-service-2:
    container_name: user-service-2
    # Short form of build: the Dockerfile name defaults to "Dockerfile".
    build: ./services/user-service
    ports:
      # Host port 8002 maps to container port 8001.
      - "8002:8001"
    environment:
      DB_HOST: mysql-master
      DB_PASSWORD: "123456"
      DB_NAME: users_db
      REDIS_HOST: redis
      INSTANCE_ID: "2"
    depends_on: [mysql-master, redis]
    networks: [microservices]
    restart: unless-stopped

  user-service-3:
    container_name: user-service-3
    # Short form of build: the Dockerfile name defaults to "Dockerfile".
    build: ./services/user-service
    ports:
      # Host port 8003 maps to container port 8001.
      - "8003:8001"
    environment:
      DB_HOST: mysql-master
      DB_PASSWORD: "123456"
      DB_NAME: users_db
      REDIS_HOST: redis
      INSTANCE_ID: "3"
    depends_on: [mysql-master, redis]
    networks: [microservices]
    restart: unless-stopped

  # Order service (2 instances)
  order-service-1:
    container_name: order-service-1
    # Short form of build: the Dockerfile name defaults to "Dockerfile".
    build: ./services/order-service
    ports:
      # Host port 8011 maps to container port 8001.
      - "8011:8001"
    environment:
      DB_HOST: mysql-master
      DB_PASSWORD: "123456"
      DB_NAME: orders_db
      RABBITMQ_HOST: rabbitmq
    depends_on: [mysql-master, rabbitmq]
    networks: [microservices]
    restart: unless-stopped

  order-service-2:
    container_name: order-service-2
    # Short form of build: the Dockerfile name defaults to "Dockerfile".
    build: ./services/order-service
    ports:
      # Host port 8012 maps to container port 8001.
      - "8012:8001"
    environment:
      DB_HOST: mysql-master
      DB_PASSWORD: "123456"
      DB_NAME: orders_db
      RABBITMQ_HOST: rabbitmq
    depends_on: [mysql-master, rabbitmq]
    networks: [microservices]
    restart: unless-stopped

  # Product service (2 instances)
  product-service-1:
    container_name: product-service-1
    # Short form of build: the Dockerfile name defaults to "Dockerfile".
    build: ./services/product-service
    ports:
      # Host port 8021 maps to container port 8001.
      - "8021:8001"
    environment:
      DB_HOST: mysql-master
      DB_PASSWORD: "123456"
      DB_NAME: products_db
    depends_on: [mysql-master]
    networks: [microservices]
    restart: unless-stopped

  product-service-2:
    container_name: product-service-2
    # Short form of build: the Dockerfile name defaults to "Dockerfile".
    build: ./services/product-service
    ports:
      # Host port 8022 maps to container port 8001.
      - "8022:8001"
    environment:
      DB_HOST: mysql-master
      DB_PASSWORD: "123456"
      DB_NAME: products_db
    depends_on: [mysql-master]
    networks: [microservices]
    restart: unless-stopped

  # MySQL primary
  mysql-master:
    container_name: mysql-master
    image: mysql:8.0
    environment:
      - MYSQL_ROOT_PASSWORD=123456
    volumes:
      # Named volume holding the MySQL data directory.
      - mysql-master-data:/var/lib/mysql
      # Mounted into the image's init-script directory.
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
    networks: [microservices]
    restart: unless-stopped

  # MySQL replica
  # NOTE(review): as written this is an independent MySQL server. Nothing here
  # configures replication from mysql-master, no service in this file
  # connects to it, and it has no data volume.
  mysql-slave:
    container_name: mysql-slave
    image: mysql:8.0
    environment:
      - MYSQL_ROOT_PASSWORD=123456
    networks: [microservices]
    restart: unless-stopped

  # Redis cache (a single instance, not a cluster). Other containers reach it
  # by the service name "redis".
  redis:
    container_name: redis-cache
    image: redis:7-alpine
    # --appendonly yes turns on append-only-file persistence under /data.
    command: ["redis-server", "--appendonly", "yes"]
    volumes:
      - redis-data:/data
    ports:
      - "6379:6379"
    networks: [microservices]
    restart: unless-stopped

  # RabbitMQ message queue
  rabbitmq:
    container_name: rabbitmq
    image: rabbitmq:3-management
    ports:
      # 5672: AMQP, 15672: management UI.
      - "5672:5672"
      - "15672:15672"
    volumes:
      - rabbitmq-data:/var/lib/rabbitmq
    networks: [microservices]
    restart: unless-stopped

# Single bridge network shared by every service above.
networks:
  microservices:
    driver: bridge

# Named volumes referenced by the services above.
volumes:
  mysql-master-data: {}
  redis-data: {}
  rabbitmq-data: {}
  nginx-logs: {}

6. 服务发现和负载均衡

技术原理

1. 负载均衡策略

  • least_conn:最少连接数(推荐)
  • ip_hash:IP哈希(会话保持)
  • 轮询(round robin):默认策略,upstream 中不写任何策略指令时即为轮询(Nginx 没有 round_robin 指令)

2. 服务发现

  • Docker内置DNS:容器名称自动解析
  • 健康检查:开源版 Nginx 只支持被动健康检查,即请求失败后按 server 的 max_fails / fail_timeout 参数临时摘除实例;主动健康检查需要 Nginx Plus 或第三方模块
  • 自动重启:故障实例自动重启

3. 限流保护

  • limit_req:请求速率限制
  • limit_conn:连接数限制

7. 消息队列处理

消费者服务

services/consumer-service/app.py

import json
import os

import mysql.connector
import pika

# RabbitMQ connection settings. Every value can be overridden through an
# environment variable, matching the order service that publishes to the
# same broker.
rabbitmq_config = {
    'host': os.getenv('RABBITMQ_HOST', 'rabbitmq'),
    'port': int(os.getenv('RABBITMQ_PORT', 5672)),
    'user': os.getenv('RABBITMQ_USER', 'guest'),
    'password': os.getenv('RABBITMQ_PASSWORD', 'guest')
}

def process_order(ch, method, properties, body):
    """Handle one message delivered from the ``order_events`` queue.

    Args:
        ch: Channel the message arrived on; used to ack or nack it.
        method: Delivery metadata; ``method.delivery_tag`` identifies the message.
        properties: Message properties (unused).
        body: Raw message payload, expected to be JSON.

    The message is acked after successful processing. A payload that is not
    valid JSON is nacked without requeue; any other failure is nacked with
    requeue so the message is delivered again.
    """
    try:
        message = json.loads(body)
    except ValueError as e:
        # A malformed payload can never succeed. Requeueing it would make the
        # broker redeliver the same message in an endless loop.
        print(f"Error: {e}")
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        return

    try:
        print(f"Processing order: {message}")

        # Order handling goes here
        # ... update the database, send notifications, etc.

        # Acknowledge the message
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        print(f"Error: {e}")
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

# Connect to RabbitMQ.
# ConnectionParameters accepts no `user`/`password` keywords; credentials must
# be wrapped in a PlainCredentials object. Expanding rabbitmq_config with **
# therefore fails with a TypeError.
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host=rabbitmq_config['host'],
        port=rabbitmq_config['port'],
        credentials=pika.PlainCredentials(
            rabbitmq_config['user'], rabbitmq_config['password']
        ),
    )
)
channel = connection.channel()
channel.queue_declare(queue='order_events', durable=True)

# Prefetch: deliver at most one unacknowledged message to this consumer.
channel.basic_qos(prefetch_count=1)

# Register the consumer callback.
channel.basic_consume(queue='order_events', on_message_callback=process_order)

print('Waiting for messages...')
try:
    channel.start_consuming()
except KeyboardInterrupt:
    # Ctrl+C: stop the consume loop instead of dying with a traceback.
    channel.stop_consuming()
finally:
    if connection.is_open:
        connection.close()

8. 监控和日志

Prometheus配置

# Fragment: add both entries under the top-level `services:` key of
# docker-compose.yml. The named volumes `prometheus-data` and `grafana-data`
# are not in that file's top-level `volumes:` list and must be declared there
# too, otherwise Compose rejects them as undefined volumes.
prometheus:
  image: prom/prometheus:latest
  container_name: prometheus
  volumes:
    # Scrape configuration; ./prometheus.yml must exist next to the compose file.
    - ./prometheus.yml:/etc/prometheus/prometheus.yml
    - prometheus-data:/prometheus
  ports:
    - "9090:9090"
  networks:
    - microservices

grafana:
  image: grafana/grafana:latest
  container_name: grafana
  volumes:
    - grafana-data:/var/lib/grafana
  ports:
    - "3000:3000"
  networks:
    - microservices

9. 部署和测试

启动所有服务

# Build the images
docker-compose build

# Start the services in the background
docker-compose up -d

# Show container status
docker-compose ps

# Follow the logs of all services
docker-compose logs -f

测试服务

# Gateway health check
curl http://localhost/health

# Test the user service
curl http://localhost/api/users

# Test order creation
curl -X POST http://localhost/api/orders \
  -H "Content-Type: application/json" \
  -d '{"user_id": 1, "product_id": 1, "quantity": 2}'

# Test the product service
curl http://localhost/api/products

本章小结

  • 微服务架构:多个独立服务协同工作
  • API网关:Nginx + 负载均衡 + 限流
  • 服务实例:多实例部署,提高可用性
  • 消息队列:RabbitMQ异步处理
  • 监控日志:Prometheus + Grafana

现在已经掌握了微服务部署,下一章我们来学习Docker进阶优化!

继续学下去,马上就能处理企业级项目了!