Source code for vivre.align

"""
Text alignment functionality for matching source and target texts.
"""

import math
from typing import List, Optional, Tuple

import scipy.stats as stats


[docs] class Aligner: """ A class for aligning source and target texts using the Gale-Church algorithm. This class provides functionality to align segments of text between source and target languages, creating parallel corpora for translation and analysis purposes. """ def __init__( self, language_pair: str = "en-es", c: Optional[float] = None, s2: Optional[float] = None, gap_penalty: Optional[float] = None, ): """ Initialize the Aligner with language-specific parameters. Args: language_pair: Language pair code (e.g., "en-es", "en-fr", "en-de"). Defaults to "en-es" (English-Spanish). c: Optional mean length ratio override. s2: Optional variance of the difference override. gap_penalty: Optional gap penalty override. """ # Language-specific parameters for the Gale-Church algorithm self.language_params = { "en-es": {"c": 1.0, "s2": 6.8, "gap_penalty": 3.0}, "en-fr": {"c": 1.1, "s2": 7.2, "gap_penalty": 3.0}, "en-de": {"c": 1.2, "s2": 8.1, "gap_penalty": 3.0}, "en-it": {"c": 1.05, "s2": 7.0, "gap_penalty": 3.0}, } self.params = self.language_params.get( language_pair, self.language_params["en-es"] ) self.c = c if c is not None else self.params["c"] self.s2 = s2 if s2 is not None else self.params["s2"] self.gap_penalty = ( gap_penalty if gap_penalty is not None else self.params["gap_penalty"] ) # Pre-calculate gap penalty costs self._gap_cost = self._gap_penalty_cost() self._double_gap_cost = 2 * self._gap_cost
[docs] def align( self, source_sentences: List[str], target_sentences: List[str] ) -> List[Tuple[str, str]]: """ Align source and target sentences into parallel segments. Args: source_sentences: List of source language sentences (pre-tokenized). target_sentences: List of target language sentences (pre-tokenized). Returns: A list of tuples containing aligned (source_segment, target_segment) pairs. """ # Handle empty sentence lists if not source_sentences or not target_sentences: return [] # Filter out empty sentences source_sentences = [s.strip() for s in source_sentences if s.strip()] target_sentences = [s.strip() for s in target_sentences if s.strip()] if not source_sentences or not target_sentences: return [] # For perfectly matched sentences, align 1-1 if source_sentences == target_sentences: return self._align_perfect_match(source_sentences) # Use Gale-Church algorithm for different sentences return self._align_gale_church(source_sentences, target_sentences)
def _align_perfect_match(self, sentences: List[str]) -> List[Tuple[str, str]]: """ Align perfectly matched sentences 1-1. Args: sentences: List of sentences to align. Returns: List of (sentence, sentence) tuples for perfect alignment. """ # Create 1-1 alignments for each sentence alignments = [] for sentence in sentences: if sentence.strip(): alignments.append((sentence, sentence)) return alignments def _align_gale_church( self, source_sentences: List[str], target_sentences: List[str] ) -> List[Tuple[str, str]]: """ Align sentences using the Gale-Church algorithm. Args: source_sentences: List of source language sentences. target_sentences: List of target language sentences. Returns: List of aligned (source_segment, target_segment) tuples. """ if not source_sentences or not target_sentences: return [] # Calculate sentence lengths (in characters) source_lengths = [len(s) for s in source_sentences] target_lengths = [len(s) for s in target_sentences] # Find optimal alignment using dynamic programming alignment = self._gale_church_dp(source_lengths, target_lengths) # Convert alignment indices to actual sentence pairs result = [] for src_start, src_end, tgt_start, tgt_end in alignment: src_segment = " ".join(source_sentences[src_start:src_end]) tgt_segment = " ".join(target_sentences[tgt_start:tgt_end]) result.append((src_segment, tgt_segment)) return result def _gale_church_dp( self, source_lengths: List[int], target_lengths: List[int] ) -> List[Tuple[int, int, int, int]]: """ Dynamic programming implementation of Gale-Church algorithm. """ m, n = len(source_lengths), len(target_lengths) dp = [[float("inf")] * (n + 1) for _ in range(m + 1)] dp[0][0] = 0 backtrack: List[List] = [[None] * (n + 1) for _ in range(m + 1)] for i in range(m + 1): for j in range(n + 1): if i == 0 and j == 0: continue candidates = [] # 1-1 alignment if i > 0 and j > 0: cost = self._alignment_cost( source_lengths[i - 1], target_lengths[j - 1] ) candidates.append((dp[i - 1][j - 1] + cost, (i - 1, j - 1, 1, 1))) # 1-0 alignment (source sentence with no target) if i > 0: cost = self._gap_cost candidates.append((dp[i - 1][j] + cost, (i - 1, j, 1, 0))) # 0-1 alignment (target sentence with no source) if j > 0: cost = self._gap_cost candidates.append((dp[i][j - 1] + cost, (i, j - 1, 0, 1))) # 2-1 alignment (two source sentences with one target) if i > 1 and j > 0: src_len = source_lengths[i - 2] + source_lengths[i - 1] cost = self._alignment_cost(src_len, target_lengths[j - 1]) candidates.append((dp[i - 2][j - 1] + cost, (i - 2, j - 1, 2, 1))) # 1-2 alignment (one source sentence with two target) if i > 0 and j > 1: tgt_len = target_lengths[j - 2] + target_lengths[j - 1] cost = self._alignment_cost(source_lengths[i - 1], tgt_len) candidates.append((dp[i - 1][j - 2] + cost, (i - 1, j - 2, 1, 2))) # 2-2 alignment (two source sentences with two target) if i > 1 and j > 1: src_len = source_lengths[i - 2] + source_lengths[i - 1] tgt_len = target_lengths[j - 2] + target_lengths[j - 1] cost = self._alignment_cost(src_len, tgt_len) candidates.append((dp[i - 2][j - 2] + cost, (i - 2, j - 2, 2, 2))) # 2-0 alignment (two source sentences deleted) if i > 1: cost = self._double_gap_cost candidates.append((dp[i - 2][j] + cost, (i - 2, j, 2, 0))) # 0-2 alignment (two target sentences inserted) if j > 1: cost = self._double_gap_cost candidates.append((dp[i][j - 2] + cost, (i, j - 2, 0, 2))) if candidates: best_cost, best_move = min(candidates, key=lambda x: x[0]) dp[i][j] = best_cost backtrack[i][j] = best_move return self._reconstruct_alignment(backtrack, m, n) def _alignment_cost(self, src_len: int, tgt_len: int) -> float: """ Calculate alignment cost using the correct Gale-Church statistical model. Args: src_len: Source sentence length. tgt_len: Target sentence length. Returns: Alignment cost (negative log probability). """ if src_len == 0 and tgt_len == 0: return 0.0 # Calculate delta using the correct Gale-Church formula: # delta = (tgt_len - src_len * c) / sqrt(src_len * s²) # where c is the mean length ratio and s² is the variance of the difference if src_len == 0: delta = float("inf") else: delta = abs(tgt_len - src_len * self.c) / math.sqrt(src_len * self.s2) # Calculate two-tailed probability using normal distribution # P(|X| >= delta) = 2 * (1 - CDF(delta)) if delta > 10: # Very unlikely match - use a small but finite probability probability = 1e-12 else: # Use scipy's normal CDF for accurate calculation # Add epsilon to prevent numerical instability probability = 2 * (1 - stats.norm.cdf(delta)) + 1e-12 # Convert probability to cost (negative log probability) # Ensure probability is never exactly zero to prevent infinite cost # Also ensure probability doesn't exceed 1 to prevent negative cost probability = min(probability, 1.0 - 1e-12) cost = -math.log(probability) return cost def _gap_penalty_cost(self) -> float: """ Calculate the cost for gap alignments (1-0 or 0-1). Returns: Gap penalty cost. """ # Convert gap penalty (in standard deviations) to two-tailed probability # A gap penalty of 3.0 means the cost is equivalent to a 3-sigma deviation # Add epsilon to prevent numerical instability probability = 2 * (1 - stats.norm.cdf(self.gap_penalty)) + 1e-12 # Convert to cost # Ensure probability is never exactly zero to prevent infinite cost # Also ensure probability doesn't exceed 1 to prevent negative cost probability = min(probability, 1.0 - 1e-12) cost = -math.log(probability) return cost def _reconstruct_alignment( self, backtrack: List[List], m: int, n: int ) -> List[Tuple[int, int, int, int]]: """ Reconstruct the alignment path from the backtracking table. Args: backtrack: Backtracking table. m: Number of source sentences. n: Number of target sentences. Returns: List of (src_start, src_end, tgt_start, tgt_end) tuples. """ alignment = [] i, j = m, n while i > 0 or j > 0: if backtrack[i][j] is None: break prev_i, prev_j, src_count, tgt_count = backtrack[i][j] # Add alignment segment alignment.append((prev_i, i, prev_j, j)) # Move to previous position i, j = prev_i, prev_j # Reverse to get correct order alignment.reverse() return alignment