级别: 初级 里 金千, IBM中国软件开发中心任软件工程师
2005 年 12 月 12 日 在商业整合开发中,J2EE 应用程序经常需要与企业信息系统(EIS)交互,以获得所需的资源。J2EE Connector Architecture (JCA) 1.5 标准定义了资源适配器(Resource Adapter)组件规范,使 J2EE 应用程序可以通过标准接口和 EIS 系统交互。IBM 提供了工具软件 WebSphere Adapter Toolkit 6.0 (WAT)以及 Adapter Foundation Classes(AFC) 基础类库来帮助开发者快速的构建符合 JCA 标准的资源适配器。本文将通过具体的实例(WebSphere Demo FTP Adapter)来说明整个开发流程。
1 FTP资源适配器Inbound场景设计
本场景用来监控FTP服务器的文件或目录。一旦指定的FTP服务器目录有变化,FTP资源适配器马上通过邮件通知管理员。
1.1 体系结构
整体结构和数据流见下图。
FTP Server(EIS) -> Exporting -> WebSphere FTP Adapter -> Email Server
|
图一:Inbound场景体系结构
如上图所示,导出及SCA组件被打包成一个完整的WAS应用(.ear)。WebSphere FTP适配器单独组成适配器包(.rar)。在部署过程中我们将会通过创建一个定时事件来监控FTP服务器。该事件每到计时器指定时间间隔就会扫描FTP目录,如果有任何变化,它会把收集到的信息以邮件的形式发送给管理员。
1.2 FTP服务器
任何FTP服务器都可以在该场景使用。
1.3导出配置
这部分包括三个子组件:JCA适配器导出描述符、数据绑定应用和功能选择应用。
1.4 SCA组件
该组件不包括任何业务逻辑。
1.5 FTP适配器
所有相关的inbound类将在FTP适配器中实现,这是整个inbound场景中最重要的功能模块。FTP适配器会创建一个计时任务监控FTP某一个指定的目录。一旦该目录有变化,如创建、删除、更新文件等操作,FTP适配器会马上给管理人员发包含这些信息的邮件。同时会在本机(运行WAS的系统)创建一个本地镜像目录保存这些历史信息。
在部署过程中需要以下几个属性:
- FTP服务器IP
- FTP服务器端口
- 用户名
- 密码
- FTP服务器目录
- 本地镜像目录
- 管理人员邮箱地址
- 邮件服务器IP
- 监听的时间间隔 (已在AFC中实现)
1.6邮件服务器
邮件SMTP服务器IP地址。
2构建Inbound场景
2.1验证FTP适配器代码存根
这里我们可以重用以前的FTP适配器代码。运行WID并确认你的J2CA_ftp工程中已创建了如下三个Java文件。如果J2CA_ftpEventManager.java没有被自动创建,您需要手动创建它,它必须继承基础类库中的EventManager类。
J2CA_ftpActicationSpec.java
J2CA_ftpEventManager.java
J2CA_ftpEventStore.java
2.2实现FTP适配器输入类
1) 运行WID并且打开J2CA_ftp工程。选择Open With -> Deployment Descriptor Editor运行部署描述符编辑器编辑ra.xml文件。
图二:WAT-部署描述符编辑器
2) 在Deployment Descriptor Editor中选择Inbound Adapter标签。
图三:WAT-Inbound标签
点击Add按钮在消息监听器中手动添加如下属性:
| Property Name | Type | Value | | email | String | 电子邮箱地址 | | remote_dir | String | WAT_test/Inbound | | ftp_port | String | 21 | | ftp_ip | String | 9.181.27.26 | | ftp_user | String | wbia | | ftp_passwd | String | P0o9i8u7 | | userName | String
(email authorization) | test | | password | String
(email authorization) | test | | local_dir | String | d:\\JCA_ftp_Adapter\\Inbound | | emailHost | String | SMTP邮件服务器名 | | emailSender | String | cdlFTP |
添加工作完成后,您将会在ra.xml文件、J2CA_ftpActivationSpec.java中看到已添加的属性。
3)实现J2CA_ftpResourceAdapter.java
Inbound场景启动的时候,WBI将会调用endpointActication()方法初始化终端。WBI还会调用createEventManager()和createEventStore()两个方法。而endpointDeactication()方法会在Inbound停止的时候被调用。
J2CA_ftpResourceAdapter.java代码
// This will be called while initializing the Inbound
public void endpointActivation(MessageEndpointFactory arg0,
ActivationSpec arg1) throws ResourceException {
super.endpointActivation(arg0, arg1);
System.out.println("ResourceAdapter -> endpointActivation()");
spec = (J2CA_ftpActivationSpec)arg1;
eventManager.setSpec(spec);
if(spec==null) System.out.println("ResourceAdapter -> endpointActivation() -> spec==null");
else System.out.println("ResourceAdapter -> endpointActivation() -> spec!=null");
eventManager.addEndpointFactory(arg0,arg1);
}
// This will be called while stopping the Inbound
public void endpointDeactivation(MessageEndpointFactory arg0,
ActivationSpec arg1) {
super.endpointDeactivation(arg0, arg1);
if(eventManager.getEISconnection().getAftp()!=null)
eventManager.getEISconnection().stop(null);
eventManager.removeEndpointFactory(arg0);
}
// This will be called while initializing the Inbound
public com.ibm.j2ca.extension.eventmanagement.EventStore createEventStore(
javax.resource.spi.ActivationSpec arg0)
throws javax.resource.ResourceException {
System.out.println("ResourceAdapter -> createEventStore()");
J2CA_ftpEventStore event = new J2CA_ftpEventStore();
return event;
}
// This will be called while initializing the Inbound
public EventManager createEventManager() {
System.out.println("ResourceAdapter -> createEventManager()");
eventManager = new J2CA_ftpEventManager(getLogUtils());
eventManager.setSpec(spec);
return eventManager;
}
|
4)实现J2CA_ftpEventManager.java
在这个类中,pollForEvents()方法在整个运行过程中会被周期性的调用。所以Inbound操作都应该放在这个类中实现。
J2CA_ftpEventManager.java
J2CA_ftpActivationSpec spec = null;
J2CA_ftpManagedConnectionFactory mcf;
J2CA_ftpManagedConnection mc;
J2CA_ftpConnection EISconnection;
BufferedReader read = null;
BufferedWriter write = null;
FileReader fr = null;
FileWriter fw = null;
ArrayList content = null;
ArrayList filelist = null;
File[] files = null;
// Connect to FTP server
public void connect()
{
if(spec!=null)
for(int i=0;i<3;i++)
{
System.out.println("Try: "+String.valueOf(i));
EISconnection.connect(spec.getLocal_dir(),spec.getFtp_ip(),Integer.parseInt(spec.getFtp_port()),
spec.getFtp_user(),spec.getFtp_passwd());
if(EISconnection.getAftp()!=null) break;
try {Thread.sleep(3000);} catch (InterruptedException e1) {}
}
if(EISconnection.getAftp()!=null)
try {
EISconnection.setPath(spec.getRemote_dir());
} catch (IOException e) {
System.out.println("set FTP server path error!");
EISconnection.stop(null);
}
}
// Construct Function
public J2CA_ftpEventManager(com.ibm.j2ca.extension.logging.LogUtils arg0) {
super(arg0);
System.out.println("EventManager -> EventManager()");
content = new ArrayList();
filelist = new ArrayList();
try {
mcf = new J2CA_ftpManagedConnectionFactory();
mc = new J2CA_ftpManagedConnection(mcf,null,null);
EISconnection = new J2CA_ftpConnection(mc);
} catch (ResourceException e) {
System.out.println("EventManager -> Create EISconnection failed");
}
}
// This method is called periodically at runtime.
public void pollForEvents(int arg0) throws ResourceException, CommException {
System.out.println("EventManager -> Enter pollForEvents()");
String filename = "", info = "" , infoInFile= "";
int count = 0, k = 0, i = 0;
File file = null;
String message = "";
while(content.size()>0) content.remove(0);
while(filelist.size()>0) filelist.remove(0);
// compare the local inbound directory and remote ftp server directory
try {
if(EISconnection.getAftp()==null) connect();
if(EISconnection.getAftp()==null)
{
System.out.println("Can not connect to FTP Server!");
System.out.println("EventManager -> Out pollForEvents()");
return;
}
ArrayList array = EISconnection.getFileList();
System.out.println("In FTP server: "+array.size()+" files");
for(i=0; i<array.size(); i++)
{
count = 0;
info = (String)array.get(i);
System.out.println("File Info: "+info);
for(k=0;k<info.length();k++)
{
if(info.charAt(k)==' ')
{
count++;
while(k<info.length() && info.charAt(k)==' ')
k++;
}
if(count==8) break;
}
filename = ((String)array.get(i)).substring(k);
if(filename == null || filename.length()==0 ||
filename.equals(".") || filename.equals("..")) continue;
System.out.println("File: "+filename);
//see if the filename already in the local directory
//note: just store filename, not file itself
file = new File(spec.getLocal_dir()+"/"+filename);
if(file!=null && file.exists())
{
fr = new FileReader(file);
read = new BufferedReader(fr);
infoInFile = read.readLine();
read.close(); fr.close();
if(infoInFile!=null && infoInFile.equalsIgnoreCase(info))
{ // ok, the file is not chaneged in FTP server
;
} else { // the file is updated in FTP server
file.delete();
file = new File(spec.getLocal_dir()+"/"+filename);
fw = new FileWriter(file);
write = new BufferedWriter(fw);
write.write(info);
write.close();
fw.close();
message = "File: " + filename +" is updated in FTP Server "
+spec.getFtp_ip()+ " at Directory " + spec.getRemote_dir()
+ " with Property " + info;
content.add(message);
}
}
else // new file found in FTP server FTP
{
fw = new FileWriter(file);
write = new BufferedWriter(fw);
write.write(info);
write.close();
fw.close();
message = "File: " + filename +" is created in FTP Server "
+spec.getFtp_ip()+ " at Directory " + spec.getRemote_dir()
+ " with Property " + info;
content.add(message);
}
filelist.add(filename);
}
// see whether files are deleted from FTP server
try {
file = new File(spec.getLocal_dir());
if(file!=null && file.isDirectory())
files = file.listFiles();
} catch(Exception ee)
{
files = null;
}
if(files!=null) // some files in the local directory
for(i=0;i<files.length;i++)
{
for(k=0;k<filelist.size();k++)
{
if(files[i].getName().equals((String)filelist.get(k)))
{ break; } // ok, this file is still there
}
if(k==filelist.size())
{ // this file is deleted from FTP server
new File(spec.getLocal_dir()+"/"+files[i].getName()).delete();
message = "File: " + files[i].getName() +" is deleted from FTP Server "
+spec.getFtp_ip()+ " at Directory " + spec.getRemote_dir();
content.add(message);
}
}
} catch (IOException e) {
System.out.println("EventManager -> pollForEvents() -> connect to FTP server error");
e.printStackTrace();
}
// now log to system out
message = "";
for(i=0;i<content.size();i++)
{
System.out.println((String)content.get(i));
message = message + (String)content.get(i) + "\n";
}
// now send email to notify admin
if(content.size()>0) sendEmail(message);
System.out.println("EventManager -> Out pollForEvents()");
}
// Send Email to Admin Email Address
public void sendEmail(String message)
{
String host=spec.getEmailHost();
String from=spec.getEmailSender()+"@"+spec.getFtp_ip();
String to=spec.getEmail();
String name=spec.getUserName();
String pass=spec.getPassword();
String subject="FTP Server "+spec.getFtp_ip()+" Notification";
String contents=message;
MailAuthenticator ma;
System.out.println("host:"+host+" from:"+from+" to:"+to+" name:"+name+" pass:"+pass);
System.out.println("begin send email");
try{
ma=new MailAuthenticator(name,pass);
Properties props=System.getProperties();
props.put("mail.smtp.host",host);
props.put("mail.smtp.auth","true");
Session session=Session.getInstance(props,ma);
System.out.println("set properties finished");
MimeMessage msg=new MimeMessage(session);
System.out.println("init msg");
msg.setFrom(new InternetAddress(from));
msg.setRecipient(Message.RecipientType.TO,new InternetAddress(to));
msg.setSubject(subject);
msg.setText(contents);
System.out.println("set msg finished");
Transport transport = session.getTransport("smtp");
transport.connect();
System.out.println("connected to email host");
transport.sendMessage(msg, msg.getAllRecipients());
transport.close();
System.out.println("Send Email Success");
}catch(Exception e){
System.out.println(e.getMessage());
e.printStackTrace();
}
}
// Getter and Setter
public J2CA_ftpActivationSpec getSpec() {
return spec; }
public void setSpec(J2CA_ftpActivationSpec spec) {
this.spec = spec; }
public J2CA_ftpConnection getEISconnection() {
return EISconnection; }
public void setEISconnection(J2CA_ftpConnection sconnection) {
EISconnection = sconnection; }
|
5)实现J2CA_ftpActivationSpec.java
在该类的构造函数中,你可以为Inbound属性设定一些缺省值。本例中,尽管我们的FTP适配器不支持BiDi,我们仍然需要为四个BiDi属性赋值,否则Inbound场景不能正常运行。
J2CA_ftpActivationSpec.java代码
public J2CA_ftpActivationSpec() {
super();
try {
this.setPollPeriod(10000);
this.setPollQuantity(1);
// no-use, just required
this.setBiDiContextEIS("a");
this.setBiDiContextMetadata("a");
this.setBiDiContextSkip("a");
this.setBiDiContextSpecialFormat("a");
} catch (ResourceException e) {
System.out.println("set property error :(");
e.printStackTrace();
}
|
5)实现MailAuthenticator.java
您需要手动创建一个新类来处理邮件认证功能。如下:
MailAuthenticator.java代码
package cdlFTP.inbound;
import javax.mail.Authenticator;
import javax.mail.PasswordAuthentication;
public class MailAuthenticator extends Authenticator {
String authenName;
String authenPass;
public MailAuthenticator(String authenName,String authenPass) {
super();
this.authenName=authenName;
this.authenPass=authenPass;
}
public PasswordAuthentication getPasswordAuthentication()
{
return new PasswordAuthentication(authenName,authenPass);
}
}
|
到此为止,所有的FTP适配器Inbound场景代码已经全部开发完毕。
2.3创建Intbound场景的应用程序
在这个inbound应用中,我们不详细的介绍怎样使用WID创建WebSphere Business Integration Project。在此,我们只以一个简单应为例,列出其中每一个重要文件的内容并加以简单说明。所有的源文件都可以在附件cdlFTP_inbound.jar中找到。
1) cdlFTP_inbound.jar /sca.module
这是应用程序的描述文件,通常我们不需要修改它。
<?xml version="1.0" encoding="UTF-8"?>
<scdl:module name="cdlFTP_inbound"
xmlns:scdl="http://www.ibm.com/xmlns/prod/websphere/scdl/6.0.0">
</scdl:module>
|
2) cdlFTP_inbound.jar /cdlFTP_inbound/cdlFTPNotification.wsdl
该文件定义了SCA组件和导出系统的接口。
<?xml version="1.0" encoding="UTF-8"?>
<wsdl:definitions name="cdlFTPNotification"
targetNamespace="http://cdlFTP"
xmlns:tns="http://cdlFTP"
xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:data="http://cdlFTP.Operation.data">
<wsdl:types>
<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema">
<xsd:import namespace="http://cdlFTP.Operation.data"
schemaLocation="FTPOperation.xsd">
</xsd:import>
</xsd:schema>
</wsdl:types>
<wsdl:message name="NotificationRequest">
<wsdl:part name="request" element="data:Operation"></wsdl:part>
</wsdl:message>
<wsdl:portType name="Notification">
<wsdl:operation name="create">
<wsdl:input message="tns:NotificationRequest"></wsdl:input>
</wsdl:operation>
<wsdl:operation name="update">
<wsdl:input message="tns:NotificationRequest"></wsdl:input>
</wsdl:operation>
<wsdl:operation name="delete">
<wsdl:input message="tns:NotificationRequest"></wsdl:input>
</wsdl:operation>
</wsdl:portType>
</wsdl:definitions>
|
3) cdlFTP_inbound.jar /cdlFTP_inbound/cdlFTPNotification.component
该SCA组件没有任何操作。在这里它只作为一个导出目标的实例。
<?xml version="1.0" encoding="UTF-8"?>
<scdl:component
name="cdlFTP_inbound/cdlFTPNotification"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:scdl="http://www.ibm.com/xmlns/prod/websphere/scdl/6.0.0"
xmlns:policy="http://www.ibm.com/xmlns/prod/websphere/scdl/policy/plugin/6.0.0"
xmlns:java="http://www.ibm.com/xmlns/prod/websphere/scdl/java/6.0.0"
xmlns:wsdl="http://www.ibm.com/xmlns/prod/websphere/scdl/wsdl/6.0.0"
xmlns:eis="http://www.ibm.com/xmlns/prod/websphere/scdl/eis/6.0.0"
xmlns:notification="http://cdlFTP"
xsi:schemaLocation="http://cdlFTP cdlFTP_inbound/cdlFTPNotification.wsdl">
<interfaces>
<interface xsi:type="wsdl:WSDLPortType" portType="notification:Notification"/>
</interfaces>
</scdl:component>
|
4) cdlFTP_inbound.jar /cdlFTP_inbound / cdlFTPNotification.export
导出文件定义了WebSphere资源适配器是怎样被连接到SCA组件上。在这里,你需要指定如下信息:
- Service interface
- Binding type
- Binding Methods
- Resource Adapter Name
- InboundListener Class
- ActivationSpec Class
- Data Binding Class
<?xml version="1.0" encoding="UTF-8"?>
<scdl:export target="cdlFTP_inbound/cdlFTPNotification"
name="cdlFTP_inbound/cdlFTPNotification"
xmlns:scdl="http://www.ibm.com/xmlns/prod/websphere/scdl/6.0.0"
xmlns:policy="http://www.ibm.com/xmlns/prod/websphere/scdl/policy/plugin/6.0.0"
xmlns:java="http://www.ibm.com/xmlns/prod/websphere/scdl/java/6.0.0"
xmlns:wsdl="http://www.ibm.com/xmlns/prod/websphere/scdl/wsdl/6.0.0"
xmlns:eis="http://www.ibm.com/xmlns/prod/websphere/scdl/eis/6.0.0"
xmlns:notification="http://cdlFTP"
xsi:schemaLocation="http://cdlFTP cdlFTP_inbound/cdlFTPNotification.wsdl">
<interfaces>
<interface xsi:type="wsdl:WSDLPortType" portType="notification:Notification"/>
</interfaces>
<esbBinding xsi:type="eis:EISExportBinding" dataBindingType="cdlFTP_inbound.cdlFTPDataBinding">
<resourceAdapter name="IBM cdl FTP Adapter">
</resourceAdapter>
<connection type="cdlFTP.inbound.J2CA_ftpActivationSpec"
listenerType="commonj.connector.runtime.InboundListener">
</connection>
<methodBinding method="create" nativeMethod="CREATE">
</methodBinding>
<methodBinding method="update" nativeMethod="UPDATE">
</methodBinding>
<methodBinding method="delete" nativeMethod="DELETE">
</methodBinding>
</esbBinding>
</scdl:export>
|
2.4在WebSphere应用服务器中部署WebSphere资源适配器
FTP适配器Inbound部署过程与Outbound场景类似。
1) -10)见Outbound相应步骤的说明。
11) 导航至Resource adapters->IBM cdl FTP Adapter->J2C activation specification,然后点击"new"。
图四:WAS控制台-资源适配器活动规范
11) 在Name项中键入"cdl_inbound",JNDI name中键入"cdlFTP_inboundJNDI",然后点击OK。记得保存所做的更改。
图五:WAS控制台-新建资源适配器活动规范
12) 导航至Resource adapters->IBM cdl FTP Adapter->J2C activation specification0->cdlFTP_inbound->Custom properties。在这里你可以点击属性名称修改它的默认值。记得将email属性设定为你的邮件地址。"remote_dir"属性是你要监控的FTP服务器目录。
这里有四个BiDi属性,因为其是必要属性,所以必须给它们设定一个值,否则启动应用程序的时候会抛出异常。您可以随意给它们设定一个字符串,如下图所示。
图六:WAS控制台-资源适配器活动规范属性
现在,WebSphere资源适配器已经部署完毕。保存你所做的更改。
2.5在WebSphere应用服务器中部署Inbound应用
在开始前,请确认FTP适配器已经成功部署到WAS中。
1) 确认所有上述文件已经正确添加到cdlFTP_inbound.jar中。
2) 打开一个命令行窗口,并定位到cdlFTP_inbound.jar 和J2CA_ftp.rar所在的目录。输入如下命令:
<WID root Dir>\runtimes\bi_v6\bin\serviceDeploy cdlFTP_inbound.jar -classpath J2CA_ftp.rar -keep
|
如果你在当前目录中看到cdlFTP_inboundApp.ear文件,则这一步操作成功。否则你需要根据错误输出信息修改cdlFTP_inbound.jar中的源文件。
3) 启动WAS和WAS管理控制台。请确认FTP适配器已经部署到WAS上。在WAS管理控制台的左侧树型结构,选择"Applications->Enterprise Applications",然后点击右边面板的Install按钮。
图七:WAS控制台-企业应用程序
4) 点击"浏览"指定cdlFTP_inboundApp.ear文件所在的目录,点击next。在随后的配置过程中都使用默认值。
图八:WAS控制台-导入企业应用程序
5) 如果看到如下的信息说明你已经部署成功。点击Save to Master Configuration保存所有的修改。如果部署失败,请根据log及trace文件的错误信息修改你的源文件。
图九:WAS控制台-保存企业应用程序
6) 导航至"Enterprise Applications->cdlFTP_inboundApp"。点击WAS管理控制台右侧的"Provide listener bindings for message-driven bean"链接。
图十:WAS控制台-绑定消息驱动Bean
7) 在出现的面板中键入cdlFTP_inboundJNDI作为JNDI的名称。保存所做的修改。
图十一:WAS控制台-绑定资源适配器活动规范
8) 导航至 "Enterprise Applications"。将cdlFTP_inboundApp置为选中状态,然后点击Start。
图十二:WAS控制台-启动企业应用程序
9) 如果cdlFTP_inboundApp后面的status变成一个绿色的箭头,说明应用启动成功。
图十三:WAS控制台-企业应用程序启动成功
2.6运行Inbound场景
在开始前,请确认FTP适配器和Inbound应用程序均正确部署到WAS上。
有三种方法在ftp服务器上创建、更新、删除文件,步骤如下:
- 直接在FTP服务器上操作
- 使用某些FTP客户端工具
- 使用Outbound场景中的JSP客户端
1) 确认在FTP适配器的输入属性设置中,你键入的管理人员邮件地址和FTP服务器目录名称都正确。
在FTP服务器监控目录中复制或上传一个新文件。稍后,你可以收到如下的邮件,从中你可以获得新文件的状态及属性。
图十四:管理员收到新建文件的提示邮件
2) 确认在FTP适配器的输入属性设置中,你键入的管理人员邮件地址和FTP服务器目录名称都正确。
在FTP服务器监控目录中复制或上传(覆盖)一个新文件。稍后,你可以收到如下的邮件,从中你可以获得新文件的状态及属性。
图十五:管理员收到更新文件的提示邮件
3) 确认在FTP适配器的输入属性设置中,你键入的管理人员邮件地址和FTP服务器目录名称都正确。
丛FTP服务器中删除一个现有文件。稍后,你可以收到如下的邮件,从中你可以获得新文件的状态及属性。
图十六:管理员收到删除文件的提示邮件
注意:在一个监控时间间隔内做的所有动作,都会被记录在同一封邮件中。
祝贺你已经成功运行了Inbound场景。
注意:如果使用的WAS集成测试环境,您可以在如下目录中找到WAS的log和trace文件。
<WID Directory>\prof\logs\server1\
在inbound场景运行过程中,你可以从这些log及trace文件中看到输出信息。
关于作者  | |  | 金千里目前在IBM中国软件开发中心任软件工程师,主要工作是WebSphere商业整合中资源适配器的开发。他毕业于中国科学院,对数据整合和数据挖掘技术有浓厚的兴趣。 |
对本文的评价
|