gRPC 全链路 deadline 传播实战:从下游被卡死到 5 分钟定位

A→B→C→D 链路客户端早超时退出,D 还在傻乎乎跑 30 秒,DB 连接池打满。本文讲透 gRPC deadline 自动传播机制 + Context 取消 + Java/Go 实现 + 客户端服务端拦截器 + 重试配合 + DB 层传 timeout。附完整代码 + 监控告警 + 团队规范。

线上一个 gRPC 服务调用链路 A → B → C → D,看监控 D 的耗时偶尔会冲到 30 秒,但客户端早就超时退出了。D 还在傻乎乎处理,白白浪费资源,严重时还把下游 DB 连接打满。问题根源是全链路没传超时。本文把 gRPC 的 deadline 传播 + Context 取消 + 拦截器实战讲清楚。

事故现象

服务 A (网关):设置 5 秒超时调用 B
服务 B:收到请求,调用 C(没设超时,默认 30 秒)
服务 C:收到请求,调用 D(没设超时,默认 30 秒)
服务 D:慢 SQL 跑 30 秒后返回

时间线:
0s    A 发请求到 B,deadline = 5s
5s    A 超时,客户端断开
5s    B 还在等 C 的响应(B 的 deadline 是 30s)
30s   C 返回 B,B 又处理 5s 才发现 A 已断
35s   B 返回 A,A 已经早就放弃
=> 整条链路在 5 秒后所有工作都是浪费,但下游服务仍在跑

更严重的是:A 已经超时,前端用户会重试,B/C/D 上同一个请求的工作又来一遍。请求积压,雪球越滚越大。

gRPC 的 deadline 传播机制

gRPC 的请求自带 deadline 元数据,通过 gRPC Context 在服务端传递。下游服务如果用同一个 Context 发起新调用,新调用会继承剩余的 deadline。

关键代码:Java 客户端设 deadline

// 错:没设 deadline,默认无限等
UserServiceGrpc.UserServiceBlockingStub stub = UserServiceGrpc.newBlockingStub(channel);
UserResponse resp = stub.getUser(req);

// 对:每次调用都设 deadline
UserResponse resp = stub
    .withDeadlineAfter(5, TimeUnit.SECONDS)
    .getUser(req);

// 推荐:在客户端工厂统一设默认 deadline
public class UserServiceClient {
    private final UserServiceGrpc.UserServiceBlockingStub stub;

    public UserResponse getUser(long uid, int timeoutMs) {
        return stub.withDeadlineAfter(timeoutMs, TimeUnit.MILLISECONDS)
                   .getUser(UserRequest.newBuilder().setUid(uid).build());
    }
}

关键代码:Java 服务端传播 Context

// 错:在服务端开新线程做下游调用,丢了 Context
public class UserServiceImpl extends UserServiceGrpc.UserServiceImplBase {
    @Override
    public void getUser(UserRequest req, StreamObserver<UserResponse> responseObserver) {
        // 在新线程里调用下游,Context 丢失
        executor.submit(() -> {
            ProfileResponse profile = profileStub.getProfile(...);     // 没继承 deadline
            responseObserver.onNext(...);
            responseObserver.onCompleted();
        });
    }
}

// 对:用 Context.current().wrap 保留上下文
@Override
public void getUser(UserRequest req, StreamObserver<UserResponse> responseObserver) {
    Context ctx = Context.current();
    executor.submit(ctx.wrap(() -> {
        // 这个 lambda 在新线程里跑,但 Context 是当前请求的 Context
        // deadline 自动传播到下游
        ProfileResponse profile = profileStub.getProfile(...);
        responseObserver.onNext(...);
        responseObserver.onCompleted();
    }));
}

关键代码:服务端响应取消

// 长任务要主动检查 Context.isCancelled()
@Override
public void exportLargeReport(ExportRequest req, StreamObserver<Chunk> responseObserver) {
    Context ctx = Context.current();
    try (ResultSet rs = queryHugeData(req)) {
        while (rs.next()) {
            if (ctx.isCancelled()) {
                log.warn("client cancelled, abort export");
                return;     // 主动放弃,不再处理
            }
            Chunk chunk = buildChunk(rs);
            responseObserver.onNext(chunk);
        }
        responseObserver.onCompleted();
    } catch (Exception e) {
        responseObserver.onError(Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException());
    }
}

关键代码:把 deadline 传给业务底层

// 业务层(Repository / DB)也要支持超时取消
public class UserRepository {
    public User findById(long uid, Duration timeout) {
        return jdbcTemplate.query(con -> {
            PreparedStatement ps = con.prepareStatement("SELECT * FROM users WHERE id = ?");
            ps.setQueryTimeout((int) timeout.getSeconds());     // ← 关键
            ps.setLong(1, uid);
            return ps;
        }, ...);
    }
}

// 从 gRPC Context 算出剩余时间
@Override
public void getUser(UserRequest req, StreamObserver<UserResponse> responseObserver) {
    Deadline deadline = Context.current().getDeadline();
    Duration remaining;
    if (deadline != null) {
        long remainingMs = deadline.timeRemaining(TimeUnit.MILLISECONDS);
        if (remainingMs <= 50) {
            // 剩余时间太短,直接拒绝
            responseObserver.onError(Status.DEADLINE_EXCEEDED.asRuntimeException());
            return;
        }
        remaining = Duration.ofMillis(remainingMs - 50);     // 留 50ms 给响应序列化
    } else {
        remaining = Duration.ofSeconds(30);     // 客户端没设,给个默认上限
    }

    User user = userRepo.findById(req.getUid(), remaining);
    responseObserver.onNext(toResponse(user));
    responseObserver.onCompleted();
}

Go 端:context.Context 是基础设施级

// 客户端调用
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

resp, err := userClient.GetUser(ctx, &pb.UserRequest{Uid: 12345})
if err != nil {
    if status.Code(err) == codes.DeadlineExceeded {
        log.Println("timeout")
    }
}

// 服务端实现
func (s *UserServer) GetUser(ctx context.Context, req *pb.UserRequest) (*pb.UserResponse, error) {
    // 检查 ctx 是否已取消
    if ctx.Err() != nil {
        return nil, status.Error(codes.Canceled, "client cancelled")
    }

    // 调用下游,ctx 自动传播 deadline
    profile, err := s.profileClient.GetProfile(ctx, &pb.ProfileRequest{Uid: req.Uid})
    if err != nil {
        return nil, err
    }

    // DB 也用 ctx
    user, err := s.userRepo.FindByID(ctx, req.Uid)
    if err != nil {
        return nil, err
    }

    return &pb.UserResponse{User: user, Profile: profile}, nil
}

// Repository
func (r *UserRepository) FindByID(ctx context.Context, uid int64) (*User, error) {
    // db/sql 原生支持 ctx 取消
    row := r.db.QueryRowContext(ctx, "SELECT * FROM users WHERE id = ?", uid)
    var u User
    if err := row.Scan(&u.ID, &u.Name); err != nil {
        return nil, err
    }
    return &u, nil
}

统一拦截器:防遗漏

// 客户端拦截器:没设 deadline 自动加默认值
public class DefaultDeadlineClientInterceptor implements ClientInterceptor {
    private final long defaultMs;

    public DefaultDeadlineClientInterceptor(long defaultMs) {
        this.defaultMs = defaultMs;
    }

    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
            MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
        if (callOptions.getDeadline() == null) {
            callOptions = callOptions.withDeadlineAfter(defaultMs, TimeUnit.MILLISECONDS);
        }
        return next.newCall(method, callOptions);
    }
}

// 创建 channel 时注册
ManagedChannel channel = ManagedChannelBuilder.forAddress("user-service", 9090)
    .intercept(new DefaultDeadlineClientInterceptor(5000))   // 默认 5 秒
    .usePlaintext()
    .build();
// 服务端拦截器:剩余时间太短直接拒绝
public class DeadlineCheckServerInterceptor implements ServerInterceptor {
    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
            ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
        Deadline deadline = Context.current().getDeadline();
        if (deadline != null) {
            long remainingMs = deadline.timeRemaining(TimeUnit.MILLISECONDS);
            if (remainingMs < 50) {
                call.close(Status.DEADLINE_EXCEEDED.withDescription("too close to deadline"), headers);
                return new ServerCall.Listener<ReqT>() {};
            }
        }
        return next.startCall(call, headers);
    }
}

重试 + deadline 配合

// 配合重试时要小心 deadline
public UserResponse getUserWithRetry(long uid) {
    Deadline deadline = Deadline.after(5, TimeUnit.SECONDS);
    Exception lastError = null;
    for (int i = 0; i < 3; i++) {
        if (deadline.isExpired()) break;
        try {
            return stub.withDeadline(deadline).getUser(UserRequest.newBuilder().setUid(uid).build());
        } catch (StatusRuntimeException e) {
            if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
                lastError = e;
                continue;     // 重试
            }
            throw e;
        }
    }
    throw new RuntimeException("retry exhausted", lastError);
}

注意:重试一定要复用同一个 deadline,不要每次都新建。否则重试反而把总耗时拉长。

实际收益数据

改造前:
- 30s 客户端不超时的请求平均每天 8 万
- 下游 D 服务被卡死的频率每周 2-3 次
- DB 连接池被占满告警每天 4-5 次
- 排查链路问题平均 1 小时(找谁卡了)

改造后(全链路统一 deadline + 主动取消):
- 5s+ 请求降到每天 200(主要是下游真慢)
- 下游 D 被卡死从未发生(自动取消)
- DB 连接池告警降到 0
- 链路问题靠 trace 看 deadline / cancel 状态,5 分钟定位

排查工具

# OpenTelemetry / Jaeger 看 trace
# 每个 span 上看 grpc.status / grpc.deadline_remaining

# grpcurl 测试 deadline 行为
grpcurl -max-time 1 -d '{"uid": 12345}' \
    user-service:9090 UserService/GetUser
# 返回 DEADLINE_EXCEEDED

# Prometheus 指标
grpc_server_handling_seconds{method="GetUser",grpc_code="DeadlineExceeded"}
grpc_client_handling_seconds{method="GetUser",grpc_code="Cancelled"}

# 告警
- alert: GrpcDeadlineExceededHigh
  expr: rate(grpc_server_handling_seconds_count{grpc_code="DeadlineExceeded"}[5m]) > 1
  annotations:
    summary: '{{ $labels.method }} 超时率高,可能上游 deadline 设得太短或下游变慢'

团队规范

  1. 所有 gRPC 客户端必须设 deadline(拦截器强制)
  2. 服务端业务方法第一行检查 Context.isCancelled()
  3. 下游调用必须传 Context(线程池切换用 ctx.wrap)
  4. DB / Redis 调用必须传 timeout(从 Context 计算)
  5. 长任务循环里定期检查 cancel
  6. 重试用同一个 deadline,不要重置
  7. 监控 DEADLINE_EXCEEDED 和 CANCELLED 比例
  8. 压测要模拟"客户端突然断开"场景

gRPC 自带的 deadline 传播是个非常优雅的设计,但前提是工程上把每个环节都串起来。我们这个网关链路一年遇到 3 次连环卡死事故,根本原因都是某一段没传 Context 或没设 deadline。把这套规范落实后,这类问题就再没出过。

—— 别看了 · 2026
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理 邮箱1846861578@qq.com。
技术教程

G1 换 ZGC 实战:p99 从 480ms 降到 95ms,踩了 4 个坑

2026-5-19 11:30:22

技术教程

线上服务周期性 Too many open files:fd 泄漏完整排查实录

2026-5-19 11:37:15

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索