Sample requests for psycopg2

An example of a wrapper class built on psycopg2 for PostgreSQL, plus some examples that do not use a class.

These are very old notes of mine, but I decided to publish them, perhaps someday they will be needed.

Raw queries through psycopg2 are faster than the same queries through SQLAlchemy (which is a layer on top of psycopg2). So where speed matters, it is logical to use pure psycopg2.

That leaves 2 options: write out the full psycopg2 syntax every time, or write a class of your own for quick use of psycopg2. Below is an example of such a class. It can look different in each project.

An example of a class on psycopg2

import psycopg2
from psycopg2.extras import NamedTupleCursor

class DbPsycopg:
    """Thin convenience wrapper around a single psycopg2 connection.

    The connection is opened in ``__init__`` and switched to autocommit.
    Errors raised while running a query are printed and swallowed, and the
    public methods then return ``None``.
    """

    def __init__(self):
        self.conn = psycopg2.connect(
            # dbname alone is sufficient for localhost
            dbname='ploshadka',

            # Example of a remote connection:
            # user="ploshadka",
            # password='ploshadka.net',
            # host="name_of_bd.cixfs.us-east-2.rds.amazonaws.com"
        )
        self.conn.autocommit = True

    def fetch_one(self, query, arg=None, factory=None, clean=None):
        """Get only one SINGLE value (not a row!) from a table.

        :param query: Query
        :param arg: Variables
        :param factory: 'dic' (returns a dictionary - key / value) or
            'list' (returns a list-like row); anything else gives a tuple
        :param clean: Pass 'no' to get the fetched row back untouched.
            With any other value (the default) only the first column of
            the row is returned, which requires a row that can be indexed
            by position.
        :return: The value (or the row), or None when the query returned
            no row or an error occurred
        """
        try:
            # Leaving the ``with`` block closes the cursor.
            with self.__connection(factory) as cur:
                self.__execute(cur, query, arg)
                return self.__fetch(cur, clean)
        except Exception as error:  # psycopg2.Error is a subclass of Exception
            return self.__error(error)

    def fetch_all(self, query, arg=None, factory=None):
        """Get multiple rows from a table.

        :param query: Query
        :param arg: Variables
        :param factory: 'dic' (returns a dictionary - key / value) or
            'list' (returns a list-like row); anything else gives tuples
        :return: Whatever ``fetchall()`` returns, or None if an error occurred
        """
        try:
            with self.__connection(factory) as cur:
                self.__execute(cur, query, arg)
                return cur.fetchall()
        except Exception as error:
            return self.__error(error)

    def query_update(self, query, arg, message=None):
        """Run a data-changing statement and return a success message.

        :param query: Query
        :param arg: Variables
        :param message: Value handed back to the caller on success
        :return: ``message`` on success, None if an error occurred
        """
        try:
            with self.conn.cursor() as cur:
                cur.execute(query, arg)
            return message
        except Exception as error:
            return self.__error(error)

    def close(self):
        """Close the database connection."""
        self.conn.close()

    def __connection(self, factory=None):
        """Create a cursor whose row type is selected by ``factory``."""
        # 'dic' - rows come back as dictionaries (key / value)
        if factory == 'dic':
            return self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)

        # 'list' - rows come back list-like (although the class is called DictCursor)
        if factory == 'list':
            return self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)

        # Default - rows come back as plain tuples
        return self.conn.cursor()

    @staticmethod
    def __execute(cur, query, arg=None):
        """Execute ``query``; an empty or missing ``arg`` means "no parameters"."""
        # The 'execute' method always returns None, so there is nothing to return.
        cur.execute(query, arg or None)

    @staticmethod
    def __fetch(cur, clean):
        """Fetch one row and return it whole (clean == 'no') or only its first column."""
        row = cur.fetchone()

        # clean == 'no' returns the row as fetched, e.g.
        #   name: ('Royal Caribbean Cruises',)
        #   date: (datetime.datetime(2020, 6, 2, 13, 36, 35, 61052, tzinfo=...),)
        # No row at all is reported as None instead of failing on row[0].
        if clean == 'no' or row is None:
            return row

        # Otherwise only the bare value is returned, e.g.
        #   name: Royal Caribbean Cruises
        #   date: 2020-06-02 13:36:35.061052+00:00
        return row[0]

    @staticmethod
    def __error(error):
        """Print the error and return None."""
        print('There is no data in the database or an error: {}'.format(error))
        return None

Examples of using the psycopg2 class

We initialize the class in the main file __init__.py:

# Shared instance used for direct (raw SQL) requests
db_psycopg = DbPsycopg()

Then, where we will use the class, we import:

from app import db_psycopg

fetch_one

Function:

def __get_time_from_last_insert_row():
    """Get the date of the most recent entry in the operations table.

    :return: The ``date`` value of the newest row, as returned by
        ``db_psycopg.fetch_one``
    """
    query = 'SELECT date FROM operations ORDER BY date DESC limit 1'
    return db_psycopg.fetch_one(query)

Returns:

2020-10-29 21:31:15.934801+03:00

Same query for SQLAlchemy:

# Newest operation date: order by date descending and take the single top value
db.session.query(Operation.date).order_by(db.desc(Operation.date)).limit(1).scalar()

query_update

An example of updating data:

def __save_operations(operations, user_id):
    """Add operations to the DB, skipping rows that conflict with existing ones.

    :param operations: Iterable of mappings with the keys 'name', 'price'
        and 'date'
    :param user_id: Value stored in the ``user_id`` column of every row
    """
    # The statement is the same for every operation, so build it once.
    query = """
        INSERT INTO operations (user_id, name, price, date)
        VALUES (%s, %s, %s, %s)
        ON CONFLICT DO NOTHING
        ;"""

    for operation in operations:
        db_psycopg.query_update(query, (user_id,
                                        operation['name'],
                                        operation['price'],
                                        operation['date']))

If, on a conflict, you need to overwrite the existing row with the new data, replace DO NOTHING with DO UPDATE SET … (an example is shown in the "psycopg2 without class" section below).

It may also look like this:

def update_price(self, figi, buy, sell):
    """Save the last buy and sell price of the stock.

    :param figi: Identifier matched against the ``figi`` column
    :param buy: New value for ``price_last_buy``
    :param sell: New value for ``price_last_sell``
    """
    query = 'UPDATE portfolio SET price_last_buy = %s, price_last_sell = %s WHERE figi = %s'
    # Parameter order follows the placeholders: buy, sell, then figi.
    self.db.query_update(query, (buy, sell, figi), 'Price has been updated')

We use the same update method, but for deletion:

def clear_portfolio_table(self, name):
    """Remove the disappeared stocks from the portfolio.

    :param name: Iterable of stock names; one DELETE is issued per name
    """
    # The statement is the same for every name, so build it once.
    query = 'DELETE FROM portfolio WHERE name = %s;'

    for i in name:
        # (i,) - the parameters must be a sequence even for a single value
        self.db.query_update(query, (i,), 'Stock {} removed from portfolio'.format(i))

SQLAlchemy example

Add new operation to SQLAlchemy:

def __save_operations(operations, user_id):
    """Add operations through SQLAlchemy, skipping ids that already exist.

    :param operations: Iterable of mappings with the keys 'id', 'price'
        and 'date'
    :param user_id: Value stored in ``user_id`` of every new row
    """
    for operation in operations:
        # Look the id up first so an existing operation is not inserted twice.
        exist = db.session.query(Operation.id).filter_by(id=operation['id']).first()

        if not exist:
            new = Operation(
                user_id=user_id,
                id=operation['id'],
                price=operation['price'],
                date=operation['date']
            )
            db.session.add(new)
            db.session.commit()

psycopg2 without class

Code for updating data in the database

def update_portfolio(self, portfolio):
    """Insert every position into the portfolio table, updating rows whose figi already exists.

    :param portfolio: Iterable of positions; each one is read through
        ``figi``, ``name``, ``lots``, ``average_position_price.value``,
        ``average_position_price.currency`` and ``expected_yield.value``
    :return: A success message, or a failure message with the error text
    """
    # The statement is the same for every position, so build it once.
    query = """
        INSERT INTO portfolio (figi, name, average_price, currency, lots, expected_yield)
        VALUES (%s, %s, %s, %s, %s, %s)
        ON CONFLICT (figi)
        DO UPDATE SET
            (average_price, lots, expected_yield, updated_at)
            = (EXCLUDED.average_price, EXCLUDED.lots, EXCLUDED.expected_yield, EXCLUDED.updated_at)
        ;"""

    try:
        with self.connect.cursor() as cursor:
            self.connect.autocommit = True

            for share in portfolio:
                cursor.execute(query, (share.figi,
                                       share.name,
                                       share.average_position_price.value,
                                       share.average_position_price.currency,
                                       share.lots,
                                       share.expected_yield.value))

        return 'The last portfolio was saved in the portfolio table'

    except Exception as error:  # psycopg2.Error is a subclass of Exception
        return 'Failed inserting record into portfolio table from Api {}'.format(error)

Code for inserting data into the database via psycopg2

def add_shares(self, dic):
    """Insert operations into the shares table, skipping rows that conflict.

    :param dic: Iterable of mappings; each must contain the keys listed
        in ``columns`` below
    """
    columns = (
        'id_tinkoff',
        'figi',
        'name',
        'currency',
        'price',
        'quantity',
        'payment',
        'commission',
        'operation_type',
        'date',
    )
    # One parameterized statement, built once; the driver binds the values
    # of every row, so no SQL has to be composed per item.
    query = 'INSERT INTO shares ({}) VALUES ({}) ON CONFLICT DO NOTHING'.format(
        ', '.join(columns),
        ', '.join(['%s'] * len(columns)),
    )

    with self.connect.cursor() as cursor:
        self.connect.autocommit = True

        for item in dic:
            cursor.execute(query, tuple(item[column] for column in columns))

An example of psycopg2 initialization in one file

import psycopg2
from psycopg2 import sql

class PostgreSQL:
    """Single-file skeleton: one psycopg2 connection plus the methods that use it."""

    def __init__(self, dbname=None, user=None, password=None, host=None):
        """Open the connection and switch it to autocommit.

        :param dbname: Database name
        :param user: User name
        :param password: Password
        :param host: Server host
        """
        # NOTE(review): relies on psycopg2 skipping keyword arguments whose
        # value is None - confirm against the installed version.
        self.connect = psycopg2.connect(
            dbname=dbname,
            user=user,
            password=password,
            host=host,
        )
        # Set once here instead of repeating it in every method.
        self.connect.autocommit = True

    def some_function(self, dic):
        """Template for a method that runs a query on the shared connection."""
        with self.connect.cursor() as cursor:
            # Here goes the code of the query to the database,
            # e.g. cursor.execute(query, params)
            pass

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *