我正在研究 PostgreSQL 的复制流 API(replication stream API),在使用过程中遇到了一个奇怪的现象。
当我把使用复制槽(replication slot)的全部代码都写在 main 方法里时,一切正常。
public class Server implements Config { public static void main(String[] args) { Properties prop = new Properties(); prop.load(new FileInputStream(System.getProperty("prop"))); String user = prop.getProperty("user"); String password = prop.getProperty("password"); String url = prop.getProperty("url"); Properties props = new Properties(); PGProperty.USER.set(props,user); PGProperty.PASSWORD.set(props,password); PGProperty.ASSUME_MIN_SERVER_VERSION.set(props,"9.4"); PGProperty.REPLICATION.set(props,"database"); PGProperty.PREFER_QUERY_MODE.set(props,"simple"); Connection conn= null; PGConnection replicationConnection= null; PGReplicationStream stream = null; conn = DriverManager.getConnection(url,props); replicationConnection = conn.unwrap(PGConnection.class); stream = replicationConnection.getReplicationAPI().replicationStream().logical() .withSlotName("replication_slot") .withSlotOption("include-xids",true) .withSlotOption("include-timestamp","on") .withSlotOption("skip-empty-xacts",true) .withStatusInterval(20,TimeUnit.SECONDS).start(); while (true) { ByteBuffer msg; try { msg = stream.readPending(); if (msg == null) { TimeUnit.MILLISECONDS.sleep(10L); continue; } int offset = msg.arrayOffset(); byte[] source = msg.array(); int length = source.length - offset; // convert byte buffer into string String data = new String(source,offset,length); // then convert it into bufferedreader BufferedReader reader = new BufferedReader(new StringReader(data)); String line = reader.readLine(); while (line != null) { System.out.println(line); line = reader.readLine(); } stream.setAppliedLSN(stream.getLastReceiveLSN()); stream.setFlushedLSN(stream.getLastReceiveLSN()); } catch (sqlException | IOException | InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
但是,当我像下面这样把创建流的逻辑拆分到一个单独的方法中时:
public class Server implements Config { public static void main(String[] args) { PGReplicationStream stream = getReplicationStream(); while (true) { ByteBuffer msg; try { msg = stream.readPending(); if (msg == null) { TimeUnit.MILLISECONDS.sleep(10L); continue; } int offset = msg.arrayOffset(); byte[] source = msg.array(); int length = source.length - offset; String data = new String(source,length); BufferedReader reader = new BufferedReader(new StringReader(data)); String line = reader.readLine(); while (line != null) { System.out.println(line); line = reader.readLine(); } stream.setAppliedLSN(stream.getLastReceiveLSN()); stream.setFlushedLSN(stream.getLastReceiveLSN()); } catch (sqlException | IOException | InterruptedException e) { e.printStackTrace(); } } } public static PGReplicationStream getReplicationStream() { Properties prop = new Properties(); try { prop.load(new FileInputStream(System.getProperty("prop"))); } catch (IOException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } String user = prop.getProperty("user"); String password = prop.getProperty("password"); String url = prop.getProperty("url"); Properties props = new Properties(); PGProperty.USER.set(props,"simple"); Connection conn= null; PGConnection replicationConnection= null; PGReplicationStream stream = null; try { conn = DriverManager.getConnection(url,TimeUnit.SECONDS).start(); } catch (sqlException e) { e.printStackTrace(); } return stream; }
}
读取了一部分数据之后,程序就报错了:
org.postgresql.util.PSQLException: Database connection failed when reading from copy
    at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1028)
    at org.postgresql.core.v3.CopyDualImpl.readFromCopy(CopyDualImpl.java:41)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.receiveNextData(V3PGReplicationStream.java:155)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.readInternal(V3PGReplicationStream.java:124)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.readPending(V3PGReplicationStream.java:78)
    at Server.main(Server.java:47)
Caused by: java.net.SocketException: Socket closed
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:171)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at org.postgresql.core.VisibleBufferedInputStream.readMore(VisibleBufferedInputStream.java:140)
    at org.postgresql.core.VisibleBufferedInputStream.ensureBytes(VisibleBufferedInputStream.java:109)
    at org.postgresql.core.VisibleBufferedInputStream.read(VisibleBufferedInputStream.java:191)
    at org.postgresql.core.PGStream.receive(PGStream.java:495)
    at org.postgresql.core.PGStream.receive(PGStream.java:479)
    at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1161)
    at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1026)
    ... 5 more