import json
import math
import re
from abc import ABC, abstractmethod
from enum import Enum
from typing import List, Tuple, Union
from .utils.assets import get_asset_path
[docs]
class AlleleMatchStatus(Enum):
"""
Enumeration for allele comparison results.
Defines the possible outcomes when comparing two alleles:
- NOT_EQUAL: Alleles are different (different genes or field values)
- EQUAL: Alleles are identical in gene and all fields
- LESS_RESOLUTION: Current allele has fewer fields than compared allele
- MORE_RESOLUTION: Current allele has more fields than compared allele
"""
NOT_EQUAL = 1
EQUAL = 2
LESS_RESOLUTION = 3
MORE_RESOLUTION = 4
[docs]
class Allele:
"""
Class representing an HLA allele with parsing and comparison capabilities.
This class handles the parsing of HLA allele nomenclature and provides
methods for comparison, resolution management, and string representation.
It supports confidence scores from algorithms like HiSat.
Attributes:
gene (str): Gene name (e.g., 'A', 'B', 'DRB1')
fields (List[str]): List of allele fields (e.g., ['01', '01', '01'])
confidence (float): Optional confidence score from genotyping algorithm
Args:
code (str | list): Allele string to parse (e.g., 'A*01:01:01' or
'A*01:01(0.95)'). Alternatively, you can pass a list of fields
along with the argument `gene`
gene (str): If the gene name is not present in `code` you should pass
the here here.
Raises:
Exception: If the allele string cannot be parsed
Example:
>>> allele = Allele('A*01:01:01')
>>> print(allele.gene) # 'A'
>>> print(allele.fields) # ['01', '01', '01']
>>> print(len(allele)) # 3
"""
def __init__(
self,
gene: str,
fields: List[str],
confidence: str = "",
gene_delimiter: str = "*",
field_delimiter: str = ":",
suffix: str = "",
):
self.fields: List[str] = fields
self.gene = gene
self.gene_delimiter = gene_delimiter
self.field_delimiter = field_delimiter
self.suffix = suffix
# Extract confidence score (Hisat)
if confidence:
self.confidence = float(confidence)
def __repr__(self) -> str:
if not hasattr(self, "gene"):
return "<Empty Allele>"
return f"""({len(self.fields)}-fields){str(self)}"""
def __str__(self) -> str:
"""
String representation of the allele in standard HLA nomenclature.
Returns:
str: Allele in format 'GENE*field1:field2:field3'
"""
if not hasattr(self, "gene"):
return ""
if not self.gene and not self.fields:
return ""
return f"{self.gene}{self.gene_delimiter}{self.field_delimiter.join(self.fields)}{self.suffix}"
def __len__(self) -> int:
"""
Get the resolution level of the allele.
Returns:
int: Number of fields in the allele (resolution level)
"""
return len(self.fields)
def __eq__(self, allele_b: object) -> bool:
"""Check equality with another allele."""
if not isinstance(allele_b, Allele):
return NotImplemented
return self.compare(allele_b) == AlleleMatchStatus.EQUAL
def __hash__(self):
"""Hash function for use in sets and dictionaries."""
return hash(str(self))
[docs]
def get_fields(self) -> List[str]:
return self.fields
[docs]
def truncate(self, new_resolution: int) -> "Allele":
"""
Reduce the resolution of the allele to the specified level.
Args:
new_resolution (int): Target resolution level (number of fields)
Note:
If new_resolution is greater than current resolution, no change is made.
"""
if new_resolution > len(self):
return Allele("", [])
self.fields = self.fields[:new_resolution]
return self
[docs]
def compare(self, allele: "Allele") -> AlleleMatchStatus:
"""
Compare this allele with another allele.
Performs detailed comparison considering gene name, field values,
and resolution levels.
Args:
allele (Allele): The allele to compare with
Returns:
CmpResult: The result of the comparison
Example:
>>> a1 = Allele('A*01:01')
>>> a2 = Allele('A*01:01:01')
>>> a1.compare(a2) # CmpResult.MORE_RESOLUTION
"""
# Check that it belongs to the same gene
if self.gene != allele.gene:
return AlleleMatchStatus.NOT_EQUAL
# Compare each field
for s_field, a_field in zip(self.fields, allele.fields):
if s_field != a_field:
return AlleleMatchStatus.NOT_EQUAL
# Check differences in resolution
if len(self.fields) > len(allele.fields):
return AlleleMatchStatus.LESS_RESOLUTION
elif len(self.fields) < len(allele.fields):
return AlleleMatchStatus.MORE_RESOLUTION
return AlleleMatchStatus.EQUAL
[docs]
class ParsingStrategy(ABC):
"""
Abstract base class for allele parsing strategies.
"""
[docs]
@abstractmethod
def parse(self, text: str) -> Allele:
pass
[docs]
class DelimitedParser(ParsingStrategy):
"""
Parser for alleles using simple delimiters. Two parameters are needed:
- gene_delimiter: character that separates the gene name from the fields
- field_delimiter: character that separates the different fields
Example:
For allele "A*01:02:03", gene_delimiter="*", field_delimiter=":"
Results:
1. gene = "A"
2. fields = ["01", "02", "03"]
"""
def __init__(self, gene_delimiter: str, field_delimiter: str):
self.gene_delimiter = gene_delimiter
self.field_delimiter = field_delimiter
[docs]
def parse(self, text: str) -> Allele:
parts = text.split(self.gene_delimiter)
if len(parts) != 2:
return Allele("", [])
gene = parts[0]
field_part = parts[1]
fields = field_part.split(self.field_delimiter)
return Allele(
gene=gene,
fields=fields,
gene_delimiter=self.gene_delimiter,
field_delimiter=self.field_delimiter,
)
[docs]
class RegexParser(ParsingStrategy):
"""
Parser for alleles using regular expressions. It only requires the regex pattern.
It should contain named groups for 'gene', 'field1', 'field2', etc. An optional
group 'confidence' can also be included to capture confidence scores.
field_delimiter and gene_delimiter can also be specified. These parameters will only be used
to format the the allele string representation.
This parser is more flexible and robust than the DelimitedParser, as it can handle
more complex allele formats. However, it requires knowledge of regular expressions.
Example:
For allele "A*01",
pattern = r"(?:(?P<gene>\\w+)\\*)?(?P<field1>\\d{2})"
Results:
1. gene = "A"
2. fields = ["01"]
"""
def __init__(self, pattern: str, field_delimiter: str, gene_delimiter: str):
self.pattern = re.compile(pattern)
self.field_delimiter = field_delimiter
self.gene_delimiter = gene_delimiter
[docs]
def parse(self, text: str) -> Allele:
match = self.pattern.search(text)
if not match:
return Allele("", [])
fields = []
result = match.groupdict(default='')
for key, value in result.items():
if key.startswith("field") and value:
fields.append(value)
return Allele(
gene=result.get("gene", ""),
fields=fields,
confidence=result.get("confidence", None),
field_delimiter=self.field_delimiter,
gene_delimiter=self.gene_delimiter,
suffix=result.get('suffix', "")
)
[docs]
class AlleleParser:
"""
Configurable allele parser that supports multiple parsing strategies.
This class loads parsing configurations from a JSON file.
The configuration can be overwritten by providing a custom config file during
initialization (config_file parameter).
Use:
parser = AlleleParser(gene_family="hla", config_file="custom_config.json")
allele = parser.parse("A*01:02:03")
"""
def __init__(self, gene_family: str, config_file: str = ""):
# Load default config
config_def = get_asset_path("parser_config.json")
with open(config_def, "r") as f:
self.config = json.load(f)
# Load custom config
if config_file != "":
with open(config_file, "r") as f:
self.config.update(json.load(f))
self.strategies = self._build_strategies()
self.set_gene_family(gene_family)
def _build_strategies(self):
strategies = dict()
for name, config in self.config.items():
if config["type"] == "delimited":
strategies[name] = DelimitedParser(
config["gene_delimiter"], config["field_delimiter"]
)
elif config["type"] == "regex":
strategies[name] = RegexParser(
config["pattern"],
field_delimiter=config.get("field_delimiter", ":"),
gene_delimiter=config.get("gene_delimiter", "*"),
)
return strategies
[docs]
def set_gene_family(self, gene_family: str):
if gene_family not in self.strategies:
raise Exception(
f"Gene '{gene_family}' not found in allele parser configuration"
)
self.gene_family = gene_family
[docs]
def get_delimiters(self) -> Tuple[str, str]:
strategy = self.strategies[self.gene_family]
return strategy.gene_delimiter, strategy.field_delimiter
[docs]
def parse(self, code: str) -> Allele:
return self.strategies[self.gene_family].parse(code)
[docs]
class Solution:
def __init__(self, allele: str, support: float) -> None:
self.allele = allele
self.support = support
[docs]
class FieldTree:
"""
Tree structure to represent the hierarchical organization of allele fields.
Each node in the tree corresponds to a field value at a particular position
in the allele nomenclature. The tree is used to count and organize the
occurrence of each field value across a set of alleles.
Attributes:
field (str): The field value at this node.
support (int): Number of times this field value has been added at this
position, which indicates how many times it has been genotyped by
different tools.
children (List[FieldTree]): List of child nodes
representing subsequent fields.
Example:
For alleles A*01:01 and A*01:02, the tree will have a root 'A',
a child '01', and two children '01' and '02' under it.
"""
def __init__(self, name: str):
"""
Initialize a FieldTree node.
Args:
name (str): The field value for this node.
"""
self.field = name
self.support = 0
self.children = list()
def __str__(self):
"""
String representation of the FieldTree node and its children.
Returns:
str: Human-readable representation of the node and its subtree.
"""
if len(self.children) == 0:
return f"Field({self.field}:{self.support})"
return f"Field({self.field}:{self.support}){self.children}"
def __repr__(self):
"""
Official string representation of the FieldTree node.
Returns:
str: String representation (same as __str__).
"""
return self.__str__()
[docs]
def set_support(self, new_weight: float, recursive: bool = True) -> None:
"""
Change the support value. When recursive is True, it will change the
support value for all children.
"""
self.support = new_weight
if not recursive:
return
for child in self.children:
child.set_support(new_weight, recursive=recursive)
[docs]
def add_batch(self, batch: List[list], weight: float = 1.0):
"""
Helper method to append a list of field lists to the current tree.
Args:
batch (List[list]): A list containing lists of field values (str).
"""
if len(batch) == 0:
return
for fields in batch:
self.add(fields, weight=weight)
[docs]
def merge_tree(self, tree: "FieldTree"):
"""
Merges a foreign tree into the current tree. The top level should
match with this tree.
Args:
tree (FieldTree): The foreign tree.
"""
assert tree.field == self.field
self.support += tree.support
children = {child.field: child for child in self.children}
for t_child in tree.children:
if t_child.field not in children.keys():
self.children.append(t_child)
continue
children[t_child.field].merge_tree(t_child)
[docs]
def add(self, fields: list, weight: float = 1.0):
"""
Add a sequence of fields to the tree, incrementing counts and creating
nodes as needed.
Args:
fields (list): List of field values (str) to add as a path in the
tree.
Example:
tree.add(['01', '01']) will add/increment nodes for '01' at two
levels.
"""
self.support += weight
if len(fields) == 0:
return
name = fields[0]
overhead = fields[1:]
for child in self.children:
if child.field == name:
child.add(overhead, weight)
return
# if nothing was found
new_tree = FieldTree(name)
new_tree.add(overhead, weight)
self.children.append(new_tree)
[docs]
def get_consensus(
self,
min_support: float,
gene_delimiter: str = "*",
field_delimiter: str = ":",
max_support: float = 0,
) -> Tuple[List[str], List[float]]:
"""
Gets a list of up to two possible consensus solutions that meet the
criteria of the minimum support. This is basically a tree search
algorithm.
Two additional parameters can be provided to format the allele strings.
Args:
min_support (float): minimum proportion of support required. Each
program contributes votes equally, the amount of times the
allele has been genotyped is stored in each node of the tree.
This value is used to filter consensus alleles based on their
support values.
gene_delimiter (str): Delimiter between gene name and fields.
field_delimiter (str): Delimiter between fields.
Returns:
Tuple[List[str], List[float]]: Consensus alleles and their support
values.
"""
# min_support is a proportion between 0 and 1
assert min_support <= 1 and min_support >= 0
# Look for consensus solutions
if not max_support:
max_support = self.support
min_support_n = max_support * min_support
solutions = self.__get_consensus__(math.ceil(min_support_n))
# Get the two most supported alleles
solutions.sort(key=lambda sol: sol.support, reverse=True)
solutions = solutions[:2]
# Now sort by allele name
solutions.sort(key=lambda sol: sol.allele)
# For homozygous calls adjust the minimum support threshold
if len(solutions) == 1 and solutions[0].support >= max_support / 2:
solutions *= 2
return self.__format_solutions_as_alleles__(
solutions, gene_delimiter, field_delimiter
)
def __format_solutions_as_alleles__(
self, solutions: List[Solution], gene_delimiter: str, field_delimiter: str
) -> Tuple[List[str], List[float]]:
"""
Formats the solutions as two separate lists:
Args:
solutions (list): is a list of the found consensus alleles with
their scores
gene_delimiter (str): Delimiter between gene name and fields.
field_delimiter (str): Delimiter between fields.
Returns:
- list of consensus alleles (str)
- list of respective number of algorithms supporting the consensus
(float)
"""
alleles, supports = list(), list()
for solution in solutions:
supports.append(solution.support)
nodes = solution.allele.split(":")
# If it's only the gene name, there was no consensus
if len(nodes) <= 1:
alleles.append("")
continue
# Use the allele class as an interface to format the string
alleles.append(
str(
Allele(
gene=nodes[0],
fields=nodes[1:],
gene_delimiter=gene_delimiter,
field_delimiter=field_delimiter,
)
)
)
return alleles, supports
def __get_consensus__(self, min_support_number: float) -> List[Solution]:
"""
Get consensus solutions from the field tree.
Args:
min_support_number (float): Minimum support threshold.
Returns:
List[Tuple[str, float]]: List of consensus solutions with their
support.
"""
if self.support < min_support_number:
return list()
if len(self.children) <= 0:
return [Solution(self.field, self.support)]
solutions = list()
for child in self.children:
child_sol = child.__get_consensus__(min_support_number)
if not child_sol:
continue
solutions.extend(self.__merge_with_current_node__(child_sol))
if solutions:
return solutions
return [Solution(self.field, self.support)]
def __merge_with_current_node__(
self, res: List[Solution]
) -> List[Solution]:
"""
Takes as input the list of possible consensus from child nodes
and merges it with the current node's field. If no response was
obtained from the child nodes, it will return the current node's
field and its support count.
example:
if the current node is "A" and the child node is "B:2", the result
will be ["A:B:2", 2]
"""
if len(res) == 0:
return [Solution(self.field, self.support)]
sol = list()
for solution in res:
fields = solution.allele
support = solution.support
sol.append(Solution(f"{self.field}:{fields}", support))
return sol