existingMutexes = this.getCollection().find(eq("mutexId", mutexId)).into(new ArrayList<>());
+
+ log.info("Existent mutexes: {}", existingMutexes);
+
+ if (existingMutexes.isEmpty()) {
+ // create it assuming no collision at this point
+ InstanceMutex newMutex = new InstanceMutex(mutexId);
+ log.info("Creating new mutex {}", newMutex);
+
+ UpdateResult result = this.getCollection().updateOne(
+ eq("mutexId", mutexId),
+ Updates.set("mutexId", mutexId),
+ new UpdateOptions().upsert(true)
+ );
+
+ if (result.getUpsertedId() != null) {
+ log.info("Mutex {} created.", mutexId);
+ } else {
+ log.info("Mutex {} was added in another thread before we could get to it.", mutexId);
+ }
+ } else if (existingMutexes.size() > 1) {
+ // problem
+ log.error("multiple mutex for id {}", mutexId);
+ this.clearDuplicateMutexes(mutexId);
+ }
+ }
+
+ /**
+ * Locks the mutex with the given id.
+ *
+ * Non blocking.
+ *
+ * Call {@link #register(String)} before using this method.
+ *
+ * @param mutexId The is of the mutex to get the lock for
+ * @param additionalIdentity If an additional identity is required. Only use if need the mutex to offer concurrency within the same instance of the app.
+ * @return true when reserved, false when not.
+ */
+ public boolean lock(@NonNull String mutexId, Optional additionalIdentity) {
+ String identity = this.getIdentity(additionalIdentity);
+ Bson mutexIdEquals = eq("mutexId", mutexId);
+
+ //ensure only one mutex object
+ this.clearDuplicateMutexes(mutexId);
+
+ // try an update
+ InstanceMutex old = this.getCollection().findOneAndUpdate(
+ and(
+ mutexIdEquals,
+ or(
+ not(exists("taken")),
+ eq("taken", false)
+ )
+
+ ),
+ Updates.combine(
+ Updates.set("taken", true),
+ Updates.set("takenAt", ZonedDateTime.now()),
+ Updates.set("takenBy", identity)
+ )
+ );
+
+ if (old != null) {
+ // success ...
+ // Update the information
+ log.debug("Got lock for {} on mutex (showing old data) {}", identity, old);
+ log.info("Acquired lock for {} on mutex {}", identity, mutexId);
+ return true;
+ } else {
+ log.info("Failed to reserve Mutex {} for {}", mutexId, identity);
+
+ InstanceMutex lockedMutex = this.getCollection().find(mutexIdEquals).first();
+ log.debug("Locked mutex: {}", lockedMutex);
+
+ if (lockedMutex == null) {
+ log.warn("No mutex found. It needs to be registered first.");
+ } else if (lockedMutex.getTakenAt() != null && ZonedDateTime.now().isAfter(lockedMutex.getTakenAt().plus(this.lockExpire))) {
+ this.getCollection().findOneAndUpdate(mutexIdEquals, Updates.set("taken", false));
+ log.warn("Unlocked mutex that appeared deadlocked: {}", mutexId);
+ } else {
+ log.debug("Was locked. returning.");
+ }
+
+ return false;
+ }
+ }
+
+ public boolean lock(@NonNull String mutexId) {
+ return this.lock(mutexId, Optional.empty());
+ }
+
+ /**
+ * Free a mutex previously reserved.
+ *
+ * @param mutexId The id of the mutex to free
+ */
+ public void free(@NonNull String mutexId, Optional additionalIdentity) {
+ String identity = this.getIdentity(additionalIdentity);
+
+ InstanceMutex mutex = this.getCollection().findOneAndUpdate(
+ and(
+ eq("mutexId", mutexId),
+ eq("taken", true),
+ eq("takenBy", identity)
+ ),
+ Updates.combine(
+ Updates.set("taken", false),
+ Updates.set("takenAt", null),
+ Updates.set("takenBy", null)
+ )
+ );
+
+ if (mutex == null) {
+ log.info("Mutex NOT freed. Either not taken or not taken by this identity. Mutex: {}", mutex);
+ } else {
+ log.info("Mutex FREED: {}", mutexId);
+ }
+ }
+
+ public void free(@NonNull String mutexId) {
+ this.free(mutexId, Optional.empty());
+ }
+}
diff --git a/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/mongo/MongoDbAwareService.java b/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/mongo/MongoDbAwareService.java
index 34c51038e..ec6c7f4a2 100644
--- a/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/mongo/MongoDbAwareService.java
+++ b/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/mongo/MongoDbAwareService.java
@@ -18,6 +18,7 @@
import lombok.Getter;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
+import org.bson.Document;
import org.bson.types.ObjectId;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import tech.ebp.oqm.core.api.model.collectionStats.CollectionStats;
@@ -127,7 +128,7 @@ protected MongoDbAwareService(
// return this.collection;
// }
- protected MongoCollection getCollection(DbCacheEntry db) {
+ protected MongoCollection getTypedCollection(DbCacheEntry db) {
log.debug("Getting collection for cache entry {}", db);
if(!this.collections.containsKey(db.getDbId())){
log.info("Collection for db cache entry not present. Creating. Cache entry: {}", db);
@@ -138,11 +139,18 @@ protected MongoCollection getCollection(DbCacheEntry db) {
}
return this.collections.get(db.getDbId());
}
-
- protected MongoCollection getCollection(String oqmDbIdOrName) {
- return this.getCollection(this.getOqmDatabaseService().getOqmDatabase(oqmDbIdOrName));
+
+ public MongoCollection getTypedCollection(String oqmDbIdOrName) {
+ DbCacheEntry dbCacheEntry = this.getOqmDatabaseService().getOqmDatabase(oqmDbIdOrName);
+
+ return this.getTypedCollection(dbCacheEntry);
}
-
+
+ public MongoCollection getDocumentCollection(String oqmDbIdOrName) {
+ DbCacheEntry dbCacheEntry = this.getOqmDatabaseService().getOqmDatabase(oqmDbIdOrName);
+ return dbCacheEntry.getMongoDatabase().getCollection(this.collectionName);
+ }
+
public static TransactionOptions getDefaultTransactionOptions() {
return TransactionOptions.builder()
.readPreference(ReadPreference.primary())
@@ -177,7 +185,7 @@ public void ensureObjectValid(String oqmDbIdOrName, boolean newObject, @Valid T
}
protected > X addBaseStats(String oqmDbIdOrName, X builder){
- return (X) builder.size(this.getCollection(oqmDbIdOrName).countDocuments());
+ return (X) builder.size(this.getTypedCollection(oqmDbIdOrName).countDocuments());
}
/**
diff --git a/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/mongo/MongoHistoriedObjectService.java b/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/mongo/MongoHistoriedObjectService.java
index 05f81f5e0..bb8f6e9e4 100644
--- a/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/mongo/MongoHistoriedObjectService.java
+++ b/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/mongo/MongoHistoriedObjectService.java
@@ -274,7 +274,7 @@ public T remove(String oqmDbIdOrName, ObjectId objectId) {
@WithSpan
public long removeAll(String oqmDbIdOrName, ClientSession session, InteractingEntity entity) {
//TODO:: add history event to each
- MongoCollection collection = this.getCollection(oqmDbIdOrName);
+ MongoCollection collection = this.getTypedCollection(oqmDbIdOrName);
if (session == null) {
return collection.deleteMany(new BsonDocument()).getDeletedCount();
} else {
diff --git a/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/mongo/MongoHistoryService.java b/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/mongo/MongoHistoryService.java
index 9060acd8b..37a842a84 100644
--- a/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/mongo/MongoHistoryService.java
+++ b/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/mongo/MongoHistoryService.java
@@ -69,7 +69,7 @@ public MongoHistoryService(
@WithSpan
public DeleteEvent isDeleted(String oqmDbIdOrName, ClientSession clientSession, ObjectId id) {
- MongoCollection collection = this.getCollection(oqmDbIdOrName);
+ MongoCollection collection = this.getTypedCollection(oqmDbIdOrName);
DeleteEvent found;
Bson search = and(
@@ -103,7 +103,7 @@ public DeleteEvent isDeleted(String oqmDbIdOrName, ObjectId id) {
@WithSpan
public ObjectHistoryEvent getLatestHistoryEventFor(String oqmDbIdOrName, ClientSession clientSession, ObjectId id) {
ObjectHistoryEvent found;
- MongoCollection collection = this.getCollection(oqmDbIdOrName);
+ MongoCollection collection = this.getTypedCollection(oqmDbIdOrName);
if (clientSession != null) {
found = collection
.find(clientSession, eq("objectId", id))
@@ -130,7 +130,7 @@ public ObjectHistoryEvent getLatestHistoryEventFor(String oqmDbIdOrName, ObjectI
@WithSpan
public boolean hasHistoryFor(String oqmDbIdOrName, ClientSession clientSession, ObjectId id) {
ObjectHistoryEvent found;
- MongoCollection collection = this.getCollection(oqmDbIdOrName);
+ MongoCollection collection = this.getTypedCollection(oqmDbIdOrName);
if (clientSession != null) {
found = collection
.find(clientSession, eq("objectId", id))
diff --git a/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/mongo/MongoObjectService.java b/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/mongo/MongoObjectService.java
index a391c2b57..55f1177d8 100644
--- a/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/mongo/MongoObjectService.java
+++ b/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/mongo/MongoObjectService.java
@@ -69,7 +69,7 @@ private FindIterable find(String oqmDbIdOrName, ClientSession session, Bson f
log.debug("Filter for find: {}", filter);
FindIterable output;
- MongoCollection collection = this.getCollection(oqmDbIdOrName);
+ MongoCollection collection = this.getTypedCollection(oqmDbIdOrName);
if (filter != null) {
if (session == null) {
output = collection.find(filter);
@@ -179,7 +179,7 @@ public List list(String oqmDbIdOrName) {
}
public Iterator iterator(String oqmDbIdOrName) {
- return getCollection(oqmDbIdOrName).find().iterator();
+ return getTypedCollection(oqmDbIdOrName).find().iterator();
}
/**
@@ -224,7 +224,7 @@ public SearchResult search(String oqmDbIdOrName, @NonNull S searchObject, boo
* @return If the collection is empty or not.
*/
public boolean collectionEmpty(String oqmDbIdOrName) {
- return this.getCollection(oqmDbIdOrName).countDocuments() == 0;
+ return this.getTypedCollection(oqmDbIdOrName).countDocuments() == 0;
}
/**
@@ -235,7 +235,7 @@ public boolean collectionEmpty(String oqmDbIdOrName) {
* @return the count of records in the collection
*/
public long count(String oqmDbIdOrName, ClientSession clientSession, Bson filter) {
- MongoCollection collection = this.getCollection(oqmDbIdOrName);
+ MongoCollection collection = this.getTypedCollection(oqmDbIdOrName);
if (filter == null) {
if (clientSession == null) {
return collection.countDocuments();
@@ -280,7 +280,7 @@ public long count(String oqmDbIdOrName) {
* @return The object found. Null if not found.
*/
public T get(String oqmDbIdOrName, ObjectId objectId) throws DbNotFoundException, DbDeletedException {
- T found = getCollection(oqmDbIdOrName)
+ T found = getTypedCollection(oqmDbIdOrName)
.find(eq("_id", objectId))
.limit(1)
.first();
@@ -293,7 +293,7 @@ public T get(String oqmDbIdOrName, ObjectId objectId) throws DbNotFoundException
}
public T get(String oqmDbIdOrName, ClientSession clientSession, ObjectId objectId) throws DbNotFoundException, DbDeletedException {
- MongoCollection collection = this.getCollection(oqmDbIdOrName);
+ MongoCollection collection = this.getTypedCollection(oqmDbIdOrName);
T found;
if (clientSession == null) {
@@ -362,7 +362,7 @@ public T update(String oqmDbIdOrName, ObjectId id, ObjectNode updateJson) throws
}
this.ensureObjectValid(oqmDbIdOrName, false, object, null);//TODO:: add client session
- this.getCollection(oqmDbIdOrName).findOneAndReplace(eq("_id", id), object);
+ this.getTypedCollection(oqmDbIdOrName).findOneAndReplace(eq("_id", id), object);
return object;
}
@@ -381,7 +381,7 @@ public T update(String oqmDbIdOrName, String id, ObjectNode updateJson) {
public T update(String oqmDbIdOrName, ClientSession clientSession, @Valid T object) throws DbNotFoundException {
//TODO:: review this
this.get(oqmDbIdOrName, clientSession, object.getId());
- MongoCollection collection = this.getCollection(oqmDbIdOrName);
+ MongoCollection collection = this.getTypedCollection(oqmDbIdOrName);
if (clientSession != null) {
return collection.findOneAndReplace(clientSession, eq("_id", object.getId()), object);
} else {
@@ -407,7 +407,7 @@ public ObjectId add(String oqmDbIdOrName, ClientSession session, @NonNull @Valid
this.ensureObjectValid(oqmDbIdOrName, true, object, session);
InsertOneResult result;
- MongoCollection collection = this.getCollection(oqmDbIdOrName);
+ MongoCollection collection = this.getTypedCollection(oqmDbIdOrName);
if (session == null) {
result = collection.insertOne(object);
} else {
@@ -480,7 +480,7 @@ public T remove(String oqmDbIdOrName, ClientSession clientSession, ObjectId obje
this.assertNotReferenced(oqmDbIdOrName, clientSession, toRemove);
DeleteResult result;
- MongoCollection collection = this.getCollection(oqmDbIdOrName);
+ MongoCollection collection = this.getTypedCollection(oqmDbIdOrName);
if (clientSession == null) {
result = collection.deleteOne(eq("_id", objectId));
} else {
@@ -523,12 +523,12 @@ public T remove(String oqmDbIdOrName, String objectId) {
*/
public long removeAll(String oqmDbIdOrName) {
//TODO:: client session
- return this.getCollection(oqmDbIdOrName).deleteMany(new BsonDocument()).getDeletedCount();
+ return this.getTypedCollection(oqmDbIdOrName).deleteMany(new BsonDocument()).getDeletedCount();
}
@Override
public long clear(String oqmDbIdOrName, @NonNull ClientSession session) {
- return this.getCollection(oqmDbIdOrName).deleteMany(new BsonDocument()).getDeletedCount();
+ return this.getTypedCollection(oqmDbIdOrName).deleteMany(new BsonDocument()).getDeletedCount();
}
/**
@@ -539,7 +539,7 @@ public long clear(String oqmDbIdOrName, @NonNull ClientSession session) {
* @return The sum of all values at the field
*/
protected long getSumOfIntField(String oqmDbIdOrName, String field) {
- Document returned = this.getCollection(oqmDbIdOrName).aggregate(
+ Document returned = this.getTypedCollection(oqmDbIdOrName).aggregate(
List.of(
new Document(
"$group",
@@ -557,7 +557,7 @@ protected long getSumOfIntField(String oqmDbIdOrName, String field) {
}
protected double getSumOfFloatField(String oqmDbIdOrName, String field) {
- Document returned = this.getCollection(oqmDbIdOrName).aggregate(
+ Document returned = this.getTypedCollection(oqmDbIdOrName).aggregate(
List.of(
new Document(
"$group",
@@ -579,7 +579,7 @@ public boolean fieldValueExists(
String field,
String value
) {
- return this.getCollection(oqmDbIdOrName)
+ return this.getTypedCollection(oqmDbIdOrName)
.find(
eq(field, value)
).limit(1)
diff --git a/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/mongo/image/ImageService.java b/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/mongo/image/ImageService.java
index fdd3cfe52..363e71694 100644
--- a/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/mongo/image/ImageService.java
+++ b/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/mongo/image/ImageService.java
@@ -64,6 +64,7 @@ public ImageService() {
* @return The resized image
*/
public BufferedImage resizeImage(BufferedImage inputImage) {
+ log.debug("Resizing image {}", inputImage);
// creates output image
BufferedImage outputImage = new BufferedImage(
this.imageResizeConfig.width(),
@@ -95,8 +96,14 @@ public ObjectId add(String oqmDbIdOrName, ClientSession clientSession, Image fil
FilenameUtils.getExtension(fileName),
"imageUploads"
);
+ log.info("Image needs resized: {}", origImage);
{
BufferedImage bufferedImage = ImageIO.read(origImage);
+
+ if(bufferedImage == null){
+ throw new IllegalArgumentException("Image data given was invalid or unsupported.");
+ }
+
bufferedImage = resizeImage(bufferedImage);
ImageIO.write(bufferedImage, this.imageResizeConfig.savedType(), usingImage);
}
diff --git a/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/notification/EventNotificationWrapper.java b/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/notification/EventNotificationWrapper.java
index 55b22231e..4f3249942 100644
--- a/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/notification/EventNotificationWrapper.java
+++ b/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/notification/EventNotificationWrapper.java
@@ -1,14 +1,12 @@
package tech.ebp.oqm.core.api.service.notification;
-import lombok.AccessLevel;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.Setter;
+import lombok.*;
import org.bson.types.ObjectId;
import tech.ebp.oqm.core.api.model.object.history.ObjectHistoryEvent;
@Data
@AllArgsConstructor
+@NoArgsConstructor
@Setter(AccessLevel.PRIVATE)
public class EventNotificationWrapper {
private ObjectId database;
diff --git a/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/notification/HistoryEventNotificationService.java b/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/notification/HistoryEventNotificationService.java
index 7f392e74f..cf40dbdaf 100644
--- a/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/notification/HistoryEventNotificationService.java
+++ b/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/notification/HistoryEventNotificationService.java
@@ -6,6 +6,8 @@
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.bson.types.ObjectId;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.reactive.messaging.Channel;
@@ -22,78 +24,98 @@
@Slf4j
@ApplicationScoped
public class HistoryEventNotificationService {
-
+
public static final String INTERNAL_EVENT_CHANNEL = "events-internal";
public static final String OUTGOING_EVENT_CHANNEL = "events-outgoing";
- public static final String ALL_EVENT_TOPIC = "all-events";
-
+ public static final String TOPIC_PREPEND = "oqm-core-";
+ public static final String ALL_EVENT_TOPIC_LABEL = "all-events";
+ public static final String ALL_EVENT_TOPIC = TOPIC_PREPEND + ALL_EVENT_TOPIC_LABEL;
+
@ConfigProperty(name = "mp.messaging.outgoing.events-outgoing.bootstrap.servers")
Optional outgoingServers;
@ConfigProperty(name = "kafka.bootstrap.servers")
Optional kafkaServers;
-
+
@Inject
@Broadcast
@Channel(INTERNAL_EVENT_CHANNEL)
- @OnOverflow(value = OnOverflow.Strategy.DROP)//TODO:: this better https://quarkus.io/version/3.2/guides/kafka#sending-messages-with-emitter
+ @OnOverflow(value = OnOverflow.Strategy.DROP)
Emitter internalEventEmitter;
-
+
+
@Inject
@Broadcast
@Channel(OUTGOING_EVENT_CHANNEL)
@OnOverflow(value = OnOverflow.Strategy.DROP)
- Emitter outgoingEventEmitter;
-
- private boolean haveOutgoingServers(){
+ Emitter outgoingEventEmitter;
+
+ private boolean haveOutgoingServers() {
return outgoingServers.isPresent() || kafkaServers.isPresent();
}
-
+
/**
* Don't call this directly, use the other one
*/
@WithSpan
@Incoming(INTERNAL_EVENT_CHANNEL)
void sendEventOutgoing(EventNotificationWrapper notificationWrapper) {
- if(!this.haveOutgoingServers()){
+ if (!this.haveOutgoingServers()) {
log.info("NOT Sending event to external channels (no outgoing servers configured): {}/{}", notificationWrapper.getClass().getSimpleName(),
notificationWrapper.getEvent().getId());
return;
}
log.info("Sending event to external channels: {}/{}", notificationWrapper.getClass().getSimpleName(), notificationWrapper.getEvent().getId());
try {
+ Headers headers = new RecordHeaders()
+ .add("database", notificationWrapper.getDatabase().toHexString().getBytes())
+ .add("object", notificationWrapper.getObjectName().getBytes());
this.outgoingEventEmitter.send(
- Message.of(
- notificationWrapper.getEvent()
- ).addMetadata(
- OutgoingKafkaRecordMetadata.builder()
- .withTopic(
- (notificationWrapper.getDatabase() == null? "" : notificationWrapper.getDatabase().toHexString() + "-") + notificationWrapper.getObjectName() + "-" + notificationWrapper.getEvent().getType()
- )
- .build()
- ));
- this.outgoingEventEmitter.send(
- Message.of(
- notificationWrapper.getEvent()
- ).addMetadata(
- OutgoingKafkaRecordMetadata.builder()
- .withTopic((notificationWrapper.getDatabase() == null? "" : notificationWrapper.getDatabase().toHexString() + "-") + ALL_EVENT_TOPIC)
- .build()
- ));
+ Message.of(notificationWrapper)
+ .addMetadata(
+ OutgoingKafkaRecordMetadata.builder()
+ .withTopic(ALL_EVENT_TOPIC)
+ .withHeaders(headers)
+ .build()
+ ));
+ //TODO:: maybe support in future
+// this.outgoingEventEmitter.send(
+// Message.of(
+// notificationWrapper
+// ).addMetadata(
+// OutgoingKafkaRecordMetadata.builder()
+// .withTopic(
+// TOPIC_PREPEND + (notificationWrapper.getDatabase() == null ? "" : notificationWrapper.getDatabase().toHexString() + "-") + ALL_EVENT_TOPIC_LABEL
+// )
+// .withHeaders(headers)
+// .build()
+// ));
+
+
+ //TODO:: maybe support this in future
+// this.outgoingEventEmitter.send(
+// Message.of(notificationWrapper.getEvent()).addMetadata(
+// OutgoingKafkaRecordMetadata.builder()
+// .withTopic(
+// TOPIC_PREPEND + (notificationWrapper.getDatabase() == null ? "" : notificationWrapper.getDatabase().toHexString() + "-") + notificationWrapper.getObjectName() + "-" + notificationWrapper.getEvent().getType()
+// )
+// .withHeaders(headers)
+// .build()
+// ));
log.debug("Sent event to external channels: {}/{}", notificationWrapper.getClass().getSimpleName(), notificationWrapper.getEvent().getId());
- } catch(Throwable e) {
+ } catch (Throwable e) {
log.error("FAILED to send event to external channels: {}/{}:", notificationWrapper.getClass().getSimpleName(), notificationWrapper.getEvent().getId(), e);
throw e;
}
}
-
+
public void sendEvent(ObjectId oqmDatabase, Class> objectClass, ObjectHistoryEvent event) {
this.sendEvents(oqmDatabase, objectClass, event);
}
-
+
public void sendEvents(ObjectId oqmDatabase, Class> objectClass, ObjectHistoryEvent... events) {
this.sendEvents(oqmDatabase, objectClass, Arrays.asList(events));
}
-
+
public void sendEvents(ObjectId oqmDatabase, Class> objectClass, Collection events) {
for (ObjectHistoryEvent event : events) {
log.info("Sending event to internal channel: {}/{}", objectClass.getSimpleName(), event.getId());
@@ -103,5 +125,5 @@ public void sendEvents(ObjectId oqmDatabase, Class> objectClass, Collection, ObjectSchemaUpgrader>> upgraderMap;
+ private OqmDatabaseService oqmDatabaseService;
+ private List> oqmDbServices;
+ private TotalUpgradeResult startupUpgradeResult = null;
+
+ public ObjectSchemaUpgrader getInstanceForClass(@NonNull Class clazz) throws ClassUpgraderNotFoundException {
+ if (!this.upgraderMap.containsKey(clazz)) {
+ throw new ClassUpgraderNotFoundException(clazz);
+ }
+ return (ObjectSchemaUpgrader) this.upgraderMap.get(clazz);
+ }
+
+ private void clearUpgraderMap() {
+ this.upgraderMap = null;
+ }
+
+ @Inject
+ public ObjectSchemaUpgradeService(
+ OqmDatabaseService oqmDatabaseService,
+ StorageBlockService storageBlockService,
+ InventoryItemService inventoryItemService
+ ) {
+ this.oqmDatabaseService = oqmDatabaseService;
+ //TODO:: populate rest of oqmDbServices
+ this.oqmDbServices = List.of(
+ storageBlockService,
+ inventoryItemService
+ );
+
+ this.upgraderMap = Map.of(
+ StorageBlock.class, new StorageBlockSchemaUpgrader(),
+ InventoryItem.class, new InventoryItemSchemaUpgrader()
+ );
+ }
+
+ public Optional getStartupUpgradeResult() {
+ return Optional.ofNullable(this.startupUpgradeResult);
+ }
+
+ public boolean upgradeRan() {
+ return this.startupUpgradeResult == null;
+ }
+
+
+ private CollectionUpgradeResult upgradeOqmCollection(ClientSession cs, MongoCollection documentCollection, MongoCollection typedCollection, Class objectClass) throws ClassUpgraderNotFoundException {
+ ObjectSchemaUpgrader objectVersionBumper = this.getInstanceForClass(objectClass);
+ CollectionUpgradeResult.Builder outputBuilder = CollectionUpgradeResult.builder()
+ .collectionName(documentCollection.getNamespace().getCollectionName());
+
+ StopWatch sw = StopWatch.createStarted();
+ long numUpdated = 0;
+
+ if(objectVersionBumper.upgradesAvailable()) {
+ //TODO:: add search for any objects with versions less than current.
+ try (MongoCursor it = documentCollection.find().cursor()) {
+ while (it.hasNext()) {
+ Document doc = it.next();
+ ObjectUpgradeResult result = objectVersionBumper.upgrade(doc);
+
+ if (result.wasUpgraded()) {
+ numUpdated++;
+ typedCollection.findOneAndReplace(
+ cs,
+ eq("id", result.getUpgradedObject().getId()),
+ result.getUpgradedObject()
+ );
+ }
+ }
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ sw.stop();
+ outputBuilder.timeTaken(Duration.of(sw.getTime(TimeUnit.MILLISECONDS), ChronoUnit.MILLIS))
+ .numObjectsUpgraded(numUpdated);
+
+ return outputBuilder.build();
+ }
+
+ private CollectionUpgradeResult upgradeOqmCollection(ClientSession dbCs, OqmMongoDatabase oqmDb, MongoDbAwareService service) throws ClassUpgraderNotFoundException {
+ log.info("Updating schema of oqm database service {} in ", service.getClass());
+ String oqmDbId = oqmDb.getId().toHexString();
+ //TODO:: hande upgrading history
+ CollectionUpgradeResult result = this.upgradeOqmCollection(
+ dbCs,
+ service.getDocumentCollection(oqmDbId),
+ service.getTypedCollection(oqmDbId),
+ service.getClazz()
+ );
+
+ log.info("DONE Updating schema of oqm database service {} in ", service.getClass());
+ return result;
+ }
+
+
+ private OqmDbUpgradeResult upgradeOqmDb(OqmMongoDatabase oqmDb) {
+ log.info("Updating schema of oqm database: {}", oqmDb);
+ OqmDbUpgradeResult.Builder outputBuilder = OqmDbUpgradeResult.builder()
+ .dbName(oqmDb.getName());
+ StopWatch dbUpgradeTime = StopWatch.createStarted();
+
+ List> futures = new ArrayList<>();
+ ClientSession cs = null;
+
+ try {
+ for (MongoDbAwareService,?,?> curService : this.oqmDbServices) {
+ if (cs == null) {
+ cs = curService.getNewClientSession(true);
+ }
+ ClientSession finalCs = cs;
+ futures.add(
+ CompletableFuture.supplyAsync(() -> {
+ return upgradeOqmCollection(finalCs, oqmDb, curService);
+ })
+ );
+ }
+ if(cs != null) {
+ cs.commitTransaction();
+ }
+ } finally {
+ if(cs != null){
+ cs.close();
+ }
+ }
+
+ outputBuilder.collectionUpgradeResults(
+ futures.stream().map(CompletableFuture::join).toList()
+ );
+ dbUpgradeTime.stop();
+ outputBuilder.timeTaken(Duration.of(dbUpgradeTime.getTime(TimeUnit.MILLISECONDS), ChronoUnit.MILLIS));
+
+ log.info("Done updating oqm database: {}", oqmDb);
+
+ return outputBuilder.build();
+ }
+
+ public Optional updateSchema() {
+ if (this.upgradeRan()) {
+ return Optional.empty();
+ }
+ log.info("Upgrading the schema held in the Database.");
+
+ TotalUpgradeResult.Builder totalResultBuilder = TotalUpgradeResult.builder();
+ StopWatch totalTime = StopWatch.createStarted();
+
+ //TODO:: migrate top levels
+
+
+ List> resultMap = new ArrayList<>();
+ for (OqmMongoDatabase curDb : this.oqmDatabaseService.listIterator()) {
+ resultMap.add(CompletableFuture.supplyAsync(() -> {
+ return upgradeOqmDb(curDb);
+ })
+ );
+ }
+ totalResultBuilder.dbUpgradeResults(resultMap.stream().map((CompletableFuture future) -> {
+ try {
+ return future.get();
+ } catch (Throwable e) {
+ throw new UpgradeFailedException("Failed to upgrade data in database.", e);
+ }
+ })
+ .toList());
+ totalTime.stop();
+ totalResultBuilder.timeTaken(Duration.of(totalTime.getTime(TimeUnit.MILLISECONDS), ChronoUnit.MILLIS));
+
+ log.info("DONE upgrading the schema held in the Database.");
+ this.startupUpgradeResult = totalResultBuilder.build();
+
+ return this.getStartupUpgradeResult();
+ }
+}
diff --git a/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/schemaVersioning/upgraders/ObjectSchemaUpgrader.java b/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/schemaVersioning/upgraders/ObjectSchemaUpgrader.java
new file mode 100644
index 000000000..1a4bace5a
--- /dev/null
+++ b/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/schemaVersioning/upgraders/ObjectSchemaUpgrader.java
@@ -0,0 +1,127 @@
+package tech.ebp.oqm.core.api.service.schemaVersioning.upgraders;
+
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.time.StopWatch;
+import org.bson.Document;
+import org.bson.json.JsonWriterSettings;
+import tech.ebp.oqm.core.api.exception.UpgradeFailedException;
+import tech.ebp.oqm.core.api.exception.VersionBumperListIncontiguousException;
+import tech.ebp.oqm.core.api.model.object.ObjectUtils;
+import tech.ebp.oqm.core.api.model.object.Versionable;
+import tech.ebp.oqm.core.api.model.object.upgrade.ObjectUpgradeResult;
+
+import java.time.Duration;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Handles object upgrading for a particular object.
+ * @param
+ */
+@Slf4j
+public abstract class ObjectSchemaUpgrader {
+
+ @Getter
+ private final SortedSet> versionBumpers;
+ @Getter
+ private final Class objClass;
+ private final Map>> bumperListCacheMap = new ConcurrentHashMap<>();
+
+ protected ObjectSchemaUpgrader(Class objClass, SortedSet> versionBumpers) throws VersionBumperListIncontiguousException {
+ this.versionBumpers = versionBumpers;
+ this.objClass = objClass;
+
+ //check that the set is contiguous
+ int lastVersion = 1;
+ for(ObjectSchemaVersionBumper cur : this.versionBumpers){
+ if((lastVersion + 1) != cur.getBumperTo()){
+ throw new VersionBumperListIncontiguousException(lastVersion, this.objClass);
+ }
+ lastVersion = cur.getBumperTo();
+ }
+ }
+
+ protected ObjectSchemaUpgrader(Class objClass, ObjectSchemaVersionBumper... versionBumpers) throws VersionBumperListIncontiguousException {
+ this(
+ objClass,
+ new TreeSet<>(Arrays.stream(versionBumpers).toList())
+ );
+ }
+
+ protected LinkedList> getBumperFromCache(int versionTo){
+ if(!this.bumperListCacheMap.containsKey(versionTo)){
+ return null;
+ }
+ return new LinkedList<>(this.bumperListCacheMap.get(versionTo));
+ }
+
+ protected Iterator> getBumperIteratorAtVersion(int curObjVersion){
+ int curVersionTo = curObjVersion + 1;
+
+ LinkedList> bumpers = this.getBumperFromCache(curVersionTo);
+
+ if(bumpers != null){
+ return bumpers.iterator();
+ }
+
+ bumpers = new LinkedList<>(this.versionBumpers);
+
+ while(!bumpers.isEmpty() && bumpers.getFirst().getBumperTo() < curVersionTo){
+ bumpers.removeFirst();
+ }
+
+ this.bumperListCacheMap.put(curObjVersion, bumpers);
+ bumpers = this.getBumperFromCache(curObjVersion);
+
+ return bumpers.iterator();
+ }
+
+
+ public ObjectUpgradeResult upgrade(JsonNode oldObj){
+ int curVersion = oldObj.get("schemaVersion").asInt(1);
+ ObjectUpgradeResult.Builder resultBuilder = ObjectUpgradeResult.builder();
+ resultBuilder.oldVersion(curVersion);
+
+ JsonNode upgradedJson = oldObj.deepCopy();
+
+ StopWatch sw = StopWatch.createStarted();
+ Iterator> it = getBumperIteratorAtVersion(curVersion);
+ while (it.hasNext()){
+ ObjectSchemaVersionBumper curBumper = it.next();
+
+ upgradedJson = curBumper.bumpObject(upgradedJson);
+ }
+ T upgradedObj = null;
+ try {
+ upgradedObj = ObjectUtils.OBJECT_MAPPER.treeToValue(upgradedJson, this.objClass);
+ } catch(JsonProcessingException e) {
+ throw new UpgradeFailedException(e, this.getObjClass());
+ }
+ sw.stop();
+
+ resultBuilder.upgradedObject(upgradedObj);
+ resultBuilder.timeTaken(Duration.ofMillis(sw.getTime()));
+
+ return resultBuilder.build();
+ }
+
+ public ObjectUpgradeResult upgrade(Document oldObj) throws JsonProcessingException {
+ return this.upgrade(
+ ObjectUtils.OBJECT_MAPPER.readTree(
+ oldObj.toJson(
+ JsonWriterSettings.builder()
+ .build()
+ )
+ )
+ );
+ }
+
+ public boolean upgradesAvailable(){
+ return !this.versionBumpers.isEmpty();
+ }
+}
diff --git a/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/schemaVersioning/upgraders/ObjectSchemaVersionBumper.java b/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/schemaVersioning/upgraders/ObjectSchemaVersionBumper.java
new file mode 100644
index 000000000..bafd957dd
--- /dev/null
+++ b/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/schemaVersioning/upgraders/ObjectSchemaVersionBumper.java
@@ -0,0 +1,42 @@
+package tech.ebp.oqm.core.api.service.schemaVersioning.upgraders;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import lombok.Getter;
+import tech.ebp.oqm.core.api.model.object.Versionable;
+
+/**
+ * Abstract class to take an object from the next lower version to the version noted by {@link #bumperTo}
+ *
+ * @param
+ */
+public abstract class ObjectSchemaVersionBumper implements Comparable> {
+
+ protected ObjectSchemaVersionBumper(int bumperTo) {
+ this.bumperTo = bumperTo;
+ }
+
+ /**
+ * Notes what version this bumper bumps the object to.
+ */
+ @Getter
+ public final int bumperTo;
+
+ /**
+ * Method to mutate the given object and return the resulting upgraded object.
+ *
+ * The updates can happen either in-place, or using a copy of the object. It is recommended users of this method use the returned value rather than rely on pass-by-reference.
+ *
+ * @param oldObj The object to update
+ *
+ * @return The updated object
+ */
+ public abstract JsonNode bumpObject(JsonNode oldObj);
+
+ @Override
+ public int compareTo(ObjectSchemaVersionBumper tObjectSchemaVersionBumper) {
+ return Integer.compare(
+ this.getBumperTo(),
+ tObjectSchemaVersionBumper.getBumperTo()
+ );
+ }
+}
diff --git a/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/schemaVersioning/upgraders/inventoryItem/InventoryItemSchemaUpgrader.java b/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/schemaVersioning/upgraders/inventoryItem/InventoryItemSchemaUpgrader.java
new file mode 100644
index 000000000..061bbe96f
--- /dev/null
+++ b/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/schemaVersioning/upgraders/inventoryItem/InventoryItemSchemaUpgrader.java
@@ -0,0 +1,19 @@
+package tech.ebp.oqm.core.api.service.schemaVersioning.upgraders.inventoryItem;
+
+import tech.ebp.oqm.core.api.model.object.storage.items.InventoryItem;
+import tech.ebp.oqm.core.api.service.schemaVersioning.upgraders.ObjectSchemaUpgrader;
+
+import java.util.TreeSet;
+
+/**
+ * TODO:: figure out how to handle the subtypes
+ */
+public class InventoryItemSchemaUpgrader extends ObjectSchemaUpgrader {
+
+ public InventoryItemSchemaUpgrader() {
+ super(
+ InventoryItem.class,
+ new TreeSet<>()
+ );
+ }
+}
diff --git a/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/schemaVersioning/upgraders/storageBlock/StorageBlockSchemaUpgrader.java b/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/schemaVersioning/upgraders/storageBlock/StorageBlockSchemaUpgrader.java
new file mode 100644
index 000000000..d6810e26c
--- /dev/null
+++ b/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/schemaVersioning/upgraders/storageBlock/StorageBlockSchemaUpgrader.java
@@ -0,0 +1,18 @@
+package tech.ebp.oqm.core.api.service.schemaVersioning.upgraders.storageBlock;
+
+import lombok.extern.slf4j.Slf4j;
+import tech.ebp.oqm.core.api.model.object.storage.storageBlock.StorageBlock;
+import tech.ebp.oqm.core.api.service.schemaVersioning.upgraders.ObjectSchemaUpgrader;
+
+import java.util.TreeSet;
+
+@Slf4j
+public class StorageBlockSchemaUpgrader extends ObjectSchemaUpgrader {
+
+ public StorageBlockSchemaUpgrader() {
+ super(
+ StorageBlock.class,
+ new TreeSet<>()
+ );
+ }
+}
diff --git a/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/serviceState/db/OqmMongoDatabase.java b/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/serviceState/db/OqmMongoDatabase.java
index 61054fe24..92cf4f72b 100644
--- a/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/serviceState/db/OqmMongoDatabase.java
+++ b/software/oqm-core-api/src/main/java/tech/ebp/oqm/core/api/service/serviceState/db/OqmMongoDatabase.java
@@ -21,6 +21,7 @@
@AllArgsConstructor
@Builder
public class OqmMongoDatabase extends AttKeywordMainObject {
+ public static final int CUR_SCHEMA_VERSION = 1;
@NotNull
@Length(min = 1, max = 15)
@@ -42,4 +43,9 @@ public String getDisplayName(){
}
return this.displayName;
}
+
+ @Override
+ public int getSchemaVersion() {
+ return CUR_SCHEMA_VERSION;
+ }
}
diff --git a/software/oqm-core-api/src/main/resources/application.yaml b/software/oqm-core-api/src/main/resources/application.yaml
index d5230be0b..7b0c3f32d 100644
--- a/software/oqm-core-api/src/main/resources/application.yaml
+++ b/software/oqm-core-api/src/main/resources/application.yaml
@@ -102,7 +102,8 @@ quarkus:
body:
preallocate-body-buffer: true
limits:
- max-body-size: 500M
+ max-body-size: 750M
+ max-form-attribute-size: 750M
auth:
proactive: false
access-log:
diff --git a/software/oqm-core-api/src/test/java/tech/ebp/oqm/core/api/model/TestMainObject.java b/software/oqm-core-api/src/test/java/tech/ebp/oqm/core/api/model/TestMainObject.java
index 6075c5e4e..8053d8762 100644
--- a/software/oqm-core-api/src/test/java/tech/ebp/oqm/core/api/model/TestMainObject.java
+++ b/software/oqm-core-api/src/test/java/tech/ebp/oqm/core/api/model/TestMainObject.java
@@ -13,4 +13,9 @@ public class TestMainObject extends AttKeywordMainObject {
public TestMainObject(ObjectId objectId, Map atts, List keywords) {
super(objectId, atts, keywords);
}
+
+ @Override
+ public int getSchemaVersion() {
+ return 1;
+ }
}
diff --git a/software/oqm-core-api/src/test/java/tech/ebp/oqm/core/api/model/object/history/ObjectHistoryEventTest.java b/software/oqm-core-api/src/test/java/tech/ebp/oqm/core/api/model/object/history/ObjectHistoryEventTest.java
index de8563402..bd1a592a7 100644
--- a/software/oqm-core-api/src/test/java/tech/ebp/oqm/core/api/model/object/history/ObjectHistoryEventTest.java
+++ b/software/oqm-core-api/src/test/java/tech/ebp/oqm/core/api/model/object/history/ObjectHistoryEventTest.java
@@ -18,6 +18,11 @@ public class TestObjectHistoryEvent extends DescriptiveEvent {
public EventType getType() {
return EventType.CREATE;
}
+
+ @Override
+ public int getSchemaVersion() {
+ return 1;
+ }
}
@Test
diff --git a/software/oqm-core-api/src/test/java/tech/ebp/oqm/core/api/service/mongo/InstanceMutexServiceTest.java b/software/oqm-core-api/src/test/java/tech/ebp/oqm/core/api/service/mongo/InstanceMutexServiceTest.java
new file mode 100644
index 000000000..83511be7c
--- /dev/null
+++ b/software/oqm-core-api/src/test/java/tech/ebp/oqm/core/api/service/mongo/InstanceMutexServiceTest.java
@@ -0,0 +1,170 @@
+package tech.ebp.oqm.core.api.service.mongo;
+
+
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.junit.QuarkusTest;
+import jakarta.inject.Inject;
+import lombok.*;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import tech.ebp.oqm.core.api.testResources.lifecycleManagers.TestResourceLifecycleManager;
+import tech.ebp.oqm.core.api.testResources.testClasses.RunningServerTest;
+
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.time.temporal.ChronoUnit;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+@Slf4j
+@QuarkusTest
+@QuarkusTestResource(TestResourceLifecycleManager.class)
+public class InstanceMutexServiceTest extends RunningServerTest {
+
+ public static Stream getParams() {
+ return Stream.of(
+ Arguments.of(2, 10, Duration.of(250, ChronoUnit.MILLIS)),
+ Arguments.of(3, 10, Duration.of(250, ChronoUnit.MILLIS)),
+ Arguments.of(5, 10, Duration.of(250, ChronoUnit.MILLIS)),
+ Arguments.of(10, 10, Duration.of(250, ChronoUnit.MILLIS)),
+ Arguments.of(20, 20, Duration.of(150, ChronoUnit.MILLIS))
+ );
+ }
+
+ @Inject
+ InstanceMutexService instanceMutexService;
+
+ @Test
+ public void basicTest() {
+ String mutexId = "testMutex";
+ this.instanceMutexService.register(mutexId);
+
+ assertTrue(this.instanceMutexService.lock(mutexId));
+
+ assertFalse(this.instanceMutexService.lock(mutexId));
+
+ this.instanceMutexService.free(mutexId);
+
+ assertTrue(this.instanceMutexService.lock(mutexId));
+ this.instanceMutexService.free(mutexId);
+ }
+
+
+ @ParameterizedTest
+ @MethodSource("getParams")
+ public void threadTest(int numThreads, int numIterations, Duration workDuration) throws InterruptedException, ExecutionException {
+ String mutexId = "testMutex2";
+ List>> futures = new ArrayList<>(numThreads);
+ SortedSet results = new TreeSet<>();
+ ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
+
+ TestThread.Builder threadBuilder = TestThread.builder()
+ .mutexId(mutexId)
+ .numIterations(numIterations)
+ .durationOfWork(workDuration)
+ .instanceMutexService(instanceMutexService);
+
+ for (int i = 1; i <= numThreads; i++) {
+ threadBuilder.threadId("testThread-" + i);
+
+ futures.add(executor.submit(threadBuilder.build()));
+ }
+ executor.shutdown();
+ while (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
+ log.info("Still waiting on threads...");
+ }
+
+ for (Future> future : futures) {
+ results.addAll(future.get());
+ }
+
+ assertEquals(numIterations * numThreads, results.size());
+
+ //TODO:: check results
+ log.info("Results: {}", results);
+
+ Iterator iterator = results.iterator();
+ ThreadResult cur = iterator.next();
+ while (iterator.hasNext()) {
+ ThreadResult next = iterator.next();
+
+ assertTrue(
+ next.getStart().isAfter(cur.getStart()),
+ "result " + cur + " start overlaps with the next result " + next + " (next start is before cur start)"
+ );
+ assertTrue(
+ (next.getStart().isAfter(cur.getEnd()) || next.getStart().equals(cur.getEnd())),
+ "result " + cur + " overlaps with the next result " + next + " (next start is before cur end)"
+ );
+
+ cur = next;
+ }
+
+ }
+
+ @Builder
+ @Data
+ @AllArgsConstructor
+ static
+ class ThreadResult implements Comparable {
+ private String threadId;
+ private LocalDateTime start;
+ private LocalDateTime end;
+
+ @Override
+ public int compareTo(@NonNull InstanceMutexServiceTest.ThreadResult threadResult) {
+ return this.getStart().compareTo(threadResult.getStart());
+ }
+ }
+
+ @Builder
+ @Slf4j
+ @AllArgsConstructor
+ static class TestThread implements Callable> {
+
+ private String mutexId;
+ private String threadId;
+ private InstanceMutexService instanceMutexService;
+ private int numIterations;
+ private Duration durationOfWork;
+
+ @SneakyThrows
+ @Override
+ public List call() {
+ log.info("Running test thread {}", this.threadId);
+
+ this.instanceMutexService.register(this.mutexId);
+
+// Thread.sleep(500);
+
+ List results = new ArrayList<>(this.numIterations);
+ for (int i = 1; i <= this.numIterations; i++) {
+ log.info("Thread {} waiting for lock on iteration {}", this.threadId, i);
+ while (!instanceMutexService.lock(this.mutexId, Optional.of(this.threadId))) {
+ Thread.sleep(50);
+ }
+ log.info("Thread {} got lock on iteration {}/{}", this.threadId, i, this.numIterations);
+ ThreadResult.Builder resultBuilder = ThreadResult.builder()
+ .threadId(this.threadId)
+ .start(LocalDateTime.now());
+
+ Thread.sleep(this.durationOfWork);
+
+ resultBuilder.end(LocalDateTime.now());
+
+ this.instanceMutexService.free(this.mutexId, Optional.of(this.threadId));
+ log.info("Thread {} done doing work & released lock on iteration {}", this.threadId, i);
+ results.add(resultBuilder.build());
+ }
+ log.info("DONE running test thread {}", this.threadId);
+ return results;
+ }
+ }
+
+}
diff --git a/software/oqm-core-api/src/test/java/tech/ebp/oqm/core/api/service/mongo/MongoHistoriedObjectServiceTest.java b/software/oqm-core-api/src/test/java/tech/ebp/oqm/core/api/service/mongo/MongoHistoriedObjectServiceTest.java
index 3d6c83d69..60f4ce8d3 100644
--- a/software/oqm-core-api/src/test/java/tech/ebp/oqm/core/api/service/mongo/MongoHistoriedObjectServiceTest.java
+++ b/software/oqm-core-api/src/test/java/tech/ebp/oqm/core/api/service/mongo/MongoHistoriedObjectServiceTest.java
@@ -11,12 +11,11 @@
import org.bson.types.ObjectId;
import org.junit.jupiter.api.Test;
import tech.ebp.oqm.core.api.model.object.ObjectUtils;
-import tech.ebp.oqm.core.api.service.mongo.InteractingEntityService;
+import tech.ebp.oqm.core.api.service.notification.EventNotificationWrapper;
import tech.ebp.oqm.core.api.service.notification.HistoryEventNotificationService;
import tech.ebp.oqm.core.api.service.serviceState.db.OqmDatabaseService;
import tech.ebp.oqm.core.api.testResources.data.TestMainObject;
import tech.ebp.oqm.core.api.testResources.data.TestMongoHistoriedService;
-import tech.ebp.oqm.core.api.testResources.data.TestUserService;
import tech.ebp.oqm.core.api.testResources.lifecycleManagers.TestResourceLifecycleManager;
import tech.ebp.oqm.core.api.testResources.testClasses.RunningServerTest;
import tech.ebp.oqm.core.api.model.object.history.ObjectHistoryEvent;
@@ -73,24 +72,28 @@ public void testAdd() throws JsonProcessingException {
assertEquals(objectId, createEvent.getObjectId());
assertNotNull(createEvent.getEntity());
assertEquals(testUser.getId(), createEvent.getEntity());
-
+
+
ConsumerTask createFromAll = this.kafkaCompanion.consumeStrings().fromTopics(
- this.oqmDatabaseService.getDatabaseCache().getFromName(DEFAULT_TEST_DB_NAME).get().getDbId().toHexString() + "-" + HistoryEventNotificationService.ALL_EVENT_TOPIC,
+ HistoryEventNotificationService.ALL_EVENT_TOPIC,
1
);
createFromAll.awaitCompletion();
assertEquals(1, createFromAll.count());
- CreateEvent createEventFromMessage = ObjectUtils.OBJECT_MAPPER.readValue(createFromAll.getFirstRecord().value(), CreateEvent.class);
- assertEquals(createEvent, createEventFromMessage);
-
- ConsumerTask createFromCreate = this.kafkaCompanion.consumeStrings().fromTopics(
- this.oqmDatabaseService.getDatabaseCache().getFromName(DEFAULT_TEST_DB_NAME).get().getDbId().toHexString() + "-" + HistoryEventNotificationService.ALL_EVENT_TOPIC,
- 1
- );
- createFromCreate.awaitCompletion();
- assertEquals(1, createFromCreate.count());
- createEventFromMessage = ObjectUtils.OBJECT_MAPPER.readValue(createFromCreate.getFirstRecord().value(), CreateEvent.class);
- assertEquals(createEvent, createEventFromMessage);
+ EventNotificationWrapper createEventFromMessage = ObjectUtils.OBJECT_MAPPER.readValue(createFromAll.getFirstRecord().value(), EventNotificationWrapper.class);
+ assertEquals(createEvent, createEventFromMessage.getEvent());
+
+ // TODO: more when we want to
+// ConsumerTask createFromAllInDb = this.kafkaCompanion.consumeStrings().fromTopics(
+// HistoryEventNotificationService.TOPIC_PREPEND + this.oqmDatabaseService.getDatabaseCache().getFromName(DEFAULT_TEST_DB_NAME).get().getDbId().toHexString() + "-" + HistoryEventNotificationService.ALL_EVENT_TOPIC_LABEL,
+// 1
+// );
+// createFromAllInDb.awaitCompletion();
+// assertEquals(1, createFromAllInDb.count());
+// createEventFromMessage = ObjectUtils.OBJECT_MAPPER.readValue(createFromAllInDb.getFirstRecord().value(), EventNotificationWrapper.class);
+// assertEquals(createEvent, createEventFromMessage.getEvent());
+
+ //TODO:: cover last type?
}
//TODO:: test rest
}
\ No newline at end of file
diff --git a/software/oqm-core-api/src/test/java/tech/ebp/oqm/core/api/service/mongo/exception/DbDeleteRelationalExceptionTest.java b/software/oqm-core-api/src/test/java/tech/ebp/oqm/core/api/service/mongo/exception/DbDeleteRelationalExceptionTest.java
index eaf29e416..73f0d6df2 100644
--- a/software/oqm-core-api/src/test/java/tech/ebp/oqm/core/api/service/mongo/exception/DbDeleteRelationalExceptionTest.java
+++ b/software/oqm-core-api/src/test/java/tech/ebp/oqm/core/api/service/mongo/exception/DbDeleteRelationalExceptionTest.java
@@ -26,7 +26,12 @@ public void testMessage(){
FAKER.name().name(), new TreeSet<>(List.of(ObjectId.get()))
);
- DbDeleteRelationalException e = new DbDeleteRelationalException(new MainObject(objectId){}, references);
+ DbDeleteRelationalException e = new DbDeleteRelationalException(new MainObject(objectId){
+ @Override
+ public int getSchemaVersion() {
+ return 1;
+ }
+ }, references);
log.info("Error message: {}", e.getMessage());
diff --git a/software/oqm-core-api/src/test/java/tech/ebp/oqm/core/api/testResources/data/TestMainObject.java b/software/oqm-core-api/src/test/java/tech/ebp/oqm/core/api/testResources/data/TestMainObject.java
index 4609e3c31..7eb7e1030 100644
--- a/software/oqm-core-api/src/test/java/tech/ebp/oqm/core/api/testResources/data/TestMainObject.java
+++ b/software/oqm-core-api/src/test/java/tech/ebp/oqm/core/api/testResources/data/TestMainObject.java
@@ -43,7 +43,12 @@ public TestMainObject(String testField, double floatValue){
this.setTestField(testField);
this.setFloatValue(floatValue);
}
-
+
+ @Override
+ public int getSchemaVersion() {
+ return 1;
+ }
+
// public TestMainObject(String testField, BigInteger bigIntValue){
// this.setTestField(testField);
// this.setBigIntValue(bigIntValue);
diff --git a/software/oqm-core-base-station/build.gradle b/software/oqm-core-base-station/build.gradle
index fe7b7505e..852b27d70 100644
--- a/software/oqm-core-base-station/build.gradle
+++ b/software/oqm-core-base-station/build.gradle
@@ -1,7 +1,7 @@
plugins {
id 'java'
id 'io.quarkus'
- id "io.freefair.lombok" version "8.6"
+ id "io.freefair.lombok" version "8.7.1"
}
group 'com.ebp.openQuarterMaster'
@@ -34,7 +34,7 @@ dependencies {
implementation 'org.apache.commons:commons-text:1.12.0'
implementation group: 'org.jsoup', name: 'jsoup', version: '1.18.1'
- implementation 'uk.org.okapibarcode:okapibarcode:0.4.6'
+ implementation 'uk.org.okapibarcode:okapibarcode:0.4.7'
implementation 'com.itextpdf:html2pdf:5.0.5'
//webjars
@@ -46,7 +46,7 @@ dependencies {
testImplementation 'io.rest-assured:rest-assured'
testImplementation 'net.datafaker:datafaker:2.3.1'
- testImplementation 'com.microsoft.playwright:playwright:1.45.1'
+ testImplementation 'com.microsoft.playwright:playwright:1.46.0'
testImplementation 'com.deque.html.axe-core:playwright:4.9.1'
}
diff --git a/software/oqm-core-base-station/installerSrc/oqm-core-base_station.service b/software/oqm-core-base-station/installerSrc/oqm-core-base_station.service
index a9d559d54..dd6f8953e 100644
--- a/software/oqm-core-base-station/installerSrc/oqm-core-base_station.service
+++ b/software/oqm-core-base-station/installerSrc/oqm-core-base_station.service
@@ -53,6 +53,7 @@ ExecStart=/bin/bash -c "/usr/bin/docker run --rm --name $CONTAINER_NAME \
-p $(oqm-config -g core.baseStation.httpPort):80 \
-v /etc/oqm/serviceConfig/core/base+station/files:/etc/oqm/serviceConfig/core/base+station/files:ro \
--env-file /tmp/oqm/serviceConfig/core/base+station/base-station-config.list \
+ --env-file /tmp/oqm/serviceConfig/core/base+station/user-config.list \
--add-host $(oqm-config -g system.hostname):host-gateway \
$IMAGE_NAME:$IMAGE_VERSION"
# Ensure is up
diff --git a/software/oqm-core-base-station/src/main/resources/META-INF/resources/res/js/item/extItemSearch.js b/software/oqm-core-base-station/src/main/resources/META-INF/resources/res/js/item/extItemSearch.js
index d376bcefd..3d59398ee 100644
--- a/software/oqm-core-base-station/src/main/resources/META-INF/resources/res/js/item/extItemSearch.js
+++ b/software/oqm-core-base-station/src/main/resources/META-INF/resources/res/js/item/extItemSearch.js
@@ -116,23 +116,33 @@ const ExtItemSearch = {
method: "GET",
failMessagesDiv: ExtItemSearch.extItemSearchSearchFormMessages,
done: async function (data) {
+ console.log("Got search result.");
let imageId;
let imageName = resultUnifiedName;
if (!data.length) {
- console.log("No results for given source. Adding.")
+ console.log("No results for given source. Adding.");
//TODO:: use image add form to add image, come back to this?
-
let saveImageFail = false;
+ let filename = new URL(imageUrl).pathname;
+ filename = filename.substring(filename.lastIndexOf('/') + 1);
+ if(filename.includes(".")){
+ filename = filename.split('.').slice(0, -1).join('.')
+ }
+ filename += "."+imageData.split(';')[0].split('/')[1];
+
+ let addData = new FormData();
+ addData.append("fileName", filename);
+ addData.append("description", "");
+ addData.append("source", imageUrl);
+ addData.append("file", await (await (await fetch(imageData)).blob()));
+
await Rest.call({
async: false,
url: Rest.passRoot + "/media/image",
method: "POST",
- data: {
- title: resultUnifiedName,
- source: imageUrl,
- imageData: imageData
- },
+ data: addData,
+ dataType: false,
failMessagesDiv: ExtItemSearch.extItemSearchSearchFormMessages,
fail: function () {
saveImageFail = true;
@@ -214,7 +224,7 @@ const ExtItemSearch = {
let newCarImage = newCarImageDir.find("img");
newCarImage.prop("src", imageData);
- let useButton = $('');
+ let useButton = $('');
useButton.on("click", function () {
ExtItemSearch.addOrGetAndSelectImage(curImageLoc, result.unifiedName, imageData);
});
diff --git a/software/oqm-core-base-station/src/main/resources/application.yml b/software/oqm-core-base-station/src/main/resources/application.yml
index 300c93803..69d09ca59 100644
--- a/software/oqm-core-base-station/src/main/resources/application.yml
+++ b/software/oqm-core-base-station/src/main/resources/application.yml
@@ -101,7 +101,8 @@ quarkus:
body:
preallocate-body-buffer: true
limits:
- max-body-size: 500M
+ max-body-size: 750M
+ max-form-attribute-size: 750M
auth:
proactive: false
permission:
@@ -122,7 +123,6 @@ quarkus:
post-logout-path: /?message=You%20have%20successfully%20logged%20out&messageType=success&messageHeading=Logged%20Out
oqmCoreApi:
devservice:
- coreApiVersion: 2.1.0-DEV
certPath: ../../../../dev/devTest-cert-cert.pem
certKeyPath: ../../../../dev/devTest-cert-key.pem
diff --git a/software/oqm-core-base-station/src/main/resources/templates/webui/mainWebPageTemplate.html b/software/oqm-core-base-station/src/main/resources/templates/webui/mainWebPageTemplate.html
index 56c0500b9..7a77b63da 100644
--- a/software/oqm-core-base-station/src/main/resources/templates/webui/mainWebPageTemplate.html
+++ b/software/oqm-core-base-station/src/main/resources/templates/webui/mainWebPageTemplate.html
@@ -58,7 +58,7 @@