整理几个有意思的Python snippets.
目录
开头结尾
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
reload(sys)
sys.setdefaultencoding('utf8')
# 结尾
if __name__ == '__main__':
main()
函数超时设置
import time, signal
def timeout(timeout=1):
class TimeoutError(Exception):
pass
def handler(signum, frame):
raise TimeoutError()
# set the timeout handler
signal.signal(signal.SIGALRM, handler)
signal.alarm(timeout)
for i in range(1000):
print("deal %s " %i)
time.sleep(0.5)
# 打印到1的时候, 会退出, 理由是TimeoutError
try:
timeout()
except:
print("finish")
单元测试
这个是写代码必备的, 用的多.
import unittest
class MyTest(unittest.TestCase):
def setUp(self):
# 设置一些共享的初始化参数
self.x = 1
def tearDown(self):
pass
def test_ua(self):
y = 2 * self.x + 1
self.assertEquals(y, 3)
unittest.main()
时间处理函数
工作里面因为经常有unix时间和字符串时间的转换,所以写了一个脚本去转换. 另外遇到时间处理, 总是忘记一些函数, 把部分核心代码放进来.
import time, datetime
# 字符串转时间
day = "20160123"
day_item = time.strptime(day, '%Y%m%d')
day_unix = time.mktime(day_item)
one_day_minus = day_unix - 86400 * 1
#时间转unix
d = datetime.datetime(2015,12,10)
unixtime = time.mktime(d.timetuple())
# unix时间转字符串
print time.strftime("%Y-%m-%d %H:%M", time.localtime(1453520966))
print time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
# 日期加减
start_date = datetime.datetime(2015,12,18)
a = start_date.strftime("%Y%m%d%H%M")
if a == "201512310000":
start_date += datetime.timedelta(days=1)
并发处理函数
from multiprocessing import Process, Queue, Pool
# 多线程处理
class FuncRunner(object):
def __init__(self, func, cache_size=80, thread_num=3):
'''
并发跑任务,输入和输出的顺序未必是一致的, 需要手动确认
func: 函数引用
'''
self.cache_size = cache_size
self.push_queue = Queue(maxsize=cache_size)
self.res_queue = Queue()
self.func = func
self.threads = []
for i in range(thread_num):
p = Process(target=self.__deal_queue)
p.daemon = True
p.start()
self.threads.append(p)
def run(self, params):
'''
params: must be a list of param
'''
for param in params:
self.push_queue.put(param)
result = []
while len(result) < len(params):
result.append(self.res_queue.get())
for threads in self.threads:
threads.terminate()
return result
def __deal_queue(self):
while True:
param = self.push_queue.get()
self.res_queue.put(self.func(param))
def f(x):
return x ** 2
# 进程池处理, 这个函数不一定实用, 官网例子比较多
def pool_deal(data, f):
lines = []
pool = Pool(processes=10)
for i in data:
line = pool.apply_async(f, (i, ))
lines.append(line)
pool.close()
pool.join()
result = []
for line in lines:
r = line.get() if line.successful() else None
result.append(r)
return result
def test():
nums = xrange(200)
runner = FuncRunner(f)
res1 = runner.run(nums)
res2 = pool_deal(nums, f)
print res1, res2
神奇小函数
# eval, 字符串格式互转
In [10]: data #{1: 2, 2: 3}
In [11]: str(data) # '{1: 2, 2: 3}'
In [12]: d = eval(str(data))
Out[13]: {1: 2, 2: 3}
# 路径遍历
os.walk(rootdir)
#将x转换为一个整数
int(x [,base ])
#创建一个复数
complex(real [,imag ])
# 将对象 x 转换为表达式字符串
repr(x)
# 将一个整数转换为一个字符
chr(x)
# 将一个整数转换为Unicode字符
unichr(x)
# 将一个字符转换为它的整数值
ord(x)
# 将一个整数转换为一个十六进制字符串
hex(x)
# 将一个整数转换为一个八进制字符串
oct(x)
本文总阅读量次