release version 1.10.0
This commit is contained in:
@ -6,7 +6,7 @@
|
||||
<parent>
|
||||
<groupId>ru.entaxy.esb.system</groupId>
|
||||
<artifactId>system-parent</artifactId>
|
||||
<version>1.9.0</version>
|
||||
<version>1.10.0</version>
|
||||
</parent>
|
||||
|
||||
<groupId>ru.entaxy.esb.system.commons</groupId>
|
||||
@ -19,12 +19,7 @@
|
||||
<properties>
|
||||
<bundle.osgi.export.pkg>
|
||||
ru.entaxy.esb.system.common.osgi,
|
||||
ru.entaxy.esb.system.common.osgi.impl,
|
||||
ru.entaxy.esb.system.common.exception,
|
||||
ru.entaxy.esb.system.common.aggregation.*,
|
||||
ru.entaxy.esb.system.common.interceptor,
|
||||
ru.entaxy.esb.system.common.util,
|
||||
ru.entaxy.esb.system.common.validator
|
||||
ru.entaxy.esb.system.common.osgi.impl
|
||||
</bundle.osgi.export.pkg>
|
||||
<bundle.osgi.import.pkg>
|
||||
com.google.gson,
|
||||
|
@ -1,50 +0,0 @@
|
||||
/*-
|
||||
* ~~~~~~licensing~~~~~~
|
||||
* system-commons
|
||||
* ==========
|
||||
* Copyright (C) 2020 - 2023 EmDev LLC
|
||||
* ==========
|
||||
* You may not use this file except in accordance with the License Terms of the Copyright
|
||||
* Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property
|
||||
* rights to the Software and any copies are the property of the Copyright Holder. Unless
|
||||
* it is explicitly allowed the Copyright Holder, the User is prohibited from using the
|
||||
* Software for commercial purposes to provide services to third parties.
|
||||
*
|
||||
* The Copyright Holder hereby declares that the Software is provided on an "AS IS".
|
||||
* Under no circumstances does the Copyright Holder guarantee or promise that the
|
||||
* Software provided by him will be suitable or not suitable for the specific purposes
|
||||
* of the User, that the Software will meet all commercial and personal subjective
|
||||
* expectations of the User, that the Software will work properly, without technical
|
||||
* errors, quickly and uninterruptedly.
|
||||
*
|
||||
* Under no circumstances shall the Copyright Holder or its Affiliates is not liable
|
||||
* to the User for any direct or indirect losses of the User, his expenses or actual
|
||||
* damage, including, downtime; loss of bussines; lost profit; lost earnings; loss
|
||||
* or damage to data, property, etc.
|
||||
* ~~~~~~/licensing~~~~~~
|
||||
*/
|
||||
package ru.entaxy.esb.system.common.aggregation;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.camel.AggregationStrategy;
|
||||
import org.apache.camel.Exchange;
|
||||
|
||||
public class HeaderMergeAggregatorImpl implements AggregationStrategy {
|
||||
|
||||
@Override
|
||||
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
|
||||
if (oldExchange != null && newExchange != null) {
|
||||
Map<String, Object> oldHeaders = oldExchange.getIn().getHeaders();
|
||||
Map<String, Object> newHeaders = newExchange.getIn().getHeaders();
|
||||
|
||||
oldHeaders = oldHeaders.entrySet().stream()
|
||||
.filter(e -> !newHeaders.containsKey(e.getKey()))
|
||||
.collect(Collectors.toMap(e->e.getKey(), e->e.getValue()));
|
||||
newExchange.getIn().getHeaders().putAll(oldHeaders);
|
||||
}
|
||||
return newExchange;
|
||||
}
|
||||
|
||||
}
|
@ -1,61 +0,0 @@
|
||||
/*-
|
||||
* ~~~~~~licensing~~~~~~
|
||||
* system-commons
|
||||
* ==========
|
||||
* Copyright (C) 2020 - 2023 EmDev LLC
|
||||
* ==========
|
||||
* You may not use this file except in accordance with the License Terms of the Copyright
|
||||
* Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property
|
||||
* rights to the Software and any copies are the property of the Copyright Holder. Unless
|
||||
* it is explicitly allowed the Copyright Holder, the User is prohibited from using the
|
||||
* Software for commercial purposes to provide services to third parties.
|
||||
*
|
||||
* The Copyright Holder hereby declares that the Software is provided on an "AS IS".
|
||||
* Under no circumstances does the Copyright Holder guarantee or promise that the
|
||||
* Software provided by him will be suitable or not suitable for the specific purposes
|
||||
* of the User, that the Software will meet all commercial and personal subjective
|
||||
* expectations of the User, that the Software will work properly, without technical
|
||||
* errors, quickly and uninterruptedly.
|
||||
*
|
||||
* Under no circumstances shall the Copyright Holder or its Affiliates is not liable
|
||||
* to the User for any direct or indirect losses of the User, his expenses or actual
|
||||
* damage, including, downtime; loss of bussines; lost profit; lost earnings; loss
|
||||
* or damage to data, property, etc.
|
||||
* ~~~~~~/licensing~~~~~~
|
||||
*/
|
||||
package ru.entaxy.esb.system.common.aggregation;
|
||||
|
||||
import org.apache.camel.AggregationStrategy;
|
||||
import org.apache.camel.Exchange;
|
||||
|
||||
public class TimeoutAwareAggregationStrategyImpl implements AggregationStrategy {
|
||||
|
||||
private static final String ACK_MESSAGE_HEADER = "NTX_AckMessage";
|
||||
|
||||
private final String nameHeaderAcknowledge;
|
||||
|
||||
public TimeoutAwareAggregationStrategyImpl(String nameHeaderAcknowledge) {
|
||||
this.nameHeaderAcknowledge = nameHeaderAcknowledge;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
|
||||
if (oldExchange == null) {
|
||||
return newExchange;
|
||||
} else {
|
||||
oldExchange.getMessage().setHeader(nameHeaderAcknowledge, checkOnCorrectPair(oldExchange, newExchange));
|
||||
return oldExchange;
|
||||
}
|
||||
}
|
||||
|
||||
private boolean checkOnCorrectPair(Exchange oldExchange, Exchange newExchange) {
|
||||
return (oldExchange != null && oldExchange.getMessage().getHeader(ACK_MESSAGE_HEADER) == null &&
|
||||
(newExchange == null || newExchange.getMessage().getHeader(ACK_MESSAGE_HEADER) == null));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void timeout(Exchange oldExchange, int index, int total, long timeout) {
|
||||
oldExchange.getMessage().setHeader(nameHeaderAcknowledge,
|
||||
(oldExchange.getMessage().getHeader(ACK_MESSAGE_HEADER) == null));
|
||||
}
|
||||
}
|
@ -1,74 +0,0 @@
|
||||
/*-
|
||||
* ~~~~~~licensing~~~~~~
|
||||
* system-commons
|
||||
* ==========
|
||||
* Copyright (C) 2020 - 2023 EmDev LLC
|
||||
* ==========
|
||||
* You may not use this file except in accordance with the License Terms of the Copyright
|
||||
* Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property
|
||||
* rights to the Software and any copies are the property of the Copyright Holder. Unless
|
||||
* it is explicitly allowed the Copyright Holder, the User is prohibited from using the
|
||||
* Software for commercial purposes to provide services to third parties.
|
||||
*
|
||||
* The Copyright Holder hereby declares that the Software is provided on an "AS IS".
|
||||
* Under no circumstances does the Copyright Holder guarantee or promise that the
|
||||
* Software provided by him will be suitable or not suitable for the specific purposes
|
||||
* of the User, that the Software will meet all commercial and personal subjective
|
||||
* expectations of the User, that the Software will work properly, without technical
|
||||
* errors, quickly and uninterruptedly.
|
||||
*
|
||||
* Under no circumstances shall the Copyright Holder or its Affiliates is not liable
|
||||
* to the User for any direct or indirect losses of the User, his expenses or actual
|
||||
* damage, including, downtime; loss of bussines; lost profit; lost earnings; loss
|
||||
* or damage to data, property, etc.
|
||||
* ~~~~~~/licensing~~~~~~
|
||||
*/
|
||||
package ru.entaxy.esb.system.common.aggregation.hazelcast;
|
||||
|
||||
import com.hazelcast.core.HazelcastInstance;
|
||||
import com.hazelcast.core.MemberAttributeEvent;
|
||||
import com.hazelcast.core.MembershipEvent;
|
||||
import com.hazelcast.core.MembershipListener;
|
||||
import org.apache.camel.CamelContext;
|
||||
import org.apache.camel.processor.aggregate.AggregateProcessorSetter;
|
||||
import org.apache.camel.processor.aggregate.EntaxyAggregateProcessor;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class DisconnectedMembershipListener implements MembershipListener, AggregateProcessorSetter {
|
||||
|
||||
protected static final Logger log = LoggerFactory.getLogger(DisconnectedMembershipListener.class);
|
||||
private CamelContext camelContext;
|
||||
private EntaxyAggregateProcessor aggregateProcessor;
|
||||
|
||||
@Override
|
||||
public void memberAdded(MembershipEvent membershipEvent) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void memberRemoved(MembershipEvent membershipEvent) {
|
||||
try {
|
||||
aggregateProcessor.recoverCompletedMessageFromAggregationRepository(camelContext);
|
||||
aggregateProcessor.restoreTimeoutMapFromAggregationRepository();
|
||||
} catch (Exception e) {
|
||||
log.error("Can't restore Timeout from Aggregator. Please restart bundle.", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAggregateProcessor(EntaxyAggregateProcessor aggregateProcessor) {
|
||||
this.aggregateProcessor = aggregateProcessor;
|
||||
}
|
||||
|
||||
public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
|
||||
hazelcastInstance.getCluster().addMembershipListener(this);
|
||||
}
|
||||
|
||||
public void setCamelContext(CamelContext camelContext) {
|
||||
this.camelContext = camelContext;
|
||||
}
|
||||
}
|
@ -1,433 +0,0 @@
|
||||
/*-
|
||||
* ~~~~~~licensing~~~~~~
|
||||
* system-commons
|
||||
* ==========
|
||||
* Copyright (C) 2020 - 2023 EmDev LLC
|
||||
* ==========
|
||||
* You may not use this file except in accordance with the License Terms of the Copyright
|
||||
* Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property
|
||||
* rights to the Software and any copies are the property of the Copyright Holder. Unless
|
||||
* it is explicitly allowed the Copyright Holder, the User is prohibited from using the
|
||||
* Software for commercial purposes to provide services to third parties.
|
||||
*
|
||||
* The Copyright Holder hereby declares that the Software is provided on an "AS IS".
|
||||
* Under no circumstances does the Copyright Holder guarantee or promise that the
|
||||
* Software provided by him will be suitable or not suitable for the specific purposes
|
||||
* of the User, that the Software will meet all commercial and personal subjective
|
||||
* expectations of the User, that the Software will work properly, without technical
|
||||
* errors, quickly and uninterruptedly.
|
||||
*
|
||||
* Under no circumstances shall the Copyright Holder or its Affiliates is not liable
|
||||
* to the User for any direct or indirect losses of the User, his expenses or actual
|
||||
* damage, including, downtime; loss of bussines; lost profit; lost earnings; loss
|
||||
* or damage to data, property, etc.
|
||||
* ~~~~~~/licensing~~~~~~
|
||||
*/
|
||||
package ru.entaxy.esb.system.common.aggregation.repo;
|
||||
|
||||
import org.apache.camel.CamelContext;
|
||||
import org.apache.camel.Exchange;
|
||||
import org.apache.camel.RuntimeCamelException;
|
||||
import org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper;
|
||||
import org.apache.camel.processor.aggregate.jdbc.JdbcCamelCodec;
|
||||
import org.apache.camel.processor.aggregate.jdbc.JdbcOptimisticLockingExceptionMapper;
|
||||
import org.apache.camel.spi.OptimisticLockingAggregationRepository;
|
||||
import org.apache.camel.spi.RecoverableAggregationRepository;
|
||||
import org.apache.camel.support.service.ServiceSupport;
|
||||
import org.apache.camel.util.ObjectHelper;
|
||||
import org.apache.ignite.Ignite;
|
||||
import org.apache.ignite.IgniteCache;
|
||||
import org.apache.ignite.IgniteTransactions;
|
||||
import org.apache.ignite.Ignition;
|
||||
import org.apache.ignite.cache.CacheAtomicityMode;
|
||||
import org.apache.ignite.cache.CacheMode;
|
||||
import org.apache.ignite.cache.CacheRebalanceMode;
|
||||
import org.apache.ignite.cache.query.ScanQuery;
|
||||
import org.apache.ignite.configuration.CacheConfiguration;
|
||||
import org.apache.ignite.configuration.DataStorageConfiguration;
|
||||
import org.apache.ignite.configuration.IgniteConfiguration;
|
||||
import org.apache.ignite.configuration.TransactionConfiguration;
|
||||
import org.apache.ignite.logger.jcl.JclLogger;
|
||||
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
|
||||
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
|
||||
import org.apache.ignite.transactions.Transaction;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.osgi.framework.BundleContext;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.dao.EmptyResultDataAccessException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.apache.ignite.IgniteSystemProperties.IGNITE_QUIET;
|
||||
|
||||
public class IgniteAggregationRepository extends ServiceSupport implements RecoverableAggregationRepository, OptimisticLockingAggregationRepository {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(IgniteAggregationRepository.class);
|
||||
public static final String AGGREGATION = "aggregation";
|
||||
public static final String AGGREGATION_COMPLETED = "aggregationCompleted";
|
||||
public static final int MAX_ATTEMPT_COUNT = 3;
|
||||
|
||||
private JdbcOptimisticLockingExceptionMapper jdbcOptimisticLockingExceptionMapper = new DefaultJdbcOptimisticLockingExceptionMapper();
|
||||
private boolean returnOldExchange;
|
||||
private final JdbcCamelCodec codec = new JdbcCamelCodec();
|
||||
private long recoveryInterval = 5000;
|
||||
private boolean useRecovery = true;
|
||||
private int maximumRedeliveries;
|
||||
private String deadLetterUri;
|
||||
private boolean allowSerializedHeaders;
|
||||
private int backups = 2;
|
||||
private String workDirectory;
|
||||
private String addresses = "127.0.0.1:47500,127.0.0.1:47501";
|
||||
|
||||
private Ignite ignite;
|
||||
private IgniteTransactions transactions;
|
||||
|
||||
private IgniteCache<String, byte[]> aggregationCache;
|
||||
private CacheConfiguration<String, byte[]> aggregationCfg;
|
||||
|
||||
private IgniteCache<String, byte[]> aggregationCompleted;
|
||||
private CacheConfiguration<String, byte[]> aggregationCompletedCfg;
|
||||
|
||||
private BundleContext bundleContext;
|
||||
|
||||
/**
|
||||
* Creates an ignite aggregation repository
|
||||
*/
|
||||
public IgniteAggregationRepository() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates an ignite aggregation repository with the two mandatory parameters
|
||||
*/
|
||||
public IgniteAggregationRepository(BundleContext bundleContext, String workDirectory) {
|
||||
this.setBundleContext(bundleContext);
|
||||
this.setWorkDirectory(workDirectory);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Exchange add(final CamelContext camelContext, final String correlationId,
|
||||
final Exchange oldExchange, final Exchange newExchange) throws OptimisticLockingException {
|
||||
|
||||
try {
|
||||
return add(camelContext, correlationId, newExchange);
|
||||
} catch (Exception e) {
|
||||
if (jdbcOptimisticLockingExceptionMapper != null && jdbcOptimisticLockingExceptionMapper.isOptimisticLocking(e)) {
|
||||
throw new OptimisticLockingException();
|
||||
} else {
|
||||
throw RuntimeCamelException.wrapRuntimeCamelException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Exchange add(final CamelContext camelContext, final String correlationId, final Exchange exchange) {
|
||||
Exchange result = null;
|
||||
final String key = correlationId;
|
||||
|
||||
try {
|
||||
LOG.debug("Adding exchange with key: [{}]", key);
|
||||
|
||||
boolean present = aggregationCache.get(key) != null;
|
||||
|
||||
// Recover existing exchange with that ID
|
||||
if (isReturnOldExchange() && present) {
|
||||
result = get(key, camelContext);
|
||||
}
|
||||
|
||||
final byte[] data = codec.marshallExchange(camelContext, exchange, allowSerializedHeaders);
|
||||
try (Transaction tx = transactions.txStart()) {
|
||||
aggregationCache.put(key, data);
|
||||
tx.commit();
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Error adding with key " + key, e);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
public Exchange insert(final CamelContext camelContext, final String correlationId, final Exchange exchange) throws IOException {
|
||||
Exchange result = null;
|
||||
LOG.debug("Adding exchange with key: [{}]", correlationId);
|
||||
|
||||
final byte[] data = codec.marshallExchange(camelContext, exchange, allowSerializedHeaders);
|
||||
aggregationCompleted.put(correlationId, data);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Exchange get(final CamelContext camelContext, final String correlationId) {
|
||||
Exchange result = get(correlationId, camelContext);
|
||||
|
||||
LOG.debug("Getting key [{}] -> {}", correlationId, result);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
private Exchange get(final String key, final CamelContext camelContext) {
|
||||
try {
|
||||
final byte[] data = aggregationCache.get(key);
|
||||
return codec.unmarshallExchange(camelContext, data);
|
||||
} catch (EmptyResultDataAccessException | NullPointerException ex) {
|
||||
return null;
|
||||
} catch (IOException | ClassNotFoundException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove(final CamelContext camelContext, final String correlationId, final Exchange exchange) {
|
||||
try (Transaction tx = transactions.txStart()) {
|
||||
insert(camelContext, correlationId, exchange);
|
||||
if (!aggregationCache.remove(correlationId)) {
|
||||
throw new RuntimeException("Error removing key " + correlationId + " from repository " + AGGREGATION);
|
||||
}
|
||||
tx.commit();
|
||||
} catch (Exception exception) {
|
||||
throw new RuntimeException("Error removing key " + correlationId + " from repository " + AGGREGATION, exception);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void confirm(final CamelContext camelContext, final String exchangeId) {
|
||||
confirm(camelContext, exchangeId, 0);
|
||||
}
|
||||
|
||||
private void confirm(final CamelContext camelContext, final String exchangeId, int attemptCount) {
|
||||
if (attemptCount <= MAX_ATTEMPT_COUNT) {
|
||||
try (Transaction tx = transactions.txStart()) {
|
||||
aggregationCompleted.remove(exchangeId);
|
||||
tx.commit();
|
||||
} catch (Exception exception) {
|
||||
confirm(camelContext, exchangeId, ++attemptCount);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getKeys() {
|
||||
Set<String> keys = new HashSet<>();
|
||||
aggregationCache.query(new ScanQuery<>(null)).forEach(entry -> keys.add((String) entry.getKey()));
|
||||
return keys;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> scan(CamelContext camelContext) {
|
||||
Set<String> keys = new HashSet<>();
|
||||
aggregationCompleted.query(new ScanQuery<>(null)).forEach(entry -> keys.add((String) entry.getKey()));
|
||||
return keys;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Exchange recover(CamelContext camelContext, String exchangeId) {
|
||||
final byte[] data = aggregationCompleted.get(exchangeId);
|
||||
Exchange answer = null;
|
||||
try {
|
||||
answer = codec.unmarshallExchange(camelContext, data);
|
||||
} catch (IOException | ClassNotFoundException e) {
|
||||
LOG.error("Exception in recovering exchangeId {}", exchangeId, e);
|
||||
}
|
||||
|
||||
LOG.debug("Recovering exchangeId [{}] -> {}", exchangeId, answer);
|
||||
|
||||
return answer;
|
||||
}
|
||||
|
||||
/**
|
||||
* If recovery is enabled then a background task is run every x'th time to scan for failed exchanges to recover
|
||||
* and resubmit. By default this interval is 5000 millis.
|
||||
*
|
||||
* @param interval the interval
|
||||
* @param timeUnit the time unit
|
||||
*/
|
||||
public void setRecoveryInterval(long interval, TimeUnit timeUnit) {
|
||||
this.recoveryInterval = timeUnit.toMillis(interval);
|
||||
}
|
||||
|
||||
public void setRecoveryInterval(long interval) {
|
||||
this.recoveryInterval = interval;
|
||||
}
|
||||
|
||||
public long getRecoveryIntervalInMillis() {
|
||||
return recoveryInterval;
|
||||
}
|
||||
|
||||
public boolean isUseRecovery() {
|
||||
return useRecovery;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param useRecovery Whether or not recovery is enabled. This option is by default true. When enabled the Camel
|
||||
* Aggregator automatic recover failed aggregated exchange and have them resubmittedd
|
||||
*/
|
||||
public void setUseRecovery(boolean useRecovery) {
|
||||
this.useRecovery = useRecovery;
|
||||
}
|
||||
|
||||
public int getMaximumRedeliveries() {
|
||||
return maximumRedeliveries;
|
||||
}
|
||||
|
||||
public void setMaximumRedeliveries(int maximumRedeliveries) {
|
||||
this.maximumRedeliveries = maximumRedeliveries;
|
||||
}
|
||||
|
||||
public String getDeadLetterUri() {
|
||||
return deadLetterUri;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param deadLetterUri An endpoint uri for a Dead Letter Channel where exhausted recovered Exchanges will be
|
||||
* moved. If this option is used then the maximumRedeliveries option must also be provided.
|
||||
* Important note : if the deadletter route throws an exception, it will be send again to DLQ
|
||||
* until it succeed !
|
||||
*/
|
||||
public void setDeadLetterUri(String deadLetterUri) {
|
||||
this.deadLetterUri = deadLetterUri;
|
||||
}
|
||||
|
||||
public boolean isReturnOldExchange() {
|
||||
return returnOldExchange;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param returnOldExchange Whether the get operation should return the old existing Exchange if any existed.
|
||||
* By default this option is false to optimize as we do not need the old exchange when
|
||||
* aggregating
|
||||
*/
|
||||
public void setReturnOldExchange(boolean returnOldExchange) {
|
||||
this.returnOldExchange = returnOldExchange;
|
||||
}
|
||||
|
||||
public boolean isAllowSerializedHeaders() {
|
||||
return allowSerializedHeaders;
|
||||
}
|
||||
|
||||
public void setAllowSerializedHeaders(boolean allowSerializedHeaders) {
|
||||
this.allowSerializedHeaders = allowSerializedHeaders;
|
||||
}
|
||||
|
||||
public JdbcOptimisticLockingExceptionMapper getJdbcOptimisticLockingExceptionMapper() {
|
||||
return jdbcOptimisticLockingExceptionMapper;
|
||||
}
|
||||
|
||||
public void setJdbcOptimisticLockingExceptionMapper(JdbcOptimisticLockingExceptionMapper jdbcOptimisticLockingExceptionMapper) {
|
||||
this.jdbcOptimisticLockingExceptionMapper = jdbcOptimisticLockingExceptionMapper;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() throws Exception {
|
||||
ObjectHelper.notNull(bundleContext, "BundleContext");
|
||||
ObjectHelper.notNull(workDirectory, "WorkDirectory");
|
||||
|
||||
settingsIgnite();
|
||||
|
||||
// log number of existing exchanges
|
||||
int current = getKeys().size();
|
||||
int completed = scan(null).size();
|
||||
|
||||
if (current > 0) {
|
||||
LOG.info("On startup there are " + current + " aggregate exchanges (not completed) in repository: " + AGGREGATION);
|
||||
} else {
|
||||
LOG.info("On startup there are no existing aggregate exchanges (not completed) in repository: {}", AGGREGATION);
|
||||
}
|
||||
if (completed > 0) {
|
||||
LOG.warn("On startup there are " + completed + " completed exchanges to be recovered in repository: " + AGGREGATION_COMPLETED);
|
||||
} else {
|
||||
LOG.info("On startup there are no completed exchanges to be recovered in repository: {}", AGGREGATION_COMPLETED);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doStop() throws Exception {
|
||||
if (ignite != null) {
|
||||
ignite.close();
|
||||
}
|
||||
}
|
||||
|
||||
private void settingsIgnite() {
|
||||
IgniteConfiguration cfg = new IgniteConfiguration();
|
||||
cfg.setDiscoverySpi(getTcpDiscoverySpi());
|
||||
cfg.setGridLogger(new JclLogger());
|
||||
cfg.setTransactionConfiguration(new TransactionConfiguration());
|
||||
|
||||
settingsPersistence(cfg);
|
||||
|
||||
aggregationCfg = getCacheConfiguration(AGGREGATION);
|
||||
cfg.setCacheConfiguration(aggregationCfg);
|
||||
|
||||
aggregationCompletedCfg = getCacheConfiguration(AGGREGATION_COMPLETED);
|
||||
cfg.setCacheConfiguration(aggregationCompletedCfg);
|
||||
|
||||
startIgnite(cfg);
|
||||
|
||||
createCache();
|
||||
}
|
||||
|
||||
private void settingsPersistence(IgniteConfiguration cfg) {
|
||||
DataStorageConfiguration storageCfg = new DataStorageConfiguration();
|
||||
storageCfg.getDefaultDataRegionConfiguration().setPersistenceEnabled(true);
|
||||
cfg.setWorkDirectory(workDirectory);
|
||||
cfg.setDataStorageConfiguration(storageCfg);
|
||||
}
|
||||
|
||||
private void createCache() {
|
||||
transactions = ignite.transactions();
|
||||
aggregationCache = ignite.getOrCreateCache(aggregationCfg);
|
||||
aggregationCompleted = ignite.getOrCreateCache(aggregationCompletedCfg);
|
||||
|
||||
// Set the baseline topology that is represented by these nodes.
|
||||
ignite.cluster().setBaselineTopology(ignite.cluster().localNode().order());
|
||||
}
|
||||
|
||||
private void startIgnite(IgniteConfiguration cfg) {
|
||||
System.setProperty(IGNITE_QUIET, "false");
|
||||
ignite = Ignition.getOrStart(cfg);
|
||||
ignite.cluster().active(true);
|
||||
}
|
||||
|
||||
@NotNull
|
||||
private CacheConfiguration<String, byte[]> getCacheConfiguration(String cacheName) {
|
||||
CacheConfiguration<String, byte[]> cacheCfg = new CacheConfiguration<String, byte[]>();
|
||||
cacheCfg.setName(cacheName);
|
||||
cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
|
||||
cacheCfg.setCacheMode(CacheMode.REPLICATED);
|
||||
cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
|
||||
cacheCfg.setBackups(backups);
|
||||
return cacheCfg;
|
||||
}
|
||||
|
||||
@NotNull
|
||||
private TcpDiscoverySpi getTcpDiscoverySpi() {
|
||||
TcpDiscoverySpi spi = new TcpDiscoverySpi();
|
||||
spi.setIpFinder(new TcpDiscoveryVmIpFinder(false) {
|
||||
{
|
||||
setAddresses(Arrays.asList(addresses.split(",")));
|
||||
}
|
||||
});
|
||||
return spi;
|
||||
}
|
||||
|
||||
public void setBundleContext(BundleContext bundleContext) {
|
||||
this.bundleContext = bundleContext;
|
||||
}
|
||||
|
||||
public void setWorkDirectory(String workDirectory) {
|
||||
this.workDirectory = workDirectory;
|
||||
}
|
||||
|
||||
public void setBackups(int backups) {
|
||||
this.backups = backups;
|
||||
}
|
||||
|
||||
public void setAddresses(String addresses) {
|
||||
this.addresses = addresses;
|
||||
}
|
||||
}
|
@ -1,655 +0,0 @@
|
||||
/*-
|
||||
* ~~~~~~licensing~~~~~~
|
||||
* system-commons
|
||||
* ==========
|
||||
* Copyright (C) 2020 - 2023 EmDev LLC
|
||||
* ==========
|
||||
* You may not use this file except in accordance with the License Terms of the Copyright
|
||||
* Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property
|
||||
* rights to the Software and any copies are the property of the Copyright Holder. Unless
|
||||
* it is explicitly allowed the Copyright Holder, the User is prohibited from using the
|
||||
* Software for commercial purposes to provide services to third parties.
|
||||
*
|
||||
* The Copyright Holder hereby declares that the Software is provided on an "AS IS".
|
||||
* Under no circumstances does the Copyright Holder guarantee or promise that the
|
||||
* Software provided by him will be suitable or not suitable for the specific purposes
|
||||
* of the User, that the Software will meet all commercial and personal subjective
|
||||
* expectations of the User, that the Software will work properly, without technical
|
||||
* errors, quickly and uninterruptedly.
|
||||
*
|
||||
* Under no circumstances shall the Copyright Holder or its Affiliates is not liable
|
||||
* to the User for any direct or indirect losses of the User, his expenses or actual
|
||||
* damage, including, downtime; loss of bussines; lost profit; lost earnings; loss
|
||||
* or damage to data, property, etc.
|
||||
* ~~~~~~/licensing~~~~~~
|
||||
*/
|
||||
package ru.entaxy.esb.system.common.aggregation.repo;
|
||||
|
||||
import org.apache.camel.CamelContext;
|
||||
import org.apache.camel.Exchange;
|
||||
import org.apache.camel.RuntimeCamelException;
|
||||
import org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper;
|
||||
import org.apache.camel.processor.aggregate.jdbc.JdbcCamelCodec;
|
||||
import org.apache.camel.processor.aggregate.jdbc.JdbcOptimisticLockingExceptionMapper;
|
||||
import org.apache.camel.spi.OptimisticLockingAggregationRepository;
|
||||
import org.apache.camel.spi.RecoverableAggregationRepository;
|
||||
import org.apache.camel.support.service.ServiceSupport;
|
||||
import org.apache.camel.util.ObjectHelper;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.core.Constants;
|
||||
import org.springframework.dao.EmptyResultDataAccessException;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.jdbc.core.support.AbstractLobCreatingPreparedStatementCallback;
|
||||
import org.springframework.jdbc.support.lob.DefaultLobHandler;
|
||||
import org.springframework.jdbc.support.lob.LobCreator;
|
||||
import org.springframework.jdbc.support.lob.LobHandler;
|
||||
import org.springframework.transaction.PlatformTransactionManager;
|
||||
import org.springframework.transaction.TransactionDefinition;
|
||||
import org.springframework.transaction.TransactionStatus;
|
||||
import org.springframework.transaction.support.DefaultTransactionDefinition;
|
||||
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
|
||||
import org.springframework.transaction.support.TransactionTemplate;
|
||||
|
||||
import javax.sql.DataSource;
|
||||
import java.io.IOException;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Types;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* JDBC based {@link org.apache.camel.spi.AggregationRepository} JdbcAggregationRepository will only preserve any
|
||||
* Serializable compatible data types. If a data type is not such a type its dropped and a WARN is logged. And it only
|
||||
* persists the Message body and the Message headers. The Exchange properties are not persisted.
|
||||
*/
|
||||
public class JdbcAggregationRepository extends ServiceSupport
|
||||
implements RecoverableAggregationRepository, OptimisticLockingAggregationRepository {
|
||||
|
||||
protected static final String EXCHANGE = "exchange";
|
||||
protected static final String ID = "id";
|
||||
protected static final String BODY = "body";
|
||||
|
||||
// optimistic locking: version identifier needed to avoid the lost update problem
|
||||
private static final String VERSION = "version";
|
||||
private static final String VERSION_PROPERTY = "CamelOptimisticLockVersion";
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(JdbcAggregationRepository.class);
|
||||
private static final Constants PROPAGATION_CONSTANTS = new Constants(TransactionDefinition.class);
|
||||
|
||||
private JdbcOptimisticLockingExceptionMapper jdbcOptimisticLockingExceptionMapper
|
||||
= new DefaultJdbcOptimisticLockingExceptionMapper();
|
||||
private PlatformTransactionManager transactionManager;
|
||||
private DataSource dataSource;
|
||||
private TransactionTemplate transactionTemplate;
|
||||
private TransactionTemplate transactionTemplateReadOnly;
|
||||
private int propagationBehavior = TransactionDefinition.PROPAGATION_REQUIRED;
|
||||
private JdbcTemplate jdbcTemplate;
|
||||
private LobHandler lobHandler = new DefaultLobHandler();
|
||||
private String repositoryName;
|
||||
private boolean returnOldExchange;
|
||||
private JdbcCamelCodec codec = new JdbcCamelCodec();
|
||||
private long recoveryInterval = 5000;
|
||||
private boolean useRecovery = true;
|
||||
private int maximumRedeliveries;
|
||||
private String deadLetterUri;
|
||||
private List<String> headersToStoreAsText;
|
||||
private boolean storeBodyAsText;
|
||||
private boolean allowSerializedHeaders;
|
||||
|
||||
/**
|
||||
* Creates an aggregation repository
|
||||
*/
|
||||
public JdbcAggregationRepository() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates an aggregation repository with the three mandatory parameters
|
||||
*/
|
||||
public JdbcAggregationRepository(PlatformTransactionManager transactionManager, String repositoryName,
|
||||
DataSource dataSource) {
|
||||
this.setRepositoryName(repositoryName);
|
||||
this.setTransactionManager(transactionManager);
|
||||
this.setDataSource(dataSource);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the name of the repository
|
||||
*/
|
||||
public final void setRepositoryName(String repositoryName) {
|
||||
this.repositoryName = repositoryName;
|
||||
}
|
||||
|
||||
public final void setTransactionManager(PlatformTransactionManager transactionManager) {
|
||||
this.transactionManager = transactionManager;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the DataSource to use for accessing the database
|
||||
*/
|
||||
public void setDataSource(DataSource dataSource) {
|
||||
this.dataSource = dataSource;
|
||||
|
||||
jdbcTemplate = new JdbcTemplate(dataSource);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Exchange add(
|
||||
final CamelContext camelContext, final String correlationId,
|
||||
final Exchange oldExchange, final Exchange newExchange)
|
||||
throws OptimisticLockingException {
|
||||
|
||||
try {
|
||||
return add(camelContext, correlationId, newExchange);
|
||||
} catch (Exception e) {
|
||||
if (jdbcOptimisticLockingExceptionMapper != null && jdbcOptimisticLockingExceptionMapper.isOptimisticLocking(e)) {
|
||||
throw new OptimisticLockingException();
|
||||
} else {
|
||||
throw RuntimeCamelException.wrapRuntimeCamelException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Exchange add(final CamelContext camelContext, final String correlationId, final Exchange exchange) {
|
||||
return transactionTemplate.execute(status -> {
|
||||
Exchange result = null;
|
||||
final String key = correlationId;
|
||||
|
||||
try {
|
||||
LOG.debug("Adding exchange with key {}", key);
|
||||
|
||||
boolean present = jdbcTemplate.queryForObject(
|
||||
"SELECT COUNT(1) FROM " + getRepositoryName() + " WHERE " + ID + " = ?", Integer.class, key) != 0;
|
||||
|
||||
// Recover existing exchange with that ID
|
||||
if (isReturnOldExchange() && present) {
|
||||
result = get(key, getRepositoryName(), camelContext);
|
||||
}
|
||||
|
||||
if (present) {
|
||||
long version = exchange.getProperty(VERSION_PROPERTY, Long.class);
|
||||
LOG.debug("Updating record with key {} and version {}", key, version);
|
||||
update(camelContext, correlationId, exchange, getRepositoryName(), version);
|
||||
} else {
|
||||
LOG.debug("Inserting record with key {}", key);
|
||||
insert(camelContext, correlationId, exchange, getRepositoryName(), 1L);
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Error adding to repository " + repositoryName + " with key " + key, e);
|
||||
}
|
||||
|
||||
return result;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates the current exchange details in the given repository table.
|
||||
*
|
||||
* @param camelContext Current CamelContext
|
||||
* @param key Correlation key
|
||||
* @param exchange Aggregated exchange
|
||||
* @param repositoryName Table's name
|
||||
* @param version Version identifier
|
||||
*/
|
||||
protected void update(
|
||||
final CamelContext camelContext, final String key, final Exchange exchange, String repositoryName, Long version)
|
||||
throws Exception {
|
||||
StringBuilder queryBuilder = new StringBuilder()
|
||||
.append("UPDATE ").append(repositoryName)
|
||||
.append(" SET ")
|
||||
.append(EXCHANGE).append(" = ?")
|
||||
.append(", ")
|
||||
.append(VERSION).append(" = ?");
|
||||
if (storeBodyAsText) {
|
||||
queryBuilder.append(", ").append(BODY).append(" = ?");
|
||||
}
|
||||
|
||||
if (hasHeadersToStoreAsText()) {
|
||||
for (String headerName : headersToStoreAsText) {
|
||||
queryBuilder.append(", ").append(headerName).append(" = ?");
|
||||
}
|
||||
}
|
||||
|
||||
queryBuilder.append(" WHERE ")
|
||||
.append(ID).append(" = ?")
|
||||
.append(" AND ")
|
||||
.append(VERSION).append(" = ?");
|
||||
|
||||
String sql = queryBuilder.toString();
|
||||
updateHelper(camelContext, key, exchange, sql, version);
|
||||
}
|
||||
|
||||
/**
|
||||
* Inserts a new record into the given repository table. Note: the exchange properties are NOT persisted.
|
||||
*
|
||||
* @param camelContext Current CamelContext
|
||||
* @param correlationId Correlation key
|
||||
* @param exchange Aggregated exchange to insert
|
||||
* @param repositoryName Table's name
|
||||
* @param version Version identifier
|
||||
*/
|
||||
protected void insert(
|
||||
final CamelContext camelContext, final String correlationId, final Exchange exchange, String repositoryName,
|
||||
Long version)
|
||||
throws Exception {
|
||||
// The default totalParameterIndex is 3 for ID, Exchange and version. Depending on logic this will be increased.
|
||||
int totalParameterIndex = 3;
|
||||
StringBuilder queryBuilder = new StringBuilder()
|
||||
.append("INSERT INTO ").append(repositoryName)
|
||||
.append('(').append(EXCHANGE)
|
||||
.append(", ").append(ID)
|
||||
.append(", ").append(VERSION);
|
||||
|
||||
if (storeBodyAsText) {
|
||||
queryBuilder.append(", ").append(BODY);
|
||||
totalParameterIndex++;
|
||||
}
|
||||
|
||||
if (hasHeadersToStoreAsText()) {
|
||||
for (String headerName : headersToStoreAsText) {
|
||||
queryBuilder.append(", ").append(headerName);
|
||||
totalParameterIndex++;
|
||||
}
|
||||
}
|
||||
|
||||
queryBuilder.append(") VALUES (");
|
||||
|
||||
for (int i = 0; i < totalParameterIndex - 1; i++) {
|
||||
queryBuilder.append("?, ");
|
||||
}
|
||||
queryBuilder.append("?)");
|
||||
|
||||
String sql = queryBuilder.toString();
|
||||
|
||||
insertHelper(camelContext, correlationId, exchange, sql, version);
|
||||
}
|
||||
|
||||
protected int insertHelper(
|
||||
final CamelContext camelContext, final String key, final Exchange exchange, String sql, final Long version)
|
||||
throws Exception {
|
||||
final byte[] data = codec.marshallExchange(camelContext, exchange, allowSerializedHeaders);
|
||||
Integer insertCount = jdbcTemplate.execute(sql,
|
||||
new AbstractLobCreatingPreparedStatementCallback(getLobHandler()) {
|
||||
@Override
|
||||
protected void setValues(PreparedStatement ps, LobCreator lobCreator) throws SQLException {
|
||||
int totalParameterIndex = 0;
|
||||
lobCreator.setBlobAsBytes(ps, ++totalParameterIndex, data);
|
||||
ps.setString(++totalParameterIndex, key);
|
||||
ps.setLong(++totalParameterIndex, version);
|
||||
if (storeBodyAsText) {
|
||||
ps.setString(++totalParameterIndex, exchange.getIn().getBody(String.class));
|
||||
}
|
||||
if (hasHeadersToStoreAsText()) {
|
||||
for (String headerName : headersToStoreAsText) {
|
||||
String headerValue = exchange.getIn().getHeader(headerName, String.class);
|
||||
ps.setString(++totalParameterIndex, headerValue);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
return insertCount == null ? 0 : insertCount;
|
||||
}
|
||||
|
||||
protected int updateHelper(
|
||||
final CamelContext camelContext, final String key, final Exchange exchange, String sql, final Long version)
|
||||
throws Exception {
|
||||
final byte[] data = codec.marshallExchange(camelContext, exchange, allowSerializedHeaders);
|
||||
Integer updateCount = jdbcTemplate.execute(sql,
|
||||
new AbstractLobCreatingPreparedStatementCallback(getLobHandler()) {
|
||||
@Override
|
||||
protected void setValues(PreparedStatement ps, LobCreator lobCreator) throws SQLException {
|
||||
int totalParameterIndex = 0;
|
||||
lobCreator.setBlobAsBytes(ps, ++totalParameterIndex, data);
|
||||
ps.setLong(++totalParameterIndex, version + 1);
|
||||
if (storeBodyAsText) {
|
||||
ps.setString(++totalParameterIndex, exchange.getIn().getBody(String.class));
|
||||
}
|
||||
if (hasHeadersToStoreAsText()) {
|
||||
for (String headerName : headersToStoreAsText) {
|
||||
String headerValue = exchange.getIn().getHeader(headerName, String.class);
|
||||
ps.setString(++totalParameterIndex, headerValue);
|
||||
}
|
||||
}
|
||||
ps.setString(++totalParameterIndex, key);
|
||||
ps.setLong(++totalParameterIndex, version);
|
||||
}
|
||||
});
|
||||
if (updateCount == 1) {
|
||||
return updateCount;
|
||||
} else {
|
||||
// Found stale version while updating record
|
||||
throw new OptimisticLockingException();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Exchange get(final CamelContext camelContext, final String correlationId) {
|
||||
final String key = correlationId;
|
||||
Exchange result = get(key, getRepositoryName(), camelContext);
|
||||
LOG.debug("Getting key {} -> {}", key, result);
|
||||
return result;
|
||||
}
|
||||
|
||||
private Exchange get(final String key, final String repositoryName, final CamelContext camelContext) {
|
||||
return transactionTemplateReadOnly.execute(status -> {
|
||||
try {
|
||||
|
||||
Map<String, Object> columns = jdbcTemplate.queryForMap(
|
||||
String.format("SELECT %1$s, %2$s FROM %3$s WHERE %4$s=?", EXCHANGE, VERSION, repositoryName, ID),
|
||||
new Object[]{key}, new int[]{Types.VARCHAR});
|
||||
|
||||
byte[] marshalledExchange = (byte[]) columns.get(EXCHANGE);
|
||||
long version = (long) columns.get(VERSION);
|
||||
|
||||
Exchange result = codec.unmarshallExchange(camelContext, marshalledExchange);
|
||||
result.setProperty(VERSION_PROPERTY, version);
|
||||
return result;
|
||||
|
||||
} catch (EmptyResultDataAccessException ex) {
|
||||
return null;
|
||||
} catch (IOException ex) {
|
||||
// Rollback the transaction
|
||||
throw new RuntimeException("Error getting key " + key + " from repository " + repositoryName, ex);
|
||||
} catch (ClassNotFoundException ex) {
|
||||
// Rollback the transaction
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void remove(final CamelContext camelContext, final String correlationId, final Exchange exchange) {
|
||||
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
|
||||
protected void doInTransactionWithoutResult(TransactionStatus status) {
|
||||
final String key = correlationId;
|
||||
final String confirmKey = exchange.getExchangeId();
|
||||
final long version = exchange.getProperty(VERSION_PROPERTY, Long.class);
|
||||
try {
|
||||
LOG.debug("Removing key {}", key);
|
||||
|
||||
// добавлена проверка на корректное удаление из таблицы, т к возникала ситуация гонки на узлах.
|
||||
// один узел уже удалил из completed, а второй в процессе, соответственно ошибки дублирования
|
||||
// не появляется появляется дубликат в очереди
|
||||
if (jdbcTemplate.update("DELETE FROM " + getRepositoryName() + " WHERE " + ID + " = ? AND " + VERSION + " = ?",
|
||||
key, version) == 1) {
|
||||
insert(camelContext, confirmKey, exchange, getRepositoryNameCompleted(), version);
|
||||
} else {
|
||||
throw new RuntimeException("Error removing key " + key + " from repository " + repositoryName);
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Error removing key " + key + " from repository " + repositoryName, e);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void confirm(final CamelContext camelContext, final String exchangeId) {
|
||||
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
|
||||
protected void doInTransactionWithoutResult(TransactionStatus status) {
|
||||
LOG.debug("Confirming exchangeId {}", exchangeId);
|
||||
final String confirmKey = exchangeId;
|
||||
|
||||
jdbcTemplate.update("DELETE FROM " + getRepositoryNameCompleted() + " WHERE " + ID + " = ?",
|
||||
confirmKey);
|
||||
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getKeys() {
|
||||
return getKeys(getRepositoryName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> scan(CamelContext camelContext) {
|
||||
return getKeys(getRepositoryNameCompleted());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the keys in the given repository
|
||||
*
|
||||
* @param repositoryName The name of the table
|
||||
* @return Set of keys in the given repository name
|
||||
*/
|
||||
protected Set<String> getKeys(final String repositoryName) {
|
||||
return transactionTemplateReadOnly.execute(status -> {
|
||||
List<String> keys = jdbcTemplate.query("SELECT " + ID + " FROM " + repositoryName,
|
||||
(rs, rowNum) -> {
|
||||
String id = rs.getString(ID);
|
||||
LOG.trace("getKey {}", id);
|
||||
return id;
|
||||
});
|
||||
return new LinkedHashSet<>(keys);
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Exchange recover(CamelContext camelContext, String exchangeId) {
|
||||
final String key = exchangeId;
|
||||
Exchange answer = get(key, getRepositoryNameCompleted(), camelContext);
|
||||
LOG.debug("Recovering exchangeId {} -> {}", key, answer);
|
||||
return answer;
|
||||
}
|
||||
|
||||
/**
|
||||
* If recovery is enabled then a background task is run every x'th time to scan for failed exchanges to recover and
|
||||
* resubmit. By default this interval is 5000 millis.
|
||||
*/
|
||||
@Override
|
||||
public void setRecoveryInterval(long interval, TimeUnit timeUnit) {
|
||||
this.recoveryInterval = timeUnit.toMillis(interval);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRecoveryInterval(long interval) {
|
||||
this.recoveryInterval = interval;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getRecoveryIntervalInMillis() {
|
||||
return recoveryInterval;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isUseRecovery() {
|
||||
return useRecovery;
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether or not recovery is enabled. This option is by default true. When enabled the Camel Aggregator automatic
|
||||
* recover failed aggregated exchange and have them resubmitted.
|
||||
*/
|
||||
@Override
|
||||
public void setUseRecovery(boolean useRecovery) {
|
||||
this.useRecovery = useRecovery;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMaximumRedeliveries() {
|
||||
return maximumRedeliveries;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setMaximumRedeliveries(int maximumRedeliveries) {
|
||||
this.maximumRedeliveries = maximumRedeliveries;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDeadLetterUri() {
|
||||
return deadLetterUri;
|
||||
}
|
||||
|
||||
/**
|
||||
* An endpoint uri for a Dead Letter Channel where exhausted recovered Exchanges will be moved. If this option is
|
||||
* used then the maximumRedeliveries option must also be provided. Important note : if the deadletter route throws
|
||||
* an exception, it will be send again to DLQ until it succeed !
|
||||
*/
|
||||
@Override
|
||||
public void setDeadLetterUri(String deadLetterUri) {
|
||||
this.deadLetterUri = deadLetterUri;
|
||||
}
|
||||
|
||||
public boolean isReturnOldExchange() {
|
||||
return returnOldExchange;
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether the get operation should return the old existing Exchange if any existed. By default this option is false
|
||||
* to optimize as we do not need the old exchange when aggregating.
|
||||
*/
|
||||
public void setReturnOldExchange(boolean returnOldExchange) {
|
||||
this.returnOldExchange = returnOldExchange;
|
||||
}
|
||||
|
||||
public void setJdbcCamelCodec(JdbcCamelCodec codec) {
|
||||
this.codec = codec;
|
||||
}
|
||||
|
||||
public boolean hasHeadersToStoreAsText() {
|
||||
return this.headersToStoreAsText != null && !this.headersToStoreAsText.isEmpty();
|
||||
}
|
||||
|
||||
public List<String> getHeadersToStoreAsText() {
|
||||
return headersToStoreAsText;
|
||||
}
|
||||
|
||||
/**
|
||||
* Allows to store headers as String which is human readable. By default this option is disabled, storing the
|
||||
* headers in binary format.
|
||||
*
|
||||
* @param headersToStoreAsText the list of headers to store as String
|
||||
*/
|
||||
public void setHeadersToStoreAsText(List<String> headersToStoreAsText) {
|
||||
this.headersToStoreAsText = headersToStoreAsText;
|
||||
}
|
||||
|
||||
public boolean isStoreBodyAsText() {
|
||||
return storeBodyAsText;
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether to store the message body as String which is human readable. By default this option is false storing the
|
||||
* body in binary format.
|
||||
*/
|
||||
public void setStoreBodyAsText(boolean storeBodyAsText) {
|
||||
this.storeBodyAsText = storeBodyAsText;
|
||||
}
|
||||
|
||||
public boolean isAllowSerializedHeaders() {
|
||||
return allowSerializedHeaders;
|
||||
}
|
||||
|
||||
public void setAllowSerializedHeaders(boolean allowSerializedHeaders) {
|
||||
this.allowSerializedHeaders = allowSerializedHeaders;
|
||||
}
|
||||
|
||||
public int getPropagationBehavior() {
|
||||
return propagationBehavior;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets propagation behavior to use with spring transaction templates which are used for database access. The
|
||||
* default is TransactionDefinition.PROPAGATION_REQUIRED.
|
||||
*/
|
||||
public void setPropagationBehavior(int propagationBehavior) {
|
||||
this.propagationBehavior = propagationBehavior;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets propagation behavior to use with spring transaction templates which are used for database access. The
|
||||
* default is TransactionDefinition.PROPAGATION_REQUIRED. This setter accepts names of the constants, like
|
||||
* "PROPAGATION_REQUIRED".
|
||||
*
|
||||
* @param propagationBehaviorName
|
||||
*/
|
||||
public void setPropagationBehaviorName(String propagationBehaviorName) {
|
||||
if (!propagationBehaviorName.startsWith(DefaultTransactionDefinition.PREFIX_PROPAGATION)) {
|
||||
throw new IllegalArgumentException("Only propagation constants allowed");
|
||||
}
|
||||
setPropagationBehavior(PROPAGATION_CONSTANTS.asNumber(propagationBehaviorName).intValue());
|
||||
}
|
||||
|
||||
public LobHandler getLobHandler() {
|
||||
return lobHandler;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets a custom LobHandler to use
|
||||
*/
|
||||
public void setLobHandler(LobHandler lobHandler) {
|
||||
this.lobHandler = lobHandler;
|
||||
}
|
||||
|
||||
public JdbcOptimisticLockingExceptionMapper getJdbcOptimisticLockingExceptionMapper() {
|
||||
return jdbcOptimisticLockingExceptionMapper;
|
||||
}
|
||||
|
||||
public void setJdbcOptimisticLockingExceptionMapper(
|
||||
JdbcOptimisticLockingExceptionMapper jdbcOptimisticLockingExceptionMapper) {
|
||||
this.jdbcOptimisticLockingExceptionMapper = jdbcOptimisticLockingExceptionMapper;
|
||||
}
|
||||
|
||||
public String getRepositoryName() {
|
||||
return repositoryName;
|
||||
}
|
||||
|
||||
public String getRepositoryNameCompleted() {
|
||||
return getRepositoryName() + "_completed";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doInit() throws Exception {
|
||||
super.doInit();
|
||||
|
||||
ObjectHelper.notNull(repositoryName, "RepositoryName");
|
||||
ObjectHelper.notNull(transactionManager, "TransactionManager");
|
||||
ObjectHelper.notNull(dataSource, "DataSource");
|
||||
|
||||
transactionTemplate = new TransactionTemplate(transactionManager);
|
||||
transactionTemplate.setPropagationBehavior(propagationBehavior);
|
||||
|
||||
transactionTemplateReadOnly = new TransactionTemplate(transactionManager);
|
||||
transactionTemplateReadOnly.setPropagationBehavior(propagationBehavior);
|
||||
transactionTemplateReadOnly.setReadOnly(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() throws Exception {
|
||||
super.doStart();
|
||||
|
||||
// log number of existing exchanges
|
||||
int current = getKeys().size();
|
||||
int completed = scan(null).size();
|
||||
|
||||
if (current > 0) {
|
||||
LOG.info("On startup there are " + current + " aggregate exchanges (not completed) in repository: "
|
||||
+ getRepositoryName());
|
||||
} else {
|
||||
LOG.info("On startup there are no existing aggregate exchanges (not completed) in repository: {}",
|
||||
getRepositoryName());
|
||||
}
|
||||
if (completed > 0) {
|
||||
LOG.warn("On startup there are " + completed + " completed exchanges to be recovered in repository: "
|
||||
+ getRepositoryNameCompleted());
|
||||
} else {
|
||||
LOG.info("On startup there are no completed exchanges to be recovered in repository: {}",
|
||||
getRepositoryNameCompleted());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() throws Exception {
|
||||
// noop
|
||||
}
|
||||
|
||||
}
|
@ -1,103 +0,0 @@
|
||||
/*-
|
||||
* ~~~~~~licensing~~~~~~
|
||||
* system-commons
|
||||
* ==========
|
||||
* Copyright (C) 2020 - 2023 EmDev LLC
|
||||
* ==========
|
||||
* You may not use this file except in accordance with the License Terms of the Copyright
|
||||
* Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property
|
||||
* rights to the Software and any copies are the property of the Copyright Holder. Unless
|
||||
* it is explicitly allowed the Copyright Holder, the User is prohibited from using the
|
||||
* Software for commercial purposes to provide services to third parties.
|
||||
*
|
||||
* The Copyright Holder hereby declares that the Software is provided on an "AS IS".
|
||||
* Under no circumstances does the Copyright Holder guarantee or promise that the
|
||||
* Software provided by him will be suitable or not suitable for the specific purposes
|
||||
* of the User, that the Software will meet all commercial and personal subjective
|
||||
* expectations of the User, that the Software will work properly, without technical
|
||||
* errors, quickly and uninterruptedly.
|
||||
*
|
||||
* Under no circumstances shall the Copyright Holder or its Affiliates is not liable
|
||||
* to the User for any direct or indirect losses of the User, his expenses or actual
|
||||
* damage, including, downtime; loss of bussines; lost profit; lost earnings; loss
|
||||
* or damage to data, property, etc.
|
||||
* ~~~~~~/licensing~~~~~~
|
||||
*/
|
||||
package ru.entaxy.esb.system.common.aggregation.repo;
|
||||
|
||||
import org.apache.camel.CamelContext;
|
||||
import org.apache.camel.Exchange;
|
||||
import org.springframework.dao.DataIntegrityViolationException;
|
||||
import org.springframework.transaction.PlatformTransactionManager;
|
||||
|
||||
import javax.sql.DataSource;
|
||||
|
||||
/**
|
||||
* PostgreSQL specific {@link JdbcAggregationRepository} that deals with SQL Violation Exceptions using special
|
||||
* {@code INSERT INTO .. ON CONFLICT DO NOTHING} claues.
|
||||
*/
|
||||
public class PostgresAggregationRepository extends JdbcAggregationRepository {
|
||||
|
||||
/**
|
||||
* Creates an aggregation repository
|
||||
*/
|
||||
public PostgresAggregationRepository() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates an aggregation repository with the three mandatory parameters
|
||||
*/
|
||||
public PostgresAggregationRepository(PlatformTransactionManager transactionManager, String repositoryName,
|
||||
DataSource dataSource) {
|
||||
super(transactionManager, repositoryName, dataSource);
|
||||
}
|
||||
|
||||
/**
|
||||
* Inserts a new record into the given repository table
|
||||
*
|
||||
* @param camelContext the current CamelContext
|
||||
* @param correlationId the correlation key
|
||||
* @param exchange the aggregated exchange
|
||||
* @param repositoryName The name of the table
|
||||
*/
|
||||
protected void insert(
|
||||
final CamelContext camelContext, final String correlationId, final Exchange exchange, String repositoryName)
|
||||
throws Exception {
|
||||
// The default totalParameterIndex is 2 for ID and Exchange. Depending on logic this will be increased
|
||||
int totalParameterIndex = 2;
|
||||
StringBuilder queryBuilder = new StringBuilder()
|
||||
.append("INSERT INTO ").append(repositoryName)
|
||||
.append('(')
|
||||
.append(EXCHANGE).append(", ")
|
||||
.append(ID);
|
||||
|
||||
if (isStoreBodyAsText()) {
|
||||
queryBuilder.append(", ").append(BODY);
|
||||
totalParameterIndex++;
|
||||
}
|
||||
|
||||
if (hasHeadersToStoreAsText()) {
|
||||
for (String headerName : getHeadersToStoreAsText()) {
|
||||
queryBuilder.append(", ").append(headerName);
|
||||
totalParameterIndex++;
|
||||
}
|
||||
}
|
||||
|
||||
queryBuilder.append(") VALUES (");
|
||||
|
||||
for (int i = 0; i < totalParameterIndex - 1; i++) {
|
||||
queryBuilder.append("?, ");
|
||||
}
|
||||
queryBuilder.append("?)");
|
||||
|
||||
queryBuilder.append(" ON CONFLICT DO NOTHING");
|
||||
|
||||
String sql = queryBuilder.toString();
|
||||
|
||||
int updateCount = insertHelper(camelContext, correlationId, exchange, sql, 1L);
|
||||
if (updateCount == 0 && getRepositoryName().equals(repositoryName)) {
|
||||
throw new DataIntegrityViolationException("No row was inserted due to data violation");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -1,50 +0,0 @@
|
||||
/*-
|
||||
* ~~~~~~licensing~~~~~~
|
||||
* system-commons
|
||||
* ==========
|
||||
* Copyright (C) 2020 - 2023 EmDev LLC
|
||||
* ==========
|
||||
* You may not use this file except in accordance with the License Terms of the Copyright
|
||||
* Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property
|
||||
* rights to the Software and any copies are the property of the Copyright Holder. Unless
|
||||
* it is explicitly allowed the Copyright Holder, the User is prohibited from using the
|
||||
* Software for commercial purposes to provide services to third parties.
|
||||
*
|
||||
* The Copyright Holder hereby declares that the Software is provided on an "AS IS".
|
||||
* Under no circumstances does the Copyright Holder guarantee or promise that the
|
||||
* Software provided by him will be suitable or not suitable for the specific purposes
|
||||
* of the User, that the Software will meet all commercial and personal subjective
|
||||
* expectations of the User, that the Software will work properly, without technical
|
||||
* errors, quickly and uninterruptedly.
|
||||
*
|
||||
* Under no circumstances shall the Copyright Holder or its Affiliates is not liable
|
||||
* to the User for any direct or indirect losses of the User, his expenses or actual
|
||||
* damage, including, downtime; loss of bussines; lost profit; lost earnings; loss
|
||||
* or damage to data, property, etc.
|
||||
* ~~~~~~/licensing~~~~~~
|
||||
*/
|
||||
package ru.entaxy.esb.system.common.exception;
|
||||
|
||||
public class BundleNotFound extends RuntimeException {
|
||||
|
||||
public BundleNotFound() {
|
||||
super();
|
||||
}
|
||||
|
||||
public BundleNotFound(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
|
||||
super(message, cause, enableSuppression, writableStackTrace);
|
||||
}
|
||||
|
||||
public BundleNotFound(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
|
||||
public BundleNotFound(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public BundleNotFound(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
|
||||
}
|
@ -1,50 +0,0 @@
|
||||
/*-
|
||||
* ~~~~~~licensing~~~~~~
|
||||
* system-commons
|
||||
* ==========
|
||||
* Copyright (C) 2020 - 2023 EmDev LLC
|
||||
* ==========
|
||||
* You may not use this file except in accordance with the License Terms of the Copyright
|
||||
* Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property
|
||||
* rights to the Software and any copies are the property of the Copyright Holder. Unless
|
||||
* it is explicitly allowed the Copyright Holder, the User is prohibited from using the
|
||||
* Software for commercial purposes to provide services to third parties.
|
||||
*
|
||||
* The Copyright Holder hereby declares that the Software is provided on an "AS IS".
|
||||
* Under no circumstances does the Copyright Holder guarantee or promise that the
|
||||
* Software provided by him will be suitable or not suitable for the specific purposes
|
||||
* of the User, that the Software will meet all commercial and personal subjective
|
||||
* expectations of the User, that the Software will work properly, without technical
|
||||
* errors, quickly and uninterruptedly.
|
||||
*
|
||||
* Under no circumstances shall the Copyright Holder or its Affiliates is not liable
|
||||
* to the User for any direct or indirect losses of the User, his expenses or actual
|
||||
* damage, including, downtime; loss of bussines; lost profit; lost earnings; loss
|
||||
* or damage to data, property, etc.
|
||||
* ~~~~~~/licensing~~~~~~
|
||||
*/
|
||||
package ru.entaxy.esb.system.common.exception;
|
||||
|
||||
public class ConnectorNotFound extends RuntimeException {
|
||||
|
||||
public ConnectorNotFound() {
|
||||
super();
|
||||
}
|
||||
|
||||
public ConnectorNotFound(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
|
||||
super(message, cause, enableSuppression, writableStackTrace);
|
||||
}
|
||||
|
||||
public ConnectorNotFound(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
|
||||
public ConnectorNotFound(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public ConnectorNotFound(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
|
||||
}
|
@ -1,50 +0,0 @@
|
||||
/*-
|
||||
* ~~~~~~licensing~~~~~~
|
||||
* system-commons
|
||||
* ==========
|
||||
* Copyright (C) 2020 - 2023 EmDev LLC
|
||||
* ==========
|
||||
* You may not use this file except in accordance with the License Terms of the Copyright
|
||||
* Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property
|
||||
* rights to the Software and any copies are the property of the Copyright Holder. Unless
|
||||
* it is explicitly allowed the Copyright Holder, the User is prohibited from using the
|
||||
* Software for commercial purposes to provide services to third parties.
|
||||
*
|
||||
* The Copyright Holder hereby declares that the Software is provided on an "AS IS".
|
||||
* Under no circumstances does the Copyright Holder guarantee or promise that the
|
||||
* Software provided by him will be suitable or not suitable for the specific purposes
|
||||
* of the User, that the Software will meet all commercial and personal subjective
|
||||
* expectations of the User, that the Software will work properly, without technical
|
||||
* errors, quickly and uninterruptedly.
|
||||
*
|
||||
* Under no circumstances shall the Copyright Holder or its Affiliates is not liable
|
||||
* to the User for any direct or indirect losses of the User, his expenses or actual
|
||||
* damage, including, downtime; loss of bussines; lost profit; lost earnings; loss
|
||||
* or damage to data, property, etc.
|
||||
* ~~~~~~/licensing~~~~~~
|
||||
*/
|
||||
package ru.entaxy.esb.system.common.exception;
|
||||
|
||||
public class EsbNotFound extends RuntimeException {
|
||||
|
||||
public EsbNotFound() {
|
||||
super();
|
||||
}
|
||||
|
||||
public EsbNotFound(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
|
||||
super(message, cause, enableSuppression, writableStackTrace);
|
||||
}
|
||||
|
||||
public EsbNotFound(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
|
||||
public EsbNotFound(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public EsbNotFound(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
|
||||
}
|
@ -1,52 +0,0 @@
|
||||
/*-
|
||||
* ~~~~~~licensing~~~~~~
|
||||
* system-commons
|
||||
* ==========
|
||||
* Copyright (C) 2020 - 2023 EmDev LLC
|
||||
* ==========
|
||||
* You may not use this file except in accordance with the License Terms of the Copyright
|
||||
* Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property
|
||||
* rights to the Software and any copies are the property of the Copyright Holder. Unless
|
||||
* it is explicitly allowed the Copyright Holder, the User is prohibited from using the
|
||||
* Software for commercial purposes to provide services to third parties.
|
||||
*
|
||||
* The Copyright Holder hereby declares that the Software is provided on an "AS IS".
|
||||
* Under no circumstances does the Copyright Holder guarantee or promise that the
|
||||
* Software provided by him will be suitable or not suitable for the specific purposes
|
||||
* of the User, that the Software will meet all commercial and personal subjective
|
||||
* expectations of the User, that the Software will work properly, without technical
|
||||
* errors, quickly and uninterruptedly.
|
||||
*
|
||||
* Under no circumstances shall the Copyright Holder or its Affiliates is not liable
|
||||
* to the User for any direct or indirect losses of the User, his expenses or actual
|
||||
* damage, including, downtime; loss of bussines; lost profit; lost earnings; loss
|
||||
* or damage to data, property, etc.
|
||||
* ~~~~~~/licensing~~~~~~
|
||||
*/
|
||||
package ru.entaxy.esb.system.common.exception;
|
||||
|
||||
public class ProfileNotFound extends RuntimeException {
|
||||
|
||||
private static final long serialVersionUID = 6701844035750412423L;
|
||||
|
||||
public ProfileNotFound() {
|
||||
super();
|
||||
}
|
||||
|
||||
public ProfileNotFound(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
|
||||
super(message, cause, enableSuppression, writableStackTrace);
|
||||
}
|
||||
|
||||
public ProfileNotFound(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
|
||||
public ProfileNotFound(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public ProfileNotFound(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
|
||||
}
|
@ -1,50 +0,0 @@
|
||||
/*-
|
||||
* ~~~~~~licensing~~~~~~
|
||||
* system-commons
|
||||
* ==========
|
||||
* Copyright (C) 2020 - 2023 EmDev LLC
|
||||
* ==========
|
||||
* You may not use this file except in accordance with the License Terms of the Copyright
|
||||
* Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property
|
||||
* rights to the Software and any copies are the property of the Copyright Holder. Unless
|
||||
* it is explicitly allowed the Copyright Holder, the User is prohibited from using the
|
||||
* Software for commercial purposes to provide services to third parties.
|
||||
*
|
||||
* The Copyright Holder hereby declares that the Software is provided on an "AS IS".
|
||||
* Under no circumstances does the Copyright Holder guarantee or promise that the
|
||||
* Software provided by him will be suitable or not suitable for the specific purposes
|
||||
* of the User, that the Software will meet all commercial and personal subjective
|
||||
* expectations of the User, that the Software will work properly, without technical
|
||||
* errors, quickly and uninterruptedly.
|
||||
*
|
||||
* Under no circumstances shall the Copyright Holder or its Affiliates is not liable
|
||||
* to the User for any direct or indirect losses of the User, his expenses or actual
|
||||
* damage, including, downtime; loss of bussines; lost profit; lost earnings; loss
|
||||
* or damage to data, property, etc.
|
||||
* ~~~~~~/licensing~~~~~~
|
||||
*/
|
||||
package ru.entaxy.esb.system.common.exception;
|
||||
|
||||
public class TemplateNotFound extends RuntimeException {
|
||||
|
||||
public TemplateNotFound() {
|
||||
super();
|
||||
}
|
||||
|
||||
public TemplateNotFound(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
|
||||
super(message, cause, enableSuppression, writableStackTrace);
|
||||
}
|
||||
|
||||
public TemplateNotFound(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
|
||||
public TemplateNotFound(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public TemplateNotFound(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
|
||||
}
|
@ -1,76 +0,0 @@
|
||||
/*-
|
||||
* ~~~~~~licensing~~~~~~
|
||||
* system-commons
|
||||
* ==========
|
||||
* Copyright (C) 2020 - 2023 EmDev LLC
|
||||
* ==========
|
||||
* You may not use this file except in accordance with the License Terms of the Copyright
|
||||
* Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property
|
||||
* rights to the Software and any copies are the property of the Copyright Holder. Unless
|
||||
* it is explicitly allowed the Copyright Holder, the User is prohibited from using the
|
||||
* Software for commercial purposes to provide services to third parties.
|
||||
*
|
||||
* The Copyright Holder hereby declares that the Software is provided on an "AS IS".
|
||||
* Under no circumstances does the Copyright Holder guarantee or promise that the
|
||||
* Software provided by him will be suitable or not suitable for the specific purposes
|
||||
* of the User, that the Software will meet all commercial and personal subjective
|
||||
* expectations of the User, that the Software will work properly, without technical
|
||||
* errors, quickly and uninterruptedly.
|
||||
*
|
||||
* Under no circumstances shall the Copyright Holder or its Affiliates is not liable
|
||||
* to the User for any direct or indirect losses of the User, his expenses or actual
|
||||
* damage, including, downtime; loss of bussines; lost profit; lost earnings; loss
|
||||
* or damage to data, property, etc.
|
||||
* ~~~~~~/licensing~~~~~~
|
||||
*/
|
||||
package ru.entaxy.esb.system.common.interceptor;
|
||||
|
||||
import org.apache.cxf.binding.soap.SoapMessage;
|
||||
import org.apache.cxf.binding.soap.interceptor.AbstractSoapInterceptor;
|
||||
import org.apache.cxf.headers.Header;
|
||||
import org.apache.cxf.interceptor.Fault;
|
||||
import org.apache.cxf.jaxb.JAXBDataBinding;
|
||||
import org.apache.cxf.message.Message;
|
||||
import org.apache.cxf.phase.Phase;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.xml.bind.JAXBException;
|
||||
import javax.xml.namespace.QName;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class SoapHeaderInterceptor extends AbstractSoapInterceptor {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(SoapHeaderInterceptor.class);
|
||||
|
||||
private static final String HEADER_USER_LOGIN = "X-ForwardedUser";
|
||||
private static final String HEADER_CREATED_BY = "createdBy";
|
||||
private String namespaceUri = "";
|
||||
|
||||
public SoapHeaderInterceptor() {
|
||||
super(Phase.READ);
|
||||
}
|
||||
|
||||
public void handleMessage(SoapMessage message) throws Fault {
|
||||
try {
|
||||
Map<String, List<String>> headers = (Map<String, List<String>>) message.get(Message.PROTOCOL_HEADERS);
|
||||
List<Header> soapHeaders = message.getHeaders();
|
||||
soapHeaders.add(new Header(new QName(namespaceUri, HEADER_CREATED_BY),
|
||||
getSystemName(headers), new JAXBDataBinding(String.class)));
|
||||
message.put(Header.HEADER_LIST, soapHeaders);
|
||||
} catch (JAXBException e) {
|
||||
log.error("Error", e);
|
||||
throw new Fault(e);
|
||||
}
|
||||
}
|
||||
|
||||
private String getSystemName(Map<String, List<String>> headers) {
|
||||
List<String> list = headers.get(HEADER_USER_LOGIN);
|
||||
return list != null && !list.isEmpty() ? headers.get(HEADER_USER_LOGIN).get(0) : null;
|
||||
}
|
||||
|
||||
public void setNamespaceUri(String namespaceUri) {
|
||||
this.namespaceUri = namespaceUri;
|
||||
}
|
||||
}
|
@ -2,7 +2,7 @@
|
||||
* ~~~~~~licensing~~~~~~
|
||||
* system-commons
|
||||
* ==========
|
||||
* Copyright (C) 2020 - 2023 EmDev LLC
|
||||
* Copyright (C) 2020 - 2024 EmDev LLC
|
||||
* ==========
|
||||
* You may not use this file except in accordance with the License Terms of the Copyright
|
||||
* Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property
|
||||
|
@ -2,7 +2,7 @@
|
||||
* ~~~~~~licensing~~~~~~
|
||||
* system-commons
|
||||
* ==========
|
||||
* Copyright (C) 2020 - 2023 EmDev LLC
|
||||
* Copyright (C) 2020 - 2024 EmDev LLC
|
||||
* ==========
|
||||
* You may not use this file except in accordance with the License Terms of the Copyright
|
||||
* Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property
|
||||
|
@ -2,7 +2,7 @@
|
||||
* ~~~~~~licensing~~~~~~
|
||||
* system-commons
|
||||
* ==========
|
||||
* Copyright (C) 2020 - 2023 EmDev LLC
|
||||
* Copyright (C) 2020 - 2024 EmDev LLC
|
||||
* ==========
|
||||
* You may not use this file except in accordance with the License Terms of the Copyright
|
||||
* Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property
|
||||
|
@ -2,7 +2,7 @@
|
||||
* ~~~~~~licensing~~~~~~
|
||||
* system-commons
|
||||
* ==========
|
||||
* Copyright (C) 2020 - 2023 EmDev LLC
|
||||
* Copyright (C) 2020 - 2024 EmDev LLC
|
||||
* ==========
|
||||
* You may not use this file except in accordance with the License Terms of the Copyright
|
||||
* Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property
|
||||
|
@ -2,7 +2,7 @@
|
||||
* ~~~~~~licensing~~~~~~
|
||||
* system-commons
|
||||
* ==========
|
||||
* Copyright (C) 2020 - 2023 EmDev LLC
|
||||
* Copyright (C) 2020 - 2024 EmDev LLC
|
||||
* ==========
|
||||
* You may not use this file except in accordance with the License Terms of the Copyright
|
||||
* Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property
|
||||
|
@ -2,7 +2,7 @@
|
||||
* ~~~~~~licensing~~~~~~
|
||||
* system-commons
|
||||
* ==========
|
||||
* Copyright (C) 2020 - 2023 EmDev LLC
|
||||
* Copyright (C) 2020 - 2024 EmDev LLC
|
||||
* ==========
|
||||
* You may not use this file except in accordance with the License Terms of the Copyright
|
||||
* Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property
|
||||
|
@ -2,7 +2,7 @@
|
||||
* ~~~~~~licensing~~~~~~
|
||||
* system-commons
|
||||
* ==========
|
||||
* Copyright (C) 2020 - 2023 EmDev LLC
|
||||
* Copyright (C) 2020 - 2024 EmDev LLC
|
||||
* ==========
|
||||
* You may not use this file except in accordance with the License Terms of the Copyright
|
||||
* Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property
|
||||
|
@ -2,7 +2,7 @@
|
||||
* ~~~~~~licensing~~~~~~
|
||||
* system-commons
|
||||
* ==========
|
||||
* Copyright (C) 2020 - 2023 EmDev LLC
|
||||
* Copyright (C) 2020 - 2024 EmDev LLC
|
||||
* ==========
|
||||
* You may not use this file except in accordance with the License Terms of the Copyright
|
||||
* Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property
|
||||
|
@ -1,66 +0,0 @@
|
||||
/*-
|
||||
* ~~~~~~licensing~~~~~~
|
||||
* system-commons
|
||||
* ==========
|
||||
* Copyright (C) 2020 - 2023 EmDev LLC
|
||||
* ==========
|
||||
* You may not use this file except in accordance with the License Terms of the Copyright
|
||||
* Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property
|
||||
* rights to the Software and any copies are the property of the Copyright Holder. Unless
|
||||
* it is explicitly allowed the Copyright Holder, the User is prohibited from using the
|
||||
* Software for commercial purposes to provide services to third parties.
|
||||
*
|
||||
* The Copyright Holder hereby declares that the Software is provided on an "AS IS".
|
||||
* Under no circumstances does the Copyright Holder guarantee or promise that the
|
||||
* Software provided by him will be suitable or not suitable for the specific purposes
|
||||
* of the User, that the Software will meet all commercial and personal subjective
|
||||
* expectations of the User, that the Software will work properly, without technical
|
||||
* errors, quickly and uninterruptedly.
|
||||
*
|
||||
* Under no circumstances shall the Copyright Holder or its Affiliates is not liable
|
||||
* to the User for any direct or indirect losses of the User, his expenses or actual
|
||||
* damage, including, downtime; loss of bussines; lost profit; lost earnings; loss
|
||||
* or damage to data, property, etc.
|
||||
* ~~~~~~/licensing~~~~~~
|
||||
*/
|
||||
package ru.entaxy.esb.system.common.util;
|
||||
|
||||
public class CustomHeader {
|
||||
|
||||
private String id;
|
||||
private String type;
|
||||
private String value;
|
||||
|
||||
public CustomHeader() {
|
||||
}
|
||||
|
||||
public CustomHeader(String id, String type, String value) {
|
||||
this.id = id;
|
||||
this.type = type;
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public void setId(String id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public String getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
public void setType(String type) {
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
public String getValue() {
|
||||
return value;
|
||||
}
|
||||
|
||||
public void setValue(String value) {
|
||||
this.value = value;
|
||||
}
|
||||
}
|
@ -1,189 +0,0 @@
|
||||
/*-
|
||||
* ~~~~~~licensing~~~~~~
|
||||
* system-commons
|
||||
* ==========
|
||||
* Copyright (C) 2020 - 2023 EmDev LLC
|
||||
* ==========
|
||||
* You may not use this file except in accordance with the License Terms of the Copyright
|
||||
* Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property
|
||||
* rights to the Software and any copies are the property of the Copyright Holder. Unless
|
||||
* it is explicitly allowed the Copyright Holder, the User is prohibited from using the
|
||||
* Software for commercial purposes to provide services to third parties.
|
||||
*
|
||||
* The Copyright Holder hereby declares that the Software is provided on an "AS IS".
|
||||
* Under no circumstances does the Copyright Holder guarantee or promise that the
|
||||
* Software provided by him will be suitable or not suitable for the specific purposes
|
||||
* of the User, that the Software will meet all commercial and personal subjective
|
||||
* expectations of the User, that the Software will work properly, without technical
|
||||
* errors, quickly and uninterruptedly.
|
||||
*
|
||||
* Under no circumstances shall the Copyright Holder or its Affiliates is not liable
|
||||
* to the User for any direct or indirect losses of the User, his expenses or actual
|
||||
* damage, including, downtime; loss of bussines; lost profit; lost earnings; loss
|
||||
* or damage to data, property, etc.
|
||||
* ~~~~~~/licensing~~~~~~
|
||||
*/
|
||||
package ru.entaxy.esb.system.common.util;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.reflect.TypeToken;
|
||||
import org.apache.camel.Exchange;
|
||||
import org.apache.xerces.dom.DocumentImpl;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.w3c.dom.Document;
|
||||
import org.w3c.dom.Node;
|
||||
import org.w3c.dom.NodeList;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class HeadersConverter {
|
||||
protected static final Logger log = LoggerFactory.getLogger(HeadersConverter.class);
|
||||
|
||||
private String customHeaders;
|
||||
private String customHeaderPrefix;
|
||||
private String namespace;
|
||||
|
||||
public void xml2camelHeaders(Exchange exchange) {
|
||||
NodeList nodes = exchange.getIn().getHeader(customHeaders, NodeList.class);
|
||||
|
||||
if (nodes == null) {
|
||||
log.warn(customHeaders + " not found");
|
||||
return;
|
||||
}
|
||||
|
||||
for (int i = 0; i < nodes.getLength(); ++i) {
|
||||
Node child = nodes.item(i).getFirstChild();
|
||||
String headerName = "";
|
||||
String headerValue = "";
|
||||
while (child != null) {
|
||||
if ("id".equals(child.getLocalName())) {
|
||||
headerName = customHeaderPrefix + child.getTextContent();
|
||||
} else if ("value".equals(child.getLocalName())) {
|
||||
headerValue = child.getTextContent();
|
||||
}
|
||||
child = child.getNextSibling();
|
||||
}
|
||||
if (!headerName.isEmpty()) {
|
||||
exchange.getIn().setHeader(headerName, headerValue);
|
||||
}
|
||||
}
|
||||
|
||||
log.debug("Parsed xml custom headers count {}", nodes.getLength());
|
||||
}
|
||||
|
||||
public void camelHeaders2xml(Exchange exchange) {
|
||||
Node item = null;
|
||||
Node child = null;
|
||||
Document xmlDoc = new DocumentImpl();
|
||||
Node list = xmlDoc.createElement("list");
|
||||
|
||||
for (Map.Entry<String, Object> entry : exchange.getIn().getHeaders().entrySet()) {
|
||||
if (entry.getKey().startsWith(customHeaderPrefix)) {
|
||||
String name = entry.getKey().substring(customHeaderPrefix.length());
|
||||
if (entry.getValue() == null) break;
|
||||
String value = entry.getValue().toString();
|
||||
|
||||
item = xmlDoc.createElementNS(namespace, "customHeader");
|
||||
|
||||
child = xmlDoc.createElementNS(namespace, "id");
|
||||
child.appendChild(xmlDoc.createTextNode(name));
|
||||
item.appendChild(child);
|
||||
|
||||
child = xmlDoc.createElementNS(namespace, "value");
|
||||
child.appendChild(xmlDoc.createTextNode(value));
|
||||
item.appendChild(child);
|
||||
|
||||
list.appendChild(item);
|
||||
}
|
||||
}
|
||||
|
||||
exchange.getIn().setHeader(customHeaders, list);
|
||||
}
|
||||
|
||||
public void xml2Json(Exchange exchange) {
|
||||
NodeList nodes = exchange.getIn().getHeader(customHeaders, NodeList.class);
|
||||
|
||||
if (nodes == null) {
|
||||
log.warn(customHeaders + " not found");
|
||||
return;
|
||||
}
|
||||
|
||||
List<CustomHeader> customHeaders = new ArrayList<>();
|
||||
for (int i = 0; i < nodes.getLength(); ++i) {
|
||||
Node child = nodes.item(i).getFirstChild();
|
||||
String headerName = "";
|
||||
String headerValue = "";
|
||||
String headerType = "";
|
||||
while (child != null) {
|
||||
if ("id".equals(child.getLocalName())) {
|
||||
headerName = child.getTextContent();
|
||||
} else if ("value".equals(child.getLocalName())) {
|
||||
headerValue = child.getTextContent();
|
||||
} else if ("type".equals(child.getLocalName())) {
|
||||
headerType = child.getTextContent();
|
||||
}
|
||||
child = child.getNextSibling();
|
||||
}
|
||||
if (!headerName.isEmpty()) {
|
||||
customHeaders.add(new CustomHeader(headerName, headerType, headerValue));
|
||||
}
|
||||
}
|
||||
Gson gson = new Gson();
|
||||
exchange.getIn().setHeader(this.customHeaders, gson.toJson(customHeaders));
|
||||
|
||||
log.debug("Parsed xml custom headers count {}", nodes.getLength());
|
||||
}
|
||||
|
||||
public void json2xml(Exchange exchange) {
|
||||
String headers = exchange.getIn().getHeader(customHeaders, String.class);
|
||||
|
||||
if (headers == null) {
|
||||
log.warn(customHeaders + " not found");
|
||||
return;
|
||||
}
|
||||
|
||||
Gson gson = new Gson();
|
||||
List<CustomHeader> customHeaders = gson.fromJson(headers, new TypeToken<ArrayList<CustomHeader>>() {
|
||||
}.getType());
|
||||
|
||||
Node item = null;
|
||||
Node child = null;
|
||||
Document xmlDoc = new DocumentImpl();
|
||||
Node list = xmlDoc.createElement("list");
|
||||
|
||||
for (CustomHeader customHeader : customHeaders) {
|
||||
item = xmlDoc.createElementNS(namespace, "customHeader");
|
||||
|
||||
child = xmlDoc.createElementNS(namespace, "id");
|
||||
child.appendChild(xmlDoc.createTextNode(customHeader.getId()));
|
||||
item.appendChild(child);
|
||||
|
||||
child = xmlDoc.createElementNS(namespace, "type");
|
||||
child.appendChild(xmlDoc.createTextNode(customHeader.getType()));
|
||||
item.appendChild(child);
|
||||
|
||||
child = xmlDoc.createElementNS(namespace, "value");
|
||||
child.appendChild(xmlDoc.createTextNode(customHeader.getValue()));
|
||||
item.appendChild(child);
|
||||
|
||||
list.appendChild(item);
|
||||
}
|
||||
|
||||
exchange.getIn().setHeader(this.customHeaders, list);
|
||||
}
|
||||
|
||||
public void setCustomHeaderPrefix(String customHeaderPrefix) {
|
||||
this.customHeaderPrefix = customHeaderPrefix;
|
||||
}
|
||||
|
||||
public void setCustomHeaders(String customHeaders) {
|
||||
this.customHeaders = customHeaders;
|
||||
}
|
||||
|
||||
public void setNamespace(String namespace) {
|
||||
this.namespace = namespace;
|
||||
}
|
||||
}
|
@ -1,157 +0,0 @@
|
||||
/*-
|
||||
* ~~~~~~licensing~~~~~~
|
||||
* system-commons
|
||||
* ==========
|
||||
* Copyright (C) 2020 - 2023 EmDev LLC
|
||||
* ==========
|
||||
* You may not use this file except in accordance with the License Terms of the Copyright
|
||||
* Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property
|
||||
* rights to the Software and any copies are the property of the Copyright Holder. Unless
|
||||
* it is explicitly allowed the Copyright Holder, the User is prohibited from using the
|
||||
* Software for commercial purposes to provide services to third parties.
|
||||
*
|
||||
* The Copyright Holder hereby declares that the Software is provided on an "AS IS".
|
||||
* Under no circumstances does the Copyright Holder guarantee or promise that the
|
||||
* Software provided by him will be suitable or not suitable for the specific purposes
|
||||
* of the User, that the Software will meet all commercial and personal subjective
|
||||
* expectations of the User, that the Software will work properly, without technical
|
||||
* errors, quickly and uninterruptedly.
|
||||
*
|
||||
* Under no circumstances shall the Copyright Holder or its Affiliates is not liable
|
||||
* to the User for any direct or indirect losses of the User, his expenses or actual
|
||||
* damage, including, downtime; loss of bussines; lost profit; lost earnings; loss
|
||||
* or damage to data, property, etc.
|
||||
* ~~~~~~/licensing~~~~~~
|
||||
*/
|
||||
package ru.entaxy.esb.system.common.util;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.Properties;
|
||||
|
||||
public class PropertiesHelper {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(PropertiesHelper.class.getName());
|
||||
|
||||
private static final String EMPTY_STRING = "";
|
||||
|
||||
private String configPath;
|
||||
private static final String karafEtcPath = System.getProperty("karaf.etc");
|
||||
private String configFile;
|
||||
|
||||
private Properties properties;
|
||||
|
||||
public PropertiesHelper() {
|
||||
}
|
||||
|
||||
public PropertiesHelper(String configPath) {
|
||||
LOG.debug("Set custom path: " + configPath + " file: " + configFile);
|
||||
this.configPath = configPath;
|
||||
}
|
||||
|
||||
public PropertiesHelper(String configPath, String configFile) {
|
||||
LOG.debug("Load properties from custom path: " + configPath + " file: " + configFile);
|
||||
this.configPath = configPath;
|
||||
this.configFile = configFile;
|
||||
loadProperties(configPath, configFile);
|
||||
}
|
||||
|
||||
public PropertiesHelper(String configFile, boolean loadKarafEtc) {
|
||||
this.configFile = configFile;
|
||||
LOG.debug("Load properties from karaf etc: " + karafEtcPath + " file: " + configFile);
|
||||
if (loadKarafEtc) {
|
||||
loadProperties(karafEtcPath, configFile);
|
||||
LOG.debug("Loaded properties: " + (properties != null ? properties.size() : "null"));
|
||||
}
|
||||
}
|
||||
|
||||
public Properties load() {
|
||||
if (this.configPath != null && !this.configPath.isEmpty()
|
||||
&& this.configFile != null && !this.configFile.isEmpty()) {
|
||||
return loadProperties(configPath, configFile);
|
||||
} else if (this.configFile != null && !this.configFile.isEmpty()) {
|
||||
return loadProperties(karafEtcPath, configFile);
|
||||
} else {
|
||||
throw new IllegalArgumentException("configPath OR configFile NOT SETTED");
|
||||
}
|
||||
}
|
||||
|
||||
protected Properties loadProperties(String path, String configFile) {
|
||||
try (InputStream input = new FileInputStream(path + File.separator + configFile)) {
|
||||
properties = new Properties();
|
||||
properties.load(input);
|
||||
} catch (IOException ex) {
|
||||
LOG.error(ex);
|
||||
}
|
||||
return properties;
|
||||
}
|
||||
|
||||
public long getInteger(String name) {
|
||||
return getInteger(name, 0);
|
||||
}
|
||||
|
||||
public long getInteger(String name, int defaultValue) {
|
||||
String value = this.properties.getProperty(name);
|
||||
return value != null && !EMPTY_STRING.equals(value) ? Integer.valueOf(value) : defaultValue;
|
||||
}
|
||||
|
||||
public int getInteger(String name, String defaultValue) {
|
||||
return Integer.valueOf(this.properties.getProperty(name, defaultValue));
|
||||
}
|
||||
|
||||
public long getLong(String name) {
|
||||
return getLong(name, 0L);
|
||||
}
|
||||
|
||||
public long getLong(String name, long defaultValue) {
|
||||
String value = this.properties.getProperty(name);
|
||||
return value != null && !EMPTY_STRING.equals(value) ? Long.valueOf(value) : defaultValue;
|
||||
}
|
||||
|
||||
public long getLong(String name, String defaultValue) {
|
||||
return Long.valueOf(this.properties.getProperty(name, defaultValue));
|
||||
}
|
||||
|
||||
public String getString(String name) {
|
||||
return this.properties.getProperty(name, EMPTY_STRING);
|
||||
}
|
||||
|
||||
public String getString(String name, String defaultValue) {
|
||||
return this.properties.getProperty(name, defaultValue);
|
||||
}
|
||||
|
||||
public String[] getStringArray(String name, String[] defaultValue) {
|
||||
String value = this.properties.getProperty(name, EMPTY_STRING);
|
||||
return !value.equals(EMPTY_STRING) ? value.split(",") : defaultValue;
|
||||
}
|
||||
|
||||
public String getConfigPath() {
|
||||
return configPath;
|
||||
}
|
||||
|
||||
public void setConfigPath(String configPath) {
|
||||
this.configPath = configPath;
|
||||
}
|
||||
|
||||
public String getConfigFile() {
|
||||
return configFile;
|
||||
}
|
||||
|
||||
public void setConfigFile(String configFile) {
|
||||
this.configFile = configFile;
|
||||
}
|
||||
|
||||
public Properties getProperties() {
|
||||
return properties;
|
||||
}
|
||||
|
||||
public void setProperties(Properties properties) {
|
||||
this.properties = properties;
|
||||
}
|
||||
|
||||
}
|
@ -1,56 +0,0 @@
|
||||
/*-
|
||||
* ~~~~~~licensing~~~~~~
|
||||
* system-commons
|
||||
* ==========
|
||||
* Copyright (C) 2020 - 2023 EmDev LLC
|
||||
* ==========
|
||||
* You may not use this file except in accordance with the License Terms of the Copyright
|
||||
* Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property
|
||||
* rights to the Software and any copies are the property of the Copyright Holder. Unless
|
||||
* it is explicitly allowed the Copyright Holder, the User is prohibited from using the
|
||||
* Software for commercial purposes to provide services to third parties.
|
||||
*
|
||||
* The Copyright Holder hereby declares that the Software is provided on an "AS IS".
|
||||
* Under no circumstances does the Copyright Holder guarantee or promise that the
|
||||
* Software provided by him will be suitable or not suitable for the specific purposes
|
||||
* of the User, that the Software will meet all commercial and personal subjective
|
||||
* expectations of the User, that the Software will work properly, without technical
|
||||
* errors, quickly and uninterruptedly.
|
||||
*
|
||||
* Under no circumstances shall the Copyright Holder or its Affiliates is not liable
|
||||
* to the User for any direct or indirect losses of the User, his expenses or actual
|
||||
* damage, including, downtime; loss of bussines; lost profit; lost earnings; loss
|
||||
* or damage to data, property, etc.
|
||||
* ~~~~~~/licensing~~~~~~
|
||||
*/
|
||||
package ru.entaxy.esb.system.common.util;
|
||||
|
||||
import org.apache.camel.Exchange;
|
||||
import org.apache.camel.spi.HeaderFilterStrategy;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
public class SimpleOutHeaderFilterStrategy implements HeaderFilterStrategy {
|
||||
|
||||
private Set<String> outFilter;
|
||||
|
||||
public void setOutFilter(Set<String> value) {
|
||||
if (value == null) {
|
||||
outFilter = new HashSet<>();
|
||||
} else {
|
||||
outFilter = value;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean applyFilterToCamelHeaders(String headerName, Object headerValue, Exchange exchange) {
|
||||
return !outFilter.contains(headerName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean applyFilterToExternalHeaders(String headerName, Object headerValue, Exchange exchange) {
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
@ -1,38 +0,0 @@
|
||||
/*-
|
||||
* ~~~~~~licensing~~~~~~
|
||||
* system-commons
|
||||
* ==========
|
||||
* Copyright (C) 2020 - 2023 EmDev LLC
|
||||
* ==========
|
||||
* You may not use this file except in accordance with the License Terms of the Copyright
|
||||
* Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property
|
||||
* rights to the Software and any copies are the property of the Copyright Holder. Unless
|
||||
* it is explicitly allowed the Copyright Holder, the User is prohibited from using the
|
||||
* Software for commercial purposes to provide services to third parties.
|
||||
*
|
||||
* The Copyright Holder hereby declares that the Software is provided on an "AS IS".
|
||||
* Under no circumstances does the Copyright Holder guarantee or promise that the
|
||||
* Software provided by him will be suitable or not suitable for the specific purposes
|
||||
* of the User, that the Software will meet all commercial and personal subjective
|
||||
* expectations of the User, that the Software will work properly, without technical
|
||||
* errors, quickly and uninterruptedly.
|
||||
*
|
||||
* Under no circumstances shall the Copyright Holder or its Affiliates is not liable
|
||||
* to the User for any direct or indirect losses of the User, his expenses or actual
|
||||
* damage, including, downtime; loss of bussines; lost profit; lost earnings; loss
|
||||
* or damage to data, property, etc.
|
||||
* ~~~~~~/licensing~~~~~~
|
||||
*/
|
||||
package ru.entaxy.esb.system.common.util;
|
||||
|
||||
public class SystemHeadersConstants {
|
||||
public static final String HEADER_USER_LOGIN = "X-ForwardedUser";
|
||||
public static final String HEADER_USER_ID = "X-ForwardedUserId";
|
||||
public static final String HEADER_SYSTEM_NAME = "X-SystemName";
|
||||
public static final String HEADER_SYSTEM_UUID = "X-SystemUuid";
|
||||
public static final String HEADER_SYSTEM_ID = "X-SystemId";
|
||||
public static final String HEADER_IS_DISPLAY_SERVICE_SERVICE = "NTX_IsDisplayServiceSchema";
|
||||
|
||||
private SystemHeadersConstants() {
|
||||
}
|
||||
}
|
@ -1,122 +0,0 @@
|
||||
/*-
|
||||
* ~~~~~~licensing~~~~~~
|
||||
* system-commons
|
||||
* ==========
|
||||
* Copyright (C) 2020 - 2023 EmDev LLC
|
||||
* ==========
|
||||
* You may not use this file except in accordance with the License Terms of the Copyright
|
||||
* Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property
|
||||
* rights to the Software and any copies are the property of the Copyright Holder. Unless
|
||||
* it is explicitly allowed the Copyright Holder, the User is prohibited from using the
|
||||
* Software for commercial purposes to provide services to third parties.
|
||||
*
|
||||
* The Copyright Holder hereby declares that the Software is provided on an "AS IS".
|
||||
* Under no circumstances does the Copyright Holder guarantee or promise that the
|
||||
* Software provided by him will be suitable or not suitable for the specific purposes
|
||||
* of the User, that the Software will meet all commercial and personal subjective
|
||||
* expectations of the User, that the Software will work properly, without technical
|
||||
* errors, quickly and uninterruptedly.
|
||||
*
|
||||
* Under no circumstances shall the Copyright Holder or its Affiliates is not liable
|
||||
* to the User for any direct or indirect losses of the User, his expenses or actual
|
||||
* damage, including, downtime; loss of bussines; lost profit; lost earnings; loss
|
||||
* or damage to data, property, etc.
|
||||
* ~~~~~~/licensing~~~~~~
|
||||
*/
|
||||
package ru.entaxy.esb.system.common.validator;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.cxf.binding.soap.SoapMessage;
|
||||
import org.apache.cxf.binding.soap.saaj.SAAJInInterceptor;
|
||||
import org.apache.cxf.interceptor.Fault;
|
||||
import org.apache.cxf.phase.AbstractPhaseInterceptor;
|
||||
import org.apache.cxf.phase.Phase;
|
||||
import org.apache.cxf.service.Service;
|
||||
import org.apache.cxf.service.model.ServiceModelUtil;
|
||||
import org.apache.cxf.ws.addressing.EndpointReferenceUtils;
|
||||
import org.w3c.dom.Element;
|
||||
import org.w3c.dom.Node;
|
||||
import org.xml.sax.SAXException;
|
||||
|
||||
import javax.xml.soap.SOAPMessage;
|
||||
import javax.xml.stream.XMLStreamException;
|
||||
import javax.xml.transform.dom.DOMSource;
|
||||
import javax.xml.validation.Schema;
|
||||
import javax.xml.validation.Validator;
|
||||
import javax.xml.xpath.XPathExpressionException;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
public class ValidateInterceptor extends AbstractPhaseInterceptor<SoapMessage> {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(ValidateInterceptor.class);
|
||||
|
||||
private final SAAJInInterceptor saajIn;
|
||||
private final XmlParser xmlParser;
|
||||
private boolean schemaValidationEnabled;
|
||||
|
||||
public ValidateInterceptor() {
|
||||
super(Phase.PRE_PROTOCOL);
|
||||
saajIn = new SAAJInInterceptor();
|
||||
xmlParser = new XmlParser();
|
||||
|
||||
getAfter().add(SAAJInInterceptor.class.getName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleMessage(SoapMessage message) throws Fault {
|
||||
try {
|
||||
Node body = getMessageBody((DOMSource) getSOAPMessage(message).getSOAPPart().getContent());
|
||||
if (body != null)
|
||||
validate(body, message);
|
||||
else
|
||||
throw new XMLStreamException("Can't find the tag \"Body\"");
|
||||
} catch (RuntimeException re) {
|
||||
throw re;
|
||||
} catch (Exception e) {
|
||||
throw new Fault(e);
|
||||
}
|
||||
}
|
||||
|
||||
private Node getMessageBody(DOMSource source) throws XPathExpressionException {
|
||||
Node node = source.getNode().cloneNode(true);
|
||||
List<Node> nodeList = xmlParser.getNodes(node.getLastChild(), "Body");
|
||||
return !nodeList.isEmpty() ? nodeList.get(0) : null;
|
||||
}
|
||||
|
||||
private void validate(Node node, SoapMessage soapMessage) throws IOException, SAXException, XPathExpressionException {
|
||||
Validator validator = getValidator(soapMessage);
|
||||
validator.validate(new DOMSource(getNodeForValidate(node)));
|
||||
}
|
||||
|
||||
private Node getNodeForValidate(Node node) throws XPathExpressionException {
|
||||
if (schemaValidationEnabled && node.getLocalName().contains("packets")) {
|
||||
Element element = (Element) node;
|
||||
for (Node content : xmlParser.getNodes(element, "content")) {
|
||||
content.getParentNode().removeChild(content);
|
||||
}
|
||||
return element;
|
||||
}
|
||||
return node;
|
||||
}
|
||||
|
||||
private Validator getValidator(SoapMessage soapMessage) {
|
||||
Service service = ServiceModelUtil.getService(soapMessage.getExchange());
|
||||
Schema schema = EndpointReferenceUtils.getSchema(service.getServiceInfos().get(0), soapMessage.getExchange().getBus());
|
||||
return schema.newValidator();
|
||||
}
|
||||
|
||||
private SOAPMessage getSOAPMessage(SoapMessage smsg) {
|
||||
SOAPMessage soapMessage = smsg.getContent(SOAPMessage.class);
|
||||
if (soapMessage == null) {
|
||||
saajIn.handleMessage(smsg);
|
||||
soapMessage = smsg.getContent(SOAPMessage.class);
|
||||
}
|
||||
return soapMessage;
|
||||
}
|
||||
|
||||
public void setSchemaValidationEnabled(boolean schemaValidationEnabled) {
|
||||
this.schemaValidationEnabled = schemaValidationEnabled;
|
||||
}
|
||||
}
|
@ -1,56 +0,0 @@
|
||||
/*-
|
||||
* ~~~~~~licensing~~~~~~
|
||||
* system-commons
|
||||
* ==========
|
||||
* Copyright (C) 2020 - 2023 EmDev LLC
|
||||
* ==========
|
||||
* You may not use this file except in accordance with the License Terms of the Copyright
|
||||
* Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property
|
||||
* rights to the Software and any copies are the property of the Copyright Holder. Unless
|
||||
* it is explicitly allowed the Copyright Holder, the User is prohibited from using the
|
||||
* Software for commercial purposes to provide services to third parties.
|
||||
*
|
||||
* The Copyright Holder hereby declares that the Software is provided on an "AS IS".
|
||||
* Under no circumstances does the Copyright Holder guarantee or promise that the
|
||||
* Software provided by him will be suitable or not suitable for the specific purposes
|
||||
* of the User, that the Software will meet all commercial and personal subjective
|
||||
* expectations of the User, that the Software will work properly, without technical
|
||||
* errors, quickly and uninterruptedly.
|
||||
*
|
||||
* Under no circumstances shall the Copyright Holder or its Affiliates is not liable
|
||||
* to the User for any direct or indirect losses of the User, his expenses or actual
|
||||
* damage, including, downtime; loss of bussines; lost profit; lost earnings; loss
|
||||
* or damage to data, property, etc.
|
||||
* ~~~~~~/licensing~~~~~~
|
||||
*/
|
||||
package ru.entaxy.esb.system.common.validator;
|
||||
|
||||
import org.w3c.dom.Node;
|
||||
import org.w3c.dom.NodeList;
|
||||
|
||||
import javax.xml.xpath.*;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public class XmlParser {
|
||||
|
||||
public List<Node> getNodes(Node node, String elementName) throws XPathExpressionException {
|
||||
XPath xpath = XPathFactory.newInstance().newXPath();
|
||||
|
||||
XPathExpression expr = xpath.compile("//*[local-name()='" + elementName + "']/child::node()");
|
||||
NodeList nodeList = (NodeList) expr.evaluate(node.getOwnerDocument(), XPathConstants.NODESET);
|
||||
|
||||
return getNotNullNodes(nodeList);
|
||||
}
|
||||
|
||||
private List<Node> getNotNullNodes(NodeList nodeList) {
|
||||
List<Node> result = new ArrayList<>();
|
||||
for (int i = 0; i < nodeList.getLength(); i++) {
|
||||
Node node = nodeList.item(i);
|
||||
if (node.getLocalName() != null) {
|
||||
result.add(node);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
@ -3,7 +3,7 @@
|
||||
~~~~~~licensing~~~~~~
|
||||
system-commons
|
||||
==========
|
||||
Copyright (C) 2020 - 2023 EmDev LLC
|
||||
Copyright (C) 2020 - 2024 EmDev LLC
|
||||
==========
|
||||
You may not use this file except in accordance with the License Terms of the Copyright
|
||||
Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property
|
||||
|
@ -2,7 +2,7 @@
|
||||
# ~~~~~~licensing~~~~~~
|
||||
# system-commons
|
||||
# ==========
|
||||
# Copyright (C) 2020 - 2023 EmDev LLC
|
||||
# Copyright (C) 2020 - 2024 EmDev LLC
|
||||
# ==========
|
||||
# You may not use this file except in accordance with the License Terms of the Copyright
|
||||
# Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property
|
||||
|
Reference in New Issue
Block a user