Processors API Reference¶
Detailed documentation for text and sequence processing utilities.
Text Processor¶
TextProcessor¶
Core class for text encoding and processing.
class TextProcessor:
    """Core class for text encoding and processing.

    Wraps a Sentence Transformer encoder together with a cache manager so
    that repeated encodings of the same corpus can be served from disk.
    """

    def __init__(self, config: TextEncodingConfig):
        # Keep the whole config; individual settings are read lazily.
        self.config = config
        self.device = config.device
        # The encoder is loaded on first use (see load_model) so that
        # cache-only workflows never pay the model initialisation cost.
        self.model = None
        self.cache_manager = CacheManager(config.cache_dir)
Parameters:
- config: Text encoding configuration object
Methods:
load_model()¶
Load text encoding model.
def load_model(self) -> None:
    """Lazily load the Sentence Transformer encoder onto the configured device.

    Does nothing when a model has already been loaded.
    """
    if self.model is not None:
        return
    # Imported here so the dependency is only required when encoding.
    from sentence_transformers import SentenceTransformer

    encoder = SentenceTransformer(self.config.encoder_model)
    encoder.to(self.device)
    self.model = encoder
    print(f"Loaded text encoder: {self.config.encoder_model}")
encode_texts(texts, cache_key, force_reload)¶
Encode a list of texts into embedding vectors, optionally using the cache.
def encode_texts(
    self,
    texts: List[str],
    cache_key: Optional[str] = None,
    force_reload: bool = False
) -> np.ndarray:
    """
    Encode a list of texts into embedding vectors.

    Args:
        texts: Texts to encode.
        cache_key: Cache key; a cached result is reused when present.
        force_reload: When True, ignore any cached result and recompute.

    Returns:
        Embedding matrix of shape (num_texts, embedding_dim). An empty
        input yields an empty (0, 0) array.
    """
    # Serve from cache when possible.
    if cache_key and not force_reload and self.cache_manager.exists(cache_key):
        print(f"Loading embeddings from cache: {cache_key}")
        return self.cache_manager.load(cache_key)

    # Fix: guard against empty input -- np.vstack on an empty list raises
    # ValueError in the original implementation.
    if not texts:
        return np.empty((0, 0))

    # Ensure the encoder is available (lazy load).
    self.load_model()

    # Batch encoding.
    print(f"Encoding {len(texts)} texts with {self.config.encoder_model}")
    batch_size = self.config.batch_size
    embeddings = []
    for start in range(0, len(texts), batch_size):
        batch = texts[start:start + batch_size]
        embeddings.append(
            self.model.encode(
                batch,
                convert_to_numpy=True,
                normalize_embeddings=self.config.normalize_embeddings,
                show_progress_bar=True
            )
        )
    embeddings = np.vstack(embeddings)

    # Persist so future calls with the same key hit the cache branch above.
    if cache_key:
        self.cache_manager.save(cache_key, embeddings)
        print(f"Saved embeddings to cache: {cache_key}")
    return embeddings
encode_item_features(items_df, cache_key, force_reload)¶
Encode item features.
def encode_item_features(
    self,
    items_df: pd.DataFrame,
    cache_key: Optional[str] = None,
    force_reload: bool = False
) -> np.ndarray:
    """
    Encode item rows into embedding vectors.

    Each row is rendered to a text via the config's template, then the
    whole list is encoded with ``encode_texts``.

    Args:
        items_df: Items DataFrame.
        cache_key: Cache key forwarded to ``encode_texts``.
        force_reload: Whether to force recomputation.

    Returns:
        Item embedding matrix (num_items, embedding_dim).
    """
    # One formatted text per row.
    texts = [
        self.config.format_text(row.to_dict())
        for _, row in items_df.iterrows()
    ]
    return self.encode_texts(texts, cache_key, force_reload)
encode_single_text(text)¶
Encode a single text into an embedding vector.
def encode_single_text(self, text: str) -> np.ndarray:
    """
    Encode one text into its embedding vector.

    Args:
        text: Input text.

    Returns:
        Embedding vector of shape (embedding_dim,).
    """
    self.load_model()
    # Encode as a one-element batch, then unwrap the single result row.
    batch = self.model.encode(
        [text],
        convert_to_numpy=True,
        normalize_embeddings=self.config.normalize_embeddings
    )
    return batch[0]
compute_similarity(text1, text2)¶
Compute text similarity.
def compute_similarity(self, text1: str, text2: str) -> float:
    """
    Cosine similarity between two texts.

    Args:
        text1: First text.
        text2: Second text.

    Returns:
        Cosine similarity value in [-1, 1].
    """
    vec_a = self.encode_single_text(text1)
    vec_b = self.encode_single_text(text2)
    # Normalise by both vector lengths to obtain the cosine.
    denom = np.linalg.norm(vec_a) * np.linalg.norm(vec_b)
    return np.dot(vec_a, vec_b) / denom
find_similar_texts(query_text, candidate_texts, top_k)¶
Find similar texts.
def find_similar_texts(
    self,
    query_text: str,
    candidate_texts: List[str],
    top_k: int = 5
) -> List[Tuple[int, str, float]]:
    """
    Return the candidates most similar to a query text.

    Args:
        query_text: Query text.
        candidate_texts: Candidate text list.
        top_k: Number of results to return.

    Returns:
        (index, text, similarity) tuples sorted by similarity, descending.
    """
    query_vec = self.encode_single_text(query_text)
    candidate_vecs = self.encode_texts(candidate_texts)

    # One dot product per candidate, computed in a single matrix op.
    scores = np.dot(candidate_vecs, query_vec)

    # Indices of the top_k highest scores, best first.
    ranked = np.argsort(scores)[::-1][:top_k]
    return [(idx, candidate_texts[idx], scores[idx]) for idx in ranked]
Sequence Processor¶
SequenceProcessor¶
Core class for sequence data processing.
class SequenceProcessor:
    """Core class for sequence data processing."""

    def __init__(self, config: SequenceConfig):
        self.config = config

    def build_user_sequences(
        self,
        interactions_df: pd.DataFrame
    ) -> List[Dict[str, Any]]:
        """
        Build per-user interaction sequences.

        Args:
            interactions_df: Interactions DataFrame containing ``user_id``,
                ``item_id`` and ``timestamp`` columns.

        Returns:
            One dict per qualifying user with the user id, the
            chronologically ordered item sequence, and (when configured)
            the matching timestamps.
        """
        keep_times = self.config.include_timestamps
        min_len = self.config.min_seq_length
        sequences = []
        for user_id, group in interactions_df.groupby('user_id'):
            ordered = group.sort_values('timestamp')
            items = ordered['item_id'].tolist()
            # Skip users with too little history to form a sequence.
            if len(items) < min_len:
                continue
            sequences.append({
                'user_id': user_id,
                'item_sequence': items,
                'timestamps': ordered['timestamp'].tolist() if keep_times else None
            })
        return sequences
create_training_samples(sequences)¶
Create training samples.
def create_training_samples(
    self,
    sequences: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    """
    Slice user sequences into (input, target) training samples.

    Windows of at most ``max_seq_length`` items are taken every
    ``sequence_stride`` positions; within each window the target is the
    input shifted forward by ``target_offset``.

    Args:
        sequences: User sequence list (see ``build_user_sequences``).

    Returns:
        Samples with ``user_id``, ``input_sequence`` and
        ``target_sequence`` (plus timestamp slices when configured).
    """
    cfg = self.config
    samples = []
    for seq_data in sequences:
        items = seq_data['item_sequence']
        last_start = len(items) - cfg.min_seq_length
        for start in range(0, last_start + 1, cfg.sequence_stride):
            # Window may be shorter than max_seq_length near the tail.
            end = min(start + cfg.max_seq_length, len(items))
            if end - start < cfg.min_seq_length:
                continue
            in_stop = end - cfg.target_offset
            tgt_start = start + cfg.target_offset
            input_seq = items[start:in_stop]
            target_seq = items[tgt_start:end]
            if not input_seq or not target_seq:
                continue
            sample = {
                'user_id': seq_data['user_id'],
                'input_sequence': input_seq,
                'target_sequence': target_seq
            }
            # Carry matching timestamp slices alongside when available.
            if cfg.include_timestamps and seq_data['timestamps']:
                times = seq_data['timestamps']
                sample['input_timestamps'] = times[start:in_stop]
                sample['target_timestamps'] = times[tgt_start:end]
            samples.append(sample)
    return samples
pad_and_truncate_sequence(sequence)¶
Pad and truncate sequence.
def pad_and_truncate_sequence(self, sequence: List[int]) -> List[int]:
    """
    Force a sequence to exactly ``max_seq_length`` items.

    Over-long sequences are truncated and short ones padded; both
    strategies are delegated to the config.

    Args:
        sequence: Input sequence.

    Returns:
        Processed sequence of length ``max_seq_length``.
    """
    target_len = self.config.max_seq_length
    if len(sequence) > target_len:
        # Truncation strategy (e.g. keep head vs. tail) lives on the config.
        sequence = self.config.truncate_sequence(sequence)
    if len(sequence) < target_len:
        # Padding strategy/token also lives on the config.
        sequence = self.config.pad_sequence(sequence)
    return sequence
create_attention_mask(sequence)¶
Create attention mask.
def create_attention_mask(self, sequence: List[int]) -> List[int]:
    """
    Build an attention mask for a (possibly padded) sequence.

    Args:
        sequence: Input sequence.

    Returns:
        Mask with 1 at valid positions and 0 at padding positions.
    """
    pad = self.config.padding_token
    return [0 if token == pad else 1 for token in sequence]
encode_time_features(timestamps)¶
Encode time features.
def encode_time_features(self, timestamps: List[float]) -> np.ndarray:
    """
    Encode timestamps as sinusoidal feature vectors.

    Timestamps are min-max normalised to [0, 1], then expanded into
    sin/cos channels at geometrically spaced frequencies (transformer-
    style positional encoding over normalised time).

    Args:
        timestamps: Timestamp list.

    Returns:
        Time feature matrix of shape (seq_len, time_encoding_dim).
    """
    dim = self.config.time_encoding_dim
    if not timestamps:
        return np.zeros((0, dim))

    # Min-max normalise; constant timestamps map to all zeros.
    times = np.array(timestamps)
    min_time, max_time = times.min(), times.max()
    if max_time > min_time:
        normalized = (times - min_time) / (max_time - min_time)
    else:
        normalized = np.zeros_like(times)

    # Sin/cos pairs at decreasing frequencies. Fix: iterate ceil(dim / 2)
    # pairs so that an odd time_encoding_dim still yields exactly `dim`
    # channels after truncation (previously odd dims produced dim - 1
    # channels, contradicting the documented output shape).
    channels = []
    for i in range((dim + 1) // 2):
        freq = 1.0 / (10000 ** (2 * i / dim))
        channels.append(np.sin(normalized * freq))
        channels.append(np.cos(normalized * freq))

    # Stack as (num_channels, seq_len), truncate to dim, then transpose.
    return np.array(channels[:dim]).T
Data Augmentation Processor¶
DataAugmentor¶
Data augmentation processor.
class DataAugmentor:
    """Sequence-level data augmentation (random drop / shuffle / replace)."""

    def __init__(self, augmentation_config: Dict[str, Any]):
        self.config = augmentation_config

    def augment_sequence(self, sequence: List[int]) -> List[int]:
        """
        Return an augmented copy of the sequence.

        Which augmentations run is controlled by the boolean config flags
        ``random_drop``, ``random_shuffle`` and ``random_replace``; the
        input list is never mutated.

        Args:
            sequence: Original sequence.

        Returns:
            Augmented sequence.
        """
        cfg = self.config
        result = sequence.copy()

        # Randomly drop individual items.
        if cfg.get('random_drop', False):
            drop_prob = cfg.get('drop_prob', 0.1)
            result = [item for item in result if random.random() > drop_prob]

        # Occasionally shuffle a short window of the sequence.
        if cfg.get('random_shuffle', False):
            shuffle_prob = cfg.get('shuffle_prob', 0.1)
            if random.random() < shuffle_prob:
                window_start = random.randint(0, max(0, len(result) - 3))
                window_end = min(window_start + random.randint(2, 4), len(result))
                window = result[window_start:window_end]
                random.shuffle(window)
                result[window_start:window_end] = window

        # Randomly replace items with other vocabulary ids.
        if cfg.get('random_replace', False):
            replace_prob = cfg.get('replace_prob', 0.05)
            vocab_size = cfg.get('vocab_size', 1000)
            for pos in range(len(result)):
                if random.random() < replace_prob:
                    result[pos] = random.randint(1, vocab_size)

        return result
Preprocessing Pipeline¶
PreprocessingPipeline¶
Data preprocessing pipeline.
class PreprocessingPipeline:
    """End-to-end preprocessing: item features, user sequences, augmentation."""

    def __init__(
        self,
        text_processor: TextProcessor,
        sequence_processor: SequenceProcessor,
        augmentor: Optional[DataAugmentor] = None
    ):
        self.text_processor = text_processor
        self.sequence_processor = sequence_processor
        self.augmentor = augmentor

    def process_items(
        self,
        items_df: pd.DataFrame,
        cache_key: str = None
    ) -> pd.DataFrame:
        """
        Attach text-embedding feature vectors to an items DataFrame.

        Args:
            items_df: Items DataFrame.
            cache_key: Cache key forwarded to the text processor.

        Returns:
            Copy of ``items_df`` with an added ``features`` column.
        """
        print("Processing item features...")
        embeddings = self.text_processor.encode_item_features(
            items_df, cache_key=cache_key
        )
        # Work on a copy so the caller's DataFrame stays untouched.
        result = items_df.copy()
        result['features'] = embeddings.tolist()
        return result

    def process_interactions(
        self,
        interactions_df: pd.DataFrame,
        items_df: pd.DataFrame
    ) -> List[Dict[str, Any]]:
        """
        Turn raw interactions into (optionally augmented) training samples.

        Args:
            interactions_df: Interactions DataFrame.
            items_df: Items DataFrame (not read by the current implementation;
                kept for API parity).

        Returns:
            Training sample list.
        """
        print("Building user sequences...")
        user_sequences = self.sequence_processor.build_user_sequences(interactions_df)
        samples = self.sequence_processor.create_training_samples(user_sequences)

        if not self.augmentor:
            return samples

        # Emit each sample twice: the original plus one augmented variant.
        expanded = []
        for sample in samples:
            expanded.append(sample)
            variant = sample.copy()
            variant['input_sequence'] = self.augmentor.augment_sequence(
                sample['input_sequence']
            )
            variant['target_sequence'] = self.augmentor.augment_sequence(
                sample['target_sequence']
            )
            expanded.append(variant)
        return expanded
Utility Functions¶
compute_sequence_statistics(sequences)¶
Compute sequence statistics.
def compute_sequence_statistics(sequences: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Summarise a list of user sequences.

    Args:
        sequences: Sequence list (see ``SequenceProcessor.build_user_sequences``).

    Returns:
        Dict of counts and length statistics; an empty dict for empty input.
    """
    if not sequences:
        return {}

    lengths = [len(seq['item_sequence']) for seq in sequences]
    users = {seq['user_id'] for seq in sequences}

    # Item frequency table across all sequences.
    item_counts: Dict[Any, int] = {}
    for seq in sequences:
        for item_id in seq['item_sequence']:
            item_counts[item_id] = item_counts.get(item_id, 0) + 1

    top_items = sorted(item_counts.items(), key=lambda kv: kv[1], reverse=True)[:10]

    return {
        'num_sequences': len(sequences),
        'num_unique_users': len(users),
        'num_unique_items': len(item_counts),
        'avg_sequence_length': np.mean(lengths),
        'min_sequence_length': np.min(lengths),
        'max_sequence_length': np.max(lengths),
        'median_sequence_length': np.median(lengths),
        'total_interactions': sum(lengths),
        'most_popular_items': top_items
    }
visualize_embeddings(embeddings, labels, method)¶
Visualize embedding vectors.
def visualize_embeddings(
    embeddings: np.ndarray,
    labels: List[str] = None,
    method: str = 'tsne',
    save_path: str = None
) -> None:
    """
    Plot high-dimensional embedding vectors in 2D.

    Args:
        embeddings: Embedding matrix (n_samples, embedding_dim).
        labels: Optional per-sample labels; points are coloured per label.
        method: Dimensionality reduction method ('tsne', 'pca', 'umap').
        save_path: When given, the figure is also saved to this path.

    Raises:
        ValueError: If ``method`` is not one of the supported names.
    """
    import matplotlib.pyplot as plt

    # Pick the 2D reducer lazily so optional deps are imported only on use.
    if method == 'tsne':
        from sklearn.manifold import TSNE
        reducer = TSNE(n_components=2, random_state=42)
    elif method == 'pca':
        from sklearn.decomposition import PCA
        reducer = PCA(n_components=2)
    elif method == 'umap':
        import umap
        reducer = umap.UMAP(n_components=2, random_state=42)
    else:
        raise ValueError(f"Unknown method: {method}")

    points = reducer.fit_transform(embeddings)

    plt.figure(figsize=(10, 8))
    if labels:
        # One scatter call per label so each gets its own colour and legend entry.
        label_names = list(set(labels))
        palette = plt.cm.tab10(np.linspace(0, 1, len(label_names)))
        label_array = np.array(labels)
        for color, name in zip(palette, label_names):
            selected = label_array == name
            plt.scatter(
                points[selected, 0],
                points[selected, 1],
                c=[color],
                label=name,
                alpha=0.7
            )
        plt.legend()
    else:
        plt.scatter(points[:, 0], points[:, 1], alpha=0.7)

    plt.title(f'Embedding Visualization ({method.upper()})')
    plt.xlabel('Component 1')
    plt.ylabel('Component 2')

    if save_path:
        plt.savefig(save_path, dpi=300, bbox_inches='tight')
    plt.show()
Usage Examples¶
Text Processing¶
# Example: encode a handful of item texts with TextProcessor.
from genrec.data.processors import TextProcessor
from genrec.data.configs import TextEncodingConfig

# Create configuration
config = TextEncodingConfig(
    encoder_model="sentence-transformers/all-MiniLM-L6-v2",
    template="Title: {title}; Category: {category}",
    batch_size=32
)

# Create processor
processor = TextProcessor(config)

# Encode texts
texts = ["Apple iPhone 13", "Samsung Galaxy S21", "Sony WH-1000XM4"]
# Passing cache_key lets repeated runs reuse the stored embeddings.
embeddings = processor.encode_texts(texts, cache_key="sample_texts")
print(f"Embeddings shape: {embeddings.shape}")
Sequence Processing¶
# Example: build sequences and training samples with SequenceProcessor.
from genrec.data.processors import SequenceProcessor
from genrec.data.configs import SequenceConfig

# Create configuration
config = SequenceConfig(
    max_seq_length=50,
    min_seq_length=3,
    target_offset=1
)

# Create processor
processor = SequenceProcessor(config)

# Process interaction data
# NOTE: interactions_df is assumed to be loaded beforehand (a DataFrame
# with user_id, item_id and timestamp columns).
sequences = processor.build_user_sequences(interactions_df)
training_samples = processor.create_training_samples(sequences)
print(f"Generated {len(training_samples)} training samples")
Complete Preprocessing Pipeline¶
# Example: run the full preprocessing pipeline end to end.
from genrec.data.processors import PreprocessingPipeline

# Create pipeline
# NOTE: text_processor and sequence_processor are assumed to have been
# created as in the examples above.
pipeline = PreprocessingPipeline(
    text_processor=text_processor,
    sequence_processor=sequence_processor
)

# Process data
# items_df and interactions_df are assumed to be loaded beforehand.
processed_items = pipeline.process_items(items_df, cache_key="items_beauty")
processed_sequences = pipeline.process_interactions(interactions_df, processed_items)

# View statistics
stats = compute_sequence_statistics(processed_sequences)
print(f"Dataset statistics: {stats}")