master
WuXianChaoPin 6 years ago
parent e290dce448
commit e66ce5fb21
  1. 42
      core/src/main/java/core/thrift/ConnectionPoolFactory.java
  2. 37
      core/src/main/java/core/thrift/ThriftClientPool.java
  3. 1603
      core/src/main/java/core/thrift/task/TSDM.java
  4. 5
      core/src/test/resources/test.thrift
  5. 27
      db/src/main/java/db/config/HibernateConfig.java
  6. 33
      web/src/main/java/web/controller/DataController.java
  7. 7
      web/src/main/java/web/service/BaseService.java
  8. 29
      web/src/main/java/web/util/DynamicTimer.java
  9. 16
      web/src/test/java/SpringMVCTest.java
  10. 2
      web/src/test/java/ThriftServerDemo.java
  11. 14
      web/src/test/java/ThriftTest.java

@ -4,10 +4,11 @@ import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.thrift.transport.TNonblockingSocket;
import org.apache.thrift.transport.TTransport;
import java.net.Socket;
public class ConnectionPoolFactory extends BasePooledObjectFactory<TTransport> {
public class ConnectionPoolFactory extends BasePooledObjectFactory<TNonblockingSocket> {
private String ip;
private int port;
private int socketTimeout;
@ -19,19 +20,44 @@ public class ConnectionPoolFactory extends BasePooledObjectFactory<TTransport> {
}
@Override
//创建TTransport类型对象方法
public TTransport create() throws Exception {
//创建TNonblockingSocket类型对象方法
public TNonblockingSocket create() throws Exception {
return new TNonblockingSocket(ip, port, socketTimeout);
}
//把TTransport对象打包成池管理的对象PooledObject<TTransport>
//把TNonblockingSocket对象打包成池管理的对象PooledObject<TNonblockingSocket>
@Override
public PooledObject<TTransport> wrap(TTransport transport) {
public PooledObject<TNonblockingSocket> wrap(TNonblockingSocket transport) {
return new DefaultPooledObject<>(transport);
}
/**
* 验证对象
*/
@Override
public boolean validateObject(PooledObject<TNonblockingSocket> p) {
Socket socket=p.getObject().getSocketChannel().socket();
boolean closed = socket.isClosed();
boolean connected = socket.isConnected();
boolean outputShutdown = socket.isOutputShutdown();
boolean inputShutdown = socket.isInputShutdown();
boolean urgentFlag = false;
try {
socket.sendUrgentData(0xFF);
urgentFlag = true;
} catch (Exception e) {
}
return urgentFlag && connected && !closed && !inputShutdown && !outputShutdown;
}
/**
* 销毁对象
*/
@Override
public boolean validateObject(PooledObject<TTransport> p) {
return p.getObject().isOpen();
public void destroyObject(PooledObject<TNonblockingSocket> p) throws Exception {
p.getObject().close();
}
}

@ -1,16 +1,21 @@
package core.thrift;
import core.thrift.comment.QueryComment;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.thrift.async.TAsyncClient;
import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TNonblockingSocket;
import org.apache.thrift.transport.TNonblockingTransport;
import org.apache.thrift.transport.TTransport;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.Map;
@Service
public class ThriftClientPool {
@ -24,29 +29,37 @@ public class ThriftClientPool {
@Value("${thrift.timeout}")
private Integer timeout;
private static GenericObjectPool<TTransport> pool;
private static GenericObjectPool<TNonblockingSocket> pool;
private static final Map<Class<? extends TAsyncClient>, Constructor> map = new HashMap<>();
@PostConstruct
public void init() {
pool = new GenericObjectPool<>(new ConnectionPoolFactory(ip, port, timeout));
}
public static void doExecute(Thrift thrift) throws Exception {
public static <E extends TAsyncClient> void doExecute(Thrift<E> thrift,Class<E> c) throws Exception {
//从池里获取一个Transport对象
TTransport transport = pool.borrowObject();
TNonblockingSocket socket = pool.borrowObject();
thrift.doExecute(client(c,socket));
//把一个Transport对象归还到池里
pool.returnObject(socket);
}
private static <E extends TAsyncClient> E client(Class<E> c, TNonblockingSocket socket) throws NoSuchMethodException, IOException, IllegalAccessException, InvocationTargetException, InstantiationException {
if (!map.containsKey(c)) {
map.put(c, c.getConstructor(TProtocolFactory.class, TAsyncClientManager.class, TNonblockingTransport.class));
}
TAsyncClientManager clientManager = new TAsyncClientManager();
TProtocolFactory protocol = new TBinaryProtocol.Factory();
QueryComment.AsyncClient asyncClient = new QueryComment.AsyncClient(protocol, clientManager, (TNonblockingTransport) transport);
thrift.doExecute(asyncClient);
//把一个Transport对象归还到池里
pool.returnObject(transport);
return (E) map.get(c).newInstance(protocol,clientManager,socket);
}
public interface Thrift {
void doExecute(QueryComment.AsyncClient client) throws Exception;
public interface Thrift<E extends TAsyncClient> {
void doExecute(E client) throws Exception;
}
}

File diff suppressed because it is too large Load Diff

@ -0,0 +1,5 @@
namespace py core.thrift.task
service TSDM{
bool qiandao()
bool word()
}

@ -174,7 +174,7 @@ public class HibernateConfig {
* 初始化数据源session工厂
*/
@PostConstruct
private void init() throws ClassNotFoundException {
private void init() {
//连接根数据源
rootHibernateTemplate = initRootDB();
@ -188,11 +188,17 @@ public class HibernateConfig {
beanFactory.registerSingleton(DBBeanNameManager.getName(dataSourceModel.getDbId(),DBBeanNameManager.hibernateTemplate),hibernateTemplate);
Class c=ClassUtils.getClass(dataSourceModel.getDbId());
Class c;
try {
c = ClassUtils.getClass(dataSourceModel.getDbId());
} catch (ClassNotFoundException e) {
throw new RuntimeException("类"+dataSourceModel.getDbId()+"不存在");
}
if(c.isAnnotationPresent(Model.class)){
map.put(((Model) c.getAnnotation(Model.class)).value(), hibernateTemplate);
Model model= (Model) c.getAnnotation(Model.class);
map.put(model.value(), hibernateTemplate);
}else{
throw new RuntimeException(dataSourceModel.getAnnotation()+"不是注解标记!!");
throw new RuntimeException(dataSourceModel.getDbId()+"缺少"+Model.class+"注解");
}
}
}
@ -229,17 +235,4 @@ public class HibernateConfig {
return initSessionFactory(dataSource, config,null);
}
/**
* 遍历表备注信息注解
* @param c
*/
private void checkClass(Class c) {
if(c.isAnnotationPresent(TableInfo.class)){
TableInfo info= (TableInfo) c.getAnnotation(TableInfo.class);
tableNotes.put(c,info);
}
}
}

@ -2,6 +2,7 @@ package web.controller;
import core.thrift.MyAsyncMethodCallback;
import core.thrift.ThriftClientPool;
import core.thrift.comment.QueryComment;
import db.form.DBAction;
import db.model.bilibili.DataModel;
import org.apache.commons.io.FileUtils;
@ -33,11 +34,11 @@ import java.util.List;
@Controller
@RequestMapping("/data")
public class DataController extends TableController<DataModel, DataModelForm, DataService, OtherDataResult,DataTable<DataModel>> {
public class DataController extends TableController<DataModel, DataModelForm, DataService, OtherDataResult, DataTable<DataModel>> {
private final String regionJson = IOUtils.toString(DataController.class.getResourceAsStream("/region.json"), Charset.forName("UTF-8"));
private static final String key="regionJson";
private static final String key = "regionJson";
public DataController() throws IOException, InstantiationException, IllegalAccessException {
super();
@ -54,10 +55,10 @@ public class DataController extends TableController<DataModel, DataModelForm, Da
@RequestMapping(value = "count/{cid}")
public Integer count(@PathVariable int cid) {
try {
MyAsyncMethodCallback<Integer> call= new MyAsyncMethodCallback<>();
ThriftClientPool.doExecute(client -> client.commentSum(cid,call));
MyAsyncMethodCallback<Integer> call = new MyAsyncMethodCallback<>();
ThriftClientPool.doExecute(client -> client.commentSum(cid, call),QueryComment.AsyncClient.class);
return call.getResult();
}catch (Exception e){
} catch (Exception e) {
log.error(e);
}
return null;
@ -65,20 +66,20 @@ public class DataController extends TableController<DataModel, DataModelForm, Da
@RequestMapping("download")
public ResponseEntity export(DataModelForm form) throws Exception {
if(StringUtils.isNotEmpty(form.getFileName())&&form.getCids()!=null&&!form.getCids().isEmpty()) {
if (StringUtils.isNotEmpty(form.getFileName()) && form.getCids() != null && !form.getCids().isEmpty()) {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_OCTET_STREAM);
headers.setContentDispositionFormData("attachment", form.getFileName()+".zip");
MyAsyncMethodCallback<String> call= new MyAsyncMethodCallback<>();
ThriftClientPool.doExecute(client -> client.download(form.getCids(),form.getFileName(),call));
String filePath=call.getResult();
if(filePath!=null) {
headers.setContentDispositionFormData("attachment", form.getFileName() + ".zip");
MyAsyncMethodCallback<String> call = new MyAsyncMethodCallback<>();
ThriftClientPool.doExecute(client -> client.download(form.getCids(), form.getFileName(), call),QueryComment.AsyncClient.class);
String filePath = call.getResult();
if (filePath != null) {
return new ResponseEntity<>(FileUtils.readFileToByteArray(new File(filePath)), headers, HttpStatus.CREATED);
}else{
} else {
return new ResponseEntity<>("弹幕下载失败", HttpStatus.INTERNAL_SERVER_ERROR);
}
}else{
return new ResponseEntity("参数异常",HttpStatus.BAD_REQUEST);
} else {
return new ResponseEntity("参数异常", HttpStatus.BAD_REQUEST);
}
}
@ -114,13 +115,13 @@ public class DataController extends TableController<DataModel, DataModelForm, Da
@Override
protected List<OtherDataResult> otherDatas(DataModelForm command) {
return new ArrayList<OtherDataResult>(){{
return new ArrayList<OtherDataResult>() {{
add(new OtherDataResult());
}};
}
@Override
protected void setModel(Model model) {
model.addAttribute(key,regionJson);
model.addAttribute(key, regionJson);
}
}

@ -10,6 +10,7 @@ import org.hibernate.criterion.DetachedCriteria;
import org.hibernate.criterion.Projections;
import org.springframework.dao.DataAccessException;
import org.springframework.orm.hibernate5.HibernateTemplate;
import org.springframework.util.ClassUtils;
import java.util.List;
@ -25,7 +26,7 @@ public abstract class BaseService{
public final HibernateTemplate getHibernateTemplate() {
if(hibernateTemplate==null) {
hibernateTemplate=HibernateConfig.get(this.getClass().getAnnotation(Model.class).value());
hibernateTemplate=HibernateConfig.get(ClassUtils.getUserClass(this.getClass()).getAnnotation(Model.class).value());
}
return hibernateTemplate;
}
@ -54,6 +55,10 @@ public abstract class BaseService{
return getHibernateTemplate().findByCriteria(criteria,firstResult,maxResults<getMaxResults()?maxResults:getMaxResults());
}
public final List find(DetachedCriteria criteria){
return getHibernateTemplate().findByCriteria(criteria);
}
/**
* 限制分页查询最大显示数
* @return

@ -3,13 +3,16 @@ package web.util;
import db.model.bilibili.ScheduledTaskEntity;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.orm.hibernate5.HibernateTemplate;
import org.hibernate.criterion.DetachedCriteria;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.scheduling.TriggerContext;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.config.TriggerTask;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Service;
import web.service.BiliService;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Date;
@ -17,31 +20,27 @@ import java.util.Date;
* Created by reborn on 2017/5/8.
* 动态定时器任务管理类
*/
//@Component
public class DynamicTimer extends ScheduledTaskRegistrar {
@Service
public class DynamicTimer extends ScheduledTaskRegistrar implements ApplicationListener<ContextRefreshedEvent> {
private static Logger log=LogManager.getLogger();
@Resource
private HibernateTemplate hibernateTemplate;
@Resource
private BiliService biliService;
@PostConstruct
public void init(){
hibernateTemplate.loadAll(ScheduledTaskEntity.class).forEach(this::addTriggerTask);
}
private static Logger log=LogManager.getLogger();
public void addTriggerTask(ScheduledTaskEntity scheduledTaskEntity) {
super.scheduleTriggerTask(new TriggerTask(()->{
},(TriggerContext triggerContext)->{
CronTrigger trigger = new CronTrigger(scheduledTaskEntity.getExpression());
log.debug(triggerContext+"");
Date nextExec = trigger.nextExecutionTime(triggerContext);
return nextExec;
}));
}
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
biliService.find(DetachedCriteria.forClass(ScheduledTaskEntity.class)).forEach(entity->addTriggerTask((ScheduledTaskEntity)entity));
}
}

@ -1,8 +1,10 @@
import core.thrift.MyAsyncMethodCallback;
import core.thrift.ThriftClientPool;
import db.model.bilibili.ScheduledTaskEntity;
import db.util.ExportUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.hibernate.criterion.DetachedCriteria;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -32,7 +34,7 @@ import java.util.ArrayList;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringJUnitWebConfig
@ContextConfiguration(classes = {AppConfig.class, SpringConfig.class})
public class SpringTest {
public class SpringMVCTest {
private static Logger log = LogManager.getLogger();
private MockMvc mockMvc;
@ -40,19 +42,12 @@ public class SpringTest {
@Autowired
WebApplicationContext wac;
@Autowired
private DefaultListableBeanFactory beanFactory;
@Autowired
private DataService dataService;
@Autowired
private BiliService biliService;
@Autowired
@Qualifier("Hweb.service.DataService")
private HibernateTemplate template1;
@Autowired
@Qualifier("Hweb.service.BiliService")
private HibernateTemplate template2;
@ -106,5 +101,10 @@ public class SpringTest {
resultActions.andDo(MockMvcResultHandlers.print()).andReturn();
}
@Test
public void test7(){
biliService.find(DetachedCriteria.forClass(ScheduledTaskEntity.class)).forEach(a->log.info(a));
}
}

@ -1,5 +1,3 @@
package core.thrift;
import core.thrift.comment.QueryComment;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TBinaryProtocol;

@ -1,5 +1,7 @@
import core.thrift.MyAsyncMethodCallback;
import core.thrift.ThriftClientPool;
import core.thrift.comment.QueryComment;
import core.thrift.task.TSDM;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.Test;
@ -22,7 +24,7 @@ public class ThriftTest {
@Test
public void commentSum() throws Exception {
MyAsyncMethodCallback<Integer> call = new MyAsyncMethodCallback<>();
ThriftClientPool.doExecute(client -> client.commentSum(49052, call));
ThriftClientPool.doExecute(client -> client.commentSum(49052,call),QueryComment.AsyncClient.class);
log.info(call.getResult());
}
@ -31,7 +33,15 @@ public class ThriftTest {
MyAsyncMethodCallback<String> call = new MyAsyncMethodCallback<>();
ThriftClientPool.doExecute(client ->client.download(new ArrayList<Integer>(){{
add(49052);
}},"test",call));
}},"test",call),QueryComment.AsyncClient.class);
log.info(call.getResult());
}
@Test
public void work() throws Exception {
MyAsyncMethodCallback<Boolean> call = new MyAsyncMethodCallback<>();
ThriftClientPool.doExecute(client -> client.qiandao(call),TSDM.AsyncClient.class);
log.info(call.getResult());
}
}

Loading…
Cancel
Save