uwsgi
2017-02-17 10:09:39 +01:00
3 changed files with 40 additions and 22 deletions

View File

@@ -40,6 +40,7 @@ def cleanup_cache():
     ten_weeks_ago = current_time - timedelta(days=cfg.cache_days*2)
     CrawlCache.query.filter(CrawlCache.fetched<ten_weeks_ago).delete()
 def get_cached_page(furl):
     current_time = datetime.utcnow()
     ten_weeks_ago = current_time - timedelta(days=cfg.cache_days)
@@ -51,7 +52,8 @@ def get_cached_page(furl):
     return cc
-def fetch_page(furl):
+def fetch_page(furl,p={}):
     u=urlparse.urlparse(furl)
     current_time = datetime.utcnow()
     cu=CrawlUrl.query.filter(CrawlUrl.url==furl).first()
@@ -62,8 +64,13 @@ def fetch_page(furl):
     clogger.debug("fetching url: "+ str(furl))
     if u[0]=='fb':
         fb_time_since = str(int((current_time - timedelta(days=10)-datetime(1970,1,1)).total_seconds()))
-        furl=u[1]+u[2]+"?since="+fb_time_since+"&fields=story,created_time,id,message,attachments"
+        if p.has_key("nofollow") and p["nofollow"]==False:
+            furl=u[1]+u[2]+"?fields=story,created_time,id,message,attachments"
+        else:
+            furl=u[1]+u[2]+"?since="+fb_time_since+"&fields=story,created_time,id,message,attachments"
     cc=get_cached_page(furl) #CrawlCache.query.filter(CrawlCache.url==furl).filter(CrawlCache.fetched>ten_weeks_ago).first()
+    # cc=CrawlCache.query.filter(CrawlCache.url==furl).filter(CrawlCache.fetched>ten_weeks_ago).first()
     if cc is None:
         tx = json.dumps(graph.get_object(id=furl))
     else:
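
The new p argument threads the "nofollow" flag through to fetch_page(): when it is explicitly False the Facebook Graph URL is built without the ten-day "since" cutoff, otherwise the cutoff stays. A minimal standalone sketch of that branch, using illustrative names (build_fb_url, netloc, path) rather than the project's own plumbing:

# Sketch only: mirrors the nofollow branch added to fetch_page() above.
# build_fb_url, netloc and path are illustrative names, not project code.
from datetime import datetime, timedelta

FIELDS = "story,created_time,id,message,attachments"

def build_fb_url(netloc, path, params):
    if params.get("nofollow") is False:
        # explicit nofollow=False: fetch the full feed, no time cutoff
        return netloc + path + "?fields=" + FIELDS
    # default: only items from the last ten days, as in the original code
    since = int((datetime.utcnow() - timedelta(days=10)
                 - datetime(1970, 1, 1)).total_seconds())
    return "%s%s?since=%d&fields=%s" % (netloc, path, since, FIELDS)

For example, build_fb_url("fb", "/somepage/posts", {"nofollow": True}) keeps the since parameter, matching what queue_url_upd() sends by default.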

View File

@@ -5,7 +5,7 @@ from src.database import db_session
 from mqueues import fetch_queue, compile_queue, put_fetch_queue
 from fetching import fetch_page, downloadfile, announce_articleid
 from fixing import fix_html, fix_file
-#from src import app
+from sqlalchemy.exc import InvalidRequestError
 from compiler import article_types
 from fixing import fix_link
@@ -20,14 +20,14 @@ def process_article(art):
         clogger.error("Invalid article hash:" + str(art))
         aa=None
     else:
-        art["text"]=fix_html(art["text"],art["url"])
+        art["text"] = fix_html(art["text"], art["url"])
         if "image" in art:
             art["image"]=fix_file(art["url"], art["image"])
         clogger.info(art)
         aa = Article.from_hash(art)
         aa.process_hash(art)
-        aa.last_fetched=datetime.now()
-        aa.sourcetype=art["sourcetype"]
+        aa.last_fetched = datetime.now()
+        aa.sourcetype = art["sourcetype"]
         db_session.add(aa)
     try:
         db_session.commit()
@@ -92,4 +92,5 @@ def do_process(tpe, cont,params={}):
         if a.has_key("url")==False:
             a["url"]=cont["url"]
         process_article(a)
+    db_session.remove()
     return
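
The db_session.remove() added at the end of do_process() disposes of the SQLAlchemy scoped session once a batch of articles has been handled, so the next job starts with a fresh session instead of inheriting a possibly broken one (the hunk above also imports InvalidRequestError, presumably to catch failed commits). A minimal sketch of the same pattern with illustrative names (engine, Session, handle_job are not the project's own):

# Sketch only: a scoped_session released after each unit of work, mirroring
# the db_session.remove() added above.  Names here are illustrative.
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

engine = create_engine("sqlite://")                  # stand-in database
Session = scoped_session(sessionmaker(bind=engine))  # one session per thread/greenlet

def handle_job(job):
    try:
        # ... Session.add()/Session.query() just like db_session in the diff ...
        Session.commit()
    except Exception:
        Session.rollback()
        raise
    finally:
        Session.remove()  # close the session and return its connection to the pool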

View File

@@ -8,6 +8,8 @@ from itertools import repeat
 from models import CrawlUrl
 from src import clogger
 from src.database import db_session2,db_session
+from Queue import Empty
 def start_workers(f,c,p):
     for _ in range(f):
         clogger.debug("spawn fetchworker")
@@ -18,17 +20,26 @@ def start_workers(f,c,p):
         spawn(work_process)
 def work_fetch():
-    while True:
-        run_fetch()
+    try:
+        while True:
+            run_fetch()
+    except Empty:
+        clogger.info("Fetch - Worker died")
 def work_process():
-    while True:
-        run_process()
-# db_session.close()
+    try:
+        while True:
+            run_process()
+    except Empty:
+        clogger.info("Process - Worker died")
 def work_compile():
-    while True:
-        run_compile()
+    try:
+        while True:
+            run_compile()
+    except Empty:
+        clogger.info("Compile - Worker died")
 def queue_url(tpe, url,params={"nofollow": False}):
@@ -38,16 +49,15 @@ def queue_url(tpe, url,params={"nofollow": False}):
 def queue_url_upd(tpe, url,params={"nofollow": True}):
     fetch_queue.put((True,tpe,url,params))
 # fetch a page from the url list
 def run_fetch():
     try:
-        tc, tpe, url, p= fetch_queue.get()
+        tc, tpe, url, p= fetch_queue.get(True, 100)
     except ValueError:
-        tc, tpe, url= fetch_queue.get()
+        tc, tpe, url= fetch_queue.get(True, 100)
-    clogger.debug("fetched : "+url)
+    clogger.info("Fechted url:"+url)
     if tpe is not "dummyarticle" and tpe is not "dummyindex":
-        rw=fetch_page(url)
+        rw = fetch_page(url, p)
     else:
         rw="<p> dummytext</p>"
     compile_queue.put((0, tpe, {"url": url, "sourcetype": tpe, "raw": rw},p))
@@ -56,7 +66,7 @@ def run_fetch():
 #comile something from the compile list
 def run_compile():
-    tc,tpe,h, p = compile_queue.get()
+    tc,tpe,h, p = compile_queue.get(True, 100)
     if p.has_key('parent_item'):
         h["parent_item"]=p["parent_item"]
     h=do_compile(tpe,h,p)
@@ -65,7 +75,7 @@ def run_compile():
 # compile_queue.task_done()
 def run_process():
-    tc,tpe,h,p = process_queue.get()
+    tc,tpe,h,p = process_queue.get(True, 100)
     do_process(tpe, h,p)
     return h
 # process_queue.task_done()
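
Taken together, the blocking get(True, 100) calls and the new except Empty: handlers mean each worker now gives up after its queue has been idle for 100 seconds and logs "... - Worker died", instead of blocking on get() forever. A small self-contained sketch of that loop using the stdlib queue and threading modules (the project itself spawns greenlets and imports Empty from the Python 2 Queue module):

# Sketch only: block on the queue with a timeout and let Empty end the loop,
# as work_fetch()/work_process()/work_compile() do above.
import queue
import threading

jobs = queue.Queue()

def worker(name, idle_timeout=1):
    try:
        while True:
            job = jobs.get(True, idle_timeout)   # wait at most idle_timeout seconds
            print(name, "processing", job)
    except queue.Empty:
        print(name, "- Worker died")             # same message style as the diff

t = threading.Thread(target=worker, args=("fetch",))
t.start()
jobs.put("http://example.com/a")
jobs.put("http://example.com/b")
t.join()   # the worker exits on its own once the queue stays empty for a second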