Grouping by 'ts_code' is just a trivial groupby(). The harder part is that DataFrame.rolling().apply() only sees one column at a time, so it's tricky to apply a function that needs data from multiple columns. You can use "from numpy_ext import rolling_apply as rolling_apply_ext", as in this example: Pandas rolling apply using multiple columns. However, I just created a function that manually slices the dataframe into n-length sub-dataframes, then applies the function to calculate the value. idxmax() finds the index of the peak of the high column, then we take the min() of the low values that follow it. The rest is pretty straightforward.
import numpy as np
import pandas as pd

df = pd.DataFrame([['A', 20, 10],
                   ['A', 30, 5],
                   ['A', 40, 20],
                   ['A', 50, 10],
                   ['A', 20, 30],
                   ['B', 50, 10],
                   ['B', 30, 5],
                   ['B', 40, 20],
                   ['B', 10, 10],
                   ['B', 20, 30]],
                  columns=['ts_code', 'high', 'low'])


def custom_f(df, n):
    s = pd.Series(np.nan, index=df.index)

    def sub_f(df_):
        high_peak_idx = df_['high'].idxmax()
        min_low_after_peak = df_.loc[high_peak_idx:]['low'].min()
        max_high = df_['high'].max()
        return 1 - min_low_after_peak / max_high

    for i in range(df.shape[0] - n + 1):
        df_ = df.iloc[i:i + n]
        s.iloc[i + n - 1] = sub_f(df_)

    return s


df['l3_high_low_pct_chg'] = df.groupby("ts_code").apply(custom_f, 3).values
df['l4_high_low_pct_chg'] = df.groupby("ts_code").apply(custom_f, 4).values
print(df)
If you prefer to use the rolling function, this method gives the same output. Note that rolling().apply() passes the function only one column at a time, so rolling_f looks up the full rows through the global df by index; each per-column call returns the same scalar, which is why .values[:, 0] keeps just the first column of the result:
def rolling_f(rolling_df):
    df_ = df.loc[rolling_df.index]
    high_peak_idx = df_['high'].idxmax()
    min_low_after_peak = df_.loc[high_peak_idx:]['low'].min()
    max_high = df_['high'].max()
    return 1 - min_low_after_peak / max_high


df['l3_high_low_pct_chg'] = df.groupby("ts_code").rolling(3).apply(rolling_f).values[:, 0]
df['l4_high_low_pct_chg'] = df.groupby("ts_code").rolling(4).apply(rolling_f).values[:, 0]
print(df)
Finally, if you want to do a true rolling-window calculation that avoids any index lookup, you can use the numpy_ext package (https://pypi.org/project/numpy-ext/):
from numpy_ext import rolling_apply


def np_ext_f(rolling_df, n):
    def rolling_apply_f(high, low):
        return 1 - low[np.argmax(high):].min() / high.max()

    try:
        return pd.Series(rolling_apply(rolling_apply_f, n,
                                       rolling_df['high'].values,
                                       rolling_df['low'].values),
                         index=rolling_df.index)
    except ValueError:
        # groups shorter than the window make rolling_apply raise a
        # ValueError, so return all NaNs for those groups
        return pd.Series(np.nan, index=rolling_df.index)


# groupby().apply() returns a (ts_code, original index) MultiIndex;
# sorting by level 1 restores the original row order before assigning
df['l3_high_low_pct_chg'] = df.groupby('ts_code').apply(np_ext_f, n=3).sort_index(level=1).values
df['l4_high_low_pct_chg'] = df.groupby('ts_code').apply(np_ext_f, n=4).sort_index(level=1).values
print(df)
Output:
  ts_code  high  low  l3_high_low_pct_chg  l4_high_low_pct_chg
0       A    20   10                  NaN                  NaN
1       A    30    5                  NaN                  NaN
2       A    40   20                 0.50                  NaN
3       A    50   10                 0.80                 0.80
4       A    20   30                 0.80                 0.80
5       B    50   10                  NaN                  NaN
6       B    30    5                  NaN                  NaN
7       B    40   20                 0.90                  NaN
8       B    10   10                 0.75                 0.90
9       B    20   30                 0.75                 0.75
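As a quick sanity check on one window: the 0.80 at row 3 comes from rows 1-3 of group A, where the highs are [30, 40, 50] and the lows are [5, 20, 10]. The peak high of 50 is at the last row of the window, so the only low at or after the peak is 10, and 1 - 10/50 = 0.80:

window = df.iloc[1:4]               # rows 1..3 of group A
peak = window['high'].idxmax()      # index 3, where high = 50
print(1 - window.loc[peak:, 'low'].min() / window['high'].max())  # 0.8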
For large datasets, the speed of these operations becomes an issue. So, to compare the speed of these different methods, I created a timing function:
import time


def timeit(f):
    def timed(*args, **kw):
        ts = time.time()
        result = f(*args, **kw)
        te = time.time()
        print('func:%r took: %2.4f sec' % (f.__name__, te - ts))
        return result
    return timed
Next, let's make a large DataFrame, just by copying the existing dataframe 500 times, which gives 5,000 rows (2,500 per ts_code):
df = pd.concat([df for x in range(500)], axis=0)
df = df.reset_index()
Finally, we run the three methods under the timing decorator:
@timeit
def method_1():
    df['l52_high_low_pct_chg'] = df.groupby("ts_code").apply(custom_f, 52).values

method_1()


@timeit
def method_2():
    df['l52_high_low_pct_chg'] = df.groupby("ts_code").rolling(52).apply(rolling_f).values[:, 0]

method_2()


@timeit
def method_3():
    df['l52_high_low_pct_chg'] = df.groupby('ts_code').apply(np_ext_f, n=52).sort_index(level=1).values

method_3()
Which gives us this output:
func:'method_1' took: 2.5650 sec
func:'method_2' took: 15.1233 sec
func:'method_3' took: 0.1084 sec
So, the fastest method is numpy_ext, which makes sense because it is optimized for vectorized calculations over plain NumPy arrays. The second fastest is the custom function, which is reasonably efficient because it vectorizes the per-window math while still paying for some Pandas slicing and lookups. The slowest method by far is the Pandas rolling function, which re-runs the function once per column on every window.
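If you don't want the extra dependency, you can get a similar mostly-vectorized version with NumPy's sliding_window_view (NumPy >= 1.20). This is just a sketch of the same calculation under that assumption, not something I benchmarked (sliding_f is an illustrative name, and the per-window tail min() still loops in Python):

from numpy.lib.stride_tricks import sliding_window_view


def sliding_f(group, n):
    # same calculation as custom_f, but on (num_windows, n)-shaped window views
    s = pd.Series(np.nan, index=group.index)
    if len(group) < n:
        return s
    highs = sliding_window_view(group['high'].to_numpy(), n)
    lows = sliding_window_view(group['low'].to_numpy(), n)
    peaks = highs.argmax(axis=1)  # position of the high peak inside each window
    s.iloc[n - 1:] = [1 - lows[i, p:].min() / highs[i].max()
                      for i, p in enumerate(peaks)]
    return s


df['l3_high_low_pct_chg'] = df.groupby('ts_code').apply(sliding_f, 3).values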