Source code for cellmaps_generate_hierarchy.runner

#! /usr/bin/env python

import os
import logging
import re
import time
import json
import warnings
from datetime import date

import ndex2
import pandas as pd
from tqdm import tqdm
from cellmaps_utils import constants
from cellmaps_utils import logutils
from cellmaps_utils.provenance import ProvenanceUtil
import cellmaps_generate_hierarchy
from cellmaps_generate_hierarchy.exceptions import CellmapsGenerateHierarchyError
from cellmaps_utils.hidefconverter import HierarchyToHiDeFConverter
from cellmaps_utils.ndexupload import NDExHierarchyUploader

from cellmaps_generate_hierarchy.hcx import HCXFromCDAPSCXHierarchy

logger = logging.getLogger(__name__)


[docs] class CellmapsGenerateHierarchy(object): """ Runs steps necessary to create PPI from embedding and to generate a hierarchy """ K_DEFAULT = 10 ALGORITHM = 'leiden' MAXRES = 80 def __init__(self, outdir=None, inputdirs=[], ppigen=None, algorithm=ALGORITHM, maxres=MAXRES, k=K_DEFAULT, gene_node_attributes=None, hiergen=None, name=None, organization_name=None, project_name=None, layoutalgo=None, skip_logging=True, provenance_utils=ProvenanceUtil(), input_data_dict=None, ndexserver=None, ndexuser=None, ndexpassword=None, visibility=None, keep_intermediate_files=False, provenance=None ): """ Constructor :param outdir: Directory to create and put results in :type outdir: str :param ppigen: PPI Network Generator object, should be a subclass :type ppigen: :py:class:`~cellmaps_generate_hierarchy.ppi.PPINetworkGenerator` :param hiergen: Hierarchy Generator object, should be a subclass :type hiergen: :py:class:`~cellmaps_generate_hierarchy.HierarchyGenerator` :param algorithm: Clustering algorithm for hierarchy detection (default: 'leiden') :type algorithm: str :param maxres: Maximum resolution to explore when clustering (default: 80) :type maxres: int :param k: Number of neighbors for graph construction (default: 10) :type k: int :param gene_node_attributes: TSV file(s) or directory containing additional gene attributes to annotate network nodes :type gene_node_attributes: list[str] :param hiergen: Hierarchy generator object that clusters and converts networks to hierarchical structure :type hiergen: :py:class:`~cellmaps_generate_hierarchy.HierarchyGenerator` :param name: Optional dataset name used in metadata and registration :type name: str :param organization_name: Name of the organization creating this dataset :type organization_name: str :param project_name: Name of the project associated with this analysis :type project_name: str :param layoutalgo: Optional layout algorithm to apply to hierarchy (currently unused due to CX2 format limitations) :type layoutalgo: :py:class:`~cellmaps_utils.layout.BaseLayout` or None :param skip_logging: If ``True`` skip logging, if ``None`` or ``False`` do NOT skip logging :type skip_logging: bool :param provenance_utils: Utility class for registering datasets, RO-Crates, and software in FAIRSCAPE :type provenance_utils: :py:class:`~cellmaps_utils.provenance.ProvenanceUtil` :param input_data_dict: Dictionary capturing run parameters for reproducibility and logging :type input_data_dict: dict or None :param ndexserver: NDEx server address for uploading hierarchy and networks :type ndexserver: str or None :param ndexuser: NDEx username for authentication :type ndexuser: str or None :param ndexpassword: NDEx password for authentication :type ndexpassword: str or None :param visibility: If set to ``public``, ``PUBLIC`` or ``True`` sets hierarchy and interactome to publicly visibility on NDEx, otherwise they are left as private :type visibility: str or bool :param keep_intermediate_files: If True, keeps PPI network files for review and registers them; otherwise deletes them :type keep_intermediate_files: bool :param provenance: Optional provenance metadata to use when no RO-Crate is available Example: .. code-block:: python { 'name': 'Example input dataset', 'organization-name': 'CM4AI', 'project-name': 'Example' } :type provenance: dict or None """ logger.debug('In constructor') if outdir is None: raise CellmapsGenerateHierarchyError('outdir is None') self._outdir = os.path.abspath(outdir) self._inputdirs = inputdirs self._start_time = int(time.time()) self._ppigen = ppigen self._algorithm = algorithm self._maxres = maxres self._k = k self._gene_node_attributes = gene_node_attributes self._hiergen = hiergen self._name = name self._project_name = project_name self._organization_name = organization_name self._keywords = None self._description = None if skip_logging is None: self._skip_logging = False else: self._skip_logging = skip_logging self._input_data_dict = input_data_dict self._provenance_utils = provenance_utils self._layoutalgo = layoutalgo self._server = ndexserver self._user = ndexuser self._password = ndexpassword self._visibility = visibility self.keep_intermediate_files = keep_intermediate_files self._provenance = provenance if self._input_data_dict is None: self._input_data_dict = {'outdir': self._outdir, 'inputdirs': self._inputdirs, 'embedding_generator': str(self._ppigen), 'algorithm': self._algorithm, 'maxres': self._maxres, 'k': self._k, 'gene_node_attributes': str(self._gene_node_attributes), 'hiergen': str(self._hiergen), 'ndexserver': self._server, 'ndexuser': self._user, 'ndexpassword': self._password, 'name': self._name, 'project_name': self._project_name, 'organization_name': self._organization_name, 'skip_logging': self._skip_logging, 'provenance': str(self._provenance) } def _update_provenance_fields(self): """ :return: """ rocrate_dirs = [] if self._inputdirs is not None: if isinstance(self._inputdirs, str): if os.path.exists(os.path.join(os.path.abspath(self._inputdirs), constants.RO_CRATE_METADATA_FILE)): rocrate_dirs.append(self._inputdirs) else: for embeddind_dir in self._inputdirs: if os.path.exists(os.path.join(os.path.abspath(embeddind_dir), constants.RO_CRATE_METADATA_FILE)): rocrate_dirs.append(embeddind_dir) if len(rocrate_dirs) > 0: prov_attrs = self._provenance_utils.get_merged_rocrate_provenance_attrs(self._inputdirs, override_name=self._name, override_project_name= self._project_name, override_organization_name= self._organization_name, extra_keywords=['hierarchy', 'model']) self._name = prov_attrs.get_name() self._organization_name = prov_attrs.get_organization_name() self._project_name = prov_attrs.get_project_name() self._keywords = prov_attrs.get_keywords() self._description = prov_attrs.get_description() elif self._provenance is not None: self._name = self._provenance['name'] if 'name' in self._provenance else 'Hierarchy' self._organization_name = self._provenance['organization-name'] \ if 'organization-name' in self._provenance else 'NA' self._project_name = self._provenance['project-name'] \ if 'project-name' in self._provenance else 'NA' self._keywords = self._provenance['keywords'] if 'keywords' in self._provenance else ['hierarchy', 'model'] self._description = self._provenance['description'] if 'description' in self._provenance else \ 'Hierarchy generation' else: raise CellmapsGenerateHierarchyError('One of inputs directories should be an RO-Crate or provenance file ' 'should be specified.') def _create_rocrate(self): """ Creates rocrate for output directory :raises CellMapsProvenanceError: If there is an error """ logger.debug('Registering rocrate with FAIRSCAPE') try: self._provenance_utils.register_rocrate(self._outdir, name=self._name, organization_name=self._organization_name, project_name=self._project_name, description=self._description, keywords=self._keywords) except TypeError as te: raise CellmapsGenerateHierarchyError('Invalid provenance: ' + str(te)) except KeyError as ke: raise CellmapsGenerateHierarchyError('Key missing in provenance: ' + str(ke)) def _get_keywords_extended_with_new_values(self, new_values=None): """ Takes keywords passed into constructor and append **new_values** and return a unique list of merged values :param new_values: new values :type new_values: list :return: merged list of keywords :rtype: list """ if self._keywords is None or len(self._keywords) == 0: keywords = [] else: keywords = self._keywords.copy() if isinstance(new_values, list): keywords.extend(new_values) else: keywords.append(new_values) return list(set(keywords)) def _register_software(self): """ Registers this tool :raises CellMapsImageEmbeddingError: If fairscape call fails """ software_keywords = self._get_keywords_extended_with_new_values(new_values=['tools', cellmaps_generate_hierarchy.__name__]) software_description = self._description + ' ' + \ cellmaps_generate_hierarchy.__description__ self._softwareid = self._provenance_utils.register_software(self._outdir, name=cellmaps_generate_hierarchy.__name__, description=software_description, author=cellmaps_generate_hierarchy.__author__, version=cellmaps_generate_hierarchy.__version__, file_format='py', keywords=software_keywords, url=cellmaps_generate_hierarchy.__repo_url__) def _register_computation(self, generated_dataset_ids=[]): """ # Todo: added in used dataset, software and what is being generated :return: """ logger.debug('Getting id of input rocrate') input_dataset_ids = [] if isinstance(self._inputdirs, list): for i_dir in self._inputdirs: input_dataset_ids.append(self._provenance_utils.get_id_of_rocrate(i_dir)) else: input_dataset_ids.append(self._provenance_utils.get_id_of_rocrate(self._inputdirs)) keywords = self._get_keywords_extended_with_new_values(new_values=['computation']) description = self._description + ' run of ' + cellmaps_generate_hierarchy.__name__ self._provenance_utils.register_computation(self._outdir, name=cellmaps_generate_hierarchy.__computation_name__, run_by=str(self._provenance_utils.get_login()), command=str(self._input_data_dict), description=description, keywords=keywords, used_software=[self._softwareid], used_dataset=input_dataset_ids, generated=generated_dataset_ids)
[docs] def get_ppi_network_dest_file(self, ppi_network): """ Gets the path where the PPI network should be written to :param ppi_network: PPI Network :type ppi_network: :py:class:`ndex2.nice_cx_network.NiceCXNetwork` :return: Path on filesystem to write the PPI network :rtype: str """ cutoff = ppi_network.get_network_attribute('cutoff')['v'] return os.path.join(self._outdir, constants.PPI_NETWORK_PREFIX + '_cutoff_' + str(cutoff))
[docs] def get_hierarchy_dest_file(self): """ Creates file path prefix for hierarchy Example path: ``/tmp/foo/hierarchy`` :return: Prefix path on filesystem to write Hierarchy Network :rtype: str """ return os.path.join(self._outdir, constants.HIERARCHY_NETWORK_PREFIX)
[docs] def get_hierarchy_parent_network_dest_file(self): """ Creates file path prefix for hierarchy parent network Example path: ``/tmp/foo/hierarchy_parent`` :return: """ return os.path.join(self._outdir, 'hierarchy_parent')
def _remove_ppi_networks(self, networks_paths): for n in networks_paths: try: os.remove(n + constants.CX_SUFFIX) except Exception as e: logger.warning(f"Tried to remove ppi file {n}, but failed due to: {e}") def _write_ppi_network_as_cx(self, ppi_network, dest_path=None): """ :param ppi_network: :return: """ logger.debug('Writing PPI network ' + str(ppi_network.get_name())) # write PPI to filesystem with open(dest_path, 'w') as f: json.dump(ppi_network.to_cx(), f) def _register_ppi_network(self, ppi_network, dest_path=None): """ :param ppi_network: :return: """ logger.debug('Registering PPI network ' + str(ppi_network.get_name())) description = self._description description += ' PPI Network file' keywords = self._get_keywords_extended_with_new_values(new_values=['file']) # register ppi network file with fairscape data_dict = {'name': os.path.basename(dest_path) + ' PPI network file', 'description': description, 'keywords': keywords, 'data-format': 'CX', 'author': cellmaps_generate_hierarchy.__name__, 'version': cellmaps_generate_hierarchy.__version__, 'date-published': date.today().strftime(self._provenance_utils.get_default_date_format_str())} return self._provenance_utils.register_dataset(self._outdir, source_file=dest_path, data_dict=data_dict) def _write_hierarchy_network(self, hierarchy=None): """ Writes **hierarchy** to file :param hierarchy: CX2 network converted to list and dicts :type hierarchy: list :return: Path to hierarchy output file :rtype: str """ logger.debug('Writing hierarchy') suffix = '.cx2' # todo put this into cellmaps_utils.constants hierarchy_out_file = self.get_hierarchy_dest_file() + suffix with open(hierarchy_out_file, 'w') as f: json.dump(hierarchy, f) return hierarchy_out_file def _register_hierarchy_network(self, hierarchy_out_file=None, hierarchyurl=None): """ :param network: :return: """ logger.debug('Register hierarchy with fairscape') description = self._description description += ' Hierarchy network file' keywords = self._get_keywords_extended_with_new_values(new_values=['file', 'hierarchy', 'network', 'HCX']) # register hierarchy network file with fairscape # The name must be Output Dataset so that the cm4ai portal knows to # grab the URL link data_dict = {'name': 'Output Dataset', 'description': description, 'keywords': keywords, 'data-format': 'HCX', 'author': cellmaps_generate_hierarchy.__name__, 'version': cellmaps_generate_hierarchy.__version__, 'date-published': date.today().strftime(self._provenance_utils.get_default_date_format_str())} if hierarchyurl is not None: data_dict['url'] = hierarchyurl dataset_id = self._provenance_utils.register_dataset(self._outdir, source_file=hierarchy_out_file, data_dict=data_dict) return dataset_id def _write_and_register_hierarchy_parent_network(self, parent=None, parenturl=None): """ :param network: :return: """ logger.debug('Writing hierarchy parent') suffix = '.cx2' # todo put this into cellmaps_utils.constants parent_out_file = self.get_hierarchy_parent_network_dest_file() + suffix with open(parent_out_file, 'w') as f: json.dump(parent, f) description = self._description description += ' Hierarchy parent network file' keywords = self._get_keywords_extended_with_new_values(new_values=['file', 'parent', 'interactome', 'ppi', 'network', 'CX2']) data_dict = {'name': 'Hierarchy parent network', 'description': description, 'keywords': keywords, 'data-format': 'CX2', 'author': cellmaps_generate_hierarchy.__name__, 'version': cellmaps_generate_hierarchy.__version__, 'date-published': date.today().strftime(self._provenance_utils.get_default_date_format_str())} if parenturl is not None: data_dict['url'] = parenturl dataset_id = self._provenance_utils.register_dataset(self._outdir, source_file=parent_out_file, data_dict=data_dict) return dataset_id def _register_hidef_output_with_gene_names(self, hidef_output_path, hidef_output_name): """ """ logger.debug(f'Registering hidef output {hidef_output_name} with gene names') description = self._description description += f' HiDeF output {hidef_output_name} with gene names file' keywords = self._get_keywords_extended_with_new_values(new_values=['file']) # register file with fairscape data_dict = {'name': f'HiDeF output {hidef_output_name} with gene names', 'description': description, 'keywords': keywords, 'data-format': "tsv", 'author': cellmaps_generate_hierarchy.__name__, 'version': cellmaps_generate_hierarchy.__version__, 'date-published': date.today().strftime(self._provenance_utils.get_default_date_format_str())} return self._provenance_utils.register_dataset(self._outdir, source_file=hidef_output_path, data_dict=data_dict) def _add_gene_node_attributes(self, parent_ppi): """ Adds gene node attributes to the parent PPI network from provided TSV files or found in ro-crates. :param parent_ppi: The PPI network to which the attributes will be added. :type parent_ppi: :py:class:`ndex2.cx2.CX2Network` :return: The parent PPI network object with the new attributes added. :rtype: :py:class:`ndex2.cx2.CX2Network` """ node_name_dict = {} for node_id, node_obj in parent_ppi.get_nodes().items(): node_name_dict[node_obj['v']['name']] = node_id for entry_path in self._gene_node_attributes: attr_files = list() if os.path.isdir(entry_path): attr_files.extend([os.path.join(entry_path, f) for f in os.listdir(entry_path) if re.match(r'\d+_' + re.escape(constants.IMAGE_GENE_NODE_ATTR_FILE), f)]) ppi_attr_file = os.path.join(entry_path, constants.PPI_GENE_NODE_ATTR_FILE) if os.path.exists(ppi_attr_file): attr_files.append(ppi_attr_file) if len(attr_files) < 1: logger.warning(f"No attribute file found in directory {entry_path}") continue elif entry_path.endswith('.tsv'): attr_files.append(entry_path) else: logger.warning(f"Entry is neither a directory nor a TSV file: {entry_path}") continue for attribute_file in attr_files: df = pd.read_csv(attribute_file, sep='\t', header=0) for _, row in df.iterrows(): gene_name = row.iloc[0] node_id = node_name_dict.get(gene_name, None) if node_id is None: continue for column_name in df.columns[1:]: if not pd.isna(row[column_name]): parent_ppi.add_node_attribute(node_id, column_name, row[column_name]) if column_name == 'represents' and row[column_name].startswith('ensembl:ENSG'): ensembl_only = re.sub('^ensembl:', '', row[column_name]) # URL suggested by Jan to get HPA info parent_ppi.add_node_attribute(node_id, 'representsurl', 'https://www.proteinatlas.org/' + ensembl_only + '/subcellular') # URL suggested by Jan to get all antibodies for given ensembl id parent_ppi.add_node_attribute(node_id, 'antibodyurl', 'https://www.proteinatlas.org/' + ensembl_only + '/summary/antibody') return parent_ppi def _get_network_attribute(self, network=None, attribute_name=None, default='Unknown'): """ Gets network attribute from **network** value matching **attribute_name** or value of **default** if not found :param network: :type network: :py:class:`~ndex2.cx2.CX2Network` :param attribute_name: :type attribute_name: str :param default: :type default: str :return: :rtype: str """ net_attrs = network.get_network_attributes() if net_attrs is None: logger.info('Network lacks any network attributes. hmm....') return default if attribute_name in net_attrs: return net_attrs[attribute_name] logger.debug(str(attribute_name) + ' network attribute note found. using default') return default def _update_ppi_with_hierarchy_attributes(self, parent_ppi=None, hierarchy=None): """ Updates parent_ppi aka parent network with some attributes from hierarchy namely **prov:wasGeneratedBy** and **prov:wasDerivedFrom** In addition :param hierarchy: :type hierarchy: :py:class:`~ndex2.cx2.CX2Network` """ parent_ppi.add_network_attribute('prov:wasGeneratedBy', self._get_network_attribute(hierarchy, attribute_name='prov:wasGeneratedBy')) parent_ppi.add_network_attribute('prov:wasDerivedFrom', self._get_network_attribute(hierarchy, attribute_name='prov:wasDerivedFrom')) p_net_attrs = parent_ppi.get_network_attributes() parent_ppi.add_network_attribute('name', str(hierarchy.get_name() + ' ' + p_net_attrs['name'])) def generate_readme(self): description = getattr(cellmaps_generate_hierarchy, '__description__', 'No description provided.') version = getattr(cellmaps_generate_hierarchy, '__version__', '0.0.0') with open(os.path.join(os.path.dirname(__file__), 'readme_outputs.txt'), 'r') as f: readme_outputs = f.read() readme = readme_outputs.format(DESCRIPTION=description, VERSION=version) with open(os.path.join(self._outdir, 'README.txt'), 'w') as f: f.write(readme)
[docs] def run(self): """ Runs CM4AI Generate Hierarchy :return: """ exitcode = 99 try: logger.debug('In run method') if os.path.isdir(self._outdir): raise CellmapsGenerateHierarchyError(self._outdir + ' already exists') if not os.path.isdir(self._outdir): os.makedirs(self._outdir, mode=0o755) if self._skip_logging is False: logutils.setup_filelogger(outdir=self._outdir, handlerprefix='cellmaps_image_embedding') logutils.write_task_start_json(outdir=self._outdir, start_time=self._start_time, data={'commandlineargs': self._input_data_dict}, version=cellmaps_generate_hierarchy.__version__) self.generate_readme() self._update_provenance_fields() self._create_rocrate() self._register_software() generated_dataset_ids = [] ppi_network_prefix_paths = [] # generate PPI networks for ppi_network in tqdm(self._ppigen.get_next_network(), desc='Generating hierarchy'): dest_prefix = self.get_ppi_network_dest_file(ppi_network) ppi_network_prefix_paths.append(dest_prefix) cx_path = dest_prefix + constants.CX_SUFFIX self._write_ppi_network_as_cx(ppi_network, dest_path=cx_path) if self.keep_intermediate_files: generated_dataset_ids.append(self._register_ppi_network(ppi_network, dest_path=cx_path)) # generate hierarchy and get parent ppi hierarchy, parent_ppi = self._hiergen.get_hierarchy(ppi_network_prefix_paths, self._algorithm, self._maxres, self._k) if not self.keep_intermediate_files: self._remove_ppi_networks(ppi_network_prefix_paths) if self._gene_node_attributes is not None: parent_ppi = self._add_gene_node_attributes(parent_ppi) if "bait" in parent_ppi.get_attribute_declarations()['nodes']: parent_ppi = HCXFromCDAPSCXHierarchy.apply_style_to_network(parent_ppi, 'interactome_style_with_bait.cx2') parenturl = None hierarchyurl = None self._update_ppi_with_hierarchy_attributes(parent_ppi=parent_ppi, hierarchy=hierarchy) if self._server is not None and self._user is not None and self._password is not None: ndex_uploader = NDExHierarchyUploader(self._server, self._user, self._password, self._visibility) _, parenturl, _, hierarchyurl = ndex_uploader.save_hierarchy_and_parent_network(hierarchy, parent_ppi) message = (f'Hierarchy uploaded. To view hierarchy on NDEx please paste this URL in your browser ' f'{hierarchyurl}. To view Hierarchy on new experimental Cytoscape on the Web, ' f'go to {ndex_uploader.get_cytoscape_url(hierarchyurl)}') print(message) logger.info(message) hierarchy = hierarchy.to_cx2() parent_ppi = parent_ppi.to_cx2() # TODO: Need to support layout with HCX warnings.warn("Layout disabled due to incompatibilities with HCX format") # if self._layoutalgo is not None: # logger.debug('Applying layout') # self._layoutalgo.add_layout(network=hierarchy) # else: # logger.debug('No layout algorithm set, skipping') # write out hierarchy hierarchy_out_file = self._write_hierarchy_network(hierarchy) # write out parent network and register with fairscape generated_dataset_ids.append(self._write_and_register_hierarchy_parent_network(parent=parent_ppi, parenturl=parenturl)) generated_dataset_ids.append(self._register_hierarchy_network(hierarchy_out_file, hierarchyurl=hierarchyurl)) # add datasets created by hiergen object generated_dataset_ids.extend(self._hiergen.get_generated_dataset_ids()) hidef_converter = HierarchyToHiDeFConverter(self._outdir, self._outdir) hidef_nodes, hidef_edges = hidef_converter.generate_hidef_files() generated_dataset_ids.append( self._register_hidef_output_with_gene_names(hidef_nodes, 'nodes')) generated_dataset_ids.append( self._register_hidef_output_with_gene_names(hidef_edges, 'edges')) # register generated datasets self._register_computation(generated_dataset_ids=generated_dataset_ids) exitcode = 0 finally: logutils.write_task_finish_json(outdir=self._outdir, start_time=self._start_time, status=exitcode) return exitcode