Commit 7742ca79 authored by Raul Sirel's avatar Raul Sirel
Browse files

add mlp interface for docparser

parent b96449e0
Pipeline #4483 canceled with stages
in 7 minutes and 51 seconds
import logging
# Configure root logging once at import time: level INFO with a
# "LEVEL dd.mm.YYYY HH:MM:SS: message" line layout.
logging.basicConfig(
    level=logging.INFO,
    format='%(levelname)s %(asctime)s: %(message)s',
    datefmt='%d.%m.%Y %H:%M:%S',
)
class MLPDocParser:
    """
    A wrapper class to apply MLP to DocParser documents.
    """

    def __init__(self, mlp):
        # mlp: object exposing process(content, analyzers=...) returning a dict
        # with "text" and "texta_facts" keys (presumably texta_mlp.MLP — confirm).
        self.mlp = mlp

    def apply_mlp(self, generator, analyzers=None):
        """Applies MLP to objects in a given generator, enriching them in place.

        :param generator: iterable of documents; an item is either a plain dict
            or an (email, attachments) tuple produced by the email parser.
        :param analyzers: list of analyzer names to run; defaults to ["all"].
        :yields: the same items, after MLP enrichment.
        """
        # Fixed: the default used to be a mutable list literal (shared between
        # calls); use a None sentinel instead.
        analyzers = ["all"] if analyzers is None else analyzers
        for item in generator:
            # check if email (it returns a tuple because of attachments)
            if isinstance(item, tuple):
                email, attachments = item
                self._apply_mlp_to_mails(email, attachments, analyzers)
            else:
                self._apply_mlp_to_item(item, analyzers)
            yield item

    def _apply_mlp(self, document: dict, field: str, analyzers: list):
        """Runs MLP on document[field] and stores the output under field + "_mlp"."""
        if field not in document:
            return
        content = document.get(field, "")
        if not content:
            # Nothing to analyze for empty/missing content.
            return
        mlp_res = self.mlp.process(content, analyzers=analyzers)
        mlp_res_path = field + "_mlp"
        # Add the MLP output dictionary.
        document[mlp_res_path] = mlp_res["text"]
        facts = []
        for fact in mlp_res["texta_facts"]:
            fact["doc_path"] = f"{mlp_res_path}.text"
            facts.append(fact)
        if facts:
            # Fixed: extend instead of assigning, so facts gathered from
            # previously processed fields of the same document are not lost.
            document.setdefault("texta_facts", []).extend(facts)

    def _apply_mlp_to_mails(self, email: dict, attachments: list, analyzers: list):
        """Applies MLP to the email body and to each attachment's content."""
        self._apply_mlp(email, "body", analyzers)
        for attachment in attachments:
            self._apply_mlp(attachment, "content", analyzers)

    def _apply_mlp_to_item(self, item: dict, analyzers: list):
        # apply it to all fields as we don't know anything about the item or its fields
        # Iterate over a snapshot of the keys: _apply_mlp inserts new keys
        # ("<field>_mlp", "texta_facts") into the dict while we loop.
        for key in list(item.keys()):
            self._apply_mlp(item, key, analyzers)
......@@ -3,7 +3,7 @@ nodaemon=true
user=root
[program:mlp]
command=celery -A taskman worker --concurrency=%(ENV_TEXTA_MLP_TASK_WORKERS)s -l info -Q mlp_queue
directory=/var/texta-mlp
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
......
......@@ -13,6 +13,11 @@ from settings import (
MLP_WORKER_RESULT_BACKEND
)
import logging
from texta_mlp.mlp_processor import MLPProcessor
# Create Celery app with proper conf
app = Celery("worker")
app.conf.broker_url = MLP_WORKER_BROKER
......@@ -29,20 +34,40 @@ ml_processor: Optional[MLP] = None
def load_mlp():
    """Lazily initialize the module-level MLP processor.

    Safe to call repeatedly: the models are loaded only on the first call,
    while ``ml_processor`` is still None.
    """
    global ml_processor
    if ml_processor is None:
        logging.info("Start loading MLP models.")
        ml_processor = MLP(
            language_codes=MLP_WORKER_LANGUAGE_CODES,
            default_language_code=MLP_WORKER_DEFAULT_LANGUAGE_CODE,
            resource_dir=MLP_WORKER_RESOURCE_DIR
        )
        logging.info("Successfully loaded MLP models.")
@shared_task
def mlp(document: list, field: list, analyzers: list):
    """Celery task: run MLP over the given documents.

    :param document: list of documents (dicts) to process.
    :param field: list of doc paths within each document to analyze.
    :param analyzers: list of analyzer names to apply.
    :return: the processed documents, or the raised exception on failure.
    """
    try:
        load_mlp()
        processed = ml_processor.process_docs(docs=document, doc_paths=field, analyzers=analyzers)
        return processed
    except Exception as e:
        # NOTE(review): returning the exception (instead of re-raising) keeps
        # the Celery task "successful"; log it so failures are at least visible.
        logging.exception("MLP task failed.")
        return e
def mlp_docparser(task_results: dict):
    """
    Adds MLP analyses to DocParser documents.

    :param task_results: dict carrying "document_list", "files_to_remove" and
        "file_hash" (missing keys default to empty values).
    :return: dict with the processed "document_list" and a "meta" sub-dict
        forwarding "files_to_remove" and the file "hash" downstream.
    """
    logging.getLogger(INFO_LOGGER).info("Start MLP processing using mlp_docparser task.")
    load_mlp()
    document_list = task_results.get("document_list", [])
    files_to_remove = task_results.get("files_to_remove", [])
    file_hash = task_results.get("file_hash", "")
    # mlp processor: wrap the loaded MLP instance for DocParser-style documents
    mlp_analyzer = MLPProcessor(ml_processor)
    # apply MLP; the generator enriches documents in place as it is consumed
    document_generator = mlp_analyzer.apply_mlp(document_list, analyzers=["lemmas", "ner", "transliteration", "pos_tags", "entities", "addresses", "emails", "phone_strict"])
    document_list = list(document_generator)
    # Fixed typo in the log message: "Sucessfully" -> "Successfully".
    logging.getLogger(INFO_LOGGER).info("Successfully processed with MLP using mlp_docparser task.")
    return {"document_list": document_list, "meta": {"files_to_remove": files_to_remove, "hash": file_hash}}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment