使用Apache Camel Multicast组件遇到的一个问题(使用apache部署web网站)

网友投稿 930 2022-05-30

1 前言

2 Multicast组件简介

Multicast是Apache Camel(以下简称“Camel”)中一个功能强大的EIP组件,可以将消息发送至多条子路径,然后并行地执行它们。

参考官网文档,我们可以使用两种方式配置Multicast组件:

独立执行所有子路径,并将最后响应的子路径的结果作为最终输出。这也是Multicast组件的默认配置。

通过实现Camel的聚合策略(AggregationStrategy),使用自定义的聚合器来处理所有子路径的输出。

3 问题描述

本文使用案例如下:使用Jetty组件发布一个API,调用该API后,消息会分别发送至"direct:A"和"direct:B"两条子路径。在使用自定义的聚合策略处理后,继续执行后续步骤。其中在"direct:A"中抛出一个异常,来模拟运行失败;"direct:B"正常运行。同时在onException中定义了异常处理策略。

本文使用的Camel版本为3.8.0

@Override

public void configure() throws Exception {

onException(Exception.class)

.useOriginalMessage()

.handled(true)

.log("Exception handler invoked")

使用Apache Camel Multicast组件遇到的一个问题(使用apache部署web网站)

.transform().constant("{\"data\" : \"err\"}")

.end();

from("jetty:http://localhost:8081/myapi?httpMethodRestrict=GET")

.log("received request")

.log("Entering multicast")

.multicast(new SimpleFlowMergeAggregator())

.parallelProcessing().to("direct:A", "direct:B")

.end()

.log("Aggregated results ${body}")

.log("Another log")

.transform(simple("{\"result\" : \"success\"}"))

.end();

from("direct:A")

.log("Executing PATH_1 - exception path")

.transform(constant("DATA_FROM_PATH_1"))

.log("Starting exception throw")

.throwException(new Exception("USER INITIATED EXCEPTION"))

.log("PATH_1")

.end();

from("direct:B")

.log("Executing PATH_2 - success path")

.delayer(1000)

.transform(constant("DATA_FROM_PATH_2"))

.log("PATH_2")

.end();

}

自定义聚合器SimpleFlowMergeAggregator定义如下,其中我们将所有子路径的结果放入一个list对象。

public class SimpleFlowMergeAggregator implements AggregationStrategy {

private static final Logger LOGGER = LoggerFactory.getLogger(SimpleFlowMergeAggregator.class.getName());

@Override

public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {

LOGGER.info("Inside aggregator " + newExchange.getIn().getBody());

if(oldExchange == null) {

String data = newExchange.getIn().getBody(String.class);

List aggregatedDataList = new ArrayList<>();

aggregatedDataList.add(data);

newExchange.getIn().setBody(aggregatedDataList);

return newExchange;

}

List oldData = oldExchange.getIn().getBody(List.class);

oldData.add(newExchange.getIn().getBody(String.class));

oldExchange.getIn().setBody(oldData);

return oldExchange;

}

}

基于对Multicast组件执行逻辑的理解,我们认为存在多个子路径时,其运行结果应该为:如果其中有一条子路径能运行成功,则使用聚合的结果继续执行后续步骤;如果所有子路径都运行失败,则停止整个路由(route)。本案例中,由于子路径"direct:A"运行异常,子路径"direct:B"运行正常,则应该正常执行后续两个步骤日志(log)和转换(transform)。

运行上述案例,日志信息如下:

2021-05-06 12:43:18.565  INFO 13956 --- [qtp916897446-42] route1                                   : received request

2021-05-06 12:43:18.566  INFO 13956 --- [qtp916897446-42] route1                                   : Entering multicast

2021-05-06 12:43:18.575  INFO 13956 --- [ #4 - Multicast] route2                                   : Executing PATH_1 - exception path

2021-05-06 12:43:18.575  INFO 13956 --- [ #4 - Multicast] route2                                   : Starting exception throw

2021-05-06 12:43:18.578  INFO 13956 --- [ #4 - Multicast] route2                                   : Exception handler invoked

2021-05-06 12:43:18.579  INFO 13956 --- [ #4 - Multicast] c.e.d.m.SimpleFlowMergeAggregator        : Inside aggregator {"data" : "err"}

2021-05-06 12:43:19.575  INFO 13956 --- [ #3 - Multicast] route3                                   : Executing PATH_2 - success path

2021-05-06 12:43:21.576  INFO 13956 --- [ #3 - Multicast] route3                                   : PATH_2

2021-05-06 12:43:21.576  INFO 13956 --- [ #3 - Multicast] c.e.d.m.SimpleFlowMergeAggregator        : Inside aggregator DATA_FROM_PATH_2

观察上述日志,我们发现完成两条子路径结果的聚合后,后续的两个步骤日志(log)和转换(transform)并未执行。这并不符合我们期望的结果。

经过多次测试,我们还发现,只有当到达聚合器SimpleFlowMergeAggregator的第一个子路径("direct:A")执行异常时,便会发生这种后续步骤未执行的情况;而如果第一个子路径("direct:A")执行成功,即使另一个子路径("direct:B")执行失败,也会继续执行后续的步骤。

4 问题分析

接下来,我们通过查看Camel源代码,来找出上述现象的原因。

在camel-core-processors模块的Pipeline.java 中,其run()方法中有这样一段代码:

@Override

public void run() {

boolean stop = exchange.isRouteStop();

int num = index;

boolean more = num < size;

boolean first = num == 0;

if (!stop && more && (first || continueProcessing(exchange, "so breaking out of pipeline", LOG))) {

// prepare for next run

if (exchange.hasOut()) {

exchange.setIn(exchange.getOut());

exchange.setOut(null);

}

// get the next processor

AsyncProcessor processor = processors.get(index++);

processor.process(exchange, this);

} else {

// copyResults is needed in case MEP is OUT and the message is not an OUT message

ExchangeHelper.copyResults(exchange, exchange);

// logging nextExchange as it contains the exchange that might have altered the payload and since

// we are logging the completion if will be confusing if we log the original instead

// we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots

if (LOG.isTraceEnabled()) {

LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);

}

AsyncCallback cb = callback;

taskFactory.release(this);

reactiveExecutor.schedule(cb);

}

}

其中,这个if判断决定了是否继续执行后续步骤:

if (!stop && more && (first || continueProcessing(exchange, "so breaking out of pipeline", LOG)))

可以看出,在如下三种情况下,后续步骤将不会被执行:

1. 之前的步骤已经将exchange 对象标记为停止状态。

boolean stop = exchange.isRouteStop();

2. 后续没有步骤可执行。

boolean more = num < size;

3. continueProcessing()方法返回false。

我们来看看continueProcessing()方法的代码。

public final class PipelineHelper {

public static boolean continueProcessing(Exchange exchange, String message, Logger log) {

ExtendedExchange ee = (ExtendedExchange) exchange;

boolean stop = ee.isFailed() || ee.isRollbackOnly() || ee.isRollbackOnlyLast()

|| (ee.isErrorHandlerHandledSet() && ee.isErrorHandlerHandled());

if (stop) {

if (log.isDebugEnabled()) {

StringBuilder sb = new StringBuilder();

sb.append("Message exchange has failed: ").append(message).append(" for exchange: ").append(exchange);

if (exchange.isRollbackOnly() || exchange.isRollbackOnlyLast()) {

sb.append(" Marked as rollback only.");

}

if (exchange.getException() != null) {

sb.append(" Exception: ").append(exchange.getException());

}

if (ee.isErrorHandlerHandledSet() && ee.isErrorHandlerHandled()) {

sb.append(" Handled by the error handler.");

}

log.debug(sb.toString());

}

return false;

}

if (ee.isRouteStop()) {

if (log.isDebugEnabled()) {

log.debug("ExchangeId: {} is marked to stop routing: {}", exchange.getExchangeId(), exchange);

}

return false;

}

return true;

}

}

可以看出,当执行过程发生异常并且被异常处理器捕获时,continueProcessing()方法将返回false。

再回到我们的案例,第一个到达聚合器SimpleFlowMergeAggregator的子路径("direct:A"),会作为后续聚合的基础,其它子路径("direct:B")会在此基础上追加各自的body数据。实际上,很多Camel用户都会采用这种方式来实现自定义聚合策略。但这样做存在一个问题:在异常处理时,子路径"direct:A"的exchange对象会被设置一个状态标识,而此状态标识会被传递到下游,用于判断是否继续执行后续步骤。由于作为聚合基础的"direct:A"子路径的exchange对象状态为“异常”,最终continueProcessing()方法将返回false,后续的步骤也就不会再执行。

5 解决方案

对于上述问题,用户可以使用多种方式来设置异常处理时exchange对象的状态。本文采用如下解决方案:如果第一个子路径执行正常,则继续执行后续步骤;如果第一个子路径执行异常,则将其与其它执行成功的子路径交换,然后继续执行后续步骤。

更新后的自定义聚合器SimpleFlowMergeAggregator如下:

public class SimpleFlowMergeAggregator implements AggregationStrategy {

private static final Logger LOGGER = LoggerFactory.getLogger(SimpleFlowMergeAggregator.class.getName());

@Override

public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {

LOGGER.info("Inside aggregator " + newExchange.getIn().getBody());

if(oldExchange == null) {

String data = newExchange.getIn().getBody(String.class);

List aggregatedDataList = new ArrayList<>();

aggregatedDataList.add(data);

newExchange.getIn().setBody(aggregatedDataList);

return newExchange;

}

if(hadException(oldExchange)) {

if(!hadException(newExchange)) {

// aggregate and swap the base

LOGGER.info("Found new exchange with success. swapping the base exchange");

List oldData = oldExchange.getIn().getBody(List.class);

oldData.add(newExchange.getIn().getBody(String.class));

// swapped the base here

newExchange.getIn().setBody(oldData);

return newExchange;

}

}

List oldData = oldExchange.getIn().getBody(List.class);

oldData.add(newExchange.getIn().getBody(String.class));

oldExchange.getIn().setBody(oldData);

return oldExchange;

}

private boolean hadException(Exchange exchange) {

if(exchange.isFailed()) {

return true;

}

if(exchange.isRollbackOnly()) {

return true;

}

if(exchange.isRollbackOnlyLast()) {

return true;

}

if(((ExtendedExchange)exchange).isErrorHandlerHandledSet()

&& ((ExtendedExchange)exchange).isErrorHandlerHandled()) {

return true;

}

return false;

}

}

再次运行上述案例,日志信息如下:

2021-05-06 12:46:19.122  INFO 2576 --- [qtp174245837-45] route1                                   : received request

2021-05-06 12:46:19.123  INFO 2576 --- [qtp174245837-45] route1                                   : Entering multicast

2021-05-06 12:46:19.130  INFO 2576 --- [ #3 - Multicast] route2                                   : Executing PATH_1 - exception path

2021-05-06 12:46:19.130  INFO 2576 --- [ #3 - Multicast] route2                                   : Starting exception throw

2021-05-06 12:46:19.134  INFO 2576 --- [ #3 - Multicast] route2                                   : Exception handler invoked

2021-05-06 12:46:19.135  INFO 2576 --- [ #3 - Multicast] c.e.d.m.SimpleFlowMergeAggregator        : Inside aggregator {"data" : "err"}

2021-05-06 12:46:20.130  INFO 2576 --- [ #4 - Multicast] route3                                   : Executing PATH_2 - success path

2021-05-06 12:46:22.132  INFO 2576 --- [ #4 - Multicast] route3                                   : PATH_2

2021-05-06 12:46:22.132  INFO 2576 --- [ #4 - Multicast] c.e.d.m.SimpleFlowMergeAggregator        : Inside aggregator DATA_FROM_PATH_2

2021-05-06 12:46:22.132  INFO 2576 --- [ #4 - Multicast] c.e.d.m.SimpleFlowMergeAggregator        : Found new exchange with success. swapping the base exchange

2021-05-06 12:46:22.133  INFO 2576 --- [ #4 - Multicast] route1                                   : Aggregated results {"data" : "err"},DATA_FROM_PATH_2

2021-05-06 12:46:22.133  INFO 2576 --- [ #4 - Multicast] route1                                   : Another log

可以看出,使用新的自定义聚合策略后,后续的日志(log)和转换(transform)步骤都成功执行。

6 结语

本文通过案例,发现了一个Camel Multicast组件聚合策略相关的问题。通过查看Camel源代码,找到了问题原因并给出了解决方案。

希望本文可以帮助到遇到同样问题的Camel用户。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:15. 微服务API网关-kong初探-2(15.5系统怎么样)
下一篇:GaussDB for DWS 数据融合系列第一期:云端数据接入(CDM)(gaussdb数据库)
相关文章