Source code for easy_tpp.model.torch_model.torch_baselayer

import math

import torch
from torch import nn


def attention(query, key, value, mask=None, dropout=None):
    d_k = query.size(-1)
    scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
    if mask is not None:
        # small change here -- we use "1" for masked element
        scores = scores.masked_fill(mask > 0, -1e9)
    p_attn = torch.softmax(scores, dim=-1)
    if dropout is not None:
        p_attn = dropout(p_attn)
    return torch.matmul(p_attn, value), p_attn
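

# Usage sketch (added for illustration; not part of the original source). Shapes and the
# all-zeros mask are assumptions. Note the convention in this module: a mask value of 1
# marks a position to be *masked out*, 0 means "attend".
def _example_attention():
    q = k = v = torch.randn(2, 4, 8, 16)          # [batch_size, n_head, seq_len, d_k]
    pad_mask = torch.zeros(2, 1, 8, 8)            # 0 = attend, 1 = mask out
    out, weights = attention(q, k, v, mask=pad_mask)
    return out.shape, weights.shape               # (2, 4, 8, 16), (2, 4, 8, 8)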


class MultiHeadAttention(nn.Module):
    def __init__(self, n_head, d_input, d_model, dropout=0.1, output_linear=False):
        super(MultiHeadAttention, self).__init__()

        assert d_model % n_head == 0
        self.n_head = n_head
        self.d_k = d_model // n_head
        self.d_v = self.d_k
        self.d_model = d_model
        self.output_linear = output_linear

        if output_linear:
            self.linears = nn.ModuleList(
                [nn.Linear(d_input, d_model) for _ in range(3)] + [nn.Linear(d_model, d_model), ])
        else:
            self.linears = nn.ModuleList([nn.Linear(d_input, d_model) for _ in range(3)])

        self.dropout = nn.Dropout(p=dropout)

    def forward(self, query, key, value, mask, output_weight=False):
        if mask is not None:
            mask = mask.unsqueeze(1)

        nbatches = query.size(0)

        query, key, value = [
            lin_layer(x).view(nbatches, -1, self.n_head, self.d_k).transpose(1, 2)
            for lin_layer, x in zip(self.linears, (query, key, value))
        ]

        x, attn_weight = attention(query, key, value, mask=mask, dropout=self.dropout)

        x = x.transpose(1, 2).contiguous() \
            .view(nbatches, -1, self.n_head * self.d_k)

        if self.output_linear:
            if output_weight:
                return self.linears[-1](x), attn_weight
            else:
                return self.linears[-1](x)
        else:
            if output_weight:
                return x, attn_weight
            else:
                return x
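

# Usage sketch (added for illustration; not part of the original source): project a
# [batch_size, seq_len, d_input] sequence through multi-head attention. The sizes and
# the all-zeros (attend-everywhere) mask are assumptions for the example.
def _example_multi_head_attention():
    batch_size, seq_len, d_input, d_model, n_head = 2, 10, 32, 64, 4
    mha = MultiHeadAttention(n_head=n_head, d_input=d_input, d_model=d_model, output_linear=True)
    x = torch.randn(batch_size, seq_len, d_input)
    mask = torch.zeros(batch_size, seq_len, seq_len)    # 0 = attend, 1 = mask out
    out, attn = mha(x, x, x, mask, output_weight=True)
    return out.shape, attn.shape                        # (2, 10, 64), (2, 4, 10, 10)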
class SublayerConnection(nn.Module):
    # used for residual connection
    def __init__(self, d_model, dropout):
        super(SublayerConnection, self).__init__()
        self.norm = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, sublayer):
        return x + self.dropout(sublayer(self.norm(x)))
class EncoderLayer(nn.Module):
    def __init__(self, d_model, self_attn, feed_forward=None, use_residual=False, dropout=0.1):
        super(EncoderLayer, self).__init__()
        self.self_attn = self_attn
        self.feed_forward = feed_forward
        self.use_residual = use_residual
        if use_residual:
            self.sublayer = nn.ModuleList([SublayerConnection(d_model, dropout) for _ in range(2)])
        self.d_model = d_model

    def forward(self, x, mask):
        if self.use_residual:
            x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, mask))
            if self.feed_forward is not None:
                return self.sublayer[1](x, self.feed_forward)
            else:
                return x
        else:
            return self.self_attn(x, x, x, mask)
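

# Usage sketch (added for illustration; not part of the original source): wrap the
# MultiHeadAttention module defined above in an EncoderLayer with residual connections.
# The sizes and the causal-mask construction are assumptions for the example.
def _example_encoder_layer():
    batch_size, seq_len, d_model, n_head = 2, 10, 64, 4
    layer = EncoderLayer(
        d_model=d_model,
        self_attn=MultiHeadAttention(n_head, d_model, d_model, output_linear=True),
        use_residual=True)
    x = torch.randn(batch_size, seq_len, d_model)
    # upper-triangular mask: 1 above the diagonal means "cannot attend to future events"
    causal_mask = torch.triu(torch.ones(seq_len, seq_len), diagonal=1).expand(batch_size, -1, -1)
    return layer(x, causal_mask).shape                  # (2, 10, 64)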
class TimePositionalEncoding(nn.Module):
    """Temporal encoding in the Transformer Hawkes Process (THP), ICML 2020.
    """

    def __init__(self, d_model, max_len=5000, device='cpu'):
        super().__init__()

        pe = torch.zeros(max_len, d_model, device=device).float()
        position = torch.arange(0, max_len, device=device).float().unsqueeze(1)
        div_term = (torch.arange(0, d_model, 2, device=device).float() * -(math.log(10000.0) / d_model)).exp()

        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        # [1, max_len, d_model]
        pe = pe.unsqueeze(0)

        self.register_buffer('pe', pe)

    def forward(self, x):
        """Compute the time positional encoding defined in Equation (2) of the THP model.

        Args:
            x (tensor): time_seqs, [batch_size, seq_len].

        Returns:
            temporal encoding vector, [1, seq_len, model_dim], broadcastable over the batch dimension.
        """
        length = x.size(1)
        return self.pe[:, :length]
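

# Usage sketch (added for illustration; not part of the original source): the encoding
# depends only on the sequence length (event position), so any [batch_size, seq_len]
# time tensor yields the same [1, seq_len, d_model] output, broadcastable over the batch.
def _example_time_positional_encoding():
    time_seqs = torch.rand(2, 10)                       # [batch_size, seq_len]
    te = TimePositionalEncoding(d_model=64)
    return te(time_seqs).shape                          # (1, 10, 64)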
class TimeShiftedPositionalEncoding(nn.Module):
    """Time-shifted positional encoding in the Self-Attentive Hawkes Process (SAHP), ICML 2020.
    """

    def __init__(self, d_model, max_len=5000, device='cpu'):
        super().__init__()
        # [max_len, 1]
        position = torch.arange(0, max_len, device=device).float().unsqueeze(1)
        # [d_model // 2]
        div_term = (torch.arange(0, d_model, 2, device=device).float() * -(math.log(10000.0) / d_model)).exp()

        self.layer_time_delta = nn.Linear(1, d_model // 2, bias=False)

        self.register_buffer('position', position)
        self.register_buffer('div_term', div_term)

    def forward(self, x, interval):
        """
        Args:
            x: time_seqs, [batch_size, seq_len].
            interval: time_delta_seqs, [batch_size, seq_len].

        Returns:
            Time-shifted positional encoding defined in Equation (8) of the SAHP model.
        """
        phi = self.layer_time_delta(interval.unsqueeze(-1))

        # handle both batched [batch_size, seq_len] and unbatched [seq_len] inputs
        if len(x.size()) > 1:
            length = x.size(1)
        else:
            length = x.size(0)

        arc = (self.position[:length] * self.div_term).unsqueeze(0)

        pe_sin = torch.sin(arc + phi)
        pe_cos = torch.cos(arc + phi)
        pe = torch.cat([pe_sin, pe_cos], dim=-1)

        return pe
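

# Usage sketch (added for illustration; not part of the original source): SAHP's shifted
# encoding needs both the absolute event times and the inter-event intervals; the random
# values here are placeholders just to show the expected shapes.
def _example_time_shifted_positional_encoding():
    batch_size, seq_len, d_model = 2, 10, 64
    time_seqs = torch.rand(batch_size, seq_len)
    time_delta_seqs = torch.rand(batch_size, seq_len)
    te = TimeShiftedPositionalEncoding(d_model=d_model)
    return te(time_seqs, time_delta_seqs).shape         # (2, 10, 64)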
class GELU(nn.Module):
    """GELU activation function (tanh approximation).
    """

    def forward(self, x):
        return 0.5 * x * (1 + torch.tanh(math.sqrt(2 / math.pi) * (x + 0.044715 * torch.pow(x, 3))))
class Identity(nn.Module):
    def forward(self, inputs):
        return inputs
def activation_layer(act_name):
    """Construct activation layers.

    Args:
        act_name: str or nn.Module, name of the activation function.

    Returns:
        act_layer: activation layer.
    """
    if isinstance(act_name, str):
        if act_name.lower() == 'sigmoid':
            act_layer = nn.Sigmoid()
        elif act_name.lower() == 'linear':
            act_layer = Identity()
        elif act_name.lower() == 'relu':
            act_layer = nn.ReLU(inplace=True)
        elif act_name.lower() == 'prelu':
            act_layer = nn.PReLU()
        elif act_name.lower() == 'gelu':
            act_layer = GELU()
    elif issubclass(act_name, nn.Module):
        act_layer = act_name()
    else:
        raise NotImplementedError

    return act_layer
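

# Usage sketch (added for illustration; not part of the original source): the factory
# accepts either a name string or an nn.Module subclass.
def _example_activation_layer():
    relu = activation_layer('relu')                     # nn.ReLU(inplace=True)
    gelu = activation_layer('gelu')                     # the GELU module defined above
    tanh = activation_layer(nn.Tanh)                    # a module class is instantiated directly
    return relu, gelu, tanh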
class DNN(nn.Module):
    """The Multi Layer Perceptron.

    Input shape
        - nD tensor with shape: ``(batch_size, ..., input_dim)``.
          The most common situation would be a 2D input with shape ``(batch_size, input_dim)``.

    Output shape
        - nD tensor with shape: ``(batch_size, ..., hidden_size[-1])``.
          For instance, for a 2D input with shape ``(batch_size, input_dim)``,
          the output would have shape ``(batch_size, hidden_size[-1])``.

    Arguments
        - **inputs_dim**: input feature dimension.
        - **hidden_size**: list of positive integers, the layer number and units in each layer.
        - **activation**: activation function to use.
        - **l2_reg**: float between 0 and 1. L2 regularizer strength applied to the kernel weights matrix.
        - **dropout_rate**: float in [0, 1). Fraction of the units to drop out.
        - **use_bn**: bool. Whether to use BatchNormalization before activation or not.
        - **init_std**: float. Standard deviation of the normal initializer for the linear-layer weights.
    """

    def __init__(self, inputs_dim, hidden_size, activation='relu', l2_reg=0, dropout_rate=0, use_bn=False,
                 init_std=0.0001):
        super(DNN, self).__init__()
        self.dropout_rate = dropout_rate
        self.dropout = nn.Dropout(dropout_rate)
        self.l2_reg = l2_reg
        self.use_bn = use_bn
        if len(hidden_size) == 0:
            raise ValueError("hidden_units is empty!!")
        hidden_size = [inputs_dim] + list(hidden_size)

        self.linears = nn.ModuleList(
            [nn.Linear(hidden_size[i], hidden_size[i + 1]) for i in range(len(hidden_size) - 1)])

        if self.use_bn:
            self.bn = nn.ModuleList(
                [nn.BatchNorm1d(hidden_size[i + 1]) for i in range(len(hidden_size) - 1)])

        self.activation_layers = nn.ModuleList(
            [activation_layer(activation) for i in range(len(hidden_size) - 1)])

        for name, tensor in self.linears.named_parameters():
            if 'weight' in name:
                nn.init.normal_(tensor, mean=0, std=init_std)

    def forward(self, inputs):
        deep_input = inputs

        for i in range(len(self.linears)):
            fc = self.linears[i](deep_input)

            if self.use_bn:
                fc = self.bn[i](fc)

            fc = self.activation_layers[i](fc)

            fc = self.dropout(fc)
            deep_input = fc
        return deep_input
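

# Usage sketch (added for illustration; not part of the original source): a two-hidden-layer
# MLP mapping a 16-dim input to an 8-dim output; the sizes are arbitrary for the example.
def _example_dnn():
    mlp = DNN(inputs_dim=16, hidden_size=[32, 8], activation='relu', dropout_rate=0.1)
    x = torch.randn(4, 16)                              # [batch_size, inputs_dim]
    return mlp(x).shape                                 # (4, 8)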