Source code for pkpd.utils

# **************************************************************************
# *
# * Authors:     Carlos Oscar Sorzano (info@kinestat.com)
# *
# * Kinestat Pharma
# *
# * This program is free software; you can redistribute it and/or modify
# * it under the terms of the GNU General Public License as published by
# * the Free Software Foundation; either version 2 of the License, or
# * (at your option) any later version.
# *
# * This program is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# * GNU General Public License for more details.
# *
# * You should have received a copy of the GNU General Public License
# * along with this program; if not, write to the Free Software
# * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
# * 02111-1307  USA
# *
# *  All comments concerning this program package may be sent to the
# *  e-mail address 'info@kinestat.com'
# *
# **************************************************************************
"""
PKPD functions
"""
import copy
try:
    from itertools import izip
except ImportError:
    izip = zip
import numpy as np
import math
from scipy.interpolate import InterpolatedUnivariateSpline, pchip_interpolate
import time
import hashlib
from os.path import (exists, splitext, getmtime)
from openpyxl.styles import Font, PatternFill
from openpyxl.utils.cell import get_column_letter

[docs]def parseRange(auxString): if auxString=="": return None elif auxString.startswith('['): auxString=auxString.replace('[','') auxString=auxString.replace(']','') tokens=auxString.split(',') auxArray = np.array(tokens, dtype='float') return auxArray.astype(np.float) elif ':' in auxString: tokens=auxString.split(':') if len(tokens)!=3: raise Exception("The X evaluation string is not well formatted: %s"%auxString) fromValue = float(tokens[0]) step= float(tokens[1]) toValue = float(tokens[2]) return np.arange(fromValue,toValue,step)
[docs]def find_nearest(array,value): idx = np.searchsorted(array, value, side="left") if idx > 0 and (idx >= len(array) or math.fabs(value - array[idx-1]) < math.fabs(value - array[idx])): return idx-1 else: return idx
[docs]def getMD5String(fn): fnTime = time.strftime("%Y-%m-%d-%M:%S\n",time.gmtime(getmtime(fn))) mhash = hashlib.md5() with open(fn, 'rb') as f: for chunk in iter(lambda: f.read(128 * mhash.block_size), b""): mhash.update(chunk) return mhash.hexdigest()
#return hashlib.md5(fnTime+open(fn, 'rb').read()).hexdigest().
[docs]def writeMD5(fn): if not exists(fn): return fnMD5=splitext(fn)[0]+".md5" fh=open(fnMD5,"w") fh.write("%s\n"%getMD5String(fn)) fh.write("%s\n"%fn) fh.close()
[docs]def verifyMD5(fn): return True # This allows projects copied with FTP to be used if fn is None or not exists(fn): return True if fn.endswith(".md5"): fnMD5=fn else: fnMD5=splitext(fn)[0]+".md5" if not exists(fnMD5): return False fh=open(fnMD5,"r") md5StringFile=fh.readline().strip() fnFile=fh.readline().strip() fh.close() if fn.endswith(".md5"): fn=fnFile return getMD5String(fn)==md5StringFile
[docs]def uniqueFloatValues(x,y, TOL=-1): xp=np.asarray(x,dtype=np.float64) yp=np.asarray(y,dtype=np.float64) idx = np.logical_not(np.logical_or(np.isnan(xp),np.isnan(yp))) xp=xp[idx] yp=yp[idx] if TOL<0: TOL = np.max(xp.flat) / (xp.size*1e3) A = np.zeros((xp.size,), dtype=[('x','f8'),('y','f8')]) A['x']=xp.flat A['y']=yp.flat idx = np.argsort(A,order=('x','y')) # A is constructed for solving ties d = np.append(1, np.diff(xp.flat[idx])) #for i in range(0,d.size): # print("i=",i,"xp[i]=",xp[i],"d[i]=",d[i],"yp[i]=",yp[i],"idx[i]",idx[i]) xunique = xp.flat[idx[d > TOL]] yunique = yp.flat[idx[d > TOL]] return xunique,yunique
[docs]def uniqueFloatValues2(x1,x2,y): x1p=np.asarray(x1,dtype=np.float64) x2p=np.asarray(x2,dtype=np.float64) yp=np.asarray(y,dtype=np.float64) TOL1 = np.max(x1p.flat) / 1e5 idx1 = np.argsort(x1p.flat) d1 = np.append(True, np.diff(x1p.flat[idx1])) TOL2 = np.max(x2p.flat) / 1e5 idx2 = np.argsort(x1p.flat) d2 = np.append(True, np.diff(x2p.flat[idx2])) if np.sum(d1>TOL1)>np.sum(d2>TOL2): takeIdx=idx1[d1>TOL1] else: takeIdx=idx2[d2>TOL2] x1unique = x1p.flat[takeIdx] x2unique = x2p.flat[takeIdx] yunique = yp.flat[takeIdx] return x1unique,x2unique,yunique
[docs]def twoWayUniqueFloatValues(x,y): y1,x1=uniqueFloatValues(y,x) x2,y2=uniqueFloatValues(x1,y1) return x2,y2
[docs]def calculateAUC0t(t, C): # Make sure that the (0,0) sample is present AUC0t = np.zeros(t.shape[0]) for idx in range(1,t.shape[0]): dt = (t[idx]-t[idx-1]) if C[idx]>=C[idx-1]: # Trapezoidal in the raise AUC0t[idx] = AUC0t[idx-1]+0.5*dt*(C[idx-1]+C[idx]) else: # Log-trapezoidal in the decay if C[idx-1]>0 and C[idx]>0: decrement = C[idx-1]/C[idx] K = math.log(decrement) # B = K/dt AUC0t[idx] = AUC0t[idx-1]+dt*(C[idx-1]-C[idx])/K else: AUC0t[idx] = AUC0t[idx-1] return AUC0t
[docs]def upper_tri_masking(A): # Extract the upper triangular matrix without the diagonal r = np.arange(A.shape[0]) c = np.arange(A.shape[1]) mask = r[:,None] < r return A[mask]
[docs]def parseOperation(operation): varList = [] idx0 = operation.find("$(") while idx0 >= 0: idxF = operation.find(")", idx0) varName = operation[(idx0 + 2):idxF] if not varName in varList: varList.append(varName) idx0 = operation.find("$(", idxF + 1) coeffList = [] idx0 = operation.find("$[") while idx0 >= 0: idxF = operation.find("]", idx0) varName = operation[(idx0 + 2):idxF] if not varName in varList: coeffList.append(varName) idx0 = operation.find("$[", idxF + 1) parsedOperation = copy.copy(operation) ldict={} for varName in varList: exec ("parsedOperation=parsedOperation.replace('$(%s)','%s')" % (varName, varName), locals(), ldict) for varName in coeffList: exec ("parsedOperation=parsedOperation.replace('$[%s]','%s')" % (varName, varName), locals(), ldict) if 'parsedOperation' in ldict: parsedOperation=ldict['parsedOperation'] return parsedOperation, varList, coeffList
[docs]def excelWriteRow(msgList, workbook, row, col=1, sheetName="", bold=False): currentCol=col if sheetName!="": if not sheetName in workbook.sheetnames: workbook.create_sheet(sheetName) sheet=workbook[sheetName] else: sheet=workbook.active if type(msgList)!=list: msgList2=[msgList] else: msgList2=msgList for msg in msgList2: c = sheet.cell(row=row, column=currentCol) try: if isinstance(msg,bool): c.value = str(msg) else: c.value = msg if bold: c.font = Font(bold=True) except: print("Cannot convert the cell row=%d, col=%d"%(row,col),msg) currentCol+=1
[docs]def excelFillCells(workbook, row, col0=1, colF=10, sheetName="", fillColor="54B948"): if sheetName!="": if not sheetName in workbook.sheetnames: workbook.create_sheet(sheetName) sheet=workbook[sheetName] else: sheet=workbook.active for col in range(col0,colF+1): c = sheet.cell(row=row, column=col) c.fill = PatternFill(bgColor=fillColor, fill_type="solid")
[docs]def as_text(value): if value is None: return "" return str(value)
[docs]def excelAdjustColumnWidths(workbook, sheetName=""): if sheetName!="": if not sheetName in workbook.sheetnames: workbook.create_sheet(sheetName) sheet=workbook[sheetName] else: sheet=workbook.active for column_cells in sheet.columns: length = max(len(as_text(cell.value)) for cell in column_cells) sheet.column_dimensions[get_column_letter(column_cells[0].column)].width = length
[docs]def computeXYmean(XYlist, Nxsteps=300, common=False): xmin = None xmax = None for x,_ in XYlist: if xmin is None: xmin=np.min(x) xmax=np.max(x) else: if common: xmin = max(xmin, np.min(x)) xmax = min(xmax, np.max(x)) else: xmin = min(xmin, np.min(x)) xmax = max(xmax, np.max(x)) dataDict = {} # key will be x values xrange = np.arange(xmin, xmax, (xmax - xmin) / Nxsteps) for xValues, yValues in XYlist: xValuesUnique, yValuesUnique = twoWayUniqueFloatValues(xValues, yValues) xUniqueMin=np.min(xValuesUnique) xUniqueMax=np.max(xValuesUnique) B = InterpolatedUnivariateSpline(xValuesUnique, yValuesUnique, k=1) xrangei=xrange[np.logical_and(xrange>=xUniqueMin,xrange<=xUniqueMax)] yrange = B(xrangei) for x, y in izip(xrange, yrange): if x in dataDict: dataDict[x].append(y) else: dataDict[x] = [y] sortedX = sorted(dataDict.keys()) # We will store five values (min, 25%, 50%, 75%, max) # for each of the time entries computed # percentileList = [0, 25, 50, 75, 100] # Y = np.zeros((len(sortedX), 5)) # for i, t in enumerate(sortedX): # Y[i, :] = np.percentile(dataDict[t], percentileList) Y = np.zeros(len(sortedX)) for i, t in enumerate(sortedX): Y[i] = np.mean(dataDict[t]) return sortedX, Y
[docs]def smoothPchip(x,y): d=np.insert(np.diff(y),0,0) d[d<0]=0 ypos=np.cumsum(d) ypos=ypos*np.sum(y)/np.sum(ypos) return ypos
#xunique, yunique = uniqueFloatValues(x,ypos) #yInterpolated = pchip_interpolate(xunique,yunique,x) #return yInterpolated
[docs]def int_dx(x, y): # Integral over size space (natural grid) dx = np.diff(x) return np.sum(np.multiply(y,dx))
[docs]def int_dx1dx2(x1, x2, y): # Integral over size space (natural grid) dx1 = np.diff(x1) dx2 = np.diff(x2) return np.sum(np.multiply(np.sum(np.multiply(y,dx2),axis=1),dx1))