"""
Allele Table to VCF Conversion Module.
This module provides functionality to convert allele table data into VCF
(Variant Call Format) files. It handles the transformation of genotype data
from tabular format to standard VCF format, including proper diploid notation
and genomic coordinate mapping.
Author: Nicolás Mendoza Mejía (2023)
"""
from alleleTools.format.alleleTable import AlleleTable
import pandas as pd
from ..argtypes import file_path
from ..utils.assets import get_asset_path
[docs]
def setup_parser(subparsers):
"""
Set up the argument parser for the allele2vcf command.
Args:
subparsers: The subparsers object to add this command to.
Returns:
argparse.ArgumentParser: The configured parser for allele2vcf.
"""
parser = subparsers.add_parser(
name="to_vcf",
help="Convert allele table to vcf",
description="Convert allele table to vcf",
epilog="Author: Nicolás Mendoza Mejía (2023)",
)
parser.add_argument(
"input",
type=str,
help="path to the input file with allele data in allele table format",
)
parser.add_argument(
"--loci_file",
type=file_path,
help="""
path to the file with gene loci information, alternatively you could
specify --gene_cluster
""",
)
parser.add_argument(
"--gene_cluster",
type=str,
choices=["HLA", "KIR"],
help="""
name of the gene cluster (hla or kir), alternatively you could provide
a --loci_file
""",
)
parser.add_argument(
"--vcf",
type=file_path,
help="VCF file to append the alleles to",
default="file.vcf",
required=True,
)
parser.add_argument(
"--field_separator",
type=str,
help="character separating the fields inside input default is tab",
default="\t",
)
parser.set_defaults(func=call_function)
return parser
[docs]
def call_function(args):
"""
Main function to execute allele table to VCF conversion.
This function orchestrates the conversion process by:
1. Validating input parameters
2. Loading and processing allele table data
3. Converting to VCF format with proper diploid notation
4. Mapping alleles to genomic coordinates
5. Appending results to the target VCF file
Args:
args: Parsed command line arguments containing:
- input: Path to input allele table file
- loci_file: Path to gene loci information file
- gene_cluster: Alternative gene cluster specification
- vcf: Path to target VCF file for appending
- field_separator: Field separator for input file (default: tab)
Raises:
SystemExit: If neither gene_cluster nor loci_file is provided
"""
if not (args.gene_cluster or args.loci_file):
print(
"Error: either --gene_cluster or --loci_file must be provided."
"Use -h or --help to see more details."
)
exit(1)
if not args.loci_file:
if args.gene_cluster == "HLA":
args.loci_file = get_asset_path("gene_table.tsv")
elif args.gene_cluster == "KIR":
args.loci_file = get_asset_path("gene_table_kir.tsv")
else:
print(
"Error: --gene_cluster must be either 'HLA' or 'KIR'."
"Use -h or --help to see more details."
)
exit(1)
alt = AlleleTable.open(args.input, sep=args.field_separator)
genotypes = alt.alleles.copy()
gene_loci = pd.read_csv(args.loci_file, sep="\t")
pairs = _gene_pairs(genotypes.columns)
# By this point we have the vcf file without the leading columns.
# First we pivot the table to have the samples as columns and the alleles
# as rows. Then we merge both gene alleles into a single row by applying
# the diploid_notation function. e.g.:
# ID SAMPLE_ID ...
pre_vcf_alleles = pd.DataFrame()
for pair in pairs:
# Get the presence/absence of each allele in the samples.
pairA, pairB = pair
pivotA = genotypes.pivot_table(
index=pairA, columns="id", values=pairB, aggfunc="sum"
).notna()
pivotB = genotypes.pivot_table(
index=pairB, columns="id", values=pairA, aggfunc="sum"
).notna()
allele_codes = _diploid_notation(pivotA, pivotB)
# add column with gene name
allele_codes["gene"] = pairA
pre_vcf_alleles = pd.concat([pre_vcf_alleles, allele_codes])
# Now we add the leading columns and sort the samples (also in columns)
# to match the base vcf file order.
# e.g.:
# CHROM POS ID REF ALT QUAL FILTER INFO FORMAT SAMPLE_ID. ...
# column start has chr:pos, divide that into two columns.
gene_loci[["CHROM", "POS"]] = gene_loci["start"].str.split(
":", expand=True)
# cross gene from pre_vcf_alleles with gene_loci to get the position of
# each allele.
vcf_alleles = pre_vcf_alleles.merge(gene_loci, how="left", on="gene")
vcf_alleles = vcf_alleles[vcf_alleles["POS"].notna()]
vcf_alleles = vcf_alleles.assign(
REF="A", ALT="T", QUAL=".", FILTER="PASS", INFO=".", FORMAT="GT"
)
# Remove gene columns and rename
vcf_alleles = vcf_alleles.drop(columns=["gene", "start"])
# Sort columns to match the base vcf file order.
vcf_col = _get_vcf_columns(args.vcf)
vcf_col = [x for x in vcf_col if x in vcf_alleles.columns]
vcf_alleles = vcf_alleles[vcf_col]
vcf_alleles.fillna("0|0", inplace=True)
if len(vcf_alleles) == 0:
print("WARNING: No alleles are being added to the VCF file.")
print("Check that the gene names in the input file (column names) match those in the loci file.")
else:
print(f"Appending {len(vcf_alleles)} alleles to {args.vcf}")
with open(args.vcf, "a") as f:
f.write(vcf_alleles.to_csv(index=False, sep="\t", header=False))
def _diploid_notation(pairA, pairB):
"""
Convert allele presence/absence data to VCF diploid notation.
Takes two boolean DataFrames representing allele presence for each gene
copy and converts them to standard VCF genotype notation (0|0, 0|1, 1|0,
1|1).
Args:
pairA (pd.DataFrame): Boolean DataFrame for first allele copy
pairB (pd.DataFrame): Boolean DataFrame for second allele copy
Returns:
pd.DataFrame: DataFrame with allele IDs and corresponding VCF
genotype codes
Example:
pairA=True, pairB=False -> "1|0"
pairA=True, pairB=True -> "1|1"
"""
# assign a code to each allele combination.
pairA = pairA.astype(int).rename_axis("id")
pairB = pairB.astype(int).mul(2).rename_axis("id")
group = pd.concat([pairA, pairB]).groupby("id").sum()
group = group.replace({0: "0|0", 1: "1|0", 2: "0|1", 3: "1|1"})
# Add gene name to the index
group.index = group.index.str.replace("*", "_")
group.index.name = "ID"
return group.reset_index()
def _gene_pairs(lst):
"""
Extract gene pairs from column names following the gene/gene.1 convention.
Identifies paired gene columns where one column represents the first allele
(e.g., "HLA-A") and another represents the second allele (e.g., "HLA-A.1").
Args:
lst (list): List of column names from the allele table
Returns:
list: List of tuples containing paired gene column names
[(gene, gene.1), ...]
Example:
>>> _gene_pairs(["HLA-A", "HLA-A.1", "HLA-B", "HLA-B.1"])
[["HLA-A", "HLA-A.1"], ["HLA-B", "HLA-B.1"]]
"""
# Get elements in the list with *.1
gene_1 = [x for x in lst if x.endswith(".1")]
# Get possible elements without *.1
posible = [x.replace(".1", "") for x in gene_1]
# Get elements in the list without *.1 that have a pair
gene = [x for x in lst if x not in gene_1 and x in posible]
# Remove elements without a pair in gene_1
gene_1 = [x for x in gene_1 if x.replace(".1", "") in gene]
return [[g, g1] for g, g1 in zip(gene, gene_1)]
def _get_vcf_columns(vcf_file):
"""
Extract column names from a VCF file header.
Reads the VCF file to find the #CHROM header line and extracts
the column names for proper column ordering in the output.
Args:
vcf_file (str): Path to the VCF file
Returns:
list: List of column names from the VCF header
Example:
>>> _get_vcf_columns("sample.vcf")
["CHROM", "POS", "ID", "REF", "ALT", "QUAL", "FILTER", "INFO",
"FORMAT", "SAMPLE1", "SAMPLE2"]
"""
# Read only the line that starts with #CHROM
with open(vcf_file, "r") as f:
line = f.readline()
while not line.startswith("#CHROM"):
line = f.readline()
# Remove leading # and \n, then split by tab.
return line[1:].strip().split("\t") # [9:]