"""
.. See the NOTICE file distributed with this work for additional information
regarding copyright ownership.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import os
import json
import h5py
import numpy as np
from dmp import dmp
from dm_generator.GenerateSampleCoords import GenerateSampleCoords
class coord(object):  # pylint: disable=invalid-name,too-many-instance-attributes
    """
    Class related to handling the functions for interacting directly with the
    HDF5 files. All required information should be passed to this class.
    """

    def __init__(self, user_id, file_id, resolution=None, cnf_loc=''):
        """
        Initialise the module and set the required base parameters

        Parameters
        ----------
        user_id : str
            Identifier to uniquely locate the users files. Can be set to
            "common" if the files can be shared between users. The special
            value "test" loads (generating it first if absent) the bundled
            sample-coordinates file instead of querying the DMP.
        file_id : str
            Location of the file in the file system
        resolution : int (Optional)
            Level of resolution. This is optional, but only the functions
            get_resolutions() and set_resolution() can be called. Once the
            resolution has been set then all functions are callable.
        cnf_loc : str (Optional)
            Location of the configuration file handed straight to the
            ``dmp`` data-management handle.
        """
        # Safe defaults so attribute access never raises before a
        # resolution has been selected.
        self.file_handle = None
        self.resolution = None
        self.grp = None
        self.meta = None
        self.mpgrp = None
        self.clusters = None
        self.centroids = None
        self.dependencies = []
        self.meta_data = {}
        self.hic_data = {}
        self.restraints = {}

        if user_id == 'test':
            # Use the packaged sample HDF5 file, generating it on first use.
            resource_path = os.path.join(
                os.path.dirname(__file__),
                "../tests/data/sample_coords.hdf5"
            )
            if not os.path.isfile(resource_path):
                gsa = GenerateSampleCoords()
                gsa.main()
            self.file_handle = h5py.File(resource_path, 'r')
        else:
            # Resolve the real file location via the DMP service.
            dm_handle = dmp(cnf_loc)
            file_obj = dm_handle.get_file_by_id(user_id, file_id)
            self.file_handle = h5py.File(file_obj['file_path'], 'r')

        if resolution is not None:
            self.set_resolution(resolution)

    def close(self):
        """
        Tidy function to close file handles
        """
        self.file_handle.close()

    def get_resolutions(self):
        """
        List resolutions that models have been generated for

        Returns
        -------
        list : str
            Available levels of resolution that can be set
        """
        # Iterating an HDF5 file yields its top-level group names.
        return list(self.file_handle)

    def set_resolution(self, resolution):
        """
        Set, or change, the resolution level

        Parameters
        ----------
        resolution : int
            Level of resolution
        """
        self.resolution = int(resolution)
        self._load_resolution(resolution)

    def _load_resolution(self, resolution):
        """
        Cache the groups, datasets and JSON-encoded attributes stored for a
        given resolution level. Shared by __init__() and set_resolution()
        so the loading logic lives in exactly one place.
        """
        self.grp = self.file_handle[str(resolution)]
        self.meta = self.grp['meta']
        self.mpgrp = self.meta['model_params']
        self.clusters = self.meta['clusters']
        self.centroids = self.meta['centroids']

        attrs = self.grp['data'].attrs
        # These attributes are optional; fall back to empty containers when
        # a file was generated without them.
        self.dependencies = json.loads(attrs['dependencies']) if 'dependencies' in attrs else []
        self.meta_data = json.loads(attrs['TADbit_meta']) if 'TADbit_meta' in attrs else {}
        self.hic_data = json.loads(attrs['hic_data']) if 'hic_data' in attrs else {}
        self.restraints = json.loads(attrs['restraints']) if 'restraints' in attrs else {}

    def get_resolution(self):
        """
        List the current level of resolution

        Returns
        -------
        resolution : int
            Current level of resolution
        """
        return self.resolution

    @staticmethod
    def _scalar(value):
        """
        Convert a NumPy scalar to the matching native Python value.

        ``np.asscalar`` was deprecated in NumPy 1.16 and removed in 1.23;
        ``item()`` is its documented replacement. Plain Python values are
        returned unchanged.
        """
        return value.item() if hasattr(value, 'item') else value

    def get_region_order(self, chr_id=None, region=None):
        """
        List the regions on a given chromosome ID or region ID in the order
        that they are located on the chromosome

        Parameters
        ----------
        chr_id : str
            Chromosome ID
        region : str
            Region ID. When provided it takes precedence and its chromosome
            is used for the lookup.

        Returns
        -------
        list
            region_id : str
            List of the region IDs
        """
        if region is not None:
            chr_id = self.mpgrp[str(region)].attrs['chromosome']
        starts = {
            region_id: self.mpgrp[str(region_id)].attrs['start']
            for region_id in self.mpgrp
            if self.mpgrp[str(region_id)].attrs['chromosome'] == chr_id
        }
        # Order the matching regions by their start position.
        return sorted(starts, key=starts.get)

    def get_object_data(self, region_id):
        """
        Prepare the object header data structure ready for printing

        Parameters
        ----------
        region_id : int
            Region that is getting downloaded

        Returns
        -------
        objectdata : dict
            All headers and values required for the JSON output
        """
        if self.resolution is None:
            return {}

        mpds = self.mpgrp[str(region_id)]
        dset = self.grp['data']
        return {
            'title': dset.attrs['title'],
            'experimentType': dset.attrs['experimentType'],
            'species': dset.attrs['species'],
            'project': dset.attrs['project'],
            'identifier': dset.attrs['identifier'],
            'assembly': dset.attrs['assembly'],
            'cellType': dset.attrs['cellType'],
            'resolution': dset.attrs['resolution'],
            'datatype': dset.attrs['datatype'],
            'components': dset.attrs['components'],
            'source': dset.attrs['source'],
            'chromEnd': [self._scalar(mpds.attrs['end'])],
            'end': self._scalar(mpds.attrs['end']),
            'chromStart': [self._scalar(mpds.attrs['start'])],
            'start': self._scalar(mpds.attrs['start']),
            'chrom': mpds.attrs['chromosome'],
            'dependencies': self.dependencies,
            'uuid': region_id,
        }

    def get_clusters(self, region_id):
        """
        List all clusters of models

        Returns
        -------
        clusters : list
            List of models in each cluster
        """
        if self.resolution is None:
            return {}
        # Cluster groups are keyed "0", "1", ... so walk them in order.
        cluster_grp = self.clusters[str(region_id)]
        return [
            [self._scalar(model_id) for model_id in cluster_grp[str(i)][:]]
            for i in range(len(cluster_grp))
        ]

    def get_centroids(self, region_id):
        """
        List the centroid models for each cluster

        Returns
        -------
        centroids : list
            List of the centroid models for each cluster
        """
        if self.resolution is None:
            return {}
        return [self._scalar(model_id) for model_id in self.centroids[region_id]]

    def get_chromosomes(self):
        """
        List of chromosomes that have models at a given resolution

        Returns
        -------
        chromosomes : list
            List of chromosomes at the set resolution
        """
        if self.resolution is None:
            return {}
        # Set comprehension removes duplicate chromosome names.
        return list({
            self.mpgrp[region_id].attrs['chromosome']
            for region_id in self.mpgrp
        })

    def get_regions(self, chr_id, start, end):
        """
        List regions that are within a given range on a chromosome

        Parameters
        ----------
        chr_id : str
            Chromosome ID
        start : int
            Start position
        end : int
            Stop position

        Returns
        -------
        regions : list
            List of region IDs whose parameters match those provided
        """
        if self.resolution is None:
            return {}
        # A region overlaps [start, end) when it begins before the end and
        # finishes after the start.
        return [
            region_id for region_id in self.mpgrp
            if self.mpgrp[region_id].attrs['start'] < end
            and self.mpgrp[region_id].attrs['end'] > start
            and self.mpgrp[region_id].attrs['chromosome'] == chr_id
        ]

    def get_models(self, region_id):
        """
        List all models for a given region

        Returns
        -------
        List
            model_id : int
            cluster_id : int
        """
        if self.resolution is None:
            return {}
        model_param_ds = self.mpgrp[str(region_id)]
        return model_param_ds[:, :]

    def get_model(self, region_id, model_ids=None, page=0, mpp=10):
        """
        Get the coordinates within a defined region on a specific chromosome.
        If no model IDs are provided the consensus (centroid) models for that
        region are returned.

        Parameters
        ----------
        region_id : str
            Region ID
        model_ids : list
            List of model IDs for the models that are required. The single
            entries 'centroids' and 'all' select the centroid models or
            every model in the region respectively. Defaults to centroids.
        page : int
            Page number
        mpp : int
            Number of models per page (default: 10; max: 100)

        Returns
        -------
        array : list
            model : dict
                metadata : dict
                    Relevant extra meta data added by TADbit
                object : dict
                    Key value pair of information about the region
                models : list
                    List of dictionaries for each model
                clusters : list
                    List of models for each cluster
                centroids : list
                    List of all centroid models
                restraints : list
                    List of restraints for each position
                hic_data : dict
                    Hi-C model data
            metadata : dict
                model_count : int
                    Count of the number of models for the defined region ID
                page_count : int
                    Number of pages
        """
        if self.resolution is None:
            return {}

        mpds = self.mpgrp[str(region_id)]
        dset = self.grp['data']

        # BUG FIX: the default model_ids=None previously crashed on
        # model_ids[0]; fall back to the consensus (centroid) models as the
        # docstring promises.
        if not model_ids:
            model_ids = ['centroids']
        if model_ids[0] == 'centroids':
            model_ids = self.get_centroids(str(region_id))
        if model_ids and model_ids[0] == 'all':
            model_ids = list(self.mpgrp[str(region_id)][:, 0])

        # Clamp the page size to a sane range: caps payload size and avoids
        # a division by zero for mpp <= 0.
        mpp = max(1, min(mpp, 100))
        model_count = len(model_ids)
        page_count = int(np.ceil(float(model_count) / mpp))

        # Sort a copy so the caller's list is not mutated as a side effect.
        model_ids = sorted(model_ids)
        model_pages = [model_ids[i:i + mpp] for i in range(0, model_count, mpp)]

        models = []
        if model_pages:
            # Using model_ds by pre-cutting then taking slices from that
            # array is much quicker as the majority of the effort is in the
            # initial slice. It is also slightly quicker for a single model.
            model_ds = dset[mpds.attrs['i']:mpds.attrs['j'], :, :]
            # Hoisted out of the loop: the model-id column is invariant.
            model_index = list(mpds[:, 0])
            for mid in model_pages[page]:
                model_loc = model_index.index(int(mid))
                # length x model_loc x coords
                model = model_ds[:, model_loc, :]
                models.append(
                    {
                        "ref": str(mid),
                        "data": [str(axis) for coords in model for axis in coords]
                    }
                )

        model_json = {
            "metadata": self.meta_data,
            "object": self.get_object_data(region_id),
            "models": models,
            "clusters": self.get_clusters(region_id),
            "centroids": self.get_centroids(str(region_id)),
            "restraints": self.restraints,
            "hic_data": self.hic_data,
        }
        model_meta = {
            "model_count": model_count,
            "page_count": page_count
        }
        return (model_json, model_meta)