[PATCH obexd v0 12/14] client-test: pbap-client uses new API

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



From: Mikel Astiz <mikel.astiz@xxxxxxxxxxxx>

---
 test/pbap-client |  187 +++++++++++++++++++++++++++++++++++++++++++----------
 1 files changed, 151 insertions(+), 36 deletions(-)

diff --git a/test/pbap-client b/test/pbap-client
index ecca14f..2432acf 100755
--- a/test/pbap-client
+++ b/test/pbap-client
@@ -1,41 +1,156 @@
 #!/usr/bin/python
 
+import gobject
+
 import sys
+import os
 import dbus
+import dbus.service
+import dbus.mainloop.glib
+
+class Transfer:
+    def __init__(self, callback_func):
+        self.callback_func = callback_func
+        self.path = None
+        self.filename = None
+
+class PbapClient:
+    def __init__(self, session_path):
+        self.pending_transfers = 0
+        self.transfer_dict = dict()
+        self.flush_func = None
+        bus = dbus.SessionBus()
+        self.session = dbus.Interface(bus.get_object("org.openobex.client",
+                                                     session_path),
+                                      "org.openobex.Session")
+        self.pbap = dbus.Interface(bus.get_object("org.openobex.client",
+                                                  session_path),
+                                   "org.openobex.PhonebookAccess")
+        bus.add_signal_receiver(
+            self.transfer_complete,
+            dbus_interface="org.openobex.Transfer",
+            signal_name="Complete",
+            path_keyword="path")
+        bus.add_signal_receiver(
+            self.transfer_error,
+            dbus_interface="org.openobex.Transfer",
+            signal_name="Error",
+            path_keyword="path")
+
+    def register(self, reply, transfer):
+        (path, properties) = reply
+        transfer.path = path
+        transfer.filename = properties["Filename"]
+        self.transfer_dict[path] = transfer
+        print "Transfer created: %s (file %s)" % (path, transfer.filename)
+
+    def error(self, err):
+        print err
+        mainloop.quit()
+
+    def transfer_complete(self, path):
+        req = self.transfer_dict.get(path)
+        if req == None:
+            return
+        self.pending_transfers -= 1
+        print "Transfer %s finished" % path
+        f = open(req.filename, "r")
+        os.remove(req.filename)
+        lines = f.readlines()
+        del self.transfer_dict[path]
+        req.callback_func(lines)
+
+        if (len(self.transfer_dict) == 0) and (self.pending_transfers == 0):
+            if self.flush_func != None:
+                f = self.flush_func
+                self.flush_func = None
+                f()
+
+    def transfer_error(self, code, message, path):
+        req = self.transfer_dict.get(path)
+        if req == None:
+            return
+        print "Transfer finished with error %s: %s" % (code, message)
+        mainloop.quit()
+
+    def pull(self, vcard, func):
+        req = Transfer(func)
+        self.pbap.Pull(vcard, "",
+                       reply_handler=lambda r: self.register(r, req),
+                       error_handler=self.error)
+        self.pending_transfers += 1
+
+    def pull_all(self, func):
+        req = Transfer(func)
+        self.pbap.PullAll("",
+                          reply_handler=lambda r: self.register(r, req),
+                          error_handler=self.error)
+        self.pending_transfers += 1
+
+    def flush_transfers(self, func):
+        if (len(self.transfer_dict) == 0) and (self.pending_transfers == 0):
+            return
+        self.flush_func = func
+
+    def interface(self):
+        return self.pbap
+
+if  __name__ == '__main__':
+
+    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
+
+    bus = dbus.SessionBus()
+    mainloop = gobject.MainLoop()
+
+    client = dbus.Interface(bus.get_object("org.openobex.client", "/"),
+                            "org.openobex.Client")
+
+    if (len(sys.argv) < 2):
+	print "Usage: %s <device>" % (sys.argv[0])
+	sys.exit(1)
+
+    print "Creating Session"
+    session_path = client.CreateSession(sys.argv[1], { "Target": "PBAP" })
+
+    pbap_client = PbapClient(session_path)
+
+    def process_result(lines, header):
+        if header != None:
+            print header
+        for line in lines:
+            print line,
+        print
+
+    def test_paths(paths):
+        if len(paths) == 0:
+            print
+            print "FINISHED"
+            mainloop.quit()
+            return
+
+        path = paths[0]
+
+        print "\n--- Select Phonebook %s ---\n" % (path)
+        pbap_client.interface().Select("int", path)
+
+        print "\n--- GetSize ---\n"
+        ret = pbap_client.interface().GetSize()
+        print "Size = %d\n" % (ret)
+
+        print "\n--- List vCard ---\n"
+        ret = pbap_client.interface().List()
+        for item in ret:
+            print "%s : %s" % (item[0], item[1])
+            pbap_client.interface().SetFormat("vcard30")
+            pbap_client.interface().SetFilter(["VERSION", "FN", "TEL"]);
+            pbap_client.pull(item[0], lambda x: process_result(x, None))
+
+        pbap_client.interface().SetFormat("vcard30")
+        pbap_client.interface().SetFilter(["VERSION", "FN", "TEL"]);
+        pbap_client.pull_all(lambda x: process_result(x, "\n--- PullAll ---\n"))
+
+        pbap_client.flush_transfers(lambda: test_paths(paths[1:]))
+
+    test_paths(["PB", "ICH", "OCH", "MCH", "CCH"])
 
-bus = dbus.SessionBus()
-
-client = dbus.Interface(bus.get_object("org.openobex.client", "/"),
-						"org.openobex.Client")
-
-print "Creating Session"
-session_path = client.CreateSession(sys.argv[1], { "Target": "PBAP" })
-pbap = dbus.Interface(bus.get_object("org.openobex.client", session_path),
-						"org.openobex.PhonebookAccess")
-session = dbus.Interface(bus.get_object("org.openobex.client", session_path),
-							"org.openobex.Session")
-
-paths = ["PB", "ICH", "OCH", "MCH", "CCH"]
-
-for path in paths:
-	print "\n--- Select Phonebook %s ---\n" % (path)
-	pbap.Select("int", path)
-
-	print "\n--- GetSize ---\n"
-	ret = pbap.GetSize()
-	print "Size = %d\n" % (ret)
-
-	print "\n--- List vCard ---\n"
-	ret = pbap.List()
-	for item in ret:
-		print "%s : %s" % (item[0], item[1])
-		pbap.SetFormat("vcard30")
-		pbap.SetFilter(["VERSION", "FN", "TEL"]);
-		ret = pbap.Pull(item[0])
-		print "%s" % (ret)
-
-	print "\n--- PullAll ---\n"
-	pbap.SetFormat("vcard30")
-	pbap.SetFilter(["VERSION", "FN", "TEL"]);
-	ret = pbap.PullAll()
-	print "%s" % (ret)
+    mainloop.run()
-- 
1.7.7.6

--
To unsubscribe from this list: send the line "unsubscribe linux-bluetooth" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html


[Index of Archives]     [Bluez Devel]     [Linux Wireless Networking]     [Linux Wireless Personal Area Networking]     [Linux ATH6KL]     [Linux USB Devel]     [Linux Media Drivers]     [Linux Audio Users]     [Linux Kernel]     [Linux SCSI]     [Big List of Linux Books]

  Powered by Linux