"""Utility classes for gnnwr: OLS weight estimation, GNNWR/GTNNWR diagnostics, and folium-based visualization."""

import math
import statsmodels.api as sm
import pandas as pd
import torch
import warnings
import copy
import folium
from folium.plugins import HeatMap, MarkerCluster
import branca


class OLS:
    """
    OLS is the class to calculate the OLS weights of data.
    Get the weights via ``object.params``.

    :param dataset: input data (a pandas DataFrame usable in a statsmodels formula)
    :param xName: the independent variables' columns
    :param yName: the dependent variable's column (only the first entry is used)
    """

    def __init__(self, dataset, xName: list, yName: list):
        self.__dataset = dataset
        self.__xName = xName
        self.__yName = yName
        # Patsy formula "y ~ x1 + x2 + ..."; the intercept is added implicitly.
        self.__formula = yName[0] + '~' + '+'.join(xName)
        self.__fit = sm.formula.ols(self.__formula, dataset).fit()
        # statsmodels orders fitted params as [Intercept, x1, ..., xk];
        # reorder to [x1, ..., xk, Intercept] so the intercept comes last.
        # BUGFIX: use .iloc for positional access — plain `params[0]` on a
        # string-labeled Series relies on the deprecated integer fallback
        # and fails on modern pandas.
        coefficients = self.__fit.params
        self.params = coefficients.iloc[1:].tolist()
        self.params.append(coefficients.iloc[0])
class DIAGNOSIS:
    # TODO: more diagnostic methods
    """
    DIAGNOSIS is the class to calculate the diagnoses of GNNWR/GTNNWR.

    :param weight: output of the neural network (assumed shape (n, k), one
                   weight per sample per variable — TODO confirm against caller)
    :param x_data: the independent variables (n, k)
    :param y_data: the dependent variables (assumed column vector (n, 1) —
                   required by the torch.mm calls below)
    :param y_pred: output of the GNNWR/GTNNWR (same shape as y_data)
    """

    def __init__(self, weight, x_data, y_data, y_pred):
        self.__weight = weight
        self.__x_data = x_data
        self.__y_data = y_data
        self.__y_pred = y_pred
        # n samples, k independent variables
        self.__n = len(y_data)
        self.__k = len(x_data[0])
        self.__residual = y_data - y_pred
        # NOTE(review): despite the name, this is the *explained* (regression)
        # sum of squares, (y_pred - mean(y))^2, not the residual sum of
        # squares; AIC/AICc below divide it by n as if it were an error
        # variance — verify intended.
        self.__ssr = torch.sum((y_pred - torch.mean(y_data)) ** 2)
        # (X^T X)^-1 X^T — shared component of the OLS projection.
        self.__hat_com = torch.mm(torch.linalg.inv(
            torch.mm(self.__x_data.transpose(-2, -1), self.__x_data)), self.__x_data.transpose(-2, -1))
        # OLS hat matrix H = X (X^T X)^-1 X^T.
        self.__ols_hat = torch.mm(self.__x_data, self.__hat_com)
        # Tile X n times so each sample gets its own (n, k) design matrix copy.
        x_data_tile = x_data.repeat(self.__n, 1)
        x_data_tile = x_data_tile.view(self.__n, self.__n, -1)
        x_data_tile_t = x_data_tile.transpose(1, 2)
        # Per-sample diagonal weight matrices (n, k, k) from the network output.
        gtweight_3d = torch.diag_embed(self.__weight)
        # Batched weighted projection: W_i (X^T X)^-1 X^T for every sample i.
        hatS_temp = torch.matmul(gtweight_3d,
                                 torch.matmul(torch.inverse(torch.matmul(x_data_tile_t, x_data_tile)),
                                              x_data_tile_t))
        # Row i of the hat matrix: x_i^T W_i (X^T X)^-1 X^T.
        hatS = torch.matmul(x_data.view(-1, 1, x_data.size(1)), hatS_temp)
        hatS = hatS.view(-1, self.__n)
        # Hat matrix S of the weighted fit; its trace is the effective
        # number of parameters used by AICc.
        self.__hat = hatS
        self.__S = torch.trace(self.__hat)

    def hat(self):
        """
        :return: hat matrix of the weighted fit (n x n tensor)
        """
        return self.__hat

    def F1_GNN(self):
        """
        F1 test comparing the GNNWR fit against the plain OLS fit.

        :return: F1-test statistic
        """
        # Effective residual degrees of freedom of the weighted fit:
        # n - 2 tr(S) + tr(S^T S).
        k1 = self.__n - 2 * torch.trace(self.__hat) + \
            torch.trace(torch.mm(self.__hat.transpose(-2, -1), self.__hat))
        # Residual degrees of freedom of the OLS fit.
        k2 = self.__n - self.__k - 1
        rss_olr = torch.sum(
            (torch.mean(self.__y_data) - torch.mm(self.__ols_hat, self.__y_data)) ** 2)
        return self.__ssr / k1 / (rss_olr / k2)

    def AIC(self):
        """
        :return: AIC of the result
        """
        # n * ln(2*pi*sigma^2) + n + k, with sigma^2 estimated as __ssr / n.
        return self.__n * (math.log(self.__ssr / self.__n * 2 * math.pi, math.e)) + self.__n + self.__k

    def AICc(self):
        """
        :return: corrected AIC (AICc), using tr(S) as the effective
                 number of parameters
        """
        return self.__n * (math.log(self.__ssr / self.__n * 2 * math.pi, math.e) + (self.__n + self.__S) / (
            self.__n - self.__S - 2))

    def R2(self):
        """
        :return: R2 of the result (1 - RSS/TSS)
        """
        return 1 - torch.sum(self.__residual ** 2) / torch.sum((self.__y_data - torch.mean(self.__y_data)) ** 2)

    def Adjust_R2(self):
        """
        :return: adjusted R2 of the result (penalized for k regressors)
        """
        return 1 - (1 - self.R2()) * (self.__n - 1) / (self.__n - self.__k - 1)

    def RMSE(self):
        """
        :return: RMSE of the result
        """
        return torch.sqrt(torch.sum(self.__residual ** 2) / self.__n)
class Visualize:
    """
    Visualize renders the datasets and regression weights of a trained
    GNNWR-style model on interactive folium maps (AMap/Gaode tiles).

    :param data: trained model object; must have a ``_use_gpu`` attribute and
                 expose ``_train_dataset``/``_valid_dataset``/``_test_dataset``
                 (each with ``.dataframe``, ``.x_column``, ``.y_column`` and,
                 when ``lon_lat_columns`` is omitted, ``.spatial_column``)
                 plus ``result_data``
    :param lon_lat_columns: [longitude column, latitude column]; defaults to
                            the training dataset's ``spatial_column``
    :param zoom: initial zoom level for the generated maps
    :raises ValueError: if ``data`` does not look like a GNNWR model
    """

    def __init__(self, data, lon_lat_columns=None, zoom=4):
        self.__raw_data = data
        # AMap (Gaode) raster tiles; "高德" is the attribution shown on the map.
        self.__tiles = 'https://wprd01.is.autonavi.com/appmaptile?x={x}&y={y}&z={z}&lang=en&size=1&scl=1&style=7'
        self.__zoom = zoom
        # Duck-typed check that `data` is a trained GNNWR/GTNNWR model.
        if not hasattr(self.__raw_data, '_use_gpu'):
            raise ValueError("given data is not instance of GNNWR")
        self._train_dataset = self.__raw_data._train_dataset.dataframe
        self._valid_dataset = self.__raw_data._valid_dataset.dataframe
        self._test_dataset = self.__raw_data._test_dataset.dataframe
        self._result_data = self.__raw_data.result_data
        self._all_data = pd.concat([self._train_dataset, self._valid_dataset, self._test_dataset])
        if lon_lat_columns is None:
            warnings.warn("lon_lat columns are not given. Using the spatial columns in dataset")
            # BUGFIX: spatial_column is an attribute of the dataset object,
            # not of its dataframe (consistent with x_column / y_column below).
            self._spatial_column = self.__raw_data._train_dataset.spatial_column
        else:
            self._spatial_column = lon_lat_columns
        self.__lon_column = self._spatial_column[0]
        self.__lat_column = self._spatial_column[1]
        self.__center_lon = self._all_data[self.__lon_column].mean()
        self.__center_lat = self._all_data[self.__lat_column].mean()
        self._x_column = data._train_dataset.x_column
        self._y_column = data._train_dataset.y_column
        self.__map = folium.Map(location=[self.__center_lat, self.__center_lon],
                                zoom_start=zoom, tiles=self.__tiles, attr="高德")

    def __colormap(self, colors, vmin, vmax, steps):
        # Build a stepped colormap over [vmin, vmax]: user-supplied colors if
        # given, otherwise the YlOrRd_09 linear palette.
        if len(colors) > 0:
            return branca.colormap.LinearColormap(colors=colors, vmin=vmin, vmax=vmax).to_step(steps)
        return branca.colormap.linear.YlOrRd_09.scale(vmin, vmax).to_step(steps)

    def display_dataset(self, name="all", y_column=None, colors=None, steps=20, vmin=None, vmax=None):
        """
        Plot one of the held datasets as circle markers colored by ``y_column``.

        :param name: which dataset: 'all', 'train', 'valid' or 'test'
        :param y_column: column used for coloring; defaults to the first
                         dependent-variable column
        :param colors: explicit colormap colors (empty/None -> YlOrRd_09)
        :param steps: number of discrete colormap steps
        :param vmin: color-scale lower bound (defaults to the column minimum)
        :param vmax: color-scale upper bound (defaults to the column maximum)
        :return: a folium.Map with one CircleMarker per row and a legend
        :raises ValueError: if ``name`` is not a known dataset name
        """
        if colors is None:
            colors = []
        if y_column is None:
            warnings.warn("y_column is not given. Using the first y_column in dataset")
            y_column = self._y_column[0]
        datasets = {
            'all': self._all_data,
            'train': self._train_dataset,
            'valid': self._valid_dataset,
            'test': self._test_dataset,
        }
        if name not in datasets:
            raise ValueError("name is not included in 'all','train','valid','test'")
        dst = datasets[name]
        dst_min = dst[y_column].min() if vmin is None else vmin
        dst_max = dst[y_column].max() if vmax is None else vmax
        res = folium.Map(location=[self.__center_lat, self.__center_lon],
                         zoom_start=self.__zoom, tiles=self.__tiles, attr="高德")
        colormap = self.__colormap(colors, dst_min, dst_max, steps)
        for _, row in dst.iterrows():
            folium.CircleMarker(location=(row[self.__lat_column], row[self.__lon_column]),
                                radius=7,
                                color=colormap.rgb_hex_str(row[y_column]),
                                fill=True,
                                fill_opacity=1,
                                popup="""
                                longitude:{}
                                latitude:{}
                                {}:{}
                                """.format(row[self.__lon_column], row[self.__lat_column],
                                           y_column, row[y_column])
                                ).add_to(res)
        res.add_child(colormap)
        return res

    def weights_heatmap(self, data_column, colors=None, steps=20, vmin=None, vmax=None):
        """
        Plot a heat map of one regression-weight column of the result data.

        :param data_column: column of ``result_data`` to visualize
        :param colors: explicit colormap colors (empty/None -> YlOrRd_09)
        :param steps: number of discrete colormap/gradient steps
        :param vmin: color-scale lower bound (defaults to the column minimum)
        :param vmax: color-scale upper bound (defaults to the column maximum)
        :return: a folium.Map with a HeatMap layer and a legend
        """
        if colors is None:
            colors = []
        res = folium.Map(location=[self.__center_lat, self.__center_lon],
                         zoom_start=self.__zoom, tiles=self.__tiles, attr="高德")
        dst = self._result_data
        dst_min = dst[data_column].min() if vmin is None else vmin
        dst_max = dst[data_column].max() if vmax is None else vmax
        data = [[row[self.__lat_column], row[self.__lon_column], row[data_column]]
                for _, row in dst.iterrows()]
        colormap = self.__colormap(colors, dst_min, dst_max, steps)
        gradient_map = {}
        span = dst_max - dst_min
        for i in range(steps):
            frac = i / steps
            # BUGFIX: the colormap is scaled to [dst_min, dst_max] while the
            # HeatMap gradient keys are fractions in [0, 1]; sample the
            # colormap in data units, not at the raw fraction, so the
            # legend and the heat layer agree for any data range.
            gradient_map[frac] = colormap.rgb_hex_str(dst_min + span * frac)
        colormap.add_to(res)
        HeatMap(data=data, gradient=gradient_map, radius=10).add_to(res)
        return res

    def dot_map(self, data, lon_column, lat_column, y_column, zoom=4, colors=None, steps=20, vmin=None, vmax=None):
        """
        Plot an arbitrary dataframe as circle markers colored by ``y_column``.

        :param data: dataframe to plot (need not belong to the model)
        :param lon_column: longitude column name in ``data``
        :param lat_column: latitude column name in ``data``
        :param y_column: column used for marker coloring
        :param zoom: initial zoom level of the returned map
        :param colors: explicit colormap colors (empty/None -> YlOrRd_09)
        :param steps: number of discrete colormap steps
        :param vmin: color-scale lower bound (defaults to the column minimum)
        :param vmax: color-scale upper bound (defaults to the column maximum)
        :return: a folium.Map with one CircleMarker per row and a legend
        """
        if colors is None:
            colors = []
        center_lon = data[lon_column].mean()
        center_lat = data[lat_column].mean()
        dst_min = data[y_column].min() if vmin is None else vmin
        dst_max = data[y_column].max() if vmax is None else vmax
        res = folium.Map(location=[center_lat, center_lon],
                         zoom_start=zoom, tiles=self.__tiles, attr="高德")
        colormap = self.__colormap(colors, dst_min, dst_max, steps)
        for _, row in data.iterrows():
            folium.CircleMarker(location=[row[lat_column], row[lon_column]],
                                radius=7,
                                color=colormap.rgb_hex_str(row[y_column]),
                                fill=True,
                                fill_opacity=1,
                                popup="""
                                longitude:{}
                                latitude:{}
                                {}:{}
                                """.format(row[lon_column], row[lat_column], y_column, row[y_column])
                                ).add_to(res)
        colormap.add_to(res)
        return res