1 module dpq.connection; 2 3 //import derelict.pq.pq; 4 import libpq.libpq; 5 6 import dpq.exception; 7 import dpq.result; 8 import dpq.value; 9 import dpq.attributes; 10 import dpq.querybuilder; 11 import dpq.meta; 12 import dpq.prepared; 13 import dpq.smartptr; 14 import dpq.serialisation : isAnyNull; 15 16 import std..string; 17 //import derelict.pq.pq; 18 import libpq.libpq; 19 import std.conv : to; 20 import std.traits; 21 import std.typecons; 22 23 24 version(unittest) 25 { 26 import std.stdio; 27 Connection c; 28 } 29 30 /** 31 Represents the PostgreSQL connection and allows executing queries on it. 32 33 Examples: 34 ------------- 35 auto conn = Connection("host=localhost dbname=testdb user=testuser"); 36 //conn.exec ... 37 ------------- 38 */ 39 struct Connection 40 { 41 alias ConnectionPtr = SmartPointer!(PGconn*, PQfinish); 42 43 private ConnectionPtr _connection; 44 private PreparedStatement[string] _prepared; 45 46 /** 47 Connection constructor 48 49 Params: 50 connString = connection string 51 52 See Also: 53 http://www.postgresql.org/docs/9.3/static/libpq-connect.html#LIBPQ-CONNSTRING 54 */ 55 this(string connString) 56 { 57 char* err; 58 auto opts = PQconninfoParse(cast(char*)connString.toStringz, &err); 59 60 if (err != null) 61 throw new DPQException(err.fromStringz.to!string); 62 63 _connection = new ConnectionPtr(PQconnectdb(connString.toStringz)); 64 65 if (status != CONNECTION_OK) 66 throw new DPQException(errorMessage); 67 68 _dpqLastConnection = &this; 69 } 70 71 unittest 72 { 73 c = Connection("host=127.0.0.1 dbname=test user=test"); 74 writeln(" * Database connection with connection string"); 75 assert(c.status == CONNECTION_OK); 76 } 77 78 /** 79 Close the connection manually 80 */ 81 void close() 82 { 83 _connection.clear(); 84 } 85 86 @property const(ConnStatusType) status() 87 { 88 return PQstatus(_connection); 89 } 90 91 /** Returns the name of the database currently selected */ 92 @property const(string) db() 93 { 94 return PQdb(_connection).to!string; 95 } 96 97 /** Returns the name of the current user */ 98 @property const(string) user() 99 { 100 return PQuser(_connection).to!string; 101 } 102 103 /// ditto, but password 104 @property const(string) password() 105 { 106 return PQpass(_connection).to!string; 107 } 108 109 /// ditto, but host 110 @property const(string) host() 111 { 112 return PQhost(_connection).to!string; 113 } 114 115 /// ditto, but port 116 @property const(ushort) port() 117 { 118 return PQport(_connection).fromStringz.to!ushort; 119 } 120 121 /** // FIXME: BROKEN ATM 122 Executes the given string directly 123 124 Throws on fatal query errors like bad syntax 125 126 Examples: 127 ---------------- 128 Connection conn; // An established connection 129 130 conn.exec("CREATE TABLE IF NOT EXISTS test_table"); 131 ---------------- 132 */ 133 Result exec(string command) 134 { 135 PGresult* res = PQexec(_connection, cast(const char*)command.toStringz); 136 return Result(res); 137 } 138 139 /* 140 unittest 141 { 142 auto res = c.exec("SELECT 1::INT4 AS int4, 2::INT8 AS some_long"); 143 writeln(" * exec for selecting INT4 and INT8"); 144 assert(res.rows == 1); 145 assert(res.columns == 2); 146 147 auto r = res[0]; 148 assert(r[0].as!int == 1); 149 assert(r[0].as!long == 2); 150 151 writeln(" * Row opIndex(int) and opIndex(string) equality "); 152 assert(r[0] == r["int4"]); 153 assert(r[1] == r["some_long"]); 154 155 } 156 */ 157 158 /// ditto, async 159 bool send(string command) 160 { 161 return PQsendQuery(_connection, cast(const char*)command.toStringz) == 1; 162 } 163 164 /** 165 Executes the given string with given params 166 167 Params should be given as $1, $2, ... $n in the actual command. 168 All params are sent in a binary format and should not be escaped. 169 If a param's type cannot be inferred, this method will throw an exception, 170 in this case, either specify the type using the :: (cast) notation or 171 make sure the type can be inferred by PostgreSQL in your query. 172 173 Examples: 174 ---------------- 175 Connection conn; // An established connection 176 177 conn.execParams("SELECT $1::string, $2::int, $3::double"); 178 ---------------- 179 180 See also: 181 http://www.postgresql.org/docs/9.3/static/libpq-exec.html 182 */ 183 Result execParams(T...)(string command, T params) 184 { 185 Value[] values; 186 foreach(param; params) 187 values ~= Value(param); 188 189 return execParams(command, values); 190 } 191 192 193 void sendParams(T...)(string command, T params) 194 { 195 Value[] values; 196 foreach(param; params) 197 values ~= Value(param); 198 199 execParams(command, values, true); 200 } 201 202 unittest 203 { 204 auto res = c.execParams("SELECT 1::INT4 AS int4, 2::INT8 AS some_long", []); 205 writeln("\t * execParams"); 206 writeln("\t\t * Rows and cols"); 207 208 assert(res.rows == 1); 209 assert(res.columns == 2); 210 211 writeln("\t\t * Static values"); 212 auto r = res[0]; 213 assert(r[0].as!int == 1); 214 assert(r[1].as!long == 2); 215 216 writeln("\t\t * opIndex(int) and opIndex(string) equality"); 217 assert(r[0] == r["int4"]); 218 assert(r[1] == r["some_long"]); 219 220 int int4 = 1; 221 long int8 = 2; 222 string str = "foo bar baz"; 223 float float4 = 3.14; 224 double float8 = 3.1415; 225 226 writeln("\t\t * Passed values"); 227 res = c.execParams( 228 "SELECT $1::INT4, $2::INT8, $3::TEXT, $4::FLOAT4, $5::FLOAT8", 229 int4, 230 int8, 231 str, 232 float4, 233 float8); 234 235 assert(res.rows == 1); 236 r = res[0]; 237 238 assert(r[0].as!int == int4); 239 assert(r[1].as!long == int8); 240 assert(r[2].as!string == str); 241 assert(r[3].as!float == float4); 242 assert(r[4].as!double == float8); 243 } 244 245 /// ditto, but taking an array of params, instead of variadic template 246 Result execParams(string command, Value[] params, bool async = false) 247 { 248 const char* cStr = cast(const char*) command.toStringz; 249 250 auto pTypes = params.paramTypes; 251 auto pValues = params.paramValues; 252 auto pLengths = params.paramLengths; 253 auto pFormats = params.paramFormats; 254 255 if (async) 256 { 257 PQsendQueryParams( 258 _connection, 259 cStr, 260 params.length.to!int, 261 pTypes.ptr, 262 cast(const(char*)*)pValues.ptr, 263 pLengths.ptr, 264 pFormats.ptr, 265 1); 266 267 return Result(null); 268 } 269 else 270 return Result(PQexecParams( 271 _connection, 272 cStr, 273 params.length.to!int, 274 pTypes.ptr, 275 cast(const(char*)*)pValues.ptr, 276 pLengths.ptr, 277 pFormats.ptr, 278 1)); 279 } 280 281 /// ditto, async 282 void sendParams(string command, Value[] params) 283 { 284 execParams(command, params, true); 285 } 286 287 /** 288 Returns the last error message 289 290 Examples: 291 -------------------- 292 Connection conn; // An established connection 293 294 writeln(conn.errorMessage); 295 -------------------- 296 297 */ 298 @property string errorMessage() 299 { 300 return PQerrorMessage(_connection).to!string; 301 } 302 303 unittest 304 { 305 writeln("\t * errorMessage"); 306 try 307 { 308 c.execParams("SELECT_BADSYNTAX $1::INT4", 1); 309 } 310 catch {} 311 312 assert(c.errorMessage.length != 0); 313 314 } 315 316 /** 317 Generates and runs the DDL from the given structures 318 319 Attributes from dpq.attributes should be used to define 320 primary keys, indexes, and relationships. 321 322 A custom type can be specified with the @type attribute. 323 324 Examples: 325 ----------------------- 326 Connection conn; // An established connection 327 struct User 328 { 329 @serial8 @PKey long id; 330 string username; 331 byte[] passwordHash; 332 }; 333 334 struct Article { ... }; 335 336 conn.ensureSchema!(User, Article); 337 ----------------------- 338 */ 339 void ensureSchema(T...)(bool createType = false) 340 { 341 import std.stdio; 342 string[] additional; 343 string[] relations; 344 345 foreach (type; T) 346 { 347 enum name = relationName!(type); 348 relations ~= name; 349 350 string str; 351 if (createType) 352 str = "CREATE TYPE \"" ~ name ~ "\" AS (%s)"; 353 else 354 str = "CREATE TABLE IF NOT EXISTS \"" ~ name ~ "\" (%s)"; 355 356 string cols; 357 foreach(m; serialisableMembers!type) 358 { 359 string colName = attributeName!(mixin("type." ~ m)); 360 cols ~= "\"" ~ colName ~ "\""; 361 362 // HACK: typeof a @property seems to be failing hard 363 static if (is(FunctionTypeOf!(mixin("type." ~ m)) == function)) 364 alias t = typeof(mixin("type()." ~ m)); 365 else 366 alias t = typeof(mixin("type." ~ m)); 367 368 cols ~= " "; 369 370 // Basic data types 371 static if (hasUDA!(mixin("type." ~ m), PGTypeAttribute)) 372 cols ~= getUDAs!(mixin("type." ~ m), PGTypeAttribute)[0].type; 373 else 374 { 375 alias tu = Unqual!t; 376 static if (ShouldRecurse!(mixin("type." ~ m))) 377 { 378 ensureSchema!(NoNullable!tu)(true); 379 cols ~= '"' ~ relationName!(NoNullable!tu) ~ '"'; 380 } 381 else 382 cols ~= SQLType!tu; 383 } 384 385 // Primary key 386 static if (hasUDA!(mixin("type." ~ m), PrimaryKeyAttribute)) 387 { 388 if (!createType) 389 cols ~= " PRIMARY KEY"; 390 } 391 // Index 392 else static if (hasUDA!(mixin("type." ~ m), IndexAttribute)) 393 { 394 enum uda = getUDAs!(mixin("type." ~ m), IndexAttribute)[0]; 395 additional ~= "CREATE%sINDEX \"%s\" ON \"%s\" (\"%s\")".format( 396 uda.unique ? " UNIQUE " : " ", 397 "%s_%s_index".format(name, colName), 398 name, 399 colName); 400 401 // DEBUG 402 } 403 // Foreign key 404 else static if (hasUDA!(mixin("type." ~ m), ForeignKeyAttribute)) 405 { 406 enum uda = getUDAs!(mixin("type." ~ m), ForeignKeyAttribute)[0]; 407 additional ~= 408 "ALTER TABLE \"%s\" ADD CONSTRAINT \"%s\" FOREIGN KEY (\"%s\") REFERENCES \"%s\" (\"%s\")".format( 409 name, 410 "%s_%s_fk_%s".format(name, colName, uda.relation), 411 colName, 412 uda.relation, 413 uda.pkey); 414 415 // Create an index on the FK too 416 additional ~= "CREATE INDEX \"%s\" ON \"%s\" (\"%s\")".format( 417 "%s_%s_fk_index".format(name, colName), 418 name, 419 colName); 420 421 } 422 423 cols ~= ", "; 424 } 425 426 cols = cols[0 .. $ - 2]; 427 str = str.format(cols); 428 if (createType) 429 { 430 try 431 { 432 exec(str); 433 } 434 catch {} // Do nothing, type already exists 435 } 436 else 437 exec(str); 438 } 439 foreach (cmd; additional) 440 { 441 try 442 { 443 exec(cmd); 444 } 445 catch {} // Horrible, I know, but this just means the constraint/index already exists 446 } 447 448 // After we're done creating all the types, a good idea would be to get OIDs for all of them 449 if (relations.length > 0) 450 { 451 auto res = execParams( 452 "SELECT typname, oid FROM pg_type WHERE typname IN ('%s')".format( 453 relations.join("','"))); 454 455 foreach (r; res) 456 _dpqCustomOIDs[r[0].as!string] = r[1].as!Oid; 457 } 458 459 460 } 461 462 unittest 463 { 464 // Probably needs more thorough testing, let's assume right now 465 // everything is correct if the creating was successful. 466 467 writeln("\t * ensureSchema"); 468 struct Inner 469 { 470 string innerStr; 471 int innerInt; 472 } 473 474 struct TestTable1 475 { 476 @serial8 @PK long id; 477 string str; 478 int n; 479 @embed Inner inner; 480 } 481 482 c.ensureSchema!TestTable1; 483 484 auto res = c.execParams( 485 "SELECT COUNT(*) FROM pg_catalog.pg_tables WHERE tablename = $1", 486 relationName!TestTable1); 487 488 assert(res.rows == 1); 489 assert(res[0][0].as!long == 1); 490 491 c.exec("DROP TABLE " ~ relationName!TestTable1); 492 c.exec("DROP TYPE \"" ~ relationName!Inner ~ "\" CASCADE"); 493 } 494 495 /** 496 Returns the requested structure or a Nullable null value if no rows are returned 497 498 This method queries for the given structure by its primary key. If no 499 primary key can be found, a compile-time error will be generated. 500 501 Examples: 502 ---------------------- 503 Connection conn; // An established connection 504 struct User 505 { 506 @serial @PKey int id; 507 ... 508 }; 509 510 auto user = conn.findOne!User(1); // will search by the id attribute 511 ---------------------- 512 */ 513 Nullable!T findOne(T, U)(U id) 514 { 515 return findOneBy!T(primaryKeyAttributeName!T, id); 516 } 517 518 unittest 519 { 520 writeln("\t * findOne(T)(U id), findOneBy, findOne"); 521 struct Testy 522 { 523 @serial @PK int id; 524 string foo; 525 int bar; 526 long baz; 527 int[] intArr; 528 //string[] stringArr; // TODO: string[] 529 } 530 531 c.ensureSchema!Testy; 532 533 writeln("\t\t * Null result"); 534 auto shouldBeNull = c.findOne!Testy(0); 535 assert(shouldBeNull.isNull); 536 537 c.exec("INSERT INTO " ~ relationName!Testy ~ " (id, foo, bar, baz, " ~ attributeName!(Testy.intArr) ~ ") "~ 538 "VALUES (1, 'somestr', 2, 3, '{1,2,3}')"); 539 540 writeln("\t\t * Valid result"); 541 Testy t = c.findOne!Testy(1); 542 assert(t.id == 1, `t.id == 1` ); 543 assert(t.foo == "somestr", `t.foo == "somestr"`); 544 assert(t.bar == 2, `t.bar == 2`); 545 assert(t.baz == 3, `t.baz == 3`); 546 assert(t.intArr == [1,2,3], `t.intArr == [1,2,3]`); 547 //assert(t.stringArr == ["asd", "qwe"]); 548 549 writeln("\t\t * findOne with custom filter"); 550 Testy t2 = c.findOne!Testy("id = $1", 1); 551 assert(t == t2); 552 553 c.exec("DROP TABLE " ~ relationName!Testy); 554 } 555 556 /** 557 Returns the requestes structure, searches by the given column name 558 with the given value 559 If not rows are returned, a Nullable null value is returned 560 561 Examples: 562 ---------------------- 563 Connection conn; // An established connection 564 struct User 565 { 566 @serial @PKey int id; 567 ... 568 }; 569 570 auto user = conn.findOneBy!User("id", 1); // will search by "id" 571 ---------------------- 572 */ 573 Nullable!T findOneBy(T, U)(string col, U val) 574 { 575 import std.stdio; 576 577 auto members = AttributeList!T; 578 579 QueryBuilder qb; 580 qb.select(members) 581 .from(relationName!T) 582 .where( col ~ " = {col_" ~ col ~ "}") 583 .limit(1); 584 585 qb["col_" ~ col] = val; 586 587 auto q = qb.query(this); 588 589 auto r = q.run(); 590 if (r.rows == 0) 591 return Nullable!T.init; 592 593 //return T(); 594 595 auto res = deserialise!T(r[0]); 596 return Nullable!T(res); 597 } 598 599 /** 600 Returns the requested structure, searches by the specified filter 601 with given params 602 603 The filter is not further escaped, so programmer needs to make sure 604 not to properly escape or enclose reserved keywords (like user -> "user") 605 so PostgreSQL can understand them. 606 607 If not rows are returned, a Nullable null value is returned 608 609 Examples: 610 ---------------------- 611 Connection conn; // An established connection 612 struct User 613 { 614 @serial @PKey int id; 615 string username; 616 int posts; 617 }; 618 619 auto user = conn.findOne!User("username = $1 OR posts > $2", "foo", 42); 620 if (!user.isNull) 621 { 622 ... // do something 623 } 624 ---------------------- 625 */ 626 Nullable!T findOne(T, U...)(string filter, U vals) 627 { 628 QueryBuilder qb; 629 qb.select(AttributeList!T) 630 .from(relationName!T) 631 .where(filter) 632 .limit(1); 633 634 auto q = qb.query(this); 635 auto r = q.run(vals); 636 637 if (r.rows == 0) 638 return Nullable!T.init; 639 640 auto res = deserialise!T(r[0]); 641 return Nullable!T(res); 642 } 643 644 /** 645 Returns an array of the specified type, filtered with the given filter and 646 params 647 648 If no rows are returned by PostgreSQL, an empty array is returned. 649 650 Examples: 651 ---------------------- 652 Connection conn; // An established connection 653 struct User 654 { 655 @serial @PKey int id; 656 string username; 657 int posts; 658 }; 659 660 auto users = conn.find!User("username = $1 OR posts > $2", "foo", 42); 661 foreach (u; users) 662 { 663 ... // do something 664 } 665 ---------------------- 666 */ 667 T[] find(T, U...)(string filter = "", U vals = U.init) 668 { 669 QueryBuilder qb; 670 qb.select(AttributeList!T) 671 .from(relationName!T) 672 .where(filter); 673 674 auto q = qb.query(this); 675 676 T[] res; 677 foreach (r; q.run(vals)) 678 res ~= deserialise!T(r); 679 680 return res; 681 } 682 683 unittest 684 { 685 writeln("\t * find"); 686 687 @relation("find_test") 688 struct Test 689 { 690 @serial @PK int id; 691 @attr("my_n") int n; 692 } 693 694 c.ensureSchema!Test; 695 696 Test t; 697 t.n = 1; 698 699 c.insert(t); 700 c.insert(t); 701 ++t.n; 702 c.insert(t); 703 c.insert(t); 704 c.insert(t); 705 706 Test[] ts = c.find!Test("my_n = $1", 1); 707 assert(ts.length == 2); 708 ts = c.find!Test("my_n > 0"); 709 assert(ts.length == 5); 710 ts = c.find!Test("false"); 711 assert(ts.length == 0); 712 713 c.exec("DROP TABLE find_test"); 714 } 715 716 int update(T, U...)(string filter, string update, U vals) 717 { 718 QueryBuilder qb; 719 qb.update(relationName!T) 720 .set(update) 721 .where(filter); 722 723 auto r = qb.query(this).run(vals); 724 return r.rows; 725 } 726 727 unittest 728 { 729 writeln("\t * update"); 730 731 @relation("update_test") 732 struct Test 733 { 734 @serial @PK int id; 735 int n; 736 } 737 738 c.ensureSchema!Test; 739 740 Test t; 741 t.n = 5; 742 c.insert(t); 743 744 int nUpdates = c.update!Test("n = $1", "n = $2", 5, 123); 745 assert(nUpdates == 1, `nUpdates == 1`); 746 747 t = c.findOneBy!Test("n", 123); 748 assert(t.n == 123, `t.n == 123`); 749 750 writeln("\t\t * async"); 751 c.updateAsync!Test("n = $1", "n = $2", 123, 6); 752 auto r = c.nextResult(); 753 754 assert(r.rows == 1); 755 assert(!c.findOneBy!Test("n", 6).isNull); 756 757 c.exec("DROP TABLE update_test"); 758 } 759 760 void updateAsync(T, U...)(string filter, string update, U vals) 761 { 762 QueryBuilder qb; 763 qb.update(relationName!T) 764 .set(update) 765 .where(filter); 766 767 qb.query(this).runAsync(vals); 768 } 769 770 int update(T, U)(U id, Value[string] updates, bool async = false) 771 { 772 QueryBuilder qb; 773 774 qb.update(relationName!T) 775 .set(updates) 776 .where(primaryKeyAttributeName!T, id); 777 778 auto q = qb.query(this); 779 780 if (async) 781 { 782 q.runAsync(); 783 return -1; 784 } 785 786 auto r = q.run(); 787 return r.rows; 788 } 789 790 unittest 791 { 792 writeln("\t * update with AA updates"); 793 794 @relation("update_aa_test") 795 struct Test 796 { 797 @serial @PK int id; 798 int n; 799 } 800 c.ensureSchema!Test; 801 802 Test t; 803 t.n = 1; 804 c.insert(t); 805 806 int rows = c.update!Test(1, ["n": Value(2)]); 807 assert(rows == 1, `r.rows == 1`); 808 809 c.exec("DROP TABLE update_aa_test"); 810 } 811 812 void updateAsync(T, U)(U id, Value[string] updates) 813 { 814 update!T(id, updates, true); 815 } 816 817 int update(T, U)(U id, T updates, bool async = false) 818 { 819 import dpq.attributes; 820 821 QueryBuilder qb; 822 823 qb.update(relationName!T) 824 .where(primaryKeyAttributeName!T, id); 825 826 foreach (m; serialisableMembers!T) 827 qb.set(attributeName!(mixin("T." ~ m)), __traits(getMember, updates, m)); 828 829 auto q = qb.query(this); 830 if (async) 831 { 832 qb.query(this).runAsync(); 833 return -1; 834 } 835 836 auto r = q.run(); 837 return r.rows; 838 } 839 840 unittest 841 { 842 writeln("\t * update with object"); 843 844 @relation("update_object_test") 845 struct Test 846 { 847 @serial @PK int id; 848 int n; 849 } 850 c.ensureSchema!Test; 851 852 Test t; 853 t.n = 1; 854 t.id = 1; // assumptions <3 855 856 c.insert(t); 857 858 t.n = 2; 859 c.update!Test(1, t); 860 861 t = c.findOne!Test(1); 862 assert(t.n == 2); 863 864 t.n = 3; 865 c.updateAsync!Test(1, t); 866 auto r = c.nextResult(); 867 868 writeln("\t\t * async"); 869 assert(r.rows == 1); 870 871 c.exec("DROP TABLE update_object_test"); 872 } 873 874 void updateAsync(T, U)(U id, T updates) 875 { 876 update!T(id, updates, true); 877 } 878 879 880 private void addVals(T, U)(ref QueryBuilder qb, U val) 881 { 882 if (isAnyNull(val)) 883 qb.addValue(null); 884 else 885 { 886 foreach (m; serialisableMembers!(NoNullable!T)) 887 { 888 static if (isPK!(T, m) || hasUDA!(mixin("T." ~ m), IgnoreAttribute)) 889 continue; 890 else 891 qb.addValue(__traits(getMember, val, m)); 892 } 893 } 894 } 895 896 Result insertR(T)(T val, string ret = "") 897 { 898 QueryBuilder qb; 899 qb.insert(relationName!T, AttributeList!(T, true, true)); 900 if (ret.length > 0) 901 qb.returning(ret); 902 903 addVals!T(qb, val); 904 905 return qb.query(this).run(); 906 } 907 908 bool insert(T)(T val, bool async = false) 909 { 910 QueryBuilder qb; 911 qb.insert(relationName!T, AttributeList!(T, true, true)); 912 913 addVals!T(qb, val); 914 915 if (async) 916 return qb.query(this).runAsync(); 917 918 auto r = qb.query(this).run(); 919 return r.rows > 0; 920 } 921 922 unittest 923 { 924 writeln("\t * insert"); 925 926 @relation("insert_test_inner") 927 struct Inner 928 { 929 int bar; 930 } 931 932 @relation("insert_test") 933 struct Test 934 { 935 int n; 936 Nullable!int n2; 937 @embed Inner foo; 938 } 939 c.ensureSchema!Test; 940 941 Test t; 942 t.n = 1; 943 t.n2 = 2; 944 t.foo.bar = 2; 945 946 auto r = c.insert(t); 947 assert(r == true); 948 949 auto r2 = c.insertR(t, "n"); 950 assert(r2.rows == 1); 951 assert(r2[0][0].as!int == t.n); 952 953 Test t2 = c.findOneBy!Test("n", 1); 954 assert(t2 == t, t.to!string ~ " != " ~ t2.to!string); 955 956 writeln("\t\t * async"); 957 t.n = 123; 958 t.n2.nullify; 959 c.insertAsync(t); 960 961 auto res = c.nextResult(); 962 assert(res.rows == 1); 963 t2 = c.findOneBy!Test("n", 123); 964 assert(t.n2.isNull); 965 assert(t2.n2.isNull); 966 967 c.exec("DROP TABLE insert_test"); 968 c.exec("DROP TYPE \"%s\" CASCADE".format(relationName!Inner)); 969 } 970 971 void insertAsync(T)(T val) 972 { 973 insert(val, true); 974 } 975 976 int remove(T, U)(U id) 977 { 978 QueryBuilder qb; 979 qb.remove!T 980 .where(primaryKeyAttributeName!T, id); 981 982 return qb.query(this).run().rows; 983 } 984 985 bool removeAsync(T, U)(U id) 986 { 987 QueryBuilder qb; 988 qb.remove!T 989 .where(primaryKeyAttributeName!T, id); 990 991 return qb.query(this).runAsync() == 1; 992 } 993 994 995 int remove(T, U...)(string filter, U vals) 996 { 997 QueryBuilder qb; 998 qb.remove!T 999 .where(filter); 1000 1001 foreach (v; vals) 1002 qb.addValue(v); 1003 1004 return qb.query(this).run().rows; 1005 } 1006 1007 bool removeAsync(T, U...)(string filter, U vals) 1008 { 1009 QueryBuilder qb; 1010 qb.remove!T 1011 .where(filter); 1012 1013 foreach (v; vals) 1014 qb.addValue(v); 1015 1016 return qb.query(this).runAsync() == 1; 1017 } 1018 1019 unittest 1020 { 1021 @relation("remove_test") 1022 struct Test 1023 { 1024 @serial @PK int id; 1025 int n; 1026 } 1027 c.ensureSchema!Test; 1028 1029 foreach (i; 0 .. 10) 1030 c.insert(Test(0, i)); 1031 1032 writeln("\t * remove(id)"); 1033 int n = c.remove!Test(1); 1034 assert(n == 1, `n == 1`); 1035 1036 1037 writeln("\t\t * async"); 1038 c.removeAsync!Test(2); 1039 auto r = c.nextResult(); 1040 assert(r.rows == 1, `r.rows == 1`); 1041 1042 writeln("\t * remove(filter, vals...)"); 1043 n = c.remove!Test("id IN($1,$2,$3,$4,$5)", 3, 4, 5, 6, 7); 1044 assert(n == 5); 1045 1046 writeln("\t\t * async"); 1047 c.removeAsync!Test("id >= $1", 7); 1048 r = c.nextResult(); 1049 assert(r.rows == 3); 1050 1051 c.exec("DROP TABLE remove_test"); 1052 } 1053 1054 long count(T, U...)(string filter = "", U vals = U.init) 1055 { 1056 import dpq.query; 1057 auto q = Query(this); 1058 string str = `SELECT COUNT(*) FROM "%s"`.format(relationName!T); 1059 1060 if (filter.length > 0) 1061 str ~= " WHERE " ~ filter; 1062 1063 q = str; 1064 auto r = q.run(vals); 1065 1066 return r[0][0].as!long; 1067 } 1068 1069 unittest 1070 { 1071 writeln("\t * count"); 1072 1073 @relation("test_count") 1074 struct Test 1075 { 1076 @serial @PK int id; 1077 int n; 1078 } 1079 1080 c.ensureSchema!Test; 1081 1082 Test t; 1083 c.insert(t); 1084 1085 assert(c.count!Test == 1, `count == 1`); 1086 c.insert(t); 1087 assert(c.count!Test == 2, `count == 2`); 1088 1089 c.exec("DROP TABLE test_count"); 1090 } 1091 1092 bool isBusy() 1093 { 1094 return PQisBusy(_connection) == 1; 1095 } 1096 1097 unittest 1098 { 1099 writeln("\t * isBusy"); 1100 1101 assert(c.isBusy() == false); 1102 1103 c.send("SELECT 1::INT"); 1104 1105 assert(c.isBusy() == true); 1106 1107 c.nextResult(); 1108 assert(c.isBusy() == false); 1109 } 1110 1111 1112 /** 1113 Blocks until a result is read, then returns it 1114 1115 If no more results remain, a null result will be returned 1116 1117 Make sure to call this until a null is returned. 1118 */ 1119 Result nextResult() 1120 { 1121 import core.thread; 1122 1123 /* 1124 do 1125 { 1126 PQconsumeInput(_connection); 1127 //Thread.sleep(dur!"msecs"(1)); // What's a reasonable amount of time to wait? 1128 } 1129 while (isBusy()); 1130 */ 1131 1132 PGresult* res = PQgetResult(_connection); 1133 return Result(res); 1134 } 1135 1136 Result[] allResults() 1137 { 1138 Result[] res; 1139 1140 PGresult* r; 1141 while ((r = PQgetResult(_connection)) != null) 1142 res ~= Result(r); 1143 1144 return res; 1145 } 1146 1147 Result lastResult() 1148 { 1149 Result res; 1150 1151 PGresult* r; 1152 while ((r = PQgetResult(_connection)) != null) 1153 res = Result(r); 1154 1155 return res; 1156 } 1157 1158 unittest 1159 { 1160 writeln("\t * nextResult"); 1161 auto x = c.nextResult(); 1162 assert(x.isNull); 1163 1164 int int1 = 1; 1165 int int2 = 2; 1166 1167 c.sendParams("SELECT $1", int1); 1168 1169 // In every way the same as lastResult 1170 Result r, t; 1171 while(!(t = c.nextResult()).isNull) 1172 r = t; 1173 1174 assert(r.rows == 1); 1175 assert(r.columns == 1); 1176 assert(r[0][0].as!int == int1); 1177 1178 writeln("\t * lastResult"); 1179 c.sendParams("SELECT $1", int2); 1180 r = c.lastResult(); 1181 1182 assert(r.rows == 1); 1183 assert(r.columns == 1); 1184 assert(r[0][0].as!int == int2); 1185 } 1186 1187 Result prepare(T...)(string name, string command, T paramTypes) 1188 { 1189 Oid[] oids; 1190 foreach (pType; paramTypes) 1191 oids ~= pType; 1192 1193 char* cName = cast(char*) name.toStringz; 1194 char* cComm = cast(char*) command.toStringz; 1195 1196 auto p = PreparedStatement(this, name, command, oids); 1197 _prepared[name] = p; 1198 1199 return Result(PQprepare( 1200 _connection, 1201 cName, 1202 cComm, 1203 oids.length.to!int, 1204 oids.ptr)); 1205 } 1206 1207 Result execPrepared(string name, Value[] params...) 1208 { 1209 char* cStr = cast(char*) name.toStringz; 1210 1211 return Result(PQexecPrepared( 1212 _connection, 1213 cStr, 1214 params.length.to!int, 1215 cast(char**) params.paramValues.ptr, 1216 params.paramLengths.ptr, 1217 params.paramFormats.ptr, 1218 1)); 1219 } 1220 1221 Result execPrepared(T...)(string name, T params) 1222 { 1223 Value[] vals; 1224 foreach (p; params) 1225 vals ~= Value(p); 1226 1227 return execPrepared(name, vals); 1228 } 1229 1230 bool sendPrepared(string name, Value[] params...) 1231 { 1232 char* cStr = cast(char*) name.toStringz; 1233 1234 return PQsendQueryPrepared( 1235 _connection, 1236 cStr, 1237 params.length.to!int, 1238 cast(char**) params.paramValues.ptr, 1239 params.paramLengths.ptr, 1240 params.paramFormats.ptr, 1241 1) == 1; 1242 } 1243 1244 bool sendPrepared(T...)(string name, T params) 1245 { 1246 Value[] vals; 1247 foreach (p; params) 1248 vals ~= Value(p); 1249 1250 return sendPrepared(name, vals); 1251 } 1252 1253 unittest 1254 { 1255 writeln("\t * prepare"); 1256 // The result of this isn't really all that useful, but as long as it 1257 // throws on errors, it kinda is 1258 c.prepare("prepare_test", "SELECT $1", Type.INT4); 1259 1260 writeln("\t * execPrepared"); 1261 auto r = c.execPrepared("prepare_test", 1); 1262 assert(r.rows == 1); 1263 assert(r[0][0].as!int == 1); 1264 1265 writeln("\t\t * sendPrepared"); 1266 bool s = c.sendPrepared("prepare_test", 1); 1267 assert(s); 1268 1269 r = c.lastResult(); 1270 assert(r.rows == 1); 1271 assert(r[0][0].as!int == 1); 1272 } 1273 1274 ref PreparedStatement prepared(string name) 1275 { 1276 return _prepared[name]; 1277 } 1278 1279 ref PreparedStatement opIndex(string name) 1280 { 1281 return prepared(name); 1282 } 1283 } 1284 1285 1286 /** 1287 Deserialises the given Row to the requested type 1288 1289 Params: 1290 T = (template) type to deserialise into 1291 r = Row to deserialise 1292 */ 1293 T deserialise(T)(Row r, string prefix = "") 1294 { 1295 T res; 1296 foreach (m; serialisableMembers!T) 1297 { 1298 enum n = attributeName!(mixin("T." ~ m)); 1299 alias mType = Unqual!(typeof(mixin("T." ~ m))); 1300 1301 try 1302 { 1303 auto x = r[prefix ~ n].as!mType; 1304 if (!x.isNull) 1305 __traits(getMember, res, m) = cast(TypedefType!(mType)) x; 1306 } 1307 catch (DPQException e) 1308 { 1309 if (!isInstanceOf!(Nullable, mType)) 1310 throw e; 1311 } 1312 } 1313 return res; 1314 } 1315 1316 /// Hold the last created connection, not to be used outside the library 1317 package Connection* _dpqLastConnection; 1318 1319 /// Holds a list of all the OIDs of our custom types, indexed by their relationName 1320 package Oid[string] _dpqCustomOIDs; 1321