# email_archiver.py
#!/usr/bin/env python
# -*- Mode: Python; tab-width: 4 -*-
#
# Netfarm Mail Archiver - release 2
#
# Copyright (C) 2005-2007 Gianluigi Tiesi <sherpya@netfarm.it>
# Copyright (C) 2005-2007 NetFarm S.r.l. [http://www.netfarm.it]
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTIBILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
# ======================================================================
## @file archiver.py
## Netfarm Mail Archiver [core]
  21. __doc__ = '''Netfarm Archiver release 2.1.0 - Main worker'''
  22. __version__ = '2.1.0'
  23. __all__ = [ 'BackendBase',
  24. 'StorageTypeNotSupported',
  25. 'BadConfig',
  26. 'BACKEND_OK',
  27. 'E_NONE',
  28. 'E_ERR',
  29. 'E_INFO',
  30. 'E_TRACE',
  31. 'E_ALWAYS',
  32. 'platform' ] # import once
  33. from sys import platform, hexversion
  34. if platform != 'win32':
  35. from signal import signal, SIGTERM, SIGINT, SIGHUP, SIG_IGN
  36. from stat import ST_MTIME
  37. from os import stat, fork, kill, seteuid, setegid, getuid, chdir
  38. from pwd import getpwnam, getpwuid
  39. from mtplib import MTPServer
  40. from time import strftime, time, localtime, sleep, mktime
  41. from sys import argv, exc_info, stdin, stdout, stderr
  42. from sys import exit as sys_exit
  43. from os import unlink, chmod, access, F_OK, R_OK
  44. from os import close, dup, getpid
  45. from mimetools import Message
  46. from multifile import MultiFile
  47. from smtplib import SMTP, SMTPRecipientsRefused, SMTPSenderRefused
  48. from ConfigParser import ConfigParser
  49. from threading import Thread, Lock, RLock, Event
  50. from cStringIO import StringIO
  51. from getopt import getopt
  52. from types import IntType, DictType, StringType
  53. from random import sample as random_sample
  54. from string import ascii_letters
  55. from utils import mime_decode_header, unquote, split_hdr
  56. from utils import parse_message, dupe_check, safe_parseaddr, hash_headers
  57. try:
  58. from bsddb3 import hashopen
  59. except:
  60. from bsddb import hashopen
  61. import re
  62. ### Mandatory python >= 2.3 dependancy
  63. if hexversion < 0x02030000:
  64. raise (Exception, 'Upgrade to python 2.3, this program needs python >= 2.3')
  65. ### Debug levels
  66. E_NONE = 0
  67. E_ERR = 1
  68. E_INFO = 2
  69. E_TRACE = 3
  70. E_ALWAYS = -1
  71. DEBUGLEVELS = { 'none' : E_NONE,
  72. 'error' : E_ERR,
  73. 'info' : E_INFO,
  74. 'trace' : E_TRACE,
  75. 'always': E_ALWAYS }
  76. ### Usefull constants
  77. NL = '\n'
  78. AID = 'X-Archiver-ID'
  79. STARTOFBODY = NL + NL
  80. GRANULARITY = 10
  81. BACKEND_OK = (1, 200, 'Ok')
  82. MINSIZE = 8
  83. ### Globals
  84. LOG = None
  85. dbchecker = None
  86. pidfile = None
  87. isRunning = False
  88. main_svc = False
  89. serverPoll = []
  90. runas = None
  91. ##
  92. re_aid = re.compile(r'^(X-Archiver-ID: .*?)[\r|\n]', re.IGNORECASE | re.MULTILINE)
  93. whitelist = []
  94. subjpattern = None
  95. input_classes = { 'smtp': MTPServer }
  96. output_classes = { 'smtp': SMTP }
  97. class StorageTypeNotSupported(Exception):
  98. """StorageTypeNotSupported The storage type is not supported"""
  99. pass
  100. class BadStageTypeError(Exception):
  101. """BadStageTypeError The Stage type is wrong"""
  102. pass
  103. class BadStageInput(Exception):
  104. """BadStageInput The Input Stage is wrong"""
  105. pass
  106. class BadStageOutput(Exception):
  107. """BadStageOutput The Output Stage is wrong"""
  108. pass
  109. class BadBackendTypeError(Exception):
  110. """BadBackendTypeError An error occurred when importing Backend module"""
  111. pass
  112. class BadConfig(Exception):
  113. """BadConfig An error occurred while parsing Backend configuration"""
  114. pass
  115. class BackendBase:
  116. """BackendBase Class
  117. This class should be derived to make a specialized Backend class"""
  118. def process(self, data):
  119. """method to process data
  120. should be implemented when subclassing"""
  121. del data
  122. return 0, 433, 'Backend not configured'
  123. def shutdown(self):
  124. """method to shudown and cleanup the backend
  125. should be implemented when subclassing"""
  126. pass
  127. class DebugBackend(BackendBase):
  128. """A fake Backend
  129. used only to debug the process"""
  130. def process(self, data):
  131. LOG(E_INFO, "[DebugBackend]: %s" % str(data))
  132. return 1234, 250, 'Ok'
  133. def shutdown(self): pass
  134. class Logger:
  135. """Message Logger class
  136. Used to log message to a file"""
  137. def __init__(self, config=None, debug=False):
  138. """The constructor"""
  139. if debug:
  140. self.log_fd = stdout
  141. else:
  142. try:
  143. self.log_fd = open(config.get('global', 'logfile'), 'a')
  144. except:
  145. print ('Cannot open logfile, using stderr')
  146. self.log_fd = stderr
  147. try:
  148. self.loglevel = DEBUGLEVELS[config.get('global', 'loglevel').lower()]
  149. except:
  150. self.loglevel = E_ERR
  151. try:
  152. self.logstrtime = config.get('global', 'logstrtime')
  153. except:
  154. self.logstrtime = '%m/%d/%Y %H:%M:%S'
  155. def __call__(self, level, msg):
  156. """Default call method for Logger class
  157. It's used to append a message to the logfile depending on
  158. the severity"""
  159. if self.loglevel < level:
  160. return
  161. timestr = strftime(self.logstrtime, localtime(time()))
  162. outstr = '%s %s\n' % (timestr, msg)
  163. try:
  164. self.log_fd.write(outstr)
  165. self.log_fd.flush()
  166. except:
  167. pass
  168. del timestr, outstr
  169. def fileno(self):
  170. """returns logfile fd
  171. Used to pass it on some backends like xmlrpc"""
  172. return self.log_fd.fileno()
  173. def flush(self):
  174. """flushes the Logger fd to force the write operation"""
  175. return self.log_fd.flush()
  176. def close(self):
  177. """closes the Logger fd"""
  178. try:
  179. self.log_fd.close()
  180. except: pass
  181. def StageHandler(config, stage_type):
  182. """Meta class for a StageHandler Backend"""
  183. ##### Class Wrapper - Start
  184. ### I need class type before __init__
  185. try:
  186. input_class = config.get(stage_type, 'input').split(':', 1)[0]
  187. except:
  188. input_class = 'invalid or missing input in stage %s' % stage_type
  189. if not input_classes.has_key(input_class):
  190. raise (BadStageInput, input_class)
  191. class StageHandler(Thread, input_classes[input_class]):
  192. """Base class for a StageHandler Backend"""
  193. def __init__(self, Class, config, stage_type):
  194. """StageHandler Constructor"""
  195. self.process_message = getattr(self, 'process_' + stage_type, None)
  196. if self.process_message is None:
  197. raise (BadStageTypeError, stage_type)
  198. try:
  199. self.proto, self.address = config.get(stage_type, 'input').split(':', 1)
  200. except:
  201. raise BadStageInput
  202. try:
  203. timeout = config.getfloat('global', 'timeout')
  204. except:
  205. timeout = None
  206. Thread.__init__(self)
  207. ## Init MTPServer Class
  208. Class.__init__(self, self.address, self.del_hook, timeout=timeout)
  209. self.lock = RLock()
  210. self.type = stage_type
  211. ## Setup handle_accept Hook
  212. self._handle_accept = self.handle_accept
  213. self.handle_accept = self.accept_hook
  214. try:
  215. self.usepoll = config.getboolean('global', 'usepoll')
  216. except:
  217. self.usepoll = True
  218. try:
  219. self.granularity = config.getint('global', 'granularity')
  220. except:
  221. self.granularity = GRANULARITY
  222. ## Win32 Fixups
  223. if platform == 'win32':
  224. ## No support for poll on win32
  225. self.usepoll = False
  226. ## Bug: hang on close if using psycopg / Not needed if run as service
  227. self.setDaemon(main_svc)
  228. try:
  229. self.nowait = config.getboolean('global', 'nowait')
  230. except:
  231. self.nowait = False
  232. try:
  233. self.datefromemail = config.getboolean('global', 'datefromemail')
  234. except:
  235. self.datefromemail = False
  236. ## Init Hashdb to avoid re-archiving
  237. try:
  238. self.hashdb = hashopen(config.get(self.type, 'hashdb'), 'c')
  239. except:
  240. LOG(E_TRACE, '%s: Cannot open hashdb file' % self.type)
  241. raise (Exception, 'Cannot open hashdb file')
  242. try:
  243. self.debuglevel = config.getint(self.type, 'debuglevel')
  244. except:
  245. self.debuglevel = 0
  246. ## Set custom banner
  247. self.banner = 'Netfarm Archiver [%s] version %s' % (stage_type, __version__)
  248. try:
  249. output, address = config.get(stage_type, 'output').split(':', 1)
  250. except:
  251. output = 'invalid or missing output in stage %s' % stage_type
  252. if not output_classes.has_key(output):
  253. raise (BadStageOutput, output)
  254. self.output = output_classes[output]
  255. try:
  256. self.output_address, self.output_port = address.split(':', 1)
  257. self.output_port = int(self.output_port)
  258. except:
  259. raise (BadStageOutput, self.output)
  260. ## Backend factory
  261. self.config = config
  262. backend_type = self.config.get(stage_type, 'backend')
  263. try:
  264. backend = getattr(__import__('backend_%s' % backend_type, globals(), locals(), []), 'Backend')
  265. except ImportError:
  266. t, val, tb = exc_info()
  267. del tb
  268. LOG(E_ERR, '%s: Cannot import backend: %s' % (self.type, str(val)))
  269. raise (BadBackendTypeError, str(val))
  270. self.backend = backend(self.config, stage_type, globals())
  271. self.shutdown_backend = self.backend.shutdown
  272. def run(self):
  273. self.setName(self.type)
  274. LOG(E_ALWAYS, '[%d] Starting Stage Handler %s: %s %s' % (getpid(), self.type, self.proto, self.address))
  275. self.loop(self.granularity, self.usepoll, self.map)
  276. ## Hooks to gracefully stop threads
  277. def accept_hook(self):
  278. """hook called when the server accepts an incoming connection"""
  279. LOG(E_TRACE, '%s: I got a connection: Acquiring lock' % self.type)
  280. self.lock.acquire()
  281. return self._handle_accept()
  282. def del_hook(self):
  283. """hook called when a connection is terminated"""
  284. LOG(E_TRACE, '%s: Connection closed: Releasing lock' % self.type)
  285. try:
  286. self.lock.release()
  287. except:
  288. pass
  289. def finish(self, force=True):
  290. """shutdown the Archiver system waiting for unterminated jobs"""
  291. if not self.nowait and not force:
  292. LOG(E_TRACE, '%s: Waiting thread job...' % self.getName())
  293. self.lock.acquire()
  294. LOG(E_TRACE, '%s: Done' % self.getName())
  295. self.close_all()
  296. ## low entropy message id generator, fake because it's not changed in the msg
  297. def new_mid(self):
  298. m = ''.join(random_sample(ascii_letters, 20)) + '/NMA'
  299. return '<' + '@'.join([m, self.address]) + '>'
  300. def sendmail(self, m_from, m_opts, m_to, m_rcptopts, msg, aid=None, hash=None):
  301. """Rerouting of mails to nexthop (postfix)"""
  302. if msg is None: # E.g. regex has failed
  303. LOG(E_ERR, '%s-sendmail: msg is None something went wrong ;(' % self.type)
  304. return self.do_exit(443, 'Internal server error')
  305. try:
  306. server = self.output(self.output_address, self.output_port)
  307. except:
  308. t, val, tb = exc_info()
  309. del tb
  310. LOG(E_ERR, '%s-sendmail: Failed to connect to output server: %s' % (self.type, str(val)))
  311. return self.do_exit(443, 'Failed to connect to output server')
  312. ## Null path - smtplib doesn't enclose '' in brackets
  313. if m_from == '':
  314. m_from = '<>'
  315. rcpt_options = []
  316. ## Fake rcpt options for NOTIFY passthrough
  317. if len(m_rcptopts) > 0:
  318. option = m_rcptopts[0][1].upper()
  319. if option.find('NOTIFY') != -1:
  320. rcpt_options = ['NOTIFY' + option.split('NOTIFY', 1).pop()]
  321. ## Mail options is disabled for now
  322. try:
  323. try:
  324. server_reply = server.sendmail(m_from, m_to, msg, mail_options=[], rcpt_options=rcpt_options)
  325. except (SMTPRecipientsRefused, SMTPSenderRefused):
  326. LOG(E_ERR, '%s-sendmail: Server refused sender or recipients' % (self.type))
  327. return self.do_exit(550, 'Server refused sender or recipients')
  328. except:
  329. t, v, tb = exc_info()
  330. LOG(E_ERR, '%s-sendmail: sent failed: %s: %s' % (self.type, t, v))
  331. return self.do_exit(443, 'Delivery failed to next hop')
  332. else:
  333. okmsg = 'Sendmail Ok'
  334. if aid: okmsg = 'Archived as: ' + str(aid)
  335. if server_reply != {}:
  336. LOG(E_ERR, '%s-sendmail: ok but not all recipients where accepted %s' % (self.type, server_reply))
  337. if hash is not None and self.hashdb.has_key(hash):
  338. LOG(E_TRACE, '%s-sendmail: expunging msg %s from hashdb' % (self.type, aid))
  339. try:
  340. del self.hashdb[hash]
  341. self.hashdb.sync()
  342. except:
  343. pass
  344. return self.do_exit(250, okmsg, 200)
  345. finally:
  346. try:
  347. server.close()
  348. except: pass
  349. def do_exit(self, code, msg='', extcode=None):
  350. """Exit function
  351. @returns: exit code and messages"""
  352. self.del_channel()
  353. if not extcode:
  354. extcode = code
  355. excode = '.'.join([x for x in str(extcode)])
  356. return ' '.join([str(code), excode, msg])
  357. def process_storage(self, peer, sender, mail_options, recips, rcptopts, data):
  358. """Stores the archived email using a Backend"""
  359. size = len(data)
  360. if size < MINSIZE:
  361. return self.do_exit(550, 'Invalid Mail')
  362. if not data.endswith(NL):
  363. data = data + NL
  364. stream = StringIO(data)
  365. msg = Message(stream)
  366. aid = msg.get(AID, None)
  367. ## Check if I have msgid in my cache
  368. mid = msg.get('message-id', self.new_mid())
  369. LOG(E_TRACE, '%s: Message-id: %s' % (self.type, mid))
  370. hash = hash_headers(msg.get)
  371. if self.hashdb.has_key(hash):
  372. aid = self.hashdb[hash]
  373. LOG(E_ERR, '%s: Message already processed' % self.type)
  374. return self.sendmail(sender, mail_options, recips, rcptopts, data, aid, hash)
  375. ## Date extraction
  376. m_date = None
  377. if self.datefromemail:
  378. m_date = msg.getdate('Date')
  379. try:
  380. mktime(m_date)
  381. except:
  382. m_date = None
  383. if m_date is None:
  384. m_date = localtime(time())
  385. del msg, stream
  386. ## Mail needs to be processed
  387. if aid:
  388. try:
  389. year, pid = aid.split('-', 1)
  390. year = int(year)
  391. pid = int(pid)
  392. except:
  393. t, val, tb = exc_info()
  394. del tb
  395. LOG(E_ERR, '%s: Invalid X-Archiver-ID header [%s]' % (self.type, str(val)))
  396. return self.do_exit(550, 'Invalid X-Archiver-ID header')
  397. args = dict(mail=data, year=year, pid=pid, date=m_date, mid=mid, hash=hash)
  398. LOG(E_TRACE, '%s: year is %d - pid is %d (%s)' % (self.type, year, pid, mid))
  399. status, code, msg = self.backend.process(args)
  400. if status == 0:
  401. LOG(E_ERR, '%s: process failed %s' % (self.type, msg))
  402. return self.do_exit(code, msg)
  403. ## Inserting in hashdb
  404. LOG(E_TRACE, '%s: inserting %s msg in hashdb' % (self.type, aid))
  405. self.hashdb[hash] = aid
  406. self.hashdb.sync()
  407. LOG(E_TRACE, '%s: backend worked fine' % self.type)
  408. else:
  409. ## Mail in whitelist - not processed
  410. LOG(E_TRACE, '%s: X-Archiver-ID header not found in mail [whitelist]' % self.type)
  411. ## Next hop
  412. LOG(E_TRACE, '%s: passing data to nexthop: %s:%s' % (self.type, self.output_address, self.output_port))
  413. return self.sendmail(sender, mail_options, recips, rcptopts, data, aid, hash)
  414. def add_aid(self, data, msg, aid):
  415. archiverid = '%s: %s' % (AID, aid)
  416. LOG(E_INFO, '%s: %s' % (self.type, archiverid))
  417. archiverid = archiverid + NL
  418. headers = data[:msg.startofbody]
  419. if msg.get(AID, None):
  420. LOG(E_TRACE, '%s: Warning overwriting X-Archiver-ID header' % self.type)
  421. ## Overwrite existing header
  422. try:
  423. data = re_aid.sub(archiverid, headers, 1).strip() + STARTOFBODY + data[msg.startofbody:]
  424. except:
  425. t, val, tb = exc_info()
  426. del tb
  427. LOG(E_ERR, '%: Error overwriting X-Archiver-ID header: %s' % (self.type, str(val)))
  428. return None
  429. else:
  430. data = headers.strip() + NL + archiverid + STARTOFBODY + data[msg.startofbody:]
  431. return data
  432. def remove_aid(self, data, msg):
  433. if msg.get(AID, None):
  434. LOG(E_TRACE, '%s: This mail should not have X-Archiver-ID header, removing it' % self.type)
  435. try:
  436. headers = data[:msg.startofbody]
  437. data = re_aid.sub('', headers, 1).strip() + STARTOFBODY + data[msg.startofbody:]
  438. except:
  439. t, val, tb = exc_info()
  440. del tb
  441. LOG(E_ERR, '%s: Error removing X-Archiver-ID header: %s' % (self.type, str(val)))
  442. return data
  443. def process_archive(self, peer, sender, mail_options, recips, rcptopts, data):
  444. """Archives email meta data using a Backend"""
  445. LOG(E_INFO, '%s: Sender is <%s> - Recipients (Envelope): %s' % (self.type, sender, ','.join(recips)))
  446. size = len(data)
  447. if size < MINSIZE:
  448. return self.do_exit(550, 'Invalid Mail')
  449. if not data.endswith(NL):
  450. data = data + NL
  451. args = {}
  452. aid = None
  453. mid = None
  454. stream = StringIO(data)
  455. msg = Message(stream)
  456. if sender == '':
  457. LOG(E_INFO, '%s: Null return path mail, not archived' % (self.type))
  458. return self.sendmail('<>', mail_options, recips, rcptopts, data, aid)
  459. ## Check if I have msgid in my cache
  460. mid = msg.get('message-id', self.new_mid())
  461. hash = hash_headers(msg.get)
  462. if self.hashdb.has_key(hash):
  463. LOG(E_TRACE, '%s: Message-id: %s' % (self.type, mid))
  464. aid = self.hashdb[hash]
  465. LOG(E_TRACE, '%s: Message already has year/pid pair, only adding header' % self.type)
  466. return self.sendmail(sender, mail_options, recips, rcptopts, self.add_aid(data, msg, aid), aid, hash)
  467. args['m_mid'] = mid
  468. args['hash'] = hash
  469. ## Check for duplicate headers
  470. dupe = dupe_check(msg.headers)
  471. if dupe is not None:
  472. LOG(E_ERR, '%s: Duplicate header %s' % (self.type, dupe))
  473. return self.do_exit(552, 'Duplicate header %s' % dupe)
  474. ## Extraction of From field
  475. m_from = msg.getaddrlist('From')
  476. if len(m_from) == 1:
  477. m_from = safe_parseaddr(m_from[0][1])
  478. else:
  479. m_from = None
  480. ## Empty or invalid 'From' field, try to use sender
  481. if m_from is None:
  482. LOG(E_ERR, '%s: no From header in mail using sender' % self.type)
  483. m_from = safe_parseaddr(sender)
  484. ## No luck
  485. if m_from is None:
  486. return self.do_exit(552, 'Mail has not suitable From/Sender')
  487. args['m_from'] = m_from
  488. ## Extract 'To' field
  489. m_to = []
  490. for h in msg.getaddrlist('To'):
  491. rec = safe_parseaddr(h[1])
  492. if rec is None: continue
  493. m_to.append(rec)
  494. ## Empty 'To' field use recipients
  495. if len(m_to) == 0:
  496. LOG(E_ERR, '%s: no To header in mail using recipients' % self.type)
  497. for recipient in recips:
  498. rec = safe_parseaddr(recipient)
  499. if rec is None:
  500. continue
  501. m_to.append(rec)
  502. if len(m_to) == 0:
  503. return self.do_exit(552, 'Mail has not suitable To/Recipient')
  504. ## Extract 'Cc' field
  505. for h in msg.getaddrlist('Cc'):
  506. rec = safe_parseaddr(h[1])
  507. if rec is None: continue
  508. m_to.append(rec)
  509. ## Cleanup: remove duplicates
  510. recs = []
  511. for rec in m_to:
  512. if rec not in recs:
  513. recs.append(rec)
  514. args['m_rec'] = recs
  515. ## Extract 'Subject' field
  516. m_sub = mime_decode_header(msg.get('Subject', 'No Subject'))
  517. if subjpattern is not None and m_sub.find(subjpattern) != -1:
  518. LOG(E_INFO, '%s: Subject pattern matched, not archived' % self.type)
  519. return self.sendmail(sender, mail_options, recips, rcptopts, self.remove_aid(data, msg))
  520. args['m_sub'] = m_sub
  521. ## Whitelist check: From, To and Sender (envelope)
  522. checklist = [m_from] + m_to
  523. ss = safe_parseaddr(sender)
  524. if ss is not None:
  525. checklist.append(ss)
  526. for check in checklist:
  527. if check.split('@', 1)[0] in whitelist:
  528. LOG(E_INFO, '%s: Mail to: %s in whitelist, not archived' % (self.type, check))
  529. return self.sendmail(sender, mail_options, recips, rcptopts, self.remove_aid(data, msg))
  530. ## Sender size limit check - in kb
  531. if dbchecker is not None and dbchecker.quota_check(m_from, size >> 10):
  532. return self.do_exit(422, 'Sender quota execeded')
  533. args['m_size'] = size
  534. ## Extract 'Date' field
  535. m_date = None
  536. if self.datefromemail:
  537. m_date = msg.getdate('Date')
  538. try:
  539. mktime(m_date)
  540. except:
  541. m_date = None
  542. if m_date is None:
  543. m_date = localtime(time())
  544. args['m_date'] = m_date
  545. m_attach = []
  546. if msg.maintype != 'multipart':
  547. m_parse = parse_message(msg)
  548. if m_parse is not None:
  549. m_attach.append(m_parse)
  550. else:
  551. filepart = MultiFile(stream)
  552. filepart.push(msg.getparam('boundary'))
  553. try:
  554. while filepart.next():
  555. submsg = Message(filepart)
  556. subpart = parse_message(submsg)
  557. if subpart is not None:
  558. m_attach.append(subpart)
  559. except:
  560. LOG(E_ERR, '%s: Error in multipart splitting' % self.type)
  561. args['m_attach'] = m_attach
  562. if dbchecker is not None:
  563. ## Collect data for mb lookup
  564. addrs = []
  565. for addr in [m_from] + m_to:
  566. addrs.append(addr)
  567. args['m_mboxes'] = dbchecker.mblookup(addrs)
  568. else:
  569. args['m_mboxes'] = []
  570. year, pid, error = self.backend.process(args)
  571. if year == 0:
  572. LOG(E_ERR, '%s: Backend Error: %s' % (self.type, error))
  573. return self.do_exit(pid, error)
  574. ## Adding X-Archiver-ID: header
  575. aid = '%d-%d' % (year, pid)
  576. data = self.add_aid(data, msg, aid)
  577. LOG(E_TRACE, '%s: inserting %s msg in hashdb' % (self.type, aid))
  578. self.hashdb[hash] = aid
  579. self.hashdb.sync()
  580. ## Next hop
  581. LOG(E_TRACE, '%s: backend worked fine' % self.type)
  582. LOG(E_TRACE, '%s: passing data to nexthop: %s:%s' % (self.type, self.output_address, self.output_port))
  583. return self.sendmail(sender, mail_options, recips, rcptopts, data, aid, hash)
  584. ##### Class Wrapper - End
  585. return apply(StageHandler, (input_classes[input_class], config, stage_type))
  586. #### Mailbox DB and Quota DB reader/checker
  587. class DBChecker(Thread):
  588. def __init__(self, dbfiles, timeout):
  589. from mblookup import getusers
  590. self.getusers = getusers
  591. self.dbfiles = dbfiles
  592. self.postuser = None
  593. self.ev = Event()
  594. self.running = True
  595. self.timeout = timeout
  596. self.lock = Lock()
  597. self.updatedblist()
  598. Thread.__init__(self)
  599. def getpuser(self):
  600. try:
  601. fd = open('/etc/imapd.conf', 'r')
  602. for line in fd:
  603. line = line.strip()
  604. if line.startswith('postuser:'):
  605. self.postuser = line.split(':', 1).pop().strip()
  606. break
  607. fd.close()
  608. except: pass
  609. def run(self):
  610. self.getpuser()
  611. while self.running:
  612. #LOG(E_TRACE, '[DBChecker] CheckPoint')
  613. self.updatedblist()
  614. self.ev.wait(self.timeout)
  615. LOG(E_ALWAYS, '[DBChecker] Done')
  616. def stop(self):
  617. self.running = False
  618. self.ev.set()
  619. def updatedb(self, db):
  620. update = False
  621. try:
  622. info = stat(db['filename'])
  623. if info[ST_MTIME] != db['timestamp']:
  624. update = True
  625. except:
  626. update = True
  627. if update:
  628. try:
  629. dbdict = {}
  630. dbf = hashopen(db['filename'], 'r')
  631. dbdict.update(dbf)
  632. dbf.close()
  633. db['timestamp'] = info[ST_MTIME]
  634. db['db'] = dbdict
  635. LOG(E_INFO, '[DBChecker] (Re)Loaded db %s' % db['filename'])
  636. except Exception as e:
  637. LOG(E_ERR, '[DBChecker] Error (Re)Loading db %s, %s' % (db['filename'], e))
  638. def updatedblist(self):
  639. ## Check timestamp and update data structs
  640. self.lock.acquire()
  641. for db in self.dbfiles.values():
  642. self.updatedb(db)
  643. self.lock.release()
  644. def quota_check(self, email, size):
  645. ## Quota Check
  646. if not self.dbfiles.has_key('quota'): return False
  647. if self.dbfiles['quota']['db'] is None: return False
  648. sender = self.mblookup([email])
  649. if len(sender) != 1: return False
  650. sender = sender[0]
  651. res = False
  652. self.lock.acquire()
  653. if self.dbfiles['quota']['db'].has_key(sender):
  654. try:
  655. csize = long(self.dbfiles['quota']['db'][sender])
  656. except:
  657. csize = 0;
  658. if (csize > 0) and (size > csize):
  659. LOG(E_ERR, '[DBChecker] Quota for %s exceded' % email)
  660. res = True
  661. self.lock.release()
  662. return res
  663. def mblookup(self, emails):
  664. ## Mailbox lookup
  665. if not (self.dbfiles.has_key('virtual') and \
  666. self.dbfiles.has_key('aliases')):
  667. return []
  668. if (self.dbfiles['virtual']['db'] is None) or \
  669. (self.dbfiles['aliases']['db'] is None):
  670. return []
  671. self.lock.acquire()
  672. res = self.getusers(emails, self.dbfiles, self.postuser)
  673. self.lock.release()
  674. return res
  675. def multiplex(objs, function, *args):
  676. """Generic method multiplexer
  677. It executes the given method and args for each object in the list"""
  678. res = []
  679. for obj in objs:
  680. method = getattr(obj, function, None)
  681. if method: res.append(apply(method, args))
  682. return res
  683. def sig_int_term(signum, frame):
  684. """Handler for SIGINT and SIGTERM signals
  685. Terminates the StageHandler threads"""
  686. global isRunning
  687. del signum, frame # Not needed avoid pychecker warning
  688. if not isRunning: return # already called
  689. LOG(E_ALWAYS, "[Main] Got SIGINT/SIGTERM")
  690. isRunning = False
  691. if len(serverPoll):
  692. LOG(E_ALWAYS, '[Main] Shutting down stages')
  693. multiplex(serverPoll, 'finish')
  694. multiplex(serverPoll, 'shutdown_backend')
  695. multiplex(serverPoll, 'stop')
  696. def do_shutdown(res = 0):
  697. """Archiver system shutdown"""
  698. if platform != 'win32' and pidfile is not None:
  699. try:
  700. unlink(pidfile)
  701. except: pass
  702. LOG(E_ALWAYS, '[Main] Waiting for child threads')
  703. multiplex(serverPoll, 'close')
  704. LOG(E_ALWAYS, '[Main] Shutdown complete')
  705. LOG.close()
  706. if main_svc:
  707. sys_exit(res)
  708. else:
  709. return res
  710. ## Specific Startup on unix
  711. def unix_startup(config, user=None, debug=False):
  712. """ Unix specific startup actions """
  713. global pidfile
  714. if user:
  715. try:
  716. userpw = getpwnam(user)
  717. setegid(userpw[3])
  718. seteuid(userpw[2])
  719. except:
  720. t, val, tb = exc_info()
  721. del t, tb
  722. print ('Cannot swith to user', user, str(val))
  723. sys_exit(-2)
  724. else:
  725. user = getpwuid(getuid())[0]
  726. try:
  727. pidfile = config.get('global', 'pidfile')
  728. except:
  729. LOG(E_ALWAYS, '[Main] Missing pidfile in config')
  730. do_shutdown(-4)
  731. locked = 1
  732. try:
  733. pid = int(open(pidfile).read().strip())
  734. LOG(E_TRACE, '[Main] Lock: Sending signal to the process')
  735. try:
  736. kill(pid, 0)
  737. LOG(E_ERR, '[Main] Stale Lockfile: Process is alive')
  738. except:
  739. LOG(E_ERR, '[Main] Stale Lockfile: Old process is not alive')
  740. locked = 0
  741. except:
  742. locked = 0
  743. if locked:
  744. LOG(E_ALWAYS, '[Main] Unable to start Netfarm Archiver, another instance is running')
  745. do_shutdown(-5)
  746. ## Daemonize - Unix only - win32 has service
  747. if not debug:
  748. try:
  749. pid = fork()
  750. except:
  751. t, val, tb = exc_info()
  752. del t
  753. print ('Cannot go in background mode', str(val))
  754. if pid: sys_exit(0)
  755. chdir('/')
  756. null = open('/dev/null', 'r')
  757. close(stdin.fileno())
  758. dup(null.fileno())
  759. null.close()
  760. close(stdout.fileno())
  761. dup(LOG.fileno())
  762. close(stderr.fileno())
  763. dup(LOG.fileno())
  764. ## Save my process id to file
  765. mypid = str(getpid())
  766. try:
  767. open(pidfile,'w').write(mypid)
  768. except:
  769. LOG(E_ALWAYS, '[Main] Pidfile is not writable')
  770. do_shutdown(-6)
  771. return user, mypid
  772. ## Specific Startup on win32
  773. def win32_startup():
  774. """ Win32 specific startup actions"""
  775. return 'Windows User', getpid()
## Start the Archiver Service
def ServiceStartup(configfile, user=None, debug=False, service_main=False):
    """ Archiver Service Main.

    Reads the configuration, performs platform specific startup, creates the
    configured stage handlers, installs signal handlers (unix) and then loops
    joining the worker threads until isRunning is cleared.

    @param configfile: path of the ini-style configuration file
    @param user: unix account to run as (ignored on win32)
    @param debug: foreground/debug mode flag, forwarded to Logger and unix_startup
    @param service_main: True when started as the real service entry point;
           stored in main_svc so do_shutdown calls sys_exit instead of returning
    @return: -3 when the config file is unreadable, otherwise do_shutdown's result
    """
    global LOG, main_svc, dbchecker, runas, whitelist, subjpattern, isRunning
    main_svc = service_main
    if not access(configfile, F_OK | R_OK):
        # Logger is not up yet, so report on stdout and bail out
        print ('Cannot read configuration file', configfile)
        return -3
    config = ConfigParser()
    config.read(configfile)
    LOG = Logger(config, debug)
    # Platform specific startup: returns the effective user label and pid
    if platform == 'win32':
        runas, mypid = win32_startup()
    else:
        runas, mypid = unix_startup(config, user, debug)
    ### Quota and Mailbox lookup stuff
    # Only on unix: build the dict of postfix-style db files to watch.
    # Each try block is best-effort: a missing option just disables the feature.
    if platform != 'win32':
        try:
            sleeptime = float(config.get('global', 'sleeptime'))
        except:
            sleeptime = 60.0  # default polling interval (seconds)
        dbfiles = {}
        try:
            # NOTE(review): quota entry uses key 'file' while the mailbox
            # entries below use 'filename' - presumably DBChecker expects
            # this asymmetry; confirm against its implementation.
            dbfiles['quota'] = { 'file': config.get('global', 'quotafile'), 'timestamp': 0, 'db': None }
            LOG(E_ALWAYS, '[Main] QuotaCheck Enabled')
        except:
            pass
        try:
            # 'mbfiles' holds two comma separated paths: virtual db, alias db
            virtualdb, aliasdb = config.get('global', 'mbfiles').split(',')
            dbfiles['virtual'] = { 'filename': virtualdb.strip(), 'timestamp': 0, 'db': None }
            dbfiles['aliases'] = { 'filename': aliasdb.strip(), 'timestamp': 0, 'db': None }
            LOG(E_ALWAYS, '[Main] Mailbox Lookup is enabled')
        except:
            pass
    ## Whitelist
    # Optional comma separated list of addresses exempted from processing
    try:
        whitelist = config.get('global', 'whitelist').split(',')
        LOG(E_TRACE, '[Main] My whitelist is ' + ','.join(whitelist))
    except:
        pass
    ## Subject pattern
    try:
        subjpattern = config.get('global', 'subjpattern')
    except:
        pass
    ## Starting up
    LOG(E_INFO, '[Main] Running as user %s pid %s' % (runas, mypid))
    ## Creating stage sockets
    # One StageHandler thread per configured stage section
    sections = config.sections()
    if 'archive' in sections:
        serverPoll.append(StageHandler(config, 'archive'))
    if 'storage' in sections:
        serverPoll.append(StageHandler(config, 'storage'))
    if len(serverPoll) == 0:
        LOG(E_ALWAYS, '[Main] No stages configured, Aborting...')
        return do_shutdown(-7)
    # The db file watcher runs as one more pollable thread (unix only)
    if platform != 'win32' and len(dbfiles):
        dbchecker = DBChecker(dbfiles, sleeptime)
        serverPoll.append(dbchecker)
    multiplex(serverPoll, 'start')
    isRunning = True
    try:
        granularity = config.getint('global', 'granularity')
    except:
        granularity = GRANULARITY
    ## Install Signal handlers
    # Must happen after the workers start; SIGHUP is ignored so the daemon
    # survives terminal hangups.
    if platform != 'win32':
        LOG(E_TRACE, '[Main] Installing signal handlers')
        signal(SIGINT, sig_int_term)
        signal(SIGTERM, sig_int_term)
        signal(SIGHUP, SIG_IGN)
    # Main loop: periodically join the worker threads with a timeout so the
    # loop can observe isRunning being cleared by the signal handler.
    while isRunning:
        try:
            multiplex(serverPoll, 'join', granularity)
        except:
            ## Program Termination when sigint is not catched (mainly on win32)
            sig_int_term(0, 0)
    ## Shutdown
    return do_shutdown(0)
  855. ## Main
  856. if __name__ == '__main__':
  857. if platform == 'win32':
  858. configfile = 'archiver.ini'
  859. arglist = 'dc:'
  860. else:
  861. configfile = '/etc/archiver.conf'
  862. arglist = 'dc:u:'
  863. try:
  864. optlist, args = getopt(argv[1:], arglist)
  865. if len(args) > 0:
  866. raise Exception
  867. except:
  868. usage = 'Usage [%s] [-d] [-c alternate_config]' % argv[0]
  869. if platform != 'win32':
  870. usage = usage + ' [-u user]'
  871. print (usage)
  872. sys_exit(-1)
  873. debug = False
  874. user = None
  875. for arg in optlist:
  876. if arg[0] == '-c':
  877. configfile = arg[1]
  878. continue
  879. if arg[0] == '-d':
  880. debug = True
  881. continue
  882. if arg[0] == '-u':
  883. user = arg[1]
  884. continue
  885. ServiceStartup(configfile, user, debug, True)