Source code for easy_tpp.preprocess.event_tokenizer

import copy
from collections import UserDict
from typing import Optional, Union, Dict, Any, List, Mapping

import numpy as np

from easy_tpp.utils import is_torch_available, is_tf_available, logger, TruncationStrategy, PaddingStrategy, \
    TensorType, is_torch_device, requires_backends, is_numpy_array, py_assert


class BatchEncoding(UserDict):
    """
    Holds the output of the [`~event_tokenizer.EventTokenizer.pad`] method
    (padded sequences, attention masks, etc.).

    This class is derived from a python dictionary and can be used as a dictionary.

    Args:
        data (`dict`):
            Dictionary of lists/arrays/tensors returned by the `pad` method
            ('time_seqs', 'attention_mask', etc.).
        tensor_type (`Union[None, str, TensorType]`, *optional*):
            You can give a tensor_type here to convert the lists of integers in PyTorch/TensorFlow/Numpy Tensors at
            initialization.
        prepend_batch_axis (`bool`, *optional*, defaults to `False`):
            Whether or not to add a batch axis when converting to tensors (see `tensor_type` above).
    """

    def __init__(
            self,
            data: Optional[Dict[str, Any]] = None,
            tensor_type: Union[None, str, TensorType] = None,
            prepend_batch_axis: bool = False
    ):
        super().__init__(data)

        self.convert_to_tensors(tensor_type=tensor_type, prepend_batch_axis=prepend_batch_axis)

    def keys(self):
        return self.data.keys()

    def values(self):
        return list(self.data.values())

    def items(self):
        return self.data.items()

    def convert_to_tensors(
            self, tensor_type: Optional[Union[str, TensorType]] = None, prepend_batch_axis: bool = False
    ):
        """
        Convert the inner content to tensors.

        Args:
            tensor_type (`str` or [`~utils.TensorType`], *optional*):
                The type of tensors to use. If `str`, should be one of the values of the enum [`~utils.TensorType`]. If
                `None`, no modification is done.
            prepend_batch_axis (`bool`, *optional*, defaults to `False`):
                Whether or not to add the batch dimension during the conversion.
        """
        if tensor_type is None:
            return self

        # Convert to TensorType
        if not isinstance(tensor_type, TensorType):
            tensor_type = TensorType(tensor_type)

        # Get a function reference for the correct framework
        if tensor_type == TensorType.TENSORFLOW:
            if not is_tf_available():
                raise ImportError(
                    "Unable to convert output to TensorFlow tensors format, TensorFlow is not installed."
                )
            import tensorflow as tf

            as_tensor = tf.constant
            is_tensor = tf.is_tensor
        elif tensor_type == TensorType.PYTORCH:
            if not is_torch_available():
                raise ImportError("Unable to convert output to PyTorch tensors format, PyTorch is not installed.")
            import torch

            as_tensor = torch.tensor
            is_tensor = torch.is_tensor
        else:
            as_tensor = np.asarray
            is_tensor = is_numpy_array

        # Do the tensor conversion in batch
        for key, value in self.items():
            try:
                if prepend_batch_axis:
                    value = [value]

                if not is_tensor(value):
                    tensor = as_tensor(value)

                    self[key] = tensor
            except Exception as e:
                if key == "overflowing_tokens":
                    raise ValueError(
                        "Unable to create tensor returning overflowing tokens of different lengths. "
                        "Please see if a fast version of this tokenizer is available to have this feature available."
                    ) from e
                raise ValueError(
                    "Unable to create tensor, you should probably activate truncation and/or padding with"
                    " 'padding=True' 'truncation=True' to have batched tensors with the same length. Perhaps your"
                    f" features (`{key}` in this case) have excessive nesting (inputs type `list` where type `int` is"
                    " expected)."
                ) from e

        return self

    def to(self, device: Union[str, "torch.device"]) -> "BatchEncoding":
        """
        Send all values to device by calling `v.to(device)` (PyTorch only).

        Args:
            device (`str` or `torch.device`): The device to put the tensors on.

        Returns:
            [`BatchEncoding`]: The same instance after modification.
        """
        requires_backends(self, ["torch"])

        # This check catches things like APEX blindly calling "to" on all inputs to a module
        # Otherwise it passes the casts down and casts the LongTensor containing the token idxs
        # into a HalfTensor
        if isinstance(device, str) or is_torch_device(device) or isinstance(device, int):
            self.data = {k: v.to(device=device) for k, v in self.data.items()}
        else:
            logger.warning(f"Attempting to cast a BatchEncoding to type {str(device)}. This is not supported.")
        return self
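
A minimal usage sketch for `BatchEncoding`, assuming the vendored `TensorType` keeps the Hugging Face string values
(`'pt'`, `'tf'`, `'np'`); the feature values below are purely illustrative.

```python
from easy_tpp.preprocess.event_tokenizer import BatchEncoding

# Two already-padded event sequences (illustrative values).
features = {
    "time_seqs": [[0.0, 1.2, 3.5], [0.0, 0.7, 2.1]],
    "type_seqs": [[1, 0, 2], [2, 2, 1]],
}

# Convert the lists of numbers to numpy arrays at construction time
# ("np" assumes the Hugging Face-style TensorType string values).
batch = BatchEncoding(features, tensor_type="np")

print(list(batch.keys()))        # ['time_seqs', 'type_seqs']
print(batch["time_seqs"].shape)  # (2, 3)

# With tensor_type="pt" (and PyTorch installed), the whole batch could then
# be moved to a device in one call, e.g. batch.to("cuda:0").
```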


class EventTokenizer:
    """
    Base class for tokenizing event sequences, vendored from huggingface/transformers.
    """
    padding_side: str = "right"
    truncation_side: str = "right"
    model_input_names: List[str] = ["time_seqs", "time_delta_seqs", "type_seqs", "seq_non_pad_mask",
                                    "attention_mask", "type_mask"]

    def __init__(self, config):
        config = copy.deepcopy(config)
        self.num_event_types = config.num_event_types
        self.pad_token_id = config.pad_token_id
        self.model_max_length = config.max_len
        self.padding_strategy = config.padding_strategy
        self.truncation_strategy = config.truncation_strategy

        # Padding and truncation sides are "right" by default and may be overridden in subclasses.
        # If specified in the config, they are changed here.
        self.padding_side = config.pop("padding_side", self.padding_side)
        self.truncation_side = config.pop("truncation_side", self.truncation_side)
        self.model_input_names = config.pop("model_input_names", self.model_input_names)

    def _get_padding_truncation_strategies(
            self, padding=False, truncation=None, max_length=None, verbose=False, **kwargs
    ):
        padding_strategy, truncation_strategy = None, None

        # If only max_length is set, it activates truncation for max_length
        if max_length is not None and padding is False and truncation is None:
            if verbose:
                logger.warning(
                    "Truncation was not explicitly activated but `max_length` is provided a specific value, please"
                    " use `truncation=True` to explicitly truncate examples to max length. Defaulting to"
                    " 'longest_first' truncation strategy."
                )
            truncation = "longest_first"

        # Get padding strategy
        if padding is False:
            if max_length is None:
                padding_strategy = PaddingStrategy.LONGEST
            else:
                padding_strategy = PaddingStrategy.MAX_LENGTH
        elif padding is not False:
            if padding is True:
                if verbose:
                    if max_length is not None and (
                            truncation is None or truncation is False or truncation == "do_not_truncate"
                    ):
                        logger.warn(
                            "`max_length` is ignored when `padding`=`True` and there is no truncation strategy. "
                            "To pad to max length, use `padding='max_length'`."
                        )
                padding_strategy = PaddingStrategy.LONGEST  # Default to pad to the longest sequence in the batch
            elif not isinstance(padding, PaddingStrategy):
                padding_strategy = PaddingStrategy(padding)
            elif isinstance(padding, PaddingStrategy):
                padding_strategy = padding
        else:
            padding_strategy = PaddingStrategy.DO_NOT_PAD

        # Get truncation strategy
        if truncation is not None and truncation is not False:
            if truncation is True:
                truncation_strategy = (
                    TruncationStrategy.LONGEST_FIRST
                )  # Default to truncate the longest sequences in pairs of inputs
            elif not isinstance(truncation, TruncationStrategy):
                truncation_strategy = TruncationStrategy(truncation)
            elif isinstance(truncation, TruncationStrategy):
                truncation_strategy = truncation
        else:
            truncation_strategy = TruncationStrategy.DO_NOT_TRUNCATE

        # Set max length if needed
        if max_length is None:
            if padding_strategy == PaddingStrategy.MAX_LENGTH:
                max_length = self.model_max_length

            if truncation_strategy != TruncationStrategy.DO_NOT_TRUNCATE:
                max_length = self.model_max_length

        # Test whether a padding token is defined
        if padding_strategy != PaddingStrategy.DO_NOT_PAD and self.pad_token_id is None:
            raise ValueError(
                "Asking to pad but the tokenizer does not have a padding token. "
                "Please set `pad_token_id` in the tokenizer config."
            )

        return padding_strategy, truncation_strategy, max_length, kwargs

    def _truncate(self,
                  encoded_inputs: Union[Dict[str, Any], Dict[str, List]],
                  truncation_strategy: TruncationStrategy,
                  truncation_side: str,
                  max_length: Optional[int] = None):
        if truncation_strategy != TruncationStrategy.DO_NOT_TRUNCATE:
            py_assert(max_length is not None, ValueError, 'must pass max_length when truncation is activated!')

            for k, v in encoded_inputs.items():
                seq_ = [seq[:max_length] for seq in v] if truncation_side == 'right' \
                    else [seq[-max_length:] for seq in v]
                encoded_inputs[k] = seq_
        return encoded_inputs

    def pad(
            self,
            encoded_inputs: Union[
                Dict[str, Any],
                Dict[str, List],
            ],
            padding: Union[bool, str, PaddingStrategy] = True,
            truncation: Union[bool, str, TruncationStrategy] = False,
            max_length: Optional[int] = None,
            return_attention_mask: Optional[bool] = None,
            return_tensors: Optional[Union[str, TensorType]] = None,
            verbose: bool = False,
    ) -> BatchEncoding:
        """
        Pad a single encoded input or a batch of encoded inputs up to a predefined length or to the max sequence
        length in the batch.

        The padding side (left/right) and the padding token id are defined at the tokenizer level (with
        `self.padding_side` and `self.pad_token_id`).

        <Tip>

        If the `encoded_inputs` passed are dictionaries of numpy arrays, PyTorch tensors or TensorFlow tensors, the
        result will use the same type unless you provide a different tensor type with `return_tensors`. In the case
        of PyTorch tensors, you will however lose the specific device of your tensors.

        </Tip>

        Args:
            encoded_inputs ([`BatchEncoding`], list of [`BatchEncoding`] or `Dict[str, List]`):
                Tokenized inputs. Can represent one input ([`BatchEncoding`] or `Dict[str, List[int]]`) or a batch of
                tokenized inputs (list of [`BatchEncoding`], *Dict[str, List[List[int]]]* or
                *List[Dict[str, List[int]]]*), so you can use this method during preprocessing as well as in a
                PyTorch Dataloader collate function.

                Instead of `List[int]` you can have tensors (numpy arrays, PyTorch tensors or TensorFlow tensors);
                see the note above for the return type.
            padding (`bool`, `str` or [`~utils.PaddingStrategy`], *optional*, defaults to `True`):
                Select a strategy to pad the returned sequences (according to the model's padding side and padding
                index) among:

                - `True` or `'longest'`: Pad to the longest sequence in the batch (or no padding if only a single
                  sequence is provided).
                - `'max_length'`: Pad to a maximum length specified with the argument `max_length` or to the maximum
                  acceptable input length for the model if that argument is not provided.
                - `False` or `'do_not_pad'`: No padding (i.e., can output a batch with sequences of different
                  lengths).
            truncation (`bool`, `str` or [`~utils.TruncationStrategy`], *optional*, defaults to `False`):
                Whether to truncate the sequences to `max_length` (or to `self.model_max_length` if `max_length` is
                not provided).
            max_length (`int`, *optional*):
                Maximum length of the returned list and optionally padding length (see above).
            return_attention_mask (`bool`, *optional*):
                Whether to return the attention mask. If left to the default, will return the attention mask
                according to the specific tokenizer's default, defined by the `model_input_names` attribute.
            return_tensors (`str` or [`~utils.TensorType`], *optional*):
                If set, will return tensors instead of lists of python integers. Acceptable values are:

                - `'tf'`: Return TensorFlow `tf.constant` objects.
                - `'pt'`: Return PyTorch `torch.Tensor` objects.
                - `'np'`: Return Numpy `np.ndarray` objects.
            verbose (`bool`, *optional*, defaults to `False`):
                Whether or not to print more information and warnings.

        Returns:
            [`BatchEncoding`]: A dictionary with the model input names as keys, containing the padded (and optionally
            truncated) sequences together with the derived masks.
""" # If we have a list of dicts, let's convert it in a dict of lists # We do this to allow using this method as a collate_fn function in PyTorch Dataloader if isinstance(encoded_inputs, (list, tuple)) and isinstance(encoded_inputs[0], Mapping): encoded_inputs = {key: [example[key] for example in encoded_inputs] for key in encoded_inputs[0].keys()} # The model's main input name, usually `time_seqs`, has be passed for padding if self.model_input_names[0] not in encoded_inputs: raise ValueError( "You should supply an encoding or a list of encodings to this method " f"that includes {self.model_input_names[0]}, but you provided {list(encoded_inputs.keys())}" ) required_input = encoded_inputs[self.model_input_names[0]] padding_strategy, truncation_strategy, max_length, _ = self._get_padding_truncation_strategies( padding=padding, max_length=max_length, truncation=truncation, verbose=verbose ) encoded_inputs = self._truncate(encoded_inputs, truncation_strategy=truncation_strategy, max_length=max_length, truncation_side=self.truncation_side) batch_size = len(required_input) assert all( len(v) == batch_size for v in encoded_inputs.values() ), "Some items in the output dictionary have a different batch size than others." if padding_strategy == PaddingStrategy.LONGEST: max_length = max(len(inputs) for inputs in required_input) padding_strategy = PaddingStrategy.MAX_LENGTH batch_output = self._pad( encoded_inputs, max_length=max_length, padding_strategy=padding_strategy, return_attention_mask=return_attention_mask, ) return BatchEncoding(batch_output, tensor_type=return_tensors)
    def _pad(
            self,
            encoded_inputs: Union[Dict[str, Any], BatchEncoding],
            max_length: Optional[int] = None,
            padding_strategy: PaddingStrategy = PaddingStrategy.DO_NOT_PAD,
            return_attention_mask: Optional[bool] = None,
    ) -> dict:
        """
        Pad encoded inputs (on left/right and up to a predefined length or the max length in the batch).

        Args:
            encoded_inputs:
                Dictionary of tokenized inputs (`List[int]`) or batch of tokenized inputs (`List[List[int]]`).
            max_length: maximum length of the returned list and optionally padding length (see below).
            padding_strategy: PaddingStrategy to use for padding.

                - PaddingStrategy.LONGEST: Pad to the longest sequence in the batch
                - PaddingStrategy.MAX_LENGTH: Pad to the max length (default)
                - PaddingStrategy.DO_NOT_PAD: Do not pad

                The tokenizer padding side is defined in self.padding_side:

                    - 'left': pads on the left of the sequences
                    - 'right': pads on the right of the sequences
            return_attention_mask:
                (optional) Set to False to avoid returning the attention mask (default: set to model specifics).
        """
        # Load from model defaults
        if return_attention_mask is None:
            return_attention_mask = "attention_mask" in self.model_input_names

        required_input = encoded_inputs[self.model_input_names[0]]

        if padding_strategy == PaddingStrategy.LONGEST:
            max_length = len(required_input)

        # Check whether we need to pad
        is_all_seq_equal_max_length = [len(seq) == max_length for seq in required_input]
        is_all_seq_equal_max_length = np.prod(is_all_seq_equal_max_length)
        needs_to_be_padded = padding_strategy != PaddingStrategy.DO_NOT_PAD and ~is_all_seq_equal_max_length

        batch_output = dict()
        if needs_to_be_padded:
            # time_seqs
            batch_output[self.model_input_names[0]] = self.make_pad_sequence(encoded_inputs[self.model_input_names[0]],
                                                                             self.pad_token_id,
                                                                             padding_side=self.padding_side,
                                                                             max_len=max_length)
            # time_delta_seqs
            batch_output[self.model_input_names[1]] = self.make_pad_sequence(encoded_inputs[self.model_input_names[1]],
                                                                             self.pad_token_id,
                                                                             padding_side=self.padding_side,
                                                                             max_len=max_length)
            # type_seqs
            batch_output[self.model_input_names[2]] = self.make_pad_sequence(encoded_inputs[self.model_input_names[2]],
                                                                             self.pad_token_id,
                                                                             padding_side=self.padding_side,
                                                                             max_len=max_length,
                                                                             dtype=np.int32)
        else:
            batch_output = encoded_inputs

        # seq_non_pad_mask
        # We must use the type seqs to build this mask, because the pad_token_id may be one of the valid values
        # in the time seqs.
        seq_pad_mask = batch_output[self.model_input_names[2]] == self.pad_token_id
        batch_output[self.model_input_names[3]] = ~seq_pad_mask

        if return_attention_mask:
            # attention_mask
            batch_output[self.model_input_names[4]] = self.make_attn_mask_for_pad_sequence(
                batch_output[self.model_input_names[2]],
                self.pad_token_id)
        else:
            batch_output[self.model_input_names[4]] = []

        # type_mask
        batch_output[self.model_input_names[5]] = self.make_type_mask_for_pad_sequence(
            batch_output[self.model_input_names[2]])

        return batch_output

    @staticmethod
    def make_pad_sequence(seqs,
                          pad_token_id,
                          padding_side,
                          max_len,
                          dtype=np.float32,
                          group_by_event_types=False):
        """Pad the sequences batch-wise.

        Args:
            seqs (list): list of sequences of variable length.
            pad_token_id (int, float): the value used to pad the sequences.
            padding_side (str): which side to pad: 'right' pads at the end of each sequence, 'left' pads at the
                beginning.
            max_len (int): the target length of the sequences after padding.
            dtype (np.dtype): optional, the dtype of the padded array, defaults to np.float32.
            group_by_event_types (bool): optional, whether `seqs` is grouped by event type, i.e., a list of lists of
                sequences, defaults to False.

        Returns:
            np.ndarray: the padded sequences.

        Example:
            ```python
            seqs = [[0, 1], [3, 4, 5]]
            make_pad_sequence(seqs, pad_token_id=100, padding_side='right', max_len=3)
            >>> [[0, 1, 100], [3, 4, 5]]

            make_pad_sequence(seqs, pad_token_id=100, padding_side='right', max_len=5)
            >>> [[0, 1, 100, 100, 100], [3, 4, 5, 100, 100]]
            ```
        """
        if not group_by_event_types:
            if padding_side == "right":
                pad_seq = np.array([seq + [pad_token_id] * (max_len - len(seq)) for seq in seqs], dtype=dtype)
            else:
                pad_seq = np.array([[pad_token_id] * (max_len - len(seq)) + seq for seq in seqs], dtype=dtype)
        else:
            pad_seq = []
            for seq in seqs:
                if padding_side == "right":
                    pad_seq.append(np.array([s + [pad_token_id] * (max_len - len(s)) for s in seq], dtype=dtype))
                else:
                    pad_seq.append(np.array([[pad_token_id] * (max_len - len(s)) + s for s in seq], dtype=dtype))
            pad_seq = np.array(pad_seq)
        return pad_seq

    def make_attn_mask_for_pad_sequence(self, pad_seqs, pad_token_id):
        """Make the attention masks for the padded sequences.

        Args:
            pad_seqs (np.ndarray): padded sequences of event types, with shape [batch_size, seq_len].
            pad_token_id (int): the value used to pad the sequences.

        Returns:
            np.ndarray: a bool array of shape [batch_size, seq_len, seq_len]; True marks key positions that must be
            masked out for each query position (padding events and non-past events).

        Example:
            ```python
            seqs = np.array([[ 1,  6,  0,  7, 12, 12],
                             [ 1,  0,  5,  1, 10,  9]])
            # intermediate non-pad mask (~seq_pad_mask):
            # [[ True  True  True  True False False]
            #  [ True  True  True  True  True  True]]
            make_attn_mask_for_pad_sequence(seqs, pad_token_id=12)
            >>> attention_mask
            [[[ True  True  True  True  True  True]
              [False  True  True  True  True  True]
              [False False  True  True  True  True]
              [False False False  True  True  True]
              [False False False False  True  True]
              [False False False False  True  True]]

             [[ True  True  True  True  True  True]
              [False  True  True  True  True  True]
              [False False  True  True  True  True]
              [False False False  True  True  True]
              [False False False False  True  True]
              [False False False False False  True]]]
            ```
        """
        seq_num, seq_len = pad_seqs.shape

        # [batch_size, seq_len]
        seq_pad_mask = pad_seqs == pad_token_id

        # [batch_size, seq_len, seq_len]
        attention_key_pad_mask = np.tile(seq_pad_mask[:, None, :], (1, seq_len, 1))
        subsequent_mask = np.tile(np.triu(np.ones((seq_len, seq_len), dtype=bool), k=0)[None, :, :], (seq_num, 1, 1))

        attention_mask = subsequent_mask | attention_key_pad_mask
        return attention_mask

    def make_type_mask_for_pad_sequence(self, pad_seqs):
        """Make the type mask.

        Args:
            pad_seqs (np.ndarray): a batch of padded event-type sequences with equal length.

        Returns:
            np.ndarray: a 3-dim matrix, where the last dim (one-hot vector) indicates the type of each event.
        """
        type_mask = np.zeros([*pad_seqs.shape, self.num_event_types], dtype=np.int32)
        for i in range(self.num_event_types):
            type_mask[:, :, i] = pad_seqs == i

        return type_mask
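
As a rough end-to-end sketch, `EventTokenizer.pad` can serve as a `collate_fn` for batching variable-length event
sequences. The `_ToyConfig` class below is a hypothetical stand-in exposing only the attributes this tokenizer reads;
in the library an easy_tpp config object would be passed instead, and `return_tensors="np"` again assumes the
Hugging Face-style `TensorType` string values.

```python
from easy_tpp.preprocess.event_tokenizer import EventTokenizer


class _ToyConfig(dict):
    """Hypothetical stand-in exposing the attributes EventTokenizer reads; dict.pop supplies `config.pop(...)`."""
    num_event_types = 2
    pad_token_id = 2           # pad id chosen outside the valid event-type ids [0, num_event_types)
    max_len = 10
    padding_strategy = None    # resolved per call inside `pad`
    truncation_strategy = None


tokenizer = EventTokenizer(_ToyConfig())

# Two raw event sequences of different lengths (illustrative values).
batch = [
    {"time_seqs": [0.0, 1.0, 2.5], "time_delta_seqs": [0.0, 1.0, 1.5], "type_seqs": [0, 1, 0]},
    {"time_seqs": [0.0, 0.4], "time_delta_seqs": [0.0, 0.4], "type_seqs": [1, 1]},
]

# Pad to the longest sequence in the batch and return numpy arrays.
output = tokenizer.pad(batch, return_tensors="np")
print(output["type_seqs"])             # the shorter sequence is right-padded with pad id 2
print(output["attention_mask"].shape)  # (2, 3, 3)
```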