本文是本人實際開發中遇到的問題,特留文記錄。在此我花了3天的時間解決,這個問題還是有點難度的。
所用到知識點:
一般jdk
|-私有屬性反射
|-序列化
|-正則表達使用
|-多線程使用
|-dom4j的xml讀取
|+hibernate
|-攔截器
|-一個Session工廠同時連接不同數據庫(本文關鍵)
|-oracle Blob 存取
等........
需求功能介紹:
為性能考慮,單一服務器改成集群(每臺服務器數據允許在一定時間內保持同步),給出的修改時間短,不過代碼持久層比較統一(hibernate 感謝天還好是她! )。網絡連接不穩定(鐵路內網!)。
完成后效果:
當網絡連接成功時,多數據庫的同步。
當網絡連接失敗時,本地應用程序運用hibernate攔截器攔截正在操作的對象并記錄下操作動作,序列化到本地數據庫 z_jcyy_tb 表中。表數據屬性為:id,inputdate(記錄時間),object(序列化對象),action(操作動作)。并按一定時間間隔測試連接。如果成功,讀取 z_jcyy_tb 表中數據,反序列化后再同步到其他數據庫中。
代碼說明:
1.新Session 建立
hibernate.cfg.xml 在文件<session-factory>中添加
2.異地數據同步失敗后動作 TBDao->save() 凍結狀態到數據庫
3,失敗后又成功連接后 (線程實現) TBDao->action()
4.hbn 攔截器 ->Interceptor
所用到知識點:
一般jdk
|-私有屬性反射
|-序列化
|-正則表達使用
|-多線程使用
|-dom4j的xml讀取
|+hibernate
|-攔截器
|-一個Session工廠同時連接不同數據庫(本文關鍵)
|-oracle Blob 存取
等........
需求功能介紹:
為性能考慮,單一服務器改成集群(每臺服務器數據允許在一定時間內保持同步),給出的修改時間短,不過代碼持久層比較統一(hibernate 感謝天還好是她! )。網絡連接不穩定(鐵路內網!)。
完成后效果:
當網絡連接成功時,多數據庫的同步。
當網絡連接失敗時,本地應用程序運用hibernate攔截器攔截正在操作的對象并記錄下操作動作,序列化到本地數據庫 z_jcyy_tb 表中。表數據屬性為:id,inputdate(記錄時間),object(序列化對象),action(操作動作)。并按一定時間間隔測試連接。如果成功,讀取 z_jcyy_tb 表中數據,反序列化后再同步到其他數據庫中。
代碼說明:
1.新Session 建立
hibernate.cfg.xml 在文件<session-factory>中添加
<property name="connection.url_b">jdbc:oracle:thin:@192.168.1.114:1521:JCYY</property>
<property name="connection.username_b">jcyy</property>
<property name="connection.password_b">jcyy</property>
TBDao -> openSessionb()
<property name="connection.username_b">jcyy</property>
<property name="connection.password_b">jcyy</property>
// Connection settings for the second ("_b") database, filled in once by the static initializer below.
private static String url_b = null ;
private static String use_b = null ;
private static String pass_b = null ;
private static String dirver_b = null ;

static {
    java.io.InputStream in = null ;
    try {
        // Read SessionManager's private CONFIG_FILE_LOCATION through reflection so this class
        // loads the same hibernate.cfg.xml resource as the rest of the application.
        Field field = SessionManager.class.getDeclaredField("CONFIG_FILE_LOCATION");
        field.setAccessible( true );
        String path = (String) field.get(SessionManager. class );
        in = TBDao.class.getResourceAsStream(path);
        if( in == null ){
            throw new IllegalStateException("hibernate configuration not found on classpath: " + path);
        }
        // Parse the configuration with dom4j and pull out the extra <property name="..._b"> entries.
        Document docT = new SAXReader().read( in );
        url_b = readConfigProperty(docT, "connection.url_b");
        use_b = readConfigProperty(docT, "connection.username_b");
        pass_b = readConfigProperty(docT, "connection.password_b");
        // No "_b" variant is read for the driver: the primary connection.driver_class is reused.
        dirver_b = readConfigProperty(docT, "connection.driver_class");
    } catch (Exception e) {
        // Best effort: class loading must not fail here. Settings that were not read stay null.
        e.printStackTrace();
    } finally {
        if( in != null ){
            try {
                in.close();
            } catch (java.io.IOException ignored) {
                // Nothing left to release.
            }
        }
    }
}

/**
 * Returns the text of one <property> element of the session-factory configuration.
 *
 * @param doc  parsed hibernate.cfg.xml document
 * @param name value of the property's name attribute
 * @return the element text
 * @throws IllegalStateException when no such property element exists
 */
private static String readConfigProperty(Document doc, String name) {
    String xpath = "/hibernate-configuration/session-factory/property[@name='" + name + "']" ;
    org.dom4j.Node node = DocumentHelper.createXPath( xpath ).selectSingleNode(doc);
    if( node == null ){
        throw new IllegalStateException("missing <property name=\"" + name + "\"> in hibernate configuration");
    }
    return node.getText();
}
/**
 * Opens a Hibernate session on the remote database.
 *
 * A JDBC connection is created from the "_b" settings and handed to the SessionFactory that
 * SessionManager has already built; the factory is read from SessionManager through reflection.
 * Because the connection is supplied by this method, Hibernate does not close it when the
 * session is closed: callers should close the connection returned by Session.close().
 *
 * @return a session bound to the remote connection, or null when the remote database cannot
 *         be reached or the SessionFactory cannot be obtained
 */
public Session openSessionb(){
    Connection conn = null ;
    try {
        Class.forName(dirver_b);
        conn = DriverManager.getConnection(url_b,use_b,pass_b);
        // Locate the SessionFactory-typed field of SessionManager (the last one wins, as before).
        Field[] fields = SessionManager.class.getDeclaredFields();
        Field field = null ;
        for(int i=0;i<fields.length;i++){
            if( SessionFactory.class.equals( fields[i].getType() ) ){
                field = fields[i];
            }
        }
        if( field == null ){
            throw new IllegalStateException("SessionManager declares no SessionFactory field");
        }
        field.setAccessible(true);
        SessionFactory sessionFactory = (SessionFactory) field.get(SessionManager.class );
        if( sessionFactory == null ){
            throw new IllegalStateException("SessionManager has not built its SessionFactory yet");
        }
        return sessionFactory.openSession(conn);
    } catch (Exception e) {
        // Do not leak the connection when anything after getConnection() failed.
        if( conn != null ){
            try {
                conn.close();
            } catch (Exception ignored) {
                // The connection is being discarded anyway.
            }
        }
        System.out.println("--沒有連接到總服務(openSessionb)--" + " " + e);
        return null ;
    }
}
// Connection settings for the second ("_b") database, filled in once by the static initializer below.
private static String use_b = null ;
private static String pass_b = null ;
private static String dirver_b = null ;

static {
    java.io.InputStream in = null ;
    try {
        // Read SessionManager's private CONFIG_FILE_LOCATION through reflection so this class
        // loads the same hibernate.cfg.xml resource as the rest of the application.
        Field field = SessionManager.class.getDeclaredField("CONFIG_FILE_LOCATION");
        field.setAccessible( true );
        String path = (String) field.get(SessionManager. class );
        in = TBDao.class.getResourceAsStream(path);
        if( in == null ){
            throw new IllegalStateException("hibernate configuration not found on classpath: " + path);
        }
        // Parse the configuration with dom4j and pull out the extra <property name="..._b"> entries.
        Document docT = new SAXReader().read( in );
        url_b = readConfigProperty(docT, "connection.url_b");
        use_b = readConfigProperty(docT, "connection.username_b");
        pass_b = readConfigProperty(docT, "connection.password_b");
        // No "_b" variant is read for the driver: the primary connection.driver_class is reused.
        dirver_b = readConfigProperty(docT, "connection.driver_class");
    } catch (Exception e) {
        // Best effort: class loading must not fail here. Settings that were not read stay null.
        e.printStackTrace();
    } finally {
        if( in != null ){
            try {
                in.close();
            } catch (java.io.IOException ignored) {
                // Nothing left to release.
            }
        }
    }
}

/**
 * Returns the text of one <property> element of the session-factory configuration.
 *
 * @param doc  parsed hibernate.cfg.xml document
 * @param name value of the property's name attribute
 * @return the element text
 * @throws IllegalStateException when no such property element exists
 */
private static String readConfigProperty(Document doc, String name) {
    String xpath = "/hibernate-configuration/session-factory/property[@name='" + name + "']" ;
    org.dom4j.Node node = DocumentHelper.createXPath( xpath ).selectSingleNode(doc);
    if( node == null ){
        throw new IllegalStateException("missing <property name=\"" + name + "\"> in hibernate configuration");
    }
    return node.getText();
}
/**
 * Opens a Hibernate session on the remote database.
 *
 * A JDBC connection is created from the "_b" settings and handed to the SessionFactory that
 * SessionManager has already built; the factory is read from SessionManager through reflection.
 * Because the connection is supplied by this method, Hibernate does not close it when the
 * session is closed: callers should close the connection returned by Session.close().
 *
 * @return a session bound to the remote connection, or null when the remote database cannot
 *         be reached or the SessionFactory cannot be obtained
 */
public Session openSessionb(){
    Connection conn = null ;
    try {
        Class.forName(dirver_b);
        conn = DriverManager.getConnection(url_b,use_b,pass_b);
        // Locate the SessionFactory-typed field of SessionManager (the last one wins, as before).
        Field[] fields = SessionManager.class.getDeclaredFields();
        Field field = null ;
        for(int i=0;i<fields.length;i++){
            if( SessionFactory.class.equals( fields[i].getType() ) ){
                field = fields[i];
            }
        }
        if( field == null ){
            throw new IllegalStateException("SessionManager declares no SessionFactory field");
        }
        field.setAccessible(true);
        SessionFactory sessionFactory = (SessionFactory) field.get(SessionManager.class );
        if( sessionFactory == null ){
            throw new IllegalStateException("SessionManager has not built its SessionFactory yet");
        }
        return sessionFactory.openSession(conn);
    } catch (Exception e) {
        // Do not leak the connection when anything after getConnection() failed.
        if( conn != null ){
            try {
                conn.close();
            } catch (Exception ignored) {
                // The connection is being discarded anyway.
            }
        }
        System.out.println("--沒有連接到總服務(openSessionb)--" + " " + e);
        return null ;
    }
}
2.異地數據同步失敗后動作 TBDao->save() 凍結狀態到數據庫
/**
 * Queues an operation that could not be applied to the remote database by storing it as a
 * ZJcyyTb row through the local session.
 *
 * The row is first saved with the action and timestamp, flushed, and re-read with an upgrade
 * lock; only then is the serialized entity attached as a BLOB.
 * NOTE(review): this two-step write looks like the usual Oracle BLOB workaround - confirm.
 *
 * Failures are printed and the transaction is rolled back so that a row without its BLOB is
 * never committed; no exception reaches the caller.
 *
 * @param obj    entity to serialize into the queue row
 * @param action operation to replay later (one of the TBDao action constants)
 */
public void save(Object obj,String action) {
    Session session = null ;
    Transaction tr = null ;
    try {
        session = SessionManager.currentSession(null,null);
        tr = session.beginTransaction();
        ZJcyyTb zj = new ZJcyyTb();
        zj.setAction(action);
        zj.setInputdate(new Date());
        session.save(zj);
        session.flush();
        session.refresh(zj,LockMode.UPGRADE);
        // Attach the serialized entity once the row exists and is locked.
        zj.setObject( new ObjectConvert().ObjectToBlob(obj) );
        tr.commit();
    } catch (Exception e) {
        e.printStackTrace();
        if( tr != null ){
            try {
                tr.rollback();
            } catch (Exception rollbackFailure) {
                rollbackFailure.printStackTrace();
            }
        }
    }finally{
        if(session!=null&& session.isOpen() )session.close();
    }
}
Session session = null ;
try {
session = SessionManager.currentSession(null,null);
Transaction tr = session.beginTransaction();
ZJcyyTb zj = new ZJcyyTb();
zj.setAction(action);
zj.setInputdate(new Date());
session.save(zj);
session.flush();
session.refresh(zj,LockMode.UPGRADE);
//oracle Blob數據持久 請參考-->序列化和反序列化對象到 數據庫
zj.setObject( new ObjectConvert().ObjectToBlob(obj) );
tr.commit();
} catch (Exception e) {
e.printStackTrace();
}finally{
if(session!=null&& session.isOpen() )session.close();
}
}
3,失敗后又成功連接后 (線程實現) TBDao->action()
/**
 * Counts the rows waiting in the local z_jcyy_tb table.
 *
 * @return the number of queued rows, or 0 when the count could not be read
 *         (the failure is printed, not thrown)
 */
public int isSql(){
    int is_count = 0 ;
    Session session = null ;
    Statement stat = null ;
    ResultSet rs = null ;
    try {
        // Use the local session to check whether any failed operations were queued.
        session = SessionManager.currentSession(null,null);
        Transaction tr = session.beginTransaction();
        Connection conn = session.connection();
        stat = conn.createStatement();
        rs = stat.executeQuery("select count(*) from z_jcyy_tb");
        if( rs.next() ){
            is_count = rs.getInt(1);
        }
        tr.commit();
    } catch (Exception e) {
        e.printStackTrace();
    }finally{
        // Release the JDBC objects explicitly; closing the session does not guarantee it.
        if( rs != null ){
            try { rs.close(); } catch (Exception ignored) { /* already unusable */ }
        }
        if( stat != null ){
            try { stat.close(); } catch (Exception ignored) { /* already unusable */ }
        }
        if(session!=null&& session.isOpen() )session.close();
    }
    return is_count ;
}
/**
 * Replays the queued ZJcyyTb rows against the remote database and removes them locally.
 *
 * Rows are read in inputdate order, ten at a time. Each row's entity is deserialized and the
 * recorded action (delete / insert / update) is applied through the remote session, then the
 * queue row is deleted locally. Rows whose entity cannot be deserialized are dropped, as before.
 *
 * Differences from the earlier version, all of them fixes:
 * - the remote transaction is committed before the local one, so a remote failure no longer
 *   loses queued rows (a local failure after a remote success replays the rows again later);
 * - every batch is read from offset 0 after the previous deletes were flushed, instead of
 *   advancing the offset over a shrinking table and skipping rows;
 * - the remote session is flushed and cleared after each operation so that the SQL runs in the
 *   recorded order and two queued versions of the same entity do not collide in the session;
 * - false is returned when the remote session cannot be opened;
 * - both transactions are rolled back on failure and the remote JDBC connection is closed.
 *
 * @return true when nothing was queued or everything was replayed and committed;
 *         false when the remote database is unavailable or the replay failed
 */
public boolean action(){
    Session session = null ;
    Session session_b = null ;
    Transaction tr = null ;
    Transaction tr_b = null ;
    try {
        // Nothing queued: report success without touching the remote database (unchanged contract).
        if( isSql()<=0 ){
            return true ;
        }
        session = SessionManager.currentSession(null,null);
        // Try the remote database; openSessionb() answers null when it is unreachable.
        session_b = openSessionb();
        if( session_b==null ){
            return false ;
        }
        ObjectConvert oc = new ObjectConvert();
        tr_b = session_b.beginTransaction();
        tr = session.beginTransaction();
        Query qu = session.createQuery(" from ZJcyyTb t order by t.inputdate");
        qu.setFirstResult(0);
        qu.setMaxResults(10);
        while(true){
            List list = qu.list();
            if( list.isEmpty() ){
                break ;
            }
            for(Iterator it=list.iterator();it.hasNext();){
                ZJcyyTb tb = (ZJcyyTb)it.next();
                Object obj = null ;
                obj = oc.BlobToObject(tb.getObject(),obj);
                if(obj!=null){
                    String action = tb.getAction();
                    if(action.equals( TBDao.DELETE )){
                        session_b.delete(obj);
                    }else if(action.equals( TBDao.INSERT )){
                        session_b.save(obj);
                    }else if(action.equals( TBDao.UPDATE )){
                        session_b.update(obj);
                    }
                    // Execute now, in queue order, and forget the instance before the next row.
                    session_b.flush();
                    session_b.clear();
                }
                session.delete(tb);
            }
            // Push the deletes so the next query at offset 0 returns the following batch.
            session.flush();
        }
        // Remote first: the queue is only emptied once the remote side is durable.
        tr_b.commit();
        tr.commit();
        return true ;
    } catch (Exception e) {
        System.out.println("--沒有連接到總服務(action)--" + " " + e);
        if( tr_b!=null ){
            try { tr_b.rollback(); } catch (Exception ignored) { /* remote side already gone */ }
        }
        if( tr!=null ){
            try { tr.rollback(); } catch (Exception rollbackFailure) { rollbackFailure.printStackTrace(); }
        }
    }finally{
        if(session_b!=null&&session_b.isOpen()){
            try {
                // The connection was supplied by openSessionb(); Hibernate returns it instead of closing it.
                Connection conn_b = session_b.close();
                if( conn_b!=null ){
                    conn_b.close();
                }
            } catch (Exception ignored) {
                // Nothing more can be done for a broken remote connection.
            }
        }
        if(session!=null&& session.isOpen() )session.close();
        SessionManager.closeSession();
    }
    return false ;
}
int is_count = 0 ;
Session session = null ;
try {
//得到本地Session 查看是否有連接失敗后序列動作被保存
session = SessionManager.currentSession(null,null);
Transaction tr = session.beginTransaction();
Connection conn = session.connection();
Statement stat = conn.createStatement();
ResultSet rs = stat.executeQuery("select count(*) from z_jcyy_tb");
rs.next();
is_count = rs.getInt(1);
tr.commit();
} catch (Exception e) {
e.printStackTrace();
}finally{
if(session!=null&& session.isOpen() )session.close();
}
return is_count ;
}
/**
 * Replays the queued ZJcyyTb rows against the remote database and removes them locally.
 *
 * Rows are read in inputdate order, ten at a time. Each row's entity is deserialized and the
 * recorded action (delete / insert / update) is applied through the remote session, then the
 * queue row is deleted locally. Rows whose entity cannot be deserialized are dropped, as before.
 *
 * Differences from the earlier version, all of them fixes:
 * - the remote transaction is committed before the local one, so a remote failure no longer
 *   loses queued rows (a local failure after a remote success replays the rows again later);
 * - every batch is read from offset 0 after the previous deletes were flushed, instead of
 *   advancing the offset over a shrinking table and skipping rows;
 * - the remote session is flushed and cleared after each operation so that the SQL runs in the
 *   recorded order and two queued versions of the same entity do not collide in the session;
 * - false is returned when the remote session cannot be opened;
 * - both transactions are rolled back on failure and the remote JDBC connection is closed.
 *
 * @return true when nothing was queued or everything was replayed and committed;
 *         false when the remote database is unavailable or the replay failed
 */
public boolean action(){
    Session session = null ;
    Session session_b = null ;
    Transaction tr = null ;
    Transaction tr_b = null ;
    try {
        // Nothing queued: report success without touching the remote database (unchanged contract).
        if( isSql()<=0 ){
            return true ;
        }
        session = SessionManager.currentSession(null,null);
        // Try the remote database; openSessionb() answers null when it is unreachable.
        session_b = openSessionb();
        if( session_b==null ){
            return false ;
        }
        ObjectConvert oc = new ObjectConvert();
        tr_b = session_b.beginTransaction();
        tr = session.beginTransaction();
        Query qu = session.createQuery(" from ZJcyyTb t order by t.inputdate");
        qu.setFirstResult(0);
        qu.setMaxResults(10);
        while(true){
            List list = qu.list();
            if( list.isEmpty() ){
                break ;
            }
            for(Iterator it=list.iterator();it.hasNext();){
                ZJcyyTb tb = (ZJcyyTb)it.next();
                Object obj = null ;
                obj = oc.BlobToObject(tb.getObject(),obj);
                if(obj!=null){
                    String action = tb.getAction();
                    if(action.equals( TBDao.DELETE )){
                        session_b.delete(obj);
                    }else if(action.equals( TBDao.INSERT )){
                        session_b.save(obj);
                    }else if(action.equals( TBDao.UPDATE )){
                        session_b.update(obj);
                    }
                    // Execute now, in queue order, and forget the instance before the next row.
                    session_b.flush();
                    session_b.clear();
                }
                session.delete(tb);
            }
            // Push the deletes so the next query at offset 0 returns the following batch.
            session.flush();
        }
        // Remote first: the queue is only emptied once the remote side is durable.
        tr_b.commit();
        tr.commit();
        return true ;
    } catch (Exception e) {
        System.out.println("--沒有連接到總服務(action)--" + " " + e);
        if( tr_b!=null ){
            try { tr_b.rollback(); } catch (Exception ignored) { /* remote side already gone */ }
        }
        if( tr!=null ){
            try { tr.rollback(); } catch (Exception rollbackFailure) { rollbackFailure.printStackTrace(); }
        }
    }finally{
        if(session_b!=null&&session_b.isOpen()){
            try {
                // The connection was supplied by openSessionb(); Hibernate returns it instead of closing it.
                Connection conn_b = session_b.close();
                if( conn_b!=null ){
                    conn_b.close();
                }
            } catch (Exception ignored) {
                // Nothing more can be done for a broken remote connection.
            }
        }
        if(session!=null&& session.isOpen() )session.close();
        SessionManager.closeSession();
    }
    return false ;
}
4.hbn 攔截器 ->Interceptor
package com.jjm.hibernate;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.dom4j.Document;
import org.dom4j.DocumentHelper;
import org.dom4j.XPath;
import org.dom4j.io.SAXReader;
import org.hibernate.CallbackException;
import org.hibernate.EntityMode;
import org.hibernate.Hibernate;
import org.hibernate.HibernateException;
import org.hibernate.Interceptor;
import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.hibernate.Transaction;
import org.hibernate.cfg.Configuration;
import org.hibernate.type.Type;
import com.jjm.rlzy.dao.TBDao;
/**
 * Hibernate interceptor that mirrors local inserts, updates and deletes to the remote database.
 *
 * While the remote database is believed reachable, each intercepted operation is applied to it
 * directly in its own remote session. Otherwise, or when that attempt fails, the operation is
 * queued locally through TBDao.save() and replayed later by TBDao.action(), which a background
 * thread calls once a minute.
 *
 * NOTE(review): TBDao.save() and TBDao.action() themselves save and delete entities through
 * Hibernate; verify that this interceptor is not invoked for those queue rows.
 */
public class TestInterceptor implements Interceptor,Serializable{
    // Written by the polling thread and read by the threads running the callbacks, hence volatile.
    static private volatile boolean isConn_b = false ;
    static private TBDao tb = new TBDao();

    // Background thread: once a minute, replay queued operations and refresh the connection flag.
    static{
        Thread poller = new Thread(new Runnable(){
            public void run() {
                while(true){
                    try {
                        isConn_b = tb.action();
                    } catch (RuntimeException e) {
                        // Keep polling even if one round blows up unexpectedly.
                        e.printStackTrace();
                        isConn_b = false ;
                    }
                    try {
                        Thread.sleep(60*1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        // Honor the interrupt: restore the flag and stop polling.
                        Thread.currentThread().interrupt();
                        return ;
                    }
                }
            }
        });
        // Daemon so this endless loop never keeps the JVM alive on shutdown.
        poller.setDaemon(true);
        poller.start();
    }

    /**
     * Mirrors an update. Returns false because the entity state is not modified here.
     */
    public boolean onFlushDirty(Object entity, Serializable id, Object[] currentState, Object[] previousState, String[] propertyNames, Type[] types) throws CallbackException {
        replicate(entity,TBDao.UPDATE);
        return false;
    }

    /**
     * Mirrors an insert. Returns false because the entity state is not modified here.
     */
    public boolean onSave(Object entity, Serializable id, Object[] state, String[] propertyNames, Type[] types) throws CallbackException {
        replicate(entity,TBDao.INSERT);
        return false;
    }

    /**
     * Mirrors a delete.
     */
    public void onDelete(Object entity, Serializable id, Object[] state, String[] propertyNames, Type[] types) throws CallbackException {
        replicate(entity,TBDao.DELETE);
    }

    /**
     * Applies one operation to the remote database, or queues it locally when the remote side is
     * down or the attempt fails. A failure also clears the connection flag until the poller
     * succeeds again.
     *
     * Never call SessionManager.closeSession() from inside the interceptor.
     *
     * @param entity entity being saved, updated or deleted locally
     * @param action TBDao.INSERT, TBDao.UPDATE or TBDao.DELETE
     */
    private void replicate(Object entity, String action) {
        Session session = null ;
        Transaction tr = null ;
        try {
            if(isConn_b){
                session = tb.openSessionb();
                if(session==null){
                    // openSessionb() reports an unreachable remote database with null.
                    throw new IllegalStateException("remote database is not reachable");
                }
                tr = session.beginTransaction();
                if( TBDao.DELETE.equals(action) ){
                    session.delete(entity);
                }else if( TBDao.INSERT.equals(action) ){
                    session.save(entity);
                }else{
                    session.update(entity);
                }
                tr.commit();
            }else{
                tb.save(entity,action);
            }
        } catch (Exception e) {
            e.printStackTrace() ;
            if(tr!=null){
                try { tr.rollback(); } catch (Exception rollbackFailure) { rollbackFailure.printStackTrace(); }
            }
            tb.save(entity,action);
            isConn_b = false ;
        }finally{
            closeRemote(session);
        }
    }

    /**
     * Closes a remote session together with the JDBC connection it was opened on; Hibernate hands
     * a caller-supplied connection back from close() instead of closing it.
     */
    private static void closeRemote(Session session) {
        if(session==null){
            return ;
        }
        try {
            Connection conn = session.close();
            if(conn!=null){
                conn.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
................................
}
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.dom4j.Document;
import org.dom4j.DocumentHelper;
import org.dom4j.XPath;
import org.dom4j.io.SAXReader;
import org.hibernate.CallbackException;
import org.hibernate.EntityMode;
import org.hibernate.Hibernate;
import org.hibernate.HibernateException;
import org.hibernate.Interceptor;
import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.hibernate.Transaction;
import org.hibernate.cfg.Configuration;
import org.hibernate.type.Type;
import com.jjm.rlzy.dao.TBDao;
/**
 * Hibernate interceptor that mirrors local inserts, updates and deletes to the remote database.
 *
 * While the remote database is believed reachable, each intercepted operation is applied to it
 * directly in its own remote session. Otherwise, or when that attempt fails, the operation is
 * queued locally through TBDao.save() and replayed later by TBDao.action(), which a background
 * thread calls once a minute.
 *
 * NOTE(review): TBDao.save() and TBDao.action() themselves save and delete entities through
 * Hibernate; verify that this interceptor is not invoked for those queue rows.
 */
public class TestInterceptor implements Interceptor,Serializable{
    // Written by the polling thread and read by the threads running the callbacks, hence volatile.
    static private volatile boolean isConn_b = false ;
    static private TBDao tb = new TBDao();

    // Background thread: once a minute, replay queued operations and refresh the connection flag.
    static{
        Thread poller = new Thread(new Runnable(){
            public void run() {
                while(true){
                    try {
                        isConn_b = tb.action();
                    } catch (RuntimeException e) {
                        // Keep polling even if one round blows up unexpectedly.
                        e.printStackTrace();
                        isConn_b = false ;
                    }
                    try {
                        Thread.sleep(60*1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        // Honor the interrupt: restore the flag and stop polling.
                        Thread.currentThread().interrupt();
                        return ;
                    }
                }
            }
        });
        // Daemon so this endless loop never keeps the JVM alive on shutdown.
        poller.setDaemon(true);
        poller.start();
    }

    /**
     * Mirrors an update. Returns false because the entity state is not modified here.
     */
    public boolean onFlushDirty(Object entity, Serializable id, Object[] currentState, Object[] previousState, String[] propertyNames, Type[] types) throws CallbackException {
        replicate(entity,TBDao.UPDATE);
        return false;
    }

    /**
     * Mirrors an insert. Returns false because the entity state is not modified here.
     */
    public boolean onSave(Object entity, Serializable id, Object[] state, String[] propertyNames, Type[] types) throws CallbackException {
        replicate(entity,TBDao.INSERT);
        return false;
    }

    /**
     * Mirrors a delete.
     */
    public void onDelete(Object entity, Serializable id, Object[] state, String[] propertyNames, Type[] types) throws CallbackException {
        replicate(entity,TBDao.DELETE);
    }

    /**
     * Applies one operation to the remote database, or queues it locally when the remote side is
     * down or the attempt fails. A failure also clears the connection flag until the poller
     * succeeds again.
     *
     * Never call SessionManager.closeSession() from inside the interceptor.
     *
     * @param entity entity being saved, updated or deleted locally
     * @param action TBDao.INSERT, TBDao.UPDATE or TBDao.DELETE
     */
    private void replicate(Object entity, String action) {
        Session session = null ;
        Transaction tr = null ;
        try {
            if(isConn_b){
                session = tb.openSessionb();
                if(session==null){
                    // openSessionb() reports an unreachable remote database with null.
                    throw new IllegalStateException("remote database is not reachable");
                }
                tr = session.beginTransaction();
                if( TBDao.DELETE.equals(action) ){
                    session.delete(entity);
                }else if( TBDao.INSERT.equals(action) ){
                    session.save(entity);
                }else{
                    session.update(entity);
                }
                tr.commit();
            }else{
                tb.save(entity,action);
            }
        } catch (Exception e) {
            e.printStackTrace() ;
            if(tr!=null){
                try { tr.rollback(); } catch (Exception rollbackFailure) { rollbackFailure.printStackTrace(); }
            }
            tb.save(entity,action);
            isConn_b = false ;
        }finally{
            closeRemote(session);
        }
    }

    /**
     * Closes a remote session together with the JDBC connection it was opened on; Hibernate hands
     * a caller-supplied connection back from close() instead of closing it.
     */
    private static void closeRemote(Session session) {
        if(session==null){
            return ;
        }
        try {
            Connection conn = session.close();
            if(conn!=null){
                conn.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
................................
}