From ab19a7e43cfb92cd1e86a882158092207a01dcec Mon Sep 17 00:00:00 2001 From: WuXianChaoPin <1029559041@qq.com> Date: Wed, 27 Jun 2018 17:14:21 +0800 Subject: [PATCH] thrift --- .../main/java/core/thrift/SocketConfig.java | 85 +++++++++++++++++++ .../core/thrift/TNonblockingSocketPool.java | 26 ++++++ .../main/java/core/thrift/TSocketPool.java | 26 ++++++ ...onPoolFactory.java => TTransportPool.java} | 46 ++++++---- .../java/core/thrift/ThriftClientPool.java | 70 --------------- .../main/java/db/config/HibernateConfig.java | 1 - .../java/db/config/MyEmptyInterceptor.java | 14 --- {init => sql}/db.sql | 0 sql/new.sql | 18 ++++ .../java/web/controller/DataController.java | 6 +- web/src/main/resources/config.properties | 3 +- web/src/test/java/MainTest.java | 1 + web/src/test/java/thrift/ThriftTest.java | 48 +++-------- .../java/web/controller/TestController.java | 10 +-- 14 files changed, 204 insertions(+), 150 deletions(-) create mode 100644 core/src/main/java/core/thrift/SocketConfig.java create mode 100644 core/src/main/java/core/thrift/TNonblockingSocketPool.java create mode 100644 core/src/main/java/core/thrift/TSocketPool.java rename core/src/main/java/core/thrift/{ConnectionPoolFactory.java => TTransportPool.java} (50%) delete mode 100644 core/src/main/java/core/thrift/ThriftClientPool.java delete mode 100644 db/src/main/java/db/config/MyEmptyInterceptor.java rename {init => sql}/db.sql (100%) create mode 100644 sql/new.sql diff --git a/core/src/main/java/core/thrift/SocketConfig.java b/core/src/main/java/core/thrift/SocketConfig.java new file mode 100644 index 0000000..029b00f --- /dev/null +++ b/core/src/main/java/core/thrift/SocketConfig.java @@ -0,0 +1,85 @@ +package core.thrift; + +import org.apache.commons.pool2.impl.GenericObjectPool; +import org.apache.thrift.TServiceClient; +import org.apache.thrift.async.TAsyncClient; +import org.apache.thrift.async.TAsyncClientManager; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.protocol.TProtocolFactory; +import org.apache.thrift.transport.TNonblockingSocket; +import org.apache.thrift.transport.TNonblockingTransport; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.lang.reflect.Constructor; +import java.util.HashMap; +import java.util.Map; + +@Component +public class SocketConfig { + @Value("${thrift.ip}") + private String ip; + + @Value("${thrift.port}") + private Integer port; + + @Value("${thrift.timeout}") + private Integer timeout; + + + private GenericObjectPool tNonblockingSocketPool; + + private GenericObjectPool tSocketPool; + + + private final Map clientMap = new HashMap<>(); + + @PostConstruct + public void init(){ + tNonblockingSocketPool=new GenericObjectPool<>(new TNonblockingSocketPool(ip,port,timeout)); + tSocketPool=new GenericObjectPool<>(new TSocketPool(ip,port,timeout)); + } + + public void doExecute(Class c, AsyncClient thrift) throws Exception { + //从池里获取一个Transport对象 + TNonblockingSocket trans = tNonblockingSocketPool.borrowObject(); + thrift.doExecute(client(c, trans)); + //把一个Transport对象归还到池里 + tNonblockingSocketPool.returnObject(trans); + } + + public void doExecute(Class c,ServiceClient thrift) throws Exception{ + TSocket trans=tSocketPool.borrowObject(); + thrift.doExecute(client(c,trans)); + tSocketPool.returnObject(trans); + } + + private interface Thrift { + void doExecute(E client) throws Exception; + } + + public interface AsyncClient extends Thrift{} + + public interface ServiceClient extends Thrift{} + + public C client(Class c, T trans) throws Exception { + if (trans instanceof TNonblockingTransport) { + if (!clientMap.containsKey(c)) { + clientMap.put(c, c.getConstructor(TProtocolFactory.class, TAsyncClientManager.class, TNonblockingTransport.class)); + } + TProtocolFactory protocol = new TBinaryProtocol.Factory(); + TAsyncClientManager clientManager = new TAsyncClientManager(); + return (C) clientMap.get(c).newInstance(protocol, clientManager, trans); + } else { + if (!clientMap.containsKey(c)) { + clientMap.put(c, c.getConstructor(TProtocol.class)); + } + TBinaryProtocol protocol = new TBinaryProtocol(trans); + return (C) clientMap.get(c).newInstance(protocol); + } + } +} diff --git a/core/src/main/java/core/thrift/TNonblockingSocketPool.java b/core/src/main/java/core/thrift/TNonblockingSocketPool.java new file mode 100644 index 0000000..0df2ca7 --- /dev/null +++ b/core/src/main/java/core/thrift/TNonblockingSocketPool.java @@ -0,0 +1,26 @@ +package core.thrift; + +import org.apache.thrift.async.TAsyncClient; +import org.apache.thrift.transport.TNonblockingSocket; + +import java.net.Socket; + +public class TNonblockingSocketPool extends TTransportPool { + + + public TNonblockingSocketPool(String ip, Integer port, Integer timeout) { + super(ip, port, timeout); + } + + @Override + public Socket socket(TNonblockingSocket o) { + return o.getSocketChannel().socket(); + } + + @Override + public TNonblockingSocket create() throws Exception { + return new TNonblockingSocket(ip,port,timeout); + } + + +} diff --git a/core/src/main/java/core/thrift/TSocketPool.java b/core/src/main/java/core/thrift/TSocketPool.java new file mode 100644 index 0000000..1d6ad19 --- /dev/null +++ b/core/src/main/java/core/thrift/TSocketPool.java @@ -0,0 +1,26 @@ +package core.thrift; + +import org.apache.thrift.TServiceClient; +import org.apache.thrift.transport.TSocket; + +import java.net.Socket; + +public class TSocketPool extends TTransportPool{ + + + public TSocketPool(String ip, Integer port, Integer timeout) { + super(ip, port, timeout); + } + + @Override + public Socket socket(TSocket o) { + return o.getSocket(); + } + + @Override + public TSocket create() throws Exception { + TSocket socket=new TSocket(ip,port,timeout); + socket.open(); + return socket; + } +} diff --git a/core/src/main/java/core/thrift/ConnectionPoolFactory.java b/core/src/main/java/core/thrift/TTransportPool.java similarity index 50% rename from core/src/main/java/core/thrift/ConnectionPoolFactory.java rename to core/src/main/java/core/thrift/TTransportPool.java index f3f4fcc..9f7e292 100644 --- a/core/src/main/java/core/thrift/ConnectionPoolFactory.java +++ b/core/src/main/java/core/thrift/TTransportPool.java @@ -3,31 +3,26 @@ package core.thrift; import org.apache.commons.pool2.BasePooledObjectFactory; import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.impl.DefaultPooledObject; -import org.apache.thrift.transport.TNonblockingSocket; +import org.apache.thrift.transport.TTransport; import java.net.Socket; +public abstract class TTransportPool extends BasePooledObjectFactory { -public class ConnectionPoolFactory extends BasePooledObjectFactory { - private String ip; - private int port; - private int socketTimeout; + protected String ip; - public ConnectionPoolFactory(String ip, int port, int socketTimeout) { + protected Integer port; + + protected Integer timeout; + + public TTransportPool(String ip, Integer port, Integer timeout) { this.ip = ip; this.port = port; - this.socketTimeout = socketTimeout; - } - - @Override - //创建TNonblockingSocket类型对象方法 - public TNonblockingSocket create() throws Exception { - return new TNonblockingSocket(ip, port, socketTimeout); + this.timeout = timeout; } - //把TNonblockingSocket对象打包成池管理的对象PooledObject @Override - public PooledObject wrap(TNonblockingSocket transport) { + public PooledObject wrap(T transport) { return new DefaultPooledObject<>(transport); } @@ -35,8 +30,8 @@ public class ConnectionPoolFactory extends BasePooledObjectFactory p) { - Socket socket=p.getObject().getSocketChannel().socket(); + public boolean validateObject(PooledObject p) { + Socket socket = socket(p.getObject()); boolean closed = socket.isClosed(); boolean connected = socket.isConnected(); boolean outputShutdown = socket.isOutputShutdown(); @@ -53,11 +48,26 @@ public class ConnectionPoolFactory extends BasePooledObjectFactory p) throws Exception { + public void destroyObject(PooledObject p) { p.getObject().close(); } + + + } + + + + + + diff --git a/core/src/main/java/core/thrift/ThriftClientPool.java b/core/src/main/java/core/thrift/ThriftClientPool.java deleted file mode 100644 index f60ca63..0000000 --- a/core/src/main/java/core/thrift/ThriftClientPool.java +++ /dev/null @@ -1,70 +0,0 @@ -package core.thrift; - -import org.apache.commons.pool2.impl.GenericObjectPool; -import org.apache.thrift.async.TAsyncClient; -import org.apache.thrift.async.TAsyncClientManager; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.protocol.TProtocolFactory; -import org.apache.thrift.transport.TNonblockingSocket; -import org.apache.thrift.transport.TNonblockingTransport; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Service; - -import javax.annotation.PostConstruct; -import java.io.IOException; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.util.HashMap; -import java.util.Map; - -@Service -public class ThriftClientPool { - - @Value("${thrift.ip}") - private String ip; - - @Value("${thrift.port}") - private Integer port; - - @Value("${thrift.timeout}") - private Integer timeout; - - private static GenericObjectPool pool; - - private static final Map, Constructor> asyncClient = new HashMap<>(); - - private static final Map, Constructor> synchronizeClient = new HashMap<>(); - - @PostConstruct - public void init() { - pool = new GenericObjectPool<>(new ConnectionPoolFactory(ip, port, timeout)); - } - - public static void doExecute(Thrift thrift,Class c) throws Exception { - //从池里获取一个Transport对象 - TNonblockingSocket socket = pool.borrowObject(); - thrift.doExecute(client(c,socket)); - //把一个Transport对象归还到池里 - pool.returnObject(socket); - } - - private static E client(Class c, TNonblockingSocket socket) throws NoSuchMethodException, IOException, IllegalAccessException, InvocationTargetException, InstantiationException { - if (!asyncClient.containsKey(c)) { - asyncClient.put(c, c.getConstructor(TProtocolFactory.class, TAsyncClientManager.class, TNonblockingTransport.class)); - } - TAsyncClientManager clientManager = new TAsyncClientManager(); - TProtocolFactory protocol = new TBinaryProtocol.Factory(); - return (E) asyncClient.get(c).newInstance(protocol,clientManager,socket); - } - - - public interface Thrift { - void doExecute(E client) throws Exception; - } - - -} - - - - diff --git a/db/src/main/java/db/config/HibernateConfig.java b/db/src/main/java/db/config/HibernateConfig.java index 317811c..eed6871 100644 --- a/db/src/main/java/db/config/HibernateConfig.java +++ b/db/src/main/java/db/config/HibernateConfig.java @@ -134,7 +134,6 @@ public class HibernateConfig { //按照注解标记从指定包扫描实体类,并且注册到session工厂 bean.setPackagesToScan(basePackage); bean.setEntityTypeFilters(new AnnotationTypeFilter(c)); - bean.setEntityInterceptor(new MyEmptyInterceptor()); },dataSourceModel.getDbName()); if (sessionFactory != null) { diff --git a/db/src/main/java/db/config/MyEmptyInterceptor.java b/db/src/main/java/db/config/MyEmptyInterceptor.java deleted file mode 100644 index 78514fb..0000000 --- a/db/src/main/java/db/config/MyEmptyInterceptor.java +++ /dev/null @@ -1,14 +0,0 @@ -package db.config; - -import org.hibernate.EmptyInterceptor; - -public class MyEmptyInterceptor extends EmptyInterceptor { - - - - - @Override - public String onPrepareStatement(String sql) { - return super.onPrepareStatement(sql); - } -} diff --git a/init/db.sql b/sql/db.sql similarity index 100% rename from init/db.sql rename to sql/db.sql diff --git a/sql/new.sql b/sql/new.sql new file mode 100644 index 0000000..e031393 --- /dev/null +++ b/sql/new.sql @@ -0,0 +1,18 @@ +--菜单表 +create table menu +( + id int auto_increment--自增主键 + primary key, + href varchar(32) not null,--菜单超链接 + menu_name varchar(10) not null,--菜单名 + icon varchar(20) null,--菜单图标 + parent_id int not null,--父菜单id + menu_level int not null,--菜单树深度 + sort int not null,--排序 + constraint menu_href_uindex + unique (href) +) +engine=InnoDB +; + + diff --git a/web/src/main/java/web/controller/DataController.java b/web/src/main/java/web/controller/DataController.java index 080f20f..5d91c49 100644 --- a/web/src/main/java/web/controller/DataController.java +++ b/web/src/main/java/web/controller/DataController.java @@ -1,8 +1,6 @@ package web.controller; import core.thrift.MyAsyncMethodCallback; -import core.thrift.ThriftClientPool; -import core.thrift.comment.QueryComment; import db.form.DBAction; import db.model.bilibili.DataModel; import org.apache.commons.io.FileUtils; @@ -54,7 +52,7 @@ public class DataController extends TableController call = new MyAsyncMethodCallback<>(); - ThriftClientPool.doExecute(client -> client.commentSum(cid, call),QueryComment.AsyncClient.class); +// TTransportUtil.doExecute(client -> client.commentSum(cid, call),QueryComment.AsyncClient.class); return call.getResult(); } catch (Exception e) { log.error(e); @@ -69,7 +67,7 @@ public class DataController extends TableController call = new MyAsyncMethodCallback<>(); - ThriftClientPool.doExecute(client -> client.download(form.getCids(), form.getFileName(), call),QueryComment.AsyncClient.class); +// TTransportUtil.doExecute(client -> client.download(form.getCids(), form.getFileName(), call),QueryComment.AsyncClient.class); String filePath = call.getResult(); if (filePath != null) { return new ResponseEntity<>(FileUtils.readFileToByteArray(new File(filePath)), headers, HttpStatus.CREATED); diff --git a/web/src/main/resources/config.properties b/web/src/main/resources/config.properties index eb5dbac..88fab0b 100644 --- a/web/src/main/resources/config.properties +++ b/web/src/main/resources/config.properties @@ -11,4 +11,5 @@ druid.maxActive=20 #thrift\u914D\u7F6E thrift.ip=127.0.0.1 thrift.port=2233 -thrift.timeout=3000 \ No newline at end of file +thrift.timeout=3000 + diff --git a/web/src/test/java/MainTest.java b/web/src/test/java/MainTest.java index db30d14..57fda09 100644 --- a/web/src/test/java/MainTest.java +++ b/web/src/test/java/MainTest.java @@ -1,4 +1,5 @@ public class MainTest { public static void main(String[] args) { + } } diff --git a/web/src/test/java/thrift/ThriftTest.java b/web/src/test/java/thrift/ThriftTest.java index 0c0c5ae..a334b1f 100644 --- a/web/src/test/java/thrift/ThriftTest.java +++ b/web/src/test/java/thrift/ThriftTest.java @@ -1,23 +1,17 @@ package thrift; import core.thrift.MyAsyncMethodCallback; -import core.thrift.ThriftClientPool; -import core.thrift.comment.QueryComment; +import core.thrift.SocketConfig; import core.thrift.task.TSDM; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.thrift.TException; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.transport.TSocket; -import org.apache.thrift.transport.TTransport; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit.jupiter.web.SpringJUnitWebConfig; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; -import java.io.IOException; -import java.util.ArrayList; +import javax.annotation.Resource; //让测试运行于Spring环境 @RunWith(SpringJUnit4ClassRunner.class) @@ -25,42 +19,22 @@ import java.util.ArrayList; @ContextConfiguration(classes = SpringConfig.class) public class ThriftTest { - private static Logger log = LogManager.getLogger(); + @Resource + private SocketConfig config; - @Test - public void commentSum() throws Exception { - int i=100; - while (i-->0) { - MyAsyncMethodCallback call = new MyAsyncMethodCallback<>(); - ThriftClientPool.doExecute(client -> client.commentSum(49052, call), QueryComment.AsyncClient.class); - log.info(call.getResult()); - Thread.sleep(10); - } - } - - @Test - public void commentSum_() throws IOException, TException { - TTransport transport=new TSocket("sukura.top",2233); - transport.open(); - TBinaryProtocol protocol=new TBinaryProtocol(transport); - QueryComment.Client client=new QueryComment.Client(protocol); - log.info(client.commentSum(49052)); - } + private static Logger log = LogManager.getLogger(); @Test - public void download() throws Exception { - MyAsyncMethodCallback call = new MyAsyncMethodCallback<>(); - ThriftClientPool.doExecute(client -> client.download(new ArrayList() {{ - add(49052); - }}, "test", call), QueryComment.AsyncClient.class); + public void test1() throws Exception { + MyAsyncMethodCallback call=new MyAsyncMethodCallback<>(); + config.doExecute(TSDM.AsyncClient.class,client -> client.qiandao(call)); log.info(call.getResult()); + } @Test - public void work() throws Exception { - MyAsyncMethodCallback call = new MyAsyncMethodCallback<>(); - ThriftClientPool.doExecute(client -> client.word(call), TSDM.AsyncClient.class); - log.info(call.getResult()); + public void test2() throws Exception{ + config.doExecute(TSDM.Client.class,client -> log.info(client.qiandao())); } } diff --git a/web/src/test/java/web/controller/TestController.java b/web/src/test/java/web/controller/TestController.java index 29e6f46..f27b6cb 100644 --- a/web/src/test/java/web/controller/TestController.java +++ b/web/src/test/java/web/controller/TestController.java @@ -13,11 +13,11 @@ public class TestController extends BiliController{ @ResponseBody @RequestMapping(value = "task",produces = MediaType.APPLICATION_JSON_UTF8_VALUE) - public TaskModel test(){ - TaskModel model=new TaskModel(); - model.setId(1); - model.setApi("www.baidu.com"); - return model; + public String test(){ + TaskModel taskModel=new TaskModel(); + taskModel.setId(1); + taskModel.setApi("www.baidu.com"); + return taskModel.toString(); } @Override