在python中进行线程处理 – 同时处理多个大文件

我是
python的新手,我无法理解线程是如何工作的.通过浏览文档,我的理解是在线程上调用join()是阻止它完成之前的阻塞方法.

为了给出一些背景知识,我有48个大型csv文件(多GB),我试图解析这些文件以找到不一致的地方.线程没有共享状态.这可以在一个合理的时间内单个螺纹完成一次性,但我试图将它作为练习同时进行.

这是文件处理的框架:

def process_file(data_file):
  with open(data_file) as f:
    print "Start processing {0}".format(data_file)
    line = f.readline()
    while line:
      # logic omitted for brevity; can post if required
      # pretty certain it works as expected, single 'thread' works fine
      line = f.readline()

  print "Finished processing file {0} with {1} errors".format(data_file, error_count)

def process_file_callable(data_file):
  try:
    process_file(data_file)
  except:
    print >> sys.stderr, "Error processing file {0}".format(data_file)

并发位:

def partition_list(l, n):
    """ Yield successive n-sized partitions from a list.
    """
    for i in xrange(0, len(l), n):
        yield l[i:i+n]

partitions = list(partition_list(data_files, 4))
for partition in partitions:
  threads = []
  for data_file in partition:
    print "Processing file {0}".format(data_file)
    t = Thread(name=data_file, target=process_file_callable, args = (data_file,))
    threads.append(t)
    t.start()

  for t in threads:
    print "Joining {0}".format(t.getName())
    t.join(5)

  print "Joined the first chunk of {0}".format(map(lambda t: t.getName(), threads))

我这样运行:

python -u datautils/cleaner.py > cleaner.out 2> cleaner.err

我的理解是join()应该阻止调用线程等待它被调用的线程完成,但是我观察到的行为与我的期望不一致.

我从未在错误文件中看到错误,但我也从未在stdout上看到预期的日志消息.

除非我从shell中明确地终止它,否则父进程不会终止.如果我检查我有多少打印完成…它永远不会是预期的48,但在12到15之间.但是,运行这个单线程,我可以确认多线程运行实际上处理所有内容并完成所有操作预期的验证,只是它似乎没有干净地终止.

我知道我一定做错了什么,但如果你能指出我正确的方向,我真的很感激.

最佳答案 我无法理解代码中的错误.但我可以建议你稍微重构一下.

首先,python中的线程根本不是并发的.这只是幻觉,因为有一个
Global Interpreter Lock,所以只能同时执行一个线程.这就是为什么我建议你使用
multiprocessing module

from multiprocessing import Pool, cpu_count
pool = Pool(cpu_count)
for partition in partition_list(data_files, 4):
    res = pool.map(process_file_callable, partition)
    print res

第二,你使用的不是pythonic方式来读取文件:

with open(...) as f:
   line = f.readline()
    while line:
       ... # do(line)
      line = f.readline()

这是pythonic方式:

with open(...) as f:
    for line in f:
         ... # do(line)

This is memory efficient, fast, and leads to simple code. (c) PyDoc

顺便说一句,我只有一个假设,你的程序会以多线程方式发生什么 – 应用程序变得更慢,因为无序访问硬盘驱动器明显慢于有序.如果您使用的是Linux,可以尝试使用iostat或htop检查此假设.

如果您的应用程序未完成工作,并且它在进程监视器中没有执行任何操作(cpu或磁盘未处于活动状态),则表示您遇到某种死锁或阻止访问同一资源.

点赞