Changeset 821

Show
Ignore:
Timestamp:
03/09/07 19:53:40 (1 year ago)
Author:
mfenniak
Message:

Add thread safety, unix sockets.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • pg8000/trunk/pg8000.py

    r816 r821  
    3535import md5 
    3636import decimal 
     37import threading 
    3738 
    3839class Warning(StandardError): 
     
    8081            raise StopIteration() 
    8182        return retval 
     83 
     84class DBAPI(object): 
     85    Warning = Warning 
     86    Error = Error 
     87    InterfaceError = InterfaceError 
     88    DatabaseError = DatabaseError 
     89    DataError = DataError 
     90    OperationalError = OperationalError 
     91    IntegrityError = IntegrityError 
     92    ProgrammingError = ProgrammingError 
     93    NotSupportedError = NotSupportedError 
     94 
     95    class ConnectionWrapper(object): 
     96        pass 
     97 
     98    def connect(): 
     99        return ConnectionWrapper() 
     100 
    82101 
    83102## 
     
    87106# etc.  When parameters are used, the types of the parameters need to be 
    88107# specified when creating the prepared statement. 
     108# <p> 
     109# As of v1.01, instances of this class are thread-safe.  This means that a 
     110# single PreparedStatement can be accessed by multiple threads without the 
     111# internal consistency of the statement being altered.  However, the 
     112# responsibility is on the client application to ensure that one thread reading 
     113# from a statement isn't affected by another thread starting a new query with 
     114# the same statement. 
    89115# <p> 
    90116# Stability: Added in v1.00, stability guaranteed for v1.xx. 
     
    119145        self._command_complete = True 
    120146        self._parse_row_desc = self.c.parse(self._statement_name, statement, types) 
     147        self._lock = threading.RLock() 
    121148 
    122149    def __del__(self): 
     
    132159    # Stability: Added in v1.00, stability guaranteed for v1.xx. 
    133160    def execute(self, *args): 
    134         if not self._command_complete: 
    135             # cleanup last execute 
    136             self._cached_rows = [] 
    137             self.c.close_portal(self._portal_name) 
    138         self._command_complete = False 
    139         self._row_desc = self.c.bind(self._portal_name, self._statement_name, args, self._parse_row_desc) 
    140         if self._row_desc: 
    141             # We execute our cursor right away to fill up our cache.  This 
    142             # prevents the cursor from being destroyed, apparently, by a rogue 
    143             # Sync between Bind and Execute.  Since it is quite likely that 
    144             # data will be read from us right away anyways, this seems a safe 
    145             # move for now. 
    146             self._fill_cache() 
     161        self._lock.acquire() 
     162        try: 
     163            if not self._command_complete: 
     164                # cleanup last execute 
     165                self._cached_rows = [] 
     166                self.c.close_portal(self._portal_name) 
     167            self._command_complete = False 
     168            self._row_desc = self.c.bind(self._portal_name, self._statement_name, args, self._parse_row_desc) 
     169            if self._row_desc: 
     170                # We execute our cursor right away to fill up our cache.  This 
     171                # prevents the cursor from being destroyed, apparently, by a rogue 
     172                # Sync between Bind and Execute.  Since it is quite likely that 
     173                # data will be read from us right away anyways, this seems a safe 
     174                # move for now. 
     175                self._fill_cache() 
     176        finally: 
     177            self._lock.release() 
    147178 
    148179    def _fill_cache(self): 
    149         if self._cached_rows: 
    150             raise InternalError("attempt to fill cache that isn't empty") 
    151         end_of_data, rows = self.c.fetch_rows(self._portal_name, self.row_cache_size, self._row_desc) 
    152         self._cached_rows = rows 
    153         if end_of_data: 
    154             self._command_complete = True 
     180        self._lock.acquire() 
     181        try: 
     182            if self._cached_rows: 
     183                raise InternalError("attempt to fill cache that isn't empty") 
     184            end_of_data, rows = self.c.fetch_rows(self._portal_name, self.row_cache_size, self._row_desc) 
     185            self._cached_rows = rows 
     186            if end_of_data: 
     187                self._command_complete = True 
     188        finally: 
     189            self._lock.release() 
    155190 
    156191    def _fetch(self): 
    157         if not self._cached_rows: 
    158             if self._command_complete: 
    159                 return None 
    160             self._fill_cache() 
    161             if self._command_complete and not self._cached_rows: 
    162                 # fill cache tells us the command is complete, but yet we have 
    163                 # no rows after filling our cache.  This is a special case when 
    164                 # a query returns no rows. 
    165                 return None 
    166         row = self._cached_rows[0] 
    167         del self._cached_rows[0] 
    168         return tuple(row) 
     192        self._lock.acquire() 
     193        try: 
     194            if not self._cached_rows: 
     195                if self._command_complete: 
     196                    return None 
     197                self._fill_cache() 
     198                if self._command_complete and not self._cached_rows: 
     199                    # fill cache tells us the command is complete, but yet we have 
     200                    # no rows after filling our cache.  This is a special case when 
     201                    # a query returns no rows. 
     202                    return None 
     203            row = self._cached_rows[0] 
     204            del self._cached_rows[0] 
     205            return tuple(row) 
     206        finally: 
     207            self._lock.release() 
    169208 
    170209    ## 
     
    222261# and save a small amount of reparsing time. 
    223262# <p> 
     263# As of v1.01, instances of this class are thread-safe.  See {@link 
     264# PreparedStatement PreparedStatement} for more information. 
     265# <p> 
    224266# Stability: Added in v1.00, stability guaranteed for v1.xx. 
    225267# 
     
    293335# the database. 
    294336# <p> 
     337# As of v1.01, instances of this class are thread-safe.  See {@link 
     338# PreparedStatement PreparedStatement} for more information. 
     339# <p> 
    295340# Stability: Added in v1.00, stability guaranteed for v1.xx. 
    296341# 
     
    318363# Defaults to 60 seconds. 
    319364class Connection(Cursor): 
    320     def __init__(self, host, user, port=5432, database=None, password=None, socket_timeout=60): 
     365    def __init__(self, user, host=None, unix_sock=None, port=5432, database=None, password=None, socket_timeout=60): 
    321366        self._row_desc = None 
    322367        try: 
    323             self.c = Protocol.Connection(host, port, socket_timeout=socket_timeout) 
    324             self.c.connect() 
     368            self.c = Protocol.Connection(unix_sock=unix_sock, host=host, port=port, socket_timeout=socket_timeout) 
     369            #self.c.connect() 
    325370            self.c.authenticate(user, password=password, database=database) 
    326371        except socket.error, e: 
     
    692737 
    693738    class Connection(object): 
    694         def __init__(self, host=None, port=5432, socket_timeout=60): 
    695             self._state = "unconnected" 
     739        def __init__(self, unix_sock=None, host=None, port=5432, socket_timeout=60): 
    696740            self._client_encoding = "ascii" 
    697             self._host = host 
    698             self._port = port 
    699             self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
     741            if unix_sock == None and host != None: 
     742                self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
     743            elif unix_sock != None: 
     744                self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 
     745            else: 
     746                raise ProgrammingError("one of host or unix_sock must be provided") 
    700747            self._sock.settimeout(socket_timeout) 
     748            if unix_sock == None and host != None: 
     749                self._sock.connect((host, port)) 
     750            elif unix_sock != None: 
     751                self._sock.connect(unix_sock) 
     752            self._state = "noauth" 
    701753            self._backend_key_data = None 
     754            self._sock_lock = threading.Lock() 
    702755 
    703756        def verifyState(self, state): 
     
    711764 
    712765        def _read_message(self): 
    713             bytes = self._sock.recv(5) 
    714             assert len(bytes) == 5 
     766            bytes = "" 
     767            while len(bytes) < 5: 
     768                tmp = self._sock.recv(5 - len(bytes)) 
     769                bytes += tmp 
     770            if len(bytes) != 5: 
     771                raise InternalError("unable to read 5 bytes from socket %r" % bytes) 
    715772            message_code = bytes[0] 
    716773            data_len = struct.unpack("!i", bytes[1:])[0] - 4 
     
    718775                bytes = "" 
    719776            else: 
    720                 bytes = self._sock.recv(data_len) 
     777                bytes = "" 
     778                while len(bytes) < data_len: 
     779                    tmp = self._sock.recv(data_len - len(bytes)) 
     780                    bytes += tmp 
     781            assert len(bytes) == data_len 
    721782            msg = Protocol.message_types[message_code].createFromData(bytes) 
    722783            if isinstance(msg, Protocol.NoticeResponse): 
     
    725786            else: 
    726787                return msg 
    727  
    728         def connect(self): 
    729             self.verifyState("unconnected") 
    730             self._sock.connect((self._host, self._port)) 
    731             self._state = "noauth" 
    732788 
    733789        def authenticate(self, user, **kwargs): 
     
    760816        def parse(self, statement, qs, types): 
    761817            self.verifyState("ready") 
    762             type_info = [Types.pg_type_info(x) for x in types] 
    763             param_types, param_fc = [x[0] for x in type_info], [x[1] for x in type_info] # zip(*type_info) -- fails on empty arr 
    764             self._send(Protocol.Parse(statement, qs, param_types)) 
    765             self._send(Protocol.DescribePreparedStatement(statement)) 
    766             self._send(Protocol.Flush()) 
    767             while 1: 
    768                 msg = self._read_message() 
    769                 if isinstance(msg, Protocol.ParseComplete): 
    770                     # ok, good. 
    771                     pass 
    772                 elif isinstance(msg, Protocol.ParameterDescription): 
    773                     # well, we don't really care -- we're going to send whatever 
    774                     # we want and let the database deal with it.  But thanks 
    775                     # anyways! 
    776                     pass 
    777                 elif isinstance(msg, Protocol.NoData): 
    778                     # We're not waiting for a row description.  Return 
    779                     # something destinctive to let bind know that there is no 
    780                     # output. 
    781                     return (None, param_fc) 
    782                 elif isinstance(msg, Protocol.RowDescription): 
    783                     return (msg, param_fc) 
    784                 elif isinstance(msg, Protocol.ErrorResponse): 
    785                     raise msg.createException() 
    786                 else: 
    787                     raise InternalError("Unexpected response msg %r" % (msg)) 
     818            self._sock_lock.acquire() 
     819            try: 
     820                type_info = [Types.pg_type_info(x) for x in types] 
     821                param_types, param_fc = [x[0] for x in type_info], [x[1] for x in type_info] # zip(*type_info) -- fails on empty arr 
     822                self._send(Protocol.Parse(statement, qs, param_types)) 
     823                self._send(Protocol.DescribePreparedStatement(statement)) 
     824                self._send(Protocol.Flush()) 
     825                while 1: 
     826                    msg = self._read_message() 
     827                    if isinstance(msg, Protocol.ParseComplete): 
     828                        # ok, good. 
     829                        pass 
     830                    elif isinstance(msg, Protocol.ParameterDescription): 
     831                        # well, we don't really care -- we're going to send whatever 
     832                        # we want and let the database deal with it.  But thanks 
     833                        # anyways! 
     834                        pass 
     835                    elif isinstance(msg, Protocol.NoData): 
     836                        # We're not waiting for a row description.  Return 
     837                        # something destinctive to let bind know that there is no 
     838                        # output. 
     839                        return (None, param_fc) 
     840                    elif isinstance(msg, Protocol.RowDescription): 
     841                        return (msg, param_fc) 
     842                    elif isinstance(msg, Protocol.ErrorResponse): 
     843                        raise msg.createException() 
     844                    else: 
     845                        raise InternalError("Unexpected response msg %r" % (msg)) 
     846            finally: 
     847                self._sock_lock.release() 
    788848 
    789849        def bind(self, portal, statement, params, parse_data): 
    790850            self.verifyState("ready") 
    791             row_desc, param_fc = parse_data 
    792             if row_desc == None: 
    793                 # no data coming out 
    794                 output_fc = () 
    795             else: 
    796                 # We've got row_desc that allows us to identify what we're going to 
    797                 # get back from this statement. 
    798                 output_fc = [Types.py_type_info(f) for f in row_desc.fields] 
    799             self._send(Protocol.Bind(portal, statement, param_fc, params, output_fc, self._client_encoding)) 
    800             # We need to describe the portal after bind, since the return 
    801             # format codes will be different (hopefully, always what we 
    802             # requested). 
    803             self._send(Protocol.DescribePortal(portal)) 
    804             self._send(Protocol.Flush()) 
    805             while 1: 
    806                 msg = self._read_message() 
    807                 if isinstance(msg, Protocol.BindComplete): 
    808                     # good news everybody! 
    809                     pass 
    810                 elif isinstance(msg, Protocol.NoData): 
    811                     # No data means we should execute this command right away. 
    812                     self._send(Protocol.Execute(portal, 0)) 
    813                     self._send(Protocol.Sync()) 
    814                     while 1: 
    815                         msg = self._read_message() 
    816                         if isinstance(msg, Protocol.CommandComplete): 
    817                             # more good news! 
    818                             pass 
    819                         elif isinstance(msg, Protocol.ReadyForQuery): 
    820                             # ready to move on with life... 
    821                             break 
    822                         elif isinstance(msg, Protocol.ErrorResponse): 
    823                             raise msg.createException() 
    824                         else: 
    825                             raise InternalError("unexpected response") 
    826                     return None 
    827                 elif isinstance(msg, Protocol.RowDescription): 
    828                     # Return the new row desc, since it will have the format 
    829                     # types we asked for 
    830                     return msg 
    831                 elif isinstance(msg, Protocol.ErrorResponse): 
    832                     raise msg.createException() 
     851            self._sock_lock.acquire() 
     852            try: 
     853                row_desc, param_fc = parse_data 
     854                if row_desc == None: 
     855                    # no data coming out 
     856                    output_fc = () 
    833857                else: 
    834                     raise InternalError("Unexpected response msg %r" % (msg)) 
     858                    # We've got row_desc that allows us to identify what we're going to 
     859                    # get back from this statement. 
     860                    output_fc = [Types.py_type_info(f) for f in row_desc.fields] 
     861                self._send(Protocol.Bind(portal, statement, param_fc, params, output_fc, self._client_encoding)) 
     862                # We need to describe the portal after bind, since the return 
     863                # format codes will be different (hopefully, always what we 
     864                # requested). 
     865                self._send(Protocol.DescribePortal(portal)) 
     866                self._send(Protocol.Flush()) 
     867                while 1: 
     868                    msg = self._read_message() 
     869                    if isinstance(msg, Protocol.BindComplete): 
     870                        # good news everybody! 
     871                        pass 
     872                    elif isinstance(msg, Protocol.NoData): 
     873                        # No data means we should execute this command right away. 
     874                        self._send(Protocol.Execute(portal, 0)) 
     875                        self._send(Protocol.Sync()) 
     876                        while 1: 
     877                            msg = self._read_message() 
     878                            if isinstance(msg, Protocol.CommandComplete): 
     879                                # more good news! 
     880                                pass 
     881                            elif isinstance(msg, Protocol.ReadyForQuery): 
     882                                # ready to move on with life... 
     883                                break 
     884                            elif isinstance(msg, Protocol.ErrorResponse): 
     885                                raise msg.createException() 
     886                            else: 
     887                                raise InternalError("unexpected response") 
     888                        return None 
     889                    elif isinstance(msg, Protocol.RowDescription): 
     890                        # Return the new row desc, since it will have the format 
     891                        # types we asked for 
     892                        return msg 
     893                    elif isinstance(msg, Protocol.ErrorResponse): 
     894                        raise msg.createException() 
     895                    else: 
     896                        raise InternalError("Unexpected response msg %r" % (msg)) 
     897            finally: 
     898                self._sock_lock.release() 
    835899 
    836900        def fetch_rows(self, portal, row_count, row_desc): 
    837901            self.verifyState("ready") 
    838             self._send(Protocol.Execute(portal, row_count)) 
    839             self._send(Protocol.Flush()) 
    840             rows = [] 
    841             end_of_data = False 
    842             while 1: 
    843                 msg = self._read_message() 
    844                 if isinstance(msg, Protocol.DataRow): 
    845                     rows.append( 
    846                             [Types.py_value(msg.fields[i], row_desc.fields[i], client_encoding=self._client_encoding) 
    847                                 for i in range(len(msg.fields))] 
    848                             ) 
    849                 elif isinstance(msg, Protocol.PortalSuspended): 
    850                     # got all the rows we asked for, but not all that exist 
    851                     break 
    852                 elif isinstance(msg, Protocol.CommandComplete): 
    853                     self._send(Protocol.ClosePortal(portal)) 
    854                     self._send(Protocol.Sync()) 
    855                     while 1: 
    856                         msg = self._read_message() 
    857                         if isinstance(msg, Protocol.ReadyForQuery): 
    858                             # ready to move on with life... 
    859                             self._state = "ready" 
    860                             break 
    861                         elif isinstance(msg, Protocol.CloseComplete): 
    862                             # ok, great! 
    863                             pass 
    864                         elif isinstance(msg, Protocol.ErrorResponse): 
    865                             raise msg.createException() 
    866                         else: 
    867                             raise InternalError("unexpected response msg %r" % msg) 
    868                     end_of_data = True 
    869                     break 
    870                 elif isinstance(msg, Protocol.ErrorResponse): 
    871                     raise msg.createException() 
    872                 else: 
    873                     raise InternalError("Unexpected response msg %r" % msg) 
    874             return end_of_data, rows 
     902            self._sock_lock.acquire() 
     903            try: 
     904                self._send(Protocol.Execute(portal, row_count)) 
     905                self._send(Protocol.Flush()) 
     906                rows = [] 
     907                end_of_data = False 
     908                while 1: 
     909                    msg = self._read_message() 
     910                    if isinstance(msg, Protocol.DataRow): 
     911                        rows.append( 
     912                                [Types.py_value(msg.fields[i], row_desc.fields[i], client_encoding=self._client_encoding) 
     913                                    for i in range(len(msg.fields))] 
     914                                ) 
     915                    elif isinstance(msg, Protocol.PortalSuspended): 
     916                        # got all the rows we asked for, but not all that exist 
     917                        break 
     918                    elif isinstance(msg, Protocol.CommandComplete): 
     919                        self._send(Protocol.ClosePortal(portal)) 
     920                        self._send(Protocol.Sync()) 
     921                        while 1: 
     922                            msg = self._read_message() 
     923                            if isinstance(msg, Protocol.ReadyForQuery): 
     924                                # ready to move on with life... 
     925                                self._state = "ready" 
     926                                break 
     927                            elif isinstance(msg, Protocol.CloseComplete): 
     928                                # ok, great! 
     929                                pass 
     930                            elif isinstance(msg, Protocol.ErrorResponse): 
     931                                raise msg.createException() 
     932                            else: 
     933                                raise InternalError("unexpected response msg %r" % msg) 
     934                        end_of_data = True 
     935                        break 
     936                    elif isinstance(msg, Protocol.ErrorResponse): 
     937                        raise msg.createException() 
     938                    else: 
     939                        raise InternalError("Unexpected response msg %r" % msg) 
     940                return end_of_data, rows 
     941            finally: 
     942                self._sock_lock.release() 
    875943 
    876944        def close_statement(self, statement): 
    877             self._send(Protocol.ClosePreparedStatement(statement)) 
    878             self._send(Protocol.Sync()) 
    879             while 1: 
    880                 msg = self._read_message() 
    881                 if isinstance(msg, Protocol.CloseComplete): 
    882                     # thanks! 
    883                     pass 
    884                 elif isinstance(msg, Protocol.ReadyForQuery): 
    885                     return 
    886                 elif isinstance(msg, Protocol.ErrorResponse): 
    887                     raise msg.createException() 
    888                 else: 
    889                     raise InternalError("Unexpected response msg %r" % msg) 
     945            self.verifyState("ready") 
     946            self._sock_lock.acquire() 
     947            try: 
     948                self._send(Protocol.ClosePreparedStatement(statement)) 
     949                self._send(Protocol.Sync()) 
     950                while 1: 
     951                    msg = self._read_message() 
     952                    if isinstance(msg, Protocol.CloseComplete): 
     953                        # thanks! 
     954                        pass 
     955                    elif isinstance(msg, Protocol.ReadyForQuery): 
     956                        return 
     957                    elif isinstance(msg, Protocol.ErrorResponse): 
     958                        raise msg.createException() 
     959                    else: 
     960                        raise InternalError("Unexpected response msg %r" % msg) 
     961            finally: 
     962                self._sock_lock.release() 
    890963 
    891964        def close_portal(self, portal): 
    892             self._send(Protocol.ClosePortal(portal)) 
    893             self._send(Protocol.Sync()) 
    894             while 1: 
    895                 msg = self._read_message() 
    896                 if isinstance(msg, Protocol.CloseComplete): 
    897                     # thanks! 
    898                     pass 
    899                 elif isinstance(msg, Protocol.ReadyForQuery): 
    900                     return 
    901                 elif isinstance(msg, Protocol.ErrorResponse): 
    902                     raise msg.createException() 
    903                 else: 
    904                     raise InternalError("Unexpected response msg %r" % msg) 
    905  
    906         def query(self, qs): 
    907965            self.verifyState("ready") 
    908             self._send(Protocol.Query(qs)) 
    909             msg = self._read_message() 
    910             if isinstance(msg, Protocol.RowDescription): 
    911                 self._state = "in_query" 
    912                 return msg 
    913             elif isinstance(msg, Protocol.ErrorResponse): 
    914                 raise msg.createException() 
    915             else: 
    916                 raise InternalError("RowDescription expected, other message recv'd") 
    917  
    918         def getrow(self): 
    919             self.verifyState("in_query") 
    920             msg = self._read_message() 
    921             if isinstance(msg, Protocol.DataRow): 
    922                 return msg 
    923             elif isinstance(msg, Protocol.CommandComplete): 
    924                 self.status = "query_complete" 
    925                 self._waitForReady() 
    926                 return None 
     966            self._sock_lock.acquire() 
     967            try: 
     968                self._send(Protocol.ClosePortal(portal)) 
     969                self._send(Protocol.Sync()) 
     970                while 1: 
     971                    msg = self._read_message() 
     972                    if isinstance(msg, Protocol.CloseComplete): 
     973                        # thanks! 
     974                        pass 
     975                    elif isinstance(msg, Protocol.ReadyForQuery): 
     976                        return 
     977                    elif isinstance(msg, Protocol.ErrorResponse): 
     978                        raise msg.createException() 
     979                    else: 
     980                        raise InternalError("Unexpected response msg %r" % msg) 
     981            finally: 
     982                self._sock_lock.release() 
    927983 
    928984    message_types = { 
  • pg8000/trunk/pg8000-test.py

    r816 r821  
    33import datetime 
    44import decimal 
     5import threading 
    56 
    67import pg8000 
    78 
    89#db = pg8000.Connection(host='joy', user='Mathieu Fenniak', database="software", password="hello", socket_timeout=5) 
    9 db = pg8000.Connection(host='localhost', user='mfenniak') 
     10#db = pg8000.Connection(host='localhost', user='mfenniak') 
     11db = pg8000.Connection(user="mfenniak", unix_sock="/tmp/.s.PGSQL.5432") 
    1012 
    1113db.begin() 
     
    1416db.execute("CREATE TABLE t1 (f1 int primary key, f2 int not null, f3 varchar(50) null)") 
    1517 
     18# Not the most efficient way to do this.  Multiple DB connections would 
     19# multiplex this insert and make it faster -- we're just testing for thread 
     20# safety here.  Testing with much higher values of left/right allows 
     21# multithread locking to be obvious. 
    1622s1 = pg8000.PreparedStatement(db, "INSERT INTO t1 (f1, f2, f3) VALUES ($1, $2, $3)", int, int, str) 
    17 s1.execute(1, 1, "hello") 
    18 s1.execute(2, 10, "he\u0173llo") 
    19 s1.execute(3, 100, "hello") 
    20 s1.execute(4, 1000, None) 
    21 s1.execute(5, 10000, "hello") 
    22 s1.execute(6, 100000, "hello") 
     23def test(left, right): 
     24    for i in range(left, right): 
     25        s1.execute(i, id(threading.currentThread()), "test - unicode \u0173 ") 
     26t1 = threading.Thread(target=test, args=(1, 10)) 
     27t2 = threading.Thread(target=test, args=(10, 20)) 
     28t3 = threading.Thread(target=test, args=(20, 30)) 
     29t4 = threading.Thread(target=test, args=(30, 40)) 
     30t1.start() ; t2.start() ; t3.start() ; t4.start() 
     31t1.join(); t2.join(); t3.join(); t4.join() 
     32 
     33db.commit() 
     34 
    2335 
    2436print "begin query..." 
     
    2840print "end query..." 
    2941 
    30 print "begin query..." 
     42#print "begin query..." 
    3143cur1 = pg8000.Cursor(db) 
    32 cur1.execute("SELECT * FROM t1") 
    33 s1 = pg8000.PreparedStatement(db, "SELECT * FROM t1 WHERE f1 > $1", int) 
    34 i = 0 
    35 for row1 in cur1.iterate_dict(): 
    36     i = i + 1 
    37     print i, repr(row1) 
    38     s1.execute(row1['f1']) 
    39     for row2 in s1.iterate_dict(): 
    40         print "\t", repr(row2) 
    41 print "end query..." 
     44#cur1.execute("SELECT * FROM t1") 
     45#s1 = pg8000.PreparedStatement(db, "SELECT * FROM t1 WHERE f1 > $1", int) 
     46#i = 0 
     47#for row1 in cur1.iterate_dict(): 
     48#    i = i + 1 
     49#    print i, repr(row1) 
     50#    s1.execute(row1['f1']) 
     51#    for row2 in s1.iterate_dict(): 
     52#        print "\t", repr(row2) 
     53#print "end query..." 
    4254 
    4355print "Beginning type checks..." 
     
    88100print "Type checks complete." 
    89101 
    90 db.commit() 
    91