Source code for pyworkflow.webservices.workflowhub

import json
from urllib.request import Request, urlopen
from pyworkflow.template import Template


[docs]class WHTemplate(Template): def __init__(self, source, name, description, url): super().__init__(source, name, description) self.url = url
[docs] def loadContent(self): """ Download the file pointer by url and read the content""" return make_request(self.url, asJson=False)
[docs]def make_request(url, asJson=True): """ Makes a request to the url and returns the json as a dictionary""" req = Request(url) req.add_header("accept", "application/json") with urlopen(req, timeout=1) as response: if asJson: data = json.load(response) else: html_response = response.read() encoding = response.headers.get_content_charset('utf-8') data = html_response.decode(encoding) return data
[docs]def get_workflow_file_url(workflow_id, version): root_url = "https://workflowhub.eu/workflows/%s/git/%s/" % (workflow_id, version) url = root_url + 'tree' result = make_request(url) for file in result["tree"]: path = (file["path"]) if path.endswith(".json.template"): return root_url + 'raw/' + path
[docs]def get_wh_templates(template_id=None, organization="Scipion%20CNB"): """ Returns a list of scipion templates available in workflow hub""" url = "https://workflowhub.eu/ga4gh/trs/v2/tools?organization=%s" % organization response = make_request(url) template_list = [] for workflow in response: workflow_id = workflow["id"] name = workflow["name"] description = workflow['description'] version = workflow["versions"][-1] version_id = version["id"] template_url = get_workflow_file_url(workflow_id, version_id) new_template = WHTemplate("Workflow hub", name, description, template_url) if template_id is None or new_template.getObjId() == template_id: template_list.append(new_template) return template_list
if __name__ == "__main__": templates = get_wh_templates() for template in templates: print(template)