Skip to content
This repository was archived by the owner on Sep 3, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 23 additions & 30 deletions src/dispatch/document/scheduled.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
from sqlalchemy import func

from dispatch.database.core import SessionLocal
from dispatch.nlp import build_phrase_matcher, build_term_vocab, extract_terms_from_text
from dispatch.decorators import scheduled_project_task
from dispatch.project.models import Project
from dispatch.nlp import build_phrase_matcher, build_term_vocab, extract_terms_from_text
from dispatch.plugin import service as plugin_service
from dispatch.project.models import Project
from dispatch.scheduler import scheduler
from dispatch.term.models import Term
from dispatch.term import service as term_service
from dispatch.term.models import Term

from .service import get_all

Expand All @@ -21,46 +21,39 @@
@scheduled_project_task
def sync_document_terms(db_session: SessionLocal, project: Project):
"""Performs term extraction from known documents."""
p = plugin_service.get_active_instance(
plugin = plugin_service.get_active_instance(
db_session=db_session, plugin_type="storage", project_id=project.id
)

if not p:
log.debug("Tried to sync document terms but couldn't find any active storage plugins.")
if not plugin:
log.warn(f"Document terms not synced. No storage plugin enabled in {project.name} project.")
return

terms = term_service.get_all(db_session=db_session, project_id=project.id).all()
log.debug(f"Fetched {len(terms)} terms from database.")

term_strings = [t.text.lower() for t in terms if t.discoverable]
phrases = build_term_vocab(term_strings)
matcher = build_phrase_matcher("dispatch-term", phrases)

documents = get_all(db_session=db_session)
for doc in documents:
log.debug(f"Processing document. Name: {doc.name}")
for document in documents:
mime_type = "text/plain"
if "sheet" in document.resource_type:
mime_type = "text/csv"

try:
if "sheet" in doc.resource_type:
mime_type = "text/csv"
else:
mime_type = "text/plain"

doc_text = p.instance.get(doc.resource_id, mime_type)
extracted_terms = list(set(extract_terms_from_text(doc_text, matcher)))

matched_terms = (
db_session.query(Term)
.filter(func.upper(Term.text).in_([func.upper(t) for t in extracted_terms]))
.all()
)
document_text = plugin.instance.get(document.resource_id, mime_type)
except Exception as e:
log.warn(e)
continue

log.debug(f"Extracted the following terms from {doc.weblink}. Terms: {extracted_terms}")
extracted_terms = list(set(extract_terms_from_text(document_text, matcher)))

if matched_terms:
doc.terms = matched_terms
db_session.commit()
matched_terms = (
db_session.query(Term)
.filter(func.upper(Term.text).in_([func.upper(t) for t in extracted_terms]))
.all()
)

except Exception as e:
# even if one document fails we don't want them to all fail
log.exception(e)
if matched_terms:
document.terms = matched_terms
db_session.commit()
2 changes: 1 addition & 1 deletion src/dispatch/plugins/dispatch_google/drive/drive.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def download_google_document(client: Any, file_id: str, mime_type: str = "text/p
_, response = downloader.next_chunk()
return fp.getvalue().decode("utf-8")
except (HttpError, OSError):
# Do no retry. Log the error fail.
# Do not retry; raise exception
raise Exception(f"Failed to export the file. Id: {file_id} MimeType: {mime_type}") from None


Expand Down
8 changes: 4 additions & 4 deletions src/dispatch/signal/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,18 @@ def create_signal_instance(
signal_instance_in.signal = signal

if not signal:
msg = f"No signal definition found. Id: {external_id} Variant: {variant}"
log.error(msg)
msg = f"No signal definition found. External Id: {external_id} Variant: {variant}"
log.warn(msg)
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=[{"msg": msg}],
) from None

if not signal.enabled:
msg = f"Signal definition not enabled. SignalName: {signal.name}"
msg = f"Signal definition not enabled. Signal Name: {signal.name}"
log.info(msg)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
status_code=status.HTTP_403_FORBIDDEN,
detail=[{"msg": msg}],
) from None

Expand Down
28 changes: 18 additions & 10 deletions src/dispatch/workflow/scheduled.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,35 @@
import logging

from schedule import every
from dispatch.database.core import SessionLocal
from sqlalchemy.orm import Session

from dispatch.decorators import scheduled_project_task
from dispatch.messaging.strings import (
INCIDENT_WORKFLOW_COMPLETE_NOTIFICATION,
INCIDENT_WORKFLOW_UPDATE_NOTIFICATION,
)
from dispatch.plugin import service as plugin_service
from dispatch.plugin.models import PluginInstance
from dispatch.project.models import Project
from dispatch.scheduler import scheduler
from dispatch.workflow import service as workflow_service

from .enums import WorkflowInstanceStatus
from .flows import send_workflow_notification
from .models import WorkflowInstanceUpdate

from .models import WorkflowInstance, WorkflowInstanceUpdate

log = logging.getLogger(__name__)

WORKFLOW_SYNC_INTERVAL = 30 # seconds


def sync_workflow(db_session, project, workflow_plugin, instance, notify: bool = False):
def sync_workflow(
db_session: Session,
project: Project,
workflow_plugin: PluginInstance,
instance: WorkflowInstance,
notify: bool = False,
):
"""Performs workflow sync."""
log.debug(
f"Processing workflow instance. Instance: {instance.parameters} Workflow: {instance.workflow.name}"
Expand Down Expand Up @@ -88,18 +94,20 @@ def sync_workflow(db_session, project, workflow_plugin, instance, notify: bool =

@scheduler.add(every(WORKFLOW_SYNC_INTERVAL).seconds, name="workflow-sync")
@scheduled_project_task
def sync_all_workflows(db_session: SessionLocal, project: Project):
"""Syncs incident workflows."""
def sync_all_workflows(db_session: Session, project: Project):
"""Syncs all incident workflows."""
workflow_plugin = plugin_service.get_active_instance(
db_session=db_session, project_id=project.id, plugin_type="workflow"
)

if not workflow_plugin:
log.warning(
f"No workflow plugin is enabled. Project: {project.name}. Organization: {project.organization.name}"
f"Workflows not synced. No workflow plugin enabled in {project.name} project and {project.organization.name} organization."
)
return

instances = workflow_service.get_running_instances(db_session=db_session)
for i in instances:
sync_workflow(db_session, project, workflow_plugin, i)
workflow_instances = workflow_service.get_running_instances(
db_session=db_session, project=project
)
for instance in workflow_instances:
sync_workflow(db_session, project, workflow_plugin, instance)
16 changes: 10 additions & 6 deletions src/dispatch/workflow/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,19 @@

from sqlalchemy.orm import Session
from sqlalchemy.sql.expression import true

from pydantic.error_wrappers import ErrorWrapper, ValidationError

from dispatch.case import service as case_service
from dispatch.config import DISPATCH_UI_URL
from dispatch.document import service as document_service
from dispatch.exceptions import NotFoundError
from dispatch.project import service as project_service
from dispatch.plugin import service as plugin_service
from dispatch.incident import service as incident_service
from dispatch.case import service as case_service
from dispatch.signal import service as signal_service
from dispatch.participant import service as participant_service
from dispatch.document import service as document_service
from dispatch.plugin import service as plugin_service
from dispatch.project import service as project_service
from dispatch.project.models import Project
from dispatch.signal import service as signal_service
from dispatch.workflow.enums import WorkflowInstanceStatus

from .models import (
Expand Down Expand Up @@ -119,10 +122,11 @@ def get_instance(*, db_session, instance_id: int) -> WorkflowInstance:
)


def get_running_instances(*, db_session) -> List[WorkflowInstance]:
def get_running_instances(*, db_session, project: Project) -> List[WorkflowInstance]:
"""Fetches all running instances."""
return (
db_session.query(WorkflowInstance)
        .filter(WorkflowInstance.workflow.has(project_id=project.id))
.filter(
WorkflowInstance.status.in_(
(
Expand Down