from mqueues import fetch_queue, compile_queue, process_queue
from compiler import do_compile
from mprocess import do_process
from fetching import fetch_page
from gevent import spawn
from itertools import repeat
from models import CrawlUrl
from src import clogger
from src.database import db_session2

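
# Pipeline overview: URLs queued on fetch_queue are downloaded by the fetch
# workers, the raw pages are compiled via compile_queue, and the compiled
# items are handed to process_queue for processing. Each stage runs in its
# own pool of gevent greenlets started by start_workers().
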

def start_workers(f, c, p):
    # Spawn f fetch workers, c compile workers and p process workers.
    for _ in range(f):
        clogger.debug("spawn fetchworker")
        spawn(work_fetch)
    for _ in range(c):
        spawn(work_compile)
    for _ in range(p):
        spawn(work_process)


def work_fetch():
    while True:
        run_fetch()


def work_process():
    while True:
        run_process()


def work_compile():
    while True:
        run_compile()


def queue_url(tpe, url, params=None):
    # Queue a URL for a full crawl that follows pagination.
    if params is None:
        params = {"nofollow": False}
    fetch_queue.put((False, tpe, url, params))


# nofollow=True: don't follow pagination recursively, only fetch an update.
def queue_url_upd(tpe, url, params=None):
    if params is None:
        params = {"nofollow": True}
    fetch_queue.put((True, tpe, url, params))


# Fetch a page from the URL queue and hand the raw HTML to the compile stage.
def run_fetch():
    # Items arrive either as (tc, tpe, url, params) or as (tc, tpe, url);
    # unpack once so the shorter form does not discard the queued item.
    item = fetch_queue.get()
    if len(item) == 4:
        tc, tpe, url, p = item
    else:
        tc, tpe, url = item
        p = {}

    if tpe not in ("dummyarticle", "dummyindex"):
        rw = fetch_page(url)
    else:
        rw = "<p> dummytext</p>"
    compile_queue.put((0, tpe, {"url": url, "sourcetype": tpe, "raw": rw}, p))
    return rw
    # fetch_queue.task_done()


# Compile an item from the compile queue and pass the result on for processing.
def run_compile():
    tc, tpe, h, p = compile_queue.get()
    if "parent_item" in p:  # dict.has_key() only exists in Python 2
        h["parent_item"] = p["parent_item"]
    h = do_compile(tpe, h, p)
    process_queue.put((0, tpe, h, p))
    return h
    # compile_queue.task_done()


def run_process():
    # Final stage: hand the compiled item to do_process.
    tc, tpe, h, p = process_queue.get()
    do_process(tpe, h, p)
    return h
    # process_queue.task_done()
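

# Minimal usage sketch. The worker counts, the "article" source type and the
# URL are assumptions for illustration, not values taken from this module.
if __name__ == "__main__":
    import gevent

    start_workers(f=4, c=2, p=2)                           # 4 fetchers, 2 compilers, 2 processors
    queue_url("article", "http://example.com/blog/")       # full crawl, follows pagination
    queue_url_upd("article", "http://example.com/blog/")   # update only (nofollow)
    gevent.sleep(30)                                       # give the greenlets time to drain the queues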