Source code for pkpd.inhalation

# **************************************************************************
# *
# * Authors:     Carlos Oscar Sorzano (info@kinestat.com)
# *
# * Kinestat Pharma
# *
# * This program is free software; you can redistribute it and/or modify
# * it under the terms of the GNU General Public License as published by
# * the Free Software Foundation; either version 2 of the License, or
# * (at your option) any later version.
# *
# * This program is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# * GNU General Public License for more details.
# *
# * You should have received a copy of the GNU General Public License
# * along with this program; if not, write to the Free Software
# * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
# * 02111-1307  USA
# *
# *  All comments concerning this program package may be sent to the
# *  e-mail address 'info@kinestat.com'
# *
# **************************************************************************
"""
Classes for inhalation PK simulations.

See Hartung and Borghardt. A mechanistic framework for a priori pharmacokinetic
predictions of orally inhaled drugs. PLOS Computational Biology, 16: e1008466 (2020)
"""
import math
import numpy as np
import scipy.linalg
from scipy.interpolate import interp1d, interp2d
from pwem.objects import EMObject
from pyworkflow.object import String, Integer
from .utils import int_dx, int_dx1dx2

[docs]def diam2vol(diameters): # Hartung2020_MATLAB/functions/diam2vol.m # Convert diameter to volume, assuming spherical shape # Units: [um] for diameters, [cm^3] for V return math.pi / 6 * np.power(1e-4 * diameters, 3)
[docs]class PKPhysiologyLungParameters(EMObject): def __init__(self, **args): EMObject.__init__(self, **args) self.fnPhys = String() self.multiplier = [1] * 9
[docs] def write(self, fnOut): fh=open(fnOut,"w") fh.write("%f # cardiac output [mL/min]\n"%self.Qco) fh.write("%f # lung tissue weight, without blood [g]\n"%self.OWlun) fh.write("%f # alveolar fraction of cardiac output\n"%self.falvCO) fh.write("%f # alveolar ELF volume [mL]\n"%self.Velf_alv) # ELF = Epithelial Lining Fluid fh.write("%f # alveolar surface area [cm2]\n"%self.Surf_alv) fh.write("%f # bronchial fraction of cardiac output\n"%self.fbrCO) fh.write("%f # bronchial fraction of lung tissue volume\n"%self.fbrVlun) fh.write("%f # bronchial ELF heights for interpolation, trachea [cm]\n" % self.helf_trach) fh.write("%f # bronchial ELF heights for interpolation, terminal bronchioles [cm]\n" % self.helf_termbr) fh.write("%s # trachea length [cm]\n" % self.tracheaLength) fh.write("%s # trachea diameter [cm]\n" % self.tracheaDiameter) fh.write("%s # main bronchi length [cm]\n" % self.bronchi1Length) fh.write("%s # main bronchi diameter [cm]\n" % self.bronchi1Diameter) fh.write("%s # bronchi length [cm]\n" % self.bronchi2Length) fh.write("%s # bronchi diameter [cm]\n" % self.bronchi2Diameter) fh.write("%s # bronchiole length [cm]\n" % self.bronchi3Length) fh.write("%s # bronchiole diameter [cm]\n" % self.bronchi3Diameter) fh.write("%s # terminal bronchiole length [cm]\n" % self.bronchi4Length) fh.write("%s # terminal bronchiole diameter [cm]\n" % self.bronchi4Diameter) fh.close() self.fnPhys.set(fnOut)
[docs] def read(self, fnIn): fh=open(fnIn) self.Qco=float(fh.readline().split()[0]) self.OWlun=float(fh.readline().split()[0]) self.falvCO=float(fh.readline().split()[0]) self.Velf_alv=float(fh.readline().split()[0]) self.Surf_alv=float(fh.readline().split()[0]) self.fbrCO=float(fh.readline().split()[0]) self.fbrVlun=float(fh.readline().split()[0]) self.helf_trach=float(fh.readline().split()[0]) self.helf_termbr=float(fh.readline().split()[0]) self.tracheaLength=fh.readline().split()[0] self.tracheaDiameter=fh.readline().split()[0] self.bronchi1Length=fh.readline().split()[0] self.bronchi1Diameter=fh.readline().split()[0] self.bronchi2Length=fh.readline().split()[0] self.bronchi2Diameter=fh.readline().split()[0] self.bronchi3Length=fh.readline().split()[0] self.bronchi3Diameter=fh.readline().split()[0] self.bronchi4Length=fh.readline().split()[0] self.bronchi4Diameter=fh.readline().split()[0] fh.close() self.fnPhys.set(fnIn)
[docs] def getSystemic(self): # Hartung2020_MATLAB/physiol_subst/physiology_systemic.m data = {} data['Qco']=self.Qco data['OWlung']=self.OWlun return data
[docs] def getBronchial(self): # Hartung2020_MATLAB/physiol_subst/physiology_bronchial.m def listSplit(valuesStr): return [float(x) for x in valuesStr.split(',')] tracheaLength = float(self.tracheaLength) tracheaDiameter = float(self.tracheaDiameter) bronchi1Length = float(self.bronchi1Length) bronchi1Diameter = float(self.bronchi1Diameter) bronchi2Length = listSplit(self.bronchi2Length) bronchi2Diameter = listSplit(self.bronchi2Diameter) bronchi3Length = listSplit(self.bronchi3Length) bronchi3Diameter = listSplit(self.bronchi3Diameter) bronchi4Length = float(self.bronchi4Length) bronchi4Diameter = float(self.bronchi4Diameter) segmentLengths = [tracheaLength, bronchi1Length] +bronchi2Length +bronchi3Length +[bronchi4Length] segmentDiameters = [tracheaDiameter, bronchi1Diameter]+bronchi2Diameter+bronchi3Diameter+[bronchi4Diameter] segmentType = ['T','B1'] + ['B2']*len(bronchi2Length) + ['B3']*len(bronchi3Length) + ['B4'] length_cm = np.asarray(segmentLengths) diam_cm = np.asarray(segmentDiameters) generation = np.arange(0,len(segmentLengths))+1 number = np.power(2.0,generation-1) xArea_cm2 = math.pi*np.multiply(np.power(diam_cm/2,2.0),number) # cm2 vol_cm3 = np.multiply(xArea_cm2,length_cm) # cm3 start_cm = np.insert(np.cumsum(segmentLengths[:-1]),0,0,axis=0) end_cm = np.cumsum(segmentLengths) pos = 0.5*(start_cm+end_cm) surf_cm2 = np.multiply(np.multiply(length_cm,(math.pi * diam_cm)),number) h_elf_cm = np.interp(pos, [0, np.sum(length_cm)], [self.helf_trach, self.helf_termbr]) elf_cm3 = math.pi/4 * (np.power(diam_cm,2)-np.power((diam_cm-2*h_elf_cm),2)) elf_cm3 = np.multiply(np.multiply(elf_cm3,length_cm),number) Vtis_ctr = self.OWlun*self.fbrVlun; # central lung tissue volume voltis_cm3 = elf_cm3 * Vtis_ctr / sum(elf_cm3); data = {} data['generation']=generation data['type']=segmentType data['number']=number data['length_cm']=length_cm data['diam_cm']=diam_cm data['xArea_cm2']=xArea_cm2 data['vol_cm3']=vol_cm3 data['start_cm']=start_cm data['end_cm']=end_cm data['pos']=pos data['surf_cm2']=surf_cm2 * self.multiplier[5] data['h_elf_trach'] = self.helf_trach data['h_elf_termbr'] = self.helf_termbr data['h_elf_cm'] = h_elf_cm data['elf_cm3'] = elf_cm3 * self.multiplier[1] data['voltis_cm3'] = voltis_cm3 * self.multiplier[3] data['fVol'] = self.fbrVlun data['fQco'] = self.fbrCO * self.multiplier[7] return data
[docs] def getAlveolar(self): # Hartung2020_MATLAB/physiol_subst/physiology_alveolar.m data = {} data['fQco']=self.falvCO * self.multiplier[8] data['fVol']=1-self.fbrVlun data['ELF_cm3'] = self.Velf_alv * self.multiplier[2] data['Vol_cm3'] = self.OWlun * data['fVol'] * self.multiplier[4] # density = 1 [g/cm^3] data['Surf_cm2'] = self.Surf_alv * self.multiplier[6] return data
[docs]class PKSubstanceLungParameters(EMObject): def __init__(self, **args): EMObject.__init__(self, **args) self.fnSubst = String() self.multiplier = [1] * 8
[docs] def write(self, fnOut): fh=open(fnOut,"w") fh.write("%s # name\n"%self.name) fh.write("%.60g # maximum dissolution rate in alveolar space in units [nmol/(cm*min)]\n"%self.kdiss_alv) fh.write("%.60g # maximum dissolution rate in conducting airways in units [nmol/(cm*min)]\n"%self.kdiss_br) fh.write("%.60g # steady-state permeability in alveolar space in [cm/min]\n"%self.kp_alv) fh.write("%.60g # steady-state permeability in conducting airways in [cm/min]\n"%self.kp_br) fh.write("%.60g # solubility in alveolar space in [nmol/cm3]=[uM]\n" % self.Cs_alv) fh.write("%.60g # solubility in conducting airways in [nmol/cm3]=[uM]\n" % self.Cs_br) fh.write("%.60g # density in [nmol/cm3] = [uM]\n" % self.rho) fh.write("%.60g # molecular weight [g/mol]\n"%self.MW) fh.write("%.60g # plasma to lung partition coefficient in alveolar space [unitless]\n" % self.Kpl_alv) fh.write("%.60g # plasma to lung partition coefficient in conducting airways [unitless]\n" % self.Kpl_br) fh.write("%.60g # fraction unbound in plasma [unitless]\n" % self.fu) fh.write("%.60g # blood to plasma ratio [unitless]\n" % self.R) fh.close() self.fnSubst.set(fnOut)
[docs] def read(self, fnIn): fh=open(fnIn) self.name=fh.readline().split()[0] self.kdiss_alv=float(fh.readline().split()[0]) self.kdiss_br=float(fh.readline().split()[0]) self.kp_alv=float(fh.readline().split()[0]) self.kp_br=float(fh.readline().split()[0]) self.Cs_alv=float(fh.readline().split()[0]) self.Cs_br=float(fh.readline().split()[0]) self.rho=float(fh.readline().split()[0]) self.MW=float(fh.readline().split()[0]) self.Kpl_alv=float(fh.readline().split()[0]) self.Kpl_br=float(fh.readline().split()[0]) self.fu=float(fh.readline().split()[0]) self.R=float(fh.readline().split()[0]) fh.close() self.fnSubst.set(fnIn)
[docs] def getData(self): data = {} data['name'] = self.name data['kdiss_alv'] = self.kdiss_alv * self.multiplier[3] data['kdiss_br'] = self.kdiss_br * self.multiplier[2] data['kp_alv'] = self.kp_alv * self.multiplier[5] data['kp_br'] = self.kp_br * self.multiplier[4] data['Cs_alv'] = self.Cs_alv * self.multiplier[1] data['Cs_br'] = self.Cs_br * self.multiplier[0] data['rho'] = self.rho data['MW'] = self.MW data['Kpl_alv'] = self.Kpl_alv * self.multiplier[7] data['Kpl_br'] = self.Kpl_br * self.multiplier[6] data['fu'] = self.fu data['R'] = self.R return data
[docs]class PKDepositionParameters(EMObject): def __init__(self, **args): EMObject.__init__(self, **args) self.fnSubstance = String() self.fnLung = String() self.fnDeposition = String() self.doseMultiplier = 1
[docs] def setFiles(self, fnSubstance, fnLung, fnDeposition): self.fnSubstance.set(fnSubstance) self.fnLung.set(fnLung) self.fnDeposition.set(fnDeposition)
[docs] def readDepositionFile(self, alvlim): fh=open(self.fnDeposition.get()) state = 0 for line in fh.readlines(): if line.strip()=="": continue if state == 0: # Header tokens = line.split(':') if tokens[0]=='Total dose [ug]': self.dose = float(tokens[1].strip())*self.doseMultiplier elif tokens[0]=='Diameter [um]': self.diameterMode = tokens[1].strip() elif 'FractionDeposited' in tokens[0]: diameters = [] oldGeneration = 0 fractionMatrix = [] fractionRow = [] state = 1 elif state == 1: # table of numbers tokens = [float(x.strip()) for x in line.split()] diam = tokens[0] generation = tokens[1] fractionDeposited = tokens[2] if not diam in diameters: diameters.append(diam) if oldGeneration!=generation: if oldGeneration!=0: fractionMatrix.append(fractionRow) fractionRow=[] oldGeneration=generation fractionRow.append(fractionDeposited) fractionMatrix.append(fractionRow) fh.close() diameters = np.asarray(diameters) # [um] for D if self.diameterMode=="aerodynamic": # Hartung2020_MATLAB/functions/aero2geom.m lambdaVar = 1; # spherical shape rho_water = 1; # [g / mL] rho_subst = self.substance.rho * self.substance.MW * 1e-9; # [nmol / mL] *[g / mol] ->[g / mL] diameters = diameters * np.sqrt( lambdaVar * rho_water / rho_subst); self.dose_nmol = self.dose * 1e3 / self.substance.MW # % [ug] *1e3 / ([g/mol]) = [nmol] self.bronchiDose_nmol = np.asarray(fractionMatrix[0:(alvlim-1)])*self.dose_nmol # This is a matrix with as many rows as generations and as many columns as diameters # The content is what is the dose deposited at that generation in nmol alveolarMatrix = np.reshape(fractionMatrix[alvlim-1:],(len(fractionMatrix)-alvlim+1,diameters.size)) self.alveolarDose_nmol = np.sum(alveolarMatrix*self.dose_nmol, axis=0) # This is a matrix with as many rows as generations in the alveoli and as many columns as diameters # The content is what is the dose deposited at that generation in nmol self.throatDose = self.dose_nmol - np.sum(self.bronchiDose_nmol) - np.sum(self.alveolarDose_nmol) self.particleSize = diam2vol(diameters) # cm3
[docs] def read(self): self.substance = PKSubstanceLungParameters() self.substance.read(self.fnSubstance.get()) self.lung = PKPhysiologyLungParameters() self.lung.read(self.fnLung.get()) alveolarGeneration = len(self.lung.getBronchial()['type'])+1 self.readDepositionFile(alveolarGeneration)
[docs] def getData(self): data = {} data['bronchial'] = self.bronchiDose_nmol data['alveolar'] = self.alveolarDose_nmol data['throat'] = self.throatDose data['size'] = self.particleSize data['dose_nmol'] = self.dose_nmol return data
[docs]class PKCiliarySpeed(EMObject): # Hartung2020_MATLAB/functions/get_cilspeed.m # Only 'interp' model is implemented here EXPON = 0 FIT = 1 INTERP = 2 def __init__(self, **args): EMObject.__init__(self, **args) self.type = Integer() self.type.set(self.INTERP) self.lungParams = None
[docs] def prepare(self, lungParams, ciliarySpeedType): self.type.set(ciliarySpeedType) self.lungParams = lungParams lungData = lungParams.getBronchial() self.Lx = np.sum(lungData['length_cm']) self.pos = lungData['pos'] self.diam_cm = lungData['diam_cm'] print("Ciliary speed ================") print("Total length of the lung [cm]",self.Lx) print("Location of branches [cm]",self.pos) print("Diameter of the branches [cm]",self.diam_cm) if self.type.get()==self.EXPON: # Exponentially increasing ciliary speed self.cilspeed = lambda x: np.where(x>self.Lx, 0.0, np.exp(np.log(4e-3) + x * (np.log(50) - np.log(4e-3)))) elif self.type.get()==self.FIT: # Simple fitted empirical ciliary transport model self.cilspeed = lambda x: np.where(x<0.0, 0.0, np.exp(5*np.divide(self.Lx-x-3,np.power(3+self.Lx-x,0.92)))/200) elif self.type.get()==self.INTERP: # Hofmann - Sturm model for mucociliary transport velocity # Reference: Hofmann / Sturm (2004) J Aerosol Med, Vol 17(1) self.v = 0.1 * 1.2553 * np.power(lungData['diam_cm'],2.808) # in [cm / min] self.cilspeed = interp1d(self.pos, self.v, bounds_error=False, fill_value=(self.v[0], self.v[-1]))
[docs]class PKInhalationDissolution(EMObject): # Hartung2020_MATLAB/functions/get_disol.m # All implemented dissolution models are based on an adapted version of # the Noyes - Whitney equation, but differ in whether they # 1) describe saturable or unsaturable dissolution # 2) truncate or not the dissolution speed for small particles (for numeric stability) # 3) truncate or not the dissolution speed for large particles # (based on the assumption that only a part of the particle surface # is in contact with the dissolution medium) UNSAT = 0 TRUNC_UNSAT = 1 SAT = 2 # Only this one is implemented TRUNC_SAT = 3 TRUNC2_SAT = 4 CAP = 5 XCAP = 6 UNSOL = 7 def __init__(self, **args): EMObject.__init__(self, **args) self.type = Integer() self.type.set(self.SAT) self.substanceParams = None
[docs] def prepare(self, substanceParams, part): self.substanceParams = substanceParams substanceData = substanceParams.getData() if part=="bronchi": self.kdiss = substanceData['kdiss_br'] self.Cs = substanceData['Cs_br'] else: self.kdiss = substanceData['kdiss_alv'] self.Cs = substanceData['Cs_alv'] self.rho = substanceData['rho'] print("Substance in %s ==================="%part) print("Maximum dissolution rate [nmol/(cm*min)]",self.kdiss) print("Solubility in [nmol/cm3]=[uM]",self.Cs) print("Density in [nmol/cm3]",self.rho) D = self.kdiss / self.Cs K = 4 * math.pi * D / (self.rho * np.power(4.0/3.0 * math.pi, 1.0/3.0)) if self.type.get()==self.SAT: self.dissol = lambda s, Cf: np.multiply(np.reshape(K*(self.Cs-Cf),(Cf.size,1)), np.reshape(np.where(s>0, np.power(s,1.0/3.0), 0.0),(1,s.size)))
# [cm^3/min]
[docs] def getDissolution(self, s, Cf, h): return self.dissol(s,Cf)
[docs]class PKLung(EMObject): # Hartung2020_MATLAB/functions/get_bronchial_kinetics.m # Hartung2020_MATLAB/functions/get_alveolar_kinetics.m def __init__(self, **args): EMObject.__init__(self, **args) self.substanceParams = None self.lungParams = None self.ciliarySpeed = None self.inhalationDissolutionBronchi = None self.inhalationDissolutionAlveoli = None
[docs] def prepare(self, substanceParams, lungParams, pkParams, pkMultiplier, ciliarySpeedType): self.substanceParams = substanceParams self.lungParams = lungParams self.ciliarySpeed = PKCiliarySpeed() self.ciliarySpeed.prepare(lungParams, ciliarySpeedType) self.inhalationDissolutionBronchi = PKInhalationDissolution() self.inhalationDissolutionBronchi.prepare(substanceParams,"bronchi") # Hartung2020_MATLAB/functions/get_bronchial_kinetics.m bronchialData = lungParams.getBronchial() # Code just to show the equivalence between the .m and .py # bronchialData['elf_cm3'] = bronchialData['elf_cm3'] # bronchialData['voltis_cm3'] = bronchialData['voltis_cm3'] # bronchialData['surf_cm2'] = bronchialData['surf_cm2'] # bronchialData['fQco'] = bronchialData['fQco'] pos = bronchialData['pos'] diam_cm = bronchialData['diam_cm'] self.substanceData = substanceParams.getData() # Code just to show the equivalence between the .m and .py # self.substanceData['Cs_br'] = self.substanceData['Cs_br'] # self.substanceData['Cs_alv'] = self.substanceData['Cs_alv'] # self.substanceData['kdiss_br'] = self.substanceData['kdiss_br'] # self.substanceData['kdiss_alv'] = self.substanceData['kdiss_alv'] # self.substanceData['kp_br'] = self.substanceData['kp_br'] # self.substanceData['kp_alv'] = self.substanceData['kp_alv'] # self.substanceData['Kpl_br'] = self.substanceData['Kpl_br'] # self.substanceData['Kpl_alv'] = self.substanceData['Kpl_alv'] kp = self.substanceData['kp_br'] ka = math.pi * kp * diam_cm print("Absorption rate per bronchial segment [1/min]",ka) self.bronchialAbsorb = interp1d(pos, ka, bounds_error=False, fill_value=(ka[0], ka[-1])) # Hartung2020_MATLAB/functions/get_alveolar_kinetics.m self.inhalationDissolutionAlveoli = PKInhalationDissolution() self.inhalationDissolutionAlveoli.prepare(substanceParams,"alveoli") self.pkData={} sample = pkParams.getFirstSample() self.pkData['Cl'] = float(sample.getDescriptorValue('Cl')) * pkMultiplier[0] self.pkData['V'] = float(sample.getDescriptorValue('V')) * pkMultiplier[1] self.pkData['Q'] = float(sample.getDescriptorValue('Q')) * pkMultiplier[2] self.pkData['Vp'] = float(sample.getDescriptorValue('Vp')) * pkMultiplier[3] self.pkData['k01'] = float(sample.getDescriptorValue('k01')) * pkMultiplier[4] self.pkData['F'] = float(sample.getDescriptorValue('F')) * pkMultiplier[5]
[docs] def cilspeed(self, x): return self.lungParams.multiplier[0] * self.ciliarySpeed.cilspeed(x)
[docs]def conserving_projection(X1, V1, X2): # Hartung2020_MATLAB/functions/conserving_projection.m # V2 = CONSERVING_PROJECTION(X1,V1,X2) projects quantity V1 from location # grid X1 to grid X2, ensuring int_x^y(v2(z)dz) = int_x^y(v1(z)dz) holds # for any x,y, where vI(z) := vI(j) for Xi(j) < z < Xi(j+1) (i=1,2) # (constant between grid cells) cumV1 = np.concatenate(([0],np.cumsum(V1))); interpolator = interp1d(X1, cumV1, bounds_error=False, fill_value=(cumV1[0], cumV1[-1])) return np.diff(interpolator(X2))
[docs]def P_aELF(lungData, Xbnd): # Hartung2020_MATLAB/functions/P_aELF.m # Aim: locally and globally conserve ELF volumes # Input: boundary gridpoints of computational location grid # Output: cross-sectional areas at (ctr) grid points (a_Xctr), satisfying # int_dx(Xbnd, a_Xctr) == sum(aw.elf_cm3) (and local volume conservation) V_Xctr = conserving_projection(np.concatenate(([0],lungData['end_cm'])), lungData['elf_cm3'], Xbnd) return np.divide(V_Xctr, np.diff(Xbnd))
[docs]def P_aTis(lungData, Xbnd): # Hartung2020_MATLAB/functions/P_aTis.m # Aim: locally and globally conserve tissue volumes # Input: boundary gridpoints of computational location grid # Output: cross-sectional areas at (ctr) grid points (a_Xctr), satisfying # int_dx(Xbnd, a_Xctr) == sum(aw.voltis_cm3) (and local volume conservation) V_Xctr = conserving_projection(np.concatenate(([0],lungData['end_cm'])), lungData['voltis_cm3'], Xbnd) return np.divide(V_Xctr, np.diff(Xbnd))
[docs]def P_hELF(lungData, Xctr): # Hartung2020_MATLAB/functions/P_hELF.m # Input: center gridpoints of computational location grid # Output: ELF heights at (ctr) grid points (h_Xctr), in cm. interpolator = interp1d(lungData['pos'], lungData['h_elf_cm'], fill_value='extrapolate') return interpolator(Xctr)
[docs]def P_Qbr(lungData, systemicData, Xbnd): # Hartung2020_MATLAB/functions/P_Qbr.m # Project bronchial (central lung) blood flow onto computational grid # Total blood flow is conserved Qcl = lungData['fQco'] * systemicData['Qco'] # Modelling assumption: blood flow proportional to tissue volume Qgen = Qcl * lungData['voltis_cm3'] / np.sum(lungData['voltis_cm3']) Q_Xctr = conserving_projection(np.concatenate(([0],lungData['end_cm'])), Qgen, Xbnd) return np.divide(Q_Xctr,np.diff(Xbnd));
[docs]def project_deposition_2D(depositionData,X,S,lungData): # Project deposition data on 2D computational grid # Strategy for projection on 2D grid: # - Uniform distribution of dose in data location-size gridcells # (per generation and per size category) --> f(x,s) # - rho0 at solver location-size gridcell C = average of f(x,s) in C xbnddat = np.concatenate(([0],lungData['end_cm'])) smax = diam2vol(50) sdil_default = diam2vol(0.1) # Projection onto bronchi # step 1: distribute delta peaks in size over short size intervals sdattmp = depositionData['size'] sdil = np.min([sdil_default, 0.5*np.min(sdattmp)]) if sdattmp.size>1: sdil=np.min([sdil,np.min(np.diff(sdattmp))]) sbnddat = np.concatenate(([0],np.kron(sdattmp,[1,1]) + sdil*np.kron(np.ones(sdattmp.shape),[-1,1]), [smax])) amtxs_br = depositionData['bronchial'] amtxs_br_ext = np.zeros((amtxs_br.shape[0],2*amtxs_br.shape[1]+1)) amtxs_br_ext[:,1::2] = amtxs_br # step 2: compute cumulative amount matrix per data location-size grid camtdat_br_x = np.cumsum(np.pad(amtxs_br_ext,((1,0),(0,0)),'constant',constant_values=0),axis=0) camtdat_br_xs = np.cumsum(np.pad(camtdat_br_x,((0,0),(1,0)),'constant',constant_values=0),axis=1) # step 3: linear interpolation projects onto solver location-size grid interpolator = interp2d(sbnddat, xbnddat, camtdat_br_xs, bounds_error=False, fill_value=0) camtbnd_br_xs = interpolator(S, X) amtgrd_br_x = np.diff(camtbnd_br_xs, axis=0) amtgrd_br_xs = np.diff(amtgrd_br_x, axis=1) amtgrd_br_xs = np.where(amtgrd_br_xs<0,0,amtgrd_br_xs) # Make sure that everything is positive # step 4: for each grid cell: amount -> location-resolved amount per size dx = np.diff(X) ds = np.diff(S) s = 0.5*(S[0:-1] + S[1:]) rho0br = np.divide(amtgrd_br_xs,np.reshape(dx,(dx.size,1))*np.reshape(np.multiply(s,ds),(1,s.size))) # Alveolar amt_alv = depositionData['alveolar'] amt_alv_ext = np.zeros((1,2*amt_alv.size+1)) amt_alv_ext[:,1::2] = amt_alv camtdat_alv = np.concatenate(([0],np.cumsum(amt_alv_ext))) interpolator = interp1d(sbnddat, camtdat_alv, bounds_error=False, fill_value=0) camtbnd_alv = interpolator(S) amtgrd_alv = np.diff(camtbnd_alv); rho0alv = np.divide(amtgrd_alv, np.multiply(s,ds)) return (rho0br, rho0alv)
[docs]def saturable_2D_upwind_IE(lungParams, pkLung, depositionParams, tt, Sbnd): # Hartung2020_MATLAB/models/saturable_2D_upwind_IE.m # Algorithm features: # - Conducting airways, peripheral airways and systemic circulation fully coupled # - Upwind discretisation of PDE and compatible integration rules for mass conservation # - implicit Euler discretisation of linear processes # - saturable dissolution model # - initial particle size treated as a delta distribution; particle size is treat followed over time. # - along the location axis, an arbitrary grid can be used # # Since dissolution is saturable, particle sizes at different airway # positions may differ from each other for t > 0. # # The idea of this approach is that the size resolution in the data may # be much coarser than that the location grid # (extreme case: monodisperse particle distribution) # print(Sbnd.shape) # print(np.mean(Sbnd)); aaaa dt = np.diff(tt) T = np.max(tt) lungData = lungParams.getBronchial() alveolarData = lungParams.getAlveolar() systemicData = lungParams.getSystemic() depositionData = depositionParams.getData() Xbnd = np.sort([0] + lungData['end_cm'].tolist() + lungData['pos'].tolist()) Nx = Xbnd.size - 1 Ns = Sbnd.size - 1 # midpoints in discretisation cell (where rho, Cflu, Ctis are modelled) Xctr = Xbnd[:-1] + np.diff(Xbnd)/2 Sctr = Sbnd[:-1] + np.diff(Sbnd)/2 # transport velocity lambdaX = pkLung.cilspeed(Xbnd) # print("Xbnd",np.mean(Xbnd)) # print("lambdaX",np.mean(lambdaX)); aaaaa # location/size discretisation steps dx = np.diff(Xbnd) ds = np.diff(Sbnd) # absorption into tissue kaX = pkLung.bronchialAbsorb(Xctr) # print("kaX",np.mean(kaX)); aaaaa # cross-sectional areas aFluX = P_aELF(lungData, Xbnd) aTisX = P_aTis(lungData, Xbnd) # print("aFluX",np.mean(aFluX)); # print("aTisX",np.mean(aTisX)); aaaaa # ELF heights in bronchi / alveolar space hFluX = P_hELF(lungData, Xctr) hFlualv = alveolarData['ELF_cm3']/alveolarData['Surf_cm2'] # print("hFluX",np.mean(hFluX)); # print("hFlualv",np.mean(hFlualv)); aaaaa # blood flow Qalv = alveolarData['fQco'] * systemicData['Qco'] # Alveolar qX = P_Qbr(lungData, systemicData, Xbnd) QbrX = np.multiply(qX, dx) # bronchial (location-resolved) # print("Qalv",np.mean(Qalv)); # print("QbrX",np.mean(QbrX)); aaaaa # plasma to lung partition coefficients Kpl_br = pkLung.substanceData['Kpl_br'] Kpl_alv = pkLung.substanceData['Kpl_alv'] Kpl_u_br = Kpl_br / pkLung.substanceData['fu'] Kpl_u_alv = Kpl_alv / pkLung.substanceData['fu'] # print("Kpl_br",np.mean(Kpl_br)); # print("Kpl_alv",np.mean(Kpl_alv)); # print("Kpl_u_br",np.mean(Kpl_u_br)); # print("Kpl_u_alv",np.mean(Kpl_u_alv)); aaaaa # permeability-surface area products [cm3/min] PS_alv = pkLung.substanceData['kp_alv'] * alveolarData['Surf_cm2']; # alveolar PS_br = np.multiply(kaX , dx) # bronchial # print("PS_alv",np.mean(PS_alv)); # print("PS_br",np.mean(PS_br)); aaaaa # assign volumes to variables alvELF = alveolarData['ELF_cm3'] # alveolar fluid alvTis = alveolarData['Vol_cm3'] # alveolar tissue brELF = np.multiply(aFluX,dx) # bronchial fluid brTis = np.multiply(aTisX,dx) # bronchial tissue # print("alvELF",np.mean(alvELF)); # print("alvTis",np.mean(alvTis)); # print("brELF",np.mean(brELF)); # print("brTis",np.mean(brTis)); aaaaa # Allocate lung amounts: A_flu(alv/br), A_tis(alv/br) Nt=tt.size-1 Aalvflu = np.zeros((Nt+1,1)); Aalvtis = np.zeros((Nt+1,1)); Abrflu = np.zeros((Nt+1,Nx)); Abrtis = np.zeros((Nt+1,Nx)); # Allocate systemic amounts: Asysgut, Asysctr, Asysper Asysgut = np.zeros((Nt+1,1)); Asysctr = np.zeros((Nt+1,1)); Asysper = np.zeros((Nt+1,1)); # Allocate amount cleared: Aclear = np.zeros((Nt+1,1)); # for mass balance Amcc = np.zeros((Nt+1,1)); # to track cumulative amount cleared by MCC (not in mass balance) # Initialize PSPM densities (br/alv) and gut compartment with dosing rhobr = np.zeros((Nt+1,Nx,Ns)); rhoalv = np.zeros((Nt+1, 1,Ns)); rho0br, rho0alv = project_deposition_2D(depositionData, Xbnd, Sbnd, lungData) # print("rho0br",np.mean(rho0br)); # print("rho0alv",np.mean(rho0alv)); aaaaa rhobr[0,:,:]=rho0br rhoalv[0,:,:]=rho0alv Asysgut[0] = depositionData['throat'] CFL_factor = np.zeros((Nt,1)) # print("depositionData['throat']",depositionData['throat']); aaaa # Time iteration # pre-compute expressions constant in time # row vectors (expandable to a 1-by-Nx-by-Ns array) l_dx_post = np.divide(lambdaX[1:],dx) l_dx_pre = np.divide(lambdaX[0:-1],dx) dSctr = np.diff(Sctr) # from discrete integration by parts ds3D = np.reshape(ds, (1, 1, ds.size)) # print("l_dx_post",np.mean(l_dx_post)); # print("l_dx_pre",np.mean(l_dx_pre)); # print("dSctr",np.mean(dSctr)); # print("ds3D",np.mean(ds3D)); aaaaa # PK parameters Vc = pkLung.pkData['V']; k10 = pkLung.pkData['Cl'] / Vc k12 = pkLung.pkData['Q'] / Vc k21 = pkLung.pkData['Q'] / pkLung.pkData['Vp'] k01 = pkLung.pkData['k01'] F = pkLung.pkData['F'] # print("Vc",Vc); # print("k10",k10); # print("k12",k12); # print("k21",k21); # print("k01",k01); # print("F",F); aaaaa Qbrtot = np.sum(QbrX) # total bronchial blood flow R = pkLung.substanceData['R'] # print("Qbrtot",Qbrtot); # print("R",R); aaaaa # Time loop for n in range(Nt): # (semi - explicit Euler; implicit absorption into tissue) dtn = dt[n] Calvflun = Aalvflu[n] / alvELF; Cbrflun = np.divide(Abrflu[n,:], brELF) d_Sbnd_Cflualv = np.reshape(pkLung.inhalationDissolutionAlveoli.getDissolution(Sbnd, Calvflun, hFlualv), (Sbnd.size)) # print("d_Sbnd_Cflualv",np.mean(d_Sbnd_Cflualv)); aaaaa rhoalv[n + 1,:,:] = \ np.multiply(1-dtn*np.divide(d_Sbnd_Cflualv[0:-1],ds3D), rhoalv[n,:,:])+\ dtn * np.multiply(np.divide(d_Sbnd_Cflualv[1:], ds3D),\ np.pad(rhoalv[n,:,1:],((0,0),(0,1)),'constant',constant_values=0)) # aaa=rhoalv[n + 1,:,:]; print("rhoalv[n + 1,:,:]",np.mean(aaa)); aaaaa dissolved_alv = np.dot(dSctr,np.reshape(np.multiply(d_Sbnd_Cflualv[1:-1], rhoalv[n,:,1:]),(dSctr.size))) # print("dissolved_alv",np.mean(dissolved_alv)); aaaaa d_Sbnd_Cflubr = pkLung.inhalationDissolutionBronchi.getDissolution(Sbnd, Cbrflun, hFluX) # print("d_Sbnd_Cflubr",np.mean(d_Sbnd_Cflubr)); aaaaa aux = rhobr[n, :, :] rhobr[n + 1,:,:] = \ np.multiply(1 - dtn * (np.reshape(l_dx_pre,(l_dx_pre.size,1))+ np.reshape(np.divide(d_Sbnd_Cflubr[:,0:-1], ds3D),aux.shape)), rhobr[n, :, :]) + \ dtn * np.multiply(np.reshape(l_dx_post,(l_dx_post.size,1)), np.pad(rhobr[n,1:,:],((0,1),(0,0)),'constant',constant_values=0)) +\ dtn * np.multiply(np.divide(d_Sbnd_Cflubr[:,1:], ds3D), \ np.pad(rhobr[n, :, 1:], ((0, 0), (0, 1)), 'constant', constant_values=0)) # aaa=rhobr[n + 1,:,:]; print("rhobr[n + 1,:,:]",np.mean(aaa)); aaaaa dissolved_br = np.multiply(dx, np.sum(np.multiply(dSctr, np.multiply(d_Sbnd_Cflubr[:, 1:-1],rhobr[n,:, 1:])), axis=1)) # print("dissolved_br",np.mean(dissolved_br)); aaaaa A1 = np.diag(1+dtn*np.divide(PS_br,brELF)) A2 = np.diag(-(dtn/Kpl_u_br)*np.divide(PS_br,brTis)) A3 = np.diag(-dtn*np.divide(PS_br,brELF)) A4 = np.diag(1+np.multiply(np.divide(dtn,brTis),PS_br/Kpl_u_br + QbrX*(R/Kpl_br))) Mbr = np.kron(A1,[[1,0],[0,0]])+np.kron(A2,[[0,1],[0,0]])+np.kron(A3,[[0,0],[1,0]])+np.kron(A4,[[0,0],[0,1]]) # print("Mbr",np.mean(Mbr)) Malv = np.asarray([[1 + dtn/alvELF*PS_alv, -dtn/alvTis * PS_alv/Kpl_u_alv], [-dtn/alvELF*PS_alv , 1 + dtn/alvTis*(PS_alv/Kpl_u_alv + Qalv*R/Kpl_alv)]]) # print("Malv",np.mean(Malv)) Msys = np.asarray([ # gut per clear ctr <- X_i' %f(X_j) [1+dtn*k01 , 0 , 0 , 0], # gut [0 ,1+dtn*k21, 0 , -dtn*k12], # per [-dtn*(1-F)*k01, 0 , 1 , -dtn*k10], # clear [-dtn*F*k01, -dtn*k21, 0 , 1+dtn*(k10+k12+(Qalv+Qbrtot)/Vc)]]) # ctr # print("Msys",np.mean(Msys)); aaaa Mbrctr = -np.kron((dtn / Vc) * QbrX, [0, 1]) Malvctr = -np.asarray([0, (dtn / Vc) * Qalv]) Mbralvctr = np.concatenate((Mbrctr, Malvctr)) Mctrbr = -np.kron(dtn* np.divide(QbrX, brTis) * (R / Kpl_br), [0, 1]) Mctralv = -np.asarray([0, dtn * (Qalv / alvTis) * (R / Kpl_alv)]) Mctrbralv = np.concatenate((Mctrbr,Mctralv)) M = np.block([[scipy.linalg.block_diag(Mbr,Malv), np.zeros((2*Nx+2,3)), np.reshape(Mbralvctr,(2*Nx+2,1))], [np.block([[np.zeros((3,2*Nx+2))],[np.reshape(Mctrbralv,(1,2*Nx+2))]]), Msys]]) # print("M",np.mean(M)); aaaa rhsbr = np.block([[Abrflu[n,:] + dtn * dissolved_br],[Abrtis[n,:]]]) # print("rhsbr",np.mean(rhsbr)); aaaa mcc = dtn * lambdaX[0] * int_dx(Sbnd, np.multiply(Sctr,rhobr[n,0,:])) rhs = np.concatenate((np.reshape(rhsbr,(rhsbr.size,1),order='F'), [Aalvflu[n] + dtn * dissolved_alv], [Aalvtis[n]], [Asysgut[n] + mcc], [Asysper[n]], [Aclear[n]], [Asysctr[n]])) # print("rhs",np.mean(rhs)); aaaa Ynext = np.linalg.solve(M,rhs) # print("Ynext",np.mean(Ynext)); aaaa Abrflu[n + 1,:] = np.reshape(Ynext[0:(2*Nx):2],(Nx,)) Abrtis[n + 1,:] = np.reshape(Ynext[1:(2*Nx):2],(Nx,)) Aalvflu[n + 1] = Ynext[2 * Nx] Aalvtis[n + 1] = Ynext[2 * Nx + 1] Asysgut[n + 1] = Ynext[2 * Nx + 2] Asysper[n + 1] = Ynext[2 * Nx + 3] Aclear[n + 1] = Ynext[2 * Nx + 4] Asysctr[n + 1] = Ynext[2 * Nx + 5] Amcc[n + 1] = Amcc[n] + mcc # Quality control: detect a violation of CFL condition aux = np.reshape(np.divide(d_Sbnd_Cflubr[:,0:-1],ds3D),(d_Sbnd_Cflubr.shape[0],d_Sbnd_Cflubr.shape[1]-1)) +\ np.dot(np.reshape(l_dx_pre,(l_dx_pre.shape[0],1)),np.ones((1,d_Sbnd_Cflubr.shape[1]-1))) CFL_factor[n] = dtn * aux.max() if CFL_factor[n] > 1: print('CFL condition not satisfied at t=%f'%tt[n + 1]) # Quality control: detect a negative quantity if Asysgut[n+1] < 0: print('Asysgut not positive at t=%f'%tt[n + 1]) if Asysctr[n+1] < 0: print('Asysctr not positive at t=%f'%tt[n + 1]) if Asysper[n+1] < 0: print('Asysper not positive at t=%f'%tt[n + 1]) if (rhobr[n+1,:,:]).min() < 0: print('rho (br) not positive at t=%f'%tt[n + 1]) if (Abrflu[n+1,:]).min() < 0: print('A_flu (br) not positive at t=%f'%tt[n + 1]) if (Abrtis[n+1,:]).min() < 0: print('A_tis (br) not positive at t=%f'%tt[n + 1]) if (rhoalv[n+1,:,:]).min() < 0: print('rho (alv) not positive at t=%f'%tt[n + 1]) if (Aalvflu[n+1,:]).min() < 0: print('A_flu (alv) not positive at t=%f'%tt[n + 1]) if (Aalvtis[n+1,:]).min() < 0: print('A_tis (alv) not positive at t=%f'%tt[n + 1]) # Amount of undissolved drug in alveolar space Aalvsol = np.zeros(rhoalv.shape[0]) for n in range(rhoalv.shape[0]): Aalvsol[n]=int_dx(Sbnd,np.multiply(Sctr,np.reshape(rhoalv[n,0,:],(rhoalv.shape[2])))) # print("Aalvsol",np.mean(Aalvsol)); aaaa # Concentration of drug dissolved in alveolar epithelial lining fluid Calvflu = Aalvflu / alvELF # Concentration of drug in alveolar lung tissue Calvtis = Aalvtis/alvTis; # print("Calvflu",np.mean(Calvflu)); # print("Calvtis",np.mean(Calvtis)); aaaa # Amount of undissolved drug in conducting airways (bronchial) Abrsol = np.zeros(rhobr.shape[0]) for n in range(rhobr.shape[0]): Abrsol[n]=int_dx1dx2(Xbnd,Sbnd,np.multiply(Sctr,np.reshape(rhobr[n,:,:],(rhobr.shape[1],rhobr.shape[2])))) # print("Abrsol",np.mean(Abrsol)); # Concentration of drug dissolved in bronchial epithelial lining fluid Cbrflu = np.divide(Abrflu, brELF) # print("Cbrflu",np.mean(Cbrflu)); # Concentration of drug in bronchial lung tissue Cbrtis = np.divide(Abrtis, brTis) # print("Cbrtis",np.mean(Cbrtis)); aaaa # Amounts Aalv = {'solid': Aalvsol, 'fluid': Aalvflu, 'tissue': Aalvtis} Abr = {'solid': Abrsol, 'fluid': np.sum(Abrflu,axis=1), 'tissue': np.sum(Abrtis,axis=1), 'clear': Amcc} Asys = {'gut': Asysgut, 'ctr': Asysctr, 'per': Asysper, 'clear': Aclear} A = {'alv':Aalv, 'br': Abr, 'sys':Asys} # Concentrations Calv = {'fluid': Calvflu, 'tissue': Calvtis} Cbr = {'fluid': Cbrflu, 'tissue': Cbrtis} Csys = {'ctr': Asysctr / Vc} C = {'alv': Calv, 'br': Cbr, 'sys': Csys} # Geometry alvgeom = {'V': {'flu': alvELF, 'tis': alvTis}} # pooled brgeom = {'a': {'flu': aFluX, 'tis': aTisX}} # location-resolved geom = {'alv': alvgeom, 'br':brgeom} # Discretisation grd = {'t':tt,'X':Xctr,'S':Sctr,'dX':dx,'dS':ds} param = {'Nt':Nt,'Nx':Nx,'Ns':Ns,'T':T} discr = {'grid':grd,'param':param} # Input inpt = {'lungParams':lungParams, 'pkLung':pkLung, 'depositionParams':depositionParams} # Units (for use e.g. in plotting) units = { 'amount': '[nmol]', 'time': '[min]', # These units are used con- 'length':'[cm]', # sistently across substance / 'area': '[cm2]', # physiological databases, 'volume':'[cm3]', # read_deposition() and systemic 'weight':'[g]'} # PK models # complete model output sol = { 'A': A, 'C': C, 'geom': geom, 'discr':discr, 'input':inpt, 'units':units } return sol