From 2438431e161c9ed3947b1511d614601bc70db0ed Mon Sep 17 00:00:00 2001 From: Marc Vilanova Date: Tue, 2 May 2023 12:58:50 -0700 Subject: [PATCH 1/2] Downgrades error levels and improves some logic --- src/dispatch/document/scheduled.py | 53 ++++++++----------- .../plugins/dispatch_google/drive/drive.py | 4 +- src/dispatch/signal/views.py | 8 +-- src/dispatch/workflow/scheduled.py | 28 ++++++---- src/dispatch/workflow/service.py | 16 +++--- 5 files changed, 57 insertions(+), 52 deletions(-) diff --git a/src/dispatch/document/scheduled.py b/src/dispatch/document/scheduled.py index c20d0346d88d..d27544804da7 100644 --- a/src/dispatch/document/scheduled.py +++ b/src/dispatch/document/scheduled.py @@ -4,13 +4,13 @@ from sqlalchemy import func from dispatch.database.core import SessionLocal -from dispatch.nlp import build_phrase_matcher, build_term_vocab, extract_terms_from_text from dispatch.decorators import scheduled_project_task -from dispatch.project.models import Project +from dispatch.nlp import build_phrase_matcher, build_term_vocab, extract_terms_from_text from dispatch.plugin import service as plugin_service +from dispatch.project.models import Project from dispatch.scheduler import scheduler -from dispatch.term.models import Term from dispatch.term import service as term_service +from dispatch.term.models import Term from .service import get_all @@ -21,46 +21,39 @@ @scheduled_project_task def sync_document_terms(db_session: SessionLocal, project: Project): """Performs term extraction from known documents.""" - p = plugin_service.get_active_instance( + plugin = plugin_service.get_active_instance( db_session=db_session, plugin_type="storage", project_id=project.id ) - if not p: - log.debug("Tried to sync document terms but couldn't find any active storage plugins.") + if not plugin: + log.warn(f"Document terms not synced. 
No storage plugin enabled in {project.name} project.") return terms = term_service.get_all(db_session=db_session, project_id=project.id).all() - log.debug(f"Fetched {len(terms)} terms from database.") - term_strings = [t.text.lower() for t in terms if t.discoverable] phrases = build_term_vocab(term_strings) matcher = build_phrase_matcher("dispatch-term", phrases) documents = get_all(db_session=db_session) - for doc in documents: - log.debug(f"Processing document. Name: {doc.name}") + for document in documents: + mime_type = "text/plain" + if "sheet" in document.resource_type: + mime_type = "text/csv" try: - if "sheet" in doc.resource_type: - mime_type = "text/csv" - else: - mime_type = "text/plain" - - doc_text = p.instance.get(doc.resource_id, mime_type) - extracted_terms = list(set(extract_terms_from_text(doc_text, matcher))) - - matched_terms = ( - db_session.query(Term) - .filter(func.upper(Term.text).in_([func.upper(t) for t in extracted_terms])) - .all() - ) + document_text = plugin.instance.get(document.resource_id, mime_type) + except Exception as e: + log.warn(e) + continue - log.debug(f"Extracted the following terms from {doc.weblink}. 
Terms: {extracted_terms}") + extracted_terms = list(set(extract_terms_from_text(document_text, matcher))) - if matched_terms: - doc.terms = matched_terms - db_session.commit() + matched_terms = ( + db_session.query(Term) + .filter(func.upper(Term.text).in_([func.upper(t) for t in extracted_terms])) + .all() + ) - except Exception as e: - # even if one document fails we don't want them to all fail - log.exception(e) + if matched_terms: + document.terms = matched_terms + db_session.commit() diff --git a/src/dispatch/plugins/dispatch_google/drive/drive.py b/src/dispatch/plugins/dispatch_google/drive/drive.py index 4396878595b0..97ea85cb8d4c 100644 --- a/src/dispatch/plugins/dispatch_google/drive/drive.py +++ b/src/dispatch/plugins/dispatch_google/drive/drive.py @@ -155,8 +155,8 @@ def download_google_document(client: Any, file_id: str, mime_type: str = "text/p _, response = downloader.next_chunk() return fp.getvalue().decode("utf-8") except (HttpError, OSError): - # Do no retry. Log the error fail. - raise Exception(f"Failed to export the file. Id: {file_id} MimeType: {mime_type}") from None + # Do no retry and raise exception + raise Exception("Failed to export the file. Id: {file_id} MimeType: {mime_type}") from None def create_file( diff --git a/src/dispatch/signal/views.py b/src/dispatch/signal/views.py index 6318f55f82fe..dd022fd84a87 100644 --- a/src/dispatch/signal/views.py +++ b/src/dispatch/signal/views.py @@ -77,18 +77,18 @@ def create_signal_instance( signal_instance_in.signal = signal if not signal: - msg = f"No signal definition found. Id: {external_id} Variant: {variant}" - log.error(msg) + msg = f"No signal definition found. External Id: {external_id} Variant: {variant}" + log.warn(msg) raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=[{"msg": msg}], ) from None if not signal.enabled: - msg = f"Signal definition not enabled. SignalName: {signal.name}" + msg = f"Signal definition not enabled. 
Signal Name: {signal.name}" log.info(msg) raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, + status_code=status.HTTP_403_FORBIDDEN, detail=[{"msg": msg}], ) from None diff --git a/src/dispatch/workflow/scheduled.py b/src/dispatch/workflow/scheduled.py index 8ab1f25b1285..bab3b2bf4662 100644 --- a/src/dispatch/workflow/scheduled.py +++ b/src/dispatch/workflow/scheduled.py @@ -1,7 +1,7 @@ import logging from schedule import every -from dispatch.database.core import SessionLocal +from sqlalchemy.orm import Session from dispatch.decorators import scheduled_project_task from dispatch.messaging.strings import ( @@ -9,21 +9,27 @@ INCIDENT_WORKFLOW_UPDATE_NOTIFICATION, ) from dispatch.plugin import service as plugin_service +from dispatch.plugin.models import PluginInstance from dispatch.project.models import Project from dispatch.scheduler import scheduler from dispatch.workflow import service as workflow_service from .enums import WorkflowInstanceStatus from .flows import send_workflow_notification -from .models import WorkflowInstanceUpdate - +from .models import WorkflowInstance, WorkflowInstanceUpdate log = logging.getLogger(__name__) WORKFLOW_SYNC_INTERVAL = 30 # seconds -def sync_workflow(db_session, project, workflow_plugin, instance, notify: bool = False): +def sync_workflow( + db_session: Session, + project: Project, + workflow_plugin: PluginInstance, + instance: WorkflowInstance, + notify: bool = False, +): """Performs workflow sync.""" log.debug( f"Processing workflow instance. 
Instance: {instance.parameters} Workflow: {instance.workflow.name}" @@ -88,18 +94,20 @@ def sync_workflow(db_session, project, workflow_plugin, instance, notify: bool = @scheduler.add(every(WORKFLOW_SYNC_INTERVAL).seconds, name="workflow-sync") @scheduled_project_task -def sync_all_workflows(db_session: SessionLocal, project: Project): - """Syncs incident workflows.""" +def sync_all_workflows(db_session: Session, project: Project): + """Syncs all incident workflows.""" workflow_plugin = plugin_service.get_active_instance( db_session=db_session, project_id=project.id, plugin_type="workflow" ) if not workflow_plugin: log.warning( - f"No workflow plugin is enabled. Project: {project.name}. Organization: {project.organization.name}" + f"Workflows not synced. No workflow plugin enabled in {project.name} project and {project.organization.name} organization." ) return - instances = workflow_service.get_running_instances(db_session=db_session) - for i in instances: - sync_workflow(db_session, project, workflow_plugin, i) + workflow_instances = workflow_service.get_running_instances( + db_session=db_session, project=project + ) + for instance in workflow_instances: + sync_workflow(db_session, project, workflow_plugin, instance) diff --git a/src/dispatch/workflow/service.py b/src/dispatch/workflow/service.py index 241216f7142e..86315adbdf56 100644 --- a/src/dispatch/workflow/service.py +++ b/src/dispatch/workflow/service.py @@ -2,16 +2,19 @@ from sqlalchemy.orm import Session from sqlalchemy.sql.expression import true + from pydantic.error_wrappers import ErrorWrapper, ValidationError + +from dispatch.case import service as case_service from dispatch.config import DISPATCH_UI_URL +from dispatch.document import service as document_service from dispatch.exceptions import NotFoundError -from dispatch.project import service as project_service -from dispatch.plugin import service as plugin_service from dispatch.incident import service as incident_service -from dispatch.case 
import service as case_service -from dispatch.signal import service as signal_service from dispatch.participant import service as participant_service -from dispatch.document import service as document_service +from dispatch.plugin import service as plugin_service +from dispatch.project import service as project_service +from dispatch.project.models import Project +from dispatch.signal import service as signal_service from dispatch.workflow.enums import WorkflowInstanceStatus from .models import ( @@ -119,10 +122,11 @@ def get_instance(*, db_session, instance_id: int) -> WorkflowInstance: ) -def get_running_instances(*, db_session) -> List[WorkflowInstance]: +def get_running_instances(*, db_session, project: Project) -> List[WorkflowInstance]: """Fetches all running instances.""" return ( db_session.query(WorkflowInstance) + .filter(WorkflowInstance.workflow.has(project_id=project.id)) .filter( WorkflowInstance.status.in_( ( From 846fe190d290c7bbfd946c7ef156088e0de55f9f Mon Sep 17 00:00:00 2001 From: Marc Vilanova <39573146+mvilanova@users.noreply.github.com> Date: Tue, 2 May 2023 13:29:57 -0700 Subject: [PATCH 2/2] Update src/dispatch/plugins/dispatch_google/drive/drive.py Co-authored-by: Will Sheldon <114631109+wssheldon@users.noreply.github.com> --- src/dispatch/plugins/dispatch_google/drive/drive.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dispatch/plugins/dispatch_google/drive/drive.py b/src/dispatch/plugins/dispatch_google/drive/drive.py index 97ea85cb8d4c..1647e0309f27 100644 --- a/src/dispatch/plugins/dispatch_google/drive/drive.py +++ b/src/dispatch/plugins/dispatch_google/drive/drive.py @@ -156,7 +156,7 @@ def download_google_document(client: Any, file_id: str, mime_type: str = "text/p return fp.getvalue().decode("utf-8") except (HttpError, OSError): # Do no retry and raise exception - raise Exception("Failed to export the file. Id: {file_id} MimeType: {mime_type}") from None + raise Exception(f"Failed to export the file. 
Id: {file_id} MimeType: {mime_type}") from None def create_file(