diff --git a/compiler/mprocess.py b/compiler/mprocess.py
index c7b0dc7..44f067a 100644
--- a/compiler/mprocess.py
+++ b/compiler/mprocess.py
@@ -19,14 +19,14 @@ def process_article(art):
         clogger.error("Invalid article hash:" + str(art))
         aa=None
     else:
-        art["text"]=fix_html(art["text"],art["url"])
+        art["text"] = fix_html(art["text"], art["url"])
         if "image" in art:
             art["image"]=fix_file(art["url"], art["image"])
         clogger.info(art)
         aa = Article.from_hash(art)
         aa.process_hash(art)
-        aa.last_fetched=datetime.now()
-        aa.sourcetype=art["sourcetype"]
+        aa.last_fetched = datetime.now()
+        aa.sourcetype = art["sourcetype"]
         db_session.add(aa)
         db_session.commit()
         clogger.info("Updated/Added Article "+ str(aa.id) + ": " + (aa.title.encode("utf-8")))
@@ -82,4 +82,5 @@ def do_process(tpe, cont,params={}):
             if a.has_key("url")==False:
                 a["url"]=cont["url"]
             process_article(a)
+    db_session.remove()
     return
diff --git a/compiler/mworker.py b/compiler/mworker.py
index 0a80e28..477a443 100644
--- a/compiler/mworker.py
+++ b/compiler/mworker.py
@@ -7,7 +7,9 @@ from gevent import spawn
 from itertools import repeat
 from models import CrawlUrl
 from src import clogger
+from Queue import Empty
 from src.database import db_session2
+
 def start_workers(f,c,p):
     for _ in range(f):
         clogger.debug("spawn fetchworker")
@@ -18,15 +20,25 @@ def start_workers(f,c,p):
         spawn(work_process)
 
 def work_fetch():
-    while True:
-        run_fetch()
-
+    try:
+        while True:
+            run_fetch()
+    except Empty:
+        clogger.info("Fetch - Worker died")
+
 def work_process():
-    while True:
-        run_process()
+    try:
+        while True:
+            run_process()
+    except Empty:
+        clogger.info("Process - Worker died")
+
 def work_compile():
-    while True:
-        run_compile()
+    try:
+        while True:
+            run_compile()
+    except Empty:
+        clogger.info("Compile - Worker died")
 
 
 def queue_url(tpe, url,params={"nofollow": False}):
@@ -36,14 +48,13 @@ def queue_url_upd(tpe, url,params={"nofollow": True}):
     fetch_queue.put((True,tpe,url,params))
 
-
 # fetch a page from the url list
 def run_fetch():
     try:
-        tc, tpe, url, p= fetch_queue.get()
+        tc, tpe, url, p= fetch_queue.get(True, 100)
     except ValueError:
-        tc, tpe, url= fetch_queue.get()
-
+        tc, tpe, url= fetch_queue.get(True, 100)
+
     clogger.info("Fechted url:"+url)
     if tpe is not "dummyarticle" and tpe is not "dummyindex":
         rw=fetch_page(url)
     else:
@@ -54,7 +65,7 @@
 #comile something from the compile list
 
 def run_compile():
-    tc,tpe,h, p = compile_queue.get()
+    tc,tpe,h, p = compile_queue.get(True, 100)
     if p.has_key('parent_item'):
         h["parent_item"]=p["parent_item"]
     h=do_compile(tpe,h,p)
@@ -63,7 +74,7 @@
 # compile_queue.task_done()
 
 def run_process():
-    tc,tpe,h,p = process_queue.get()
+    tc,tpe,h,p = process_queue.get(True, 100)
     do_process(tpe, h,p)
     return h
 # process_queue.task_done()
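
Note on the worker change above: a bare Queue.get() blocks forever, so the old while-True loops could never exit once the queues drained. With get(True, 100) (block, with a 100-second timeout) the stdlib queue raises Queue.Empty when nothing arrives in time, and the new try/except turns that into a clean worker exit. A minimal self-contained sketch of the same pattern, assuming the Python 2 stdlib naming this diff imports; work_queue and handle_item are illustrative stand-ins, not names from this repo:

# Sketch: a worker that dies cleanly when its queue stays empty too long.
from Queue import Queue, Empty  # Python 2; the module is `queue` on Python 3

work_queue = Queue()            # stand-in for fetch_queue/compile_queue/process_queue

def handle_item(item):          # stand-in for run_fetch/run_compile/run_process
    print(item)

def worker():
    try:
        while True:
            # block=True, timeout=100: wait up to 100 seconds for an item,
            # then let Queue.Empty propagate and end the loop.
            item = work_queue.get(True, 100)
            handle_item(item)
    except Empty:
        print("Worker died")    # the diff logs this via clogger.info

One consequence worth noting: start_workers only spawns each worker once, so a worker that exits this way appears not to be respawned; the pool shrinks once the crawl goes quiet rather than hanging on blocked greenlets.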
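On the db_session.remove() added at the end of do_process: if db_session is a SQLAlchemy scoped_session (which the db_session.add/db_session.commit usage suggests, though the diff does not show its construction), remove() closes the current thread- or greenlet-local session and returns its connection to the pool, so long-lived workers do not pin a connection between batches. A sketch of that lifecycle under that assumption; the engine URL and function names are placeholders:

# Sketch: scoped-session cleanup after a unit of work, assuming db_session
# is a sqlalchemy.orm.scoped_session (its construction is not shown in this diff).
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

engine = create_engine("sqlite://")                     # placeholder URL
db_session = scoped_session(sessionmaker(bind=engine))

def do_batch(obj):
    try:
        # ... db_session.add(obj); db_session.commit() as in process_article ...
        db_session.add(obj)
        db_session.commit()
    finally:
        # Drop this worker's session; the next db_session use gets a fresh one.
        db_session.remove()

The diff calls remove() at the end of do_process rather than in a finally block; the sketch uses try/finally only to show the intended pairing of work and cleanup.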