Here is what I have found so far.
The class org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer contains this method:
    @Override
    public void synchronizeToLocalDirectory(final File localDirectory) {
        final String remoteDirectory = this.remoteDirectoryExpression.getValue(this.evaluationContext, String.class);
        try {
            int transferred = this.remoteFileTemplate.execute(new SessionCallback<F, Integer>() {

                @Override
                public Integer doInSession(Session<F> session) throws IOException {
                    F[] files = session.list(remoteDirectory);
                    if (!ObjectUtils.isEmpty(files)) {
                        List<F> filteredFiles = filterFiles(files);
                        for (F file : filteredFiles) {
                            try {
                                if (file != null) {
                                    copyFileToLocalDirectory(remoteDirectory, file, localDirectory, session);
                                }
                            } catch (RuntimeException e) {
                                if (AbstractInboundFileSynchronizer.this.filter instanceof ReversibleFileListFilter) {
                                    ((ReversibleFileListFilter<F>) AbstractInboundFileSynchronizer.this.filter)
                                            .rollback(file, filteredFiles);
                                }
                                throw e;
                            } catch (IOException e) {
                                if (AbstractInboundFileSynchronizer.this.filter instanceof ReversibleFileListFilter) {
                                    ((ReversibleFileListFilter<F>) AbstractInboundFileSynchronizer.this.filter)
                                            .rollback(file, filteredFiles);
                                }
                                throw e;
                            }
                        }
                        return filteredFiles.size();
                    } else {
                        return 0;
                    }
                }
            });
            if (this.logger.isDebugEnabled()) {
                this.logger.debug(transferred + " files transferred");
            }
        } catch (Exception e) {
            throw new MessagingException("Problem occurred while synchronizing remote to local directory", e);
        }
    }
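It filters the files to be downloaded. I would like to use org.springframework.integration.ftp.filters.FtpPersistentAcceptOnceFileListFilter, which compares file names and last-modified dates.
It then calls the copyFileToLocalDirectory method with the filtered files (the ones to be copied).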
    protected void copyFileToLocalDirectory(String remoteDirectoryPath, F remoteFile, File localDirectory,
            Session<F> session) throws IOException {
        String remoteFileName = this.getFilename(remoteFile);
        String localFileName = this.generateLocalFileName(remoteFileName);
        String remoteFilePath = remoteDirectoryPath != null
                ? (remoteDirectoryPath + this.remoteFileSeparator + remoteFileName)
                : remoteFileName;
        if (!this.isFile(remoteFile)) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("cannot copy,not a file: " + remoteFilePath);
            }
            return;
        }
        File localFile = new File(localDirectory, localFileName);
        if (!localFile.exists()) {
            String tempFileName = localFile.getAbsolutePath() + this.temporaryFileSuffix;
            File tempFile = new File(tempFileName);
            OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(tempFile));
            try {
                session.read(remoteFilePath, outputStream);
            } catch (Exception e) {
                if (e instanceof RuntimeException) {
                    throw (RuntimeException) e;
                } else {
                    throw new MessagingException("Failure occurred while copying from remote to local directory", e);
                }
            } finally {
                try {
                    outputStream.close();
                } catch (Exception ignored2) {
                }
            }
            if (tempFile.renameTo(localFile)) {
                if (this.deleteRemoteFiles) {
                    session.remove(remoteFilePath);
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("deleted " + remoteFilePath);
                    }
                }
            }
            if (this.preserveTimestamp) {
                localFile.setLastModified(getModified(remoteFile));
            }
        }
    }
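However, this method only checks, based on the file name alone, whether the file already exists on the local disk, and downloads it only if it does not. So there is basically no chance that an updated file (with a new timestamp) gets downloaded.
I played around with changing the FtpInboundFileSynchronizer, but it became too complicated. What is the best way to "customize" the synchronizeToLocalDirectory / copyFileToLocalDirectory methods?
Solution
Update 13/Nov/2016: Found out how to get modification timestamps in seconds.
The main problem with updating the AbstractInboundFileSynchronizer is that it has setter methods but no (protected) getter methods. If the setter methods do something clever in the future, the updated version presented here will break.
The main problem with updating files in the local directory is concurrency: if a local file is being processed at the same moment an update for it is received, you can run into all sorts of trouble. The easy way out is to move local files to a (temporary) processing directory so that updates can be received as new files, which in turn removes the need to update the AbstractInboundFileSynchronizer. See also Camel's timestamp remarks.
By default, FTP servers provide modification timestamps with a resolution of minutes. For testing, I changed the FTP client to use the MLSD command, which provides modification timestamps in seconds (and milliseconds if you are lucky), but not all FTP servers support it.
As mentioned in the Spring FTP reference, the local file filter needs to be a FileSystemPersistentAcceptOnceFileListFilter to ensure that local files are picked up when their modification timestamp changes.
Below is my updated version of the AbstractInboundFileSynchronizer, followed by some test classes I used.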
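The "move to a processing directory" workaround could look roughly like the sketch below. It is only an illustration under stated assumptions: ProcessingDirMover and claim are hypothetical names that are not part of Spring Integration, and the processing directory is assumed to live on the same file system as the receive directory so that an atomic move is possible.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical helper (not part of Spring Integration). */
final class ProcessingDirMover {

    private ProcessingDirMover() {
    }

    /**
     * Moves a received file out of the synchronizer's local directory, so that
     * a later poll can store an updated remote copy as a brand-new local file.
     * Returns the new location of the file.
     */
    static Path claim(Path received, Path processingDir) {
        try {
            Files.createDirectories(processingDir);
            // Prefix the name so that successive versions of one file do not collide.
            String name = System.nanoTime() + "-" + received.getFileName();
            Path target = processingDir.resolve(name);
            // ATOMIC_MOVE fails with an exception if the move cannot be atomic,
            // for example when the two directories are on different file systems.
            return Files.move(received, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A message handler would call ProcessingDirMover.claim(file.toPath(), processingDir) first and then work only on the returned path. With the local file gone, the stock synchronizer sees no existing file, and the remote filter alone decides whether a changed remote file is fetched again.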
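To make the precision difference concrete: MLSD reports the modification time as a "modify" fact whose value, per RFC 3659, has the form YYYYMMDDHHMMSS with optional fractional digits, expressed in UTC. The FTP client library normally parses this for you, so the sketch below only illustrates the format. MlsdTime and toEpochMillis are hypothetical names introduced here.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper illustrating the RFC 3659 time-val format. */
final class MlsdTime {

    private static final DateTimeFormatter SECONDS = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    private MlsdTime() {
    }

    /** Converts a value such as "20161113120530.123" (UTC) to epoch milliseconds. */
    static long toEpochMillis(String timeVal) {
        int dot = timeVal.indexOf('.');
        String whole = dot < 0 ? timeVal : timeVal.substring(0, dot);
        long millis = LocalDateTime.parse(whole, SECONDS).toInstant(ZoneOffset.UTC).toEpochMilli();
        if (dot >= 0) {
            // Keep at most three fractional digits, right-padded: ".5" means 500 ms.
            String fraction = (timeVal.substring(dot + 1) + "000").substring(0, 3);
            millis += Integer.parseInt(fraction);
        }
        return millis;
    }
}
```

Two writes to the same file within one minute are indistinguishable at minute resolution, but they differ once seconds are available, which is why the test setup relies on MLSD.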
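The idea behind such a persistent accept-once filter can be sketched as follows: remember the last-modified timestamp per file and accept a file again only when that timestamp changes. This is an illustration of the principle, not the actual Spring Integration implementation. TimestampAcceptOnceFilter is a hypothetical class, and an in-memory map stands in for the metadata store.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Illustration only: mimics the idea, not the real Spring Integration filter. */
final class TimestampAcceptOnceFilter {

    // Stand-in for a metadata store: file path -> last seen modification time.
    private final Map<String, Long> seen = new ConcurrentHashMap<>();

    /** Accepts a file the first time it is seen and again whenever its timestamp changes. */
    boolean accept(String path, long lastModified) {
        Long previous = this.seen.put(path, lastModified);
        return previous == null || previous != lastModified;
    }
}
```

A plain accept-once filter keyed on the file name alone would return false for the second version of a file, so an updated local file would never be emitted as a message.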
    public class FtpUpdatingFileSynchronizer extends FtpInboundFileSynchronizer {

        protected final Log logger = LogFactory.getLog(this.getClass());

        // Copies of the superclass settings, kept here because the superclass has no getters.
        private volatile Expression localFilenameGeneratorExpression;
        private volatile EvaluationContext evaluationContext;
        private volatile boolean deleteRemoteFiles;
        private volatile String remoteFileSeparator = "/";
        private volatile boolean preserveTimestamp;

        public FtpUpdatingFileSynchronizer(SessionFactory<FTPFile> sessionFactory) {
            super(sessionFactory);
            setPreserveTimestamp(true);
        }

        @Override
        public void setLocalFilenameGeneratorExpression(Expression localFilenameGeneratorExpression) {
            super.setLocalFilenameGeneratorExpression(localFilenameGeneratorExpression);
            this.localFilenameGeneratorExpression = localFilenameGeneratorExpression;
        }

        @Override
        public void setIntegrationEvaluationContext(EvaluationContext evaluationContext) {
            super.setIntegrationEvaluationContext(evaluationContext);
            this.evaluationContext = evaluationContext;
        }

        @Override
        public void setDeleteRemoteFiles(boolean deleteRemoteFiles) {
            super.setDeleteRemoteFiles(deleteRemoteFiles);
            this.deleteRemoteFiles = deleteRemoteFiles;
        }

        @Override
        public void setRemoteFileSeparator(String remoteFileSeparator) {
            super.setRemoteFileSeparator(remoteFileSeparator);
            this.remoteFileSeparator = remoteFileSeparator;
        }

        @Override
        public void setPreserveTimestamp(boolean preserveTimestamp) {
            // updated
            Assert.isTrue(preserveTimestamp, "for updating timestamps must be preserved");
            super.setPreserveTimestamp(preserveTimestamp);
            this.preserveTimestamp = preserveTimestamp;
        }

        @Override
        protected void copyFileToLocalDirectory(String remoteDirectoryPath, FTPFile remoteFile, File localDirectory,
                Session<FTPFile> session) throws IOException {
            String remoteFileName = this.getFilename(remoteFile);
            String localFileName = this.generateLocalFileName(remoteFileName);
            String remoteFilePath = (remoteDirectoryPath != null
                    ? (remoteDirectoryPath + this.remoteFileSeparator + remoteFileName)
                    : remoteFileName);
            if (!this.isFile(remoteFile)) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("cannot copy,not a file: " + remoteFilePath);
                }
                return;
            }
            // start update
            File localFile = new File(localDirectory, localFileName);
            boolean update = false;
            if (localFile.exists()) {
                if (this.getModified(remoteFile) > localFile.lastModified()) {
                    this.logger.info("Updating local file " + localFile);
                    update = true;
                } else {
                    this.logger.info("File already exists: " + localFile);
                    return;
                }
            }
            // end update
            String tempFileName = localFile.getAbsolutePath() + this.getTemporaryFileSuffix();
            File tempFile = new File(tempFileName);
            OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(tempFile));
            try {
                session.read(remoteFilePath, outputStream);
            } catch (Exception e) {
                if (e instanceof RuntimeException) {
                    throw (RuntimeException) e;
                } else {
                    throw new MessagingException("Failure occurred while copying from remote to local directory", e);
                }
            } finally {
                try {
                    outputStream.close();
                } catch (Exception ignored2) {
                }
            }
            // updated
            if (update && !localFile.delete()) {
                throw new MessagingException("Unable to delete local file [" + localFile + "] for update.");
            }
            if (tempFile.renameTo(localFile)) {
                if (this.deleteRemoteFiles) {
                    session.remove(remoteFilePath);
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("deleted " + remoteFilePath);
                    }
                }
                // updated
                this.logger.info("Stored file locally: " + localFile);
            } else {
                // updated
                throw new MessagingException("Unable to rename temporary file [" + tempFile + "] to [" + localFile + "]");
            }
            if (this.preserveTimestamp) {
                localFile.setLastModified(getModified(remoteFile));
            }
        }

        private String generateLocalFileName(String remoteFileName) {
            if (this.localFilenameGeneratorExpression != null) {
                return this.localFilenameGeneratorExpression.getValue(this.evaluationContext, remoteFileName, String.class);
            }
            return remoteFileName;
        }
    }
The test classes I used follow.
I used the dependencies org.springframework.integration:spring-integration-ftp:4.3.5.RELEASE and org.apache.ftpserver:ftpserver-core:1.0.6 (plus the usual logging and testing dependencies).
    public class TestFtpSync {

        static final Logger log = LoggerFactory.getLogger(TestFtpSync.class);
        static final String FTP_ROOT_DIR = "target" + File.separator + "ftproot";
        // org.apache.ftpserver:ftpserver-core:1.0.6
        static FtpServer server;

        @BeforeClass
        public static void startServer() throws FtpException {
            File ftpRoot = new File(FTP_ROOT_DIR);
            ftpRoot.mkdirs();
            TestUserManager userManager = new TestUserManager(ftpRoot.getAbsolutePath());
            FtpServerFactory serverFactory = new FtpServerFactory();
            serverFactory.setUserManager(userManager);
            ListenerFactory factory = new ListenerFactory();
            factory.setPort(4444);
            serverFactory.addListener("default", factory.createListener());
            server = serverFactory.createServer();
            server.start();
        }

        @AfterClass
        public static void stopServer() {
            if (server != null) {
                server.stop();
            }
        }

        File ftpFile = Paths.get(FTP_ROOT_DIR, "test1.txt").toFile();
        File ftpFile2 = Paths.get(FTP_ROOT_DIR, "test2.txt").toFile();

        @Test
        public void syncDir() {
            // org.springframework.integration:spring-integration-ftp:4.3.5.RELEASE
            AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext();
            try {
                ctx.register(FtpSyncConf.class);
                ctx.refresh();
                PollableChannel msgChannel = ctx.getBean("inputChannel", PollableChannel.class);
                for (int j = 0; j < 2; j++) {
                    for (int i = 0; i < 2; i++) {
                        storeFtpFile();
                    }
                    for (int i = 0; i < 4; i++) {
                        fetchMessage(msgChannel);
                    }
                }
            } catch (Exception e) {
                throw new AssertionError("FTP test Failed.", e);
            } finally {
                ctx.close();
                cleanup();
            }
        }

        boolean tswitch = true;

        void storeFtpFile() throws IOException, InterruptedException {
            File f = (tswitch ? ftpFile : ftpFile2);
            tswitch = !tswitch;
            log.info("Writing message " + f.getName());
            Files.write(f.toPath(), ("Hello " + System.currentTimeMillis()).getBytes());
        }

        Message<?> fetchMessage(PollableChannel msgChannel) {
            log.info("Fetching message.");
            Message<?> msg = msgChannel.receive(1000L);
            if (msg == null) {
                log.info("No message.");
            } else {
                log.info("Have a message: " + msg);
            }
            return msg;
        }

        void cleanup() {
            delFile(ftpFile);
            delFile(ftpFile2);
            File d = new File(FtpSyncConf.LOCAL_DIR);
            if (d.isDirectory()) {
                for (File f : d.listFiles()) {
                    delFile(f);
                }
            }
            log.info("Finished cleanup");
        }

        void delFile(File f) {
            if (f.isFile()) {
                if (f.delete()) {
                    log.info("Deleted " + f);
                } else {
                    log.error("Cannot delete file " + f);
                }
            }
        }
    }

    public class MlistFtpSessionFactory extends AbstractFtpSessionFactory<MlistFtpClient> {

        @Override
        protected MlistFtpClient createClientInstance() {
            return new MlistFtpClient();
        }
    }

    public class MlistFtpClient extends FTPClient {

        @Override
        public FTPFile[] listFiles(String pathname) throws IOException {
            return super.mlistDir(pathname);
        }
    }

    @EnableIntegration
    @Configuration
    public class FtpSyncConf {

        private static final Logger log = LoggerFactory.getLogger(FtpSyncConf.class);
        public static final String LOCAL_DIR = "/tmp/received";

        @Bean(name = "ftpMetaData")
        public ConcurrentMetadataStore ftpMetaData() {
            return new SimpleMetadataStore();
        }

        @Bean(name = "localMetaData")
        public ConcurrentMetadataStore localMetaData() {
            return new SimpleMetadataStore();
        }

        @Bean(name = "ftpFileSyncer")
        public FtpUpdatingFileSynchronizer ftpFileSyncer(
                @Qualifier("ftpMetaData") ConcurrentMetadataStore metadataStore) {
            MlistFtpSessionFactory ftpSessionFactory = new MlistFtpSessionFactory();
            ftpSessionFactory.setHost("localhost");
            ftpSessionFactory.setPort(4444);
            ftpSessionFactory.setUsername("demo");
            ftpSessionFactory.setPassword("demo");
            FtpPersistentAcceptOnceFileListFilter fileFilter =
                    new FtpPersistentAcceptOnceFileListFilter(metadataStore, "ftp");
            fileFilter.setFlushOnUpdate(true);
            FtpUpdatingFileSynchronizer ftpFileSync = new FtpUpdatingFileSynchronizer(ftpSessionFactory);
            ftpFileSync.setFilter(fileFilter);
            // ftpFileSync.setDeleteRemoteFiles(true);
            return ftpFileSync;
        }

        @Bean(name = "syncFtp")
        @InboundChannelAdapter(value = "inputChannel", poller = @Poller(fixedDelay = "500", maxMessagesPerPoll = "1"))
        public MessageSource<File> syncChannel(
                @Qualifier("localMetaData") ConcurrentMetadataStore metadataStore,
                @Qualifier("ftpFileSyncer") FtpUpdatingFileSynchronizer ftpFileSync) throws Exception {
            FtpInboundFileSynchronizingMessageSource messageSource =
                    new FtpInboundFileSynchronizingMessageSource(ftpFileSync);
            File receiveDir = new File(LOCAL_DIR);
            receiveDir.mkdirs();
            messageSource.setLocalDirectory(receiveDir);
            messageSource.setLocalFilter(new FileSystemPersistentAcceptOnceFileListFilter(metadataStore, "local"));
            log.info("Message source bean created.");
            return messageSource;
        }

        @Bean(name = "inputChannel")
        public PollableChannel inputChannel() {
            QueueChannel channel = new QueueChannel();
            log.info("Message channel bean created.");
            return channel;
        }
    }

    /**
     * Copied from https://github.com/spring-projects/spring-integration-samples/tree/master/basic/ftp/src/test/java/org/springframework/integration/samples/ftp/support
     * @author Gunnar Hillert
     *
     */
    public class TestUserManager extends AbstractUserManager {

        private BaseUser testUser;
        private BaseUser anonUser;
        private static final String TEST_USERNAME = "demo";
        private static final String TEST_PASSWORD = "demo";

        public TestUserManager(String homeDirectory) {
            super("admin", new ClearTextPasswordEncryptor());
            testUser = new BaseUser();
            testUser.setAuthorities(Arrays.asList(new Authority[] {new ConcurrentLoginPermission(1, 1), new WritePermission()}));
            testUser.setEnabled(true);
            testUser.setHomeDirectory(homeDirectory);
            testUser.setMaxIdleTime(10000);
            testUser.setName(TEST_USERNAME);
            testUser.setPassword(TEST_PASSWORD);
            anonUser = new BaseUser(testUser);
            anonUser.setName("anonymous");
        }

        public User getUserByName(String username) throws FtpException {
            if (TEST_USERNAME.equals(username)) {
                return testUser;
            } else if (anonUser.getName().equals(username)) {
                return anonUser;
            }
            return null;
        }

        public String[] getAllUserNames() throws FtpException {
            return new String[] {TEST_USERNAME, anonUser.getName()};
        }

        public void delete(String username) throws FtpException {
            throw new UnsupportedOperationException("Deleting of FTP Users is not supported.");
        }

        public void save(User user) throws FtpException {
            throw new UnsupportedOperationException("Saving of FTP Users is not supported.");
        }

        public boolean doesExist(String username) throws FtpException {
            return (TEST_USERNAME.equals(username) || anonUser.getName().equals(username)) ? true : false;
        }

        public User authenticate(Authentication authentication) throws AuthenticationFailedException {
            if (UsernamePasswordAuthentication.class.isAssignableFrom(authentication.getClass())) {
                UsernamePasswordAuthentication upAuth = (UsernamePasswordAuthentication) authentication;
                if (TEST_USERNAME.equals(upAuth.getUsername()) && TEST_PASSWORD.equals(upAuth.getPassword())) {
                    return testUser;
                }
                if (anonUser.getName().equals(upAuth.getUsername())) {
                    return anonUser;
                }
            } else if (AnonymousAuthentication.class.isAssignableFrom(authentication.getClass())) {
                return anonUser;
            }
            return null;
        }
    }
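Update 15/Nov/2016: Note on xml configuration.
The xml element inbound-channel-adapter is directly linked to the FtpInboundFileSynchronizer: spring-integration-ftp-4.3.5.RELEASE.jar!/META-INF/spring.handlers points to the FtpNamespaceHandler, which in turn uses org.springframework.integration.ftp.config.FtpInboundChannelAdapterParser.
Following the xml-custom reference guide, specifying a custom FtpNamespaceHandler in a local META-INF/spring.handlers file should allow you to use the FtpUpdatingFileSynchronizer instead of the FtpInboundFileSynchronizer. That did not work for me in unit tests, though. A proper solution would probably involve creating extra or modified xsd files, so that the regular inbound-channel-adapter keeps using the regular FtpInboundFileSynchronizer and a special inbound-updating-channel-adapter uses the FtpUpdatingFileSynchronizer. Doing this properly is somewhat beyond the scope of this answer.
A quick hack can get you started, though. You can override the default FtpNamespaceHandler by creating the package org.springframework.integration.ftp.config and the class FtpNamespaceHandler in your local project. The contents are as follows: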
    package org.springframework.integration.ftp.config;

    public class FtpNamespaceHandler extends org.springframework.integration.config.xml.AbstractIntegrationNamespaceHandler {

        @Override
        public void init() {
            System.out.println("Initializing FTP updating file synchronizer.");
            // one updated line below, rest copied from original FtpNamespaceHandler
            registerBeanDefinitionParser("inbound-channel-adapter", new MyFtpInboundChannelAdapterParser());
            registerBeanDefinitionParser("inbound-streaming-channel-adapter", new FtpStreamingInboundChannelAdapterParser());
            registerBeanDefinitionParser("outbound-channel-adapter", new FtpOutboundChannelAdapterParser());
            registerBeanDefinitionParser("outbound-gateway", new FtpOutboundGatewayParser());
        }
    }

    package org.springframework.integration.ftp.config;

    import org.springframework.integration.file.remote.synchronizer.InboundFileSynchronizer;
    import org.springframework.integration.ftp.config.FtpInboundChannelAdapterParser;

    public class MyFtpInboundChannelAdapterParser extends FtpInboundChannelAdapterParser {

        @Override
        protected Class<? extends InboundFileSynchronizer> getInboundFileSynchronizerClass() {
            System.out.println("Returning updating file synchronizer.");
            return FtpUpdatingFileSynchronizer.class;
        }
    }
Also add preserve-timestamp="true" in the xml file to prevent the new IllegalArgumentException: for updating timestamps must be preserved.