diff --git a/compiler/fetching.py b/compiler/fetching.py
index 3055a64..92b4656 100644
--- a/compiler/fetching.py
+++ b/compiler/fetching.py
@@ -40,6 +40,7 @@ def cleanup_cache():
     ten_weeks_ago = current_time - timedelta(days=cfg.cache_days*2)
     CrawlCache.query.filter(CrawlCache.fetched<ten_weeks_ago).first()
+# cc=CrawlCache.query.filter(CrawlCache.url==furl).filter(CrawlCache.fetched>ten_weeks_ago).first()
+
     if cc is None:
         tx = json.dumps(graph.get_object(id=furl))
     else:
diff --git a/compiler/mprocess.py b/compiler/mprocess.py
index 3fd6944..e8bd1f6 100644
--- a/compiler/mprocess.py
+++ b/compiler/mprocess.py
@@ -5,7 +5,7 @@ from src.database import db_session
 from mqueues import fetch_queue, compile_queue, put_fetch_queue
 from fetching import fetch_page, downloadfile, announce_articleid
 from fixing import fix_html, fix_file
-#from src import app
+from sqlalchemy.exc import InvalidRequestError
 from compiler import article_types
 from fixing import fix_link

@@ -20,20 +20,20 @@ def process_article(art):
         clogger.error("Invalid article hash:" + str(art))
         aa=None
     else:
-        art["text"]=fix_html(art["text"],art["url"])
+        art["text"] = fix_html(art["text"], art["url"])
         if "image" in art:
             art["image"]=fix_file(art["url"], art["image"])
         clogger.info(art)
         aa = Article.from_hash(art)
         aa.process_hash(art)
-        aa.last_fetched=datetime.now()
-        aa.sourcetype=art["sourcetype"]
+        aa.last_fetched = datetime.now()
+        aa.sourcetype = art["sourcetype"]
         db_session.add(aa)
         try:
             db_session.commit()
         except InvalidRequestError,e:
             db_session.rollback()
-            clogger.error(e)
+            clogger.error(e)
         clogger.info("Updated/Added Article "+ str(aa.id) + ": " + (aa.title.encode("utf-8")))
     return aa
 #        app.logger.info("Updated/Added Article "+ str(aa.id) + ": " + (aa.title.encode("utf-8")))
@@ -92,4 +92,5 @@ def do_process(tpe, cont,params={}):
         if a.has_key("url")==False:
             a["url"]=cont["url"]
         process_article(a)
+    db_session.remove()
     return
diff --git a/compiler/mworker.py b/compiler/mworker.py
index d401cb2..67c6a36 100644
--- a/compiler/mworker.py
+++ b/compiler/mworker.py
@@ -8,6 +8,8 @@ from itertools import repeat
 from models import CrawlUrl
 from src import clogger
 from src.database import db_session2,db_session
+from Queue import Empty
+
 def start_workers(f,c,p):
     for _ in range(f):
         clogger.debug("spawn fetchworker")
@@ -18,17 +20,26 @@ def start_workers(f,c,p):
         spawn(work_process)

 def work_fetch():
-    while True:
-        run_fetch()
-
+    try:
+        while True:
+            run_fetch()
+    except Empty:
+        clogger.info("Fetch - Worker died")
+
 def work_process():
-    while True:
-        run_process()
-#        db_session.close()
+    try:
+        while True:
+            run_process()
+    except Empty:
+        clogger.info("Process - Worker died")
+
 def work_compile():
-    while True:
-        run_compile()
+    try:
+        while True:
+            run_compile()
+    except Empty:
+        clogger.info("Compile - Worker died")


 def queue_url(tpe, url,params={"nofollow": False}):
@@ -38,16 +49,15 @@
 def queue_url_upd(tpe, url,params={"nofollow": True}):
     fetch_queue.put((True,tpe,url,params))

-
 # fetch a page from the url list
 def run_fetch():
     try:
-        tc, tpe, url, p= fetch_queue.get()
+        tc, tpe, url, p= fetch_queue.get(True, 100)
     except ValueError:
-        tc, tpe, url= fetch_queue.get()
-    clogger.debug("fetched : "+url)
+        tc, tpe, url= fetch_queue.get(True, 100)
+    clogger.info("Fetched url:"+url)
     if tpe is not "dummyarticle" and tpe is not "dummyindex":
-        rw=fetch_page(url)
+        rw = fetch_page(url, p)
     else:
         rw="<html>dummytext</html>"
     compile_queue.put((0, tpe, {"url": url, "sourcetype": tpe, "raw": rw},p))
@@ -56,7 +66,7 @@

 #comile something from the compile list
 def run_compile():
-    tc,tpe,h, p = compile_queue.get()
+    tc,tpe,h, p = compile_queue.get(True, 100)
     if p.has_key('parent_item'):
         h["parent_item"]=p["parent_item"]
     h=do_compile(tpe,h,p)
@@ -65,7 +75,7 @@
 #    compile_queue.task_done()

 def run_process():
-    tc,tpe,h,p = process_queue.get()
+    tc,tpe,h,p = process_queue.get(True, 100)
     do_process(tpe, h,p)
     return h
 #    process_queue.task_done()