GRPC中的错误处理
1. 概述
gRPC 是一个执行进程间远程过程调用 (RPC) 的平台。它具有高性能,可以在任何环境中运行。
在本教程中,我们将重点介绍使用 Java 处理 gRPC 错误。gRPC 具有非常低的延迟和高吞吐量,因此非常适合在微服务架构等复杂环境中使用。在这些系统中,充分了解网络不同组件的状态、性能和故障至关重要。因此,一个好的错误处理实现对于帮助我们实现之前的目标至关重要。
2. gRPC 中的错误处理基础
gRPC 中的错误是一等实体,即gRPC 中的每个调用要么是有效载荷消息,要么是状态错误消息。
错误编码在状态消息中,并在所有支持的语言中实现。
一般来说,我们不应该在响应负载中包含错误。为此,请始终使用StreamObserver::OnError,它在内部将状态错误添加到尾随标头。正如我们将在下面看到的,唯一的例外是当我们使用流时。
所有客户端或服务器 gRPC 库都支持官方 gRPC 错误模型 。Java 用io.grpc.Status类封装了这个错误模型。此类需要标准错误状态代码 和可选字符串错误消息以提供附加信息。此错误模型的优点是独立于所使用的数据编码(协议缓冲区、REST 等)支持它。但是,它非常有限,因为我们不能在状态中包含错误详细信息。
如果您的 gRPC 应用程序为数据编码实现了协议缓冲区,那么您可以为 Google API 使用更丰富的错误模型 。com.google.rpc.Status 类封装了这个错误模型。此类提供com.google.rpc.Code值、错误消息和附加错误详细信息作为protobuf消息。
此外,我们可以利用一组预定义的protobuf错误消息,在*error_details.proto 中定义,涵盖最常见的情况。在com.google.rpc包中,我们有以下类:RetryInfo、DebugInfo、QuotaFailure、ErrorInfo、PrecondicionFailure、BadRequest、RequestInfo、ResourceInfo和Help*将所有错误消息封装在error_details.proto 中。
除了这两个错误模型之外,我们还可以定义自定义错误消息,这些错误消息可以作为键值对添加到 RPC 元数据中。
我们将编写一个非常简单的应用程序来展示如何将这些错误模型与定价服务一起使用,其中客户端发送商品名称,服务器提供定价值。
3. 一元 RPC 调用
让我们开始考虑在CommodityPrice.proto中定义的以下服务接口:
service CommodityPriceProvider {
rpc getBestCommodityPrice(Commodity) returns (CommodityQuote) {}
}
message Commodity {
string access_token = 1;
string commodity_name = 2;
}
message CommodityQuote {
string commodity_name = 1;
string producer_name = 2;
double price = 3;
}
message ErrorResponse {
string commodity_name = 1;
string access_token = 2;
string expected_token = 3;
string expected_value = 4;
}
服务的输入是Commodity消息。在请求中,客户端必须提供一个access_token和一个commodity_name。
服务器使用CommodityQuote同步响应,该报价声明commodity_name、producer_name和Commodity的相关price。
出于说明目的,我们还定义了一个自定义ErrorResponse。这是我们将作为元数据发送给客户端的自定义错误消息的示例。
3.1. 使用io.grpc.Status响应
在服务器的服务调用中,我们检查对有效Commodity的请求:
public void getBestCommodityPrice(Commodity request, StreamObserver<CommodityQuote> responseObserver) {
if (commodityLookupBasePrice.get(request.getCommodityName()) == null) {
Metadata.Key<ErrorResponse> errorResponseKey = ProtoUtils.keyForProto(ErrorResponse.getDefaultInstance());
ErrorResponse errorResponse = ErrorResponse.newBuilder()
.setCommodityName(request.getCommodityName())
.setAccessToken(request.getAccessToken())
.setExpectedValue("Only Commodity1, Commodity2 are supported")
.build();
Metadata metadata = new Metadata();
metadata.put(errorResponseKey, errorResponse);
responseObserver.onError(io.grpc.Status.INVALID_ARGUMENT.withDescription("The commodity is not supported")
.asRuntimeException(metadata));
}
// ...
}
在这个简单的示例中,如果商品LookupBasePrice HashTable中不存在Commodity,我们将返回错误。
首先,我们构建一个自定义ErrorResponse并创建一个键值对,我们将其添加到*metadata.put(errorResponseKey, errorResponse)*中的元数据中。
我们使用io.grpc.Status来指定错误状态。函数responseObserver::onError以Throwable作为参数,因此我们使用asRuntimeException(metadata)将Status转换为Throwable。asRuntimeException可以选择采用 Metadata 参数(在我们的例子中,一个ErrorResponse键值对),它添加到消息的尾部。
如果客户端发出无效请求,它将返回异常:
@Test
public void whenUsingInvalidCommodityName_thenReturnExceptionIoRpcStatus() throws Exception {
Commodity request = Commodity.newBuilder()
.setAccessToken("123validToken")
.setCommodityName("Commodity5")
.build();
StatusRuntimeException thrown = Assertions.assertThrows(StatusRuntimeException.class, () -> blockingStub.getBestCommodityPrice(request));
assertEquals("INVALID_ARGUMENT", thrown.getStatus().getCode().toString());
assertEquals("INVALID_ARGUMENT: The commodity is not supported", thrown.getMessage());
Metadata metadata = Status.trailersFromThrowable(thrown);
ErrorResponse errorResponse = metadata.get(ProtoUtils.keyForProto(ErrorResponse.getDefaultInstance()));
assertEquals("Commodity5",errorResponse.getCommodityName());
assertEquals("123validToken", errorResponse.getAccessToken());
assertEquals("Only Commodity1, Commodity2 are supported", errorResponse.getExpectedValue());
}
对blockingStub::getBestCommodityPrice的调用会引发StatusRuntimeExeption,因为请求具有无效的商品名称。
我们使用Status::trailerFromThrowable来访问元数据。ProtoUtils::keyForProto为我们提供了ErrorResponse的元数据键。
3.2. 使用com.google.rpc.Status响应
让我们考虑以下服务器代码示例:
public void getBestCommodityPrice(Commodity request, StreamObserver<CommodityQuote> responseObserver) {
// ...
if (request.getAccessToken().equals("123validToken") == false) {
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
.setCode(com.google.rpc.Code.NOT_FOUND.getNumber())
.setMessage("The access token not found")
.addDetails(Any.pack(ErrorInfo.newBuilder()
.setReason("Invalid Token")
.setDomain("com.blogdemo.grpc.errorhandling")
.putMetadata("insertToken", "123validToken")
.build()))
.build();
responseObserver.onError(StatusProto.toStatusRuntimeException(status));
}
// ...
}
在实现中, 如果请求没有有效的令牌,getBestCommodityPrice会返回错误。
此外,我们将状态码、消息和详细信息设置为com.google.rpc.Status。
在此示例中,我们使用预定义的com.google.rpc.ErrorInfo而不是我们自定义的ErrorDetails(尽管如果需要我们可以同时使用两者)。我们使用Any::pack()序列化ErrorInfo。
StatusProto::toStatusRuntimeException类将com.google.rpc.Status转换为Throwable。
原则上,我们还可以添加error_details.proto中定义的其他消息来进一步自定义响应。
客户端实现很简单:
@Test
public void whenUsingInvalidRequestToken_thenReturnExceptionGoogleRPCStatus() throws Exception {
Commodity request = Commodity.newBuilder()
.setAccessToken("invalidToken")
.setCommodityName("Commodity1")
.build();
StatusRuntimeException thrown = Assertions.assertThrows(StatusRuntimeException.class,
() -> blockingStub.getBestCommodityPrice(request));
com.google.rpc.Status status = StatusProto.fromThrowable(thrown);
assertNotNull(status);
assertEquals("NOT_FOUND", Code.forNumber(status.getCode()).toString());
assertEquals("The access token not found", status.getMessage());
for (Any any : status.getDetailsList()) {
if (any.is(ErrorInfo.class)) {
ErrorInfo errorInfo = any.unpack(ErrorInfo.class);
assertEquals("Invalid Token", errorInfo.getReason());
assertEquals("com.blogdemo.grpc.errorhandling", errorInfo.getDomain());
assertEquals("123validToken", errorInfo.getMetadataMap().get("insertToken"));
}
}
}
StatusProto.fromThrowable是一种直接从异常中获取com.google.rpc.Status的实用方法。
从status::getDetailsList我们得到com.google.rpc.ErrorInfo详细信息。
4. gRPC 流的错误
gRPC 流 允许服务器和客户端在单个 RPC 调用中发送多条消息。
**在错误传播方面,我们目前使用的方法对 gRPC 流无效。原因是*onError()*必须是 RPC 中调用的最后一个方法,**因为在此调用之后,框架会切断客户端和服务器之间的通信。
当我们使用流时,这不是我们想要的行为。相反,我们希望保持连接打开以响应可能通过 RPC 来的其他消息。
这个问题的一个很好的解决方案是将错误添加到消息本身,正如我们在CommodityPrice.proto中显示的那样:
service CommodityPriceProvider {
rpc getBestCommodityPrice(Commodity) returns (CommodityQuote) {}
rpc bidirectionalListOfPrices(stream Commodity) returns (stream StreamingCommodityQuote) {}
}
message Commodity {
string access_token = 1;
string commodity_name = 2;
}
message StreamingCommodityQuote{
oneof message{
CommodityQuote comodity_quote = 1;
google.rpc.Status status = 2;
}
}
函数bidirectionalListOfPrices返回一个StreamingCommodityQuote。此消息具有oneof关键字,表明它可以使用CommodityQuote或google.rpc.Status。
在以下示例中,如果客户端发送无效令牌,则服务器会在响应正文中添加状态错误:
public StreamObserver<Commodity> bidirectionalListOfPrices(StreamObserver<StreamingCommodityQuote> responseObserver) {
return new StreamObserver<Commodity>() {
@Override
public void onNext(Commodity request) {
if (request.getAccessToken().equals("123validToken") == false) {
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
.setCode(Code.NOT_FOUND.getNumber())
.setMessage("The access token not found")
.addDetails(Any.pack(ErrorInfo.newBuilder()
.setReason("Invalid Token")
.setDomain("com.blogdemo.grpc.errorhandling")
.putMetadata("insertToken", "123validToken")
.build()))
.build();
StreamingCommodityQuote streamingCommodityQuote = StreamingCommodityQuote.newBuilder()
.setStatus(status)
.build();
responseObserver.onNext(streamingCommodityQuote);
}
// ...
}
}
}
该代码创建com.google.rpc.Status的一个实例并将其添加到StreamingCommodityQuote响应消息中。它不会调用onError(),因此框架不会中断与客户端的连接。
让我们看一下客户端实现:
public void onNext(StreamingCommodityQuote streamingCommodityQuote) {
switch (streamingCommodityQuote.getMessageCase()) {
case COMODITY_QUOTE:
CommodityQuote commodityQuote = streamingCommodityQuote.getComodityQuote();
logger.info("RESPONSE producer:" + commodityQuote.getCommodityName() + " price:" + commodityQuote.getPrice());
break;
case STATUS:
com.google.rpc.Status status = streamingCommodityQuote.getStatus();
logger.info("Status code:" + Code.forNumber(status.getCode()));
logger.info("Status message:" + status.getMessage());
for (Any any : status.getDetailsList()) {
if (any.is(ErrorInfo.class)) {
ErrorInfo errorInfo;
try {
errorInfo = any.unpack(ErrorInfo.class);
logger.info("Reason:" + errorInfo.getReason());
logger.info("Domain:" + errorInfo.getDomain());
logger.info("Insert Token:" + errorInfo.getMetadataMap().get("insertToken"));
} catch (InvalidProtocolBufferException e) {
logger.error(e.getMessage());
}
}
}
break;
// ...
}
}
客户端在onNext(StreamingCommodityQuote)中获取返回的消息,并使用switch语句来区分CommodityQuote或com.google.rpc.Status。