From eb071d9f954cd9b6b69190624f810fce2bb96fa0 Mon Sep 17 00:00:00 2001
From: Andreas Stephanides
Date: Wed, 15 Feb 2017 13:53:37 +0100
Subject: [PATCH 1/2] worker die

---
 compiler/mprocess.py |  7 ++++---
 compiler/mworker.py  | 37 ++++++++++++++++++++++++-------------
 2 files changed, 28 insertions(+), 16 deletions(-)

diff --git a/compiler/mprocess.py b/compiler/mprocess.py
index c7b0dc7..44f067a 100644
--- a/compiler/mprocess.py
+++ b/compiler/mprocess.py
@@ -19,14 +19,14 @@ def process_article(art):
         clogger.error("Invalid article hash:" + str(art))
         aa=None
     else:
-        art["text"]=fix_html(art["text"],art["url"])
+        art["text"] = fix_html(art["text"], art["url"])
         if "image" in art:
             art["image"]=fix_file(art["url"], art["image"])
         clogger.info(art)
         aa = Article.from_hash(art)
         aa.process_hash(art)
-        aa.last_fetched=datetime.now()
-        aa.sourcetype=art["sourcetype"]
+        aa.last_fetched = datetime.now()
+        aa.sourcetype = art["sourcetype"]
         db_session.add(aa)
         db_session.commit()
         clogger.info("Updated/Added Article "+ str(aa.id) + ": " + (aa.title.encode("utf-8")))
@@ -82,4 +82,5 @@ def do_process(tpe, cont,params={}):
         if a.has_key("url")==False:
             a["url"]=cont["url"]
         process_article(a)
+    db_session.remove()
     return
diff --git a/compiler/mworker.py b/compiler/mworker.py
index 0a80e28..477a443 100644
--- a/compiler/mworker.py
+++ b/compiler/mworker.py
@@ -7,7 +7,9 @@ from gevent import spawn
 from itertools import repeat
 from models import CrawlUrl
 from src import clogger
+from Queue import Empty
 from src.database import db_session2
+
 def start_workers(f,c,p):
     for _ in range(f):
         clogger.debug("spawn fetchworker")
@@ -18,15 +20,25 @@ def start_workers(f,c,p):
         spawn(work_process)
 
 def work_fetch():
-    while True:
-        run_fetch()
-
+    try:
+        while True:
+            run_fetch()
+    except Empty:
+        clogger.info("Fetch - Worker died")
+
 def work_process():
-    while True:
-        run_process()
+    try:
+        while True:
+            run_process()
+    except Empty:
+        clogger.info("Process - Worker died")
+
 def work_compile():
-    while True:
-        run_compile()
+    try:
+        while True:
+            run_compile()
+    except Empty:
+        clogger.info("Compile - Worker died")
 
 
 def queue_url(tpe, url,params={"nofollow": False}):
@@ -36,14 +48,13 @@ def queue_url(tpe, url,params={"nofollow": False}):
 def queue_url_upd(tpe, url,params={"nofollow": True}):
     fetch_queue.put((True,tpe,url,params))
 
-
 # fetch a page from the url list
 def run_fetch():
     try:
-        tc, tpe, url, p= fetch_queue.get()
+        tc, tpe, url, p= fetch_queue.get(True, 100)
     except ValueError:
-        tc, tpe, url= fetch_queue.get()
-
+        tc, tpe, url= fetch_queue.get(True, 100)
+    clogger.info("Fechted url:"+url)
     if tpe is not "dummyarticle" and tpe is not "dummyindex":
         rw=fetch_page(url)
     else:
@@ -54,7 +65,7 @@
 
 #comile something from the compile list
 def run_compile():
-    tc,tpe,h, p = compile_queue.get()
+    tc,tpe,h, p = compile_queue.get(True, 100)
     if p.has_key('parent_item'):
         h["parent_item"]=p["parent_item"]
     h=do_compile(tpe,h,p)
@@ -63,7 +74,7 @@
 # compile_queue.task_done()
 
 def run_process():
-    tc,tpe,h,p = process_queue.get()
+    tc,tpe,h,p = process_queue.get(True, 100)
     do_process(tpe, h,p)
     return h
 # process_queue.task_done()

From 80c42c04cdbb56dac4a64d2c1277c260b20dc95c Mon Sep 17 00:00:00 2001
From: Andreas Stephanides
Date: Fri, 17 Feb 2017 09:58:35 +0100
Subject: [PATCH 2/2] fetch all fb

---
 compiler/fetching.py | 7 +++++--
 compiler/mprocess.py | 9 ++++++---
 compiler/mworker.py  | 2 +-
 3 files changed, 12 insertions(+), 6 deletions(-)

diff --git a/compiler/fetching.py b/compiler/fetching.py
index 54dfb10..6fe24b0 100644
--- a/compiler/fetching.py
+++ b/compiler/fetching.py
@@ -36,7 +36,7 @@ from datetime import datetime, timedelta
 
 
 
-def fetch_page(furl):
+def fetch_page(furl,p={}):
     current_time = datetime.utcnow()
     ten_weeks_ago = current_time - timedelta(days=cfg.cache_days)
     u=urlparse.urlparse(furl)
@@ -49,7 +49,10 @@ def fetch_page(furl):
     clogger.debug("fetching url: "+ str(furl))
     if u[0]=='fb':
         fb_time_since = str(int((current_time - timedelta(days=10)-datetime(1970,1,1)).total_seconds()))
-        furl=u[1]+u[2]+"?since="+fb_time_since+"&fields=story,created_time,id,message,attachments"
+        if p.has_key("nofollow") and p["nofollow"]==False:
+            furl=u[1]+u[2]+"?fields=story,created_time,id,message,attachments"
+        else:
+            furl=u[1]+u[2]+"?since="+fb_time_since+"&fields=story,created_time,id,message,attachments"
         cc=CrawlCache.query.filter(CrawlCache.url==furl).filter(CrawlCache.fetched>ten_weeks_ago).first()
         if cc is None:
             tx = json.dumps(graph.get_object(id=furl))
diff --git a/compiler/mprocess.py b/compiler/mprocess.py
index 44f067a..30f2ed9 100644
--- a/compiler/mprocess.py
+++ b/compiler/mprocess.py
@@ -5,7 +5,7 @@ from src.database import db_session
 from mqueues import fetch_queue, compile_queue, put_fetch_queue
 from fetching import fetch_page, downloadfile, announce_articleid
 from fixing import fix_html, fix_file
-
+from sqlalchemy.exc import InvalidRequestError
 from compiler import article_types
 from fixing import fix_link
 # process article expects an hash with raw data for the article and puts it into an
@@ -28,9 +28,12 @@ def process_article(art):
         aa.last_fetched = datetime.now()
         aa.sourcetype = art["sourcetype"]
         db_session.add(aa)
-        db_session.commit()
+        try:
+            db_session.commit()
+        except InvalidRequestError,e:
+            db_session.rollback()
+            clogger.error(e)
         clogger.info("Updated/Added Article "+ str(aa.id) + ": " + (aa.title.encode("utf-8")))
-# announce_articleid(aa.id)
     return aa
 
 # process a single found url
diff --git a/compiler/mworker.py b/compiler/mworker.py
index 477a443..3edd4d0 100644
--- a/compiler/mworker.py
+++ b/compiler/mworker.py
@@ -56,7 +56,7 @@ def run_fetch():
         tc, tpe, url= fetch_queue.get(True, 100)
     clogger.info("Fechted url:"+url)
     if tpe is not "dummyarticle" and tpe is not "dummyindex":
-        rw=fetch_page(url)
+        rw = fetch_page(url, p)
     else:
         rw="dummytext"
     compile_queue.put((0, tpe, {"url": url, "sourcetype": tpe, "raw": rw},p))