Source code for plink_bed_reader.plink_bed_reader

# Copyright 2024 - Barcelona Supercomputing Center
# Author: Rodrigo Martin
# MIT License

from typing import Tuple, Optional, Any, Union
from enum import Enum, auto
import os
import io
import numpy as np


[docs] class BEDMode(Enum): """Enum with the possible modes of a BED file""" SNP_MAJOR = auto() INDIVIDUAL_MAJOR = auto()
def _get_major_mode(bed_file: io.BufferedReader) -> BEDMode: # Save byte position pos = bed_file.tell() # Read the mode byte to check if the file is in SNP major or individual major mode bed_file.seek(2, os.SEEK_SET) mode = bed_file.read(1) # Restore byte position bed_file.seek(pos, os.SEEK_SET) if mode == b'\x01': return BEDMode.SNP_MAJOR if mode == b'\x00': return BEDMode.INDIVIDUAL_MAJOR raise ValueError("Invalid mode byte") def _read_sample_snp_counts(fam_file_path: str, bim_file_path: str) -> Tuple[int, int]: sample_count = 0 snp_count = 0 # Count the number of samples and SNPs in the file with open(fam_file_path, encoding='UTF-8') as fam_file: for _ in fam_file: sample_count += 1 with open(bim_file_path, encoding='UTF-8') as bim_file: for _ in bim_file: snp_count += 1 return sample_count, snp_count
[docs] class PLINKBEDReader(): """ Reads PLINK BED files (individual major or SNP major) and returns the genotypes as a NumPy array (uint8). The file is read in chunks to reduce memory usage and allows for random access. Matching the PLINK format specification (https://www.cog-genomics.org/plink/1.9/formats#bed), the genotypes are encoded as follows: 0 = homozygous 1/1 (usually minor) 1 = heterozygous 2 = missing 3 = homozygous 2/2 (usually major) """
[docs] def __init__(self, bed_file_path: str, offset: int = 0, count: Optional[int] = None, mode: Optional[BEDMode] = None, fam_file_path: Optional[str] = None, bim_file_path: Optional[str] = None): """ Parameters ---------- bed_file_path : str Path to the BED file (can be with or without the extension). Admits both SNP major and individual major modes. offset : int, optional Number of samples or SNPs to skip at the beginning of the file, depending on the major mode. count : int, optional Number of samples or SNPs to read from the file, depending on the major mode. mode : BEDMode, optional Major mode of the file. The mode will be inferred from the file. If the mode is provided, it will be used as a sanity check. fam_file_path : str, optional Path to the FAM file. If not provided, it will be inferred from the BED file. bim_file_path : str, optional Path to the BIM file. If not provided, it will be inferred from the BED file. """ bed_prefix = bed_file_path[:-4] if bed_file_path.endswith('.bed') else bed_file_path fam_file_path = bed_prefix + '.fam' if fam_file_path is None else fam_file_path bim_file_path = bed_prefix + '.bim' if bim_file_path is None else bim_file_path # Count the number of samples and SNPs in the file raw_sample_count, raw_snp_count = _read_sample_snp_counts(fam_file_path, bim_file_path) # Open the BED file self._bed_file = open(bed_prefix + '.bed', 'rb') # Check if the file is in SNP major or individual major mode self._major_mode = _get_major_mode(self._bed_file) # Check if the mode is correct if mode is not None and mode != self._major_mode: raise ValueError(f'Mismatch mode {mode} for file {self._major_mode}') self._offset = offset if self._major_mode == BEDMode.INDIVIDUAL_MAJOR: self._sample_count = raw_sample_count - offset if count is None else count self._snp_count = raw_snp_count # We are in individual major mode, so each byte contains 4 SNPs # The chunk size is rounded up to the nearest byte self.chunk_size_bytes = int(np.ceil(self._snp_count / 4)) elif self._major_mode == BEDMode.SNP_MAJOR: self._sample_count = raw_sample_count self._snp_count = raw_snp_count - offset if count is None else count # We are in SNP major mode, so each byte contains 4 samples # The chunk size is rounded up to the nearest byte self.chunk_size_bytes = int(np.ceil(self._sample_count / 4))
@property def sample_count(self): """Number of samples""" return self._sample_count @property def snp_count(self): """Number of SNPs""" return self._snp_count @property def major_mode(self): """Major mode of the file""" return self._major_mode
[docs] def close(self): """Close the BED file""" self._bed_file.close()
def _read_idx(self, idx: int) -> np.ndarray[Any, np.dtype[np.uint8]]: # Check if the file is closed if self._bed_file.closed: raise ValueError("I/O operation on closed BED file") # Check if the index is out of bounds if idx >= len(self): raise IndexError(f"Index out of bounds {idx} {len(self)}") # Skip the header (first 3 bytes are magic numbers and mode) self._bed_file.seek(3 + self.chunk_size_bytes * (idx + self._offset), os.SEEK_SET) # Read the chunk chunk = self._bed_file.read(self.chunk_size_bytes) if not chunk: raise ValueError("Unexpected end of BED file") # Convert the chunk to a NumPy array bit_array = np.frombuffer(chunk, dtype=np.uint8) del chunk # The array has one byte per SNP/Sample bit_array = np.unpackbits(bit_array) # The array is stored as two bits per SNP/Sample, but in reverse in each byte, sum them to get the array array = (bit_array[::2] + 2 * bit_array[1::2]).astype(np.uint8) del bit_array # Each block of 4 bit SNP/Sample is stored in reverse order # 0 = homozygous 1/1 (usually minor) # 1 = heterozygous # 2 = missing # 3 = homozygous 2/2 (usually major) # Reverse the order of the SNP/Sample in the block array = array.reshape(-1, 4)[:, ::-1] # Flatten the array array = array.flatten() # Remove the extra bits return array[:self._snp_count] if self._major_mode == BEDMode.INDIVIDUAL_MAJOR else array[:self._sample_count] def __len__(self): return self._sample_count if self._major_mode == BEDMode.INDIVIDUAL_MAJOR else self._snp_count def __getitem__(self, key: Union[int, slice]): if isinstance(key, slice): return np.array([self._read_idx(i) for i in range(*key.indices(len(self)))], dtype=np.uint8) return self._read_idx(key)