Source code for macsypy.search_genes

#########################################################################
# MacSyFinder - Detection of macromolecular systems in protein dataset  #
#               using systems modelling and similarity search.          #
# Authors: Sophie Abby, Bertrand Neron                                  #
# Copyright (c) 2014-2024  Institut Pasteur (Paris) and CNRS.           #
# See the COPYRIGHT file for details                                    #
#                                                                       #
# This file is part of MacSyFinder package.                             #
#                                                                       #
# MacSyFinder is free software: you can redistribute it and/or modify   #
# it under the terms of the GNU General Public License as published by  #
# the Free Software Foundation, either version 3 of the License, or     #
# (at your option) any later version.                                   #
#                                                                       #
# MacSyFinder is distributed in the hope that it will be useful,        #
# but WITHOUT ANY WARRANTY; without even the implied warranty of        #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the          #
# GNU General Public License for more details .                         #
#                                                                       #
# You should have received a copy of the GNU General Public License     #
# along with MacSyFinder (COPYING).                                     #
# If not, see <https://www.gnu.org/licenses/>.                          #
#########################################################################

"""
Manage the hmm step (hmmsearch or recover results from previous run) in parallele
"""

import concurrent.futures
import logging
import signal
import sys
import shutil
import os.path
import math

from .report import GembaseHMMReport, GeneralHMMReport, OrderedHMMReport
from .utils import threads_available
from .database import Indexes
from .config import Config
from .gene import ModelGene, CoreGene
from .report import HMMReport

_log = logging.getLogger(__name__)


[docs] def worker_cpu(genes_nb: int, cfg: Config) -> tuple[int, int]: """ Compute the optimum number of worker and cpu per worker The number of worker is set by the user (1 by default 0 means all worker available) we use one worker per gene if number of workers is greater than number of genes then several cpu can be use by hmsearch to speed up the search step :param genes_nb: the number of genes to search :param cfg: The macsyfinder configuration :return: the number of worker and cpu_per_worker to use :rtype: tuple (int worker_nb, int cpu_per_worker) """ worker_nb = cfg.worker() if not worker_nb: worker_nb = threads_available() cpu_per_worker = max(1, math.floor(worker_nb / genes_nb)) return worker_nb, cpu_per_worker
[docs] def search_genes(genes: list[ModelGene], cfg: Config) -> list[HMMReport]: """ For each gene of the list, use the corresponding profile to perform an Hmmer search, and parse the output to generate a HMMReport that is saved in a file after CoreHit filtering. These tasks are performed in parallel using threads. The number of workers can be limited by worker_nb directive in the config object or in the command-line with the "-w" option. :param genes: the genes to search in the input sequence dataset :param cfg: the configuration object """ worker_nb, cpu_per_worker = worker_cpu(len(genes), cfg) _log.debug(f"worker_nb = {worker_nb:d}\tcpu per worker = {cpu_per_worker}") all_reports = [] def stop(signum: int, frame) -> None: """stop the main process, its threads and subprocesses""" _log.critical('KILL all Processes') proc_grp_id = os.getpgid(0) os.killpg(proc_grp_id, signum) sys.exit(signum) # store the original SIGTERM signal handler to restore it # once search systems is over default_signal_handler = signal.signal(signal.SIGTERM, stop) def search(gene: CoreGene, cpu: int) -> HMMReport: """ Search gene in the database built from the input sequence file (execute \"hmmsearch\"), and produce a HMMReport :param gene: the gene to search :param cpu: the number of cpu to use (per worker) :return: an HMMReport """ _log.info(f"search gene {gene.name}") profile = gene.profile try: report = profile.execute(cpu=cpu) except Exception as err: _log.critical(err) stop(signal.SIGKILL, None) else: if report: report.extract() report.save_extract() return report def recover(gene: CoreGene, cfg: Config) -> HMMReport: """ Recover Hmmer output from a previous run, and produce a report :param gene: the gene to search :param cfg: the configuration :return: the list of all HMMReports (derived class depending on the input dataset type) """ hmm_old_path = os.path.join(cfg.previous_run(), cfg.hmmer_dir(), gene.name + cfg.res_search_suffix()) _log.info(f"recover hmm {hmm_old_path}") hmm_new_path = os.path.join(cfg.working_dir(), cfg.hmmer_dir(), gene.name + cfg.res_search_suffix()) shutil.copy(hmm_old_path, hmm_new_path) gene.profile.hmm_raw_output = hmm_new_path db_type = cfg.db_type() if db_type == 'gembase': report = GembaseHMMReport(gene, hmm_new_path, cfg) elif db_type == 'ordered_replicon': report = OrderedHMMReport(gene, hmm_new_path, cfg) else: report = GeneralHMMReport(gene, hmm_new_path, cfg) if report: report.extract() report.save_extract() return report # there is only one instance of gene per name but the same instance can be # in all genes several times # hmmsearch and extract should be executed ONLY ONCE per run # so I uniquify the list of gene (use CoreGene and set) genes = {mg.core_gene for mg in genes} _log.debug("start searching genes") hmmer_dir = os.path.join(cfg.working_dir(), cfg.hmmer_dir()) if not os.path.exists(hmmer_dir): # it works because mkdir is an atomic operation os.mkdir(hmmer_dir) # ensure that the indexes is build only once # So threads just have to read idx = Indexes(cfg) idx.build() previous_run = cfg.previous_run() with concurrent.futures.ThreadPoolExecutor(max_workers=worker_nb) as executor: future_search = [] for gene in genes: if previous_run and os.path.exists(os.path.join(previous_run, cfg.hmmer_dir(), gene.name + cfg.res_search_suffix())): future_search.append(executor.submit(recover, gene, cfg)) else: future_search.append(executor.submit(search, gene, cpu_per_worker)) for future in concurrent.futures.as_completed(future_search): report = future.result() if report: all_reports.append(report) _log.debug("end searching genes") signal.signal(signal.SIGTERM, default_signal_handler) return all_reports