worker die

Andreas Stephanides
2017-02-15 13:53:37 +01:00
parent 49ac42b9a5
commit eb071d9f95
2 changed files with 28 additions and 16 deletions


@@ -19,14 +19,14 @@ def process_article(art):
         clogger.error("Invalid article hash:" + str(art))
         aa=None
     else:
-        art["text"]=fix_html(art["text"],art["url"])
+        art["text"] = fix_html(art["text"], art["url"])
         if "image" in art:
             art["image"]=fix_file(art["url"], art["image"])
         clogger.info(art)
         aa = Article.from_hash(art)
         aa.process_hash(art)
-        aa.last_fetched=datetime.now()
-        aa.sourcetype=art["sourcetype"]
+        aa.last_fetched = datetime.now()
+        aa.sourcetype = art["sourcetype"]
         db_session.add(aa)
         db_session.commit()
         clogger.info("Updated/Added Article "+ str(aa.id) + ": " + (aa.title.encode("utf-8")))
@@ -82,4 +82,5 @@ def do_process(tpe, cont,params={}):
         if a.has_key("url")==False:
             a["url"]=cont["url"]
         process_article(a)
+    db_session.remove()
     return
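
Review note: the db_session.remove() added at the end of do_process looks like the SQLAlchemy scoped_session cleanup pattern, where a worker hands back its thread/greenlet-local session once a batch of work is committed. A minimal sketch of that pattern, assuming db_session is a scoped_session; the engine URL and the worker_job name are illustrative, not taken from this repository:

from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

engine = create_engine("sqlite://")  # hypothetical in-memory engine for the sketch
db_session = scoped_session(sessionmaker(bind=engine))

def worker_job(obj):
    try:
        db_session.add(obj)
        db_session.commit()
    finally:
        # Return the greenlet-local session so an idle or dying worker
        # does not keep a connection checked out.
        db_session.remove()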


@@ -7,7 +7,9 @@ from gevent import spawn
 from itertools import repeat
 from models import CrawlUrl
 from src import clogger
+from Queue import Empty
 from src.database import db_session2
+
 def start_workers(f,c,p):
     for _ in range(f):
         clogger.debug("spawn fetchworker")
@@ -18,15 +20,25 @@ def start_workers(f,c,p):
         spawn(work_process)
 
 def work_fetch():
-    while True:
-        run_fetch()
+    try:
+        while True:
+            run_fetch()
+    except Empty:
+        clogger.info("Fetch - Worker died")
 
 def work_process():
-    while True:
-        run_process()
+    try:
+        while True:
+            run_process()
+    except Empty:
+        clogger.info("Process - Worker died")
 
 def work_compile():
-    while True:
-        run_compile()
+    try:
+        while True:
+            run_compile()
+    except Empty:
+        clogger.info("Compile - Worker died")
 
 def queue_url(tpe, url,params={"nofollow": False}):
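
Review note: each worker loop is now wrapped in try/except Empty, so when the timed get() calls in the hunks below raise Empty the loop ends and the greenlet simply returns; that is the "worker die" behaviour named in the commit message. A minimal sketch of the pattern, assuming gevent greenlets and a gevent queue; work_demo, work_queue and the queued callable are made-up names for illustration:

from gevent import spawn
from gevent.queue import Queue, Empty

work_queue = Queue()

def work_demo():
    try:
        while True:
            job = work_queue.get(True, 100)  # block for at most 100 seconds
            job()                            # run the queued callable
    except Empty:
        print("Demo - Worker died")          # get() timed out: loop ends, greenlet exits

spawn(work_demo)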
@@ -36,14 +48,13 @@ def queue_url(tpe, url,params={"nofollow": False}):
 
 def queue_url_upd(tpe, url,params={"nofollow": True}):
     fetch_queue.put((True,tpe,url,params))
 
 # fetch a page from the url list
 def run_fetch():
     try:
-        tc, tpe, url, p= fetch_queue.get()
+        tc, tpe, url, p= fetch_queue.get(True, 100)
     except ValueError:
-        tc, tpe, url= fetch_queue.get()
-    clogger.info("Fechted url:"+url)
+        tc, tpe, url= fetch_queue.get(True, 100)
     if tpe is not "dummyarticle" and tpe is not "dummyindex":
         rw=fetch_page(url)
     else:
@@ -54,7 +65,7 @@ def run_fetch():
 
 #comile something from the compile list
 def run_compile():
-    tc,tpe,h, p = compile_queue.get()
+    tc,tpe,h, p = compile_queue.get(True, 100)
     if p.has_key('parent_item'):
         h["parent_item"]=p["parent_item"]
     h=do_compile(tpe,h,p)
@@ -63,7 +74,7 @@ def run_compile():
 # compile_queue.task_done()
 
 def run_process():
-    tc,tpe,h,p = process_queue.get()
+    tc,tpe,h,p = process_queue.get(True, 100)
     do_process(tpe, h,p)
     return h
 # process_queue.task_done()
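
Review note: the positional arguments in get(True, 100) correspond to Queue.get(block=True, timeout=100), so each get now blocks for at most 100 seconds and raises Empty when nothing arrives, which is what the except Empty handlers above catch. A minimal Python 2 sketch of that behaviour, matching the from Queue import Empty added in this commit; the one-second timeout is shortened only so the demo finishes quickly:

from Queue import Queue, Empty  # Python 2 stdlib queue

q = Queue()
try:
    item = q.get(True, 1)  # block=True, timeout=1 second; the commit uses 100
except Empty:
    print("queue stayed empty - the worker loop would now exit")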