| 8 | | |
|---|
| 9 | | db = pg8000.Connection(host='joy', user='pg8000-test', database='pg8000-test', password='pg8000-test', socket_timeout=5) |
|---|
| 10 | | #db = pg8000.Connection(host='localhost', user='mfenniak') |
|---|
| 11 | | #db = pg8000.Connection(user="mfenniak", unix_sock="/tmp/.s.PGSQL.5432") |
|---|
| 12 | | |
|---|
| 13 | | print "testing db.execute and error recovery in NoData query" |
|---|
| 14 | | for i in range(1, 3): |
|---|
| 15 | | db.begin() |
|---|
| 16 | | try: |
|---|
| 17 | | db.execute("DROP TABLE t1") |
|---|
| 18 | | db.commit() |
|---|
| 19 | | except pg8000.DatabaseError, e: |
|---|
| 20 | | assert e.args[1] == '42P01' # table does not exist |
|---|
| 21 | | db.rollback() |
|---|
| 22 | | db.begin() |
|---|
| 23 | | |
|---|
| 24 | | print "creating test table" |
|---|
| 25 | | db.begin() |
|---|
| 26 | | db.execute("CREATE TABLE t1 (f1 int primary key, f2 int not null, f3 varchar(50) null)") |
|---|
| 27 | | db.commit() |
|---|
| 28 | | |
|---|
| 29 | | print "testing db.execute on query, error recovery in on parsing" |
|---|
| 30 | | for i in range(1, 3): |
|---|
| 31 | | db.begin() |
|---|
| 32 | | try: |
|---|
| 33 | | db.execute("SELECT * FROM table_that_does_not_exist") |
|---|
| 34 | | print "error - shouldn't get here" |
|---|
| 35 | | for row in db.iterate_dict(): |
|---|
| 36 | | print "definately shouldn't be here... %r" % row |
|---|
| 37 | | except pg8000.DatabaseError, e: |
|---|
| 38 | | assert e.args[1] == '42P01' # table does not exist |
|---|
| 39 | | db.rollback() |
|---|
| 40 | | db.begin() |
|---|
| 41 | | |
|---|
| 42 | | print "testing multithreaded prepared statement with arguments" |
|---|
| 43 | | # Not the most efficient way to do this. Multiple DB connections would |
|---|
| 44 | | # multiplex this insert and make it faster -- we're just testing for thread |
|---|
| 45 | | # safety here. Testing with much higher values of left/right allows |
|---|
| 46 | | # multithread locking to be obvious. |
|---|
| 47 | | s1 = pg8000.PreparedStatement(db, "INSERT INTO t1 (f1, f2, f3) VALUES ($1, $2, $3)", int, int, str) |
|---|
| 48 | | def test(left, right): |
|---|
| 49 | | for i in range(left, right): |
|---|
| 50 | | s1.execute(i, id(threading.currentThread()), "test - unicode \u0173 ") |
|---|
| 51 | | t1 = threading.Thread(target=test, args=(1, 10)) |
|---|
| 52 | | t2 = threading.Thread(target=test, args=(10, 20)) |
|---|
| 53 | | t3 = threading.Thread(target=test, args=(20, 30)) |
|---|
| 54 | | t4 = threading.Thread(target=test, args=(30, 40)) |
|---|
| 55 | | t1.start() ; t2.start() ; t3.start() ; t4.start() |
|---|
| 56 | | t1.join(); t2.join(); t3.join(); t4.join() |
|---|
| 57 | | |
|---|
| 58 | | db.commit() |
|---|
| 59 | | |
|---|
| 60 | | |
|---|
| 61 | | print "testing basic query..." |
|---|
| 62 | | db.execute("SELECT * FROM t1") |
|---|
| 63 | | for row in db.iterate_dict(): |
|---|
| 64 | | assert row.has_key("f1") and row.has_key("f2") and row.has_key("f3") |
|---|
| 65 | | |
|---|
| 66 | | print "testing two queries at once..." |
|---|
| 67 | | cur1 = pg8000.Cursor(db) |
|---|
| 68 | | cur1.execute("SELECT * FROM t1") |
|---|
| 69 | | print repr(cur1.row_description) |
|---|
| 70 | | s1 = pg8000.PreparedStatement(db, "SELECT f1, f2 FROM t1 WHERE f1 > $1", int) |
|---|
| 71 | | i = 0 |
|---|
| 72 | | for row1 in cur1.iterate_dict(): |
|---|
| 73 | | assert row1.has_key("f1") and row1.has_key("f2") and row1.has_key("f3") |
|---|
| 74 | | i = i + 1 |
|---|
| 75 | | s1.execute(row1['f1']) |
|---|
| 76 | | for row2 in s1.iterate_dict(): |
|---|
| 77 | | assert row2.has_key("f1") and row2.has_key("f2") and not row2.has_key("f3") |
|---|
| 78 | | |
|---|
| 79 | | print "beginning type checks..." |
|---|
| 80 | | |
|---|
| 81 | | cur1.execute("SELECT $1", 5) |
|---|
| 82 | | assert tuple(cur1.iterate_dict()) == ({"?column?": 5},) |
|---|
| 83 | | |
|---|
| 84 | | cur1.execute("SELECT $1", 22.333) |
|---|
| 85 | | retval = tuple(cur1.iterate_dict()) |
|---|
| 86 | | assert retval == ({"?column?": 22.332999999999998},) |
|---|
| 87 | | |
|---|
| 88 | | cur1.execute("SELECT $1", decimal.Decimal("22.333")) |
|---|
| 89 | | retval = tuple(cur1.iterate_dict()) |
|---|
| 90 | | assert retval == ({"?column?": decimal.Decimal("22.333")},) |
|---|
| 91 | | |
|---|
| 92 | | cur1.execute("SELECT 5000::smallint") |
|---|
| 93 | | assert tuple(cur1.iterate_dict()) == ({"int2": 5000},) |
|---|
| 94 | | |
|---|
| 95 | | cur1.execute("SELECT 5000::integer") |
|---|
| 96 | | assert tuple(cur1.iterate_dict()) == ({"int4": 5000},) |
|---|
| 97 | | |
|---|
| 98 | | cur1.execute("SELECT 50000000000000::bigint") |
|---|
| 99 | | assert tuple(cur1.iterate_dict()) == ({"int8": 50000000000000},) |
|---|
| 100 | | |
|---|
| 101 | | cur1.execute("SELECT 5000.023232::decimal") |
|---|
| 102 | | assert tuple(cur1.iterate_dict()) == ({"numeric": decimal.Decimal("5000.023232")},) |
|---|
| 103 | | |
|---|
| 104 | | cur1.execute("SELECT 1.1::real") |
|---|
| 105 | | assert tuple(cur1.iterate_dict()) == ({"float4": 1.1000000238418579},) |
|---|
| 106 | | |
|---|
| 107 | | cur1.execute("SELECT 1.1::double precision") |
|---|
| 108 | | assert tuple(cur1.iterate_dict()) == ({"float8": 1.1000000000000001},) |
|---|
| 109 | | |
|---|
| 110 | | cur1.execute("SELECT 'hello'::varchar(50)") |
|---|
| 111 | | assert tuple(cur1.iterate_dict()) == ({"varchar": u"hello"},) |
|---|
| 112 | | |
|---|
| 113 | | cur1.execute("SELECT 'hello'::char(20)") |
|---|
| 114 | | assert tuple(cur1.iterate_dict()) == ({"bpchar": u"hello "},) |
|---|
| 115 | | |
|---|
| 116 | | cur1.execute("SELECT 'hello'::text") |
|---|
| 117 | | assert tuple(cur1.iterate_dict()) == ({"text": u"hello"},) |
|---|
| 118 | | |
|---|
| 119 | | #cur1.execute("SELECT 'hell\007o'::bytea") |
|---|
| 120 | | #assert tuple(cur1.iterate_dict()) == ({"bytea": "hello"},) |
|---|
| 121 | | |
|---|
| 122 | | cur1.execute("SELECT '2001-02-03 04:05:06.17'::timestamp") |
|---|
| 123 | | retval = tuple(cur1.iterate_dict()) |
|---|
| 124 | | assert retval == ({'timestamp': datetime.datetime(2001, 2, 3, 4, 5, 6, 170000)},) |
|---|
| 125 | | |
|---|
| 126 | | #cur1.execute("SELECT '2001-02-03 04:05:06.17'::timestamp with time zone") |
|---|
| 127 | | #assert tuple(cur1.iterate_dict()) == ({'timestamp': datetime.datetime(2001, 2, 3, 4, 5, 6, 170000, pg8000.Types.FixedOffsetTz("-07"))},) |
|---|
| 128 | | |
|---|
| 129 | | cur1.execute("SELECT '1 month'::interval") |
|---|
| 130 | | assert tuple(cur1.iterate_dict()) == ({'interval': '1 mon'},) |
|---|
| 131 | | #print repr(tuple(cur1.iterate_dict())) |
|---|
| 132 | | |
|---|
| 133 | | |
|---|
| 134 | | print "tests complete" |
|---|
| | 8 | import struct |
|---|
| | 9 | |
|---|
| | 10 | db_connect = { |
|---|
| | 11 | "host": "joy", |
|---|
| | 12 | "user": "pg8000-test", |
|---|
| | 13 | "database": "pg8000-test", |
|---|
| | 14 | "password": "pg8000-test", |
|---|
| | 15 | "socket_timeout": 5 |
|---|
| | 16 | } |
|---|
| | 17 | db = pg8000.Connection(**db_connect) |
|---|
| | 18 | dbapi = pg8000.DBAPI |
|---|
| | 19 | db2 = dbapi.connect(**db_connect) |
|---|
| | 20 | |
|---|
| | 21 | # Tests relating to the basic operation of the database driver, driven by the |
|---|
| | 22 | # pg8000 custom interface. |
|---|
| | 23 | class QueryTests(unittest.TestCase): |
|---|
| | 24 | def setUp(self): |
|---|
| | 25 | try: |
|---|
| | 26 | db.execute("DROP TABLE t1") |
|---|
| | 27 | except pg8000.DatabaseError, e: |
|---|
| | 28 | # the only acceptable error is: |
|---|
| | 29 | self.assert_(e.args[1] == '42P01', # table does not exist |
|---|
| | 30 | "incorrect error for drop table") |
|---|
| | 31 | db.execute("CREATE TEMPORARY TABLE t1 (f1 int primary key, f2 int not null, f3 varchar(50) null)") |
|---|
| | 32 | |
|---|
| | 33 | def TestParallelQueries(self): |
|---|
| | 34 | db.execute("INSERT INTO t1 (f1, f2, f3) VALUES ($1, $2, $3)", 1, 1, None) |
|---|
| | 35 | db.execute("INSERT INTO t1 (f1, f2, f3) VALUES ($1, $2, $3)", 2, 10, None) |
|---|
| | 36 | db.execute("INSERT INTO t1 (f1, f2, f3) VALUES ($1, $2, $3)", 3, 100, None) |
|---|
| | 37 | db.execute("INSERT INTO t1 (f1, f2, f3) VALUES ($1, $2, $3)", 4, 1000, None) |
|---|
| | 38 | db.execute("INSERT INTO t1 (f1, f2, f3) VALUES ($1, $2, $3)", 5, 10000, None) |
|---|
| | 39 | c1 = pg8000.Cursor(db) |
|---|
| | 40 | c2 = pg8000.Cursor(db) |
|---|
| | 41 | c1.execute("SELECT f1, f2, f3 FROM t1") |
|---|
| | 42 | for row in c1.iterate_tuple(): |
|---|
| | 43 | f1, f2, f3 = row |
|---|
| | 44 | c2.execute("SELECT f1, f2, f3 FROM t1 WHERE f1 > $1", f1) |
|---|
| | 45 | for row in c2.iterate_tuple(): |
|---|
| | 46 | f1, f2, f3 = row |
|---|
| | 47 | |
|---|
| | 48 | def TestNoDataErrorRecovery(self): |
|---|
| | 49 | for i in range(1, 4): |
|---|
| | 50 | try: |
|---|
| | 51 | db.execute("DROP TABLE t1") |
|---|
| | 52 | except pg8000.DatabaseError, e: |
|---|
| | 53 | # the only acceptable error is: |
|---|
| | 54 | self.assert_(e.args[1] == '42P01', # table does not exist |
|---|
| | 55 | "incorrect error for drop table") |
|---|
| | 56 | |
|---|
| | 57 | def TestMultithreadedStatement(self): |
|---|
| | 58 | # Note: Multithreading with a prepared statement is not highly |
|---|
| | 59 | # recommended due to low performance. |
|---|
| | 60 | s1 = pg8000.PreparedStatement(db, "INSERT INTO t1 (f1, f2, f3) VALUES ($1, $2, $3)", int, int, str) |
|---|
| | 61 | def test(left, right): |
|---|
| | 62 | for i in range(left, right): |
|---|
| | 63 | s1.execute(i, id(threading.currentThread()), None) |
|---|
| | 64 | t1 = threading.Thread(target=test, args=(1, 25)) |
|---|
| | 65 | t2 = threading.Thread(target=test, args=(25, 50)) |
|---|
| | 66 | t3 = threading.Thread(target=test, args=(50, 75)) |
|---|
| | 67 | t1.start(); t2.start(); t3.start() |
|---|
| | 68 | t1.join(); t2.join(); t3.join() |
|---|
| | 69 | |
|---|
| | 70 | def TestMultithreadedCursor(self): |
|---|
| | 71 | # Note: Multithreading with a cursor is not highly recommended due to |
|---|
| | 72 | # low performance. |
|---|
| | 73 | cur = pg8000.Cursor(db) |
|---|
| | 74 | def test(left, right): |
|---|
| | 75 | for i in range(left, right): |
|---|
| | 76 | cur.execute("INSERT INTO t1 (f1, f2, f3) VALUES ($1, $2, $3)", i, id(threading.currentThread()), None) |
|---|
| | 77 | t1 = threading.Thread(target=test, args=(1, 25)) |
|---|
| | 78 | t2 = threading.Thread(target=test, args=(25, 50)) |
|---|
| | 79 | t3 = threading.Thread(target=test, args=(50, 75)) |
|---|
| | 80 | t1.start(); t2.start(); t3.start() |
|---|
| | 81 | t1.join(); t2.join(); t3.join() |
|---|
| | 82 | |
|---|
| | 83 | class ParamstyleTests(unittest.TestCase): |
|---|
| | 84 | def TestQmark(self): |
|---|
| | 85 | new_query, new_args = pg8000.DBAPI.convert_paramstyle("qmark", "SELECT ?, ?, \"field_?\" FROM t WHERE a='say ''what?''' AND b=? AND c=E'?\\'test\\'?'", (1, 2, 3)) |
|---|
| | 86 | assert new_query == "SELECT $1, $2, \"field_?\" FROM t WHERE a='say ''what?''' AND b=$3 AND c=E'?\\'test\\'?'" |
|---|
| | 87 | assert new_args == (1, 2, 3) |
|---|
| | 88 | |
|---|
| | 89 | def TestQmark2(self): |
|---|
| | 90 | new_query, new_args = pg8000.DBAPI.convert_paramstyle("qmark", "SELECT ?, ?, * FROM t WHERE a=? AND b='are you ''sure?'", (1, 2, 3)) |
|---|
| | 91 | assert new_query == "SELECT $1, $2, * FROM t WHERE a=$3 AND b='are you ''sure?'" |
|---|
| | 92 | assert new_args == (1, 2, 3) |
|---|
| | 93 | |
|---|
| | 94 | def TestNumeric(self): |
|---|
| | 95 | new_query, new_args = pg8000.DBAPI.convert_paramstyle("numeric", "SELECT :2, :1, * FROM t WHERE a=:3", (1, 2, 3)) |
|---|
| | 96 | assert new_query == "SELECT $2, $1, * FROM t WHERE a=$3" |
|---|
| | 97 | assert new_args == (1, 2, 3) |
|---|
| | 98 | |
|---|
| | 99 | def TestNamed(self): |
|---|
| | 100 | new_query, new_args = pg8000.DBAPI.convert_paramstyle("named", "SELECT :f2, :f1 FROM t WHERE a=:f2", {"f2": 1, "f1": 2}) |
|---|
| | 101 | assert new_query == "SELECT $1, $2 FROM t WHERE a=$1" |
|---|
| | 102 | assert new_args == (1, 2) |
|---|
| | 103 | |
|---|
| | 104 | def TestFormat(self): |
|---|
| | 105 | new_query, new_args = pg8000.DBAPI.convert_paramstyle("format", "SELECT %s, %s, \"f1_%%\", E'txt_%%' FROM t WHERE a=%s AND b='75%%'", (1, 2, 3)) |
|---|
| | 106 | assert new_query == "SELECT $1, $2, \"f1_%\", E'txt_%' FROM t WHERE a=$3 AND b='75%'" |
|---|
| | 107 | assert new_args == (1, 2, 3) |
|---|
| | 108 | |
|---|
| | 109 | def TestPyformat(self): |
|---|
| | 110 | new_query, new_args = pg8000.DBAPI.convert_paramstyle("pyformat", "SELECT %(f2)s, %(f1)s, \"f1_%%\", E'txt_%%' FROM t WHERE a=%(f2)s AND b='75%%'", {"f2": 1, "f1": 2, "f3": 3}) |
|---|
| | 111 | assert new_query == "SELECT $1, $2, \"f1_%\", E'txt_%' FROM t WHERE a=$1 AND b='75%'" |
|---|
| | 112 | assert new_args == (1, 2) |
|---|
| | 113 | |
|---|
| | 114 | |
|---|
| | 115 | class DBAPITests(unittest.TestCase): |
|---|
| | 116 | def setUp(self): |
|---|
| | 117 | c = db2.cursor() |
|---|
| | 118 | try: |
|---|
| | 119 | c.execute("DROP TABLE t1") |
|---|
| | 120 | except pg8000.DatabaseError, e: |
|---|
| | 121 | # the only acceptable error is: |
|---|
| | 122 | self.assert_(e.args[1] == '42P01', # table does not exist |
|---|
| | 123 | "incorrect error for drop table") |
|---|
| | 124 | c.execute("CREATE TEMPORARY TABLE t1 (f1 int primary key, f2 int not null, f3 varchar(50) null)") |
|---|
| | 125 | c.execute("INSERT INTO t1 (f1, f2, f3) VALUES (%s, %s, %s)", (1, 1, None)) |
|---|
| | 126 | c.execute("INSERT INTO t1 (f1, f2, f3) VALUES (%s, %s, %s)", (2, 10, None)) |
|---|
| | 127 | c.execute("INSERT INTO t1 (f1, f2, f3) VALUES (%s, %s, %s)", (3, 100, None)) |
|---|
| | 128 | c.execute("INSERT INTO t1 (f1, f2, f3) VALUES (%s, %s, %s)", (4, 1000, None)) |
|---|
| | 129 | c.execute("INSERT INTO t1 (f1, f2, f3) VALUES (%s, %s, %s)", (5, 10000, None)) |
|---|
| | 130 | |
|---|
| | 131 | def TestParallelQueries(self): |
|---|
| | 132 | c1 = db2.cursor() |
|---|
| | 133 | c2 = db2.cursor() |
|---|
| | 134 | c1.execute("SELECT f1, f2, f3 FROM t1") |
|---|
| | 135 | while 1: |
|---|
| | 136 | row = c1.fetchone() |
|---|
| | 137 | if row == None: |
|---|
| | 138 | break |
|---|
| | 139 | f1, f2, f3 = row |
|---|
| | 140 | c2.execute("SELECT f1, f2, f3 FROM t1 WHERE f1 > %s", (f1,)) |
|---|
| | 141 | while 1: |
|---|
| | 142 | row = c2.fetchone() |
|---|
| | 143 | if row == None: |
|---|
| | 144 | break |
|---|
| | 145 | f1, f2, f3 = row |
|---|
| | 146 | |
|---|
| | 147 | def TestQmark(self): |
|---|
| | 148 | orig_paramstyle = dbapi.paramstyle |
|---|
| | 149 | try: |
|---|
| | 150 | dbapi.paramstyle = "qmark" |
|---|
| | 151 | c1 = db2.cursor() |
|---|
| | 152 | c1.execute("SELECT f1, f2, f3 FROM t1 WHERE f1 > ?", (3,)) |
|---|
| | 153 | while 1: |
|---|
| | 154 | row = c1.fetchone() |
|---|
| | 155 | if row == None: |
|---|
| | 156 | break |
|---|
| | 157 | f1, f2, f3 = row |
|---|
| | 158 | finally: |
|---|
| | 159 | dbapi.paramstyle = orig_paramstyle |
|---|
| | 160 | |
|---|
| | 161 | def TestNumeric(self): |
|---|
| | 162 | orig_paramstyle = dbapi.paramstyle |
|---|
| | 163 | try: |
|---|
| | 164 | dbapi.paramstyle = "numeric" |
|---|
| | 165 | c1 = db2.cursor() |
|---|
| | 166 | c1.execute("SELECT f1, f2, f3 FROM t1 WHERE f1 > :1", (3,)) |
|---|
| | 167 | while 1: |
|---|
| | 168 | row = c1.fetchone() |
|---|
| | 169 | if row == None: |
|---|
| | 170 | break |
|---|
| | 171 | f1, f2, f3 = row |
|---|
| | 172 | finally: |
|---|
| | 173 | dbapi.paramstyle = orig_paramstyle |
|---|
| | 174 | |
|---|
| | 175 | def TestNamed(self): |
|---|
| | 176 | orig_paramstyle = dbapi.paramstyle |
|---|
| | 177 | try: |
|---|
| | 178 | dbapi.paramstyle = "named" |
|---|
| | 179 | c1 = db2.cursor() |
|---|
| | 180 | c1.execute("SELECT f1, f2, f3 FROM t1 WHERE f1 > :f1", {"f1": 3}) |
|---|
| | 181 | while 1: |
|---|
| | 182 | row = c1.fetchone() |
|---|
| | 183 | if row == None: |
|---|
| | 184 | break |
|---|
| | 185 | f1, f2, f3 = row |
|---|
| | 186 | finally: |
|---|
| | 187 | dbapi.paramstyle = orig_paramstyle |
|---|
| | 188 | |
|---|
| | 189 | def TestFormat(self): |
|---|
| | 190 | orig_paramstyle = dbapi.paramstyle |
|---|
| | 191 | try: |
|---|
| | 192 | dbapi.paramstyle = "format" |
|---|
| | 193 | c1 = db2.cursor() |
|---|
| | 194 | c1.execute("SELECT f1, f2, f3 FROM t1 WHERE f1 > %s", (3,)) |
|---|
| | 195 | while 1: |
|---|
| | 196 | row = c1.fetchone() |
|---|
| | 197 | if row == None: |
|---|
| | 198 | break |
|---|
| | 199 | f1, f2, f3 = row |
|---|
| | 200 | finally: |
|---|
| | 201 | dbapi.paramstyle = orig_paramstyle |
|---|
| | 202 | |
|---|
| | 203 | def TestPyformat(self): |
|---|
| | 204 | orig_paramstyle = dbapi.paramstyle |
|---|
| | 205 | try: |
|---|
| | 206 | dbapi.paramstyle = "pyformat" |
|---|
| | 207 | c1 = db2.cursor() |
|---|
| | 208 | c1.execute("SELECT f1, f2, f3 FROM t1 WHERE f1 > %(f1)s", {"f1": 3}) |
|---|
| | 209 | while 1: |
|---|
| | 210 | row = c1.fetchone() |
|---|
| | 211 | if row == None: |
|---|
| | 212 | break |
|---|
| | 213 | f1, f2, f3 = row |
|---|
| | 214 | finally: |
|---|
| | 215 | dbapi.paramstyle = orig_paramstyle |
|---|
| | 216 | |
|---|
| | 217 | # Tests relating to type conversion. |
|---|
| | 218 | class TypeTests(unittest.TestCase): |
|---|
| | 219 | def TestNullRoundtrip(self): |
|---|
| | 220 | # We can't just "SELECT $1" and set None as the parameter, since it has |
|---|
| | 221 | # no type. That would result in a PG error, "could not determine data |
|---|
| | 222 | # type of parameter $1". So we create a temporary table, insert null |
|---|
| | 223 | # values, and read them back. |
|---|
| | 224 | db.execute("CREATE TEMPORARY TABLE TestNullWrite (f1 int4, f2 timestamp, f3 varchar)") |
|---|
| | 225 | db.execute("INSERT INTO TestNullWrite VALUES ($1, $2, $3)", |
|---|
| | 226 | None, None, None) |
|---|
| | 227 | db.execute("SELECT * FROM TestNullWrite") |
|---|
| | 228 | retval = tuple(db.iterate_dict()) |
|---|
| | 229 | self.assert_(retval == ({"f1": None, "f2": None, "f3": None},), |
|---|
| | 230 | "retrieved value match failed") |
|---|
| | 231 | |
|---|
| | 232 | def TestNullSelectFailure(self): |
|---|
| | 233 | # See comment in TestNullRoundtrip. This test is here to ensure that |
|---|
| | 234 | # this behaviour is documented and doesn't mysteriously change. |
|---|
| | 235 | self.assertRaises(pg8000.ProgrammingError, db.execute, |
|---|
| | 236 | "SELECT $1 as f1", None) |
|---|
| | 237 | |
|---|
| | 238 | def TestDecimalRoundtrip(self): |
|---|
| | 239 | db.execute("SELECT $1 as f1", decimal.Decimal('1.1')) |
|---|
| | 240 | retval = tuple(db.iterate_dict()) |
|---|
| | 241 | self.assert_(retval == ({"f1": decimal.Decimal('1.1')},), |
|---|
| | 242 | "retrieved value match failed") |
|---|
| | 243 | |
|---|
| | 244 | def TestFloatRoundtrip(self): |
|---|
| | 245 | # This test ensures that the binary float value doesn't change in a |
|---|
| | 246 | # roundtrip to the server. That could happen if the value was |
|---|
| | 247 | # converted to text and got rounded by a decimal place somewhere. |
|---|
| | 248 | val = 1.756e-12 |
|---|
| | 249 | bin_orig = struct.pack("!d", val) |
|---|
| | 250 | db.execute("SELECT $1 as f1", val) |
|---|
| | 251 | retval = tuple(db.iterate_dict()) |
|---|
| | 252 | bin_new = struct.pack("!d", retval[0]['f1']) |
|---|
| | 253 | self.assert_(bin_new == bin_orig, |
|---|
| | 254 | "retrieved value match failed") |
|---|
| | 255 | |
|---|
| | 256 | def TestStrRoundtrip(self): |
|---|
| | 257 | db.execute("SELECT $1 as f1", "hello world") |
|---|
| | 258 | retval = tuple(db.iterate_dict()) |
|---|
| | 259 | self.assert_(retval == ({"f1": u"hello world"},), |
|---|
| | 260 | "retrieved value match failed") |
|---|
| | 261 | |
|---|
| | 262 | def TestUnicodeRoundtrip(self): |
|---|
| | 263 | db.execute("SELECT $1 as f1", u"hello \u0173 world") |
|---|
| | 264 | retval = tuple(db.iterate_dict()) |
|---|
| | 265 | self.assert_(retval == ({"f1": u"hello \u0173 world"},), |
|---|
| | 266 | "retrieved value match failed") |
|---|
| | 267 | |
|---|
| | 268 | def TestLongRoundtrip(self): |
|---|
| | 269 | db.execute("SELECT $1 as f1", 50000000000000L) |
|---|
| | 270 | retval = tuple(db.iterate_dict()) |
|---|
| | 271 | self.assert_(retval == ({"f1": 50000000000000L},), |
|---|
| | 272 | "retrieved value match failed") |
|---|
| | 273 | |
|---|
| | 274 | def TestIntRoundtrip(self): |
|---|
| | 275 | db.execute("SELECT $1 as f1", 100) |
|---|
| | 276 | retval = tuple(db.iterate_dict()) |
|---|
| | 277 | self.assert_(retval == ({"f1": 100},), |
|---|
| | 278 | "retrieved value match failed") |
|---|
| | 279 | |
|---|
| | 280 | def TestByteaRoundtrip(self): |
|---|
| | 281 | db.execute("SELECT $1 as f1", pg8000.Bytea("\x00\x01\x02\x03\x02\x01\x00")) |
|---|
| | 282 | retval = tuple(db.iterate_dict()) |
|---|
| | 283 | self.assert_(retval == ({"f1": "\x00\x01\x02\x03\x02\x01\x00"},), |
|---|
| | 284 | "retrieved value match failed") |
|---|
| | 285 | |
|---|
| | 286 | def TestOidIn(self): |
|---|
| | 287 | db.execute("SELECT oid FROM pg_type") |
|---|
| | 288 | retval = tuple(db.iterate_dict()) |
|---|
| | 289 | # It is sufficient that no errors were encountered. |
|---|
| | 290 | |
|---|
| | 291 | def TestBooleanOut(self): |
|---|
| | 292 | db.execute("SELECT 't'::bool") |
|---|
| | 293 | retval = tuple(db.iterate_dict()) |
|---|
| | 294 | self.assert_(retval == ({"bool": True},), |
|---|
| | 295 | "retrieved value match failed") |
|---|
| | 296 | |
|---|
| | 297 | def TestNumericOut(self): |
|---|
| | 298 | db.execute("SELECT 5000::numeric") |
|---|
| | 299 | retval = tuple(db.iterate_dict()) |
|---|
| | 300 | self.assert_(retval == ({"numeric": decimal.Decimal("5000")},), |
|---|
| | 301 | "retrieved value match failed") |
|---|
| | 302 | |
|---|
| | 303 | def TestInt2Out(self): |
|---|
| | 304 | db.execute("SELECT 5000::smallint") |
|---|
| | 305 | retval = tuple(db.iterate_dict()) |
|---|
| | 306 | self.assert_(retval == ({"int2": 5000},), |
|---|
| | 307 | "retrieved value match failed") |
|---|
| | 308 | |
|---|
| | 309 | def TestInt4Out(self): |
|---|
| | 310 | db.execute("SELECT 5000::integer") |
|---|
| | 311 | retval = tuple(db.iterate_dict()) |
|---|
| | 312 | self.assert_(retval == ({"int4": 5000},), |
|---|
| | 313 | "retrieved value match failed") |
|---|
| | 314 | |
|---|
| | 315 | def TestInt8Out(self): |
|---|
| | 316 | db.execute("SELECT 50000000000000::bigint") |
|---|
| | 317 | retval = tuple(db.iterate_dict()) |
|---|
| | 318 | self.assert_(retval == ({"int8": 50000000000000},), |
|---|
| | 319 | "retrieved value match failed") |
|---|
| | 320 | |
|---|
| | 321 | def TestFloat4Out(self): |
|---|
| | 322 | db.execute("SELECT 1.1::real") |
|---|
| | 323 | retval = tuple(db.iterate_dict()) |
|---|
| | 324 | self.assert_(retval == ({"float4": 1.1000000238418579},), |
|---|
| | 325 | "retrieved value match failed") |
|---|
| | 326 | |
|---|
| | 327 | def TestFloat8Out(self): |
|---|
| | 328 | db.execute("SELECT 1.1::double precision") |
|---|
| | 329 | retval = tuple(db.iterate_dict()) |
|---|
| | 330 | self.assert_(retval == ({"float8": 1.1000000000000001},), |
|---|
| | 331 | "retrieved value match failed") |
|---|
| | 332 | |
|---|
| | 333 | def TestVarcharOut(self): |
|---|
| | 334 | db.execute("SELECT 'hello'::varchar(20)") |
|---|
| | 335 | retval = tuple(db.iterate_dict()) |
|---|
| | 336 | self.assert_(retval == ({"varchar": u"hello"},), |
|---|
| | 337 | "retrieved value match failed") |
|---|
| | 338 | |
|---|
| | 339 | def TestCharOut(self): |
|---|
| | 340 | db.execute("SELECT 'hello'::char(20)") |
|---|
| | 341 | retval = tuple(db.iterate_dict()) |
|---|
| | 342 | self.assert_(retval == ({"bpchar": u"hello "},), |
|---|
| | 343 | "retrieved value match failed") |
|---|
| | 344 | |
|---|
| | 345 | def TestTextOut(self): |
|---|
| | 346 | db.execute("SELECT 'hello'::text") |
|---|
| | 347 | retval = tuple(db.iterate_dict()) |
|---|
| | 348 | self.assert_(retval == ({"text": u"hello"},), |
|---|
| | 349 | "retrieved value match failed") |
|---|
| | 350 | |
|---|
| | 351 | def TestIntervalOut(self): |
|---|
| | 352 | db.execute("SELECT '1 month'::interval") |
|---|
| | 353 | retval = tuple(db.iterate_dict()) |
|---|
| | 354 | self.assert_(retval == ({"interval": "1 mon"},), |
|---|
| | 355 | "retrieved value match failed") |
|---|
| | 356 | |
|---|
| | 357 | def TestTimestampOut(self): |
|---|
| | 358 | db.execute("SELECT '2001-02-03 04:05:06.17'::timestamp") |
|---|
| | 359 | retval = tuple(db.iterate_dict()) |
|---|
| | 360 | self.assert_(retval == ({"timestamp": datetime.datetime(2001, 2, 3, 4, 5, 6, 170000)},), |
|---|
| | 361 | "retrieved value match failed") |
|---|
| | 362 | |
|---|
| | 363 | |
|---|
| | 364 | def suite(): |
|---|
| | 365 | paramstyle_tests = unittest.makeSuite(ParamstyleTests, "Test") |
|---|
| | 366 | dbapi_tests = unittest.makeSuite(DBAPITests, "Test") |
|---|
| | 367 | query_tests = unittest.makeSuite(QueryTests, "Test") |
|---|
| | 368 | type_tests = unittest.makeSuite(TypeTests, "Test") |
|---|
| | 369 | return unittest.TestSuite((paramstyle_tests, dbapi_tests, query_tests, |
|---|
| | 370 | type_tests)) |
|---|
| | 371 | |
|---|
| | 372 | if __name__ == "__main__": |
|---|
| | 373 | runner = unittest.TextTestRunner() |
|---|
| | 374 | runner.run(suite()) |
|---|
| | 375 | |
|---|