
I've hit a bottleneck with data ingestion and would appreciate some senior advice.

I have an API from which I receive financial data that looks like this: GBPUSD 2020-01-01 00:00:01.001 1.30256 1.30250. My goal is to write this data directly into a database as fast as possible.

Inputs:

  • Python 3.8
  • PostgreSQL 12
  • Redis Queue (Linux)
  • SQLAlchemy

The incoming data, as shown above, arrives in a single dictionary: {symbol: {datetime: (price1, price2)}}. All values arrive as strings.

The API streams 29 symbols, so I can receive, for example, 30 to 60+ values for different symbols in a single second.
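As an illustration of the structure described above, here is a minimal sketch that flattens one incoming message into typed rows. The function name `flatten_ticks` and the timestamp format string are assumptions based on the sample line in the question, not part of the original code.

```python
from datetime import datetime
from decimal import Decimal


def flatten_ticks(message):
    """Turn {symbol: {datetime: (price1, price2)}} of strings into typed tuples."""
    rows = []
    for symbol, ticks in message.items():
        for stamp, (price1, price2) in ticks.items():
            rows.append((
                symbol,
                # Assumed format, taken from the sample "2020-01-01 00:00:01.001"
                datetime.strptime(stamp, "%Y-%m-%d %H:%M:%S.%f"),
                Decimal(price1),  # Decimal keeps the quoted digits exactly
                Decimal(price2),
            ))
    return rows


sample = {"GBPUSD": {"2020-01-01 00:00:01.001": ("1.30256", "1.30250")}}
rows = flatten_ticks(sample)
```

Converting once at the edge means every later step (batching, inserting) works with one flat row shape that already carries the symbol.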

How it works now:

  1. I receive a new value in a dictionary;
  2. All new values for each symbol are stored, as they arrive, in one dict variable, data_dict;
  3. Next I look up that dictionary by symbol key and last value, and send the data to Redis Queue: data_dict[symbol][last_value].enqueue(save_record, args=(datetime, price1, price2)). Up to this point everything works fine and fast.
  4. When the job reaches the Redis worker, it runs the save_record function:

    from sqlalchemy import MetaData, Table, create_engine

    def save_record(Datetime, price1, price2, Instr, adf):
        # Parameters
        # ----------
        # Datetime : 'string' : Datetime value
        # price1   : 'string' : Bid Value
        # price2   : 'string' : Ask Value
        # Instr    : 'string' : symbol to save
        # adf      : 'string' : Cred to DataBase engine
        # -------
        # result : Execute save command to database
        engine = create_engine(adf)
        meta = MetaData(bind=engine, reflect=True)
        table_obj = Table(Instr, meta)
        insert_state = table_obj.insert().values(Datetime=Datetime, price1=price1, price2=price2)
        with engine.connect() as conn:
            conn.execute(insert_state)

Executing the last line of the function takes 0.5 to 1 second to write that row into the database:

12:49:23 default: DT.save_record('2020-00-00 00:00:01.414538', 1.33085, 1.33107, 'USDCAD', 'postgresql cred') (job_id_1)

12:49:24 default: Job OK (job_id_1)

12:49:24 default: DT.save_record('2020-00-00 00:00:01.422541', 1.56182, 1.56213, 'EURCAD', 'postgresql cred') (job_id_2)

12:49:25 default: Job OK (job_id_2)

The queued jobs that insert each row directly into the database are the bottleneck: I can insert only 1-2 values per second, while I can receive over 60 values per second. If I run it this way, a huge queue builds up (the maximum I saw was 17,000 records in the queue after 1 hour of listening to the API), and it keeps growing.

I'm currently using only 1 queue and 17 workers. This makes my PC's CPU run at 100%.

So the question is how to optimize this process and avoid building up a huge queue. Maybe I should save some sequence, for example as JSON, and then insert it into the DB, or store incoming data in separate variables.

Sorry if anything is unclear; ask and I'll answer.

--UPD-- Here is a short review of some experiments:

  1. Move engine and meta out of the function

Because of my architecture, the API application runs on Windows 10 and Redis Queue runs on Linux. Moving meta and engine out of the function caused a problem: it raises a TypeError (this does not depend on the OS), a little info about it here

  2. Insert multiple rows in a batch: This approach seemed to be the simplest and easiest, and so it is! Basically, I just created a dictionary, data_dict = {'data_pack': []}, to start storing incoming values there. Then I check whether more than 20 values per symbol have already been written; if so, I send that batch to Redis Queue, and it takes 1.5 seconds to write it to the database. Then I delete the sent records from data_dict, and the process continues. So thanks to Mike Organek for the good advice.
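The batching step can be sketched roughly as follows. This is not the poster's code: the class `TickBuffer`, its `flush` callback, and the exact threshold handling (flush at 20 rows per symbol) are assumptions made for illustration. In the real application the callback would enqueue one job per batch instead of appending to a list.

```python
from collections import defaultdict

BATCH_SIZE = 20  # threshold mentioned in the update; tune as needed


class TickBuffer:
    """Collect rows per symbol and hand over a full batch at once."""

    def __init__(self, flush, batch_size=BATCH_SIZE):
        self._flush = flush              # called as flush(symbol, rows)
        self._batch_size = batch_size
        self._pending = defaultdict(list)

    def add(self, symbol, stamp, price1, price2):
        batch = self._pending[symbol]
        batch.append((stamp, price1, price2))
        if len(batch) >= self._batch_size:
            # Remove the full batch from the buffer before handing it over,
            # so the same rows are never sent twice.
            self._flush(symbol, self._pending.pop(symbol))


sent = []  # stand-in for the queue; records every flushed batch
buffer = TickBuffer(flush=lambda symbol, rows: sent.append((symbol, rows)))
for i in range(45):
    buffer.add("GBPUSD", f"2020-01-01 00:00:{i:02d}.000", "1.30256", "1.30250")
```

With 45 incoming rows this produces two batches of 20 and leaves 5 rows waiting, so a real implementation would probably also want a time-based flush so that quiet symbols are not held back indefinitely.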

This approach is quite enough for my purposes; at the same time, I can say that this tech stack gives you really good flexibility!

5 Comments
  • Are you required to use SQLAlchemy? If so, I cannot help you. Does the call to engine.connect() create a new connection to the database for each row you want to insert? Commented Sep 22, 2020 at 11:03
  • No, I'm not required to use SQLAlchemy, and yes, it creates a new connection every time to insert each row into the database. That approach was simply the first one that worked, so nothing in it is essential; I'm free to choose any module or library. Commented Sep 22, 2020 at 11:05
  • The first place to look is the connection for each row you insert. Establishing the connection has high overhead, and you can change this and continue to use SQLAlchemy. If that does not speed it up enough, then I am not sure whether the following steps can be done using SQLAlchemy. The second place is performing the inserts only one row at a time. You should insert multiple rows in batch. The third place is in using a prepared statement to eliminate query parsing overhead. Please comment if you have questions on any of this. Commented Sep 22, 2020 at 11:25
  • Thanks, I agree with each step. I will try them one by one and see; either way, I'll report back on the result! Commented Sep 22, 2020 at 12:31
  • @MikeOrganek added UPD section, thanks! Commented Sep 23, 2020 at 11:03

1 Answer

Every time you call save_record you re-create the engine and (reflected) meta objects, both of which are expensive operations. Running your sample code as-is gave me a throughput of

20 rows inserted in 4.9 seconds

Simply moving the engine = and meta = statements outside of the save_record function (and thereby only calling them once) improved throughput to

20 rows inserted in 0.3 seconds

Additional note: It appears that you are storing the values for each symbol in a separate table, i.e. 'GBPUSD' data in a table named GBPUSD, 'EURCAD' data in a table named EURCAD, etc. That is a "red flag" suggesting bad database design. You should be storing all of the data in a single table with a column for the symbol.
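A minimal sketch of the single-table layout, combined with one multi-row insert, might look like this. It uses the standard-library sqlite3 module purely as a self-contained stand-in for PostgreSQL; the table name `ticks`, the column names, and the sample rows are illustrative assumptions, not the poster's schema.

```python
import sqlite3

# In-memory SQLite stands in for the PostgreSQL database in this sketch.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE ticks (
        symbol TEXT NOT NULL,   -- e.g. 'USDCAD', replaces one table per symbol
        ts     TEXT NOT NULL,   -- timestamp of the quote
        bid    REAL NOT NULL,
        ask    REAL NOT NULL
    )
    """
)
# Most queries filter by symbol and time range, so index those columns.
conn.execute("CREATE INDEX ticks_symbol_ts ON ticks (symbol, ts)")

batch = [
    ("USDCAD", "2020-01-01 00:00:01.414538", 1.33085, 1.33107),
    ("EURCAD", "2020-01-01 00:00:01.422541", 1.56182, 1.56213),
]
with conn:  # one transaction for the whole batch
    conn.executemany("INSERT INTO ticks VALUES (?, ?, ?, ?)", batch)

usdcad = conn.execute(
    "SELECT bid, ask FROM ticks WHERE symbol = ?", ("USDCAD",)
).fetchall()
total = conn.execute("SELECT COUNT(*) FROM ticks").fetchone()[0]
```

With a single table there is nothing to reflect per symbol, and rows for different symbols can share the same batch and the same transaction.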


3 Comments

Great point, thanks. There is an issue with creating engine and meta outside of the function, but I will look for an approach.
Can you please share your approach? How did you call it outside? When I try to do that I get TypeError: cannot pickle '_thread._local' object
@Artem - I literally just moved the engine = and meta = statements just above the save_record function. I've never used redis so I can't help you there, and I don't do threading a lot because it usually isn't all that helpful for ETL jobs.
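One plausible reading of that TypeError, offered here as an assumption rather than a confirmed diagnosis, is that an object holding thread-local state ended up among the data the queue has to pickle for the job. The sketch below reproduces the same kind of error with a plain `threading.local()` and shows one way around it: pass only a picklable string to the job and create the expensive object lazily, once per worker process. sqlite3 is used as a stand-in for the real database, and `get_connection` and the `ticks` table are hypothetical names.

```python
import pickle
import sqlite3
import threading
from functools import lru_cache

# Objects with thread-local state cannot be pickled, so they cannot travel
# as job arguments.
try:
    pickle.dumps(threading.local())
    pickle_error = None
except TypeError as exc:
    pickle_error = str(exc)


@lru_cache(maxsize=None)
def get_connection(dsn):
    """Create the connection on first use, then reuse it in this process."""
    conn = sqlite3.connect(dsn)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS ticks (symbol TEXT, ts TEXT, bid REAL, ask REAL)"
    )
    return conn


def save_record(stamp, bid, ask, symbol, dsn):
    # Only plain strings and numbers are passed in; the connection is looked up here.
    conn = get_connection(dsn)
    with conn:
        conn.execute("INSERT INTO ticks VALUES (?, ?, ?, ?)", (symbol, stamp, bid, ask))


save_record("2020-01-01 00:00:01.414538", 1.33085, 1.33107, "USDCAD", ":memory:")
save_record("2020-01-01 00:00:01.422541", 1.56182, 1.56213, "EURCAD", ":memory:")
stored = get_connection(":memory:").execute("SELECT COUNT(*) FROM ticks").fetchone()[0]
```

Whether the cached object actually survives from one job to the next depends on how the worker process executes jobs, which this sketch does not cover.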
