forked from psycopg/psycopg2
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcopy_from.py
177 lines (137 loc) · 4.69 KB
/
copy_from.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# copy_from.py -- example about copy_from
#
# Copyright (C) 2002 Tom Jenkins <[email protected]>
# Copyright (C) 2005 Federico Di Gregorio <[email protected]>
#
# psycopg2 is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
## Set DSN to the connection string handed to psycopg2.connect(); when the
## script is run with a command-line argument, that argument is used instead.
DSN = 'dbname=test'
## don't modify anything below this line (except for experimenting)
import sys
import os
import StringIO
import psycopg2
# A DSN given as the first command-line argument overrides the default.
if len(sys.argv) > 1:
    DSN = sys.argv[1]

# Single-argument print(...) with %-formatting emits the same text as the
# comma-separated print statement and parses on both Python 2 and 3.
print("Opening connection using dsn: %s" % (DSN,))
conn = psycopg2.connect(DSN)
print("Encoding for this connection is %s" % (conn.encoding,))

# One cursor is shared by every example that follows.
curs = conn.cursor()
# Create the scratch table used by the examples.  If the CREATE fails
# (presumably because the table is left over from an earlier run), roll the
# failed transaction back, drop the table and create it again.
try:
    curs.execute("CREATE TABLE test_copy (fld1 text, fld2 text, fld3 int4)")
except psycopg2.Error:
    # Only database errors are handled here; anything else (for example
    # KeyboardInterrupt) is allowed to propagate.
    conn.rollback()
    curs.execute("DROP TABLE test_copy")
    curs.execute("CREATE TABLE test_copy (fld1 text, fld2 text, fld3 int4)")
conn.commit()
# copy_from with default arguments, from open file
data = ['Tom\tJenkins\t37\n',
        'Madonna\t\\N\t45\n',
        'Federico\tDi Gregorio\t\\N\n']

# The file is only written here, so plain 'w' is the right mode; the `with`
# blocks close the file even if writing or copy_from raises.
with open('copy_from.txt', 'w') as out:
    out.writelines(data)

with open('copy_from.txt', 'r') as src:
    curs.copy_from(src, 'test_copy')
print("1) Copy %d records from file object " % len(data) +
      "using defaults (sep: \\t and null = \\N)")

# Read the rows back to show what was loaded, then empty the table so the
# next example starts clean.
curs.execute("SELECT * FROM test_copy")
rows = curs.fetchall()
print(" Select returned %d rows" % len(rows))
for r in rows:
    print(" %s %s\t%s" % (r[0], r[1], r[2]))
curs.execute("delete from test_copy")
conn.commit()
# copy_from using custom separator, from open file
# The null marker is written as '\\N' (escaped backslash) so the literal
# backslash-N does not depend on how an unrecognised escape is treated.
data = ['Tom:Jenkins:37\n',
        'Madonna:\\N:45\n',
        'Federico:Di Gregorio:\\N\n']

# The file is only written here, so plain 'w' is the right mode; the `with`
# blocks close the file even if writing or copy_from raises.
with open('copy_from.txt', 'w') as out:
    out.writelines(data)

with open('copy_from.txt', 'r') as src:
    curs.copy_from(src, 'test_copy', sep=':')
print("2) Copy %d records from file object using sep = :" % len(data))

# Read the rows back to show what was loaded, then empty the table so the
# next example starts clean.
curs.execute("SELECT * FROM test_copy")
rows = curs.fetchall()
print(" Select returned %d rows" % len(rows))
for r in rows:
    print(" %s %s\t%s" % (r[0], r[1], r[2]))
curs.execute("delete from test_copy")
conn.commit()
# copy_from using custom null identifier, from open file
data = ['Tom\tJenkins\t37\n',
        'Madonna\tNULL\t45\n',
        'Federico\tDi Gregorio\tNULL\n']

# The file is only written here, so plain 'w' is the right mode; the `with`
# blocks close the file even if writing or copy_from raises.
with open('copy_from.txt', 'w') as out:
    out.writelines(data)

with open('copy_from.txt', 'r') as src:
    curs.copy_from(src, 'test_copy', null='NULL')
print("3) Copy %d records from file object using null = NULL" % len(data))

# Read the rows back to show what was loaded, then empty the table so the
# next example starts clean.
curs.execute("SELECT * FROM test_copy")
rows = curs.fetchall()
print(" Select using cursor returned %d rows" % len(rows))
for r in rows:
    print(" %s %s\t%s" % (r[0], r[1], r[2]))
curs.execute("delete from test_copy")
conn.commit()
# copy_from using custom separator and null identifier
data = ['Tom:Jenkins:37\n', 'Madonna:NULL:45\n', 'Federico:Di Gregorio:NULL\n']

# The file is only written here, so plain 'w' is the right mode; the `with`
# blocks close the file even if writing or copy_from raises.
with open('copy_from.txt', 'w') as out:
    out.writelines(data)

with open('copy_from.txt', 'r') as src:
    curs.copy_from(src, 'test_copy', sep=':', null='NULL')
print("4) Copy %d records from file object " % len(data) +
      "using sep = : and null = NULL")

# Read the rows back to show what was loaded, then empty the table so the
# next example starts clean.
curs.execute("SELECT * FROM test_copy")
rows = curs.fetchall()
print(" Select using cursor returned %d rows" % len(rows))
for r in rows:
    print(" %s %s\t%s" % (r[0], r[1], r[2]))
curs.execute("delete from test_copy")
conn.commit()
# anything can be used as a file if it has .read() and .readline() methods
# The null marker is written as '\\N' (escaped backslash) so the literal
# backslash-N does not depend on how an unrecognised escape is treated.
data = StringIO.StringIO()
data.write('\n'.join(['Tom\tJenkins\t37',
                      'Madonna\t\\N\t45',
                      'Federico\tDi Gregorio\t\\N']))
# Rewind so copy_from reads the buffer from its beginning.
data.seek(0)

curs.copy_from(data, 'test_copy')
print("5) Copy 3 records from StringIO object using defaults")

# Read the rows back to show what was loaded, then empty the table so the
# next example starts clean.
curs.execute("SELECT * FROM test_copy")
rows = curs.fetchall()
print(" Select using cursor returned %d rows" % len(rows))
for r in rows:
    print(" %s %s\t%s" % (r[0], r[1], r[2]))
curs.execute("delete from test_copy")
conn.commit()
# simple error test
print("6) About to raise an error")
# The last record carries 'aaa' in the third (int4) column, which is what is
# expected to make the copy fail.
data = StringIO.StringIO()
data.write('\n'.join(['Tom\tJenkins\t37',
                      'Madonna\t\\N\t45',
                      'Federico\tDi Gregorio\taaa']))
# Rewind so copy_from reads the buffer from its beginning.
data.seek(0)

try:
    curs.copy_from(data, 'test_copy')
except psycopg2.Error as err:
    # Only database errors count as the "expected" failure; roll back so the
    # connection can be used again.
    conn.rollback()
    print(" Caught error (as expected):\n%s" % (err,))
# Final cleanup: discard whatever transaction is still open, drop the scratch
# table, delete the scratch data file, and commit the drop.
conn.rollback()
curs.execute("DROP TABLE test_copy")
os.remove('copy_from.txt')
conn.commit()