Python data-processing utilities
Common Python methods for processing data, including:
1) Split an oversized CSV file by a specified row count (split_size)
2) Read data from a CSV file and send HTTP JSON requests to correct data in a production or test environment
3) Split a CSV file into multiple CSV files by one of its columns
4) Connect to a specified database to run queries, updates, or CSV exports
## Common Python scripts for processing CSV data

###### 1) Split an oversized CSV file by a specified row count (split_size)

```python
import pandas as pd
from pathlib import Path

def split_big_csv_file():
    res_file_path = Path("C:\\Users\\admin\\Desktop\\prod\\test-net.csv")  # file to split
    split_size = 2000000  # maximum number of rows per child file
    tar_dir = res_file_path.parent / ("split_" + res_file_path.name.split(".")[0])
    if not tar_dir.exists():
        tar_dir.mkdir()
        print("Created directory\t" + str(tar_dir))
    print("Target directory:\t" + str(tar_dir))
    print("File to split:\t" + str(res_file_path))
    print("Split size:\t" + "{:,}".format(split_size))
    # read a few rows first, just to capture the column names
    tmp = pd.read_csv(res_file_path, nrows=10)
    columns = tmp.columns.to_list()
    idx = 0
    while len(tmp) > 0:
        start = 1 + (idx * split_size)
        # skip the header plus all rows already written, then read the next chunk
        tmp = pd.read_csv(res_file_path, header=None, names=columns,
                          skiprows=start, nrows=split_size)
        if len(tmp) <= 0:
            break
        file_name = res_file_path.name.split(".")[0] + "_{}_{}".format(start, start + len(tmp)) + ".csv"
        file_path = tar_dir / file_name
        tmp.to_csv(file_path, index=False)
        idx += 1
        print(file_name + "\tsaved successfully")
```

###### 2) Read CSV data and send HTTP JSON requests to correct data in a production or test environment

```python
from datetime import datetime
import numpy as np
import pandas as pd
import requests
import sys
from concurrent.futures import ThreadPoolExecutor
from threading import BoundedSemaphore

url = "http://xxxx/v1/modify"
headers = {
    "Content-Type": "application/json",
    "Accept": "application/json",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
                  "Chrome/65.0.3325.181 Safari/537.36"
}
count = 0
request_header = headers
start_time = datetime.now()
total_line = 0
haveActivedCount = 0

def read_from_extract_file():
    global start_time
    stop_line = 0
    # read the CSV file, keeping the device columns as strings
    df = pd.read_csv("C:\\Users\\admin\\Desktop\\prod\\test-net.csv",
                     keep_default_na=False,
                     dtype={'device_sn': np.str_, 'device_id': np.str_, 'device_mac': np.str_,
                            'device_vendor': np.str_, 'device_model': np.str_},
                     encoding='utf-8')
    print(df.shape)
    print(df.size)
    # thread-pool variant:
    # executor = BoundedExecutor(10, 3000)
    for index, row in df.iterrows():
        if index + 1 > stop_line:
            print(row)
            # response = executor.submit(post_data_request, row)
            response = post_data_request(row)

def post_data_request(row):
    global total_line
    global count
    global haveActivedCount
    # JSON request body
    data = {
        "deviceId": row['device_id'],
        "sn": row['device_sn'],
        "deviceVendorId": row['device_vendor'],
        "deviceModelId": row['device_model'],
        "deviceType": 'xxxxx',
        "mac": row['device_mac'],
        "activeTime": row['create_date']
    }
    print("Updating record", data)
    global request_header
    try:
        response = requests.post(url, headers=request_header, json=data)
        # check the business status code
        if response.status_code == 200:
            resp = response.json()
            if resp.get('code') == 200:
                count = count + 1
                print("Updated successfully: ", count)
            elif resp.get('code') == 4100000:
                # print(resp)
                haveActivedCount = haveActivedCount + 1
                print("Already activated: ", haveActivedCount)
            else:
                print("Total updated##### : ", count)
                print("Of which already activated##### : ", haveActivedCount)
                print(response)
                sys.exit()
    except requests.exceptions.RequestException:
        print("Request failed; rows processed so far:", count + haveActivedCount)
        sys.exit()

class BoundedExecutor:
    """BoundedExecutor behaves as a ThreadPoolExecutor which will block on
    calls to submit() once the limit given as "bound" work items are queued for
    execution.

    :param bound: Integer - the maximum number of items in the work queue
    :param max_workers: Integer - the size of the thread pool
    """
    def __init__(self, max_workers, bound):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.semaphore = BoundedSemaphore(bound + max_workers)

    def submit(self, fn, *args, **kwargs):
        """See concurrent.futures.Executor#submit"""
        self.semaphore.acquire()
        try:
            future = self.executor.submit(fn, *args, **kwargs)
        except:
            self.semaphore.release()
            raise
        else:
            future.add_done_callback(lambda x: self.semaphore.release())
            return future

    def shutdown(self, wait=True):
        """See concurrent.futures.Executor#shutdown"""
        self.executor.shutdown(wait)
```
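The thread-pool path in `read_from_extract_file` is left commented out above. A minimal sketch of wiring it in might look like the following; the worker count, the queue bound, and the function name `read_from_extract_file_concurrent` are illustrative assumptions, not part of the original script:

```python
def read_from_extract_file_concurrent():
    # Illustrative sketch: same CSV read as above, but each row is posted
    # through the bounded pool. 10 workers / 3000 queued items are assumed
    # values, not tuned recommendations.
    df = pd.read_csv("C:\\Users\\admin\\Desktop\\prod\\test-net.csv",
                     keep_default_na=False, encoding='utf-8')
    executor = BoundedExecutor(max_workers=10, bound=3000)
    for index, row in df.iterrows():
        executor.submit(post_data_request, row)
    executor.shutdown(wait=True)  # block until every queued request has finished
```

Note that `post_data_request` increments global counters without a lock and calls `sys.exit()` on errors; under concurrency the counts can drift and `sys.exit()` only raises in the calling worker thread, which is why the serial path is the safer default.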
###### 3) Split a CSV file into multiple CSV files by one of its columns

```python
import pandas as pd

def split_csv_file_with_assign_column():
    x_head_key = ['partner_name', 'prod_model', 'prod_id', 'sn', 'mac', 'active_time']
    csv_file = 'C:/Users/admin/Desktop/prod-20220830/test001.csv'
    df = pd.read_csv(csv_file, header=0, encoding="utf-8")
    df.columns = x_head_key
    # drop duplicate rows:
    # ind_frame = df.drop_duplicates(keep='first')
    # group the data
    grouped = df.groupby(x_head_key[0])  # group by partner_name
    # directory for the split files
    file = './'
    # file = 'C:\\Users\\admin\\Desktop\\prod-20220830\\python\\'
    allCount = 0
    for value, group in grouped:
        filename = file + str(value) + '.csv'
        try:
            # opening with 'w' truncates any previous content
            with open(filename, 'w'):
                pass
            # write the group's rows to its own file
            allCount = allCount + len(group)
            print(filename + ', count=' + str(len(group)))
            group.to_csv(filename, header=x_head_key, index=False, mode='a')
        except UnicodeEncodeError:
            print("Encoding error; this record cannot be written and is skipped")
    print('Total records: ' + str(allCount))
```

###### 4) Connect to a specified database to run queries, updates, or CSV exports

```python
# coding:utf-8
import pymysql
import csv

# Read query parameters from a CSV file, run the query for each row,
# and export the matching data
def get_csv_data_by_params():
    # print(os.getcwd())  # print the current working directory
    filenameParam = 'C:/Users/admin/Desktop/prod-20220830/test002_params.csv'  # parameter file path
    with open(filenameParam, encoding='utf-8') as pf:
        for line in pf:
            print(line, end='')  # line already ends with a newline
            arr = line.replace("\n", "").split(',')
            select_data_and_write_csv(arr[0], arr[1], arr[2])

# Save the query results as a CSV file
def select_data_and_write_csv(user_id, start_time, end_time):
    data = mysql_db_sz_stock(user_id, start_time, end_time)
    # print(len(data))
    # print(data)
    filename = 'C:/Users/admin/Desktop/prod-20220830/getdata/Export_' + user_id + '.csv'  # output path
    with open(filename, mode='a', newline="", encoding='utf-8') as f:
        write = csv.writer(f, dialect='excel')
        # write the header row first
        write.writerow(['id', 'user_id', 'center_ability_code', 'center_ability_name', 'create_time'])
        for item in data:
            write.writerow(item)

def mysql_db_sz_stock(user_id, startTime, endTime):
    # connect to the database: host, port, user, password, database name
    connect = pymysql.connect(host="127.0.0.1",  # local database
                              port=3306,
                              user="user",
                              password="password",
                              db="db_name",
                              charset="utf8")
    # config-based variant:
    # connect = pymysql.connect(host=MYSQL_HOST,
    #                           port=MYSQL_PORT,
    #                           user=MYSQL_USER,
    #                           password=MYSQL_PWD,
    #                           db=MYSQL_DB,
    #                           charset='utf8')
    cursor = connect.cursor()
    # select the same five columns that the CSV header above names
    sql = ("SELECT id,user_id,center_ability_code,center_ability_name,create_time FROM `table123` "
           "where create_time >=%(startTime)s and create_time <%(endTime)s and user_id =%(user_id)s")
    # SQL parameters
    values = {"user_id": user_id, "startTime": startTime, "endTime": endTime}
    cursor.execute(sql, values)
    data = cursor.fetchall()
    connect.commit()  # commit to the database
    print("Query params: user_id=" + user_id + ", startTime=" + startTime + ", endTime=" + endTime)
    print("Result size=" + str(len(data)))
    # print(data)
    # close the database resources
    cursor.close()   # close the cursor
    connect.close()  # close the connection
    return data

if __name__ == '__main__':
    # split_big_csv_file()
    # read_from_extract_file()
    # split_csv_file_with_assign_column()
    get_csv_data_by_params()
```
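The commented-out, config-based connection above references `MYSQL_HOST`, `MYSQL_PORT`, `MYSQL_USER`, `MYSQL_PWD`, and `MYSQL_DB` without defining them. One hedged way to supply them is through environment variables; the defaults below are illustrative placeholders, not values from the original post:

```python
import os

# Illustrative definitions for the config-based variant above; each value is
# read from an environment variable, falling back to a local-dev placeholder.
MYSQL_HOST = os.environ.get("MYSQL_HOST", "127.0.0.1")
MYSQL_PORT = int(os.environ.get("MYSQL_PORT", "3306"))
MYSQL_USER = os.environ.get("MYSQL_USER", "user")
MYSQL_PWD = os.environ.get("MYSQL_PWD", "password")
MYSQL_DB = os.environ.get("MYSQL_DB", "db_name")
```

Sourcing credentials this way keeps the hard-coded password out of the script body, so the same code can run against local, test, or production databases without edits.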
Author: 乌云de博客
Source: https://www.cnblogs.com/wuyun-blog/p/16718140.html