Source code for farm.data_handler.samples

from transformers.models.bert.tokenization_bert import whitespace_tokenize
from farm.visual.ascii.images import SAMPLE
import numpy as np

import logging

logger = logging.getLogger(__name__)


class SampleBasket:
    """An object that contains one source text and the one or more samples that will be processed.
    This is needed for tasks like question answering where the source text can generate multiple
    input-label pairs."""

    def __init__(self, id_internal: str, raw: dict, id_external=None, samples=None):
        """
        :param id_internal: A unique identifying id. Used for identification within FARM.
        :type id_internal: str
        :param id_external: Used for identification outside of FARM. E.g. if another framework
                            wants to pass along its own id with the results.
        :type id_external: str
        :param raw: Contains the various data needed to form a sample. It is ideally in human readable form.
        :type raw: dict
        :param samples: An optional list of Samples used to populate the basket at initialization.
        :type samples: Sample
        """
        self.id_internal = id_internal
        self.id_external = id_external
        self.raw = raw
        self.samples = samples
class Sample(object):
    """A single training/test sample. This should contain the input and the label. Is initialized with
    the human readable clear_text. Over the course of data preprocessing, this object is populated
    with tokenized and featurized versions of the data."""

    def __init__(self, id, clear_text, tokenized=None, features=None):
        """
        :param id: The unique id of the sample
        :type id: str
        :param clear_text: A dictionary containing various human readable fields (e.g. text, label).
        :type clear_text: dict
        :param tokenized: A dictionary containing the tokenized version of clear text plus helpful meta
                          data: offsets (start position of each token in the original text) and
                          start_of_word (boolean if a token is the first one of a word).
        :type tokenized: dict
        :param features: A dictionary containing features in a vectorized format needed by the model
                         to process this sample.
        :type features: dict
        """
        self.id = id
        self.clear_text = clear_text
        self.features = features
        self.tokenized = tokenized

    def __str__(self):
        if self.clear_text:
            clear_text_str = "\n \t".join(
                [k + ": " + str(v) for k, v in self.clear_text.items()]
            )
            if len(clear_text_str) > 10_000:
                clear_text_str = clear_text_str[:10_000] + f"\nTHE REST IS TOO LONG TO DISPLAY. " \
                                                           f"Remaining chars: {len(clear_text_str)-10_000}"
        else:
            clear_text_str = "None"

        if self.features:
            if isinstance(self.features, list):
                features = self.features[0]
            else:
                features = self.features
            feature_str = "\n \t".join([k + ": " + str(v) for k, v in features.items()])
        else:
            feature_str = "None"

        if self.tokenized:
            tokenized_str = "\n \t".join(
                [k + ": " + str(v) for k, v in self.tokenized.items()]
            )
            if len(tokenized_str) > 10_000:
                tokenized_str = tokenized_str[:10_000] + f"\nTHE REST IS TOO LONG TO DISPLAY. " \
                                                         f"Remaining chars: {len(tokenized_str)-10_000}"
        else:
            tokenized_str = "None"

        s = (
            f"\n{SAMPLE}\n"
            f"ID: {self.id}\n"
            f"Clear Text: \n \t{clear_text_str}\n"
            f"Tokenized: \n \t{tokenized_str}\n"
            f"Features: \n \t{feature_str}\n"
            "_____________________________________________________"
        )
        return s
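
# Usage sketch (illustrative only, not part of the original module): how a Sample
# and a SampleBasket fit together. The clear_text keys ("text", "label") and the
# ids used here are just example values; downstream processors define the exact keys.
def _example_sample_basket():
    sample = Sample(
        id="train-42-0",
        clear_text={"text": "FARM makes transfer learning simple.", "label": "positive"},
    )
    basket = SampleBasket(
        id_internal="train-42",
        raw={"text": "FARM makes transfer learning simple.", "label": "positive"},
        samples=[sample],
    )
    print(basket.samples[0])  # pretty-prints the sample via Sample.__str__
    return basket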
def create_sample_one_label_one_text(raw_data, text_index, label_index, basket_id):

    # text = " ".join(raw_data[text_index:])
    text = raw_data[text_index]
    label = raw_data[label_index]

    return [Sample(id=basket_id + "-0",
                   clear_text={"text": text, "label": label})]
def create_sample_ner(split_text, label, basket_id):

    text = " ".join(split_text)

    return [Sample(id=basket_id + "-0",
                   clear_text={"text": text, "label": label})]
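
# Usage sketch (illustrative only, not part of the original module):
# create_sample_one_label_one_text expects an indexable raw record (e.g. a row read
# from a TSV file), while create_sample_ner expects pre-split tokens plus a label
# sequence. Both return a single-element list of Samples. All values below are
# made-up example data.
def _example_create_samples():
    text_cls_samples = create_sample_one_label_one_text(
        raw_data=["negative", "The movie was a letdown."],
        text_index=1,
        label_index=0,
        basket_id="cls-7",
    )
    ner_samples = create_sample_ner(
        split_text=["Angela", "Merkel", "visited", "Paris"],
        label=["B-PER", "I-PER", "O", "B-LOC"],
        basket_id="ner-3",
    )
    return text_cls_samples, ner_samples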
def process_answers(answers, doc_offsets, passage_start_c, passage_start_t):
    """Converts character-level answer offsets into token-level offsets and makes both
    relative to the current passage instead of the whole document."""
    answers_clear = []
    answers_tokenized = []
    for answer in answers:
        # This section calculates start and end relative to document
        answer_text = answer["text"]
        answer_len_c = len(answer_text)
        if "offset" in answer:
            answer_start_c = answer["offset"]
        else:
            answer_start_c = answer["answer_start"]
        answer_end_c = answer_start_c + answer_len_c - 1
        answer_start_t = offset_to_token_idx_vecorized(doc_offsets, answer_start_c)
        answer_end_t = offset_to_token_idx_vecorized(doc_offsets, answer_end_c)

        # # Leaving this code for potentially debugging 'offset_to_token_idx_vecorized()'
        # answer_start_t2 = offset_to_token_idx(doc_offsets, answer_start_c)
        # answer_end_t2 = offset_to_token_idx(doc_offsets, answer_end_c)
        # if (answer_start_t != answer_start_t2) or (answer_end_t != answer_end_t2):
        #     pass

        # TODO: Perform check that answer can be recovered from document?

        # This section converts start and end so that they are relative to the passage
        # TODO: Is this actually necessary on character level?
        answer_start_c -= passage_start_c
        answer_end_c -= passage_start_c
        answer_start_t -= passage_start_t
        answer_end_t -= passage_start_t

        curr_answer_clear = {"text": answer_text,
                             "start_c": answer_start_c,
                             "end_c": answer_end_c}
        curr_answer_tokenized = {"start_t": answer_start_t,
                                 "end_t": answer_end_t,
                                 "answer_type": answer.get("answer_type", "span")}

        answers_clear.append(curr_answer_clear)
        answers_tokenized.append(curr_answer_tokenized)

    return answers_clear, answers_tokenized
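
# Usage sketch (illustrative only, not part of the original module): character-level
# answer offsets are mapped to token indices and then shifted so that they are
# relative to the passage. doc_offsets is assumed to be a numpy array of token start
# positions so that the vectorized lookup works.
def _example_process_answers():
    doc_offsets = np.array([0, 7, 10, 14, 22, 25])  # token starts of "Berlin is the capital of Germany."
    answers = [{"text": "capital", "answer_start": 14}]
    answers_clear, answers_tokenized = process_answers(
        answers, doc_offsets, passage_start_c=0, passage_start_t=0
    )
    # answers_clear     -> [{"text": "capital", "start_c": 14, "end_c": 20}]
    # answers_tokenized -> [{"start_t": 3, "end_t": 3, "answer_type": "span"}]
    return answers_clear, answers_tokenized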
def get_passage_offsets(doc_offsets, doc_stride, passage_len_t, doc_text):
    """
    Get spans (start and end offsets) for passages by applying a sliding window function.
    The sliding window moves in steps of doc_stride.
    Returns a list of dictionaries which each describe the start, end and id of a passage
    that is formed when chunking a document using a sliding window approach.
    """
    passage_spans = []
    passage_id = 0
    doc_len_t = len(doc_offsets)
    while True:
        passage_start_t = passage_id * doc_stride
        passage_end_t = passage_start_t + passage_len_t
        passage_start_c = doc_offsets[passage_start_t]

        # If passage_end_t points to the last token in the passage, define passage_end_c as the length of the document
        if passage_end_t >= doc_len_t - 1:
            passage_end_c = len(doc_text)
        # Get document text up to the first token that is outside the passage. Strip off whitespace.
        # Use the length of this text as the passage_end_c
        else:
            end_ch_idx = doc_offsets[passage_end_t + 1]
            raw_passage_text = doc_text[:end_ch_idx]
            passage_end_c = len(raw_passage_text.strip())

        passage_span = {"passage_start_t": passage_start_t,
                        "passage_end_t": passage_end_t,
                        "passage_start_c": passage_start_c,
                        "passage_end_c": passage_end_c,
                        "passage_id": passage_id}
        passage_spans.append(passage_span)
        passage_id += 1
        # If the end idx is greater than or equal to the length of the document, we are done
        if passage_end_t >= doc_len_t:
            break
    return passage_spans
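
# Usage sketch (illustrative only, not part of the original module): a 6-token
# document is chunked into overlapping passages of up to 4 tokens, sliding the
# window forward by 2 tokens per step. The offsets are the whitespace-token start
# positions of the example sentence.
def _example_get_passage_offsets():
    doc_text = "Berlin is the capital of Germany."
    doc_offsets = [0, 7, 10, 14, 22, 25]
    spans = get_passage_offsets(doc_offsets, doc_stride=2, passage_len_t=4, doc_text=doc_text)
    # spans[0] -> {"passage_start_t": 0, "passage_end_t": 4,
    #              "passage_start_c": 0, "passage_end_c": 24, "passage_id": 0}
    # spans[1] -> {"passage_start_t": 2, "passage_end_t": 6,
    #              "passage_start_c": 10, "passage_end_c": 33, "passage_id": 1}
    return spans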
def offset_to_token_idx(token_offsets, ch_idx):
    """Returns the idx of the token at the given character idx"""
    n_tokens = len(token_offsets)
    for i in range(n_tokens):
        if (i + 1 == n_tokens) or (token_offsets[i] <= ch_idx < token_offsets[i + 1]):
            return i
def offset_to_token_idx_vecorized(token_offsets, ch_idx):
    """Returns the idx of the token at the given character idx"""
    # TODO Check this function thoroughly - it must be bulletproof and include start and end of sequence checks
    # TODO Possibly this function does not work for Natural Questions and needs adjustments

    # Case: ch_idx is at the end of the tokens
    if ch_idx >= np.max(token_offsets):
        # TODO check "+ 1" (it is needed for making end indices compliant with the old offset_to_token_idx() function)
        # check whether the end token is inclusive or exclusive
        idx = np.argmax(token_offsets) + 1
    # Look for the first occurrence of a token offset larger than ch_idx and take one position to the left.
    # This is needed to skip the n special tokens at the start of the sequence and for failsafe matching
    # (the character start might not always coincide with a token offset, e.g. when starting at whitespace).
    else:
        idx = np.argmax(token_offsets > ch_idx) - 1
    return idx
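
# Usage sketch (illustrative only, not part of the original module): mapping a
# character position back to a token index. The vectorized variant expects a numpy
# array of token start offsets; the plain Python variant is kept for cross-checking.
def _example_offset_to_token_idx():
    token_offsets = np.array([0, 7, 10, 14, 22, 25])  # token starts of "Berlin is the capital of Germany."
    assert offset_to_token_idx(token_offsets, 15) == 3            # char 15 lies inside "capital"
    assert offset_to_token_idx_vecorized(token_offsets, 15) == 3  # vectorized variant agrees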