Source code for fvgp.kernels

import importlib
import warnings

import numpy as np
import scipy.sparse as sparse
from scipy.spatial import cKDTree
from scipy.spatial.distance import cdist


[docs] def squared_exponential_kernel(distance, length): """ Function for the squared exponential kernel. kernel = np.exp(-(distance ** 2) / (2.0 * (length ** 2))) Parameters ---------- distance : scalar or np.ndarray Distance between a set of points. length : scalar The length scale hyperparameters. Return ------ Kernel output : same as distance parameter. """ kernel = np.exp(-(distance ** 2) / (2.0 * (length ** 2))) return kernel
[docs] def squared_exponential_kernel_robust(distance, phi): """ Function for the squared exponential kernel (robust version) kernel = np.exp(-(distance ** 2) * (phi ** 2)) Parameters ---------- distance : scalar or np.ndarray Distance between a set of points. phi : scalar The length scale hyperparameters. Return ------ Kernel output : same as distance parameter. """ kernel = np.exp(-(distance ** 2) * (phi ** 2)) return kernel
[docs] def exponential_kernel(distance, length): """ Function for the exponential kernel. kernel = np.exp(-(distance) / (length)) Parameters ---------- distance : scalar or np.ndarray Distance between a set of points. length : scalar The length scale hyperparameters. Return ------ Kernel output : same as distance parameter. """ kernel = np.exp(-distance / length) return kernel
[docs] def exponential_kernel_robust(distance, phi): """ Function for the exponential kernel (robust version) kernel = np.exp(-(distance) * (phi**2)) Parameters ---------- distance : scalar or np.ndarray Distance between a set of points. phi : scalar The length scale hyperparameters. Return ------ Kernel output : same as distance parameter. """ kernel = np.exp(-(distance) * (phi ** 2)) return kernel
[docs] def matern_kernel_diff1(distance, length): """ Function for the Matern kernel, order of differentiability = 1. kernel = (1 + sqrt(3)*d/l) * exp(-sqrt(3)*d/l) Parameters ---------- distance : scalar or np.ndarray Distance between a set of points. length : scalar The length scale hyperparameters. Return ------ Kernel output : same as distance parameter. """ kernel = (1.0 + ((np.sqrt(3.0) * distance) / length)) * np.exp( -(np.sqrt(3.0) * distance) / length ) return kernel
[docs] def matern_kernel_diff1_grad(distance, dist_der): """ Derivative of the Matern-1 kernel with respect to the hyperparameters. kernel_der = -sqrt(3)*d * dist_der * exp(-sqrt(3)*d) Parameters ---------- distance : scalar or np.ndarray Distance between a set of points. dist_der : scalar or np.ndarray The derivative of the distance matrix. We assume here that the distance is a function of the hyperparameters. Return ------ Kernel output : same as distance parameter. """ a = (np.sqrt(3.0) * distance) dadl = np.sqrt(3.0) * dist_der ea = np.exp(-a) kernel_der = dadl * ea - (1.+a) * dadl * ea return kernel_der
[docs] def matern_kernel_diff1_robust(distance, phi): """ Function for the Matern kernel, order of differentiability = 1, robust version. kernel = (1 + sqrt(3)*d*phi**2) * exp(-sqrt(3)*d*phi**2) Parameters ---------- distance : scalar or np.ndarray Distance between a set of points. phi : scalar The length scale hyperparameters. Return ------ Kernel output : same as distance parameter. """ ##1/l --> phi**2 kernel = (1.0 + ((np.sqrt(3.0) * distance) * (phi ** 2))) * np.exp( -(np.sqrt(3.0) * distance) * (phi ** 2)) return kernel
[docs] def matern_kernel_diff2(distance, length): """ Function for the Matern kernel, order of differentiability = 2. kernel = (1 + sqrt(5)*d/l + 5*d**2/(3*l**2)) * exp(-sqrt(5)*d/l) Parameters ---------- distance : scalar or np.ndarray Distance between a set of points. length : scalar The length scale hyperparameters. Return ------ Kernel output : same as distance parameter. """ kernel = ( 1.0 + ((np.sqrt(5.0) * distance) / (length)) + ((5.0 * distance ** 2) / (3.0 * length ** 2)) ) * np.exp(-(np.sqrt(5.0) * distance) / length) return kernel
[docs] def matern_kernel_diff2_robust(distance, phi): """ Function for the Matern kernel, order of differentiability = 2, robust version. kernel = (1 + sqrt(5)*d*phi**2 + 15*d**2*phi**4) * exp(-sqrt(5)*d*phi**2) Parameters ---------- distance : scalar or np.ndarray Distance between a set of points. phi : scalar The length scale hyperparameters. Return ------ Kernel output : same as distance parameter. """ kernel = ( 1.0 + ((np.sqrt(5.0) * distance) * (phi ** 2)) + ((5.0 * distance ** 2) * (3.0 * phi ** 4)) ) * np.exp(-(np.sqrt(5.0) * distance) * (phi ** 2)) return kernel
[docs] def sparse_kernel(distance, radius): """ Function for a compactly supported kernel. Parameters ---------- distance : scalar or np.ndarray Distance between a set of points. radius : scalar Radius of support. Return ------ Kernel output : same as distance parameter. """ d = np.array(distance) d[d == 0.0] = 10e-6 d[d > radius] = radius kernel = (np.sqrt(2.0) / (3.0 * np.sqrt(np.pi))) * \ ((3.0 * (d / radius) ** 2 * np.log((d / radius) / (1 + np.sqrt(1.0 - (d / radius) ** 2)))) + ((2.0 * (d / radius) ** 2 + 1.0) * np.sqrt(1.0 - (d / radius) ** 2))) return kernel
[docs] def periodic_kernel(distance, length, p): """ Function for a periodic kernel. kernel = np.exp(-(2.0/length**2)*(np.sin(np.pi*distance/p)**2)) Parameters ---------- distance : scalar or np.ndarray Distance between a set of points. length : scalar Length scale. p : scalar Period. Return ------ Kernel output : same as distance parameter. """ kernel = np.exp(-(2.0 / length ** 2) * (np.sin(np.pi * distance / p) ** 2)) return kernel
[docs] def linear_kernel(x1, x2, hp1, hp2, hp3): """ Function for a linear kernel. kernel = hp1 + (hp2*(x1-hp3)*(x2-hp3)) Parameters ---------- x1 : float Point 1. x2 : float Point 2. hp1 : float Hyperparameter. hp2 : float Hyperparameter. hp3 : float Hyperparameter. Return ------ Kernel output : same as distance parameter. """ kernel = hp1 + (hp2 * (x1 - hp3) * (x2 - hp3)) return kernel
[docs] def dot_product_kernel(x1, x2, hp, matrix): """ Function for a dot-product kernel. kernel = hp + x1.T @ matrix @ x2 Parameters ---------- x1 : np.ndarray Point 1. x2 : np.ndarray Point 2. hp : float Offset hyperparameter. matrix : np.ndarray PSD matrix defining the inner product. Return ------ Kernel output : same as distance parameter. """ kernel = hp + x1.T @ matrix @ x2 return kernel
[docs] def polynomial_kernel(x1, x2, p): """ Polynomial kernel: ``(1 + x1ᵀ x2) ** p``. Parameters ---------- x1 : np.ndarray Point 1, shape (D,). x2 : np.ndarray Point 2, shape (D,). p : float Degree hyperparameter. Returns ------- kernel : float Kernel value. """ kernel = (1.0 + x1.T @ x2) ** p return kernel
[docs] def wendland_kernel(d): """ Function for the Wendland kernel with a given distance matrix. The Wendland kernel is compactly supported, leading to sparse covariance matrices. Parameters ---------- d : np.ndarray Distance matrix. Return ------ Covariance matrix : np.ndarray """ d[d > 1.] = 1. kernel = (1. - d) ** 8 * (32. * d ** 3 + 25. * d ** 2 + 8. * d + 1.) return kernel
[docs] def wendland_anisotropic(x1, x2, hyperparameters): """ Function for the Wendland kernel. The Wendland kernel is compactly supported, leading to sparse covariance matrices. Parameters ---------- x1 : np.ndarray Numpy array of shape (U x D). x2 : np.ndarray Numpy array of shape (V x D). hyperparameters : np.ndarray Array of hyperparameters. For this kernel we need D + 1 hyperparameters. Return ------ Covariance matrix : np.ndarray """ hps = hyperparameters distance_matrix = np.zeros((len(x1), len(x2))) for i in range(len(x1[0])): distance_matrix += abs(np.subtract.outer(x1[:, i], x2[:, i]) / hps[1 + i]) ** 2 d = np.sqrt(distance_matrix) d[d > 1.] = 1. kernel = (1. - d) ** 8 * (32. * d ** 3 + 25. * d ** 2 + 8. * d + 1.) return hps[0] * kernel
[docs] def non_stat_kernel(x1, x2, x0, w, l): """ Non-stationary kernel. kernel = g(x1) g(x2) Parameters ---------- x1 : np.ndarray Numpy array of shape (U x D). x2 : np.ndarray Numpy array of shape (V x D). x0 : np.ndarray Numpy array of the basis function locations. w : np.ndarray 1d np.ndarray of weights. len(w) = len(x0). l : float Width measure of the basis functions. Return ------ Covariance matrix : np.ndarray """ non_stat = np.outer(_g(x1, x0, w, l), _g(x2, x0, w, l)) return non_stat
[docs] def non_stat_kernel_gradient(x1, x2, x0, w, l): """ Non-stationary kernel gradient. kernel = g(x1) g(x2) Parameters ---------- x1 : np.ndarray Numpy array of shape (U x D). x2 : np.ndarray Numpy array of shape (V x D). x0 : np.ndarray Numpy array of the basis function locations. w : np.ndarray 1d np.ndarray of weights. len(w) = len(x0). l : float Width measure of the basis functions. Return ------ Covariance matrix : np.ndarray """ dkdw = (np.einsum('ij,k->ijk', _dgdw(x1, x0, w, l), _g(x2, x0, w, l)) + np.einsum('ij,k->ikj', _dgdw(x2, x0, w, l), _g(x1, x0, w, l))) dkdl = (np.outer(_dgdl(x1, x0, w, l), _g(x2, x0, w, l)) + np.outer(_dgdl(x2, x0, w, l), _g(x1, x0, w, l)).T) res = np.empty((len(w) + 1, len(x1), len(x2))) res[0:len(w)] = dkdw res[-1] = dkdl return res
[docs] def get_distance_matrix(x1, x2): """ Function to calculate the pairwise distance matrix of points in x1 and x2. Parameters ---------- x1 : np.ndarray Numpy array of shape (U x D). x2 : np.ndarray Numpy array of shape (V x D). Return ------ distance matrix : np.ndarray """ d = np.zeros((len(x1), len(x2))) for i in range(x1.shape[1]): d += (x1[:, i].reshape(-1, 1) - x2[:, i]) ** 2 return np.sqrt(d)
[docs] def get_anisotropic_distance_matrix(x1, x2, hps): """ Function to calculate the pairwise axial-anisotropic distance matrix of points in x1 and x2. Parameters ---------- x1 : np.ndarray Numpy array of shape (U x D). x2 : np.ndarray Numpy array of shape (V x D). hps : np.ndarray 1d array of values. The diagonal of the metric tensor describing the axial anisotropy. Return ------ distance matrix : np.ndarray """ d = np.zeros((len(x1), len(x2))) for i in range(len(x1[0])): d += abs(np.subtract.outer(x1[:, i], x2[:, i]) / hps[i]) ** 2 return np.sqrt(d)
def _g(x, x0, w, l): d = get_distance_matrix(x, x0) e = np.exp(-(d ** 2) / l) return np.sum(w * e, axis=1) def _dgdw(x, x0, w, l): d = get_distance_matrix(x, x0) e = np.exp(-(d ** 2) / l).T return e def _dgdl(x, x0, w, l): d = get_distance_matrix(x, x0) e = np.exp(-(d ** 2) / l) return np.sum(w * e * (d ** 2 / l ** 2), axis=1)
[docs] def wendland_anisotropic_gp2Scale_cpu(x1, x2, hps): """ Function for the anisotropic Wendland kernel computed on the CPU. The Wendland kernel is compactly supported, leading to sparse covariance matrices. Parameters ---------- x1 : np.ndarray Numpy array of shape (U x D). x2 : np.ndarray Numpy array of shape (V x D). hps : np.ndarray Array of hyperparameters. For this kernel we need D + 1 hyperparameters. Return ------ Covariance matrix : np.ndarray """ distance_matrix = np.zeros((len(x1), len(x2))) for i in range(len(x1[0])): distance_matrix += (np.subtract.outer(x1[:, i], x2[:, i]) / hps[1 + i]) ** 2 d = np.sqrt(distance_matrix) d[d > 1.] = 1. return _wendland_anisotropic_polynomial(d, hps[0])
def _wendland_anisotropic_polynomial(d, amplitude): return amplitude * (1. - d) ** 8 * (32. * d ** 3 + 25. * d ** 2 + 8. * d + 1.) def _get_distance_matrix_gpu(x1, x2, device, hps): # pragma: no cover import torch d = torch.zeros((len(x1), len(x2))).to(device, dtype=torch.float32) for i in range(x1.shape[1]): d += ((x1[:, i].reshape(-1, 1) - x2[:, i]) / hps[1 + i]) ** 2 return torch.sqrt(d)
[docs] def wendland_anisotropic_gp2Scale_gpu(x1, x2, hps): # pragma: no cover """ Function for the anisotropic Wendland kernel computed on the GPU. Picks the first usable GPU backend (torch CUDA or MPS, else cupy); falls back to the CPU implementation with a UserWarning if no GPU backend is available. The Wendland kernel is compactly supported, leading to sparse covariance matrices. Parameters ---------- x1 : np.ndarray Numpy array of shape (U x D). x2 : np.ndarray Numpy array of shape (V x D). hps : np.ndarray Array of hyperparameters. For this kernel we need D + 1 hyperparameters. Return ------ Covariance matrix : np.ndarray """ engine = _get_default_gpu_engine() if engine == "torch": import torch device = _get_torch_gpu_device() x1_dev = torch.as_tensor(x1, device=device, dtype=torch.float32) x2_dev = torch.as_tensor(x2, device=device, dtype=torch.float32) hps_dev = torch.as_tensor(hps, device=device, dtype=torch.float32) d = _get_distance_matrix_gpu(x1_dev, x2_dev, device, hps_dev) d = torch.clamp(d, max=1.0) kernel = hps_dev[0] * (1. - d) ** 8 * (32. * d ** 3 + 25. * d ** 2 + 8. * d + 1.) return kernel.detach().cpu().numpy() if engine == "cupy": import cupy as cp x1_dev = cp.asarray(x1, dtype=cp.float32) x2_dev = cp.asarray(x2, dtype=cp.float32) hps_dev = cp.asarray(hps, dtype=cp.float32) d = cp.zeros((len(x1), len(x2)), dtype=cp.float32) for i in range(x1.shape[1]): d += ((x1_dev[:, i].reshape(-1, 1) - x2_dev[:, i]) / hps_dev[1 + i]) ** 2 d = cp.sqrt(d) d = cp.minimum(d, cp.float32(1.0)) kernel = hps_dev[0] * (1. - d) ** 8 * (32. * d ** 3 + 25. * d ** 2 + 8. * d + 1.) return cp.asnumpy(kernel) warnings.warn( "No usable GPU backend was found for wendland_anisotropic_gp2Scale_gpu; " "falling back to the CPU Wendland implementation.", stacklevel=2, ) return wendland_anisotropic_gp2Scale_cpu(x1, x2, hps)
# -------------------------------------------------------------------------- # GPU backend selection # -------------------------------------------------------------------------- def _get_torch_gpu_device(): # pragma: no cover if importlib.util.find_spec("torch") is None: return None import torch if torch.cuda.is_available(): device_index = torch.cuda.current_device() if torch.cuda.device_count() > 0 else 0 return torch.device(f"cuda:{device_index}") mps_backend = getattr(torch.backends, "mps", None) if mps_backend is not None and torch.backends.mps.is_available(): return torch.device("mps") return None def _cupy_gpu_available(): # pragma: no cover if importlib.util.find_spec("cupy") is None: return False try: import cupy as cp return cp.cuda.runtime.getDeviceCount() > 0 except Exception: return False def _get_default_gpu_engine(): # pragma: no cover if _get_torch_gpu_device() is not None: return "torch" if _cupy_gpu_available(): return "cupy" return None # -------------------------------------------------------------------------- # Support-aware (sparse-COO) anisotropic Wendland kernels for gp2Scale. # # These mirror wendland_anisotropic_gp2Scale_{cpu,gpu} but return a scipy.sparse # COO block directly, built from an output-sensitive cKDTree neighbor search in # the whitened (anisotropy-corrected) coordinates instead of a dense all-pairs # evaluation. Intended for gp2Scale workflows where the data has spatial # locality and batches are sorted by a progression column. # -------------------------------------------------------------------------- _GP2SCALE_SPARSE_SORT_WARNING_EMITTED = False def _warn_gp2scale_sparse_kernel_sorting(): global _GP2SCALE_SPARSE_SORT_WARNING_EMITTED if _GP2SCALE_SPARSE_SORT_WARNING_EMITTED: return warnings.warn( "The support-aware gp2Scale Wendland kernels work best when gp2Scale batches preserve locality. " "If your data have a progression column, such as day in a climate dataset, sort by that column before batching.", stacklevel=2, ) _GP2SCALE_SPARSE_SORT_WARNING_EMITTED = True def _wendland_triplets_from_neighbor_lists(neighbor_lists): row_parts = [] col_parts = [] for row_idx, neighbors in enumerate(neighbor_lists): if not neighbors: continue cols = np.asarray(neighbors, dtype=np.int64) row_parts.append(np.full(cols.size, row_idx, dtype=np.int64)) col_parts.append(cols) if not row_parts: return ( np.empty(0, dtype=np.int64), np.empty(0, dtype=np.int64), ) return np.concatenate(row_parts), np.concatenate(col_parts) def _empty_gp2scale_sparse_block(x1, x2): return sparse.coo_matrix((len(x1), len(x2))) def _whiten_gp2scale_points(x1, x2, hps): scales = np.asarray(hps[1:], dtype=float) z1 = np.asarray(x1, dtype=float) / scales z2 = np.asarray(x2, dtype=float) / scales return z1, z2 def _gp2scale_whitened_block_distance(z1, z2): mins_1 = np.min(z1, axis=0) maxs_1 = np.max(z1, axis=0) mins_2 = np.min(z2, axis=0) maxs_2 = np.max(z2, axis=0) gap = np.maximum(0.0, np.maximum(mins_1 - maxs_2, mins_2 - maxs_1)) return np.linalg.norm(gap) def _wendland_support_aware_cpu_triplets(x1, x2, hps): """ Output-sensitive COO triplets for the anisotropic Wendland gp2Scale kernel. The support condition is an ellipsoid in the original coordinates and a unit ball in whitened coordinates, so block assembly can be written as a radius search rather than dense all-pairs evaluation. """ if len(x1) == 0 or len(x2) == 0: return ( np.empty(0, dtype=float), np.empty(0, dtype=np.int64), np.empty(0, dtype=np.int64), ) z1, z2 = _whiten_gp2scale_points(x1, x2, hps) if _gp2scale_whitened_block_distance(z1, z2) > 1.0: return ( np.empty(0, dtype=float), np.empty(0, dtype=np.int64), np.empty(0, dtype=np.int64), ) tree1 = cKDTree(z1) tree2 = cKDTree(z2) neighbor_lists = tree1.query_ball_tree(tree2, r=1.0) rows, cols = _wendland_triplets_from_neighbor_lists(neighbor_lists) if rows.size == 0: return ( np.empty(0, dtype=float), np.empty(0, dtype=np.int64), np.empty(0, dtype=np.int64), ) diff = z2[cols] - z1[rows] dist_sq = np.sum(diff * diff, axis=1) inside_mask = dist_sq <= 1.0 if not np.all(inside_mask): rows = rows[inside_mask] cols = cols[inside_mask] dist_sq = dist_sq[inside_mask] if rows.size == 0: return ( np.empty(0, dtype=float), np.empty(0, dtype=np.int64), np.empty(0, dtype=np.int64), ) distances = np.sqrt(np.minimum(dist_sq, 1.0)) values = _wendland_anisotropic_polynomial(distances, hps[0]) nonzero_mask = values != 0.0 if not np.all(nonzero_mask): rows = rows[nonzero_mask] cols = cols[nonzero_mask] values = values[nonzero_mask] if rows.size == 0: return ( np.empty(0, dtype=float), np.empty(0, dtype=np.int64), np.empty(0, dtype=np.int64), ) return values, rows, cols
[docs] def wendland_anisotropic_gp2Scale_cpu_sparse(x1, x2, hps): """ Support-aware anisotropic Wendland kernel for gp2Scale. Preserves the usual kernel interface but performs block-local support checks and radius-search assembly internally, returning a sparse COO matrix directly. Intended for gp2Scale workflows where the batchwise kernel is assembled on sparse blocks. Sort batches by a locality-preserving column (e.g. time) for best performance. """ _warn_gp2scale_sparse_kernel_sorting() values, rows, cols = _wendland_support_aware_cpu_triplets(x1, x2, hps) if values.size == 0: return _empty_gp2scale_sparse_block(x1, x2) return sparse.coo_matrix((values, (rows, cols)), shape=(len(x1), len(x2)))
def _wendland_support_aware_gpu_triplets(x1, x2, hps): # pragma: no cover """ Output-sensitive COO triplets for the anisotropic Wendland gp2Scale kernel with GPU-backed distance/polynomial evaluation when a usable GPU backend is available. Neighbor search remains host-side. """ if len(x1) == 0 or len(x2) == 0: return ( np.empty(0, dtype=float), np.empty(0, dtype=np.int64), np.empty(0, dtype=np.int64), ) z1, z2 = _whiten_gp2scale_points(x1, x2, hps) if _gp2scale_whitened_block_distance(z1, z2) > 1.0: return ( np.empty(0, dtype=float), np.empty(0, dtype=np.int64), np.empty(0, dtype=np.int64), ) tree1 = cKDTree(z1) tree2 = cKDTree(z2) neighbor_lists = tree1.query_ball_tree(tree2, r=1.0) rows, cols = _wendland_triplets_from_neighbor_lists(neighbor_lists) if rows.size == 0: return ( np.empty(0, dtype=float), np.empty(0, dtype=np.int64), np.empty(0, dtype=np.int64), ) diff = z2[cols] - z1[rows] engine = _get_default_gpu_engine() if engine == "torch": import torch device = _get_torch_gpu_device() diff_dev = torch.as_tensor(diff, device=device, dtype=torch.float32) dist_sq_dev = torch.sum(diff_dev * diff_dev, dim=1) inside_mask = (dist_sq_dev <= 1.0).detach().cpu().numpy() if not np.all(inside_mask): rows = rows[inside_mask] cols = cols[inside_mask] dist_sq_dev = dist_sq_dev[inside_mask] if rows.size == 0: return ( np.empty(0, dtype=float), np.empty(0, dtype=np.int64), np.empty(0, dtype=np.int64), ) distances = torch.sqrt(torch.clamp(dist_sq_dev, max=1.0)) values = (_wendland_anisotropic_polynomial(distances, float(hps[0]))).detach().cpu().numpy() elif engine == "cupy": import cupy as cp diff_dev = cp.asarray(diff, dtype=cp.float32) dist_sq_dev = cp.sum(diff_dev * diff_dev, axis=1) inside_mask = cp.asnumpy(dist_sq_dev <= cp.float32(1.0)) if not np.all(inside_mask): rows = rows[inside_mask] cols = cols[inside_mask] dist_sq_dev = dist_sq_dev[inside_mask] if rows.size == 0: return ( np.empty(0, dtype=float), np.empty(0, dtype=np.int64), np.empty(0, dtype=np.int64), ) distances = cp.sqrt(cp.minimum(dist_sq_dev, cp.float32(1.0))) values = cp.asnumpy(_wendland_anisotropic_polynomial(distances, float(hps[0]))) else: warnings.warn( "No usable GPU backend was found for the support-aware GPU Wendland kernel; " "falling back to the CPU sparse Wendland kernel.", stacklevel=2, ) return _wendland_support_aware_cpu_triplets(x1, x2, hps) nonzero_mask = values != 0.0 if not np.all(nonzero_mask): rows = rows[nonzero_mask] cols = cols[nonzero_mask] values = values[nonzero_mask] return values, rows, cols
[docs] def wendland_anisotropic_gp2Scale_gpu_sparse(x1, x2, hps): # pragma: no cover """ GPU-backed support-aware anisotropic Wendland kernel for gp2Scale. Neighbor discovery remains host-side; only the distance/polynomial evaluation on the discovered support graph is GPU-backed when a usable backend (torch CUDA/MPS or cupy) is available. Falls back to the CPU sparse variant with a UserWarning otherwise. """ _warn_gp2scale_sparse_kernel_sorting() values, rows, cols = _wendland_support_aware_gpu_triplets(x1, x2, hps) if values.size == 0: return _empty_gp2scale_sparse_block(x1, x2) return sparse.coo_matrix((values, (rows, cols)), shape=(len(x1), len(x2)))
[docs] def wasserstein_1d(a, b): """ The 1d Wasserstein distance. Parameters ---------- a : np.ndarray 1d Numpy array. Input distribution. b : np.ndarray 1d Numpy array. Input distribution. Return ------ Wasserstein distance : float """ a = a / np.sum(a) b = b / np.sum(b) a_sorted = np.sort(a) b_sorted = np.sort(b) return np.mean(np.abs(a_sorted - b_sorted))
[docs] def wasserstein_1d_outer_vec(a, b): """ Vectorized pairwise 1-D Wasserstein distance between all rows of ``a`` and ``b``. Parameters ---------- a : np.ndarray Array of shape (M, K); each row is an unnormalized 1-D measure. b : np.ndarray Array of shape (N, K); each row is an unnormalized 1-D measure. Returns ------- W : np.ndarray Distance matrix of shape (M, N). """ a = a / a.sum(axis=1, keepdims=True) b = b / b.sum(axis=1, keepdims=True) a_sorted = np.sort(a, axis=1) b_sorted = np.sort(b, axis=1) s = a_sorted[:, None, :] - b_sorted[None, :, :] return np.mean(np.abs(s), axis=2)
[docs] def bump(d, r, beta=1., ampl=1.): """ Smooth compactly-supported bump function evaluated over a distance array. Parameters ---------- d : np.ndarray Distance values (non-negative). r : float Support radius; the function is zero for ``d >= r``. beta : float, optional Sharpness parameter (default 1). ampl : float, optional Amplitude scale (default 1). Returns ------- bump : np.ndarray Bump values, same shape as ``d``. """ #x_new = x - x0 #d = np.linalg.norm(x_new, axis = 1) a = np.zeros(d.shape) a = 1.0 - (d**2/r**2) i = np.where(a > 0.0) bump = np.zeros(a.shape) e = np.exp((-beta/a[i])+beta) bump[i] = ampl * e return bump
[docs] def sle_kernel(x1, x2, hps, args): """ Sparse-Landmark-Embedding (SLE) kernel. Embeds points via a bump-function basis centered on the training set, then applies a squared-exponential kernel on the embedded space. Requires ``args["x_data"]`` (the training locations) to construct the basis. Parameters ---------- x1 : np.ndarray Query points, shape (N1, D). x2 : np.ndarray Query points, shape (N2, D). hps : np.ndarray Hyperparameters ``[amplitude, radius, beta, length_scale]``. args : dict Must contain key ``"x_data"`` with the training point locations. Returns ------- K : np.ndarray Covariance matrix of shape (N1, N2). """ # Distance to each training point (using all dimensions) x_data = args["x_data"] d1 = get_distance_matrix(x1, x_data) # (n1, m) - can be geodesic/non-cnd! d2 = get_distance_matrix(x2, x_data) # (n2, m) # Apply bump to create embedding phi_x1 = bump(d1, hps[1], beta=hps[2], ampl=1.0) # (n1, m) phi_x2 = bump(d2, hps[1], beta=hps[2], ampl=1.0) # (n2, m) # RBF on embeddings D_mat = cdist(phi_x1, phi_x2, metric='euclidean') return hps[0] * np.exp(-D_mat ** 2 / hps[3])