Source code for gnnwr.datasets

import json
import os

import numpy as np
import pandas
import pandas as pd
import torch
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from torch.utils.data import Dataset, DataLoader
import warnings
from scipy.spatial import distance

r"""
The package of `datasets` includes the following functions:
    1. init_dataset: initialize the dataset for training, validation and testing
    2. init_dataset_cv: initialize the dataset for cross-validation
    3. init_predict_dataset: initialize the dataset for prediction
    4. BasicDistance: calculate the Euclidean distance matrix of spatial/spatio-temporal data
    5. Manhattan_distance: calculate the Manhattan distance matrix of spatial/spatio-temporal data
    6. load_dataset: load a dataset that was previously saved to a directory
and the following classes:
    1. baseDataset: the base class of dataset
    2. predictDataset: the class of dataset for prediction
The purpose of this package is to provide basic functions for pre-processing data and calculating distance matrices
to facilitate the use of the model.
"""


class baseDataset(Dataset):
    r"""
    baseDataset is the base class of dataset, which is used to store the data and other information.
    It also provides the functions of data scaling, data saving and data loading.

    :param data: DataSets with x_column and y_column
    :param x_column: independent variables column name
    :param y_column: dependent variables column name
    :param id_column: id column name; required whenever ``data`` is given
    :param is_need_STNN: whether to use STNN
    """

    def __init__(self, data=None, x_column: list = None, y_column: list = None, id_column=None, is_need_STNN=False):
        self.dataframe = data
        self.x = x_column
        self.y = y_column
        self.id = id_column
        if data is None:
            # empty shell, to be filled later by read()
            self.x_data = None
            self.datasize = -1
            self.coefsize = -1
            self.y_data = None
            self.id_data = None
        else:
            self.x_data = data[x_column].astype(np.float32).values  # independent variables data
            self.datasize = self.x_data.shape[0]  # number of samples
            self.coefsize = len(x_column) + 1  # number of coefficients (one extra for the constant column)
            self.y_data = data[y_column].astype(np.float32).values  # dependent variables data
            if id_column is not None:
                self.id_data = data[id_column].astype(np.int64).values
            else:
                raise ValueError("id_column is None")
        self.is_need_STNN = is_need_STNN
        self.scale_fn = None  # name of the scale function
        self.x_scale_info = None  # scale information of x_data
        self.y_scale_info = None  # scale information of y_data (y is currently never scaled)
        self.scaledDataframe = None  # filled by getScaledDataframe()
        self.distances = None  # distance matrix of spatial/spatio-temporal data
        self.temporal = None  # temporal distance matrix of spatio-temporal data
        # `distances_scale_param` is the name read by save()/read(); the plural
        # spelling is kept only because earlier versions of __init__ created it.
        self.distances_scale_param = None
        self.distances_scale_params = None
        self.temporal_scale_param = None  # scale parameters of the temporal matrix
        # defined here so that save() works on a dataset nobody attached a dataloader to
        self.batch_size = None
        self.shuffle = False

    def __len__(self):
        """
        :return: the number of samples
        """
        return len(self.y_data)

    def __getitem__(self, index):
        """
        :param index: the index of sample
        :return: tuple of float tensors ``(distances, x, y, id)`` for the index-th sample; when
            ``is_need_STNN`` is set the first element is the distance entry concatenated with the
            temporal entry along the last axis
        """
        x_item = torch.tensor(self.x_data[index], dtype=torch.float)
        y_item = torch.tensor(self.y_data[index], dtype=torch.float)
        id_item = torch.tensor(self.id_data[index], dtype=torch.float)
        distance_item = torch.tensor(self.distances[index], dtype=torch.float)
        if self.is_need_STNN:
            temporal_item = torch.tensor(self.temporal[index], dtype=torch.float)
            return torch.cat((distance_item, temporal_item), dim=-1), x_item, y_item, id_item
        return distance_item, x_item, y_item, id_item

    def scale(self, scale_fn=None, scale_params=None):
        """
        scale the x data with an already fitted scaler object and append a constant column of ones

        :param scale_fn: scale function name, ``"minmax_scale"`` or ``"standard_scale"``;
            any other value leaves x unscaled (the column of ones is still appended)
        :param scale_params: sequence whose first element is the fitted scaler for x
            (it must expose ``transform`` and ``data_min_``/``data_max_`` or ``mean_``/``var_``)
        """
        if scale_fn == "minmax_scale":
            self.scale_fn = "minmax_scale"
            x_scaler = scale_params[0]
            self.x_scale_info = {"min": x_scaler.data_min_, "max": x_scaler.data_max_}
            self.x_data = x_scaler.transform(pd.DataFrame(self.x_data, columns=self.x))
        elif scale_fn == "standard_scale":
            self.scale_fn = "standard_scale"
            x_scaler = scale_params[0]
            self.x_scale_info = {"mean": x_scaler.mean_, "var": x_scaler.var_}
            self.x_data = x_scaler.transform(pd.DataFrame(self.x_data, columns=self.x))

        # snapshot the scaled frame before the constant column is added
        self.getScaledDataframe()
        self.x_data = np.concatenate((self.x_data, np.ones((self.datasize, 1))), axis=1)

    def scale2(self, scale_fn, scale_params):
        """
        scale the x data with explicit scale parameters and append a constant column of ones

        :param scale_fn: scale function name, ``"minmax_scale"`` or ``"standard_scale"``
        :param scale_params: sequence whose first element is a dict with keys ``"min"``/``"max"``
            or ``"mean"``/``"var"``
        """
        if scale_fn == "minmax_scale":
            self.scale_fn = "minmax_scale"
            x_scale_params = scale_params[0]
            self.x_scale_info = x_scale_params  # remembered so that rescale() can invert it
            self.x_data = (self.x_data - x_scale_params["min"]) / (x_scale_params["max"] - x_scale_params["min"])
        elif scale_fn == "standard_scale":
            self.scale_fn = "standard_scale"
            x_scale_params = scale_params[0]
            self.x_scale_info = x_scale_params  # remembered so that rescale() can invert it
            self.x_data = (self.x_data - x_scale_params["mean"]) / np.sqrt(x_scale_params["var"])

        self.getScaledDataframe()
        self.x_data = np.concatenate((self.x_data, np.ones((self.datasize, 1))), axis=1)

    def getScaledDataframe(self):
        """
        build ``self.scaledDataframe`` from the current x and y data
        """
        columns = np.concatenate((self.x, self.y), axis=0)
        scaledData = np.concatenate((self.x_data, self.y_data), axis=1)
        self.scaledDataframe = pd.DataFrame(scaledData, columns=columns)

    def rescale(self, x, y):
        """
        undo the scaling of x and, when y scale information exists, of y

        :param x: Input independent variable data
        :param y: Input dependent variable data
        :return: rescaled x and y; y is returned unchanged when no y scale information
            is stored (this class never scales y itself)
        :raises ValueError: if the dataset has not been scaled with a supported function
        """
        if self.scale_fn == "minmax_scale":
            x = np.multiply(x, self.x_scale_info["max"] - self.x_scale_info["min"]) + self.x_scale_info["min"]
            if self.y_scale_info:
                y = np.multiply(y, self.y_scale_info["max"] - self.y_scale_info["min"]) + self.y_scale_info["min"]
        elif self.scale_fn == "standard_scale":
            x = np.multiply(x, np.sqrt(self.x_scale_info["var"])) + self.x_scale_info["mean"]
            if self.y_scale_info:
                y = np.multiply(y, np.sqrt(self.y_scale_info["var"])) + self.y_scale_info["mean"]
        else:
            raise ValueError("invalid process_fn")
        return x, y

    @staticmethod
    def _info_to_lists(info):
        """Turn a dict of array-likes (or None) into a JSON-serialisable dict of lists."""
        return {key: np.asarray(value).tolist() for key, value in (info or {}).items()}

    @staticmethod
    def _info_to_arrays(info):
        """Turn a dict of lists (or None) back into a dict of numpy arrays."""
        return {key: np.array(value) for key, value in (info or {}).items()}

    def save(self, dirname):
        """
        save the dataset into a new directory

        Writes ``dataset_info.json``, ``distances.npy``, ``dataframe.csv`` and, when available,
        ``temporal.npy`` and ``scaledDataframe.csv``.

        :param dirname: save directory; it must not exist yet
        :raises ValueError: if the directory already exists or the dataset holds no dataframe
        """
        if os.path.exists(dirname):
            raise ValueError("dir is already exists")
        if self.dataframe is None:
            raise ValueError("dataframe is None")
        os.makedirs(dirname)

        # scale information is stored as nested JSON strings to stay readable by read()
        dataset_info = {
            "x": self.x,
            "y": self.y,
            "id": self.id,
            "batch_size": getattr(self, "batch_size", None),
            "shuffle": getattr(self, "shuffle", False),
            "is_need_STNN": self.is_need_STNN,
            "scale_fn": self.scale_fn,
            "x_scale_info": json.dumps(self._info_to_lists(self.x_scale_info)),
            "y_scale_info": json.dumps(self._info_to_lists(self.y_scale_info)),
            "distance_scale_info": json.dumps(
                self._info_to_lists(getattr(self, "distances_scale_param", None))),
            "temporal_scale_info": json.dumps(
                self._info_to_lists(getattr(self, "temporal_scale_param", None))),
        }
        with open(os.path.join(dirname, "dataset_info.json"), "w") as f:
            json.dump(dataset_info, f)

        # save the distance matrix (and the temporal matrix that __getitem__ needs for STNN)
        np.save(os.path.join(dirname, "distances.npy"), self.distances)
        if self.temporal is not None:
            np.save(os.path.join(dirname, "temporal.npy"), self.temporal)
        # save dataframes
        self.dataframe.to_csv(os.path.join(dirname, "dataframe.csv"), index=False)
        scaled_frame = getattr(self, "scaledDataframe", None)
        if scaled_frame is not None:
            scaled_frame.to_csv(os.path.join(dirname, "scaledDataframe.csv"), index=False)

    def read(self, dirname):
        """
        read the dataset from a directory written by :meth:`save`

        :param dirname: read directory
        :raises ValueError: if the directory does not exist
        """
        if not os.path.exists(dirname):
            raise ValueError("dir is not exists")
        # read the information of dataset
        with open(os.path.join(dirname, "dataset_info.json"), "r") as f:
            dataset_info = json.load(f)
        self.x = dataset_info["x"]
        self.y = dataset_info["y"]
        self.id = dataset_info["id"]
        self.batch_size = dataset_info["batch_size"]
        self.shuffle = dataset_info["shuffle"]
        self.is_need_STNN = dataset_info["is_need_STNN"]
        self.scale_fn = dataset_info["scale_fn"]
        # every scale dictionary is converted back to numpy arrays so that it supports
        # the element-wise arithmetic performed by the scalers
        self.x_scale_info = self._info_to_arrays(json.loads(dataset_info["x_scale_info"]))
        self.y_scale_info = self._info_to_arrays(json.loads(dataset_info.get("y_scale_info", "{}"))) or None
        self.distances_scale_param = self._info_to_arrays(json.loads(dataset_info["distance_scale_info"]))
        self.temporal_scale_param = self._info_to_arrays(
            json.loads(dataset_info.get("temporal_scale_info", "{}"))) or None

        # read the distance matrix; the temporal matrix is optional (older saves do not contain it)
        self.distances = np.load(os.path.join(dirname, "distances.npy")).astype(np.float32)
        temporal_path = os.path.join(dirname, "temporal.npy")
        if os.path.exists(temporal_path):
            self.temporal = np.load(temporal_path).astype(np.float32)

        # read dataframe and rebuild the scaled arrays from it
        self.dataframe = pd.read_csv(os.path.join(dirname, "dataframe.csv"))
        self.x_data = self.dataframe[self.x].astype(np.float32).values
        self.datasize = self.x_data.shape[0]
        self.y_data = self.dataframe[self.y].astype(np.float32).values
        self.id_data = self.dataframe[self.id].astype(np.int64).values
        self.coefsize = len(self.x) + 1
        self.scale2(self.scale_fn, [self.x_scale_info, self.y_scale_info])
class predictDataset(Dataset):
    """
    Predict dataset is used to predict the dependent variable of the data.

    :param data: dataframe (``None`` creates an empty shell without x data)
    :param x_column: independent variable column name
    :param process_fn: process function name, ``"minmax_scale"`` or ``"standard_scale"``
    :param scale_info: optional sequence whose first element is ``[min, max]`` (min-max scaling)
        or ``[mean, std]`` (standard scaling); when omitted the statistics of ``data`` are used
    :param is_need_STNN: whether need STNN
    :raises ValueError: if ``process_fn`` is not supported
    """

    def __init__(self, data, x_column, process_fn="minmax_scale", scale_info=None, is_need_STNN=False):
        self.dataframe = data
        self.x = x_column
        self.is_need_STNN = is_need_STNN
        self.process_fn = process_fn
        if process_fn not in ("minmax_scale", "standard_scale"):
            raise ValueError("invalid process_fn")
        self.scale_fn = process_fn

        if scale_info is not None and len(scale_info):
            self.scale_info_x = scale_info[0]  # scale information of x_data
            self.use_scale_info = True
        else:
            self.scale_info_x = None
            self.use_scale_info = False

        if data is None:
            self.x_data = None
            self.datasize = -1
            self.coefsize = -1
        else:
            raw_x = data[x_column].astype(np.float32).values  # independent variables data
            self.datasize = raw_x.shape[0]  # number of samples
            self.coefsize = len(x_column) + 1  # number of coefficients
            # data pre-processing; when no scale information was supplied the statistics
            # of the data itself are recorded so that rescale() can invert the transform
            if process_fn == "minmax_scale":
                if not self.use_scale_info:
                    self.scale_info_x = [raw_x.min(axis=0), raw_x.max(axis=0)]
                scaled_x = self.minmax_scaler(raw_x, self.scale_info_x[0], self.scale_info_x[1])
            else:
                if not self.use_scale_info:
                    self.scale_info_x = [raw_x.mean(axis=0), raw_x.std(axis=0)]
                scaled_x = self.standard_scaler(raw_x, self.scale_info_x[0], self.scale_info_x[1])
            # append the constant column
            self.x_data = np.concatenate((scaled_x, np.ones((self.datasize, 1))), axis=1)

        self.distances = None
        self.temporal = None

    def __len__(self):
        """
        :return: the number of samples
        """
        return len(self.x_data)

    def __getitem__(self, index):
        """
        :param index: sample index
        :return: tuple of float tensors ``(distances, x)``; when ``is_need_STNN`` is set the
            distance entry is concatenated with the temporal entry along the last axis
        """
        x_item = torch.tensor(self.x_data[index], dtype=torch.float)
        distance_item = torch.tensor(self.distances[index], dtype=torch.float)
        if self.is_need_STNN:
            temporal_item = torch.tensor(self.temporal[index], dtype=torch.float)
            return torch.cat((distance_item, temporal_item), dim=-1), x_item
        return distance_item, x_item

    def rescale(self, x):
        """
        rescale the attribute data, i.e. invert :meth:`minmax_scaler` / :meth:`standard_scaler`

        :param x: Input attribute data
        :return: rescaled attribute data
        :raises ValueError: if the scale function is not supported
        """
        first = np.asarray(self.scale_info_x[0])
        second = np.asarray(self.scale_info_x[1])
        if self.scale_fn == "minmax_scale":
            x = x * (second - first) + first
        elif self.scale_fn == "standard_scale":
            # the second entry is a standard deviation, exactly as standard_scaler() uses it
            x = x * second + first
        else:
            raise ValueError("invalid process_fn")
        return x

    def minmax_scaler(self, x, min=None, max=None):
        """
        function of minmax scaler

        :param x: Input attribute data
        :param min: minimum value of each attribute; computed from ``x`` when omitted or empty
        :param max: maximum value of each attribute; computed from ``x`` when omitted or empty
        :return: Output attribute data
        """
        if min is None or np.size(min) == 0:
            lower, upper = x.min(axis=0), x.max(axis=0)
        else:
            lower, upper = np.asarray(min), np.asarray(max)
        span = upper - lower
        # a constant attribute has zero range; divide by 1 instead of producing nan/inf
        span = np.where(span == 0, 1, span)
        return (x - lower) / span

    def standard_scaler(self, x, mean=None, std=None):
        """
        function of standard scaler

        :param x: Input attribute data
        :param mean: mean value of each attribute; computed from ``x`` when omitted or empty
        :param std: standard deviation of each attribute; computed from ``x`` when omitted or empty
        :return: Output attribute data
        """
        if mean is None or np.size(mean) == 0:
            center, spread = x.mean(axis=0), x.std(axis=0)
        else:
            center, spread = np.asarray(mean), np.asarray(std)
        # a constant attribute has zero deviation; divide by 1 instead of producing nan/inf
        spread = np.where(spread == 0, 1, spread)
        return (x - center) / spread
def BasicDistance(x, y):
    """
    Compute the pairwise Euclidean distances between two sets of points.

    Both inputs are cast to ``float32`` before being handed to ``scipy.spatial.distance.cdist``.

    :param x: Input point coordinate data, one point per row
    :param y: Input target point coordinate data, one point per row
    :return: distance matrix whose entry ``[i, j]`` is the distance between ``x[i]`` and ``y[j]``
    """
    sources, targets = (np.float32(points) for points in (x, y))
    return distance.cdist(sources, targets, "euclidean")
def Manhattan_distance(x, y):
    """
    Compute the pairwise Manhattan (L1) distances between two sets of points.

    :param x: Input point coordinate data, a 2-D array with one point per row
    :param y: Input target point coordinate data, a 2-D array with one point per row
    :return: ``float32`` distance matrix whose entry ``[i, j]`` is the L1 distance
        between ``x[i]`` and ``y[j]``
    """
    # broadcast to (len(x), len(y), dims) so that every pair of rows is compared
    offsets = x[:, np.newaxis, :] - y
    totals = np.abs(offsets).sum(axis=2)
    return np.float32(totals)
def _pair_points_with_reference(points, reference_points):
    """
    Pair the coordinates of every sample with the coordinates of every reference point.

    :param points: 2-D array, one sample per row
    :param reference_points: 2-D array, one reference point per row
    :return: 3-D array whose entry ``[i, j]`` is ``points[i]`` followed by ``reference_points[j]``
    """
    tiled_points = np.repeat(points[:, np.newaxis, :], len(reference_points), axis=1)
    tiled_reference = np.repeat(reference_points[:, np.newaxis, :], len(points), axis=1)
    return np.concatenate((tiled_points, np.transpose(tiled_reference, (1, 0, 2))), axis=2)


def _scale_splits_jointly(scaler, train_part, val_part, test_part):
    """
    Fit ``scaler`` on the three arrays stacked along axis 0 (column = last axis) and
    return the three rescaled pieces in the same order.
    """
    train_len, val_len = len(train_part), len(val_part)
    stacked = np.concatenate((train_part, val_part, test_part), axis=0)
    stacked = scaler.fit_transform(stacked.reshape(-1, stacked.shape[-1])).reshape(stacked.shape)
    return (stacked[:train_len],
            stacked[train_len:train_len + val_len],
            stacked[train_len + val_len:])


def init_dataset(data, test_ratio, valid_ratio, x_column, y_column, spatial_column=None, temp_column=None,
                 id_column=None, sample_seed=100, process_fn="minmax_scale", batch_size=32, shuffle=True,
                 use_class=baseDataset, spatial_fun=BasicDistance, temporal_fun=Manhattan_distance,
                 max_val_size=-1, max_test_size=-1, from_for_cv=0, is_need_STNN=False, Reference=None,
                 simple_distance=True):
    """
    Initialize the dataset and return the training set, validation set and test set for the model

    Side effects: seeds the global numpy random generator with ``sample_seed``, prints the fitted
    scale parameters, and adds an ``'id'`` column to ``data`` when ``id_column`` is None and the
    column does not exist yet.

    :param data: dataset (pandas DataFrame)
    :param test_ratio: test data ratio (fraction of all rows)
    :param valid_ratio: valid data ratio (fraction of the rows that are left after removing the test rows)
    :param x_column: input attribute column name
    :param y_column: output attribute column name
    :param spatial_column: spatial attribute column name
    :param temp_column: temporal attribute column name
    :param id_column: id column name; defaults to ``['id']``
    :param sample_seed: random seed
    :param process_fn: data pre-process function, ``"minmax_scale"`` or ``"standard_scale"``
    :param batch_size: batch size
    :param max_val_size: max valid data size in one injection (negative: the whole validation set)
    :param max_test_size: max test data size in one injection (negative: the whole test set)
    :param shuffle: shuffle data
    :param use_class: dataset class
    :param spatial_fun: spatial distance calculate function
    :param temporal_fun: temporal distance calculate function
    :param from_for_cv: the start index of the data for cross validation
    :param is_need_STNN: whether to use STNN
    :param Reference: reference points to calculate the distance: ``None``/``"train"``,
        ``"train_val"`` or a pandas DataFrame
    :param simple_distance: whether to use simple distance function to calculate the distance
    :return: train dataset, valid dataset, test dataset
    :raises ValueError: if ``spatial_fun`` or ``spatial_column`` is None, ``process_fn`` is not
        supported, or the reference is invalid
    """
    if spatial_fun is None:
        # if dist_fun is None, raise error
        raise ValueError(
            "dist_fun must be a function that can process the data")
    if spatial_column is None:
        # if dist_column is None, raise error
        raise ValueError(
            "dist_column must be a column name in data")
    # reject an unknown scaler name up front, before any side effect, instead of
    # failing later with an AttributeError on a None scaler
    if process_fn == "minmax_scale":
        scaler_class = MinMaxScaler
    elif process_fn == "standard_scale":
        scaler_class = StandardScaler
    else:
        raise ValueError("process_fn must be 'minmax_scale' or 'standard_scale'")
    use_minmax = process_fn == "minmax_scale"

    if id_column is None:
        id_column = ['id']
        if 'id' not in data.columns:
            data['id'] = np.arange(len(data))
        else:
            warnings.warn("id_column is None and use default id column in data", RuntimeWarning)

    np.random.seed(sample_seed)
    data = data.sample(frac=1)  # shuffle data

    # data pre-process: the scalers are fitted on the complete (shuffled) data
    scaler_params_x = scaler_class().fit(data[x_column])
    scaler_params_y = scaler_class().fit(data[y_column])
    scaler_params = [scaler_params_x, scaler_params_y]
    if use_minmax:
        print("x_min:" + str(scaler_params_x.data_min_) + "; x_max:" + str(scaler_params_x.data_max_))
        print("y_min:" + str(scaler_params_y.data_min_) + "; y_max:" + str(scaler_params_y.data_max_))
    else:
        print("x_mean:" + str(scaler_params_x.mean_) + "; x_var:" + str(scaler_params_x.var_))
        print("y_mean:" + str(scaler_params_y.mean_) + "; y_var:" + str(scaler_params_y.var_))

    # data split: the tail is the test set, the fold number `from_for_cv` selects the
    # validation window inside the remaining rows
    test_start = int((1 - test_ratio) * len(data))
    test_data = data[test_start:]
    train_data = data[:test_start]
    val_start = int(from_for_cv * valid_ratio * len(train_data))
    val_stop = int((1 + from_for_cv) * valid_ratio * len(train_data))
    val_data = train_data[val_start:val_stop]
    train_data = pandas.concat([train_data[:val_start], train_data[val_stop:]])

    # Use the parameters of the dataset to normalize the train_dataset, val_dataset, and test_dataset
    train_dataset = use_class(train_data, x_column, y_column, id_column, is_need_STNN)
    val_dataset = use_class(val_data, x_column, y_column, id_column, is_need_STNN)
    test_dataset = use_class(test_data, x_column, y_column, id_column, is_need_STNN)
    all_datasets = (train_dataset, val_dataset, test_dataset)
    all_frames = (train_data, val_data, test_data)
    for dataset in all_datasets:
        dataset.scale(process_fn, scaler_params)

    # choose the reference points
    if Reference is None:
        reference_data = train_data
    elif isinstance(Reference, str):
        if Reference == "train":
            reference_data = train_data
        elif Reference == "train_val":
            reference_data = pandas.concat([train_data, val_data])
        else:
            raise ValueError("Reference str must be 'train' or 'train_val'")
    else:
        reference_data = Reference
    if not isinstance(reference_data, pandas.DataFrame):
        raise ValueError("reference_data must be a pandas.DataFrame")

    for dataset in all_datasets:
        dataset.reference = reference_data
        dataset.spatial_column = spatial_column
        dataset.x_column = x_column
        dataset.y_column = y_column

    # STNN and the non-simple mode both work on paired point coordinates;
    # only the simple mode calls the distance functions
    use_point_pairs = is_need_STNN or not simple_distance
    reference_spatial = reference_data[spatial_column].values
    for dataset, frame in zip(all_datasets, all_frames):
        sample_spatial = frame[spatial_column].values
        if use_point_pairs:
            dataset.distances = _pair_points_with_reference(sample_spatial, reference_spatial)
        else:
            dataset.distances = spatial_fun(sample_spatial, reference_spatial)

    if temp_column is not None:
        reference_temporal = reference_data[temp_column].values
        for dataset, frame in zip(all_datasets, all_frames):
            sample_temporal = frame[temp_column].values
            if use_point_pairs:
                dataset.temporal = _pair_points_with_reference(sample_temporal, reference_temporal)
            else:
                dataset.temporal = temporal_fun(sample_temporal, reference_temporal)
            if is_need_STNN:
                # STNN keeps the spatial and temporal matrices separate
                continue
            if simple_distance:
                # concatenate spatial and temporal distance matrix
                dataset.distances = np.concatenate(
                    (dataset.distances[:, :, np.newaxis], dataset.temporal[:, :, np.newaxis]), axis=2)
            else:
                dataset.distances = np.concatenate((dataset.distances, dataset.temporal), axis=2)
    train_dataset.simple_distance = simple_distance

    # scale distance matrix (fitted on train, val and test together)
    distance_scale = scaler_class()
    train_dataset.distances, val_dataset.distances, test_dataset.distances = _scale_splits_jointly(
        distance_scale, train_dataset.distances, val_dataset.distances, test_dataset.distances)
    if use_minmax:
        distance_scale_param = {"min": distance_scale.data_min_, "max": distance_scale.data_max_}
    else:
        distance_scale_param = {"mean": distance_scale.mean_, "var": distance_scale.var_}
    for dataset in all_datasets:
        dataset.distances_scale_param = distance_scale_param

    if temp_column is not None:
        temporal_scale = scaler_class()
        train_dataset.temporal, val_dataset.temporal, test_dataset.temporal = _scale_splits_jointly(
            temporal_scale, train_dataset.temporal, val_dataset.temporal, test_dataset.temporal)
        if use_minmax:
            temporal_scale_param = {"min": temporal_scale.data_min_, "max": temporal_scale.data_max_}
        else:
            temporal_scale_param = {"mean": temporal_scale.mean_, "var": temporal_scale.var_}
        for dataset in all_datasets:
            dataset.temporal_scale_param = temporal_scale_param

    # initialize dataloader for train/val/test dataset
    # set batch_size for train_dataset as batch_size
    # set batch_size for val_dataset as max_val_size
    # set batch_size for test_dataset as max_test_size
    if max_val_size < 0:
        max_val_size = len(val_dataset)
    if max_test_size < 0:
        max_test_size = len(test_dataset)
    for dataset, loader_batch_size in zip(all_datasets, (batch_size, max_val_size, max_test_size)):
        dataset.dataloader = DataLoader(dataset, batch_size=loader_batch_size, shuffle=shuffle)
        dataset.batch_size, dataset.shuffle = loader_batch_size, shuffle

    return train_dataset, val_dataset, test_dataset
def init_dataset_cv(data, test_ratio, k_fold, x_column, y_column, spatial_column=None, temp_column=None,
                    id_column=None, sample_seed=100, process_fn="minmax_scale", batch_size=32, shuffle=True,
                    use_class=baseDataset, spatial_fun=BasicDistance, temporal_fun=Manhattan_distance,
                    max_val_size=-1, max_test_size=-1, is_need_STNN=False, Reference=None,
                    simple_distance=True):
    """
    Build the datasets for k-fold cross validation.

    ``init_dataset`` is called once per fold with ``from_for_cv`` set to the fold number and
    ``valid_ratio`` set to ``(1 - test_ratio) / k_fold``.
    NOTE(review): ``init_dataset`` applies ``valid_ratio`` to the rows left after the test split,
    so the folds may not cover all of those rows - confirm that this is intended.

    :param data: input data
    :param test_ratio: test set ratio
    :param k_fold: k of k-fold
    :param x_column: attribute column name
    :param y_column: label column name
    :param spatial_column: spatial distance column name
    :param temp_column: temporal distance column name
    :param id_column: id column name
    :param sample_seed: random seed
    :param process_fn: data process function
    :param batch_size: batch size
    :param shuffle: shuffle or not
    :param use_class: dataset class
    :param spatial_fun: spatial distance calculate function
    :param temporal_fun: temporal distance calculate function
    :param max_val_size: validation set size
    :param max_test_size: test set size
    :param is_need_STNN: whether need STNN
    :param Reference: reference data
    :param simple_distance: is simple distance
    :return: ``(cv_data_set, test_dataset)`` where ``cv_data_set`` is a list with one
        ``(train_dataset, val_dataset)`` tuple per fold and ``test_dataset`` comes from the
        last fold (``None`` when ``k_fold`` is 0 iterations long)
    """
    valid_ratio = (1 - test_ratio) / k_fold
    cv_data_set = []
    test_dataset = None
    for fold in range(k_fold):
        fold_train, fold_val, test_dataset = init_dataset(
            data=data,
            test_ratio=test_ratio,
            valid_ratio=valid_ratio,
            x_column=x_column,
            y_column=y_column,
            spatial_column=spatial_column,
            temp_column=temp_column,
            id_column=id_column,
            sample_seed=sample_seed,
            process_fn=process_fn,
            batch_size=batch_size,
            shuffle=shuffle,
            use_class=use_class,
            spatial_fun=spatial_fun,
            temporal_fun=temporal_fun,
            max_val_size=max_val_size,
            max_test_size=max_test_size,
            from_for_cv=fold,
            is_need_STNN=is_need_STNN,
            Reference=Reference,
            simple_distance=simple_distance,
        )
        cv_data_set.append((fold_train, fold_val))
    return cv_data_set, test_dataset
# TODO Not finished # def init_dataset_with_dist_frame(data, train_ratio, valid_ratio, x_column, y_column, id_column, dist_frame=None, # process_fn="minmax_scale", batch_size=32, shuffle=True, use_class=baseDataset): # train_data, val_data, test_data = np.split(data.sample(frac=1), # [int(train_ratio * len(data)), # int((train_ratio + valid_ratio) * len(data))]) # # # 初始化train_dataset,val_dataset,test_dataset # train_dataset = use_class(train_data, x_column, y_column, process_fn) # val_dataset = use_class(val_data, x_column, y_column, process_fn) # test_dataset = use_class(test_data, x_column, y_column, process_fn) # # dist_frame.columns = ['id1', 'id2', 'dis'] # dist_frame = dist_frame.set_index(['id1', 'id2'])[ # 'dis'].unstack().reset_index().drop('id1', axis=1) # # train_ids = train_data[id_column[0]].tolist() # val_ids = val_data[id_column[0]].tolist() # test_ids = test_data[id_column[0]].tolist() # # train_dataset.distances = np.float32( # dist_frame[dist_frame.index.isin(train_ids)][train_ids].values) # val_dataset.distances = np.float32( # dist_frame[dist_frame.index.isin(val_ids)][train_ids].values) # test_dataset.distances = np.float32( # dist_frame[dist_frame.index.isin(test_ids)][train_ids].values) # # train_dataset.dataloader = DataLoader( # train_dataset, batch_size=batch_size, shuffle=shuffle) # val_dataset.dataloader = DataLoader( # val_dataset, batch_size=batch_size, shuffle=shuffle) # test_dataset.dataloader = DataLoader( # test_dataset, batch_size=batch_size, shuffle=shuffle) # # return train_dataset, val_dataset, test_dataset
def init_predict_dataset(data, train_dataset, x_column, spatial_column=None, temp_column=None,
                         process_fn="minmax_scale", scale_sync=True, use_class=predictDataset,
                         spatial_fun=BasicDistance, temporal_fun=Manhattan_distance, max_size=-1, is_need_STNN=False):
    """
    initialize predict dataset

    The distance (and temporal) matrices are computed against ``train_dataset.reference`` and
    scaled with the parameters stored on ``train_dataset``.

    :param data: input data
    :param train_dataset: train data
    :param x_column: attribute column name
    :param spatial_column: spatial distance column name
    :param temp_column: temporal distance column name
    :param process_fn: data process function; only used for the attributes when ``scale_sync``
        is False, otherwise the scale function of ``train_dataset`` is used so that it always
        matches the scale parameters taken from it
    :param scale_sync: scale the attributes with the parameters of ``train_dataset`` or not
    :param max_size: max size of predict dataset (negative: the whole dataset in one batch)
    :param use_class: dataset class
    :param spatial_fun: spatial distance calculate function
    :param temporal_fun: temporal distance calculate function
    :param is_need_STNN: is need STNN or not
    :return: predict_dataset
    :raises ValueError: if ``spatial_fun`` or ``spatial_column`` is None, or the scale function
        of ``train_dataset`` is not supported
    """
    if spatial_fun is None:
        # if dist_fun is None, raise error
        raise ValueError(
            "dist_fun must be a function that can process the data")
    if spatial_column is None:
        # if dist_column is None, raise error
        raise ValueError(
            "dist_column must be a column name in data")

    def std_from_var(var):
        # the stored statistics are variances while the scaler divides by a standard
        # deviation; a zero deviation is replaced by 1 to avoid dividing by zero
        std = np.sqrt(np.asarray(var, dtype=float))
        return np.where(std == 0, 1.0, std)

    def pair_with_reference(points, reference_points):
        # entry [i, j] is points[i] followed by reference_points[j]
        tiled_points = np.repeat(points[:, np.newaxis, :], len(reference_points), axis=1)
        tiled_reference = np.repeat(reference_points[:, np.newaxis, :], len(points), axis=1)
        return np.concatenate((tiled_points, np.transpose(tiled_reference, (1, 0, 2))), axis=2)

    # initialize the predict_dataset
    if train_dataset.scale_fn == "minmax_scale":
        process_params = [[train_dataset.x_scale_info['min'], train_dataset.x_scale_info['max']]]
    elif train_dataset.scale_fn == "standard_scale":
        process_params = [[train_dataset.x_scale_info['mean'], std_from_var(train_dataset.x_scale_info['var'])]]
    else:
        raise ValueError("scale_fn must be minmax_scale or standard_scale")
    if scale_sync:
        predict_dataset = use_class(data=data, x_column=x_column, process_fn=train_dataset.scale_fn,
                                    scale_info=process_params, is_need_STNN=is_need_STNN)
    else:
        predict_dataset = use_class(data=data, x_column=x_column, process_fn=process_fn,
                                    is_need_STNN=is_need_STNN)

    def scale_like_training(values, params):
        # the keys of the stored dictionary tell which scaler produced it
        if "min" in params:
            return predict_dataset.minmax_scaler(values, params['min'], params['max'])
        return predict_dataset.standard_scaler(values, params['mean'], std_from_var(params['var']))

    reference_data = train_dataset.reference
    # STNN and the non-simple mode both work on paired point coordinates;
    # only the simple mode calls the distance functions
    use_point_pairs = is_need_STNN or not train_dataset.simple_distance

    sample_spatial = data[spatial_column].values
    reference_spatial = reference_data[spatial_column].values
    if use_point_pairs:
        predict_dataset.distances = pair_with_reference(sample_spatial, reference_spatial)
    else:
        predict_dataset.distances = spatial_fun(sample_spatial, reference_spatial)

    if temp_column is not None:
        sample_temporal = data[temp_column].values
        reference_temporal = reference_data[temp_column].values
        if use_point_pairs:
            predict_dataset.temporal = pair_with_reference(sample_temporal, reference_temporal)
        else:
            predict_dataset.temporal = temporal_fun(sample_temporal, reference_temporal)
        if not is_need_STNN:
            # without STNN the temporal part becomes an extra channel of the distance matrix
            if use_point_pairs:
                predict_dataset.distances = np.concatenate(
                    (predict_dataset.distances, predict_dataset.temporal), axis=2)
            else:
                predict_dataset.distances = np.concatenate(
                    (predict_dataset.distances[:, :, np.newaxis],
                     predict_dataset.temporal[:, :, np.newaxis]), axis=2)

    predict_dataset.distances = scale_like_training(
        predict_dataset.distances, train_dataset.distances_scale_param)
    # the temporal matrix of the training data is scaled as well, so apply the same
    # parameters here whenever the training dataset carries them
    temporal_params = getattr(train_dataset, "temporal_scale_param", None)
    if predict_dataset.temporal is not None and temporal_params:
        predict_dataset.temporal = scale_like_training(predict_dataset.temporal, temporal_params)

    # initialize dataloader for the predict dataset
    if max_size < 0:
        max_size = len(predict_dataset)
    predict_dataset.dataloader = DataLoader(
        predict_dataset, batch_size=max_size, shuffle=False)

    return predict_dataset
def load_dataset(directory, use_class=baseDataset):
    """
    Restore a dataset from a directory and attach a dataloader to it.

    :param directory: directory passed to the dataset's ``read`` method
    :param use_class: dataset class; it is instantiated without arguments
    :return: the restored dataset, with ``dataloader`` built from its own
        ``batch_size`` and ``shuffle`` attributes
    """
    restored = use_class()
    restored.read(directory)
    loader = DataLoader(restored, batch_size=restored.batch_size, shuffle=restored.shuffle)
    restored.dataloader = loader
    return restored