Commit f28545be authored by Marko Kollo

Added some initial batching.

parent ab7f52da
Pipeline #6246 failed in 4 minutes and 17 seconds
@@ -162,12 +162,12 @@ def test_removal_of_duplicate_facts(mlp: MLP):
def test_processing_docs_with_missing_docpath(mlp: MLP):
result = mlp.process_docs(docs=[{"text": {"presidents": "Barack Obama"}}], doc_paths=["text.presidents.people"])
assert result == [{'text': {'presidents': 'Barack Obama'}, 'texta_facts': []}]
assert result == [{'text': {'presidents': 'Barack Obama'}}]
def test_processing_docs_with_missing_list_value(mlp: MLP):
result = mlp.process_docs(docs=[{"text": {"presidents": ["Barack Obama"]}}], doc_paths=["text.presidents.people"])
assert result == [{'text': {'presidents': ['Barack Obama']}, 'texta_facts': []}]
assert result == [{'text': {'presidents': ['Barack Obama']}}]
def test_processing_docs_with_correct_docpath(mlp: MLP):
@@ -192,7 +192,7 @@ def test_processing_docs_with_list_value(mlp: MLP):
def test_processing_docs_with_none_value(mlp: MLP):
result = mlp.process_docs(docs=[{"text": {"presidents": None}}], doc_paths=["text.presidents"])
assert result == [{'text': {'presidents': None}, 'texta_facts': []}]
assert result == [{'text': {'presidents': None}}]
def test_processing_documents_with_multiple_doc_paths(mlp: MLP):
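For context, a minimal sketch of the behaviour the updated assertions above encode (illustrative, not part of the diff; mlp is the pytest fixture used in these tests): when a doc_path does not resolve to any value, the input document is now returned unchanged instead of having an empty texta_facts list injected.

# Sketch only, mirroring the updated assertions above; mlp is the pytest fixture.
result = mlp.process_docs(
    docs=[{"text": {"presidents": "Barack Obama"}}],
    doc_paths=["text.presidents.people"],  # path does not resolve to a value
)
assert result == [{"text": {"presidents": "Barack Obama"}}]  # no empty texta_facts is added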
@@ -78,13 +78,14 @@ class Document:
self.__texta_facts: List[Fact] = []
self.__handle_existing_facts()
if self.stanza_document:
self.words()
@property
def stanza_sentences(self):
if not self.__stanza_sentences:
if not self.__stanza_sentences and self.stanza_document:
for sentence in self.stanza_document.sentences:
self.__stanza_sentences.append(sentence)
return self.__stanza_sentences
@@ -92,7 +93,7 @@ class Document:
@property
def stanza_words(self):
if not self.__stanza_words:
if not self.__stanza_words and self.stanza_document:
for sentence in self.__stanza_sentences:
for word in sentence.words:
self.__stanza_words.append(word)
@@ -122,7 +123,7 @@ class Document:
Add existing texta_facts inside the document into the private
fact container variable so that they won't be overwritten.
"""
if self.json_doc and "texta_facts" in self.json_doc:
if self.json_doc:
existing_facts = self.json_doc.get("texta_facts", [])
facts = Fact.from_json(existing_facts)
for fact in facts:
@@ -174,6 +175,29 @@ class Document:
return wrapper.convert()
@staticmethod
def parse_doc(doc_path: str, document: dict) -> list:
"""
Function for parsing text values from a nested dictionary given a field path.
:param doc_path: Dot separated path of fields to the value we wish to parse.
:param document: Document to be worked on.
:return: List of text fields that will be processed by MLP.
"""
wrapper = PelicanJson(document)
doc_path_as_list = doc_path.split(".")
content = wrapper.safe_get_nested_value(doc_path_as_list, default=[])
if content and isinstance(content, str):
return [content]
# Check that content is a non-empty list and that it contains only strings.
elif content and isinstance(content, list) and all([isinstance(list_content, str) for list_content in content]):
return content
# In case the field path is faulty and it gives you a dictionary instead.
elif isinstance(content, dict):
return []
else:
return []
def document_to_json(self, use_default_doc_path=True) -> dict:
"""
:param use_default_doc_path: Normal string values are given the default path for facts, but dictionary input already has its own paths.
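A brief usage sketch for the new Document.parse_doc helper added above (illustrative, not part of the diff): it resolves a dot-separated field path inside a nested document and always returns a list of strings.

# Sketch only: expected behaviour of Document.parse_doc based on the implementation above.
doc = {"text": {"presidents": ["Barack Obama", "George Bush"]}}
assert Document.parse_doc("text.presidents", doc) == ["Barack Obama", "George Bush"]  # a list of strings is returned as-is
assert Document.parse_doc("text.presidents", {"text": {"presidents": "Barack Obama"}}) == ["Barack Obama"]  # a single string is wrapped
assert Document.parse_doc("text.presidents.people", doc) == []  # a missing or faulty path yields an empty list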
@@ -195,8 +195,10 @@ class MLP:
return lang
def generate_document(self, raw_text: str, analyzers: List[str], json_object: dict = None, doc_paths="text", lang=None):
def generate_document(self, raw_text: str, analyzers: List[str], json_object: dict = None, doc_paths="text", lang=None, stanza_document=None):
processed_text = MLP.normalize_input_text(raw_text)
e = ""
# detect language
if not lang:
lang = self.detect_language(processed_text)
@@ -204,13 +206,21 @@
Check whether the detected language is supported; if it is not, use default_lang to load the
Stanza models while keeping the document's lang as the detected language.
'''
# Resolve the language.
if lang not in self.supported_langs:
analysis_lang = self.default_lang
document, e = self._get_stanza_document(analysis_lang, processed_text) if processed_text else (None, "")
else:
analysis_lang = lang
# Use the pre-given document if it exists, otherwise compute it ourselves.
if processed_text and stanza_document is None:
document, e = self._get_stanza_document(analysis_lang, processed_text) if processed_text else (None, "")
elif stanza_document and processed_text:
document = stanza_document
else:
document = None
# Create the overall wrapper.
document = Document(
original_text=processed_text,
dominant_language_code=lang,
@@ -323,28 +333,6 @@ class MLP:
return document["text"]["lemmas"]
def parse_doc_texts(self, doc_path: str, document: dict) -> list:
"""
Function for parsing text values from a nested dictionary given a field path.
:param doc_path: Dot separated path of fields to the value we wish to parse.
:param document: Document to be worked on.
:return: List of text fields that will be processed by MLP.
"""
wrapper = PelicanJson(document)
doc_path_as_list = doc_path.split(".")
content = wrapper.safe_get_nested_value(doc_path_as_list, default=[])
if content and isinstance(content, str):
return [content]
# Check that content is a non-empty list and that it contains only strings.
elif content and isinstance(content, list) and all([isinstance(list_content, str) for list_content in content]):
return content
# In case the field path is faulty and it gives you a dictionary instead.
elif isinstance(content, dict):
return []
else:
return []
def __apply_analyzer(self, doc, analyzer):
try:
getattr(doc, analyzer)()
@@ -360,15 +348,47 @@ class MLP:
:return: List of dictionaries where the MLP information is stored inside texta_facts and under the last field of the doc_path in the format {doc_path}_mlp.
"""
# Container for keeping the tuples of the doc and meta pairs.
container = []
for document in docs:
for doc_path in doc_paths:
analyzers = self._load_analyzers(analyzers, SUPPORTED_ANALYZERS)
for doc_path in doc_paths:
# Group the texts by detected language so each group can be analyzed as a single Stanza batch.
lang_group = {}
texts = [Document.parse_doc(doc_path, document) for document in docs]
for index, text in enumerate(texts):
# Unwrap single-element lists so that language detection receives a plain string.
text = text[0] if text and len(text) == 1 else text
lang = self.detect_language(text)
if lang not in self.supported_langs:
lang = self.default_lang
if lang and lang not in lang_group:
lang_group[lang] = [{"index": index, "text": text}]
elif lang in lang_group:
lang_group[lang].append({"index": index, "text": text})
intermediary = []
for lang, items in lang_group.items():
pipeline = self.get_stanza_pipeline(lang)
# Create the batch of Stanza Documents to feed into the pipeline.
documents = []
for item in items:
text = item.get("text", "")
text = text if text else ""
documents.append(stanza.Document([], text=text))
# Analyze the batch.
results = pipeline(documents)
for index, result in enumerate(results):
actual_index = items[index]["index"]
# Tie together the original document, its location in the list for replacement, and the relevant Stanza document.
intermediary.insert(actual_index, ({"actual_doc": docs[actual_index], "actual_index": actual_index, "lang": lang}, result))
for meta_info, stanza_document in intermediary:
# Traverse the (possibly) nested dicts and extract their text values as a list of strings.
# Since the nested doc_path could lead to a list, there may be multiple pieces of text that need to be processed.
doc_texts = self.parse_doc_texts(doc_path, document)
actual_document = meta_info["actual_doc"]
actual_index = meta_info["actual_index"]
lang = meta_info["lang"]
doc_texts = Document.parse_doc(doc_path, actual_document)
for raw_text in doc_texts:
analyzers = self._load_analyzers(analyzers, SUPPORTED_ANALYZERS)
doc = self.generate_document(raw_text, analyzers=analyzers, json_object=document, doc_paths=doc_path, )
doc = self.generate_document(raw_text, analyzers=analyzers, json_object=actual_document, lang=lang, stanza_document=stanza_document, doc_paths=doc_path, )
if doc:
for analyzer in analyzers:
# For every analyzer, activate the function that processes it from the
@@ -376,22 +396,9 @@
self.__apply_analyzer(doc, analyzer)
result = doc.document_to_json(use_default_doc_path=False)
new_facts = result.pop("texta_facts", [])
existing_facts = document.get("texta_facts", [])
unique_facts = Document.remove_duplicate_facts(new_facts + existing_facts)
result["texta_facts"] = unique_facts
document = result
if document:
# Add in texta_facts even if nothing was done due to missing values.
facts = document.get("texta_facts", [])
document["texta_facts"] = facts
container.append(document)
else:
# Add in at least something to avoid problems with operations that include indexing.
container.append({})
return container
docs[actual_index] = result
return docs
@staticmethod
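A condensed sketch of the batching idea introduced in process_docs (illustrative only; the helper name is hypothetical, but the Stanza calls match the ones used in the diff): texts are grouped by detected language, each group is wrapped into empty stanza.Document objects, and the whole group is passed to that language's pipeline in a single call.

import stanza

def batch_analyze(texts_by_lang: dict, get_pipeline) -> dict:
    # Hypothetical helper mirroring the grouping done in process_docs above.
    results_by_lang = {}
    for lang, texts in texts_by_lang.items():
        pipeline = get_pipeline(lang)
        # Wrap every raw text into an empty stanza.Document; the pipeline fills it in.
        documents = [stanza.Document([], text=text or "") for text in texts]
        # A single pipeline call analyzes the whole batch instead of one text at a time.
        results_by_lang[lang] = pipeline(documents)
    return results_by_lang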