| 1 |
#!/usr/bin/env python |
|---|
| 2 |
|
|---|
| 3 |
import datetime |
|---|
| 4 |
import decimal |
|---|
| 5 |
import threading |
|---|
| 6 |
|
|---|
| 7 |
import pg8000 |
|---|
| 8 |
|
|---|
| 9 |
#db = pg8000.Connection(host='joy', user='pg8000-test', database='pg8000-test', password='pg8000-test', socket_timeout=5) |
|---|
| 10 |
#db = pg8000.Connection(host='localhost', user='mfenniak') |
|---|
| 11 |
db = pg8000.Connection(user="mfenniak", unix_sock="/tmp/.s.PGSQL.5432") |
|---|
| 12 |
|
|---|
| 13 |
print "testing db.execute and error recovery in NoData query" |
|---|
| 14 |
for i in range(1, 3): |
|---|
| 15 |
db.begin() |
|---|
| 16 |
try: |
|---|
| 17 |
db.execute("DROP TABLE t1") |
|---|
| 18 |
db.commit() |
|---|
| 19 |
except pg8000.DatabaseError, e: |
|---|
| 20 |
assert e.args[1] == '42P01' # table does not exist |
|---|
| 21 |
db.rollback() |
|---|
| 22 |
db.begin() |
|---|
| 23 |
|
|---|
| 24 |
print "creating test table" |
|---|
| 25 |
db.begin() |
|---|
| 26 |
db.execute("CREATE TABLE t1 (f1 int primary key, f2 int not null, f3 varchar(50) null)") |
|---|
| 27 |
db.commit() |
|---|
| 28 |
|
|---|
| 29 |
print "testing db.execute on query, error recovery in on parsing" |
|---|
| 30 |
for i in range(1, 3): |
|---|
| 31 |
db.begin() |
|---|
| 32 |
try: |
|---|
| 33 |
db.execute("SELECT * FROM table_that_does_not_exist") |
|---|
| 34 |
print "error - shouldn't get here" |
|---|
| 35 |
for row in db.iterate_dict(): |
|---|
| 36 |
print "definately shouldn't be here... %r" % row |
|---|
| 37 |
except pg8000.DatabaseError, e: |
|---|
| 38 |
assert e.args[1] == '42P01' # table does not exist |
|---|
| 39 |
db.rollback() |
|---|
| 40 |
db.begin() |
|---|
| 41 |
|
|---|
| 42 |
print "testing multithreaded prepared statement with arguments" |
|---|
| 43 |
# Not the most efficient way to do this. Multiple DB connections would |
|---|
| 44 |
# multiplex this insert and make it faster -- we're just testing for thread |
|---|
| 45 |
# safety here. Testing with much higher values of left/right allows |
|---|
| 46 |
# multithread locking to be obvious. |
|---|
| 47 |
s1 = pg8000.PreparedStatement(db, "INSERT INTO t1 (f1, f2, f3) VALUES ($1, $2, $3)", int, int, str) |
|---|
| 48 |
def test(left, right): |
|---|
| 49 |
for i in range(left, right): |
|---|
| 50 |
s1.execute(i, id(threading.currentThread()), "test - unicode \u0173 ") |
|---|
| 51 |
t1 = threading.Thread(target=test, args=(1, 10)) |
|---|
| 52 |
t2 = threading.Thread(target=test, args=(10, 20)) |
|---|
| 53 |
t3 = threading.Thread(target=test, args=(20, 30)) |
|---|
| 54 |
t4 = threading.Thread(target=test, args=(30, 40)) |
|---|
| 55 |
t1.start() ; t2.start() ; t3.start() ; t4.start() |
|---|
| 56 |
t1.join(); t2.join(); t3.join(); t4.join() |
|---|
| 57 |
|
|---|
| 58 |
db.commit() |
|---|
| 59 |
|
|---|
| 60 |
|
|---|
| 61 |
print "testing basic query..." |
|---|
| 62 |
db.execute("SELECT * FROM t1") |
|---|
| 63 |
for row in db.iterate_dict(): |
|---|
| 64 |
assert row.has_key("f1") and row.has_key("f2") and row.has_key("f3") |
|---|
| 65 |
|
|---|
| 66 |
print "testing two queries at once..." |
|---|
| 67 |
cur1 = pg8000.Cursor(db) |
|---|
| 68 |
cur1.execute("SELECT * FROM t1") |
|---|
| 69 |
s1 = pg8000.PreparedStatement(db, "SELECT f1, f2 FROM t1 WHERE f1 > $1", int) |
|---|
| 70 |
i = 0 |
|---|
| 71 |
for row1 in cur1.iterate_dict(): |
|---|
| 72 |
assert row1.has_key("f1") and row1.has_key("f2") and row1.has_key("f3") |
|---|
| 73 |
i = i + 1 |
|---|
| 74 |
s1.execute(row1['f1']) |
|---|
| 75 |
for row2 in s1.iterate_dict(): |
|---|
| 76 |
assert row2.has_key("f1") and row2.has_key("f2") and not row2.has_key("f3") |
|---|
| 77 |
|
|---|
| 78 |
print "beginning type checks..." |
|---|
| 79 |
|
|---|
| 80 |
cur1.execute("SELECT $1", 5) |
|---|
| 81 |
assert tuple(cur1.iterate_dict()) == ({"?column?": 5},) |
|---|
| 82 |
|
|---|
| 83 |
cur1.execute("SELECT $1", 22.333) |
|---|
| 84 |
retval = tuple(cur1.iterate_dict()) |
|---|
| 85 |
assert retval == ({"?column?": 22.332999999999998},) |
|---|
| 86 |
|
|---|
| 87 |
cur1.execute("SELECT $1", decimal.Decimal("22.333")) |
|---|
| 88 |
retval = tuple(cur1.iterate_dict()) |
|---|
| 89 |
assert retval == ({"?column?": decimal.Decimal("22.333")},) |
|---|
| 90 |
|
|---|
| 91 |
cur1.execute("SELECT 5000::smallint") |
|---|
| 92 |
assert tuple(cur1.iterate_dict()) == ({"int2": 5000},) |
|---|
| 93 |
|
|---|
| 94 |
cur1.execute("SELECT 5000::integer") |
|---|
| 95 |
assert tuple(cur1.iterate_dict()) == ({"int4": 5000},) |
|---|
| 96 |
|
|---|
| 97 |
cur1.execute("SELECT 50000000000000::bigint") |
|---|
| 98 |
assert tuple(cur1.iterate_dict()) == ({"int8": 50000000000000},) |
|---|
| 99 |
|
|---|
| 100 |
cur1.execute("SELECT 5000.023232::decimal") |
|---|
| 101 |
assert tuple(cur1.iterate_dict()) == ({"numeric": decimal.Decimal("5000.023232")},) |
|---|
| 102 |
|
|---|
| 103 |
cur1.execute("SELECT 1.1::real") |
|---|
| 104 |
assert tuple(cur1.iterate_dict()) == ({"float4": 1.1000000238418579},) |
|---|
| 105 |
|
|---|
| 106 |
cur1.execute("SELECT 1.1::double precision") |
|---|
| 107 |
assert tuple(cur1.iterate_dict()) == ({"float8": 1.1000000000000001},) |
|---|
| 108 |
|
|---|
| 109 |
cur1.execute("SELECT 'hello'::varchar(50)") |
|---|
| 110 |
assert tuple(cur1.iterate_dict()) == ({"varchar": u"hello"},) |
|---|
| 111 |
|
|---|
| 112 |
cur1.execute("SELECT 'hello'::char(20)") |
|---|
| 113 |
assert tuple(cur1.iterate_dict()) == ({"bpchar": u"hello "},) |
|---|
| 114 |
|
|---|
| 115 |
cur1.execute("SELECT 'hello'::text") |
|---|
| 116 |
assert tuple(cur1.iterate_dict()) == ({"text": u"hello"},) |
|---|
| 117 |
|
|---|
| 118 |
#cur1.execute("SELECT 'hell\007o'::bytea") |
|---|
| 119 |
#assert tuple(cur1.iterate_dict()) == ({"bytea": "hello"},) |
|---|
| 120 |
|
|---|
| 121 |
cur1.execute("SELECT '2001-02-03 04:05:06.17'::timestamp") |
|---|
| 122 |
retval = tuple(cur1.iterate_dict()) |
|---|
| 123 |
assert retval == ({'timestamp': datetime.datetime(2001, 2, 3, 4, 5, 6, 170000)},) |
|---|
| 124 |
|
|---|
| 125 |
#cur1.execute("SELECT '2001-02-03 04:05:06.17'::timestamp with time zone") |
|---|
| 126 |
#assert tuple(cur1.iterate_dict()) == ({'timestamp': datetime.datetime(2001, 2, 3, 4, 5, 6, 170000, pg8000.Types.FixedOffsetTz("-07"))},) |
|---|
| 127 |
|
|---|
| 128 |
cur1.execute("SELECT '1 month'::interval") |
|---|
| 129 |
assert tuple(cur1.iterate_dict()) == ({'interval': '1 mon'},) |
|---|
| 130 |
#print repr(tuple(cur1.iterate_dict())) |
|---|
| 131 |
|
|---|
| 132 |
|
|---|
| 133 |
print "tests complete" |
|---|