Changeset 811

Show
Ignore:
Timestamp:
03/08/07 19:40:42 (2 years ago)
Author:
mfenniak
Message:

Begin adding support for prepared statements.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • pg8000/trunk/pg8000.py

    r810 r811  
    7171        self.connection = connection 
    7272        if self.connection.iterate_dicts: 
    73             self.method = Cursor.read_dict 
     73            self.method = PreparedStatement.read_dict 
    7474        else: 
    75             self.method = Cursor.read_tuple 
     75            self.method = PreparedStatement.read_tuple 
    7676 
    7777    def __iter__(self): 
     
    8484        return retval 
    8585 
    86 class Cursor(object): 
     86class PreparedStatement(object): 
    8787    ## 
    8888    # A configuration variable that determines whether iterating over the 
     
    9696    row_cache_size = 100 
    9797 
    98     def __init__(self, connection, name = None): 
    99         self.iterate_dicts = connection.iterate_dicts 
     98    def __init__(self, connection, statement, *types): 
    10099        self.c = connection.c 
    101         if name == None: 
    102             name = "pg8000_%s_%s" % (id(self.c), id(self)) 
    103         self.name = name 
     100        self._portal_name = "pg8000_portal_%s_%s" % (id(self.c), id(self)) 
     101        self._statement_name = "pg8000_statement_%s_%s" % (id(self.c), id(self)) 
    104102        self._row_desc = None 
    105103        self._cached_rows = [] 
    106104        self._command_complete = True 
    107  
    108     def execute(self, query, *args): 
    109         self._cached_rows = [] 
     105        self._parse_row_desc = self.c.parse(self._statement_name, statement, types) 
     106 
     107    def execute(self, *args): 
     108        if not self._command_complete: 
     109            # cleanup last execute 
     110            self._cached_rows = [] 
     111            self.c.close(self._portal_name) 
    110112        self._command_complete = False 
    111         self._row_desc = self.c.extended_query(self.name, '', query, args) 
    112         if self._row_desc == None: 
    113             self._command_complete = True 
     113        self._row_desc = self.c.bind(self._portal_name, self._statement_name, args, self._parse_row_desc) 
    114114 
    115115    def _fetch(self): 
     
    117117            if self._command_complete: 
    118118                return None 
    119             end_of_data, rows = self.c.fetch_rows(self.name, self.row_cache_size, self._row_desc) 
     119            end_of_data, rows = self.c.fetch_rows(self._portal_name, self.row_cache_size, self._row_desc) 
    120120            self._cached_rows = rows 
    121121            if end_of_data: 
     
    165165    def __iter__(self): 
    166166        return DataIterator(self) 
     167 
     168 
     169class Cursor(object): 
     170    def __init__(self, connection): 
     171        self.connection = connection 
     172        self._stmt = None 
     173 
     174    def execute(self, query, *args): 
     175        self._stmt = PreparedStatement(self.connection, query, *[type(x) for x in args]) 
     176        self._stmt.execute(*args) 
     177 
     178    def read_dict(self): 
     179        if self._stmt == None: 
     180            return None 
     181        return self._stmt.read_dict() 
     182 
     183    def read_tuple(self): 
     184        if self._stmt == None: 
     185            return None 
     186        return self._stmt.read_tuple() 
     187 
    167188 
    168189## 
     
    218239        except socket.error, e: 
    219240            raise InterfaceError("communication error", e) 
    220         Cursor.__init__(self, self, name=''
     241        Cursor.__init__(self, self
    221242 
    222243    def begin(self): 
     
    626647                raise InternalError("StartupMessage was responded to with non-AuthenticationRequest msg %r" % msg) 
    627648 
    628         def extended_query(self, portal, statement, qs, params): 
     649        def parse(self, statement, qs, types): 
    629650            self.verifyState("ready") 
    630             type_info = [Types.pg_type_info(type(x)) for x in params] 
     651            type_info = [Types.pg_type_info(x) for x in types] 
    631652            param_types, param_fc = [x[0] for x in type_info], [x[1] for x in type_info] # zip(*type_info) -- fails on empty arr 
    632653            self._send(Protocol.Parse(statement, qs, param_types)) 
     
    644665                    pass 
    645666                elif isinstance(msg, Protocol.NoData): 
     667                    # We're not waiting for a row description.  Return 
     668                    # something destinctive to let bind know that there is no 
     669                    # output. 
     670                    return (None, param_fc) 
     671                elif isinstance(msg, Protocol.RowDescription): 
     672                    return (msg, param_fc) 
     673                elif isinstance(msg, Protocol.ErrorResponse): 
     674                    raise msg.createException() 
     675                else: 
     676                    raise InternalError("Unexpected response msg %r" % (msg)) 
     677 
     678        def bind(self, portal, statement, params, parse_data): 
     679            self.verifyState("ready") 
     680            row_desc, param_fc = parse_data 
     681            if row_desc == None: 
     682                # no data coming out 
     683                output_fc = () 
     684            else: 
     685                # We've got row_desc that allows us to identify what we're going to 
     686                # get back from this statement. 
     687                output_fc = [Types.py_type_info(f) for f in row_desc.fields] 
     688            self._send(Protocol.Bind(portal, statement, param_fc, params, output_fc, self._client_encoding)) 
     689            # I don't know why we need to send DescribePortal again, but without it, 
     690            # we don't receive our BindComplete.  It's like Flush fails to work. 
     691            self._send(Protocol.DescribePortal(portal)) 
     692            self._send(Protocol.Flush()) 
     693            while 1: 
     694                msg = self._read_message() 
     695                if isinstance(msg, Protocol.BindComplete): 
     696                    # good news everybody! 
     697                    pass 
     698                elif isinstance(msg, Protocol.NoData): 
    646699                    # No data means we should execute this command right away. 
    647700                    self._send(Protocol.Execute(portal, 0)) 
     
    654707                        elif isinstance(msg, Protocol.ReadyForQuery): 
    655708                            # ready to move on with life... 
    656                             return None 
     709                            break 
    657710                        elif isinstance(msg, Protocol.ErrorResponse): 
    658711                            raise msg.createException() 
    659712                        else: 
    660713                            raise InternalError("unexpected response") 
    661                 elif isinstance(msg, Protocol.RowDescription): 
    662                     row_desc = msg 
    663                     break 
    664                 elif isinstance(msg, Protocol.ErrorResponse): 
    665                     raise msg.createException() 
    666                 else: 
    667                     raise InternalError("Unexpected response msg %r" % (msg)) 
    668  
    669             # We've got row_desc that allows us to identify what we're going to 
    670             # get back from this statement.  Now we can Bind values. 
    671             output_fc = [Types.py_type_info(f) for f in row_desc.fields] 
    672             self._send(Protocol.Bind(portal, statement, param_fc, params, output_fc, self._client_encoding)) 
    673             # I don't know why we need to send DescribePortal again, but without it, 
    674             # we don't receive our BindComplete.  It's like Flush fails to work. 
    675             self._send(Protocol.DescribePortal(portal)) 
    676             self._send(Protocol.Flush()) 
    677             while 1: 
    678                 msg = self._read_message() 
    679                 if isinstance(msg, Protocol.BindComplete): 
    680                     # good news everybody! 
    681                     pass 
     714                    return None 
    682715                elif isinstance(msg, Protocol.RowDescription): 
    683716                    # Return the new row desc, since it will have the format 
     
    688721                else: 
    689722                    raise InternalError("Unexpected response msg %r" % (msg)) 
    690  
    691  
    692723 
    693724        def fetch_rows(self, portal, row_count, row_desc): 
     
    731762            return end_of_data, rows 
    732763 
     764        def close(self, portal): 
     765            self._send(Protocol.ClosePortal(portal)) 
     766            self._send(Protocol.Sync()) 
     767            while 1: 
     768                msg = self._read_message() 
     769                if isinstance(msg, Protocol.CloseComplete): 
     770                    # thanks! 
     771                    pass 
     772                elif isinstance(msg, Protocol.ReadyForQuery): 
     773                    return 
     774                elif isinstance(msg, Protocol.ErrorResponse): 
     775                    raise msg.createException() 
     776                else: 
     777                    raise InternalError("Unexpected response msg %r" % msg) 
     778 
    733779        def query(self, qs): 
    734780            self.verifyState("ready") 
  • pg8000/trunk/pg8000-test.py

    r810 r811  
    1010db.iterate_dicts = True 
    1111 
     12#s1 = pg8000.PreparedStatement(db, "INSERT INTO t1 (f1, f2, f3) VALUES ($1, $2, $3)", int, int, str) 
     13s1 = pg8000.PreparedStatement(db, "SELECT * FROM t1 WHERE f1 = $1", int) 
     14s1.execute(5) 
     15for row in s1: 
     16    print repr(row) 
     17s1.execute(2) 
     18for row in s1: 
     19    print repr(row) 
     20 
     21 
     22import sys 
     23sys.exit(0) 
     24 
    1225cur1 = pg8000.Cursor(db) 
    1326 
    14 #cur1.execute("DROP TABLE t1") 
    15 #cur1.execute("CREATE TABLE t1 (f1 int primary key, f2 int not null, f3 varchar(50) not null)") 
    16 #cur1.execute("INSERT INTO t1 (f1, f2, f3) VALUES ($1, $2, $3)", 1, 1, "hello") 
    17 #cur1.execute("INSERT INTO t1 (f1, f2, f3) VALUES ($1, $2, $3)", 2, 10, u"he\u0173llo") 
    18 #cur1.execute("INSERT INTO t1 (f1, f2, f3) VALUES ($1, $2, $3)", 3, 100, "hello") 
    19 #cur1.execute("INSERT INTO t1 (f1, f2, f3) VALUES ($1, $2, $3)", 4, 1000, "hello") 
    20 #cur1.execute("INSERT INTO t1 (f1, f2, f3) VALUES ($1, $2, $3)", 5, 10000, "hello") 
     27cur1.execute("DROP TABLE t1") 
     28cur1.execute("CREATE TABLE t1 (f1 int primary key, f2 int not null, f3 varchar(50) not null)") 
     29cur1.execute("INSERT INTO t1 (f1, f2, f3) VALUES ($1, $2, $3)", 1, 1, "hello") 
     30cur1.execute("INSERT INTO t1 (f1, f2, f3) VALUES ($1, $2, $3)", 2, 10, u"he\u0173llo") 
     31cur1.execute("INSERT INTO t1 (f1, f2, f3) VALUES ($1, $2, $3)", 3, 100, "hello") 
     32cur1.execute("INSERT INTO t1 (f1, f2, f3) VALUES ($1, $2, $3)", 4, 1000, "hello") 
     33cur1.execute("INSERT INTO t1 (f1, f2, f3) VALUES ($1, $2, $3)", 5, 10000, "hello") 
    2134 
    22 #print "begin query..." 
    23 #cur1.execute("SELECT * FROM t1") 
    24 #i = 0 
    25 #for row1 in cur1: 
    26 #    i = i + 1 
    27 #    print i, repr(row1) 
    28 #    db.execute("SELECT * FROM t1 WHERE f1 > $1", row1['f1']) 
    29 #    for row2 in db: 
    30 #        print "\t", repr(row2) 
    31 #print "end query..." 
     35print "begin query..." 
     36cur1.execute("SELECT * FROM t1") 
     37i = 0 
     38for row1 in cur1: 
     39    i = i + 1 
     40    print i, repr(row1) 
     41    db.execute("SELECT * FROM t1 WHERE f1 > $1", row1['f1']) 
     42    for row2 in db: 
     43        print "\t", repr(row2) 
     44print "end query..." 
    3245 
    3346print "Beginning type checks..."