Sample requests for psycopg2
An example of class implementation on psycopg2 for PostgreSQL… And also some examples without classes.
These are very old notes of mine, but I decided to publish them, perhaps someday they will be needed.
Net queries through psycopg2 faster than SqlAlchemy (add-on over psycopg2). And where speed is needed, it is logical to use pure psycopg2.
Then there are 2 options – write the full syntax each time or make a class for your own for quick use psycopg2… Below is an example of such a class. It can look different in each project.
An example of a class on psycopg2
from psycopg2.extras import NamedTupleCursor
class DbPsycopg:
def __init__(self):
self…conn = psycopg2.connect(
# Dbname is sufficient for localhost
dbname=‘ploshadka’,
# example of a remote connection
# user = “ploshadka”,
# password = ‘ploshadka.net’,
# host = “name_of_bd.cixfs.us-east-2.rds.amazonaws.com”
)
self…conn…autocommit = True
def fetch_one(self, query, arg=None, factory=None, clean=None):
“” “Get only one SINGLE value (not a row!) From a table
: param query: Query
: param arg: Variables
: param factory: dic (returns a dictionary – key / value) or list (returns list)
: param clean: With a parameter will only return the value. Without a parameter, will return the value in a tuple.
“” “
try:
cur = self.__ connection(factory)
self.__ execute(cur, query, arg)
return self.__ fetch(cur, clean)
except (Exception, psycopg2.Error) as error:
self.__ error(error)
def fetch_all(self, query, arg=None, factory=None):
“” “Get multiple data from a table
: param query: Query
: param arg: Variables
: param factory: dic (returns a dictionary – key / value) or list (returns list)
“” “
try:
cur = self.__ connection(factory)
self.__ execute(cur, query, arg)
return cur.fetchall()
except (Exception, psycopg2.Error) as error:
self.__ error(error)
def query_update(self, query, arg, message=None):
“” “Updates the data in the table and returns a success message” “”
try:
cur = self…conn…cursor()
cur.execute(query, arg)
return message
except (Exception, psycopg2.Error) as error:
self.__ error(error)
def close(self):
cur = self…conn…cursor()
cur.close()
self…conn…close()
def __connection(self, factory=None):
# Dic – returns a dictionary – key / value
if factory == ‘dic’:
cur = self…conn…cursor(cursor_factory=psycopg2.extras…RealDictCursor)
# List – returns list (although called DictCursor)
elif factory == ‘list’:
cur = self…conn…cursor(cursor_factory=psycopg2.extras…DictCursor)
# Tuple
else:
cur = self…conn…cursor()
return cur
@staticmethod
def __execute(cur, query, arg=None):
# The ‘execute’ method always returns None
if arg:
cur.execute(query, arg)
else:
cur.execute(query)
@staticmethod
def __fetch(cur, clean):
# If the request was successful, fetch the data with ‘fetchone’
if clean == ‘no’:
# Returns:
# Name:
# (‘Royal Caribbean Cruises’,)
# Date:
# (datetime.datetime (2020, 6, 2, 13, 36, 35, 61052, tzinfo = psycopg2.tz.FixedOffsetTimezone (offset = 0, name = None)),)
fetch = cur.fetchone()
else:
# Returns:
# Name:
# Royal Caribbean Cruises
# Date:
# (da2020-06-02 13: 36: 35.061052 + 00: 00
fetch = cur.fetchone()[0]
return fetch
@staticmethod
def __error(error):
# Including, if there is no data in the database, there will be an error at the fetchone stage
print(‘There is no data in the database or an error: {}’…format(error))
return None
Examples of using the psycopg2 class
We initialize the class in the main file __init__.py:
# For direct requests (hardcore)
db_psycopg = DbPsycopg()
Then, where we will use the class, we import:
from app import db_psycopg
fetch_one
Function:
def __get_time_from_last_insert_row():
“” “Get the time since the last entry in the table” “”
query = ‘SELECT date FROM operations ORDER BY date DESC limit 1’
return db_psycopg.fetch_one(query)
Receives:
2020-10-29 21: 31: 15.934801 + 03: 00
Same query for SQLAlchemy:
db.session…query(Operation.date)…order_by(db.desc(Operation.date))…limit(1)…scalar()
query_update
An example of updating data:
“” “Add operation to DB” “”
for operation in operations:
query = “” “
INSERT INTO operations (user_id, name, price, date)
VALUES (% s,% s,% s,% s)
ON CONFLICT DO NOTHING
; “” “
db_psycopg.query_update(query, (user_id,
operation[‘name’],
operation[‘price’],
operation[‘date’]))
If you need to update the data in case of a conflict with new ones, then instead of NOTHING put UPDATE…
It may also look like this:
def update_price(self, figi, buy, sell):
“” “Saves the last buy and sell price of the stock” “”
query = ‘UPDATE portfolio SET price_last_buy =% s, price_last_sell =% s WHERE figi =% s’
response = self…db…query_update(query, (buy, sell, figi), ‘Price has been updated’)
We use the same update method, but for deletion:
def clear_portfolio_table(self, name):
# Removes the disappeared stock from the portfolio
for i in name:
query = “” “
DELETE FROM portfolio WHERE name =% s;
“” “
response = self…db…query_update(query, (i,), ‘Stock ‘ + i + ‘removed from portfolio’)
SQLAlchemy example
Add new operation to SQLAlchemy:
for operation in operations:
exist = db.session…query(Operation.id)…filter_by(id=operation[‘id’])…first()
if not exist:
new = Operation(
user_id=user_id,
id=operation[‘id’],
price=operation[‘price’],
date=operation[‘date’]
)
db.session…add(new)
db.session…commit()
psycopg2 without class
Code for updating data in the database
try:
with self…connect…cursor() as cursor:
self…connect…autocommit = True
for share in portfolio:
query = “” “
INSERT INTO portfolio (figi, name, average_price, currency, lots, expected_yield)
VALUES (% s,% s,% s,% s,% s,% s)
ON CONFLICT (figi)
DO UPDATE SET
(average_price, lots, expected_yield, updated_at)
= (EXCLUDED.average_price, EXCLUDED.lots, EXCLUDED.expected_yield, EXCLUDED.updated_at)
; “” “
cursor.execute(query, (share.figi,
share.name,
share.average_position_price…value,
share.average_position_price…currency,
share.lots,
share.expected_yield…value))
return ‘The last portfolio was saved in the portfolio table’
except (Exception, psycopg2.Error) as error:
return ‘Failed inserting record into portfolio table from Api {}’…format(error)
Code for inserting data into the database via psycopg2
with self…connect…cursor() as cursor:
self…connect…autocommit = True
for item in dic:
values = [(
item[‘id_tinkoff’],
item[‘figi’],
item[‘name’],
item[‘currency’],
item[‘price’],
item[‘quantity’],
item[‘payment’],
item[‘commission’],
item[‘operation_type’],
item[‘date’]
)]
query = sql.SQL(
‘INSERT INTO shares (id_tinkoff, figi, name, currency, price, quantity, payment, commission, operation_type, date)’
‘VALUES {} ON CONFLICT DO NOTHING’)…format(sql.SQL(‘,’)…join(map(sql.Literal, values)))
cursor.execute(query)
An example of psycopg2 initialization in one file
from psycopg2 import sql
class PostgreSQL:
def __init__(self):
self…connect = psycopg2.connect(
dbname=”,
user=”,
password=”,
host=”
)
def some_function(self, dic):
with self…connect…cursor() as cursor:
self…connect…autocommit = True
# Here further is the code of the query in the database