Changeset 811
- Timestamp:
- 03/08/07 19:40:42 (2 years ago)
- Files:
-
- pg8000/trunk/pg8000.py (modified) (11 diffs)
- pg8000/trunk/pg8000-test.py (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
pg8000/trunk/pg8000.py
r810 r811 71 71 self.connection = connection 72 72 if self.connection.iterate_dicts: 73 self.method = Cursor.read_dict73 self.method = PreparedStatement.read_dict 74 74 else: 75 self.method = Cursor.read_tuple75 self.method = PreparedStatement.read_tuple 76 76 77 77 def __iter__(self): … … 84 84 return retval 85 85 86 class Cursor(object):86 class PreparedStatement(object): 87 87 ## 88 88 # A configuration variable that determines whether iterating over the … … 96 96 row_cache_size = 100 97 97 98 def __init__(self, connection, name = None): 99 self.iterate_dicts = connection.iterate_dicts 98 def __init__(self, connection, statement, *types): 100 99 self.c = connection.c 101 if name == None: 102 name = "pg8000_%s_%s" % (id(self.c), id(self)) 103 self.name = name 100 self._portal_name = "pg8000_portal_%s_%s" % (id(self.c), id(self)) 101 self._statement_name = "pg8000_statement_%s_%s" % (id(self.c), id(self)) 104 102 self._row_desc = None 105 103 self._cached_rows = [] 106 104 self._command_complete = True 107 108 def execute(self, query, *args): 109 self._cached_rows = [] 105 self._parse_row_desc = self.c.parse(self._statement_name, statement, types) 106 107 def execute(self, *args): 108 if not self._command_complete: 109 # cleanup last execute 110 self._cached_rows = [] 111 self.c.close(self._portal_name) 110 112 self._command_complete = False 111 self._row_desc = self.c.extended_query(self.name, '', query, args) 112 if self._row_desc == None: 113 self._command_complete = True 113 self._row_desc = self.c.bind(self._portal_name, self._statement_name, args, self._parse_row_desc) 114 114 115 115 def _fetch(self): … … 117 117 if self._command_complete: 118 118 return None 119 end_of_data, rows = self.c.fetch_rows(self. name, self.row_cache_size, self._row_desc)119 end_of_data, rows = self.c.fetch_rows(self._portal_name, self.row_cache_size, self._row_desc) 120 120 self._cached_rows = rows 121 121 if end_of_data: … … 165 165 def __iter__(self): 166 166 return DataIterator(self) 167 168 169 class Cursor(object): 170 def __init__(self, connection): 171 self.connection = connection 172 self._stmt = None 173 174 def execute(self, query, *args): 175 self._stmt = PreparedStatement(self.connection, query, *[type(x) for x in args]) 176 self._stmt.execute(*args) 177 178 def read_dict(self): 179 if self._stmt == None: 180 return None 181 return self._stmt.read_dict() 182 183 def read_tuple(self): 184 if self._stmt == None: 185 return None 186 return self._stmt.read_tuple() 187 167 188 168 189 ## … … 218 239 except socket.error, e: 219 240 raise InterfaceError("communication error", e) 220 Cursor.__init__(self, self , name='')241 Cursor.__init__(self, self) 221 242 222 243 def begin(self): … … 626 647 raise InternalError("StartupMessage was responded to with non-AuthenticationRequest msg %r" % msg) 627 648 628 def extended_query(self, portal, statement, qs, params):649 def parse(self, statement, qs, types): 629 650 self.verifyState("ready") 630 type_info = [Types.pg_type_info( type(x)) for x in params]651 type_info = [Types.pg_type_info(x) for x in types] 631 652 param_types, param_fc = [x[0] for x in type_info], [x[1] for x in type_info] # zip(*type_info) -- fails on empty arr 632 653 self._send(Protocol.Parse(statement, qs, param_types)) … … 644 665 pass 645 666 elif isinstance(msg, Protocol.NoData): 667 # We're not waiting for a row description. Return 668 # something destinctive to let bind know that there is no 669 # output. 670 return (None, param_fc) 671 elif isinstance(msg, Protocol.RowDescription): 672 return (msg, param_fc) 673 elif isinstance(msg, Protocol.ErrorResponse): 674 raise msg.createException() 675 else: 676 raise InternalError("Unexpected response msg %r" % (msg)) 677 678 def bind(self, portal, statement, params, parse_data): 679 self.verifyState("ready") 680 row_desc, param_fc = parse_data 681 if row_desc == None: 682 # no data coming out 683 output_fc = () 684 else: 685 # We've got row_desc that allows us to identify what we're going to 686 # get back from this statement. 687 output_fc = [Types.py_type_info(f) for f in row_desc.fields] 688 self._send(Protocol.Bind(portal, statement, param_fc, params, output_fc, self._client_encoding)) 689 # I don't know why we need to send DescribePortal again, but without it, 690 # we don't receive our BindComplete. It's like Flush fails to work. 691 self._send(Protocol.DescribePortal(portal)) 692 self._send(Protocol.Flush()) 693 while 1: 694 msg = self._read_message() 695 if isinstance(msg, Protocol.BindComplete): 696 # good news everybody! 697 pass 698 elif isinstance(msg, Protocol.NoData): 646 699 # No data means we should execute this command right away. 647 700 self._send(Protocol.Execute(portal, 0)) … … 654 707 elif isinstance(msg, Protocol.ReadyForQuery): 655 708 # ready to move on with life... 656 return None709 break 657 710 elif isinstance(msg, Protocol.ErrorResponse): 658 711 raise msg.createException() 659 712 else: 660 713 raise InternalError("unexpected response") 661 elif isinstance(msg, Protocol.RowDescription): 662 row_desc = msg 663 break 664 elif isinstance(msg, Protocol.ErrorResponse): 665 raise msg.createException() 666 else: 667 raise InternalError("Unexpected response msg %r" % (msg)) 668 669 # We've got row_desc that allows us to identify what we're going to 670 # get back from this statement. Now we can Bind values. 671 output_fc = [Types.py_type_info(f) for f in row_desc.fields] 672 self._send(Protocol.Bind(portal, statement, param_fc, params, output_fc, self._client_encoding)) 673 # I don't know why we need to send DescribePortal again, but without it, 674 # we don't receive our BindComplete. It's like Flush fails to work. 675 self._send(Protocol.DescribePortal(portal)) 676 self._send(Protocol.Flush()) 677 while 1: 678 msg = self._read_message() 679 if isinstance(msg, Protocol.BindComplete): 680 # good news everybody! 681 pass 714 return None 682 715 elif isinstance(msg, Protocol.RowDescription): 683 716 # Return the new row desc, since it will have the format … … 688 721 else: 689 722 raise InternalError("Unexpected response msg %r" % (msg)) 690 691 692 723 693 724 def fetch_rows(self, portal, row_count, row_desc): … … 731 762 return end_of_data, rows 732 763 764 def close(self, portal): 765 self._send(Protocol.ClosePortal(portal)) 766 self._send(Protocol.Sync()) 767 while 1: 768 msg = self._read_message() 769 if isinstance(msg, Protocol.CloseComplete): 770 # thanks! 771 pass 772 elif isinstance(msg, Protocol.ReadyForQuery): 773 return 774 elif isinstance(msg, Protocol.ErrorResponse): 775 raise msg.createException() 776 else: 777 raise InternalError("Unexpected response msg %r" % msg) 778 733 779 def query(self, qs): 734 780 self.verifyState("ready") pg8000/trunk/pg8000-test.py
r810 r811 10 10 db.iterate_dicts = True 11 11 12 #s1 = pg8000.PreparedStatement(db, "INSERT INTO t1 (f1, f2, f3) VALUES ($1, $2, $3)", int, int, str) 13 s1 = pg8000.PreparedStatement(db, "SELECT * FROM t1 WHERE f1 = $1", int) 14 s1.execute(5) 15 for row in s1: 16 print repr(row) 17 s1.execute(2) 18 for row in s1: 19 print repr(row) 20 21 22 import sys 23 sys.exit(0) 24 12 25 cur1 = pg8000.Cursor(db) 13 26 14 #cur1.execute("DROP TABLE t1")15 #cur1.execute("CREATE TABLE t1 (f1 int primary key, f2 int not null, f3 varchar(50) not null)")16 #cur1.execute("INSERT INTO t1 (f1, f2, f3) VALUES ($1, $2, $3)", 1, 1, "hello")17 #cur1.execute("INSERT INTO t1 (f1, f2, f3) VALUES ($1, $2, $3)", 2, 10, u"he\u0173llo")18 #cur1.execute("INSERT INTO t1 (f1, f2, f3) VALUES ($1, $2, $3)", 3, 100, "hello")19 #cur1.execute("INSERT INTO t1 (f1, f2, f3) VALUES ($1, $2, $3)", 4, 1000, "hello")20 #cur1.execute("INSERT INTO t1 (f1, f2, f3) VALUES ($1, $2, $3)", 5, 10000, "hello")27 cur1.execute("DROP TABLE t1") 28 cur1.execute("CREATE TABLE t1 (f1 int primary key, f2 int not null, f3 varchar(50) not null)") 29 cur1.execute("INSERT INTO t1 (f1, f2, f3) VALUES ($1, $2, $3)", 1, 1, "hello") 30 cur1.execute("INSERT INTO t1 (f1, f2, f3) VALUES ($1, $2, $3)", 2, 10, u"he\u0173llo") 31 cur1.execute("INSERT INTO t1 (f1, f2, f3) VALUES ($1, $2, $3)", 3, 100, "hello") 32 cur1.execute("INSERT INTO t1 (f1, f2, f3) VALUES ($1, $2, $3)", 4, 1000, "hello") 33 cur1.execute("INSERT INTO t1 (f1, f2, f3) VALUES ($1, $2, $3)", 5, 10000, "hello") 21 34 22 #print "begin query..."23 #cur1.execute("SELECT * FROM t1")24 #i = 025 #for row1 in cur1:26 #i = i + 127 #print i, repr(row1)28 #db.execute("SELECT * FROM t1 WHERE f1 > $1", row1['f1'])29 #for row2 in db:30 #print "\t", repr(row2)31 #print "end query..."35 print "begin query..." 36 cur1.execute("SELECT * FROM t1") 37 i = 0 38 for row1 in cur1: 39 i = i + 1 40 print i, repr(row1) 41 db.execute("SELECT * FROM t1 WHERE f1 > $1", row1['f1']) 42 for row2 in db: 43 print "\t", repr(row2) 44 print "end query..." 32 45 33 46 print "Beginning type checks..."
