initial public commit

This commit is contained in:
2021-09-06 17:46:59 +03:00
commit b744b08829
824 changed files with 91593 additions and 0 deletions

View File

@ -0,0 +1,221 @@
/*-
* ~~~~~~licensing~~~~~~
* system-commons
* ==========
* Copyright (C) 2020 - 2021 EmDev LLC
* ==========
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ~~~~~~/licensing~~~~~~
*/
package ru.entaxy.esb.system.common.aggregation;
import com.hazelcast.core.HazelcastInstance;
import org.apache.camel.*;
import org.apache.camel.model.OptionalIdentifiedDefinition;
import org.apache.camel.model.ToDefinition;
import org.apache.camel.model.language.SimpleExpression;
import org.apache.camel.processor.CamelInternalProcessor;
import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
import org.apache.camel.reifier.ProcessorReifier;
import org.apache.camel.spi.AggregationRepository;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.util.concurrent.SynchronousExecutorService;
import ru.entaxy.esb.system.common.aggregation.hazelcast.DisconnectedMembershipListener;
import ru.entaxy.esb.system.common.aggregation.repo.IgniteAggregationRepository;
public class AggregationProcessorBean implements Processor {
private static final String routeName = "aggregation";
private CamelContext camelContext;
private String aggregationStrategyRef;
private AggregationStrategy aggregationStrategy;
private String aggregationStrategyMethodName;
private String aggregateExpression = "ENTAXY_AcknowledgeMsgID";
private String toDefinition = "direct-vm:common-revert-no-acknowledge-messages?block=true&timeout=60000";
private int completionSize = 2;
//10 min in mc
private int completionTimeout = 600_000;
private String aggregationRepositoryRef;
private AggregationRepository aggregationRepository;
private HazelcastInstance hazelcastInstance;
private AggregationProcessorWithRestoreTimeout aggregationProcessorWithRestoreTimeout;
public void initAggregateProcessor() throws Exception {
Route route = camelContext.getRoute(routeName);
aggregationProcessorWithRestoreTimeout = new AggregationProcessorWithRestoreTimeout(camelContext,
getCamelDestinationProcessor(route),
getCorrelationExpression(route),
createAggregationStrategy(camelContext),
new SynchronousExecutorService(),
false);
settingsAggregationProcessorWithRestoreTimeout(route);
aggregationProcessorWithRestoreTimeout.doStart();
addHazelcastMembershipListener();
}
private void settingsAggregationProcessorWithRestoreTimeout(Route route) {
AggregationRepository repository = createAggregationRepository(route);
if (repository != null) {
aggregationProcessorWithRestoreTimeout.setAggregationRepository(repository);
}
aggregationProcessorWithRestoreTimeout.setCompletionSize(completionSize);
aggregationProcessorWithRestoreTimeout.setCompletionTimeout(completionTimeout);
}
private void addHazelcastMembershipListener() {
hazelcastInstance.getCluster().addMembershipListener(new DisconnectedMembershipListener(aggregationProcessorWithRestoreTimeout, camelContext));
}
private Expression getCorrelationExpression(Route route) {
return new SimpleExpression(aggregateExpression);
}
private CamelInternalProcessor getCamelDestinationProcessor(Route route) throws Exception {
Processor childProcessor = createChildProcessor(route, true);
// wrap the aggregate route in a unit of work processor
CamelInternalProcessor internal = new CamelInternalProcessor(camelContext, childProcessor);
internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(route, camelContext));
return internal;
}
private Processor createChildProcessor(Route route, boolean mandatory) throws Exception {
Processor children = null;
ToDefinition definition = new ToDefinition(toDefinition);
// at first use custom factory
if (camelContext.adapt(ExtendedCamelContext.class).getProcessorFactory() != null) {
children = camelContext.adapt(ExtendedCamelContext.class).getProcessorFactory().createChildProcessor(route,
definition, mandatory);
}
// fallback to default implementation if factory did not create the
// child
if (children == null) {
children = createOutputsProcessor(route, definition);
}
if (children == null && mandatory) {
throw new IllegalArgumentException("Definition has no children on " + definition);
}
return children;
}
protected Processor createOutputsProcessor(Route route, ToDefinition definition) throws Exception {
Processor processor = ProcessorReifier.reifier(route, definition).createProcessor();
// inject id
if (processor instanceof IdAware) {
String id = getId(definition);
((IdAware) processor).setId(id);
}
if (processor instanceof RouteIdAware) {
((RouteIdAware) processor).setRouteId(route.getRouteId());
}
return processor;
}
protected String getId(OptionalIdentifiedDefinition<?> def) {
return def.idOrCreate(camelContext.adapt(ExtendedCamelContext.class).getNodeIdFactory());
}
private AggregationRepository createAggregationRepository(Route route) {
if (aggregationRepository == null && aggregationRepositoryRef != null) {
aggregationRepository = (AggregationRepository) route.getCamelContext().getRegistry().lookupByName(aggregationRepositoryRef);
}
return aggregationRepository;
}
private AggregationStrategy createAggregationStrategy(CamelContext camelContext) {
if (aggregationStrategy == null && aggregationStrategyRef != null) {
Object aggStrategy = camelContext.getRegistry().lookupByNameAndType(aggregationStrategyRef, Object.class);
if (aggStrategy instanceof AggregationStrategy) {
aggregationStrategy = (AggregationStrategy) aggStrategy;
} else if (aggStrategy != null) {
aggregationStrategy = new AggregationStrategyBeanAdapter(aggStrategy, aggregationStrategyMethodName);
} else {
throw new IllegalArgumentException("Cannot find AggregationStrategy in Registry with name: " + aggregationStrategyRef);
}
}
if (aggregationStrategy == null) {
throw new IllegalArgumentException("AggregationStrategy or AggregationStrategyRef must be set on " + this);
}
if (aggregationStrategy instanceof CamelContextAware) {
((CamelContextAware) aggregationStrategy).setCamelContext(camelContext);
}
return aggregationStrategy;
}
@Override
public void process(Exchange exchange) throws Exception {
aggregationProcessorWithRestoreTimeout.process(exchange);
}
public void setCamelContext(CamelContext camelContext) {
this.camelContext = camelContext;
}
public void setAggregationStrategyRef(String aggregationStrategyRef) {
this.aggregationStrategyRef = aggregationStrategyRef;
}
public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
this.aggregationStrategy = aggregationStrategy;
}
public void setAggregationStrategyMethodName(String aggregationStrategyMethodName) {
this.aggregationStrategyMethodName = aggregationStrategyMethodName;
}
public void setAggregateExpression(String aggregateExpression) {
this.aggregateExpression = aggregateExpression;
}
public void setToDefinition(String toDefinition) {
this.toDefinition = toDefinition;
}
public void setCompletionSize(int completionSize) {
this.completionSize = completionSize;
}
public void setCompletionTimeout(int completionTimeout) {
this.completionTimeout = completionTimeout;
}
public void setAggregationRepositoryRef(String aggregationRepositoryRef) {
this.aggregationRepositoryRef = aggregationRepositoryRef;
}
public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
this.hazelcastInstance = hazelcastInstance;
}
public void init() throws Exception {
camelContext.addStartupListener((a, b) -> initAggregateProcessor());
}
public void doStop() throws Exception {
aggregationProcessorWithRestoreTimeout.doStop();
if (aggregationRepository instanceof IgniteAggregationRepository)
((IgniteAggregationRepository) aggregationRepository).doStop();
}
}

View File

@ -0,0 +1,44 @@
/*-
* ~~~~~~licensing~~~~~~
* system-commons
* ==========
* Copyright (C) 2020 - 2021 EmDev LLC
* ==========
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ~~~~~~/licensing~~~~~~
*/
package ru.entaxy.esb.system.common.aggregation;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
public class HeaderMergeAggregatorImpl implements AggregationStrategy {
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (oldExchange != null && newExchange != null) {
Map<String, Object> oldHeaders = oldExchange.getIn().getHeaders();
Map<String, Object> newHeaders = newExchange.getIn().getHeaders();
oldHeaders = oldHeaders.entrySet().stream()
.filter(e -> !newHeaders.containsKey(e.getKey()))
.collect(Collectors.toMap(e->e.getKey(), e->e.getValue()));
newExchange.getIn().getHeaders().putAll(oldHeaders);
}
return newExchange;
}
}

View File

@ -0,0 +1,55 @@
/*-
* ~~~~~~licensing~~~~~~
* system-commons
* ==========
* Copyright (C) 2020 - 2021 EmDev LLC
* ==========
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ~~~~~~/licensing~~~~~~
*/
package ru.entaxy.esb.system.common.aggregation;
import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
public class TimeoutAwareAggregationStrategyImpl implements AggregationStrategy {
private static final String ACK_MESSAGE_HEADER = "NTX_AckMessage";
private final String nameHeaderAcknowledge;
public TimeoutAwareAggregationStrategyImpl(String nameHeaderAcknowledge) {
this.nameHeaderAcknowledge = nameHeaderAcknowledge;
}
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (oldExchange == null) {
return newExchange;
} else {
oldExchange.getMessage().setHeader(nameHeaderAcknowledge, checkOnCorrectPair(oldExchange, newExchange));
return oldExchange;
}
}
private boolean checkOnCorrectPair(Exchange oldExchange, Exchange newExchange) {
return (oldExchange != null && oldExchange.getMessage().getHeader(ACK_MESSAGE_HEADER) == null &&
(newExchange == null || newExchange.getMessage().getHeader(ACK_MESSAGE_HEADER) == null));
}
@Override
public void timeout(Exchange oldExchange, int index, int total, long timeout) {
oldExchange.getMessage().setHeader(nameHeaderAcknowledge,
(oldExchange.getMessage().getHeader(ACK_MESSAGE_HEADER) == null));
}
}

View File

@ -0,0 +1,59 @@
/*-
* ~~~~~~licensing~~~~~~
* system-commons
* ==========
* Copyright (C) 2020 - 2021 EmDev LLC
* ==========
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ~~~~~~/licensing~~~~~~
*/
package ru.entaxy.esb.system.common.aggregation.hazelcast;
import com.hazelcast.core.MemberAttributeEvent;
import com.hazelcast.core.MembershipEvent;
import com.hazelcast.core.MembershipListener;
import org.apache.camel.CamelContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.entaxy.esb.system.common.aggregation.AggregationProcessorWithRestoreTimeout;
public class DisconnectedMembershipListener implements MembershipListener {
protected static final Logger log = LoggerFactory.getLogger(DisconnectedMembershipListener.class);
private final AggregationProcessorWithRestoreTimeout aggregationProcessorWithRestoreTimeout;
private final CamelContext camelContext;
public DisconnectedMembershipListener(AggregationProcessorWithRestoreTimeout aggregationProcessorWithRestoreTimeout,
CamelContext camelContext) {
this.aggregationProcessorWithRestoreTimeout = aggregationProcessorWithRestoreTimeout;
this.camelContext = camelContext;
}
@Override
public void memberAdded(MembershipEvent membershipEvent) {
}
@Override
public void memberRemoved(MembershipEvent membershipEvent) {
try {
aggregationProcessorWithRestoreTimeout.recoverCompletedMessageFromAggregationRepository(camelContext);
aggregationProcessorWithRestoreTimeout.restoreTimeoutMapFromAggregationRepository();
} catch (Exception e) {
log.error("Can't restore Timeout from Aggregator. Please restart bundle.", e);
}
}
@Override
public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) {
}
}

View File

@ -0,0 +1,427 @@
/*-
* ~~~~~~licensing~~~~~~
* system-commons
* ==========
* Copyright (C) 2020 - 2021 EmDev LLC
* ==========
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ~~~~~~/licensing~~~~~~
*/
package ru.entaxy.esb.system.common.aggregation.repo;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper;
import org.apache.camel.processor.aggregate.jdbc.JdbcCamelCodec;
import org.apache.camel.processor.aggregate.jdbc.JdbcOptimisticLockingExceptionMapper;
import org.apache.camel.spi.OptimisticLockingAggregationRepository;
import org.apache.camel.spi.RecoverableAggregationRepository;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.TransactionConfiguration;
import org.apache.ignite.logger.jcl.JclLogger;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.transactions.Transaction;
import org.jetbrains.annotations.NotNull;
import org.osgi.framework.BundleContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.EmptyResultDataAccessException;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_QUIET;
public class IgniteAggregationRepository extends ServiceSupport implements RecoverableAggregationRepository, OptimisticLockingAggregationRepository {
private static final Logger LOG = LoggerFactory.getLogger(IgniteAggregationRepository.class);
public static final String AGGREGATION = "aggregation";
public static final String AGGREGATION_COMPLETED = "aggregationCompleted";
public static final int MAX_ATTEMPT_COUNT = 3;
private JdbcOptimisticLockingExceptionMapper jdbcOptimisticLockingExceptionMapper = new DefaultJdbcOptimisticLockingExceptionMapper();
private boolean returnOldExchange;
private final JdbcCamelCodec codec = new JdbcCamelCodec();
private long recoveryInterval = 5000;
private boolean useRecovery = true;
private int maximumRedeliveries;
private String deadLetterUri;
private boolean allowSerializedHeaders;
private int backups = 2;
private String workDirectory;
private String addresses = "127.0.0.1:47500,127.0.0.1:47501";
private Ignite ignite;
private IgniteTransactions transactions;
private IgniteCache<String, byte[]> aggregationCache;
private CacheConfiguration<String, byte[]> aggregationCfg;
private IgniteCache<String, byte[]> aggregationCompleted;
private CacheConfiguration<String, byte[]> aggregationCompletedCfg;
private BundleContext bundleContext;
/**
* Creates an ignite aggregation repository
*/
public IgniteAggregationRepository() {
}
/**
* Creates an ignite aggregation repository with the two mandatory parameters
*/
public IgniteAggregationRepository(BundleContext bundleContext, String workDirectory) {
this.setBundleContext(bundleContext);
this.setWorkDirectory(workDirectory);
}
@Override
public Exchange add(final CamelContext camelContext, final String correlationId,
final Exchange oldExchange, final Exchange newExchange) throws OptimisticLockingException {
try {
return add(camelContext, correlationId, newExchange);
} catch (Exception e) {
if (jdbcOptimisticLockingExceptionMapper != null && jdbcOptimisticLockingExceptionMapper.isOptimisticLocking(e)) {
throw new OptimisticLockingException();
} else {
throw RuntimeCamelException.wrapRuntimeCamelException(e);
}
}
}
@Override
public Exchange add(final CamelContext camelContext, final String correlationId, final Exchange exchange) {
Exchange result = null;
final String key = correlationId;
try {
LOG.debug("Adding exchange with key: [{}]", key);
boolean present = aggregationCache.get(key) != null;
// Recover existing exchange with that ID
if (isReturnOldExchange() && present) {
result = get(key, camelContext);
}
final byte[] data = codec.marshallExchange(camelContext, exchange, allowSerializedHeaders);
try (Transaction tx = transactions.txStart()) {
aggregationCache.put(key, data);
tx.commit();
}
} catch (Exception e) {
throw new RuntimeException("Error adding with key " + key, e);
}
return result;
}
public Exchange insert(final CamelContext camelContext, final String correlationId, final Exchange exchange) throws IOException {
Exchange result = null;
LOG.debug("Adding exchange with key: [{}]", correlationId);
final byte[] data = codec.marshallExchange(camelContext, exchange, allowSerializedHeaders);
aggregationCompleted.put(correlationId, data);
return result;
}
@Override
public Exchange get(final CamelContext camelContext, final String correlationId) {
Exchange result = get(correlationId, camelContext);
LOG.debug("Getting key [{}] -> {}", correlationId, result);
return result;
}
private Exchange get(final String key, final CamelContext camelContext) {
try {
final byte[] data = aggregationCache.get(key);
return codec.unmarshallExchange(camelContext, data);
} catch (EmptyResultDataAccessException | NullPointerException ex) {
return null;
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
@Override
public void remove(final CamelContext camelContext, final String correlationId, final Exchange exchange) {
try (Transaction tx = transactions.txStart()) {
insert(camelContext, correlationId, exchange);
if (!aggregationCache.remove(correlationId)) {
throw new RuntimeException("Error removing key " + correlationId + " from repository " + AGGREGATION);
}
tx.commit();
} catch (Exception exception) {
throw new RuntimeException("Error removing key " + correlationId + " from repository " + AGGREGATION, exception);
}
}
@Override
public void confirm(final CamelContext camelContext, final String exchangeId) {
confirm(camelContext, exchangeId, 0);
}
private void confirm(final CamelContext camelContext, final String exchangeId, int attemptCount) {
if (attemptCount <= MAX_ATTEMPT_COUNT) {
try (Transaction tx = transactions.txStart()) {
aggregationCompleted.remove(exchangeId);
tx.commit();
} catch (Exception exception) {
confirm(camelContext, exchangeId, ++attemptCount);
}
}
}
@Override
public Set<String> getKeys() {
Set<String> keys = new HashSet<>();
aggregationCache.query(new ScanQuery<>(null)).forEach(entry -> keys.add((String) entry.getKey()));
return keys;
}
@Override
public Set<String> scan(CamelContext camelContext) {
Set<String> keys = new HashSet<>();
aggregationCompleted.query(new ScanQuery<>(null)).forEach(entry -> keys.add((String) entry.getKey()));
return keys;
}
@Override
public Exchange recover(CamelContext camelContext, String exchangeId) {
final byte[] data = aggregationCompleted.get(exchangeId);
Exchange answer = null;
try {
answer = codec.unmarshallExchange(camelContext, data);
} catch (IOException | ClassNotFoundException e) {
LOG.error("Exception in recovering exchangeId {}", exchangeId, e);
}
LOG.debug("Recovering exchangeId [{}] -> {}", exchangeId, answer);
return answer;
}
/**
* If recovery is enabled then a background task is run every x'th time to scan for failed exchanges to recover
* and resubmit. By default this interval is 5000 millis.
*
* @param interval the interval
* @param timeUnit the time unit
*/
public void setRecoveryInterval(long interval, TimeUnit timeUnit) {
this.recoveryInterval = timeUnit.toMillis(interval);
}
public void setRecoveryInterval(long interval) {
this.recoveryInterval = interval;
}
public long getRecoveryIntervalInMillis() {
return recoveryInterval;
}
public boolean isUseRecovery() {
return useRecovery;
}
/**
* @param useRecovery Whether or not recovery is enabled. This option is by default true. When enabled the Camel
* Aggregator automatic recover failed aggregated exchange and have them resubmittedd
*/
public void setUseRecovery(boolean useRecovery) {
this.useRecovery = useRecovery;
}
public int getMaximumRedeliveries() {
return maximumRedeliveries;
}
public void setMaximumRedeliveries(int maximumRedeliveries) {
this.maximumRedeliveries = maximumRedeliveries;
}
public String getDeadLetterUri() {
return deadLetterUri;
}
/**
* @param deadLetterUri An endpoint uri for a Dead Letter Channel where exhausted recovered Exchanges will be
* moved. If this option is used then the maximumRedeliveries option must also be provided.
* Important note : if the deadletter route throws an exception, it will be send again to DLQ
* until it succeed !
*/
public void setDeadLetterUri(String deadLetterUri) {
this.deadLetterUri = deadLetterUri;
}
public boolean isReturnOldExchange() {
return returnOldExchange;
}
/**
* @param returnOldExchange Whether the get operation should return the old existing Exchange if any existed.
* By default this option is false to optimize as we do not need the old exchange when
* aggregating
*/
public void setReturnOldExchange(boolean returnOldExchange) {
this.returnOldExchange = returnOldExchange;
}
public boolean isAllowSerializedHeaders() {
return allowSerializedHeaders;
}
public void setAllowSerializedHeaders(boolean allowSerializedHeaders) {
this.allowSerializedHeaders = allowSerializedHeaders;
}
public JdbcOptimisticLockingExceptionMapper getJdbcOptimisticLockingExceptionMapper() {
return jdbcOptimisticLockingExceptionMapper;
}
public void setJdbcOptimisticLockingExceptionMapper(JdbcOptimisticLockingExceptionMapper jdbcOptimisticLockingExceptionMapper) {
this.jdbcOptimisticLockingExceptionMapper = jdbcOptimisticLockingExceptionMapper;
}
@Override
protected void doStart() throws Exception {
ObjectHelper.notNull(bundleContext, "BundleContext");
ObjectHelper.notNull(workDirectory, "WorkDirectory");
settingsIgnite();
// log number of existing exchanges
int current = getKeys().size();
int completed = scan(null).size();
if (current > 0) {
LOG.info("On startup there are " + current + " aggregate exchanges (not completed) in repository: " + AGGREGATION);
} else {
LOG.info("On startup there are no existing aggregate exchanges (not completed) in repository: {}", AGGREGATION);
}
if (completed > 0) {
LOG.warn("On startup there are " + completed + " completed exchanges to be recovered in repository: " + AGGREGATION_COMPLETED);
} else {
LOG.info("On startup there are no completed exchanges to be recovered in repository: {}", AGGREGATION_COMPLETED);
}
}
@Override
public void doStop() throws Exception {
if (ignite != null) {
ignite.close();
}
}
private void settingsIgnite() {
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setDiscoverySpi(getTcpDiscoverySpi());
cfg.setGridLogger(new JclLogger());
cfg.setTransactionConfiguration(new TransactionConfiguration());
settingsPersistence(cfg);
aggregationCfg = getCacheConfiguration(AGGREGATION);
cfg.setCacheConfiguration(aggregationCfg);
aggregationCompletedCfg = getCacheConfiguration(AGGREGATION_COMPLETED);
cfg.setCacheConfiguration(aggregationCompletedCfg);
startIgnite(cfg);
createCache();
}
private void settingsPersistence(IgniteConfiguration cfg) {
DataStorageConfiguration storageCfg = new DataStorageConfiguration();
storageCfg.getDefaultDataRegionConfiguration().setPersistenceEnabled(true);
cfg.setWorkDirectory(workDirectory);
cfg.setDataStorageConfiguration(storageCfg);
}
private void createCache() {
transactions = ignite.transactions();
aggregationCache = ignite.getOrCreateCache(aggregationCfg);
aggregationCompleted = ignite.getOrCreateCache(aggregationCompletedCfg);
// Set the baseline topology that is represented by these nodes.
ignite.cluster().setBaselineTopology(ignite.cluster().localNode().order());
}
private void startIgnite(IgniteConfiguration cfg) {
System.setProperty(IGNITE_QUIET, "false");
ignite = Ignition.getOrStart(cfg);
ignite.cluster().active(true);
}
@NotNull
private CacheConfiguration<String, byte[]> getCacheConfiguration(String cacheName) {
CacheConfiguration<String, byte[]> cacheCfg = new CacheConfiguration<String, byte[]>();
cacheCfg.setName(cacheName);
cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
cacheCfg.setCacheMode(CacheMode.REPLICATED);
cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
cacheCfg.setBackups(backups);
return cacheCfg;
}
@NotNull
private TcpDiscoverySpi getTcpDiscoverySpi() {
TcpDiscoverySpi spi = new TcpDiscoverySpi();
spi.setIpFinder(new TcpDiscoveryVmIpFinder(false) {
{
setAddresses(Arrays.asList(addresses.split(",")));
}
});
return spi;
}
public void setBundleContext(BundleContext bundleContext) {
this.bundleContext = bundleContext;
}
public void setWorkDirectory(String workDirectory) {
this.workDirectory = workDirectory;
}
public void setBackups(int backups) {
this.backups = backups;
}
public void setAddresses(String addresses) {
this.addresses = addresses;
}
}

View File

@ -0,0 +1,649 @@
/*-
* ~~~~~~licensing~~~~~~
* system-commons
* ==========
* Copyright (C) 2020 - 2021 EmDev LLC
* ==========
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ~~~~~~/licensing~~~~~~
*/
package ru.entaxy.esb.system.common.aggregation.repo;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper;
import org.apache.camel.processor.aggregate.jdbc.JdbcCamelCodec;
import org.apache.camel.processor.aggregate.jdbc.JdbcOptimisticLockingExceptionMapper;
import org.apache.camel.spi.OptimisticLockingAggregationRepository;
import org.apache.camel.spi.RecoverableAggregationRepository;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.Constants;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.support.AbstractLobCreatingPreparedStatementCallback;
import org.springframework.jdbc.support.lob.DefaultLobHandler;
import org.springframework.jdbc.support.lob.LobCreator;
import org.springframework.jdbc.support.lob.LobHandler;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import javax.sql.DataSource;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* JDBC based {@link org.apache.camel.spi.AggregationRepository} JdbcAggregationRepository will only preserve any
* Serializable compatible data types. If a data type is not such a type its dropped and a WARN is logged. And it only
* persists the Message body and the Message headers. The Exchange properties are not persisted.
*/
public class JdbcAggregationRepository extends ServiceSupport
implements RecoverableAggregationRepository, OptimisticLockingAggregationRepository {
protected static final String EXCHANGE = "exchange";
protected static final String ID = "id";
protected static final String BODY = "body";
// optimistic locking: version identifier needed to avoid the lost update problem
private static final String VERSION = "version";
private static final String VERSION_PROPERTY = "CamelOptimisticLockVersion";
private static final Logger LOG = LoggerFactory.getLogger(JdbcAggregationRepository.class);
private static final Constants PROPAGATION_CONSTANTS = new Constants(TransactionDefinition.class);
private JdbcOptimisticLockingExceptionMapper jdbcOptimisticLockingExceptionMapper
= new DefaultJdbcOptimisticLockingExceptionMapper();
private PlatformTransactionManager transactionManager;
private DataSource dataSource;
private TransactionTemplate transactionTemplate;
private TransactionTemplate transactionTemplateReadOnly;
private int propagationBehavior = TransactionDefinition.PROPAGATION_REQUIRED;
private JdbcTemplate jdbcTemplate;
private LobHandler lobHandler = new DefaultLobHandler();
private String repositoryName;
private boolean returnOldExchange;
private JdbcCamelCodec codec = new JdbcCamelCodec();
private long recoveryInterval = 5000;
private boolean useRecovery = true;
private int maximumRedeliveries;
private String deadLetterUri;
private List<String> headersToStoreAsText;
private boolean storeBodyAsText;
private boolean allowSerializedHeaders;
/**
* Creates an aggregation repository
*/
public JdbcAggregationRepository() {
}
/**
* Creates an aggregation repository with the three mandatory parameters
*/
public JdbcAggregationRepository(PlatformTransactionManager transactionManager, String repositoryName,
DataSource dataSource) {
this.setRepositoryName(repositoryName);
this.setTransactionManager(transactionManager);
this.setDataSource(dataSource);
}
/**
* Sets the name of the repository
*/
public final void setRepositoryName(String repositoryName) {
this.repositoryName = repositoryName;
}
public final void setTransactionManager(PlatformTransactionManager transactionManager) {
this.transactionManager = transactionManager;
}
/**
* Sets the DataSource to use for accessing the database
*/
public void setDataSource(DataSource dataSource) {
this.dataSource = dataSource;
jdbcTemplate = new JdbcTemplate(dataSource);
}
@Override
public Exchange add(
final CamelContext camelContext, final String correlationId,
final Exchange oldExchange, final Exchange newExchange)
throws OptimisticLockingException {
try {
return add(camelContext, correlationId, newExchange);
} catch (Exception e) {
if (jdbcOptimisticLockingExceptionMapper != null && jdbcOptimisticLockingExceptionMapper.isOptimisticLocking(e)) {
throw new OptimisticLockingException();
} else {
throw RuntimeCamelException.wrapRuntimeCamelException(e);
}
}
}
@Override
public Exchange add(final CamelContext camelContext, final String correlationId, final Exchange exchange) {
return transactionTemplate.execute(status -> {
Exchange result = null;
final String key = correlationId;
try {
LOG.debug("Adding exchange with key {}", key);
boolean present = jdbcTemplate.queryForObject(
"SELECT COUNT(1) FROM " + getRepositoryName() + " WHERE " + ID + " = ?", Integer.class, key) != 0;
// Recover existing exchange with that ID
if (isReturnOldExchange() && present) {
result = get(key, getRepositoryName(), camelContext);
}
if (present) {
long version = exchange.getProperty(VERSION_PROPERTY, Long.class);
LOG.debug("Updating record with key {} and version {}", key, version);
update(camelContext, correlationId, exchange, getRepositoryName(), version);
} else {
LOG.debug("Inserting record with key {}", key);
insert(camelContext, correlationId, exchange, getRepositoryName(), 1L);
}
} catch (Exception e) {
throw new RuntimeException("Error adding to repository " + repositoryName + " with key " + key, e);
}
return result;
});
}
/**
* Updates the current exchange details in the given repository table.
*
* @param camelContext Current CamelContext
* @param key Correlation key
* @param exchange Aggregated exchange
* @param repositoryName Table's name
* @param version Version identifier
*/
protected void update(
final CamelContext camelContext, final String key, final Exchange exchange, String repositoryName, Long version)
throws Exception {
StringBuilder queryBuilder = new StringBuilder()
.append("UPDATE ").append(repositoryName)
.append(" SET ")
.append(EXCHANGE).append(" = ?")
.append(", ")
.append(VERSION).append(" = ?");
if (storeBodyAsText) {
queryBuilder.append(", ").append(BODY).append(" = ?");
}
if (hasHeadersToStoreAsText()) {
for (String headerName : headersToStoreAsText) {
queryBuilder.append(", ").append(headerName).append(" = ?");
}
}
queryBuilder.append(" WHERE ")
.append(ID).append(" = ?")
.append(" AND ")
.append(VERSION).append(" = ?");
String sql = queryBuilder.toString();
updateHelper(camelContext, key, exchange, sql, version);
}
/**
* Inserts a new record into the given repository table. Note: the exchange properties are NOT persisted.
*
* @param camelContext Current CamelContext
* @param correlationId Correlation key
* @param exchange Aggregated exchange to insert
* @param repositoryName Table's name
* @param version Version identifier
*/
protected void insert(
final CamelContext camelContext, final String correlationId, final Exchange exchange, String repositoryName,
Long version)
throws Exception {
// The default totalParameterIndex is 3 for ID, Exchange and version. Depending on logic this will be increased.
int totalParameterIndex = 3;
StringBuilder queryBuilder = new StringBuilder()
.append("INSERT INTO ").append(repositoryName)
.append('(').append(EXCHANGE)
.append(", ").append(ID)
.append(", ").append(VERSION);
if (storeBodyAsText) {
queryBuilder.append(", ").append(BODY);
totalParameterIndex++;
}
if (hasHeadersToStoreAsText()) {
for (String headerName : headersToStoreAsText) {
queryBuilder.append(", ").append(headerName);
totalParameterIndex++;
}
}
queryBuilder.append(") VALUES (");
for (int i = 0; i < totalParameterIndex - 1; i++) {
queryBuilder.append("?, ");
}
queryBuilder.append("?)");
String sql = queryBuilder.toString();
insertHelper(camelContext, correlationId, exchange, sql, version);
}
protected int insertHelper(
final CamelContext camelContext, final String key, final Exchange exchange, String sql, final Long version)
throws Exception {
final byte[] data = codec.marshallExchange(camelContext, exchange, allowSerializedHeaders);
Integer insertCount = jdbcTemplate.execute(sql,
new AbstractLobCreatingPreparedStatementCallback(getLobHandler()) {
@Override
protected void setValues(PreparedStatement ps, LobCreator lobCreator) throws SQLException {
int totalParameterIndex = 0;
lobCreator.setBlobAsBytes(ps, ++totalParameterIndex, data);
ps.setString(++totalParameterIndex, key);
ps.setLong(++totalParameterIndex, version);
if (storeBodyAsText) {
ps.setString(++totalParameterIndex, exchange.getIn().getBody(String.class));
}
if (hasHeadersToStoreAsText()) {
for (String headerName : headersToStoreAsText) {
String headerValue = exchange.getIn().getHeader(headerName, String.class);
ps.setString(++totalParameterIndex, headerValue);
}
}
}
});
return insertCount == null ? 0 : insertCount;
}
protected int updateHelper(
final CamelContext camelContext, final String key, final Exchange exchange, String sql, final Long version)
throws Exception {
final byte[] data = codec.marshallExchange(camelContext, exchange, allowSerializedHeaders);
Integer updateCount = jdbcTemplate.execute(sql,
new AbstractLobCreatingPreparedStatementCallback(getLobHandler()) {
@Override
protected void setValues(PreparedStatement ps, LobCreator lobCreator) throws SQLException {
int totalParameterIndex = 0;
lobCreator.setBlobAsBytes(ps, ++totalParameterIndex, data);
ps.setLong(++totalParameterIndex, version + 1);
if (storeBodyAsText) {
ps.setString(++totalParameterIndex, exchange.getIn().getBody(String.class));
}
if (hasHeadersToStoreAsText()) {
for (String headerName : headersToStoreAsText) {
String headerValue = exchange.getIn().getHeader(headerName, String.class);
ps.setString(++totalParameterIndex, headerValue);
}
}
ps.setString(++totalParameterIndex, key);
ps.setLong(++totalParameterIndex, version);
}
});
if (updateCount == 1) {
return updateCount;
} else {
// Found stale version while updating record
throw new OptimisticLockingException();
}
}
@Override
public Exchange get(final CamelContext camelContext, final String correlationId) {
final String key = correlationId;
Exchange result = get(key, getRepositoryName(), camelContext);
LOG.debug("Getting key {} -> {}", key, result);
return result;
}
private Exchange get(final String key, final String repositoryName, final CamelContext camelContext) {
return transactionTemplateReadOnly.execute(status -> {
try {
Map<String, Object> columns = jdbcTemplate.queryForMap(
String.format("SELECT %1$s, %2$s FROM %3$s WHERE %4$s=?", EXCHANGE, VERSION, repositoryName, ID),
new Object[]{key}, new int[]{Types.VARCHAR});
byte[] marshalledExchange = (byte[]) columns.get(EXCHANGE);
long version = (long) columns.get(VERSION);
Exchange result = codec.unmarshallExchange(camelContext, marshalledExchange);
result.setProperty(VERSION_PROPERTY, version);
return result;
} catch (EmptyResultDataAccessException ex) {
return null;
} catch (IOException ex) {
// Rollback the transaction
throw new RuntimeException("Error getting key " + key + " from repository " + repositoryName, ex);
} catch (ClassNotFoundException ex) {
// Rollback the transaction
throw new RuntimeException(ex);
}
});
}
@Override
public void remove(final CamelContext camelContext, final String correlationId, final Exchange exchange) {
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
protected void doInTransactionWithoutResult(TransactionStatus status) {
final String key = correlationId;
final String confirmKey = exchange.getExchangeId();
final long version = exchange.getProperty(VERSION_PROPERTY, Long.class);
try {
LOG.debug("Removing key {}", key);
// добавлена проверка на корректное удаление из таблицы, т к возникала ситуация гонки на узлах.
// один узел уже удалил из completed, а второй в процессе, соответственно ошибки дублирования
// не появляется появляется дубликат в очереди
if (jdbcTemplate.update("DELETE FROM " + getRepositoryName() + " WHERE " + ID + " = ? AND " + VERSION + " = ?",
key, version) == 1) {
insert(camelContext, confirmKey, exchange, getRepositoryNameCompleted(), version);
} else {
throw new RuntimeException("Error removing key " + key + " from repository " + repositoryName);
}
} catch (Exception e) {
throw new RuntimeException("Error removing key " + key + " from repository " + repositoryName, e);
}
}
});
}
@Override
public void confirm(final CamelContext camelContext, final String exchangeId) {
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
protected void doInTransactionWithoutResult(TransactionStatus status) {
LOG.debug("Confirming exchangeId {}", exchangeId);
final String confirmKey = exchangeId;
jdbcTemplate.update("DELETE FROM " + getRepositoryNameCompleted() + " WHERE " + ID + " = ?",
confirmKey);
}
});
}
@Override
public Set<String> getKeys() {
return getKeys(getRepositoryName());
}
@Override
public Set<String> scan(CamelContext camelContext) {
return getKeys(getRepositoryNameCompleted());
}
/**
* Returns the keys in the given repository
*
* @param repositoryName The name of the table
* @return Set of keys in the given repository name
*/
protected Set<String> getKeys(final String repositoryName) {
return transactionTemplateReadOnly.execute(status -> {
List<String> keys = jdbcTemplate.query("SELECT " + ID + " FROM " + repositoryName,
(rs, rowNum) -> {
String id = rs.getString(ID);
LOG.trace("getKey {}", id);
return id;
});
return new LinkedHashSet<>(keys);
});
}
@Override
public Exchange recover(CamelContext camelContext, String exchangeId) {
final String key = exchangeId;
Exchange answer = get(key, getRepositoryNameCompleted(), camelContext);
LOG.debug("Recovering exchangeId {} -> {}", key, answer);
return answer;
}
/**
* If recovery is enabled then a background task is run every x'th time to scan for failed exchanges to recover and
* resubmit. By default this interval is 5000 millis.
*/
@Override
public void setRecoveryInterval(long interval, TimeUnit timeUnit) {
this.recoveryInterval = timeUnit.toMillis(interval);
}
@Override
public void setRecoveryInterval(long interval) {
this.recoveryInterval = interval;
}
@Override
public long getRecoveryIntervalInMillis() {
return recoveryInterval;
}
@Override
public boolean isUseRecovery() {
return useRecovery;
}
/**
* Whether or not recovery is enabled. This option is by default true. When enabled the Camel Aggregator automatic
* recover failed aggregated exchange and have them resubmitted.
*/
@Override
public void setUseRecovery(boolean useRecovery) {
this.useRecovery = useRecovery;
}
@Override
public int getMaximumRedeliveries() {
return maximumRedeliveries;
}
@Override
public void setMaximumRedeliveries(int maximumRedeliveries) {
this.maximumRedeliveries = maximumRedeliveries;
}
@Override
public String getDeadLetterUri() {
return deadLetterUri;
}
/**
* An endpoint uri for a Dead Letter Channel where exhausted recovered Exchanges will be moved. If this option is
* used then the maximumRedeliveries option must also be provided. Important note : if the deadletter route throws
* an exception, it will be send again to DLQ until it succeed !
*/
@Override
public void setDeadLetterUri(String deadLetterUri) {
this.deadLetterUri = deadLetterUri;
}
public boolean isReturnOldExchange() {
return returnOldExchange;
}
/**
* Whether the get operation should return the old existing Exchange if any existed. By default this option is false
* to optimize as we do not need the old exchange when aggregating.
*/
public void setReturnOldExchange(boolean returnOldExchange) {
this.returnOldExchange = returnOldExchange;
}
public void setJdbcCamelCodec(JdbcCamelCodec codec) {
this.codec = codec;
}
public boolean hasHeadersToStoreAsText() {
return this.headersToStoreAsText != null && !this.headersToStoreAsText.isEmpty();
}
public List<String> getHeadersToStoreAsText() {
return headersToStoreAsText;
}
/**
* Allows to store headers as String which is human readable. By default this option is disabled, storing the
* headers in binary format.
*
* @param headersToStoreAsText the list of headers to store as String
*/
public void setHeadersToStoreAsText(List<String> headersToStoreAsText) {
this.headersToStoreAsText = headersToStoreAsText;
}
public boolean isStoreBodyAsText() {
return storeBodyAsText;
}
/**
* Whether to store the message body as String which is human readable. By default this option is false storing the
* body in binary format.
*/
public void setStoreBodyAsText(boolean storeBodyAsText) {
this.storeBodyAsText = storeBodyAsText;
}
public boolean isAllowSerializedHeaders() {
return allowSerializedHeaders;
}
public void setAllowSerializedHeaders(boolean allowSerializedHeaders) {
this.allowSerializedHeaders = allowSerializedHeaders;
}
public int getPropagationBehavior() {
return propagationBehavior;
}
/**
* Sets propagation behavior to use with spring transaction templates which are used for database access. The
* default is TransactionDefinition.PROPAGATION_REQUIRED.
*/
public void setPropagationBehavior(int propagationBehavior) {
this.propagationBehavior = propagationBehavior;
}
/**
* Sets propagation behavior to use with spring transaction templates which are used for database access. The
* default is TransactionDefinition.PROPAGATION_REQUIRED. This setter accepts names of the constants, like
* "PROPAGATION_REQUIRED".
*
* @param propagationBehaviorName
*/
public void setPropagationBehaviorName(String propagationBehaviorName) {
if (!propagationBehaviorName.startsWith(DefaultTransactionDefinition.PREFIX_PROPAGATION)) {
throw new IllegalArgumentException("Only propagation constants allowed");
}
setPropagationBehavior(PROPAGATION_CONSTANTS.asNumber(propagationBehaviorName).intValue());
}
public LobHandler getLobHandler() {
return lobHandler;
}
/**
* Sets a custom LobHandler to use
*/
public void setLobHandler(LobHandler lobHandler) {
this.lobHandler = lobHandler;
}
public JdbcOptimisticLockingExceptionMapper getJdbcOptimisticLockingExceptionMapper() {
return jdbcOptimisticLockingExceptionMapper;
}
public void setJdbcOptimisticLockingExceptionMapper(
JdbcOptimisticLockingExceptionMapper jdbcOptimisticLockingExceptionMapper) {
this.jdbcOptimisticLockingExceptionMapper = jdbcOptimisticLockingExceptionMapper;
}
public String getRepositoryName() {
return repositoryName;
}
public String getRepositoryNameCompleted() {
return getRepositoryName() + "_completed";
}
@Override
protected void doInit() throws Exception {
super.doInit();
ObjectHelper.notNull(repositoryName, "RepositoryName");
ObjectHelper.notNull(transactionManager, "TransactionManager");
ObjectHelper.notNull(dataSource, "DataSource");
transactionTemplate = new TransactionTemplate(transactionManager);
transactionTemplate.setPropagationBehavior(propagationBehavior);
transactionTemplateReadOnly = new TransactionTemplate(transactionManager);
transactionTemplateReadOnly.setPropagationBehavior(propagationBehavior);
transactionTemplateReadOnly.setReadOnly(true);
}
@Override
protected void doStart() throws Exception {
super.doStart();
// log number of existing exchanges
int current = getKeys().size();
int completed = scan(null).size();
if (current > 0) {
LOG.info("On startup there are " + current + " aggregate exchanges (not completed) in repository: "
+ getRepositoryName());
} else {
LOG.info("On startup there are no existing aggregate exchanges (not completed) in repository: {}",
getRepositoryName());
}
if (completed > 0) {
LOG.warn("On startup there are " + completed + " completed exchanges to be recovered in repository: "
+ getRepositoryNameCompleted());
} else {
LOG.info("On startup there are no completed exchanges to be recovered in repository: {}",
getRepositoryNameCompleted());
}
}
@Override
protected void doStop() throws Exception {
// noop
}
}

View File

@ -0,0 +1,97 @@
/*-
* ~~~~~~licensing~~~~~~
* system-commons
* ==========
* Copyright (C) 2020 - 2021 EmDev LLC
* ==========
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ~~~~~~/licensing~~~~~~
*/
package ru.entaxy.esb.system.common.aggregation.repo;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
/**
* PostgreSQL specific {@link JdbcAggregationRepository} that deals with SQL Violation Exceptions using special
* {@code INSERT INTO .. ON CONFLICT DO NOTHING} claues.
*/
public class PostgresAggregationRepository extends JdbcAggregationRepository {
/**
* Creates an aggregation repository
*/
public PostgresAggregationRepository() {
}
/**
* Creates an aggregation repository with the three mandatory parameters
*/
public PostgresAggregationRepository(PlatformTransactionManager transactionManager, String repositoryName,
DataSource dataSource) {
super(transactionManager, repositoryName, dataSource);
}
/**
* Inserts a new record into the given repository table
*
* @param camelContext the current CamelContext
* @param correlationId the correlation key
* @param exchange the aggregated exchange
* @param repositoryName The name of the table
*/
protected void insert(
final CamelContext camelContext, final String correlationId, final Exchange exchange, String repositoryName)
throws Exception {
// The default totalParameterIndex is 2 for ID and Exchange. Depending on logic this will be increased
int totalParameterIndex = 2;
StringBuilder queryBuilder = new StringBuilder()
.append("INSERT INTO ").append(repositoryName)
.append('(')
.append(EXCHANGE).append(", ")
.append(ID);
if (isStoreBodyAsText()) {
queryBuilder.append(", ").append(BODY);
totalParameterIndex++;
}
if (hasHeadersToStoreAsText()) {
for (String headerName : getHeadersToStoreAsText()) {
queryBuilder.append(", ").append(headerName);
totalParameterIndex++;
}
}
queryBuilder.append(") VALUES (");
for (int i = 0; i < totalParameterIndex - 1; i++) {
queryBuilder.append("?, ");
}
queryBuilder.append("?)");
queryBuilder.append(" ON CONFLICT DO NOTHING");
String sql = queryBuilder.toString();
int updateCount = insertHelper(camelContext, correlationId, exchange, sql, 1L);
if (updateCount == 0 && getRepositoryName().equals(repositoryName)) {
throw new DataIntegrityViolationException("No row was inserted due to data violation");
}
}
}

View File

@ -0,0 +1,44 @@
/*-
* ~~~~~~licensing~~~~~~
* system-commons
* ==========
* Copyright (C) 2020 - 2021 EmDev LLC
* ==========
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ~~~~~~/licensing~~~~~~
*/
package ru.entaxy.esb.system.common.exception;
public class BundleNotFound extends RuntimeException {
public BundleNotFound() {
super();
}
public BundleNotFound(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
public BundleNotFound(String message, Throwable cause) {
super(message, cause);
}
public BundleNotFound(String message) {
super(message);
}
public BundleNotFound(Throwable cause) {
super(cause);
}
}

View File

@ -0,0 +1,44 @@
/*-
* ~~~~~~licensing~~~~~~
* system-commons
* ==========
* Copyright (C) 2020 - 2021 EmDev LLC
* ==========
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ~~~~~~/licensing~~~~~~
*/
package ru.entaxy.esb.system.common.exception;
public class ConnectorNotFound extends RuntimeException {
public ConnectorNotFound() {
super();
}
public ConnectorNotFound(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
public ConnectorNotFound(String message, Throwable cause) {
super(message, cause);
}
public ConnectorNotFound(String message) {
super(message);
}
public ConnectorNotFound(Throwable cause) {
super(cause);
}
}

View File

@ -0,0 +1,44 @@
/*-
* ~~~~~~licensing~~~~~~
* system-commons
* ==========
* Copyright (C) 2020 - 2021 EmDev LLC
* ==========
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ~~~~~~/licensing~~~~~~
*/
package ru.entaxy.esb.system.common.exception;
public class EsbNotFound extends RuntimeException {
public EsbNotFound() {
super();
}
public EsbNotFound(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
public EsbNotFound(String message, Throwable cause) {
super(message, cause);
}
public EsbNotFound(String message) {
super(message);
}
public EsbNotFound(Throwable cause) {
super(cause);
}
}

View File

@ -0,0 +1,46 @@
/*-
* ~~~~~~licensing~~~~~~
* system-commons
* ==========
* Copyright (C) 2020 - 2021 EmDev LLC
* ==========
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ~~~~~~/licensing~~~~~~
*/
package ru.entaxy.esb.system.common.exception;
public class ProfileNotFound extends RuntimeException {
private static final long serialVersionUID = 6701844035750412423L;
public ProfileNotFound() {
super();
}
public ProfileNotFound(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
public ProfileNotFound(String message, Throwable cause) {
super(message, cause);
}
public ProfileNotFound(String message) {
super(message);
}
public ProfileNotFound(Throwable cause) {
super(cause);
}
}

View File

@ -0,0 +1,44 @@
/*-
* ~~~~~~licensing~~~~~~
* system-commons
* ==========
* Copyright (C) 2020 - 2021 EmDev LLC
* ==========
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ~~~~~~/licensing~~~~~~
*/
package ru.entaxy.esb.system.common.exception;
public class TemplateNotFound extends RuntimeException {
public TemplateNotFound() {
super();
}
public TemplateNotFound(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
public TemplateNotFound(String message, Throwable cause) {
super(message, cause);
}
public TemplateNotFound(String message) {
super(message);
}
public TemplateNotFound(Throwable cause) {
super(cause);
}
}

View File

@ -0,0 +1,70 @@
/*-
* ~~~~~~licensing~~~~~~
* system-commons
* ==========
* Copyright (C) 2020 - 2021 EmDev LLC
* ==========
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ~~~~~~/licensing~~~~~~
*/
package ru.entaxy.esb.system.common.interceptor;
import org.apache.cxf.binding.soap.SoapMessage;
import org.apache.cxf.binding.soap.interceptor.AbstractSoapInterceptor;
import org.apache.cxf.headers.Header;
import org.apache.cxf.interceptor.Fault;
import org.apache.cxf.jaxb.JAXBDataBinding;
import org.apache.cxf.message.Message;
import org.apache.cxf.phase.Phase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.xml.bind.JAXBException;
import javax.xml.namespace.QName;
import java.util.List;
import java.util.Map;
public class SoapHeaderInterceptor extends AbstractSoapInterceptor {
private static final Logger log = LoggerFactory.getLogger(SoapHeaderInterceptor.class);
private static final String HEADER_USER_LOGIN = "X-ForwardedUser";
private static final String HEADER_CREATED_BY = "createdBy";
private String namespaceUri = "";
public SoapHeaderInterceptor() {
super(Phase.READ);
}
public void handleMessage(SoapMessage message) throws Fault {
try {
Map<String, List<String>> headers = (Map<String, List<String>>) message.get(Message.PROTOCOL_HEADERS);
List<Header> soapHeaders = message.getHeaders();
soapHeaders.add(new Header(new QName(namespaceUri, HEADER_CREATED_BY),
getSystemName(headers), new JAXBDataBinding(String.class)));
message.put(Header.HEADER_LIST, soapHeaders);
} catch (JAXBException e) {
log.error("Error", e);
throw new Fault(e);
}
}
private String getSystemName(Map<String, List<String>> headers) {
List<String> list = headers.get(HEADER_USER_LOGIN);
return list != null && !list.isEmpty() ? headers.get(HEADER_USER_LOGIN).get(0) : null;
}
public void setNamespaceUri(String namespaceUri) {
this.namespaceUri = namespaceUri;
}
}

View File

@ -0,0 +1,37 @@
/*-
* ~~~~~~licensing~~~~~~
* system-commons
* ==========
* Copyright (C) 2020 - 2021 EmDev LLC
* ==========
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ~~~~~~/licensing~~~~~~
*/
package ru.entaxy.esb.system.common.osgi;
import java.util.List;
/**
* @author starovoitenkov_sv
*
*/
public interface BundleMarkerService {
public static final String CATEGORY_PROPERTY_NAME = "ru.entaxy.esb.system.common.marker.category";
public long getBundleId();
public String getMarkerCategory();
public List<String> getMarkers();
}

View File

@ -0,0 +1,32 @@
/*-
* ~~~~~~licensing~~~~~~
* system-commons
* ==========
* Copyright (C) 2020 - 2021 EmDev LLC
* ==========
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ~~~~~~/licensing~~~~~~
*/
package ru.entaxy.esb.system.common.osgi;
import java.util.List;
public interface NamedReferenceListener<T> extends ReferenceListener<T> {
public List<String> getReferenceNames();
public T getReference(String referenceName);
public boolean isRegistered(String referenceName);
}

View File

@ -0,0 +1,36 @@
/*-
* ~~~~~~licensing~~~~~~
* system-commons
* ==========
* Copyright (C) 2020 - 2021 EmDev LLC
* ==========
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ~~~~~~/licensing~~~~~~
*/
package ru.entaxy.esb.system.common.osgi;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceReference;
public class OSGIUtils {
static public Object getServiceReference(BundleContext bundle, String className) {
Object result = null;
ServiceReference ref = bundle.getServiceReference(className);
if (ref != null) {
result = bundle.getService(ref);
}
return result;
}
}

View File

@ -0,0 +1,26 @@
/*-
* ~~~~~~licensing~~~~~~
* system-commons
* ==========
* Copyright (C) 2020 - 2021 EmDev LLC
* ==========
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ~~~~~~/licensing~~~~~~
*/
package ru.entaxy.esb.system.common.osgi;
public interface ReferenceListener<T> {
public void register(T service) throws Exception;
public void unregister(T service) throws Exception;
}

View File

@ -0,0 +1,58 @@
/*-
* ~~~~~~licensing~~~~~~
* system-commons
* ==========
* Copyright (C) 2020 - 2021 EmDev LLC
* ==========
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ~~~~~~/licensing~~~~~~
*/
package ru.entaxy.esb.system.common.osgi;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleEvent;
import org.osgi.util.tracker.BundleTrackerCustomizer;
public abstract class UniformBundleTracker implements BundleTrackerCustomizer {
@Override
public Object addingBundle(Bundle bundle, BundleEvent event) {
if (event == null) {
// existing bundles first added to the tracker with no event change
checkInitialBundle(bundle);
} else {
bundleChanged(event);
}
return bundle;
}
@Override
public void modifiedBundle(Bundle bundle, BundleEvent event, Object object) {
if (event == null) {
// cannot think of why we would be interested in a modified bundle with no bundle event
return;
}
bundleChanged(event);
}
@Override
public void removedBundle(Bundle bundle, BundleEvent event, Object object) {
// TODO Auto-generated method stub
}
protected abstract void checkInitialBundle(Bundle bundle);
protected abstract void bundleChanged(BundleEvent event);
}

View File

@ -0,0 +1,113 @@
/*-
* ~~~~~~licensing~~~~~~
* system-commons
* ==========
* Copyright (C) 2020 - 2021 EmDev LLC
* ==========
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ~~~~~~/licensing~~~~~~
*/
package ru.entaxy.esb.system.common.osgi.impl;
import org.osgi.framework.Bundle;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.blueprint.container.BlueprintContainer;
import ru.entaxy.esb.system.common.osgi.BundleMarkerService;
import java.util.*;
/**
* @author starovoitenkov_sv
*
*/
public class BundleMarkerServiceImpl implements BundleMarkerService {
protected Bundle bundle = null;
protected BlueprintContainer container = null;
protected List<String> markers = new ArrayList<String>();
protected String category = null;
protected ServiceRegistration serviceRegistration = null;
protected Map<String, String> serviceProperties = new HashMap<String, String>();
/* (non-Javadoc)
* @see ru.entaxy.esb.system.common.osgi.BundleMarkerService#getBundleId()
*/
@Override
public long getBundleId() {
return (bundle == null ? -1 : bundle.getBundleId());
}
/* (non-Javadoc)
* @see ru.entaxy.esb.system.common.osgi.BundleMarkerService#getMarkerCategory()
*/
@Override
public String getMarkerCategory() {
return category;
}
/* (non-Javadoc)
* @see ru.entaxy.esb.system.common.osgi.BundleMarkerService#getMarkers()
*/
@Override
public List<String> getMarkers() {
return markers;
}
// LOCAL
@SuppressWarnings({"rawtypes", "unchecked"})
public void init() throws Exception {
if (bundle == null)
throw new Exception("Bundle not set");
if (container == null)
throw new Exception("Container not set");
Dictionary properties = new Hashtable();
for (Map.Entry<String, String> entry : serviceProperties.entrySet())
properties.put(entry.getKey(), entry.getValue());
properties.put(BundleMarkerService.CATEGORY_PROPERTY_NAME, category);
serviceRegistration = bundle.getBundleContext().registerService(BundleMarkerService.class.getName(), this, properties);
}
public void destroy() {
if (serviceRegistration != null)
serviceRegistration.unregister();
}
// ACCESSORS
public void setBundle(Bundle bundle) {
this.bundle = bundle;
}
public void setContainer(BlueprintContainer container) {
this.container = container;
}
public void setMarkers(List<String> markers) {
this.markers = markers;
}
public void setCategory(String category) {
this.category = category;
}
public Map<String, String> getServiceProperties() {
return serviceProperties;
}
public void setServiceProperties(Map<String, String> serviceProperties) {
this.serviceProperties = serviceProperties;
}
}

View File

@ -0,0 +1,71 @@
/*-
* ~~~~~~licensing~~~~~~
* system-commons
* ==========
* Copyright (C) 2020 - 2021 EmDev LLC
* ==========
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ~~~~~~/licensing~~~~~~
*/
package ru.entaxy.esb.system.common.osgi.impl;
import org.osgi.framework.ServiceReference;
import ru.entaxy.esb.system.common.osgi.NamedReferenceListener;
import java.util.*;
public abstract class CommonNamedReferenceListener<T> implements NamedReferenceListener<T> {
protected Map<String, T> registeredReferences = new HashMap<String, T>();
public void register(T service) throws Exception {
if (service instanceof ServiceReference)
return;
String key = getObjectName(service);
registeredReferences.put(key, service);
doAfterRegister(service);
}
protected void doAfterRegister(T service) throws Exception {
}
protected void doBeforeUnregister(T service) throws Exception {
}
public void unregister(T service) throws Exception {
if (service == null || service instanceof ServiceReference)
return;
doBeforeUnregister(service);
registeredReferences.remove(getObjectName(service));
}
@Override
public List<String> getReferenceNames() {
List<String> result = new ArrayList<String>(registeredReferences.keySet());
Collections.sort(result);
return result;
}
@Override
public T getReference(String referenceName) {
return registeredReferences.get(referenceName);
}
public boolean isRegistered(String referenceName) {
return registeredReferences.containsKey(referenceName);
}
protected abstract String getObjectName(T object);
}

View File

@ -0,0 +1,56 @@
/*-
* ~~~~~~licensing~~~~~~
* system-commons
* ==========
* Copyright (C) 2020 - 2021 EmDev LLC
* ==========
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ~~~~~~/licensing~~~~~~
*/
package ru.entaxy.esb.system.common.osgi.impl;
import org.osgi.framework.ServiceReference;
import ru.entaxy.esb.system.common.osgi.ReferenceListener;
import java.util.ArrayList;
import java.util.List;
public class CommonReferenceListener<T> implements ReferenceListener<T> {
protected List<T> registeredReferences = new ArrayList<T>();
public void register(T service) {
if (service instanceof ServiceReference)
return;
registeredReferences.add(service);
doAfterRegister(service);
}
protected void doAfterRegister(T service) {
// to override
}
protected void doBeforeUnregister(T service) {
// to override
}
public void unregister(T service) {
if (service instanceof ServiceReference)
return;
if (registeredReferences.contains(service)) {
doBeforeUnregister(service);
registeredReferences.remove(service);
}
}
}

View File

@ -0,0 +1,60 @@
/*-
* ~~~~~~licensing~~~~~~
* system-commons
* ==========
* Copyright (C) 2020 - 2021 EmDev LLC
* ==========
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ~~~~~~/licensing~~~~~~
*/
package ru.entaxy.esb.system.common.util;
public class CustomHeader {
private String id;
private String type;
private String value;
public CustomHeader() {
}
public CustomHeader(String id, String type, String value) {
this.id = id;
this.type = type;
this.value = value;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}

View File

@ -0,0 +1,183 @@
/*-
* ~~~~~~licensing~~~~~~
* system-commons
* ==========
* Copyright (C) 2020 - 2021 EmDev LLC
* ==========
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ~~~~~~/licensing~~~~~~
*/
package ru.entaxy.esb.system.common.util;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.camel.Exchange;
import org.apache.xerces.dom.DocumentImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class HeadersConverter {
protected static final Logger log = LoggerFactory.getLogger(HeadersConverter.class);
private String customHeaders;
private String customHeaderPrefix;
private String namespace;
public void xml2camelHeaders(Exchange exchange) {
NodeList nodes = exchange.getIn().getHeader(customHeaders, NodeList.class);
if (nodes == null) {
log.warn(customHeaders + " not found");
return;
}
for (int i = 0; i < nodes.getLength(); ++i) {
Node child = nodes.item(i).getFirstChild();
String headerName = "";
String headerValue = "";
while (child != null) {
if ("id".equals(child.getLocalName())) {
headerName = customHeaderPrefix + child.getTextContent();
} else if ("value".equals(child.getLocalName())) {
headerValue = child.getTextContent();
}
child = child.getNextSibling();
}
if (!headerName.isEmpty()) {
exchange.getIn().setHeader(headerName, headerValue);
}
}
log.debug("Parsed xml custom headers count {}", nodes.getLength());
}
public void camelHeaders2xml(Exchange exchange) {
Node item = null;
Node child = null;
Document xmlDoc = new DocumentImpl();
Node list = xmlDoc.createElement("list");
for (Map.Entry<String, Object> entry : exchange.getIn().getHeaders().entrySet()) {
if (entry.getKey().startsWith(customHeaderPrefix)) {
String name = entry.getKey().substring(customHeaderPrefix.length());
if (entry.getValue() == null) break;
String value = entry.getValue().toString();
item = xmlDoc.createElementNS(namespace, "customHeader");
child = xmlDoc.createElementNS(namespace, "id");
child.appendChild(xmlDoc.createTextNode(name));
item.appendChild(child);
child = xmlDoc.createElementNS(namespace, "value");
child.appendChild(xmlDoc.createTextNode(value));
item.appendChild(child);
list.appendChild(item);
}
}
exchange.getIn().setHeader(customHeaders, list);
}
public void xml2Json(Exchange exchange) {
NodeList nodes = exchange.getIn().getHeader(customHeaders, NodeList.class);
if (nodes == null) {
log.warn(customHeaders + " not found");
return;
}
List<CustomHeader> customHeaders = new ArrayList<>();
for (int i = 0; i < nodes.getLength(); ++i) {
Node child = nodes.item(i).getFirstChild();
String headerName = "";
String headerValue = "";
String headerType = "";
while (child != null) {
if ("id".equals(child.getLocalName())) {
headerName = child.getTextContent();
} else if ("value".equals(child.getLocalName())) {
headerValue = child.getTextContent();
} else if ("type".equals(child.getLocalName())) {
headerType = child.getTextContent();
}
child = child.getNextSibling();
}
if (!headerName.isEmpty()) {
customHeaders.add(new CustomHeader(headerName, headerType, headerValue));
}
}
Gson gson = new Gson();
exchange.getIn().setHeader(this.customHeaders, gson.toJson(customHeaders));
log.debug("Parsed xml custom headers count {}", nodes.getLength());
}
public void json2xml(Exchange exchange) {
String headers = exchange.getIn().getHeader(customHeaders, String.class);
if (headers == null) {
log.warn(customHeaders + " not found");
return;
}
Gson gson = new Gson();
List<CustomHeader> customHeaders = gson.fromJson(headers, new TypeToken<ArrayList<CustomHeader>>() {
}.getType());
Node item = null;
Node child = null;
Document xmlDoc = new DocumentImpl();
Node list = xmlDoc.createElement("list");
for (CustomHeader customHeader : customHeaders) {
item = xmlDoc.createElementNS(namespace, "customHeader");
child = xmlDoc.createElementNS(namespace, "id");
child.appendChild(xmlDoc.createTextNode(customHeader.getId()));
item.appendChild(child);
child = xmlDoc.createElementNS(namespace, "type");
child.appendChild(xmlDoc.createTextNode(customHeader.getType()));
item.appendChild(child);
child = xmlDoc.createElementNS(namespace, "value");
child.appendChild(xmlDoc.createTextNode(customHeader.getValue()));
item.appendChild(child);
list.appendChild(item);
}
exchange.getIn().setHeader(this.customHeaders, list);
}
public void setCustomHeaderPrefix(String customHeaderPrefix) {
this.customHeaderPrefix = customHeaderPrefix;
}
public void setCustomHeaders(String customHeaders) {
this.customHeaders = customHeaders;
}
public void setNamespace(String namespace) {
this.namespace = namespace;
}
}

View File

@ -0,0 +1,151 @@
/*-
* ~~~~~~licensing~~~~~~
* system-commons
* ==========
* Copyright (C) 2020 - 2021 EmDev LLC
* ==========
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ~~~~~~/licensing~~~~~~
*/
package ru.entaxy.esb.system.common.util;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
public class PropertiesHelper {
private static final Log LOG = LogFactory.getLog(PropertiesHelper.class.getName());
private static final String EMPTY_STRING = "";
private String configPath;
private static final String karafEtcPath = System.getProperty("karaf.etc");
private String configFile;
private Properties properties;
public PropertiesHelper() {
}
public PropertiesHelper(String configPath) {
LOG.debug("Set custom path: " + configPath + " file: " + configFile);
this.configPath = configPath;
}
public PropertiesHelper(String configPath, String configFile) {
LOG.debug("Load properties from custom path: " + configPath + " file: " + configFile);
this.configPath = configPath;
this.configFile = configFile;
loadProperties(configPath, configFile);
}
public PropertiesHelper(String configFile, boolean loadKarafEtc) {
this.configFile = configFile;
LOG.debug("Load properties from karaf etc: " + karafEtcPath + " file: " + configFile);
if (loadKarafEtc) {
loadProperties(karafEtcPath, configFile);
LOG.debug("Loaded properties: " + (properties != null ? properties.size() : "null"));
}
}
public Properties load() {
if (this.configPath != null && !this.configPath.isEmpty()
&& this.configFile != null && !this.configFile.isEmpty()) {
return loadProperties(configPath, configFile);
} else if (this.configFile != null && !this.configFile.isEmpty()) {
return loadProperties(karafEtcPath, configFile);
} else {
throw new IllegalArgumentException("configPath OR configFile NOT SETTED");
}
}
protected Properties loadProperties(String path, String configFile) {
try (InputStream input = new FileInputStream(path + File.separator + configFile)) {
properties = new Properties();
properties.load(input);
} catch (IOException ex) {
LOG.error(ex);
}
return properties;
}
public long getInteger(String name) {
return getInteger(name, 0);
}
public long getInteger(String name, int defaultValue) {
String value = this.properties.getProperty(name);
return value != null && !EMPTY_STRING.equals(value) ? Integer.valueOf(value) : defaultValue;
}
public int getInteger(String name, String defaultValue) {
return Integer.valueOf(this.properties.getProperty(name, defaultValue));
}
public long getLong(String name) {
return getLong(name, 0L);
}
public long getLong(String name, long defaultValue) {
String value = this.properties.getProperty(name);
return value != null && !EMPTY_STRING.equals(value) ? Long.valueOf(value) : defaultValue;
}
public long getLong(String name, String defaultValue) {
return Long.valueOf(this.properties.getProperty(name, defaultValue));
}
public String getString(String name) {
return this.properties.getProperty(name, EMPTY_STRING);
}
public String getString(String name, String defaultValue) {
return this.properties.getProperty(name, defaultValue);
}
public String[] getStringArray(String name, String[] defaultValue) {
String value = this.properties.getProperty(name, EMPTY_STRING);
return !value.equals(EMPTY_STRING) ? value.split(",") : defaultValue;
}
public String getConfigPath() {
return configPath;
}
public void setConfigPath(String configPath) {
this.configPath = configPath;
}
public String getConfigFile() {
return configFile;
}
public void setConfigFile(String configFile) {
this.configFile = configFile;
}
public Properties getProperties() {
return properties;
}
public void setProperties(Properties properties) {
this.properties = properties;
}
}

View File

@ -0,0 +1,50 @@
/*-
* ~~~~~~licensing~~~~~~
* system-commons
* ==========
* Copyright (C) 2020 - 2021 EmDev LLC
* ==========
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ~~~~~~/licensing~~~~~~
*/
package ru.entaxy.esb.system.common.util;
import org.apache.camel.Exchange;
import org.apache.camel.spi.HeaderFilterStrategy;
import java.util.HashSet;
import java.util.Set;
public class SimpleOutHeaderFilterStrategy implements HeaderFilterStrategy {
private Set<String> outFilter;
public void setOutFilter(Set<String> value) {
if (value == null) {
outFilter = new HashSet<>();
} else {
outFilter = value;
}
}
@Override
public boolean applyFilterToCamelHeaders(String headerName, Object headerValue, Exchange exchange) {
return !outFilter.contains(headerName);
}
@Override
public boolean applyFilterToExternalHeaders(String headerName, Object headerValue, Exchange exchange) {
return false;
}
}

View File

@ -0,0 +1,31 @@
/*-
* ~~~~~~licensing~~~~~~
* system-commons
* ==========
* Copyright (C) 2020 - 2021 EmDev LLC
* ==========
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ~~~~~~/licensing~~~~~~
*/
package ru.entaxy.esb.system.common.util;
public class SystemHeadersConstants {
public static final String HEADER_USER_LOGIN = "X-ForwardedUser";
public static final String HEADER_USER_ID = "X-ForwardedUserId";
public static final String HEADER_SYSTEM_NAME = "X-SystemName";
public static final String HEADER_SYSTEM_UUID = "X-SystemUuid";
public static final String HEADER_SYSTEM_ID = "X-SystemId";
private SystemHeadersConstants() {
}
}

View File

@ -0,0 +1,116 @@
/*-
* ~~~~~~licensing~~~~~~
* system-commons
* ==========
* Copyright (C) 2020 - 2021 EmDev LLC
* ==========
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ~~~~~~/licensing~~~~~~
*/
package ru.entaxy.esb.system.common.validator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.cxf.binding.soap.SoapMessage;
import org.apache.cxf.binding.soap.saaj.SAAJInInterceptor;
import org.apache.cxf.interceptor.Fault;
import org.apache.cxf.phase.AbstractPhaseInterceptor;
import org.apache.cxf.phase.Phase;
import org.apache.cxf.service.Service;
import org.apache.cxf.service.model.ServiceModelUtil;
import org.apache.cxf.ws.addressing.EndpointReferenceUtils;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.xml.sax.SAXException;
import javax.xml.soap.SOAPMessage;
import javax.xml.stream.XMLStreamException;
import javax.xml.transform.dom.DOMSource;
import javax.xml.validation.Schema;
import javax.xml.validation.Validator;
import javax.xml.xpath.XPathExpressionException;
import java.io.IOException;
import java.util.List;
public class ValidateInterceptor extends AbstractPhaseInterceptor<SoapMessage> {
private static final Log LOG = LogFactory.getLog(ValidateInterceptor.class);
private final SAAJInInterceptor saajIn;
private final XmlParser xmlParser;
private boolean schemaValidationEnabled;
public ValidateInterceptor() {
super(Phase.PRE_PROTOCOL);
saajIn = new SAAJInInterceptor();
xmlParser = new XmlParser();
getAfter().add(SAAJInInterceptor.class.getName());
}
@Override
public void handleMessage(SoapMessage message) throws Fault {
try {
Node body = getMessageBody((DOMSource) getSOAPMessage(message).getSOAPPart().getContent());
if (body != null)
validate(body, message);
else
throw new XMLStreamException("Can't find the tag \"Body\"");
} catch (RuntimeException re) {
throw re;
} catch (Exception e) {
throw new Fault(e);
}
}
private Node getMessageBody(DOMSource source) throws XPathExpressionException {
Node node = source.getNode().cloneNode(true);
List<Node> nodeList = xmlParser.getNodes(node.getLastChild(), "Body");
return !nodeList.isEmpty() ? nodeList.get(0) : null;
}
private void validate(Node node, SoapMessage soapMessage) throws IOException, SAXException, XPathExpressionException {
Validator validator = getValidator(soapMessage);
validator.validate(new DOMSource(getNodeForValidate(node)));
}
private Node getNodeForValidate(Node node) throws XPathExpressionException {
if (schemaValidationEnabled && node.getLocalName().contains("packets")) {
Element element = (Element) node;
for (Node content : xmlParser.getNodes(element, "content")) {
content.getParentNode().removeChild(content);
}
return element;
}
return node;
}
private Validator getValidator(SoapMessage soapMessage) {
Service service = ServiceModelUtil.getService(soapMessage.getExchange());
Schema schema = EndpointReferenceUtils.getSchema(service.getServiceInfos().get(0), soapMessage.getExchange().getBus());
return schema.newValidator();
}
private SOAPMessage getSOAPMessage(SoapMessage smsg) {
SOAPMessage soapMessage = smsg.getContent(SOAPMessage.class);
if (soapMessage == null) {
saajIn.handleMessage(smsg);
soapMessage = smsg.getContent(SOAPMessage.class);
}
return soapMessage;
}
public void setSchemaValidationEnabled(boolean schemaValidationEnabled) {
this.schemaValidationEnabled = schemaValidationEnabled;
}
}

View File

@ -0,0 +1,50 @@
/*-
* ~~~~~~licensing~~~~~~
* system-commons
* ==========
* Copyright (C) 2020 - 2021 EmDev LLC
* ==========
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ~~~~~~/licensing~~~~~~
*/
package ru.entaxy.esb.system.common.validator;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import javax.xml.xpath.*;
import java.util.ArrayList;
import java.util.List;
public class XmlParser {
public List<Node> getNodes(Node node, String elementName) throws XPathExpressionException {
XPath xpath = XPathFactory.newInstance().newXPath();
XPathExpression expr = xpath.compile("//*[local-name()='" + elementName + "']/child::node()");
NodeList nodeList = (NodeList) expr.evaluate(node.getOwnerDocument(), XPathConstants.NODESET);
return getNotNullNodes(nodeList);
}
private List<Node> getNotNullNodes(NodeList nodeList) {
List<Node> result = new ArrayList<>();
for (int i = 0; i < nodeList.getLength(); i++) {
Node node = nodeList.item(i);
if (node.getLocalName() != null) {
result.add(node);
}
}
return result;
}
}

View File

@ -0,0 +1,47 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~~~~~~licensing~~~~~~
system-commons
==========
Copyright (C) 2020 - 2021 EmDev LLC
==========
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
~~~~~~/licensing~~~~~~
-->
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.1.0"
xsi:schemaLocation="
http://www.osgi.org/xmlns/blueprint/v1.0.0 https://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd
">
<cm:property-placeholder persistent-id="ru.entaxy.esb" update-strategy="reload">
<cm:default-properties>
</cm:default-properties>
</cm:property-placeholder>
<bean id="artemisConnectionFactory" class="org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory">
<argument index="0" value="${common.jms.url}"/>
<argument index="1" value="${common.jms.username}"/>
<argument index="2" value="${common.jms.password}"/>
</bean>
<bean id="pooledConnectionFactory" class="org.messaginghub.pooled.jms.JmsPoolConnectionFactory"
init-method="start" destroy-method="stop">
<property name="maxConnections" value="${common.jms.maxConnections}"/>
<property name="maxSessionsPerConnection"
value="${common.jms.maxSessionsPerConnection}"/>
<property name="connectionFactory" ref="artemisConnectionFactory"/>
</bean>
<service interface="javax.jms.ConnectionFactory" ref="pooledConnectionFactory"/>
</blueprint>

View File

@ -0,0 +1,20 @@
###
# ~~~~~~licensing~~~~~~
# system-commons
# ==========
# Copyright (C) 2020 - 2021 EmDev LLC
# ==========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ~~~~~~/licensing~~~~~~
###
output.charset=Cp1251