release version 1.12.0

This commit is contained in:
2026-02-18 23:32:39 +03:00
parent 24ce86f470
commit 5d0e27b3e2
2858 changed files with 18366 additions and 113588 deletions

View File

@@ -3,7 +3,7 @@
<parent>
<groupId>ru.entaxy.esb.platform.runtime.core</groupId>
<artifactId>cluster</artifactId>
<version>1.11.0</version>
<version>1.12.0</version>
</parent>
<groupId>ru.entaxy.esb.platform.runtime.core.cluster</groupId>
<artifactId>cluster-persistence-service</artifactId>
@@ -54,15 +54,10 @@
<artifactId>org.osgi.compendium</artifactId>
<version>${osgi.compendium.version}</version>
</dependency>
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast-all</artifactId>
<version>${hazelcast.version}</version>
</dependency>
<dependency>
<groupId>org.apache.karaf.cellar</groupId>
<groupId>ru.entaxy.cellar</groupId>
<artifactId>org.apache.karaf.cellar.core</artifactId>
<version>${cellar.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.karaf</groupId>
@@ -71,9 +66,8 @@
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.karaf.cellar</groupId>
<groupId>ru.entaxy.cellar</groupId>
<artifactId>org.apache.karaf.cellar.bundle</artifactId>
<version>${cellar.version}-ENTAXY</version>
<exclusions>
<exclusion>
<groupId>org.apache.karaf</groupId>
@@ -82,9 +76,8 @@
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.karaf.cellar</groupId>
<groupId>ru.entaxy.cellar</groupId>
<artifactId>org.apache.karaf.cellar.features</artifactId>
<version>${cellar.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.karaf</groupId>
@@ -93,9 +86,8 @@
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.karaf.cellar</groupId>
<groupId>ru.entaxy.cellar</groupId>
<artifactId>org.apache.karaf.cellar.config</artifactId>
<version>${cellar.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.karaf</groupId>
@@ -104,9 +96,8 @@
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.karaf.cellar</groupId>
<groupId>ru.entaxy.cellar</groupId>
<artifactId>org.apache.karaf.cellar.hazelcast</artifactId>
<version>${cellar.version}</version>
</dependency>
</dependencies>

View File

@@ -2,7 +2,7 @@
* ~~~~~~licensing~~~~~~
* cluster-persistence-service
* ==========
* Copyright (C) 2020 - 2025 EmDev LLC
* Copyright (C) 2020 - 2026 EmDev LLC
* ==========
* You may not use this file except in accordance with the License Terms of the Copyright
* Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property
@@ -32,7 +32,6 @@ import java.nio.file.Paths;
import java.util.Map;
import java.util.stream.Collectors;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import org.apache.karaf.cellar.core.ClusterManager;
import org.apache.karaf.cellar.core.Configurations;
import org.slf4j.Logger;
@@ -100,7 +99,7 @@ public class Helper<T> {
} catch (IOException e) {
log.error("FAILED saving state to [" + p.toUri().toString() + "]", e);
}
} catch (HazelcastInstanceNotActiveException e) {
} catch (IllegalStateException e) {
log.error("Hazelcast instance is not active for group [" + groupName + "]", e);
}

View File

@@ -2,7 +2,7 @@
* ~~~~~~licensing~~~~~~
* cluster-persistence-service
* ==========
* Copyright (C) 2020 - 2025 EmDev LLC
* Copyright (C) 2020 - 2026 EmDev LLC
* ==========
* You may not use this file except in accordance with the License Terms of the Copyright
* Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property
@@ -38,87 +38,32 @@ public class PersistenceManager<T> {
private static final Logger log = LoggerFactory.getLogger(PersistenceManager.class);
private static class UpdateMarker {
private boolean updated = false;
public void update() {
this.updated = true;
}
public void clear() {
this.updated = false;
}
public boolean isUpdated() {
return updated;
}
}
private static interface UpdateTaskCallback {
public void executed();
}
private class Executor implements UpdateTaskCallback {
private Boolean isRunning = false;
private ExecutorService threadPool = Executors.newSingleThreadExecutor();
public synchronized void run() {
synchronized (isRunning) {
if (this.isRunning)
return;
this.isRunning = true;
log.debug("~~> EXECUTOR :: RUN");
UpdateTask ut = new UpdateTask(PersistenceManager.this.helper, this);
PersistenceManager.this.clear();
threadPool.execute(ut);
}
}
@Override
public synchronized void executed() {
log.debug("~~> EXECUTOR :: EXECUTED");
synchronized (isRunning) {
this.isRunning = false;
}
PersistenceManager.this.executed();
}
}
protected ExecutorService threadPool = Executors.newSingleThreadExecutor();
private static class UpdateTask implements Runnable {
private Helper<?> helper;
private UpdateTaskCallback callback;
private String groupName;
public UpdateTask(Helper<?> helper, UpdateTaskCallback callback) {
public UpdateTask(Helper<?> helper, String groupName) {
this.helper = helper;
this.callback = callback;
this.groupName = groupName;
}
@Override
public void run() {
log.debug("~~> UPDATE_TASK :: RUN");
// TODO add support for different groups
this.helper.saveClusterState("default");
Thread.currentThread().setContextClassLoader(PersistenceManager.class.getClassLoader());
log.debug("~~> UPDATE_TASK :: RUN :: " + groupName);
this.helper.saveClusterState(groupName);
log.debug("~~> UPDATE_TASK :: RUN :: processed");
this.callback.executed();
}
}
private final UpdateMarker marker = new UpdateMarker();
private Helper<T> helper;
private Executor executor = new Executor();
private List<PersistenceManager<?>> dependents = new ArrayList<>();
@@ -137,47 +82,19 @@ public class PersistenceManager<T> {
}
}
public void restore() {
public void restore(String groupName) {
// this is called only once before the consumer started
// so we can call helper directly
helper.loadClusterState("default");
helper.loadClusterState(groupName);
}
public void persist() {
this.updated();
public void persist(String groupName) {
this.updated(groupName);
}
protected void runExecutor() {
synchronized (executor) {
if (!executor.isRunning)
executor.run();
}
}
public void updated() {
public void updated(String groupName) {
log.debug("~~> UPDATED");
synchronized (marker) {
marker.update();
}
runExecutor();
synchronized (dependentsLock) {
for (PersistenceManager<?> dep : dependents)
dep.updated();
}
threadPool.execute(new UpdateTask(helper, groupName));
}
protected synchronized void clear() {
synchronized (marker) {
marker.clear();
}
}
protected synchronized void executed() {
boolean updated = false;
synchronized (marker) {
updated = marker.isUpdated();
}
if (updated)
runExecutor();
}
}

View File

@@ -2,7 +2,7 @@
* ~~~~~~licensing~~~~~~
* cluster-persistence-service
* ==========
* Copyright (C) 2020 - 2025 EmDev LLC
* Copyright (C) 2020 - 2026 EmDev LLC
* ==========
* You may not use this file except in accordance with the License Terms of the Copyright
* Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property

View File

@@ -2,7 +2,7 @@
* ~~~~~~licensing~~~~~~
* cluster-persistence-service
* ==========
* Copyright (C) 2020 - 2025 EmDev LLC
* Copyright (C) 2020 - 2026 EmDev LLC
* ==========
* You may not use this file except in accordance with the License Terms of the Copyright
* Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property

View File

@@ -2,7 +2,7 @@
* ~~~~~~licensing~~~~~~
* hazelcast-test
* ==========
* Copyright (C) 2020 - 2025 EmDev LLC
* Copyright (C) 2020 - 2026 EmDev LLC
* ==========
* You may not use this file except in accordance with the License Terms of the Copyright
* Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property
@@ -25,16 +25,21 @@
*/
package ru.entaxy.esb.platform.core.cluster.persistence.activator;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.karaf.cellar.bundle.BundleState;
import org.apache.karaf.cellar.bundle.Constants;
import org.apache.karaf.cellar.bundle.management.CellarBundleMBean;
import org.apache.karaf.cellar.core.ClusterManager;
import org.apache.karaf.cellar.core.Group;
import org.apache.karaf.cellar.core.GroupManager;
import org.apache.karaf.cellar.core.event.EventHandler;
import org.apache.karaf.cellar.core.event.EventTransportFactory;
import org.apache.karaf.cellar.features.FeatureState;
import org.apache.karaf.features.FeaturesService;
import org.apache.karaf.util.tracker.BaseActivator;
@@ -45,21 +50,12 @@ import org.osgi.service.cm.ConfigurationAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.Member;
import com.hazelcast.core.MemberAttributeEvent;
import com.hazelcast.core.MembershipEvent;
import com.hazelcast.core.MembershipListener;
import ru.entaxy.esb.platform.core.cluster.persistence.PersistenceManager;
import ru.entaxy.esb.platform.core.cluster.persistence.ServiceProvider;
import ru.entaxy.esb.platform.core.cluster.persistence.handler.LocalBundleEventHandler;
import ru.entaxy.esb.platform.core.cluster.persistence.handler.LocalConfigurationEventHandler;
import ru.entaxy.esb.platform.core.cluster.persistence.handler.LocalEventHandlerRegistryDispatcher;
import ru.entaxy.esb.platform.core.cluster.persistence.handler.LocalFeaturesEventHandler;
import ru.entaxy.esb.platform.core.cluster.persistence.handler.LocalHandlerRegistry;
import ru.entaxy.esb.platform.core.cluster.persistence.handler.LocalRepositoryEventHandler;
import ru.entaxy.esb.platform.core.cluster.persistence.handler.LocalTopicConsumer;
@Services(requires = {
@RequireService(ClusterManager.class),
@@ -67,12 +63,13 @@ import ru.entaxy.esb.platform.core.cluster.persistence.handler.LocalTopicConsume
@RequireService(CellarBundleMBean.class),
@RequireService(ConfigurationAdmin.class),
@RequireService(FeaturesService.class),
@RequireService(HazelcastInstance.class)
// @RequireService(HazelcastInstance.class),
@RequireService(EventTransportFactory.class)
},
provides = {
@ProvideService(EventHandler.class)
})
public class Activator extends BaseActivator implements ServiceProvider, MembershipListener {
public class Activator extends BaseActivator implements ServiceProvider {
private static final String CLASSIFIER_BUNDLE = "bundle";
@@ -84,6 +81,17 @@ public class Activator extends BaseActivator implements ServiceProvider, Members
private static final Logger log = LoggerFactory.getLogger(BaseActivator.class);
// private LocalTopicConsumer consumer;
// private EventConsumer<?> eventConsumer;
// private LocalEventHandlerRegistryDispatcher dispatcher;
// private LocalHandlerRegistry handlerRegistry;
// event handlers
private LocalBundleEventHandler bundleEventHandler;
private LocalConfigurationEventHandler configEventHandler;
@@ -92,17 +100,15 @@ public class Activator extends BaseActivator implements ServiceProvider, Members
private LocalFeaturesEventHandler featuresEventHandler;
private LocalTopicConsumer consumer;
private LocalEventHandlerRegistryDispatcher dispatcher;
private LocalHandlerRegistry handlerRegistry;
// services
private ClusterManager clusterManager;
private GroupManager groupManager;
private HazelcastInstance hazelcastInstance;
private EventTransportFactory eventTransportFactory;
// private HazelcastInstance hazelcastInstance;
private Map<String, PersistenceManager> managers = new ConcurrentHashMap<>();
@@ -114,18 +120,30 @@ public class Activator extends BaseActivator implements ServiceProvider, Members
groupManager = getTrackedService(GroupManager.class);
if (groupManager == null)
return;
eventTransportFactory = getTrackedService(EventTransportFactory.class);
if (eventTransportFactory == null)
return;
ConfigurationAdmin configurationAdmin = getTrackedService(ConfigurationAdmin.class);
if (configurationAdmin == null)
return;
FeaturesService featuresService = getTrackedService(FeaturesService.class);
if (featuresService == null)
return;
/*
hazelcastInstance = getTrackedService(HazelcastInstance.class);
if (hazelcastInstance == null)
return;
*/
handlerRegistry = new LocalHandlerRegistry<>();
// handlerRegistry = new LocalHandlerRegistry<>();
/*
* props for registration
*/
Hashtable props = new Hashtable();
props.put("managed", "true");
props.put(org.osgi.framework.Constants.SERVICE_RANKING, 100);
/*
* bundle management
@@ -143,7 +161,8 @@ public class Activator extends BaseActivator implements ServiceProvider, Members
bundleEventHandler.setFeaturesService(featuresService);
bundleEventHandler.setPersistenceManager(bundlePersistenceManager);
bundleEventHandler.init();
handlerRegistry.addHandler(bundleEventHandler);
// handlerRegistry.addHandler(bundleEventHandler);
register(EventHandler.class, bundleEventHandler, props);
log.debug("Cluster bundleEventHandler started");
/*
@@ -161,7 +180,8 @@ public class Activator extends BaseActivator implements ServiceProvider, Members
configEventHandler.setGroupManager(groupManager);
configEventHandler.setPersistenceManager(configPersistenceManager);
configEventHandler.init();
handlerRegistry.addHandler(configEventHandler);
// handlerRegistry.addHandler(configEventHandler);
register(EventHandler.class, configEventHandler, props);
log.debug("Cluster configEventHandler started");
/*
@@ -182,7 +202,8 @@ public class Activator extends BaseActivator implements ServiceProvider, Members
repositoryEventHandler.setFeaturesService(featuresService);
repositoryEventHandler.setPersistenceManager(repositoryPersistenceManager);
repositoryEventHandler.init(bundleContext);
handlerRegistry.addHandler(repositoryEventHandler);
// handlerRegistry.addHandler(repositoryEventHandler);
register(EventHandler.class, repositoryEventHandler, props);
log.debug("Cluster repositoryEventHandler started");
/*
@@ -202,7 +223,8 @@ public class Activator extends BaseActivator implements ServiceProvider, Members
featuresEventHandler.setFeaturesService(featuresService);
featuresEventHandler.setPersistenceManager(featurePersistenceManager);
featuresEventHandler.init(bundleContext);
handlerRegistry.addHandler(featuresEventHandler);
// handlerRegistry.addHandler(featuresEventHandler);
register(EventHandler.class, featuresEventHandler, props);
repositoryPersistenceManager.addDependent(featurePersistenceManager);
@@ -218,53 +240,92 @@ public class Activator extends BaseActivator implements ServiceProvider, Members
* common services
*/
dispatcher = new LocalEventHandlerRegistryDispatcher<>();
dispatcher.setHandlerRegistry(handlerRegistry);
dispatcher.init();
// dispatcher = new LocalEventHandlerRegistryDispatcher<>();
// dispatcher.setHandlerRegistry(handlerRegistry);
// dispatcher.init();
// eventConsumer = eventTransportFactory.getEventConsumer(null, true, dispatcher);
/*
consumer = new LocalTopicConsumer();
consumer.setInstance(hazelcastInstance);
consumer.setDispatcher(dispatcher);
consumer.setNode(clusterManager.getNode());
consumer.setConfigurationAdmin(configurationAdmin);
consumer.init();
*/
if (iAmAlone()) {
restoreAll();
} else {
for (Group group : getGroupsImAlone())
restore(group);
}
consumer.start();
// consumer.start();
// eventConsumer.start();
log.debug("Cluster consumer started");
}
/*
protected void checkStartStopConsumer() {
if (iAmAlone())
consumer.start();
eventConsumer.start();
// consumer.start();
else
consumer.stop();
eventConsumer.stop();
// consumer.stop();
}
*/
protected void restore(Group group) {
for (PersistenceManager pm : managers.values())
pm.restore(group.getName());
}
protected void restoreAll() {
for (PersistenceManager pm : managers.values())
pm.restore();
for (Group group : groupManager.listLocalGroups())
pm.restore(group.getName());
}
protected void persistAll() {
for (PersistenceManager pm : managers.values())
pm.persist();
for (Group group : groupManager.listLocalGroups())
pm.persist(group.getName());
}
protected boolean iAmAlone() {
if (hazelcastInstance.getCluster().getMembers().size() > 1)
if (clusterManager.listNodes().size() > 1)
return false;
return true;
}
protected Set<Group> getGroupsImAlone() {
Set<Group> result = new HashSet<>();
for (Group group : groupManager.listLocalGroups())
if (clusterManager.listNodesByGroup(group).size() < 2)
result.add(group);
return result;
}
@Override
protected void doStop() {
if (consumer != null)
consumer.stop();
/*if (consumer != null)
consumer.stop();*/
/*if (eventConsumer != null)
eventConsumer.stop(); */
if (bundleEventHandler != null)
bundleEventHandler.destroy();
if (configEventHandler != null)
configEventHandler.destroy();
if (featuresEventHandler != null)
featuresEventHandler.destroy();
if (repositoryEventHandler != null)
repositoryEventHandler.destroy();
super.doStop();
}
@@ -278,29 +339,4 @@ public class Activator extends BaseActivator implements ServiceProvider, Members
return groupManager;
}
@Override
public void memberAdded(MembershipEvent membershipEvent) {
Member member = membershipEvent.getMember();
Member local = hazelcastInstance.getCluster().getLocalMember();
checkStartStopConsumer();
}
@Override
public void memberAttributeChanged(MemberAttributeEvent event) {
// TODO Auto-generated method stub
}
@Override
public void memberRemoved(MembershipEvent membershipEvent) {
Member member = membershipEvent.getMember();
Member local = hazelcastInstance.getCluster().getLocalMember();
if (member.equals(local))
consumer.stop();
else if (iAmAlone()) {
// all other nodes gone
persistAll();
consumer.start();
}
}
}

View File

@@ -2,7 +2,7 @@
* ~~~~~~licensing~~~~~~
* cluster-persistence-service
* ==========
* Copyright (C) 2020 - 2025 EmDev LLC
* Copyright (C) 2020 - 2026 EmDev LLC
* ==========
* You may not use this file except in accordance with the License Terms of the Copyright
* Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property
@@ -117,8 +117,8 @@ public class LocalBundleEventHandler extends BundleSupport implements EventHandl
}
// TODO mark cluster state as needed to be saved
log.debug("-->> WE NEED TO SAVE CLUSTER STATE");
this.persistenceManager.updated();
log.info("-->> WE NEED TO SAVE CLUSTER STATE");
this.persistenceManager.updated(event.getSourceGroup().getName());
} else
log.trace("CELLAR BUNDLE: bundle {} is marked BLOCKED INBOUND for cluster group {}",

View File

@@ -2,7 +2,7 @@
* ~~~~~~licensing~~~~~~
* cluster-persistence-service
* ==========
* Copyright (C) 2020 - 2025 EmDev LLC
* Copyright (C) 2020 - 2026 EmDev LLC
* ==========
* You may not use this file except in accordance with the License Terms of the Copyright
* Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property
@@ -93,10 +93,12 @@ public class LocalConfigurationEventHandler extends ConfigurationSupport
}
// @ENTAXY:SKIP check if it's not a "local" event
//if (event.getLocal() != null && event.getLocal().getId().equalsIgnoreCase(clusterManager.getNode().getId())) {
// LOGGER.trace("CELLAR CONFIG: cluster event is local (coming from local synchronizer or listener)");
// return;
//}
// if (event.getLocal() != null &&
// event.getLocal().getId().equalsIgnoreCase(clusterManager.getNode().getId())) {
// LOGGER.trace("CELLAR CONFIG: cluster event is local (coming from local synchronizer or
// listener)");
// return;
// }
Group group = event.getSourceGroup();
String groupName = group.getName();
@@ -110,7 +112,7 @@ public class LocalConfigurationEventHandler extends ConfigurationSupport
try {
// TODO mark cluster state as needed to be saved
LOGGER.debug("-->> WE NEED TO SAVE CLUSTER STATE");
this.persistenceManager.updated();
this.persistenceManager.updated(event.getSourceGroup().getName());
} catch (Exception ex) {
LOGGER.error("CELLAR CONFIG: failed to read cluster configuration", ex);
}

View File

@@ -1,127 +0,0 @@
/*-
* ~~~~~~licensing~~~~~~
* cluster-persistence-service
* ==========
* Copyright (C) 2020 - 2025 EmDev LLC
* ==========
* You may not use this file except in accordance with the License Terms of the Copyright
* Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property
* rights to the Software and any copies are the property of the Copyright Holder. Unless
* it is explicitly allowed the Copyright Holder, the User is prohibited from using the
* Software for commercial purposes to provide services to third parties.
*
* The Copyright Holder hereby declares that the Software is provided on an "AS IS".
* Under no circumstances does the Copyright Holder guarantee or promise that the
* Software provided by him will be suitable or not suitable for the specific purposes
* of the User, that the Software will meet all commercial and personal subjective
* expectations of the User, that the Software will work properly, without technical
* errors, quickly and uninterruptedly.
*
* Under no circumstances shall the Copyright Holder or its Affiliates is not liable
* to the User for any direct or indirect losses of the User, his expenses or actual
* damage, including, downtime; loss of bussines; lost profit; lost earnings; loss
* or damage to data, property, etc.
* ~~~~~~/licensing~~~~~~
*/
/*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package ru.entaxy.esb.platform.core.cluster.persistence.handler;
import org.apache.karaf.cellar.core.event.Event;
import org.apache.karaf.cellar.core.event.EventHandler;
import org.apache.karaf.cellar.core.event.EventHandlerRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Event dispatcher task.
*/
public class LocalEventDispatchTask<E extends Event> implements Runnable {
private static final transient Logger LOGGER = LoggerFactory.getLogger(LocalEventDispatchTask.class);
private static final long DEFAULT_TIMEOUT = 30000;
private E event;
private EventHandlerRegistry handlerRegistry;
private long timeout;
private long interval = 1000;
public LocalEventDispatchTask(E event, EventHandlerRegistry handlerRegistry) {
this.event = event;
this.handlerRegistry = handlerRegistry;
if (System.getProperty("cellar.timeout") != null) {
try {
this.timeout = Long.parseLong(System.getProperty("cellar.timeout"));
} catch (Exception e) {
this.timeout = DEFAULT_TIMEOUT;
}
} else {
this.timeout = DEFAULT_TIMEOUT;
}
}
public LocalEventDispatchTask(E event, EventHandlerRegistry handlerRegistry, long timeout) {
this.event = event;
this.handlerRegistry = handlerRegistry;
this.timeout = timeout;
}
public LocalEventDispatchTask(EventHandlerRegistry handlerRegistry, long timeout, long interval, E event) {
this.handlerRegistry = handlerRegistry;
this.timeout = timeout;
this.interval = interval;
this.event = event;
}
@Override
public void run() {
try {
boolean dispatched = false;
for (long delay = 0; delay < timeout && !dispatched; delay += interval) {
EventHandler handler = handlerRegistry.getHandler(event);
if (handler != null) {
handler.handle(event);
dispatched = true;
} else {
try {
Thread.sleep(interval);
} catch (InterruptedException e) {
LOGGER.warn("Interrupted while waiting for cluster event handler", e);
}
}
}
if (!dispatched) {
LOGGER.trace("Failed to retrieve handler for cluster event {}", event.getClass());
}
} catch (Exception ex) {
LOGGER.error("Error while dispatching task", ex);
}
}
public long getTimeout() {
return timeout;
}
public void setTimeout(long timeout) {
this.timeout = timeout;
}
public long getInterval() {
return interval;
}
public void setInterval(long interval) {
this.interval = interval;
}
}

View File

@@ -1,103 +0,0 @@
/*-
* ~~~~~~licensing~~~~~~
* cluster-persistence-service
* ==========
* Copyright (C) 2020 - 2025 EmDev LLC
* ==========
* You may not use this file except in accordance with the License Terms of the Copyright
* Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property
* rights to the Software and any copies are the property of the Copyright Holder. Unless
* it is explicitly allowed the Copyright Holder, the User is prohibited from using the
* Software for commercial purposes to provide services to third parties.
*
* The Copyright Holder hereby declares that the Software is provided on an "AS IS".
* Under no circumstances does the Copyright Holder guarantee or promise that the
* Software provided by him will be suitable or not suitable for the specific purposes
* of the User, that the Software will meet all commercial and personal subjective
* expectations of the User, that the Software will work properly, without technical
* errors, quickly and uninterruptedly.
*
* Under no circumstances shall the Copyright Holder or its Affiliates is not liable
* to the User for any direct or indirect losses of the User, his expenses or actual
* damage, including, downtime; loss of bussines; lost profit; lost earnings; loss
* or damage to data, property, etc.
* ~~~~~~/licensing~~~~~~
*/
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ru.entaxy.esb.platform.core.cluster.persistence.handler;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.karaf.cellar.core.event.Event;
import org.apache.karaf.cellar.core.event.EventDispatcher;
import org.apache.karaf.cellar.core.event.EventHandlerRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Event handler service registry dispatcher.
*/
public class LocalEventHandlerRegistryDispatcher<E extends Event> implements EventDispatcher<E> {
private static final transient Logger LOGGER = LoggerFactory.getLogger(LocalEventHandlerRegistryDispatcher.class);
private ExecutorService threadPool;
private EventHandlerRegistry handlerRegistry;
public void init() {
if (threadPool == null) {
if (Boolean.getBoolean(this.getClass().getName() + ".threadPool.singleThreadExecutor")) {
LOGGER.debug("Will use an Executor that uses a single worker thread");
threadPool = Executors.newSingleThreadExecutor();
} else {
LOGGER.debug("Will use an Executor with a pool of threads");
threadPool = Executors.newCachedThreadPool();
}
}
}
/**
* Dispatch a cluster {@code Event} to the appropriate cluster {@code EventHandler}.
*
* @param event the cluster event to dispatch.
*/
public void dispatch(E event) {
LocalEventDispatchTask task = new LocalEventDispatchTask(event, handlerRegistry);
threadPool.execute(task);
}
public EventHandlerRegistry getHandlerRegistry() {
return handlerRegistry;
}
public void setHandlerRegistry(EventHandlerRegistry handlerRegistry) {
this.handlerRegistry = handlerRegistry;
}
public ExecutorService getThreadPool() {
return threadPool;
}
public void setThreadPool(ExecutorService threadPool) {
this.threadPool = threadPool;
}
public void destroy() {
if (threadPool != null) {
threadPool.shutdown();
}
}
}

View File

@@ -2,7 +2,7 @@
* ~~~~~~licensing~~~~~~
* cluster-persistence-service
* ==========
* Copyright (C) 2020 - 2025 EmDev LLC
* Copyright (C) 2020 - 2026 EmDev LLC
* ==========
* You may not use this file except in accordance with the License Terms of the Copyright
* Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property
@@ -125,7 +125,7 @@ public class LocalFeaturesEventHandler extends FeaturesSupport implements EventH
Boolean isInstalled = isFeatureInstalledLocally(name, version);
try {
persistenceManager.updated();
persistenceManager.updated(event.getSourceGroup().getName());
} catch (Exception e) {
LOGGER.error("CELLAR FEATURE: failed to handle cluster feature event", e);

View File

@@ -1,48 +0,0 @@
/*-
* ~~~~~~licensing~~~~~~
* cluster-persistence-service
* ==========
* Copyright (C) 2020 - 2025 EmDev LLC
* ==========
* You may not use this file except in accordance with the License Terms of the Copyright
* Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property
* rights to the Software and any copies are the property of the Copyright Holder. Unless
* it is explicitly allowed the Copyright Holder, the User is prohibited from using the
* Software for commercial purposes to provide services to third parties.
*
* The Copyright Holder hereby declares that the Software is provided on an "AS IS".
* Under no circumstances does the Copyright Holder guarantee or promise that the
* Software provided by him will be suitable or not suitable for the specific purposes
* of the User, that the Software will meet all commercial and personal subjective
* expectations of the User, that the Software will work properly, without technical
* errors, quickly and uninterruptedly.
*
* Under no circumstances shall the Copyright Holder or its Affiliates is not liable
* to the User for any direct or indirect losses of the User, his expenses or actual
* damage, including, downtime; loss of bussines; lost profit; lost earnings; loss
* or damage to data, property, etc.
* ~~~~~~/licensing~~~~~~
*/
package ru.entaxy.esb.platform.core.cluster.persistence.handler;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.karaf.cellar.core.event.Event;
import org.apache.karaf.cellar.core.event.EventHandler;
import org.apache.karaf.cellar.core.event.EventHandlerRegistry;
public class LocalHandlerRegistry<E extends Event> implements EventHandlerRegistry<E> {
private Map<Class,EventHandler> eventHandlerMap = new ConcurrentHashMap<Class,EventHandler>();
@Override
public EventHandler<E> getHandler(E event) {
return eventHandlerMap.get(event.getClass());
}
public void addHandler(EventHandler eventHandler) {
eventHandlerMap.put(eventHandler.getType(), eventHandler);
}
}

View File

@@ -2,7 +2,7 @@
* ~~~~~~licensing~~~~~~
* cluster-persistence-service
* ==========
* Copyright (C) 2020 - 2025 EmDev LLC
* Copyright (C) 2020 - 2026 EmDev LLC
* ==========
* You may not use this file except in accordance with the License Terms of the Copyright
* Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property
@@ -110,7 +110,7 @@ public class LocalRepositoryEventHandler extends FeaturesSupport implements Even
// TODO mark cluster state as needed to be saved
LOGGER.debug("-->> WE NEED TO SAVE CLUSTER STATE");
this.persistenceManager.updated();
this.persistenceManager.updated(event.getSourceGroup().getName());
} catch (Exception e) {
LOGGER.error("CELLAR FEATURE: failed to add/remove repository URL {}", uri, e);

View File

@@ -1,192 +0,0 @@
/*-
* ~~~~~~licensing~~~~~~
* cluster-persistence-service
* ==========
* Copyright (C) 2020 - 2025 EmDev LLC
* ==========
* You may not use this file except in accordance with the License Terms of the Copyright
* Holder located at: https://entaxy.ru/eula . All copyrights, all intellectual property
* rights to the Software and any copies are the property of the Copyright Holder. Unless
* it is explicitly allowed the Copyright Holder, the User is prohibited from using the
* Software for commercial purposes to provide services to third parties.
*
* The Copyright Holder hereby declares that the Software is provided on an "AS IS".
* Under no circumstances does the Copyright Holder guarantee or promise that the
* Software provided by him will be suitable or not suitable for the specific purposes
* of the User, that the Software will meet all commercial and personal subjective
* expectations of the User, that the Software will work properly, without technical
* errors, quickly and uninterruptedly.
*
* Under no circumstances shall the Copyright Holder or its Affiliates is not liable
* to the User for any direct or indirect losses of the User, his expenses or actual
* damage, including, downtime; loss of bussines; lost profit; lost earnings; loss
* or damage to data, property, etc.
* ~~~~~~/licensing~~~~~~
*/
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ru.entaxy.esb.platform.core.cluster.persistence.handler;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ITopic;
import com.hazelcast.core.Message;
import com.hazelcast.core.MessageListener;
import org.apache.karaf.cellar.core.Configurations;
import org.apache.karaf.cellar.core.Dispatcher;
import org.apache.karaf.cellar.core.Node;
import org.apache.karaf.cellar.core.control.BasicSwitch;
import org.apache.karaf.cellar.core.control.Switch;
import org.apache.karaf.cellar.core.control.SwitchStatus;
import org.apache.karaf.cellar.core.event.Event;
import org.apache.karaf.cellar.core.event.EventConsumer;
import org.apache.karaf.cellar.hazelcast.Constants;
import org.osgi.service.cm.Configuration;
import org.osgi.service.cm.ConfigurationAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Consumes messages from the Hazelcast {@code ITopic} and calls the {@code EventDispatcher}.
*/
public class LocalTopicConsumer<E extends Event> implements EventConsumer<E>, MessageListener<E> {
private static final transient Logger log = LoggerFactory.getLogger(LocalTopicConsumer.class);
public static final String SWITCH_ID = "org.apache.karaf.cellar.topic.consumer";
private final Switch eventSwitch = new BasicSwitch(SWITCH_ID);
private String registrationId;
private HazelcastInstance instance;
private ITopic topic;
private Dispatcher dispatcher;
private Node node;
private ConfigurationAdmin configurationAdmin;
private boolean isConsuming;
public void init() {
if (topic == null) {
topic = instance.getTopic(Constants.TOPIC);
}
}
public void destroy() {
stop();
}
@Override
public void consume(E event) {
// check if event has a specified destination.
if ((event.getDestination() == null || event.getDestination().contains(node)) && (
this.getSwitch().getStatus().equals(SwitchStatus.ON) || event.getForce())) {
log.debug("==>> EVENT TO PROCESS");
dispatcher.dispatch(event);
} else {
if (eventSwitch.getStatus().equals(SwitchStatus.OFF)) {
log.debug("CELLAR HAZELCAST: {} switch is OFF, cluster event is not consumed", SWITCH_ID);
}
}
}
@Override
public void start() {
isConsuming = true;
if (topic != null) {
registrationId = topic.addMessageListener(this);
} else {
topic = instance.getTopic(Constants.TOPIC);
registrationId = topic.addMessageListener(this);
}
}
@Override
public void stop() {
isConsuming = false;
if (topic != null) {
topic.removeMessageListener(registrationId);
}
}
@Override
public Boolean isConsuming() {
return isConsuming;
}
@Override
public void onMessage(Message<E> message) {
consume(message.getMessageObject());
}
public Dispatcher getDispatcher() {
return dispatcher;
}
public void setDispatcher(Dispatcher dispatcher) {
this.dispatcher = dispatcher;
}
public HazelcastInstance getInstance() {
return instance;
}
public void setInstance(HazelcastInstance instance) {
this.instance = instance;
}
public ITopic getTopic() {
return topic;
}
public void setTopic(ITopic topic) {
this.topic = topic;
}
@Override
public Switch getSwitch() {
// load the switch status from the config
try {
Configuration configuration = configurationAdmin.getConfiguration(Configurations.NODE, null);
if (configuration != null) {
Boolean status = new Boolean((String) configuration.getProperties().get(Configurations.CONSUMER));
if (status) {
eventSwitch.turnOn();
} else {
eventSwitch.turnOff();
}
}
} catch (Exception e) {
// ignore
}
return eventSwitch;
}
public Node getNode() {
return node;
}
public void setNode(Node node) {
this.node = node;
}
public ConfigurationAdmin getConfigurationAdmin() {
return configurationAdmin;
}
public void setConfigurationAdmin(ConfigurationAdmin configurationAdmin) {
this.configurationAdmin = configurationAdmin;
}
}

View File

@@ -3,7 +3,7 @@
<parent>
<groupId>ru.entaxy.esb.platform.runtime</groupId>
<artifactId>core</artifactId>
<version>1.11.0</version>
<version>1.12.0</version>
</parent>
<groupId>ru.entaxy.esb.platform.runtime.core</groupId>
<artifactId>cluster</artifactId>