[Question] Django Celery problem

Board: Python  Author: BLG_Eric  Time: 2016/10/27 15:37
16 comments, 3 participants, thread 1/1
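Hi everyone,

I have recently been using Celery to handle writing csv and xlsx files into PostgreSQL, and I have a few questions.

P.S. The code below all runs fine when Celery is not involved (large files excepted).

1. When writing a csv file, the Celery debug output shows the following errors:
http://imgur.com/8Fg3dmJ
http://imgur.com/DSpZU1s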
Here is the task.py code:

# -*- coding: utf-8 -*-
from django.shortcuts import render_to_response
from django.template import RequestContext
from django.http import HttpResponseRedirect
from django.core.urlresolvers import reverse
from django.contrib import messages
from django.conf import settings
from django.db import connection
from django.views.decorators.csrf import csrf_exempt
from celery import Celery
from celery import task
import json
import csv
import sys
import random
import psycopg2
import xlrd
import openpyxl as pyxl
from .models import Document
from .forms import DocumentForm

app = Celery(
    'tasks',
    broker='amqp://guest:guest@localhost:5672//',
    backend='rpc://'
)
CELERY_RESULT_BACKEND = 'rpc://'
CELERY_RESULT_PERSISTENT = False

@app.task()
def csvwritein(doc):  # Transform csv to table
    doc = doc
    conn = psycopg2.connect("dbname='apidb' user='api' host='localhost' password='eric40502' port='5432'")
    readcur = conn.cursor()
    # check if same file is already in database
    readcur.execute("select exists(select * from information_schema.tables where table_name='%s')" % doc.tablename)
    check = readcur.fetchone()[0]
    try:
        fr = open(doc.path,encoding = 'utf-8-sig')
        dr.delay(fr,doc,check)
        fr.close()
    except Exception as e:
        fr = open(doc.path,encoding = 'big5')
        dr.delay(fr,doc,check)
        fr.close()
    conn.commit()
    readcur.close()

@app.task()
def dr(fr,doc,check):  # make datareader as function to keep code 'dry'
    csvt = 0  # count csv reader loop time
    row_id = 1  # used for following id field
    conn = psycopg2.connect("dbname='apidb' user='api' host='localhost' password='eric40502' port='5432'")
    maincur = conn.cursor()
    writecur = conn.cursor()
    datareader = csv.reader(fr, delimiter=',')
    for row in datareader:
        if csvt == 0:  # first time in loop(create field) and check no same file exists
            if check == True:
                app = ''.join([random.SystemRandom().choice('abcdefghijklmnopqrstuvwxyz0123456789') for i in range(6)])
                tname = '%s-%s' % (doc.tablename,app)
                tablename = '"%s-%s"' % (doc.tablename,app)
                doc.tablename = tname
                doc.save()
            else:
                tablename = '"%s"' % doc.tablename
            maincur.execute("CREATE TABLE %s (id SERIAL PRIMARY KEY);" % tablename)
            row_count = sum(1 for line in datareader)
            col_count = len(row)
            frow = row
            for i in range(0,col_count,1):
                row[i] = '"%s"' % row[i]  # change number to string
                maincur.execute("ALTER TABLE %s ADD %s CITEXT;" % (tablename,row[i]))
            csvt = csvt+1
            fr.seek(0)
            next(datareader)
        elif csvt > 0:  # not first time(insert data) and check no same file exists
            for j in range(0,col_count,1):
                if j == 0:
                    writecur.execute("INSERT INTO %s (%s) VALUES ('%s');" % (tablename,frow[j],row[j]))
                else:
                    writecur.execute("UPDATE %s SET %s = '%s' WHERE id = '%d';" %(tablename,frow[j],row[j],row_id))
            csvt = csvt+1
            row_id = row_id+1
        else:
            break
    conn.commit()
    maincur.close()
    writecur.close()
    conn.close()
    csvt = 0

doc = Document.objects.all()

It is called with csvwritein.delay(doc).

2. When writing an xlsx file (130,000 rows), the worker hangs for two or three minutes and then produces the following error:
http://imgur.com/rC1uun0
Below is the xlsx write function from task.py:

@app.task()
def xlsxwritein(doc):  # write into database for file type xlsx
    xlsxt = 0
    conn = psycopg2.connect("dbname='apidb' user='api' host='localhost' password='eric40502' port='5432'")
    maincur = conn.cursor()
    readcur = conn.cursor()
    writecur = conn.cursor()
    # check if same file is already in database
    readcur.execute("select exists(select * from information_schema.tables where table_name='%s')" % doc.tablename)
    check = readcur.fetchone()[0]
    row_id = 1  # used for following id field
    wb = pyxl.load_workbook(doc.path)
    sheetnames = wb.get_sheet_names()
    ws = wb.get_sheet_by_name(sheetnames[0])
    for rown in range(ws.get_highest_row()):
        if xlsxt == 0:
            if check == True:
                app = ''.join([random.SystemRandom().choice('abcdefghijklmnopqrstuvwxyz0123456789') for i in range(6)])
                tname = '%s-%s' % (doc.tablename,app)
                tablename = '"%s-%s"' % (doc.tablename,app)
                doc.tablename = tname
                doc.save()
            else:
                tablename = '"%s"' % doc.tablename
            field = [ws.cell(row=1,column=col_index).value for col_index in range(1,ws.get_highest_column()+1)]
            maincur.execute("CREATE TABLE %s (id SERIAL PRIMARY KEY);" % tablename)
            for coln in range(ws.get_highest_column()):
                field[coln] = '"%s"' % field[coln]  # change number to string
                if field[coln] == 'ID':
                    field[coln] = 'original_id'
                maincur.execute("ALTER TABLE %s ADD %s CITEXT;" % (tablename,field[coln]))
            xlsxt = xlsxt+1
        elif xlsxt > 0 and check == False:  # not first time(insert data) and check no same file exists
            for coln in range(ws.get_highest_column()):
                if coln == 0:
                    writecur.execute("INSERT INTO %s (%s) VALUES ('%s');" %(tablename,field[coln],str(ws.cell(row=rown,column=coln+1).value)))
                else:
                    writecur.execute("UPDATE %s SET %s = '%s' WHERE id = '%d';" %(tablename,field[coln],str(ws.cell(row=rown+1,column=coln+1).value),row_id))
            xlsxt = xlsxt+1
            row_id = row_id+1
        else:
            break
    conn.commit()
    maincur.close()
    readcur.close()
    writecur.close()
    conn.close()
    xlsxt = 0

--
※ Posted from: 批踢踢實業坊 (ptt.cc), from: 114.32.19.185
※ Article URL: https://www.ptt.cc/bbs/Python/M.1477553869.A.5F0.html

10/27 16:12, 1F
Looks like the process got kicked out by the OS kernel?

10/27 16:12, 2F
For example, it used too much memory and was killed by the Linux OOM killer.

10/27 16:52, 3F
So how should I deal with that?

10/27 18:33, 4F
First move that task's processing into a standalone file and run it by itself in a terminal.

10/27 18:33, 5F
Then check memory usage with free and top.

10/27 18:34, 6F
Maybe that server simply does not have much memory to begin with, so it ran out.
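
A minimal sketch of the standalone memory check suggested in 4F to 6F, done from inside Python with the standard-library `tracemalloc` module. The helpers `make_csv`, `peak_bytes`, `load_all` and `stream` are hypothetical names for illustration, and the in-memory CSV only stands in for the uploaded file. `tracemalloc` counts Python-level allocations only, so `free` and `top` are still needed to see what the whole process takes from the system.

```python
import csv
import io
import tracemalloc


def make_csv(n_rows, n_cols=5):
    """Build an in-memory CSV (hypothetical stand-in for the uploaded file)."""
    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow(["col%d" % i for i in range(n_cols)])
    for r in range(n_rows):
        writer.writerow(["value-%d-%d" % (r, c) for c in range(n_cols)])
    buf.seek(0)
    return buf


def peak_bytes(func, *args):
    """Run func(*args) and return the peak traced allocation in bytes."""
    tracemalloc.start()
    try:
        func(*args)
        _, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return peak


def load_all(buf):
    # Keeps every row alive at once, similar to loading a whole workbook.
    return len(list(csv.reader(buf)))


def stream(buf):
    # Holds one row at a time; earlier rows can be freed as the loop advances.
    count = 0
    for _ in csv.reader(buf):
        count += 1
    return count


print("load_all peak bytes:", peak_bytes(load_all, make_csv(20000)))
print("stream peak bytes:  ", peak_bytes(stream, make_csv(20000)))
```

Running the same comparison against the real task body, outside Celery, would show whether the memory growth comes from the file loading or from something the worker adds.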

10/27 18:40, 7F
Off topic: the code you pasted here is messy and hard to read.

10/27 18:41, 8F
Also, please use 4 spaces instead of tabs; that is the recommended style in Python.

10/27 20:24, 9F
Try DEBUG = False first; memory usage differs a lot between the two settings.

10/27 20:24, 10F
For the first problem, it depends on what your doc actually is.

10/27 21:26, 11F
Thanks for the answers, everyone. My doc is a Django model.

10/27 21:27, 12F
It holds some information about the uploaded file,

10/27 21:28, 13F
such as tablename, path, id... and so on.

10/27 21:30, 14F
Here it is mainly used to pass tablename, which the table creation is based on.
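
Celery has to serialize task arguments before sending them to the broker, and the posted `csvwritein` calls `dr.delay(fr, doc, check)` with an open file handle. The error screenshots are not reproduced here, so this is only one possible cause of problem 1, not a confirmed diagnosis. The sketch below uses only the standard library to show that an open file survives neither pickle nor JSON encoding, while plain values do. `can_serialize` and the `doc_id` key are hypothetical names, and the temporary file stands in for `doc.path`.

```python
import json
import os
import pickle
import tempfile


def can_serialize(value):
    """Report whether value can be encoded with pickle and with JSON."""
    result = {}
    for name, dumps in (("pickle", pickle.dumps), ("json", json.dumps)):
        try:
            dumps(value)
            result[name] = True
        except Exception:  # open file handles raise TypeError in both encoders
            result[name] = False
    return result


# A temporary CSV standing in for the uploaded file at doc.path.
fd, path = tempfile.mkstemp(suffix=".csv")
with os.fdopen(fd, "w", encoding="utf-8") as f:
    f.write("a,b\n1,2\n")

with open(path, encoding="utf-8-sig") as fr:
    handle_result = can_serialize(fr)  # analogous to passing fr to dr.delay

# Plain values: the path and a hypothetical primary key of the Document row.
plain_result = can_serialize({"path": path, "doc_id": 1})

os.remove(path)
print("open file handle:", handle_result)
print("plain values:    ", plain_result)
```

Under that assumption, the task would receive the path (or the model's primary key) and open the file and look up the `Document` row inside the worker, instead of receiving the handle and the model instance.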

10/27 21:32, 15F
One more question: in general, does writing 130,000 rows of data need a lot of

10/27 21:32, 16F
memory?
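
On the 130,000-row question: the row count by itself need not cost much memory if rows are read and written in batches. The posted loops also issue one INSERT plus one UPDATE per remaining column for every row, which multiplies the number of statements. Below is a rough sketch of batched, parameterized inserts with one INSERT per row. It swaps in the standard-library `sqlite3` as a stand-in for PostgreSQL so that it runs on its own; with psycopg2 the placeholder would be `%s` rather than `?`. The names `batches`, `load_csv` and the `upload` table are hypothetical.

```python
import csv
import io
import itertools
import sqlite3


def batches(rows, size):
    """Yield lists of at most `size` rows without reading everything first."""
    rows = iter(rows)
    while True:
        chunk = list(itertools.islice(rows, size))
        if not chunk:
            return
        yield chunk


def load_csv(conn, table, fileobj, batch_size=1000):
    """Create `table` from the CSV header and insert the rows batch by batch."""
    reader = csv.reader(fileobj)
    header = next(reader)
    # Identifiers cannot be bound as parameters, so quote them explicitly.
    quoted = ['"%s"' % name.replace('"', '""') for name in header]
    qtable = '"%s"' % table.replace('"', '""')
    conn.execute("CREATE TABLE %s (id INTEGER PRIMARY KEY, %s)"
                 % (qtable, ", ".join("%s TEXT" % q for q in quoted)))
    sql = "INSERT INTO %s (%s) VALUES (%s)" % (
        qtable, ", ".join(quoted), ", ".join("?" for _ in quoted))
    total = 0
    for chunk in batches(reader, batch_size):
        conn.executemany(sql, chunk)  # one INSERT per row, values bound safely
        conn.commit()
        total += len(chunk)
    return total


sample = io.StringIO("name,city\nEric,Taipei\nO'Brien,Tainan\nAmy,Kaohsiung\n")
conn = sqlite3.connect(":memory:")
inserted = load_csv(conn, "upload", sample, batch_size=2)
rows = conn.execute('SELECT id, name, city FROM "upload" ORDER BY id').fetchall()
print(inserted, rows)
```

Binding values as parameters also avoids breaking on data that contains a quote character, such as the `O'Brien` row. For the xlsx case, openpyxl documents a read-only mode (`load_workbook(path, read_only=True)` together with `ws.iter_rows()`) that reads rows lazily instead of holding the whole sheet in memory.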
Article ID (AID): #1O4QxDNm (Python)