I've hit a bottleneck with data ingestion and would appreciate some senior advice.
I have an API from which I receive financial data that looks like this: `GBPUSD 2020-01-01 00:00:01.001 1.30256 1.30250`. My goal is to write this data into the database as fast as possible.
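For clarity, a raw tick like the one above splits into its fields with plain string handling (a minimal sketch; the variable names are my own, not part of the real code):

```python
# Split a raw tick string into its fields; everything stays a string,
# matching the format described above.
raw = "GBPUSD 2020-01-01 00:00:01.001 1.30256 1.30250"
symbol, date, time, bid, ask = raw.split(" ")
timestamp = f"{date} {time}"
print(symbol, timestamp, bid, ask)
```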
Inputs:
- Python 3.8
- PostgreSQL 12
- Redis Queue (Linux)
- SQLAlchemy
The incoming data, as shown above, arrives in one dictionary: `{symbol: {datetime: (price1, price2)}}`. All of the data comes in as strings.
The API streams 29 symbols, so I can receive anywhere from 30 to 60+ values across different symbols in a single second.
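A minimal sketch of that nested structure and how the latest value for a symbol is pulled out (the sample values are made up):

```python
# Nested dict: symbol -> timestamp string -> (bid, ask), all strings.
data_dict = {
    "GBPUSD": {
        "2020-01-01 00:00:01.001": ("1.30256", "1.30250"),
        "2020-01-01 00:00:01.250": ("1.30257", "1.30251"),
    }
}

symbol = "GBPUSD"
# ISO-formatted timestamp strings sort chronologically, so max() gives the latest key.
last_value = max(data_dict[symbol])
bid, ask = data_dict[symbol][last_value]
```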
How it works now:
- I receive a new value from the API;
- Every new value for each symbol is stored in a single dict variable, `data_dict`;
- Next I look that dictionary up by symbol key and last value, and send the data to the Redis Queue: `data_dict[symbol][last_value].enqueue(save_record, args=(datetime, price1, price2))`. Up to this point everything works fine and fast.
- When the job reaches the Redis worker, this `save_record` function runs:
"
def save_record(Datetime, price1, price2, Instr, adf): # Parameters #---------- # Datetime : 'string' : Datetime value # price1 : 'string' : Bid Value # price2 : 'string' : Ask Value # Instr : 'string' : symbol to save # adf : 'string' : Cred to DataBase engine #------- # result : : Execute save command to database engine = create_engine(adf) meta = MetaData(bind=engine,reflect=True) table_obj = Table(Instr,meta) insert_state = table_obj.insert().values(Datetime=Datetime,price1=price1,price2=price2) with engine.connect() as conn: conn.execute(insert_state) When i`m execute last row of function, it takes from 0.5 to 1 second to write those row into the database:
```
12:49:23 default: DT.save_record('2020-00-00 00:00:01.414538', 1.33085, 1.33107, 'USDCAD', 'postgresql cred') (job_id_1)
12:49:24 default: Job OK (job_id_1)
12:49:24 default: DT.save_record('2020-00-00 00:00:01.422541', 1.56182, 1.56213, 'EURCAD', 'postgresql cred') (job_id_2)
12:49:25 default: Job OK (job_id_2)
```
The queued jobs that insert each row directly into the database are the bottleneck: I can insert only 1-2 values per second while receiving over 60 values per second. If I keep this running, it builds a huge queue (the maximum I got was 17,000 records in the queue after 1 hour of listening to the API), and the queue keeps growing.
I'm currently using only 1 queue and 17 workers, which pushes my PC's CPU to 100%.
So the question is how to optimize this process and avoid building a huge queue. Maybe buffer a sequence of records (e.g. as JSON) and then insert them into the DB in one go, or store incoming data in separate variables?
Sorry if something is unclear; ask and I'll answer.
--UPD-- Here's a short review of my experiments:
- Move `engine` and `meta` out of the function: due to my architecture, the API application runs on Windows 10 and the Redis Queue runs on Linux. There was an issue with moving `meta` and `engine` out of the function: it raises a TypeError (this does not depend on the OS); there is a little info about it here.
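One way around that kind of TypeError, sketched under the assumption that the worker process can build its own engine at module import time (so only plain strings ever cross the queue and nothing unpicklable is enqueued). The sqlite URL and the explicit table definition below are stand-ins for the real PostgreSQL credentials and the already-existing per-symbol tables:

```python
from sqlalchemy import create_engine, MetaData, Table, Column, String

# Created once per worker process at import time, so every job reuses the
# engine's connection pool instead of paying create_engine() + reflection
# per row. "sqlite://" is illustrative; substitute the PostgreSQL URL.
engine = create_engine("sqlite://")
meta = MetaData()

# Illustrative table; in the real setup the per-symbol tables already exist
# in PostgreSQL and would be reflected once, not defined here.
ticks = Table(
    "USDCAD", meta,
    Column("Datetime", String),
    Column("price1", String),
    Column("price2", String),
)
meta.create_all(engine)

def save_record(dt, price1, price2, instr):
    # Only string arguments arrive through the queue; the engine is module-level.
    table_obj = meta.tables[instr]
    with engine.begin() as conn:  # begin() commits on exit
        conn.execute(table_obj.insert().values(Datetime=dt, price1=price1, price2=price2))

save_record("2020-01-01 00:00:01.414538", "1.33085", "1.33107", "USDCAD")
```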
- Insert multiple rows in a batch: this approach seemed to be the simplest and easiest, and it is! Basically, I just created a dictionary, `data_dict = {'data_pack': []}`, to start storing incoming values there. Then I check whether more than 20 values per symbol have already been written; if so, I send that batch to the Redis Queue, and it takes about 1.5 seconds to write it to the database. Then I delete the taken records from `data_dict`, and the process continues. So thanks Mike Organek for the good advice.
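The batching idea can be sketched like this (the batch size, sqlite URL, table, and helper names are illustrative; in the real setup the batch would be enqueued to the Redis worker, which then performs the single multi-row insert):

```python
from sqlalchemy import create_engine, MetaData, Table, Column, String

# Illustrative stand-ins for the real PostgreSQL setup.
engine = create_engine("sqlite://")
meta = MetaData()
ticks = Table(
    "EURCAD", meta,
    Column("Datetime", String),
    Column("price1", String),
    Column("price2", String),
)
meta.create_all(engine)

BATCH_SIZE = 20            # threshold from the update above; tune as needed
buffer = {"EURCAD": []}    # per-symbol buffers of pending rows

def on_tick(symbol, dt, bid, ask):
    buffer[symbol].append({"Datetime": dt, "price1": bid, "price2": ask})
    if len(buffer[symbol]) >= BATCH_SIZE:
        flush(symbol)

def flush(symbol):
    # Swap out the pending rows, then write them in one executemany-style
    # INSERT: one round trip for the whole batch instead of one per row.
    rows, buffer[symbol] = buffer[symbol], []
    if rows:
        with engine.begin() as conn:
            conn.execute(meta.tables[symbol].insert(), rows)

for i in range(25):
    on_tick("EURCAD", f"2020-01-01 00:00:{i:02d}", "1.56182", "1.56213")
flush("EURCAD")  # flush the remainder, e.g. at shutdown
```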
This approach is quite enough for my purposes, and at the same time I can say that this tech stack provides really good flexibility!
Doesn't `engine.connect()` create a new connection to the database for each row you want to insert?