[TurboGears][SQLObject][SQLite] TurboGearsでSQLiteのトランザクションの隔離レベルを設定する方法

環境

この記事の内容は、TurboGears 1.0.1, SQLObject 0.7.1, SQLite 3.3.5で確認しました。

隔離レベルの設定方法

排他ロックを行う場合、以下のようなコードを記述します。ここで、projはアプリケーションのパッケージ名とします。

transaction = proj.model.hub.threadingLocal.connection
transaction._obsolete = False
transaction._connection = transaction._dbConnection.getConnection()
transaction._connection.isolation_level = "EXCLUSIVE"
try:
    # データベースを操作するコードをここに記述する。

    proj.model.hub.commit()
except Exception, e:
    proj.model.hub.rollback()

4行目の"EXCLUSIVE"のところを以下のように変更すれば、他のロックも可能です(ただし、確認していません)。

SHARED 共有ロック
RESERVED 予約ロック
PENDING 待機ロック
EXCLUSIVE 排他ロック

詳細

普通トランザクションを開始するときは、

proj.model.hub.begin()

とします。なので、まずbeginメソッドの中で何が行われているのか、調べます。

proj.model.hubが何者か分からなかったので、適当なところで、

print "hub=%r" % (proj.model.hub)

と書いてみたら、hubはturbogears.batabase.PackageHubオブジェクトだということが分かりました。このクラスは(どういう仕組みを使っているのか、またなぜそうする必要があるのかは未調査ですが)、turbogears.database.AutoConnectHubオブジェクトへのプロキシで、hub.begin()というのはturbogears.database.AutoConnectHubオブジェクトのbeginメソッドを呼び出しています。では、turbogears.database.AutoConnectHubクラスのbeginメソッドをみてみます。

        def begin(self, conn=None):
            "Starts a transaction."
            if not self.supports_transactions:
                return conn
            if not conn:
                conn = self.getConnection()
            if isinstance(conn, Transaction):
                if conn._obsolete:
                    conn.begin()
                return conn
            self.threadingLocal.old_conn = conn
            trans = conn.transaction()
            self.threadingLocal.connection = trans
            return trans

proj.model.hub.begin()とした場合、conn引数はNoneになるので、6行目の、

                conn = self.getConnection()

でデータベースのコネクションを得ています。では、このgetConnection()ではなにをしているのでしょうか? それは以下の通りでした。

        def getConnection(self):
            try:
                conn = self.threadingLocal.connection
                return self.begin(conn)
            except AttributeError:
                if self.uri:
                    conn = sqlobject.connectionForURI(self.uri)
                    # the following line effectively turns off the DBAPI connection
                    # cache. We're already holding on to a connection per thread,
                    # and the cache causes problems with sqlite.
                    if self.uri.startswith("sqlite"):
                        TheURIOpener.cachedURIs = {}
                    elif self.uri.startswith("mysql") and \
                         config.get("turbogears.enable_mysql41_timestamp_workaround", False):
                        self._enable_timestamp_workaround(conn)
                    self.threadingLocal.connection = conn
                    return self.begin(conn)
                raise AttributeError(
                    "No connection has been defined for this thread "
                    "or process")

"except AttributeError"から下は、コードの内容から察するに、まだコネクションを持っていないとき、すなわち最初に呼び出されたときにのみ実行されるのではないでしょうか。だとすると、ほとんどの処理に関わっている部分だけ取り出すと、以下のようになります。

        def getConnection(self):
            conn = self.threadingLocal.connection
            return self.begin(conn)

"threadingLocal"という属性は、名前からするとスレッドローカルな値を保持するためのもののようです。そして最後に、beginメソッドを呼び出しています。

処理の流れをもういちど確認すると、beginメソッドがgetConnectionメソッドを呼び出し、その中でbeginメソッドを呼び出しているわけです。わけがわかりません。デバッグ文をいろいろといれて動作をみたところ、2回目のbeginメソッドは次のように実行されていました。

        def begin(self, conn=None):
            if isinstance(conn, Transaction):
                conn.begin()
                return conn

つまり、conn引数はsqlobject.dbconnection.Transactionオブジェクトだということです。最終的に、最初のbeginメソッドの戻り値もこのTransactionオブジェクトになります。また、sqlobject.dbconnection.Transactionオブジェクトのbeginメソッドは、以下のようになっています。

    def begin(self):
        # @@: Should we do this, or should begin() be a no-op when we're
        # not already obsolete?
        assert self._obsolete, "You cannot begin a new transaction session without rolling back this one"
        self._obsolete = False
        self._connection = self._dbConnection.getConnection()
        self._dbConnection._setAutoCommit(self._connection, 0)

さて、これまでの中に、トランザクションの隔離レベルを設定できそうな場所があったでしょうか? どうもこれだけではまだ見えてきません。ここでsqlobject.dbconnection.Transactionクラスのbeginメソッドに出てくるself._connectionとself._dbConnectionを調べてみます。これらの型についてデバッグ文を用いて調べてみたところ、

メンバ
_connection pysqlite2.dbapi2.Connection
_dbConnection sqlobject.sqlite.sqliteconnection.SQLiteConnection

ということが分かりました。

そこでsqlobject.sqlite.sqliteconnection.SQLiteConnectionクラスの_setAutoCommitメソッドを調べてみると、以下のようになっていました。

    def _setAutoCommit(self, conn, auto):
        if using_sqlite2:
            if auto:
                conn.isolation_level = None
            else:
                conn.isolation_level = ""
        else:
            conn.autocommit = auto

とうとう、隔離レベルを設定している箇所を発見しました。しかし、どうも隔離レベルを設定できるようにはなっていないようです。sqlobject.sqlite.sqliteconnection.SQLiteConnectionクラスには、

    def _setIsolationLevel(self, conn, level):
        if not using_sqlite2:
            return
        conn.isolation_level = level

というメソッドもありましたが、このメソッドはどこからも使われていませんでした。また、connという名前の引数があることから、これを使うには別途コネクションを表すようなオブジェクトが必要なようで、今回探しているものではなさそうです。

そこで、_setAutoCommitメソッドを使わずにisolation_levelを設定し、それ以外はbeginメソッドと同じ動作をするコードを記述することにしました。それが、前述した、

transaction = proj.model.hub.threadingLocal.connection
transaction._obsolete = False
transaction._connection = transaction._dbConnection.getConnection()
transaction._connection.isolation_level = "EXCLUSIVE"

です。

なお、実際にトランザクションが開始されるのは、上のコードの後、最初にデータベースに対してなんらかの操作をしたときからです。というのも、pysqlite - pysqlite - Tracからpysqlite 2.3.3のソースコードをダウンロードして中身を見てみると、isolation_levelを使っているのはsrc/connection.cというファイルの中の、

static int pysqlite_connection_set_isolation_level(pysqlite_Connection* self, PyObject* isolation_level)
(略)
        Py_INCREF(isolation_level);
        self->isolation_level = isolation_level;

        begin_statement = PyString_FromString("BEGIN ");
        if (!begin_statement) {
            return -1;
        }
        PyString_Concat(&begin_statement, isolation_level);
        if (!begin_statement) {
            return -1;
        }

        self->begin_statement = PyMem_Malloc(PyString_Size(begin_statement) + 2);
        if (!self->begin_statement) {
            return -1;
        }

        strcpy(self->begin_statement, PyString_AsString(begin_statement));
        Py_DECREF(begin_statement);
(略)

という箇所で、ではisolation_levelから作られたbegin_statementがどこで使われているのか探してみると、次の場所にあります。

ファイル 関数名
src/connection.c _pysqlite_connection_begin
src/cursor.c _pysqlite_query_execute

_pysqlite_connection_begin関数では、

    rc = sqlite3_prepare(self->db, self->begin_statement, -1, &statement, &tail);

のように使われており、実際に発行している様子をうかがわせます。

一方、_pysqlite_query_execute関数内では、次のようになっています。この中で、上の_pysqlite_connection_begin関数を呼び出しています。

    if (self->connection->begin_statement) {
        switch (statement_type) {
            case STATEMENT_UPDATE:
            case STATEMENT_DELETE:
            case STATEMENT_INSERT:
            case STATEMENT_REPLACE:
                if (!self->connection->inTransaction) {
                    result = _pysqlite_connection_begin(self->connection);
                    if (!result) {
                        goto error;
                    }
                    Py_DECREF(result);
                }
                break;

以上をまとめると、UPDATE文やDELETE文を発行するとき、まだトランザクションに入っていないなら、isolation_levelを付加したBEGIN文を発行する、ということになります。

なお、isolation_levelを"EXCLUSIVE"にしたとき、本当にそうなっているのか調べるのに私は、コミットの直前に、

import time
time.sleep(60)

という文を追加し、排他ロックのまま処理が止まっている間にコンソールから、

$ sqlite3 devdata.sqlite
SQLite version 3.3.5
Enter ".help" for instructions
sqlite> select * from tg_user;
SQL error: database is locked

と実行して確認しました。

所感

なんだか裏技のようなことをやっていますが、_setIsolationLevelメソッドが用意されているところからすると、将来のSQLObjectでは隔離レベルの設定ができるようになるかもしれません。