import math
import torch
from torch import nn


def attention(query, key, value, mask=None, dropout=None):
    """Scaled dot-product attention; returns the attended values and the attention weights."""
    d_k = query.size(-1)
    scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
    if mask is not None:
        # note the convention here: entries with mask > 0 are the *masked* positions
        scores = scores.masked_fill(mask > 0, -1e9)
    p_attn = torch.softmax(scores, dim=-1)
    if dropout is not None:
        p_attn = dropout(p_attn)
    return torch.matmul(p_attn, value), p_attn
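
# Usage sketch (illustrative, not part of the module): scaled dot-product
# attention over random tensors; the shapes below are arbitrary assumptions.
#
#   q = torch.randn(2, 4, 10, 16)                   # [batch, n_head, seq_len, d_k]
#   k = torch.randn(2, 4, 10, 16)
#   v = torch.randn(2, 4, 10, 16)
#   pad_mask = torch.zeros(2, 1, 10, 10)            # 1 marks a masked position
#   out, weights = attention(q, k, v, mask=pad_mask)
#   # out: [2, 4, 10, 16], weights: [2, 4, 10, 10]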


class MultiHeadAttention(nn.Module):

    def __init__(self, n_head, d_input, d_model, dropout=0.1, output_linear=False):
        super(MultiHeadAttention, self).__init__()

        assert d_model % n_head == 0
        self.n_head = n_head
        self.d_k = d_model // n_head
        self.d_v = self.d_k
        self.d_model = d_model
        self.output_linear = output_linear

        if output_linear:
            # three input projections (query, key, value) plus an output projection
            self.linears = nn.ModuleList(
                [nn.Linear(d_input, d_model) for _ in range(3)] + [nn.Linear(d_model, d_model), ])
        else:
            self.linears = nn.ModuleList([nn.Linear(d_input, d_model) for _ in range(3)])

        self.dropout = nn.Dropout(p=dropout)

    def forward(self, query, key, value, mask, output_weight=False):
        if mask is not None:
            # broadcast the same mask across all attention heads
            mask = mask.unsqueeze(1)

        nbatches = query.size(0)

        # project the inputs, then split into heads: [batch, n_head, seq_len, d_k]
        query, key, value = [
            lin_layer(x).view(nbatches, -1, self.n_head, self.d_k).transpose(1, 2)
            for lin_layer, x in zip(self.linears, (query, key, value))
        ]

        x, attn_weight = attention(query, key, value, mask=mask, dropout=self.dropout)

        # merge the heads back into [batch, seq_len, d_model]
        x = x.transpose(1, 2).contiguous() \
            .view(nbatches, -1, self.n_head * self.d_k)

        if self.output_linear:
            if output_weight:
                return self.linears[-1](x), attn_weight
            else:
                return self.linears[-1](x)
        else:
            if output_weight:
                return x, attn_weight
            else:
                return x
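
# Usage sketch (illustrative assumptions: 4 heads, d_input=32, d_model=64):
#
#   mha = MultiHeadAttention(n_head=4, d_input=32, d_model=64, output_linear=True)
#   h = torch.randn(8, 20, 32)                      # [batch_size, seq_len, d_input]
#   pad_mask = torch.zeros(8, 20, 20)               # unsqueezed to [8, 1, 20, 20] inside forward
#   out = mha(h, h, h, pad_mask)                              # [8, 20, 64]
#   out, w = mha(h, h, h, pad_mask, output_weight=True)       # w: [8, 4, 20, 20]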


class SublayerConnection(nn.Module):
    # pre-norm residual connection: x + dropout(sublayer(norm(x)))

    def __init__(self, d_model, dropout):
        super(SublayerConnection, self).__init__()
        self.norm = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, sublayer):
        return x + self.dropout(sublayer(self.norm(x)))
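
# Usage sketch: wrapping a self-attention call in the residual branch
# (illustrative; assumes an `mha` with d_input == d_model == 64 so the
# residual addition is shape-compatible, and a `pad_mask` as above):
#
#   sublayer = SublayerConnection(d_model=64, dropout=0.1)
#   h = torch.randn(8, 20, 64)
#   out = sublayer(h, lambda y: mha(y, y, y, pad_mask))       # [8, 20, 64]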


class EncoderLayer(nn.Module):

    def __init__(self, d_model, self_attn, feed_forward=None, use_residual=False, dropout=0.1):
        super(EncoderLayer, self).__init__()
        self.self_attn = self_attn
        self.feed_forward = feed_forward
        self.use_residual = use_residual
        if use_residual:
            # one residual branch for self-attention, one for the feed-forward network
            self.sublayer = nn.ModuleList([SublayerConnection(d_model, dropout) for _ in range(2)])
        self.d_model = d_model

    def forward(self, x, mask):
        if self.use_residual:
            x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, mask))
            if self.feed_forward is not None:
                return self.sublayer[1](x, self.feed_forward)
            else:
                return x
        else:
            return self.self_attn(x, x, x, mask)
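
# Usage sketch (illustrative): a residual encoder layer built from the modules
# above; the feed-forward network is an arbitrary assumption, not prescribed here.
#
#   attn = MultiHeadAttention(n_head=4, d_input=64, d_model=64, output_linear=True)
#   ffn = nn.Sequential(nn.Linear(64, 256), nn.ReLU(), nn.Linear(256, 64))
#   layer = EncoderLayer(d_model=64, self_attn=attn, feed_forward=ffn, use_residual=True)
#   h = torch.randn(8, 20, 64)
#   pad_mask = torch.zeros(8, 20, 20)
#   out = layer(h, pad_mask)                                  # [8, 20, 64]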


class TimePositionalEncoding(nn.Module):
    """Temporal encoding used in the Transformer Hawkes Process (THP), ICML 2020.
    """

    def __init__(self, d_model, max_len=5000, device='cpu'):
        super().__init__()
        pe = torch.zeros(max_len, d_model, device=device).float()
        position = torch.arange(0, max_len, device=device).float().unsqueeze(1)
        div_term = (torch.arange(0, d_model, 2, device=device).float() * -(math.log(10000.0) / d_model)).exp()
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        # [1, max_len, d_model]
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Compute the time positional encoding defined in Equation (2) of the THP model.

        Args:
            x (tensor): time_seqs, [batch_size, seq_len]; only its sequence length is used.

        Returns:
            Temporal encoding vector, [1, seq_len, model_dim], broadcastable over the batch.
        """
        length = x.size(1)
        return self.pe[:, :length]
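
# Usage sketch (illustrative): the encoding depends only on sequence positions,
# so only the length of `time_seqs` matters here.
#
#   pos_enc = TimePositionalEncoding(d_model=64)
#   time_seqs = torch.rand(8, 20)                   # [batch_size, seq_len]
#   pe = pos_enc(time_seqs)                         # [1, 20, 64]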


class TimeShiftedPositionalEncoding(nn.Module):
    """Time-shifted positional encoding used in the Self-Attentive Hawkes Process (SAHP), ICML 2020.
    """

    def __init__(self, d_model, max_len=5000, device='cpu'):
        super().__init__()
        # [max_len, 1]
        position = torch.arange(0, max_len, device=device).float().unsqueeze(1)
        # [d_model // 2]
        div_term = (torch.arange(0, d_model, 2, device=device).float() * -(math.log(10000.0) / d_model)).exp()

        self.layer_time_delta = nn.Linear(1, d_model // 2, bias=False)

        self.register_buffer('position', position)
        self.register_buffer('div_term', div_term)

    def forward(self, x, interval):
        """
        Args:
            x: time_seq, [batch_size, seq_len]
            interval: time_delta_seq, [batch_size, seq_len]

        Returns:
            Time-shifted positional encoding defined in Equation (8) of the SAHP model,
            [batch_size, seq_len, model_dim]
        """
        # learned phase shift from the inter-event intervals: [batch_size, seq_len, d_model // 2]
        phi = self.layer_time_delta(interval.unsqueeze(-1))

        length = x.size(1) if x.dim() > 1 else x.size(0)

        # [1, seq_len, d_model // 2]
        arc = (self.position[:length] * self.div_term).unsqueeze(0)

        pe_sin = torch.sin(arc + phi)
        pe_cos = torch.cos(arc + phi)
        pe = torch.cat([pe_sin, pe_cos], dim=-1)
        return pe
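
# Usage sketch (illustrative): event times and inter-event intervals feed the
# shifted sinusoidal encoding; the tensor shapes are arbitrary assumptions.
#
#   pos_enc = TimeShiftedPositionalEncoding(d_model=64)
#   time_seqs = torch.rand(8, 20)                   # [batch_size, seq_len]
#   time_delta_seqs = torch.rand(8, 20)             # [batch_size, seq_len]
#   pe = pos_enc(time_seqs, time_delta_seqs)        # [8, 20, 64]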


class GELU(nn.Module):
    """GELU activation function (tanh approximation).
    """

    def forward(self, x):
        return 0.5 * x * (1 + torch.tanh(math.sqrt(2 / math.pi) * (x + 0.044715 * torch.pow(x, 3))))


class Identity(nn.Module):

    def forward(self, inputs):
        return inputs


def activation_layer(act_name):
    """Construct an activation layer.

    Args:
        act_name: str or nn.Module subclass, name (or class) of the activation function

    Returns:
        act_layer: activation layer
    """
    if isinstance(act_name, str):
        if act_name.lower() == 'sigmoid':
            act_layer = nn.Sigmoid()
        elif act_name.lower() == 'linear':
            act_layer = Identity()
        elif act_name.lower() == 'relu':
            act_layer = nn.ReLU(inplace=True)
        elif act_name.lower() == 'prelu':
            act_layer = nn.PReLU()
        elif act_name.lower() == 'gelu':
            act_layer = GELU()
    elif issubclass(act_name, nn.Module):
        act_layer = act_name()
    else:
        raise NotImplementedError
    return act_layer
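
# Usage sketch: the factory accepts either a string name or an nn.Module subclass.
#
#   act1 = activation_layer('gelu')                 # -> GELU()
#   act2 = activation_layer(nn.Tanh)                # -> nn.Tanh()
#   y = act1(torch.randn(4, 8))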


class DNN(nn.Module):
    """The Multi Layer Perceptron.

    Input shape
        - nD tensor with shape: ``(batch_size, ..., input_dim)``.
          The most common situation would be a 2D input with shape ``(batch_size, input_dim)``.

    Output shape
        - nD tensor with shape: ``(batch_size, ..., hidden_size[-1])``.
          For instance, for a 2D input with shape ``(batch_size, input_dim)``,
          the output would have shape ``(batch_size, hidden_size[-1])``.

    Arguments
        - **inputs_dim**: input feature dimension.
        - **hidden_size**: list of positive integers, the number of units in each hidden layer.
        - **activation**: activation function to use.
        - **l2_reg**: float between 0 and 1. L2 regularizer strength applied to the kernel weights matrix.
        - **dropout_rate**: float in [0,1). Fraction of the units to drop out.
        - **use_bn**: bool. Whether to use BatchNormalization before activation or not.
        - **init_std**: float. Standard deviation of the normal initialization for the linear weights.
    """

    def __init__(self, inputs_dim, hidden_size, activation='relu', l2_reg=0, dropout_rate=0, use_bn=False,
                 init_std=0.0001):
        super(DNN, self).__init__()
        self.dropout_rate = dropout_rate
        self.dropout = nn.Dropout(dropout_rate)
        self.l2_reg = l2_reg
        self.use_bn = use_bn
        if len(hidden_size) == 0:
            raise ValueError("hidden_size is empty!")
        hidden_size = [inputs_dim] + list(hidden_size)

        self.linears = nn.ModuleList(
            [nn.Linear(hidden_size[i], hidden_size[i + 1]) for i in range(len(hidden_size) - 1)])

        if self.use_bn:
            self.bn = nn.ModuleList(
                [nn.BatchNorm1d(hidden_size[i + 1]) for i in range(len(hidden_size) - 1)])

        self.activation_layers = nn.ModuleList(
            [activation_layer(activation) for i in range(len(hidden_size) - 1)])

        for name, tensor in self.linears.named_parameters():
            if 'weight' in name:
                nn.init.normal_(tensor, mean=0, std=init_std)

    def forward(self, inputs):
        deep_input = inputs

        for i in range(len(self.linears)):
            fc = self.linears[i](deep_input)

            if self.use_bn:
                fc = self.bn[i](fc)

            fc = self.activation_layers[i](fc)
            fc = self.dropout(fc)
            deep_input = fc
        return deep_input
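
# Usage sketch (illustrative sizes): a three-layer MLP over 16-dimensional inputs.
#
#   mlp = DNN(inputs_dim=16, hidden_size=[64, 64, 32], activation='relu', dropout_rate=0.1)
#   out = mlp(torch.randn(8, 16))                   # [8, 32]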