# **************************************************************************
# *
# * Authors: J.M. De la Rosa Trevin (jmdelarosa@cnb.csic.es)
# *
# * Unidad de Bioinformatica of Centro Nacional de Biotecnologia , CSIC
# *
# * This program is free software; you can redistribute it and/or modify
# * it under the terms of the GNU General Public License as published by
# * the Free Software Foundation; either version 3 of the License, or
# * (at your option) any later version.
# *
# * This program is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# * GNU General Public License for more details.
# *
# * You should have received a copy of the GNU General Public License
# * along with this program; if not, write to the Free Software
# * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
# * 02111-1307 USA
# *
# * All comments concerning this program package may be sent to the
# * e-mail address 'scipion@cnb.csic.es'
# *
# **************************************************************************
"""
This module contains some basic SQLite tools to handle databases.
"""
import logging
logger = logging.getLogger(__name__)
from sqlite3 import dbapi2 as sqlite
from sqlite3 import OperationalError as OperationalError
from pyworkflow.utils import STATUS, getExtraLogInfo, Config
class SqliteDb:
    """Thin wrapper around a ``sqlite3`` connection and its cursor.

    The connection is not opened by ``__init__``; ``_createConnection`` must
    be called first. It sets the attributes every other method relies on:
    ``connection``, ``cursor``, ``executeCommand`` (the cursor's ``execute``
    or a logging wrapper around it) and ``commit``.
    """
    # Registry of the most recent connection opened for each database name.
    # It is a class attribute on purpose: it is shared by all instances.
    OPEN_CONNECTIONS = {}

    def __init__(self):
        # When True, _createConnection() reuses the connection registered in
        # OPEN_CONNECTIONS for the same database name instead of opening one.
        self._reuseConnections = False

    def _createConnection(self, dbName, timeout):
        """Open (or reuse) the connection to ``dbName`` and set up shortcuts.

        :param dbName: database name handed to ``sqlite3``; it is also the
            key used in ``OPEN_CONNECTIONS``.
        :param timeout: forwarded to ``sqlite3`` as the connection timeout.
        """
        self._dbName = dbName

        if self._reuseConnections and dbName in self.OPEN_CONNECTIONS:
            self.connection = self.OPEN_CONNECTIONS[dbName]
        else:
            # NOTE: a connection already registered for this name is NOT
            # closed here; its registry entry is simply replaced.
            # check_same_thread=False lets other threads use the connection.
            self.connection = sqlite.Connection(dbName, timeout,
                                                check_same_thread=False)
            # Rows can then be indexed by column name as well as position.
            self.connection.row_factory = sqlite.Row
            self.OPEN_CONNECTIONS[dbName] = self.connection

            logger.debug("Connection open for %s" % dbName,
                         extra=getExtraLogInfo("CONNECTIONS",
                                               STATUS.START,
                                               dbfilename=dbName))

        self.cursor = self.connection.cursor()

        # Define some shortcut functions
        if Config.debugSQLOn():
            self.executeCommand = self._debugExecute
        else:
            self.executeCommand = self.cursor.execute
        self.commit = self.connection.commit

    @classmethod
    def closeConnection(cls, dbName):
        """Close and deregister the connection registered for ``dbName``.

        Does nothing when no connection is registered under that name.
        """
        if dbName in cls.OPEN_CONNECTIONS:
            connection = cls.OPEN_CONNECTIONS[dbName]
            del cls.OPEN_CONNECTIONS[dbName]
            connection.close()
            logger.debug("Connection closed for %s" % dbName,
                         extra=getExtraLogInfo('CONNECTIONS',
                                               STATUS.STOP,
                                               dbfilename=dbName))

    def getDbName(self):
        """Return the database name given to ``_createConnection``."""
        return self._dbName

    def close(self):
        """Close this instance's connection and deregister it."""
        self.connection.close()
        logger.debug("Connection closed for %s" % self._dbName,
                     extra=getExtraLogInfo("CONNECTIONS",
                                           STATUS.STOP,
                                           dbfilename=self._dbName))
        # Deregister only our own connection: another instance may have
        # opened the same database afterwards and replaced the registry
        # entry, and that connection is still alive.
        if self.OPEN_CONNECTIONS.get(self._dbName) is self.connection:
            del self.OPEN_CONNECTIONS[self._dbName]

    def _debugExecute(self, *args):
        """Log the SQL command and its arguments, then execute it.

        Used as ``executeCommand`` when ``Config.debugSQLOn()`` is true.
        Any exception from the cursor is re-raised after the failing
        database is reported.
        """
        try:
            logger.debug("COMMAND: %s; %s" % (args[0], self._dbName),
                         extra=getExtraLogInfo("QUERY", STATUS.EVENT,
                                               dbfilename=self._dbName))
            logger.debug("ARGUMENTS: " + str(args[1:]))
            return self.cursor.execute(*args)
        except Exception:
            print(">>>> FAILED cursor.execute on db: '%s'" % self._dbName)
            raise

    def _iterResults(self):
        """Lazily yield the cursor's remaining rows, one at a time."""
        # fetchone() returns None once the result set is exhausted.
        yield from iter(self.cursor.fetchone, None)

    def _results(self, iterate=False):
        """Return the results the cursor currently points to.

        :param iterate: if True, return a generator yielding each row
            independently; otherwise return the list of all remaining rows.
        """
        if iterate:
            return self._iterResults()
        return self.cursor.fetchall()

    def getTables(self, tablePattern=None):
        """Return the names of the tables existing in the database.

        SQLite's internal ``sqlite_*`` tables are excluded.

        :param tablePattern: NOTE: currently ignored, no filtering is
            applied and all table names are returned. It is kept for
            interface compatibility.
        """
        self.executeCommand("SELECT name FROM sqlite_master "
                            "WHERE type='table' "
                            "AND name NOT LIKE 'sqlite_%';")
        return [str(row['name']) for row in self._iterResults()]

    def hasTable(self, tableName):
        """Return True if a table named ``tableName`` exists."""
        return tableName in self.getTables()

    def getTableColumns(self, tableName):
        """Return the ``PRAGMA table_info`` rows (one per column) of a table.

        :param tableName: plain (unquoted) table name.
        """
        # Quote the name as an SQL identifier (embedded double quotes are
        # doubled) so names with spaces or special characters work and
        # cannot break out of the statement.
        quotedName = '"%s"' % str(tableName).replace('"', '""')
        self.executeCommand('PRAGMA table_info(%s)' % quotedName)
        return self.cursor.fetchall()

    def getVersion(self):
        """Return the database 'version' that is used.

        Internally it makes use of the SQLite ``PRAGMA user_version``.
        """
        self.executeCommand('PRAGMA user_version')
        return self.cursor.fetchone()[0]

    def setVersion(self, version):
        """Store ``version`` (an integer) as ``PRAGMA user_version`` and commit."""
        self.executeCommand('PRAGMA user_version=%d' % version)
        self.commit()