Changeset 799

Show
Ignore:
Timestamp:
03/07/07 19:33:41 (2 years ago)
Author:
mfenniak
Message:

working portal based cursor queries

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • pg8000/trunk/pg8000.py

    r798 r799  
    7070        self.connection = connection 
    7171        if self.connection.iterate_dicts: 
    72             self.method = Connection.read_dict 
     72            self.method = Cursor.read_dict 
    7373        else: 
    74             self.method = Connection.read_tuple 
     74            self.method = Cursor.read_tuple 
    7575 
    7676    def __iter__(self): 
     
    8383        return retval 
    8484 
    85 ## 
    86 # This class represents a connection to a PostgreSQL database. 
    87 # <p> 
    88 # A single PostgreSQL connection can only perform a single query at a time, 
    89 # which is an important restriction to note.  This limitation can be overcome 
    90 # by retrieving all results immediately after a query, but this approach is not 
    91 # taken by this library. 
    92 # <p> 
    93 # Stability: Added in v1.00, stability guaranteed for v1.xx. 
    94 
    95 # @param host   The hostname of the PostgreSQL server to connect with.  Only 
    96 # TCP/IP connections are presently supported, so this parameter is mandatory. 
    97 
    98 # @param user   The username to connect to the PostgreSQL server with.  This 
    99 # parameter is mandatory. 
    100 
    101 # @param port   The TCP/IP port of the PostgreSQL server instance.  This 
    102 # parameter defaults to 5432, the registered and common port of PostgreSQL 
    103 # TCP/IP servers. 
    104 
    105 # @param database   The name of the database instance to connect with.  This 
    106 # parameter is optional, if omitted the PostgreSQL server will assume the 
    107 # database name is the same as the username. 
    108 
    109 # @param password   The user password to connect to the server with.  This 
    110 # parameter is optional.  If omitted, and the database server requests password 
    111 # based authentication, the connection will fail.  On the other hand, if this 
    112 # parameter is provided and the database does not request password 
    113 # authentication, then the password will not be used. 
    114 class Connection(object): 
    115  
     85class Cursor(object): 
    11686    ## 
    11787    # A configuration variable that determines whether iterating over the 
     
    12393    iterate_dicts = False 
    12494 
    125     def __init__(self, host, user, port=5432, database=None, password=None): 
     95    row_cache_size = 100 
     96 
     97    def __init__(self, connection, name = None): 
     98        self.c = connection.c 
     99        if name == None: 
     100            name = "pg8000_%s_%s" % (id(self.c), id(self)) 
     101        self.name = name 
    126102        self._row_desc = None 
    127         try: 
    128             self.c = Protocol.Connection(host, port) 
    129             self.c.connect() 
    130             self.c.authenticate(user, password=password, database=database) 
    131         except socket.error, e: 
    132             raise InterfaceError("communication error", e) 
    133  
    134     def execute(self, command, *args): 
    135         pass 
    136  
    137     def query(self, query, *args): 
    138         self._row_desc = self.c.extended_query('', '', query, args) 
    139         #self._row_desc = self.c.query(query) 
     103        self._cached_rows = [] 
     104        self._command_complete = True 
     105 
     106    def execute(self, query, *args): 
     107        self._cached_rows = [] 
     108        self._command_complete = False 
     109        self._row_desc = self.c.extended_query(self.name, '', query, args) 
    140110 
    141111    def _fetch(self): 
    142         row = self.c.getrow() 
    143         if row == None: 
    144             return None 
     112        if not self._cached_rows: 
     113            if self._command_complete: 
     114                return None 
     115            end_of_data, rows = self.c.fetch_rows(self.name, self.row_cache_size) 
     116            self._cached_rows = rows 
     117            if end_of_data: 
     118                self._command_complete = True 
     119        row = self._cached_rows[0] 
     120        del self._cached_rows[0] 
    145121        return tuple([Types.py_value(row.fields[i], self._row_desc.fields[i]) for i in range(len(row.fields))]) 
    146122 
     
    182158        return DataIterator(self) 
    183159 
     160## 
     161# This class represents a connection to a PostgreSQL database. 
     162# <p> 
     163# A single PostgreSQL connection can only perform a single query at a time, 
     164# which is an important restriction to note.  This limitation can be overcome 
     165# by retrieving all results immediately after a query, but this approach is not 
     166# taken by this library. 
     167# <p> 
     168# Stability: Added in v1.00, stability guaranteed for v1.xx. 
     169# 
     170# @param host   The hostname of the PostgreSQL server to connect with.  Only 
     171# TCP/IP connections are presently supported, so this parameter is mandatory. 
     172# 
     173# @param user   The username to connect to the PostgreSQL server with.  This 
     174# parameter is mandatory. 
     175# 
     176# @param port   The TCP/IP port of the PostgreSQL server instance.  This 
     177# parameter defaults to 5432, the registered and common port of PostgreSQL 
     178# TCP/IP servers. 
     179# 
     180# @param database   The name of the database instance to connect with.  This 
     181# parameter is optional, if omitted the PostgreSQL server will assume the 
     182# database name is the same as the username. 
     183# 
     184# @param password   The user password to connect to the server with.  This 
     185# parameter is optional.  If omitted, and the database server requests password 
     186# based authentication, the connection will fail.  On the other hand, if this 
     187# parameter is provided and the database does not request password 
     188# authentication, then the password will not be used. 
     189class Connection(object): 
     190    def __init__(self, host, user, port=5432, database=None, password=None): 
     191        self._row_desc = None 
     192        try: 
     193            self.c = Protocol.Connection(host, port) 
     194            self.c.connect() 
     195            self.c.authenticate(user, password=password, database=database) 
     196        except socket.error, e: 
     197            raise InterfaceError("communication error", e) 
     198 
    184199    def begin(self): 
    185         pass 
     200        raise NotSupportedError("uncoded") 
    186201 
    187202    def commit(self): 
    188         pass 
     203        raise NotSupportedError("uncoded") 
    189204 
    190205    def rollback(self): 
    191         pass 
     206        raise NotSupportedError("uncoded") 
    192207 
    193208 
     
    264279            return val 
    265280 
     281    class Close(object): 
     282        def __init__(self, typ, name): 
     283            if len(typ) != 1: 
     284                raise InternalError("Close typ must be 1 char") 
     285            self.typ = typ 
     286            self.name = name 
     287 
     288        def serialize(self): 
     289            val = self.typ + self.name + "\x00" 
     290            val = struct.pack("!i", len(val) + 4) + val 
     291            val = "C" + val 
     292            return val 
     293 
     294    class ClosePortal(Close): 
     295        def __init__(self, name): 
     296            Protocol.Close.__init__(self, "P", name) 
     297 
     298    class ClosePreparedStatement(Close): 
     299        def __init__(self, name): 
     300            Protocol.Close.__init__(self, "S", name) 
     301 
    266302    class Describe(object): 
    267303        def __init__(self, typ, name): 
     
    289325            return 'H\x00\x00\x00\x04' 
    290326 
     327    class Sync(object): 
     328        def serialize(self): 
     329            return 'S\x00\x00\x00\x04' 
     330 
    291331    class PasswordMessage(object): 
    292332        def __init__(self, pwd): 
     
    297337            val = struct.pack("!i", len(val) + 4) + val 
    298338            val = "p" + val 
     339            return val 
     340 
     341    class Execute(object): 
     342        def __init__(self, portal, row_count): 
     343            self.portal = portal 
     344            self.row_count = row_count 
     345 
     346        def serialize(self): 
     347            val = self.portal + "\x00" + struct.pack("!i", self.row_count) 
     348            val = struct.pack("!i", len(val) + 4) + val 
     349            val = "E" + val 
    299350            return val 
    300351 
     
    373424        def createFromData(data): 
    374425            return Protocol.BindComplete() 
     426        createFromData = staticmethod(createFromData) 
     427 
     428    class CloseComplete(object): 
     429        def createFromData(data): 
     430            return Protocol.CloseComplete() 
     431        createFromData = staticmethod(createFromData) 
     432 
     433    class PortalSuspended(object): 
     434        def createFromData(data): 
     435            return Protocol.PortalSuspended() 
    375436        createFromData = staticmethod(createFromData) 
    376437 
     
    528589                    raise InternalError("Unexpected response msg %r" % (msg)) 
    529590 
     591        def fetch_rows(self, portal, row_count): 
     592            self.verifyState("ready") 
     593            self._send(Protocol.Execute(portal, row_count)) 
     594            self._send(Protocol.Flush()) 
     595            rows = [] 
     596            end_of_data = False 
     597            while 1: 
     598                msg = self._read_message() 
     599                if isinstance(msg, Protocol.DataRow): 
     600                    rows.append(msg) 
     601                elif isinstance(msg, Protocol.PortalSuspended): 
     602                    # got all the rows we asked for, but not all that exist 
     603                    break 
     604                elif isinstance(msg, Protocol.CommandComplete): 
     605                    self._send(Protocol.ClosePortal(portal)) 
     606                    self._send(Protocol.Sync()) 
     607                    self._waitForReady() 
     608                    end_of_data = True 
     609                    break 
     610                elif isinstance(msg, Protocol.ErrorResponse): 
     611                    raise msg.createException() 
     612                else: 
     613                    raise InternalError("Unexpected response msg %r" % msg) 
     614            return end_of_data, rows 
     615 
    530616        def query(self, qs): 
    531617            self.verifyState("ready") 
     
    561647        "1": ParseComplete, 
    562648        "2": BindComplete, 
     649        "3": CloseComplete, 
     650        "s": PortalSuspended, 
    563651        } 
    564652 
     
    591679        format = description['format'] 
    592680        funcs = Types.pg_types.get(type_oid) 
    593         if func == None: 
     681        if funcs == None: 
    594682            raise NotSupportedError("data response type %r not supported" % (type_oid)) 
    595683        func = funcs[format] 
     
    602690        return data == 't' 
    603691 
     692    def boolrecv(data, description): 
     693        return data == "\x01" 
     694 
     695    def int2recv(data, description): 
     696        return struct.unpack("!h", data)[0] 
     697 
     698    def int4recv(data, description): 
     699        return struct.unpack("!i", data)[0] 
     700 
    604701    def int4in(data, description): 
    605702        return int(data) 
     
    607704    def int4send(v): 
    608705        return struct.pack("!i", v) 
     706 
     707    def timestamp_recv(data, description): 
     708        val = struct.unpack("!d", data)[0] 
     709        return datetime.datetime(2000, 1, 1) + datetime.timedelta(seconds = val) 
    609710 
    610711    def timestamp_in(data, description): 
     
    622723 
    623724    pg_types = { 
    624         16: (boolin, None), 
    625         23: (int4in, None), 
    626         1114: (timestamp_in, None), 
     725        16: (boolin, boolrecv), 
     726        21: (None, int2recv), 
     727        23: (int4in, int4recv), 
     728        1114: (timestamp_in, timestamp_recv), 
    627729    } 
    628730 
  • pg8000/trunk/pg8000-test.py

    r798 r799  
    44 
    55db = pg8000.Connection(host='localhost', user='mfenniak') 
    6 db.iterate_dicts = True 
     6cursor = pg8000.Cursor(db) 
     7# db.iterate_dicts = True 
    78 
    89print "begin query..." 
    9 db.query("SELECT 5000 + 1, True as pg_stuff, False, '2000-01-02 03:04:05'::timestamp, $1", 55) 
    10 for row in db: 
     10cursor.execute("SELECT township, range, meridian FROM ats LIMIT 176") 
     11i = 0 
     12for row in cursor: 
     13    i = i + 1 
     14    print i, repr(row) 
     15print "end query..." 
     16 
     17print "begin query..." 
     18cursor.execute("SELECT 5000 + 1, True as pg_stuff, False, '2000-01-02 03:04:05.67'::timestamp, $1", 55) 
     19for row in cursor: 
    1120    print repr(row) 
    1221print "end query..."