Source code for alleleTools.format.vcf2allele

"""
VCF to Allele Table Conversion Module.

This module converts VCF (Variant Call Format) files
containing HLA or KIR allele data into allele tables. It supports various
output formats including pyHLA and PyPop compatible formats.

Usage:
    To generate the input file from imputation, run:
    # Extract only relevant alleles
    bcftools view --include 'ID~"HLA"' IMPUTED.vcf > HLA.vcf
    # Convert the extracted alleles to a table
    altools convert vcf2allele HLA.vcf --output out.alt

Author: Nicolás Mendoza Mejía (2023)
"""

from collections import defaultdict
from enum import Enum
from typing import List

import pandas as pd

from alleleTools.format.alleleTable import AlleleTable

from ..argtypes import file_path, output_path
from ..format.vcf import VCF


[docs] def setup_parser(subparsers): """ Set up the argument parser for the vcf2allele command. Args: subparsers: The subparsers object to add this command to. Returns: argparse.ArgumentParser: The configured parser for vcf2allele. """ parser = subparsers.add_parser( name="from_vcf", help="Convert vcf file to allele table", description="Convert vcf file to allele table", epilog="Author: Nicolás Mendoza Mejía (2023)", ) # Input/output arguments parser.add_argument( "input", type=file_path, help="Input vcf file name", ) parser.add_argument( "--phe", type=file_path, help="input phe file name (to add phenotype column)", default="", ) parser.add_argument( "--output", type=output_path, help="name of the output file", default="output.alt", ) # Allele format arguments parser.add_argument( "--rm-prefix", type=str, help=""" removes prefix from allele names, usually its the gene name (KIR or HLA_) """, default="HLA_", ) parser.add_argument( "--separator", type=str, help="separator to split gene name from allele name", default="*", ) parser.add_argument( "--extensive_search", type=bool, help=""" when no allele is imputed, look for the next most likely alleles """, default=False, ) # Additional arguments parser.add_argument( "--output_header", action="store_true", help="output header with the gene names", ) parser.add_argument( "--population", type=str, help=""" If this is set, a colum with the population will be added at the beginning. This makes the output compatible with pyPop. """, default="", ) parser.set_defaults(func=call_function) return parser
[docs] def call_function(args): """ Main function to execute VCF to allele table conversion. This function orchestrates the conversion process by: 1. Loading and preprocessing the VCF file 2. Extracting genotype information 3. Converting to allele format 4. Adding phenotype data if provided 5. Writing the output file Args: args: Parsed command line arguments containing: - input: Path to input VCF file - output: Path to output file - rm_prefix: Prefix to remove from allele names - separator: Separator between gene and allele names - extensive_search: Whether to perform extensive allele search - phe: Optional phenotype file path - output_header: Whether to include header in output - population: Population identifier for PyPop compatibility """ vcf = VCF(args.input) vcf.remove_id_prefix(args.rm_prefix) format = vcf.get_format() # Get table with only samples as columns genotypes = vcf.samples_dataframe() true_alleles = _get_true_alleles( genotypes, format, args.extensive_search, args.separator ) # sort the columns true_alleles = true_alleles.reindex(sorted(true_alleles.columns), axis=1) # Create allele table alt = AlleleTable() alt.alleles = true_alleles # add phenotype column if provided alt.load_phenotype(args.phe) alt.to_csv(args.output, header=args.output_header)
[docs] class VCFalleles: """ Class for processing and analyzing VCF allele data. This class handles the parsing and analysis of allele information from VCF format data, including genotype determination, resolution analysis, and ploidy-based filtering. Attributes: df (pd.DataFrame): Processed allele data with genotype information indexed by gene and allele names. Args: alleles (pd.DataFrame): Raw allele data from VCF formats (pd.DataFrame): Format information from VCF header """ def __init__(self, alleles: pd.DataFrame, formats: pd.DataFrame) -> None: self.df = self._parse_vcf_info(alleles, formats) self.df["is_homozygous"] = self.df.GT.str.contains(r"1\|1") genes = self._get_genes(self.df.index) self.df = self.df.set_index([genes, self.df.index]).rename_axis( ["gene", "allele"] ) def _parse_vcf_info(self, alleles, formats): """ Parse VCF info fields into structured data. Extracts genotype, dosage, and ploidy likelihoods from VCF format fields following the pattern: {GT}:{DS}:{AA},{AB},{BB} Args: alleles (pd.Series): Series with info fields from VCF file formats (list): List of format field names Returns: pd.DataFrame: Parsed data with GT as string and DS, AA, AB, BB as floats """ # Extract the genotype, dosage and ploidy likelihoods info_df = alleles.str.split(":", expand=True) info_df.columns = formats float_cols = ["DS", "AA", "AB", "BB"] # Convert columns to the correct data type for col in set(formats).intersection(float_cols): info_df[col] = info_df[col].astype(float) return info_df def _find_high_res(self, allele_list: List[str]): """ Identify high-resolution alleles from a list. Determines which alleles represent the highest resolution by checking if any allele is a substring of another (indicating lower resolution). For example, in ['A*02', 'A*02:01:01:01'], only 'A*02:01:01:01' is considered high resolution. Args: allele_list (List[str]): List of allele names to analyze Returns: List[bool]: Boolean list indicating which alleles are high resolution """ high_res = list() for i, x in enumerate(allele_list): # cross comparison with every other allele comp = [x in y for y in allele_list] comp[i] = False # remove self comparison high_res.append(not any(comp)) return high_res def _get_genes(self, alleles: pd.Series): """ Extract gene names from allele identifiers. Args: alleles (pd.Series): Series of allele identifiers Returns: pd.Series: Extracted gene names """ return alleles.str.extract(r"([A-Z0-9]+)")[0] class _ploidy_filter(Enum): """ Enumeration for filtering alleles by ploidy status. Each enum value contains a tuple of (mask_boolean, regex_pattern): - HOMOZYGOUS: Filters for homozygous genotypes (1|1) - HETEROZYGOUS: Filters for heterozygous genotypes (not 0|0) - UNCERTAIN: Filters for uncertain genotypes (0|0) """ HOMOZYGOUS = (True, r"1\|1") HETEROZYGOUS = (False, r"0\|0") UNCERTAIN = (True, r"0\|0") def _get_ploidy_alleles( self, alleles: pd.DataFrame, filter: _ploidy_filter, n: int = 1 ): """ Get the highest resolution alleles that match a ploidy filter. Args: alleles (pd.DataFrame): DataFrame with allele codes as index and columns GT, DS, AB filter (_ploidy_filter): Filter criteria for ploidy selection n (int): Number of alleles to return (default: 1) Returns: pd.DataFrame: Filtered alleles sorted by score (DS + AB) """ # 1. Apply filter mask, pattern = filter.value has_passed_filter = ( alleles.GT.str.contains(pattern) if mask else ~alleles.GT.str.contains(pattern) ) filtered = alleles.loc[has_passed_filter] # 2. Get high resolution alleles is_high_res = self._find_high_res(filtered.index.values.tolist()) high_res = filtered.loc[is_high_res].copy() # 3. Get the n alleles with the highest dosages + AB probability if "AB" in high_res and "DS" in high_res: high_res["score"] = high_res["DS"] + high_res["AB"] else: high_res["score"] = 1 return high_res.nlargest(1, columns="score") def _fill_ploidy_second_most_probable(self, heterozygous, alleles): """ Fill ploidy with second most probable alleles when needed. When there aren't enough high-confidence alleles to fill the expected ploidy, this method finds the next most probable alleles. Args: heterozygous (pd.DataFrame): Current heterozygous alleles alleles (pd.DataFrame): All available alleles Returns: List[str]: Combined list of allele names including scores """ # If there is not enough alleles to fill the ploidy # with the most probable alleles hetero_low = self._get_ploidy_alleles( alleles, self._ploidy_filter.UNCERTAIN, n=2 - len(heterozygous) ) if not ("DS" in hetero_low and "AB" in hetero_low): return [] hetero_low_str = hetero_low.apply( lambda row: f"{row.name}({row['DS']}/{row['AB']})", axis=1 ) return heterozygous.index.tolist() + [hetero_low_str.iloc[0]]
[docs] def sort_and_fill(self, extensive=False): """ Sort alleles by gene and determine final genotype calls. For each gene, determines whether the genotype is homozygous or heterozygous and selects the appropriate alleles based on confidence scores and resolution. Args: extensive (bool): Whether to perform extensive search for low-confidence alleles (default: False) Returns: List[str]: List of selected allele names for all genes """ results = list() genes = self.df.index.get_level_values("gene").unique() for gene in sorted(genes): to_append = list() alleles = self.df.loc[[gene]].reset_index(level=0) if any(alleles.is_homozygous): # If the gene is homozygous, get the allele homozygous = self._get_ploidy_alleles( alleles, self._ploidy_filter.HOMOZYGOUS ) to_append = homozygous.index.tolist() * 2 else: # is heterozygous # Get the two highest resolution alleles heterozygous = self._get_ploidy_alleles( alleles, self._ploidy_filter.HETEROZYGOUS, n=2 ) if len(heterozygous == 2): to_append = heterozygous.index.tolist() elif len(heterozygous) <= 1 and extensive: to_append = self._fill_ploidy_second_most_probable( heterozygous, alleles ) else: to_append = [] results.extend(to_append) return results
def _get_true_alleles(genotypes, format, extensive=False, allele_separator="*" ): """ Extract true alleles from VCF genotype data for all samples. This function processes genotype data for all samples in the VCF file, determining the most likely allele calls for each gene and sample. Args: genotypes (pd.DataFrame): DataFrame with samples as columns and allele IDs as index, containing genotype info format (list): List of format field names from VCF header extensive (bool): Whether to perform extensive search for uncertain alleles (default: False) allele_separator (str): Character separating gene name from allele name (default: "*") Returns: pd.DataFrame: DataFrame with samples as index and genes as columns, containing the called alleles Raises: Warning: Prints warning if alleles are ignored due to separator mismatch """ df = pd.DataFrame() ignored_samples = defaultdict(lambda: 0) last_ignored_allele = "" for sample in genotypes.columns: # Get the list of alleles for that column(sample) alleles = genotypes.loc[ ~genotypes[sample].str.contains(r"0\|0:0:1,0,0", na=False), sample ] allele_list = VCFalleles(alleles, format).sort_and_fill(extensive) # Separate allele from gene name genes, alleles = (list(), list()) for allele in allele_list: if allele_separator not in allele: ignored_samples[sample] += 1 last_ignored_allele = allele else: gene, allele = allele.split(allele_separator) genes.append(gene) alleles.append(allele) if len(alleles) == 0: continue # Add _1 to duplicate genes genes_columns = list() for gene in genes: if gene in genes_columns: gene += "_1" genes_columns.append(gene) # Add the alleles to the data frame row = pd.DataFrame( allele_list, index=genes_columns, columns=[sample], ).transpose() df = pd.concat([df, row]) if len(ignored_samples) > 0: print( f""" Warning: A total of {len(ignored_samples)} samples had ignored alleles. This may be due to a separator not matching the allele names, try changing the --separator argument. For example, the last ignored allele was '{last_ignored_allele}', but the expected --separator was '{allele_separator}' """ ) return df