
定时清理指定路径下的文件(日
雪洁 2019-02-19 09:05:41
#coding=utf-8 import threading,redis,time,random from time import ctime,sleep import urllib.parse,urllib.request import json def http_get(url, data={}, encoding='utf-8'): url_values = urllib.parse.urlencode(data) full_url = url + '?' + url_values result = {} try: with urllib.request.urlopen(url) as response: result['http_code'] = str(response.code) result['http_result'] = response.read().decode(encoding) result['http_url'] = str(url) result['http_param'] = str(data) result['http_info'] = str(response.info()) result['http_real_url'] = str(response.geturl()) except urllib.error.HTTPError as e: result['http_code'] = str(e.code) result['http_result'] = e.read().decode(encoding) result['http_url'] = str(url) result['http_param'] = str(data) result['http_info'] = str(e.info()) result['http_real_url'] = str(e.geturl()) return result def http_post(url, data={}, headers={}, encoding='utf-8'): data_urlencode = urllib.parse.urlencode(data) data_encode = data_urlencode.encode('ascii') # data should be bytes request_obj = urllib.request.Request(url, data_encode, headers) result = {} try: with urllib.request.urlopen(request_obj) as response: result['http_code'] = response.code result['http_result'] = response.read().decode(encoding) result['http_url'] = url result['http_param'] = data result['http_info'] = str(response.info()) result['http_real_url'] = response.geturl() except urllib.error.HTTPError as e: result['http_code'] = e.code result['http_result'] = e.read().decode(encoding) result['http_url'] = url result['http_param'] = data result['http_info'] = str(e.info()) result['http_real_url'] = e.geturl() return result def test_http_get(num, url, data={}, charset='gbk'): result = http_get(url, data, charset) print(result) def test_http_post(num, url, data={}, charset='gbk', headers={}): user_agent = 'Mozilla/5.0 (Windows NT 6.1; Win64; x64)' if not headers: headers = {'User-Agent':user_agent} result = http_post(url, data, headers, charset) with open('result.txt', 'a', 
encoding="utf-8") as f: f.write("Time: "+ctime()+" "+ json.dumps(result)+"n") f.close() if __name__ == '__main__': for num in range(0,1): for i in range(0,1): print(str(num)+'~'+str(i)) print("start listening thread..............") #POST请求实例 url = 'http://localhost:8000/test/' data = { 'activity_id' : 4, 'open_id': 'test', 'exchange_number': time.strftime('%Y%m%d%H%M%S',time.localtime(time.time())) + str(random.randint(100000, 2000000)), 'prize_pools': 0 } new_thread_post = threading.Thread(target=test_http_post,args=(str(num)+str(i),url,data,'utf-8')) #GET请求实例 url = 'http://pv.sohu.com/cityjson' data = { 'ip': '180.169.124.154' } new_thread_get = threading.Thread(target=test_http_get,args=(str(num)+str(i),url,data)) # new_thread.setDaemon(True) new_thread_post.start() new_thread_get.start() print("end listening thread..............") sleep(1)
注:
GET请求的结果直接打印到标准输出,POST请求的结果以追加方式写入当前工作目录(即运行脚本时所在的目录)下的result.txt中。
站长QQ: 513569228 本博客旨在记录工作中遇到的问题,并为大家提供帮助,如有疑问可加群332646789,欢迎共同交流技术上的难题...