锅炉信息网 > 锅炉知识 > 锅炉学习

常见的7种K臂赌博机策略集合(下)

代码部分是实战demo。个人原创,禁止转载。Doraemon:常见的7种K臂赌博机策略集合(上)上篇我们介绍了7种K臂赌博机策略的推导公式,今天我

代码部分是实战demo。个人原创,禁止转载。

Doraemon:常见的7种K臂赌博机策略集合(上)

上篇我们介绍了7种K臂赌博机策略的推导公式,今天我们看下如何用python实现它们。实际代码中包含了10种策略。

策略类构建

第一块代码构建了一个类,采用pandas模拟的数值更新。该类包含策略及更新过程的模拟。

这里先说明一下,因为该代码是将赌博机策略应用在了电商场景下,所以变量是以该场景下进行命名的。后面会有结果变量的解读。

import randomnimport numpy as npnfrom scipy.stats import normnimport pandas as pdnimport mathnfrom tqdm import trangenn'''n用单步更新算法模拟强化学习在某一固定曝光位上推送优质商品的过程(该场景先验分布为beta分布)n'''nnclass RL:n def __init__(self):n self.product_num = 10 # 品的数量n self.product_id = [i for i in range(1,self.product_num+1)] # 品的ID列表n self.product_ctr = [0.1,0.15,0.2,0.25,0.3,0.35,0.4,0.45,0.5,0.55] # 真实收益nn random.shuffle(self.product_id) # 随机打乱品ID的顺序n random.shuffle(self.product_ctr) # 随机打乱品收益的顺序n self.product_exp = [0 for i in range(self.product_num)] # 初始化品的曝光次数n self.product_click = [0 for i in range(self.product_num)] # 初始化品的点击次数n self.product_weight = [1000 for i in range(self.product_num)] # 初始化品的权重n self.product_remark = [0 for i in range(self.product_num)] # 初始化品的备注项(贪心算法使用)n self.product_ha = [0 for i in range(self.product_num)] # 初始化偏好函数(梯度提升算法使用)n self.product_hap = [0 for i in range(self.product_num)] # 初始化偏好权重(梯度提升算法使用)n self.product_info = pd.DataFrame({'product_id':self.product_id,n 'product_ctr':self.product_ctr,n 'exp':self.product_exp,n 'click':self.product_click,n 'weight':self.product_weight,n 'remark':self.product_remark,n 'ha':self.product_ha,n 'hap':self.product_hap}) # 要更新的商品信息表nn self.times = 2000 # 实验次数nnn def power_law(self,num,a,b):n '''n 收益往往遵循幂律分布n '''nn x = np.arange(1*10, (num+1)*10, 10) # 初始化数据点n # noise = norm.rvs(0, size=num, scale=0.1) # 正态噪声n noise = [random.normalvariate(0, 0.1) for i in range(num)]n y = []n for i in range(len(x)):n y.append(10.8*pow(x[i],-0.3)+noise[i])nn # [a,b]归一化n k = (b-a)/(max(y)-min(y))n result = [float(a+k*(z-min(y))) for z in y]nn return resultnnnn def product_is_click(self,pro_id):n '''n 模拟商品是否点击n '''n ctr = self.product_info[self.product_info['product_id']==pro_id]['product_ctr'][0]nn result_random = random.uniform(0,1)nn if result_random <= ctr:n return 1 n else:n return 0n # return result_randomnnn def random_bandit(self):n '''n 【随机】n --------------------------n '''n self.product_info['remark'] = self.product_info['click']/self.product_info['exp']n self.product_info.fillna(0,inplace=True)n self.product_info['weight'] = 0n random_pro_id = random.choice(self.product_id)n random_pro_idx = self.product_info[self.product_info['product_id'] == random_pro_id].indexn self.product_info.loc[random_pro_idx,'weight'] = 1000nnn def naive_bandit(self,max_ratio,now_times):n '''n 【朴素bandit算法】n 随机选择第一步n 更新曝光点击n {更新remark的平均收益n 当前更新步数是否大于max_times最大随机次数n 是否存在权重为10000的品:n 继续n 否则n 权重归0n 选择收益最大的品,将该品权重设为10000n 否则n 权重归0n 随机选择一个品,该品权重设为1000n 继续}n --------------------------n max_ratio:随机次数百分比n now_times:当前试验次数n '''n self.product_info['remark'] = self.product_info['click']/self.product_info['exp']n self.product_info.fillna(0,inplace=True)n if now_times >= self.times*max_ratio:n if self.product_info['weight'].max() == 10000:n passn else:n self.product_info['weight'] = 0 n naive_pro_idx = random.choice(list(self.product_info[self.product_info['remark']==self.product_info['remark'].max()].index))n self.product_info.loc[naive_pro_idx,'weight'] = 10000n else:n self.product_info['weight'] = 0 n naive_pro_id = random.choice(self.product_id)n naive_pro_idx = self.product_info[self.product_info['product_id'] == naive_pro_id].indexn self.product_info.loc[naive_pro_idx,'weight'] = 1000nn def greedy(self,epsilon):n '''n 【epsilon-greedy算法】n 随机选择第一步n 更新曝光点击n {更新remark的平均收益n 权重归0n 随机数(0,1)判断n 小于随机数,进行探索,随机选择品ID进行实验,将探索品权重设为1000n 否则n 大于随机数,进行利用,随机选择最大的remark值的品ID,将利用品权重设为1000}n --------------------------n epsilon:随机探索的概率,1-epsilon为利用的概率n '''n self.product_info['remark'] = self.product_info['click']/self.product_info['exp']n self.product_info.fillna(0,inplace=True)n self.product_info['weight'] = 0 n if random.uniform(0,1) < epsilon:n greed_pro_id = random.choice(self.product_id)n greed_pro_idx = self.product_info[self.product_info['product_id'] == greed_pro_id].indexn self.product_info.loc[greed_pro_idx,'weight'] = 1000n else:n greed_pro_idx = random.choice(list(self.product_info[self.product_info['remark']==self.product_info['remark'].max()].index))n self.product_info.loc[greed_pro_idx,'weight'] = 1000nnn def auto_greedy(self,now_times):n '''n 【epsilon-greedy算法】n 随机选择第一步n 更新曝光点击n {更新remark的平均收益n 权重归0n 随机数(0,1)判断n 小于随机数,进行探索,随机选择品ID进行实验,将探索品权重设为1000n 否则n 大于随机数,进行利用,随机选择最大的remark值的品ID,将利用品权重设为1000}n --------------------------n now_times:当前尝试次数n epsilon:随机探索的概率,1-epsilon为利用的概率,根据1/sqrt(t)更新n '''n epsilon = 1/np.sqrt(now_times+1)n self.product_info['remark'] = self.product_info['click']/self.product_info['exp']n self.product_info.fillna(0,inplace=True)n self.product_info['weight'] = 0 n if random.uniform(0,1) < epsilon:n greed_pro_id = random.choice(self.product_id)n greed_pro_idx = self.product_info[self.product_info['product_id'] == greed_pro_id].indexn self.product_info.loc[greed_pro_idx,'weight'] = 1000n else:n greed_pro_idx = random.choice(list(self.product_info[self.product_info['remark']==self.product_info['remark'].max()].index))n self.product_info.loc[greed_pro_idx,'weight'] = 1000nnn def softmax(self,tow):n '''n 【采用Boltzmann分布进行品采样】n 随机选择第一步n 更新曝光点击n {更新remark的平均收益n 权重归0n 计算Boltzmann分布概率n 根据概率选品n 将品权重设为1000}n --------------------------n tow:温度,值越小,越趋近利用,值越大,越趋近探索n '''n self.product_info['remark'] = self.product_info['click']/self.product_info['exp']n self.product_info.fillna(0,inplace=True)n self.product_info['weight'] = 0 n boltzmann_p = np.array(list(np.exp(self.product_info['remark']/tow)/np.exp(self.product_info['remark']/tow).sum()))n softmax_pro_id = np.random.choice(list(self.product_info['product_id']), p = boltzmann_p.ravel())n softmax_pro_idx = self.product_info[self.product_info['product_id'] == softmax_pro_id].indexn self.product_info.loc[softmax_pro_idx,'weight'] = 1000nnn def ucb(self,c,now_times):n '''n https://zhuanlan.zhihu.com/p/52964567n ucb = ctr_avg + c*sqrt(log(t)/n)n --------------------------n c:为超参,c越小,收敛越快n now_times:当前试验次数n '''n ucb_reward = self.product_info['click']/self.product_info['exp']n ucb_explore = c*np.sqrt(np.log(now_times+1)/self.product_info['exp'])n ucb_weight = ucb_reward + ucb_exploren self.product_info['weight'] = ucb_weightnnn def ucb1(self):n '''n 【UCB算法】n ucb1 = ctr_avg + sqrt(2*ln(n_total)/n)n --------------------------n '''n ucb_reward = self.product_info['click']/self.product_info['exp']n ucb_explore = np.sqrt(2 * np.log(self.product_info['exp'].sum()))/self.product_info['exp']n ucb_weight = ucb_reward + ucb_exploren self.product_info['weight'] = ucb_weightnnn def thompson_sampling(self):n '''n 【汤普森采样算法】n np.random.beta(1+success,1+(total-success))n --------------------------n '''n self.product_info['weight'] = np.random.beta(1+self.product_info['click'],1+(self.product_info['exp']-self.product_info['click']))nnn def bayesian_ucb(self,c):n '''n 【基于beta分布的贝叶斯优化UCB】n https://zhuanlan.zhihu.com/p/218398647n --------------------------n c:有多少个标准单位考虑置信上界,越大越趋近探索,越小越趋近利用,值为0即纯贪心算法n '''n ucb_reward = self.product_info['click']/self.product_info['exp']n ucb_explore = np.random.beta(1+self.product_info['click'],1+(self.product_info['exp']-self.product_info['click']))n ucb_weight = ucb_reward + c*ucb_exploren self.product_info['weight'] = ucb_weightnnn def gradient_bandit(self,a):n '''n 【梯度提升偏好算法】n https://zhuanlan.zhihu.com/p/44325923n 更新hapn 选品n 更新曝光/点击/收益n 更新han --------------------------n a:0.25n '''n self.product_info['weight'] = 0n # 更新hapn self.product_info['hap'] = np.e**self.product_info['ha']/(np.e**self.product_info['ha']).sum()n # 选品n gradient_pro_id = np.random.choice(list(self.product_info['product_id']), p = np.array(self.product_info['hap']).ravel())n gradient_pro_idx = self.product_info[self.product_info['product_id'] == gradient_pro_id].indexn self.product_info.loc[gradient_pro_idx,'weight'] = 1 # 选此次要曝光的品n # 更新商品信息表的顺序n self.product_info.fillna(1000,inplace=True)n self.product_info.sort_values(by='weight',ascending=False,inplace=True)n self.product_info.reset_index(drop=True,inplace=True)n # 更新曝光n exp_old = self.product_info.loc[0,'exp']n exp_new = exp_old+1n self.product_info.loc[0,'exp'] = exp_newn # 更新点击n tmp_id = int(self.product_info.loc[0,'product_id'])n click_old = self.product_info.loc[0,'click']n click_increment = self.product_is_click(tmp_id)n click_new = click_old + click_incrementn self.product_info.loc[0,'click'] = click_newn # 更新实时CTRn self.product_info['remark'] = self.product_info['click']/self.product_info['exp'] n self.product_info.fillna(0,inplace=True)n # 计算总体CTRn average_reward = self.product_info['click'].sum()/self.product_info['exp'].sum()n # print(average_reward,'n',self.product_info)n # 更新各个品的preferencen # print(self.product_info['ha'][0],'+',a,'*(',click_increment,'-',average_reward,')*(',self.product_info['weight'][0],'-',self.product_info['hap'][0],')')n self.product_info['ha'] = self.product_info['ha'] + a*(click_increment-average_reward)*(self.product_info['weight']-self.product_info['hap'])n # print('=',self.product_info['ha'][0])nnnn def record(self,indicator):n '''n 记录收敛情况n grade_exp_prop:最佳商品曝光占比n accumulate_reward_avg:平均累积奖赏n '''n if indicator == 'grade_exp_prop':n grade_id = self.product_info[self.product_info['product_ctr'] == self.product_info['product_ctr'].max()]['product_id'].values[0]n grade_id_exp = self.product_info[self.product_info['product_id'] == grade_id]['exp'].values[0]n total_exp = self.product_info['exp'].sum()n grade_exp_prop = round(grade_id_exp/total_exp,4) n result = grade_exp_propn elif indicator == 'accumulate_reward_avg':n accumulate_reward_avg = self.product_info['click'].sum()/self.product_info['exp'].sum()n result = accumulate_reward_avgn else:n raise ValueError('indicator name error')nn return resultnnn def update(self,method,parameter,i):n '''n 更新数据n '''n # 更新曝光n exp_old = self.product_info.loc[0,'exp']n exp_new = exp_old+1n self.product_info.loc[0,'exp'] = exp_newnn # 更新点击nn tmp_id = int(self.product_info.loc[0,'product_id'])nn click_old = self.product_info.loc[0,'click']n click_increment = self.product_is_click(tmp_id)n click_new = click_old + click_incrementn self.product_info.loc[0,'click'] = click_newnn if method == 'random_bandit':n self.random_bandit()n elif method == 'naive_bandit':n self.naive_bandit(parameter,i)n elif method == 'greedy':n self.greedy(parameter)n elif method == 'auto_greedy':n self.auto_greedy(i)n elif method == 'softmax':n self.softmax(parameter)n elif method == 'ucb':n self.ucb(parameter,i)n elif method == 'ucb1':n self.ucb1()n elif method == 'thompson_sampling':n self.thompson_sampling()n elif method == 'bayesian_ucb':n self.bayesian_ucb(parameter)n else:n raise ValueError('method error')nn # 更新商品信息表的顺序n self.product_info.fillna(1000,inplace=True)n self.product_info.sort_values(by='weight',ascending=False,inplace=True)n self.product_info.reset_index(drop=True,inplace=True)nnn def iteration(self,method,parameter=0):n '''n 更新迭代n '''n grade_exp_prop_lst = []n accumulate_reward_avg_lst = []n # print(self.product_info)n for i in range(self.times):n if method == 'gradient_bandit':n self.gradient_bandit(parameter)n else:n self.update(method,parameter,i)n grade_exp_prop_lst.append(self.record(indicator='grade_exp_prop'))n accumulate_reward_avg_lst.append(self.record(indicator='accumulate_reward_avg'))n print(method)n print(self.product_info)nn return grade_exp_prop_lst,accumulate_reward_avg_lst

执行代码

第二块代码则是执行,输出每种策略的输出结果。

if __name__ == '__main__':n random_num = 16n random.seed(random_num)n rl_model1 = RL()n random_exp_prop,random_reward_avg = rl_model1.iteration('random_bandit')n random.seed(random_num)n rl_model2 = RL()n naive_exp_prop,naive_reward_avg = rl_model2.iteration('naive_bandit',parameter=0.3)tn random.seed(random_num)n rl_model3 = RL()n greedy_exp_prop,greedy_reward_avg = rl_model3.iteration('greedy',parameter=0.3)n random.seed(random_num)n rl_model4 = RL()n agreedy_exp_prop,agreedy_reward_avg = rl_model4.iteration('auto_greedy')n random.seed(random_num)n rl_model5 = RL()n softmax_exp_prop,softmax_reward_avg = rl_model5.iteration('softmax',parameter=0.13)n random.seed(random_num)n rl_model6 = RL()n ucb_exp_prop,ucb_reward_avg = rl_model6.iteration('ucb',parameter=0.25)n random.seed(random_num)n rl_model7 = RL()n ucb1_exp_prop,ucb1_reward_avg = rl_model7.iteration('ucb1')n random.seed(random_num)n rl_model8 = RL()n ts_exp_prop,ts_reward_avg = rl_model8.iteration('thompson_sampling')n random.seed(random_num)n rl_model9 = RL()n bayes_exp_prop,bayes_reward_avg = rl_model9.iteration('bayesian_ucb',parameter=3)n random.seed(random_num)n rl_model10 = RL()n gradient_exp_prop,gradient_reward_avg = rl_model10.iteration('gradient_bandit',parameter=0.25)

结果解读

下方为代码执行结果示例:

随机策略

product_id为臂ID,product_ctr为真实收益,exp为摇臂的次数,click为获得收益的次数,remark为当前实时收益。


欢迎关注~一个医学生(预防医学)的数据成长之路。。。

上一篇:21考研的用20版的书有影响吗?

下一篇:2.4T发动机,实用秒杀SUV!车长5米4这新车仅7.78万起

锅炉资讯

锅炉资讯

锅炉学习

锅炉学习

锅炉视频

锅炉视频

锅炉百科

锅炉百科