Source code for wordviz.loading

import json
import os
import shutil
import tempfile
import urllib.request
import zipfile
from pathlib import Path

import numpy as np
from gensim.models import KeyedVectors
from gensim.models.fasttext import load_facebook_model
from gensim.scripts.glove2word2vec import glove2word2vec


class EmbeddingLoader:
    """
    Loads word or sentence embedding.

    Attributes
    ----------
    embeddings_raw : Any
        KeyedVectors format for static embeddings (None for embeddings
        loaded through ``load_contextual``).
    embeddings : np.ndarray
        Array of embeddings
    tokens : list of str
        Representative elements for the embeddings in natural language
        (words, sentences, or other elements to visualize)
    dimension : int
        Dimensionality of the embeddings.
    type : str
        Type of embedding
        - 'word': word embeddings
        - 'sentence': Sentence/document/passage embeddings
        - 'word_context': Word embeddings in different contexts
        - 'custom': User-defined
    embeddings_subset : np.ndarray or None
        Embedding rows selected by the last call to ``subset``.
    tokens_subset : list of str or None
        Tokens selected by the last call to ``subset``.
    available_pretrained : dict
        Parsed content of ``pretrained_embeddings.json``; read through its
        "columns" and "data" keys.
    """

    # File extensions accepted by load_from_file.
    _VALID_EXTENSIONS = ('.bin', '.txt', '.vec')
    # Values accepted for the `format` argument of load_from_file.
    _KNOWN_FORMATS = ('word2vec', 'fasttext', 'glove')
    # Values of `self.type` that get_embedding knows how to serve.
    _KNOWN_TYPES = ('word', 'sentence', 'word_context', 'custom')

    def __init__(self):
        self.embeddings_raw = None
        self.embeddings = None
        self.tokens = None
        self.dimension = None
        self.type = None
        self.embeddings_subset = None
        self.tokens_subset = None

        json_path = os.path.join(os.path.dirname(__file__), 'pretrained_embeddings.json')
        # Explicit encoding so the table reads the same regardless of locale.
        with open(json_path, encoding='utf-8') as f:
            self.available_pretrained = json.load(f)

    def get_cache_dir(self):
        """Return the ``~/.wordviz_cache`` directory, creating it if needed."""
        cache_dir = Path.home() / ".wordviz_cache"
        cache_dir.mkdir(parents=True, exist_ok=True)
        return cache_dir

    def _validate_file(self, path):
        """
        Check that ``path`` points to an existing file with a supported extension.

        Parameters
        -----------
        path : str or os.PathLike
            Path to the embedding file.

        Returns
        --------
        bool
            True if the extension is '.bin' (binary file), False otherwise.

        Raises
        -------
        ValueError
            If ``path`` is None or the extension is not supported.
        TypeError
            If ``path`` is neither a string nor a path-like object.
        FileNotFoundError
            If the file does not exist.
        """
        # These checks must run before any str() coercion, otherwise None
        # silently becomes the string "None".
        if path is None:
            raise ValueError('File path is required')
        if not isinstance(path, (str, os.PathLike)):
            raise TypeError('The file path must be a string')

        path = os.fspath(path)
        if not os.path.exists(path):
            raise FileNotFoundError(f"Invalid file path {path}: the file does not exist")

        _, ext = os.path.splitext(path.lower())
        if ext not in self._VALID_EXTENSIONS:
            # Joined outside the f-string: reusing the enclosing quote inside
            # a replacement field is a SyntaxError before Python 3.12.
            valid = ','.join(self._VALID_EXTENSIONS)
            raise ValueError(f'Invalid file extension {ext}. Valid extensions are: {valid}')

        return ext == '.bin'

    def load_from_file(self, path: str, format: str) -> np.ndarray:
        """
        Loads word embeddings from a file in .txt, .vec, or .bin format.

        Parameters
        -----------
        path : str
            Path to the embedding file.
        format : str
            Format of the embedding model: 'word2vec', 'fasttext', or 'glove'.

        Returns
        --------
        np.ndarray
            Loaded embedding matrix.

        Raises
        -------
        ValueError
            If ``format`` is not one of the supported formats, or the file
            fails validation (see ``_validate_file``).
        RuntimeError
            If the GloVe to word2vec conversion produces no output file.

        Notes:
        ------
        - For GloVe files, they are first converted to word2vec format
          (in a temporary directory that is removed afterwards).
        - FastText binary files are supported via Facebook's native loader.
        - Loaded tokens are stored in self.tokens.
        - Embedding matrix is stored in self.embeddings.
        - Any previously computed subset is discarded.
        """
        binary = self._validate_file(path)
        if format not in self._KNOWN_FORMATS:
            # Without this check an unknown format would fall through and
            # either crash on None or silently reuse a previous model.
            known = ', '.join(repr(name) for name in self._KNOWN_FORMATS)
            raise ValueError(f"Unknown embedding format {format!r}. Valid formats are: {known}")
        path = os.fspath(path)

        if format == 'word2vec':
            vectors = KeyedVectors.load_word2vec_format(path, binary=binary)
        elif format == 'fasttext':
            if binary:
                # load_facebook_model returns the full FastText model; the
                # keyed-vector API used below lives on its `.wv` attribute.
                vectors = load_facebook_model(path).wv
            else:
                vectors = KeyedVectors.load_word2vec_format(path, binary=False)
        else:  # 'glove'
            # Convert inside a temp dir so nothing is left in the caller's cwd.
            with tempfile.TemporaryDirectory() as tmp_dir:
                converted = os.path.join(tmp_dir, "glove_w2v.txt")
                glove2word2vec(path, converted)
                if not os.path.exists(converted):
                    raise RuntimeError("GloVe to Word2Vec conversion failed.")
                vectors = KeyedVectors.load_word2vec_format(converted)

        self.embeddings_raw = vectors
        self.tokens = list(vectors.index_to_key)
        self.dimension = vectors.vector_size
        self.type = 'word'
        self.embeddings = np.array([vectors.get_vector(word) for word in self.tokens])
        # A subset computed from earlier embeddings is no longer valid.
        self.embeddings_subset = None
        self.tokens_subset = None

        print("Embedding loaded from file")
        return self.embeddings

    def download_zip(self, url, filename):
        """
        Download ``url`` into the cache directory unless it is already there.

        The download is written to a ``.part`` file and renamed on success, so
        an interrupted transfer is never mistaken for a cached archive.

        Returns
        --------
        pathlib.Path
            Path of the cached file.
        """
        zip_path = self.get_cache_dir() / filename
        if zip_path.exists():
            print(f"{filename} already exists in cache.")
            return zip_path

        print(f"Downloading {filename}...")
        partial_path = zip_path.with_name(zip_path.name + ".part")
        try:
            urllib.request.urlretrieve(url, partial_path)
            os.replace(partial_path, zip_path)
        finally:
            # Only still present when the download or the rename failed.
            if partial_path.exists():
                partial_path.unlink()
        return zip_path

    def export_embedding(self, source_path, dest_folder):
        """Copy the file at ``source_path`` into ``dest_folder`` (created if missing)."""
        os.makedirs(dest_folder, exist_ok=True)
        filename = os.path.basename(source_path)
        dest_path = os.path.join(dest_folder, filename)
        shutil.copy(source_path, dest_path)
        print(f"File saved in {dest_path}.")

    def load_pretrained(self, model: str, lang: str, source: str, dimension: str,
                        save_file: bool = False, export_dir: str = None) -> np.ndarray:
        """
        Downloads and loads a pretrained embedding model from an online source.

        Parameters
        -----------
        model : str
            Name of the embedding model ('word2vec', 'fasttext', etc.).
        lang : str
            Language code of the embedding ('en', 'it').
        source : str
            Data source ('wiki', 'cc').
        dimension : str or int
            Embedding dimensionality (e.g., '300').
        save_file : bool, default=False
            If True, saves the embedding to the specified export directory.
        export_dir : str, optional
            Path to the directory where the file will be exported
            (used if save_file=True).

        Returns
        --------
        np.ndarray
            Loaded embedding matrix (n_words x dimension).

        Raises
        -------
        ValueError
            If ``save_file`` is True without ``export_dir``, or no pretrained
            entry matches the given parameters.
        """
        # Fail before downloading anything if the request cannot be honoured.
        if save_file and export_dir is None:
            raise ValueError("Must specify export_dir to save file.")

        # Compare as text so both 300 and '300' match, and so the value can
        # be used as a path component below.
        dimension = str(dimension)
        columns = self.available_pretrained["columns"]
        option = next(
            (
                dict(zip(columns, row))
                for row in self.available_pretrained["data"]
                if row[0] == model
                and row[1] == lang
                and row[2] == source
                and str(row[3]) == dimension
            ),
            None,
        )
        if option is None:
            raise ValueError(
                f"Can't find pretrained file with parameters: {model}, {lang}, {source}, {dimension}"
            )
        url = option['url']
        filename = option['filename']

        zip_filename = url.split("/")[-1]
        zip_path = self.download_zip(url, zip_filename)

        dest_dir = self.get_cache_dir() / model / lang / source / dimension
        dest_dir.mkdir(parents=True, exist_ok=True)
        file_path = dest_dir / filename

        if not file_path.exists():
            with zipfile.ZipFile(zip_path, 'r') as z:
                print(f"Extracting {filename}...")
                z.extract(filename, path=dest_dir)

        # load_from_file stores the matrix in self.embeddings itself.
        self.load_from_file(file_path, model)

        if save_file:
            self.export_embedding(file_path, export_dir)

        return self.embeddings

    def load_contextual(self, embeddings, labels, embedding_type='sentence') -> np.ndarray:
        """
        Loads embeddings from contextual models.

        Parameters
        -----------
        embeddings: various formats
            - numpy.ndarray
            - torch.Tensor
            - List[List[float]]
        labels: list of str
            labels corresponding to embedding
        embedding_type: str
            - 'sentence': Sentence/document/passage embeddings
            - 'word_context': Word embeddings in different contexts
            - 'word': word embeddings
            - 'custom': User-defined

        Returns
        --------
        np.ndarray
            Loaded embedding matrix (n_labels x dimension).

        Raises
        -------
        ValueError
            If the embeddings cannot be converted to a 2-D float array or the
            number of labels differs from the number of rows.
        """
        embeddings_array = self._normalize_embeddings(embeddings)
        if embeddings_array.ndim != 2:
            raise ValueError(
                f"Embeddings must be a 2-D array (n_labels x dimension), got {embeddings_array.ndim} dimension(s)"
            )
        # Stored as a list because lookups rely on list.index.
        labels = list(labels)
        if len(labels) != embeddings_array.shape[0]:
            raise ValueError(
                f"Got {len(labels)} labels for {embeddings_array.shape[0]} embeddings"
            )

        self.embeddings = embeddings_array
        self.tokens = labels
        self.dimension = embeddings_array.shape[1]
        self.type = embedding_type
        # State left over from a previous load must not leak into lookups or
        # into use_subset.
        self.embeddings_raw = None
        self.embeddings_subset = None
        self.tokens_subset = None

        print("Contextual embedding loaded")
        return self.embeddings

    def _normalize_embeddings(self, embeddings):
        """
        Converts embeddings to a float32 numpy array (always a new array).

        Objects exposing ``detach`` are treated as torch tensors and moved to
        CPU first.

        Raises
        -------
        ValueError
            If the input cannot be converted.
        """
        if hasattr(embeddings, 'detach'):  # torch.Tensor
            embeddings = embeddings.detach().cpu().numpy()
        try:
            return np.array(embeddings, dtype=np.float32)
        except (TypeError, ValueError) as err:
            raise ValueError(
                f"Cannot convert embeddings of type {type(embeddings)} to numpy array"
            ) from err

    def list_available_pretrained(self):
        """Prints a list of pretrained embeddings provided by the package."""
        print('model | lang | source | dim')
        for row in self.available_pretrained['data']:
            # The last two cells of each row are not displayed; str() keeps
            # the join working for non-string cells.
            print(" | ".join(str(cell) for cell in row[:-2]))

    def get_embedding(self, token):
        """
        Returns the embedding associated with ``token``.

        For 'word' embeddings the KeyedVectors object is queried first when
        available; otherwise (and for every other type) the token is looked up
        by position in ``self.tokens``.

        Raises
        -------
        RuntimeError
            If nothing is loaded or ``self.type`` is not a known type.
        KeyError
            If the token is not found.
        """
        if self.embeddings is None:
            raise RuntimeError("No embeddings loaded.")
        if self.type not in self._KNOWN_TYPES:
            raise RuntimeError("Unknown embedding type")

        # prefer keyed vectors if available
        if self.type == "word" and getattr(self, "embeddings_raw", None) is not None:
            try:
                return self.embeddings_raw.get_vector(token)
            except KeyError:
                pass  # not known to the keyed vectors; try the token list

        try:
            index = self.tokens.index(token)
        except ValueError:
            raise KeyError(f"Token '{token}' not found") from None
        return self.embeddings[index]

    def subset(self, n: int = 1000, strategy: str = 'first', random_seed: int = None):
        """
        Create a subset of the current embeddings and tokens.

        Useful for speeding up visualizations or managing memory with large
        embedding spaces.

        Parameters
        -----------
        n : int, default=1000
            Number of embeddings to retain. If n exceeds the total number of
            available embeddings, all are retained.
        strategy : str, default='first'
            Selection strategy:
            - 'first': select the first n embeddings in original order.
            - 'random': select n random embeddings.
        random_seed : int, optional
            Seed for reproducible random sampling (only used if strategy is
            'random').

        Updates
        --------
        self.tokens_subset : list of str
            List of selected token strings.
        self.embeddings_subset : np.ndarray
            Corresponding selected embedding vectors.

        Raises
        -------
        RuntimeError
            If no embeddings are loaded.
        ValueError
            If ``n`` is negative or ``strategy`` is not recognised.
        """
        if self.embeddings is None or self.tokens is None:
            raise RuntimeError("No embeddings loaded. Call load_from_file / load_contextual first.")
        if n < 0:
            raise ValueError("n must be a non-negative integer")

        total = len(self.tokens)
        if n > total:
            print('n is larger than the embedding size, the subset size will be equal to the full size')
        size = min(n, total)

        if strategy == 'first':
            indices = list(range(size))
        elif strategy == 'random':
            rng = np.random.default_rng(random_seed)
            indices = rng.choice(total, size=size, replace=False).tolist()
        else:
            raise ValueError("strategy has to be 'first' o 'random'")

        self.tokens_subset = [self.tokens[i] for i in indices]
        self.embeddings_subset = self.embeddings[indices]

    def use_subset(self, n: int = 1000):
        """
        Returns the current subset as ``(embeddings_subset, tokens_subset)``.

        If no subset exists yet, one is created with ``subset(n)`` first;
        ``n`` is ignored when a subset is already present.
        """
        if self.embeddings_subset is None:
            self.subset(n)
        return self.embeddings_subset, self.tokens_subset