# -*- coding: utf-8 -*-
import os
import subprocess
import logging
import json
import time
import requests
from ndex2.cx2 import CX2Network
from ndex2.nice_cx_network import NiceCXNetwork
import cdapsutil
from cdapsutil.exceptions import CommunityDetectionError
LOGGER = logging.getLogger(__name__)
def _cur_time_in_seconds():
return int(round(time.time()))
def _run_functional_enrichment_docker(docker_dict):
"""
Function that runs docker dumping results to
file path specified by docker_dict['outfile']
{'index': counter,
'node_id': node_id,
'outfile': <output file, must be in temp_dir>,
'image': <docker_image>,
'arguments': <list of arguments>,
'temp_dir': <temp directory where input data resides and output will be written>,
'docker_runner': <instance of DockerRunner class>}
:param docker_dict: {'outfile': <PATH WHERE OUTPUT RESULT SHOULD BE WRITTEN>}
:type docker_dict: dict
:return: None
"""
start_time = _cur_time_in_seconds()
drunner = docker_dict['docker_runner']
e_code, out, err = drunner.submit(algorithm=docker_dict['image'],
temp_dir=docker_dict['temp_dir'],
arguments=docker_dict['arguments'])
res = dict()
res['e_code'] = e_code
res['out'] = out.decode('utf-8')
res['err'] = err.decode('utf-8')
res['elapsed_time'] = _cur_time_in_seconds() - start_time
with open(docker_dict['outfile'], 'w') as f:
json.dump(res, f)
[docs]
class ProcessWrapper(object):
"""
Runs command line process
"""
def __init__(self):
"""
Constructor
"""
pass
[docs]
def run(self, cmd):
"""
Runs external process
:param cmd: Command to run. Should be a list of arguments
that include invoking command. For example to
run ``ls -la`` pass in ['ls','-la']
:type cmd: list
:return: (return code, stdout from subprocess, stderr from subprocess)
:rtype: tuple
"""
p = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
out, err = p.communicate()
return p.returncode, out, err
[docs]
class Runner(object):
"""
Base class for objects that run Community Detection Algorithms packaged as
`Docker <https://www.docker.com/>`__ containers built for
`CDAPS service <https://cdaps.readthedocs.io/>`__ via various means.
Currently built Runners:
:py:class:`ExternalResultsRunner` - Parses already run output file/stream
:py:class:`DockerRunner` - Runs locally via Docker
:py:class:`ServiceRunner` - Runs remotely via CDAPS Service
"""
def __init__(self):
"""
Constructor
"""
self._docker_image_name = ''
self._algorithm_name = ''
[docs]
def get_docker_image(self):
"""
Gets the name of the Docker image
:return: Name of Docker image or empty string if
unknown
:rtype: str
"""
return self._docker_image_name
[docs]
def set_docker_image(self, docker_image):
"""
Sets Docker image
:return:
"""
self._docker_image_name = docker_image
[docs]
def get_algorithm_name(self):
"""
Gets the algorithm name
:return: Name of algorithm or empty string if unknown
:rtype: str
"""
return self._algorithm_name
[docs]
def set_algorithm_name(self, algoname):
"""
Sets algorithm name
:param algoname: Name of algorithm
:type algoname: str
:return:
"""
self._algorithm_name = algoname
[docs]
def get_algorithms(self):
"""
Will always raise since subclasses should implement this
:py:class:`~cdapsutil.exceptions.CommunityDetectionError`
:raises CommunityDetectionError: Will always raise this
"""
raise CommunityDetectionError('Not implemented for this Runner')
[docs]
def run(self, net_cx=None, algorithm=None, arguments=None, temp_dir=None):
"""
Must be implemented by subclasses. Will always raise
:py:class:`cdapsutil.exceptions.CommunityDetectionError`
:param net_cx: Network to use as input
:type net_cx: :py:class:`ndex2.nice_cx_network.NiceCXNetwork`
:param algorithm:
:param arguments: Any custom parameters for algorithm. The
parameters should all be of type :py:class:`str`
If custom parameter is just a flag set
value to ``None``
Example: ``{'--flag': None, '--cutoff': '0.2'}``
:param temp_dir:
:raises CommunityDetectionError: Will always raise this
:return: None
"""
raise CommunityDetectionError('Not implemented for this Runner')
@staticmethod
def _write_edge_list(net_cx, tempdir=None, weight_col=None):
"""
Writes edges from `net_cx` network to file named ``input.edgelist``
in `tempdir` as a tab delimited file of source target
**WARNING** `weight_col` parameter is currently ignored
:param net_cx: Network to extract edges from
:type net_cx: :py:class:`ndex2.nice_cx_network.NiceCXNetwork` or :py:class:`ndex2.cx2.CX2Network`
:param tempdir: Directory to write edge list to
:type tempdir: str
:return: Path to edgelist file
:rtype: str
"""
edgelist = os.path.join(tempdir, 'input.edgelist')
with open(edgelist, 'w') as f:
if isinstance(net_cx, NiceCXNetwork):
for edge_id, edge_obj in net_cx.get_edges():
f.write(str(edge_obj['s']) + '\t' + str(edge_obj['t']) + '\n')
else:
for edge_id, edge_obj in net_cx.get_edges().items():
f.write(str(edge_obj['s']) + '\t' + str(edge_obj['t']) + '\n')
return edgelist
@staticmethod
def _get_edge_list(net_cx, weight_col=None):
"""
Gets edges from 'net_cx' network.
**WARNING** 'weight_col' parameter is currently ignored
:param net_cx: Network to extract edges from
:type net_cx: :py:class:`ndex2.nice_cx_network.NiceCXNetwork` or :py:class:`ndex2.cx2.CX2Network`
:param weight_col: Name of column to extract weights from
:type weight_col: str
:return: Edges in tab delimited format
:rtype: str
"""
edgelist = []
if isinstance(net_cx, NiceCXNetwork):
for edge_id, edge_obj in net_cx.get_edges():
edgelist.append(str(edge_obj['s']) + '\t' + str(edge_obj['t']) + '\n')
else:
for edge_id, edge_obj in net_cx.get_edges().items():
edgelist.append(str(edge_obj['s']) + '\t' + str(edge_obj['t']) + '\n')
return ''.join(edgelist)
[docs]
class ServiceRunner(Runner):
"""
:py:class:`Runner` that runs `CDAPS Service containers`
remotely via `CDAPS Service <https://cdaps.readthedocs.io>`__
:param service_endpoint: URL for CDAPS REST Service
:type service_endpoint: str
:param requests_timeout: Timeout in seconds to pass to
:py:mod:`requests` module for all web requests
:type requests_timeout: int or float
:param max_retries: Number of times to check for task completion
:type max_retries: int
:param poll_interval: Time to wait in seconds between checks for task
completion
:type poll_interval: int
"""
USER_AGENT_KEY = 'UserAgent'
"""
User Agent Header label
"""
REST_ENDPOINT = 'http://cdservice.cytoscape.org/cd/' \
'communitydetection/v1'
"""
Default Rest endpoint
"""
def __init__(self, service_endpoint=REST_ENDPOINT, requests_timeout=30,
max_retries=600, poll_interval=1):
"""
Constructor. See class docs for usage
"""
super().__init__()
self._service_endpoint = service_endpoint
self._requests_timeout=requests_timeout
self._useragent = 'cdapsutil/' +\
str(cdapsutil.__version__)
self._max_retries = max_retries
self._poll_interval = poll_interval
def _get_user_agent_header(self):
"""
Gets a :py:class:`dict` with User Agent for this
client that can be passed to `headers=` of :py:mod:`requests`
package
:return: ``{'UserAgent': 'cdapsutil/<VERSION>'}``
:rtype: dict
"""
return {ServiceRunner.USER_AGENT_KEY: self._useragent}
def _extract_exit_out_and_error_from_json(self, resp_as_json):
"""
:param resp_as_json:
:return:
"""
e_code = 0
if resp_as_json['status'] != 'complete':
e_code = 1
return e_code, resp_as_json['result'], resp_as_json['message']
[docs]
def run(self, net_cx=None, algorithm=None, arguments=None,
temp_dir=None):
"""
Runs 'algorithm' via `CDAPS service <https://cdaps.readthedocs.io/>`__
with error code, standard out and standard error derived
from the service call
:param net_cx: Network to use as input
:type net_cx: :py:class:`ndex2.nice_cx_network.NiceCXNetwork`
:param algorithm: Algorithm to run
:type algorithm: str
:param arguments: Any custom parameters for algorithm. The
parameters should all be of type :py:class:`str`
If custom parameter is just a flag set
value to ``None``
Example: ``{'--flag': None, '--cutoff': '0.2'}``
:type arguments: dict
:param temp_dir: Ignored
:type temp_dir: str
:raises CommunityDetectionError: If there is an error in running job
outside of non-zero exit code from
command
:return: (return code, stdout from subprocess, stderr from subprocess)
:rtype: tuple
"""
edgelist = self._get_edge_list(net_cx)
task_id = self.submit(algorithm=algorithm, data=edgelist,
arguments=arguments)['id']
LOGGER.debug('Waiting for task ' + str(task_id) + ' to complete')
self.set_algorithm_name(algorithm)
self.wait_for_task_to_complete(task_id,
max_retries=self._max_retries,
poll_interval=self._poll_interval)
resp_as_json = self.get_result(task_id)
if resp_as_json['status'] != 'complete':
CommunityDetectionError('Error running algorithm. '
'Raw JSON: ' +
str(resp_as_json))
return self._extract_exit_out_and_error_from_json(resp_as_json)
[docs]
def submit(self, algorithm=None, data=None,
arguments=None):
"""
Submits algorithm to `CDAPS service <https://cdaps.readthedocs.io/>`__
with endpoint set in constructor
:param algorithm: Name of algorithm to call
:type algorithm: str
:param data: The data to pass to the algorithm
:type object: Could be str, dict, list or anything that can be
converted to JSON
:param arguments: Any custom parameters for algorithm. The
parameters should all be of type :py:class:`str`
If custom parameter is just a flag set
value to ``None``
Example: ``{'--flag': None, '--cutoff': '0.2'}``
:type arguments: dict
:return: Task id in this format ``{'id': '<TASK ID'}``
:rtype: dict
"""
if algorithm is None or len(str(algorithm).strip()) == 0:
raise CommunityDetectionError('Algorithm is empty string or None')
thedata = {'algorithm': algorithm,
'data': data}
if arguments is not None:
thedata['customParameters'] = arguments
req = None
try:
LOGGER.debug('Submitting algorithm ' + str(algorithm) + ' to ' +
str(self._service_endpoint))
req = requests.post(self._service_endpoint, json=thedata,
headers=self._get_user_agent_header(),
timeout=self._requests_timeout)
if req.status_code != 202:
raise CommunityDetectionError('Received unexpected HTTP response '
'status code: ' +
str(req.status_code) +
' from request: ' +
str(req.text))
return req.json()
except requests.exceptions.HTTPError as he:
raise CommunityDetectionError('Received HTTPError submitting ' +
str(algorithm) + ' with parameters ' +
str(arguments) + ' : ' +
str(he))
except json.decoder.JSONDecodeError as je:
raise CommunityDetectionError('Unable to parse result from submit: ' +
str(je))
finally:
if req is not None:
try:
req.close()
except requests.exceptions.HTTPError as he:
LOGGER.debug('Caught HTTPError closing response : ' +
str(he))
pass
[docs]
def wait_for_task_to_complete(self, task_id, poll_interval=1,
consecutive_fail_retry=5,
max_retries=None):
"""
Waits for task with `task_id` id to complete.
:param task_id: Id of task
:type task_id: str
:param poll_interval: How long to wait in seconds before checking
again if task is complete
:type poll_interval: int
:param consecutive_fail_retry: If the number of consecutive failure calls to get
status exceeds this value an exception is raised
:type consecutive_fail_retry: int
:param max_retries: Total number of checks to perform before raising an
exception. For example, if `max_retries` is set to
``600`` and `poll_interval` is ``1`` then this
method will wait 10 minutes for task to complete
checking 600 times, or once per second.
**NOTE:** If set to``None`` this method will
poll indefinitely
:type max_retries: int
:raises CommunityDetectionError: If `task_id` is ``None``, if
`max_fail_retry` is exceeded,
if `max_retries` is exceeded
:return: status response of completed task
:rtype: dict
"""
if task_id is None or len(str(task_id).strip()) == 0:
raise CommunityDetectionError('Task id is empty string or None')
# polling loop to wait for task to complete
progress = 0
consecutive_err_cnt = 0
retry_count = 0
LOGGER.debug('Task id: ' + str(task_id) +
' Poll interval: ' + str(poll_interval) +
' consecutive fail retry: ' +
str(consecutive_fail_retry) +
' max retries: ' + str(max_retries))
while progress != 100 and consecutive_err_cnt <= consecutive_fail_retry:
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug('Try # ' + str(retry_count) + ' progress: ' +
str(progress) + ' consecutive error count: ' +
str(consecutive_err_cnt))
retry_count += 1
if max_retries is not None:
if retry_count > max_retries:
raise CommunityDetectionError('Max retry count ' +
str(max_retries) +
' exceeded')
time.sleep(poll_interval)
resp = None
try:
resp = requests.get(self._service_endpoint + '/' +
str(task_id) + '/status',
headers=self._get_user_agent_header(),
timeout=self._requests_timeout)
if resp.status_code != 200:
consecutive_err_cnt += 1
LOGGER.debug('Ran into some error: ' + str(resp.text))
continue
resp_json = resp.json()
if resp_json is None or 'progress' not in resp_json:
LOGGER.debug('progress not in JSON: ' + str(resp_json))
consecutive_err_cnt += 1
continue
consecutive_err_cnt = 0
progress = resp_json['progress']
LOGGER.debug('Progress is ' + str(progress))
except requests.exceptions.HTTPError as he:
LOGGER.debug('Received error from requests: ' + str(he))
consecutive_err_cnt += 1
continue
finally:
if resp is not None:
try:
resp.close()
except requests.exceptions.HTTPError as he:
LOGGER.debug('Caught HTTPError closing response : ' +
str(he))
pass
if consecutive_err_cnt > 0:
raise CommunityDetectionError('Received ' +
str(consecutive_err_cnt) +
' consecutive errors')
return resp_json
[docs]
def get_result(self, task_id):
"""
Gets result from `CDAPS service <https://cdaps.readthedocs.io/>`__
:param task_id: Id of task
:type task_id: str
:return: Result from service
:rtype: dict
"""
if task_id is None or len(str(task_id).strip()) == 0:
raise CommunityDetectionError('Task id is empty string or None')
resp = None
try:
resp = requests.get(self._service_endpoint + '/' +
str(task_id),
headers=self._get_user_agent_header(),
timeout=self._requests_timeout)
if resp.status_code != 200:
raise CommunityDetectionError('Received ' + str(resp.status_code) +
' HTTP response status code : ' +
str(resp.text))
return resp.json()
except requests.exceptions.HTTPError as he:
raise CommunityDetectionError('Received HTTPError getting result'
' for task: ' + task_id + ' : ' +
str(he))
finally:
if resp is not None:
try:
resp.close()
except requests.exceptions.HTTPError as he:
LOGGER.debug('Caught HTTPError closing response : ' +
str(he))
pass
[docs]
def get_algorithms(self):
"""
Queries `CDAPS service <https://cdaps.readthedocs.io/>`__ for list
of available algorithms
Example result (only showing one algorithm):
.. code-block:: python
{ "algorithms":
{
"name": "hidef",
"displayName": "HiDeF",
"description": "...",
"version": "0.2.2",
"dockerImage": "coleslawndex/cdhidef:0.2.2",
"inputDataFormat": "EDGELISTV2",
"outputDataFormat": "COMMUNITYDETECTRESULTV2",
"customParameters": [
{
"name": "--maxres",
"displayName": "Maximum resolution parameter",
"description": "Maximum resolution parameter. Increase to get more smaller communities",
"type": "value",
"defaultValue": "50.0",
"validationType": "number",
"validationHelp": "Should be a number",
"validationRegex": null,
"minValue": null,
"maxValue": null
},
{
"name": "--alg",
"displayName": "Algorithm",
"description": "Choose to use Louvain or newer Leiden algorithm",
"type": "value",
"defaultValue": "louvain",
"validationType": "string",
"validationHelp": "Must be set to louvain or leiden",
"validationRegex": "louvain|leiden",
"minValue": null,
"maxValue": null
}
]
}
}
:raises CommunityDetectionError: If there is an error
:return: Algorithms available from service in example format shown above
:rtype: dict
"""
resp = None
try:
resp = requests.get(self._service_endpoint + '/algorithms',
headers=self._get_user_agent_header(),
timeout=self._requests_timeout)
if resp.status_code != 200:
raise CommunityDetectionError('Received ' +
str(resp.status_code) +
' HTTP response status code : ' +
str(resp.text))
return resp.json()
except json.JSONDecodeError as je:
raise CommunityDetectionError('Error result not in JSON '
'format : ' + str(je))
except requests.exceptions.HTTPError as he:
raise CommunityDetectionError('Received HTTPError getting '
'algorithms : ' +
str(he))
finally:
if resp is not None:
try:
resp.close()
except requests.exceptions.HTTPError as he:
LOGGER.debug('Caught HTTPError closing response : ' +
str(he))
pass
[docs]
class DockerRunner(Runner):
"""
:py:class:`Runner` that runs CDAPS Service Docker containers
locally via `Docker <https://docker.com>`__
:param binary_path: Full path to Docker command
:type binary_path: str
:param processwrapper: Object to run external process
:type processwrapper: :py:class:`ProcessWrapper`
"""
def __init__(self, binary_path='docker',
processwrapper=ProcessWrapper()):
"""
Constructor
"""
super().__init__()
self._dockerpath = binary_path
self._procwrapper = processwrapper
[docs]
def run(self, net_cx=None, algorithm=None, arguments=None,
temp_dir=None):
"""
Runs `Docker <https://docker.com>`__ command returning a tuple
with error code, standard out and standard error
:param net_cx: Network to use as input
:type net_cx: :py:class:`ndex2.nice_cx_network.NiceCXNetwork`
:param algorithm: docker image to run
:type algorithm: str
:param arguments: Any custom parameters for algorithm. The
parameters should all be of type :py:class:`str`
If custom parameter is just a flag set
value to ``None``
Example: ``{'--flag': None, '--cutoff': '0.2'}``
:type arguments: dict
:param temp_dir: temporary directory where docker can be run
this should be a directory that docker can
access when `-v X:X` flag is added to docker
command
:type temp_dir: str
:raises CommunityDetectionError: If there is an error in running job
outside of non-zero exit code from
command
:return: (return code, stdout from subprocess, stderr from subprocess)
:rtype: tuple
"""
if algorithm is None:
raise CommunityDetectionError('Algorithm is None')
edgelist = self._write_edge_list(net_cx, tempdir=temp_dir)
full_args = [self._dockerpath, 'run',
'--rm', '-v',
temp_dir + ':' +
temp_dir,
algorithm,
edgelist]
self.set_docker_image(algorithm)
self.set_algorithm_name(algorithm)
if arguments is not None:
for key in arguments:
full_args.append(key)
if arguments[key] is not None:
full_args.append(str(arguments[key]))
start_time = _cur_time_in_seconds()
try:
return self._procwrapper.run(full_args)
finally:
LOGGER.debug('Running ' +
' '.join(full_args) +
' took ' +
str(_cur_time_in_seconds() - start_time) +
' seconds')
[docs]
class ExternalResultsRunner(Runner):
"""
:py:class:`Runner` returns an already generated result set
via `algorithm` parameter in :py:func:`~ExternalResultsRunner.run` method.
This allows results generated externally to be processed.
"""
def __init__(self):
"""
Constructor
"""
super().__init__()
[docs]
def run(self, net_cx=None, algorithm=None, arguments=None,
temp_dir=None):
"""
Assumes `algorithm` contains path to file with result of invocation
of algorithm. This allows for externally run algorithms to be
loaded into this library.
A tuple is returned with return code,result and standard error
of result.
In this implementation the return code is always ``0`` unless the
contents of the file is in format matching result from
CDAPS REST Service in which case a ``1`` returncode may be sent if
the status was **NOT** ``complete``. Any data in ``message`` field
will be set in the standard error of result.
:param net_cx: Ignored
:type net_cx: :py:class:`ndex2.nice_cx_network.NiceCXNetwork`
:param algorithm: Path to file with results
:type algorithm: str
:param arguments: Ignored
:type arguments: dict
:param temp_dir: Ignored
:type temp_dir: str
:raises CommunityDetectionError: If there is an error in running job
outside of non-zero exit code from
command
:return: (return code, result, error message)
:rtype: tuple
"""
if algorithm is None:
raise CommunityDetectionError('Algorithm is None')
if not os.path.isfile(algorithm):
raise CommunityDetectionError(str(algorithm) + ' is not a file')
e_code = 0
err = None
try:
with open(algorithm, 'r') as f:
result = f.read()
try:
if result.lstrip().startswith('{'):
json_res = json.loads(result)
if 'status' in result:
if str(json_res['status']) != 'complete':
e_code = 1
if 'message' in result:
err = json_res['message']
except json.JSONDecodeError:
pass
self.set_docker_image('Loaded from file: ' + algorithm)
self.set_algorithm_name(os.path.basename(algorithm))
return e_code, result, err
except OSError as oe:
raise CommunityDetectionError('Caught OSError : ' + str(oe))