master
WuXianChaoPin 7 years ago
parent a21dd29e01
commit ab19a7e43c
  1. 85
      core/src/main/java/core/thrift/SocketConfig.java
  2. 26
      core/src/main/java/core/thrift/TNonblockingSocketPool.java
  3. 26
      core/src/main/java/core/thrift/TSocketPool.java
  4. 46
      core/src/main/java/core/thrift/TTransportPool.java
  5. 70
      core/src/main/java/core/thrift/ThriftClientPool.java
  6. 1
      db/src/main/java/db/config/HibernateConfig.java
  7. 14
      db/src/main/java/db/config/MyEmptyInterceptor.java
  8. 0
      sql/db.sql
  9. 18
      sql/new.sql
  10. 6
      web/src/main/java/web/controller/DataController.java
  11. 3
      web/src/main/resources/config.properties
  12. 1
      web/src/test/java/MainTest.java
  13. 48
      web/src/test/java/thrift/ThriftTest.java
  14. 10
      web/src/test/java/web/controller/TestController.java

@ -0,0 +1,85 @@
package core.thrift;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.async.TAsyncClient;
import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TNonblockingSocket;
import org.apache.thrift.transport.TNonblockingTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;
@Component
public class SocketConfig {
@Value("${thrift.ip}")
private String ip;
@Value("${thrift.port}")
private Integer port;
@Value("${thrift.timeout}")
private Integer timeout;
private GenericObjectPool<TNonblockingSocket> tNonblockingSocketPool;
private GenericObjectPool<TSocket> tSocketPool;
private final Map<Class, Constructor> clientMap = new HashMap<>();
@PostConstruct
public void init(){
tNonblockingSocketPool=new GenericObjectPool<>(new TNonblockingSocketPool(ip,port,timeout));
tSocketPool=new GenericObjectPool<>(new TSocketPool(ip,port,timeout));
}
public <C extends TAsyncClient> void doExecute(Class<C> c, AsyncClient<C> thrift) throws Exception {
//从池里获取一个Transport对象
TNonblockingSocket trans = tNonblockingSocketPool.borrowObject();
thrift.doExecute(client(c, trans));
//把一个Transport对象归还到池里
tNonblockingSocketPool.returnObject(trans);
}
public <C extends TServiceClient> void doExecute(Class<C> c,ServiceClient<C> thrift) throws Exception{
TSocket trans=tSocketPool.borrowObject();
thrift.doExecute(client(c,trans));
tSocketPool.returnObject(trans);
}
private interface Thrift<E> {
void doExecute(E client) throws Exception;
}
public interface AsyncClient<E extends TAsyncClient> extends Thrift<E>{}
public interface ServiceClient<E extends TServiceClient> extends Thrift<E>{}
public <T extends TTransport,C> C client(Class<C> c, T trans) throws Exception {
if (trans instanceof TNonblockingTransport) {
if (!clientMap.containsKey(c)) {
clientMap.put(c, c.getConstructor(TProtocolFactory.class, TAsyncClientManager.class, TNonblockingTransport.class));
}
TProtocolFactory protocol = new TBinaryProtocol.Factory();
TAsyncClientManager clientManager = new TAsyncClientManager();
return (C) clientMap.get(c).newInstance(protocol, clientManager, trans);
} else {
if (!clientMap.containsKey(c)) {
clientMap.put(c, c.getConstructor(TProtocol.class));
}
TBinaryProtocol protocol = new TBinaryProtocol(trans);
return (C) clientMap.get(c).newInstance(protocol);
}
}
}

@ -0,0 +1,26 @@
package core.thrift;
import org.apache.thrift.async.TAsyncClient;
import org.apache.thrift.transport.TNonblockingSocket;
import java.net.Socket;
public class TNonblockingSocketPool extends TTransportPool<TNonblockingSocket,TAsyncClient> {
public TNonblockingSocketPool(String ip, Integer port, Integer timeout) {
super(ip, port, timeout);
}
@Override
public Socket socket(TNonblockingSocket o) {
return o.getSocketChannel().socket();
}
@Override
public TNonblockingSocket create() throws Exception {
return new TNonblockingSocket(ip,port,timeout);
}
}

@ -0,0 +1,26 @@
package core.thrift;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.transport.TSocket;
import java.net.Socket;
public class TSocketPool extends TTransportPool<TSocket,TServiceClient>{
public TSocketPool(String ip, Integer port, Integer timeout) {
super(ip, port, timeout);
}
@Override
public Socket socket(TSocket o) {
return o.getSocket();
}
@Override
public TSocket create() throws Exception {
TSocket socket=new TSocket(ip,port,timeout);
socket.open();
return socket;
}
}

@ -3,31 +3,26 @@ package core.thrift;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.thrift.transport.TNonblockingSocket;
import org.apache.thrift.transport.TTransport;
import java.net.Socket;
public abstract class TTransportPool<T extends TTransport, C> extends BasePooledObjectFactory<T> {
public class ConnectionPoolFactory extends BasePooledObjectFactory<TNonblockingSocket> {
private String ip;
private int port;
private int socketTimeout;
protected String ip;
public ConnectionPoolFactory(String ip, int port, int socketTimeout) {
protected Integer port;
protected Integer timeout;
public TTransportPool(String ip, Integer port, Integer timeout) {
this.ip = ip;
this.port = port;
this.socketTimeout = socketTimeout;
}
@Override
//创建TNonblockingSocket类型对象方法
public TNonblockingSocket create() throws Exception {
return new TNonblockingSocket(ip, port, socketTimeout);
this.timeout = timeout;
}
//把TNonblockingSocket对象打包成池管理的对象PooledObject<TNonblockingSocket>
@Override
public PooledObject<TNonblockingSocket> wrap(TNonblockingSocket transport) {
public PooledObject<T> wrap(T transport) {
return new DefaultPooledObject<>(transport);
}
@ -35,8 +30,8 @@ public class ConnectionPoolFactory extends BasePooledObjectFactory<TNonblockingS
* 验证对象
*/
@Override
public boolean validateObject(PooledObject<TNonblockingSocket> p) {
Socket socket=p.getObject().getSocketChannel().socket();
public boolean validateObject(PooledObject<T> p) {
Socket socket = socket(p.getObject());
boolean closed = socket.isClosed();
boolean connected = socket.isConnected();
boolean outputShutdown = socket.isOutputShutdown();
@ -53,11 +48,26 @@ public class ConnectionPoolFactory extends BasePooledObjectFactory<TNonblockingS
return urgentFlag && connected && !closed && !inputShutdown && !outputShutdown;
}
/**
* 获取socket
*/
public abstract Socket socket(T o);
/**
* 销毁对象
*/
@Override
public void destroyObject(PooledObject<TNonblockingSocket> p) throws Exception {
public void destroyObject(PooledObject<T> p) {
p.getObject().close();
}
}

@ -1,70 +0,0 @@
package core.thrift;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.thrift.async.TAsyncClient;
import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TNonblockingSocket;
import org.apache.thrift.transport.TNonblockingTransport;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.Map;
@Service
public class ThriftClientPool {
@Value("${thrift.ip}")
private String ip;
@Value("${thrift.port}")
private Integer port;
@Value("${thrift.timeout}")
private Integer timeout;
private static GenericObjectPool<TNonblockingSocket> pool;
private static final Map<Class<? extends TAsyncClient>, Constructor> asyncClient = new HashMap<>();
private static final Map<Class<? extends TAsyncClient>, Constructor> synchronizeClient = new HashMap<>();
@PostConstruct
public void init() {
pool = new GenericObjectPool<>(new ConnectionPoolFactory(ip, port, timeout));
}
public static <E extends TAsyncClient> void doExecute(Thrift<E> thrift,Class<E> c) throws Exception {
//从池里获取一个Transport对象
TNonblockingSocket socket = pool.borrowObject();
thrift.doExecute(client(c,socket));
//把一个Transport对象归还到池里
pool.returnObject(socket);
}
private static <E extends TAsyncClient> E client(Class<E> c, TNonblockingSocket socket) throws NoSuchMethodException, IOException, IllegalAccessException, InvocationTargetException, InstantiationException {
if (!asyncClient.containsKey(c)) {
asyncClient.put(c, c.getConstructor(TProtocolFactory.class, TAsyncClientManager.class, TNonblockingTransport.class));
}
TAsyncClientManager clientManager = new TAsyncClientManager();
TProtocolFactory protocol = new TBinaryProtocol.Factory();
return (E) asyncClient.get(c).newInstance(protocol,clientManager,socket);
}
public interface Thrift<E extends TAsyncClient> {
void doExecute(E client) throws Exception;
}
}

@ -134,7 +134,6 @@ public class HibernateConfig {
//按照注解标记从指定包扫描实体类,并且注册到session工厂
bean.setPackagesToScan(basePackage);
bean.setEntityTypeFilters(new AnnotationTypeFilter(c));
bean.setEntityInterceptor(new MyEmptyInterceptor());
},dataSourceModel.getDbName());
if (sessionFactory != null) {

@ -1,14 +0,0 @@
package db.config;
import org.hibernate.EmptyInterceptor;
public class MyEmptyInterceptor extends EmptyInterceptor {
@Override
public String onPrepareStatement(String sql) {
return super.onPrepareStatement(sql);
}
}

@ -0,0 +1,18 @@
--
create table menu
(
id int auto_increment--
primary key,
href varchar(32) not null,--
menu_name varchar(10) not null,--
icon varchar(20) null,--
parent_id int not null,--id
menu_level int not null,--
sort int not null,--
constraint menu_href_uindex
unique (href)
)
engine=InnoDB
;

@ -1,8 +1,6 @@
package web.controller;
import core.thrift.MyAsyncMethodCallback;
import core.thrift.ThriftClientPool;
import core.thrift.comment.QueryComment;
import db.form.DBAction;
import db.model.bilibili.DataModel;
import org.apache.commons.io.FileUtils;
@ -54,7 +52,7 @@ public class DataController extends TableController<DataModel, DataModelForm, Da
public Integer count(@PathVariable int cid) {
try {
MyAsyncMethodCallback<Integer> call = new MyAsyncMethodCallback<>();
ThriftClientPool.doExecute(client -> client.commentSum(cid, call),QueryComment.AsyncClient.class);
// TTransportUtil.doExecute(client -> client.commentSum(cid, call),QueryComment.AsyncClient.class);
return call.getResult();
} catch (Exception e) {
log.error(e);
@ -69,7 +67,7 @@ public class DataController extends TableController<DataModel, DataModelForm, Da
headers.setContentType(MediaType.APPLICATION_OCTET_STREAM);
headers.setContentDispositionFormData("attachment", form.getFileName() + ".zip");
MyAsyncMethodCallback<String> call = new MyAsyncMethodCallback<>();
ThriftClientPool.doExecute(client -> client.download(form.getCids(), form.getFileName(), call),QueryComment.AsyncClient.class);
// TTransportUtil.doExecute(client -> client.download(form.getCids(), form.getFileName(), call),QueryComment.AsyncClient.class);
String filePath = call.getResult();
if (filePath != null) {
return new ResponseEntity<>(FileUtils.readFileToByteArray(new File(filePath)), headers, HttpStatus.CREATED);

@ -11,4 +11,5 @@ druid.maxActive=20
#thrift\u914D\u7F6E
thrift.ip=127.0.0.1
thrift.port=2233
thrift.timeout=3000
thrift.timeout=3000

@ -1,4 +1,5 @@
public class MainTest {
public static void main(String[] args) {
}
}

@ -1,23 +1,17 @@
package thrift;
import core.thrift.MyAsyncMethodCallback;
import core.thrift.ThriftClientPool;
import core.thrift.comment.QueryComment;
import core.thrift.SocketConfig;
import core.thrift.task.TSDM;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.web.SpringJUnitWebConfig;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.io.IOException;
import java.util.ArrayList;
import javax.annotation.Resource;
//让测试运行于Spring环境
@RunWith(SpringJUnit4ClassRunner.class)
@ -25,42 +19,22 @@ import java.util.ArrayList;
@ContextConfiguration(classes = SpringConfig.class)
public class ThriftTest {
private static Logger log = LogManager.getLogger();
@Resource
private SocketConfig config;
@Test
public void commentSum() throws Exception {
int i=100;
while (i-->0) {
MyAsyncMethodCallback<Integer> call = new MyAsyncMethodCallback<>();
ThriftClientPool.doExecute(client -> client.commentSum(49052, call), QueryComment.AsyncClient.class);
log.info(call.getResult());
Thread.sleep(10);
}
}
@Test
public void commentSum_() throws IOException, TException {
TTransport transport=new TSocket("sukura.top",2233);
transport.open();
TBinaryProtocol protocol=new TBinaryProtocol(transport);
QueryComment.Client client=new QueryComment.Client(protocol);
log.info(client.commentSum(49052));
}
private static Logger log = LogManager.getLogger();
@Test
public void download() throws Exception {
MyAsyncMethodCallback<String> call = new MyAsyncMethodCallback<>();
ThriftClientPool.doExecute(client -> client.download(new ArrayList<Integer>() {{
add(49052);
}}, "test", call), QueryComment.AsyncClient.class);
public void test1() throws Exception {
MyAsyncMethodCallback<Boolean> call=new MyAsyncMethodCallback<>();
config.doExecute(TSDM.AsyncClient.class,client -> client.qiandao(call));
log.info(call.getResult());
}
@Test
public void work() throws Exception {
MyAsyncMethodCallback<Boolean> call = new MyAsyncMethodCallback<>();
ThriftClientPool.doExecute(client -> client.word(call), TSDM.AsyncClient.class);
log.info(call.getResult());
public void test2() throws Exception{
config.doExecute(TSDM.Client.class,client -> log.info(client.qiandao()));
}
}

@ -13,11 +13,11 @@ public class TestController extends BiliController<TaskModel>{
@ResponseBody
@RequestMapping(value = "task",produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
public TaskModel test(){
TaskModel model=new TaskModel();
model.setId(1);
model.setApi("www.baidu.com");
return model;
public String test(){
TaskModel taskModel=new TaskModel();
taskModel.setId(1);
taskModel.setApi("www.baidu.com");
return taskModel.toString();
}
@Override

Loading…
Cancel
Save