posts - 167,  comments - 30,  trackbacks - 0
          #!/usr/bin/env python
          #
           -*- coding: utf-8 -*-
          #
          ===============================================================================
          #
          #
           Copyright (c) 2015 Letv.com, Inc. All Rights Reserved
          #
          #
           python2.6版本安裝:
          #
                     yum install python-futures
          #
                     yum install python-qpid
          #
                     yum install supervisor  由supervisor來管理進程,當進程掛掉后自動監控并重啟
          #
           author: david
          #
           date: 2015/10/14 16:42:02
          #
          #
          ===============================================================================
          from qpid.messaging import *
          import time, sys
          import urllib2
          import hashlib
          import json
          import traceback
          from concurrent.futures import *
          import logging
          import logging.handlers

          LOG_DIR = "/home/ldw/logs/geturl/online"
          LOG_MID_FAIL=LOG_DIR + "/update_fail.log"
          LOG_FILE_NAME=LOG_DIR + "/geturl_update.log"

          # default test broker
          QPID_QUEUE_NAME_TEST = "ldw.update_v.queue"
          SERVERS_TEST = ['10.3.3.3']
          SERVERS_PRO = ['10.1.1.1.''10.2.2.2']
          BROKER_TEST = "xyz/xyz@10.11.1.1:5672"
          BROKER_PRO = "xyz/xyz@10.11.1.2:5672"
          RECONNECTION_URLS = ['']
          SERVERS = SERVERS_TEST
          BROKER = BROKER_TEST
          QUEUE_NAME = QPID_QUEUE_NAME_TEST
          THREAD_COUNT = 16

          # 獲取當前時間
          def getNowTime():
              ISOTIMEFORMAT='%Y-%m-%d %H:%M:%S'
              return time.strftime(ISOTIMEFORMAT, time.localtime())

          # 記錄處理失敗日志
          def fail2log(id):
              # 使用with關鍵字,自動關閉文件流
              with  open(LOG_MID_FAIL,'a') as log:
                  log.write('%s\n'%(id))

          # 記錄日志, 可以提出來共用
          def init_logger():
              logging.basicConfig()
              logger = logging.getLogger("__name__")
              logger.setLevel(logging.INFO)
              when:S:Seconds M:Minutes D:Days H:Hours interval:  backupCount:0 not deleted
              # 1天更換一次文件日志,7為保留日志文件個數
              logger_fh = logging.handlers.TimedRotatingFileHandler(LOG_FILE_NAME, 'D', 1, 7)
              logger_fh.suffix = "%Y%m%d-%H%M.log"
              logger_fh.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
              logger.addHandler(logger_fh)
              return logger
          _logger = init_logger()

          # 初始化環境變量,當在局部函數中修改全局變量時,使用global關鍵字
          def init_env(env):
              global BROKER
              # 其他變量類似處理
              if env != 'PRO':
                  BROKER = BROKER_PRO
              _logger.info('env:%s,broker:%s,queue_name:%s,reconnnection_urls:%s, servers: %s, thread_pool_count:%s'
                           % (env,BROKER,QUEUE_NAME,str(RECONNECTION_URLS),str(SERVERS), str(THREAD_COUNT)))

          # 處理業務邏輯
          def process(id, ip):
              key = 'xyz';
              action = 'delete';
              tm = str(int(time.time()))
              sig = hashlib.md5(tm+action+key).hexdigest()
              url = "http://ip/test?sig="+sig
              try:
                  req = urllib2.Request(url)
                  # 綁定Host
                  # req.add_header('Host', 'www.ldw.com')
                  response = urllib2.urlopen(req, timeout = 1)
                  result = response.read()
                  if result.find("400")>=0:
                      fail2log(id)
                      _logger.error('flush cache fail, url :%s' % (url))
                  else:
                      _logger.info('flush cache OK, IP :%s' % (ip))
              except Exception:
                  fail2log(id)
                  _logger.error('flush cbase cache error, url: %s' % (url))

          # init qpid connection
          def qpid_get_conn():
              conn = Connection(BROKER, heartbeat=6, reconnect=True, reconnect_limit=60, reconnect_interval=4, reconnect_urls=RECONNECTION_URLS)
              conn.open()
              sess = conn.session()
              rec = sess.receiver(QUEUE_NAME)
              return conn, sess, rec

          # receive message of qpid queue, multithread implements
          def qpid_receive(rec):
              try:
                  epool = ThreadPoolExecutor(max_workers=THREAD_COUNT)
                  while True:
                      try:
                          message = rec.fetch()
                          data = json.loads(message.content)
                          _logger.info(' receive message:%s' % (message.content))
                          mid = data['mid']
                          
                          for ip in SERVERS:
                              try:
                                  epool.submit(process(mid, ip))
                              except Exception,e:
                                  pass
                                  fail2log(mid)
                                  exstr = traceback.format_exc()
                                  _logger.error(' thread pool process message error:%s' % (exstr))
                      except Exception, e:
                          pass
                          fail2log(mid)
                          _logger.error(' fetch message error, msg:%s' % (message.content))
                      sess.acknowledge() # message ack
              except Exception, e:
                  pass
                  exstr = traceback.format_exc()
                  _logger.error(' start receive message error:%s' % (exstr))

          if __name__ ==   '__main__':
              if(len(sys.argv) < 2):
                  print "Usage: \"python input args error\""
                  sys.exit()
              env = sys.argv[1]
              if env != 'TEST' and env != 'PRO':
                  print "Usage: \"input evn args error\""
                  sys.exit()

              try:
                  init_env(env)
                  conn, sess, rec =  qpid_get_conn()
                  qpid_receive(rec)
                  for c in [ conn, sess, rec]:
                       if c: c.close()
              except Exception, e:
                  pass
                  exstr = traceback.format_exc()
                  _logger.error('init qpid connection message error:%s' % (exstr))
          posted on 2015-10-30 20:20 David1228 閱讀(2887) 評論(0)  編輯  收藏 所屬分類: 動態語言Python

          <2015年10月>
          27282930123
          45678910
          11121314151617
          18192021222324
          25262728293031
          1234567

          常用鏈接

          留言簿(4)

          隨筆分類

          隨筆檔案

          文章檔案

          新聞分類

          新聞檔案

          相冊

          收藏夾

          Java

          Linux知識相關

          Spring相關

          云計算/Linux/虛擬化技術/

          友情博客

          多線程并發編程

          開源技術

          持久層技術相關

          搜索

          •  

          積分與排名

          • 積分 - 359156
          • 排名 - 154

          最新評論

          閱讀排行榜

          評論排行榜

          主站蜘蛛池模板: 景德镇市| 桂阳县| 平阴县| 巴青县| 吉隆县| 湖北省| 屏东县| 田林县| 广平县| 奉贤区| 隆昌县| 东兴市| 军事| 临漳县| 福安市| 铁力市| 上林县| 镇康县| 桃江县| 克东县| 科技| 静宁县| 钟山县| 庄河市| 阆中市| 青岛市| 合阳县| 建德市| 鞍山市| 枣阳市| 太谷县| 九龙县| 丰台区| 扬州市| 吉安县| 桃源县| 元谋县| 五家渠市| 钟祥市| 郧西县| 大竹县|