Source code for cellmaps_generate_hierarchy.hierarchy

import os
import sys
import csv
import logging
import subprocess
from datetime import date
import random

import ndex2
import cdapsutil
import cellmaps_generate_hierarchy
from cellmaps_utils import constants
from cellmaps_utils.provenance import ProvenanceUtil
from cellmaps_generate_hierarchy.exceptions import CellmapsGenerateHierarchyError

logger = logging.getLogger(__name__)


[docs] class HierarchyGenerator(object): """ Base class for generating hierarchy that is output in CX format following CDAPS style """ def __init__(self, provenance_utils=ProvenanceUtil(), author='cellmaps_generate_hierarchy', version=cellmaps_generate_hierarchy.__version__): """ Constructor """ self._provenance_utils = provenance_utils self._author = author self._version = version self._generated_dataset_ids = []
[docs] def get_generated_dataset_ids(self): """ Gets IDs of datasets created by this object that have been registered with FAIRSCAPE :return: """ return self._generated_dataset_ids
[docs] def get_hierarchy(self, networks, algorithm='leiden', maxres=80, k=10): """ Gets hierarchy :return: (hierarchy as :py:class:`list`, parent ppi as :py:class:`list`) :rtype: tuple """ raise NotImplementedError('Subclasses need to implement')
[docs] class CDAPSHiDeFHierarchyGenerator(HierarchyGenerator): """ Generates hierarchy using HiDeF """ CDAPS_JSON_FILE = 'cdaps.json' EDGELIST_TSV = '.id.edgelist.tsv' HIDEF_OUT_PREFIX = 'hidef_output' TRANSLATED_HIDEF_OUT_PREFIX = 'hidefnames_output' CDRES_KEY_NAME = 'communityDetectionResult' NODE_CX_KEY_NAME = 'nodeAttributesAsCX2' ATTR_DEC_NAME = 'attributeDeclarations' PERSISTENCE_COL_NAME = 'HiDeF_persistence' HIERARCHY_PARENT_CUTOFF = 0.1 BOOTSTRAP_EDGES = 0 def __init__(self, hidef_cmd='hidef_finder.py', provenance_utils=ProvenanceUtil(), refiner=None, hcxconverter=None, hierarchy_parent_cutoff=HIERARCHY_PARENT_CUTOFF, author='cellmaps_generate_hierarchy', version=cellmaps_generate_hierarchy.__version__, bootstrap_edges=BOOTSTRAP_EDGES): """ :param hidef_cmd: HiDeF command line binary :type hidef_cmd: str :param provenance_utils: :param author: :type author: str :param version: """ super().__init__(provenance_utils=provenance_utils, author=author, version=version) self._refiner = refiner self._hcxconverter = hcxconverter self._hierarchy_parent_cutoff = hierarchy_parent_cutoff self._python = sys.executable if os.sep not in hidef_cmd: self._hidef_cmd = os.path.join(os.path.dirname(self._python), hidef_cmd) else: self._hidef_cmd = hidef_cmd self._bootstrap_edges = bootstrap_edges def _get_max_node_id(self, nodes_file): """ Examines the 'nodes_file' passed in and finds the value of highest node id. It is assumed the 'nodes_file' a tab delimited file of format: <CLUSTER NAME> <# NODES> <SPACE DELIMITED NODE IDS> <SCORE> :param nodes_file: :type nodes_file: Path to to nodes file from hidef output :return: highest node id found :rtype: int """ maxval = None with open(nodes_file, 'r') as csvfile: linereader = csv.reader(csvfile, delimiter='\t') for row in linereader: for node in row[2].split(' '): if maxval is None: maxval = int(node) continue curval = int(node) if curval > maxval: maxval = curval return maxval
[docs] def write_members_for_row(self, out_stream, row, cur_node_id): """ Given a row from nodes file from hidef output the members of the clusters by parsing the <SPACE DELIMITED NODE IDS> as mentioned in :py:func:`#get_max_node_id` description. The output is written to `out_stream` for each node id in format: <cur_node_id>,<node id>,c-m; :param out_stream: :type out_stream: file like object :param row: Should be a line from hidef nodes file parsed by :py:func:`csv.reader` :type row: iterator :param cur_node_id: id of cluster that contains the nodes :type cur_node_id: int :return: None """ for node in row[2].split(' '): out_stream.write(str(cur_node_id) + ',' + node + ',c-m;')
[docs] def update_cluster_node_map(self, cluster_node_map, cluster, max_node_id): """ Updates 'cluster_node_map' which is in format of <cluster name> => <node id> by adding 'cluster' to 'cluster_node_map' if it does not exist :param cluster_node_map: map of cluster names to node ids :type cluster_node_map: dict :param cluster: name of cluster :type cluster: str :param max_node_id: current max node id :type max_node_id: int :return: (new 'max_node_id' if 'cluster' was added otherwise 'max_node_id', id corresponding to 'cluster' found in 'cluster_node_map') :rtype: tuple """ if cluster not in cluster_node_map: max_node_id += 1 cluster_node_map[cluster] = max_node_id cur_node_id = max_node_id else: cur_node_id = cluster_node_map[cluster] return max_node_id, cur_node_id
[docs] def update_persistence_map(self, persistence_node_map, node_id, persistence_val): """ :param persistence_node_map: :param node_id: :param persistence_val: :return: """ if node_id not in persistence_node_map: persistence_node_map[node_id] = persistence_val
[docs] def write_communities(self, out_stream, edge_file, cluster_node_map): """ Writes out links between clusters in COMMUNITYDETECTRESULT format as noted in :py:func:`#convert_hidef_output_to_cdaps` using hidef edge file set in 'edge_file' that is expected to be in this tab delimited format: <SOURCE CLUSTER> <TARGET CLUSTER> <default> This function converts the <SOURCE CLUSTER> <TARGET CLUSTER> to new node ids (leveraging 'cluster_node_map') and writes the following output: <SOURCE CLUSTER NODE ID>,<TARGET CLUSTER NODE ID>,c-c; to the 'out_stream' :param out_stream: output stream :type out_stream: file like object :param edge_file: path to hidef edges file :type edge_file: str :return: None """ with open(edge_file, 'r') as csvfile: linereader = csv.reader(csvfile, delimiter='\t') for row in linereader: out_stream.write(str(cluster_node_map[row[0]]) + ',' + str(cluster_node_map[row[1]]) + ',c-c;') out_stream.write('",')
[docs] def write_persistence_node_attribute(self, out_stream, persistence_map): """ :param out_stream: :param persistence_map: :return: """ out_stream.write('"' + CDAPSHiDeFHierarchyGenerator.NODE_CX_KEY_NAME + '": {') out_stream.write('"' + CDAPSHiDeFHierarchyGenerator.ATTR_DEC_NAME + '": [{') out_stream.write('"nodes": { "' + CDAPSHiDeFHierarchyGenerator.PERSISTENCE_COL_NAME + '": { "d": "integer", "a": "p1", "v": 0}}}],') out_stream.write('"nodes": [') is_first = True for key in persistence_map: if is_first is False: out_stream.write(',') else: is_first = False out_stream.write('{"id": ' + str(key) + ',') out_stream.write('"v": { "p1": ' + str(persistence_map[key]) + '}}') out_stream.write(']}}')
[docs] def convert_hidef_output_to_cdaps(self, out_stream, outdir): """ Looks for x.nodes and x.edges in `outdir` directory to generate output in COMMUNITYDETECTRESULT format: https://github.com/idekerlab/communitydetection-rest-server/wiki/COMMUNITYDETECTRESULT-format This method leverages :py:func:`#write_members_for_row` and :py:func:`#write_communities` to write output :param out_stream: output stream to write results :type out_stream: file like object :param outdir: :type outdir: str :return: None """ nodefile = os.path.join(outdir, CDAPSHiDeFHierarchyGenerator.HIDEF_OUT_PREFIX + '.pruned.nodes') max_node_id = self._get_max_node_id(nodefile) cluster_node_map = {} persistence_map = {} out_stream.write('{"communityDetectionResult": "') with open(nodefile, 'r') as csvfile: linereader = csv.reader(csvfile, delimiter='\t') for row in linereader: max_node_id, cur_node_id = self.update_cluster_node_map(cluster_node_map, row[0], max_node_id) self.update_persistence_map(persistence_map, cur_node_id, row[-1]) self.write_members_for_row(out_stream, row, cur_node_id) edge_file = os.path.join(outdir, CDAPSHiDeFHierarchyGenerator.HIDEF_OUT_PREFIX + '.pruned.edges') self.write_communities(out_stream, edge_file, cluster_node_map) self.write_persistence_node_attribute(out_stream, persistence_map) out_stream.write('\n') return None
def _run_cmd(self, cmd, cwd=None, timeout=86400): """ Runs command as a command line process :param cmd_to_run: command to run as list :type cmd_to_run: list :return: (return code, standard out, standard error) :rtype: tuple """ logger.debug('Running command under ' + str(cwd) + ' path: ' + str(cmd)) p = subprocess.Popen(cmd, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) try: out, err = p.communicate(timeout=timeout) except subprocess.TimeoutExpired: logger.warning('Timeout reached. Killing process') p.kill() out, err = p.communicate() raise CellmapsGenerateHierarchyError('Process timed out. exit code: ' + str(p.returncode) + ' stdout: ' + str(out) + ' stderr: ' + str(err)) return p.returncode, out, err def _get_largest_network(self, networks): """ Finds largest network by file size :param networks: list of :py:class:`~ndex2.nice_cx_network.NiceCXNetwork` objects :type networks: list :return: Largest network :rtype: :py:class:`~ndex2.nice_cx_network.NiceCXNetwork` """ largest_network = None max_file_size = 0 for n in networks: file_size = os.path.getsize(n + constants.CX_SUFFIX) if file_size >= max_file_size: largest_network = n max_file_size = file_size return largest_network def _get_parent_net_with_specified_cutoff(self, network, path, closest_net, closest_path, min_difference=float('inf')): """ Determines the network with cutoff closest to the hierarchy_parent_cutoff value. :param network: The network to be evaluated. :type network: Network :param path: The file path or identifier for the network. :type path: str :param closest_net: Currently identified the closest network. :type closest_net: Network :param closest_path: File path or identifier for the current closest network. :type closest_path: str :param min_difference: Minimum difference between the cutoff values, defaults to float('inf'). :type min_difference: float :return: A tuple of the closest network, its path, and the minimum difference in cutoff values. :rtype: tuple """ cutoff_attr = network.get_network_attribute('cutoff') cutoff_value = 1 if cutoff_attr is None else float(cutoff_attr['v']) if cutoff_value == self._hierarchy_parent_cutoff: return network, path, 0 else: difference = abs(self._hierarchy_parent_cutoff - cutoff_value) if difference < min_difference: min_difference = difference closest_net = network closest_path = path return closest_net, closest_path, min_difference def _get_name_to_id_dict(self, network): """ :param network: :return: """ name_to_id = {} for node_id, node_obj in network.get_nodes(): name_to_id[node_obj['n']] = node_id return name_to_id def _get_id_to_name_dict(self, network): """ :param network: :return: """ id_to_name = {} for node_id, node_obj in network.get_nodes(): id_to_name[node_id] = node_obj['n'] return id_to_name def _create_edgelist_files_for_networks(self, networks): """ Iterates through **networks** prefix paths and loads the CX files. Method then creates a PREFIX_PATH :py:const:`CDAPSHiDeFHierarchyGenerator.EDGELIST_TSV` file for each network and returns those paths as a list :param networks: Prefix paths of input PPI networks :type networks: list :return: (parent network path, :py:class:`~ndex2.nice_cx_network.NiceCXNetwork`, largest network path, :py:class:`list`) :rtype: tuple """ net_paths = [] largest_network_path = self._get_largest_network(networks) largest_network = ndex2.create_nice_cx_from_file(largest_network_path + constants.CX_SUFFIX) logger.debug('Largest network name: ' + largest_network.get_name()) largest_name_to_id = self._get_name_to_id_dict(largest_network) # Bootstrap edges all_edges = [(edge_obj['s'], edge_obj['t']) for _, edge_obj in largest_network.get_edges()] num_edges_to_remove = int(len(all_edges) * (self._bootstrap_edges / 100)) all_removed_edges = random.sample(all_edges, num_edges_to_remove) parent_net = None parent_path = None min_difference = float('inf') for n in networks: if largest_network_path == n: net = largest_network else: logger.debug('Creating NiceCXNetwork object from: ' + n + constants.CX_SUFFIX) net = ndex2.create_nice_cx_from_file(n + constants.CX_SUFFIX) dest_path = n + CDAPSHiDeFHierarchyGenerator.EDGELIST_TSV net_paths.append(dest_path) logger.debug('Writing out id edgelist: ' + str(dest_path)) id_to_name = self._get_id_to_name_dict(net) remaining_edges = list() removed_edges = list() for _, edge_obj in net.get_edges(): edge = (edge_obj['s'], edge_obj['t']) if len(removed_edges) >= int(len(net.get_edges()) * (self._bootstrap_edges / 100)): remaining_edges.append(edge) elif edge not in all_removed_edges: remaining_edges.append(edge) else: removed_edges.append(edge) with open(dest_path, 'w') as f: for s, t in remaining_edges: f.write(str(largest_name_to_id[id_to_name[s]]) + '\t' + str(largest_name_to_id[id_to_name[t]]) + '\n') if len(removed_edges) > 0: removed_edges_path = n + '_removed_edges.tsv' with open(removed_edges_path, 'w') as f: for s, t in removed_edges: f.write(str(largest_name_to_id[id_to_name[s]]) + '\t' + str(largest_name_to_id[id_to_name[t]]) + '\n') if len(remaining_edges) == 0: raise CellmapsGenerateHierarchyError(f"PPI network {n} has no edges. Cannot create hierarchy.") # register edgelist file with fairscape data_dict = {'name': os.path.basename(dest_path) + ' PPI id edgelist file', 'description': 'PPI id edgelist file', 'data-format': 'tsv', 'author': str(self._author), 'version': str(self._version), 'date-published': date.today().strftime( self._provenance_utils.get_default_date_format_str())} dataset_id = self._provenance_utils.register_dataset(os.path.dirname(dest_path), source_file=dest_path, data_dict=data_dict) self._generated_dataset_ids.append(dataset_id) if min_difference != 0: parent_net, parent_path, min_difference = self._get_parent_net_with_specified_cutoff( net, n, parent_net, parent_path, min_difference) logger.debug('Parent network name: ' + parent_net.get_name()) return parent_path + constants.CX_SUFFIX, parent_net, largest_network, net_paths def _register_hidef_output_files(self, outdir): """ Register <HIDEF_PREFIX>.nodes and <HIDEF_PREFIX>.edges and <HIDEF_PREFIX>.weaver files with FAIRSCAPE """ for hidef_file in [('nodes', 'tsv'), ('edges', 'tsv'), ('weaver', 'npy')]: outfile = os.path.join(outdir, CDAPSHiDeFHierarchyGenerator.HIDEF_OUT_PREFIX + '.' + hidef_file[0]) data_dict = {'name': os.path.basename(outfile) + ' HiDeF output ' + hidef_file[0] + ' file', 'description': ' HiDeF output ' + hidef_file[0] + ' file', 'data-format': hidef_file[1], 'author': str(self._author), 'version': str(self._version), 'date-published': date.today().strftime(self._provenance_utils.get_default_date_format_str())} dataset_id = self._provenance_utils.register_dataset(os.path.dirname(outfile), source_file=outfile, data_dict=data_dict) self._generated_dataset_ids.append(dataset_id) def _annotate_hierarchy(self, network=None, path=None): """ Adds HCX attributes to network as well as sets ``prov:wasGeneratedBy`` to the name and version of this tool ``prov:wasDerivedFrom`` to FAIRSCAPE dataset id of this rocrate :param network: Hierarchy :type network: :py:class:`~ndex2.nice_cx_network.NiceCXNetwork` :param path: Path to parent PPI network in CX or CX2 format :type path: str """ network.set_network_attribute(name='prov:wasGeneratedBy', values=self._author + ' ' + self._version) rocrate_id = self._provenance_utils.get_id_of_rocrate(os.path.dirname(path)) description = 'Cell Map Hierarchy' if rocrate_id is not None: network.set_network_attribute(name='prov:wasDerivedFrom', values='RO-crate: ' + str(rocrate_id)) prov_utils = self._provenance_utils.get_rocrate_provenance_attributes(os.path.dirname(path)) if self._bootstrap_edges > 0: description = (description + ' derived from edgeslists with ' + str(self._bootstrap_edges) + '% of edges randomly removed') network.set_network_attribute(name='description', values=description + '|' + str(prov_utils.get_description())) if prov_utils.get_keywords() is None: keyword_subset = [] else: keyword_subset = prov_utils.get_keywords()[:6] network_name = (prov_utils.get_name() + ' - ' + ' '.join(keyword_subset) + ' hierarchy').lstrip() network.set_name(network_name) else: network.set_network_attribute(name='description', values=description) network.set_name(description.lstrip()) def _annotate_hierarchy_nodes(self, network): """ Annotates each node in the hierarchy with its community name and a label flag. :param network: The hierarchy containing nodes to be annotated. :type network: :py:class:`~ndex2.nice_cx_network.NiceCXNetwork` """ for node_id, node_obj in network.get_nodes(): name = node_obj['n'] network.set_node_attribute(node_id, 'CD_CommunityName', values=name, type='string', overwrite=True) network.set_node_attribute(node_id, 'CD_Labeled', values='true', type='boolean', overwrite=True) def _run_hidef(self, edgelist_files, outputprefix, algorithm, maxres, k): cmd = [self._python, self._hidef_cmd, '--g'] cmd.extend(edgelist_files) cmd.extend(['--o', outputprefix, '--alg', algorithm, '--maxres', str(maxres), '--k', str(k), '--skipgml']) exit_code, out, err = self._run_cmd(cmd) if exit_code != 0: logger.error('Cmd failed with exit code: ' + str(exit_code) + ' : ' + str(out) + ' : ' + str(err)) raise CellmapsGenerateHierarchyError('Cmd failed with exit code: ' + str(exit_code) + ' : ' + str(out) + ' : ' + str(err))
[docs] def get_hierarchy_from_edgelists(self, outdir, edgelist_files, parent_net, algorithm='leiden', maxres=80, k=10): """ Generates a hierarchy from edgelist files using HiDeF. This method runs the HiDeF algorithm on the provided edgelist files to generate a hierarchical community structure. It optionally refines the hierarchy, converts the HiDeF output to CDAPS format, and then uses `cdapsutil` to run community detection on the parent network. :param outdir: The output directory where HiDeF results and intermediate files will be stored. :type outdir: str :param edgelist_files: A list of paths to edgelist files to be used as input to HiDeF. :type edgelist_files: list :param parent_net: The parent network on which community detection is performed. :type parent_net: :py:class:`~ndex2.nice_cx_network.NiceCXNetwork` or :py:class:`~ndex2.cx2.CX2Network` :param algorithm: The algorithm to use for community detection (default is 'leiden'). :type algorithm: str :param maxres: The maximum resolution parameter for HiDeF (default is 80). :type maxres: int :param k: The k parameter for HiDeF (default is 10). :type k: int :return: A tuple containing the resulting hierarchy and the path to the CDAPS output JSON file, or (None, None) if an error occurs. :rtype: tuple (hierarchy, str) or (None, None) :raises FileNotFoundError: If no output is generated from HiDeF. """ outputprefix = os.path.join(outdir, CDAPSHiDeFHierarchyGenerator.HIDEF_OUT_PREFIX) self._run_hidef(edgelist_files, outputprefix, algorithm, maxres, k) try: if self._refiner is not None: self._refiner.refine_hierarchy(outprefix=outputprefix) cdaps_out_file = os.path.join(outdir, CDAPSHiDeFHierarchyGenerator.CDAPS_JSON_FILE) with open(cdaps_out_file, 'w') as out_stream: self.convert_hidef_output_to_cdaps(out_stream, outdir) cd = cdapsutil.CommunityDetection(runner=cdapsutil.ExternalResultsRunner()) hier = cd.run_community_detection(parent_net, algorithm=cdaps_out_file) return hier, cdaps_out_file except FileNotFoundError as fe: logger.error('No output from hidef: ' + str(fe) + '\n') return None, None
[docs] def get_hierarchy(self, networks, algorithm='leiden', maxres=80, k=10): """ Runs HiDeF to generate hierarchy and registers resulting output files with FAIRSCAPE. To do this the method generates edgelist files from the CX files corresponding to the **networks** using the internal node ids for edge source and target names. These files are written to the same directory as the **networks** with HiDeF is then given all these networks via ``--g`` flag. .. warning:: Due to FAIRSCAPE registration this method is NOT threadsafe and cannot be called in parallel or with any other call that is updating FAIRSCAPE registration on the current RO-CRATE :param networks: Paths (without suffix ie .cx) to PPI networks to be used as input to HiDeF :type networks: list :param algorithm: The algorithm to use for community detection (default is 'leiden'). :type algorithm: str :param maxres: The maximum resolution parameter for HiDeF (default is 80). :type maxres: int :param k: The k parameter for HiDeF (default is 10). :type k: int :raises CellmapsGenerateHierarchyError: If there was an error :return: Resulting hierarchy or ``None`` if no hierarchy from HiDeF :return: (hierarchy as list, parent ppi as list, hierarchyurl, parenturl) or None, None if not created :rtype: tuple """ if self._hcxconverter is None: raise CellmapsGenerateHierarchyError('HCX converter must be set') outdir = os.path.dirname(networks[0]) (parent_net_path, parent_net, largest_net, edgelist_files) = self._create_edgelist_files_for_networks(networks) hier, cdaps_out_file = self.get_hierarchy_from_edgelists(outdir, edgelist_files, largest_net, algorithm, maxres, k) self._clean_tmp_edgelist_files(edgelist_files) self._annotate_hierarchy(network=hier, path=parent_net_path) self._annotate_hierarchy_nodes(network=hier) hierarchy_in_hcx = self._hcxconverter.get_converted_hierarchy(hierarchy=hier, parent_network=parent_net) # Register outputs from hierarchy generation self._register_hidef_output_files(outdir) # register cdaps json file with fairscape data_dict = {'name': os.path.basename(cdaps_out_file) + ' CDAPS output JSON file', 'description': 'CDAPS output JSON file', 'data-format': 'json', 'author': str(self._author), 'version': str(self._version), 'date-published': date.today().strftime(self._provenance_utils.get_default_date_format_str())} dataset_id = self._provenance_utils.register_dataset(os.path.dirname(cdaps_out_file), source_file=cdaps_out_file, data_dict=data_dict) self._generated_dataset_ids.append(dataset_id) return hierarchy_in_hcx
def _clean_tmp_edgelist_files(self, edgelist_files): for file in edgelist_files: try: file_path = os.path.join(os.getcwd(), '_tmp.' + os.path.basename(file)) if os.path.exists(file_path): os.remove(file_path) except Exception as e: logger.warning(f"Tried to remove tmp file, but failed due to: {e}")