Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[backend] now also send event_id for events sent to worker coming from stream (#9824) #9791

Merged
merged 7 commits into from
Feb 4, 2025
8 changes: 6 additions & 2 deletions opencti-platform/opencti-graphql/src/database/middleware.js
Original file line number Diff line number Diff line change
Expand Up @@ -1870,6 +1870,10 @@
};
};

const computeDateFromEventId = (context) => {
return utcDate(parseInt(context.eventId.split('-')[0], 10)).toISOString();
};

Check warning on line 1875 in opencti-platform/opencti-graphql/src/database/middleware.js

View check run for this annotation

Codecov / codecov/patch

opencti-platform/opencti-graphql/src/database/middleware.js#L1874-L1875

Added lines #L1874 - L1875 were not covered by tests

export const updateAttributeMetaResolved = async (context, user, initial, inputs, opts = {}) => {
const { locks = [], impactStandardId = true } = opts;
const updates = Array.isArray(inputs) ? inputs : [inputs];
Expand Down Expand Up @@ -2143,7 +2147,7 @@
const uniqImpactKey = uniqImpactKeys[i];
attributesMap.set(uniqImpactKey, {
name: uniqImpactKey,
updated_at: now(),
updated_at: context?.eventId ? computeDateFromEventId(context) : now(),
confidence: confidenceLevelToApply,
user_id: user.internal_id,
});
Expand Down Expand Up @@ -2509,7 +2513,7 @@
const { updated_at: lastAttributeUpdateDate } = attributesMap.get(attributeKey) ?? {};
if (lastAttributeUpdateDate) {
try {
const eventDate = utcDate(parseInt(context.eventId.split('-')[0], 10)).toISOString();
const eventDate = computeDateFromEventId(context);

Check warning on line 2516 in opencti-platform/opencti-graphql/src/database/middleware.js

View check run for this annotation

Codecov / codecov/patch

opencti-platform/opencti-graphql/src/database/middleware.js#L2516

Added line #L2516 was not covered by tests
return utcDate(lastAttributeUpdateDate).isAfter(eventDate);
} catch (e) {
logApp.error('Error evaluating event id', { key: attributeKey, event_id: context.eventId });
Expand Down
5 changes: 3 additions & 2 deletions opencti-platform/opencti-graphql/src/database/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -454,12 +454,13 @@
};
const pushToStream = async (context: AuthContext, user: AuthUser, client: Cluster | Redis, event: BaseEvent, opts: EventOpts = {}) => {
const draftContext = getDraftContext(context, user);
const eventToPush = { ...event, event_id: context.eventId };
if (!draftContext && isStreamPublishable(opts)) {
const pushToStreamFn = async () => {
if (streamTrimming) {
await client.call('XADD', REDIS_STREAM_NAME, 'MAXLEN', '~', streamTrimming, '*', ...mapJSToStream(event));
await client.call('XADD', REDIS_STREAM_NAME, 'MAXLEN', '~', streamTrimming, '*', ...mapJSToStream(eventToPush));
} else {
await client.call('XADD', REDIS_STREAM_NAME, '*', ...mapJSToStream(event));
await client.call('XADD', REDIS_STREAM_NAME, '*', ...mapJSToStream(eventToPush));

Check warning on line 463 in opencti-platform/opencti-graphql/src/database/redis.ts

View check run for this annotation

Codecov / codecov/patch

opencti-platform/opencti-graphql/src/database/redis.ts#L463

Added line #L463 was not covered by tests
}
};
telemetry(context, user, 'INSERT STREAM', {
Expand Down
9 changes: 6 additions & 3 deletions opencti-platform/opencti-graphql/src/graphql/sseMiddleware.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import * as jsonpatch from 'fast-json-patch';
import { Promise } from 'bluebird';
import { LRUCache } from 'lru-cache';
import { now } from 'moment';

Check warning on line 5 in opencti-platform/opencti-graphql/src/graphql/sseMiddleware.js

View check run for this annotation

Codecov / codecov/patch

opencti-platform/opencti-graphql/src/graphql/sseMiddleware.js#L5

Added line #L5 was not covered by tests
import conf, { basePath, logApp } from '../config/conf';
import { AUTH_BEARER, authenticateUserFromRequest, TAXIIAPI } from '../domain/user';
import { createStreamProcessor, EVENT_CURRENT_VERSION } from '../database/redis';
Expand Down Expand Up @@ -33,7 +34,7 @@
KNOWLEDGE_ORGANIZATION_RESTRICT,
SYSTEM_USER
} from '../utils/access';
import { FROM_START_STR, utcDate } from '../utils/format';
import { streamEventId, FROM_START_STR, utcDate } from '../utils/format';

Check warning on line 37 in opencti-platform/opencti-graphql/src/graphql/sseMiddleware.js

View check run for this annotation

Codecov / codecov/patch

opencti-platform/opencti-graphql/src/graphql/sseMiddleware.js#L37

Added line #L37 was not covered by tests
import { stixRefsExtractor } from '../schema/stixEmbeddedRelationship';
import { ABSTRACT_STIX_CORE_RELATIONSHIP, buildRefRelationKey, ENTITY_TYPE_CONTAINER, STIX_TYPE_RELATION, STIX_TYPE_SIGHTING } from '../schema/general';
import { convertStoreToStix } from '../database/stix-converter';
Expand Down Expand Up @@ -577,7 +578,9 @@
for (let index = 0; index < elements.length; index += 1) {
const element = elements[index];
const { id: eventId, event, data: eventData } = element;
const { type, data: stix, version: eventVersion, context: evenContext } = eventData;
const { type, data: stix, version: eventVersion, context: evenContext, event_id } = eventData;
const updateTime = stix.extensions[STIX_EXT_OCTI]?.updated_at ?? now();
eventData.event_id = event_id ?? streamEventId(updateTime, index);

Check warning on line 583 in opencti-platform/opencti-graphql/src/graphql/sseMiddleware.js

View check run for this annotation

Codecov / codecov/patch

opencti-platform/opencti-graphql/src/graphql/sseMiddleware.js#L581-L583

Added lines #L581 - L583 were not covered by tests
const isRelation = stix.type === 'relationship' || stix.type === 'sighting';
// New stream support only v4+ events.
const isCompatibleVersion = parseInt(eventVersion ?? '0', 10) >= 4;
Expand Down Expand Up @@ -672,7 +675,7 @@
const instance = instances[index];
const stixData = convertStoreToStix(instance);
const stixUpdatedAt = stixData.extensions[STIX_EXT_OCTI].updated_at;
const eventId = `${utcDate(stixUpdatedAt).toDate().getTime()}-0`;
const eventId = streamEventId(stixUpdatedAt);

Check warning on line 678 in opencti-platform/opencti-graphql/src/graphql/sseMiddleware.js

View check run for this annotation

Codecov / codecov/patch

opencti-platform/opencti-graphql/src/graphql/sseMiddleware.js#L678

Added line #L678 was not covered by tests
if (channel.connected()) {
// publish missing dependencies if needed
const isValidResolution = await resolveAndPublishDependencies(context, noDependencies, cache, channel, req, eventId, stixData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import { executionContext, RETENTION_MANAGER_USER, SYSTEM_USER } from '../utils/access';
import type { SseEvent, StreamDataEvent } from '../types/event';
import type { StixBundle } from '../types/stix-common';
import { utcDate } from '../utils/format';
import { streamEventId, utcDate } from '../utils/format';
import { findById } from '../modules/playbook/playbook-domain';
import { type CronConfiguration, PLAYBOOK_INTERNAL_DATA_CRON, type StreamConfiguration } from '../modules/playbook/playbook-components';
import { PLAYBOOK_COMPONENTS } from '../modules/playbook/playbook-components';
Expand Down Expand Up @@ -326,7 +326,7 @@
const data = await stixLoadById(context, RETENTION_MANAGER_USER, entityId);
if (data) {
try {
const eventId = `${utcDate().toDate().getTime()}-0`;
const eventId = streamEventId();

Check warning on line 329 in opencti-platform/opencti-graphql/src/manager/playbookManager.ts

View check run for this annotation

Codecov / codecov/patch

opencti-platform/opencti-graphql/src/manager/playbookManager.ts#L329

Added line #L329 was not covered by tests
const nextStep = { component: connector, instance };
const bundle: StixBundle = {
id: uuidv4(),
Expand Down Expand Up @@ -465,7 +465,7 @@
const data = await stixLoadById(context, RETENTION_MANAGER_USER, node.internal_id);
if (data) {
try {
const eventId = `${utcDate().toDate().getTime()}-${index}`;
const eventId = streamEventId(null, index);

Check warning on line 468 in opencti-platform/opencti-graphql/src/manager/playbookManager.ts

View check run for this annotation

Codecov / codecov/patch

opencti-platform/opencti-graphql/src/manager/playbookManager.ts#L468

Added line #L468 was not covered by tests
const nextStep = { component: connector, instance };
const bundle: StixBundle = {
id: uuidv4(),
Expand Down
7 changes: 4 additions & 3 deletions opencti-platform/opencti-graphql/src/manager/syncManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@
let lastStateSaveTime;
const handleEvent = (event) => {
const { type, data, lastEventId } = event;
const { data: stixData, context, version } = JSON.parse(data);
const { data: stixData, context, version, event_id } = JSON.parse(data);

Check warning on line 36 in opencti-platform/opencti-graphql/src/manager/syncManager.js

View check run for this annotation

Codecov / codecov/patch

opencti-platform/opencti-graphql/src/manager/syncManager.js#L36

Added line #L36 was not covered by tests
if (version === EVENT_CURRENT_VERSION) {
eventsQueue.enqueue({ id: lastEventId, type, data: stixData, context });
eventsQueue.enqueue({ id: lastEventId, type, data: stixData, context, event_id });

Check warning on line 38 in opencti-platform/opencti-graphql/src/manager/syncManager.js

View check run for this annotation

Codecov / codecov/patch

opencti-platform/opencti-graphql/src/manager/syncManager.js#L38

Added line #L38 was not covered by tests
}
};
const startStreamListening = (sseUri, syncElement) => {
Expand Down Expand Up @@ -137,7 +137,7 @@
if (event) {
try {
currentDelay = await manageBackPressure(httpClient, sync, currentDelay);
const { id: eventId, type: eventType, data, context: eventContext } = event;
const { id: eventId, type: eventType, data, context: eventContext, event_id } = event;

Check warning on line 140 in opencti-platform/opencti-graphql/src/manager/syncManager.js

View check run for this annotation

Codecov / codecov/patch

opencti-platform/opencti-graphql/src/manager/syncManager.js#L140

Added line #L140 was not covered by tests
if (eventType === 'heartbeat') {
await saveCurrentState(context, eventType, sync, eventId);
} else {
Expand All @@ -147,6 +147,7 @@
// Applicant_id should be a userId coming from synchronizer
await pushToWorkerForConnector(sync.internal_id, {
type: 'event',
event_id,

Check warning on line 150 in opencti-platform/opencti-graphql/src/manager/syncManager.js

View check run for this annotation

Codecov / codecov/patch

opencti-platform/opencti-graphql/src/manager/syncManager.js#L150

Added line #L150 was not covered by tests
synchronized,
previous_standard,
update: true,
Expand Down
1 change: 1 addition & 0 deletions opencti-platform/opencti-graphql/src/types/user.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,5 @@ interface AuthContext {
tracing: TracingContext
user: AuthUser | undefined
draft_context?: string | undefined
eventId?: string | undefined
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

other fields seem to be snake case, why not this one?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and in the other files you changed event_id is snake case

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a field that existed already in AuthContext, it has been declared here for typing (it was only used in js files)

}
2 changes: 2 additions & 0 deletions opencti-platform/opencti-graphql/src/utils/format.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ export const UNTIL_END_STR = '5138-11-16T09:46:40.000Z';
const dateFormat = 'YYYY-MM-DDTHH:mm:ss.SSS';

export const utcDate = (date) => (date ? moment(date).utc() : moment().utc());
export const utcEpochTime = (date = null) => utcDate(date).toDate().getTime();
export const streamEventId = (date = null, index = 0) => `${utcEpochTime(date)}-${index}`;
export const now = () => utcDate().toISOString();
export const nowTime = () => timeFormat(now());
export const sinceNowInMinutes = (lastModified) => {
Expand Down