master
WuXianChaoPin 7 years ago
parent 341f0f7c2f
commit f46342bc9a
  1. 7
      core/pom.xml
  2. 32
      core/src/main/java/core/thrift/ConnectionPoolFactory.java
  3. 28
      core/src/main/java/core/thrift/MyAsyncMethodCallback.java
  4. 61
      core/src/main/java/core/thrift/ThriftClientDemo.java
  5. 53
      core/src/main/java/core/thrift/ThriftClientPool.java
  6. 2
      web/src/main/java/web/config/SpringConfig.java
  7. 11
      web/src/main/java/web/controller/DataController.java
  8. 8
      web/src/main/resources/config.properties
  9. 13
      web/src/main/resources/db.properties
  10. 8
      web/src/test/java/SpringTest.java
  11. 3
      web/src/test/java/Test.java

@ -13,6 +13,7 @@
<properties>
<thrift.version>RELEASE</thrift.version>
<commons-pool2.version>RELEASE</commons-pool2.version>
</properties>
<dependencies>
@ -59,6 +60,12 @@
<version>${thrift.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>${commons-pool2.version}</version>
</dependency>
</dependencies>
<build>
<finalName>core</finalName>

@ -0,0 +1,32 @@
package core.thrift;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.thrift.transport.TNonblockingSocket;
import org.apache.thrift.transport.TTransport;
public class ConnectionPoolFactory extends BasePooledObjectFactory<TTransport>{
private String ip;
private int port;
private int socketTimeout;
public ConnectionPoolFactory(String ip,int port,int socketTimeout) {
this.ip = ip;
this.port = port;
this.socketTimeout = socketTimeout;
}
@Override
//创建TTransport类型对象方法
public TTransport create() throws Exception {
return new TNonblockingSocket(ip, port,socketTimeout);
}
//把TTransport对象打包成池管理的对象PooledObject<TTransport>
@Override
public PooledObject<TTransport> wrap(TTransport transport) {
return new DefaultPooledObject<>(transport);
}
}

@ -0,0 +1,28 @@
package core.thrift;
import org.apache.thrift.async.AsyncMethodCallback;
public class MyAsyncMethodCallback<E> implements AsyncMethodCallback<E> {
private boolean onComplete=true;
private E result;
public E getResult() throws InterruptedException {
while (onComplete){
Thread.sleep(100);
}
return result;
}
@Override
public void onComplete(E o) {
onComplete=false;
result=o;
}
@Override
public void onError(Exception e) {
e.printStackTrace();
}
}

@ -1,61 +0,0 @@
package core.thrift;
import core.thrift.comment.QueryComment;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import java.util.List;
public class ThriftClientDemo {
private static final String host="127.0.0.1";
private static final int port=2233;
private static Logger log=LogManager.getLogger();
private static final TTransport tTransport = getTTransport(host, port, 5000);
public static List<Integer> commentSumList(List<Integer> cids) throws Exception {
List<Integer> result= client().commentSumList(cids);
log.info("查询结果:"+result);
return result;
}
public static int commentSum(int cid) throws Exception {
int result=client().commentSum(cid);
log.info("查询结果:"+result);
return result;
}
public static String downloadXml(List<Integer> cids,String fileName) throws Exception {
String result=client().download(cids,fileName);
log.info("查询结果:"+result);
return result;
}
private static QueryComment.Client client() throws Exception {
TTransport tTransport = getTTransport();
TProtocol protocol = new TBinaryProtocol(tTransport);
return new QueryComment.Client(protocol);
}
public static TTransport getTTransport() throws Exception{
try{
if(!tTransport.isOpen()){
tTransport.open();
}
return tTransport;
}catch(Exception e){
e.printStackTrace();
}
return null;
}
private static TTransport getTTransport(String host, int port, int timeout) {
final TSocket tSocket = new TSocket(host, port, timeout);
final TTransport transport = new TFramedTransport(tSocket);
return transport;
}
}

@ -0,0 +1,53 @@
package core.thrift;
import core.thrift.comment.QueryComment;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TNonblockingTransport;
import org.apache.thrift.transport.TTransport;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
@Service
public class ThriftClientPool {
@Value("${thrift.ip}")
private String ip;
@Value("${thrift.port}")
private int port;
@Value("${thrift.timeout}")
private int timeout;
private static GenericObjectPool<TTransport> pool;
@PostConstruct
public void init() {
pool = new GenericObjectPool<>(new ConnectionPoolFactory(ip, port, timeout));
}
public static void doExecute(Thrift thrift) throws Exception {
//从池里获取一个Transport对象
TTransport transport = pool.borrowObject();
TAsyncClientManager clientManager = new TAsyncClientManager();
TProtocolFactory protocol = new TBinaryProtocol.Factory();
QueryComment.AsyncClient asyncClient = new QueryComment.AsyncClient(protocol, clientManager, (TNonblockingTransport) transport);
thrift.doExecute(asyncClient);
//把一个Transport对象归还到池里
pool.returnObject(transport);
}
public interface Thrift {
void doExecute(QueryComment.AsyncClient client) throws Exception;
}
}

@ -19,7 +19,7 @@ import java.util.List;
@Configuration
@EnableWebMvc
@PropertySource({"classpath:config.properties"})
@ComponentScan({"web.controller","web.service","web.util","web.aop"})
@ComponentScan({"web.controller","web.service","web.util","web.aop","core.thrift"})
public class SpringConfig implements WebMvcConfigurer {
@Override

@ -1,6 +1,7 @@
package web.controller;
import core.thrift.ThriftClientDemo;
import core.thrift.MyAsyncMethodCallback;
import core.thrift.ThriftClientPool;
import db.form.DBAction;
import db.model.bilibili.DataModel;
import org.apache.commons.io.FileUtils;
@ -53,7 +54,9 @@ public class DataController extends TableController<DataModel, DataModelForm, Da
@RequestMapping(value = "count/{cid}")
public Integer count(@PathVariable int cid) {
try {
return ThriftClientDemo.commentSum(cid);
MyAsyncMethodCallback<Integer> call= new MyAsyncMethodCallback<>();
ThriftClientPool.doExecute(client -> client.commentSum(cid,call));
return call.getResult();
}catch (Exception e){
log.error(e);
}
@ -66,7 +69,9 @@ public class DataController extends TableController<DataModel, DataModelForm, Da
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_OCTET_STREAM);
headers.setContentDispositionFormData("attachment", form.getFileName()+".zip");
String filePath=ThriftClientDemo.downloadXml(form.getCids(),form.getFileName());
MyAsyncMethodCallback<String> call= new MyAsyncMethodCallback<>();
ThriftClientPool.doExecute(client -> client.download(form.getCids(),form.getFileName(),call));
String filePath=call.getResult();
if(filePath!=null) {
return new ResponseEntity<>(FileUtils.readFileToByteArray(new File(filePath)), headers, HttpStatus.CREATED);
}else{

@ -4,5 +4,11 @@ db_username=sukura
db_password=Luffy9412!
hibernate.dialect=org.hibernate.dialect.MySQL57Dialect
hibernate.connection.driver_class=com.mysql.cj.jdbc.Driver
#DruidDataSource\u914D\u7F6E
druid.maxActive=20
druid.maxActive=20
#thrift\u914D\u7F6E
thrift.ip=sukura.top
thrift.port=2233
thrift.timeout=3000

@ -1,13 +0,0 @@
#
#
##\u6570\u636E\u6E901
#hibernate.connection.url1=jdbc:mysql://sukura.top:3306/bilibili?serverTimezone=UTC&useSSL=false
#hibernate.connection.username1=sukura
#hibernate.connection.password1=@
#
##\u6570\u636E\u6E902
#hibernate.connection.url2=jdbc:mysql://mysql.sukura.top:8635/bilibili?serverTimezone=UTC&useSSL=false
#hibernate.connection.username2=sukura
#hibernate.connection.password2=Luffy9412!

@ -1,3 +1,5 @@
import core.thrift.MyAsyncMethodCallback;
import core.thrift.ThriftClientPool;
import db.util.ExportUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -104,4 +106,10 @@ public class SpringTest {
resultActions.andDo(MockMvcResultHandlers.print()).andReturn();
}
@Test
public void thrift() throws Exception {
MyAsyncMethodCallback<Integer> call= new MyAsyncMethodCallback<>();
ThriftClientPool.doExecute(client -> client.commentSum(49052,call));
log.info(call.getResult());
}
}

@ -1,4 +1,3 @@
import core.thrift.ThriftClientDemo;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -6,6 +5,6 @@ public class Test {
private static Logger log = LogManager.getLogger();
public static void main(String[] args) throws Exception {
ThriftClientDemo.getTTransport();
}
}

Loading…
Cancel
Save