ENTAXY-248 release 1.8.1
This commit is contained in:
@ -8,7 +8,7 @@
|
||||
<parent>
|
||||
<groupId>ru.entaxy.esb.system</groupId>
|
||||
<artifactId>system-parent</artifactId>
|
||||
<version>1.8.0</version>
|
||||
<version>1.8.1</version>
|
||||
</parent>
|
||||
|
||||
<groupId>ru.entaxy.esb.system.commons</groupId>
|
||||
@ -61,12 +61,17 @@
|
||||
<dependency>
|
||||
<groupId>com.google.code.gson</groupId>
|
||||
<artifactId>gson</artifactId>
|
||||
<version>${gson.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>xerces</groupId>
|
||||
<artifactId>xercesImpl</artifactId>
|
||||
<version>${xerces.version}</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>xml-apis</groupId>
|
||||
<artifactId>xml-apis</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.osgi</groupId>
|
||||
@ -89,7 +94,6 @@
|
||||
<dependency>
|
||||
<groupId>org.osgi</groupId>
|
||||
<artifactId>osgi.core</artifactId>
|
||||
<version>${osgi.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
@ -158,5 +162,10 @@
|
||||
<groupId>org.apache.camel</groupId>
|
||||
<artifactId>camel-cxf</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.camel</groupId>
|
||||
<artifactId>camel-base</artifactId>
|
||||
<version>${camel.version}-ENTAXY</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
|
@ -1,221 +0,0 @@
|
||||
/*-
|
||||
* ~~~~~~licensing~~~~~~
|
||||
* system-commons
|
||||
* ==========
|
||||
* Copyright (C) 2020 - 2021 EmDev LLC
|
||||
* ==========
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
* ~~~~~~/licensing~~~~~~
|
||||
*/
|
||||
package ru.entaxy.esb.system.common.aggregation;
|
||||
|
||||
import com.hazelcast.core.HazelcastInstance;
|
||||
import org.apache.camel.*;
|
||||
import org.apache.camel.model.OptionalIdentifiedDefinition;
|
||||
import org.apache.camel.model.ToDefinition;
|
||||
import org.apache.camel.model.language.SimpleExpression;
|
||||
import org.apache.camel.processor.CamelInternalProcessor;
|
||||
import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
|
||||
import org.apache.camel.reifier.ProcessorReifier;
|
||||
import org.apache.camel.spi.AggregationRepository;
|
||||
import org.apache.camel.spi.IdAware;
|
||||
import org.apache.camel.spi.RouteIdAware;
|
||||
import org.apache.camel.util.concurrent.SynchronousExecutorService;
|
||||
import ru.entaxy.esb.system.common.aggregation.hazelcast.DisconnectedMembershipListener;
|
||||
import ru.entaxy.esb.system.common.aggregation.repo.IgniteAggregationRepository;
|
||||
|
||||
public class AggregationProcessorBean implements Processor {
|
||||
|
||||
private static final String routeName = "aggregation";
|
||||
private CamelContext camelContext;
|
||||
private String aggregationStrategyRef;
|
||||
private AggregationStrategy aggregationStrategy;
|
||||
private String aggregationStrategyMethodName;
|
||||
private String aggregateExpression = "ENTAXY_AcknowledgeMsgID";
|
||||
private String toDefinition = "direct-vm:common-revert-no-acknowledge-messages?block=true&timeout=60000";
|
||||
private int completionSize = 2;
|
||||
//10 min in mc
|
||||
private int completionTimeout = 600_000;
|
||||
private String aggregationRepositoryRef;
|
||||
private AggregationRepository aggregationRepository;
|
||||
|
||||
private HazelcastInstance hazelcastInstance;
|
||||
|
||||
private AggregationProcessorWithRestoreTimeout aggregationProcessorWithRestoreTimeout;
|
||||
|
||||
public void initAggregateProcessor() throws Exception {
|
||||
Route route = camelContext.getRoute(routeName);
|
||||
|
||||
aggregationProcessorWithRestoreTimeout = new AggregationProcessorWithRestoreTimeout(camelContext,
|
||||
getCamelDestinationProcessor(route),
|
||||
getCorrelationExpression(route),
|
||||
createAggregationStrategy(camelContext),
|
||||
new SynchronousExecutorService(),
|
||||
false);
|
||||
settingsAggregationProcessorWithRestoreTimeout(route);
|
||||
aggregationProcessorWithRestoreTimeout.doStart();
|
||||
|
||||
addHazelcastMembershipListener();
|
||||
}
|
||||
|
||||
private void settingsAggregationProcessorWithRestoreTimeout(Route route) {
|
||||
AggregationRepository repository = createAggregationRepository(route);
|
||||
if (repository != null) {
|
||||
aggregationProcessorWithRestoreTimeout.setAggregationRepository(repository);
|
||||
}
|
||||
aggregationProcessorWithRestoreTimeout.setCompletionSize(completionSize);
|
||||
aggregationProcessorWithRestoreTimeout.setCompletionTimeout(completionTimeout);
|
||||
}
|
||||
|
||||
private void addHazelcastMembershipListener() {
|
||||
hazelcastInstance.getCluster().addMembershipListener(new DisconnectedMembershipListener(aggregationProcessorWithRestoreTimeout, camelContext));
|
||||
}
|
||||
|
||||
private Expression getCorrelationExpression(Route route) {
|
||||
return new SimpleExpression(aggregateExpression);
|
||||
}
|
||||
|
||||
private CamelInternalProcessor getCamelDestinationProcessor(Route route) throws Exception {
|
||||
Processor childProcessor = createChildProcessor(route, true);
|
||||
|
||||
// wrap the aggregate route in a unit of work processor
|
||||
CamelInternalProcessor internal = new CamelInternalProcessor(camelContext, childProcessor);
|
||||
internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(route, camelContext));
|
||||
return internal;
|
||||
}
|
||||
|
||||
private Processor createChildProcessor(Route route, boolean mandatory) throws Exception {
|
||||
Processor children = null;
|
||||
ToDefinition definition = new ToDefinition(toDefinition);
|
||||
// at first use custom factory
|
||||
if (camelContext.adapt(ExtendedCamelContext.class).getProcessorFactory() != null) {
|
||||
children = camelContext.adapt(ExtendedCamelContext.class).getProcessorFactory().createChildProcessor(route,
|
||||
definition, mandatory);
|
||||
}
|
||||
// fallback to default implementation if factory did not create the
|
||||
// child
|
||||
if (children == null) {
|
||||
children = createOutputsProcessor(route, definition);
|
||||
}
|
||||
|
||||
if (children == null && mandatory) {
|
||||
throw new IllegalArgumentException("Definition has no children on " + definition);
|
||||
}
|
||||
return children;
|
||||
}
|
||||
|
||||
protected Processor createOutputsProcessor(Route route, ToDefinition definition) throws Exception {
|
||||
Processor processor = ProcessorReifier.reifier(route, definition).createProcessor();
|
||||
|
||||
// inject id
|
||||
if (processor instanceof IdAware) {
|
||||
String id = getId(definition);
|
||||
((IdAware) processor).setId(id);
|
||||
}
|
||||
if (processor instanceof RouteIdAware) {
|
||||
((RouteIdAware) processor).setRouteId(route.getRouteId());
|
||||
}
|
||||
|
||||
return processor;
|
||||
}
|
||||
|
||||
protected String getId(OptionalIdentifiedDefinition<?> def) {
|
||||
return def.idOrCreate(camelContext.adapt(ExtendedCamelContext.class).getNodeIdFactory());
|
||||
}
|
||||
|
||||
|
||||
private AggregationRepository createAggregationRepository(Route route) {
|
||||
if (aggregationRepository == null && aggregationRepositoryRef != null) {
|
||||
aggregationRepository = (AggregationRepository) route.getCamelContext().getRegistry().lookupByName(aggregationRepositoryRef);
|
||||
}
|
||||
return aggregationRepository;
|
||||
}
|
||||
|
||||
private AggregationStrategy createAggregationStrategy(CamelContext camelContext) {
|
||||
if (aggregationStrategy == null && aggregationStrategyRef != null) {
|
||||
Object aggStrategy = camelContext.getRegistry().lookupByNameAndType(aggregationStrategyRef, Object.class);
|
||||
if (aggStrategy instanceof AggregationStrategy) {
|
||||
aggregationStrategy = (AggregationStrategy) aggStrategy;
|
||||
} else if (aggStrategy != null) {
|
||||
aggregationStrategy = new AggregationStrategyBeanAdapter(aggStrategy, aggregationStrategyMethodName);
|
||||
} else {
|
||||
throw new IllegalArgumentException("Cannot find AggregationStrategy in Registry with name: " + aggregationStrategyRef);
|
||||
}
|
||||
}
|
||||
|
||||
if (aggregationStrategy == null) {
|
||||
throw new IllegalArgumentException("AggregationStrategy or AggregationStrategyRef must be set on " + this);
|
||||
}
|
||||
|
||||
if (aggregationStrategy instanceof CamelContextAware) {
|
||||
((CamelContextAware) aggregationStrategy).setCamelContext(camelContext);
|
||||
}
|
||||
|
||||
return aggregationStrategy;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(Exchange exchange) throws Exception {
|
||||
aggregationProcessorWithRestoreTimeout.process(exchange);
|
||||
}
|
||||
|
||||
public void setCamelContext(CamelContext camelContext) {
|
||||
this.camelContext = camelContext;
|
||||
}
|
||||
|
||||
public void setAggregationStrategyRef(String aggregationStrategyRef) {
|
||||
this.aggregationStrategyRef = aggregationStrategyRef;
|
||||
}
|
||||
|
||||
public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
|
||||
this.aggregationStrategy = aggregationStrategy;
|
||||
}
|
||||
|
||||
public void setAggregationStrategyMethodName(String aggregationStrategyMethodName) {
|
||||
this.aggregationStrategyMethodName = aggregationStrategyMethodName;
|
||||
}
|
||||
|
||||
public void setAggregateExpression(String aggregateExpression) {
|
||||
this.aggregateExpression = aggregateExpression;
|
||||
}
|
||||
|
||||
public void setToDefinition(String toDefinition) {
|
||||
this.toDefinition = toDefinition;
|
||||
}
|
||||
|
||||
public void setCompletionSize(int completionSize) {
|
||||
this.completionSize = completionSize;
|
||||
}
|
||||
|
||||
public void setCompletionTimeout(int completionTimeout) {
|
||||
this.completionTimeout = completionTimeout;
|
||||
}
|
||||
|
||||
public void setAggregationRepositoryRef(String aggregationRepositoryRef) {
|
||||
this.aggregationRepositoryRef = aggregationRepositoryRef;
|
||||
}
|
||||
|
||||
public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
|
||||
this.hazelcastInstance = hazelcastInstance;
|
||||
}
|
||||
|
||||
public void init() throws Exception {
|
||||
camelContext.addStartupListener((a, b) -> initAggregateProcessor());
|
||||
}
|
||||
|
||||
public void doStop() throws Exception {
|
||||
aggregationProcessorWithRestoreTimeout.doStop();
|
||||
if (aggregationRepository instanceof IgniteAggregationRepository)
|
||||
((IgniteAggregationRepository) aggregationRepository).doStop();
|
||||
}
|
||||
}
|
File diff suppressed because it is too large
Load Diff
@ -19,25 +19,21 @@
|
||||
*/
|
||||
package ru.entaxy.esb.system.common.aggregation.hazelcast;
|
||||
|
||||
import com.hazelcast.core.HazelcastInstance;
|
||||
import com.hazelcast.core.MemberAttributeEvent;
|
||||
import com.hazelcast.core.MembershipEvent;
|
||||
import com.hazelcast.core.MembershipListener;
|
||||
import org.apache.camel.CamelContext;
|
||||
import org.apache.camel.processor.aggregate.AggregateProcessorSetter;
|
||||
import org.apache.camel.processor.aggregate.EntaxyAggregateProcessor;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import ru.entaxy.esb.system.common.aggregation.AggregationProcessorWithRestoreTimeout;
|
||||
|
||||
public class DisconnectedMembershipListener implements MembershipListener {
|
||||
public class DisconnectedMembershipListener implements MembershipListener, AggregateProcessorSetter {
|
||||
|
||||
protected static final Logger log = LoggerFactory.getLogger(DisconnectedMembershipListener.class);
|
||||
private final AggregationProcessorWithRestoreTimeout aggregationProcessorWithRestoreTimeout;
|
||||
private final CamelContext camelContext;
|
||||
|
||||
public DisconnectedMembershipListener(AggregationProcessorWithRestoreTimeout aggregationProcessorWithRestoreTimeout,
|
||||
CamelContext camelContext) {
|
||||
this.aggregationProcessorWithRestoreTimeout = aggregationProcessorWithRestoreTimeout;
|
||||
this.camelContext = camelContext;
|
||||
}
|
||||
private CamelContext camelContext;
|
||||
private EntaxyAggregateProcessor aggregateProcessor;
|
||||
|
||||
@Override
|
||||
public void memberAdded(MembershipEvent membershipEvent) {
|
||||
@ -46,8 +42,8 @@ public class DisconnectedMembershipListener implements MembershipListener {
|
||||
@Override
|
||||
public void memberRemoved(MembershipEvent membershipEvent) {
|
||||
try {
|
||||
aggregationProcessorWithRestoreTimeout.recoverCompletedMessageFromAggregationRepository(camelContext);
|
||||
aggregationProcessorWithRestoreTimeout.restoreTimeoutMapFromAggregationRepository();
|
||||
aggregateProcessor.recoverCompletedMessageFromAggregationRepository(camelContext);
|
||||
aggregateProcessor.restoreTimeoutMapFromAggregationRepository();
|
||||
} catch (Exception e) {
|
||||
log.error("Can't restore Timeout from Aggregator. Please restart bundle.", e);
|
||||
}
|
||||
@ -56,4 +52,17 @@ public class DisconnectedMembershipListener implements MembershipListener {
|
||||
@Override
|
||||
public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAggregateProcessor(EntaxyAggregateProcessor aggregateProcessor) {
|
||||
this.aggregateProcessor = aggregateProcessor;
|
||||
}
|
||||
|
||||
public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
|
||||
hazelcastInstance.getCluster().addMembershipListener(this);
|
||||
}
|
||||
|
||||
public void setCamelContext(CamelContext camelContext) {
|
||||
this.camelContext = camelContext;
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user