"""
.. See the NOTICE file distributed with this work for additional information
regarding copyright ownership.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import os
import h5py
import numpy as np
from dmp import dmp
from dm_generator.GenerateSampleAdjacency import GenerateSampleAdjacency
[docs]class adjacency(object): # pylint: disable=invalid-name
"""
Class related to handling the functions for interacting directly with the
HDF5 files. All required information should be passed to this class.
"""
def __init__(self, user_id, file_id, resolution=None, cnf_loc=''):
"""
Initialise the module and generate sample data if required
Parameters
----------
user_id : str
Identifier to uniquely locate the users files. Can be set to
"common" if the files can be shared between users
file_id : str
Location of the file in the file system
resolution : int (Optional)
Level of resolution. This is optional, but only the functions
get_resolutions() and set_resolutions() can be called. Once the
resolution has been set then all functions are callable.
"""
self.file_id = file_id # file_id required later on for URL generation
if user_id == 'test':
resource_path = os.path.join(
os.path.dirname(__file__),
"../tests/data/sample_adjacency.hdf5"
)
if os.path.isfile(resource_path) is False:
gsa = GenerateSampleAdjacency()
gsa.main()
self.hdf5_handle = h5py.File(resource_path, "r")
else:
dm_handle = dmp(cnf_loc)
file_obj = dm_handle.get_file_by_id(user_id, file_id)
self.hdf5_handle = h5py.File(file_obj["file_path"], "r")
self.resolutions = [int(i) for i in self.hdf5_handle.keys()]
if resolution is None:
self.dset = self.hdf5_handle[str(self.resolutions[0])]
self.resolution = self.resolutions[0]
else:
self.dset = self.hdf5_handle[str(resolution)]
self.resolution = resolution
self.chr_param = self._calculate_chr_param(self.resolutions, self.dset.attrs["chromosomes"])
[docs] def close(self):
"""
Close the HDF5 data file handle
"""
self.hdf5_handle.close()
[docs] def get_resolutions(self):
"""
List resolutions that models have been generated for
Returns
-------
list : str
Available levels of resolution that can be set
"""
return [res for res in self.hdf5_handle]
[docs] def set_resolution(self, resolution):
"""
Set, or change, the resolution level
Parameters
----------
resolution : int
Level of resolution
"""
self.resolution = resolution
self.dset = self.hdf5_handle[str(self.resolution)]
chromosomes = self.dset.attrs['chromosomes']
self.chr_param = self._calculate_chr_param(self.resolution, chromosomes)
[docs] def get_details(self):
"""
Return a list of the available resolutions in a given HDF5 file
"""
return {
"chromosomes": [list(c) for c in self.dset.attrs["chromosomes"]],
"chr_param": self.chr_param,
"resolutions": self.resolutions}
[docs] def get_resolution(self):
"""
List the current level of rseolution
Returns
-------
resolution : int
Current level of resolution
"""
return self.resolution
[docs] def get_chromosomes(self):
"""
List of chromosomes that have models at a given resolution
Returns
-------
chromosomes : list
List of chromosomes at the set resolution
"""
if self.resolution is None:
return {}
return [list(c) for c in self.dset.attrs['chromosomes']]
[docs] def get_chromosome_parameters(self):
"""
Return a list of the available resolutions in a given HDF5 file
Returns
-------
dict
chromosomes : list
chr_param : dict
resolitions
Example
-------
.. code-block:: python
:linenos:
from reader import adjacency
r = adjacency('test', '', 10000)
value = r.get_chromosome_parameters()
"""
return self.chr_param
[docs] def get_range(
self, chr_id, start, end,
limit_chr=None, limit_start=None, limit_end=None,
value_url='/api/getValue', no_links=None):
"""
Get the interactions that happen within a defined region on a specific
chromosome. Returns inter and intra interactions with the defined
region.
Parameters
----------
chr_id : str
Chromosomal name
start : int
Start position within the chromosome
end : int
End position within the chromosome
limit_chr : str (Optional)
Limit the results to a particular chromosome
limit_start : int (Optional)
Limit the range start position on the limit_chr paramter
limit_end : int (Optional)
Limit the range end position on the limit_chr parameter
value_url : str (Optional)
Define a custom URL snippet for the location of the file if different
from the defaul
no_links : bool (Optional)
Will return the URL links to the individual points within the
adjacency matrix. In cases where this generates a large number of
points it is possible to turn off generating these links. Set this
value to 1.
Returns
-------
dict
log : list
List of messages about the state for debugging
results : list
List of values for given positions within the adjacency matrix
Example
-------
.. code-block:: python
:linenos:
from reader import adjacency
r = adjacency('test', '', 10000)
value = r.get_range(2000000, 1000000)
"""
# Defines columns to get extracted from the array
x_pos = int(np.floor(float(start) / float(self.resolution)))
y_pos = int(np.ceil(float(end) / float(self.resolution)))
# xy_offset for the chromosome in the super array
print(self.chr_param.keys())
xy_offset = self.chr_param[chr_id]["bins"][self.resolution][1]
dset = self.hdf5_handle[str(self.resolution)]
start2 = 0
end2 = 0
if limit_chr is not None:
if limit_start is not None and limit_end is not None:
start2 = int(np.floor(float(limit_start) / float(self.resolution)))
end2 = int(np.ceil(float(limit_end) / float(self.resolution)))
xy2_offset = self.chr_param[limit_chr]["bins"][self.resolution][1]
result = dset[
(x_pos + xy_offset):(y_pos + xy_offset),
(start2 + xy2_offset):(end2 + xy2_offset)
]
else:
start2 = self.chr_param[limit_chr]["bins"][self.resolution][1]
end2 = start2 + self.chr_param[limit_chr]["bins"][self.resolution][0]
result = dset[
(x_pos + xy_offset):(y_pos + xy_offset),
start2:end2
]
else:
result = dset[(x_pos + xy_offset):(y_pos + xy_offset), :]
# Iterate through slice and extract results greater than zero
results = []
log_text = []
r_index = np.transpose(np.nonzero(result))
log_text.append(
{
"coord": {
"x0": (x_pos + xy_offset),
"x1": (y_pos + xy_offset)
},
"r_index": len(r_index),
"param": {
"start": start,
"x": x_pos,
"end": end,
"y": y_pos,
"xy_offset": xy_offset,
"resolution": self.resolution,
"chr_id": chr_id,
"startB": start2,
"endB": end2,
"limit_chr": limit_chr
},
'chr_param': self.chr_param
}
)
for i in r_index:
x_start = ((i[0] + x_pos) * int(self.resolution))
y_chr = self.get_chromosome_from_array_index(i[1] + start2)
if limit_chr is not None:
y_start = (i[1] + start2) * int(self.resolution)
else:
y_start = (
i[1] - self.chr_param[y_chr]["bins"][self.resolution][1]
) * int(self.resolution)
entry = {
"chrA": chr_id,
"startA": x_start,
"chrB": y_chr,
"startB": y_start,
"value": int(result[i[0], i[1]]),
"pos_x": i[0] + x_pos + xy_offset,
"pos_y": i[1]
}
if no_links is None:
url = "{url_root}?file_id={fid}&res={res}&pos_x={x}&pos_y={y}"
entry['_links'] = {
'self': url.format(
url_root=value_url, fid=str(self.file_id), res=str(self.resolution),
x=str(i[0] + x_pos + xy_offset), y=str(i[1])
)
}
results.append(entry)
return {"log": log_text, "results": results}
[docs] def get_value(self, bin_i, bin_j):
"""
Get a specific value for a given dataset, resolution
Parameters
----------
bin_i : int
Array position in the first dimension
bin_j : int
Array position in the second dimension
Returns
-------
value : int
Value for a given cell in the adjacency array
Example
-------
.. code-block:: python
:linenos:
from reader import adjacency
r = adjacency('test', '', 10000)
value = r.get_value(2000000, 1000000)
"""
value = self.dset[int(bin_i), int(bin_j)]
return value
def _calculate_chr_param(self, bin_sizes, chromosomes):
"""
Load the self.chr_param object with the required information about the
start and stop positions of the chromosomes and bins.
"""
chr_param = {}
genome_len = 0
bin_count = [0] * len(bin_sizes)
chromosome_count = len(chromosomes)
for i in range(chromosome_count):
chromosome = chromosomes[i]
genome_len += int(chromosome[1])
# Calculate the number of bins for a chromosome and then join with
# the offset values for the start in the array
bin_s = [int(np.ceil(int(chromosome[1]) / float(y))) for y in bin_sizes]
size_list = []
for j, bcv in enumerate(bin_count):
size_list.append([bin_s[j], bcv, bin_s[j] + bcv])
bin_c = dict(
zip(
bin_sizes,
size_list
)
)
chr_param[chromosome[0].decode('utf-8')] = {
'size': [int(chromosome[1]), genome_len],
'bins': bin_c
}
# Calculate the new offset values.
bin_count = [bin_count[i] + bin_s[i] for i in range(len(bin_count))]
total_bin_size_list = []
for i, tbcv in enumerate(bin_count):
total_bin_size_list.append([bin_s[i], tbcv])
total_bin_count = dict(
zip(
bin_sizes,
total_bin_size_list
)
)
chr_param["meta"] = {"genomeSize": genome_len, "totalBinCount": total_bin_count}
return chr_param
[docs] def get_chromosome_from_array_index(self, index):
"""
Identify the chromosome based on either the x or y coordinate in the
array.
Parameters
----------
index : int
Location within the array
Returns
-------
chr_id : str
Identity of the chromosome
Example
-------
.. code-block:: python
:linenos:
from reader import adjacency
r = adjacency('test', '', 10000)
cid = r.get_chromosome_from_array_index(1234567890)
"""
for chr_id in self.chr_param.keys():
if chr_id == "meta":
continue
# print(self.chr_param[chr_id]["bins"], type(self.chr_param[chr_id]["bins"]))
chr_end = self.chr_param[chr_id]["bins"][int(self.resolution)][2]
chr_start = self.chr_param[chr_id]["bins"][int(self.resolution)][1]
if index >= chr_start and index <= chr_end:
return chr_id